# Purpose:
Obtain data on Lichess games, sanitize it, then organize and sort it by username.

In [2]:
from typing import TypedDict, Optional

In [None]:
import duckdb

con = duckdb.connect()

# Preview the players and result of the first 60 games in the parquet shard.
# NOTE(review): this cell previously selected COUNT(*), which collapses the
# result to a single row and makes LIMIT 60, head(20) and the "Total rows in
# result" line below meaningless. The projection is restored to the
# White/Black/Result columns shown in the saved output.
df = con.execute(
    """
    SELECT
      White,
      Black,
      Result
    FROM '/Users/a/Documents/personalprojects/chess-opening-recommender/data/raw/train-00000-of-00072.parquet'
    LIMIT 60
"""
).df()

# Print the first 20 rows instead of the default 5
print(df.head(20))

# Display the total number of rows in the result
print(f"\nTotal rows in result: {len(df)}")

              White          Black Result
0        Panchito0O    PauloPeru78    1-0
1        igloknight     atacan3131    0-1
2    draughts2chess       xhoxhi64    1-0
3     GodofPastries      Mickey187    1-0
4         elprimo27      knocikIII    0-1
5   gustavstromberg         kaddy3    1-0
6           rennbj4     XXXZeusXXX    1-0
7    diegocrafter44       espenono    1-0
8      Not-Magnus-C      Yala_Baba    0-1
9     Nosferatu_rrr       Perinell    1-0
10           so_nyc     Hansiate88    0-1
11        Alpenyeti    Mike_Brewer    1-0
12       NASSER1010  AurelianPetru    1-0
13         Elmir013   Lucas_Falcao    1-0
14       birdmanow9    bernardo_sr    1-0
15        DrAmrkoya    denisdorcol    1-0
16         Pazividi      aldair_sh    1-0
17            mrral   zagorbonelli    0-1
18          Qvintuz     nemnemsoha    0-1
19   PalestineViper        faleth7    1-0

Total rows in result: 60


In [8]:
import chess.pgn
import zstandard as zstd
import io

# Location of the zstd-compressed Lichess PGN dump
pgn_path = "/Users/a/Documents/personalprojects/chess-opening-recommender/data/raw/lichess_db_standard_rated_2025-07.pgn.zst"

# Stream-decompress the archive and expose it as UTF-8 text
with open(pgn_path, 'rb') as compressed_file:
    decompressor = zstd.ZstdDecompressor()
    text_stream = io.TextIOWrapper(
        decompressor.stream_reader(compressed_file),
        encoding='utf-8',
    )

    # Parse only the first game as a smoke test
    game = chess.pgn.read_game(text_stream)

    if not game:
        print("No game found in the file.")
    else:
        # Summarise the headers, then dump the full PGN of the game
        tags = game.headers
        print(f"Event: {tags['Event']}")
        print(f"White: {tags['White']} (Elo: {tags.get('WhiteElo', 'N/A')})")
        print(f"Black: {tags['Black']} (Elo: {tags.get('BlackElo', 'N/A')})")
        print(f"Result: {tags['Result']}")
        print(f"Opening: {tags.get('Opening', 'N/A')}")
        print(f"ECO: {tags.get('ECO', 'N/A')}")
        print("\nMoves:")
        print(game)

Event: Rated Bullet game
White: my_name_jeff (Elo: 1706)
Black: xxxgrishaxxx (Elo: 1671)
Result: 0-1
Opening: Benoni Defense: Old Benoni
ECO: A43

Moves:
[Event "Rated Bullet game"]
[Site "https://lichess.org/VsUqVhC2"]
[Date "2025.07.01"]
[Round "-"]
[White "my_name_jeff"]
[Black "xxxgrishaxxx"]
[Result "0-1"]
[UTCDate "2025.07.01"]
[UTCTime "00:00:31"]
[WhiteElo "1706"]
[BlackElo "1671"]
[WhiteRatingDiff "-6"]
[BlackRatingDiff "+6"]
[ECO "A43"]
[Opening "Benoni Defense: Old Benoni"]
[TimeControl "60+0"]
[Termination "Time forfeit"]

