# Read Input

In [None]:
import os
import kagglehub
from kagglehub import KaggleDatasetAdapter
import polars as pl

def read_snapshot(filename: str) -> pl.LazyFrame:
    """Load one file of the Travian map-snapshot Kaggle dataset.

    Args:
        filename: Path of the file inside the
            ``sirwerto/travian-map-snapshots`` dataset.

    Returns:
        The object produced by ``kagglehub.dataset_load`` with the Polars
        adapter (annotated here as a ``pl.LazyFrame``).
    """
    dataset_handle = "sirwerto/travian-map-snapshots"
    return kagglehub.dataset_load(
        KaggleDatasetAdapter.POLARS,
        dataset_handle,
        filename,
    )

# Bare file name (directory part discarded) of every file found anywhere
# below /kaggle/input.
snapshots = [
    filename
    for _dirname, _subdirs, filenames in os.walk('/kaggle/input')
    for filename in filenames
]

# Load every snapshot and stack them into a single frame.
lf_snapshots = pl.concat([read_snapshot(name) for name in snapshots])

In [None]:
def join_snapshots_with_server_dimension(
    lf_snapshots: pl.LazyFrame, lf_server: pl.LazyFrame
) -> pl.LazyFrame:
    """Attach the matching server-dimension row to every snapshot row.

    An ``ingestion_date`` column (date part of ``ingestion_datetime``) is
    added to the snapshots first. A snapshot row is then paired with a
    server row when both share the same subdomain and the ingestion date
    lies in ``[start_date, end_date]``; a null ``end_date`` is treated as
    an open-ended interval.

    Args:
        lf_snapshots: Snapshot rows; must expose ``ingestion_datetime`` and
            ``subdomain``.
        lf_server: Server dimension; must expose ``subdomain``,
            ``start_date`` and ``end_date``.

    Returns:
        The range-joined lazy frame. The server's subdomain column is
        referenced as ``subdomain_right`` in the join predicate.
    """
    snapshot_date = pl.col("ingestion_date")
    has_started = snapshot_date >= pl.col("start_date")
    not_yet_ended = pl.col("end_date").is_null() | (
        snapshot_date <= pl.col("end_date")
    )
    same_subdomain = pl.col("subdomain") == pl.col("subdomain_right")

    dated_snapshots = lf_snapshots.with_columns(
        ingestion_date=pl.col("ingestion_datetime").dt.date()
    )
    return dated_snapshots.join_where(
        lf_server, [same_subdomain, has_started & not_yet_ended]
    )


In [None]:
def join_snapshots_with_village_dimension(
    lf_snapshots: pl.LazyFrame, lf_village: pl.LazyFrame
) -> pl.LazyFrame:
    """Attach the village-dimension version valid for every snapshot row.

    An ``ingestion_date`` column (date part of ``ingestion_datetime``) is
    (re)computed on the snapshots first. A snapshot row is paired with a
    village row when ``server_id`` and ``village_travian_id`` are equal on
    both sides and the ingestion date lies in ``[start_date, end_date]``;
    a null ``end_date`` is treated as an open-ended interval.

    Args:
        lf_snapshots: Snapshot rows; must expose ``ingestion_datetime``,
            ``server_id`` and ``village_travian_id``.
        lf_village: Village dimension; must expose ``server_id``,
            ``village_travian_id``, ``start_date`` and ``end_date``.

    Returns:
        The range-joined lazy frame. The village-side key columns are
        referenced as ``server_id_right`` and ``village_travian_id_right``
        in the join predicates.
    """
    snapshot_date = pl.col("ingestion_date")
    has_started = snapshot_date >= pl.col("start_date")
    not_yet_ended = pl.col("end_date").is_null() | (
        snapshot_date <= pl.col("end_date")
    )
    same_server = pl.col("server_id") == pl.col("server_id_right")
    same_village = pl.col("village_travian_id") == pl.col("village_travian_id_right")

    dated_snapshots = lf_snapshots.with_columns(
        ingestion_date=pl.col("ingestion_datetime").dt.date()
    )
    return dated_snapshots.join_where(
        lf_village, [same_server, same_village, has_started & not_yet_ended]
    )


# Create Tables

## Server dimension table

