In [7]:
# Part 1: Initialization and Loading PGN Files

# Import required libraries
import chess
import chess.engine
import chess.pgn
import chess.svg
import subprocess
import os
from PIL import Image
import io
import json
import random
import time
import csv
from IPython.display import display

# Paths to your engines and Maia models
stockfish_path = "/opt/homebrew/bin/stockfish"
maia_model_paths = {
    1100: "maia_weights/maia-1100.pb.gz",
    1200: "maia_weights/maia-1200.pb.gz",
    1300: "maia_weights/maia-1300.pb.gz",
    1400: "maia_weights/maia-1400.pb.gz",
    1500: "maia_weights/maia-1500.pb.gz",
    1600: "maia_weights/maia-1600.pb.gz",
    1700: "maia_weights/maia-1700.pb.gz",
    1800: "maia_weights/maia-1800.pb.gz",
    1900: "maia_weights/maia-1900.pb.gz",
}

# Initialize Stockfish engine
print("Initializing Stockfish engine...")
try:
    stockfish_engine = chess.engine.SimpleEngine.popen_uci(stockfish_path, stderr=subprocess.DEVNULL)
    print("Stockfish engine initialized.")
except Exception as e:
    print(f"Error initializing Stockfish engine: {e}")

# Initialize Maia engines
print("Initializing Maia engines...")
maia_engines = {}
for elo, path in maia_model_paths.items():
    try:
        print(f"Initializing Maia {elo} engine...")
        maia_engines[elo] = chess.engine.SimpleEngine.popen_uci(["lc0", f"--weights={path}"], stderr=subprocess.DEVNULL)
        print(f"Maia {elo} engine initialized.")
    except Exception as e:
        print(f"Error initializing Maia {elo} engine: {e}")


# Function to load games from a PGN file
def load_games_from_pgn(file_path):
    games = []
    with open(file_path, 'r') as pgn_file:
        while True:
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                break
            games.append(game)
    print(f"Loaded {len(games)} games from PGN file {file_path}.")
    return games


# Directory containing PGN files
pgn_directory = "analysis_pgns_test"
all_games = []

print("Starting to load PGN files...")

for pgn_file in os.listdir(pgn_directory):
    if pgn_file.endswith(".pgn"):
        pgn_file_path = os.path.join(pgn_directory, pgn_file)
        print(f"Loading {pgn_file_path}...")
        all_games.extend(load_games_from_pgn(pgn_file_path))

print(f"Total games loaded: {len(all_games)}")

Initializing Stockfish engine...
Stockfish engine initialized.
Initializing Maia engines...
Initializing Maia 1100 engine...
Maia 1100 engine initialized.
Initializing Maia 1200 engine...
Maia 1200 engine initialized.
Initializing Maia 1300 engine...
Maia 1300 engine initialized.
Initializing Maia 1400 engine...
Maia 1400 engine initialized.
Initializing Maia 1500 engine...
Maia 1500 engine initialized.
Initializing Maia 1600 engine...
Maia 1600 engine initialized.
Initializing Maia 1700 engine...
Maia 1700 engine initialized.
Initializing Maia 1800 engine...
Maia 1800 engine initialized.
Initializing Maia 1900 engine...
Maia 1900 engine initialized.
Starting to load PGN files...
Loading analysis_pgns_test/4c6eS6SH---1969 Petrosian vs. Spassky .pgn...
Loaded 9 games from PGN file analysis_pgns_test/4c6eS6SH---1969 Petrosian vs. Spassky .pgn.
Loading analysis_pgns_test/4S5UuAGn---1986 Karpov vs. Kasparov III.pgn...
Loaded 11 games from PGN file analysis_pgns_test/4S5UuAGn---1986 Karpov 

In [8]:
# Part 2: Analysis Functions

import chess.pgn


# Function to calculate average error for a position using Stockfish
def calculate_average_error(board, stockfish_engine):
    legal_moves = list(board.legal_moves)
    total_error = 0
    num_moves = len(legal_moves)

    for move in legal_moves:
        board.push(move)
        info = stockfish_engine.analyse(board, chess.engine.Limit(time=0.1))
        eval_score = info["score"].relative.score(mate_score=10000) / 100.0
        total_error += abs(eval_score)
        board.pop()

    average_error = total_error / num_moves if num_moves > 0 else 0
    return average_error


