In [None]:
# Install the Kaggle command-line tool and the `chess` package imported by this notebook.
%pip install kaggle -q
%pip install chess -q
# Download the arevel/chess-games dataset archive and unpack it quietly (-qq).
# NOTE(review): presumably this archive provides the chess_games.csv read later — confirm.
!kaggle datasets download arevel/chess-games
!unzip -qq chess-games.zip

In [None]:
# Import necessary libraries
import os
import re
import gc
import chess
import chess.svg
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

In [None]:
# Lookup tables between board file letters ('a'..'h') and column indices (0..7)
letter_2_num = {letter: index for index, letter in enumerate('abcdefgh')}
num_2_letter = {index: letter for letter, index in letter_2_num.items()}

# Data processing functions
def board_2_rep(board):
    """Encode a board as a stack of six float32 planes, one per piece type.

    Planes are stacked in the order pawn, rook, knight, bishop, queen, king;
    each plane comes from ``create_rep_layer(board, letter)``.
    """
    piece_letters = ('p', 'r', 'n', 'b', 'q', 'k')
    planes = [create_rep_layer(board, letter) for letter in piece_letters]
    # Cast explicitly so the stacked result is float32
    return np.stack(planes).astype(np.float32)

def create_rep_layer(board, type):
    """Build the signed occupancy plane for one piece type.

    The board is read through ``str(board)``, which is expected to be a text
    grid with one row per line and cells separated by single spaces.

    Args:
        board: Any object whose ``str()`` yields that grid.
        type: Lowercase piece letter (e.g. ``'p'``). The name shadows the
            builtin but is kept so keyword callers keep working.

    Returns:
        float32 array with ``1`` where the uppercase letter appears, ``-1``
        where the lowercase letter appears, and ``0`` in every other cell.
    """
    # A direct cell lookup replaces the earlier chain of regex substitutions,
    # which used the invalid escape '\.' in a non-raw string and interpolated
    # the piece letter into a pattern without escaping it.
    cell_value = {type.upper(): 1, type: -1}

    board_mat = [
        [cell_value.get(cell, 0) for cell in row.split(' ')]
        for row in str(board).split('\n')
    ]

    return np.array(board_mat).astype(np.float32)

# Define move representation function
def move_2_rep(move, board):
    """Encode a move as two 8x8 one-hot planes: origin square and target square.

    Args:
        move: Move text accepted by ``board.push_san``.
        board: Board the move is played from; the move is pushed and popped
            again immediately, so the board ends in its starting state.

    Returns:
        Array of shape (2, 8, 8): plane 0 marks the from-square, plane 1 the
        to-square. Rows are ``8 - rank`` and columns follow ``letter_2_num``.
    """
    # Push/pop round trip converts the move text into its 'e2e4'-style string.
    board.push_san(move)
    uci = str(board.pop())

    def one_hot(square):
        plane = np.zeros((8, 8))
        plane[8 - int(square[1]), letter_2_num[square[0]]] = 1
        return plane

    from_plane = one_hot(uci[0:2])
    to_plane = one_hot(uci[2:4])
    return np.stack([from_plane, to_plane])

def create_move_list(s):
    """Split a numbered move string into a list of individual move tokens.

    Move-number markers such as ``'12. '`` are removed, the remainder is split
    on single spaces, and the final token is dropped.

    Args:
        s: Move text such as ``'1. e4 e5 2. Nf3 Nc6 1-0'``.

    Returns:
        List of move tokens, e.g. ``['e4', 'e5', 'Nf3', 'Nc6']``.
    """
    # Raw string: '\d' and '\.' are invalid escapes in a normal string literal.
    return re.sub(r'\d*\. ', '', s).split(' ')[:-1]

In [None]:
# Read only the two columns this notebook uses.
chess_data_raw = pd.read_csv('chess_games.csv', usecols=['AN', 'WhiteElo'])
# Keep the move text of games whose White player is rated above 1600.
chess_data = chess_data_raw.loc[chess_data_raw['WhiteElo'] > 1600, ['AN']]
# Release the unfiltered frame before the remaining filters run.
del chess_data_raw
gc.collect()
# Drop games whose move text contains '{'. regex=False matches the brace
# literally; na=False gives a pure boolean mask so `~` cannot fail on missing
# values (those rows are removed by the length filter below).
chess_data = chess_data[~chess_data['AN'].str.contains('{', regex=False, na=False)]
# Drop move strings of 20 characters or fewer.
chess_data = chess_data[chess_data['AN'].str.len() > 20]
print(chess_data.shape[0])

3800027