In [None]:
# Window shared by the per-subdomain, chronologically ordered operations below.
w = {
    "partition_by": "subdomain",
    "order_by": "ingestion_date",
    "descending": False
}

# A gap of at least this many days between two consecutive snapshot dates of
# the same subdomain is treated as the start of a new server on that subdomain.
MAX_DAYS_GAP_TO_CONSIDER_A_NEW_SERVER = 7
(
    lf_snapshots
    # One row per (subdomain, day) on which a snapshot exists.
    .select(
        "subdomain",
        pl.col("ingestion_datetime").dt.date().alias("ingestion_date")
    )
    .unique()
    # Days elapsed since the previous snapshot day of the same subdomain
    # (null for the very first day of a subdomain).
    .with_columns(ingestion_date_shifted=pl.col("ingestion_date").shift().over(**w))
    .with_columns(
        date_diff=(pl.col("ingestion_date") - pl.col("ingestion_date_shifted")).dt.total_days()
    )
    # Keep only the first day of each server run.
    .filter((pl.col("date_diff") >= MAX_DAYS_GAP_TO_CONSIDER_A_NEW_SERVER) | (pl.col("date_diff").is_null()))
    .with_columns(
        start_date = pl.col("ingestion_date"),
        # A run ends the day before the NEXT run on the same subdomain starts,
        # hence shift(-1). The latest run keeps a null (open-ended) end_date.
        # The one-day offset keeps the inclusive [start_date, end_date]
        # intervals used by the range joins from overlapping.
        end_date = pl.col("ingestion_date").shift(-1).over(**w).dt.offset_by("-1d"),
    )
    .with_columns(
        # A server counts as finished once a later run exists on its subdomain.
        has_finished = pl.col("end_date").is_not_null(),
        # Global, deterministic surrogate id ordered by start date, then subdomain.
        server_id = pl.col("subdomain").rank(method="ordinal").over(partition_by=pl.lit(1), order_by=["ingestion_date", "subdomain"], descending=False),
        # Speed is read from the ".x<digits>." part of the subdomain; \d+ so
        # that multi-digit speeds are captured too (null when absent).
        speed = pl.col("subdomain").str.extract(r"\.x(\d+)\.", 1).cast(pl.UInt8)
    )
    .select("server_id", "subdomain", "ingestion_date", "start_date", "end_date", "has_finished", "speed")
    .sort("server_id")
    .sink_parquet("server_dimension.parquet")
)
# Re-open the table lazily from disk for the downstream cells.
lf_server = pl.scan_parquet("server_dimension.parquet")

## Village dimension table

In [None]:
# Generate village_id: one surrogate id per (server_id, village_travian_id),
# numbered by first appearance, then server_id, then village_travian_id.
lf_village_id = (
    join_snapshots_with_server_dimension(lf_snapshots, lf_server=lf_server)
    .rename({"village_id": "village_travian_id"})
    .group_by("server_id", "village_travian_id")
    .agg(
        # Earliest ingestion date on which the village was seen.
        first_appereance=pl.col("ingestion_date")
        .sort_by("ingestion_date", descending=False)
        .first()
    )
    .with_columns(
        village_id=pl.col("village_travian_id")
        .rank(method="ordinal")
        .over(
            partition_by=pl.lit(1),
            order_by=["first_appereance", "server_id", "village_travian_id"],
            descending=False,
        ),
    )
)
#lf_village_id.show_graph()
#lf_village_id.collect()

In [None]:
# Window: one partition per village, rows in chronological order.
w = dict(partition_by="village_id", order_by="ingestion_date", descending=False)

