In [11]:
import os

from pyspark.sql import SparkSession

In [12]:
# Reuse the active Spark session if one exists; otherwise start one for this app.
spark = (
    SparkSession.builder
    .appName("playersTableCreation")
    .getOrCreate()
)

In [13]:
# Echo the session object so the notebook renders it as this cell's output.
spark

In [27]:
# Load the `player_seasons` table from Postgres over JDBC.
# Connection settings are taken from the environment so real credentials never
# have to be committed with the notebook. The fallbacks are the values this
# notebook previously hard-coded, so it still runs unchanged when the
# variables are not set.
players_season = (
    spark.read.format("jdbc")
    .option(
        "url",
        os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://postgres:5432/postgres"),
    )
    .option("dbtable", "player_seasons")
    .option("user", os.environ.get("POSTGRES_USER", "postgres"))
    .option("password", os.environ.get("POSTGRES_PASSWORD", "postgres"))
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [29]:
# For first-time testing only:
#players_season.show(5)

In [16]:
def cumulatively_increment_players(spark, player_seasons, players, min_season, max_season):
    """Build the cumulative ``players`` table one season at a time.

    For every season in ``[min_season, max_season]`` the previous season's
    snapshot ("yesterday") is full-outer-joined with that season's rows of
    ``player_seasons`` ("today"). The joined result is the new snapshot and is
    appended to the cumulative DataFrame.

    Args:
        spark: SparkSession used to run the SparkSQL query.
        player_seasons: DataFrame holding the columns the query reads
            (player_name, height, college, country, draft_year, draft_round,
            draft_number, season, gp, pts, reb, ast). ``season`` is cast to int.
        players: Starting cumulative DataFrame (may be empty). It must have the
            same column layout as the query output, because snapshots are
            appended with a positional ``union``.
        min_season: First season to process (inclusive).
        max_season: Last season to process (inclusive).

    Returns:
        ``players`` with one snapshot appended per processed season.

    Side effects:
        * Registers/replaces the temp views ``player_seasons`` and ``players``.
          On return ``players`` points at the returned cumulative DataFrame.
        * Prints one progress line per season.
        * Each snapshot is materialised with ``localCheckpoint()``, so Spark
          jobs run inside this function instead of being fully deferred.
    """
    # Ensure `season` in player_seasons is an integer
    player_seasons = player_seasons.withColumn("season", player_seasons["season"].cast("int"))
    player_seasons.createOrReplaceTempView("player_seasons")

    initial_players = players
    # Snapshot produced by the previous loop iteration (None before the first one).
    previous_snapshot = None

    # Iterate over each season
    for season in range(min_season, max_season + 1):
        current_season = season - 1

        # A temp view is bound to the DataFrame it was created from; rebinding
        # the Python name `players` does not update it. The view is therefore
        # refreshed on every iteration, otherwise `yesterday` would always read
        # the initial input and nothing would ever accumulate.
        #
        # Every row of a snapshot carries current_season == its season, so the
        # only rows that can match `current_season = {season - 1}` are rows of
        # the caller's input and rows of the previous snapshot. Registering just
        # those keeps the query plan from embedding the whole history twice per
        # iteration.
        if previous_snapshot is None:
            yesterday_source = initial_players
        else:
            yesterday_source = initial_players.union(previous_snapshot)
        yesterday_source.createOrReplaceTempView("players")

        print(f"Current Season in players = {current_season} \n season {season}")
        # SparkSQL query
        cumulative_query = f"""
        WITH yesterday AS (
            SELECT * FROM players
            WHERE current_season = {current_season}
        ),
        today AS (
            SELECT * FROM player_seasons
            WHERE season = {season}
        )
        SELECT
            COALESCE(t.player_name, y.player_name) AS player_name,
            COALESCE(t.height, y.height) AS height,
            COALESCE(t.college, y.college) AS college,
            COALESCE(t.country, y.country) AS country,
            COALESCE(t.draft_year, y.draft_year) AS draft_year,
            COALESCE(t.draft_round, y.draft_round) AS draft_round,
            COALESCE(t.draft_number, y.draft_number) AS draft_number,
            CASE 
                WHEN y.season_stats IS NULL THEN array(named_struct('season', t.season, 'gp', t.gp, 'pts', t.pts, 'reb', t.reb, 'ast', t.ast))
                WHEN t.season IS NOT NULL THEN concat(y.season_stats, array(named_struct('season', t.season, 'gp', t.gp, 'pts', t.pts, 'reb', t.reb, 'ast', t.ast)))
                ELSE y.season_stats
            END AS season_stats,
            COALESCE(t.season, y.current_season + 1) as current_season
        FROM today t
        FULL OUTER JOIN yesterday y
        ON t.player_name = y.player_name
        """

        # Execute the query. localCheckpoint() materialises the snapshot and
        # truncates its lineage, so the next iteration's plan does not nest
        # every earlier join inside itself.
        previous_snapshot = spark.sql(cumulative_query).localCheckpoint()

        # Append the new snapshot to the cumulative result (positional union).
        players = players.union(previous_snapshot)

    # Leave the view pointing at the full cumulative table for later SQL cells.
    players.createOrReplaceTempView("players")
    return players

In [30]:
from pyspark.sql.types import *

## In PostgreSQL we created a TYPE. In Spark we need to create a StructType([StructField("column", DataType(), nullable)]) instead.
     CREATE TYPE season_stats AS (
 	season INTEGER,
     gp INTEGER,
     pts REAL,
     reb REAL,
     ast REAL
     );

In [31]:
# Spark struct describing one season's stat line; every field is nullable.
season_stats_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("season", IntegerType()),
            ("gp", IntegerType()),
            ("pts", FloatType()),
            ("reb", FloatType()),
            ("ast", FloatType()),
        )
    ]
)

