In [1]:
import os
import re
import random
import chess
import torch
import torch.nn as nn
import torch.optim as optim
from chess import Piece
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import MultiplicativeLR


def encode_board(board):
    """Turn a board into a flat one-hot occupancy vector.

    A zero tensor of shape (2, 6, 8, 8) is filled with a 1 at
    ``[int(piece.color), piece.piece_type - 1, square // 8, square % 8]``
    for every occupied square, then flattened to 768 elements.

    Args:
        board: object exposing ``piece_at(square)``; the returned pieces must
            have ``color`` and ``piece_type`` attributes (presumably a
            ``chess.Board`` -- that is what the rest of the notebook passes).

    Returns:
        1-D float tensor of length 2 * 6 * 8 * 8 = 768.
    """
    planes = torch.zeros(2, 6, 8, 8)
    for sq in chess.SQUARES:
        occupant = board.piece_at(sq)
        if not occupant:
            # Empty square: nothing to mark.
            continue
        row, col = divmod(sq, 8)
        planes[int(occupant.color), occupant.piece_type - 1, row, col] = 1
    return planes.flatten()


In [2]:
class ChessDataset(Dataset):
    """Positions sampled from PGN files, labelled with the game result.

    Every sample is ``(encoded_board, target)`` where ``target`` is 1.0 for a
    ``1-0`` result, 0.0 for ``0-1`` and 0.5 for ``1/2-1/2``.  Two pools are
    kept: ``data_unfiltered`` holds every sampled position, ``data_filtered``
    only those where none of the next three moves is a capture.  Each pool is
    shuffled and split 80/20 into train and test lists.

    Which list ``len()`` and indexing operate on is chosen at call time by the
    public, mutable flags ``train`` and ``filtered``.

    Args:
        folder_path: directory containing ``.pgn`` files.
        device: device the sample tensors are stored on.  Defaults to
            ``"cuda"``, matching the previous unconditional ``.cuda()``.
    """

    # Runtime patterns: SAN-like move tokens and the game-result token.
    MOVE_PATTERN = re.compile(r'\b([NBRQK]?[a-h]?[1-8]?x?[a-h][1-8](=[NBRQ])?|O-O(?:-O)?)')
    RESULT_PATTERN = re.compile(r'\s(1-0|0-1|1/2-1/2)\s')
    RESULT_TO_TARGET = {'1-0': 1.0, '0-1': 0.0, '1/2-1/2': 0.5}

    MAX_FILES = 50_000       # stop once this many directory entries were visited
    PROGRESS_EVERY = 5000    # print the entry index every this many entries
    TRAIN_FRACTION = 0.8     # share of each pool that goes to the train split
    CAPTURE_LOOKAHEAD = 3    # moves (starting with the next one) checked for captures

    def __init__(self, folder_path, device="cuda"):
        self.device = device
        self.train = True
        self.filtered = True
        self.data_filtered = []
        self.data_unfiltered = []
        self.process_files(folder_path)
        random.shuffle(self.data_filtered)
        random.shuffle(self.data_unfiltered)
        self.train_data_filtered, self.test_data_filtered = self._split(self.data_filtered)
        self.train_data_unfiltered, self.test_data_unfiltered = self._split(self.data_unfiltered)

    def _split(self, samples):
        """Return ``(train, test)`` slices of ``samples`` at TRAIN_FRACTION."""
        cut = int(len(samples) * self.TRAIN_FRACTION)
        return samples[:cut], samples[cut:]

    def process_files(self, folder_path):
        """Read the ``.pgn`` files in ``folder_path`` and collect samples.

        Entries are visited in sorted order so the selection does not depend
        on the arbitrary order ``os.listdir`` returns.  The entry index counts
        every directory entry, not only ``.pgn`` files.
        """
        for file_index, filename in enumerate(sorted(os.listdir(folder_path))):
            if file_index % self.PROGRESS_EVERY == 0:
                print(file_index)
            if file_index == self.MAX_FILES:
                break
            if not filename.endswith('.pgn'):
                continue
            file_path = os.path.join(folder_path, filename)
            # Explicit encoding: the platform default differs between systems,
            # and undecodable bytes should not abort the whole import.
            with open(file_path, 'r', encoding='utf-8', errors='replace') as pgn_file:
                content = pgn_file.read()
            self._add_game_samples(content)

    def _add_game_samples(self, content):
        """Sample positions from one PGN text and append them to the pools."""
        moves = [san for san, _promotion in self.MOVE_PATTERN.findall(content)]
        result_match = self.RESULT_PATTERN.search(content)
        if not moves or result_match is None:
            return
        target = self.RESULT_TO_TARGET[result_match.group(1)]

        valid_moves = self._legal_prefix(moves)
        if not valid_moves:
            return

        # One sample, plus one more per five playable moves.
        num_samples = 1 + len(valid_moves) // 5
        sample_indices = random.sample(range(len(valid_moves)), num_samples)
        states = self._encode_positions(valid_moves, set(sample_indices))

        for sample_index in sample_indices:
            # A single tuple is shared by both pools; the tensors are never
            # mutated, so there is no need to upload them twice.
            sample = (
                states[sample_index].to(self.device),
                torch.tensor(target, dtype=torch.float32, device=self.device),
            )
            if not self._is_near_capture(valid_moves, sample_index, self.CAPTURE_LOOKAHEAD):
                self.data_filtered.append(sample)
            self.data_unfiltered.append(sample)

    @staticmethod
    def _legal_prefix(moves):
        """Return the leading moves that can be played from the start position."""
        board = chess.Board()
        playable = []
        for san in moves:
            try:
                board.push_san(san)
            except ValueError:
                # First token that is not a playable move ends the game here.
                break
            playable.append(san)
        return playable

    @staticmethod
    def _encode_positions(valid_moves, wanted):
        """Replay the game once and encode the board before each wanted ply.

        Returns a dict mapping ply index -> encoded board, where the board is
        the position *before* ``valid_moves[ply]`` is played.
        """
        board = chess.Board()
        last_wanted = max(wanted)
        states = {}
        for ply, san in enumerate(valid_moves):
            if ply in wanted:
                states[ply] = encode_board(board)
            if ply == last_wanted:
                break
            board.push_san(san)
        return states

    @staticmethod
    def _is_near_capture(moves, index, lookahead=3):
        """True if any of ``moves[index:index + lookahead]`` contains ``'x'``.

        The previous inline check tested the move at ``index`` twice and never
        looked at ``index + 1``.
        """
        return any('x' in san for san in moves[index:index + lookahead])

    def _active_split(self):
        """Return the list selected by the current ``train``/``filtered`` flags."""
        if self.train:
            return self.train_data_filtered if self.filtered else self.train_data_unfiltered
        return self.test_data_filtered if self.filtered else self.test_data_unfiltered

    def __len__(self):
        return len(self._active_split())

    def __getitem__(self, idx):
        state, target = self._active_split()[idx]
        return (state, target)




