In [68]:
import numpy as np
import tensorflow as tf
import chess.pgn
import os

In [69]:
# Parse Dataset (PGN) to Input Matrix
# Shape of one encoded position: (planes, board rows, board columns).
inp_shape = (13, 8, 8)

# Piece-type offsets; these select the first axis of the position tensor.
king, queen, rook, bishop, knight, pawn = range(6)

# Lowercase FEN piece letter -> piece-type offset.
san_translate = dict(
    zip("pnbrqk", (pawn, knight, bishop, rook, queen, king))
)

# structure of matrix:
#    - White is on top (0 - 5)
#    - Black is mirrored (6 - 11)
#    - Next move is 12 (0 - black, 1 - white)

def ajust_color(index, is_white):
    """Map a piece-type offset to its plane index for the given colour.

    White pieces keep their offset (planes 0-5). Black pieces are mirrored
    into planes 6-11 (king -> 11, queen -> 10, ..., pawn -> 6).

    Args:
        index: piece-type offset in the range 0-5 (king .. pawn).
        is_white: True for a white piece, False for a black piece.

    Returns:
        The plane index (first-axis index) for that piece.
    """
    if is_white:
        return index
    # The last plane (inp_shape[0] - 1) holds the side to move, so black must
    # be mirrored inside the piece planes only. Subtracting 1 instead of 2
    # would put the black king on the side-to-move plane and leave plane 6
    # unused. Data encoded with that older layout is not compatible.
    return inp_shape[0] - index - 2

def get_index(san):
    """Return the plane index for a single FEN piece letter.

    The letter's case selects the colour (uppercase is treated as white) and
    its lowercase form selects the piece type via san_translate.
    """
    piece_offset = san_translate[san.lower()]
    return ajust_color(piece_offset, san.isupper())


def fen_to_tensor(fen, move):
    """Encode a FEN piece-placement string as an int8 tensor of shape inp_shape.

    Args:
        fen: piece-placement field only, with rows separated by "/".
        move: value written to every cell of the last plane (the caller in
            this file passes 1 when white is to move and 0 otherwise).

    Returns:
        A tf.int8 constant with a 1 at [plane, row, column] for every piece,
        where row 0 is the first row listed in `fen`.
    """
    planes = np.zeros(inp_shape)

    for row, rank in enumerate(fen.split("/")):
        col = 0
        for symbol in rank:
            if symbol.isnumeric():
                # A digit encodes a run of empty squares.
                col += int(symbol)
                continue
            planes[get_index(symbol), row, col] = 1
            col += 1

    # The last plane carries the side-to-move flag, broadcast over the board.
    planes[-1] = move
    return tf.constant(planes, dtype=tf.int8)

In [70]:
# Returns X, Y
def game_to_tensor(game: chess.pgn.Game, beta: float=0.875) -> tuple:
    """Convert one game into position tensors X and discounted targets Y.

    Every position reached after a mainline move is encoded with
    fen_to_tensor. The target of the final position is the game result and
    each earlier position is discounted by one more factor of `beta`.

    Args:
        game: parsed PGN game.
        beta: per-move discount applied backwards from the final position.

    Returns:
        (X, Y): X stacks the encoded positions along axis 0; Y is a float32
        tensor with one value per position. A game without moves produces
        empty tensors, which callers have to filter out.
    """
    # +1 for a white win, -1 for a black win. Any other result (draw or
    # unfinished "*") is neutral rather than being counted as a black win.
    outcome_values = {"1-0": 1, "0-1": -1}
    result = outcome_values.get(game.headers.get("Result", "*"), 0)

    board = game.board()
    X = []
    for move in game.mainline_moves():
        board.push(move)
        turn = 1 if board.turn == chess.WHITE else 0
        # board_fen() is the piece-placement field of the FEN only.
        X.append(fen_to_tensor(board.board_fen(), turn))

    moves = len(X)
    Y = [result * beta**(moves - i - 1) for i in range(moves)]

    X = tf.stack(X)
    Y = tf.constant(np.array(Y), dtype=tf.float32)

    return X, Y

