In [1]:
# Code to generate a main CSV file form all the games from the PGN. Took 5 hours for 2 million games
import chess.pgn
import pandas as pd
import os
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from io import StringIO
import glob

# Configuration
CHUNK_SIZE = 5000  # Games per CSV file
OUTPUT_DIR = "pgn_chunks"
os.makedirs(OUTPUT_DIR, exist_ok=True)

def process_game(game_str):
    """Process a single game string"""
    try:
        game = chess.pgn.read_game(StringIO(game_str))
        if not game:
            return None
            
        headers = game.headers
        return {
            'Event': headers.get('Event'),
            'White': headers.get('White'),
            'Black': headers.get('Black'),
            'Result': headers.get('Result'),
            'WhiteElo': try_int(headers.get('WhiteElo')),
            'BlackElo': try_int(headers.get('BlackElo')),
            'ECO': headers.get('ECO'),
            'TimeControl': headers.get('TimeControl'),
            'Termination': headers.get('Termination'),
            'Moves': ' '.join(str(move) for move in game.mainline_moves())
        }
    except Exception as e:
        print(f"Error processing game: {e}")
        return None

def try_int(value):
    """Safely convert to integer"""
    try:
        return int(value) if value and str(value).isdigit() else None
    except:
        return None

def process_chunk(chunk, chunk_number):
    """Process and save a single chunk"""
    with Pool(cpu_count()) as pool:
        results = [r for r in pool.map(process_game, chunk) if r is not None]
    
    if results:
        pd.DataFrame(results).to_csv(
            f"{OUTPUT_DIR}/chunk_{chunk_number:04d}.csv",
            index=False
        )
    return len(results)

def process_pgn_file(pgn_path):
    """Main processing function"""
    # First count total games if possible
    total_games = None
    try:
        with open(pgn_path) as temp_file:
            total_games = sum(1 for _ in chess.pgn.scan_headers(temp_file))
    except:
        pass
    
    # Process file in chunks
    chunk_number = 0
    processed_games = 0
    current_chunk = []
    
    with open(pgn_path) as pgn_file, \
         tqdm(desc="Processing PGN", total=total_games, unit="game") as pbar:
        
        while True:
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                if current_chunk:
                    processed_games += process_chunk(current_chunk, chunk_number)
                    chunk_number += 1
                break
                
            exporter = chess.pgn.StringExporter(headers=True, variations=True, comments=True)
            current_chunk.append(game.accept(exporter))
            
            if len(current_chunk) >= CHUNK_SIZE:
                processed_games += process_chunk(current_chunk, chunk_number)
                pbar.update(len(current_chunk))
                current_chunk = []
                chunk_number += 1
    
    # Combine all CSV files
    combined_df = pd.concat(
        [pd.read_csv(f) for f in glob.glob(f"{OUTPUT_DIR}/chunk_*.csv")],
        ignore_index=True
    )
    
    # Save final combined file
    combined_df.to_csv("combined_chess_games.csv", index=False)
    
    # Optionally clean up chunk files
    for f in glob.glob(f"{OUTPUT_DIR}/chunk_*.csv"):
        os.remove(f)
    
    return combined_df

# Usage
df = process_pgn_file(
    "/home/igorreis/Documentos/Python Projects/chess-data-coach/data/lichess_games_05_2025.pgn"
)
print(f"Processed {len(df)} games")

Processing PGN: 2785000game [4:10:55, 184.99game/s]


Processed 2789900 games


In [2]:
import pandas as pd

df = pd.read_csv("combined_chess_games.csv")
# Values to exclude
values_to_exclude = ['Abandoned', 'Unterminated', 'Rules infraction']

# Filter rows where column 'B' is NOT in the exclusion list
df = df[(~df['Termination'].isin(values_to_exclude)) & (~df['Moves'].isna()) & (df['Moves'].str.count(" ") > 30)]
sample_df = df.copy()

In [2]:
import chess
import chess.pgn
import io
import pandas as pd
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
import numpy as np

def get_fen_after_move(args):
    """Process single game with error handling."""
    move_text, move_number = args
    try:
        game = chess.pgn.read_game(io.StringIO(move_text))
        if not game:
            return np.nan
            
        board = game.board()
        for move in game.mainline_moves():
            board.push(move)
            if board.fullmove_number > move_number:
                break
        return board.fen()
    except Exception:
        return np.nan

def batch_process(df, move_number=30, n_workers=None):
    """Parallel processing of FEN extraction."""
    n_workers = n_workers or max(1, cpu_count() - 1)
    args = [(move, move_number) for move in df['Moves']]
    
    with Pool(n_workers) as pool:
        results = list(tqdm(
            pool.imap(get_fen_after_move, args),
            total=len(df),
            desc=f"Extracting FENs after move {move_number}"
        ))
    
    return pd.Series(results, index=df.index)

