In [1]:
import os
import json
import time
import chess
import chess
import chess.engine
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import os
import json
from chess_analysis import extract_score, evaluate_moves, decode, process_game, should_process_file
import hashlib

In [2]:
%%file chess_analysis.py
import os
import json
import time
import chess
import chess.engine
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
import os
import json
import hashlib

engine_path = "/opt/homebrew/bin/stockfish"


def extract_score(score_obj):
    if score_obj.is_mate():
        # Return 99 or -99 depending on the sign of the mate count
        return 99 if score_obj.white().mate() > 0 else -99
    else:
        # Convert centipawn score to regular pawn units
        return score_obj.white().score() / 100

def evaluate_moves(moves, engine_path, multi_pv_lines=5, thinking_time=1):
    try:
        board = chess.Board()
        engine = chess.engine.SimpleEngine.popen_uci(engine_path)

        evaluations = []

        for move in tqdm(moves):
            # Construct the UCI string, considering pawn promotions
            uci_move = move['from'] + move['to']
            if 'promotion' in move:
                uci_move += move['promotion'].lower()

            # Find the number of legal moves in the position
            legal_moves_count = len(list(board.legal_moves))

            # Analyse the position to the desired depth with multi-PV
            multi_pv_result = engine.analyse(board, chess.engine.Limit(time=thinking_time), multipv=min(multi_pv_lines, legal_moves_count))

            # Extract the moves and evaluations from the engine's output
            pv_moves = [info.get('pv')[0] for info in multi_pv_result if info.get('pv')]
            pv_evals = [extract_score(info.get('score')) for info in multi_pv_result]

            # If the actual move is in the top multi-PV lines, get its rank and eval, otherwise set them to -1
            actual_move = board.push_uci(uci_move)

            if actual_move in pv_moves:
                rank = pv_moves.index(actual_move) + 1
                actual_eval = pv_evals[pv_moves.index(actual_move)]
            else:
                rank = -1
                actual = engine.analyse(board, chess.engine.Limit(time=thinking_time))
                actual_eval = extract_score(actual['score'])

            best_move = pv_moves[0]
            best_eval = pv_evals[0]

            evaluations.append({
                'Best Move': best_move,
                'Best Move Eval': best_eval,
                'Ranking Real Move': rank,
                'Real Move Eval': actual_eval
            })

        engine.quit()
        return evaluations
    except Exception as e:
        print(e)
        engine.quit()
        return None


T = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!?{~}(^)[_]@#$,./&-*++="

def decode(e):
    f = []
    g = len(e)
    for c in range(0, g, 2):
        d = {}
        b = T.index(e[c])
        a = T.index(e[c + 1])
        if a > 63:
            d["promotion"] = "qnrbkp"[int((a - 64) / 3)]
            a = b + (-8 if b < 16 else 8) + (a - 1) % 3 - 1
        if b > 75:
            d["drop"] = "qnrbkp"[b - 79]
        else:
            d["from"] = T[b % 8] + str(int(b / 8) + 1)
        d["to"] = T[a % 8] + str(int(a / 8) + 1)
        f.append(d)
    return f

def process_game(game, engine_path, multi_pv_lines=5, thinking_time=1):
    new_filename = game[:-5] + "_analysed.json"  # Assuming '.json' extension for the original game files
    if os.path.exists("../Data/Analysed/" + new_filename):
        return
    # Load game data
    with open("../Data/Games/" + game) as f:
        game_json = json.load(f)

    enc_movelist = game_json["game"]["moveList"]
    movelist = decode(enc_movelist)
    evaluation = evaluate_moves(movelist, engine_path, multi_pv_lines=multi_pv_lines, thinking_time=thinking_time)
    if not evaluation:
        print(f"Error evaluating game {game}. Skipping...")
        return
    # Modify the evaluation dictionaries
    for index, eval_dict in enumerate(evaluation):
        eval_dict["Best Move"] = eval_dict["Best Move"].uci()  # Convert chess.Move to string
        eval_dict["plycount"] = index + 1
        eval_dict["player"] = "white" if eval_dict["plycount"] % 2 == 1 else "black"
        eval_dict["difference"] = eval_dict["Real Move Eval"] - eval_dict["Best Move Eval"]
        eval_dict["difference"] *= -1 if eval_dict["player"] == "white" else 1

    # Append evaluations and additional metadata to the game's JSON
    game_json["evaluations"] = evaluation
    game_json["multi_pv_lines"] = multi_pv_lines
    game_json["thinking_time"] = thinking_time
    game_json["timestamp"] = int(time.time())

    # Write to a new file
    with open("../Data/Analysed/" + new_filename, "w") as f:
        json.dump(game_json, f, indent=4)

