In [1]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [2]:
# Obtain the Spark session for this notebook. getOrCreate() hands back an
# already-running session when one exists (see the warning in the output),
# in which case only runtime SQL configurations are applied.
spark = (
    SparkSession.builder
    .appName('spark-bq-predictions')
    .getOrCreate()
)

24/03/29 11:55:59 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [None]:
# Display the active SparkSession object (this cell shows In [None], i.e. it
# was not executed in the saved run).
spark

In [3]:
# Team lookup table from BigQuery, trimmed to the three columns used to
# enrich the box scores: numeric id, display name, and the abbreviation
# that is matched against box_df.team.
teams = (
    spark.read.format('bigquery')
    .option('table', 'dez-nba-analytics.nba_database.nba_teams')
    .load()
    .select('id', 'full_name', 'abbreviation')
)

In [4]:
# Per-player, per-game box scores loaded from BigQuery.
box_df = (
    spark.read.format('bigquery')
    .option('table', 'dez-nba-analytics.nba_database.player_boxscore_par_cl')
    .load()
)

In [5]:
# Attach each row's team id and full name by matching the box-score team
# abbreviation to the teams lookup (inner join: rows with an unknown team are
# dropped).
# The join is guarded because box_df is reassigned from itself: re-running
# this cell would otherwise join `teams` a second time, duplicating the
# id/full_name/abbreviation columns and making `id` / `full_name` ambiguous in
# the SQL that reads this frame. The raw box-score table has no `full_name`
# column, so the guard only skips an already-enriched frame.
if 'full_name' not in box_df.columns:
    box_df = box_df.join(teams, box_df.team == teams.abbreviation, 'inner')

In [6]:
# Preview the enriched box scores (PySpark's defaults spelled out: first 20
# rows, long cell values truncated).
box_df.show(n=20, truncate=True)

24/03/29 11:56:15 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+--------+-------------------+--------------+---------+--------------+----+----+----+-----------+---+---------------+------------------+-------------------+-------------+----------------+------------+---------------+------------------+--------------+----+----+---+---+---+---+---+---+---------+---+---------+----------+--------------------+------------+
| game_id|          game_date|   season_type|player_id|        player|team|home|away|mins_played|PTS|field_goal_made|field_goal_attempt|     field_goal_pct|three_pt_made|three_pt_attempt|three_pt_pct|free_throw_made|free_throw_attempt|free_throw_pct|OREB|DREB|REB|AST|STL|BLK|TOV| PF|plusminus|win|season_id|        id|           full_name|abbreviation|
+--------+-------------------+--------------+---------+--------------+----+----+----+-----------+---+---------------+------------------+-------------------+-------------+----------------+------------+---------------+------------------+--------------+----+----+---+---+---+---+---+---+--------

In [7]:
# Expose the enriched box scores to Spark SQL under the view name `box_df`,
# which is the table the cumulative-averages query selects FROM.
box_df.createOrReplaceTempView(name='box_df')

In [8]:
# Season-to-date per-game averages for every player. Each average spans the
# player's games in the same season from the first one up to and including
# the current row's game, ordered by game_date then game_id. The window is
# declared once (WINDOW season_to_date) instead of being repeated per column.
# NOTE(review): because the frame ends at CURRENT ROW, a row's averages include
# that game's own stats. If pts_prediction is meant to forecast points_scored
# on the same row, the frame probably should end at 1 PRECEDING — confirm.
# NOTE(review): cumavg_pts divides by COUNT(game_id) whereas every other
# average divides by COUNT(<same stat>); the denominators differ only when PTS
# or game_id is NULL — confirm which is intended.
cummulatives_df = spark.sql("""
SELECT
        game_date,
        game_id,
        season_type,
        season_id,
        id AS team_id,
        full_name AS team_name,
        player_id,
        player,
        pts AS points_scored,
        (SUM(PTS) OVER season_to_date / COUNT(game_id) OVER season_to_date) AS cumavg_pts,
        (SUM(field_goal_made) OVER season_to_date / COUNT(field_goal_made) OVER season_to_date) AS cumavg_fg_made,
        (SUM(field_goal_attempt) OVER season_to_date / COUNT(field_goal_attempt) OVER season_to_date) AS cumavg_fg_attempt,
        (SUM(free_throw_made) OVER season_to_date / COUNT(free_throw_made) OVER season_to_date) AS cumavg_ft_made,
        (SUM(free_throw_attempt) OVER season_to_date / COUNT(free_throw_attempt) OVER season_to_date) AS cumavg_ft_attempt,
        (SUM(OREB) OVER season_to_date / COUNT(OREB) OVER season_to_date) AS cumavg_oreb,
        (SUM(DREB) OVER season_to_date / COUNT(DREB) OVER season_to_date) AS cumavg_dreb,
        (SUM(STL) OVER season_to_date / COUNT(STL) OVER season_to_date) AS cumavg_stl,
        (SUM(AST) OVER season_to_date / COUNT(AST) OVER season_to_date) AS cumavg_ast,
        (SUM(BLK) OVER season_to_date / COUNT(BLK) OVER season_to_date) AS cumavg_blk,
        (SUM(PF) OVER season_to_date / COUNT(PF) OVER season_to_date) AS cumavg_pf,
        (SUM(TOV) OVER season_to_date / COUNT(TOV) OVER season_to_date) AS cumavg_tov
    FROM
        box_df
    WINDOW season_to_date AS (
        PARTITION BY season_id, player_id
        ORDER BY game_date, game_id
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    )
""")

