In [13]:
import chess
import chess.pgn
import pyarrow as pa
import pyarrow.parquet as pq
import io
from tqdm.notebook import tqdm
import os
import matplotlib.pyplot as plt
%matplotlib inline

In [7]:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
from dateutil.relativedelta import relativedelta
import os

# Chess.com account whose monthly game archives are downloaded
username = "alexulanch"

# Archive window, walked from the newest month (March 2025) back to the oldest (July 2021)
start_date, end_date = datetime(2025, 3, 1), datetime(2021, 7, 1)

# All months are merged into this single PGN file
output_file = f"data/chesscom_{username}_games_{end_date.year}-{start_date.year}.pgn"

# Browser-like User-Agent sent with every request to prevent Chess.com from blocking it
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}

# Function to fetch PGN data
def fetch_pgn(year, month, max_retries=3, timeout=30):
    """
    Download one month of PGN data for the module-level ``username``.

    Args:
        year (int): Archive year.
        month (int): Archive month (1-12); zero-padded to two digits in the URL.
        max_retries (int): Maximum number of request attempts.
        timeout (float): Seconds to wait for the server on each attempt, so a
            stalled connection cannot block a worker thread indefinitely.

    Returns:
        str | None: The PGN text prefixed with a ``; Games from YYYY-MM``
        comment line, or None when the month has no games or every attempt
        failed.
    """
    formatted_month = f"{month:02d}"  # Ensure two-digit month format
    url = f"https://api.chess.com/pub/player/{username}/games/{year}/{formatted_month}/pgn"

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            if response.status_code != 200:
                # Routed through the except branch so it shares the retry logic
                raise requests.exceptions.RequestException(f"Unexpected status: {response.status_code}")

            if not response.text.strip():
                print(f"🤔 Hmm.. looks like you didn't play in {year}-{formatted_month}. Were you on vacation? 🏖️")
                return None  # No games for this month

            print(f"✅ Nice! You played some games in {year}-{formatted_month}. Adding them to the archive! 📂♟️")
            return f"\n\n; Games from {year}-{formatted_month}\n" + response.text
        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                # Last attempt: report immediately instead of sleeping first
                print(f"❌ Failed after {max_retries} attempts: {year}-{formatted_month} → {e}")
                return None
            time.sleep(2)  # Wait before retrying

    # Only reached when max_retries <= 0 (no attempt was made)
    return None

# Generate all year-month pairs in reverse order (newest month first)
date_list = []
current_date = start_date
while current_date >= end_date:
    date_list.append((current_date.year, current_date.month))
    current_date -= relativedelta(months=1)

# Fetch PGN files in parallel and merge them into one file
print(f"🚀 Fetching and merging Chess.com PGNs for {username} from {start_date.year}-{start_date.month} to {end_date.year}-{end_date.month}")

merged_pgns = []
with ThreadPoolExecutor(max_workers=5) as executor:  # Adjust workers based on rate limit
    future_to_date = {executor.submit(fetch_pgn, year, month): (year, month) for year, month in date_list}

    # All downloads are already running in parallel; collecting the results in
    # submission order (dicts preserve insertion order) keeps the merged file
    # in the same newest-to-oldest order as date_list on every run, instead of
    # whatever order the downloads happened to finish in.
    for future in future_to_date:
        pgn_data = future.result()
        if pgn_data:
            merged_pgns.append(pgn_data)

# Make sure the target directory exists before writing
os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)

# Save all merged PGNs into a single file
with open(output_file, "w", encoding="utf-8") as outfile:
    outfile.write("\n".join(merged_pgns))

print(f"\n🎉 Done! All games are saved in '{output_file}'.")

🚀 Fetching and merging Chess.com PGNs for alexulanch from 2025-3 to 2021-7
✅ Nice! You played some games in 2025-03. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2025-02. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-11. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2025-01. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-12. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-10. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-09. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-07. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-08. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-06. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-04. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024-03. Adding them to the archive! 📂♟️
✅ Nice! You played some games in 2024

In [16]:
import os
import chess.pgn
import pyarrow as pa
import pyarrow.parquet as pq

def _parse_elo(raw_value):
    """Return the rating as an int when ``raw_value`` consists only of digits, else None."""
    return int(raw_value) if raw_value.isdigit() else None


def _extract_game_metadata(game, game_id):
    """Build the per-game metadata columns from the PGN headers of ``game``."""
    tags = game.headers

    def get_meta(key, default="Unknown"):
        return tags.get(key, default)

    return {
        "game_id": str(game_id),
        "event": get_meta("Event"),
        "site": get_meta("Site"),
        "date": get_meta("Date"),
        "round": get_meta("Round"),
        "white_player": get_meta("White"),
        "black_player": get_meta("Black"),
        # A missing Elo header falls back to "0" (stored as 0); a present but
        # non-numeric value is stored as null.
        "white_elo": _parse_elo(get_meta("WhiteElo", "0")),
        "black_elo": _parse_elo(get_meta("BlackElo", "0")),
        "result": get_meta("Result"),
        "time_control": get_meta("TimeControl"),
        "termination": get_meta("Termination"),
        "eco": get_meta("ECO"),
        "eco_url": get_meta("ECOUrl"),
        "start_time": get_meta("StartTime"),
        "end_time": get_meta("EndTime"),
        "game_link": get_meta("Link"),
    }


