In [None]:
learning_rate = 0.0015
weight_decay = 0.0001
batch_size = 512
epochs = 6

history_count = 4
filter_size = 64
main_model_blocks = 4
move_processor_blocks = 8
use_batchnorm = False

min_elo_win = 2000
min_elo_tie = 2500

### Downloader

In [None]:
! pip install -q python-chess
! pip install -q kaggle

! mkdir ~/.kaggle
! echo '{"username":"threetnt","key":"7454bbee8bfde646c02f2a915ea5d02a"}' > ~/.kaggle/kaggle.json
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download -d milesh1/35-million-chess-games
! unzip -p 35-million-chess-games.zip all_with_filtered_anotations_since1998.txt > rawdata.txt

### Processing

In [None]:
import pandas as pd
import numpy as np
import chess.pgn
import io
import time
import os
from sklearn.utils import resample
import warnings

warnings.filterwarnings("ignore")

def board_to_array(board):
    board_state = np.zeros((6, 8, 8), dtype=np.int8)

    piece_dict = {
        'P': 0,  # White Pawn
        'R': 1,  # White Rook
        'N': 2,  # White Knight
        'B': 3,  # White Bishop
        'Q': 4,  # White Queen
        'K': 5,  # White King
    }

    for i in range(8):
        for j in range(8):
            piece = board.piece_at(chess.square(i, j))

            if piece:
                piece_str = str(piece)
                color = int(piece_str.isupper())
                layer = piece_dict[piece_str.upper()]
                board_state[layer, 7-j, i] = color*2-1
            
    return board_state

def pgn_to_states(p):
    game_states = []

    p = io.StringIO(p)
    game = chess.pgn.read_game(p)
    p.close()

    board = game.board()
    for move in game.mainline_moves():
        board.push(move)
        board_state = board_to_array(board)
        game_states.append(board_state)

    return game_states


In [None]:
@lambda f:f()
def process():
    with open("rawdata.txt", "r") as f:
        data = f.readlines()[5:]

    filehandle = open("chess_processed_small.csv", "w")
    filehandle.write("winner,white_elo,black_elo,diff,pgn\n")

    def convert_to_pgn(move_sequence):
        moves = move_sequence.split(" ")
        pgn_moves = []
        for i in range(0, len(moves), 2):
            move_number = i // 2 + 1
            white_move = moves[i].split('.')[1]
            black_move = moves[i+1].split('.')[1] if i + 1 < len(moves) and '.' in moves[i+1] else ''
            pgn_moves.append(f"{move_number}.{white_move} {black_move}")
        return " ".join(pgn_moves)

    _time = time.time()
    _processed, _total = 0, 0

    processed_cache = ""
    for line in data:
        metadata, pgn = line.split(" ### ")
        pgn = pgn.strip()
        metadata = metadata.strip().split(" ")
        
        winner = metadata[2]
        if "blen_false" in metadata[15] and metadata[3] != "None" and metadata[4] != "None":
            welo, belo = int(metadata[3]), int(metadata[4])
            if pgn.endswith("#"):
                if welo < min_elo_win or belo < min_elo_win:
                    continue
            elif winner == "1/2-1/2":
                if welo < min_elo_tie or belo < min_elo_tie:
                    continue
            else: continue

            pgn = convert_to_pgn(pgn)
            if winner == "1-0": 
                winner = 1
            elif winner == "0-1": 
                winner = -1
            else: 
                winner = 0
            processed_cache += f"{winner},{welo},{belo},{welo - belo},{pgn}\n"
            _processed += 1
        _total += 1

        if time.time() - _time > 0.2:
            print(f"{_processed} / {_total}")
            _time = time.time()

        if _processed > 5000: # Too much data will overflow the RAM
            break

    print("done")
    filehandle.write(processed_cache)
    filehandle.close()

In [None]:
def process():
    if 'data' in os.listdir():
        if 'X.npy' in os.listdir('data') and 'Y.npy' in os.listdir('data'):
            return np.load('data/X.npy'), np.load('data/Y.npy')
    else:
        os.mkdir('data')

    df = pd.read_csv('chess_processed_small.csv')

    winner_w = df[df['winner'] == 1]
    winner_b = df[df['winner'] == -1]
    tie = df[df['winner'] == 0]

    minlen = min(len(winner_w), len(winner_b), len(tie))
    winner_w = resample(winner_w, replace=False, n_samples=minlen, random_state=1337)
    winner_b = resample(winner_b, replace=False, n_samples=minlen, random_state=1337)
    tie = resample(tie, replace=False, n_samples=minlen, random_state=1337)

    df:pd.DataFrame = pd.concat([winner_w, winner_b, tie])
    print(df['winner'].value_counts())

    X = []
    Y = []

    t = time.time()
    done = 0
    for idx, row in df.iterrows():
        states = pgn_to_states(row['pgn'])
        winner = row['winner']
        
        laststates = [np.zeros((6, 8, 8), dtype=np.int8) for i in range(4)]
        for state in states:
            del laststates.pop(0)
            laststates.append(state)
            X.append(np.array(laststates).reshape((6*history_count, 8, 8)))
            Y.append(winner)
        
        if time.time() - t > 1:
            print("Done:", done, "| Time elapsed:", time.time() - t)
            t = time.time()

    X = np.array(X)
    Y = np.array(Y)

    np.save('data/X.npy', X)
    np.save('data/Y.npy', Y)

    return X, Y

X, Y = process()

### Training

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Basic block of a ResNet
class ResBlock(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.convs = nn.Sequential(
            nn.Conv2d(filter_size, filter_size, kernel_size=3, padding=1),
            nn.LeakyReLU(),
            nn.Conv2d(filter_size, filter_size, kernel_size=3, padding=1),
        ) if not use_batchnorm else nn.Sequential(
            nn.Conv2d(filter_size, filter_size, kernel_size=3, padding=1),
            nn.BatchNorm2d(filter_size),
            nn.LeakyReLU(),
            nn.Conv2d(filter_size, filter_size, kernel_size=3, padding=1),
            nn.BatchNorm2d(filter_size),
        )
    
    def forward(self, x):
        x1 = self.convs(x)
        x1 += x
        return F.leaky_relu(x1)

# Processes a single state from the last 4 states
class MoveProcessor(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.convs = nn.Sequential(
            nn.Conv2d(6, filter_size, kernel_size=3, padding=1),
            nn.LeakyReLU(),
            *[ResBlock() for _ in range(move_processor_blocks)],
            nn.Conv2d(filter_size, filter_size, kernel_size=3, padding=1),
            nn.LeakyReLU(),
        )
    
    def forward(self, x):
        return self.convs(x)

# The main model
class ChessNet(nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.move_processor = MoveProcessor()
        self.convs = nn.Sequential(
            nn.Conv2d(filter_size * history_count, filter_size, kernel_size=1, padding=0),
            *[ResBlock() for _ in range(main_model_blocks)],
            nn.Flatten(),
            nn.Linear(64*8*8, 256),
            nn.LeakyReLU(),
            nn.Linear(256, 1),
            nn.Tanh()
        )
    
    def forward(self, x):
        # We do this because it leverages the GPU more
        x = self.move_processor(
            x.reshape(x.shape[0]*history_count, 6, 8, 8)
        ).reshape(
            x.shape[0], filter_size*history_count, 8, 8
        )
        return self.convs(x)
    
    def predict(self, x):
        x = x.reshape(1, 6*history_count, 8, 8)
        return self.forward(x).item()