In [0]:
from pyspark.sql.functions import col, sum as _sum, count, when, round, lit
from delta.tables import DeltaTable


# Shared inputs for every KPI below: the silver events table, enriched with
# each match's stage and date. The join is an inner join on match_id (the
# DataFrame.join default), so events without a matching fixture are dropped.
events = spark.table("default.silver_events")
matches = (
    spark.table("default.silver_matches")
    .select("match_id", "stage", "match_date")
)
events_with_stage = events.join(matches, "match_id")


# KPI 1: DOMINANCE (Field Tilt)
# Field tilt = a team's share of its match's final-third passes, stored as a
# ratio in [0, 1] (not a percentage).
# Passes are counted per team with a conditional aggregate instead of being
# pre-filtered to the final third. A team that made passes, none of them with
# loc_x >= 80, therefore still gets a row with final_third_passes = 0 rather
# than disappearing from the table.
# NOTE(review): loc_x >= 80 assumes a 120-unit pitch length with every team
# attacking towards increasing x - confirm against silver_events coordinates.
passes = events_with_stage.filter(col("event_type") == "Pass")

match_dominance = passes.groupBy("match_id", "stage", "team_name").agg(
    # count() skips NULLs, so only final-third passes are counted.
    count(when(col("loc_x") >= 80, col("event_id"))).alias("final_third_passes")
)

match_totals = match_dominance.groupBy("match_id").agg(
    _sum("final_third_passes").alias("match_total_f3_passes")
)

# Join & calculate field tilt. When neither team reached the final third the
# match total is 0. The CASE WHEN then leaves field_tilt NULL instead of
# dividing by zero (which errors under ANSI mode).
gold_dominance = (
    match_dominance.join(match_totals, "match_id")
    .withColumn(
        "field_tilt",
        when(
            col("match_total_f3_passes") > 0,
            col("final_third_passes") / col("match_total_f3_passes"),
        ),
    )
    .select("match_id", "stage", "team_name", "final_third_passes", "field_tilt")
)

gold_dominance.write.format("delta").mode("overwrite").saveAsTable("default.gold_dominance")
print(" KPI 1: Dominance (Field Tilt) Updated [Stage Aware]")

# KPI 2: TEAM EFFICIENCY (xG vs Goals)
# One row per (stage, team) built from "Shot" events:
#   total_shots      - number of shots
#   total_xg         - summed xg over those shots
#   total_goals      - shots whose shot_outcome is "Goal"
#   efficiency_index - total_goals - total_xg, rounded to 2 decimals
#                      (positive = more goals than summed xG)
# Rows are sorted by efficiency_index, highest first, before writing.
shots = events_with_stage.filter(col("event_type") == "Shot")

is_goal = col("shot_outcome") == "Goal"
shot_aggregates = [
    count("event_id").alias("total_shots"),
    _sum("xg").alias("total_xg"),
    # when() without otherwise() yields NULL for non-goals; count() skips NULLs.
    count(when(is_goal, True)).alias("total_goals"),
]
efficiency_stats = shots.groupBy("stage", "team_name").agg(*shot_aggregates)

efficiency_index = round(col("total_goals") - col("total_xg"), 2)
gold_efficiency = (
    efficiency_stats
    .withColumn("efficiency_index", efficiency_index)
    .orderBy(col("efficiency_index").desc())
)

gold_efficiency.write.format("delta").mode("overwrite").saveAsTable("default.gold_efficiency")
print(" KPI 2: Efficiency Index Updated [Stage Aware]")

# KPI 3: PRESSING INTENSITY
# Every event flagged under_pressure is credited to the opponent of the team
# that performed the event (team_name). The opponent comes from the match's
# home_team / away_team columns, so the full match record is joined in.
matches_full = spark.table("default.silver_matches")
events_context = events.join(matches_full, "match_id") \
    .withColumn("pressing_team",
                # Map both directions explicitly. An event whose team_name
                # matches neither side (NULL, or a spelling mismatch between
                # event and match data) gets NULL instead of silently being
                # credited to the home team.
                when(col("team_name") == col("home_team"), col("away_team"))
                .when(col("team_name") == col("away_team"), col("home_team")))

# Keep "under pressure" moments that could be attributed to a pressing team.
pressures = events_context.filter(
    (col("under_pressure") == True) & col("pressing_team").isNotNull()
)

# Aggregate: Group by STAGE and PRESSING TEAM
gold_pressing = pressures.groupBy("stage", "pressing_team").agg(
    count("event_id").alias("total_pressures")
).orderBy(col("total_pressures").desc())

