# Chess Neural Network

Building a neural network that learns to play chess based on my gameplay data.

## Goal
Train a move prediction model to create an AI that plays like me.




In [32]:
# Necessary libraries
import pandas as pd
import numpy as np
import ast
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler, SequentialSampler
from sklearn.model_selection import TimeSeriesSplit

## Step 2: Load Chess Data

Load cleaned data from csv file


In [33]:
df = pd.read_csv("/Users/riteshbhandari/Documents/Dokumentit – Ritesh - MacBook Pro/GitHub/Chess-engine/src/data-analysis/cleaned_data.csv")
df.head()

Unnamed: 0,game_id,moves,num_moves,first_move
0,123118274906,"[('white', 'e4', True), ('black', 'e6', False)...",90,e4
1,123118510404,"[('white', 'd4', False), ('black', 'c5', True)...",141,d4
2,123118790014,"[('white', 'e4', False), ('black', 'e5', True)...",46,e4
3,123158939328,"[('white', 'e4', False), ('black', 'd5', True)...",88,e4
4,123160166430,"[('white', 'e4', True), ('black', 'e5', False)...",110,e4


# Step 2.1:  Further Pre-processing


In [34]:
# convert string representation of list to actual list
df["moves"] = df["moves"].apply(ast.literal_eval)

# saving all the moves to single list to be encoded 
every_move = []
for game in df["moves"]:
    for move in game:
        every_move.append(move[1])

every_move[:10]

['e4', 'e6', 'd4', 'Qh4', 'Nc3', 'f5', 'Nf3', 'Qe7', 'e5', 'Qb4']

In [35]:
# getting all the unique moves
unique_moves = set(every_move)  # just the unique moves
print("Number of different moves:", len(unique_moves))
print()

# give move a number
move_to_number = {}

# turning integer back to moves (for future use)
number_to_move= {}

for i, move in enumerate(unique_moves):
    move_to_number[move] = i
    number_to_move[i] = move

# turning all the numbers into integers
number_moves = []
for move in every_move:
    number_moves.append(move_to_number[move])

# first 10 moves
print("First 10 moves as numbers: ")
print(number_moves[:10])
print()

# first 10 original moves
print("First 10 original moves: ")
print(every_move[:10]) 

Number of different moves: 1927

First 10 moves as numbers: 
[1448, 1115, 350, 5, 1349, 1840, 1022, 703, 1359, 1263]

First 10 original moves: 
['e4', 'e6', 'd4', 'Qh4', 'Nc3', 'f5', 'Nf3', 'Qe7', 'e5', 'Qb4']


In [36]:
# Create colors per game
colors_per_game = []

for game in df["moves"]:
    
    game_colors = []
    for move in game:
        if move[0] == "white":
            game_colors.append(1)
        else:  # black
            game_colors.append(0)
    colors_per_game.append(game_colors)

# Add to DataFrame
df["colors"] = colors_per_game

# Check first row
print(df[["moves", "colors"]].iloc[0])