# Usage (3-5x faster than progress_apply)
sample_df['fen_after_15'] = batch_process(sample_df, move_number=15)

Extracting FENs after move 15: 100%|██████████| 2488096/2488096 [24:46<00:00, 1674.23it/s]


In [None]:
# Retrieve evaluations from fens
import logging
import chess
import chess.engine
import numpy as np
from tqdm import tqdm

def get_stockfish_eval_batch(fens, depth=10, threads=6, hash_mb=512, move_time=0.1):
    with chess.engine.SimpleEngine.popen_uci("/usr/games/stockfish") as engine:
        engine.configure({
            "Threads": threads,
            "Hash": hash_mb,
        })
        results = []
        for fen in tqdm(fens, desc="Evaluating FENs with Stockfish"):
            if fen is None:
                results.append(None)
                continue
            board = chess.Board(fen)
            try:
                result = engine.analyse(board, chess.engine.Limit(depth=depth, time=move_time))
                results.append(result["score"])
            except Exception as e:
                logging.warning(f"Error analyzing FEN: {fen} — {e}")
                results.append(None)
        return results

def format_evaluation(score):
    if score is None:
        return None
    if score.is_mate():
        mate_in = score.relative.mate()
        return float("inf") if mate_in > 0 else float("-inf")
    return score.white().score()

def get_readable_eval(score):
    if score is None:
        return "No evaluation"
    if score == float("inf"):
        return "White mates"
    elif score == float("-inf"):
        return "Black mates"
    return f"{score / 100:.2f}"

# Evaluate with Stockfish
sample_df["opening_eval"] = get_stockfish_eval_batch(sample_df["fen_after_15"].tolist())

# Format evaluation
sample_df["opening_eval"] = sample_df["opening_eval"].apply(format_evaluation).apply(get_readable_eval)
sample_df.to_csv("sample_df.csv", index = False)

Evaluating FENs with Stockfish: 100%|██████████| 2488096/2488096 [11:24:45<00:00, 60.56it/s]  


In [5]:
import pandas as pd
import numpy as np

sample_df = pd.read_csv("sample_df.csv")

# Filter data with no check mates and turn numeric columns to numeric
sample_df = sample_df.query(
    "opening_eval not in ['White mates', 'Black mates']"
).copy()
sample_df['WhiteElo'] = pd.to_numeric(sample_df['WhiteElo'])
sample_df['BlackElo'] = pd.to_numeric(sample_df['BlackElo'])
sample_df["opening_eval"] = pd.to_numeric(sample_df["opening_eval"])

# Define the bracket edges (e.g., 0-200, 201-400, ..., up to the max Elo)
sample_df['AvgElo'] = ((sample_df['WhiteElo'] + sample_df['BlackElo']) / 2).astype(int)
max_elo = sample_df['AvgElo'].max()
bracket_step = 200
bracket_edges = list(range(0, int(max_elo) + bracket_step, bracket_step))

# Create labels for the brackets (e.g., "1-200", "201-400", etc.)
bracket_labels = [f"{i+1}-{i+bracket_step}" for i in bracket_edges[:-1]]

# Assign each match to a bracket
sample_df['EloBracket'] = pd.cut(
    sample_df['AvgElo'],
    bins=bracket_edges,
    labels=bracket_labels,
    include_lowest=True
)

# Define conditions
conditions = [
    (sample_df['Result'] == '1-0') & (sample_df['opening_eval'] > 0),
    (sample_df['Result'] == '1-0') | (sample_df['Result'] == '1/2-1/2') & (sample_df['opening_eval'] < 0)
]

# Define corresponding values
choices = ['Advantage Capitalization', 'Resourcefulness']

# Apply conditions
sample_df['conversion_type'] = np.select(conditions, choices, default="Other")

# Data to be used later
conversion_type = sample_df['conversion_type'].value_counts(normalize = True).round(2)
conversion_type_df = conversion_type.reset_index(name='percentage')

proportions = sample_df.groupby('EloBracket')['conversion_type'].value_counts(normalize=True).round(2)
proportions_df = proportions.reset_index(name='percentage')

opening_eval_per_eco = sample_df.groupby('ECO')['opening_eval'].mean().sort_values().round(2)
opening_eval_per_eco_df = opening_eval_per_eco.reset_index(name='evaluation')

popular_openings = sample_df['ECO'].value_counts(normalize=True).round(2)
popular_openings_df = popular_openings.reset_index(name='percentage')

# Save the data to JSON
import json

data = {
    "conversion_type": conversion_type_df.to_dict(orient="records"),
    "opening_eval_per_eco": opening_eval_per_eco_df.to_dict(orient="records"),
    "proportions": proportions_df.to_dict(orient="records"),
    "popular_openings":popular_openings_df.to_dict(orient="records")
}

