In [47]:
import requests
import os
import psycopg2

In [48]:
# NBA statistics: per-game playoff scoring leaders (StatCategory=PTS) for the 2022-23 season.
test_url = "https://stats.nba.com/stats/leagueLeaders?LeagueID=00&PerMode=PerGame&Scope=S&Season=2022-23&SeasonType=Playoffs&StatCategory=PTS"

# NOTE(review): stats.nba.com is known to stall or reject requests that lack a
# browser-like User-Agent/Referer; confirm these headers are still sufficient.
NBA_HEADERS = {
    "User-Agent": "Mozilla/5.0",
    "Referer": "https://www.nba.com/",
}

# The timeout keeps the cell from hanging forever if the API stops responding.
# raise_for_status surfaces HTTP errors here instead of failing later with a
# confusing JSON decode error or KeyError on r['resultSet'].
response = requests.get(url=test_url, headers=NBA_HEADERS, timeout=30)
response.raise_for_status()
r = response.json()


In [49]:
from pyspark.sql import SparkSession

# PostgreSQL JDBC driver; Spark uses it to write to Redshift.
# Override with the JDBC_DRIVER_PATH environment variable when running outside
# this workspace; the default is the path this notebook was written against.
driver_path = os.environ.get(
    "JDBC_DRIVER_PATH",
    "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar",
)

# PYSPARK_SUBMIT_ARGS is only read when the JVM is launched, so it must be set
# before the first SparkSession of this kernel is created.
# SPARK_CLASSPATH is intentionally NOT set: it has been deprecated since Spark 1.0,
# and older Spark versions refuse to start when it is combined with
# spark.executor.extraClassPath. The options below already cover the classpath.
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'

# Create (or reuse) the local SparkSession with the JDBC driver available to
# both the driver and the executors.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Conexion entre Pyspark y Redshift")
    .config("spark.jars", driver_path)
    .config("spark.executor.extraClassPath", driver_path)
    .getOrCreate()
)

In [50]:
# Turn the API payload into a Spark DataFrame: resultSet["headers"] supplies the
# column names and resultSet["rowSet"] supplies one list of values per row.
result_set = r['resultSet']
Columnas = result_set["headers"]
Tabla = spark.createDataFrame(result_set["rowSet"], schema=Columnas)

In [51]:
# Inspect the inferred column types, then preview the first rows
# (these are show()'s default arguments, spelled out).
Tabla.printSchema()
Tabla.show(n=20, truncate=True)

root
 |-- PLAYER_ID: long (nullable = true)
 |-- RANK: long (nullable = true)
 |-- PLAYER: string (nullable = true)
 |-- TEAM_ID: long (nullable = true)
 |-- TEAM: string (nullable = true)
 |-- GP: long (nullable = true)
 |-- MIN: double (nullable = true)
 |-- FGM: double (nullable = true)
 |-- FGA: double (nullable = true)
 |-- FG_PCT: double (nullable = true)
 |-- FG3M: double (nullable = true)
 |-- FG3A: double (nullable = true)
 |-- FG3_PCT: double (nullable = true)
 |-- FTM: double (nullable = true)
 |-- FTA: double (nullable = true)
 |-- FT_PCT: double (nullable = true)
 |-- OREB: double (nullable = true)
 |-- DREB: double (nullable = true)
 |-- REB: double (nullable = true)
 |-- AST: double (nullable = true)
 |-- STL: double (nullable = true)
 |-- BLK: double (nullable = true)
 |-- TOV: double (nullable = true)
 |-- PTS: double (nullable = true)
 |-- EFF: double (nullable = true)

+---------+----+-----------------+----------+----+---+----+----+----+------+----+----+-------+---+-

In [52]:
# Check that no player appears twice in the table: start from the total row count.
n_rows = Tabla.count()
n_rows

144

In [53]:
# Duplicate-player check. Spark DataFrames are immutable: dropDuplicates()
# returns a NEW DataFrame, so its result must be assigned. Otherwise the
# count below is just the original row count and the check proves nothing.
TablacheckC = Tabla.dropDuplicates(['PLAYER'])
n_unique_players = TablacheckC.count()
assert n_unique_players == Tabla.count(), "duplicate PLAYER rows found in Tabla"
n_unique_players

