In [4]:
import chess.pgn
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import os
import numpy as np

import warnings
# Silence every FutureWarning raised from here on in this kernel.
# NOTE(review): a blanket filter also hides deprecation notices from pandas and
# other libraries -- confirm it is still needed before relying on it.
warnings.filterwarnings("ignore", category=FutureWarning)

In [102]:
# Start the dataset CSV as a header-only file, so that rows can be appended to
# it later instead of keeping the whole dataset in memory.
data = pd.DataFrame(
    columns=['board', 'move', 'result'],
)

# index=False: only the three named columns are written, no index column
data.to_csv(path_or_buf='data/dataset.csv', index=False)

In [12]:
from dataclasses import dataclass
from enum import Enum
from typing import Tuple
from chess import PieceType
import numpy as np


class QueenDirection(Enum):
    """
    The eight compass directions of a queen-like (straight or diagonal) move.

    Members are numbered clockwise starting at north-west. In
    ``Mapping.mapper`` every direction owns seven consecutive plane indices
    (one per distance), starting at ``value * 7``.
    """
    # eight directions
    NORTHWEST = 0
    NORTH = 1
    NORTHEAST = 2
    EAST = 3
    SOUTHEAST = 4
    SOUTH = 5
    SOUTHWEST = 6
    WEST = 7


class KnightMove(Enum):
    """
    The eight knight jumps.

    The ``diff`` noted next to each member is ``from_square - to_square``;
    ``Mapping.knight_mappings`` lists the same numbers in member order, and
    ``Mapping.mapper`` assigns each member the plane index ``56 + value``.

    NOTE(review): elsewhere in this file EAST means an increasing square index
    along a rank. Under that convention a diff of -6 (to = from + 8 - 2) moves
    two files *west*, so the EAST_*/WEST_* names look mirrored relative to
    their diffs. The values are still a one-to-one labelling of the eight
    jumps; confirm before renaming, since the values define plane indices.
    """
    # eight possible knight moves
    NORTH_LEFT = 0  # diff == -15
    NORTH_RIGHT = 1  # diff == -17
    EAST_UP = 2  # diff == -6
    EAST_DOWN = 3  # diff == 10
    SOUTH_RIGHT = 4  # diff == 15
    SOUTH_LEFT = 5  # diff == 17
    WEST_DOWN = 6  # diff == 6
    WEST_UP = 7  # diff == -10


class UnderPromotion(Enum):
    """
    Piece chosen when a pawn promotes to something other than a queen.

    ``Mapping.mapper`` assigns each member three plane indices, one per pawn
    direction.
    """
    KNIGHT = 0
    BISHOP = 1
    ROOK = 2