(
    join_snapshots_with_server_dimension(lf_snapshots, lf_server=lf_server)
    .rename({"village_id": "village_travian_id"})
    # Bring in the generated village_id of every (server, Travian id) pair.
    .join(lf_village_id, on=["server_id", "village_travian_id"], how="inner")
    # Previous observation of every tracked attribute within the same village.
    .with_columns(
        pl.col("tribe_id").shift().over(**w).alias("tribe_id_shifted"),
        pl.col("is_capital").shift().over(**w).alias("is_capital_shifted"),
        pl.col("is_city").shift().over(**w).alias("is_city_shifted"),
        pl.col("has_harbor").shift().over(**w).alias("has_harbor_shifted"),
    )
    .with_columns(
        # SCD type 2 https://www.kimballgroup.com/data-warehouse-business-intelligence-resources/kimball-techniques/dimensional-modeling-techniques/type-2/
        # A row opens a new version when it has no previous tribe_id or when
        # any tracked attribute differs from its previous value; an
        # undecidable (null) comparison counts as "no change".
        has_changed_any_dimension_attributes=(
            pl.col("tribe_id_shifted").is_null()
            | (pl.col("tribe_id") != pl.col("tribe_id_shifted"))
            | (pl.col("is_capital") != pl.col("is_capital_shifted"))
            | (pl.col("is_city") != pl.col("is_city_shifted"))
            | (pl.col("has_harbor") != pl.col("has_harbor_shifted"))
        ).fill_null(False)
    )
    .filter(pl.col("has_changed_any_dimension_attributes"))
    .with_columns(
        start_date=pl.col("ingestion_date"),
        # A version ends the day before the next version of the same village
        # starts; the latest version keeps a null end_date.
        end_date=pl.col("ingestion_date").shift(-1).over(**w).dt.offset_by("-1d"),
        # One surrogate key per version row, ordered by date then village_id.
        village_key=pl.col("village_id")
        .rank(method="ordinal")
        .over(
            partition_by=pl.lit(1),
            order_by=["ingestion_date", "village_id"],
            descending=False,
        ),
    )
    .select(
        "server_id",
        "village_id",
        "village_travian_id",
        pl.col("village_name").alias("current_name"),
        "ingestion_date",
        "grid_position",
        "x_position",
        "y_position",
        "region",
        "village_key",
        "start_date",
        "end_date",
        "tribe_id",
        "is_city",
        "is_capital",
        "has_harbor",
    )
    .sink_parquet("village_dimension.parquet", compression="zstd", compression_level=22)
)
# The id lookup is only needed to build this table.
del lf_village_id
lf_village = pl.scan_parquet("village_dimension.parquet")
#lf_village.collect()

## Player dimension table

In [None]:
# Generate player_id: one surrogate id per (server_id, player_travian_id),
# numbered by first appearance, then server_id, then player_travian_id.
lf_player_id = (
    join_snapshots_with_server_dimension(lf_snapshots, lf_server=lf_server)
    .rename({"player_id": "player_travian_id"})
    .group_by("server_id", "player_travian_id")
    .agg(
        # Earliest ingestion date on which the player was seen.
        first_appereance=pl.col("ingestion_date")
        .sort_by("ingestion_date", descending=False)
        .first()
    )
    .with_columns(
        player_id=pl.col("player_travian_id")
        .rank(method="ordinal")
        .over(
            partition_by=pl.lit(1),
            order_by=["first_appereance", "server_id", "player_travian_id"],
            descending=False,
        ),
    )
)

In [None]:
(
    join_snapshots_with_server_dimension(lf_snapshots, lf_server=lf_server)
    .rename(
        {
            "village_id": "village_travian_id",
            "player_id": "player_travian_id",
        }
    )
    # Bring in the generated player_id of every (server, Travian id) pair.
    .join(lf_player_id, on=["server_id", "player_travian_id"], how="inner")
    .group_by("player_id")
    .agg(
        # Both are constant within a player_id, so any value will do.
        pl.col("player_travian_id", "server_id").first(),
        # Name carried by the most recent snapshot.
        pl.col("player_name")
        .sort_by("ingestion_date", descending=True)
        .first()
        .alias("current_name"),
        # Tribe of the earliest snapshot row; ties on the date are broken by
        # the smallest village_travian_id.
        pl.col("tribe_id")
        .sort_by(["ingestion_date", "village_travian_id"], descending=[False, False])
        .first()
        .alias("original_tribe_id"),
    )
    .sink_parquet("player_dimension.parquet")
)

# The id lookup is only needed to build this table.
del lf_player_id
lf_player = pl.scan_parquet("player_dimension.parquet")
#lf_player.collect()

## Alliance dimension table