In [None]:
# Define the ChessDataset class
class ChessDataset(Dataset):
    """Dataset that serves random (position, next move) training pairs.

    ``games`` is an array of move strings. Every item is drawn at random: a
    random game, then a random position inside it. The ``index`` passed to
    ``__getitem__`` is ignored, so ``__len__`` only sets how many samples make
    up one pass.
    """

    def __init__(self, games):
        super().__init__()
        self.games = games

    def __len__(self):
        return self.games.shape[0]

    def __getitem__(self, index):
        """Return ``(x, y)`` float tensors for one randomly sampled position.

        ``x`` is the ``board_2_rep`` encoding of the position, negated when an
        odd number of moves has been played. ``y`` is the ``move_2_rep``
        encoding of the move played next.
        """
        moves = self._sample_game_moves()
        # randint's upper bound is exclusive, so the game's last move is never the target.
        game_state_i = np.random.randint(len(moves) - 1)
        next_move = moves[game_state_i]

        # Replay the game up to the sampled position.
        board = chess.Board()
        for move in moves[:game_state_i]:
            board.push_san(move)

        x = board_2_rep(board)
        y = move_2_rep(next_move, board)
        if game_state_i % 2 == 1:
            x *= -1
        return torch.tensor(x).float(), torch.tensor(y).float()

    def _sample_game_moves(self, max_attempts=100):
        """Pick a random game with at least two moves and return its move list.

        Shorter games are skipped because ``np.random.randint(len(moves) - 1)``
        raises ``ValueError`` for them, which would abort a whole training run
        over one degenerate record.

        Raises:
            ValueError: If no usable game is found within ``max_attempts`` draws.
        """
        for _ in range(max_attempts):
            game_i = np.random.randint(self.games.shape[0])
            moves = create_move_list(self.games[game_i])
            if len(moves) >= 2:
                return moves
        raise ValueError(
            f'no game with at least two moves found in {max_attempts} random draws'
        )

# Define the neural network modules
class module(nn.Module):
    """Residual block: conv -> batch norm -> SELU -> conv -> batch norm, skip add, SELU.

    Both convolutions are 3x3 with stride 1 and padding 1 and keep the channel
    count at ``hidden_size``, so the block's input and output shapes match.
    """

    def __init__(self, hidden_size):
        super().__init__()
        conv_args = dict(kernel_size=3, stride=1, padding=1)
        self.conv1 = nn.Conv2d(hidden_size, hidden_size, **conv_args)
        self.conv2 = nn.Conv2d(hidden_size, hidden_size, **conv_args)
        self.bn1 = nn.BatchNorm2d(hidden_size)
        self.bn2 = nn.BatchNorm2d(hidden_size)
        self.activation1 = nn.SELU()
        self.activation2 = nn.SELU()

    def forward(self, x):
        # Copy of the input for the skip connection
        residual = x.clone()
        out = self.activation1(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        return self.activation2(out + residual)

# Define the ChessNet class
class ChessNet(nn.Module):
    """Convolutional move predictor: input conv, a stack of residual blocks, output conv.

    The input has 6 channels and the output has 2 channels; all convolutions
    are 3x3 with stride 1 and padding 1, so height and width are preserved.

    Args:
        hidden_layers: Number of ``module`` residual blocks.
        hidden_size: Channel width used throughout the hidden stack.
    """

    def __init__(self, hidden_layers=8, hidden_size=200):
        super().__init__()
        self.hidden_layers = hidden_layers
        self.input_layer = nn.Conv2d(6, hidden_size, kernel_size=3, stride=1, padding=1)
        self.module_list = nn.ModuleList(
            [module(hidden_size) for _ in range(hidden_layers)]
        )
        self.output_layer = nn.Conv2d(hidden_size, 2, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        features = F.relu(self.input_layer(x))
        for residual_block in self.module_list:
            features = residual_block(features)
        return self.output_layer(features)


In [None]:
# Hold out 20% of the games for validation; the fixed seed keeps the split repeatable.
train_data, val_data = train_test_split(
    chess_data['AN'].values,
    random_state=42,
    test_size=0.2,
)
train_dataset, val_dataset = ChessDataset(train_data), ChessDataset(val_data)
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=32)

In [None]:
# Training setup
# Use the GPU when one is present, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ChessNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# Reduce the learning rate when the monitored (validation) loss stops decreasing.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2)

In [None]:
checkpoint_path = 'chess_net.pth'
if os.path.exists(checkpoint_path):
    # map_location remaps stored tensors onto `device`, so a checkpoint written
    # on a GPU can be restored on a CPU-only machine.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    epoch = checkpoint['epoch']
    loss = checkpoint['loss']
else:
    # No checkpoint: keep the `model` and `optimizer` built in the training
    # setup cell. Rebinding `optimizer` here would leave `scheduler` attached
    # to the discarded optimizer, so its learning-rate reductions would never
    # reach the optimizer that training uses.
    epoch = 0
    loss = None

In [None]:
train_losses = []
val_losses = []

num_epochs = 10
best_val_loss = float('inf')

# One shared loss module instead of a new instance for every batch.
criterion = nn.CrossEntropyLoss()


def move_losses(outputs, targets):
    """Return ``(from_loss, to_loss)`` cross-entropy terms for a batch.

    Channel 0 of ``outputs``/``targets`` is the from-square plane and channel 1
    the to-square plane. Each 8x8 plane is flattened to 64 logits, and the
    one-hot target plane is reduced to the index of its hot square.
    """
    losses = []
    for plane in (0, 1):
        logits = outputs[:, plane, :, :].reshape(-1, 64)    # (batch_size, 64)
        one_hot = targets[:, plane, :, :].reshape(-1, 64)   # (batch_size, 64)
        labels = one_hot.argmax(dim=1).long()               # (batch_size,)
        losses.append(criterion(logits, labels))
    return losses[0], losses[1]


for epoch in range(num_epochs):
    # Training phase
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        loss_from, loss_to = move_losses(model(inputs), targets)
        loss = loss_from + loss_to

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    train_loss = running_loss / len(train_loader)
    train_losses.append(train_loss)

    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            loss_from, loss_to = move_losses(model(inputs), targets)
            val_loss += loss_from.item() + loss_to.item()

    val_loss /= len(val_loader)
    val_losses.append(val_loss)
    print(f'Epoch {epoch + 1}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}')
    # The scheduler monitors the mean validation loss of the epoch.
    scheduler.step(val_loss)

    # Save the best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save({
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': val_loss,
        }, 'best_model.pth')

