<a href="https://colab.research.google.com/github/BuweiChen/mini-alpha-go-for-chess/blob/main/supervised_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! pip install kaggle -q
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! kaggle datasets download arevel/chess-games
! unzip -qq /content/chess-games.zip

mkdir: cannot create directory ‘/root/.kaggle’: File exists
Dataset URL: https://www.kaggle.com/datasets/arevel/chess-games
License(s): CC0-1.0
Downloading chess-games.zip to /content
 99% 1.44G/1.45G [00:22<00:00, 54.8MB/s]
100% 1.45G/1.45G [00:22<00:00, 68.6MB/s]


In [None]:
# Install the package that provides the `chess` module imported further down.
! pip install chess -q

In [None]:
# Lookup tables between board file letters ('a'..'h') and column indices (0..7).
letter_to_num = {letter: column for column, letter in enumerate('abcdefgh')}
# Inverse table, derived from the forward one so the two can never disagree.
num_to_letter = {column: letter for letter, column in letter_to_num.items()}

In [None]:
import numpy as np
import re
import chess
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch
from torch import nn
from torch.nn import functional as F
import torch.optim as optim
import gc

In [None]:
def board_to_rep(board):
  """Encode a board as a stack of per-piece-type planes.

  One plane is produced by `create_rep_layer` for each piece letter, in the
  fixed order p, r, n, k, q, b, and the planes are stacked along a new
  leading axis.

  Args:
    board: Board object forwarded unchanged to `create_rep_layer`.

  Returns:
    `np.ndarray` whose first axis has length 6 (one entry per piece letter).
  """
  piece_order = ('p', 'r', 'n', 'k', 'q', 'b')
  return np.stack([create_rep_layer(board, letter) for letter in piece_order])

In [None]:
def create_rep_layer(board, type):
  """Build one signed occupancy plane for a single piece letter.

  The board is read through its text diagram (`str(board)`): one line per
  row, with single-character squares separated by single spaces.

  Args:
    board: Any object whose `str()` yields such a diagram, e.g. a
      `chess.Board`.
    type: Piece letter to extract, normally lowercase such as 'p'. The name
      shadows the builtin but is kept so existing callers keep working.

  Returns:
    2-D integer `np.ndarray` with one entry per square: -1 where the square
    shows `type`, 1 where it shows `type.upper()`, and 0 everywhere else.
  """
  upper = type.upper()
  board_mat = []
  for line in str(board).split('\n'):
    # `type` is tested first so that, should an uppercase letter be passed,
    # the matching squares still come out as -1.
    board_mat.append([
      -1 if square == type else 1 if square == upper else 0
      for square in line.split(' ')
    ])

  return np.array(board_mat)

In [None]:
# Sanity check: print the text diagram of a sample position next to its
# encoded planes so the two can be compared by eye.
sample_fen = "r1bqkb1r/pppp1Qpp/2n2n2/4p3/2B1P3/8/PPPP1PPP/RNB1K1NR b KQkq - 0 4"
test_board = chess.Board(sample_fen)
print(test_board)
print(board_to_rep(test_board))

r . b q k b . r
p p p p . Q p p
. . n . . n . .
. . . . p . . .
. . B . P . . .
. . . . . . . .
P P P P . P P P
R N B . K . N R
[[[ 0  0  0  0  0  0  0  0]
  [-1 -1 -1 -1  0  0 -1 -1]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0 -1  0  0  0]
  [ 0  0  0  0  1  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 1  1  1  1  0  1  1  1]
  [ 0  0  0  0  0  0  0  0]]

 [[-1  0  0  0  0  0  0 -1]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 1  0  0  0  0  0  0  1]]

 [[ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0 -1  0  0 -1  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  1  0  0  0  0  1  0]]

 [[ 0  0  0  0 -1  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0]
  [ 0  0  0  0  0  0  0  0

In [None]:
def move_to_rep(move, board):
  """Encode a SAN move as two one-hot 8x8 planes (origin and destination).

  The move is resolved with `board.parse_san`, which interprets the SAN text
  in the context of the current position without pushing it, so the caller's
  board is left untouched.

  Args:
    move: Move in standard algebraic notation, e.g. 'Nf3'.
    board: Position in which `move` is to be played; must provide
      `parse_san` returning an object with `from_square` and `to_square`
      (a `chess.Board` does).

  Returns:
    Float `np.ndarray` of shape (2, 8, 8). Plane 0 marks the origin square,
    plane 1 the destination square. Row 0 is rank 8 and column 0 is file
    'a', matching the printed board diagram.

  Raises:
    Whatever `board.parse_san` raises for invalid, illegal or ambiguous SAN.
  """
  parsed = board.parse_san(move)

  planes = np.zeros((2, 8, 8))
  for plane, square in zip(planes, (parsed.from_square, parsed.to_square)):
    # python-chess numbers squares 0..63 from a1 to h8, rank by rank, so
    # divmod by 8 yields (rank index, file index), both zero-based.
    rank, file = divmod(square, 8)
    # Rank 8 is drawn on the top row, hence the vertical flip.
    plane[7 - rank, file] = 1

  return planes

In [None]:
def create_move_list(s):
  """Split a numbered movetext string into its individual move tokens.

  Every "<digits>. " move-number marker is removed, the remainder is split
  on single spaces, and the final token is discarded (presumably the game
  result such as '1-0' -- confirm against the dataset).

  Args:
    s: Movetext such as '1. e4 e5 2. Nf3 Nc6 1-0'.

  Returns:
    List of move strings in playing order, without the trailing token.
  """
  # Raw string: '\d' and '\.' are invalid escapes in a normal string literal.
  tokens = re.sub(r'\d*\. ', '', s).split(' ')
  return tokens[:-1]

In [None]:
# Read only the two columns that are used: the movetext ('AN') and White's rating.
chess_data_raw = pd.read_csv('/content/chess_games.csv', usecols=['AN', 'WhiteElo'])

# Keep games where White is rated above 2000; only the movetext is needed afterwards.
chess_data = chess_data_raw.loc[chess_data_raw['WhiteElo'] > 2000, ['AN']]

# Release the unfiltered frame before further processing to lower peak memory.
del chess_data_raw
gc.collect()

# Drop games whose movetext contains a '{' (presumably embedded annotations that
# the move parser cannot handle). regex=False matches the brace literally
# instead of relying on how the regex engine treats a lone '{'.
chess_data = chess_data[~chess_data['AN'].str.contains('{', regex=False)]

# Drop very short records (20 characters or fewer of movetext).
chess_data = chess_data[chess_data['AN'].str.len() > 20]
print(chess_data.shape[0])

883376


In [None]:
class ChessDataset(Dataset):
  """Randomly samples (position, next move) training pairs from recorded games.

  Every item is drawn independently at random: a random game, then a random
  point inside that game. The `index` passed by the loader is therefore
  ignored, and the reported length only sets how many samples make up one
  pass over the loader.

  Args:
    games: Sequence of movetext strings (a pandas Series or any object
      supporting `len()` and integer indexing).
    epoch_size: Value reported by `len()`; defaults to 40000.
  """

  def __init__(self, games, epoch_size=40000):
    super(ChessDataset, self).__init__()
    self.games = games
    self.epoch_size = epoch_size

  def __len__(self):
    return self.epoch_size

  def __getitem__(self, index):
    """Return one random `(x, y)` pair; `index` is not used.

    Returns:
      x: Output of `board_to_rep` for the sampled position, negated when an
        odd number of plies has been played.
      y: Output of `move_to_rep` for the move played next in that game.

    Raises:
      ValueError: From `np.random.randint` if the sampled game has fewer
        than two moves.
    """
    games = self.games
    game_i = np.random.randint(len(games))
    # Read from the games handed to the constructor rather than a notebook
    # global; .iloc gives positional access when a pandas Series is supplied.
    random_game = games.iloc[game_i] if hasattr(games, 'iloc') else games[game_i]
    moves = create_move_list(random_game)

    # The upper bound is exclusive, so the last entry of `moves` is never
    # picked as the target. NOTE(review): confirm this is intended.
    game_state_i = np.random.randint(len(moves) - 1)
    next_move = moves[game_state_i]

    # Replay the game from the initial position up to the sampled point.
    board = chess.Board()
    for move in moves[:game_state_i]:
      board.push_san(move)

    x = board_to_rep(board)
    y = move_to_rep(next_move, board)
    if game_state_i % 2 == 1:
      # Odd ply count: the second player is to move. Flip the signs so the
      # side to move is always encoded with +1.
      x *= -1
    return x, y

In [None]:
# Wrap the filtered movetext column in the sampling dataset, then batch it.
data_train = ChessDataset(chess_data['AN'])
data_train_loader = DataLoader(
  data_train,
  batch_size=32,
  shuffle=True,
  drop_last=True,
)

In [None]:
class module(nn.Module):
  """Residual block: two 3x3 conv + batch-norm stages with SELU activations.

  Both convolutions keep the channel count (`hidden_size`) and, with stride 1
  and padding 1, the spatial size, so the input can be added back onto the
  output of the second stage before the final activation.

  Args:
    hidden_size: Number of input and output channels.
  """

  def __init__(self, hidden_size):
    super(module, self).__init__()
    self.conv1 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)
    self.conv2 = nn.Conv2d(hidden_size, hidden_size, 3, stride=1, padding=1)
    self.bn1 = nn.BatchNorm2d(hidden_size)
    self.bn2 = nn.BatchNorm2d(hidden_size)
    self.activation1 = nn.SELU()
    self.activation2 = nn.SELU()

  def forward(self, x):
    """Apply conv-bn-SELU, conv-bn, add the input, then a final SELU."""
    # None of the layers below works in place, so the input tensor itself can
    # serve as the skip branch; copying it would only cost memory and time.
    residual = x
    out = self.activation1(self.bn1(self.conv1(x)))
    out = self.bn2(self.conv2(out))
    return self.activation2(out + residual)  # residual connection

In [None]:
class ChessNet(nn.Module):
  """Fully convolutional network mapping 6 input planes to 2 output planes.

  Layout: a 3x3 input convolution followed by ReLU, then `hidden_layers`
  instances of `module`, then a 3x3 output convolution. All convolutions use
  stride 1 and padding 1, so the spatial size is preserved throughout.

  Args:
    hidden_layers: Number of `module` blocks between input and output layers.
    hidden_size: Channel width used by the input layer and every block.
  """

  def __init__(self, hidden_layers=4, hidden_size=200):
    super(ChessNet, self).__init__()
    self.hidden_layers = hidden_layers
    self.input_layer = nn.Conv2d(6, hidden_size, 3, stride=1, padding=1)
    blocks = [module(hidden_size) for _ in range(hidden_layers)]
    self.module_list = nn.ModuleList(blocks)
    self.output_layer = nn.Conv2d(hidden_size, 2, 3, stride=1, padding=1)

  def forward(self, x):
    """Run the input layer, every hidden block in order, then the output layer."""
    features = F.relu(self.input_layer(x))
    for block in self.module_list:
      features = block(features)
    return self.output_layer(features)

In [None]:
# Pick the compute device once; the model and every batch are moved onto it.
if torch.cuda.is_available():
  device = torch.device("cuda")
  print("CUDA is available. Training on GPU.")
else:
  device = torch.device("cpu")
  print("CUDA not available. Training on CPU.")

# Initialize the ChessNet model

# Epoch index of the checkpoint to resume from; any negative value starts
# from freshly initialised weights instead of loading a file.
model_epoch = 9

model = ChessNet(hidden_layers=4, hidden_size=200).to(device)
if model_epoch >= 0:
  # map_location lets a checkpoint written on a GPU be resumed on a CPU-only
  # runtime (and vice versa).
  model.load_state_dict(
    torch.load(f'chess_model_epoch_{model_epoch}.pth', map_location=device)
  )
model.train()  # Set the model to training mode

# Define the optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Define separate loss functions for different parts of the output
metric_from = nn.CrossEntropyLoss()
metric_to = nn.CrossEntropyLoss()

# Training loop
num_epochs = 100  # Number of epochs to train for in this run
first_epoch = max(model_epoch + 1, 0)  # continue numbering after the checkpoint
last_epoch = first_epoch + num_epochs
for epoch in range(first_epoch, last_epoch):
  total_loss = 0.0
  for i, (x, y) in enumerate(data_train_loader):
    # Move data to the appropriate device (e.g., GPU or CPU)
    x = x.float().to(device)
    y = y.float().to(device)

    # Zero the parameter gradients
    optimizer.zero_grad()

    # Forward pass
    output = model(x)

    # Compute loss for both outputs.
    # Each output plane is a distribution over the 64 squares, so flatten the
    # 8x8 grid into one class axis. Passing the (batch, 8, 8) planes directly
    # makes CrossEntropyLoss treat dim 1 (the 8 rows) as the classes and score
    # each column separately, which never trains the choice of column.
    # NOTE: this changes the loss scale relative to checkpoints and logs
    # produced with the previous objective.
    logits = output.flatten(start_dim=2)            # (batch, 2, 64)
    targets = y.flatten(start_dim=2).argmax(dim=2)  # (batch, 2) square indices
    loss_from = metric_from(logits[:, 0], targets[:, 0])
    loss_to = metric_to(logits[:, 1], targets[:, 1])
    loss = loss_from + loss_to

    # Backward pass and optimize
    loss.backward()
    optimizer.step()

    # Print statistics
    total_loss += loss.item()
    if (i + 1) % 100 == 0:  # Print every 100 mini-batches
      # The denominator includes the resumed offset so the counter cannot
      # run past its own total.
      print(f'Epoch [{epoch + 1}/{last_epoch}], Step [{i + 1}/{len(data_train_loader)}], Loss: {total_loss / 100:.4f}')
      total_loss = 0.0

  # One checkpoint per epoch, named by the zero-based epoch index.
  torch.save(model.state_dict(), f'chess_model_epoch_{epoch}.pth')

print('Finished Training')

CUDA is available. Training on GPU.
Epoch [11/100], Step [100/1250], Loss: 0.1435
Epoch [11/100], Step [200/1250], Loss: 0.1427
Epoch [11/100], Step [300/1250], Loss: 0.1428
Epoch [11/100], Step [400/1250], Loss: 0.1399
Epoch [11/100], Step [500/1250], Loss: 0.1400
Epoch [11/100], Step [600/1250], Loss: 0.1421
Epoch [11/100], Step [700/1250], Loss: 0.1389
Epoch [11/100], Step [800/1250], Loss: 0.1440
Epoch [11/100], Step [900/1250], Loss: 0.1393
Epoch [11/100], Step [1000/1250], Loss: 0.1365
Epoch [11/100], Step [1100/1250], Loss: 0.1374
Epoch [11/100], Step [1200/1250], Loss: 0.1410
Epoch [12/100], Step [100/1250], Loss: 0.1407
Epoch [12/100], Step [200/1250], Loss: 0.1409
Epoch [12/100], Step [300/1250], Loss: 0.1389
Epoch [12/100], Step [400/1250], Loss: 0.1413
Epoch [12/100], Step [500/1250], Loss: 0.1369
Epoch [12/100], Step [600/1250], Loss: 0.1381
Epoch [12/100], Step [700/1250], Loss: 0.1368
Epoch [12/100], Step [800/1250], Loss: 0.1347
Epoch [12/100], Step [900/1250], Loss: 0.