In [0]:
%python
import importlib.util
if importlib.util.find_spec("nba_api") is None:
    %pip install nba_api
from find_games import get_games_by_date
from etl_pipeline.nba_api_connector.get_game import Game
from pyspark.sql import functions as F
from time import sleep
from etl_pipeline.utilities.schemas import Schema

In [0]:
from datetime import datetime, timedelta

# Unity Catalog location that the write cells below target.
catalog = "nba"
source_schema = "source"
volume_name = "games"

# Single day fetched when BACKFILL_DATE_RANGE is False (the default).
date = "2025-12-05"

# Inclusive date range fetched when BACKFILL_DATE_RANGE is True.
start_date = "2025-10-01"
end_date = "2025-12-06"
BACKFILL_DATE_RANGE = False

# The API key is read from the secret scope so it never appears in the notebook.
API_KEY = dbutils.secrets.get(
    scope="nba_secrets",
    key="balldontlie_api_key"
)

# Parse each bound once, then list every day from start_date to end_date inclusive.
_DATE_FORMAT = "%Y-%m-%d"
_range_start = datetime.strptime(start_date, _DATE_FORMAT)
_range_end = datetime.strptime(end_date, _DATE_FORMAT)
list_dates_between = [
    (_range_start + timedelta(days=day_offset)).strftime(_DATE_FORMAT)
    for day_offset in range((_range_end - _range_start).days + 1)
]

if BACKFILL_DATE_RANGE:
    # Fetch every day in the range, one request per day.
    games_list = []
    for request_number, backfill_date in enumerate(list_dates_between, start=1):
        games_list.extend(get_games_by_date(backfill_date, API_KEY))
        # Pause after every fifth request.
        # NOTE(review): presumably this keeps the job under an API rate limit — confirm.
        if request_number % 5 == 0:
            sleep(61)
else:
    games_list = get_games_by_date(date, API_KEY)

games = spark.createDataFrame(games_list, Schema.games())

In [0]:
# Ensure the target volume exists, then persist the fetched games as parquet,
# partitioned by the "date" column.
# NOTE(review): mode("overwrite") may replace partitions for other dates that
# already exist at this path unless dynamic partition overwrite is enabled — confirm.
create_volume_stmt = f"CREATE VOLUME IF NOT EXISTS {catalog}.{source_schema}.{volume_name}"
spark.sql(create_volume_stmt)

volume_path = f"/Volumes/{catalog}/{source_schema}/{volume_name}/games.parquet"
games_writer = games.write.mode("overwrite").format("parquet").partitionBy("date")
games_writer.save(volume_path)

In [0]:
# Offset added to the numeric game_id before it is handed to Game.
# NOTE(review): presumably this maps the source ids onto the ids Game expects — confirm.
GAME_ID_OFFSET = 4052978

# Rewrite game_id as "00" followed by the shifted integer id.
shifted_id = (F.col("game_id").cast("int") + F.lit(GAME_ID_OFFSET)).cast("string")
games_fixed = games.withColumn("game_id", F.concat(F.lit("00"), shifted_id))

# Build one Game per id; the ids are collected to the driver first.
# A failure for one game is reported and skipped so the rest still load.
game_full_results = []
for row in games_fixed.select("game_id").collect():
    game_id = str(row["game_id"])
    try:
        game = Game(game_id)
    except Exception as e:
        print(f"Boxscore not available for game_id {game_id}: {e}")
    else:
        game_full_results.append(game)

In [0]:
# One game_info() record per successfully loaded game.
game_infos = []
for game in game_full_results:
    game_infos.append(game.game_info())

# date_day is the day before "date", formatted as yyyy-MM-dd.
# NOTE(review): the one-day shift presumably corrects a timezone difference — confirm.
previous_day = F.date_sub(F.col("date"), 1)
game_results = (
    spark.createDataFrame(game_infos, schema=Schema.boxscores())
    .withColumn("date_day", F.date_format(previous_day, "yyyy-MM-dd"))
)

# Persist the boxscores as parquet, partitioned by date_day.
volume_name = "game_boxscores"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{source_schema}.{volume_name}")
volume_path = f"/Volumes/{catalog}/{source_schema}/{volume_name}/game_boxscore.parquet"
(
    game_results.write
    .mode("overwrite")
    .format("parquet")
    .partitionBy("date_day")
    .save(volume_path)
)

In [0]:
# Flatten the officials of every loaded game into a single list of records.
all_officials = [
    official
    for game in game_full_results
    for official in game.get_officials()
]
officials_df = spark.createDataFrame(all_officials, schema=Schema.officials())

