In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType
from pyspark.sql.functions import lit
import requests

# Nested record types are declared first so the top-level game schema can
# reference them. Every field in all three structs is nullable.

# Per-side accuracy values, keyed by colour.
# NOTE(review): both values are declared as strings; confirm this matches the
# type the API actually returns.
accuracies_schema = StructType(
    [StructField(colour, StringType(), True) for colour in ("white", "black")]
)

# Player sub-record; the same struct is reused for the "white" and "black"
# columns of the game schema below.
player_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("rating", LongType()),
            ("result", StringType()),
            ("@id", StringType()),
            ("username", StringType()),
            ("uuid", StringType()),
        )
    ]
)

# Top-level schema for one game row. Field order here is the column order of
# any DataFrame built with this schema.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("url", StringType()),
            ("pgn", StringType()),
            ("time_control", StringType()),
            ("end_time", LongType()),
            ("rated", BooleanType()),
            ("accuracies", accuracies_schema),
            ("tcn", StringType()),
            ("uuid", StringType()),
            ("initial_setup", StringType()),
            ("fen", StringType()),
            ("start_time", LongType()),
            ("time_class", StringType()),
            ("rules", StringType()),
            ("white", player_schema),
            ("black", player_schema),
            ("eco", StringType()),
            ("username", StringType()),
        )
    ]
)

# Fetch the public leaderboards and collect the username of every listed
# player, lower-cased so the names can be used in per-player API URLs.
url = "https://api.chess.com/pub/leaderboards"
response = requests.get(
    url,
    headers={"User-Agent": "mychessdatapipeline/1.0"},
    timeout=30,  # never hang the notebook on a stalled connection
)
# Surface HTTP failures (rate limiting, outages) as a clear error here rather
# than as a confusing failure while walking the payload below.
response.raise_for_status()

# Decode the body once: Response.json() re-parses the payload on every call.
leaderboards = response.json()
format_list = list(leaderboards)

players_list = []
for form in format_list:
    for entry in leaderboards[form]:
        user = entry.get("username")
        if user:  # skip malformed entries instead of failing on None.lower()
            players_list.append(user.lower())

# Deduplicate usernames while keeping first-seen order. A plain set() would
# yield a hash-dependent order, making any later slice of this list pick a
# different sample of players on every run.
players_list_deduped = list(dict.fromkeys(players_list))


# Download one month of games for the first 15 deduplicated players and build
# a single DataFrame with the enforced schema. Each row is tagged with the
# username whose archive it came from.
date = "2025/05"
game_rows = []
for username in players_list_deduped[:15]:
    url = f"https://api.chess.com/pub/player/{username}/games/{date}"
    games_response = requests.get(
        url,
        headers={"User-Agent": "mychessdatapipeline/1.0"},
        timeout=30,  # never hang the notebook on a stalled connection
    )
    # Fail with the real HTTP error instead of a KeyError on "games" when the
    # API answers with an error body.
    games_response.raise_for_status()
    for game in games_response.json().get("games", []):
        # Copy rather than mutate the decoded payload; "username" overrides
        # any same-named key, matching the column the schema expects.
        game_rows.append({**game, "username": username})

# Create the DataFrame once from all rows. Building and union-ing one
# DataFrame per player grows the query plan with every iteration.
# An empty row list still yields an empty DataFrame with the right columns.
df_main = spark.createDataFrame(game_rows, schema=schema)

In [0]:
# Persist the collected games as the Parquet table dbt_demo.chess_games,
# using overwrite mode so existing table contents are replaced.
df_main.write.saveAsTable(
    "dbt_demo.chess_games",
    format="parquet",
    mode="overwrite",
)