In [None]:
# Notebook: Silver to Gold Aggregation
# Purpose: Read data from Silver layer and aggregate it into Gold layer

from pyspark.sql.functions import *
from pyspark.sql.window import Window
import os

storage_account = "dota2lakehousenew"
container = "data"
storage_account_key = os.environ.get("AZURE_STORAGE_KEY")

# Fail fast with an actionable message: passing None/"" to spark.conf.set would otherwise
# surface as an opaque JVM error here or an authentication failure on the first read.
if not storage_account_key:
    raise EnvironmentError(
        "AZURE_STORAGE_KEY is not set (or empty); it is required to authenticate to "
        f"storage account '{storage_account}'."
    )

# Account-key auth for the ABFS driver, scoped to this storage account only.
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    storage_account_key
)

# Root folders for the Silver (input) and Gold (output) layers inside the container.
SILVER_PATH = f"abfss://{container}@{storage_account}.dfs.core.windows.net/silver"
GOLD_PATH = f"abfss://{container}@{storage_account}.dfs.core.windows.net/gold"

print("Silver to Gold aggregation started")

In [None]:
print("\nLoading data from Silver layer...")


def _read_silver(table_name):
    """Load one Delta table stored under SILVER_PATH/<table_name>."""
    return spark.read.format("delta").load(f"{SILVER_PATH}/{table_name}")


df_matches = _read_silver("cleaned_matches")
df_players = _read_silver("cleaned_players")
df_picks_bans = _read_silver("cleaned_picks_bans")

# Row counts are a quick sanity check; each count() triggers a full scan of its table.
for label, frame in (
    ("Matches", df_matches),
    ("Players", df_players),
    ("Picks/Bans", df_picks_bans),
):
    print(f"{label}: {frame.count():,} rows")

In [None]:
print("\nCreating Player Statistics...")

# Keep only rows explicitly flagged as non-outliers.
# NOTE(review): `== False` evaluates to NULL when is_outlier is NULL, so rows with an unset
# flag are dropped too — confirm Silver always populates is_outlier.
# df_players_valid is reused by the hero-stats and ML-feature cells.
df_players_valid = df_players.filter(col("is_outlier") == False)

# One row per known player (NULL account_id rows are excluded).
# NOTE(review): first("personaname") is not ordered, so "last_known_name" is an arbitrary
# name among the player's rows, not necessarily the most recent one.
df_player_stats = df_players_valid \
    .filter(col("account_id").isNotNull()) \
    .groupBy("account_id") \
    .agg(
        count("match_id").alias("total_matches"),
        sum("win").alias("total_wins"),
        round(avg("kills"), 2).alias("avg_kills"),
        round(avg("deaths"), 2).alias("avg_deaths"),
        round(avg("assists"), 2).alias("avg_assists"),
        round(avg("kda_calculated"), 2).alias("avg_kda"),
        sum("kills").alias("total_kills"),
        sum("deaths").alias("total_deaths"),
        sum("assists").alias("total_assists"),
        round(avg("gold_per_min"), 2).alias("avg_gpm"),
        round(avg("xp_per_min"), 2).alias("avg_xpm"),
        round(avg("hero_damage"), 0).alias("avg_hero_damage"),
        round(avg("tower_damage"), 0).alias("avg_tower_damage"),
        round(avg("last_hits"), 2).alias("avg_last_hits"),
        countDistinct("hero_id").alias("unique_heroes_played"),
        first("personaname").alias("last_known_name"),
        max("rank_tier").alias("highest_rank")
    )

df_player_stats = df_player_stats \
    .withColumn("win_rate", round(col("total_wins") / col("total_matches") * 100, 2)) \
    .withColumn("calculated_at", current_timestamp())

# Global (unpartitioned) windows: Spark moves all rows to one partition to rank them.
# account_id is a tiebreaker so row_number() yields the same ranks on every run instead of
# ordering tied avg_kda / win_rate values arbitrarily.
window_kda = Window.orderBy(desc("avg_kda"), col("account_id").asc())
window_winrate = Window.orderBy(desc("win_rate"), col("account_id").asc())

df_player_stats = df_player_stats \
    .withColumn("kda_rank", row_number().over(window_kda)) \
    .withColumn("winrate_rank", row_number().over(window_winrate))

print(f"Player stats created: {df_player_stats.count():,} unique players")
df_player_stats.select("account_id", "total_matches", "avg_kda", "win_rate", "kda_rank").show(10)

In [None]:
print("\nCreating Hero Statistics (Meta Analysis)...")

# Per-hero performance from non-outlier player rows.
df_hero_performance = df_players_valid \
    .groupBy("hero_id") \
    .agg(
        count("match_id").alias("times_played"),
        sum("win").alias("wins"),
        round(avg("kills"), 2).alias("avg_kills"),
        round(avg("deaths"), 2).alias("avg_deaths"),
        round(avg("assists"), 2).alias("avg_assists"),
        round(avg("kda_calculated"), 2).alias("avg_kda"),
        round(avg("gold_per_min"), 2).alias("avg_gpm"),
        round(avg("xp_per_min"), 2).alias("avg_xpm"),
        round(avg("hero_damage"), 0).alias("avg_hero_damage"),
        round(avg("tower_damage"), 0).alias("avg_tower_damage")
    ) \
    .withColumn("win_rate", round(col("wins") / col("times_played") * 100, 2))