In [52]:
# Schema of the cumulative players table: seven nullable string attributes,
# then the array of per-season stat structs and the snapshot's season.
players_schema = StructType(
    [
        StructField(column, StringType(), True)
        for column in (
            "player_name",
            "height",
            "college",
            "country",
            "draft_year",
            "draft_round",
            "draft_number",
        )
    ]
    + [
        StructField("season_stats", ArrayType(season_stats_schema), True),
        StructField("current_season", IntegerType(), True),
    ]
)

In [53]:
# Seed the cumulative table with a zero-row DataFrame that carries players_schema.
players = spark.createDataFrame(data=[], schema=players_schema)

In [54]:
# Run the season-by-season build for seasons 1996 through 2022 (inclusive).
output_df = cumulatively_increment_players(
    spark=spark,
    player_seasons=players_season,
    players=players,
    min_season=1996,
    max_season=2022,
)

Current Season in players = 1995 
 season 1996
Current Season in players = 1996 
 season 1997
Current Season in players = 1997 
 season 1998
Current Season in players = 1998 
 season 1999
Current Season in players = 1999 
 season 2000
Current Season in players = 2000 
 season 2001
Current Season in players = 2001 
 season 2002
Current Season in players = 2002 
 season 2003
Current Season in players = 2003 
 season 2004
Current Season in players = 2004 
 season 2005
Current Season in players = 2005 
 season 2006
Current Season in players = 2006 
 season 2007
Current Season in players = 2007 
 season 2008
Current Season in players = 2008 
 season 2009
Current Season in players = 2009 
 season 2010
Current Season in players = 2010 
 season 2011
Current Season in players = 2011 
 season 2012
Current Season in players = 2012 
 season 2013
Current Season in players = 2013 
 season 2014
Current Season in players = 2014 
 season 2015
Current Season in players = 2015 
 season 2016
Current Seaso

In [56]:
# Preview the first 20 rows of the cumulative result.
output_df.show(20)

                                                                                

+----------------+------+-----------------+-------+----------+-----------+------------+--------------------+--------------+
|     player_name|height|          college|country|draft_year|draft_round|draft_number|        season_stats|current_season|
+----------------+------+-----------------+-------+----------+-----------+------------+--------------------+--------------+
|   Reggie Miller|   6-7|             UCLA|    USA|      1987|          1|          11|[{1996, 81, 21.6,...|          1996|
|     Terry Mills|  6-10|         Michigan|    USA|      1990|          1|          16|[{1996, 79, 10.8,...|          1996|
|    Bobby Phills|   6-5|         Southern|    USA|      1991|          2|          45|[{1996, 69, 12.6,...|          1996|
|    Carl Herrera|   6-9|          Houston|    USA|      1990|          2|          30|[{1996, 75, 8, 4....|          1996|
|   Eric Williams|   6-8|       Providence|    USA|      1995|          1|          14|[{1996, 72, 15, 4...|          1996|
|   Lloy