# 🏆 Chess Analysis - Data Preparation Pipeline

## Notebook Overview
This notebook performs the heavy data transformations required for our chess analysis. It takes the clean, game-level Parquet data and creates two specialized, analysis-ready assets:

1.  **Move-Level Checkpoint**: A detailed dataset where every single move from every game is its own row.
2.  **Player Rating Summary**: A summary CSV file with the average Blitz, Bullet, and Classical rating for every unique player.

These assets will serve as fast, efficient inputs for our final visualization notebook.

---
### **Part 1: Setup and Initialization**
This section imports the required libraries and initializes the `SparkSession`, the entry point for our data processing.

In [1]:
from pyspark.sql import SparkSession
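# Note: `round` here is Spark's column function and shadows Python's built-in round() for the rest of the notebook.
from pyspark.sql.functions import col, regexp_replace, trim, split, explode, regexp_extract, when, substring, avg, round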
import os
import shutil
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .appName("ChessAnalysisPipeline") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

processed_path = "/home/yash/chess_project/processed/chess_games_parquet"
df_games = spark.read.parquet(processed_path)
print("--- Game-level data loaded successfully. ---")

---
### **Part 2: Create Move-Level Checkpoint**

This is a heavy transformation. We clean the `Moves` column for every game, explode it into a large DataFrame with one row per move, and save the result as a Parquet checkpoint. The final visualizations can then load pre-computed moves instead of repeating this work.

#### **Step 2.1: Define All Transformations**
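All data manipulations are defined in a single chained expression. Because Spark evaluates lazily, nothing is computed in this cell; the chain is only planned here and runs when an action (such as a write or `toPandas()`) is triggered, which lets Spark optimize the whole chain at once.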

In [3]:
df_per_move = (
    df_games
    # Strip PGN comments in braces: "{ ... }"
    .withColumn("CleanedMoves", regexp_replace(col("Moves"), r"\{.*?\}\s*", ""))
    # Strip move numbers such as "1." and "1..."
    .withColumn("CleanedMoves", regexp_replace(col("CleanedMoves"), r"\d+\.+\s*", ""))
    # Strip the trailing game result
    .withColumn("CleanedMoves", regexp_replace(col("CleanedMoves"), r"\s*(1-0|0-1|1/2-1/2)$", ""))
    # Normalize whitespace, then split into one array element per move
    .withColumn("CleanedMoves", regexp_replace(col("CleanedMoves"), r"\s+", " "))
    .withColumn("CleanedMoves", trim(col("CleanedMoves")))
    .withColumn("MovesArray", split(col("CleanedMoves"), " "))
    # One output row per move
    .withColumn("Move", explode("MovesArray"))
    # First square-like token in the move; castling has none and yields ""
    .withColumn("ToSquare", regexp_extract(col("Move"), r"([a-h][1-8])", 1))
    .withColumn("Piece",
        when(col("Move").like("O-O%"), "K")
        .when(col("Move").rlike("^[KQRBN]"), substring(col("Move"), 1, 1))
        .when(col("Move").rlike("^[a-h]"), "P")
        .otherwise("Unknown")
    )
    # Keep only moves with a destination square (this also drops castling rows)
    .filter(col("ToSquare") != "")
)

print("--- All transformations defined. ---")
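
To make the chain above easier to inspect, here is a minimal pure-Python sketch of the same cleaning and classification rules applied to a single movetext string. It assumes the `Moves` column holds PGN-style movetext with `{...}` comments. The helper names `clean_movetext` and `classify_move` and the sample string are illustrative and not part of the pipeline. For these particular patterns, Python's `re` module should behave the same way as Spark's regex functions.

```python
import re

def clean_movetext(moves: str) -> list:
    """Mirror the regexp_replace / trim / split steps on one movetext string."""
    text = re.sub(r"\{.*?\}\s*", "", moves)             # drop {...} comments
    text = re.sub(r"\d+\.+\s*", "", text)               # drop move numbers ("1.", "1...")
    text = re.sub(r"\s*(1-0|0-1|1/2-1/2)$", "", text)   # drop the trailing result
    text = re.sub(r"\s+", " ", text).strip()            # normalize whitespace
    return text.split(" ") if text else []

def classify_move(move: str) -> tuple:
    """Return (piece, to_square) using the same rules as the Spark chain."""
    match = re.search(r"[a-h][1-8]", move)              # first square-like token
    to_square = match.group(0) if match else ""
    if move.startswith("O-O"):
        piece = "K"
    elif re.match(r"[KQRBN]", move):
        piece = move[0]
    elif re.match(r"[a-h]", move):
        piece = "P"
    else:
        piece = "Unknown"
    return piece, to_square

# Made-up sample movetext, for illustration only.
sample = ("1. e4 { [%clk 0:03:00] } 1... e5 { [%clk 0:03:00] } "
          "2. Nf3 Nc6 3. Bc4 Bc5 4. O-O Nf6 1-0")

for move in clean_movetext(sample):
    print(move, classify_move(move))
```

Running the sketch shows one consequence of the rules: castling (`O-O`, `O-O-O`) contains no square, so its `ToSquare` is empty and the final `.filter(col("ToSquare") != "")` removes it. The `"K"` branch for castling therefore never reaches the output.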

#### **Step 2.2: Save the Checkpoint**
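To avoid memory errors on this large dataset, we write the checkpoint one `EventType` at a time. Each event type goes into its own `EventType=<value>` subdirectory, which mimics the layout of a Parquet dataset partitioned by `EventType`.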

In [4]:
print("--- Creating the lean checkpoint... ---")
lean_df = df_per_move.select(
    "EventType", "WhiteElo", "BlackElo",
    "Move", "Piece", "ToSquare"
)

checkpoint_path = "processed/final_moves_checkpoint.parquet"
if os.path.exists(checkpoint_path):
    shutil.rmtree(checkpoint_path)

event_types = [row.EventType for row in lean_df.select("EventType").distinct().collect()]

for event in event_types:
    print(f"Writing checkpoint for EventType: {event}")
    df_one_event = lean_df.filter(col("EventType") == event)
    partition_path = os.path.join(checkpoint_path, f"EventType={event}")
    df_one_event.drop("EventType").write.mode("overwrite").parquet(partition_path)

with open(os.path.join(checkpoint_path, "_SUCCESS"), 'w') as f: pass
print(f"--- Checkpoint saved successfully to {checkpoint_path} ---")
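
The loop above produces a Hive-style partitioned layout: one `EventType=<value>` subdirectory per event type, with the `EventType` column dropped from the files because its value is encoded in the directory name. The sketch below shows how such a directory name maps back to a column value. `partition_dir` and `partition_value` are hypothetical helpers written for illustration, and `"blitz"` is only an example value.

```python
import os

def partition_dir(base: str, column: str, value: str) -> str:
    """Build the Hive-style directory path used for one partition."""
    return os.path.join(base, f"{column}={value}")

def partition_value(path: str, column: str):
    """Recover a partition value from a path, or None if the column is absent."""
    prefix = f"{column}="
    for part in path.split(os.sep):
        if part.startswith(prefix):
            return part[len(prefix):]
    return None

example = partition_dir("processed/final_moves_checkpoint.parquet", "EventType", "blitz")
print(example)
print(partition_value(example, "EventType"))
```

Reading the checkpoint root with `spark.read.parquet(checkpoint_path)` should let Spark's partition discovery restore `EventType` as a column from these directory names. Event values that contain path separators or other special characters would need escaping before being used this way.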

#### **Step 2.3: Quick Verification (Optional)**
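As a final check, we aggregate move counts by destination square and piece and inspect the most frequent combinations. Note that `df_per_move` is not cached, so this cell re-runs the full transformation chain rather than reading the saved checkpoint.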

In [5]:
print("\n--- Aggregating move counts for heatmap... ---")
heatmap_data_pd = df_per_move \
    .groupBy("ToSquare", "Piece") \
    .count() \
    .toPandas()

print("--- Aggregation complete! ---")
print("Top 20 most frequent destinations:")
print(heatmap_data_pd.sort_values(by="count", ascending=False).head(20))
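
The aggregation above only prints the most frequent rows. One way to turn it into an actual check is to confirm that every aggregated row has a valid board square and a known piece code. The sketch below assumes the rows are available as plain dictionaries, for example via `heatmap_data_pd.to_dict("records")`. `find_invalid_rows` and the sample rows are illustrative.

```python
import re

VALID_PIECES = {"K", "Q", "R", "B", "N", "P", "Unknown"}
SQUARE_PATTERN = re.compile(r"[a-h][1-8]")

def find_invalid_rows(rows):
    """Return rows whose square, piece, or count falls outside the expected domain."""
    invalid = []
    for row in rows:
        ok = (
            SQUARE_PATTERN.fullmatch(row["ToSquare"]) is not None
            and row["Piece"] in VALID_PIECES
            and row["count"] > 0
        )
        if not ok:
            invalid.append(row)
    return invalid

# Made-up rows; the last one simulates an empty square slipping past the filter.
sample_rows = [
    {"ToSquare": "e4", "Piece": "P", "count": 120},
    {"ToSquare": "f3", "Piece": "N", "count": 95},
    {"ToSquare": "", "Piece": "K", "count": 40},
]
print(find_invalid_rows(sample_rows))
```

Since there are 64 squares and 7 possible `Piece` values, the grouped result has at most 448 rows, so collecting it with `toPandas()` is cheap even though the underlying move table is large.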

---
### **Part 3: Create Player Rating Summary**
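This section performs another heavy aggregation. It processes all games to calculate the average Blitz, Bullet, and Classical rating for every unique player and saves the summary as CSV. Because of `coalesce(1)`, Spark writes a single part file, but the output path `player_ratings.csv` is a directory containing that file, not a standalone file.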

In [None]:
print("--- Aggregating ratings for each player... (This may take several minutes) ---")
player_ratings_df = df_games.select(col("White").alias("Player"), col("EventType"), col("WhiteElo").alias("Rating")) \
    .union(df_games.select(col("Black").alias("Player"), col("EventType"), col("BlackElo").alias("Rating")))

player_ratings_filtered = player_ratings_df.filter(col("EventType").isin(["blitz", "bullet", "classical"]))

player_ratings_wide = player_ratings_filtered.groupBy("Player").pivot("EventType", ["blitz", "bullet", "classical"]).agg(avg("Rating"))

player_ratings_cleaned = player_ratings_wide \
    .withColumn("blitz_rating", round(col("blitz"), 0)) \
    .withColumn("bullet_rating", round(col("bullet"), 0)) \
    .withColumn("classical_rating", round(col("classical"), 0)) \
    .select("Player", "blitz_rating", "bullet_rating", "classical_rating") \
    .na.drop(how="all", subset=["blitz_rating", "bullet_rating", "classical_rating"])

player_ratings_cleaned.coalesce(1).write.option("header", "true").mode("overwrite").csv("player_analysis_results/player_ratings.csv")

print("--- Player ratings CSV saved successfully! ---")
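
The union, pivot, and average steps can be hard to picture on real data, so here is a minimal pure-Python sketch of the same reshaping on a few made-up games. The player names, ratings, and the helper `average_ratings` are invented for illustration. Spark's `round` rounds half up, so the sketch does the same explicitly rather than relying on Python's built-in `round`, which rounds half to even.

```python
from collections import defaultdict
from decimal import Decimal, ROUND_HALF_UP

EVENT_TYPES = ("blitz", "bullet", "classical")

def average_ratings(games):
    """Map each player to their mean rating per event type (None if no games)."""
    # Long format: one (player, event, rating) record per side of each game,
    # which is what the union of the White and Black selections produces.
    ratings = defaultdict(lambda: defaultdict(list))
    for game in games:
        if game["EventType"] not in EVENT_TYPES:
            continue  # same effect as the isin(...) filter
        ratings[game["White"]][game["EventType"]].append(game["WhiteElo"])
        ratings[game["Black"]][game["EventType"]].append(game["BlackElo"])

    # Wide format: one row per player, one column per event type.
    summary = {}
    for player, by_event in ratings.items():
        row = {}
        for event in EVENT_TYPES:
            values = by_event.get(event)
            if values:
                mean = Decimal(sum(values)) / Decimal(len(values))
                rounded = mean.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
                row[f"{event}_rating"] = float(rounded)
            else:
                row[f"{event}_rating"] = None
        summary[player] = row
    return summary

# Made-up games, for illustration only.
games = [
    {"White": "alice", "Black": "bob", "EventType": "blitz", "WhiteElo": 1500, "BlackElo": 1450},
    {"White": "bob", "Black": "alice", "EventType": "blitz", "WhiteElo": 1460, "BlackElo": 1505},
    {"White": "alice", "Black": "carol", "EventType": "bullet", "WhiteElo": 1400, "BlackElo": 1600},
    {"White": "carol", "Black": "bob", "EventType": "correspondence", "WhiteElo": 1700, "BlackElo": 1480},
]
summary = average_ratings(games)
for player, row in summary.items():
    print(player, row)
```

Each player ends up with one row and up to three ratings. Event types outside the three of interest are ignored, and a missing rating stays empty, just as the pivot leaves a null where a player has no games of that type.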