In [None]:
# Generate alliance_id: one surrogate id per (server_id, alliance_travian_id),
# numbered by first appearance, then server_id, then alliance_travian_id.
lf_alliance_id = (
    join_snapshots_with_server_dimension(lf_snapshots, lf_server=lf_server)
    .rename({"alliance_id": "alliance_travian_id"})
    .group_by("server_id", "alliance_travian_id")
    .agg(
        # Earliest ingestion date on which the alliance was seen.
        first_appereance=pl.col("ingestion_date")
        .sort_by("ingestion_date", descending=False)
        .first()
    )
    .with_columns(
        alliance_id=pl.col("alliance_travian_id")
        .rank(method="ordinal")
        .over(
            partition_by=pl.lit(1),
            order_by=["first_appereance", "server_id", "alliance_travian_id"],
            descending=False,
        ),
    )
)

In [None]:
(
    join_snapshots_with_server_dimension(lf_snapshots, lf_server=lf_server)
    .rename({"alliance_id": "alliance_travian_id"})
    # Bring in the generated alliance_id of every (server, Travian id) pair.
    .join(lf_alliance_id, on=["server_id", "alliance_travian_id"], how="inner")
    .group_by("alliance_id")
    .agg(
        # Both are constant within an alliance_id, so any value will do.
        pl.col("alliance_travian_id", "server_id").first(),
        # Name carried by the most recent snapshot.
        pl.col("alliance_name")
        .sort_by("ingestion_date", descending=True)
        .first()
        .alias("current_name"),
    )
    .with_columns(
        # Travian id 0 with an empty name is flagged as a placeholder;
        # an undecidable (null) test counts as "not a placeholder".
        is_placeholder_alliance=(
            (pl.col("alliance_travian_id") == 0) & (pl.col("current_name") == "")
        ).fill_null(False)
    )
    .with_columns(
        # Give the placeholder a readable name; real alliances keep theirs.
        current_name=pl.when(pl.col("is_placeholder_alliance"))
        .then(pl.lit("NO_ALLIANCE"))
        .otherwise(pl.col("current_name"))
    )
    .sink_parquet("alliance_dimension.parquet")
)
# The id lookup is only needed to build this table.
del lf_alliance_id
lf_alliance = pl.scan_parquet("alliance_dimension.parquet")
#lf_alliance.collect()

## Tribe dimension table

In [None]:
# Prefix of the name given to tribe ids that have no entry in the mapping below.
UNKOWN_TRIBE = "unknown_tribe"
(
    lf_snapshots
    .select("tribe_id")
    .unique()
    .select(
        "tribe_id",
        # Known ids get their name; any other id becomes
        # "<UNKOWN_TRIBE>_<tribe_id>".
        # NOTE(review): only ids 1-8 are mapped here; confirm whether the
        # snapshots can contain further tribe ids that deserve a real name.
        pl.when(pl.col("tribe_id") == 1).then(pl.lit("romans"))
        .when(pl.col("tribe_id") == 2).then(pl.lit("teutons"))
        .when(pl.col("tribe_id") == 3).then(pl.lit("gauls"))
        .when(pl.col("tribe_id") == 4).then(pl.lit("nature"))
        .when(pl.col("tribe_id") == 5).then(pl.lit("natars"))
        .when(pl.col("tribe_id") == 6).then(pl.lit("egyptians"))
        .when(pl.col("tribe_id") == 7).then(pl.lit("huns"))
        .when(pl.col("tribe_id") == 8).then(pl.lit("spartans"))
        .otherwise(
            pl.concat_str([pl.lit(UNKOWN_TRIBE), pl.lit("_"), pl.col("tribe_id")])
        )
        .alias("tribe_name"),
    )
    .sort("tribe_id")
    .sink_parquet("tribe_dimension.parquet")
)
lf_tribe = pl.scan_parquet("tribe_dimension.parquet")
#lf_tribe.collect()

## Daily village snapshot fact table

