In [0]:
from pyspark.sql.functions import from_json, col, coalesce, lit
from pyspark.sql.types import *

In [0]:
# ler camada bronze
df_bronze = spark.table("project_data_football_bronze.pontuacao_rodada")
# ler dim rodada
df_dim_rodada = spark.table("project_data_football_silver.dim_rodada")

In [0]:
# Define o schema para o campo 'scout' com os tipos de cada atributo
schema_scout = StructType([
    StructField("G", LongType(), True),   # Gols
    StructField("A", LongType(), True),   # Assistências
    StructField("SG", LongType(), True),  # Saldo de gols
    StructField("CA", LongType(), True),  # Cartões amarelos
    StructField("CV", LongType(), True),  # Cartões vermelhos
    StructField("FS", LongType(), True),  # Faltas sofridas
    StructField("FC", LongType(), True)   # Faltas cometidas
])

In [0]:
# Converte a coluna 'scout' de JSON para um struct com o schema definido
df_bronze = df_bronze.withColumn(
    "scout_struct",
    from_json(col("scout"), schema_scout)
)

In [0]:
# Seleciona e transforma as colunas relevantes do DataFrame bronze para o DataFrame silver
df_silver = df_bronze.select(
    "atleta_id",  # ID do atleta
    "clube_id",   # ID do clube
    "rodada",     # Número da rodada
    "pontuacao",  # Pontuação do atleta na rodada
    coalesce(col("scout_struct.G"), lit(0)).alias("gols"),                # Gols (preenche nulo com 0)
    coalesce(col("scout_struct.A"), lit(0)).alias("assistencias"),        # Assistências (preenche nulo com 0)
    coalesce(col("scout_struct.SG"), lit(0)).alias("saldo_gols"),         # Saldo de gols (preenche nulo com 0)
    coalesce(col("scout_struct.CA"), lit(0)).alias("cartoes_amarelos"),   # Cartões amarelos (preenche nulo com 0)
    coalesce(col("scout_struct.CV"), lit(0)).alias("cartoes_vermelhos"),  # Cartões vermelhos (preenche nulo com 0)
    coalesce(col("scout_struct.FS"), lit(0)).alias("faltas_sofridas"),    # Faltas sofridas (preenche nulo com 0)
    coalesce(col("scout_struct.FC"), lit(0)).alias("faltas_cometidas"),   # Faltas cometidas (preenche nulo com 0)
    "entrou_em_campo",  # Indicador se o atleta entrou em campo
    "dt_ingestao"       # Data de ingestão do registro
)

In [0]:
# Realiza um join entre o DataFrame de pontuação (df_silver) e a dimensão de rodada (df_dim_rodada)
df_silver = df_silver.join(
    df_dim_rodada,
    df_silver["rodada"] == df_dim_rodada["numero_rodada"],
    "left"
).select(
    "rodada_id",         # ID da rodada
    "campeonato_id",     # ID do campeonato
    "atleta_id",         # ID do atleta
    "clube_id",          # ID do clube
    col("numero_rodada").alias("rodada"),  # Número da rodada
    "pontuacao",         # Pontuação do atleta
    "gols",              # Gols marcados
    "assistencias",      # Assistências
    "saldo_gols",        # Saldo de gols
    "cartoes_amarelos",  # Cartões amarelos
    "cartoes_vermelhos", # Cartões vermelhos
    "faltas_sofridas",   # Faltas sofridas
    "faltas_cometidas",  # Faltas cometidas
    "entrou_em_campo",   # Indicador se entrou em campo
    "dt_ingestao"        # Data de ingestão
)

In [0]:
# Salva o DataFrame df_silver como uma tabela Delta, sobrescrevendo se já existir e permitindo atualização de schema
df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", "true") \
    .saveAsTable("project_data_football_silver.fato_pontuacao")