In [1]:
import os
import io
import math
import chess
import chess.pgn
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
torch.cuda.is_available()

Using device: cuda


True

In [2]:
Base = declarative_base()

class ChessGame(Base):
    __tablename__ = 'games'
    id = Column(Integer, primary_key=True)
    pgn = Column(Text)

engine = create_engine('sqlite:///../chess_games.db')
Session = sessionmaker(bind=engine)
session = Session()

  Base = declarative_base()


In [3]:
def load_openings_from_pgn(pgn_file):
    openings = []
    with open(pgn_file) as f:
        while True:
            game = chess.pgn.read_game(f)
            if game is None:
                break

            board = game.board()
            moves = []
            for move in game.mainline_moves():
                moves.append(board.san(move))
                board.push(move)

            openings.append(moves)
    return openings

#Function to find matching openings based on played moves
def find_matching_openings(played_moves, openings):
    """
    Finding opening works by checking if the played moves match the start of any opening in the opening book.
    This is because a chosen opening can be diverged from at any point by the opponent.
    This makes the bot more dynamic in the opening phase.
    """
    matching_openings = []
    for opening in openings:
        if played_moves == opening[:len(played_moves)]:
            matching_openings.append(opening)
    return matching_openings

#Function to choose the next move from matching openings
def select_next_move(played_moves, matching_openings):
    if not matching_openings:
        return None  # No matching opening found, time for engine

    # Check if there is a next move available in the matching opening
    for opening in matching_openings:
        if len(opening) > len(played_moves):
            next_move = opening[len(played_moves)]
            return next_move
    
    return None  # No more moves in the opening book, time for engine

#Load the openings
openings = load_openings_from_pgn("eco.pgn")

#Example usage
played_moves = ['e4']  
matching_openings = find_matching_openings(played_moves, openings)
next_move = select_next_move(played_moves, matching_openings)

if next_move:
    print(f"Bot's next move: {next_move}")
else:
    print("No matching opening found, calculate the move using engine logic.")

Bot's next move: g6


In [4]:
class ChessNet(nn.Module):
    def __init__(self):
        super(ChessNet, self).__init__()

        self.conv1 = nn.Conv2d(12, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        
        #Positional encoding for the transformer
        self.positional_encoding = PositionalEncoding(d_model=128)

        #Transformer Encoder
        encoder_layers = nn.TransformerEncoderLayer(d_model=128, nhead=8)
        self.transformer = nn.TransformerEncoder(encoder_layers, num_layers=4)

        #Fully connected layer
        self.fc1 = nn.Linear(8*8*128, 4096)  #4096 possible moves

    def forward(self, x):

        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = F.relu(x)

        x = x.view(-1, 128, 8*8)  #[batch_size, d_model, sequence_length]
        x = x.permute(2, 0, 1)  #[sequence_length, batch_size, d_model]

        # Positional encoding
        x = self.positional_encoding(x)

        # Transformer encoder
        x = self.transformer(x)

        x = x.permute(1, 0, 2).contiguous()  #[batch_size, sequence_length, d_model]
        x = x.view(-1, 8*8*128)
        x = self.fc1(x)
        return x

#Positional encoding for the transformer in order to give the model information about the position of the pieces
#Uses the sine and cosine functions to encode the position of the board in a unique way
#Experimental, might be overkill. Saw somewhere it could be useful for the transformer, but not sure if it is properly implemented here
class PositionalEncoding(nn.Module): 
    def __init__(self, d_model, max_len=64):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        self.encoding.requires_grad = False

        pos = torch.arange(0, max_len).float().unsqueeze(1)
        _2i = torch.arange(0, d_model, 2).float()

        self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / d_model)))
        self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / d_model)))
        self.encoding = self.encoding.unsqueeze(1)

    def forward(self, x):
        return x + self.encoding[:x.size(0), :].to(x.device)