# Function to analyze a game using Stockfish and Maia engines
def analyze_with_engines(game, stockfish_engine, maia_engines):
    board = game.board()
    blunders = []

    for move in game.mainline_moves():
        fen_before = board.fen()  # Get FEN before the move
        player = board.turn  # True for White, False for Black

        # Analyze the position before the move
        stockfish_info_before = stockfish_engine.analyse(board, chess.engine.Limit(time=0.1))
        stockfish_eval_before = stockfish_info_before["score"].relative.score(mate_score=10000) / 100.0

        board.push(move)

        # Analyze the position after the move
        stockfish_info_after = stockfish_engine.analyse(board, chess.engine.Limit(time=0.1))
        stockfish_eval_after = stockfish_info_after["score"].relative.score(mate_score=10000) / 100.0
        stockfish_best_move_after = stockfish_info_after["pv"][0] if "pv" in stockfish_info_after else None

        # Define blunder threshold
        if player and stockfish_eval_after < stockfish_eval_before - 4:
            # White's turn, evaluation worsened significantly
            maia_suggestions = {}
            board.pop()  # Revert to the position before the blunder
            for elo, maia_engine in maia_engines.items():
                # Analyze the position before the blunder for Maia suggestions
                maia_info = maia_engine.analyse(board, chess.engine.Limit(nodes=1))
                maia_eval = maia_info["score"].relative.score(mate_score=10000) / 100.0
                maia_best_move = maia_info["pv"][0] if "pv" in maia_info else None
                maia_suggestions[elo] = (maia_best_move, maia_eval)
            board.push(move)  # Push the move again to keep the board state

            blunders.append((fen_before, move, stockfish_eval_after, stockfish_best_move_after, maia_suggestions))
        elif not player and stockfish_eval_after > stockfish_eval_before + 4:
            # Black's turn, evaluation worsened significantly
            maia_suggestions = {}
            board.pop()  # Revert to the position before the blunder
            for elo, maia_engine in maia_engines.items():
                # Analyze the position before the blunder for Maia suggestions
                maia_info = maia_engine.analyse(board, chess.engine.Limit(nodes=1))
                maia_eval = maia_info["score"].relative.score(mate_score=10000) / 100.0
                maia_best_move = maia_info["pv"][0] if "pv" in maia_info else None
                maia_suggestions[elo] = (maia_best_move, maia_eval)
            board.push(move)  # Push the move again to keep the board state

            blunders.append((fen_before, move, stockfish_eval_after, stockfish_best_move_after, maia_suggestions))

    print(f"Analyzed game with {len(blunders)} blunders found.")
    return blunders


# Function to filter the best blunders based on majority agreement and magnitude of evaluation change
def filter_best_blunders(blunders, evaluation_change_threshold=3.0, majority_threshold=0.8):
    filtered_blunders = []

    for fen, move, stockfish_eval, stockfish_best_move, maia_suggestions in blunders:
        move_counts = {}

        # Count how many times each move is suggested by Maia engines above 1300
        for elo, (best_move, maia_eval) in maia_suggestions.items():
            if elo > 1300:
                if best_move in move_counts:
                    move_counts[best_move] += 1
                else:
                    move_counts[best_move] = 1

        # Find the move with the highest count
        if move_counts:
            most_common_move, count = max(move_counts.items(), key=lambda item: item[1])

            # Calculate the agreement ratio
            total_considered = sum(1 for elo in maia_suggestions if elo > 1300)
            agreement_ratio = count / total_considered if total_considered > 0 else 0

            # Calculate the evaluation change
            eval_change = abs(stockfish_eval)

            # Filter based on majority agreement and evaluation change threshold
            if agreement_ratio >= majority_threshold and eval_change >= evaluation_change_threshold:
                filtered_blunders.append((fen, move, stockfish_eval, most_common_move, maia_suggestions))

    print(f"Filtered blunders down to {len(filtered_blunders)} consistent ones.")
    return filtered_blunders

