In [0]:
dbutils.widgets.removeAll()
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

# =========================
# (1) PARÁMETROS Y WIDGETS
# =========================
dbutils.widgets.text("catalogo", "catalog_footballdata")
dbutils.widgets.text("esquema_source", "bronze")
dbutils.widgets.text("esquema_sink", "silver")

catalogo = dbutils.widgets.get("catalogo")
esquema_source = dbutils.widgets.get("esquema_source")
esquema_sink = dbutils.widgets.get("esquema_sink")

# ID oficial de Sporting Cristal (archivos)
ID_CRISTAL = 1635 #

# =========================
# (2) LECTURA DE TABLAS BRONZE
# =========================
df_events = spark.read.table(f"{catalogo}.{esquema_source}.match_events")
df_teams = spark.read.table(f"{catalogo}.{esquema_source}.teams")
df_editions = spark.read.table(f"{catalogo}.{esquema_source}.competitions")
df_physical = spark.read.table(f"{catalogo}.{esquema_source}.physical_performance")


In [0]:

# =========================
# (3) TRANSFORMACIONES: MATCH EVENTS
# =========================
# - 1. Creación de flag 'es_cristal' para diferenciar nuestras acciones y las del rival
# - 2. Joins / Merge para traer nombres oficiales y torneos (Ya que team_name en matches es 'SP Cristal' vs su nombre oficial 'Sporting Cristal')
events_silver = df_events \
    .withColumn("es_cristal", F.col("match_id").isNotNull() & (F.col("team_id") == ID_CRISTAL)) \
    .join(df_teams.select(col("team_id").alias("t_id"), col("team_name").alias("full_team_name")), 
          df_events.team_id == col("t_id"), "left") \
    .drop("t_id")
# Nuevas columnas agregadas: es_cristal, full_team_name

In [0]:
# =========================
# (3) TRANSFORMACIONES: MATCH EVENTS
# =========================

# Helper: Identificar el nombre del rival para cada match_id
# Esto nos permite saber contra quién jugamos, incluso en las filas donde Sporting Cristal tiene el balón
df_rivals = df_events.filter(F.col("team_id") != ID_CRISTAL) \
    .select("match_id", "team_id").distinct() \
    .join(df_teams.select(F.col("team_id").alias("rid"), F.col("team_name").alias("rival_name")), 
          F.col("team_id") == F.col("rid"), "inner") \
    .select("match_id", "rival_name")

# - 1. Creación de flag 'es_cristal' para diferenciar nuestras acciones y las del rival
# - 2. Joins / Merge para traer nombres oficiales y torneos (Ya que team_name en matches es 'SP Cristal' vs su nombre oficial 'Sporting Cristal')
# - 3. Nueva columna de dificultad de equipo, para poder visualizar el rendimiento en partidos normales vs de alta demanda
# - 4. Sería ideal poder también tener acá si el partido fue en altura (por el momento complicaría las cosas, así que lo dejaré de lado)
events_silver = df_events \
    .withColumn("es_cristal", F.col("match_id").isNotNull() & (F.col("team_id") == ID_CRISTAL)) \
    .join(df_teams.select(col("team_id").alias("t_id"), col("team_name").alias("full_team_name")), 
          df_events.team_id == col("t_id"), "left") \
    .drop("t_id") \
    .join(df_rivals, "match_id", "left") \
    .withColumn("dificultad_rival", 
        F.when(F.col("rival_name").isin(
            "Club Universitario de Deportes", 
            "Club Alianza Lima", 
            "FBC Melgar", 
            "Cusco Fútbol Club"
        ), "alta").otherwise("normal")
    )
# Nuevas columnas agregadas: es_cristal, full_team_name, rival_name, dificultad_rival

In [0]:
# =========================
# (4) TRANSFORMACIONES: PHYSICAL DATA
# =========================
# - 1. Casteo de fecha para el Line Chart cronológico (fundamental para el eje X del gráfico)
# - 2. Cálculo de métricas de intensidad por minuto jugado (normalización para comparar rendimientos)
#           métricas seleccionadas: distancia_por_minuto, sprints_por_minuto, intensidad_hi_por_minuto

physical_silver = df_physical \
    .withColumn("match_date_dt", F.to_date(F.col("match_date"))) \
    .withColumn("distancia_por_minuto", 
                F.round(F.col("total_distance_full_all") / F.col("minutes_full_all"), 2)) \
    .withColumn("sprints_por_minuto", 
                F.round(F.col("sprint_count_full_all") / F.col("minutes_full_all"), 4)) \
    .withColumn("intensidad_hi_por_minuto", 
                F.round(F.col("hi_distance_full_all") / F.col("minutes_full_all"), 2))


In [0]:
# =========================
# (5) CARGA A CAPA SILVER (SINK)
# =========================
# Guardamos las tablas finales aplicando overwriteSchema para registrar las nuevas columnas calculadas
events_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalogo}.{esquema_sink}.fact_match_events")

physical_silver.write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{catalogo}.{esquema_sink}.fact_physical_performance")

print(f"Capa Silver procesada: {esquema_sink}.fact_match_events y {esquema_sink}.fact_physical_performance listas.")

In [0]:
%sql

SELECT * FROM catalog_footballdata.silver.fact_match_events
LIMIT 10