with open("summary_data.json", "w") as f:
    json.dump(data, f, indent=2)

  proportions = sample_df.groupby('EloBracket')['conversion_type'].value_counts(normalize=True).round(2)


In [6]:
# Calculate amount of games won when at advantage and disadvantage
def calculate_advantage_conversion(sample_df):
    """
    Calculate:
    1. Percentage of games won when having opening advantage
    2. Percentage of games won/drawn when at opening disadvantage
    
    Parameters:
        sample_df - DataFrame containing chess games with:
             - 'opening_eval': numeric evaluation after opening
             - 'result': game result (1=win, 0.5=draw, 0=loss)
    """
    # Create conditions
    advantage = sample_df['opening_eval'] > 1 # Esse eval é sempre do ponto de vista do jogador. Se preto e ganhando ainda é positivo
    disadvantage = sample_df['opening_eval'] < -1
    won = sample_df['Result'] == '1-0'
    drawn = sample_df['Result'] == '1/2-1/2'
    
    # Calculate stats
    total_advantage = advantage.sum()
    total_disadvantage = disadvantage.sum()
    
    if total_advantage > 0:
        pct_won_when_ahead = (advantage & won).sum() / total_advantage * 100
    else:
        pct_won_when_ahead = 0.0
    
    if total_disadvantage > 0:
        pct_won_or_drawn_when_behind = ((disadvantage & (won | drawn)).sum() / 
                                      total_disadvantage * 100)
    else:
        pct_won_or_drawn_when_behind = 0.0
    
    return {
        'pct_won_when_ahead': float(pct_won_when_ahead.round(2)),
        'pct_won_or_drawn_when_behind': float(pct_won_or_drawn_when_behind.round(2)),
        'games_with_advantage': int(total_advantage),
        'games_with_disadvantage': int(total_disadvantage)
    }

stats = calculate_advantage_conversion(sample_df)
print(f"Win % when ahead: {stats['pct_won_when_ahead']:.1f}%")
print(f"Win/draw % when behind: {stats['pct_won_or_drawn_when_behind']:.1f}%")

opening_eval_per_eco = sample_df.groupby('ECO')['opening_eval'].mean().sort_values().round(2)
opening_eval_per_eco_df = opening_eval_per_eco.reset_index(name='evaluation')

popular_openings = sample_df['ECO'].value_counts(normalize=True).round(4)
popular_openings_df = popular_openings.head().reset_index(name='percentage')

# Save the data to JSON
import json

data = {
    "conversion_stats": calculate_advantage_conversion(sample_df),
    "opening_eval_per_eco": opening_eval_per_eco_df.to_dict(orient="records"),
    "popular_openings":popular_openings_df.to_dict(orient="records")
}

with open("summary_data_lichess_games.json", "w") as f:
    json.dump(data, f, indent=2)

Win % when ahead: 62.8%
Win/draw % when behind: 34.9%


In [3]:
# Read the JSON data
import pandas as pd
import json

# Read JSON back into a dictionary
with open("summary_data_lichess_games.json", "r") as f:
    loaded_data = json.load(f)

loaded_data

{'conversion_stats': {'pct_won_when_ahead': 64.79,
  'pct_won_or_drawn_when_behind': 32.85,
  'games_with_advantage': 788081,
  'games_with_disadvantage': 522425},
 'opening_eval_per_eco': [{'ECO': 'E03', 'evaluation': -1.64},
  {'ECO': 'E23', 'evaluation': -1.2},
  {'ECO': 'C38', 'evaluation': -1.09},
  {'ECO': 'E13', 'evaluation': -0.82},
  {'ECO': 'D81', 'evaluation': -0.7},
  {'ECO': 'E29', 'evaluation': -0.64},
  {'ECO': 'E28', 'evaluation': -0.62},
  {'ECO': 'B20', 'evaluation': -0.43},
  {'ECO': 'A98', 'evaluation': -0.37},
  {'ECO': 'E59', 'evaluation': -0.32},
  {'ECO': 'C76', 'evaluation': -0.32},
  {'ECO': 'E78', 'evaluation': -0.28},
  {'ECO': 'B74', 'evaluation': -0.27},
  {'ECO': 'B87', 'evaluation': -0.23},
  {'ECO': 'B49', 'evaluation': -0.23},
  {'ECO': 'B70', 'evaluation': -0.23},
  {'ECO': 'E45', 'evaluation': -0.22},
  {'ECO': 'C37', 'evaluation': -0.21},
  {'ECO': 'B62', 'evaluation': -0.2},
  {'ECO': 'D28', 'evaluation': -0.19},
  {'ECO': 'A89', 'evaluation': -0.1