In [None]:
# This was a feat to develop and tune, as it required working with an unwieldy library that made parallel processing of engine evaluations very difficult, slowing preprocessing to a halt. After many iterations, we landed on a set of functions that reduced the runtime drastically, enough to work within our timetable.

In [None]:
# Install Python Chess Library

#!pip install chess
#!pip install pandas
#!pip install numpy

In [1]:
import pandas as pd
import chess.engine
import chess.pgn
import numpy as np
import math

In [None]:
# The handle_nonetypes function handles cases where the evaluation score produced by the engine is None, e.g. when the engine is unable to evaluate a position within the time limit (python-chess's score() also returns None for forced-mate evaluations). It fills each streak of None scores with the average of the last non-None score encountered before the streak and the next known (i.e. non-None) evaluation after it.

def handle_nonetypes(scores):
    """Replace interior runs of None in a list of scores with an interpolated value.

    Every None that is followed (eventually) by a real score is replaced by the
    midpoint between the last real score seen before the run and the first real
    score after it. A run at the very start is averaged against 0, because no
    earlier score exists yet.

    Args:
        scores: Sequence of numeric scores in which missing values are None.

    Returns:
        A new list of numbers. A run of None at the very end has no following
        score and is therefore left out, so the result can be shorter than the
        input. If the input holds no real score at all, a list of 0.0 with the
        same length as the input is returned.
    """
    filled = []
    previous = 0   # most recent real score; stays 0 until the first one shows up
    pending = 0    # None entries still waiting for the next real score
    for value in scores:
        if value is None:
            pending += 1
            continue
        if pending:
            # Close the gap: every missing entry gets the same midpoint value.
            filled.extend([(previous + value) / 2] * pending)
            pending = 0
        previous = value
        filled.append(value)
    if filled:
        return filled
    return [0.0] * len(scores)

In [None]:
# The process_moves_fast function analyzes the moves in a given game and computes the average evaluation score and the average emt (elapsed move time) for each segment of the game (opening, middlegame and endgame). It also keeps the game's headers (which include the result) and extracts the result comment (a more detailed description of how the game ended) from the comment on the game's final move.

def _parse_emt(comment):
    """Return the elapsed move time found in a PGN move comment, or None.

    The last whitespace-separated token of the comment is taken, its first and
    last characters are removed, and the remainder is parsed as a float.
    """
    # NOTE(review): stripping the first AND last character assumes the value is
    # wrapped in one delimiter on each side (e.g. "(1.5)"). For a token such as
    # "1.5]" this would drop the leading digit - confirm against the PGN source.
    try:
        return float(comment.split()[-1][1:-1])
    except (ValueError, IndexError):
        # Empty comment or a last token that is not a number: no time available.
        return None


def _segment_average(values, empty_value):
    """Return the mean of `values` as a float, or `empty_value` if there are none."""
    return float(np.mean(values)) if len(values) > 0 else empty_value


