# Standard Imports

In [204]:
# imports
import requests
from io import StringIO

import chess
import chess.svg
import chess.engine
import chess.pgn

import random
import re
import numpy as np
import pandas as pd

# Load Data

In [205]:
tahlon_raw = 'https://raw.githubusercontent.com/TahlonBrahic/Data-Science-Portfolio/main/Data%20Science/Brother%20Chess%20Bot/tahlon_games.pgn'
tyler_raw = 'https://raw.githubusercontent.com/TahlonBrahic/Data-Science-Portfolio/main/Data%20Science/Brother%20Chess%20Bot/tahlon_games.pgn'

tahlon_response = requests.get(tahlon_raw)
tyler_response = requests.get(tyler_raw)

tahlon_pgn = StringIO(tahlon_response.text)
tyler_pgn = StringIO(tyler_response.text)


# Fix Games

In [206]:
def pgn_to_dataframe(pgn):
    games = []
    for game in pgn:
        game = chess.pgn.read_game(pgn)
        games.append(game)
        df = pd.DataFrame(games)
    return df

In [207]:
pd.set_option('display.max_colwidth', None)


tahlon_games = pgn_to_dataframe(tahlon_pgn)
tyler_games = pgn_to_dataframe(tyler_pgn)

games_list = [tahlon_games, tyler_games]

# Testing Functions

In [208]:
# random board
def random_board(max_depth=200):
    board = chess.Board()
    depth = random.randrange(0, max_depth)

    for _ in range(depth):
        all_moves = list(board.legal_moves)
        random_move = random.choice(all_moves)
        board.push(random_move)
        if board.is_game_over():
            break
    
    return board

# position score
def position_score(board, depth):
    engine = chess.engine.SimpleEngine.popen_uci('/content/stockfish')
    with engine:
        result = engine.analyse(board, chess.engine.Limit(depth=depth))
        score = result['score'].white().score()
        return score

# Transionlation Dictionary
In order for our model to interpret the chess board we have to use a python dictionary to translate chess algebraic notiation into integers.

In [209]:
algebraic_translation_dictionary = {'a':1,'b':2,'c':3,'d':4,'e':5,'f':6,'g':7,'h':8}

# Clean Data
As you can see above our data is very messy. We need to clean it up so the algebraic representation can be tranlated to the UCI representation. Therefore it should look something like this when we are done: 1. e3 d5 2. d4 Nf3 ect...

In [210]:
def rename_column(df):
    df.rename(columns = {0:'games'}, inplace = True)

In [211]:
def clean_data(df):
    df.games = df.games.apply(lambda x: re.split(r"\n\n", str(x))[-1]) # removes inital text
    df.games = df.games.apply(lambda x: re.sub(r'\{[^{}]*\}', '', x)) # removes clock
    df.games = df.games.apply(lambda x: re.sub(r'\.\.', ' ', x)) # removes trailing dots

In [212]:
for game in games_list:
    rename_column(game)
    clean_data(game)

# Feature Engineering
We will be using deep learning to create this chess bot. In order to teach this neural network we have to transform the chess board into 3D tensors for each chess piece.
The framework for these data extraction functions were developed by Moran Reznik in this video: https://www.youtube.com/watch?v=aOwvRvTPQrs&t=371s&ab_channel=MoranReznik

In [None]:
# current work: fix feature engineering to actually process data to be fed to neural network

In [213]:
def create_rep_layer(board, piece):
    s = str(board)
    s = re.sub(f'[^{piece}{piece.upper()}]', '0', s)
    s = re.sub(f'{piece}', '-1', s)
    s = re.sub(f'{piece.upper()}', '1', s)

    board_mat = []
    for row in s.split('\n'):
        row = row.split(' ')
        row = [int(x) for x in row] # Replace string type of number with integer type
        board_mat.append(row)
    
    return np.array(board_mat) # Create 3-d representation of the board

