In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import time
from torch.cuda.amp import GradScaler, autocast

In [None]:
# Load the games table from a CSV expected in the notebook's working directory.
final_df = pd.read_csv('final_chess_games.csv')
# Print the column / dtype / non-null summary as a quick sanity check of the load.
final_df.info()

In [None]:
# Narrow the frame to the result column and the move text ('AN').
# Of these, only 'AN' is read by the preprocessing code visible in this notebook.
final_df = final_df[['Result', 'AN']]
# Bare last expression: the notebook renders the first rows as the cell output.
final_df.head()

In [None]:
# Preprocess the data
def preprocess_data(df):
    """Turn the 'AN' column of ``df`` into a list of whitespace-separated token lists.

    Args:
        df: DataFrame with an 'AN' column holding one game's move text per row.

    Returns:
        list[list[str]]: one token list per row with a non-missing 'AN' value,
        in row order. Rows whose 'AN' is missing (NaN/None) are skipped, so the
        result can be shorter than ``df``.

    Raises:
        KeyError: if ``df`` has no 'AN' column.
    """
    # Read the single column directly instead of df.iterrows(): iterrows builds a
    # Series per row, which is very slow on large frames and adds nothing here.
    # dropna() keeps a missing move text from crashing on ``.split()``.
    move_texts = df['AN'].dropna().astype(str)
    return [text.split() for text in move_texts]

# Tokenize every game once; `sequences` is the notebook-level list that the
# vocabulary and dataset cells below are built from.
print("Preprocessing data...")
sequences = preprocess_data(final_df)
print("Data preprocessing completed.")

