In [1]:
import logging
import os
import sys
import traceback
from datetime import datetime, date

import pandas as pd
import pyspark
import pyspark.pandas as ps
import requests
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, IntegerType, DateType, StructField, StringType, TimestampType



In [2]:
# Alternative setup (kept for reference): connect to a local standalone Spark cluster instead of local[*] mode
#spark = SparkSession.builder \
#        .master("spark://LAPTOP-C9HBU13M.:7077") \
#        .appName('test') \
#        .getOrCreate()

In [3]:
# Spark version check for the local cluster

#spark

In [4]:
# Display the filesystem path of the pyspark package this kernel imported.
pyspark.__file__

'/home/camiloms/spark/spark-3.3.3-bin-hadoop3/python/pyspark/__init__.py'

In [5]:
# Location of the GCP service-account key used by the GCS connector.
# It can be overridden with the GOOGLE_APPLICATION_CREDENTIALS environment
# variable so the notebook is not tied to one machine's home directory; the
# default reproduces the original hard-coded path.
credentials_json = os.environ.get(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/home/camiloms/nba_stats_de/airflow/.google/credentials/google_credentials.json',
)

# Spark configuration: local mode on all cores, with the GCS connector jar on
# the classpath and service-account authentication pointed at the key file.
# NOTE: the jar path is relative, so it is resolved against the directory the
# kernel was started from.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_json)
)

In [6]:
# getOrCreate keeps this cell re-runnable: calling SparkContext(conf=conf) a
# second time in the same kernel raises ValueError because only one context
# may be active. When a context already exists it is returned as-is and `conf`
# is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)

# Register the GCS filesystem implementations and the service-account
# credentials on the JVM-side Hadoop configuration.
# NOTE: `_jsc` is a private PySpark attribute; it is used here because the
# Hadoop configuration object is reached through it.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_json)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

23/09/08 16:39:27 WARN Utils: Your hostname, LAPTOP-C9HBU13M resolves to a loopback address: 127.0.1.1; using 172.21.82.95 instead (on interface eth0)
23/09/08 16:39:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/09/08 16:39:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/09/08 16:39:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
# Build (or reuse) the SparkSession from the configuration of the context
# created in the previous cell.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [8]:
# Read the current-year player game stats parquet file from the GCS bucket.
df_current = spark.read.parquet(
    'gs://import-bucket-camiloms10/raw/players_game_stats_current_year.parquet'
)

                                                                                

In [9]:
# Print the list of column names of df_current.
print(df_current.columns)

['SEASON_YEAR', 'PLAYER_ID', 'PLAYER_NAME', 'NICKNAME', 'TEAM_ID', 'TEAM_ABBREVIATION', 'TEAM_NAME', 'GAME_ID', 'GAME_DATE', 'MATCHUP', 'WL', 'MIN', 'FGM', 'FGA', 'FG_PCT', 'FG3M', 'FG3A', 'FG3_PCT', 'FTM', 'FTA', 'FT_PCT', 'OREB', 'DREB', 'REB', 'AST', 'TOV', 'STL', 'BLK', 'BLKA', 'PF', 'PFD', 'PTS', 'PLUS_MINUS', 'NBA_FANTASY_PTS', 'DD2', 'TD3', 'WNBA_FANTASY_PTS', 'GP_RANK', 'W_RANK', 'L_RANK', 'W_PCT_RANK', 'MIN_RANK', 'FGM_RANK', 'FGA_RANK', 'FG_PCT_RANK', 'FG3M_RANK', 'FG3A_RANK', 'FG3_PCT_RANK', 'FTM_RANK', 'FTA_RANK', 'FT_PCT_RANK', 'OREB_RANK', 'DREB_RANK', 'REB_RANK', 'AST_RANK', 'TOV_RANK', 'STL_RANK', 'BLK_RANK', 'BLKA_RANK', 'PF_RANK', 'PFD_RANK', 'PTS_RANK', 'PLUS_MINUS_RANK', 'NBA_FANTASY_PTS_RANK', 'DD2_RANK', 'TD3_RANK', 'WNBA_FANTASY_PTS_RANK', 'AVAILABLE_FLAG', '__index_level_0__']


In [None]:
# Preview df_current (show() prints the first rows as a text table).
df_current.show()

In [None]:
# Read the player game stats up to last year from the same GCS bucket.
df_past = spark.read.parquet(
    'gs://import-bucket-camiloms10/raw/players_game_stats_till_last_year.parquet'
)

In [None]:
# Preview df_past (show() prints the first rows as a text table).
df_past.show()

In [None]:
# Stack the historical and current-year frames, matching columns by name.
# allowMissingColumns=True lets the two frames differ in their column sets;
# a column absent from one side is filled with nulls for that side's rows.
full_dataset = df_past.unionByName(
    df_current,
    allowMissingColumns=True,
)

In [None]:
# Preview the combined frame (show() prints the first rows as a text table).
full_dataset.show()

In [None]:
# DataFrame.fillna returns a new DataFrame and leaves the receiver unchanged,
# so the result must be bound back to the name for later cells to see it.
# With a numeric replacement value only numeric columns are filled.
# Re-running this cell is harmless: filling an already-filled frame changes nothing.
full_dataset = full_dataset.fillna(0)

In [None]:
# Expose the combined frame to Spark SQL under the name "table_df", then
# display the single row that sorts first by GAME_DATE in descending order.
full_dataset.createOrReplaceTempView("table_df")
(
    spark
    .sql("""SELECT * FROM table_df ORDER BY GAME_DATE DESC limit 1""")
    .show()
)

In [None]:
# Explicit schema for per-player game log rows; every field is nullable.
# NOTE(review): this schema is not applied to any read in the cells above.
# NOTE(review): the first field is SEASON_ID, while the parquet columns
#   printed earlier in this notebook list SEASON_YEAR — confirm which
#   dataset this schema is meant for.
# NOTE(review): FGA is declared FloatType while the neighbouring FGM, FG3A
#   and FTA fields are IntegerType — confirm this is intentional.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("SEASON_ID", types.StringType()),
        ("PLAYER_ID", types.IntegerType()),
        ("PLAYER_NAME", types.StringType()),
        ("NICKNAME", types.StringType()),
        ("TEAM_ID", types.IntegerType()),
        ("TEAM_ABBREVIATION", types.StringType()),
        ("TEAM_NAME", types.StringType()),
        ("GAME_ID", types.StringType()),
        ("GAME_DATE", types.StringType()),
        ("MATCHUP", types.StringType()),
        ("WL", types.StringType()),
        ("MIN", types.IntegerType()),
        ("FGM", types.IntegerType()),
        ("FGA", types.FloatType()),
        ("FG_PCT", types.FloatType()),
        ("FG3M", types.IntegerType()),
        ("FG3A", types.IntegerType()),
        ("FG3_PCT", types.FloatType()),
        ("FTM", types.IntegerType()),
        ("FTA", types.IntegerType()),
        ("FT_PCT", types.FloatType()),
        ("OREB", types.IntegerType()),
        ("DREB", types.IntegerType()),
        ("REB", types.IntegerType()),
        ("AST", types.IntegerType()),
        ("STL", types.IntegerType()),
        ("BLK", types.IntegerType()),
        ("TOV", types.IntegerType()),
        ("PF", types.IntegerType()),
        ("PTS", types.IntegerType()),
        ("PLUS_MINUS", types.IntegerType()),
        ("VIDEO_AVAILABLE", types.IntegerType()),
    ]
])