# Dataset

In [1]:
from kaggle.api.kaggle_api_extended import KaggleApi

# Instantiate the Kaggle API
api = KaggleApi()

# Download the dataset
api.dataset_download_files('datasnaek/chess', path='../data', unzip=True)

Dataset URL: https://www.kaggle.com/datasets/datasnaek/chess


In [2]:
import pandas as pd 

data = pd.read_csv('../data/games.csv')
print(len(data))

20058


In [3]:
data['moves_split'] = data['moves'].str.split()
data['moves_length'] = data['moves_split'].apply(len)
total_length = data['moves_length'].sum()
print(total_length)

1212827


In [4]:
data.head(1)

Unnamed: 0,id,rated,created_at,last_move_at,turns,victory_status,winner,increment_code,white_id,white_rating,black_id,black_rating,moves,opening_eco,opening_name,opening_ply,moves_split,moves_length
0,TZJHLljE,False,1504210000000.0,1504210000000.0,13,outoftime,white,15+2,bourgris,1500,a-00,1191,d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 Bb4+ Nc3 Ba5...,D10,Slav Defense: Exchange Variation,5,"[d4, d5, c4, c6, cxd5, e6, dxe6, fxe6, Nf3, Bb...",13


# Preprocessing

## ECO Grouping

In [5]:
grouped_eco_labels = {
    'A00': 'Polish (Sokolsky) opening',
    'A01': 'Nimzovich-Larsen attack',
    'A02-A03': "Bird's opening",
    'A04-A09': 'Reti opening',
    'A10-A39': 'English opening',
    'A40-A41': "Queen's pawn",
    'A42': 'Modern defence, Averbakh system',
    'A43-A44': 'Old Benoni defence',
    'A45-A46': "Queen's pawn game",
    'A47': "Queen's Indian defence",
    'A48-A49': "King's Indian, East Indian defence",
    'A50': "Queen's pawn game",
    'A51-A52': 'Budapest defence',
    'A53-A55': 'Old Indian defence',
    'A56': 'Benoni defence',
    'A57-A59': 'Benko gambit',
    'A60-A79': 'Benoni defence',
    'A80-A99': 'Dutch',
    'B00': "King's pawn opening",
    'B01': 'Scandinavian (centre counter) defence',
    'B02-B05': "Alekhine's defence",
    'B06': 'Robatsch (modern) defence',
    'B07-B09': 'Pirc defence',
    'B10-B19': 'Caro-Kann defence',
    'B20-B99': 'Sicilian defence',
    'C00-C19': 'French defence',
    'C20': "King's pawn game",
    'C21-C22': 'Centre game',
    'C23-C24': "Bishop's opening",
    'C25-C29': 'Vienna game',
    'C30-C39': "King's gambit",
    'C40': "King's knight opening",
    'C41': "Philidor's defence",
    'C42-C43': "Petrov's defence",
    'C44': "King's pawn game",
    'C45': "Scotch game",
    'C46': "Three knights game",
    'C47-C49': "Four knights, Scotch variation",
    'C50': "Italian Game",
    'C51-C52': "Evans gambit",
    'C53-C54': "Giuoco Piano",
    'C55-C59': "Two knights defence",
    'C60-C99': "Ruy Lopez (Spanish opening)",
    'D00': "Queen's pawn game",
    'D01': 'Richter-Veresov attack',
    'D02': "Queen's pawn game",
    'D03': 'Torre attack (Tartakower variation)',
    'D04-D05': "Queen's pawn game",
    'D06': "Queen's Gambit",
    'D07-D09': "Queen's Gambit Declined, Chigorin defence",
    'D10-D15': "Queen's Gambit Declined Slav defence",
    'D16': "Queen's Gambit Declined Slav accepted, Alapin variation",
    'D17-D19': "Queen's Gambit Declined Slav, Czech defence",
    'D20-D29': "Queen's gambit accepted",
    'D30-D42': "Queen's gambit declined",
    'D43-D49': "Queen's Gambit Declined semi-Slav",
    'D50-D69': "Queen's Gambit Declined",
    'D70-D79': 'Neo-Gruenfeld defence',
    'D80-D99': 'Gruenfeld defence',
    'E00': "Queen's pawn game",
    'E01-E09': 'Catalan, closed',
    'E10': "Queen's pawn game",
    'E11': 'Bogo-Indian defence',
    'E12-E19': "Queen's Indian defence",
    'E20-E59': 'Nimzo-Indian defence',
    'E60-E99': "King's Indian defence"
}

In [6]:
def map_eco_to_grouped_label(eco_code):
    for key, value in grouped_eco_labels.items():
        if '-' in key:
            start, end = key.split('-')
            if start <= eco_code <= end:
                return value
        elif eco_code == key:
            return value
    return 'Other'

data['grouped_opening'] = data['opening_eco'].apply(map_eco_to_grouped_label)

## llegal Moves Check

