In [None]:

import torch
import torch.utils.data
import pytorch_lightning as pl
from torch.utils.data import DataLoader
from pytorch_lightning.loggers import TensorBoardLogger

import chess
from network import ChessBoardEvalNN
from data.dataset import ChessPositionsDataset
from data.database import Database

import data.database

%load_ext autoreload
# %load_ext line_profiler
%autoreload 2

The following converts a number games from the specified pgn file to the individual positions.
The positions will be one hot encoded and stored in the database.

For each type of piece a 8x8 matrix is used and if a respective piece is located on a square it is assigned a 1 and else 0.
Due to sparsity we store this data in sparse format by only storing indicies.
All positions are mirrored to be from the perspective of the white player as this makes training easier for the DNN as it only has to learn to play from one perspective.

We only extract the games which where annotated by a engine before as this takes a lot of computational ressources to do.
We normalize and translate the scores into an interval from -1 and 1 (where 1 indicates an advantage for black and 1 for white).


We keep track (by hashing) which games already are stored and therefore dont allow them to be processed and stored multiple times.

NOTE: This process takes a lot of time (on my device ~1h per 100_000 games).

In [None]:
with Database() as db:
    db.store_positions_from_pgn_file("./data/pgn/comp-2019-03.pgn", num_games = 300_000)

Let's check how many different games we have currently stored and how many positions they contain.

-> we make sure every position is only stored once as well

In [None]:


with Database() as db:
    count = db.cur.execute("SELECT COUNT(*) FROM games").fetchone()[0]
    print(f"{count} games stored")
    count = db.cur.execute("SELECT COUNT(*) FROM positions").fetchone()[0]
    print(f"{count} positions stored")


So now lets create a Dataset to be used for training.

The ChessPostionsDataset accesses the SQLite database in a dynamic (load as you use) and therefore the memory consumption on the GPU for training is managable.

In [None]:
dataset = ChessPositionsDataset(num_positions = 8_414_441)

# maybe inspect how one dataelement looks like
# print(dataset[0])

Now we split our data into different sets.

In [None]:
val_set, train_set = torch.utils.data.random_split(dataset, [414_441, 8_000_000], generator=torch.Generator().manual_seed(42))

Take a look how the evalutations are distributed. Looks like a normal distribution. Nice

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

ax = sns.displot(dataset.get_all_evaluations())

Now we define our training process and our model.

We use PyTorch Lightning as it eases up the whole process and makes it less complicated.

We also log the whole process with TensorBoard to better understand whats going on.

In [None]:
import multiprocessing


hparams = {"learning_rate": 1e-3}

# create a new model
model : ChessBoardEvalNN = ChessBoardEvalNN(hparams=hparams)

# OR: import a previous version already trained some weights) of the model
# model = ChessBoardEvalNN.load_model(hparams)

logger = TensorBoardLogger("./tb_logs", name="ChessNN", default_hp_metric=False, log_graph=True)

trainer = pl.Trainer(
    max_epochs=20,
    accelerator="gpu" if torch.cuda.is_available() else None,
    logger = logger,
    log_every_n_steps=1
)

Now start fitting our model to the data on see how it performs.

In [None]:
train_dataloader = DataLoader(train_set, batch_size=1024, num_workers=multiprocessing.cpu_count(), shuffle=True)
val_dataloader = DataLoader(val_set, batch_size=1024, num_workers=multiprocessing.cpu_count())

trainer.fit(model, train_dataloader, val_dataloader)

In [None]:

for i in range(2000, 2020):
    board, target_id, evaluation = dataset[i]
    board = board.to(model.device)
    board = board[None, :]
    model.eval()
    moves, score = model.forward(board)
    print(f"truth eval: {evaluation}")
    print(f"predicted eval: {score}")
    print(f"truth move: {target_id.argmax()}")
    print(f"predicted move: {torch.topk(moves.flatten(), 5).indices} | prob: {moves[torch.topk(moves.flatten(), 5).indices]}")

model.save_model("models/model1.model")