144

In [54]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Add two team-level aggregates to every player row, computed over all rows
# that share the same TEAM value:
#   TEAM_PLAYERS - number of rows (players) for that team
#   SUM_GP_TEAM  - sum of GP across those rows
team_window = Window.partitionBy('TEAM')
Tabla = (
    Tabla
    .withColumn('TEAM_PLAYERS', F.count('TEAM').over(team_window))
    .withColumn('SUM_GP_TEAM', F.sum('GP').over(team_window))
)

+---------+----+-------------------+----------+----+---+----+----+----+------+----+----+-------+---+---+------+----+----+----+----+---+---+---+----+----+------------+-----------+
|PLAYER_ID|RANK|             PLAYER|   TEAM_ID|TEAM| GP| MIN| FGM| FGA|FG_PCT|FG3M|FG3A|FG3_PCT|FTM|FTA|FT_PCT|OREB|DREB| REB| AST|STL|BLK|TOV| PTS| EFF|TEAM_PLAYERS|SUM_GP_TEAM|
+---------+----+-------------------+----------+----+---+----+----+----+------+----+----+-------+---+---+------+----+----+----+----+---+---+---+----+----+------------+-----------+
|  1629027|   5|         Trae Young|1610612737| ATL|  6|38.3|10.0|24.8| 0.403| 3.0| 9.0|  0.333|6.2|7.2|  0.86| 0.8| 2.8| 3.7|10.2|1.7|0.7|4.0|29.2|25.5|           9|         53|
|  1627749|  20|    Dejounte Murray|1610612737| ATL|  5|38.0| 9.2|20.6| 0.447| 2.8| 7.4|  0.378|1.8|1.8|   1.0| 1.8| 5.4| 7.2| 6.8|2.0|0.2|2.0|23.0|25.8|           9|         53|
|  1629631|  38|    De'Andre Hunter|1610612737| ATL|  6|37.3| 6.5|14.2| 0.459| 2.3| 6.3|  0.368|1.3|1.7| 

In [55]:
# Remove apostrophes from player names (e.g. "De'Andre" -> "DeAndre") so they
# don't cause conflicts downstream.
from pyspark.sql.functions import regexp_replace, col

Tabla = Tabla.withColumn('PLAYER', regexp_replace(col('PLAYER'), "'", ""))

In [56]:
# Keep the summary columns, ordered by scoring rank, and retain the top players.
TOP_N = 20

Tablaresumen = Tabla.select('PLAYER_ID', 'RANK', 'PLAYER', 'TEAM_ID', 'TEAM', 'GP', 'REB', 'AST', 'STL', 'BLK', 'TEAM_PLAYERS', 'SUM_GP_TEAM')
Tablaresumen = Tablaresumen.sort("RANK")

# RANK starts at 1, so the top 20 are RANK <= 20; the previous `< 20` kept only 19.
# If the API reports tied ranks, this may return slightly more than TOP_N rows.
Top20 = Tablaresumen[Tablaresumen["RANK"] <= TOP_N]
Top20.show()

+---------+----+-----------------+----------+----+---+----+----+---+---+------------+-----------+
|PLAYER_ID|RANK|           PLAYER|   TEAM_ID|TEAM| GP| REB| AST|STL|BLK|TEAM_PLAYERS|SUM_GP_TEAM|
+---------+----+-----------------+----------+----+---+----+----+---+---+------------+-----------+
|  1626164|   1|     Devin Booker|1610612756| PHX| 11| 4.8| 7.2|1.7|0.8|           8|         79|
|  1630162|   2|  Anthony Edwards|1610612750| MIN|  5| 5.0| 5.2|1.8|2.0|           9|         43|
|   201939|   3|    Stephen Curry|1610612744| GSW| 13| 5.2| 6.1|1.0|0.5|          10|        124|
|   203999|   4|     Nikola Jokic|1610612743| DEN| 20|13.5| 9.5|1.1|1.0|           8|        159|
|  1629027|   5|       Trae Young|1610612737| ATL|  6| 3.7|10.2|1.7|0.7|           9|         53|
|   201142|   6|     Kevin Durant|1610612756| PHX| 11| 8.7| 5.5|0.8|1.4|           8|         79|
|  1628973|   7|    Jalen Brunson|1610612752| NYK| 11| 4.9| 5.6|1.5|0.1|          10|        101|
|  1628368|   8|    

