## Final Project Introduction to Deep Learning
### 12/1/2024
### Sean Diab

#### Github: https://github.com/Sean-Diab/CU-Boulder-MSCS/blob/main/Deep%20Learning/Final%20Project.ipynb

In this project I will use deep learning to create a chess AI. More specifically, I will train a neural network to predict the next move from the current board position.

### Imports

In [None]:
import numpy as np
import re
import pandas as pd
import gc
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
import chess
import time
import random

In [None]:
# Report whether PyTorch can see a CUDA device, how many devices are visible, and the
# name of device 0 (or a placeholder string when CUDA is unavailable).
print("CUDA Available:", torch.cuda.is_available())
print("CUDA Device Count:", torch.cuda.device_count())
print("CUDA Device Name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No CUDA device")

In [None]:
# Ask cuDNN to benchmark its convolution algorithms and reuse the fastest one.
# NOTE(review): the training loader below yields one whole game per batch, so the batch
# dimension varies from step to step; benchmark mode presumably re-tunes for each new
# input shape -- confirm that this is actually a net win for this workload.
torch.backends.cudnn.benchmark = True  # Optimizes CUDA kernel selection

Import Data & Define Variables

In [None]:
# Load only the two columns that are needed: the move text ('AN') and White's rating.
chess_data_raw = pd.read_csv('./content/chess_games.csv', usecols=['AN', 'WhiteElo'])

# Keep games where White is rated above 2000, then free the unfiltered frame.
chess_data = chess_data_raw[chess_data_raw['WhiteElo'] > 2000]
del chess_data_raw
gc.collect()

chess_data = chess_data[['AN']]
# Drop games whose move text contains '{'. The brace is matched literally (regex=False)
# so it is never interpreted as a regex quantifier, and missing move text is treated as
# "no brace" (na=False) so the boolean mask never contains NaN; such rows are removed
# by the length filter on the next line.
chess_data = chess_data[~chess_data['AN'].str.contains('{', regex=False, na=False)]
# Discard games whose move text is 20 characters or shorter.
chess_data = chess_data[chess_data['AN'].str.len() > 20]
print(chess_data.shape[0])

# Lookup tables between board file letters and column indices (a -> 0, ..., h -> 7).
letter_2_num = {letter: idx for idx, letter in enumerate('abcdefgh')}
num_2_letter = {idx: letter for letter, idx in letter_2_num.items()}

Functions & Classes

In [None]:
def create_rep_layer(board, piece_type):
    """Build the 8x8 plane describing where one kind of piece stands.

    The board is read through ``str(board)``, which is expected to be eight
    newline-separated rows of eight space-separated symbols (lowercase letters
    for black pieces, uppercase for white, ``.`` for empty squares).

    Args:
        board: Object whose ``str()`` is the text board described above.
        piece_type: Lowercase piece letter, e.g. ``'p'`` or ``'n'``.

    Returns:
        Integer ``np.ndarray`` of shape (8, 8): -1 where a black piece of this
        type stands, +1 where a white piece of this type stands, 0 elsewhere.

    Note:
        Both colours used to be written as -1, which made white and black
        pieces indistinguishable to the network. Checkpoints trained with that
        encoding must be retrained after this fix.
    """
    # Black (lowercase) is negative, white (uppercase) is positive.
    encoding = {piece_type: -1, piece_type.upper(): 1}

    board_mat = [
        [encoding.get(symbol, 0) for symbol in row.split(" ")]
        for row in str(board).split("\n")
    ]
    return np.array(board_mat)
 
def board_2_rep(board):
    """Encode a board as a stack of per-piece planes.

    One plane is produced by ``create_rep_layer`` for each of pawn, rook,
    knight, bishop, queen and king, in that order, and the planes are stacked
    along a new leading axis.
    """
    piece_letters = ('p', 'r', 'n', 'b', 'q', 'k')
    return np.stack([create_rep_layer(board, letter) for letter in piece_letters])

def move_2_rep(move, board):
    """Encode a SAN move as two one-hot 8x8 planes (origin and destination).

    Args:
        move: Move in SAN, interpreted in the context of ``board``.
        board: Position the move is played from. It is only read:
            the move is resolved with ``parse_san`` rather than by pushing
            and popping it, which is also how ``move_to_index`` resolves moves.

    Returns:
        Float ``np.ndarray`` of shape (2, 8, 8). Plane 0 has a single 1 on the
        origin square and plane 1 a single 1 on the destination square. Row 0
        is rank 8 and column 0 is file 'a'.
    """
    uci = board.parse_san(move).uci()

    planes = np.zeros((2, 8, 8))
    # UCI is "<from file><from rank><to file><to rank>"; rank 8 maps to row 0.
    planes[0, 8 - int(uci[1]), letter_2_num[uci[0]]] = 1
    planes[1, 8 - int(uci[3]), letter_2_num[uci[2]]] = 1
    return planes

