# 01 - Bronze Ingestion

In [None]:
import os
import sys

# Export credentials and Fabric identifiers as environment variables before
# constructing `Settings()` below, which presumably reads them (the "__" in the
# names looks like a nested-settings delimiter — confirm in the config module).
# NOTE(review): these are placeholder values; avoid committing real secrets in
# the notebook and prefer loading them from a secret store at runtime.
os.environ.update(
    {
        # Azure service principal
        "AZURE_TENANT_ID": "id",
        "AZURE_CLIENT_ID": "id",
        "AZURE_CLIENT_SECRET": "id",
        # Steam API key and Fabric workspace / lakehouse identifiers
        "STEAM__API_KEY": "id",
        "FABRIC__WORKSPACE_ID": "id",
        "FABRIC__BRONZE_LAKEHOUSE_ID": "id",
        "FABRIC__SILVER_LAKEHOUSE_ID": "id",
        "FABRIC__GOLD_LAKEHOUSE_ID": "id",
    }
)

# Put the default lakehouse's Files folder first on the import path so the
# `src.steam_analytics` package stored there can be imported.
sys.path.insert(0, "/lakehouse/default/Files")

from src.steam_analytics.config import Settings

settings = Settings()

# Root of the Bronze lakehouse and the Delta table holding the game catalog.
BRONZE_PATH = settings.fabric.bronze_abfss_path
CATALOG_PATH = f"{BRONZE_PATH}/Tables/game_catalog"

print(f"Bronze Path: {BRONZE_PATH}")
print(f"Catalog Path: {CATALOG_PATH}")

In [None]:
# Load Catalog with Staleness Check
# Select the catalog games due for a sync today: their priority must be in
# today's schedule, and they must either never have been synced or have a
# last sync older than the staleness window for their priority.
from pyspark.sql import functions as F
from src.steam_analytics.catalog import GameCatalogManager, SyncPriority
from datetime import datetime, timedelta, timezone

print("=" * 60)
print("LOADING CATALOG")
print("=" * 60)

catalog_df = spark.read.format("delta").load(CATALOG_PATH)
total_in_catalog = catalog_df.count()
print(f"Total games in catalog: {total_in_catalog:,}")

# Determine priorities based on day of week.
# Use a timezone-aware UTC timestamp: PySpark converts *naive* datetimes passed
# to F.lit using the driver's local timezone, so a naive `utcnow()` value would
# shift the staleness cut-offs whenever the driver is not running in UTC.
manager = GameCatalogManager()
now = datetime.now(timezone.utc)
day_of_week = now.weekday()
day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

priorities = manager.get_sync_schedule(day_of_week)
priority_values = [p.value for p in priorities]

print(f"\nToday: {day_names[day_of_week]}")
print(f"Priorities to sync: {priority_values}")

# Filter by priority
apps_df = catalog_df.filter(F.col("priority").isin(priority_values))

# Maximum age (in days) of the last sync before a game of each priority is
# considered stale.
STALENESS_DAYS = {"high": 1, "medium": 7, "low": 30}

# Never-synced games are always stale; otherwise apply the per-priority window.
stale_conditions = F.col("last_synced_at").isNull()
for priority_name, max_age_days in STALENESS_DAYS.items():
    cutoff = now - timedelta(days=max_age_days)
    stale_conditions = stale_conditions | (
        (F.col("priority") == priority_name)
        & (F.col("last_synced_at") < F.lit(cutoff))
    )

apps_to_sync_df = apps_df.filter(stale_conditions)

# Get app IDs (collected to the driver for the ingestion cell)
APP_IDS = [row.app_id for row in apps_to_sync_df.select("app_id").collect()]

print(f"\nGames to sync (stale): {len(APP_IDS):,}")

# Breakdown
print("\nBreakdown:")
apps_to_sync_df.groupBy("priority").count().orderBy("priority").show()

In [None]:
import asyncio
from src.steam_analytics.ingestion.orchestrator import IngestionOrchestrator, OutputTarget

print("=" * 60)
print("RUNNING INGESTION")
print("=" * 60)
print(f"Games to process: {len(APP_IDS):,}")

orchestrator = IngestionOrchestrator(target=OutputTarget.ONELAKE)
result = await orchestrator.run(APP_IDS)

In [None]:
from delta.tables import DeltaTable
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from datetime import datetime, timezone, timedelta

# Stamp `last_synced_at` on the catalog rows of the games handled by this run,
# using a Delta MERGE that only touches those rows.

print("=" * 60)
print("UPDATING CATALOG (DELTA MERGE)")
print("=" * 60)

# Get successfully synced app IDs.
# Only mark games the orchestrator reports as processed, so failed games stay
# stale and are retried on the next run. If the result object does not expose
# `processed_ids`, fall back to every requested app ID (previous behaviour).
# NOTE(review): confirm `processed_ids` is the orchestrator result's attribute.
processed_ids = getattr(result, "processed_ids", None)
synced_ids = APP_IDS if processed_ids is None else processed_ids
# De-duplicate while keeping order: a Delta MERGE fails when several source
# rows match the same target row.
synced_ids = list(dict.fromkeys(synced_ids))

