In [None]:
import os
import numpy as np
import torch
import time
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from chess import Board, pgn, Piece
from tqdm import tqdm

# Dataset

In [None]:
class ChessDataset(Dataset):
    """Map-style dataset that pairs each input sample with its label.

    X and y are stored as given and indexed in parallel, so they must support
    len() (X only) and integer indexing.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # The number of samples is defined by the inputs alone.
        return len(self.X)

    def __getitem__(self, idx):
        sample = self.X[idx]
        label = self.y[idx]
        return sample, label

# Model

In [None]:
class ChessModel(nn.Module):
    """Small CNN that scores every move class for an encoded board position.

    Input:  float tensor of shape [batch, 13, 8, 8].
    Output: raw logits of shape [batch, num_classes] (no softmax applied).
    """

    def __init__(self, num_classes):
        super().__init__()
        # padding=1 with a 3x3 kernel keeps the 8x8 spatial size through both convs.
        self.conv1 = nn.Conv2d(13, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(8 * 8 * 128, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

        # Weight initialisation: Kaiming for the ReLU-activated convs,
        # Xavier for the fully connected layers. Biases keep PyTorch defaults.
        for conv in (self.conv1, self.conv2):
            nn.init.kaiming_uniform_(conv.weight, nonlinearity='relu')
        for dense in (self.fc1, self.fc2):
            nn.init.xavier_uniform_(dense.weight)

    def forward(self, x):
        features = self.relu(self.conv2(self.relu(self.conv1(x))))
        hidden = self.relu(self.fc1(self.flatten(features)))
        # Raw logits: the loss / caller is responsible for any softmax.
        return self.fc2(hidden)

# Data preprocessing

## Load data

In [None]:
def load_pgn(file_path):
    """Read every game from a PGN file and return them as a list.

    pgn.read_game returns None once the file is exhausted, which is used as
    the stop sentinel.
    """
    with open(file_path, 'r') as pgn_file:
        return list(iter(lambda: pgn.read_game(pgn_file), None))

# Load at most 28 PGN files from data/pgn.
# os.listdir returns entries in arbitrary order, so the names are sorted to make
# the selected subset reproducible between runs and machines.
files = sorted(file for file in os.listdir("data/pgn") if file.endswith(".pgn"))
LIMIT_OF_FILES = min(len(files), 28)
games = []
# Slicing (instead of counting and breaking) also gives tqdm the correct total.
for file in tqdm(files[:LIMIT_OF_FILES]):
    games.extend(load_pgn(f"data/pgn/{file}"))

In [None]:
# Report how many games were loaded from the selected PGN files
games_parsed = len(games)
print(f"GAMES PARSED: {games_parsed}")

## Convert data into tensors

In [None]:
def board_to_matrix(board: Board):
    """Encode a position as an 8x8x13 float32 array of 0/1 planes.

    Layout of the last axis:
      * 0-5   pieces whose ``color`` is truthy (white in python-chess),
              indexed by ``piece_type - 1``
      * 6-11  pieces of the other colour, same ordering
      * 12    destination squares of all legal moves for the side to move

    The first two axes are ``divmod(square, 8)``, i.e. (square // 8, square % 8).

    Returns:
        np.ndarray of shape (8, 8, 13) and dtype float32. float32 is allocated
        directly because every consumer casts to float32 anyway; building
        float64 boards would double peak memory while a dataset is assembled.
    """
    matrix = np.zeros((8, 8, 13), dtype=np.float32)

    # Piece planes (0-11): one-hot by colour and piece type.
    for square, piece in board.piece_map().items():
        row, col = divmod(square, 8)
        plane = (piece.piece_type - 1) + (0 if piece.color else 6)
        matrix[row, col, plane] = 1

    # Legal-move plane (12): mark every square some legal move can land on.
    for move in board.legal_moves:
        row_to, col_to = divmod(move.to_square, 8)
        matrix[row_to, col_to, 12] = 1

    return matrix


def create_input_for_nn(games):
    """Turn parsed games into (position, move) training pairs.

    For every move of every game's mainline, the position *before* the move is
    encoded with board_to_matrix and the move itself is recorded as the label.

    Args:
        games: iterable of parsed games exposing ``board()`` and
            ``mainline_moves()``.

    Returns:
        (X, y) where X is a float32 array with one encoded board per sample
        and y is a NumPy string array with the matching UCI move per sample.
    """
    X = []
    y = []
    for game in games:
        board = game.board()
        for move in game.mainline_moves():
            # Encode first, then play the move: the label is the move to predict.
            X.append(board_to_matrix(board))
            y.append(move.uci())
            board.push(move)
    # Labels are UCI strings such as "e2e4"; casting them to float32 raises
    # ValueError, so they stay strings here and are mapped to ids by encode_moves.
    return np.array(X, dtype=np.float32), np.array(y, dtype=str)

def encode_moves(moves):
    """Encode a sequence of moves as integer class ids.

    Ids are assigned in sorted order of the distinct moves. Iterating a bare
    ``set`` of strings is not stable between interpreter sessions, which would
    make class indices (and therefore any saved model) unusable after a kernel
    restart; sorting makes the mapping a pure function of the data.

    Args:
        moves: iterable of hashable, mutually comparable move labels
            (UCI strings in this notebook).

    Returns:
        (encoded, move_to_int): the list of ids in input order, and the
        dict mapping each distinct move to its id.
    """
    move_to_int = {move: idx for idx, move in enumerate(sorted(set(moves)))}
    return [move_to_int[move] for move in moves], move_to_int

In [None]:
# Encode every mainline position (X) and the move played from it (y)
X, y = create_input_for_nn(games)
# The parsed games are not needed once encoded; empty the list to release memory.
# Note: this makes the cell non-repeatable without re-running the PGN loading cell.
games.clear()

print(f"NUMBER OF SAMPLES: {len(y)}")

In [None]:
# Keep at most MAX_SAMPLES position/move pairs (a no-op when fewer are available)
MAX_SAMPLES = 2_500_000
X = X[:MAX_SAMPLES]
y = y[:MAX_SAMPLES]

In [None]:
# Replace each move label with an integer class id; move_to_int maps move -> id
y, move_to_int = encode_moves(y)
# One class per distinct move seen in the data (used as the model's output size)
num_classes = len(move_to_int)

In [None]:
# torch.tensor() always copies its input (slow for a multi-GB array) and emits a
# warning when handed an existing tensor, e.g. when this cell is re-run.
# torch.as_tensor() reuses the float32 NumPy buffer without copying and passes
# tensors through unchanged.
X = torch.as_tensor(X, dtype=torch.float32)
y = torch.as_tensor(y, dtype=torch.long)

The convolutional layers in PyTorch expect the input to be in the shape ```[batch_size, channels, height, width]```,

but the current input is in the shape ```[batch_size, height, width, channels]```

In [None]:
print(f"before: {X.shape}")
# X is a torch tensor at this point, so use the tensor API rather than np.transpose.
# Change shape from [num_samples, 8, 8, 13] to [num_samples, 13, 8, 8]
X = X.permute(0, 3, 1, 2)
print(f"after: {X.shape}")

# Preliminary actions

In [None]:
# Create Dataset and DataLoader (mini-batches of 64 samples, reshuffled every epoch)
dataset = ChessDataset(X, y)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Model, loss and optimizer.
# CrossEntropyLoss is fed the raw logits returned by ChessModel.forward.
model = ChessModel(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Training

In [None]:
# Train for a fixed number of epochs, printing mean batch loss and wall time per epoch
num_epochs = 50
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0
    for batch_inputs, batch_labels in tqdm(dataloader):
        # Batches must live on the same device as the model
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        logits = model(batch_inputs)  # Raw logits
        loss = criterion(logits, batch_labels)
        loss.backward()

        # Cap the global gradient norm at 1.0 before the parameter update
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        running_loss += loss.item()

    epoch_time = time.time() - start_time
    minutes: int = int(epoch_time // 60)
    seconds: int = int(epoch_time) - minutes * 60
    mean_loss = running_loss / len(dataloader)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {mean_loss:.4f}, Time: {minutes}m{seconds}s')

# Save the model

In [None]:
# Save the model weights.
# torch.save does not create missing parent directories, so make sure it exists.
os.makedirs("model", exist_ok=True)
torch.save(model.state_dict(), "model/chess_model.pth")

# Predictions

1. Prepare the Input Data
Convert the board state to a format suitable for the model:

In [None]:
def prepare_input(board: Board):
    """Encode a board as a single-sample batch in the layout the model expects.

    Returns a float32 tensor of shape [1, 13, 8, 8].
    """
    planes_last = torch.tensor(board_to_matrix(board), dtype=torch.float32)  # [8, 8, 13]
    batched = planes_last.unsqueeze(0)  # add batch dimension -> [1, 8, 8, 13]
    return batched.permute(0, 3, 1, 2)  # channels first -> [1, 13, 8, 8]

2. Load the Model and Move to GPU if Available

In [None]:
# Load the model: rebuild the architecture, then restore the trained weights.
# map_location remaps the stored tensors onto the current device, so a checkpoint
# written on a GPU can still be loaded on a CPU-only machine.
model = ChessModel(num_classes=len(move_to_int))
model.load_state_dict(torch.load("model/chess_model.pth", map_location=device))
model.to(device)
model.eval()  # Set the model to evaluation mode

# Inverse vocabulary: class id -> move string
int_to_move = {v: k for k, v in move_to_int.items()}
# Function to make predictions
def predict_move(board: Board):
    """Return the model's highest-scoring move that is legal on ``board``.

    Class ids are visited from most to least probable and translated through
    ``int_to_move``; the first one that is a legal move (in UCI notation) wins.

    Returns:
        The chosen move as a UCI string, or None when no class maps to a
        legal move in this position.
    """
    with torch.no_grad():
        logits = model(prepare_input(board).to(device)).squeeze(0)  # drop batch dim

    probabilities = torch.softmax(logits, dim=0).cpu().numpy()
    legal_uci = {candidate.uci() for candidate in board.legal_moves}

    # Candidate moves ordered by descending probability
    ranked = (int_to_move[class_idx] for class_idx in np.argsort(probabilities)[::-1])
    return next((uci for uci in ranked if uci in legal_uci), None)

3. Use the ```predict_move``` function to get the model's best legal move for a given board state:

In [None]:
# Initialize a chess board (Board() with no arguments is the standard starting position)
board = Board()

In [None]:
# Play a hand-picked move. push_uci raises a ValueError subclass when the move is
# not legal in the current position (as "c8e8" is on a freshly initialised board),
# so report that instead of aborting a Restart & Run All.
try:
    board.push_uci("c8e8")
except ValueError as err:
    print(f"Move not played: {err}")
board

In [None]:
# Ask the model for its highest-ranked legal move and play it.
# predict_move returns None when none of its move classes is legal here,
# and push_uci cannot accept None.
best_move = predict_move(board)
if best_move is None:
    print("No legal move found in the model's move vocabulary")
else:
    board.push_uci(best_move)
board


In [None]:
# Export the moves played so far as PGN text (print() applies str() itself)
print(pgn.Game.from_board(board))