def board_to_matrix(board):
    pieces = ['p','r','n','b','q','k']
    layers = []
    for piece in pieces:
        layers.append(create_rep_layer(board, piece))
    board_rep = np.stack(layers)
    return board_rep

def move_to_matrix(move, board): 
    board.push_san(move).uci() #convert to algebraic to uci here
    move = str(board.pop())

    from_output_layer = np.zeros((8,8))
    from_row = 8 - int(move[1])
    from_column = algebraic_translation_dictionary[move[0]]
    from_output_layer[from_row,from_column] = 1

    to_output_layer = np.zeroes((8,8))
    to_row = 8 - int(move[3])
    to_column = algebraic_translation_dictionary[move[2]]
    to_output_layer[to_row, to_column] = 1

    return np.stack([from_output_layer, to_output_layer])

def create_move_list(s):
    s = str(s)
    return re.sub('\d*\. ', '', s).split(' ')[:-1]



In [226]:
test_game = create_move_list(tyler_games.iloc[0])


['games',
 '',
 '',
 '',
 'd4',
 '',
 '1',
 'Nc6',
 '',
 'Nc3',
 '',
 '2',
 'b6',
 '',
 'Bf4',
 '',
 '3',
 'Ba6',
 '',
 'e4',
 '',
 '4',
 'e5',
 '',
 'Bxa6',
 '',
 '5',
 'exf4',
 '',
 'g4',
 '',
 '6',
 'Nxd4',
 '',
 'Qxd4',
 '',
 '7',
 'Ne7',
 '',
 'Qe5',
 '',
 '8',
 'd6',
 '',
 'Qxf4',
 '',
 '9',
 'g5',
 '',
 'Qxg5',
 '',
 '10',
 'Bg7',
 '',
 'Qxg7',
 '',
 '11',
 'Rg8',
 '',
 'Qxh7',
 '',
 '12',
 'f6',
 '',
 'O-O-O',
 '',
 '13',
 'b5',
 '',
 'Re1',
 '',
 '14',
 'b4',
 '',
 'Na4',
 '',
 '15',
 'Rb8',
 '',
 'b3',
 '',
 '16',
 'c5',
 '',
 'Nc3',
 '',
 '17',
 'bxc3',
 '',
 'Ne2',
 '',
 '18',
 'd5',
 '',
 'Rhg1',
 '',
 '19',
 'd4',
 '',
 'g5',
 '',
 '20',
 'fxg5',
 '',
 'Rxg5',
 '',
 '21',
 'Rb6',
 '',
 'Rxg8+',
 '',
 '22',
 'Nxg8',
 '',
 'Qxg8+',
 '',
 '23',
 'Kd7',
 '',
 'Qxd8+',
 '',
 '24',
 'Kxd8',
 '',
 'Rh1',
 '',
 '25',
 'Rxa6',
 '',
 'h4',
 '',
 '26',
 'Rxa2',
 '',
 'h5',
 '',
 '27',
 'a5',
 '',
 'Kb1',
 '',
 '28',
 'Ra3',
 '',
 'h6',
 '',
 '29',
 'Ke8',
 '',
 'h7',
 '',
 '30',
 'd

In [215]:
from torch.utils.data import Dataset

class ChessDataset(Dataset):

    def __init__(self, games):
        super(ChessDataset, self).__init__()
        self.games = games
    
    def __len__(self):
        return 40_000

    def __getitem__(self, index): # select a random game then a random move from that game
        game_i = np.random.randint(self.games.shape[0])
        random_game = tahlon_games.games.values[game_i]
        moves = create_move_list(random_game)
        game_state_i = np.random.randint(len(moves)-1)
        next_move = moves[game_state_i]
        moves = moves[:game_state_i]
        board = chess.Board()
        for move in moves:
            board.push_san(move)
        x = board_to_matrix(board)
        y = move_to_matrix(next_move, board)
        if game_state_i % 2 == 1: 
            x *= -1
            return x,y

# Convolutional Neural Network
The data we are training on is a collection of 25 games played by my brother Tyler. The purpose of this bot is to recreate his playing style.

In [216]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class module(nn.Module):
    def __init__(self, hidden_size):
        super(module, self).__init__()
        self.conv1 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(hidden_size)
        self.bn2 = nn.BatchNorm2d(hidden_size)
        self.activation1 = nn.SELU()
        self.activation2 = nn.SELU()

    def forward(self, x):
        x_input = torch.clone(x)
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.activation1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = x + x_input
        x = self.activation2(x)
        return x
    
class ChessNet(nn.Module):
    def __init__(self, hidden_layers=4, hidden_size=200):
        super(ChessNet, self).__init__()
        self.hidden_layers = hidden_layers
        self.input_layer = nn.Conv2d(6, hidden_size, 3, stride=1, padding=1)
        self.module_list = nn.ModuleList([module(hidden_size) for i in range(hidden_layers)])
        self.output_layer = nn.Conv2d(hidden_size, 2, 3, stride=1, padding=1)

        def foward(self, x):
            x = self.input_layer(x)
            x = F.relu(x)
            for i in range(self.hidden_layers):
                x = self.module_list[i](x)
            x = self.output_layer(x)
            return x

# Training the model

In [217]:
from torch.utils.data import DataLoader

data_train = tyler_games.games
data_train_loader = DataLoader(data_train, batch_size=32, shuffle=True, drop_last=True)

In [218]:
chess_net = ChessNet(hidden_layers=4, hidden_size=200)
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(chess_net.parameters(), lr=0.001)

num_epochs = 10  

for epoch in range(num_epochs):
    for inputs, targets in data_train_loader:
        optimizer.zero_grad()

        # Forward pass
        outputs = chess_net(inputs)
        loss = loss_function(outputs, targets)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss}') # need to fix loss