In [5]:
def board_to_input(board):
    board_planes = np.zeros((8, 8, 12), dtype=np.float32)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece:
            plane = piece.piece_type - 1
            if piece.color == chess.BLACK:
                plane += 6
            row, col = divmod(square, 8)
            board_planes[row, col, plane] = 1
    return board_planes

#Encode the move
def move_to_output(move):
    from_square = move.from_square
    to_square = move.to_square
    return from_square * 64 + to_square

def calculate_accuracy(output, target):
    _, predicted = torch.max(output, 1)
    correct = (predicted == target).sum().item()
    return correct / target.size(0)

#Training step with move skipping and batching
def train_on_batch(games, model, optimizer, criterion, device, skip_moves=6):
    all_board_inputs = []
    all_targets = []
    total_moves = 0

    for game_str in games:
        pgn_io = io.StringIO(game_str)
        game = chess.pgn.read_game(pgn_io)
        board = game.board()
        move_count = 0

        for move in game.mainline_moves():
            #A static number of moves are skipped to avoid overfitting to the opening book
            #More sophisticated methods can be used to skip exact amount of book moves, but it is too inefficient for my machine
            if move_count < skip_moves:
                board.push(move)
                move_count += 1
                continue

            #Prepare the input and output
            board_input = board_to_input(board)
            board_input = torch.tensor(board_input, dtype=torch.float32).unsqueeze(0).permute(0, 3, 1, 2).to(device)
            actual_output = move_to_output(move)
            actual_output = torch.tensor([actual_output], dtype=torch.long).to(device)

            all_board_inputs.append(board_input)
            all_targets.append(actual_output)
            total_moves += 1

            #Update the board with the actual move
            board.push(move)

    if all_board_inputs:
        #Stack all inputs and targets
        batch_inputs = torch.cat(all_board_inputs, dim=0)
        batch_targets = torch.cat(all_targets, dim=0)

        optimizer.zero_grad()
        output = model(batch_inputs)

        loss = criterion(output, batch_targets)
        accuracy = calculate_accuracy(output, batch_targets)

        loss.backward()
        optimizer.step()

        return loss.item(), accuracy, total_moves
    else:
        return 0, 0, 0  #If no valid moves in batch

#Training loop
batch_size = 1000
game_batch_size = 10 
offset = 0
step = 0

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ChessNet().to(device)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

while True:
    games = session.query(ChessGame).offset(offset).limit(batch_size).all()
    if not games:
        break

    total_loss = 0.0
    total_accuracy = 0.0
    total_moves = 0
    j = 0

    with tqdm(total=len(games) // game_batch_size, desc=f"Processing Batch {offset // batch_size + 1}") as pbar:
        for i in range(0, len(games), game_batch_size):
            game_batch = [game.pgn for game in games[i:i + game_batch_size]]
            loss, accuracy, moves = train_on_batch(game_batch, model, optimizer, criterion, device, skip_moves=6)
            total_loss += loss * moves
            total_accuracy += accuracy * moves
            total_moves += moves
            pbar.update(1)
            if total_moves > 0:
                pbar.set_postfix({'Loss': total_loss / total_moves, 'Accuracy': total_accuracy / total_moves})
    j += 1
    model_save_path = os.path.join('savedModels', f'cnn_transformer_model_epoch_{j}.pth')
    torch.save(model.state_dict(), model_save_path)
    offset += batch_size
    
#Close the session and TensorBoard writer
#Still have not tried TensorBoard, might not work
session.close()
model_save_path = os.path.join('savedModels', f'cnn_transformer_model_final.pth')
torch.save(model.state_dict(), model_save_path)


  attn_output = scaled_dot_product_attention(q, k, v, attn_mask, dropout_p, is_causal)
Processing Batch 1: 100%|██████████| 100/100 [03:14<00:00,  1.94s/it, Loss=8.02, Accuracy=0.00603]
Processing Batch 2:  33%|███▎      | 33/100 [00:54<01:58,  1.77s/it, Loss=7.16, Accuracy=0.00807]