In [0]:
# Load the processed transactions dataset (stored as parquet).
transactions_df = spark.read.format("parquet").load('/mnt/sleeperprojectdl/processed/transactions')

In [0]:
# Keep only transactions whose status is 'complete', sorted by creation time.
transactions_transformed_df = (
    transactions_df
    .where("status = 'complete'")
    .orderBy("created")
)

In [0]:
# Render the filtered, created-ordered transactions for visual inspection
# before the presentation tables are derived from them.
display(transactions_transformed_df)

transaction_id,created,status,status_updated,week,type,roster_ids,consenter_ids,drops,adds,draft_picks,waiver_bid,notes,season,ingestion_date
849100194858541056,2022-07-02T23:16:11Z,complete,2022-07-02T23:16:11Z,1,commissioner,List(1),,,Map(1479 -> 1),List(),,,2022,2024-07-16T23:39:51.726Z
849122088177512448,2022-07-03T00:43:11Z,complete,2022-07-03T00:43:11Z,1,free_agent,List(1),List(1),Map(536 -> 1),,List(),,,2022,2024-07-16T23:39:51.726Z
849122955958018048,2022-07-03T00:46:37Z,complete,2022-07-03T00:46:37Z,1,commissioner,"List(6, 1)",,Map(1479 -> 6),Map(1479 -> 1),List(),,,2022,2024-07-16T23:39:51.726Z
849126393919954944,2022-07-03T01:00:17Z,complete,2022-07-03T01:00:17Z,1,free_agent,List(4),List(4),Map(8183 -> 4),,List(),,,2022,2024-07-16T23:39:51.726Z
849126490481209344,2022-07-03T01:00:40Z,complete,2022-07-03T01:00:40Z,1,free_agent,List(4),List(4),Map(4351 -> 4),Map(8408 -> 4),List(),,,2022,2024-07-16T23:39:51.726Z
849128963933609984,2022-07-03T01:10:30Z,complete,2022-07-03T01:10:30Z,1,free_agent,List(8),List(8),Map(7496 -> 8),,List(),,,2022,2024-07-16T23:39:51.726Z
849190398495678464,2022-07-03T05:14:37Z,complete,2022-07-04T14:13:07Z,1,trade,"List(2, 1)","List(2, 1)","Map(6803 -> 1, 6801 -> 2)","Map(6803 -> 2, 6801 -> 1)","List(List(1, 2, 1, null, 2023, 1), List(1, 2, 1, null, 2023, 2), List(1, 2, 1, null, 2024, 1), List(1, 2, 1, null, 2025, 1))",,,2022,2024-07-16T23:39:51.726Z
849211303250759680,2022-07-03T06:37:41Z,complete,2022-07-03T06:37:41Z,1,free_agent,List(10),List(10),Map(6845 -> 10),,List(),,,2022,2024-07-16T23:39:51.726Z
849211378903388160,2022-07-03T06:37:59Z,complete,2022-07-03T06:37:59Z,1,free_agent,List(10),List(10),Map(8111 -> 10),,List(),,,2022,2024-07-16T23:39:51.726Z
849211761344229376,2022-07-03T06:39:30Z,complete,2022-07-03T06:39:30Z,1,free_agent,List(10),List(10),Map(4089 -> 10),,List(),,,2022,2024-07-16T23:39:51.726Z


In [0]:
from pyspark.sql.functions import col, explode, lit, when

#### Creating Transactions Table

In [0]:
# Transactions table: one row per transaction, scalar columns only.
# - `status_updated` is exposed as `completed`.
# - the type value "free_agent" is rewritten to "free agent"; every other
#   type passes through unchanged.
# Rows are ordered by completion time.
final_transactions_df = (
    transactions_transformed_df
    .select(
        "transaction_id",
        when(col("type") == "free_agent", lit("free agent"))
            .otherwise(col("type"))
            .alias("type"),
        "created",
        col("status_updated").alias("completed"),
        "waiver_bid",
        "week",
        "season",
        "ingestion_date",
    )
    .orderBy("completed")
)

