In [1]:
import numpy as np
import chess 
import chess.pgn
import random
from datetime import datetime
import pickle
import os
from tqdm import tqdm
import pandas as pd

In [57]:
def convert_to_float(frac_str):
    '''
    Adapted from:
    James Errico
    https://stackoverflow.com/questions/1806278/convert-fraction-to-float
    '''
    try:
        return float(frac_str)
    except ValueError:
        try:
            num, denom = frac_str.split('/')
        except ValueError:
            return None
        try:
            leading, num = num.split(' ')
        except ValueError:
            try:
                return float(num) / float(denom)        
            except:
                return None
        if float(leading) < 0:
            sign_mult = -1
        else:
            sign_mult = 1
        return float(leading) + sign_mult * (float(num) / float(denom))
    

def set_seed(seed_num):
    '''
    Sets the seed for the python random module
    '''
    random.seed(seed_num)
    
def outcome(game):
    '''
    Returns the results of a game.
        - 0: White won
        - 1: Black won
        - 2: Draw
    '''
    return str(game.headers['Result'][-1])


    
def pgn_to_fen(game):
    '''
    Reads a game and parses through the moves of the entire game. Returns two 
    lists, position_list and move_list which contains a list of FEN and SAN notation respectively.
    '''
    node = game
    position_list = []
    move_list = []
    comments = []
    
    while not node.is_end():
        next_node = node.variation(0)
        position_list.append(next_node.board().fen())
        move_list.append(node.board().san(next_node.move))
        comments.append(node.comment)
        node = next_node
        
    return position_list, move_list, comments

def fen_to_bitboard(fen):
    '''
    Takes in a FEN position and returns a bitboard notation
    '''
    chess_pieces = {
        'p': 0,
        'n': 1,
        'b': 2,
        'r': 3,
        'q': 4,
        'k': 5,
        'P': 6,
        'N': 7,
        'B': 8,
        'R': 9,
        'Q': 10,
        'K': 11
    }
    
    bitboard = [0]*773
    currIndex = 0
    [position, turn, castling, _, _, _] = fen.split(' ')
    for ch in position:
        if ch == '/':
            continue
        elif ch >= '1' and ch <= '8':
            currIndex += (ord(ch) - ord('0')) * 12
        else:
            bitboard[currIndex + chess_pieces[ch]] = 1
            currIndex += 12
    bitboard[768] = 1 if turn == 'w' else 0
    bitboard[769] = 1 if 'K' in castling else 0
    bitboard[770] = 1 if 'Q' in castling else 0
    bitboard[771] = 1 if 'k' in castling else 0
    bitboard[772] = 1 if 'q' in castling else 0
    return bitboard

def games_to_bitboard(*, file_path, num_positions=10, num_omit=5, seed_num=0, fp):
    '''
    Acts as the driver function that reads the games and call all other functions to
    eventually write the:
        - bitboard representation 
        - label
    into a pickle file
    '''
    # Set seed and print current time
    set_seed(seed_num)
    print("Start Time =", datetime.now().strftime("%H:%M:%S"))     
    
    # Initialize Variables
    dataframe = {
    'bitboard':[],
    'label':[],
    'comment':[]
    }
    pgn = open(file_path)
    game = chess.pgn.read_game(pgn)
    counter = 0
    file_num = 1
    
    # Go through all games in given pgn file
    while game is not None:
        result = int(outcome(game))
        # If results is not a draw, then we record the games
        # result = 0 means white won
        # result = 1 means black won
        # result = 2 means draw
        if result != 2:
            position_list, move_list, comments = pgn_to_fen(game)
            position_list = sample_position(position_list, move_list, comments, num_positions, num_omit)
            for pos in position_list:
                dataframe['bitboard'].append(fen_to_bitboard(pos[0][0]))
                dataframe['label'].append(result)
                dataframe['comment'].append(pos[1])
                counter += 1
                # Every record_num moves, we write to a pickle file (using file_num to track
                # the updated pickle file) and clear our dataframe
                record_num = 10000
                if counter%record_num == 0:
                    dataframe = pd.DataFrame(dataframe)
                    # dataframe['bitboard'].to_numpy()
                    # dataframe['bitboard'] = dataframe['bitboard'].apply(lambda x: np.array(x).astype('uint8'))
                    # dataframe['comment'] = dataframe['comment'].apply(lambda x: x.split()[-2] if len(x.split())>=2 else None)
                    # dataframe['comment'] = dataframe['comment'].apply(lambda x: float(x.split('/')[-2]) if x and len(x.split('/'))>=2 else None)
                    # # dataframe['comment'] = dataframe['comment'].apply(convert_to_float)
                    return dataframe

                            
        # Read the next game (chess.pgn.read_game() is a generator)
        game = chess.pgn.read_game(pgn)
    
    # Write the remaining moves to a pkl file
    write_to_pickle(fp+'_'+str(file_num)+'.pkl', dataframe)
    print('Total Moves Recorded: {}, Remaining {} moves recorded in {}, Time: {}'.format(counter, len(dataframe['bitboard']), fp+'_'+str(file_num)+'.pkl', datetime.now().strftime('%H:%M:%S')))
    print()
    