In [43]:
# Redshift connection settings are read from the environment so credentials
# never live in the notebook. Check all required variables up front and report
# every missing one at once, instead of a KeyError on the first one later.
env = os.environ

REQUIRED_REDSHIFT_VARS = (
    'AWS_REDSHIFT_HOST',
    'AWS_REDSHIFT_PORT',
    'AWS_REDSHIFT_DBNAME',
    'AWS_REDSHIFT_USER',
    'AWS_REDSHIFT_PASSWORD',
    'AWS_REDSHIFT_SCHEMA',
)
missing_redshift_vars = [name for name in REQUIRED_REDSHIFT_VARS if name not in env]
if missing_redshift_vars:
    raise KeyError(f"Missing required environment variables: {', '.join(missing_redshift_vars)}")

In [45]:
# Open a psycopg2 connection to Redshift; every parameter comes from the
# environment mapping `env`.
redshift_params = {
    "host": env['AWS_REDSHIFT_HOST'],
    "port": env['AWS_REDSHIFT_PORT'],
    "dbname": env['AWS_REDSHIFT_DBNAME'],
    "user": env['AWS_REDSHIFT_USER'],
    "password": env['AWS_REDSHIFT_PASSWORD'],
}
conn = psycopg2.connect(**redshift_params)

In [46]:
# Create the target table (if it does not exist yet) with a distribution key on
# PLAYER_ID and a sort key on TEAM. Integer-valued columns use bigint to match
# the Spark schema (long) of the DataFrame written later; stats stay float.
create_table_sql = f"""
create table if not exists {env['AWS_REDSHIFT_SCHEMA']}.NBAstat2 (
    PLAYER_ID bigint distkey,
    RANK bigint,
    PLAYER varchar,
    TEAM_ID bigint,
    TEAM varchar,
    GP bigint,
    REB float,
    AST float,
    STL float,
    BLK float,
    TEAM_PLAYERS bigint,
    SUM_GP_TEAM bigint
    ) sortkey(TEAM);
"""

# `with conn` commits on success and rolls back on error (it does not close the
# connection); `with conn.cursor()` closes the cursor even if execute() raises.
with conn, conn.cursor() as cursor:
    cursor.execute(create_table_sql)
print("Table created!")

Table created!


In [47]:
# Write the top-20 summary to Redshift through Spark's JDBC writer.
#
# With mode("overwrite") and the default truncate=false, Spark DROPs the
# existing table and re-creates it from the DataFrame schema. The
# distkey/sortkey declared by the DDL cell would be silently lost.
# createTableOptions appends the same Redshift table attributes to Spark's
# generated CREATE TABLE, so they survive the overwrite.
# NOTE(review): truncate=true is avoided because Spark's PostgreSQL dialect
# appears to emit `TRUNCATE TABLE ONLY ...`, which Redshift's TRUNCATE syntax
# does not accept. Confirm against the Spark version in use.
redshift_jdbc_url = (
    f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}"
    f"/{env['AWS_REDSHIFT_DBNAME']}"
)

(
    Top20.write
    .format("jdbc")
    .option("url", redshift_jdbc_url)
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.NBAstat2")
    .option("user", env['AWS_REDSHIFT_USER'])
    .option("password", env['AWS_REDSHIFT_PASSWORD'])
    .option("driver", "org.postgresql.Driver")
    .option("createTableOptions", "distkey(PLAYER_ID) sortkey(TEAM)")
    .mode("overwrite")
    .save()
)