if synced_ids:
    now = datetime.now(timezone.utc)

    # 1. Build a small DataFrame containing ONLY the updates; this is much
    #    cheaper than reading and rewriting the whole catalog.
    updates_schema = StructType([
        StructField("app_id", IntegerType(), False),
        StructField("last_synced_at", TimestampType(), True)
    ])

    updates_df = spark.createDataFrame(
        [(app_id, now) for app_id in synced_ids], schema=updates_schema
    )

    # 2. Open the target Delta table.
    target_table = DeltaTable.forPath(spark, CATALOG_PATH)

    # 3. Run the MERGE: only existing catalog rows are updated; unknown app IDs
    #    are ignored (no whenNotMatched clause).
    (target_table.alias("target")
        .merge(
            updates_df.alias("source"),
            "target.app_id = source.app_id"
        )
        .whenMatchedUpdate(set={
            "last_synced_at": "source.last_synced_at"
        })
        .execute()
    )

    print(f"Efficiently merged updates for {len(synced_ids):,} games")

else:
    print("No updates to merge.")

In [None]:
from datetime import datetime, timezone

print("=" * 60)
print("UPDATING PRIORITIES FROM FRESH DATA")
print("=" * 60)

# Recompute each catalog game's sync priority from today's player counts in the
# Bronze player-stats table, then overwrite the catalog. This step is
# best-effort: any failure is printed and the notebook continues with the
# existing priorities.
player_stats_path = f"{BRONZE_PATH}/Tables/raw_steam_player_stats"

try:
    # Today's partition value, as a UTC date string.
    today_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # One row per app_id. NOTE(review): dropDuplicates keeps an arbitrary row
    # per app, not necessarily the most recent reading of the day — if the table
    # has an intra-day timestamp, order by it before de-duplicating.
    latest_players = (
        spark.read.format("delta").load(player_stats_path)
        .filter(F.col("ingestion_date") == today_str)
        .select("app_id", F.col("player_count").alias("_fresh_player_count"))
        .dropDuplicates(["app_id"])
    )

    fresh_count = latest_players.count()
    print(f"Fresh player counts available: {fresh_count:,}")

    if fresh_count > 0:
        catalog_df = spark.read.format("delta").load(CATALOG_PATH)

        # Prefer today's count and keep the stored count otherwise. Cast back to
        # the catalog column's type so the overwrite is not rejected because of
        # a widened type.
        effective_players = F.coalesce(
            F.col("_fresh_player_count"), F.col("player_count")
        ).cast(catalog_df.schema["player_count"].dataType)

        # withColumn (rather than an explicit select) keeps every catalog
        # column, so columns not mentioned here are not lost on overwrite.
        recomputed = (
            catalog_df.join(latest_players, on="app_id", how="left")
            .withColumn("_previous_priority", F.col("priority"))
            .withColumn("player_count", effective_players)
            .withColumn(
                "priority",
                F.when(F.col("player_count") >= 1000, "high")
                .when(F.col("player_count") >= 100, "medium")
                .when(F.col("player_count") >= 1, "low")
                .otherwise("skip"),
            )
        )

        # Null-safe comparison so a previously-null priority counts as a change.
        priority_changes = recomputed.filter(
            ~F.col("priority").eqNullSafe(F.col("_previous_priority"))
        ).count()

        print(f"Priority changes detected: {priority_changes:,}")

        # Save updated catalog without the helper columns.
        updated_catalog = recomputed.drop("_fresh_player_count", "_previous_priority")
        updated_catalog.write.format("delta").mode("overwrite").save(CATALOG_PATH)
        print("✅ Catalog priorities updated!")
    else:
        print("No fresh player counts for today; priorities unchanged.")

except Exception as e:
    # Deliberately best-effort: report and keep the notebook running.
    print(f"⚠️ Could not update priorities: {e}")
    print("Continuing without priority update...")

## Results

In [None]:
from notebookutils import mssparkutils
import json

# Print a run summary and exit the notebook with a JSON status payload
# (presumably consumed by the calling pipeline).

print("=" * 60)
print("BRONZE INGESTION COMPLETE")
print("=" * 60)

# Report the number of games the orchestrator actually processed when it
# exposes them; otherwise fall back to the number requested (APP_IDS).
# NOTE(review): confirm `processed_ids` is the orchestrator result's attribute.
processed_ids = getattr(result, "processed_ids", None)
games_synced = len(APP_IDS) if processed_ids is None else len(set(processed_ids))

print(f"Run ID:         {result.run_id}")
print(f"Duration:       {result.duration_seconds:.2f}s")
print(f"Batches:        {len(result.batches_written)}")
print(f"Success Rate:   {result.success_rate}%")
print(f"Games Synced:   {games_synced:,}")

if result.errors:
    # Show only the first few errors to keep the output readable.
    print(f"Errors:         {len(result.errors)}")
    for err in result.errors[:5]:
        print(f"  - {err}")

# Exit with status: a success rate of at least 80.0 is "success", lower is
# "partial". default=str stringifies values json cannot encode natively.
mssparkutils.notebook.exit(json.dumps({
    "status": "success" if result.success_rate >= 80.0 else "partial",
    "run_id": str(result.run_id),
    "batches_written": len(result.batches_written),
    "success_rate": result.success_rate,
    "games_synced": games_synced,
}, default=str))