In [7]:
import chess

def count_illegal_moves(moves):
    board = chess.Board()
    illegal_count = 0
    for move in moves.split():
        try:
            board.push_san(move)
        except:
            illegal_count += 1
            break
    return illegal_count


data['illegal_moves'] = data['moves'].apply(count_illegal_moves)
illegal_moves_count = data['illegal_moves'].sum()
print(f"Total illegal moves detected: {illegal_moves_count}")

Total illegal moves detected: 0


## Filter Dataset
- Illegal Moves
- Less than 10 Opening Moves Occuring

In [8]:
data = data[data['illegal_moves'] == 0]

grouped_opening_counts = data['grouped_opening'].value_counts()

valid_openings = grouped_opening_counts[grouped_opening_counts >= 10].index

filtered_data = data[data['grouped_opening'].isin(valid_openings)]

data = filtered_data

## Generate 4D Board States

In [9]:
import numpy as np
from tqdm import tqdm
tqdm.pandas()

def generate_4d_board_states(moves):
    board = chess.Board()
    piece_type_map = {'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
                      'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11}
    board_states = []

    for move in moves.split():
        board.push_san(move)
        board_state = np.zeros((8, 8, 12))
        for square in chess.SQUARES:
            piece = board.piece_at(square)
            if piece:
                rank, file = chess.square_rank(square), chess.square_file(square)
                piece_type = piece_type_map[str(piece)]
                board_state[rank, file, piece_type] = 1
        board_states.append(board_state)

    return board_states

data['board_states_4d'] = data['moves'].progress_apply(generate_4d_board_states)

100%|██████████| 20033/20033 [00:42<00:00, 468.13it/s]


In [10]:
data.head(1)

Unnamed: 0,id,rated,created_at,last_move_at,turns,victory_status,winner,increment_code,white_id,white_rating,...,black_rating,moves,opening_eco,opening_name,opening_ply,moves_split,moves_length,grouped_opening,illegal_moves,board_states_4d
0,TZJHLljE,False,1504210000000.0,1504210000000.0,13,outoftime,white,15+2,bourgris,1500,...,1191,d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 Bb4+ Nc3 Ba5...,D10,Slav Defense: Exchange Variation,5,"[d4, d5, c4, c6, cxd5, e6, dxe6, fxe6, Nf3, Bb...",13,Queen's Gambit Declined Slav defence,0,"[[[[0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.], [0. ..."


# Write to pickle

In [11]:
import pickle
import os

reduced_data = data[['moves','moves_length','board_states_4d', 'grouped_opening']]

# Create the directory if it doesn't exist
os.makedirs('./data', exist_ok=True)

# Save the reduced_data dataframe as a pickle file
with open('./data/reduced_data.pkl', 'wb') as f:
    pickle.dump(reduced_data, f)

In [12]:
import gc

del data
del filtered_data
del reduced_data

gc.collect()

0

# Load Pickle

In [1]:
import pickle

# Load the pickle file
with open('./data/reduced_data.pkl', 'rb') as f:
    reduced_data_loaded = pickle.load(f)

# Print the loaded data
reduced_data_loaded.head(5)
reduced_data_loaded = reduced_data_loaded.head(2000)

In [2]:
print(len(reduced_data_loaded))

2000


In [3]:
print(len(reduced_data_loaded['board_states_4d'][0]))
print(reduced_data_loaded['board_states_4d'][0][0].shape)

13
(8, 8, 12)


# Flatten 4D Board States

In [4]:
import numpy as np

def convert_board_states(row):
    board_states = np.array(row['board_states_4d'])
    assert board_states.shape == (row['moves_length'], 8, 8, 12), "Shape mismatch"
    return board_states

# Apply the conversion function
reduced_data_loaded['board_states_array'] = reduced_data_loaded.apply(convert_board_states, axis=1)

In [11]:
from sklearn.preprocessing import LabelEncoder

X = reduced_data_loaded['board_states_array'].values
y = reduced_data_loaded['grouped_opening'].values

# Initialize the LabelEncoder
label_encoder = LabelEncoder()

# Fit and transform the labels
y_encoded = label_encoder.fit_transform(y)

# Convert to a numpy array of type float32 (or int64, depending on your use case)
y_encoded = np.array(y_encoded, dtype=np.float32)

# Converting to Torch

## Dataset

In [15]:
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

class ChessDataset4D(Dataset):
    def __init__(self, X, y):
        # Convert X from numpy objects to list of tensors
        self.X = [torch.tensor(x, dtype=torch.float32).view(-1, 8 * 8 * 12) for x in X]
        self.y = torch.tensor(y, dtype=torch.float32)
        
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
def collate_fn(batch):
    X_batch, y_batch = zip(*batch)
    X_batch = torch.nn.utils.rnn.pad_sequence([torch.tensor(x, dtype=torch.float32) for x in X_batch], batch_first=True)
    y_batch = torch.stack([torch.tensor(y, dtype=torch.float32) for y in y_batch])
    return X_batch, y_batch

# Assuming X and y_encoded are your full dataset
# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.2, random_state=42)

# Convert the splits into datasets and dataloaders
train_dataset = ChessDataset4D(X_train, y_train)
test_dataset = ChessDataset4D(X_test, y_test)

train_dataloader = DataLoader(train_dataset, batch_size=32, collate_fn=collate_fn, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, collate_fn=collate_fn)


## Model

In [16]:
import torch.nn as nn

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=1):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # x: (batch_size, sequence_length, input_size)
        packed_x = nn.utils.rnn.pack_padded_sequence(x, lengths=[len(seq) for seq in x], batch_first=True, enforce_sorted=False)
        lstm_out, _ = self.lstm(packed_x)
        unpacked_x, _ = nn.utils.rnn.pad_packed_sequence(lstm_out, batch_first=True)
        out = self.fc(unpacked_x[:, -1, :])
        return out