moves     [(white, e4, True), (black, e6, False), (white...
colors    [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, ...
Name: 0, dtype: object


In [37]:
# turn white or black to integers ( 1  = (White), 0 = (Black) )
teoriat_moves_per_game = []

for game in df["moves"]:
    
    game_teoriat = []
    for move in game:
        if move[2] == True:
            game_teoriat.append(1)
        else:
            game_teoriat.append(0)
    teoriat_moves_per_game.append(game_teoriat)

# Add to DataFrame
df["teoriat_moves"] = teoriat_moves_per_game

# Check first row
print(df[["moves", "teoriat_moves"]].iloc[0])

moves            [(white, e4, True), (black, e6, False), (white...
teoriat_moves    [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, ...
Name: 0, dtype: object


In [38]:
#combine all the features into a single list of tuples seperated by games 
combined_features_per_game = []

for game_idx in range(len(df)):
    
    game_combined = []
    moves = df["moves"].iloc[game_idx]
    colors = df["colors"].iloc[game_idx]
    teoriat = df["teoriat_moves"].iloc[game_idx]
    
    for i in range(len(moves)):
        # (color, move_as_integer, your_move)
        game_combined.append((
            colors[i],
            move_to_number[moves[i][1]],  # convert move to integer
            teoriat[i]
        ))
    # Append the combined features for the game
    combined_features_per_game.append(game_combined)

# Add to DataFrame
df["game_data"] = combined_features_per_game

# Check first game data
print(combined_features_per_game[0])

[(1, 1448, 1), (0, 1115, 0), (1, 350, 1), (0, 5, 0), (1, 1349, 1), (0, 1840, 0), (1, 1022, 1), (0, 703, 0), (1, 1359, 1), (0, 1263, 0), (1, 178, 1), (0, 939, 0), (1, 1040, 1), (0, 1642, 0), (1, 1229, 1), (0, 1919, 0), (1, 63, 1), (0, 1695, 0), (1, 228, 1), (0, 269, 0), (1, 386, 1), (0, 1638, 0), (1, 1379, 1), (0, 1542, 0), (1, 1622, 1), (0, 672, 0), (1, 573, 1), (0, 926, 0), (1, 127, 1), (0, 1025, 0), (1, 386, 1), (0, 235, 0), (1, 445, 1), (0, 678, 0), (1, 250, 1), (0, 741, 0), (1, 538, 1), (0, 1411, 0), (1, 1575, 1), (0, 352, 0), (1, 945, 1), (0, 988, 0), (1, 1049, 1), (0, 1019, 0), (1, 250, 1), (0, 343, 0), (1, 941, 1), (0, 1664, 0), (1, 8, 1), (0, 876, 0), (1, 795, 1), (0, 210, 0), (1, 812, 1), (0, 1695, 0), (1, 1717, 1), (0, 265, 0), (1, 250, 1), (0, 129, 0), (1, 1575, 1), (0, 995, 0), (1, 1077, 1), (0, 849, 0), (1, 1540, 1), (0, 681, 0), (1, 1602, 1), (0, 1652, 0), (1, 689, 1), (0, 395, 0), (1, 1077, 1), (0, 1652, 0), (1, 689, 1), (0, 395, 0), (1, 211, 1), (0, 1785, 0), (1, 1521, 

In [39]:
# lets drop the columns with unneccessary data 
df = df.drop(columns=["moves", "colors", "teoriat_moves", "first_move","num_moves"])
df.head()

Unnamed: 0,game_id,game_data
0,123118274906,"[(1, 1448, 1), (0, 1115, 0), (1, 350, 1), (0, ..."
1,123118510404,"[(1, 350, 0), (0, 77, 1), (1, 606, 0), (0, 135..."
2,123118790014,"[(1, 1448, 0), (0, 1359, 1), (1, 1022, 0), (0,..."
3,123158939328,"[(1, 1448, 0), (0, 606, 1), (1, 1359, 0), (0, ..."
4,123160166430,"[(1, 1448, 1), (0, 1359, 0), (1, 1923, 1), (0,..."


## Step 2.2: Data Pipeline

### Dataset Class
Organize chess games into training sequences with sliding windows and padding.

In [40]:
unique_moves = set()
for game in df["game_data"]:
    for (color, move, your_move) in game:
        unique_moves.add(move)

print(f"Number of unique moves: {len(unique_moves)}")
print(f"Move range: {min(unique_moves)} to {max(unique_moves)}")

Number of unique moves: 1927
Move range: 0 to 1926


In [41]:
VOCAB_SIZE = 1928 # 1927 moves and one padding token
MAX_SEQUENCE_LENGTH = 6 # for now 
PAD_TOKEN = 1927 # for not known moves

class chessdataset(Dataset):
    
    def __init__(self, games_data, max_seq_len = 6):
        self.max_seq_len = max_seq_len
        self.pad_token = PAD_TOKEN
        self.sequences = []
        self.targets = []
        
        # loop through each games 
        for game in games_data:
            for i in  range(len(game) -1):
                start_idx = max(0,i-5)
                sequence = game[start_idx : i+1]  
            
                while len(sequence) <6:
                    sequence.insert(0,(0,1927,0))
                   
                target = game[i+1][1] # the real seventh move 
                self.sequences.append(sequence)
                self.targets.append(target)



    # training samples
    def __len__ (self):
        return len(self.targets)

        
      # creating batches           
    def __getitem__ (self, idx):
        sequence = self.sequences[idx]
        target = self.targets[idx]

        # saving training samples seperately to it's own category
        colors = [move_tuple[0] for move_tuple in sequence]
        moves = [move_tuple[1] for move_tuple in sequence]
        theory = [move_tuple[2] for move_tuple in sequence]
        
         # total 60 600 training samples
        return {
            'colors': torch.tensor(colors),
            'moves': torch.tensor(moves),
            'theory': torch.tensor(theory),
            'target': torch.tensor(target), 
    }                

In [42]:
# Create the dataset
dataset = chessdataset(df['game_data'])

# Check how many training examples
print(f"Total training examples: {len(dataset)}")

# Get one example
example = dataset[0]
print(f"\nFirst training example:")
print(f"Colors: {example['colors']}")
print(f"Moves: {example['moves']}")
print(f"Theory: {example['theory']}")
print(f"Target: {example['target']}")

print(f"\nShapes:")
print(f"Colors shape: {example['colors'].shape}")
print(f"Moves shape: {example['moves'].shape}")

Total training examples: 60600

First training example:
Colors: tensor([0, 0, 0, 0, 0, 1])
Moves: tensor([1927, 1927, 1927, 1927, 1927, 1448])
Theory: tensor([0, 0, 0, 0, 0, 1])
Target: 1115

Shapes:
Colors shape: torch.Size([6])
Moves shape: torch.Size([6])


### Purge K-Fold Cross-Validation
Setup  Purge cross-validation splits for robust model evaluation and for avoiding data leaks. 

In [43]:
# time based split to 5
n_splits = 5

#TimeSeriesSplit (time order)
tscv = TimeSeriesSplit(n_splits=n_splits)

# gives index range for each fold
splits = tscv.split(range(len(dataset)))

# train test split
fold_num = 1

for split in splits:
    train = split[0]
    test = split[1]


    fold_num += 1

## Step 2.3: Build the RNN Model

In [None]:
# Data parameters
BATCH_SIZE = 64
MAX_SEQUENCE_LENGTH = 6
VOCAB_SIZE = 1928
PAD_TOKEN = 1927

# Training parameters
NUM_EPOCHS = 25
LEARNING_RATE = 0.001
WEIGHT_DECAY = 0.01

# Model architecture
EMBEDDING_DIM = 128
HIDDEN_DIM = 256
NUM_LAYERS = 2
DROPOUT = 0.3

# Model save path
MODEL_SAVE_PATH = 'best_chess_model.pth'

In [None]:
class chessRNN(nn.Module):
    def __init__(
        self,
        vocab_size=1928,
        embedding_dim=128,
        hidden_dim=256,
        num_layers=2,
        dropout=0.3,
        rnn_type="gru"
    ):
        super(chessRNN, self).__init__()
        
        self.move_embedding = nn.Embedding(vocab_size, embedding_dim)
        self.color_embedding = nn.Embedding(2, 6)
        self.theory_embedding = nn.Embedding(2, 6)
        
        input_dim = embedding_dim + 6 + 6
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        self.rnn = nn.GRU(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True
        )
        
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, vocab_size)
    
    def forward(self, colors, moves, theory):
        move_embedded = self.move_embedding(moves)
        color_embedded = self.color_embedding(colors)
        theory_embedded = self.theory_embedding(theory)
        
        combined_embedded = torch.cat([move_embedded, color_embedded, theory_embedded], dim=2)
        
        rnn_output, hidden_state = self.rnn(combined_embedded)
        last_hidden = hidden_state[-1, :, :]
        dropped_output = self.dropout(last_hidden)
        logits = self.fc(dropped_output)
        
        return logits

In [None]:
# Final device setup
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

# Initialize loss function 
loss_func = nn.CrossEntropyLoss()

CrossEntropyLoss()

## 2.4 Training the Model 

we will contact training on csc using all the folder, but for testing we will do locally by using just one folder

In [None]:
# Get last fold (Fold 5)
train, test = list(tscv.split(range(len(dataset))))[-1]

print(f"Training examples: {len(train)}")
print(f"Test examples: {len(test)}")

# Create samplers
train_sampler = SubsetRandomSampler(train)
test_sampler = SequentialSampler(test)

# Create DataLoaders
train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, sampler=train_sampler)
test_loader = DataLoader(dataset, batch_size=BATCH_SIZE, sampler=test_sampler)

print(f"Number of training batches: {len(train_loader)}")
print(f"Number of test batches: {len(test_loader)}")

Training examples: 50500
Test examples: 10100
Number of training batches: 790
Number of test batches: 158


In [None]:
def train_epoch(model, dataloader, optimizer, loss_func, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for batch in dataloader: 
        colors = batch['colors'].to(device)
        moves = batch['moves'].to(device)
        theory = batch['theory'].to(device)
        targets = batch['target'].to(device)

        optimizer.zero_grad()  # Fixed typo: zerp_grad -> zero_grad

        logits = model(colors, moves, theory)
        loss = loss_func(logits, targets)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(logits, 1)
        correct += (predicted == targets).sum().item()
        total += targets.size(0)
    
    # Added return statement
    return total_loss / len(dataloader), correct / total

In [None]:
def validate(model, dataloader, loss_func, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch in dataloader:
            colors = batch['colors'].to(device)
            moves = batch['moves'].to(device)
            theory = batch['theory'].to(device)
            targets = batch['target'].to(device)
            
            logits = model(colors, moves, theory)
            loss = loss_func(logits, targets)
            
            total_loss += loss.item()
            _, predicted = torch.max(logits, 1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
    
    return total_loss / len(dataloader), correct / total

In [None]:
# Reinitialize model with hyperparameters
model = chessRNN(
    vocab_size=VOCAB_SIZE,
    embedding_dim=EMBEDDING_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS,
    dropout=DROPOUT
).to(device)

optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
best_val_acc = 0

print("Starting training...")
print(f"Training on {len(train)} examples, validating on {len(test)} examples")
print(f"Device: {device}\n")

for epoch in range(NUM_EPOCHS):
    train_loss, train_acc = train_epoch(model, train_loader, optimizer, loss_func, device)
    val_loss, val_acc = validate(model, test_loader, loss_func, device)
    
    print(f"Epoch {epoch+1}/{NUM_EPOCHS} | Train Loss: {train_loss:.4f}, Acc: {train_acc:.3f} | Val Loss: {val_loss:.4f}, Acc: {val_acc:.3f}")
    
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), MODEL_SAVE_PATH)
        print(f"  ✓ Saved new best: {val_acc:.3f}")

print(f"\n{'='*50}")
print(f"Training complete! Best validation accuracy: {best_val_acc:.3f}")
print(f"Model saved to: {MODEL_SAVE_PATH}")