In [None]:
"""
Training of a transformer to play chess. Most effort went into variable naming tbh.
"""

In [None]:
!pip install datasets torch

In [2]:
# imports
import torch
from datasets import load_dataset
from tqdm import tqdm
import json
from torch.utils.data import DataLoader, TensorDataset, random_split
import torch.nn as nn
import math

In [3]:
ds = load_dataset("conacts/stockfish_dataset")
ds

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [5]:
#!mkdir -p stockfish_data

# Save the dataset to the local directory
#ds.save_to_disk("./stockfish_data")

In [10]:
train = ds['train']
games = train

#Dataset handling
def extract_moves(game_data):
    if isinstance(game_data, str):
        return game_data.split()
    elif isinstance(game_data, dict) and 'moves' in game_data:
        return game_data['moves'].split()
    else:
        raise TypeError(f"Unexpected game_data type: {type(game_data)}. Expected a dictionary with a 'moves' key or a string.")



unique_moves = set()

# progress bar is nice
for game in tqdm(games, desc="Processing games"):
    try:
        moves = extract_moves(game)
        unique_moves.update(moves)
        len(unique_moves)

    except TypeError as e:
        print(f"Skipping invalid game data: {e}")

# Build vocab
vocab = {move: idx for idx, move in enumerate(unique_moves)}
len(vocab)


Processing games:   1%|▏         | 99998/7896048 [00:06<08:04, 16091.69it/s]


8243

In [11]:
# Save vocab as json
with open('vocab.json', 'w') as f:
  json.dump(vocab, f)

In [13]:
def tokenize_moves(moves, vocab):
    return [vocab[move] for move in moves]

#Example:
tokenized_moves = tokenize_moves(moves, vocab)
print(tokenized_moves[:10])


8243
[1305, 2983, 1419, 8238, 6696, 2636, 4103, 4989, 4337, 3201]


In [15]:
def prepare_data(tokenized_moves, seq_len):
    data = []
    for i in range(len(tokenized_moves) - seq_len):
        src = torch.tensor(tokenized_moves[i:i+seq_len], dtype=torch.long)
        tgt = torch.tensor(tokenized_moves[i+1:i+seq_len+1], dtype=torch.long)
        data.append((src, tgt))
    return data

