In [0]:
# Load the bronze-layer players table so the next cell can display it.
df = (
    spark.read
    .table("sleeper.bronze_players")
)

In [0]:
# Render the current `df` with the notebook's rich table viewer.
display(df)

In [0]:
%sql

-- Preview every column and row of the bronze rosters table.
SELECT * FROM sleeper.bronze_rosters

In [0]:
from pyspark.sql.functions import current_timestamp, expr, explode

In [0]:
# Explode each bronze roster into one row per rostered player, flag whether
# that player appears in the roster's `starters` array, and look up the
# per-player nickname stored under the metadata key 'p_nick_<player_id>'.
rosters_bronze = spark.read.table("sleeper.bronze_rosters")

roster_players = (
    rosters_bronze
    .withColumn("player_id", explode("players"))
    .withColumn("is_starter", expr("array_contains(starters, player_id)"))
    .withColumn("player_nickname", expr("metadata['p_nick_' || player_id]"))
)

roster_player_columns = [
    "owner_id",
    "roster_id",
    "player_id",
    "is_starter",
    "player_nickname",
    "_league_id",
    "_matchup_week",
    "_year",
    "_ingested_ts",
]

# Keep only the output columns and stamp the time this snapshot was taken.
df = roster_players.select(*roster_player_columns).withColumn(
    "_snapshot_ts", current_timestamp()
)
display(df)

In [0]:
%sql

-- Preview every column and row of the bronze rosters table.
SELECT * FROM sleeper.bronze_rosters

In [0]:
# Flatten the roster-level standings out of the `metadata` and `settings`
# columns of the bronze rosters table: one output row per input roster row.
#
# Points are stored as a whole part plus a hundredths part
# (`fpts` + `fpts_decimal` / 100). The hundredths part is coalesced to 0 so a
# NULL decimal entry does not null out the whole total, since NULL propagates
# through `+` in Spark SQL.
# NOTE(review): presumably the decimal keys can be NULL/absent (e.g. before
# any games are scored) -- confirm against the bronze data.
df = (
    spark.read.table("sleeper.bronze_rosters")
    .withColumn("streak", expr("metadata['streak']"))
    .withColumn("record", expr("metadata['record']"))
    .withColumn("wins", expr("settings['wins']"))
    .withColumn("losses", expr("settings['losses']"))
    .withColumn("ties", expr("settings['ties']"))
    .withColumn(
        "fpts",
        expr("settings['fpts'] + coalesce(settings['fpts_decimal'], 0) / 100"),
    )
    .withColumn(
        "fpts_against",
        expr("settings['fpts_against'] + coalesce(settings['fpts_against_decimal'], 0) / 100"),
    )
    .withColumn("total_moves", expr("settings['total_moves']"))
    .withColumn("waiver_budget_used", expr("settings['waiver_budget_used']"))
    .withColumn("waiver_position", expr("settings['waiver_position']"))
)

# Keep the roster identifiers, the flattened standings, and the ingestion
# bookkeeping columns (the ones prefixed with an underscore).
df = df.select(
    "owner_id",
    "roster_id",
    "streak",
    "record",
    "wins",
    "losses",
    "ties",
    "fpts",
    "fpts_against",
    "total_moves",
    "waiver_budget_used",
    "waiver_position",
    "_league_id",
    "_matchup_week",
    "_year",
    "_ingested_ts",
)

display(df)

In [0]:
%sql

-- Preview every column and row of the bronze matchups table.
SELECT * FROM sleeper.bronze_matchups

In [0]:
# Project the bronze matchups table down to the matchup-level columns and
# stamp each row with the time this snapshot was taken.
matchup_fact_columns = [
    "matchup_id",
    "roster_id",
    "points",
    "_league_id",
    "_matchup_week",
    "_year",
    "_ingested_ts",
]

df = (
    spark.read.table("sleeper.bronze_matchups")
    .select(*matchup_fact_columns)
    .withColumn("_snapshot_ts", current_timestamp())
)

In [0]:
%sql

-- Preview every column and row of the bronze matchups table.
SELECT * FROM sleeper.bronze_matchups

In [0]:
from pyspark.sql.functions import array_contains, col, explode

# Explode each bronze matchup row into one row per player, then attach
# whether the player is in `starters` and the player's entry in
# `players_points` (looked up by player id).
matchups_bronze = spark.read.table("sleeper.bronze_matchups")
player_id_col = col("player_id")

matchup_player_columns = [
    "roster_id",
    "matchup_id",
    "player_id",
    "player_points",
    "is_starter",
    "_league_id",
    "_matchup_week",
    "_year",
    "_ingested_ts",
]