# Draft counts: is_pick == True -> pick, is_pick == False -> ban (NULL is_pick is ignored).
df_pick_stats = df_picks_bans \
    .filter(col("is_pick") == True) \
    .groupBy("hero_id") \
    .agg(count("*").alias("pick_count"))

df_ban_stats = df_picks_bans \
    .filter(col("is_pick") == False) \
    .groupBy("hero_id") \
    .agg(count("*").alias("ban_count"))

# Left joins keep heroes that were played but never drafted; their counts become 0.
df_hero_stats = df_hero_performance \
    .join(df_pick_stats, "hero_id", "left") \
    .join(df_ban_stats, "hero_id", "left") \
    .fillna({"pick_count": 0, "ban_count": 0}) \
    .withColumn("total_presence", col("pick_count") + col("ban_count")) \
    .withColumn("calculated_at", current_timestamp())

total_matches = df_matches.count()

# Presence rate = share of matches in which the hero was picked or banned.
# Assumes a hero enters a match's draft at most once (shared draft pool) — TODO confirm
# against the picks/bans data. The previous `total_matches * 2` denominator capped the rate
# at 50%, which made the "> 50" S-Tier branch below unreachable.
if total_matches > 0:
    presence_rate_col = round(col("total_presence") / total_matches * 100, 2)
else:
    # No matches: avoid dividing by zero (an error under ANSI mode); every hero gets D-Tier.
    presence_rate_col = lit(None).cast("double")

df_hero_stats = df_hero_stats \
    .withColumn("presence_rate", presence_rate_col) \
    .withColumn("meta_tier",
        when(col("presence_rate") > 50, "S-Tier")
        .when(col("presence_rate") > 30, "A-Tier")
        .when(col("presence_rate") > 15, "B-Tier")
        .when(col("presence_rate") > 5, "C-Tier")
        .otherwise("D-Tier")
    )

print(f"Hero stats created: {df_hero_stats.count()} heroes")
df_hero_stats.orderBy(desc("presence_rate")).select(
    "hero_id", "times_played", "win_rate", "pick_count", "ban_count", "presence_rate", "meta_tier"
).show(10)

In [None]:
print("\nCreating Match Analytics...")

# One row per match_date; NULL radiant_win / is_stomp values count toward neither side.
df_daily_stats = df_matches \
    .groupBy("match_date") \
    .agg(
        count("match_id").alias("match_count"),
        round(avg("duration_minutes"), 2).alias("avg_duration"),
        round(avg("total_kills"), 2).alias("avg_total_kills"),
        sum(when(col("radiant_win") == True, 1).otherwise(0)).alias("radiant_wins"),
        sum(when(col("radiant_win") == False, 1).otherwise(0)).alias("dire_wins"),
        sum(when(col("is_stomp") == True, 1).otherwise(0)).alias("stomp_games")
    ) \
    .withColumn("radiant_win_rate", round(col("radiant_wins") / col("match_count") * 100, 2)) \
    .orderBy("match_date")

print(f"Daily stats: {df_daily_stats.count()} days of data")

# Bucket matches by length. NULL durations get their own bucket: without the explicit
# isNull() check every `<` comparison is NULL and they would land in "Very Long".
df_duration_analysis = df_matches \
    .withColumn("duration_bucket",
        when(col("duration_minutes").isNull(), "Unknown")
        .when(col("duration_minutes") < 25, "Very Short (<25min)")
        .when(col("duration_minutes") < 35, "Short (25-35min)")
        .when(col("duration_minutes") < 45, "Medium (35-45min)")
        .when(col("duration_minutes") < 55, "Long (45-55min)")
        .otherwise("Very Long (>55min)")
    ) \
    .groupBy("duration_bucket") \
    .agg(
        count("match_id").alias("match_count"),
        round(avg("total_kills"), 2).alias("avg_kills"),
        sum(when(col("radiant_win") == True, 1).otherwise(0)).alias("radiant_wins")
    ) \
    .withColumn("radiant_win_rate", round(col("radiant_wins") / col("match_count") * 100, 2))

print("\nDuration Analysis:")
df_duration_analysis.show()

In [None]:
print("\nCreating ML Feature Table...")

# Match-level context attached to every player row. radiant_win is carried through the
# join but is not part of the final projection.
ml_match_context_cols = [
    "match_id", "duration_minutes", "match_hour",
    "match_day_of_week", "radiant_win", "game_mode", "region",
]

# Output layout: identifiers, per-player performance, lane info, match context, label (win).
ml_output_cols = [
    "match_id", "account_id", "hero_id",
    "kills", "deaths", "assists", "kda_calculated",
    "gold_per_min", "xp_per_min", "hero_damage", "tower_damage",
    "last_hits", "denies", "level",
    "lane", "lane_role",
    "duration_minutes", "match_hour", "match_day_of_week",
    "game_mode", "region",
    "win",
]