def move_to_index(move, board):
    """Convert a SAN move into its class index for the 64*64 output layer.

    Args:
        move: Move in SAN, interpreted in the context of ``board``.
        board: Position the move is played from (not modified).

    Returns:
        ``from_square * 64 + to_square``, where each square is numbered
        ``row * 8 + col`` with row 0 = rank 8 and col 0 = file 'a'.

    Raises:
        ValueError: If the move cannot be parsed or converted. The original
            exception is attached as ``__cause__`` so the traceback still
            shows what actually went wrong.
    """
    try:
        # Parse SAN and convert to UCI ("<from file><from rank><to file><to rank>")
        uci_move = board.parse_san(move).uci()
        from_square = (8 - int(uci_move[1])) * 8 + letter_2_num[uci_move[0]]
        to_square = (8 - int(uci_move[3])) * 8 + letter_2_num[uci_move[2]]
    except Exception as e:
        raise ValueError(f"Error processing move: {move} -> {e}") from e
    return from_square * 64 + to_square

def create_move_list(s):
    """Split a game's move text into a list of simplified move tokens.

    Move numbers (``"12."``, ``"12..."``) are stripped and the text is split on
    single spaces. For every token: check/mate markers (``+``, ``#``) are
    removed, a promotion such as ``e8=Q`` is rewritten as ``e8Q``, and the
    capture marker ``x`` is dropped. Game results and tokens shorter than two
    characters are discarded.
    """
    game_results = ('1-0', '0-1', '1/2-1/2')
    tokens = re.sub(r'\d*\.\s*', '', s).split(' ')

    cleaned = []
    for token in tokens:
        san = token.replace('+', '').replace('#', '')
        if '=' in san:
            # Keep the part before '=' plus the trailing promotion piece letter.
            san = san.split('=')[0] + san[-1]
        san = san.replace('x', '')
        if san in game_results or len(san) < 2:
            continue
        cleaned.append(san)
    return cleaned

class ChessDataset(Dataset):
    """Dataset in which every item is one complete game.

    ``games`` is stored as given; ``__getitem__`` reads it through
    ``games.values[index]``, so it is expected to expose ``.values``
    (e.g. a pandas Series of move-text strings).
    """

    def __init__(self, games):
        super().__init__()
        self.games = games

    def __len__(self):
        """Number of games available."""
        return len(self.games)

    def __getitem__(self, index):
        """Replay game ``index`` and return (positions, labels).

        For each move, the position *before* the move is encoded with
        ``board_2_rep`` (as float32) and paired with the move's class index
        from ``move_to_index``. A ValueError raised while processing is
        reported and re-raised.
        """
        try:
            game_text = self.games.values[index]
            board = chess.Board()

            boards, targets = [], []
            for san in create_move_list(game_text):
                boards.append(board_2_rep(board).astype(np.float32))
                targets.append(move_to_index(san, board))
                board.push_san(san)

            return np.stack(boards), np.array(targets)
        except ValueError as e:
            print(f"Error processing game at index {index}: {e}")
            raise

    
def collate_fn(batch):
    """Flatten a batch of games into one tensor of positions and one of labels.

    ``batch`` is an iterable of ``(game_positions, game_labels)`` pairs. The
    positions of all games are concatenated in order into a float32 tensor and
    the labels into an int64 tensor.
    """
    all_positions = [pos for game_positions, _ in batch for pos in game_positions]
    all_labels = [label for _, game_labels in batch for label in game_labels]

    inputs = np.array(all_positions, dtype=np.float32)  # float32 network input
    targets = np.array(all_labels, dtype=np.int64)  # int64 class targets
    return torch.from_numpy(inputs), torch.from_numpy(targets)



# Each dataset item is a whole game, so batch_size=1 means one game per training step;
# collate_fn flattens that game into a tensor of positions and a tensor of move labels.
data_train = ChessDataset(chess_data['AN'])
data_train_loader = DataLoader(
    data_train,
    collate_fn=collate_fn,
    batch_size=1,
    shuffle=True,
    drop_last=True,
)

Train the CNN