def write_to_pickle(fp, dataframe):
    with open(fp, 'wb') as f:
        pickle.dump(dataframe, f)
        
        
def sample_position(position_list, move_list, comments, num_positions, num_omit):
    '''
    Returns a list of candidate positions for training our model. These positions must
    fit the criteria of:
        - Only taken after first num_omit steps,
        - Does not involve a capture move
    '''
    # Only look for moves after first num_omit steps
    pos = position_list[num_omit:]
    moves = move_list[num_omit:]
    comments = comments[num_omit:]
    
    # Exclude moves that involved a capture (i.e. has an 'x' notation in the moves list)
    pos = [(pos[i], comments[i]) for i in range(len(moves)) if 'x' not in moves[i]]
    if len(pos) > num_positions:
        return random.sample(list(zip(pos, comments)), num_positions)
    
    return pos

In [58]:
df = pd.DataFrame(columns=['bitboard','label','comment'])
for f in os.listdir('../data/commented_pgn/'):
    file_path = f'../data/commented_pgn/{f}'
    fp = f'../data//bitboard_stockfish/stockfish_{f}'
    df = pd.concat([df, games_to_bitboard(file_path=file_path, fp=fp)])


Start Time = 00:34:27
Start Time = 00:40:42
Start Time = 00:45:23
Start Time = 00:49:09
Start Time = 00:53:28
Start Time = 00:57:19
Start Time = 01:01:13
Start Time = 01:05:11
Start Time = 01:09:00
Start Time = 01:12:42
Start Time = 01:16:30


In [63]:
df.to_pickle(path='../data/bitboard_stockfish/stockfish_df_raw.pkl')

In [102]:
dataframe = pd.read_pickle('../data/bitboard_stockfish/stockfish_df_raw.pkl')
dataframe['comment'] = dataframe['comment'].apply(lambda x: x.split()[-2] if x and len(x.split())>=2 else None)
dataframe['comment'] = dataframe['comment'].apply(lambda x: x.split('/')[-2] if x and len(x.split('/'))>=2 else None)
# dataframe['comment'] = dataframe['comment'].apply(lambda x: float(x) if x and len(x.split('/'))>=2 else None)

comments = list(dataframe['comment'])
for i,c in enumerate(comments):
    try:
        comments[i] = float(comments[i])
    except:
        comments[i] = None

dataframe['score'] = comments
dataframe = dataframe.drop(['comment'], axis=1)

dataframe.head()

Unnamed: 0,bitboard,label,score
0,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,-0.72
1,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,-0.14
2,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,-0.84
3,"[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,
4,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...",0,-0.09


In [104]:
dataframe.to_pickle('../data/bitboard_stockfish/stockfish_df.pkl')