In [9]:
# Preview the season-to-date averages (first 20 rows, long values truncated —
# the PySpark defaults written out).
cummulatives_df.show(n=20, truncate=True)

[Stage 5:>                                                          (0 + 1) / 1]

+-------------------+--------+--------------+---------+----------+-------------+---------+-----------+-------------+------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+-------------------+-------------------+------------------+-------------------+
|          game_date| game_id|   season_type|season_id|   team_id|    team_name|player_id|     player|points_scored|        cumavg_pts|     cumavg_fg_made| cumavg_fg_attempt|    cumavg_ft_made| cumavg_ft_attempt|       cumavg_oreb|        cumavg_dreb|          cumavg_stl|         cumavg_ast|         cumavg_blk|         cumavg_pf|         cumavg_tov|
+-------------------+--------+--------------+---------+----------+-------------+---------+-----------+-------------+------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+-------------------+----

                                                                                

In [10]:
# pts_prediction: John Hollinger's Game Score evaluated on the player's
# season-to-date averages:
#   PTS + 0.4*FG - 0.7*FGA - 0.4*(FTA - FT) + 0.7*ORB + 0.3*DRB
#       + STL + 0.7*AST + 0.7*BLK - 0.4*PF - TOV
# Personal fouls carry a NEGATIVE weight in Game Score (a foul is a cost),
# so cumavg_pf is subtracted, not added.
# If any input average is NULL the resulting prediction is NULL.
game_score_expr = (
    f.col('cumavg_pts')
    + 0.4 * f.col('cumavg_fg_made')
    - 0.7 * f.col('cumavg_fg_attempt')
    - 0.4 * (f.col('cumavg_ft_attempt') - f.col('cumavg_ft_made'))
    + 0.7 * f.col('cumavg_oreb')
    + 0.3 * f.col('cumavg_dreb')
    + f.col('cumavg_stl')
    + 0.7 * f.col('cumavg_ast')
    + 0.7 * f.col('cumavg_blk')
    - 0.4 * f.col('cumavg_pf')
    - f.col('cumavg_tov')
)

# withColumn replaces an existing column of the same name, so re-running this
# cell overwrites pts_prediction rather than adding a duplicate.
cummulatives_df = cummulatives_df.withColumn('pts_prediction', game_score_expr)

In [11]:
# Preview the averages together with the new pts_prediction column
# (first 20 rows, truncated values — PySpark defaults made explicit).
cummulatives_df.show(n=20, truncate=True)

[Stage 9:>                                                          (0 + 1) / 1]

+-------------------+--------+--------------+---------+----------+-------------+---------+-----------+-------------+------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+
|          game_date| game_id|   season_type|season_id|   team_id|    team_name|player_id|     player|points_scored|        cumavg_pts|     cumavg_fg_made| cumavg_fg_attempt|    cumavg_ft_made| cumavg_ft_attempt|       cumavg_oreb|        cumavg_dreb|          cumavg_stl|         cumavg_ast|         cumavg_blk|         cumavg_pf|         cumavg_tov|    pts_prediction|
+-------------------+--------+--------------+---------+----------+-------------+---------+-----------+-------------+------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+-------

                                                                                

In [13]:
# Persist the predictions to BigQuery, replacing the table's previous contents.
PREDICTIONS_TABLE = 'dez-nba-analytics.nba_database.player_points_prediction'

# GCS bucket the BigQuery connector uses as temporary staging for the write.
# The default is the bucket this notebook was originally run with (presumably
# the cluster's Dataproc temp bucket, going by its name); set the
# BQ_TEMP_GCS_BUCKET environment variable to run against another project.
TEMP_GCS_BUCKET = os.environ.get(
    'BQ_TEMP_GCS_BUCKET',
    'dataproc-temp-us-central1-385360674362-ioatwhvx',
)

(
    cummulatives_df.write
    .format('bigquery')
    .option('temporaryGcsBucket', TEMP_GCS_BUCKET)
    .mode('overwrite')
    .save(PREDICTIONS_TABLE)
)

                                                                                