In [4]:
!pip install chess

Collecting chess
  Downloading chess-1.11.1.tar.gz (156 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.5/156.5 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: chess
  Building wheel for chess (setup.py) ... [?25l[?25hdone
  Created wheel for chess: filename=chess-1.11.1-py3-none-any.whl size=148499 sha256=857f0e0f015000d3f430026e6d2c056a914510bd1b3e28f4878a7efb2b90da49
  Stored in directory: /root/.cache/pip/wheels/2e/2d/23/1bfc95db984ed3ecbf6764167dc7526d0ab521cf9a9852544e
Successfully built chess
Installing collected packages: chess
Successfully installed chess-1.11.1


In [46]:
import pandas as pd
import numpy as np
import torch
import chess.pgn
from tqdm.notebook import tqdm
import re
from torch import nn
import torch.nn.functional as F
import time
device = "cuda" if torch.cuda.is_available() else "cpu"

In [6]:
df = pd.read_csv('/kaggle/input/chess-games/chess_games.csv')
games = df[(df['WhiteElo'] + df['BlackElo']) / 2 >= 2000]['AN']
games = [game for game in games if '{' not in game]

In [8]:
def get_moves(game):
    return re.sub('\d*\. ', '', game).split()[:-1]

In [9]:
def fen_to_tensor(fen):
    position = fen.split()[0]
    color = fen.split()[1]
    res = torch.zeros((6, 8, 8))
    for i, row in enumerate(position.split('/')):
        j = 0
        for c in row:
            if c.isdigit():
                j += int(c)
            else:
                if c == 'p':
                    res[0, i, j] = -1
                elif c == 'P':
                    res[0, i, j] = 1
                elif c == 'r':
                    res[1, i, j] = -1
                elif c == 'R':
                    res[1, i, j] = 1
                elif c == 'n':
                    res[2, i, j] = -1
                elif c == 'N':
                    res[2, i, j] = 1
                elif c == 'b':
                    res[3, i, j] = -1
                elif c == 'B':
                    res[3, i, j] = 1
                elif c == 'q':
                    res[4, i, j] = -1
                elif c == 'Q':
                    res[4, i, j] = 1
                elif c == 'k':
                    res[5, i, j] = -1
                elif c == 'K':
                    res[5, i, j] = 1
                j += 1
    if color == 'b':
        res = -1 * res
    return res

In [10]:
class ChessDataset(torch.utils.data.Dataset):

    def __init__(self, games):
        self.games = games

    def __len__(self):
        return len(self.games)

    def __getitem__(self, idx):
        X = []
        y = []
        game = self.games[idx]
        moves = get_moves(game)
        board = chess.Board()
        for move in moves:
            
            X.append(fen_to_tensor(board.fen()))
            
            move = board.push_san(move)
            y_from = torch.zeros(64)
            y_from[move.from_square] = 1
            y_to = torch.zeros(64)
            y_to[move.to_square] = 1

            y.append(torch.stack([y_from, y_to]))
            
        return torch.stack(X), torch.stack(y)

In [11]:
dataset = ChessDataset(games)
X, y = dataset.__getitem__(0)

In [12]:
loader = torch.utils.data.DataLoader(
    dataset, batch_size=1, shuffle=True
)

In [13]:
class module(nn.Module):

    def __init__(self, hidden_size):
        super(module, self).__init__()
        self.conv1 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(hidden_size)
        self.bn2 = nn.BatchNorm2d(hidden_size)
        self.activation1 = nn.SELU()
        self.activation2 = nn.SELU()

    def forward(self, x):
        x_input = torch.clone(x)
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.activation1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = x + x_input
        x = self.activation2(x)
        return x

In [14]:
class ChessNet(nn.Module):
    def __init__(self, hidden_layers=4, hidden_size=200):
        super(ChessNet, self).__init__()
        self.hidden_layers = hidden_layers
        self.input_layer = nn.Conv2d(6, hidden_size, 3, stride=1, padding=1)
        self.module_list = nn.ModuleList([module(hidden_size) for i in range(hidden_layers)])
        self.output_layer = nn.Conv2d(hidden_size, 2, 3, stride=1, padding=1)

    def forward(self, x):
        x = self.input_layer(x)
        x = F.relu(x)

        for i in range(self.hidden_layers):
            x = self.module_list[i](x)

        x = self.output_layer(x)

        return torch.flatten(x, 2)

In [None]:
import wandb
wandb.login(key='d11d73a7c6b0d8fb009db80fc66f63cd11b7811d')

In [None]:
run = wandb.init(
    # Set the project where this run will be logged
    project="chess",
    # Track hyperparameters and run metadata
    config={
        "learning_rate": 3e-4,
        "epochs": 1,
    },
)

In [None]:
model = ChessNet()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
epochs = 1

In [None]:
for epoch in tqdm(range(1, epochs + 1)):
    print(f'Epoch {epoch}/{epochs}')
    for X, y in tqdm(loader):
        optimizer.zero_grad()
        
        X = X.squeeze()
        if X.dim() == 3:
            continue
        
        y_pred = model(X.to(device))
        y_true = y.squeeze().to(device)
        
        loss_from = criterion(y_pred[:, 0, :], y_true[:, 0, :])
        loss_to = criterion(y_pred[:, 1, :], y_true[:, 1, :])
        loss = loss_from + loss_to

        wandb.log({'loss': loss})
        
        loss.backward()
        optimizer.step()

In [None]:
torch.save(model.state_dict(), '/kaggle/working/ckeckpoint.pth')