In [None]:
# Build the daily village snapshot fact table: one row per snapshot row that
# could be resolved against the server, village, player and alliance
# dimensions, keeping only surrogate keys, population and ingestion date/time.
(
    lf_snapshots
    # Descriptive attributes live in the dimensions, not in the fact table.
    .drop("village_name", "player_name", "alliance_name", "grid_position", "x_position", "y_position", "region", "is_capital", "is_city", "has_harbor", "victory_points")
    .pipe(join_snapshots_with_server_dimension, lf_server=lf_server.select("server_id", "subdomain", "start_date", "end_date"))
    # The server's start_date/end_date must be dropped here: the village range
    # join below refers to "start_date"/"end_date" in its predicate and must
    # see the village columns under those names.
    .drop("subdomain_right","start_date","end_date")
    .rename({
        "village_id": "village_travian_id",
        "player_id": "player_travian_id",
        "alliance_id": "alliance_travian_id"
    })
    # Resolve the village version (village_key) valid on the ingestion date.
    .pipe(join_snapshots_with_village_dimension, lf_village.select("server_id", "village_id", "village_travian_id", "village_key", "start_date", "end_date"))
    .drop("village_travian_id", "village_id", "start_date", "end_date")
    # Resolve the player surrogate id; unmatched rows are discarded (inner join).
    .join(
        other=lf_player,
        how="inner",
        on=["server_id", "player_travian_id"]
    )
    .drop("player_travian_id", "current_name")
    # Resolve the alliance surrogate id; unmatched rows are discarded (inner join).
    .join(
        other=lf_alliance,
        how="inner",
        on=["server_id", "alliance_travian_id"]
    )
    .drop("alliance_travian_id", "current_name", "is_placeholder_alliance")
    # ingestion_date was added by the join helpers; keep the time part separately.
    .with_columns(ingestion_time=pl.col("ingestion_datetime").dt.time())
    .drop("subdomain", "ingestion_datetime")
    .select(
        "server_id",
        "village_key",
        "player_id",
        "alliance_id",
        "population",
        "ingestion_date",
        "ingestion_time"        
    )
    .sink_parquet("daily_village_snapshot.parquet", compression="zstd", compression_level=22)
)
# Re-open the fact table lazily from disk for the downstream cells.
lf_daily_village_snapshot = pl.scan_parquet("daily_village_snapshot.parquet")
#lf_daily_village_snapshot.collect()
#lf_daily_village_snapshot.collect().shape

## Remove the server_id from the village, player and alliance dimension tables

In [None]:
# Rewrite the three dimension tables without server_id.
# NOTE: the daily village snapshot cell joins lf_player and lf_alliance on
# server_id, so it must have run before this cell; this cell itself cannot be
# re-run once server_id is gone from the files.
# collect() materialises each table in memory before its own file is overwritten.

# Keep the compression settings the village table was originally written with.
lf_village.drop("server_id").collect().write_parquet(
    "village_dimension.parquet", compression="zstd", compression_level=22
)
lf_player.drop("server_id").collect().write_parquet("player_dimension.parquet")
lf_alliance.drop("server_id").collect().write_parquet("alliance_dimension.parquet")

# Point the lazy frames at the rewritten files.
lf_village = pl.scan_parquet("village_dimension.parquet")
lf_player = pl.scan_parquet("player_dimension.parquet")
lf_alliance = pl.scan_parquet("alliance_dimension.parquet")

## Daily player snapshot fact table

# Window: one partition per player, rows in chronological order.
w = dict(partition_by="player_id", order_by="ingestion_date", descending=False)

(
    lf_daily_village_snapshot
    # Add the village attributes (village_id, is_city, has_harbor, ...) of the
    # village version referenced by each fact row.
    .join(lf_village, on="village_key")
    .group_by("player_id", "ingestion_date")
    .agg(
        server_id=pl.col("server_id").first(),
        alliance_id=pl.col("alliance_id").first(),
        number_of_villages=pl.col("village_id").len(),
        number_of_cities=pl.col("is_city").sum(),
        number_of_harbors=pl.col("has_harbor").sum(),
        total_population=pl.col("population").sum(),
        median_population=pl.col("population").median(),
        max_population=pl.col("population").max(),
        min_population=pl.col("population").min(),
        # q1_population .. q9_population hold the 0.1 .. 0.9 quantiles of the
        # village populations of the player on that day.
        **{
            f"q{step}_population": pl.col("population").quantile(step / 10)
            for step in range(1, 10)
        },
    )
    # First difference of the total population between consecutive rows of a player.
    .with_columns(population_increase=pl.col("total_population").diff().over(**w))
    # Second difference: how much the increase itself changed.
    .with_columns(population_increase_speed=pl.col("population_increase").diff().over(**w))
    .sink_parquet("daily_player_snapshot.parquet", compression="zstd", compression_level=22)
)
lf_daily_player_snapshot = pl.scan_parquet("daily_player_snapshot.parquet")
#lf_daily_player_snapshot.collect()