1. d4 { [%clk 0:01:00] } 1... c5 { [%clk 0:01:00] } 2. e3 { [%clk 0:01:00] } 2... e6 { [%clk 0:00:59] } 3. dxc5 { [%clk 0:00:59] } 3... Bxc5 { [%clk 0:00:58] } 4. Nf3 { [%clk 0:00:59] } 4... Nf6 { [%clk 0:00:57] } 5. c3 { [%clk 0:00:59] } 5... Nc6 { [%clk 0:00:56] } 6. Bb5 { [%clk 0:00:58] } 6... a6 { [%clk 0:00:55] } 7. Bxc6 { [%clk 0:00:57] } 7... bxc6 { [%clk 0:00:55] } 8. O-O { [%clk 0:00:57] } 8... d5 { [%clk 0:00:54] } 9. Nd4 { [%clk 0:00:56] } 9...

In [4]:
import time

# Function to read multiple games
def read_games(file_path, max_games: int):
    """Read up to `max_games` games from a zstd-compressed PGN file.

    Args:
        file_path: Path to the compressed PGN file; it is opened in binary
            mode and decompressed as a stream.
        max_games: Upper bound on the number of games to read.

    Returns:
        A list of the objects returned by `chess.pgn.read_game`, in file
        order. The list is shorter than `max_games` when the file runs out
        of games first, and empty when `max_games` is 0.

    NOTE(review): every game that is read is kept in the returned list, so
    memory use grows with `max_games`.
    """
    games = []

    with open(file_path, 'rb') as f:
        dctx = zstd.ZstdDecompressor()
        stream_reader = dctx.stream_reader(f)
        text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')

        # Tracking how long this takes because there are a lot of games
        start_time = time.perf_counter()

        for game_number in range(1, max_games + 1):
            game = chess.pgn.read_game(text_stream)
            if game is None:
                # End of file: stop before reporting progress, so a failed
                # read is never counted as a processed game.
                break
            games.append(game)

            if game_number % 5_000 == 0:
                elapsed = time.perf_counter() - start_time
                print(f"Processed {game_number} games in {elapsed:.2f} seconds")

    return games

# Read up to 30 million games (the cap passed as max_games), not just 5.
# NOTE(review): the saved output below shows this run being interrupted
# (KeyboardInterrupt) after roughly 85,000 games.
games = read_games(pgn_path, max_games=30_000_000)

Processed 5000 games in 13.70 seconds
Processed 10000 games in 26.57 seconds
Processed 15000 games in 40.51 seconds
Processed 20000 games in 53.19 seconds
Processed 25000 games in 66.30 seconds
Processed 30000 games in 80.04 seconds
Processed 35000 games in 93.85 seconds
Processed 40000 games in 106.71 seconds
Processed 45000 games in 120.45 seconds
Processed 50000 games in 132.03 seconds
Processed 55000 games in 146.76 seconds
Processed 60000 games in 158.33 seconds
Processed 65000 games in 174.10 seconds
Processed 70000 games in 186.94 seconds
Processed 75000 games in 199.13 seconds
Processed 80000 games in 211.76 seconds
Processed 85000 games in 228.77 seconds


Processed 5000 games in 13.70 seconds
Processed 10000 games in 26.57 seconds
Processed 15000 games in 40.51 seconds
Processed 20000 games in 53.19 seconds
Processed 25000 games in 66.30 seconds
Processed 30000 games in 80.04 seconds
Processed 35000 games in 93.85 seconds
Processed 40000 games in 106.71 seconds
Processed 45000 games in 120.45 seconds
Processed 50000 games in 132.03 seconds
Processed 55000 games in 146.76 seconds
Processed 60000 games in 158.33 seconds
Processed 65000 games in 174.10 seconds
Processed 70000 games in 186.94 seconds
Processed 75000 games in 199.13 seconds
Processed 80000 games in 211.76 seconds
Processed 85000 games in 228.77 seconds