In [16]:
# Tuneable hyperparams
D_MODEL = 512
NHEAD = 8
NUM_LAYERS = 8
DIM_FEEDFORWARD = 4096
DROPOUT = 0.2
LEARNING_RATE = 0.0001
WEIGHT_DECAY = 1e-4
STEP_SIZE = 5
GAMMA = 0.1
MAX_GRAD_NORM = 1.0
BATCH_SIZE = 64
EPOCHS = 30
SEQ_LEN = 15
TRAIN_SPLIT = 0.8
ACCUMULATION_STEPS = 4

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=DROPOUT, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class PreNormTransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=DIM_FEEDFORWARD, dropout=DROPOUT):
        super(PreNormTransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        #pre layer normalization
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        # gelu activation
        self.activation = nn.GELU()

        self.alpha = nn.Parameter(torch.ones(1))

    def forward(self, src):
        src2 = self.self_attn(self.norm1(src), self.norm1(src), self.norm1(src))[0]
        src = src + self.alpha * self.dropout(src2)

        src2 = self.linear2(self.dropout(self.activation(self.linear1(self.norm2(src)))))
        src = src + self.alpha * self.dropout(src2)

        return src

class ChessTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=D_MODEL, nhead=NHEAD, num_layers=NUM_LAYERS, dropout=DROPOUT):
        super(ChessTransformer, self).__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        transformer_layers = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward=DIM_FEEDFORWARD, dropout=dropout)
        self.transformer = nn.TransformerEncoder(transformer_layers, num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        src = self.embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer(src)
        output = self.fc_out(output)
        return output

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Init model, loss function, and optimizer with wd
model = ChessTransformer(vocab_size=len(vocab), d_model=D_MODEL, nhead=NHEAD, num_layers=NUM_LAYERS, dropout=DROPOUT).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

#lr scheduler
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

def create_data_loader(data, batch_size=BATCH_SIZE):
    srcs, tgts = zip(*data)
    srcs = torch.stack(srcs)
    tgts = torch.stack(tgts)
    dataset = TensorDataset(srcs, tgts)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

train_data = prepare_data(tokenized_moves, seq_len=SEQ_LEN)

#Valid/train Split
train_size = int(TRAIN_SPLIT * len(train_data))
valid_size = len(train_data) - train_size
train_data, valid_data = random_split(train_data, [train_size, valid_size])

# Prepare dls
train_loader = create_data_loader(train_data, batch_size=BATCH_SIZE)
valid_loader = create_data_loader(valid_data, batch_size=BATCH_SIZE)

# Training loop
def train_model(model, train_loader, valid_loader, epochs=EPOCHS):
    for epoch in range(epochs):

        model.train()
        total_train_loss = 0
        correct_train_predictions = 0
        total_train_predictions = 0

        for i, batch in enumerate(train_loader):
            src, tgt = batch
            src, tgt = src.to(device), tgt.to(device)

            optimizer.zero_grad()
            output = model(src)
            loss = criterion(output.view(-1, len(vocab)), tgt.view(-1))
            loss = loss / ACCUMULATION_STEPS
            loss.backward()  # backprop

            if (i + 1) % ACCUMULATION_STEPS == 0:
                # gradient clipping
                torch.nn.utils.clip_grad_norm_(model.parameters(), MAX_GRAD_NORM)
                optimizer.step()

            total_train_loss += loss.item() * ACCUMULATION_STEPS

            #accuracy on training set
            predictions = output.argmax(dim=-1)
            correct_train_predictions += (predictions == tgt).sum().item()
            total_train_predictions += tgt.numel()

        avg_train_loss = total_train_loss / len(train_loader)
        train_accuracy = correct_train_predictions / total_train_predictions

        # Validation
        model.eval()
        total_valid_loss = 0
        correct_valid_predictions = 0
        total_valid_predictions = 0

        with torch.no_grad():
            for batch in valid_loader:
                src, tgt = batch
                src, tgt = src.to(device), tgt.to(device)

                output = model(src)
                loss = criterion(output.view(-1, len(vocab)), tgt.view(-1))
                total_valid_loss += loss.item()


                predictions = output.argmax(dim=-1)
                correct_valid_predictions += (predictions == tgt).sum().item()
                total_valid_predictions += tgt.numel()

        avg_valid_loss = total_valid_loss / len(valid_loader)
        valid_accuracy = correct_valid_predictions / total_valid_predictions


        optimizer.step()
        scheduler.step()

        print(f"Epoch {epoch+1}/{epochs}, "
              f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, "
              f"Valid Loss: {avg_valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.4f}")

# train
train_model(model, train_loader, valid_loader, epochs=EPOCHS)




Epoch 1/30, Train Loss: 9.2072, Train Accuracy: 0.0000, Valid Loss: 9.1963, Valid Accuracy: 0.0000


KeyboardInterrupt: 

In [18]:
#test set
test_data = ds['test']

test_loader = create_data_loader(test_data, batch_size=64)


#Evaulation on test set
def evaluate_model(model, test_loader):
    model.eval()
    total_test_loss = 0
    correct_test_predictions = 0
    total_test_predictions = 0

    with torch.no_grad():
        for batch in test_loader:
            src, tgt = batch
            src, tgt = src.to(device), tgt.to(device)

            output = model(src)
            loss = criterion(output.view(-1, len(vocab)), tgt.view(-1))
            total_test_loss += loss.item()

            # calculating accuracy on test set
            predictions = output.argmax(dim=-1)
            correct_test_predictions += (predictions == tgt).sum().item()
            total_test_predictions += tgt.numel()

    avg_test_loss = total_test_loss / len(test_loader)
    test_accuracy = correct_test_predictions / total_test_predictions

    print(f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


evaluate_model(model, test_loader)

ValueError: too many values to unpack (expected 2)

In [None]:
# Save model
torch.save(model.state_dict(), "chess_transformer.pth")