# Number of classes (unique labels)
num_classes = len(np.unique(y_encoded))

# Define parameters
input_size = 8 * 8 * 12  # Flattened size of (8, 8, 12)
hidden_size = 128
output_size = num_classes  # Number of classes

model = LSTMModel(input_size=input_size, hidden_size=hidden_size, output_size=output_size)

## Training

In [17]:
import torch.optim as optim

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()  # For classification tasks
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    for batch_X, batch_y in train_dataloader:
        # Forward pass
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y.long())  # CrossEntropyLoss expects target as long type
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

  X_batch = torch.nn.utils.rnn.pad_sequence([torch.tensor(x, dtype=torch.float32) for x in X_batch], batch_first=True)
  y_batch = torch.stack([torch.tensor(y, dtype=torch.float32) for y in y_batch])


Epoch [1/10], Loss: 3.2785
Epoch [2/10], Loss: 3.3040
Epoch [3/10], Loss: 3.3067
Epoch [4/10], Loss: 3.3455
Epoch [5/10], Loss: 3.1915
Epoch [6/10], Loss: 3.5314
Epoch [7/10], Loss: 3.5056
Epoch [8/10], Loss: 3.3867
Epoch [9/10], Loss: 3.0815
Epoch [10/10], Loss: 3.2758


In [18]:
def evaluate_model(model, dataloader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():  # Disable gradient computation
        for batch_X, batch_y in dataloader:
            outputs = model(batch_X)
            _, predicted = torch.max(outputs, 1)  # Get the index of the max log-probability
            total += batch_y.size(0)
            correct += (predicted == batch_y.long()).sum().item()
    accuracy = 100 * correct / total
    return accuracy

# Evaluate the model
accuracy = evaluate_model(model, test_dataloader)
print(f'Accuracy on test data: {accuracy:.2f}%')

  X_batch = torch.nn.utils.rnn.pad_sequence([torch.tensor(x, dtype=torch.float32) for x in X_batch], batch_first=True)
  y_batch = torch.stack([torch.tensor(y, dtype=torch.float32) for y in y_batch])


Accuracy on test data: 13.00%


# Old

In [None]:
import numpy as np

def flatten_board_states_to_disk(df, board_states_column, output_file, batch_size=1000):
    with open(output_file, 'wb') as f:
        for i in range(0, len(df), batch_size):
            batch = df[board_states_column][i:i+batch_size]
            flattened_batch = np.array([state.flatten() for game in batch for state in game])
            np.save(f, flattened_batch)


flatten_board_states_to_disk(reduced_data_loaded, 'board_states_4d', './data/flattened_states.npy')
X_4d = np.load('./data/flattened_states.npy')

In [None]:
def flatten_board_states_to_disk(df, board_states_column, output_file, batch_size=1000):
    for i in range(0, len(df), batch_size):
        batch = df[board_states_column][i:i+batch_size]
        flattened_batch = np.array([state.flatten() for game in batch for state in game])
        
        if i == 0:
            # If this is the first batch, create a new file
            np.save(output_file, flattened_batch)
        else:
            # Append the new batch to the existing file
            with open(output_file, 'ab') as f:  # 'ab' mode opens the file for binary appending
                np.save(f, flattened_batch)

# Example usage
flatten_board_states_to_disk(reduced_data_loaded, 'board_states_4d', './data/flattened_states.npy')

# Loading the data
with open('./data/flattened_states.npy', 'rb') as f:
    X_4d = []
    while True:
        try:
            X_4d.append(np.load(f))
        except:  # This will be raised when reaching the end of the file
            break
    X_4d = np.concatenate(X_4d)

In [None]:
y = np.repeat(reduced_data_loaded['grouped_opening'], reduced_data_loaded['moves'].apply(lambda x: len(x.split())))

In [None]:
print(X_4d.shape, y.shape)

(1211088, 768) (1211088,)