KeyboardInterrupt: 

In [None]:
# Now to filter out games we don't want, and extract only the information we need
# We'll do this bit by bit to make sure we don't mess it up.

import io

# Function to extract just the headers from a game
def extract_game_headers(game):
    """Return a plain-dict copy of `game.headers`, or None when `game` is None."""
    return None if game is None else dict(game.headers)

# Extract headers from the first game as a test (None when nothing was loaded)
game_headers = extract_game_headers(games[0]) if games else None
print("Game headers only:")
print(game_headers)

# Process all games to get just the headers
all_game_headers = [extract_game_headers(game) for game in games]

# Only preview a handful of entries: printing the header dict of every loaded
# game floods the cell output once tens of thousands of games are in memory.
HEADER_PREVIEW_COUNT = 5
for headers in all_game_headers[:HEADER_PREVIEW_COUNT]:
    print(headers)

# Let's write a function that takes in this list of game headers
# Now to only extract the information we need from the headers
# Event, Date, White, Black, Result, WhiteElo, BlackElo, WhiteRatingDiff, BlackRatingDiff (this may be redundant), ECO, Opening, TimeControl, Termination (maybe)
# Make an object that extracts only these items and puts it in an object called... idk need a good name

# Return type of below function

class GameInfo(TypedDict):
    """Subset of a game's PGN headers that is kept for analysis.

    Every field is a string. `extract_relevant_game_info` fills a field with
    "N/A" when the corresponding header is absent.
    """

    # Event name, e.g. "Rated Bullet game"
    Event: str
    # Game date as written in the PGN, e.g. "2025.07.01"
    Date: str
    # Username of the white player
    White: str
    # Username of the black player
    Black: str
    # PGN result string, e.g. "1-0", "0-1" or "1/2-1/2"
    Result: str
    # Ratings, kept as the raw header text (e.g. "1706"), not converted to int
    WhiteElo: str
    BlackElo: str
    # ECO code of the opening, e.g. "A43"
    ECO: str
    # Opening name, e.g. "Benoni Defense: Old Benoni"
    Opening: str
    # Time control string, e.g. "60+0"
    TimeControl: str
    # How the game ended, e.g. "Time forfeit"
    Termination: str
    # Signed rating changes as raw header text, e.g. "-6" / "+6"
    WhiteRatingDiff: str
    BlackRatingDiff: str


def extract_relevant_game_info(headers: dict[str, str] | None) -> Optional[GameInfo]:
    """Pick the analysis-relevant fields out of one game's header mapping.

    Returns None when `headers` is None. Otherwise returns a dict with the
    thirteen `GameInfo` keys, where any header that is missing from
    `headers` is replaced by the placeholder "N/A". Headers outside that
    set are dropped.
    """
    if headers is None:
        return None

    def header_or_na(tag: str) -> str:
        # Single place for the "missing header" placeholder
        return headers.get(tag, "N/A")

    return {
        "Event": header_or_na("Event"),
        "Date": header_or_na("Date"),
        "White": header_or_na("White"),
        "Black": header_or_na("Black"),
        "Result": header_or_na("Result"),
        "WhiteElo": header_or_na("WhiteElo"),
        "BlackElo": header_or_na("BlackElo"),
        "ECO": header_or_na("ECO"),
        "Opening": header_or_na("Opening"),
        "TimeControl": header_or_na("TimeControl"),
        "Termination": header_or_na("Termination"),
        "WhiteRatingDiff": header_or_na("WhiteRatingDiff"),
        "BlackRatingDiff": header_or_na("BlackRatingDiff"),
    }

# Sanity check: report how many games are currently held in `games`
print("len(games)", len(games))


