Carregamento das tabelas Bronze

In [0]:
from pyspark.sql.functions import col, trim, lower, regexp_replace, sum, avg, first, expr

# Load the four Bronze-layer source tables that feed the Silver transformations below.
stats_df = spark.table("workspace.nba_lakehouse_bronze.nba_stats_bronze")            # player stats
adp_df = spark.table("workspace.nba_lakehouse_bronze.nba_adp_bronze")                # ADP rankings
proj_df = spark.table("workspace.nba_lakehouse_bronze.nba_projections_bronze")       # projections
injury_df = spark.table("workspace.nba_lakehouse_bronze.nba_injury_report")          # injury report


Visualização inicial

In [0]:
# Print the column names and types of the Bronze stats table.
stats_df.printSchema()

In [0]:
# Render the Bronze stats table with the notebook's display helper for a first visual check.
display(stats_df)

Funções de limpeza dos DataFrames

In [0]:
import unicodedata
from pyspark.sql.functions import col, trim, lower, regexp_replace, first, expr, when, row_number, udf
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

def remove_accents(input_str):
    """Return ``input_str`` with accents (Unicode combining marks) stripped.

    The text is decomposed with NFKD normalization, then every character that
    ``unicodedata.combining`` classifies as a combining mark is dropped.

    Args:
        input_str: Text to normalize, or ``None``.

    Returns:
        The accent-free text, or ``None`` when ``input_str`` is ``None``.
    """
    if input_str is None:
        return None
    decomposed = unicodedata.normalize('NFKD', input_str)
    base_chars = (ch for ch in decomposed if not unicodedata.combining(ch))
    return "".join(base_chars)

# Spark UDF wrapper around remove_accents, declared to return a string column.
remove_accents_udf = udf(remove_accents, StringType())

def clean_stats(df):
    """Reduce the Bronze stats table to one row per player and add the join key.

    Steps:
      1. For each ``Player``, keep a single row, preferring the row whose
         ``Team`` looks like an aggregate marker (``2TM``, ``3TM``, ...) over
         the per-team rows.
      2. Build ``Player_Key``: accent-free, lower-cased, punctuation-free,
         trimmed player name.
      3. Derive shooting percentages with ``try_divide`` so a zero divisor
         yields null instead of an error.

    Args:
        df: Stats DataFrame with at least the columns ``Player``, ``Team``,
            ``Pos``, ``G``, ``MP``, ``PTS``, ``TRB``, ``AST``, ``STL``, ``BLK``,
            ``TOV``, ``FG``, ``FGA``, ``FT``, ``FTA``, ``3P`` and ``3PA``.

    Returns:
        DataFrame with columns ``Player_Key``, ``Player``, ``Pos``, ``G``,
        ``MP``, ``PTS``, ``TRB``, ``AST``, ``STL``, ``BLK``, ``TOV``,
        ``FG_PCT``, ``3P_PCT`` and ``FT_PCT``.
    """
    # Aggregate-team rows sort first (0) so row_number() == 1 selects them.
    is_aggregate_row = col("Team").rlike("^\\d+TM$")
    player_window = Window.partitionBy("Player").orderBy(when(is_aggregate_row, 0).otherwise(1))

    one_row_per_player = (
        df.withColumn("row_num", row_number().over(player_window))
          .filter(col("row_num") == 1)
          .drop("row_num")
    )

    # After the filter above there is exactly one row per Player, and Player_Key
    # is a pure function of Player, so no groupBy/first() aggregation is needed:
    # the columns can be derived directly, avoiding an extra shuffle and the
    # order-dependence of first().
    return (
        one_row_per_player
          .withColumn("normalized_player", remove_accents_udf(col("Player")))
          .withColumn("Player_Key", trim(lower(regexp_replace(col("normalized_player"), r"[^\w\s]", ""))))
          .withColumn("FG_PCT", expr("try_divide(FG, FGA)"))
          .withColumn("3P_PCT", expr("try_divide(`3P`, `3PA`)"))
          .withColumn("FT_PCT", expr("try_divide(FT, FTA)"))
          .select(
              "Player_Key", "Player", "Pos", "G", "MP",
              "PTS", "TRB", "AST", "STL", "BLK", "TOV",
              "FG_PCT", "3P_PCT", "FT_PCT"
          )
    )