gold_pressing.write.format("delta").mode("overwrite").saveAsTable("default.gold_pressing")
print(" KPI 3: Pressing Intensity Updated [Stage Aware]")

# KPI 4: PLAYER IMPACT
# Per (stage, player, team): the number of events attributed to the player
# (total_actions) and the xg summed over those events (total_xg_generated).
# Events with no player_name are excluded. Output is sorted by total_actions,
# highest first.
player_events = events_with_stage.where(col("player_name").isNotNull())

impact_keys = ["stage", "player_name", "team_name"]
gold_player_impact = (
    player_events
    .groupBy(*impact_keys)
    .agg(
        count("event_id").alias("total_actions"),
        _sum("xg").alias("total_xg_generated"),
    )
    .orderBy(col("total_actions").desc())
)

gold_player_impact.write.format("delta").mode("overwrite").saveAsTable("default.gold_player_impact")
print(" KPI 4: Player Impact Updated [Stage Aware]")

# KPI 5: DISCIPLINE (Fouls)
# Counts "Foul Committed" events per (stage, team), sorted by total_fouls,
# highest first. Cards are NOT counted: total_fouls is the only metric here.
# TODO: add a total_cards column once the card field in silver_events is
# identified.
fouls = events_with_stage.filter(col("event_type") == "Foul Committed")

gold_discipline = (
    fouls.groupBy("stage", "team_name")
    .agg(count("event_id").alias("total_fouls"))
    .orderBy(col("total_fouls").desc())
)

gold_discipline.write.format("delta").mode("overwrite").saveAsTable("default.gold_discipline")
print(" KPI 5: Discipline Risk Updated [Stage Aware]")

# KPI 6: BRACKET (Already has stage, just refreshing)
# Knockout-stage fixtures with a resolved Winner and a Stage_Sort display key.
# The gold table is upserted with a MERGE on match_id (rows absent from the
# source are kept), or created on the first run.
# Read the same qualified source table as the other KPIs.
df_matches = spark.table("default.silver_matches")

# Order matters: a stage's 1-based position in this list is its Stage_Sort.
knockout_stages = ["Round of 16", "Quarter-finals", "Semi-finals", "3rd Place Final", "Final"]
bracket_df = df_matches.filter(col("stage").isin(knockout_stages))

# Manual winners for knockout ties whose recorded score is level (presumably
# settled on penalties, which the score columns do not capture).
# Keyed by (home_team, away_team).
penalty_winners = {
    ("Egypt", "DR Congo"): "DR Congo",
    ("Senegal", "Côte d'Ivoire"): "Côte d'Ivoire",
    ("Cape Verde", "South Africa"): "South Africa",
    ("South Africa", "DR Congo"): "South Africa",
}

level_score = col("home_score") == col("away_score")
winner = (
    when(col("home_score") > col("away_score"), col("home_team"))
    .when(col("away_score") > col("home_score"), col("away_team"))
)
for (home, away), penalty_winner in penalty_winners.items():
    # Only override a level score. A fixture with NULL scores (not played yet)
    # therefore stays "TBD" instead of being handed a winner in advance.
    winner = winner.when(
        level_score & (col("home_team") == home) & (col("away_team") == away),
        penalty_winner,
    )
winner = winner.otherwise("TBD")

stage_sort = when(col("stage") == knockout_stages[0], 1)
for position, stage_name in enumerate(knockout_stages[1:], start=2):
    stage_sort = stage_sort.when(col("stage") == stage_name, position)
stage_sort = stage_sort.otherwise(99)

bracket_df = bracket_df.withColumn("Winner", winner).withColumn("Stage_Sort", stage_sort)

gold_bracket = bracket_df.select("match_id", "Stage_Sort", "stage", "match_date", "home_team", "away_team", "home_score", "away_score", "Winner")

# NOTE(review): unlike the other gold tables this name is unqualified, so it
# resolves against the current database - confirm that is "default".
if not spark.catalog.tableExists("gold_bracket"):
    gold_bracket.write.format("delta").saveAsTable("gold_bracket")
else:
    DeltaTable.forName(spark, "gold_bracket").alias("t").merge(
        gold_bracket.alias("s"), "t.match_id = s.match_id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

print(" KPI 6: Bracket Updated")

✅ KPI 1: Dominance (Field Tilt) Updated [Stage Aware]
✅ KPI 2: Efficiency Index Updated [Stage Aware]
✅ KPI 3: Pressing Intensity Updated [Stage Aware]
✅ KPI 4: Player Impact Updated [Stage Aware]
✅ KPI 5: Discipline Risk Updated [Stage Aware]
✅ KPI 6: Bracket Updated