In [71]:
# Budget used to size a chunk before it is written to disk.
# NOTE(review): presumably bytes (1E9 ~ 1 GB) - confirm.
max_mem_limit = 1E9
# Threshold that parse_games compares against the number of buffered games.
# np.prod(inp_shape) is the element count of one encoded position.
# NOTE(review): 200, "+ 1" and 70 look like an assumed number of positions
# per game, the target value and a per-tensor overhead - not stated in the
# source, confirm before tuning.
max_tensors = max_mem_limit // (200 * ((np.prod(np.array(inp_shape)) + 1) + 70))

def parallel_shuffle(a, b):
    """Shuffle `a` and `b` in place with the same random draws.

    The global NumPy RNG state is captured once and restored before each
    shuffle, so both sequences are shuffled from an identical state. For
    inputs of equal length this keeps a[i] and b[i] paired.
    """
    saved_state = np.random.get_state()
    for seq in (a, b):
        np.random.set_state(saved_state)
        np.random.shuffle(seq)

def _save_chunk(X_parts, Y_parts, out_dir, formatter, save_index):
    """Concatenate buffered games, shuffle the positions and write one .npz chunk."""
    X_np = tf.concat(X_parts, 0).numpy()
    Y_np = tf.concat(Y_parts, 0).numpy()
    # Shuffle positions and targets in unison so pairs stay aligned.
    parallel_shuffle(X_np, Y_np)
    file_loc = os.path.join(out_dir, formatter.format(save_index))
    np.savez_compressed(file_loc, X=X_np, Y=Y_np)


def parse_games(filename, out_dir, formatter="data_{}"):
    """Convert a PGN file into shuffled, compressed .npz training chunks.

    Games are read one at a time, encoded with game_to_tensor and buffered.
    Whenever the number of buffered games exceeds max_tensors, the buffer is
    written to `out_dir` as one chunk with arrays "X" and "Y".

    Args:
        filename: path of the PGN file to read.
        out_dir: directory that receives the chunks (created if missing).
        formatter: format string for the chunk file name; it receives the
            running chunk index.

    Skipped games:
        - games whose result is not decisive ("1-0" or "0-1"), which covers
          draws and unfinished "*" games;
        - games with fewer than 4 encoded positions, including games with no
          moves at all.
    """
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    X_set, Y_set = [], []
    save_index = 0

    # The context manager closes the PGN file even if parsing fails.
    with open(filename) as pgn_file:
        while True:
            # read_game returns None once the file is exhausted.
            game = chess.pgn.read_game(pgn_file)
            if game is None:
                break

            if game.headers["Result"] not in ("1-0", "0-1"):
                continue

            X, Y = game_to_tensor(game)

            # A game without moves stacks to a tensor that is not 4-D.
            if len(X.shape) != 4 or X.shape[0] < 4:
                continue

            X_set.append(X)
            Y_set.append(Y)

            if len(Y_set) > max_tensors:
                _save_chunk(X_set, Y_set, out_dir, formatter, save_index)
                print("saved chunk {}".format(save_index))
                X_set, Y_set = [], []
                save_index += 1

    # Save the final, partially filled chunk. It is shuffled like the others
    # and skipped when nothing is buffered, since tf.concat rejects an empty
    # list.
    if X_set:
        _save_chunk(X_set, Y_set, out_dir, formatter, save_index)
    print("done...")


In [72]:
# Convert the small PGN sample into compressed .npz chunks under
# ../data/processed/small (one "saved chunk N" line is printed per full chunk).
parse_games("../data/raw/small/games.pgn", "../data/processed/small")

saved chunk 0
saved chunk 1
saved chunk 2
saved chunk 3
saved chunk 4
saved chunk 5
saved chunk 6
saved chunk 7
saved chunk 8
saved chunk 9
saved chunk 10
saved chunk 11
saved chunk 12
done...


In [73]:
# Display the chunk threshold that parse_games compares against the number of
# buffered games.
max_tensors

5537.0