def extract_relevant_info_from_games(games: list[chess.pgn.Game]) -> list[GameInfo]:
    """Extract relevant information from a list of games.

    Each game's headers are pulled out with `extract_game_headers` and
    reduced with `extract_relevant_game_info`. Games for which that yields
    None are skipped, so the result can be shorter than `games`.

    Args:
        games: Parsed games to summarise.

    Returns:
        One `GameInfo` per retained game, in input order.
    """
    relevant_info_list: list[GameInfo] = []

    # No per-game printing here: the loop runs once per loaded game, so a
    # debug print inside it emits one output line per game.
    for game in games:
        relevant_info = extract_relevant_game_info(extract_game_headers(game))
        if relevant_info is not None:
            relevant_info_list.append(relevant_info)

    return relevant_info_list

games_with_only_relevant_info = extract_relevant_info_from_games(games)
print(f"Extracted relevant info from {len(games_with_only_relevant_info)} games.")
# Preview the first entry; guarded so an empty result does not raise IndexError
if games_with_only_relevant_info:
    print(games_with_only_relevant_info[0])

In [None]:
# Now to filter games we don't want.

## Next Steps

1. **Data Collection and Preprocessing**:
   - Filter games according to the specified criteria (rated games, not bullet/ultra-bullet, etc.)
   - Group games by username
   - Exclude users with insufficient number of games
   - Structure data for analysis

2. **Feature Engineering**:
   - Extract relevant features from games (opening choices, play style, etc.)
   - Create user profiles based on opening preferences
   - Identify patterns in user opening choices

3. **Model Development**:
   - Create a recommendation system that suggests openings based on user profiles
   - Evaluate model performance
   - Refine the model based on evaluation results

In [None]:
# Training data structure

# Note, I have a copilot chat in history about this

# Stuff we don't care about:
# Specific moves in the game
# Maybe time control? Adds a lot of complexity to training data
#

# Illustrative example of the target per-player structure (placeholder data).
# The figures are kept internally consistent with how process_games_by_player
# computes them:
#   num_games = num_wins + num_losses + num_draws
#   score_percentage_with_opening = round((wins + 0.5 * draws) / num_games * 100, 1)
#   num_games_total = games across all listed openings, both colours
players_stats = {
    "my_username": {
        # Need to decide which TC this is in
        "rating": 1750,
        "black_games": {
            # Keyed by ECO code; "opening_eco_code_1" is a placeholder key
            "opening_eco_code_1": {
                "opening_name": "French Defense",
                "results": {
                    "score_percentage_with_opening": 39.4,
                    "num_games": 47,
                    "num_wins": 15,
                    "num_losses": 25,
                    "num_draws": 7,
                },
            },
        },
        "white_games": {
            "opening_eco_code_1": {
                "opening_name": "French Defense",
                "results": {
                    "score_percentage_with_opening": 39.4,
                    "num_games": 47,
                    "num_wins": 15,
                    "num_losses": 25,
                    "num_draws": 7,
                },
            },
        },
        "num_games_total": 94,
    },
    "another_username": {
        # ....
    },
}

In [None]:
# How to extract data from PGN

# Data needed:
# Usernames
# Result
# Opening name and ECO code
# Time control - still need to figure out what we do with this, if any

# Exclude:
# Games shorter than a certain number of moves
# Cheat detected - maybe don't bother with this filter, it's rare and adds complexity
# Bullet games, probably
# Correspondence games maybe?

# Possibly:
# Weight games higher if they're Classical etc.? Since players will spend more time on fewer Classical games. Though this adds complexity



In [None]:
# Process all games and organize by player

# Which results counter a PGN "Result" value increments, seen from each side.
# Any other value (such as "*") only counts towards num_games.
_WHITE_RESULT_COUNTERS = {"1-0": "num_wins", "0-1": "num_losses", "1/2-1/2": "num_draws"}
_BLACK_RESULT_COUNTERS = {"1-0": "num_losses", "0-1": "num_wins", "1/2-1/2": "num_draws"}


def _parse_elo(raw_elo):
    """Return an Elo header as an int, or 0 when it is not numeric (for example "?")."""
    try:
        return int(raw_elo)
    except (TypeError, ValueError):
        return 0