# Save the final model
torch.save(model.state_dict(), 'final_chess_net.pth')
# After training, `loss` is the last batch's loss tensor; store a plain number
# so the checkpoint does not embed a device-bound tensor and matches the float
# stored in 'best_model.pth'. A non-tensor value (e.g. None) is kept as is.
final_loss = loss.item() if torch.is_tensor(loss) else loss
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': final_loss,
    # Completed-epoch count, the same convention 'best_model.pth' uses.
    'epoch': epoch + 1,
}, 'final_model_and_optimizer.pth')

# Plot the training and validation loss
import matplotlib.pyplot as plt

# Explicit figure/axes interface; one point per epoch on each curve
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_losses, label='Train Loss')
ax.plot(val_losses, label='Validation Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
ax.set_title('Training and Validation Loss Across Epochs')
plt.show()

In [None]:
import chess
from stockfish import Stockfish
import torch
import numpy as np
import time

# Opponent engine configuration
STOCKFISH_PATH = "./stockfish/stockfish-ubuntu-x86-64-avx2"
STOCKFISH_SKILL_LEVEL = 3  # Skill level (0 to 20)

stockfish = Stockfish(path=STOCKFISH_PATH)
stockfish.set_skill_level(STOCKFISH_SKILL_LEVEL)

# Define the function to predict the next move
def predict(x):
    """Run the global ``model`` on ``x`` in eval mode and return a NumPy array.

    Gradients are disabled for the forward pass and the result is moved to the
    CPU before conversion.
    """
    model.eval()
    with torch.no_grad():
        raw_output = model(x)
    on_cpu = raw_output.cpu()
    return on_cpu.numpy()

# Function to check for checkmate in a single move
def check_mate_single(board):
    """Return a legal move that delivers checkmate immediately, or ``None``.

    The search runs on a copy, so the caller's board is not modified. Each
    legal move is played, tested with ``is_checkmate()``, and taken back.
    """
    scratch = board.copy()
    for candidate in list(scratch.legal_moves):
        scratch.push_uci(str(candidate))
        delivers_mate = scratch.is_checkmate()
        played = scratch.pop()
        if delivers_mate:
            return played
    return None

# Function to compute distribution over moves
def distribution_over_moves(vals, sharpness=3.0):
    """Turn raw scores into a sharpened probability distribution.

    Equivalent to taking a softmax of ``vals``, raising every probability to
    the power ``sharpness`` and renormalizing, i.e. ``softmax(sharpness * vals)``.

    Args:
        vals: Sequence of scores (logits).
        sharpness: Exponent applied to the softmax probabilities; values above
            1 concentrate mass on the highest scores. Defaults to 3.

    Returns:
        float64 array of probabilities summing to 1, or an empty array when
        ``vals`` is empty.
    """
    logits = np.asarray(vals, dtype=np.float64)
    if logits.size == 0:
        return logits
    # Subtracting the maximum first keeps exp() from overflowing to inf (and
    # the result from becoming NaN) when the scores are large.
    weights = np.exp(sharpness * (logits - logits.max()))
    return weights / weights.sum()

# Function to choose the next move for the AI
def choose_move(board, color):
    """Pick the model's move for ``color`` on ``board``.

    A mate-in-one is played whenever ``check_mate_single`` finds one. Otherwise
    the from-square is sampled from the model's from-plane scores (restricted
    to squares that have a legal move), and the move from that square whose
    target has the highest to-plane score is returned.

    Args:
        board: Current position.
        color: Side the model plays; the input encoding is negated for
            ``chess.BLACK``.

    Returns:
        One of the moves in ``board.legal_moves``.
    """
    legal_moves = list(board.legal_moves)

    # Check for immediate checkmate move
    mating_move = check_mate_single(board)
    if mating_move is not None:
        return mating_move

    # Prepare the input representation
    x = torch.Tensor(board_2_rep(board)).float().to(device)
    if color == chess.BLACK:
        x *= -1
    x = x.unsqueeze(0)

    # Predict the move
    move_output = predict(x)

    def square_score(plane, square):
        # Plane 0 scores from-squares, plane 1 scores to-squares.
        return move_output[0, plane, 8 - int(square[1]), letter_2_num[square[0]]]

    # Sample the origin square. Sorting gives a fixed square order, so a seeded
    # NumPy generator picks the same square from run to run.
    froms = sorted({str(legal_move)[:2] for legal_move in legal_moves})
    probs = distribution_over_moves([square_score(0, from_) for from_ in froms])
    chosen_from = str(np.random.choice(froms, size=1, p=probs)[0])[:2]

    # Rank only the moves that start on the chosen square. Scoring the other
    # moves as 0 and taking an argmax over everything selected a move from a
    # different square whenever all of the chosen square's scores were negative.
    candidates = [
        legal_move for legal_move in legal_moves
        if str(legal_move)[:2] == chosen_from
    ]
    return max(candidates, key=lambda legal_move: square_score(1, str(legal_move)[2:4]))

# Main game loop
def play_game():
    """Play one game, the CNN as White against Stockfish as Black.

    Returns:
        Tuple ``(result, cnn_times)``: ``board.result()`` for the finished game
        and the wall-clock time of every CNN move in milliseconds.
    """
    board = chess.Board()
    cnn_times = []

    while not board.is_game_over():
        # CNN move (White), timed
        started = time.time()
        ai_move = choose_move(board, chess.WHITE)
        elapsed_ms = (time.time() - started) * 1000  # seconds -> milliseconds
        cnn_times.append(elapsed_ms)
        board.push(ai_move)
        print(f"CNN move: {ai_move} (took {elapsed_ms:.2f} ms)")

        if board.is_game_over():
            break

        # Stockfish move (Black)
        stockfish.set_fen_position(board.fen())
        reply = chess.Move.from_uci(stockfish.get_best_move())
        board.push(reply)
        print(f"Stockfish move: {reply}")

    print("Game over: " + board.result())
    return board.result(), cnn_times

# Evaluate performance over 10 games
def evaluate_performance(num_games):
    """Play ``num_games`` games and tally the outcomes and CNN move times.

    Args:
        num_games: Number of games to play via ``play_game``.

    Returns:
        Tuple ``(results, all_cnn_times, average_time)``: win/draw counts keyed
        by ``"CNN"``, ``"Stockfish"`` and ``"Draw"``; every CNN move time in
        milliseconds; and their mean (``0.0`` when no move was timed).
    """
    results = {"CNN": 0, "Stockfish": 0, "Draw": 0}
    # The CNN plays White, so "1-0" is a CNN win; any other result counts as a draw.
    winner_by_result = {"1-0": "CNN", "0-1": "Stockfish"}
    all_cnn_times = []
    for i in range(num_games):
        print(f"Game {i+1}")
        result, cnn_times = play_game()
        all_cnn_times.extend(cnn_times)
        results[winner_by_result.get(result, "Draw")] += 1
        print(f"Current Results: {results}")

    # Average move time in milliseconds; guarded so zero timed moves
    # (e.g. num_games == 0) no longer divides by zero.
    if all_cnn_times:
        average_time = sum(all_cnn_times) / len(all_cnn_times)
    else:
        average_time = 0.0
    return results, all_cnn_times, average_time

# Start the evaluation
if __name__ == "__main__":
    # Play the evaluation games, then report the tally and the mean CNN move time.
    num_games = 10
    final_results, cnn_times, average_time = evaluate_performance(num_games)
    print(
        f"Final Results after {num_games} games: {final_results}",
        f"Average CNN move time: {average_time:.2f} ms",
        sep="\n",
    )