In [1]:
# System
import os
import pickle
import numpy as np

# ML
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [2]:
# Model parameters

EPOCHS = 2
BATCH_SIZE = 64
LEARNING_RATE = 0.1


In [3]:
# Define path where the dataset is located
dataset_path = './'

def get_data():
    with open(dataset_path + 'X.pkl', 'rb') as f:
        X = pickle.load(f)

    with open(dataset_path + 'y.pkl', 'rb') as f:
        y = pickle.load(f)

    #print(X.shape)
    #print(y.shape)
    return X,y

# Get the input data and true labels
X,y = get_data()

In [4]:
# Define Custom Dataloaders

from torch.utils.data import Dataset, DataLoader

class trainData(Dataset):   
    def __init__(self, X_data, y_data):
        
        self.X_data = X_data
        self.y_data = y_data
        
    def __getitem__(self, index):
        return self.X_data[index], self.y_data[index]
        
    def __len__ (self):
        return len(self.X_data)
    
    def __getXshape__(self):
        return self.X_data.size()

    def __getYshape__(self):
        return self.y_data.size()


train_data = trainData(torch.FloatTensor(X), 
                       torch.FloatTensor(y))


In [5]:
# Check the shape of the dataset
print(train_data.__getXshape__())
print(train_data.__getYshape__())

torch.Size([4537, 8, 8, 12])
torch.Size([4537, 1, 64, 64])


In [6]:
# Initialize data loaders
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)


In [7]:
# Checking the shapes for the input and output matrices

test_input, test_output = next(iter(train_loader))
print(test_input.size())
print(test_output.size())

# Permute the matrices because pytorch accepts this format: (batch, channels, H, W)
print("Permute \n")

print(test_input.permute(0,3,2,1).size())
print(test_output.permute(0,3,2,1).size())


torch.Size([64, 8, 8, 12])
torch.Size([64, 1, 64, 64])
Permute 

torch.Size([64, 12, 8, 8])
torch.Size([64, 64, 64, 1])


In [8]:
class TorchNeuralNetwork(nn.Module):
    def __init__(self):
        super(TorchNeuralNetwork, self).__init__()
        self.conv1 = nn.Conv2d(in_channels = 12, out_channels = 64, kernel_size = 2, stride = (2,2))
        self.conv2 = nn.Conv2d(in_channels = 64, out_channels = 128, kernel_size = 2, stride = (2,2))
        self.conv3 = nn.Conv2d(in_channels = 128, out_channels = 256, kernel_size = 2, stride = (2,2))
        self.fc = nn.Linear(in_features=256, out_features = 4096) # fully connected layer

    def forward(self, inputs):
        #print("InputLayer: ", inputs.size())
        x = self.conv1(inputs)
        #print("Conv2d (conv2D): ", x.size())
        x = self.conv2(x)
        #print("Conv2d_1 (conv2D): ", x.size())
        x = self.conv3(x)
        #print("Conv2d_2 (conv2D): ", x.size())
        x = torch.flatten(x,start_dim=1)        
        #print("flatten (Flatten): ", x.size())
        x = self.fc(x)        
        #print("dense (Dense): ", x.size())
                
        output_reshaped = x.view(64,1,64,64)
                
        return output_reshaped.permute(0,3,2,1)


In [9]:
# Run on GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Device type: ", device)

model = TorchNeuralNetwork()
model.to(device)
print(model)

Device type:  cpu
TorchNeuralNetwork(
  (conv1): Conv2d(12, 64, kernel_size=(2, 2), stride=(2, 2))
  (conv2): Conv2d(64, 128, kernel_size=(2, 2), stride=(2, 2))
  (conv3): Conv2d(128, 256, kernel_size=(2, 2), stride=(2, 2))
  (fc): Linear(in_features=256, out_features=4096, bias=True)
)


In [10]:
# Loss
criterion = nn.CrossEntropyLoss() # no sigmoid/softmax needed in the model definition

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)


In [11]:
# Define accuracy of the model

def binary_acc(y_pred, y_test):
    y_pred_tag = y_pred
    
    correct_results_sum = (y_pred_tag == y_test).sum().float()
    acc = correct_results_sum/y_test.shape[0]
    acc = torch.round(acc * 100)
    
    return acc