def clean_adp(df):
    """Project the ADP table down to the join key and its two ranking columns.

    Args:
        df: ADP DataFrame with the columns ``Player``, ``Rank`` and ``AVG``.

    Returns:
        DataFrame with ``Player_Key`` (accent-free, lower-cased,
        punctuation-free, trimmed player name), ``ADP_Rank`` and ``ADP_Avg``.
    """
    with_name = df.withColumn("normalized_player", remove_accents_udf(col("Player")))

    # Strip everything that is not a word character or whitespace, then lower-case and trim.
    key_expr = trim(lower(regexp_replace(col("normalized_player"), r"[^\w\s]", "")))
    with_key = with_name.withColumn("Player_Key", key_expr)

    return with_key.select(
        col("Player_Key"),
        col("Rank").alias("ADP_Rank"),
        col("AVG").alias("ADP_Avg"),
    )

def clean_proj(df):
    """Project the projections table down to the join key and renamed stat columns.

    Args:
        df: Projections DataFrame with the columns ``Player``, ``PTS``,
            ``TRB``, ``AST``, ``STL``, ``BLK`` and ``TOV``.

    Returns:
        DataFrame with ``Player_Key`` followed by ``Projected_PTS``,
        ``Projected_TRB``, ``Projected_AST``, ``Projected_STL``,
        ``Projected_BLK`` and ``Projected_TOV``.
    """
    # Source column -> output column, in output order.
    projected_names = {
        "PTS": "Projected_PTS",
        "TRB": "Projected_TRB",
        "AST": "Projected_AST",
        "STL": "Projected_STL",
        "BLK": "Projected_BLK",
        "TOV": "Projected_TOV",
    }

    with_name = df.withColumn("normalized_player", remove_accents_udf(col("Player")))
    key_expr = trim(lower(regexp_replace(col("normalized_player"), r"[^\w\s]", "")))
    with_key = with_name.withColumn("Player_Key", key_expr)

    projected_cols = [col(source).alias(target) for source, target in projected_names.items()]
    return with_key.select("Player_Key", *projected_cols)

In [0]:
from pyspark.sql.functions import col, trim, lower, regexp_replace, concat, lit, concat_ws

def transform_player_info_to_silver(bronze_table_name):
    """Build the Silver player-dimension DataFrame from a Bronze player-info table.

    Args:
        bronze_table_name: Fully qualified name of the Bronze table. It must
            expose the columns ``fname``, ``lname``, ``playerid``,
            ``position``, ``height``, ``weight``, ``birthday``, ``country``,
            ``school``, ``draft_year``, ``draft_round`` and ``draft_number``.

    Returns:
        DataFrame with ``Player_Key``, ``Player_ID_NBA``, ``Player_Name``,
        ``Image_URL``, ``Position``, ``Height_Str``, ``Weight_kg``,
        ``Birthdate``, ``Country``, ``School``, ``Draft_Year``,
        ``Draft_Round`` and ``Draft_Number``.
    """
    df_bronze = spark.read.table(bronze_table_name)

    # concat_ws skips null parts, so a missing first or last name does not null the whole name.
    df_with_name = df_bronze.withColumn(
        "Player_Name",
        concat_ws(' ', col("fname"), col("lname"))
    )

    # Strip accents BEFORE the regex, exactly like the other Player_Key builders
    # in this notebook. Spark's regex engine treats \w as ASCII-only by default,
    # so without this step accented letters are deleted instead of folded and the
    # resulting key never matches the keys built from the stats/ADP/projection
    # tables. Player_Name itself keeps its original spelling for display.
    df_with_key = df_with_name.withColumn(
        "Player_Key",
        trim(lower(regexp_replace(remove_accents_udf(col("Player_Name")), r'[^\w\s]', '')))
    )

    # Headshot URL is derived from the player id; it is null when playerid is null.
    df_with_url = df_with_key.withColumn(
        "Image_URL",
        concat(
            lit("https://cdn.nba.com/headshots/nba/latest/1040x760/"),
            col("playerid"),
            lit(".png")
        )
    )

    df_silver_players = df_with_url.select(
        col("Player_Key"),
        col("playerid").alias("Player_ID_NBA"),
        col("Player_Name"),
        col("Image_URL"),
        col("position").alias("Position"),
        col("height").alias("Height_Str"),
        col("weight").alias("Weight_kg"),
        col("birthday").alias("Birthdate"),
        col("country").alias("Country"),
        col("school").alias("School"),
        col("draft_year").alias("Draft_Year"),
        col("draft_round").alias("Draft_Round"),
        col("draft_number").alias("Draft_Number")
    )

    return df_silver_players

