In [16]:
import chess
import chess.engine
import chess.pgn
from datetime import datetime
import pandas as pd
import math
from tqdm import tqdm
from typing import Union, List
import zipfile
import os
import numpy as np
import json
import time

In [2]:
def evaluate_position(board: chess.Board , engine: chess.engine.SimpleEngine, limit: chess.engine.Limit):
    """Score ``board`` with ``engine`` and return the result from White's side.

    Args:
        board: Position to analyse.
        engine: Running engine used for the analysis.
        limit: Search limit handed to ``engine.analyse``.

    Returns:
        The value of ``score(mate_score=1000)`` taken on the White
        point-of-view score reported by the engine.
    """
    analysis = engine.analyse(board, limit)
    # Normalise to White's perspective regardless of whose turn it is.
    white_pov_score = analysis['score'].white()
    # mate_score=1000 makes forced mates come back as numbers as well.
    return white_pov_score.score(mate_score=1000)

def parse_elo_rating(rating_str: str) -> Union[int, None]:
    """Convert an Elo rating header value to an integer.

    Args:
        rating_str: Raw rating text, e.g. ``'2800'``.

    Returns:
        The rating as an ``int``, or ``None`` when the value cannot be
        interpreted as an integer (empty string, ``'?'``, ``None``, ...).
    """
    try:
        return int(rating_str)
    except (ValueError, TypeError):
        # ValueError: non-numeric text such as '' or '?'.
        # TypeError: the value is not text/number at all (e.g. None).
        return None

def parse_date(date_str: str) -> Union[datetime, None]:
    """Parse a PGN-style date string into a ``datetime``.

    Unknown parts of a PGN date are written as ``??``. The formats below are
    tried from most to least specific; missing month/day fall back to
    ``strptime``'s defaults (1).

    Args:
        date_str: Date text such as ``'2019.03.27'``, ``'2019.03.??'`` or
            ``'2019.??.??'``.

    Returns:
        The parsed ``datetime``, or ``None`` when no format matches (for
        example ``'????.??.??'``) or the input is not a string.
    """
    for date_format in ('%Y.%m.%d', '%Y.%m.??', '%Y.??.??'):
        try:
            return datetime.strptime(date_str, date_format)
        except (ValueError, TypeError):
            # ValueError: text does not match this format, try the next one.
            # TypeError: input is not a string; no format will match.
            # Deliberately narrow so Ctrl-C / SystemExit are not swallowed.
            continue
    return None

def read_games(pgn_path: str) -> List[chess.pgn.Game]:
    """Load every game stored in the PGN file at ``pgn_path``.

    Args:
        pgn_path: Path of the PGN file to read.

    Returns:
        The games in file order; an empty list if the file holds none.
    """
    parsed = []
    with open(pgn_path) as handle:
        current = chess.pgn.read_game(handle)
        # read_game returns None once the file is exhausted.
        while current is not None:
            parsed.append(current)
            current = chess.pgn.read_game(handle)
    return parsed

def _to_jsonable(value):
    """``json.dump`` fallback: turn NumPy arrays/scalars into plain Python."""
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.generic):
        return value.item()
    raise TypeError(f'Object of type {type(value).__name__} is not JSON serializable')


def save_output(path: str, output: List[dict]):
    """Write ``output`` to ``path`` as JSON.

    NumPy arrays and NumPy scalars inside the records are converted to lists
    and plain Python numbers, so records holding arrays can be written
    without a ``TypeError``.

    Args:
        path: Destination file; overwritten if it exists.
        output: Records to serialise.

    Raises:
        TypeError: If a value is neither JSON-native nor a NumPy array/scalar.
    """
    with open(path, 'w') as fout:
        json.dump(output, fout, default=_to_jsonable)

In [3]:
# Location of the Stockfish binary. The Homebrew path below stays the default;
# set the STOCKFISH_PATH environment variable to use a different install
# without editing the notebook.
stockfish_path = os.environ.get('STOCKFISH_PATH', '/usr/local/Cellar/stockfish/15/bin/stockfish')
# Start the engine process. This module-level `engine` is what
# process_player_games() uses; call engine.quit() when the analysis is done.
engine = chess.engine.SimpleEngine.popen_uci(stockfish_path)
# Search limit per position: both a time cap (seconds) and a depth cap are set.
# NOTE(review): with a 999 s time cap the depth cap is presumably what ends
# each search in practice — confirm.
movetimesec = 999
depth = 10
limit = chess.engine.Limit(time=movetimesec, depth=depth)

In [4]:
# import multiprocessing as mp
# NOTE(review): the third-party `multiprocess` package is imported in place of
# the stdlib module commented out above — presumably so functions defined in
# notebook cells can be sent to worker processes; confirm before switching back.
import multiprocess as mp
# Show the CPU count reported by the library.
print(mp.cpu_count())

12


In [5]:
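# Disabled draft: iterate over the PGN files in the data directory and skip
# players that are not listed in `include_players`.
# NOTE: as written, the lower-cased `player_name` can never match the
# capitalised entries of `include_players`; fix that before re-enabling.
# for player_pgn_file in os.listdir(data_path):
#     player_name = player_pgn_file.split('.')[0].lower()
#     if player_name not in include_players:
#         continue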

In [6]:
# Player names selected for analysis. process_player_games() reads
# `data/<player_name>.pgn`, so each entry presumably has to match a PGN file
# name exactly — the code that iterates over this list is not visible here.
include_players = ['Anand', 'Aronian', 'Carlsen', 'Caruana', 'Duda']

In [7]:
def _centipawn_losses(evaluations, white_moves_first: bool = True):
    """Split a series of position evaluations into per-move losses per side.

    Args:
        evaluations: One evaluation per position, from White's point of view,
            starting with the initial position (so ``len(moves) + 1`` values).
        white_moves_first: Whether the first move of the series is White's.

    Returns:
        ``(white_losses, black_losses)`` as NumPy arrays. A move that
        improves the mover's evaluation counts as a loss of 0.
    """
    # Cap extreme evaluations so a single huge swing cannot dominate.
    clipped = np.clip(np.asarray(evaluations), -1000, 1000)
    deltas = np.diff(clipped)
    first_mover_deltas, second_mover_deltas = deltas[::2], deltas[1::2]
    if white_moves_first:
        white_deltas, black_deltas = first_mover_deltas, second_mover_deltas
    else:
        white_deltas, black_deltas = second_mover_deltas, first_mover_deltas
    # Evaluations are White-POV: White loses when the value drops,
    # Black loses when it rises.
    white_losses = np.maximum(-white_deltas, 0)
    black_losses = np.maximum(black_deltas, 0)
    return white_losses, black_losses


def _mean_or_nan(values):
    """Mean of ``values``; NaN (without a NumPy warning) when it is empty."""
    return np.mean(values) if values.size else np.nan


def process_player_games(player_name: str, max_games: int = 10):
    """Analyse a player's games and save per-game centipawn losses.

    Reads ``data/<player_name>.pgn``, evaluates every position of each game
    with the module-level ``engine`` and ``limit``, and stores one record per
    game in ``processed_data/<player_name>.npy``.

    Args:
        player_name: Base name of the PGN file (without ``.pgn``).
        max_games: Maximum number of games, counted from the start of the
            file, to analyse. Defaults to 10; pass ``None`` for no cap.

    Returns:
        None. If the PGN file cannot be read, a message is printed and
        nothing is written. Games that fail to process are reported and
        skipped; games with neither Elo rating are skipped as well.
    """
    data_path = 'data'
    output_path = 'processed_data'
    output = []
    pgn_path = f'{data_path}/{player_name}.pgn'
    try:
        games = read_games(pgn_path)
    except Exception as error:  # keep Ctrl-C / SystemExit working
        print(f'Couldnt read games of player: {player_name}.', repr(error))
        return

    for i, game in enumerate(games, start=1):
        if max_games is not None and i > max_games:
            break
        try:
            print(f'[{player_name}] Game: {i}/{len(games)}')
            headers = game.headers
            # Elo tags are optional in PGN; a missing tag must not discard the
            # game unless both ratings are unavailable.
            white_elo = parse_elo_rating(headers.get('WhiteElo', ''))
            black_elo = parse_elo_rating(headers.get('BlackElo', ''))

            if white_elo is None and black_elo is None:
                print('Missing ELO for both players.')
                continue

            board = game.board()
            # Games set up from a custom position may start with Black to move.
            white_moves_first = board.turn == chess.WHITE
            # One engine evaluation per position, starting position included.
            evaluations = [evaluate_position(board, engine, limit)]
            for move in game.mainline_moves():
                board.push(move)
                evaluations.append(evaluate_position(board, engine, limit))

            white_centipawn_losses, black_centipawn_losses = _centipawn_losses(
                evaluations, white_moves_first
            )

            output.append({
                'event': headers['Event'],
                'date': headers['Date'],
                'white_player': headers['White'],
                'black_player': headers['Black'],
                'white_elo': white_elo,
                'black_elo': black_elo,
                'result': headers['Result'],
                'avg_white_cp_loss': _mean_or_nan(white_centipawn_losses),
                'avg_black_cp_loss': _mean_or_nan(black_centipawn_losses),
                'white_cp_losses': white_centipawn_losses,
                'black_cp_losses': black_centipawn_losses,
            })
        except Exception as error:  # best effort: report and move on
            print(f'Couldnt process game: {i} of player: {player_name}.', repr(error))

    # Make sure the target directory exists so finished analysis is not lost.
    os.makedirs(output_path, exist_ok=True)
    # The list of dicts is stored as a pickled object array; read it back with
    # np.load(..., allow_pickle=True).
    np.save(f'{output_path}/{player_name}.npy', output)
    # save_output(f'{output_path}/{player_name}.json', output)