# Upload Tables

# Push to Kaggle

In [None]:
# Kaggle dataset metadata (JSON text) describing every published table and
# its columns. The dataset "id" is the dataset's handle and must stay as is.
metadata = """
{
  "title": "Travian base star schema",
  "subtitle": "Travian map.sql modeled as a star schema.",
  "description": "A star schema generated from the map.sql files provided by Travian. All the tables are generated on a daily basis. Each table is either a dimension table or a fact table. Special remark: unique ids are generated for villages, players and alliances so that they can be joined to the daily_village_snapshot fact table without the help of server_id.",
  "id": "sirwerto/travian-base-start-schema",
  "licenses": [
    {
      "name": "CC0-1.0"
    }
  ],
  "resources": [
    {
      "path": "village_dimension.parquet",
      "description": "Table keeping a registry of all the villages in Travian and all the changes in these villages using a Slowly Changing Dimension Type 2.",
      "schema": {
        "fields": [
          {
            "name": "village_id",
            "description": "Internal surrogate identifier of the village, shared by all the SCD Type 2 versions of the same village.",
            "type": "int"
          },
          {
            "name": "village_travian_id",
            "description": "Original Travian village identifier as provided by the game.",
            "type": "int"
          },
          {
            "name": "current_name",
            "description": "Current name of the village at the time of the record.",
            "type": "string"
          },
          {
            "name": "ingestion_date",
            "description": "Date when this record was ingested into the data pipeline.",
            "type": "string"
          },
          {
            "name": "grid_position",
            "description": "Unique grid position identifier on the Travian map.",
            "type": "int"
          },
          {
            "name": "x_position",
            "description": "X coordinate of the village on the Travian map.",
            "type": "int"
          },
          {
            "name": "y_position",
            "description": "Y coordinate of the village on the Travian map.",
            "type": "int"
          },
          {
            "name": "region",
            "description": "Region of the Travian map where the village is located.",
            "type": "string"
          },
          {
            "name": "village_key",
            "description": "Surrogate key of this specific SCD Type 2 version of the village; referenced by daily_village_snapshot.",
            "type": "int"
          },
          {
            "name": "start_date",
            "description": "Date when this version of the village record became valid.",
            "type": "string"
          },
          {
            "name": "end_date",
            "description": "Date when this version of the village record stopped being valid.",
            "type": "string"
          },
          {
            "name": "tribe_id",
            "description": "Travian tribe identifier owning the village.",
            "type": "int"
          },
          {
            "name": "is_city",
            "description": "Flag indicating whether the village is classified as a city.",
            "type": "int"
          },
          {
            "name": "is_capital",
            "description": "Flag indicating whether the village is the player's capital.",
            "type": "int"
          },
          {
            "name": "has_harbor",
            "description": "Flag indicating whether the village has a harbor.",
            "type": "int"
          }
        ]
      }
    },
    {
      "path": "server_dimension.parquet",
      "description": "Dimension table describing Travian game servers and their lifecycle.",
      "schema": {
        "fields": [
          {
            "name": "server_id",
            "description": "Internal surrogate identifier for the server.",
            "type": "int"
          },
          {
            "name": "subdomain",
            "description": "Server subdomain used to access the Travian game world.",
            "type": "string"
          },
          {
            "name": "ingestion_date",
            "description": "Date when the server record was ingested.",
            "type": "string"
          },
          {
            "name": "start_date",
            "description": "Date when the server started.",
            "type": "string"
          },
          {
            "name": "end_date",
            "description": "Date when the server ended or is expected to end.",
            "type": "string"
          },
          {
            "name": "has_finished",
            "description": "Flag indicating whether the server has finished.",
            "type": "int"
          },
          {
            "name": "speed",
            "description": "Game speed multiplier of the server.",
            "type": "int"
          }
        ]
      }
    },
    {
      "path": "player_dimension.parquet",
      "description": "Dimension table tracking players and their core identifying attributes.",
      "schema": {
        "fields": [
          {
            "name": "player_id",
            "description": "Internal surrogate identifier for the player.",
            "type": "int"
          },
          {
            "name": "player_travian_id",
            "description": "Original Travian player identifier.",
            "type": "int"
          },
          {
            "name": "current_name",
            "description": "Current in-game name of the player.",
            "type": "string"
          },
          {
            "name": "original_tribe_id",
            "description": "Tribe identifier chosen by the player at the start of the server.",
            "type": "int"
          }
        ]
      }
    },
    {
      "path": "alliance_dimension.parquet",
      "description": "Dimension table describing alliances and their identifying properties.",
      "schema": {
        "fields": [
          {
            "name": "alliance_id",
            "description": "Internal surrogate identifier for the alliance.",
            "type": "int"
          },
          {
            "name": "alliance_travian_id",
            "description": "Original Travian alliance identifier.",
            "type": "int"
          },
          {
            "name": "current_name",
            "description": "Current name of the alliance.",
            "type": "string"
          },
          {
            "name": "is_placeholder_alliance",
            "description": "Flag indicating whether the alliance is a placeholder. For example, 'NO_ALLIANCE' is a placeholder for players without an alliance.",
            "type": "int"
          }
        ]
      }
    },
    {
      "path": "tribe_dimension.parquet",
      "description": "Dimension table listing all Travian tribes.",
      "schema": {
        "fields": [
          {
            "name": "tribe_id",
            "description": "Travian tribe identifier.",
            "type": "int"
          },
          {
            "name": "tribe_name",
            "description": "Name of the Travian tribe.",
            "type": "string"
          }
        ]
      }
    },
    {
      "path": "daily_village_snapshot.parquet",
      "description": "Daily snapshot fact table capturing village state and ownership.",
      "schema": {
        "fields": [
          {
            "name": "server_id",
            "description": "Identifier of the server where the village exists.",
            "type": "int"
          },
          {
            "name": "village_key",
            "description": "Surrogate key of the village dimension version that was valid at snapshot time.",
            "type": "int"
          },
          {
            "name": "player_id",
            "description": "Identifier of the player owning the village at snapshot time.",
            "type": "int"
          },
          {
            "name": "alliance_id",
            "description": "Identifier of the alliance associated with the village owner.",
            "type": "int"
          },
          {
            "name": "population",
            "description": "Population of the village at snapshot time.",
            "type": "int"
          },
          {
            "name": "ingestion_date",
            "description": "Date when the snapshot was ingested.",
            "type": "string"
          },
          {
            "name": "ingestion_time",
            "description": "Time when the snapshot was ingested.",
            "type": "string"
          }
        ]
      }
    }
  ],
  "keywords": ["Games", "Beginner", "Intermediate", "Advanced"]
}

"""

In [None]:
# Persist the metadata next to the notebook. The encoding is pinned to UTF-8
# so the JSON file does not depend on the platform's default locale.
# NOTE(review): the file is written to the current working directory, while
# the upload cell publishes /kaggle/working/to_upload, into which only
# *.parquet files are moved - confirm whether this file should be staged too.
with open("dataset-metadata.json", "w", encoding="utf-8") as text_file:
    text_file.write(metadata)

In [None]:
# Stage every parquet file found in /kaggle/working in a dedicated upload folder.
# NOTE: the files are moved, not copied. Assuming the notebook's working
# directory is /kaggle/working (TODO confirm), the lazy frames created above
# with pl.scan_parquet("<table>.parquet") no longer resolve after this cell.
! mkdir -p /kaggle/working/to_upload
! mv /kaggle/working/*.parquet /kaggle/working/to_upload

In [None]:
from datetime import datetime

# Publish the staged folder to the Kaggle dataset, using the local ISO-8601
# timestamp of the upload as the version note. (The former stand-alone
# `datetime.now().isoformat()` expression was dropped: its value was discarded.)
kagglehub.dataset_upload(
    "sirwerto/travian-base-start-schema",
    "/kaggle/working/to_upload",
    version_notes=datetime.now().isoformat(),
)