def process_pgn_to_parquet(pgn_file_path, output_parquet_path, max_half_moves_per_game=None):
    """
    Process a PGN file, extract metadata and moves, convert all games to FENs, and write to a Parquet file.

    One row is written per position: the starting position of every game
    (half_move 0, move_number 0, empty move_san) followed by the position
    after each mainline move. The game metadata is repeated on every row.

    Args:
        pgn_file_path (str): Path to the PGN file (read as UTF-8)
        output_parquet_path (str): Path for the output Parquet file; its parent
            directory is created if it does not exist
        max_half_moves_per_game (int, optional): Limit the number of half-moves
            per game. None means no limit; 0 keeps only the starting position.

    Returns:
        dict: Summary of total games and positions processed, plus the output path
    """
    # Define schema including metadata fields
    schema = pa.schema([
        pa.field('game_id', pa.string()),
        pa.field('event', pa.string()),
        pa.field('site', pa.string()),
        pa.field('date', pa.string()),
        pa.field('round', pa.string()),
        pa.field('white_player', pa.string()),
        pa.field('black_player', pa.string()),
        pa.field('white_elo', pa.int32()),
        pa.field('black_elo', pa.int32()),
        pa.field('result', pa.string()),
        pa.field('time_control', pa.string()),
        pa.field('termination', pa.string()),
        pa.field('eco', pa.string()),
        pa.field('eco_url', pa.string()),
        pa.field('start_time', pa.string()),
        pa.field('end_time', pa.string()),
        pa.field('game_link', pa.string()),
        pa.field('move_number', pa.int32()),
        pa.field('half_move', pa.int32()),
        pa.field('fen', pa.string()),
        pa.field('move_san', pa.string()),
    ])

    # Column-oriented storage: one list per schema field
    data = {field.name: [] for field in schema}

    def append_row(metadata, move_number, half_move, fen, move_san):
        """Append one position row (game metadata + position columns) to ``data``."""
        for key, value in metadata.items():
            data[key].append(value)
        data["move_number"].append(move_number)
        data["half_move"].append(half_move)
        data["fen"].append(fen)
        data["move_san"].append(move_san)

    total_games = 0

    # The PGN is read with an explicit encoding rather than the platform default
    with open(pgn_file_path, 'r', encoding='utf-8') as pgn_file:
        while True:
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                break  # End of file

            total_games += 1
            metadata = _extract_game_metadata(game, total_games)

            # Store initial position (no move played yet)
            board = game.board()
            append_row(metadata, 0, 0, board.fen(), "")

            mainline_moves = list(game.mainline_moves())
            # `is not None` so that an explicit limit of 0 is honoured
            if max_half_moves_per_game is not None:
                mainline_moves = mainline_moves[:max_half_moves_per_game]

            for half_move, move in enumerate(mainline_moves, start=1):
                san = board.san(move)  # SAN must be computed before the move is pushed
                board.push(move)
                append_row(metadata, (half_move + 1) // 2, half_move, board.fen(), san)

    total_positions = len(data["fen"])

    # Make sure the target directory exists before writing
    output_dir = os.path.dirname(output_parquet_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Convert collected data to PyArrow Table and write to Parquet
    table = pa.Table.from_pydict(data, schema=schema)
    pq.write_table(table, output_parquet_path)

    print(f"\n🎉 Processing complete! Total games: {total_games}, Total positions: {total_positions}")
    print(f"✅ Output written to: {output_parquet_path}")

    return {
        "total_games": total_games,
        "total_positions": total_positions,
        "output_file": output_parquet_path
    }

In [18]:
# Output Parquet path and input PGN path share the same base name under data/
output_parquet_path = f"data/chesscom_{username}_games_{end_date.year}-{start_date.year}.parquet"
pgn_file_path = f"data/chesscom_{username}_games_{end_date.year}-{start_date.year}.pgn"

# Convert every game in the PGN into per-position rows
result = process_pgn_to_parquet(pgn_file_path, output_parquet_path)

# Report the totals returned by the conversion
print(f"Processed {result['total_games']} games")
print(f"Extracted {result['total_positions']} FEN positions")


🎉 Processing complete! Total games: 28847, Total positions: 2004808
✅ Output written to: data/chesscom_alexulanch_games_2021-2025.parquet
Processed 28847 games
Extracted 2004808 FEN positions


In [26]:
# Read the Parquet file at output_parquet_path and convert it to a pandas DataFrame
table = pq.read_table(output_parquet_path)
df = table.to_pandas()

# Print the column names of the loaded DataFrame (not its rows)
print(df.columns)

Index(['game_id', 'event', 'site', 'date', 'round', 'white_player',
       'black_player', 'white_elo', 'black_elo', 'result', 'time_control',
       'termination', 'eco', 'eco_url', 'start_time', 'end_time', 'game_link',
       'move_number', 'half_move', 'fen', 'move_san'],
      dtype='object')