df = (
    matchups_bronze
    .withColumn("player_id", explode(col("players")))
    .withColumn("is_starter", array_contains(col("starters"), player_id_col))
    .withColumn("player_points", col("players_points")[player_id_col])
    .select(*matchup_player_columns)
)

display(df)

In [0]:
%sql

-- Preview every column and row of the bronze users table.
SELECT * FROM sleeper.bronze_users

In [0]:
# Reshape the bronze users table into owner-centric naming and pull the team
# name out of the nested `metadata` column.
# Relies on `col` having been imported by an earlier cell.
owner_renames = {
    "display_name": "owner_name",
    "user_id": "owner_id",
    "is_owner": "is_commissioner",
}

users = spark.read.table("sleeper.bronze_users")
for old_name, new_name in owner_renames.items():
    users = users.withColumnRenamed(old_name, new_name)
users = users.withColumn("team_name", col("metadata.team_name"))

owner_columns = [
    "owner_id",
    "owner_name",
    "is_bot",
    "is_commissioner",
    "team_name",
    "_league_id",
    "_matchup_week",
    "_year",
    "_ingested_ts",
]

df = users.select(*owner_columns)

display(df)

In [0]:
%sql

-- Preview every column and row of the silver matchups-players dimension.
SELECT * FROM sleeper.silver_matchups_players_dim

In [0]:
from pyspark.sql.functions import col, concat_ws, coalesce, when
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Enrich each (matchup, roster, player) row with descriptive player
# attributes and a per-position scoring percentile.
df_matchups_players_dim = spark.read.table('sleeper.silver_matchups_players_dim')

# Reduce the players dimension to the join keys plus the descriptive
# attributes that are carried into the output.
df_players_dim = spark.read.table('sleeper.silver_players_dim').select(
    "player_id",
    "_league_id",
    "_matchup_week",
    # Fall back to the last name when no full name is present.
    coalesce(col("full_name"), col("last_name")).alias("player_name"),
    col("position").alias("player_position"),
    col("team").alias("nfl_team"),
    "years_exp",
    "injury_status",
    # concat_ws skips NULL inputs, so either part may be missing.
    concat_ws(" ", col("injury_body_part"), col("injury_notes")).alias("injury_notes"),
    "college",
    # NOTE(review): flags years_exp == 1 as a rookie (NULL becomes False);
    # confirm whether the source encodes a rookie season as 0 instead.
    when(col("years_exp") == 1, True).otherwise(False).alias("is_rookie"),
)

# Inner join: a matchup row with no matching player/league/week in the
# players dimension is dropped.
same_player = df_matchups_players_dim.player_id == df_players_dim.player_id
same_league = df_matchups_players_dim._league_id == df_players_dim._league_id
same_week = df_matchups_players_dim._matchup_week == df_players_dim._matchup_week

df_joined = df_matchups_players_dim.join(
    df_players_dim,
    same_player & same_league & same_week,
)

# Rank points among players of the same position within a league and week.
window_spec = (
    Window
    .partitionBy(
        df_matchups_players_dim["_league_id"],
        df_matchups_players_dim["_matchup_week"],
        "player_position",
    )
    .orderBy("player_points")
)

df_joined = (
    df_joined
    .withColumn("position_points_percentile", F.percent_rank().over(window_spec))
    .select(
        "roster_id",
        "matchup_id",
        df_matchups_players_dim.player_id,
        "player_name",
        "player_position",
        "nfl_team",
        "player_points",
        "position_points_percentile",
        "is_starter",
        "years_exp",
        "is_rookie",
        "injury_status",
        "injury_notes",
        "college",
        df_matchups_players_dim._league_id,
        df_matchups_players_dim._matchup_week,
        df_matchups_players_dim._year,
    )
)

display(df_joined)

In [0]:
%sql

-- Preview every column and row of the silver matchups fact table.
SELECT * FROM sleeper.silver_matchups_fact

In [0]:
# Combine the matchup fact rows with the roster standings and the owning
# user's details, matched on league, week and roster/owner.
df_matchups_fact = spark.read.table("sleeper.silver_matchups_fact")
df_rosters_dim = spark.read.table("sleeper.silver_rosters_dim")
df_users_dim = spark.read.table("sleeper.silver_users_dim")

# Matchup row -> the same roster in the same league and week.
roster_match = (
    (df_matchups_fact._league_id == df_rosters_dim._league_id)
    & (df_matchups_fact.roster_id == df_rosters_dim.roster_id)
    & (df_matchups_fact._matchup_week == df_rosters_dim._matchup_week)
)