In [12]:
model.train()
total_step = len(train_loader)
for e in range(1, EPOCHS+1):
    epoch_loss = 0
    epoch_acc = 0
    for i, (X_batch_original, y_batch_original) in enumerate(train_loader):
        X_batch = X_batch_original.permute(0,3,2,1)
        y_batch = y_batch_original.permute(0,3,2,1)

        optimizer.zero_grad()                
        
        #X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        y_pred = model(X_batch)      
                
        target = torch.argmax(y_batch, 1)
        
        loss = criterion(y_pred, target) 
        
        #acc = binary_acc(y_pred, y_batch)
        # Backprop and perform Adam optimisation
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        #epoch_acc += acc.item()
        epoch_acc = 100 
        
        if (i + 1) % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.3f}%'
                  .format(e, EPOCHS, i + 1, total_step, epoch_loss/total_step, epoch_acc/total_step))
            
        # Break the training after a certain amount
        # Used for debugging only
        if i == 50:
            break

    #print(f'Epoch {e+0:03}:  |  Loss: {epoch_loss/len(train_loader):.5f}  |  Acc: {epoch_acc/len(train_loader):.3f}')

In [13]:
# Save the model

# Specify a path
PATH = "chess_engine_pytorch_CNN.pt"

# Save
torch.save(model.state_dict(), PATH)

# Test the model

In [14]:
# Load the model

# Specify a path
PATH = "chess_engine_pytorch_CNN.pt"

# Load
model = TorchNeuralNetwork()
model.load_state_dict(torch.load(PATH))
model.eval()

TorchNeuralNetwork(
  (conv1): Conv2d(12, 64, kernel_size=(2, 2), stride=(2, 2))
  (conv2): Conv2d(64, 128, kernel_size=(2, 2), stride=(2, 2))
  (conv3): Conv2d(128, 256, kernel_size=(2, 2), stride=(2, 2))
  (fc): Linear(in_features=256, out_features=4096, bias=True)
)

In [15]:
from board_conversion import *
import chess
from keras.models import load_model
from time import sleep
from IPython.display import display, HTML, clear_output

        
board = chess.Board()

def display_board(board, use_svg):
    if use_svg:
        return board._repr_svg_()
    else:
        return "<pre>" + str(board) + "</pre>"

moves = []
pgn = ''
counter = 1

In [16]:
import os

def save_svg(svg, filepath):
    """
    Save svg content in filepath

    :param str  svg:        SVG content
    :param str  filepath:   Path of the SVG file to save
    :return:
    """
    try:
        file_handle = open(filepath, 'w')
    except IOError as e:
        print(str(e))
        exit(1)

    file_handle.write(svg)
    file_handle.close() 



In [None]:
game_length = 50
for i in range(game_length):
    if i % 2 == 0:
        pgn += str(counter)+ '.'
        counter += 1
        
    # Get the matrix of the current board        
    translated = translate_board(board)
    
    # Convert the matrix to a tensor
    t = torch.from_numpy(translated.astype(np.float32))
    
    # Expand the tensor to take into account the batch dimension and rotate 
    board_tensor = t.expand(64, -1,-1,-1).permute(0,3,2,1)

    
    move_matrix_tensor = model(board_tensor)
    move_matrix = move_matrix_tensor.detach().numpy()
    move_matrix = filter_legal_moves(board,move_matrix)

    move= np.unravel_index(np.argmax(move_matrix, axis=None), move_matrix.shape)
    move = chess.Move(move[0],move[1])
    
    string = str(board.san(move))+' '
    pgn+=string
    moves.append(board.san(move))
    board.push(move)
    html = display_board(board, True)
    
    # Save the images
    save_svg(html, "./output_images/image-move-"+str(i)+".svg")
    clear_output(wait=True)
    display(HTML(html))
    sleep(1)
    
    if board.is_game_over():
        break

# Make a gif of the output game

### We are converting svg to pdf 
### Then from pdf to png because there is an issue when converting svg to png directly

In [None]:
import imageio # create a gif

# Convert svg to png
from svglib.svglib import svg2rlg
from reportlab.graphics import renderPM
import cairosvg
    
# Convert pdf to png
from pdf2image import convert_from_path

    
def convert_all(PATH):
    images = []
    for file_name in sorted(os.listdir(PATH)):
        if file_name.endswith('.svg'):
            file_path = os.path.join(PATH, file_name)
            convert_svg_to_pdf(file_path)
            pdf_name = file_path[:-3]+"pdf"
            convert_pdf_to_png(pdf_name)
    print("Finished converting all files to png")


def convert_svg_to_pdf(input_file):
    cairosvg.svg2pdf(url=input_file, write_to=input_file[:-3]+"pdf")
    

def convert_pdf_to_png(pdf_path):
    images = convert_from_path(pdf_path)
    for index, image in enumerate(images):
        image.save(pdf_path[:-3]+"png")
    

In [None]:
convert_all("./output_images")

In [None]:
# Create a final gif

PATH = "./output_images/"
images_list = []
for image_number in range(game_length):
    file_path = os.path.join(PATH, "image-move-"+str(image_number)+".png")
    images_list.append(imageio.imread(file_path))
imageio.mimsave('movie.gif', images_list, fps=1)
            
    