In [None]:
class ChessDataset(Dataset):
    """Next-token dataset over tokenized games.

    Each item is an ``(input, target)`` pair of ``torch.long`` tensors of length
    ``max_length``: the input is the game without its last token, the target is
    the game without its first token, so ``target[i]`` is the token that follows
    ``input[i]``. Shorter games are right-padded with ``vocab['<pad>']``; longer
    ones are truncated so every item has the same length and can be batched.

    Args:
        sequences: list of token lists (one per game).
        vocab: mapping from token to integer id; must contain ``'<pad>'``.
        max_length: fixed length of every returned tensor.
    """

    def __init__(self, sequences, vocab, max_length):
        self.sequences = sequences
        self.vocab = vocab
        self.max_length = max_length

    def __len__(self):
        return len(self.sequences)

    def _encode(self, moves):
        """Map tokens to ids, then truncate / right-pad to exactly ``max_length``."""
        ids = [self.vocab[move] for move in moves[:self.max_length]]
        ids.extend([self.vocab['<pad>']] * (self.max_length - len(ids)))
        # Explicit dtype: an empty list would otherwise become a float tensor,
        # which nn.Embedding rejects.
        return torch.tensor(ids, dtype=torch.long)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` tensors for game ``idx``.

        Raises:
            KeyError: if the game contains a token that is not in ``vocab``.
        """
        sequence = self.sequences[idx]
        # Input and target are the same game shifted by one token.
        return self._encode(sequence[:-1]), self._encode(sequence[1:])

# Build vocabulary
all_moves = [move for seq in sequences for move in seq]
# Sort the unique tokens so every run assigns the same id to the same move.
# Enumerating a bare set of strings follows hash order, which Python randomizes
# per process, so a vocabulary rebuilt in a new kernel would not line up with
# the embedding rows stored in a saved checkpoint.
vocab = {move: idx for idx, move in enumerate(sorted(set(all_moves)))}
vocab['<pad>'] = len(vocab)  # Add padding token (takes the last id)
vocab_size = len(vocab)

# Determine the maximum sequence length
# (input and target are each one token shorter than the game they come from)
max_length = max(len(seq) for seq in sequences) - 1

print("Creating datasets...")
# Fixed random_state keeps the train/validation split reproducible.
train_sequences, val_sequences = train_test_split(sequences, test_size=0.2, random_state=42)
train_dataset = ChessDataset(train_sequences, vocab, max_length)
val_dataset = ChessDataset(val_sequences, vocab, max_length)
print("Datasets created.")

# Only the training loader is shuffled; validation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
print("Data loaders initialized.")

In [None]:

class TransformerModel(nn.Module):
    """Encoder-decoder Transformer that scores the next token at every position.

    Inputs are batch-first id tensors of shape ``(batch, seq_len)``. All three
    attention paths (encoder self-attention, decoder self-attention and
    decoder-to-encoder attention) are causally masked, so the output at
    position ``i`` depends only on tokens at positions ``<= i``. Sinusoidal
    position information is added to the embeddings on the fly; it has no
    parameters or buffers, so the ``state_dict`` layout is just the embedding,
    the transformer and the output projection.

    For next-token training, pass the shifted-right sequence as both ``src``
    and ``tgt`` and compare the output with the shifted-left labels.

    Args:
        vocab_size: number of token ids (size of the embedding and output layer).
        d_model: embedding / hidden width.
        nhead: number of attention heads.
        num_encoder_layers: encoder depth.
        num_decoder_layers: decoder depth.
        dim_feedforward: width of the feed-forward sublayers.
        dropout: dropout probability inside the transformer.
    """

    def __init__(self, vocab_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        # batch_first=True: without it nn.Transformer reads dim 0 as time and
        # dim 1 as batch, i.e. it would attend across the different sequences
        # of a (batch, seq_len) input.
        self.transformer = nn.Transformer(
            d_model,
            nhead,
            num_encoder_layers,
            num_decoder_layers,
            dim_feedforward,
            dropout,
            batch_first=True,
        )
        self.fc = nn.Linear(d_model, vocab_size)

    @staticmethod
    def _causal_mask(num_queries, num_keys, device):
        """Boolean mask, True where query ``i`` must NOT attend to key ``j`` (``j > i``)."""
        # triu on a float tensor, then cast: works on every device / version.
        return torch.ones(num_queries, num_keys, device=device).triu(diagonal=1).bool()

    def _positional_encoding(self, length, device, dtype):
        """Sinusoidal position table of shape ``(length, d_model)``."""
        dim = self.embedding.embedding_dim
        positions = torch.arange(length, device=device, dtype=torch.float32).unsqueeze(1)
        even_idx = torch.arange(0, dim, 2, device=device, dtype=torch.float32)
        inv_freq = torch.pow(torch.tensor(10000.0, device=device), -even_idx / dim)
        angles = positions * inv_freq
        table = torch.zeros(length, dim, device=device, dtype=torch.float32)
        table[:, 0::2] = torch.sin(angles)
        # Slice so an odd d_model (one fewer odd column) still fits.
        table[:, 1::2] = torch.cos(angles)[:, : dim // 2]
        return table.to(dtype)

    def forward(self, src, tgt):
        """Return logits of shape ``(batch, tgt_len, vocab_size)``.

        Args:
            src: ``(batch, src_len)`` token ids fed to the encoder.
            tgt: ``(batch, tgt_len)`` token ids fed to the decoder.
        """
        src_len, tgt_len = src.size(1), tgt.size(1)
        device = src.device

        src_emb = self.embedding(src)
        tgt_emb = self.embedding(tgt)
        # Attention alone is order-agnostic; add position information.
        src_emb = src_emb + self._positional_encoding(src_len, device, src_emb.dtype)
        tgt_emb = tgt_emb + self._positional_encoding(tgt_len, device, tgt_emb.dtype)

        # With right-padded batches, causal masks also keep real positions from
        # ever attending to the padding that follows them.
        hidden = self.transformer(
            src_emb,
            tgt_emb,
            src_mask=self._causal_mask(src_len, src_len, device),
            tgt_mask=self._causal_mask(tgt_len, tgt_len, device),
            memory_mask=self._causal_mask(tgt_len, src_len, device),
        )
        return self.fc(hidden)

# Hyperparameters
d_model, nhead = 512, 8
num_encoder_layers = num_decoder_layers = 6
dim_feedforward = 4 * d_model  # 2048
dropout = 0.1

print("Initializing model, loss function, and optimizer...")
model = TransformerModel(
    vocab_size,
    d_model,
    nhead,
    num_encoder_layers,
    num_decoder_layers,
    dim_feedforward,
    dropout,
)
# Padded target positions carry no information, so they are left out of the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=1e-3)
print("Model, loss function, and optimizer initialized.")

In [None]:
# Training loop
num_epochs = 10
checkpoint_interval = 2  # Save checkpoint every 2 epochs

# Move model to GPU (falls back to CPU when CUDA is unavailable)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Mixed precision is a CUDA feature: enable the scaler and autocast only there so
# a CPU run neither warns nor pretends to scale gradients.
use_amp = device.type == "cuda"
scaler = GradScaler(enabled=use_amp)

print("Starting training...")
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    train_loss = 0
    for batch_idx, (src, tgt) in enumerate(train_loader):
        src, tgt = src.to(device), tgt.to(device)
        optimizer.zero_grad()
        with autocast(enabled=use_amp):
            # The decoder is fed `src` (the game without its last token) and the
            # loss compares against `tgt` (the game without its first token).
            # Feeding `tgt` to the decoder would hand it the very labels it is
            # scored on, so it could just copy them.
            output = model(src, src)
            # reshape (not view) also works when the output is non-contiguous.
            loss = criterion(output.reshape(-1, vocab_size), tgt.reshape(-1))
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        train_loss += loss.item()
        if batch_idx % 100 == 0:
            print(f"Epoch {epoch+1}, Batch {batch_idx}, Loss: {loss.item()}")

    elapsed_time = time.time() - start_time
    print(f"Epoch {epoch+1} training completed. Average Loss: {train_loss/len(train_loader)}. Elapsed Time: {elapsed_time:.2f} seconds")

    # Validation pass: same inputs/labels as training, no gradient tracking.
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for src, tgt in val_loader:
            src, tgt = src.to(device), tgt.to(device)
            with autocast(enabled=use_amp):
                output = model(src, src)
                loss = criterion(output.reshape(-1, vocab_size), tgt.reshape(-1))
            val_loss += loss.item()

    print(f"Epoch {epoch+1}, Validation Loss: {val_loss/len(val_loader)}")

    if (epoch + 1) % checkpoint_interval == 0:
        # Only the model weights are saved (no optimizer / scaler state).
        checkpoint_path = f'transformer_model_epoch_{epoch+1}.pth'
        torch.save(model.state_dict(), checkpoint_path)
        print(f"Checkpoint saved at {checkpoint_path}")

# Save the final model
torch.save(model.state_dict(), 'transformer_model_final.pth')
print("Training completed and model saved.")

In [None]:
def generate_moves(model, start_sequence, max_length=50, move_vocab=None):
    """Greedily extend ``start_sequence`` with the model's most likely next tokens.

    Args:
        model: module called as ``model(ids, ids)`` that returns per-position
            logits. If ``model.transformer.batch_first`` is true the ids are
            passed as ``(1, seq_len)``, otherwise as ``(seq_len, 1)``.
        start_sequence: non-empty list of tokens to continue. It is not modified.
        max_length: maximum number of tokens to generate.
        move_vocab: optional token -> id mapping; defaults to the notebook-level
            ``vocab``.

    Returns:
        list[str]: a new list holding ``start_sequence`` followed by the generated
        tokens. Generation stops early after a token ending in '#' (checkmate in
        algebraic notation) or when the model predicts the padding token, which
        is not appended.

    Raises:
        ValueError: if ``start_sequence`` is empty.
        KeyError: if a start token is not in the vocabulary.
    """
    token_to_id = vocab if move_vocab is None else move_vocab
    if not start_sequence:
        raise ValueError("start_sequence must contain at least one move")

    # Build the reverse lookup once instead of scanning the vocabulary per step.
    id_to_token = {idx: move for move, idx in token_to_id.items()}
    pad_idx = token_to_id.get('<pad>')

    model.eval()
    # Run on whatever device the model lives on; a CPU tensor fed to a CUDA
    # model raises a device-mismatch error.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    batch_first = getattr(getattr(model, 'transformer', None), 'batch_first', False)

    # Copy so the caller's list is left untouched.
    generated_moves = list(start_sequence)
    token_ids = torch.tensor(
        [token_to_id[move] for move in generated_moves], dtype=torch.long, device=device
    )

    with torch.no_grad():  # inference only: skip autograd bookkeeping
        for _ in range(max_length):
            model_input = token_ids.unsqueeze(0) if batch_first else token_ids.unsqueeze(1)
            output = model(model_input, model_input)
            # Logits of the last position of the single sequence in the batch.
            last_step = output[0, -1] if batch_first else output[-1, 0]
            next_move_idx = int(last_step.argmax(dim=-1))

            if next_move_idx == pad_idx:
                break  # padding is not a move; treat it as end of game

            next_move = id_to_token[next_move_idx]
            generated_moves.append(next_move)
            token_ids = torch.cat((token_ids, token_ids.new_tensor([next_move_idx])))

            # Mate is written as a suffix ("Qh7#"), never as a standalone "#".
            if next_move.endswith('#'):
                break

    return generated_moves

# Example usage
# The prompt uses the same whitespace tokenization as the training data, so move
# numbers such as '1.' are tokens of their own; every token must exist in `vocab`.
start_sequence = ['1.', 'e4', 'e5', '2.', 'Nf3', 'Nc6']
generated_moves = generate_moves(model, start_sequence)
print("Generated moves:", ' '.join(generated_moves))