class Mapping:
    """
    Translates chess moves into indices of the 73 move-type planes.

    ``mapper`` is keyed by the type of move:
    * a ``QueenDirection`` maps to seven plane indices, one per distance 1-7
    * a ``KnightMove`` maps to a single plane index
    * an ``UnderPromotion`` maps to three plane indices, one per pawn direction

    Squares are integers 0-63 where ``square // 8`` is the rank (0 = 1st rank,
    7 = 8th rank) and ``square % 8`` is the file.
    """
    # from_square - to_square of every knight move, in KnightMove value order
    # (north_left to west_up)
    knight_mappings = [-15, -17, -6, 10, 15, 17, 6, -10]

    # (sign of file change, sign of rank change) -> direction of travel
    _queen_steps = {
        (-1, 1): QueenDirection.NORTHWEST,
        (0, 1): QueenDirection.NORTH,
        (1, 1): QueenDirection.NORTHEAST,
        (1, 0): QueenDirection.EAST,
        (1, -1): QueenDirection.SOUTHEAST,
        (0, -1): QueenDirection.SOUTH,
        (-1, -1): QueenDirection.SOUTHWEST,
        (-1, 0): QueenDirection.WEST,
    }

    def get_index(self, piece_type: PieceType, direction: Enum, distance: int = 1) -> int:
        """
        Return the plane index of a knight or queen-like move.

        :param piece_type: python-chess piece type of the moving piece
        :param direction: a ``KnightMove`` for knights, otherwise a
            ``QueenDirection`` (the members' integer values are accepted too)
        :param distance: number of squares travelled (1-7); ignored for knights
        :raises ValueError: if ``direction`` or ``distance`` is not valid

        The result is read from ``mapper`` so it always agrees with it.
        """
        if piece_type == chess.KNIGHT:
            return Mapping.mapper[KnightMove(direction)]
        if not 1 <= distance <= 7:
            raise ValueError(f"Invalid queen-like distance: {distance}")
        return Mapping.mapper[QueenDirection(direction)][distance - 1]

    @staticmethod
    def get_underpromotion_move(piece_type: PieceType, from_square: int, to_square: int) -> Tuple[UnderPromotion, int]:
        """
        Classify an underpromotion.

        :param piece_type: promotion piece type (knight, bishop or rook)
        :returns: ``(piece, direction)`` where ``direction`` is
            ``from_file - to_file``: 0 for a straight push, 1 for a capture
            towards the lower-numbered file, -1 towards the higher one
        :raises ValueError: if ``piece_type`` is not an underpromotion piece or
            the squares do not describe a one-step pawn move onto a back rank
        """
        # knight, bishop, rook are consecutive piece types starting at KNIGHT
        promotion = UnderPromotion(piece_type - chess.KNIGHT)
        from_rank, to_rank = from_square // 8, to_square // 8
        # black promotes on the 1st rank (from the 2nd),
        # white promotes on the 8th rank (from the 7th)
        if (from_rank, to_rank) not in ((1, 0), (6, 7)):
            raise ValueError(
                f"Invalid underpromotion move: {from_square} -> {to_square}")
        direction = from_square % 8 - to_square % 8
        if direction not in (-1, 0, 1):
            raise ValueError(
                f"Invalid underpromotion move: {from_square} -> {to_square}")
        return (promotion, direction)

    @staticmethod
    def get_knight_move(from_square: int, to_square: int) -> KnightMove:
        """
        Return the ``KnightMove`` whose square difference matches the move.

        :raises ValueError: if ``from_square - to_square`` is not one of the
            eight values in ``knight_mappings``
        """
        diff = from_square - to_square
        try:
            return KnightMove(Mapping.knight_mappings.index(diff))
        except ValueError:
            raise ValueError(
                f"Invalid knight move: {from_square} -> {to_square}") from None

    @staticmethod
    def get_queenlike_move(from_square: int, to_square: int) -> Tuple[QueenDirection, int]:
        """
        Classify a straight or diagonal move.

        :returns: ``(direction, distance)`` with ``distance`` the number of
            squares travelled, always positive
        :raises ValueError: if the squares are identical or do not share a
            rank, file or diagonal
        """
        file_delta = to_square % 8 - from_square % 8
        rank_delta = to_square // 8 - from_square // 8
        distance = max(abs(file_delta), abs(rank_delta))
        # A queen-like move travels along exactly one line: either one of the
        # deltas is zero, or both have the same magnitude. Comparing files and
        # ranks (instead of the raw square difference) also rejects pairs that
        # only look diagonal because they wrap around the board edge.
        on_line = file_delta == 0 or rank_delta == 0 \
            or abs(file_delta) == abs(rank_delta)
        if distance == 0 or not on_line:
            raise ValueError("Invalid queen-like move")
        step = ((file_delta > 0) - (file_delta < 0),
                (rank_delta > 0) - (rank_delta < 0))
        return (Mapping._queen_steps[step], distance)

    mapper = {
        # queens
        QueenDirection.NORTHWEST: [0, 1, 2, 3, 4, 5, 6],
        QueenDirection.NORTH: [7, 8, 9, 10, 11, 12, 13],
        QueenDirection.NORTHEAST: [14, 15, 16, 17, 18, 19, 20],
        QueenDirection.EAST: [21, 22, 23, 24, 25, 26, 27],
        QueenDirection.SOUTHEAST: [28, 29, 30, 31, 32, 33, 34],
        QueenDirection.SOUTH: [35, 36, 37, 38, 39, 40, 41],
        QueenDirection.SOUTHWEST: [42, 43, 44, 45, 46, 47, 48],
        QueenDirection.WEST: [49, 50, 51, 52, 53, 54, 55],
        # knights
        KnightMove.NORTH_LEFT: 56,
        KnightMove.NORTH_RIGHT: 57,
        KnightMove.EAST_UP: 58,
        KnightMove.EAST_DOWN: 59,
        KnightMove.SOUTH_RIGHT: 60,
        KnightMove.SOUTH_LEFT: 61,
        KnightMove.WEST_DOWN: 62,
        KnightMove.WEST_UP: 63,
        # underpromotions
        UnderPromotion.KNIGHT: [64, 65, 66],
        UnderPromotion.BISHOP: [67, 68, 69],
        UnderPromotion.ROOK: [70, 71, 72]
    }


In [13]:
def result_to_tahn(result: str) -> float:
    """
    Map a PGN result string onto a value in [-1, 1].

    '1-0' gives 1, '0-1' gives -1, and any other string gives 0.
    """
    if result == '1-0':
        return 1
    return -1 if result == '0-1' else 0

