In [None]:
# Example: Fetching NBA players using nba_api (optional, not Spark)
from nba_api.stats.static import players
nba_players = players.get_active_players()
print(f"Number of players fetched: {len(nba_players)}")
nba_players[:5]

In [None]:
team_df = wr.s3.read_parquet('s3://bucketname/silver/ingest_date=2026-02/teamdata/run-.parquet', boto3_session=session)

pd.set_option('display.max_columns', None)

df = wr.s3.read_parquet('s3://bucketname/gold/ingest_date=2026-02/teamdata/season=2026/par.snappy.parquet', boto3_session=session)

print(df.columns)

In [None]:
s3_path = "s3a://bucketname/silver/ingest_date=2026-02/teamdata/run.parquet"

# Read the Parquet file directly into a Spark DataFrame
df = spark.read.parquet(s3_path)
df.show()

In [None]:
# Get unique team names
unique_teams = df.select('team').distinct().rdd.flatMap(lambda x: x).collect()
print(unique_teams)
# Get all players from team 2TM
team_2tm_players = df.filter(col('team') == '2TM')
team_2tm_players.show()

In [None]:
df.columns

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, lit, array_contains
import os

s3_players_path = "s3://bucketname/gold/playerdata/season=2016/team=BOS/part.snappy.parquet"

# Read the Parquet file into a Spark DataFrame
# (Assumes AWS credentials are set in environment or Hadoop config)
df = pd.read.parquet(s3_players_path)

# ---- User parameters ----
season = 2025  # Set your season
season_df = df.filter(col('season') == season)


In [None]:
# ---- Identify Team Change Players ----
team_change_df = season_df.filter(col('team').isin(['2TM', '3TM', '4TM']))
normal_df = season_df.filter(~col('team').isin(['2TM', '3TM', '4TM']))

# ---- Extract Number of Teams ----
team_change_df = team_change_df.withColumn('team_count', regexp_extract(col('team'), r'(\d)', 1).cast('int'))
normal_df = normal_df.withColumn('team_count', lit(1))

# ---- Mark players with multiple teams ----
multi_team_player_ids = [row['player_id'] for row in team_change_df.select('player_id').distinct().collect()]
from pyspark.sql.functions import when
normal_df = normal_df.withColumn('changed_team', when(col('player_id').isin(multi_team_player_ids), 1).otherwise(0))

# ---- Drop 2TM, 3TM, 4TM rows ----
final_gold_df = normal_df

# ---- Filter Only Valid Teams ----
valid_teams = [
    'SAC','HOU','MIA','TOR','MEM','ATL','NOP','PHO','CLE','UTA','MIL',
    'NYK','POR','LAL','WAS','CHO','ORL','PHI','SAS','OKC','LAC','MIN',
    'BOS','IND','DEN','GSW','CHI','DAL','BRK','DET'
]
final_gold_df = final_gold_df.filter(col('team').isin(valid_teams + ['MULTI']))
#check final gold_df rows count > 0
final_gold_df.count()

In [None]:
final_gold_df.filter(col('player_id') == 'anderky01').show()

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import col, regexp_extract, lit, when
from awsglue.context import GlueContext
from awsglue.job import Job
from awsgluedq.transforms import EvaluateDataQuality
import re


args = getResolvedOptions(sys.argv, ['JOB_NAME','season','silver_path','gold_path'])

season = args["season"]
silver_path = args["silver_path"]
gold_path = args["gold_path"]


sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
job.init(args['JOB_NAME'], args)


df = spark.read.parquet(silver_path)

from pyspark.sql.functions import col

# Assuming `df` is your Spark DataFrame
df = df.withColumn("w", col("w").cast("integer")) \
       .withColumn("n_rtg", col("n_rtg").cast("double")) \
       .withColumn("pace", col("pace").cast("double")) \
       .withColumn("playoffs", col("playoffs").cast("boolean")) \
       .withColumn("orb_percent", col("orb_percent").cast("double")) \
       .withColumn("f_tr", col("f_tr").cast("double")) \
       .withColumn("drb_percent", col("drb_percent").cast("double")) \
       .withColumn("opp_tov_percent", col("opp_tov_percent").cast("double")) \
       .withColumn("abbreviation", col("abbreviation").cast("string")) \
       .withColumn("ft_fga", col("ft_fga").cast("double")) \
       .withColumn("tov_percent", col("tov_percent").cast("double")) \
       .withColumn("o_rtg", col("o_rtg").cast("double")) \
       .withColumn("d_rtg", col("d_rtg").cast("double")) \
       .withColumn("age", col("age").cast("double")) \
       .withColumn("ts_percent", col("ts_percent").cast("double")) \
       .withColumn("arena", col("arena").cast("string")) \
       .withColumn("opp_e_fg_percent", col("opp_e_fg_percent").cast("double")) \
       .withColumn("team", col("team").cast("string")) \
       .withColumn("l", col("l").cast("integer")) \
       .withColumn("x3p_ar", col("x3p_ar").cast("double")) \
       .withColumn("e_fg_percent", col("e_fg_percent").cast("double")) \
       .withColumn("opp_ft_fga", col("opp_ft_fga").cast("double")) \
       .withColumn("ast_per_game", col("ast_per_game").cast("double")) \
       .withColumn("x3p_per_game", col("x3p_per_game").cast("double")) \
       .withColumn("orb_per_game", col("orb_per_game").cast("double")) \
       .withColumn("fta_per_game", col("fta_per_game").cast("double")) \
       .withColumn("trb_per_game", col("trb_per_game").cast("double")) \
       .withColumn("fg_percent", col("fg_percent").cast("double")) \
       .withColumn("x3pa_per_game", col("x3pa_per_game").cast("double")) \
       .withColumn("stl_per_game", col("stl_per_game").cast("double")) \
       .withColumn("ft_percent", col("ft_percent").cast("double")) \
       .withColumn("drb_per_game", col("drb_per_game").cast("double")) \
       .withColumn("mp_per_game", col("mp_per_game").cast("double")) \
       .withColumn("g", col("g").cast("integer")) \
       .withColumn("x2pa_per_game", col("x2pa_per_game").cast("double")) \
       .withColumn("x3p_percent", col("x3p_percent").cast("double")) \
       .withColumn("pts_per_game", col("pts_per_game").cast("double")) \
       .withColumn("ft_per_game", col("ft_per_game").cast("double")) \
       .withColumn("tov_per_game", col("tov_per_game").cast("double")) \
       .withColumn("pf_per_game", col("pf_per_game").cast("double")) \
       .withColumn("fg_per_game", col("fg_per_game").cast("double")) \
       .withColumn("blk_per_game", col("blk_per_game").cast("double")) \
       .withColumn("x2p_percent", col("x2p_percent").cast("double")) \
       .withColumn("fga_per_game", col("fga_per_game").cast("double")) \
       .withColumn("x2p_per_game", col("x2p_per_game").cast("double"))


final_gold_df = df[df['season']==season]





row_count = final_gold_df.count()
if row_count <= 0:
    raise ValueError("Sanity check failed: final_gold_df is empty!")
else:
    print(f"Sanity check passed: final_gold_df has {row_count} rows.")

final_gold_df.write.mode("overwrite").partitionBy("season").parquet(gold_path)

job.commit()