Epoch 1/10, Loss: 0
Epoch 2/10, Loss: 0
Epoch 3/10, Loss: 0
Epoch 4/10, Loss: 0
Epoch 5/10, Loss: 0
Epoch 6/10, Loss: 0
Epoch 7/10, Loss: 0
Epoch 8/10, Loss: 0
Epoch 9/10, Loss: 0
Epoch 10/10, Loss: 0


# Choosing Move
The reason we have both a predict and a choose move function is that there may be some cases that we do not want our model to predict the next move. Instead we want to rely on our algorithm to decide like in cases where there is a check in one.

In [219]:
def check_mate_single(board):
    board = board.copy()
    legal_moves = list(board.legal_moves)
    for move in legal_moves:
        board.push_uci(str(move))
        if board.is_checkmate():
            move = board.pop()
            return move
        _ = board.pop()

def distribution_over_moves(vals):
    probs = np.array(vals)
    probs = np.exp(probs)
    probs = probs / probs.sum()
    probs = probs ** 3
    probs = probs / probs.sum()
    return probs

def predict_next_move(board):
    pass

def choose_move(board, player, color):
    legal_moves = list(board.legal_moves)
    move = check_mate_single(board)
    if move is not None:
        return move
    
    x = torch.Tensor(board_to_matrix(board)).float().to('cuda')
    if color == chess.BLACK:
        x *= -1
    x = x.unsqueeze(0)
    move = chess_net.predict_next_move(x)

    vals = []
    froms = [str(legal_move)[:2] for legal_move in legal_moves]
    froms = list(set(froms))
    for from_ in froms:
        vals = move[0,:,:][8 - int(from_[1]), algebraic_translation_dictionary[from_[0]]]
        vals.append(val)

    probs = distribution_over_moves(vals)

    chosen_from = str(np.random.choice(froms, size=1, p=probs)[0][:2])
    vals = []
    for legal_move in legal_moves:
        from_ = str(legal_move)[:2]
        if from_ == chosen_from:
            to = str(legal_move)[2:]
            val = move[1,:,:][8 - int(to[1], algebraic_translation_dictionary[to[0]])]
            vals.append(val)
        else:
            vals.append(0)

    choosen_move = legal_moves[np.argmax(vals)]
    return choosen_move