In [9]:
# Part 3: Analyzing Games and Saving Results

import csv
import json

# Directory containing PGN files
pgn_directory = "analysis_pgns_test"
all_positions = []

# Function to save results incrementally to CSV
def save_results_to_csv(csv_file_path, data):
    with open(csv_file_path, "a", newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        for entry in data:
            csvwriter.writerow([
                entry["FEN"], entry["Average Error"], entry["Event"], entry["Move"],
                entry["Best Move"], entry["Player"], entry["To Move"], entry["White"],
                entry["Black"], entry["Round"], entry["Result"], entry["WhiteElo"],
                entry["BlackElo"], entry["Opening"], entry["File"]
            ])

# Function to save results incrementally to JSON
def save_results_to_json(json_file_path, data):
    if os.path.exists(json_file_path):
        with open(json_file_path, "r") as jsonfile:
            json_data = json.load(jsonfile)
    else:
        json_data = {}

    for entry in data:
        event = entry["Event"]
        if event not in json_data:
            json_data[event] = []
        json_data[event].append(entry)

    with open(json_file_path, "w") as jsonfile:
        json.dump(json_data, jsonfile, indent=4)

# Paths to save results
csv_file_path = "positions_with_errors.csv"
json_file_path = "positions_with_errors.json"

# Initialize CSV file with headers
with open(csv_file_path, "w", newline='') as csvfile:
    csvwriter = csv.writer(csvfile)
    csvwriter.writerow([
        "FEN", "Average Error", "Event", "Move", "Best Move", "Player",
        "To Move", "White", "Black", "Round", "Result", "WhiteElo",
        "BlackElo", "Opening", "File"
    ])

print("Starting analysis of PGN files...")

for pgn_file in os.listdir(pgn_directory):
    if pgn_file.endswith(".pgn"):
        pgn_file_path = os.path.join(pgn_directory, pgn_file)
        print(f"Analyzing {pgn_file_path}...")
        all_games = load_games_from_pgn(pgn_file_path)
        print(f"Loaded {len(all_games)} games from PGN file {pgn_file_path}.")

        # Analyze games without using Maia
        for game in all_games:
            print(f"Analyzing game {game.headers.get('Event', 'Unknown Event')} without Maia...")
            board = game.board()
            node = game
            current_positions = []
            while node.variations:
                move = node.variation(0).move
                fen = board.fen()
                print(f"Analyzing position: {fen}")
                avg_error = calculate_average_error(board, stockfish_engine)
                stockfish_info = stockfish_engine.analyse(board, chess.engine.Limit(time=0.1))
                best_move = stockfish_info["pv"][0] if "pv" in stockfish_info else None

                # Check for move equality and filter out if they are the same
                if str(move) == str(best_move):
                    print(f"Skipping move {move} as it is the same as the best move {best_move}.")
                    board.push(move)
                    node = node.variation(0)
                    continue

                event_details = {
                    "FEN": fen,
                    "Average Error": avg_error,
                    "Event": game.headers.get("Event", "Unknown Event"),
                    "Move": str(move),
                    "Best Move": str(best_move),
                    "Player": "White" if board.turn else "Black",
                    "To Move": "White" if board.turn else "Black",
                    "White": game.headers.get("White", "Unknown"),
                    "Black": game.headers.get("Black", "Unknown"),
                    "Round": game.headers.get("Round", "Unknown"),
                    "Result": game.headers.get("Result", "*"),
                    "WhiteElo": game.headers.get("WhiteElo", "N/A"),
                    "BlackElo": game.headers.get("BlackElo", "N/A"),
                    "Opening": game.headers.get("Opening", "Unknown Opening"),
                    "File": pgn_file_path
                }
                current_positions.append(event_details)
                try:
                    board.push(move)
                except AssertionError as e:
                    print(f"Error: {e}, move: {move}, position: {board.fen()}")
                    break  # Skip to the next game
                node = node.variation(0)  # Move to the next node in the game tree

                # Debugging print to ensure we are making progress
                print(f"Processed move: {move}, current positions collected: {len(current_positions)}")

            # Save results incrementally
            save_results_to_csv(csv_file_path, current_positions)
            save_results_to_json(json_file_path, current_positions)

            print(f"Finished analyzing game {game.headers.get('Event', 'Unknown Event')} without Maia.")

# Analyze games using Maia
for pgn_file in os.listdir(pgn_directory):
    if pgn_file.endswith(".pgn"):
        pgn_file_path = os.path.join(pgn_directory, pgn_file)
        print(f"Analyzing {pgn_file_path} (with Maia)...")
        all_games = load_games_from_pgn(pgn_file_path)
        print(f"Loaded {len(all_games)} games from PGN file {pgn_file_path}.")

        for game in all_games:
            print(f"Analyzing game {game.headers.get('Event', 'Unknown Event')} with Maia...")
            blunders = analyze_with_engines(game, stockfish_engine, maia_engines)
            filtered_blunders = filter_best_blunders(blunders)
            current_positions = []

            for fen, move, stockfish_eval, best_move, maia_suggestions in filtered_blunders:
                avg_error = calculate_average_error(chess.Board(fen), stockfish_engine)

                # Check for move equality and filter out if they are the same
                if str(move) == str(best_move):
                    print(f"Skipping move {move} as it is the same as the best move {best_move}.")
                    continue

                event_details = {
                    "FEN": fen,
                    "Average Error": avg_error,
                    "Event": game.headers.get("Event", "Unknown Event"),
                    "Move": str(move),
                    "Best Move": str(best_move),
                    "Player": "White" if chess.Board(fen).turn else "Black",
                    "To Move": "White" if chess.Board(fen).turn else "Black",
                    "White": game.headers.get("White", "Unknown"),
                    "Black": game.headers.get("Black", "Unknown"),
                    "Round": game.headers.get("Round", "Unknown"),
                    "Result": game.headers.get("Result", "*"),
                    "WhiteElo": game.headers.get("WhiteElo", "N/A"),
                    "BlackElo": game.headers.get("BlackElo", "N/A"),
                    "Opening": game.headers.get("Opening", "Unknown Opening"),
                    "File": pgn_file_path
                }
                current_positions.append(event_details)

                # Debugging print to ensure we are making progress
                print(f"Processed move: {move}, current positions collected: {len(current_positions)}")

            # Save results incrementally
            save_results_to_csv(csv_file_path, current_positions)
            save_results_to_json(json_file_path, current_positions)

            print(f"Finished analyzing game {game.headers.get('Event', 'Unknown Event')} with Maia.")

print("Analysis complete. Results saved incrementally.")

Starting analysis of PGN files...
Analyzing analysis_pgns_test/4c6eS6SH---1969 Petrosian vs. Spassky .pgn...
Loaded 9 games from PGN file analysis_pgns_test/4c6eS6SH---1969 Petrosian vs. Spassky .pgn.
Loaded 9 games from PGN file analysis_pgns_test/4c6eS6SH---1969 Petrosian vs. Spassky .pgn.
Analyzing game FIDE World Championship Match 1969 without Maia...
Analyzing position: rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1
Skipping move e2e4 as it is the same as the best move e2e4.
Analyzing position: rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b KQkq - 0 1
Skipping move c7c5 as it is the same as the best move c7c5.
Analyzing position: rnbqkbnr/pp1ppppp/8/2p5/4P3/8/PPPP1PPP/RNBQKBNR w KQkq - 0 2
Skipping move g1f3 as it is the same as the best move g1f3.
Analyzing position: rnbqkbnr/pp1ppppp/8/2p5/4P3/5N2/PPPP1PPP/RNBQKB1R b KQkq - 1 2
Processed move: e7e6, current positions collected: 1
Analyzing position: rnbqkbnr/pp1p1ppp/4p3/2p5/4P3/5N2/PPPP1PPP/RNBQKB1R w KQkq - 0 3
Pro

KeyboardInterrupt: 