In [0]:
from pyspark.sql.functions import col, trim, lower, regexp_replace

def add_injury_status(main_df, injury_df):
    """Attach each player's injury ``Status`` to ``main_df``.

    Args:
        main_df: DataFrame that already has a ``Player_Key`` column.
        injury_df: Injury report with the columns ``Player`` and ``Status``.

    Returns:
        ``main_df`` left-joined with the injury report on ``Player_Key``;
        rows without a matching report entry get ``Status`` = ``"Healthy"``.
    """
    # Same key recipe as the other tables: accent-free, no punctuation, lower-cased, trimmed.
    accent_free_name = remove_accents_udf(col("Player"))
    key_expr = trim(lower(regexp_replace(accent_free_name, r"[^\w\s]", "")))

    injury_status = injury_df.withColumn("Player_Key", key_expr).select("Player_Key", "Status")

    # Left join keeps every row of main_df, including players absent from the report.
    joined = main_df.join(injury_status, "Player_Key", "left")
    return joined.na.fill({"Status": "Healthy"})

Execução da limpeza

In [0]:
# Apply the cleaning functions to the Bronze DataFrames; each result carries a Player_Key column.
stats_clean = clean_stats(stats_df)
adp_clean = clean_adp(adp_df)
proj_clean = clean_proj(proj_df)


In [0]:
# display(stats_clean)
# display(adp_clean)
# display(proj_clean)

Junção e geração do DataFrame Silver

In [0]:
# Stats is the driving table; ADP and projections are attached by Player_Key.
stats_with_adp = stats_clean.join(adp_clean, "Player_Key", "left")
stats_adp_proj = stats_with_adp.join(proj_clean, "Player_Key", "left")

# Keep only players that have an ADP ranking.
ranked_players = stats_adp_proj.where(col("ADP_Rank").isNotNull())

# Final Silver layout: identity, past-season stats (renamed), projections, ADP.
silver_df = ranked_players.select(
    "Player_Key",
    "Player",
    "Pos",
    "G",
    col("PTS").alias("Past_Season_PTS"),
    col("TRB").alias("Past_Season_TRB"),
    col("AST").alias("Past_Season_AST"),
    col("STL").alias("Past_Season_STL"),
    col("BLK").alias("Past_Season_BLK"),
    col("TOV").alias("Past_Season_TOV"),
    "Projected_PTS",
    "Projected_TRB",
    "Projected_AST",
    "Projected_STL",
    "Projected_BLK",
    "Projected_TOV",
    "ADP_Rank",
    "ADP_Avg",
)

In [0]:
# display(injury_df)

In [0]:
# silver_df_final = add_injury_status(silver_df, injury_df)

Exportação para delta table

In [0]:
# Render the joined Silver DataFrame for inspection before anything is written.
display(silver_df)

In [0]:
def save_dataframe_as_table(df, table_name, mode="overwrite"):
    """Write ``df`` as a Delta table and print a confirmation message.

    Args:
        df: DataFrame to persist.
        table_name: Name of the destination table.
        mode: Write mode passed to the DataFrame writer (default ``"overwrite"``).
    """
    writer = df.write.mode(mode).format("delta")
    writer.saveAsTable(table_name)
    print(f"Tabela '{table_name}' salva com sucesso.")

# Build the player dimension from the Bronze player-info table and persist it to the Silver schema.
silver_dim_table_name = "workspace.nba_lakehouse_silver.dim_jogadores_silver"
df_silver_player_dims = transform_player_info_to_silver("workspace.nba_lakehouse_bronze.kaggle_player_info_bronze")

# Uses the helper's default mode ("overwrite"), so each run replaces the table contents.
save_dataframe_as_table(df_silver_player_dims, silver_dim_table_name)

In [0]:
# Read the Silver fact table from the metastore and display it.
# NOTE(review): this rebinds `silver_df`, replacing the DataFrame assembled from the joins earlier
# in the notebook, and no cell visible here writes `fato_ranking_fantasy` — confirm where that
# table is produced, otherwise this cell fails on a fresh workspace.
silver_df = spark.table("workspace.nba_lakehouse_silver.fato_ranking_fantasy")
display(silver_df)