In [45]:
import os
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import chess
import chess.pgn

from torch.utils.data import Dataset, DataLoader, IterableDataset

from time import time
import tqdm

HEADERS = ("result", "whiteElo", "blackElo", "timeControl", "sideToPlay", "bitmaps", "movePlayed")
TRAINING_SET_SIZE = 260_000
BATCH_SIZE = 10_000

In [46]:
def extract_fens_from_pgn(pgn_file, label_from="result"):
    positions = []
    with open(pgn_file, "r", encoding="utf-8") as f:
        while True:
            game = chess.pgn.read_game(f)
            if game is None:
                break

            board = game.board()
            result = game.headers.get("Result")

            # Label based on game outcome
            if result == "1-0":
                label = 1.0
            elif result == "0-1":
                label = -1.0
            else:
                label = 0.0

            # Step through moves
            for move in game.mainline_moves():
                board.push(move)
                fen = board.fen()
                positions.append((fen, label))  # ← You can also call Stockfish here if you want

    return positions


In [47]:
import torch.nn as nn
import torch.nn.functional as F
class PositionEvaluatorNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(13, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(-1, 64 * 8 * 8)
        x = F.relu(self.fc1(x))
        return torch.tanh(self.fc2(x))  # Output between -1 and 1
    

class ChessBotNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        # 13 channels: 12 pieces + side to play (tensor[true's|false's])
        self.conv1 = nn.Conv2d(13, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 64) # In case of pawn promotion, we assume it promotes to a queen

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(-1, 64 * 8 * 8)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

## Idea:
##   One model that answers "best piece to move in this position"
##   Then another model that answers "best square to move piece X to"

In [48]:
piece_to_idx = {
    'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
    'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11
}

def board_to_tensor(board):
    tensor = np.zeros((12, 8, 8), dtype=np.uint8)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece:
            idx = piece_to_idx[piece.symbol()]
            row = 7 - square // 8
            col = square % 8
            tensor[idx, row, col] = 1
    return tensor

In [None]:
import ast
import random
from typing import Literal
import pandas as pd

# class ChessEvalDataset(Dataset):
class ChessEvalDataset(IterableDataset):
    def __init__(self, file: str, model: Literal["pieces", "moves"] = "pieces"):
        self.file = file
        self.model = model


    def __len__(self):
        return TRAINING_SET_SIZE
    
    def __fetch_from_file(self, idx: int):
        dataset = pd.read_csv(
            self.file,
            skiprows=idx,
            nrows=1,
            names=HEADERS
        )

        # Convert string to list
        # Convert to NumPy array
        bitmaps = np.array(
            dataset["bitmaps"]
              .apply(ast.literal_eval)
              .tolist()
        )

        side_to_play_bool = dataset["sideToPlay"].to_numpy()
        side_channels = []
        for play_white in side_to_play_bool:
            side_channel = np.ones((1, 8, 8)) if play_white else np.zeros((1, 8, 8))
            side_channels.append(side_channel)
        side_channels = np.array(side_channels)
        bitmaps = np.concatenate((bitmaps, side_channels), axis=1)

        # Convert to tensor
        X = torch.tensor(bitmaps, dtype=torch.float32)[0]

        # From "a2b5", extract a2 if piece or b5 if move
        if self.model == "pieces":
            y = np.array(dataset["movePlayed"].astype(str).str[:2])[0]
        elif self.model == "moves":
            y = np.array(dataset["movePlayed"].astype(str).str[2:])[0]
        
        return X, y

    def __getitem__(self, idx):
        # aux function
        moveToSquare = lambda move: (ord(move[0]) - ord('a')) + (int(move[1]) - 1) * 8
        position, move = self.__fetch_from_file(idx)
        return position.clone().detach(), torch.tensor(moveToSquare(move)).long()
    

    def parse_line(self, line: str):
        csv.reader(csvfile, delimiter=',')
        # parts = 
        # Map to column names
        entry = {
            "bitmaps": ast.literal_eval(parts[0]),
            "sideToPlay": parts[1] == "True",
            "movePlayed": parts[2]
        }

        bitmaps = np.array(entry["bitmaps"])
        side_channel = np.ones((1, 8, 8)) if entry["sideToPlay"] else np.zeros((1, 8, 8))
        bitmaps = np.concatenate((bitmaps, side_channel), axis=0)
        X = torch.tensor(bitmaps, dtype=torch.float32)

        move = entry["movePlayed"]
        if self.model == "pieces":
            label = move[:2]
        else:
            label = move[2:]
        moveToSquare = lambda m: (ord(m[0]) - ord('a')) + (int(m[1]) - 1) * 8
        y = torch.tensor(moveToSquare(label)).long()

        return X, y
    
    def __iter__(self):
        chosen_indices = set(sorted(random.sample(range(1, TRAINING_SET_SIZE), BATCH_SIZE)))
        with open(self.file, "r") as f:
            for i, line in enumerate(f):
                if i in chosen_indices:
                    yield self.parse_line(line)

In [50]:
DATASET_PATH = '../dataset/processed/results_2.csv'
NUM_EPOCHS = 100


TRAINING_MODE = "pieces" # "pieces" or "moves"
MODEL_WEIGHTS_OUTPUT_PATH = "pieces_model.pth"

test = ChessEvalDataset(file = DATASET_PATH, model=TRAINING_MODE)
loader = DataLoader(test, batch_size=BATCH_SIZE, shuffle=False)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ChessBotNetwork().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = torch.nn.CrossEntropyLoss()

for epoch in range(NUM_EPOCHS):
    model.train()
    t0 = time()
    for board_tensor_batch, target_eval_batch in tqdm.tqdm(loader):
        # Load data to GPU
        board_tensor_batch, target_eval_batch = board_tensor_batch.to(device), target_eval_batch.to(device)
        # predict
        pred = model(board_tensor_batch)

        # Calculate loss
        loss = loss_fn(pred, target_eval_batch)
        loss.backward()

        optimizer.zero_grad()
        optimizer.step()
    tf = time()
    print(f"Epoch {epoch}, Loss: {loss.item():.4f} | Time: {tf-t0}")


# Save the trained model
torch.save(model.state_dict(), MODEL_WEIGHTS_OUTPUT_PATH)

  0%|          | 0/26 [00:00<?, ?it/s]


ValueError: malformed node or string on line 1: <ast.BinOp object at 0x7cb8f5ad7b20>

In [None]:
# import csv
# import pandas as pd
# import numpy as np
# import ast


# test = pd.read_csv(
#     DATASET_PATH, 
#     nrows = 5, 
#     names = HEADERS,
# )
# test


Unnamed: 0,result,whiteElo,blackElo,timeControl,sideToPlay,bitmaps,movePlayed
0,1-0,1901,1896,300+5,True,"[[[0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0,...",d2d4
1,1-0,1901,1896,300+5,False,"[[[0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0,...",d7d5
2,1-0,1901,1896,300+5,True,"[[[0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0,...",c2c4
3,1-0,1901,1896,300+5,False,"[[[0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0,...",c7c6
4,1-0,1901,1896,300+5,True,"[[[0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0,...",e2e3


In [None]:
# trained_model = ChessBotNetwork()
# trained_model.load_state_dict(torch.load(MODEL_WEIGHTS_OUTPUT_PATH))
# trained_model.eval()  # Set model to evaluation mode