# Roster -> its owner in the same league and week.
owner_match = (
    (df_matchups_fact._league_id == df_users_dim._league_id)
    & (df_rosters_dim.owner_id == df_users_dim.owner_id)
    & (df_matchups_fact._matchup_week == df_users_dim._matchup_week)
)

# Both joins are inner joins, so unmatched matchup rows are dropped.
df_with_rosters = df_matchups_fact.join(df_rosters_dim, roster_match)
df_with_owners = df_with_rosters.join(df_users_dim, owner_match)

df_result = df_with_owners.select(
    "matchup_id",
    df_matchups_fact.roster_id,
    df_users_dim.owner_id,
    df_users_dim.owner_name,
    df_users_dim.is_commissioner,
    df_users_dim.team_name,
    df_matchups_fact.points,
    df_rosters_dim.streak,
    df_rosters_dim.record,
    df_rosters_dim.wins,
    df_rosters_dim.losses,
    df_rosters_dim.ties,
    df_rosters_dim.fpts,
    df_rosters_dim.fpts_against,
    df_rosters_dim.waiver_budget_used,
    df_rosters_dim.waiver_position,
    df_matchups_fact._league_id,
    df_matchups_fact._matchup_week,
    df_matchups_fact._year,
)

display(df_result)

In [0]:
%sql

-- Preview every column and row of the silver roster-results model table.
SELECT * FROM sleeper.silver_model_roster_results

In [0]:
%sql

-- Preview every column and row of the silver player-performances model table.
SELECT * FROM sleeper.silver_model_player_performances

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col

# Build one summary row per roster per matchup:
#   * starter / bench point totals,
#   * bench players who out-scored starters at the same position,
#   * the three highest-scoring players,
#   * the opponent's starter points and whether the missed bench points
#     could have flipped a loss.
#
# Every join, window and group-by below is keyed on league + week + matchup
# (+ roster). Keying on matchup_id / roster_id alone mixes rows from
# different weeks and leagues whenever those ids repeat.
# NOTE(review): assumes silver_model_player_performances carries _league_id
# and _matchup_week like the roster-results table does -- confirm.

df_roster_results = spark.read.table('sleeper.silver_model_roster_results')
df_player_performances = spark.read.table('sleeper.silver_model_player_performances')

# Columns that identify one roster's side of one matchup.
matchup_keys = ["_league_id", "_matchup_week", "matchup_id", "roster_id"]

# Roster-level columns carried through the aggregation, in output order.
roster_columns = [
    "matchup_id",
    "roster_id",
    "owner_id",
    "owner_name",
    "is_commissioner",
    "team_name",
    "points",
    "streak",
    "record",
    "wins",
    "losses",
    "ties",
    "fpts",
    "fpts_against",
    "waiver_budget_used",
    "waiver_position",
    "_league_id",
    "_matchup_week",
    "_year",
]

df_joined_results = df_roster_results.alias("roster").join(
    df_player_performances.alias("performance"),
    (col("roster._league_id") == col("performance._league_id")) &
    (col("roster._matchup_week") == col("performance._matchup_week")) &
    (col("roster.matchup_id") == col("performance.matchup_id")) &
    (col("roster.roster_id") == col("performance.roster_id"))
)

# Aggregate starter points and bench points
df_aggregated = df_joined_results.groupBy(
    *[f"roster.{name}" for name in roster_columns]
).agg(
    F.sum(F.when(col("performance.is_starter"), col("performance.player_points")).otherwise(0)).alias("starter_points"),
    F.sum(F.when(~col("performance.is_starter"), col("performance.player_points")).otherwise(0)).alias("bench_points")
)

# Find bench players who were the top scorer at their position on the roster
# for that league/week/matchup (rank 1 and not a starter).
window_spec = Window.partitionBy(
    "performance._league_id",
    "performance._matchup_week",
    "performance.matchup_id",
    "performance.roster_id",
    "performance.player_position"
).orderBy(F.desc("performance.player_points"))

df_ranked = df_joined_results.withColumn("rank", F.rank().over(window_spec))

df_top_bench = df_ranked.filter(
    (col("rank") == 1) & (~col("performance.is_starter"))
).select(
    "performance._league_id",
    "performance._matchup_week",
    "performance.matchup_id",
    "performance.roster_id",
    "performance.player_position",
    col("performance.player_name").alias("benched_player_name"),
    col("performance.player_points").alias("benched_player_points")
)