def process_moves_fast(game):
    """Build a one-row DataFrame of headers plus per-segment averages for a game.

    Every mainline position is evaluated with the notebook-level `engine`
    (a running chess.engine.SimpleEngine must be bound to that global name
    before this function is called). Scores are taken from White's point of
    view; positions for which `score()` returns None are filled in per segment
    by `handle_nonetypes`.

    The game is cut into three segments by ply: the opening and the middle
    each get `ply_count // 3` plies and the end gets the remainder. Each ply's
    score and elapsed move time (emt) are attributed to the same segment.
    Games with one or two plies report their average in the opening columns
    and 0.0 elsewhere; games without moves report 0.0 everywhere.

    Args:
        game: A chess.pgn.Game as returned by chess.pgn.read_game.

    Returns:
        pandas.DataFrame with a single row: one column per PGN header, plus
        AvgEvalOpening/Middle/End, AvgEmtOpening/Middle/End and ResultComment
        (the comment attached to the last node of the game). An emt average is
        NaN when a segment that contains plies has no parsable emt comment.
    """
    game_dict = dict(game.headers)
    df = pd.DataFrame([game_dict.values()], columns=game_dict.keys())

    # Start from the game's own initial position so that games carrying a
    # FEN/SetUp/Variant header are replayed from the right board.
    board = game.board()

    # First pass: one score and one emt (or None) per ply, kept aligned.
    scores = []
    emts = []
    for node in game.mainline():
        board.push(node.move)
        analysis = engine.analyse(board, chess.engine.Limit(time=0.01))
        scores.append(analysis["score"].white().score())
        emts.append(_parse_emt(node.comment))

    # Use the number of moves actually read instead of the PlyCount header,
    # which may be missing or disagree with the movetext.
    ply_count = len(scores)
    if ply_count == 0:
        bounds = []
    elif ply_count < 3:
        # Too short to split: the whole game is reported as the opening.
        bounds = [(0, ply_count)]
    else:
        seg_len = ply_count // 3
        bounds = [(0, seg_len), (seg_len, 2 * seg_len), (2 * seg_len, ply_count)]

    # Segments without plies keep the 0.0 placeholder.
    avg_evals = [0.0] * 3
    avg_emts = [0.0] * 3
    for segment, (start, stop) in enumerate(bounds):
        segment_scores = handle_nonetypes(scores[start:stop])
        segment_emts = [t for t in emts[start:stop] if t is not None]
        avg_evals[segment] = _segment_average(segment_scores, 0.0)
        # No timing data for a non-empty segment is reported as missing (NaN).
        avg_emts[segment] = _segment_average(segment_emts, float("nan"))

    df['AvgEvalOpening']=avg_evals[0]
    df['AvgEvalMiddle']=avg_evals[1]
    df['AvgEvalEnd']=avg_evals[2]
    df['AvgEmtOpening']=avg_emts[0]
    df['AvgEmtMiddle']=avg_emts[1]
    df['AvgEmtEnd']=avg_emts[2]
    df['ResultComment']=game.end().comment
    return df

In [None]:
# The script is run on each month of data for the last 12 months: it reads that month's PGN file, processes each game using process_moves_fast, and appends the resulting dataframe to a list. That list of dataframes is then concatenated and written to a CSV file named after the month for later cleaning and training.

#Loop through each month to load and convert each pgn to csv
# NOTE: despite its name, `years` holds the month names used as the stem of the
# input .pgn and output .csv files.
years = ['January','February','March','April','May','June','July','August','September','October','November','December']

# A single engine process serves every month and is closed when the block exits,
# even if a file is missing or a game fails. The name `engine` has to live at
# notebook level because process_moves_fast reads it as a global.
with chess.engine.SimpleEngine.popen_uci("./stockfish-windows-2022-x86-64-avx2.exe") as engine:
    for year in years:
        # Estimate the number of games from the blank lines in the file (two per
        # game: one after the headers, one after the movetext). The estimate is
        # only used for progress reporting, not to decide when to stop.
        with open(year + '.pgn') as f:
            games_in_pgn = (math.floor(sum(line.isspace() for line in f)/2))
        print(games_in_pgn)

        dfs = []
        with open(year + '.pgn') as f:

            # Skip to a game in the pgn (must happen before the first game that
            # is going to be processed is read)
            gameIndex = 0
            for _ in range(gameIndex):
                chess.pgn.read_game(f)

            # Generate df with headers and moves by reading each pgn and running it through process_moves
            i = 0
            gamesToDo = games_in_pgn
            # With fewer than 10 games the step would be 0 and the modulo below
            # would raise ZeroDivisionError.
            progressStep = max(1, gamesToDo // 10)

            game = chess.pgn.read_game(f)
            # read_game returns None at end of file, so every game is processed
            # even when the blank-line estimate is too low.
            while game:

                if (i % progressStep == 0):
                    perCent = float(i*100/max(1, gamesToDo))
                    print ('Completion: {perCent}%'.format(perCent=perCent))

                moves = process_moves_fast(game)
                dfs.append(moves)
                game = chess.pgn.read_game(f)
                i+=1
            print ("Completion: 100%\n Done")

        if dfs:
            df = pd.concat(dfs, ignore_index=True)
            df.to_csv(year + '.csv',index=False)
        else:
            # pd.concat raises on an empty list; an empty PGN simply produces no CSV.
            print('No games found in ' + year + '.pgn, skipping ' + year + '.csv')