In [3]:
class PSQT(nn.Module):
    """Two bias-free linear layers (768 -> 32 -> 1) followed by a sigmoid.

    The per-feature contribution of the network can be read out with
    ``self.psqt2(self.psqt1(x))`` where ``x`` is a one-hot vector of
    length 768.
    """

    def __init__(self):
        super().__init__()
        # Original note (translated from Slovenian, wording approximate):
        # this layer can be pre-set, which speeds up convergence.
        self.psqt1 = nn.Linear(768, 32, bias=False)
        # Start the first layer at all zeros.
        nn.init.zeros_(self.psqt1.weight)
        # Individual values can be obtained with self.psqt2(self.psqt1(x)),
        # where x is a one-hot encoding.
        self.psqt2 = nn.Linear(32, 1, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map a batch of 768-element inputs to values in (0, 1), shape (N, 1)."""
        hidden = self.psqt1(x)
        score = self.psqt2(hidden)
        return self.sigmoid(score)



In [4]:
class Pieces(nn.Module):
    """Six learnable per-piece-type weights applied to a piece-count difference.

    The input is viewed as (N, 2, 6, 64); summing the last axis gives a count
    per (colour index, piece type).  The score is the weighted sum of
    ``count[colour 1] - count[colour 0]`` over the six types, squashed by a
    sigmoid.
    """

    def __init__(self):
        super().__init__()
        # One weight per piece type, all starting at zero.
        self.pieces = nn.Parameter(torch.zeros(6))
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return a (N, 1) tensor of values in (0, 1)."""
        counts = x.view(-1, 2, 6, 64).sum(dim=3)
        balance = counts[:, 1, :] - counts[:, 0, :]
        score = (balance * self.pieces).sum(dim=1, keepdim=True)
        return self.sigmoid(score)


In [5]:
def train_model(model, dataloader, criterion, optimizer, scheduler, num_epochs):
    """Run a plain supervised training loop.

    For each epoch every ``(inputs, labels)`` batch is passed through the
    model, the loss is back-propagated and the optimizer stepped; the
    scheduler is stepped once per epoch.  On epochs 1, 11, 21, ... the mean
    batch loss of that epoch is printed.

    The model output is reshaped to the shape of ``labels`` before the loss is
    computed.  Previously ``outputs.squeeze()`` collapsed a one-sample batch
    to a 0-d tensor; ``BCELoss`` then raised ``ValueError`` and the batch was
    silently skipped.  An output whose element count differs from the labels'
    now raises instead of being ignored.

    Args:
        model: callable mapping a batch of inputs to one value per sample.
        dataloader: sized iterable of ``(inputs, labels)`` batches.
        criterion: loss called as ``criterion(predictions, labels)``.
        optimizer: optimizer holding the model's parameters.
        scheduler: learning-rate scheduler exposing ``step()``.
        num_epochs: number of passes over ``dataloader``.
    """
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            # Match the label shape exactly so the loss never sees a 0-d
            # prediction (batch of one) or a broadcastable mismatch.
            loss = criterion(outputs.reshape(labels.shape), labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        if epoch % 10 == 0:
            num_batches = len(dataloader)
            # An empty loader has no meaningful mean; report NaN, don't crash.
            mean_loss = running_loss / num_batches if num_batches else float('nan')
            print(f'Epoch {epoch+1}, Loss: {mean_loss}')
        scheduler.step()


In [6]:
def run_experiment(dataset, experiment):
    """Train one experiment on the train split, then report test-split loss.

    Args:
        dataset: object with mutable ``train`` and ``filtered`` flags that
            select which list ``len()`` / indexing use.  Both flags are
            overwritten here and ``train`` is left ``False`` on return.
        experiment: tuple ``(model, filtered, criterion, optimizer, scheduler,
            epochs)``; the filtered pool is used only when ``filtered`` is the
            string ``"filtered"``.

    Returns:
        The trained model (the same object that came in with the tuple).
    """
    print("train")
    model, filtered, criterion, optimizer, scheduler, epochs = experiment
    dataset.filtered = (filtered == "filtered")
    dataset.train = True

    train_dataloader = DataLoader(dataset, batch_size=2 ** 5, shuffle=True)
    train_model(model, train_dataloader, criterion, optimizer, scheduler, num_epochs=epochs)

    print("test")
    dataset.train = False
    # Evaluate with a dedicated loader and no gradient tracking.  The earlier
    # version pushed the test split through the training loop with a throwaway
    # zero-lr Adam, which still ran backward passes and optimizer steps and
    # advanced the real scheduler one extra time.
    test_dataloader = DataLoader(dataset, batch_size=2 ** 5, shuffle=False)
    was_training = model.training
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for inputs, labels in test_dataloader:
            outputs = model(inputs)
            total_loss += criterion(outputs.reshape(labels.shape), labels).item()
    model.train(was_training)

    num_batches = len(test_dataloader)
    mean_loss = total_loss / num_batches if num_batches else float('nan')
    # Same line format as the training log so existing output stays comparable.
    print(f'Epoch 1, Loss: {mean_loss}')
    return model


In [7]:
# Seed the RNGs used for move sampling / shuffling (random) and for weight
# initialisation and batch shuffling (torch) so reruns start from the same state.
SEED = 0
random.seed(SEED)
torch.manual_seed(SEED)

folder_path = 'pgns-run1-test80-20220404-1254'
dataset = ChessDataset(folder_path)


def make_experiment(model_cls, subset, criterion, lr=0.01, lr_decay=0.8, epochs=31):
    """Build one ``(model, subset, criterion, optimizer, scheduler, epochs)`` tuple.

    Args:
        model_cls: zero-argument model class; the instance is moved to CUDA.
        subset: ``"filtered"`` or ``"unfiltered"`` -- which pool to train on.
        criterion: loss module.
        lr: initial Adam learning rate.
        lr_decay: factor the learning rate is multiplied by after every epoch.
        epochs: number of training epochs.
    """
    model = model_cls().cuda()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = MultiplicativeLR(optimizer, lr_lambda=lambda epoch: lr_decay)
    return (model, subset, criterion, optimizer, scheduler, epochs)


experiments = [
    make_experiment(Pieces, "unfiltered", nn.BCELoss()),
    make_experiment(Pieces, "filtered", nn.BCELoss()),
    make_experiment(Pieces, "unfiltered", nn.MSELoss()),
    make_experiment(Pieces, "filtered", nn.MSELoss()),
    make_experiment(Pieces, "unfiltered", nn.L1Loss()),
    make_experiment(Pieces, "filtered", nn.L1Loss()),

    make_experiment(PSQT, "unfiltered", nn.MSELoss()),
    make_experiment(PSQT, "filtered", nn.MSELoss()),
]

for experiment in experiments:
    print(f"\nRunning experiment: {experiment}")
    model = run_experiment(dataset, experiment)
    print("parameters")
    if isinstance(model, Pieces):
        print(list(model.parameters())[:5])
    else:
        # Rows are one-hot vectors for input features 0..63, i.e. colour index 0
        # and piece-type index 0 on each square in encode_board's layout
        # (presumably black pawns under python-chess numbering -- confirm).
        one_hot = torch.eye(64, 768).cuda()
        print(-model.psqt2(model.psqt1(one_hot)))
        

0
5000
10000
15000
20000
25000
30000
35000
40000
45000
50000

Running experiment: (Pieces(
  (sigmoid): Sigmoid()
), 'unfiltered', BCELoss(), Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    initial_lr: 0.01
    lr: 0.01
    maximize: False
    weight_decay: 0
), <torch.optim.lr_scheduler.MultiplicativeLR object at 0x7887ec5875b0>, 31)
train
Epoch 1, Loss: 0.6237303920685981
Epoch 11, Loss: 0.6230529361803224
Epoch 21, Loss: 0.6230176040849421
Epoch 31, Loss: 0.6230124386297317
test
Epoch 1, Loss: 0.6228334694511597
parameters
[Parameter containing:
tensor([0.4715, 0.9801, 1.0760, 1.6307, 2.9666, 0.0000], device='cuda:0',
       requires_grad=True)]

Running experiment: (Pieces(
  (sigmoid): Sigmoid()
), 'filtered', BCELoss(), Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    differentiable: False
    eps: 1e-08
    foreach: