In [1]:
import chess

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from flatbuffers.packer import float32
from tensorflow.python.keras.backend import learning_phase

import encoding_tools as EncodingTools

from model import ChessNet
from train import train

import torch
from torch.utils.data import DataLoader, Dataset


MODE = "DEBUG"  # If in release mode, please comment this line
# MODE = "RELEASE"

In [2]:
# Pulling in training data using Pandas
train_df = pd.read_csv('datasets/train.csv', index_col='id')

train_df = train_df[:10000] if MODE == "DEBUG" else train_df[:25000]

# We'll also grab the last 1000 examples as a validation set
val_df = train_df[-1000:]

# We'll stack all our encoded boards into a single numpy array
X_train = np.stack(train_df['board'].apply(EncodingTools.encode_fen_string))
y_train = train_df['black_score']

X_val = np.stack(val_df['board'].apply(EncodingTools.encode_fen_string))
y_val = val_df['black_score']

In [11]:
# Instantiate the model
model = ChessNet()

# Move tensors to device if CUDA or MPS is available
if torch.cuda.is_available():
    device = "cuda"
# elif torch.mps.is_available():
#     device = "mps"
else:
    device = "cpu"

model = model.to(device)

In [12]:
# Custom Dataset class to handle X and y pairs
class ChessDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

my_dtype = torch.float32

# Convert training and validation data to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=my_dtype)
print("X_train_tensor shape:", X_train_tensor.shape) if MODE == "DEBUG" else None

y_train_tensor = torch.tensor(y_train.values, dtype=my_dtype).unsqueeze(1)  # Add extra dimension for compatibility
print("Y_train_tensor shape:", y_train_tensor.shape) if MODE == "DEBUG" else None

X_val_tensor = torch.tensor(X_val, dtype=my_dtype)
print("X_val_tensor shape:", X_val_tensor.shape) if MODE == "DEBUG" else None

y_val_tensor = torch.tensor(y_val.values, dtype=my_dtype).unsqueeze(1)  # Add extra dimension for compatibility
print("Y_val_tensor shape:", y_val_tensor.shape) if MODE == "DEBUG" else None

# Create Dataset objects for training and validation
train_dataset = ChessDataset(X_train_tensor, y_train_tensor)
val_dataset = ChessDataset(X_val_tensor, y_val_tensor)

# Create DataLoaders for batching
batch_size = 256
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

X_train_tensor, y_train_tensor = X_train_tensor.to(device), y_train_tensor.to(device)
X_val_tensor, y_val_tensor = X_val_tensor.to(device), y_val_tensor.to(device)

X_train_tensor shape: torch.Size([10000, 8, 8, 13])
Y_train_tensor shape: torch.Size([10000, 1])
X_val_tensor shape: torch.Size([1000, 8, 8, 13])
Y_val_tensor shape: torch.Size([1000, 1])


In [13]:
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim

n_epochs = 40
learning_rate = 0.001
train_losses, val_losses = [], []
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = nn.MSELoss().to(device)

for epoch in range(n_epochs):
    model.train()  # Set model to training mode
    epoch_train_loss = 0

    for X_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{n_epochs} [Training]"):
        X_batch = X_batch.permute(0, 3, 1, 2).to(device)
        y_batch = y_batch.to(device)
        # Change from [batch_size, height, width, channels] to [batch_size, channels, height, width]
        # print("X_batch shape:", X_batch.shape) if MODE == "DEBUG" else None
        # print("Y_batch shape:", y_batch.shape) if MODE == "DEBUG" else None
        optimizer.zero_grad()
        predictions = model(X_batch)
        loss = loss_fn(predictions, y_batch)
        loss.backward()
        optimizer.step()
        epoch_train_loss += loss.item()

    # Average training loss for the epoch
    train_losses.append(epoch_train_loss / len(train_loader))
    print(f"Epoch {epoch + 1}: Training Loss = {train_losses[-1]}")

    # Validation loop
    model.eval()  # Set model to evaluation mode
    epoch_val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in tqdm(val_loader, desc=f"Epoch {epoch + 1}/{n_epochs} [Validation]"):
            predictions = model(X_batch)
            loss = loss_fn(predictions, y_batch)
            epoch_val_loss += loss.item()

    # Average validation loss for the epoch
    val_losses.append(epoch_val_loss / len(val_loader))
    print(f"Epoch {epoch + 1}: Validation Loss = {val_losses[-1]}")

Epoch 1/40 [Training]:  12%|█▎        | 5/40 [00:34<04:03,  6.97s/it]


KeyboardInterrupt: 

In [None]:
# Plotting results (optional)
plt.style.use('ggplot')
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend()
plt.title('Loss During Training')
plt.show()

In [None]:
# Implementing our model as a function
def play_nn(fen, show_move_evaluations=False):
    # We can create a python-chess board instance from the FEN string like this:
    board = chess.Board(fen=fen)

    # And then evaluate all legal moves
    moves = []
    input_vectors = []
    for move in board.legal_moves:
        # For each move, we'll make a copy of the board and try that move out
        candidate_board = board.copy()
        candidate_board.push(move)
        moves.append(move)
        input_vectors.append(EncodingTools.encode_board(str(candidate_board)).astype(np.int32).flatten())

    input_vectors = np.stack(input_vectors)
    # This is where our model gets to shine! It tells us how good the resultant score board is for black:
    scores = model.predict(input_vectors, verbose=0)
    # argmax gives us the index of the highest scoring move
    if board.turn == chess.BLACK:
        index_of_best_move = np.argmax(scores)
    else:
        # If we're playing as white, we want black's score to be as small as possible, so we take argmax of the negative of our array
        index_of_best_move = np.argmax(-scores)

    if show_move_evaluations:
        print(zip(moves, scores))

    best_move = moves[index_of_best_move]

    # Now we turn our move into a string, return it and call it a day!
    return str(best_move)

In [None]:
# Now we'll import our test set, and make some final predictions!

test_df = pd.read_csv('datasets/test.csv')

test_df.head()

In [None]:
# Making all of our predictions happens in this one line!
# We're basically saying "run play_nn on all the boards in the test_df, and then keep the results as best_move"
# Because this invovles running our model a _ton_ this step will take a while.

test_df['best_move'] = test_df['board'].apply(play_nn)

In [None]:
test_df['best_move']

In [None]:
# Let's make sure our submission looks like the sample submission
submission = test_df[['id', 'best_move']]
print(submission.head())

sample_submission = pd.read_csv('datasets/sample_submission.csv', index_col='id')
print(sample_submission.head())

In [None]:
# We should not output the submission file
# submission.to_csv('submission.csv', index=False)