def should_process_file(filename,precomputed, modulus_target=0):
    """
    Hashes the filename and checks the modulus against the modulus_target.
    If they match, returns True; otherwise returns False.
    a = should return ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb
    """
    if filename in precomputed:
        return False
    m = hashlib.sha256()
    m.update(filename.encode('utf-8'))
    hex_result = m.hexdigest()
    return int(hex_result, 16) % 2 == modulus_target

def process_game_helper(args):
    process_game(*args)

Overwriting chess_analysis.py


In [3]:
# Load precompuetd files from ../Data/filenames.txt
precomputed = []
with open("../Data/filenames.txt") as f:
    for line in f:
        precomputed.append(line.replace("_analysed","").strip())
precomputed = set(precomputed)

In [4]:
from multiprocessing import Pool
from chess_analysis import process_game_helper
import os 
import time 

def main():
    engine_path = "/opt/homebrew/bin/stockfish"
    games = [x for x in os.listdir("../Data/Games/") if "2023" in x and should_process_file(x, precomputed, 0)]
    print(f"Processing {len(games)} games...")
    # Using 8 CPUs
    num_processes = 10

    start_time = time.time()

    with Pool(processes=num_processes) as pool:
        pool.map(process_game_helper, [(game, "/opt/homebrew/bin/stockfish") for game in games])

    end_time = time.time()
    print(f"Finished processing in {end_time - start_time} seconds.")

if __name__ == "__main__":
    main()


Processing 32839 games...


0it [00:00, ?it/s]/139 [00:00<?, ?it/s]


Error evaluating game late-titled-tuesday-blitz-april-11-2023-3961659_74987644705.json. Skipping...


0it [00:00, ?it/s]


Error evaluating game early-titled-tuesday-blitz-july-11-2023-4158385_82833611587.json. Skipping...


0it [00:00, ?it/s]


Error evaluating game early-titled-tuesday-blitz-april-04-2023-3947951_74364172205.json. Skipping...


0it [00:00, ?it/s]
  1%|          | 1/87 [00:01<01:26,  1.01s/it]]

Error evaluating game early-titled-tuesday-blitz-april-11-2023-3961658_74967825275.json. Skipping...


0it [00:00, ?it/s]


Error evaluating game late-titled-tuesday-blitz-february-07-2023-3796152_69549863117.json. Skipping...


100%|██████████| 48/48 [00:58<00:00,  1.22s/it]]
100%|██████████| 58/58 [01:03<00:00,  1.09s/it]]
100%|██████████| 87/87 [01:37<00:00,  1.12s/it]]
100%|██████████| 92/92 [01:45<00:00,  1.15s/it]]
100%|██████████| 91/91 [01:47<00:00,  1.18s/it]]
100%|██████████| 90/90 [01:52<00:00,  1.25s/it]t]
100%|██████████| 63/63 [01:08<00:00,  1.09s/it]t]
100%|██████████| 124/124 [02:16<00:00,  1.10s/it]
100%|██████████| 73/73 [01:23<00:00,  1.15s/it]t]
100%|██████████| 141/141 [02:34<00:00,  1.09s/it]
100%|██████████| 139/139 [02:39<00:00,  1.15s/it]
100%|██████████| 150/150 [02:49<00:00,  1.13s/it]
100%|██████████| 64/64 [01:12<00:00,  1.13s/it]]
100%|██████████| 93/93 [01:41<00:00,  1.10s/it]t]
100%|██████████| 58/58 [01:05<00:00,  1.13s/it]]]
100%|██████████| 85/85 [01:36<00:00,  1.14s/it]t]
100%|██████████| 116/116 [02:13<00:00,  1.15s/it]
100%|██████████| 87/87 [01:47<00:00,  1.24s/it]]]
100%|██████████| 89/89 [01:47<00:00,  1.21s/it]]]
100%|██████████| 151/151 [02:52<00:00,  1.14s/it]
100%|█