In [0]:
# Persist the transactions table, replacing any previous output at this path.
final_transactions_df.write.parquet(
    "/mnt/sleeperprojectdl/presentation/transactions",
    mode="overwrite",
)

#### Creating Consenters Table

In [0]:
# Consenters table: one row per (transaction, consenting roster).
#
# The IDs are taken from `consenter_ids`, not `roster_ids`: `roster_ids` lists
# every roster involved in a transaction, including transactions that have no
# consenters at all (e.g. commissioner moves), so exploding it would record
# rosters as consenters when they never consented.
# explode() emits no rows for a null or empty array, so transactions without
# consenters simply do not appear in this table.
#
# The output column keeps the name "roster_id" so existing readers of the
# consenters table see the same schema.
# NOTE(review): assumes the values in consenter_ids are roster IDs — confirm
# against the upstream API.
consenters_df = transactions_transformed_df.select(
        col("transaction_id"),
        explode(col("consenter_ids")).alias("roster_id")
    )

In [0]:
# Persist the consenters table, replacing any previous output at this path.
consenters_df.write.parquet(
    "/mnt/sleeperprojectdl/presentation/consenters",
    mode="overwrite",
)

#### Creating Roster Actions Table

In [0]:
# Explode the `adds` map into one row per entry: the map key becomes
# `player_id`, the map value becomes `roster_id`. Every row is tagged with
# action = "add".
adds_df = (
    transactions_transformed_df
    .select(
        "transaction_id",
        explode("adds").alias("player_id", "roster_id"),
    )
    .withColumn("action", lit("add"))
)

# Same rows in the roster-actions column layout, ordered by transaction.
rearranged_adds_df = (
    adds_df
    .select("transaction_id", "action", "roster_id", "player_id")
    .orderBy("transaction_id")
)

In [0]:
# Explode the `drops` map into one row per entry: the map key becomes
# `player_id`, the map value becomes `roster_id`. Every row is tagged with
# action = "drop".
drops_df = (
    transactions_transformed_df
    .select(
        "transaction_id",
        explode("drops").alias("player_id", "roster_id"),
    )
    .withColumn("action", lit("drop"))
)

# Same rows in the roster-actions column layout, ordered by transaction.
rearranged_drops_df = (
    drops_df
    .select("transaction_id", "action", "roster_id", "player_id")
    .orderBy("transaction_id")
)

In [0]:
# Build the roster-actions table in memory and write it exactly once.
#
# Why not append-then-rewrite:
# - Writing adds and drops with mode("append") is not idempotent; each re-run
#   of the notebook would add another copy of every row to the output path.
# - Reading the output path and then overwriting that same path is unsafe in
#   Spark: the read is lazy, so the overwrite can remove the files the read
#   still depends on, causing the job to fail or the data to be lost.
#
# unionByName matches columns by name; both inputs expose
# (transaction_id, action, roster_id, player_id).
final_roster_actions_df = (
    rearranged_adds_df
    .unionByName(rearranged_drops_df)
    .orderBy("transaction_id", "roster_id")
)

final_roster_actions_df.write.mode("overwrite").parquet("/mnt/sleeperprojectdl/presentation/roster_actions")

#### Creating Traded Draft Picks Table

In [0]:
# One row per traded draft pick: explode the `draft_picks` array, leaving each
# pick in the default explode output column named `col`.
draft_picks_df = transactions_transformed_df.select("transaction_id", explode("draft_picks"))

# Flatten the fields read from each pick; the owner fields are renamed to
# roster terminology, season and round keep their names.
draft_picks_final_df = draft_picks_df.select(
    "transaction_id",
    col("col.previous_owner_id").alias("previous_roster_id"),
    col("col.owner_id").alias("new_roster_id"),
    "col.season",
    "col.round",
)

# Persist the traded-picks table, replacing any previous output at this path.
draft_picks_final_df.write.parquet(
    "/mnt/sleeperprojectdl/presentation/traded_picks",
    mode="overwrite",
)