def _record_game_for_player(players_data, player, elo, games_key, eco_code, opening_name, result_counter):
    """Add one game to `player`'s stats under `games_key`, updating `players_data` in place."""
    if player not in players_data:
        players_data[player] = {
            # Elo from the first game in which this player is seen; it is
            # not refreshed by later games.
            "rating": elo,
            "white_games": {},
            "black_games": {},
            "num_games_total": 0,
        }
    player_entry = players_data[player]

    openings = player_entry[games_key]
    if eco_code not in openings:
        openings[eco_code] = {
            # Name taken from the first game seen with this ECO code
            "opening_name": opening_name,
            "results": {
                "num_games": 0,
                "num_wins": 0,
                "num_losses": 0,
                "num_draws": 0,
                "score_percentage_with_opening": 0,
            },
        }
    results = openings[eco_code]["results"]

    # Update game counts
    player_entry["num_games_total"] += 1
    results["num_games"] += 1

    # Update result counts (unrecognised results leave win/loss/draw untouched)
    if result_counter is not None:
        results[result_counter] += 1

    # Update score percentage: a win is worth 1 point, a draw half a point.
    # num_games is at least 1 here, so the division is safe.
    score = (results["num_wins"] + results["num_draws"] * 0.5) / results["num_games"] * 100
    results["score_percentage_with_opening"] = round(score, 1)


def process_games_by_player(games, max_games=None):
    """Process games and organize them by player username.

    Args:
        games: Sequence of game objects exposing a `headers` mapping with at
            least "White", "Black" and "Result". `None` entries are skipped.
        max_games: When not None, only `games[:max_games]` is processed
            (so 0 processes nothing). None processes every game.

    Returns:
        A dict keyed by username. Each value holds:
          - "rating": Elo from the first game the player appears in
            (0 when the header is missing or not numeric),
          - "white_games" / "black_games": dicts keyed by ECO code
            ("Unknown" when absent), each with "opening_name" and a
            "results" dict of num_games, num_wins, num_losses, num_draws
            and score_percentage_with_opening (rounded to 1 decimal),
          - "num_games_total": games counted for the player on either side.

    Raises:
        KeyError: if a game lacks the "White", "Black" or "Result" header.
    """
    players_data = {}
    selected_games = games if max_games is None else games[:max_games]

    for game in selected_games:
        if game is None:
            continue
        # Plain-dict copy of the headers
        headers = dict(game.headers)

        result = headers['Result']
        eco_code = headers.get('ECO', 'Unknown')
        opening_name = headers.get('Opening', 'Unknown Opening')

        # White is recorded before black, for every game
        _record_game_for_player(
            players_data,
            headers['White'],
            _parse_elo(headers.get('WhiteElo', 0)),
            "white_games",
            eco_code,
            opening_name,
            _WHITE_RESULT_COUNTERS.get(result),
        )
        _record_game_for_player(
            players_data,
            headers['Black'],
            _parse_elo(headers.get('BlackElo', 0)),
            "black_games",
            eco_code,
            opening_name,
            _BLACK_RESULT_COUNTERS.get(result),
        )

    return players_data

# Run the aggregation on the first 50 games only, as a quick check
players_stats_sample = process_games_by_player(games, max_games=50)

# Pick one player at random and print their per-opening stats to verify
import random
if players_stats_sample:
    sample_player = random.choice(list(players_stats_sample.keys()))
    sample_stats = players_stats_sample[sample_player]
    print(f"Sample stats for player: {sample_player}")
    print(f"Rating: {sample_stats['rating']}")
    print(f"Total games: {sample_stats['num_games_total']}")

    # Same report for each colour: a heading, then one line per ECO code
    for heading, games_key in (
        ("\nWhite openings:", 'white_games'),
        ("\nBlack openings:", 'black_games'),
    ):
        print(heading)
        for eco, data in sample_stats[games_key].items():
            results = data['results']
            print(f"  {eco} - {data['opening_name']}: {results['score_percentage_with_opening']}% score in {results['num_games']} games")

Sample stats for player: ghridjvw
Rating: 1588
Total games: 1

White openings:

Black openings:
  A00 - Hungarian Opening: Dutch Defense: 0.0% score in 1 games