def one_hot_encode_board(board: chess.Board) -> np.ndarray:
    """
    Encode a position as an (8, 8, 119) array indexed [rank][file][channel].

    Channels written here (all others stay zero):
    * 0-5:   white pieces, channel = piece_type - 1
    * 6-11:  black pieces, channel = 6 + piece_type - 1
    * 12-13: white king-/queenside castling right, flagged at cell [0][7]
    * 14-15: black king-/queenside castling right, flagged at cell [7][7]
    * 16:    the en passant square, if any
    * 17:    halfmove clock divided by 50, written to the whole plane
    """
    planes = np.zeros((8, 8, 119))

    # piece placement
    for square in range(64):
        occupant = board.piece_at(square)
        if not occupant:
            continue
        rank, file = divmod(square, 8)
        colour_offset = 0 if occupant.color == chess.WHITE else 6
        planes[rank, file, colour_offset + occupant.piece_type - 1] = 1

    # castling rights: (has right, rank of the flag cell, channel)
    castling_flags = (
        (board.has_kingside_castling_rights(chess.WHITE), 0, 12),
        (board.has_queenside_castling_rights(chess.WHITE), 0, 13),
        (board.has_kingside_castling_rights(chess.BLACK), 7, 14),
        (board.has_queenside_castling_rights(chess.BLACK), 7, 15),
    )
    for has_right, rank, channel in castling_flags:
        if has_right:
            planes[rank, 7, channel] = 1

    # en passant target square
    ep_square = board.ep_square
    if ep_square:
        ep_rank, ep_file = divmod(ep_square, 8)
        planes[ep_rank, ep_file, 16] = 1

    # halfmove clock, broadcast over the whole plane
    planes[..., 17] = board.halfmove_clock / 50

    return planes

def determine_plane_index(board, move):
    """
    Locate a move inside the policy tensor.

    Returns ``[plane_index, from_square % 8, 7 - from_square // 8]``:
    * underpromotions (any promotion that is not to a queen) use the
      ``UnderPromotion`` planes, selected by ``1 - direction``
    * knight moves use the single plane of their ``KnightMove``
    * everything else is a queen-like move, selected by direction and distance

    Raises ``Exception`` if there is no piece on the move's origin square.

    NOTE(review): the second element is the origin's file and the third its
    rank counted from the top -- confirm that whatever decodes the policy
    tensor expects this [file][7 - rank] layout.
    """
    from_square, to_square = move.from_square, move.to_square
    mover = board.piece_at(from_square)
    if mover is None:
        raise Exception(f"No piece at {from_square}")

    promoted_to = move.promotion
    if promoted_to and promoted_to != chess.QUEEN:
        under_piece, pawn_direction = Mapping.get_underpromotion_move(
            promoted_to, from_square, to_square)
        plane_index = Mapping.mapper[under_piece][1 - pawn_direction]
    elif mover.piece_type == chess.KNIGHT:
        jump = Mapping.get_knight_move(from_square, to_square)
        plane_index = Mapping.mapper[jump]
    else:
        heading, distance = Mapping.get_queenlike_move(
            from_square, to_square)
        plane_index = Mapping.mapper[heading][np.abs(distance) - 1]

    return [plane_index, from_square % 8, 7 - (from_square // 8)]

In [31]:
# Build one pickled DataFrame of training samples per PGN file.
# Every row is one position: the encoded board, the move played from it
# (as UCI text and as a one-hot policy tensor), the game result and the FEN.
columns = ['board', 'move', 'move_prob', 'result', 'board_fen']
pgn_dir = 'data/Lichess Elite Database'

# Empty frame so `data` is defined even when the directory holds no PGN files
data = pd.DataFrame(columns=columns)

# Iterate over all the PGN files in a directory
for file in tqdm(os.listdir(pgn_dir)):
    # Skip other directory entries entirely, so they do not produce an empty,
    # oddly named pickle.
    if not file.endswith('.pgn'):
        continue
    file = os.path.join(pgn_dir, file)

    # Rows are collected in a plain list and converted once per file:
    # growing a DataFrame row by row copies it on every step, and
    # DataFrame.append no longer exists in pandas >= 2.0.
    records = []
    with open(file, 'r') as pgn:
        game = chess.pgn.read_game(pgn)
        while game is not None:
            # Process the game and extract the board, value, and move probs
            board = game.board()
            # the result is the same for every position of the game
            result = result_to_tahn(game.headers['Result'])
            for move in game.mainline_moves():
                policy_output = np.zeros((73, 8, 8))
                plane, row, col = determine_plane_index(board, move)
                policy_output[plane, row, col] = 1
                records.append({
                    'board': one_hot_encode_board(board),
                    'move': move.uci(),
                    'move_prob': policy_output,
                    'result': result,
                    'board_fen': board.fen(),
                })
                board.push(move)
            game = chess.pgn.read_game(pgn)

    data = pd.DataFrame(records, columns=columns)

    # assumes the file name ends in a 7-character tag followed by '.pgn'
    # (e.g. 'YYYY-MM.pgn') -- TODO confirm against the actual file names
    datafilename = 'data/df' + file[-11:-4] + '.pkl'

    # overwrite pickle file
    data.to_pickle(datafilename)

    # clear data in memory
    del records
    data = pd.DataFrame(columns=columns)

 15%|█▌        | 12/80 [13:45<1:17:59, 68.81s/it]


KeyboardInterrupt: 