# Inner join: player rows whose match_id is missing from df_matches are dropped.
df_ml_features = (
    df_players_valid
    .join(df_matches.select(*ml_match_context_cols), "match_id", "inner")
    .select(*ml_output_cols)
)

print(f"ML features created: {df_ml_features.count():,} rows")
print(f"Feature columns: {len(df_ml_features.columns)}")
df_ml_features.printSchema()

In [None]:
print("\nWriting to Gold Layer...")

# (DataFrame, folder under GOLD_PATH, confirmation message), written in this order.
gold_outputs = [
    (df_player_stats, "player_stats", "Player stats written"),
    (df_hero_stats, "hero_stats", "Hero stats written"),
    (df_daily_stats, "daily_stats", "Daily stats written"),
    (df_ml_features, "ml_features", "ML features written"),
]

# Each Gold table is fully recomputed, so every run replaces the previous Delta contents.
for gold_df, gold_dir, done_msg in gold_outputs:
    gold_df.write.format("delta").mode("overwrite").save(f"{GOLD_PATH}/{gold_dir}")
    print(done_msg)

print("\nGold layer complete")

In [None]:
print("\nExporting CSV samples from Gold Layer...")

from pyspark.sql.functions import rand
from datetime import datetime

SAMPLES_PATH = f"{GOLD_PATH}/samples"
timestamp = datetime.now().strftime("%Y%m%d")


def _write_single_csv(df, path):
    """Write `df` with a header row into the Spark CSV output directory `path`."""
    # Spark's .csv() always writes a directory; coalesce(1) leaves one part file inside it.
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(path)


def _export_complete_csv(df, table_name, label):
    """Export every row of a Gold DataFrame to CSV.

    The row count is embedded in the output name.
    Returns (row_count, output_path).
    """
    print(f"Exporting all {label}...")
    row_count = df.count()
    path = f"{SAMPLES_PATH}/gold_{table_name}_complete_{row_count}rows_{timestamp}.csv"
    _write_single_csv(df, path)
    print(f"  Written {row_count:,} rows")
    return row_count, path


player_count, player_stats_file = _export_complete_csv(df_player_stats, "player_stats", "player stats")
hero_count, hero_stats_file = _export_complete_csv(df_hero_stats, "hero_stats", "hero stats")
daily_count, daily_stats_file = _export_complete_csv(df_daily_stats, "daily_stats", "daily stats")

print("Exporting stratified ML features sample...")
ML_SAMPLE_SIZE = 5000  # requested sample size; treated as a constant (not reassigned below)
total_ml_count = df_ml_features.count()

if total_ml_count > ML_SAMPLE_SIZE:
    per_class = ML_SAMPLE_SIZE // 2
    # Seeded random order + limit gives a repeatable per-class sample (rows with NULL win
    # are in neither class).
    wins_sample = df_ml_features.filter(col("win") == 1).orderBy(rand(seed=42)).limit(per_class)
    loses_sample = df_ml_features.filter(col("win") == 0).orderBy(rand(seed=42)).limit(per_class)
    ml_sample = wins_sample.union(loses_sample).orderBy(rand(seed=42))
    # limit() returns fewer than per_class rows when a class is small, so count what was
    # actually sampled instead of assuming per_class of each.
    n_wins = wins_sample.count()
    n_losses = loses_sample.count()
    ml_sample_count = n_wins + n_losses
    print(f"  Stratified: {n_wins} wins + {n_losses} losses")
else:
    ml_sample = df_ml_features
    ml_sample_count = total_ml_count
    print(f"  Using all {total_ml_count:,} rows")

ml_features_file = f"{SAMPLES_PATH}/gold_ml_features_sample_{ml_sample_count}rows_{timestamp}.csv"
_write_single_csv(ml_sample, ml_features_file)
print(f"  Written {ml_sample_count:,} rows")

print("\nGold samples exported successfully")
print(f"Location: {SAMPLES_PATH}")

In [None]:
print("\nCreating SQL tables...")

spark.sql("CREATE DATABASE IF NOT EXISTS esports_gold")

# Register each Gold Delta folder under GOLD_PATH as a table pointing at that LOCATION.
# IF NOT EXISTS makes re-runs harmless; an existing table definition is left untouched.
for table_name in ("player_stats", "hero_stats", "daily_stats", "ml_features"):
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS esports_gold.{table_name}
    USING DELTA
    LOCATION '{GOLD_PATH}/{table_name}'
""")

print("SQL tables created in 'esports_gold' database")

spark.sql("SHOW TABLES IN esports_gold").show()

print("\nCSV Samples created:")
try:
    # dbutils is a notebook-provided global (not defined in this file).
    sample_files = dbutils.fs.ls(f"{GOLD_PATH}/samples")
    for entry in sample_files:
        # Spark CSV outputs are directories, listed with a trailing slash.
        if entry.name.endswith('.csv/'):
            print(f"  {entry.name}")
except Exception as exc:
    # Listing is informational only: keep the notebook going, but report the actual cause
    # instead of always assuming the directory is missing (e.g. dbutils unavailable).
    print(f"  (No samples directory yet, or listing failed: {exc})")