# Starters at each position. Every column gets a unique `starter_` name so the
# join below can refer to both sides by name; both frames derive from
# df_joined_results, and attribute references such as `df_a.x == df_b.x`
# would be an ambiguous self-join.
df_starters = df_joined_results.filter(col("performance.is_starter")).select(
    col("performance._league_id").alias("starter_league_id"),
    col("performance._matchup_week").alias("starter_matchup_week"),
    col("performance.matchup_id").alias("starter_matchup_id"),
    col("performance.roster_id").alias("starter_roster_id"),
    col("performance.player_position").alias("starter_player_position"),
    col("performance.player_name").alias("starter_player_name"),
    col("performance.player_points").alias("starter_player_points")
)

# Pair each top bench player with the starter(s) at the same position.
# NOTE(review): a position with several starters yields one pair per starter,
# so the summed opportunity cost below counts the bench player once per
# starter -- confirm this is the intended definition of "missed" points.
df_bench_better_than_starters = df_top_bench.join(
    df_starters,
    (col("_league_id") == col("starter_league_id")) &
    (col("_matchup_week") == col("starter_matchup_week")) &
    (col("matchup_id") == col("starter_matchup_id")) &
    (col("roster_id") == col("starter_roster_id")) &
    (col("player_position") == col("starter_player_position"))
).select(
    "_league_id",
    "_matchup_week",
    "matchup_id",
    "roster_id",
    F.struct(
        col("benched_player_name"),
        col("benched_player_points"),
        col("starter_player_name"),
        col("starter_player_points"),
        (col("benched_player_points") - col("starter_player_points")).alias("point_opportunity_cost")
    ).alias("bench_better_than_starter")
)

# Aggregate the structs into a list and sum point_opportunity_cost
df_bench_better_than_starters_agg = df_bench_better_than_starters.groupBy(
    *matchup_keys
).agg(
    F.collect_list("bench_better_than_starter").alias("bench_better_than_starters"),
    F.sum("bench_better_than_starter.point_opportunity_cost").alias("missed_starter_points")
)

# Join the aggregated bench-better-than-starters onto the roster summary.
# Left join: rosters with no such bench player keep NULLs.
df_aggregated = df_aggregated.join(
    df_bench_better_than_starters_agg,
    matchup_keys,
    "left"
)

# Create a column for highest scoring players (top 3 performing players)
window_spec_highest_scoring = Window.partitionBy(
    "roster._league_id",
    "roster._matchup_week",
    "roster.matchup_id",
    "roster.roster_id"
).orderBy(F.desc("performance.player_points"))

df_highest_scoring = df_joined_results.withColumn(
    "rank", F.row_number().over(window_spec_highest_scoring)
).filter(col("rank") <= 3)

df_highest_scoring_agg = df_highest_scoring.groupBy(
    "roster._league_id",
    "roster._matchup_week",
    "roster.matchup_id",
    "roster.roster_id"
).agg(
    F.collect_list(
        F.struct(
            col("performance.player_name").alias("highest_scoring_player_name"),
            col("performance.player_points").alias("highest_scoring_player_points")
        )
    ).alias("highest_scoring_players")
)

df_aggregated = df_aggregated.join(
    df_highest_scoring_agg,
    matchup_keys,
    "left"
)

# Add opponent points: the other roster sharing the same league, week and
# matchup_id. The opponent frame uses unique column names so this self-join
# is unambiguous and no duplicate `matchup_id` column is left behind.
df_opponent_points = df_aggregated.select(
    col("_league_id").alias("opponent_league_id"),
    col("_matchup_week").alias("opponent_matchup_week"),
    col("matchup_id").alias("opponent_matchup_id"),
    col("roster_id").alias("opponent_roster_id"),
    col("starter_points").alias("opponent_starter_points")
)

df_aggregated = df_aggregated.join(
    df_opponent_points,
    (col("_league_id") == col("opponent_league_id")) &
    (col("_matchup_week") == col("opponent_matchup_week")) &
    (col("matchup_id") == col("opponent_matchup_id")) &
    (col("roster_id") != col("opponent_roster_id")),
    "left"
).select(
    *roster_columns,
    "starter_points",
    "bench_points",
    "bench_better_than_starters",
    "missed_starter_points",
    "highest_scoring_players",
    "opponent_roster_id",
    "opponent_starter_points"
)

# Add couldve_won_with_missed_bench_points column:
#   NULL  -> the roster already out-scored its opponent,
#   True  -> adding the missed bench points would have beaten the opponent,
#   False -> otherwise (including when either side of the comparison is NULL).
df_aggregated = df_aggregated.withColumn(
    "couldve_won_with_missed_bench_points",
    F.when(
        col("starter_points") > col("opponent_starter_points"),
        F.lit(None).cast("boolean")
    ).when(
        (col("starter_points") + col("missed_starter_points")) > col("opponent_starter_points"),
        True
    ).otherwise(False)
)

display(df_aggregated)
df_aggregated.printSchema()