In [None]:
class module(nn.Module):
    """Residual block: conv -> BN -> SELU -> conv -> BN, add the input, SELU.

    Both convolutions are 3x3 with stride 1 and padding 1 and keep
    ``hidden_size`` channels, so the output has the same shape as the input
    and the skip connection can be added directly.

    Args:
        hidden_size: Number of input and output channels.
    """

    def __init__(self, hidden_size):
        super().__init__()
        # 2 convolutional layers
        self.conv1 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)

        # batch normalization
        self.bn1 = nn.BatchNorm2d(hidden_size)
        self.bn2 = nn.BatchNorm2d(hidden_size)

        # selu activations
        self.activation1 = nn.SELU()
        self.activation2 = nn.SELU()

    def forward(self, x):
        """Apply the block to ``x`` and return a tensor of the same shape."""
        # No operation below modifies ``x`` in place, so the input itself can
        # serve as the skip connection; copying it would only cost memory.
        residual = x
        out = self.activation1(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        return self.activation2(out + residual)
    

# Convolutional network that scores every (from square, to square) pair for a board
class ChessNet(nn.Module):
    """Map a 6-plane 8x8 board encoding to 4096 move scores.

    The output holds one raw score (logit) per ``from_square * 64 + to_square``
    class. It is not normalised: ``nn.CrossEntropyLoss`` consumes the logits
    directly during training, and a softmax must be applied by the caller if
    probabilities are wanted.

    Args:
        hidden_layers: Number of residual ``module`` blocks.
        hidden_size: Number of channels used by the convolutional trunk.
    """

    def __init__(self, hidden_layers=4, hidden_size=200):
        super().__init__()
        self.hidden_layers = hidden_layers
        self.input_layer = nn.Conv2d(6, hidden_size, 3, stride=1, padding=1)
        self.module_list = nn.ModuleList([module(hidden_size) for _ in range(hidden_layers)])

        self.output_layer = nn.Linear(hidden_size * 8 * 8, 64 * 64)

    def forward(self, x):
        """Return logits of shape (batch, 4096) for input of shape (batch, 6, 8, 8)."""
        x = F.relu(self.input_layer(x))

        for block in self.module_list:
            x = block(x)

        # Flatten channels and squares into one feature vector per position.
        x = x.view(x.size(0), -1)
        return self.output_layer(x)

Define and Initialize the Model

In [None]:
# Training schedule. The loader yields one game per batch, so these counts are in games.
num_epochs = 1
num_iters = 500_000  # stop an epoch once this many batches have been processed
update_counter = 5000  # print progress every this many batches
save_interval = 20_000  # write a checkpoint every this many iterations

# Train on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using cuda' if device.type == 'cuda' else 'using cpu :(')

model = ChessNet(hidden_layers=4, hidden_size=200).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
total_start_time = time.time()
# Gradient scaling only applies to CUDA mixed precision. Disabling it on CPU turns the
# scaler into a pass-through instead of relying on it to warn and disable itself.
scaler = torch.cuda.amp.GradScaler(enabled=(device.type == 'cuda'))
print('Total Games:', len(chess_data))

loop_counter = 0  # Total optimizer steps taken across all epochs

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    batches_done = 0  # batches actually processed this epoch (the loop may stop early)
    epoch_start_time = time.time()

    for batch_idx, (x, y) in enumerate(data_train_loader, start=1):
        x, y = x.to(device), y.to(device).long()

        optimizer.zero_grad()

        # Run the forward pass and the loss under autocast on the training device.
        with torch.amp.autocast(device_type=device.type):
            output = model(x)
            loss = criterion(output, y)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Read the loss back once per step and reuse the value below.
        batch_loss = loss.item()
        running_loss += batch_loss
        batches_done = batch_idx

        if batch_idx % update_counter == 0:
            duration = time.time() - epoch_start_time
            print(f'Batch {batch_idx}, Loss: {batch_loss:.4f}, duration: {duration:.2f}')

        loop_counter += 1
        if loop_counter % save_interval == 0:
            save_path = f'chessbot_checkpoint_{loop_counter}.pth'
            torch.save({
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loop_counter': loop_counter,
                'epoch': epoch
            }, save_path)
            print(f"Checkpoint saved at iteration {loop_counter} to '{save_path}'")

        if batch_idx == num_iters:
            break

    epoch_end_time = time.time()
    epoch_duration = epoch_end_time - epoch_start_time
    # Average over the batches that were actually run. The epoch can end early at
    # num_iters, so dividing by len(data_train_loader) would understate the loss.
    average_loss = running_loss / max(batches_done, 1)
    print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {average_loss:.4f}, "
          f"Time: {epoch_duration:.2f} seconds")