# Persist the officials as parquet, partitioned by game_id.
volume_name = "game_officials"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{source_schema}.{volume_name}")
volume_path = f"/Volumes/{catalog}/{source_schema}/{volume_name}/game_officials.parquet"
(
    officials_df.write
    .mode("overwrite")
    .format("parquet")
    .partitionBy("game_id")
    .save(volume_path)
)

In [0]:
# all_stats holds one list of rows per (game, side); each row is
# [team id, opposing team id, stat name, stat value as string, home flag, game id].
all_stats = []
for game in game_full_results:
    home_stats, away_stats = game.get_team_stats()

    home_rows = [
        [game.home_team_id, game.away_team_id, stat_name, str(stat_value), 'True', game.game_id]
        for stat_name, stat_value in home_stats.items()
    ]
    all_stats.append(home_rows)

    away_rows = [
        [game.away_team_id, game.home_team_id, stat_name, str(stat_value), 'False', game.game_id]
        for stat_name, stat_value in away_stats.items()
    ]
    all_stats.append(away_rows)

# Flatten the per-side lists into a single list of rows for Spark.
team_stat_rows = [stat_row for side_rows in all_stats for stat_row in side_rows]
game_stats_df = spark.createDataFrame(team_stat_rows, schema=Schema.team_stats())

# Persist the team stats as parquet, partitioned by game_id.
volume_name = "game_team_stats"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{source_schema}.{volume_name}")
volume_path = f"/Volumes/{catalog}/{source_schema}/{volume_name}/game_team_stats.parquet"
(
    game_stats_df.write
    .mode("overwrite")
    .format("parquet")
    .partitionBy("game_id")
    .save(volume_path)
)
display(game_stats_df)

In [0]:
# Output column -> key in the player payload returned by game.get_players().
# Dict order fixes the field order of every roster record.
_PLAYER_FIELDS = {
    "status": "status",
    "order": "order",
    "player_id": "personId",
    "jersey_num": "jerseyNum",
    "position": "position",
    "starter": "starter",
    "oncourt": "oncourt",
    "played": "played",
    "name": "name",
    "name_i": "nameI",
    "first_name": "firstName",
    "family_name": "familyName",
}


def _player_record(game, player, team_id, against_team_id, played_at_home):
    """Build one roster row for `player` in `game`.

    Args:
        game: Loaded game object; only `game.game_id` is read here.
        player: Player payload dict from `game.get_players()`.
        team_id: Id of the team the player belongs to.
        against_team_id: Id of the opposing team.
        played_at_home: "True" or "False", stored as a string.

    Returns:
        Dict with the game/team columns followed by every `_PLAYER_FIELDS` column.
    """
    record = {
        "game_id": game.game_id,
        "team_id": team_id,
        "played_at_home": played_at_home,
        "against_team_id": against_team_id,
    }
    # Every payload field is stringified, so a missing key is stored as "None".
    record.update(
        {column: str(player.get(source_key)) for column, source_key in _PLAYER_FIELDS.items()}
    )
    return record


players = []
player_stats = []
for game in game_full_results:
    home_players, away_players = game.get_players()
    # Home roster first, then away, so rows are emitted in a stable order.
    sides = (
        (home_players, game.home_team_id, game.away_team_id, "True"),
        (away_players, game.away_team_id, game.home_team_id, "False"),
    )
    for roster, team_id, against_team_id, played_at_home in sides:
        for player in roster:
            players.append(
                _player_record(game, player, team_id, against_team_id, played_at_home)
            )
            # A player without a "statistics" entry contributes no stat rows
            # instead of raising on None.items().
            statistics = player.get("statistics") or {}
            player_stats.extend(
                [game.game_id, team_id, player.get("personId"), stat_name, str(stat_value)]
                for stat_name, stat_value in statistics.items()
            )

# Persist the rosters as parquet, partitioned by game and team.
player_df = spark.createDataFrame(players, schema=Schema.players())
volume_name = "game_players"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{source_schema}.{volume_name}")
volume_path = f"/Volumes/{catalog}/{source_schema}/{volume_name}/game_players.parquet"
player_df.write.mode("overwrite").format("parquet").partitionBy(["game_id", "team_id"]).save(volume_path)

# Persist the per-player stats as parquet, partitioned by game, team and player.
# NOTE(review): partitioning down to player_id may create many very small
# partitions — confirm this layout is what downstream readers need.
player_stats_df = spark.createDataFrame(player_stats, schema=Schema.player_stats())
volume_name = "game_player_stats"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog}.{source_schema}.{volume_name}")
volume_path = f"/Volumes/{catalog}/{source_schema}/{volume_name}/player_stats.parquet"
player_stats_df.write.mode("overwrite").format("parquet").partitionBy(["game_id", "team_id", "player_id"]).save(volume_path)
display(player_stats_df)