total_end_time = time.time()
total_duration = total_end_time - total_start_time
print(f"Total Training Time: {total_duration:.2f} seconds")

torch.save(model.state_dict(), 'chessbot_model.pth')
print("Model saved to 'chessbot_model.pth'")


Picking Moves

In [None]:
def check_mate_single(board):
    """Return a legal move that delivers immediate checkmate, or None.

    The search runs on a copy, so the caller's board is left untouched. Every
    legal move is played, tested with ``is_checkmate`` and taken back; the
    first mating move found is returned.
    """
    scratch = board.copy()

    # Materialise the legal moves first: the scratch board is mutated while iterating.
    for candidate in list(scratch.legal_moves):
        scratch.push_uci(candidate.uci())
        delivers_mate = scratch.is_checkmate()
        played = scratch.pop()
        if delivers_mate:
            return played

    return None

def distribution_over_moves(vals):
    """Turn raw move scores into a sharpened probability distribution.

    The result is the softmax of ``vals`` raised to the third power and
    renormalised, which is algebraically the softmax of ``3 * vals``. It is
    computed in that form, with the maximum subtracted before exponentiating,
    so large scores no longer overflow ``np.exp`` into inf/NaN.

    Args:
        vals: Sequence or array of real-valued scores.

    Returns:
        ``np.ndarray`` of probabilities with the same shape as ``vals``,
        summing to 1 (an empty input yields an empty array).
    """
    scores = np.array(vals)
    if scores.size == 0:
        return scores.astype(np.float64)

    # Shifting by the maximum leaves the softmax unchanged but keeps every exponent <= 0.
    weights = np.exp(3 * (scores - scores.max()))
    return weights / weights.sum()

def predict(x):
    """Run the module-level ``model`` on ``x`` in eval mode without tracking gradients."""
    model.eval()
    with torch.no_grad():
        return model(x)

def _uci_to_index(uci):
    """Map a UCI move string to its class index (from_square * 64 + to_square)."""
    # Same square numbering as move_to_index: row 0 is rank 8, column 0 is file 'a'.
    from_square = (8 - int(uci[1])) * 8 + letter_2_num[uci[0]]
    to_square = (8 - int(uci[3])) * 8 + letter_2_num[uci[2]]
    return from_square * 64 + to_square


def choose_move(board, model, color):
    """Pick the legal move to which the model assigns the highest score.

    Args:
        board: Current position; must expose ``legal_moves`` and be accepted
            by ``board_2_rep``.
        model: Network returning one score per ``from * 64 + to`` class for a
            batch of encoded boards.
        color: Side to move. Accepted for backward compatibility but unused:
            ``ChessDataset`` feeds the network the same un-negated encoding for
            both sides, so negating the input for Black here would show the
            model positions unlike anything it was trained on.

    Returns:
        The chosen move as a UCI string (e.g. ``"e2e4"`` or ``"e7e8q"``).

    Raises:
        IndexError: If the position has no legal moves.
    """
    legal_moves = [move.uci() for move in board.legal_moves]
    if not legal_moves:
        raise IndexError("Cannot choose a move: the position has no legal moves")

    x = torch.Tensor(board_2_rep(board)).float().to(device).unsqueeze(0)

    # Inference must run in eval mode so BatchNorm uses its running statistics
    # rather than the statistics of this single position.
    model.eval()
    with torch.no_grad():
        scores = model(x)[0].float().cpu()  # shape: (4096,)

    def preference(uci):
        # Promotions share one (from, to) index; break that tie in favour of the queen.
        is_underpromotion = len(uci) == 5 and uci[4] != 'q'
        return (scores[_uci_to_index(uci)].item(), not is_underpromotion)

    # Ranking only the legal moves means an illegal top prediction falls back to the
    # best legal alternative instead of a random move.
    return max(legal_moves, key=preference)


### Results
The chess AI plays reasonably well: it seems to have a decent understanding of the game and doesn't make too many blunders (bad moves where, for example, you completely lose a piece). It often makes some fairly random-looking moves, but they aren't bad by any means. I would say the level of the bot matches that of an intermediate chess player.

### Conclusion
In this project I made a chess ai that predicts what move to play. I imported many high level chess games and turned them into numerical format that is suitable for machine learning. I used a convolutional neural network to learn how to predict the moves by analyzing the current state of the board.  
The results were pretty reasonable, with the bot matching the chess strength of an intermediate player.  
There is definitely room for improvement, for example by integrating reinforcement learning or using advanced search algorithms such as Monte Carlo Tree Search combined with pruning.