# Model-1 (Base Gpt2)

In [4]:
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel, Trainer, TrainingArguments
from torch.utils.data import Dataset
import pandas as pd
import re

# Enable GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load and preprocess data
def parse_chess_games(file_path, limit=25000):
    data = []
    with open(file_path, "r") as file:
        lines = file.readlines()
    for count, line in enumerate(lines):
        if "###" in line:
            if count >= limit:
                break
            raw_moves = line.split("###")[1].strip()
            moves = re.findall(r"\.\s*([^\s]+)", raw_moves)
            if moves:
                game_sequence = " ".join(moves).strip()
                data.append(game_sequence)
    return pd.DataFrame(data, columns=["text"])

# Custom dataset class
class ChessDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.iloc[idx]["text"]
        # Tokenize the entire sequence
        tokenized = self.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        input_ids = tokenized.input_ids.squeeze()  # Remove batch dimension
        attention_mask = tokenized.attention_mask.squeeze()
        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": input_ids}

# Load the dataset
file_path = "/content/all_with_filtered_anotations_since1998 copy.txt"
parsed_data = parse_chess_games(file_path)


# Initialize tokenizer and model
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
# Add the following line to define the padding token
tokenizer.pad_token = tokenizer.eos_token
model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)

# Prepare dataset
dataset = ChessDataset(parsed_data, tokenizer)

# Training arguments
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=1,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,
    evaluation_strategy="no",
    save_steps=10_000,
    save_total_limit=2,
    logging_dir="./logs",
)

# Train the model
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
)

trainer.train()






Step,Training Loss
500,1.6345


TrainOutput(global_step=778, training_loss=1.5694689517768918, metrics={'train_runtime': 983.8371, 'train_samples_per_second': 25.331, 'train_steps_per_second': 0.791, 'total_flos': 1626281607168000.0, 'train_loss': 1.5694689517768918, 'epoch': 0.9988765848178462})

In [None]:
model.save_pretrained("./fine_tuned_model_BaseModel")
tokenizer.save_pretrained("./fine_tuned_model_BaseModel")

In [None]:
# Predict the next move
def predict_next_move(model, tokenizer, prompt, max_length=50):
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    outputs = model.generate(
        inputs["input_ids"], max_length=max_length, num_return_sequences=1, pad_token_id=tokenizer.eos_token_id
    )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# Example prediction
prompt = "e4 e5 Nf3"
next_move = predict_next_move(model, tokenizer, prompt)
print("Next Move:", next_move)

#Model-2 (LoRA)

In [None]:
import pandas as pd
import re
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments
from peft import LoraConfig, get_peft_model
import torch
from torch.utils.data import Dataset
from accelerate import Accelerator
# Dataset Preparation - Parse Chess Game Moves as Sequences
def parse_chess_games(file_path, limit=25000):
    data = []
    with open(file_path, "r") as file:
        lines = file.readlines()
    for count, line in enumerate(lines):
        if "###" in line:
            if count >= limit:
                break
            raw_moves = line.split("###")[1].strip()
            # Extract moves without prefixes like W1. and B1.
            moves = re.findall(r"\.\s*([^\s]+)", raw_moves)  # Matches moves after a dot
            if not moves:  # Skip if no moves found
                continue
            # Combine all moves into a single sequence
            game_sequence = " ".join(moves).strip()
            if game_sequence:  # Ensure non-empty
                data.append(game_sequence)
    return pd.DataFrame(data, columns=["text"])





# Dataset Class for Language Modeling
class ChessDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.iloc[idx]["text"]
        # Tokenize the entire sequence
        tokenized = self.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        input_ids = tokenized.input_ids.squeeze()  # Remove batch dimension
        attention_mask = tokenized.attention_mask.squeeze()
        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": input_ids}

# Load and Prepare Data
file_path = "/content/all_with_filtered_anotations_since1998 copy.txt"
raw_data = parse_chess_games(file_path)
train_data = raw_data.sample(frac=0.8, random_state=42)
val_data = raw_data.drop(train_data.index)
print(f"Dataset Size: {len(train_data)} training samples")
print("Example Data:", train_data.iloc[0])

# Model and Tokenizer Setup
model_name = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Add padding token if not already present
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

base_model = AutoModelForCausalLM.from_pretrained(model_name)

# LoRA Configuration for Efficient Fine-Tuning
lora_config = LoraConfig(
    r=8, lora_alpha=32, target_modules=["c_attn"], lora_dropout=0.1
)
model = get_peft_model(base_model, lora_config)

# Create Datasets
train_dataset = ChessDataset(train_data, tokenizer)
val_dataset = ChessDataset(val_data, tokenizer)

# Accelerator setup (optional)
accelerator = Accelerator()

# Training Arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=7,
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_dir="./logs",
    save_strategy="epoch",
    save_total_limit=2,
    # Enable mixed precision for T4 GPU
    fp16=True,
)

# Initialize Trainer (without the accelerator argument)
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
)

# Use accelerator to prepare model and datasets for multi-device/multi-GPU setups
model, train_dataset, val_dataset = accelerator.prepare(
    model, train_dataset, val_dataset
)

# Fine-Tune the Model
trainer.train()

# Save the Fine-Tuned Model
model.save_pretrained("./fine_tuned_model_unsupe")
tokenizer.save_pretrained("./fine_tuned_model_unsupe")


Dataset Size: 19938 training samples
Example Data: text    d4 e6 e4 d5 Nc3 dxe4 Nxe4 Bd7 Nf3 Bc6 Bd3 Nd7 ...
Name: 18780, dtype: object


Prediction with Cuda

In [None]:
def predict_next_move(model, tokenizer, input_moves, max_new_tokens=100):
    # Move model to the GPU
    device = model.device

    # Tokenize the input moves
    inputs = tokenizer(input_moves, return_tensors="pt", padding=True, truncation=True)

    # Move the input tensors to the same device as the model
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)  # Ensure attention mask is also on the correct device

    # Ensure pad_token_id is set correctly for open-ended generation
    pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id

    # Generate the next tokens
    outputs = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_new_tokens=max_new_tokens,
        pad_token_id=pad_token_id  # Set pad_token_id
    )

    # Decode and return the predicted moves
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# Example Usage
input_moves = "W1.e4 B1.c5 W2.Nf3 B2.e6 W3.Nc3 B3.a6 W4.d4 B4.cxd4 W5.Nxd4 B5.Nf6"
predicted_moves = predict_next_move(model, tokenizer, input_moves)
print(f"Predicted moves: {predicted_moves}")


# Loading Saved Model and predicting

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Path to the saved model directory
model_path = "/content/fine_tuned_model_unsupe"

# Load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(model_path)

# Ensure pad_token_id is correctly set for open-ended generation
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)




GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(50257, 768)
    (wpe): Embedding(1024, 768)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-11): 12 x GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2SdpaAttention(
          (c_attn): lora.Linear(
            (base_layer): Conv1D(nf=2304, nx=768)
            (lora_dropout): ModuleDict(
              (default): Dropout(p=0.1, inplace=False)
            )
            (lora_A): ModuleDict(
              (default): Linear(in_features=768, out_features=8, bias=False)
            )
            (lora_B): ModuleDict(
              (default): Linear(in_features=8, out_features=2304, bias=False)
            )
            (lora_embedding_A): ParameterDict()
            (lora_embedding_B): ParameterDict()
            (lora_magnitude_vector): ModuleDict()
          )
          (c_proj): Conv1D(nf=768, nx=768)
          (attn_dropout): Dropout(p=0.1, inplace

In [None]:
# Function to predict the next move
def predict_next_move(model, tokenizer, input_moves, max_new_tokens=3):
    # Move model to the GPU
    device = model.device

    # Tokenize the input moves
    inputs = tokenizer(input_moves, return_tensors="pt", padding=True, truncation=True)

    # Move the input tensors to the same device as the model
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)  # Ensure attention mask is also on the correct device

    # Ensure pad_token_id is set correctly for open-ended generation
    pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id

    # Generate the next tokens
    outputs = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_new_tokens=max_new_tokens,
        pad_token_id=pad_token_id  # Set pad_token_id
    )

    # Decode and return the predicted moves
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# Example Usage
input_moves = "e4 c5 Nf3 d6 Ne5 dxe5"
predicted_moves = predict_next_move(model, tokenizer, input_moves)
print(f"Predicted moves: {predicted_moves}")

Predicted moves: W1.e4 B1.c5 W2.Nf3 B2.d6 W3.Ne5 B3.dxe5 W4.Nxe5


#Model-3 (LoRa, Illeagal move penalization)

# Illegal move penalization

In [None]:
pip install python-chess

Collecting python-chess
  Downloading python_chess-1.999-py3-none-any.whl.metadata (776 bytes)
Collecting chess<2,>=1 (from python-chess)
  Downloading chess-1.11.1.tar.gz (156 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.5/156.5 kB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading python_chess-1.999-py3-none-any.whl (1.4 kB)
Building wheels for collected packages: chess
  Building wheel for chess (setup.py) ... [?25l[?25hdone
  Created wheel for chess: filename=chess-1.11.1-py3-none-any.whl size=148497 sha256=4d8a3784c75aae37150c361a7f058af7d7c2bc2be4ff49267b635c874c5ea0ba
  Stored in directory: /root/.cache/pip/wheels/2e/2d/23/1bfc95db984ed3ecbf6764167dc7526d0ab521cf9a9852544e
Successfully built chess
Installing collected packages: chess, python-chess
Successfully installed chess-1.11.1 python-chess-1.999


In [None]:
import chess
import chess.pgn
import pandas as pd
import re
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments
from peft import LoraConfig, get_peft_model
import torch
from torch.utils.data import Dataset
from accelerate import Accelerator

# Dataset Preparation - Parse Chess Game Moves as Sequences
def parse_chess_games(file_path, limit=50000):
    data = []
    with open(file_path, "r") as file:
        lines = file.readlines()
    for count, line in enumerate(lines):
        if "###" in line:
            if count >= limit:
                break

            # Extract Elo ratings from the metadata
            metadata = line.split("###")[0].strip()
            try:
                welo, belo = metadata.split()[3:5]  # Assuming Elo ratings are the 4th and 5th items
                welo = int(welo) if welo != "None" else 0
                belo = int(belo) if belo != "None" else 0
            except (IndexError, ValueError):
                continue  # Skip if parsing fails

            # Filter based on Elo ratings
            if welo < 2500 or belo < 2500:
                continue

            # Extract moves
            raw_moves = line.split("###")[1].strip()
            moves = re.findall(r"\.\s*([^\s]+)", raw_moves)  # Matches moves after a dot
            if not moves:  # Skip if no moves found
                continue

            # Combine all moves into a single sequence
            game_sequence = " ".join(moves).strip()
            if game_sequence:  # Ensure non-empty
                data.append(game_sequence)

    return pd.DataFrame(data, columns=["text"])


# Dataset Class for Language Modeling
class ChessDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.iloc[idx]["text"]
        tokenized = self.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        input_ids = tokenized.input_ids.squeeze()
        attention_mask = tokenized.attention_mask.squeeze()
        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": input_ids}

# Function to check legality of moves
def is_legal_move(board, move):
    try:
        board.push_san(move)
        board.pop()
        return True
    except ValueError:
        return False

# Custom Trainer with Modified Loss
class CustomTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):  # Add num_items_in_batch argument
        # Get model outputs
        outputs = model(**inputs)
        logits = outputs.logits  # Shape: (batch_size, seq_len, vocab_size)
        labels = inputs["labels"]  # Shape: (batch_size, seq_len)

        # Shift logits and labels for causal language modeling
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()

        # Calculate loss
        loss_fct = torch.nn.CrossEntropyLoss(reduction="none")
        loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))

        # Penalize illegal moves
        batch_size, seq_len = shift_labels.shape
        board = chess.Board()
        for i in range(batch_size):
            move_sequence = []
            for j in range(seq_len):
                token = shift_labels[i, j].item()
                if token == -100:  # Skip padding
                    continue
                move = tokenizer.decode([token]).strip()
                if not is_legal_move(board, move):
                    loss[i * seq_len + j] *= 2  # Increase penalty for illegal moves
                else:
                    board.push_san(move)  # Apply the move to the board

        return (loss.mean(), outputs) if return_outputs else loss.mean()

# Load and Prepare Data
file_path = "/content/all_with_filtered_anotations_since1998 copy.txt"
raw_data = parse_chess_games(file_path)
train_data = raw_data.sample(frac=0.8, random_state=42)
val_data = raw_data.drop(train_data.index)
print(f"Dataset Size: {len(train_data)} training samples")
print("Example Data:", train_data.iloc[0])

# Model and Tokenizer Setup
model_name = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

base_model = AutoModelForCausalLM.from_pretrained(model_name)

# LoRA Configuration for Efficient Fine-Tuning
lora_config = LoraConfig(
    r=8, lora_alpha=32, target_modules=["c_attn"], lora_dropout=0.1
)
model = get_peft_model(base_model, lora_config)

# Create Datasets
train_dataset = ChessDataset(train_data, tokenizer)
val_dataset = ChessDataset(val_data, tokenizer)

# Accelerator setup
accelerator = Accelerator()

# Training Arguments
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=4,
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_dir="./logs",
    save_strategy="epoch",
    save_total_limit=2,
    fp16=True,  # Enable mixed precision for GPU
)

# Initialize Custom Trainer
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
)

# Move model and data to GPU using Accelerator
model, train_dataset, val_dataset = accelerator.prepare(
    model, train_dataset, val_dataset
)

# Fine-Tune the Model
trainer.train()

# Save the Fine-Tuned Model
model.save_pretrained("./fine_tuned_model_unsupe")
tokenizer.save_pretrained("./fine_tuned_model_unsupe")


Dataset Size: 29587 training samples
Example Data: text    d4 d5 c4 e6 Nc3 Nf6 cxd5 exd5 Bg5 Be7 e3 c6 Qc...
Name: 3593, dtype: object


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

  trainer = CustomTrainer(
[34m[1mwandb[0m: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.


<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

 ··········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


Epoch,Training Loss,Validation Loss
1,3.5662,No log
2,3.3944,No log
3,3.3302,No log
4,3.2943,No log


('./fine_tuned_model_unsupe/tokenizer_config.json',
 './fine_tuned_model_unsupe/special_tokens_map.json',
 './fine_tuned_model_unsupe/vocab.json',
 './fine_tuned_model_unsupe/merges.txt',
 './fine_tuned_model_unsupe/added_tokens.json',
 './fine_tuned_model_unsupe/tokenizer.json')

In [None]:
def predict_next_move(model, tokenizer, input_moves, max_new_tokens=3):
    # Move model to the GPU
    device = model.device

    # Tokenize the input moves
    inputs = tokenizer(input_moves, return_tensors="pt", padding=True, truncation=True)

    # Move the input tensors to the same device as the model
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)  # Ensure attention mask is also on the correct device

    # Ensure pad_token_id is set correctly for open-ended generation
    pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id

    # Generate the next tokens
    outputs = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_new_tokens=max_new_tokens,
        pad_token_id=pad_token_id  # Set pad_token_id
    )

    # Decode and return the predicted moves
    return tokenizer.decode(outputs[0], skip_special_tokens=True)
#e4 c5
# Example Usage
input_moves = "e4 e5 Nf3 Nf6 Bb5 Nxe4"
predicted_moves = predict_next_move(model, tokenizer, input_moves)
print(f"Predicted moves: {predicted_moves}")


Predicted moves: e4 e5 Nf3 Nf6 Bb5 Nxe4 Nxe4


#Model-4 ChessGpt(Scratch)

In [3]:
# prompt: install chess
!pip install python-chess

Collecting python-chess
  Using cached python_chess-1.999-py3-none-any.whl.metadata (776 bytes)
Collecting chess<2,>=1 (from python-chess)
  Downloading chess-1.11.1.tar.gz (156 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.5/156.5 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading python_chess-1.999-py3-none-any.whl (1.4 kB)
Building wheels for collected packages: chess
  Building wheel for chess (setup.py) ... [?25l[?25hdone
  Created wheel for chess: filename=chess-1.11.1-py3-none-any.whl size=148497 sha256=741e84dcb3b64e0f56dbfd9fe17f393925e9c4769ec97c3179d03080e31051ca
  Stored in directory: /root/.cache/pip/wheels/2e/2d/23/1bfc95db984ed3ecbf6764167dc7526d0ab521cf9a9852544e
Successfully built chess
Installing collected packages: chess, python-chess
Successfully installed chess-1.11.1 python-chess-1.999


In [2]:
!pip install python-chess~=0.26
!pip install livelossplot==0.3.4
!wget https://www.dropbox.com/sh/75gzfgu7qo94pvh/AACk_w5M94GTwwhSItCqsemoa/Stockfish%205/stockfish-5-linux.zip
!unzip stockfish-5-linux.zip

Collecting python-chess~=0.26
  Downloading python_chess-0.31.4-py3-none-any.whl.metadata (12 kB)
Downloading python_chess-0.31.4-py3-none-any.whl (134 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.6/134.6 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: python-chess
Successfully installed python-chess-0.31.4
Collecting livelossplot==0.3.4
  Downloading livelossplot-0.3.4-py3-none-any.whl.metadata (5.0 kB)
Collecting jedi>=0.16 (from ipython>=5.0.0->ipykernel->notebook->livelossplot==0.3.4)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading livelossplot-0.3.4-py3-none-any.whl (12 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m21.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi, livelossplot
Successfully installed jedi-0.19.2 livelossplot-0.3.4
--2024-12-10 01:56:36--  https://www.dropbox.

In [3]:
!chmod +x stockfish-5-linux/Linux/stockfish_14053109_x64

In [4]:
import chess
import chess.engine

In [5]:
# Cell 1: Imports
import os
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.modules.transformer import TransformerEncoder, TransformerEncoderLayer
from tqdm import tqdm
import numpy as np
import re
from pathlib import Path



In [7]:
import os
import re


if __name__ == "__main__":
    vocab_counter = set()

    with open(f"dataset/processed_kaggle2.txt", "w", encoding="utf-8") as outf:
        with open("/content/dataset/all_with_filtered_anotations_since1998 copy (1).txt", "r", encoding="utf-8") as inpf:
            for line in inpf:
                try:
                    ostr = line.split("###")[1].strip()
                    ostr = re.sub("W\d+.", "", ostr)
                    ostr = re.sub("B\d+.", "", ostr)

                    if len(ostr) > 0:
                        if ostr[-1] != '\n':
                            ostr = ostr + '\n'

                        outf.write(ostr)

                        for move in ostr.split(" "):
                            move = move.replace("\n", "")

                            if move != "":
                                vocab_counter.add(move)
                    else:
                        a = 0
                except:
                    pass

        os.makedirs("vocabs", exist_ok=True)

        with open(f"vocabs/kaggle2.txt", "w", encoding="utf-8") as f:
            for v in vocab_counter:
                f.write(v + "\n")

In [6]:
import os


VOCAB_DIR = "vocabs"


class Tokenizer:
    pad_token_index: int = 0
    bos_token_index: int = 1
    eos_token_index: int = 2
    unk_token_index: int = 3

    pad_token: str = "<pad>"
    bos_token: str = "<bos>"
    eos_token: str = "<eos>"
    unk_token: str = "<unk>"

    def __init__(self, vocab_path: str = f"{VOCAB_DIR}/kaggle2_vocab.txt") -> None:
        self.vocab_dict = {
            self.pad_token: self.pad_token_index,
            self.bos_token: self.bos_token_index,
            self.eos_token: self.eos_token_index,
            self.unk_token: self.unk_token_index,
        }

        with open(vocab_path, "r", encoding="utf-8") as f:
            for i, token in enumerate(f):
                self.vocab_dict[token.replace("\n", "")] = i + 4

    def encode(self, token_str: str, add_bos_token=True):
        encoded = []

        if add_bos_token:
            encoded.append(self.bos_token_index)

        for token in token_str.split():
            if token in self.vocab_dict:
                encoded.append(self.vocab_dict[token])
            else:
                encoded.append(self.unk_token_index)

        return encoded

    def decode(self, token_ids: list):
        decoded = []

        for token_id in token_ids:
            for token, index in self.vocab_dict.items():
                if index == token_id:
                    decoded.append(token)

        return " ".join(decoded)


    def vocab_size(self) -> int:
        return len(self.vocab_dict)


    @classmethod
    def generate_vocab(cls, dataset_path: str):
        from pathlib import Path
        from tqdm import tqdm

        vocab_counter = set()

        for game in tqdm(Path(dataset_path).glob("*.txt")):
            game = game.read_text(encoding="utf-8")
            for move in game.split(" "):
                move = move.replace("\n", "")

                if move != "":
                    vocab_counter.add(move)

        os.makedirs(VOCAB_DIR, exist_ok=True)

        with open(f"{VOCAB_DIR}/kaggle2.txt", "w", encoding="utf-8") as f:
            for v in vocab_counter:
                f.write(v + "\n")


if __name__ == "__main__":
    # Tokenizer.generate_vocab("dataset/kaggle2/")
    tokenizer = Tokenizer("/content/kaggle2_vocab.txt")
    encoded = tokenizer.encode("d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 pepe Bb4+ Nc3 Ba5 Bf4 <eos>")
    decoded = tokenizer.decode(encoded)
    print(encoded)
    print(decoded)


[1, 7868, 3527, 6882, 5263, 8288, 7190, 9989, 7788, 463, 3, 365, 3589, 7400, 5293, 2]
<bos> d4 d5 c4 c6 cxd5 e6 dxe6 fxe6 Nf3 <unk> Bb4+ Nc3 Ba5 Bf4 <eos>


In [7]:
# Cell 2: PGNDataset Class
class PGNDataset(Dataset):
    def __init__(self, tokenizer: Tokenizer, path: str, n_positions=512):
        self.n_positions = n_positions
        self.tokenizer = tokenizer
        self.games = []

        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                self.games.append(line)

        print("Dataset read.")

    def __pad(self, sample: list):
        while len(sample) < self.n_positions:
            sample.append(self.tokenizer.pad_token_index)
        return sample[:self.n_positions]

    def __len__(self):
        return len(self.games)

    def __getitem__(self, i):
        game = self.games[i]
        encoded = self.tokenizer.encode(game, add_bos_token=True)

        if len(encoded) < self.n_positions:
            encoded.append(self.tokenizer.eos_token_index)

        data = self.__pad(encoded)
        return torch.tensor(data)


In [8]:
import os
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules.transformer import TransformerEncoder, TransformerEncoderLayer




# DIR = os.path.dirname(os.path.realpath(__file__))
DEVICE = "cuda"


class PositionalEncoding(nn.Module):
    def __init__(self, dim_model, dropout_p, max_len):
        super().__init__()
        # Modified version from: https://pytorch.org/tutorials/beginner/transformer_tutorial.html
        # max_len determines how far the position can have an effect on a token (window)

        # Info
        self.dropout = nn.Dropout(dropout_p)

        # Encoding - From formula
        pos_encoding = torch.zeros(max_len, dim_model)
        positions_list = torch.arange(
            0, max_len, dtype=torch.float).view(-1, 1)  # 0, 1, 2, 3, 4, 5
        division_term = torch.exp(torch.arange(0, dim_model, 2).float(
        ) * (-math.log(10000.0)) / dim_model)  # 1000^(2i/dim_model)

        # PE(pos, 2i) = sin(pos/1000^(2i/dim_model))
        pos_encoding[:, 0::2] = torch.sin(positions_list * division_term)

        # PE(pos, 2i + 1) = cos(pos/1000^(2i/dim_model))
        pos_encoding[:, 1::2] = torch.cos(positions_list * division_term)

        # Saving buffer (same as parameter without gradients needed)
        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        # Residual connection + pos encoding
        return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(0), :])


class Transformer(nn.Module):

    def __init__(
        self,
        tokenizer: Tokenizer,
        num_tokens: int,
        dim_model: int,
        num_heads: int,
        d_hid: int,
        num_layers: int,
        dropout_p: float,
        n_positions: int,
    ):
        super().__init__()

        self.tokenizer = tokenizer

        # INFO
        self.model_type = "Transformer"
        self.dim_model = dim_model
        self.n_positions = n_positions

        # LAYERS
        self.positional_encoder = PositionalEncoding(
            dim_model=dim_model, dropout_p=dropout_p, max_len=n_positions
        )
        self.embedding = nn.Embedding(
            num_tokens, dim_model, padding_idx=self.tokenizer.pad_token_index)

        encoder_layers = TransformerEncoderLayer(
            dim_model,
            num_heads,
            d_hid,
            dropout_p,
            batch_first=False,
            activation=F.gelu,
            norm_first=True,
        )
        self.transformer_encoder = TransformerEncoder(
            encoder_layers, num_layers)

        self.out = nn.Linear(dim_model, num_tokens)

        self.init_weights()

    def init_weights(self) -> None:
        nn.init.xavier_uniform_(self.embedding.weight)
        nn.init.xavier_uniform_(self.out.weight)

    def forward(self, src, src_mask=None, src_pad_mask=None) -> torch.Tensor:
        # Embedding + positional encoding - Out size = (batch_size, sequence length, dim_model)
        src = self.embedding(src) * math.sqrt(self.dim_model)
        src = self.positional_encoder(src)

        # Transformer blocks - Out size = (sequence length, batch_size, num_tokens)
        transformer_out = self.transformer_encoder(
            src,
            src_mask,
            src_pad_mask,
        )

        out = self.out(transformer_out)

        return F.log_softmax(out, dim=-1)

    def get_src_mask(self, sz) -> torch.Tensor:
        return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)

    def get_pad_mask(self, matrix: torch.Tensor, pad_token: int) -> torch.Tensor:
        return (matrix == pad_token).t()

    def predict(
        self,
        input_string: str = "<bos>",
        max_length=80,
        stop_at_next_move=False,
        temperature=0.5
    ) -> str:
        import chess

        board = chess.Board()
        self.eval()

        input_sequence = self.tokenizer.encode(
            input_string, add_bos_token=False)

        for token in input_string.split(" ")[1:]:
            board.push_san(token)

        if board.is_checkmate():
            input_string += " <eos>"

        y_input = torch.tensor(
            [input_sequence], dtype=torch.long, device="cpu").t()

        if stop_at_next_move:
            max_length = 1
        else:
            max_length -= len(input_sequence)

        for _ in range(max_length):
            y_size = y_input.size(0)
            begin_loc = max(y_size - self.n_positions, 0)

            if y_size > self.n_positions and begin_loc % 2 != 0:
                # Let's help the model know what turn it is
                begin_loc += 1

            end_loc = min(begin_loc + self.n_positions, y_size)
            input_ids = y_input[begin_loc:end_loc]

            src_mask = self.get_src_mask(input_ids.size(0)).to("cpu")
            pad_mask = self.get_pad_mask(
                input_ids, self.tokenizer.pad_token_index).to("cpu")

            pred = self.forward(input_ids, src_mask, pad_mask)

            word_weights = pred[-1].squeeze().div(temperature).exp()
            word_idx = torch.multinomial(word_weights, 10)

            for wi in word_idx:
                decoded = self.tokenizer.decode([wi])
                try:
                    board.parse_san(decoded)
                    word_idx = wi
                    break
                except:
                    continue

            if word_idx.ndim > 0:
                # If the model doesn't know what to move, surrenders
                next_item = torch.tensor([[self.tokenizer.eos_token_index]], device="cpu")
                y_input = torch.cat((y_input, next_item), dim=0)
                break

            next_item = torch.tensor([[word_idx]], device="cpu")
            board.push_san(self.tokenizer.decode([next_item]))

            # Concatenate previous input with predicted best word
            y_input = torch.cat((y_input, next_item), dim=0)

            if board.is_checkmate():
                # If it checkmates the opponent, return with <eos>
                next_item = torch.tensor([[self.tokenizer.eos_token_index]], device="cpu")
                y_input = torch.cat((y_input, next_item), dim=0)
                break

            # Stop if model predicts end of sentence
            if next_item.view(-1).item() == self.tokenizer.eos_token_index:
                break

        return self.tokenizer.decode(y_input.view(-1).tolist())

In [None]:
# Configuration Constants
n_positions = 80
dim_model = 768
d_hid = 3072
num_heads = 12
num_layers = 12
dropout_p = 0.1

import os
import argparse
from tqdm import tqdm
import numpy as np
import torch
from torch.utils.data import DataLoader, random_split



def _parse_args():
    parser = argparse.ArgumentParser(description='Chessformers trainer parser')

    # Providing default values for arguments
    parser.add_argument('--tokenizer', type=str, default="vocabs/kaggle2_vocab.txt", help='location of the tokenizer file')
    parser.add_argument('--dataset', type=str, default="dataset/processed_kaggle2.txt", help='location of the dataset')
    parser.add_argument('--batch_size', type=int, default=64, help='training batch size')
    parser.add_argument('--epochs', type=int, default=1, help='number of training epochs')
    parser.add_argument('--lr', type=float, default=0.00025, help='learning rate')
    parser.add_argument('--beta1', type=float, default=0.9, help='adam beta')
    parser.add_argument('--save_dir', type=str, default='./model', help='save model directory')
    parser.add_argument('--load_model', type=str, default=None, help='model to load and resume training')

    # Use parse_known_args to handle unknown arguments in Jupyter Notebook
    args, unknown = parser.parse_known_args()
    return args


class Trainer:
    def __init__(self, model, train_loader, val_loader, loss_fn, save_dir, learning_rate, num_epochs, adam_beta):
        self.save_dir = save_dir
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.lr = learning_rate
        self.loss_fn = loss_fn
        self.num_epochs = num_epochs

        self.optimizer = torch.optim.Adam(
            self.model.parameters(), lr=self.lr, betas=(adam_beta, 0.999))

        self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
        print(f'Selected device: {self.device}.')

        self.model.to(self.device)

    def train_epoch(self):
        self.model.train()
        train_loss = []
        for local_batch in tqdm(self.train_loader):
            X = local_batch.to(self.device).t().contiguous()
            y_input = X[:-1]
            y_expected = X[1:].reshape(-1)

            src_mask = self.model.get_src_mask(y_input.size(0)).to(self.device)
            pad_mask = self.model.get_pad_mask(
                y_input, self.model.tokenizer.pad_token_index).to(self.device)

            pred = self.model(y_input, src_mask, pad_mask)
            loss = self.loss_fn(pred.view(-1, self.model.tokenizer.vocab_size()), y_expected)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            train_loss.append(loss.detach().cpu().numpy())
        return np.mean(train_loss)

    def test_epoch(self):
        self.model.eval()
        total_loss = 0.0
        with torch.no_grad():
            for local_batch in self.val_loader:
                X = local_batch.to(self.device).t().contiguous()
                y_input = X[:-1]
                y_expected = X[1:].reshape(-1)

                src_mask = self.model.get_src_mask(y_input.size(0)).to(self.device)
                pad_mask = self.model.get_pad_mask(
                    y_input, self.model.tokenizer.pad_token_index).to(self.device)

                pred = self.model(y_input, src_mask, pad_mask)
                loss = self.loss_fn(pred.view(-1, self.model.tokenizer.vocab_size()), y_expected)
                total_loss += loss
        return total_loss / len(self.val_loader)

    def train(self):
        best_val_loss = float('inf')
        for epoch in range(self.num_epochs):
            print(f"\n -------- EPOCH {epoch + 1}/{self.num_epochs} --------")
            train_loss = self.train_epoch()
            val_loss = self.test_epoch()

            print(f"Train Loss: {train_loss:.4f} | Validation Loss: {val_loss:.4f}")

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(self.model.state_dict(), os.path.join(self.save_dir, f"model_epoch_{epoch + 1}.pth"))

        torch.save(self.model.state_dict(), os.path.join(self.save_dir, "final_model.pth"))


def main(args):
    os.makedirs(args.save_dir, exist_ok=True)
    tokenizer = Tokenizer(args.tokenizer)

    # Prepare the data
    dataset = PGNDataset(tokenizer, args.dataset, n_positions=n_positions)
    train_len = int(0.8 * len(dataset))
    train_data, val_data = random_split(dataset, [train_len, len(dataset) - train_len])

    train_loader = DataLoader(train_data, batch_size=args.batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=args.batch_size, shuffle=False)

    # Define the model
    model = Transformer(
        tokenizer=tokenizer,
        num_tokens=tokenizer.vocab_size(),
        dim_model=dim_model,
        d_hid=d_hid,
        num_heads=num_heads,
        num_layers=num_layers,
        dropout_p=dropout_p,
        n_positions=n_positions
    )

    if args.load_model:
        print("Loading pre-trained model...")
        model.load_state_dict(torch.load(args.load_model))

    loss_fn = torch.nn.NLLLoss(ignore_index=tokenizer.pad_token_index)
    trainer = Trainer(model, train_loader, val_loader, loss_fn, args.save_dir, args.lr, args.epochs, args.beta1)
    trainer.train()


if __name__ == "__main__":
    args = _parse_args()
    main(args)


In [9]:
"""
Script used to play against the chessformers.
Human plays as white.
"""

import argparse
import torch


n_positions = 80
dim_model = 768
d_hid = 3072
num_heads = 12
num_layers = 12
dropout_p = 0.1

def _parse_args():
    parser = argparse.ArgumentParser(
        description='Chessformers inference parser')

    parser.add_argument('--load_model', type=str, default="/content/ChessGpt_Scratch.pth",
                        help='model to load and do inference')

    parser.add_argument('--tokenizer', type=str, default="/content/kaggle2_vocab.txt",
                        help='location of the tokenizer file')

    args, unknown = parser.parse_known_args()
    return args


def main(args) -> None:
    tokenizer = Tokenizer(args.tokenizer)
    model = Transformer(tokenizer,
                        num_tokens=tokenizer.vocab_size(),
                        dim_model=dim_model,
                        d_hid=d_hid,
                        num_heads=num_heads,
                        num_layers=num_layers,
                        dropout_p=dropout_p,
                        n_positions=n_positions,
                        )
    # model.load_state_dict(torch.load(args.load_model))
    model.load_state_dict(torch.load(args.load_model, map_location=torch.device('cpu')))

    print(
        "===== CHESSFORMERS ENGINE =====\n"
    + "    Enter valid moves in PGN format.\n"
    + "    Enter \\b to undo a move.\n"
    + "    Enter \\m to show all moves\n"
    )

    input_string = "<bos>"
    boards = [input_string]

    while (len(input_string.split(" ")) < n_positions
           and input_string.split(" ")[-1] != tokenizer.eos_token):
        next_move = input("WHITE MOVE: ")

        if next_move == "\\m":
            print(input_string)
            continue
        elif next_move == "\\b":
            if len(boards) > 1:
                boards.pop()

            input_string = boards[-1]
            continue

        prev_input_string = input_string
        input_string += " " + next_move
        print(input_string)
        try:
            input_string = model.predict(
                input_string,
                stop_at_next_move=True,
                temperature=0.2,
                )
            boards.append(input_string)
            print("BLACK MOVE:", input_string.split(" ")[-1])
        except ValueError:
            input_string = prev_input_string
            print("ILLEGAL MOVE. Please, try again.")
        except Exception as e:
            print("UNHANDLED EXCEPTION. Please, try again.")

    print("--- Final board ---")
    print(input_string)


if __name__ == "__main__":
    args = _parse_args()
    main(args)

  model.load_state_dict(torch.load(args.load_model, map_location=torch.device('cpu')))


===== CHESSFORMERS ENGINE =====
    Enter valid moves in PGN format.
    Enter \b to undo a move.
    Enter \m to show all moves

WHITE MOVE: e4
<bos> e4




BLACK MOVE: c5
WHITE MOVE: c4
<bos> e4 c5 c4
BLACK MOVE: Nc6
WHITE MOVE: Nf3
<bos> e4 c5 c4 Nc6 Nf3
BLACK MOVE: g6
WHITE MOVE: Nc3
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3
BLACK MOVE: Bg7
WHITE MOVE: Bd3
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3
BLACK MOVE: d6
WHITE MOVE: O-O
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O
BLACK MOVE: Nf6
WHITE MOVE: h3
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O Nf6 h3
BLACK MOVE: O-O
WHITE MOVE: a3
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O Nf6 h3 O-O a3
BLACK MOVE: a6
WHITE MOVE: Bd5
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O Nf6 h3 O-O a3 a6 Bd5
ILLEGAL MOVE. Please, try again.
WHITE MOVE: Nd5
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O Nf6 h3 O-O a3 a6 Nd5
BLACK MOVE: e6
WHITE MOVE: Nxf6+
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O Nf6 h3 O-O a3 a6 Nd5 e6 Nxf6+
BLACK MOVE: Bxf6
WHITE MOVE: Rb1
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O Nf6 h3 O-O a3 a6 Nd5 e6 Nxf6+ Bxf6 Rb1
BLACK MOVE: Rb8
WHITE MOVE: b3
<bos> e4 c5 c4 Nc6 Nf3 g6 Nc3 Bg7 Bd3 d6 O-O Nf6 h3 

In [None]:
import chess#https://github.com/niklasf/python-chess

board = chess.Board()
board

# Evaluation code(Playing against Chess engine)

In [None]:
import chess
import chess.engine
import argparse
import torch

n_positions = 80
dim_model = 768
d_hid = 3072
num_heads = 12
num_layers = 12
dropout_p = 0.1

def _parse_args():
    parser = argparse.ArgumentParser(
        description='Chessformers inference parser')

    parser.add_argument('--load_model', type=str, default="/content/chessformer_epoch_13.pth",
                        help='model to load and do inference')
    parser.add_argument('--tokenizer', type=str, default="/content/vocabs/kaggle2_vocab.txt",
                        help='location of the tokenizer file')
    parser.add_argument('--engine_path', type=str, default="/content/stockfish-5-linux/Linux/stockfish_14053109_x64",
                        help='Path to the chess engine executable')
    parser.add_argument('--games', type=int, default=1000,
                        help='Number of games to simulate')

    args, _ = parser.parse_known_args()
    return args


def play_game(model, tokenizer, engine, results, game_number):
    board = chess.Board()
    input_string = "<bos>"
    print(f"\nGame {game_number} Start")
    engine.configure({"Skill Level": 1})
    # engine.configure({"UCI_LimitStrength": True, "UCI_Elo": 800})

    while not board.is_game_over() and len(input_string.split(" ")) < n_positions:
        try:
            if board.turn:  # White's turn (Model)
                lastmove = input_string.split(" ")[-1]
                input_string = model.predict(
                    input_string,
                    stop_at_next_move=True,
                    temperature=0.2,
                )
                # print(input_string)

                move = input_string.split(" ")[-1]
                if(move=="<eos>"):

                  if(lastmove==move):
                    if not board.is_game_over():
                        print("Unknown error")
                        print(f"debug: {input_string}")
                        results["Unknown"]["Unknown"] += 1
                        return "Unknown Result"
                    else:
                      print("Result: Checkmate")
                      if board.turn == chess.BLACK:
                          results["White Wins"]["Checkmate"] += 1
                          return "White Wins (Checkmate)"
                      else:
                          results["Black Wins"]["Checkmate"] += 1
                          return "Black Wins (Checkmate)"
                  else:
                    print("Game Over <eos>")
                    results['EOS']['EOS'] += 1
                    move = input_string.split(" ")[-2]
                    if not board.is_game_over():
                        print("Unknown error")
                        print(f"debug: {input_string}")

                    else:
                      print(f"EOS: {input_string}")
                      print("Result: Checkmate")
                      if board.turn == chess.BLACK:
                          results["White Wins"]["Checkmate"] += 1
                          return "White Wins (Checkmate)"
                      else:
                          results["Black Wins"]["Checkmate"] += 1
                          return "Black Wins (Checkmate)"


                # print(f"Predicted move (SAN): {move}")

                try:
                    # Parse SAN move and check legality
                    san_move_obj = board.parse_san(move)
                    # print(f"Predicted move (obj): {san_move_obj}")
                    if san_move_obj in board.legal_moves:
                        board.push(san_move_obj)
                        # print(f"White (Model): {move}")
                    else:
                        raise ValueError(f"Illegal move: {move}")
                except (chess.IllegalMoveError, ValueError) as e:
                    print(f"Model predicted an invalid move: {e}")
                    break
            else:  # Black's turn (Engine)
                result = engine.play(board, chess.engine.Limit(time=0.005))
                san_move = board.san(result.move)
                input_string = input_string + " " + san_move
                # print(f"Black (Engine): {san_move}")
                board.push(result.move)

        except Exception as e:
            print(f"An error occurred during the game loop: {e}")
            break
        if board.is_repetition():
            print("Result: Draw (Threefold Repetition)")
            results["Draw"]["Threefold Repetition"] += 1
            return "Draw (Threefold Repetition)"

    # Print the final board state
    print("\nFinal Board Position:")
    print(board)
    if(len(input_string.split(" ")) >= n_positions):
      results["Draw"]["80-move Rule"] += 1
      return "Draw (80+ moves)"

    # Determine result
    if board.is_checkmate():
        print("Result: Checkmate")
        if board.turn == chess.BLACK:
            results["White Wins"]["Checkmate"] += 1
            return "White Wins (Checkmate)"
        else:
            results["Black Wins"]["Checkmate"] += 1
            return "Black Wins (Checkmate)"
    elif board.is_stalemate():
        print("Result: Stalemate")
        results["Draw"]["Stalemate"] += 1
        return "Draw (Stalemate)"
    elif board.is_insufficient_material():
        print("Result: Draw (Insufficient Material)")
        results["Draw"]["Insufficient Material"] += 1
        return "Draw (Insufficient Material)"
    elif board.is_seventyfive_moves():
        print("Result: Draw (80-move Rule)")
        results["Draw"]["80-move Rule"] += 1
        return "Draw (80-move Rule)"

    results["Unknown"]["Unknown"] += 1
    print(f"UNKnown: {input_string}")
    return "Unknown Result"


def main(args):
    # Load the tokenizer and model
    tokenizer = Tokenizer(args.tokenizer)
    model = Transformer(
        tokenizer,
        num_tokens=tokenizer.vocab_size(),
        dim_model=dim_model,
        d_hid=d_hid,
        num_heads=num_heads,
        num_layers=num_layers,
        dropout_p=dropout_p,
        n_positions=n_positions,
    )
    model.load_state_dict(torch.load(args.load_model, map_location=torch.device('cpu')))

    # Initialize results tracker
    results = {
        "White Wins": {"Checkmate": 0},
        "Black Wins": {"Checkmate": 0},
        "Draw": {"Stalemate": 0, "Insufficient Material": 0, "80-move Rule": 0, "Threefold Repetition": 0},
        "Unknown":{"Unknown": 0},
        "EOS":{"EOS":0},
    }

    # Load the chess engine
    with chess.engine.SimpleEngine.popen_uci(args.engine_path) as engine:
        for game_number in range(1, args.games + 1):
            result = play_game(model, tokenizer, engine, results, game_number)

            # Print results after each game
            print(f"\nGame {game_number} Result: {result}")
            print("\nCumulative Results:")
            print(f"  White Wins: {results['White Wins']['Checkmate']}")
            print(f"  Black Wins: {results['Black Wins']['Checkmate']}")
            print(f"  Draws: {sum(results['Draw'].values())}")
            print(f"  Unknown: {results['Unknown']['Unknown']}")
            print(f"EOS: {results['EOS']['EOS']}")
            print("\nDetailed Draw Breakdown:")
            for reason, count in results["Draw"].items():
                print(f"    {reason}: {count}")


    # Calculate final statistics
    total_games = sum(
        [results["White Wins"]["Checkmate"], results["Black Wins"]["Checkmate"]] +
        list(results["Draw"].values())
    )
    white_wins = results["White Wins"]["Checkmate"]
    black_wins = results["Black Wins"]["Checkmate"]
    draws = sum(results["Draw"].values())

    print("\nFinal Game Statistics:")
    print(f"Total Games: {total_games}")
    print(f"White Wins: {white_wins}")
    print(f"Black Wins: {black_wins}")
    print(f"Draws: {draws}")
    print(f"  Unknown: {results['Unknown']['Unknown']}")
    print("\nDetailed Draw Breakdown:")
    for reason, count in results["Draw"].items():
        print(f"  {reason}: {count}")

    win_percentage = (white_wins / total_games) * 100
    draw_percentage = (draws / total_games) * 100
    loss_percentage = (black_wins / total_games) * 100

    print(f"\nWin Percentage: {win_percentage:.2f}%")
    print(f"Draw Percentage: {draw_percentage:.2f}%")
    print(f"Loss Percentage: {loss_percentage:.2f}%")


if __name__ == "__main__":
    args = _parse_args()
    main(args)


  model.load_state_dict(torch.load(args.load_model, map_location=torch.device('cpu')))



Game 1 Start




[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  Unknown: 24
EOS: 57

Detailed Draw Breakdown:
    Stalemate: 2
    Insufficient Material: 0
    80-move Rule: 514
    Threefold Repetition: 60

Game 814 Start

Final Board Position:
. . . . r . . k
. . . . . . p .
. p . . . . B .
. . p . . . . Q
p . . . . . . .
P . . . . . . .
. P . . . P P P
. . . . . . K .

Game 814 Result: Draw (80+ moves)

Cumulative Results:
  White Wins: 28
  Black Wins: 185
  Draws: 577
  Unknown: 24
EOS: 57

Detailed Draw Breakdown:
    Stalemate: 2
    Insufficient Material: 0
    80-move Rule: 515
    Threefold Repetition: 60

Game 815 Start

Final Board Position:
r . . . . k . .
. p p . . p . .
p . . . . . . .
. . . . . . . .
. . P P . P . .
. . . . P . . .
P . . . . . . r
R . . . K . q .
Result: Checkmate

Game 815 Result: Black Wins (Checkmate)

Cumulative Results:
  White Wins: 28
  Black Wins: 186
  Draws: 577
  Unknown: 24
EOS: 57

Detailed Draw Breakdown:
    Stalemate: 2
    Insufficie

#Game analysis

In [56]:
import chess
import chess.engine
import argparse
import torch

n_positions = 80
dim_model = 768
d_hid = 3072
num_heads = 12
num_layers = 12
dropout_p = 0.1

import argparse
import chess
import chess.engine

# Define thresholds based on the difference from the best move
BEST_MOVE_THRESHOLD = 0       # Executed move is the best move or better
GOOD_MOVE_THRESHOLD = -10     # Executed move is within 10 centipawns of the best move
MISTAKE_THRESHOLD = -50       # Executed move is within 50 centipawns of the best move
BLUNDER_THRESHOLD = -100     # Executed move is worse than 100 centipawns from the best move

def _parse_args():
    parser = argparse.ArgumentParser(
        description='Chessformers inference parser')

    parser.add_argument('--load_model', type=str, default="/content/chessformer_epoch_13.pth",
                        help='model to load and do inference')
    parser.add_argument('--tokenizer', type=str, default="/content/vocabs/kaggle2_vocab.txt",
                        help='location of the tokenizer file')
    parser.add_argument('--engine_path', type=str, default="/content/stockfish-5-linux/Linux/stockfish_14053109_x64",
                        help='Path to the chess engine executable')
    parser.add_argument('--games', type=int, default=100,
                        help='Number of games to simulate')

    args, _ = parser.parse_known_args()
    return args


def analyze_game(board, engine, results, game_number):
    """
    Analyzes the game by replaying the moves from the start on a fresh board.
    Provides evaluation of the position and identifies critical mistakes or blunders.
    """
    print("\nStarting game analysis...")

    # Create a fresh board for analysis
    analysis_board = chess.Board()

    # Initialize move statistics
    white_best_moves = 0
    white_good_moves = 0
    white_mistakes = 0
    white_blunders = 0
    black_best_moves = 0
    black_good_moves = 0
    black_mistakes = 0
    black_blunders = 0

    # Replay the game move by move with engine evaluation
    for move_number, move in enumerate(board.move_stack, start=1):
        analysis_board.push(move)

        try:
            # Use the engine to analyze the current board position
            evaluation = engine.analyse(analysis_board, chess.engine.Limit(depth=1))

            # Check if the score is a mate evaluation
            if evaluation["score"].is_mate():
                # Handle mate in a special way, potentially set to a large + or - value
                score = float('inf') if evaluation["score"].white().score() > 0 else float('-inf')
            else:
                # Use regular centipawn score
                score = evaluation["score"].white().score()  # Ensure you get the correct type here

        except Exception as e:
            # Skip this move if evaluation fails
            print(f"Move {move_number}: {move} - Error during evaluation: {e}")
            score = 0


        # Determine whose turn it was for this move
        is_white_turn = move_number % 2 == 1  # Odd move numbers are White's moves

        # Define thresholds based on relative difference
        if is_white_turn:  # White's turn
            if score <= BLUNDER_THRESHOLD:  # Check Blunder first
                white_blunders += 1
                move_quality = "Blunder"
            elif score <= MISTAKE_THRESHOLD:  # Then Mistake
                white_mistakes += 1
                move_quality = "Mistake"
            elif score >= BEST_MOVE_THRESHOLD:  # Then Best Move
                white_best_moves += 1
                move_quality = "Best Move"
            elif score >= GOOD_MOVE_THRESHOLD:  # Then Good Move
                white_good_moves += 1
                move_quality = "Good Move"
            else:
                move_quality = "Uncategorized"
        else:  # Black's turn
            if score >= BLUNDER_THRESHOLD:  # Assuming BLUNDER_THRESHOLD is negative for Black
                black_blunders += 1
                move_quality = "Blunder"
            elif score >= MISTAKE_THRESHOLD:
                black_mistakes += 1
                move_quality = "Mistake"
            elif score <= BEST_MOVE_THRESHOLD:
                black_best_moves += 1
                move_quality = "Best Move"
            elif score <= GOOD_MOVE_THRESHOLD:
                black_good_moves += 1
                move_quality = "Good Move"
            else:
                move_quality = "Uncategorized"

        # Debugging output for move details
        print(f"Move {move_number}: {move}, Turn: {'White' if is_white_turn else 'Black'}, "
              f"Score: {score}, Quality: {move_quality}")

    # Store the analysis results for the game
    results["White Best Moves"] += white_best_moves
    results["White Good Moves"] += white_good_moves
    results["White Mistakes"] += white_mistakes
    results["White Blunders"] += white_blunders
    results["Black Best Moves"] += black_best_moves
    results["Black Good Moves"] += black_good_moves
    results["Black Mistakes"] += black_mistakes
    results["Black Blunders"] += black_blunders

    # Print individual game analysis
    print(f"Game {game_number} Analysis:")
    print(f"  White Best Moves: {white_best_moves}, White Good Moves: {white_good_moves}, "
          f"White Mistakes: {white_mistakes}, White Blunders: {white_blunders}")
    print(f"  Black Best Moves: {black_best_moves}, Black Good Moves: {black_good_moves}, "
          f"Black Mistakes: {black_mistakes}, Black Blunders:  {black_blunders}")

    print("Game analysis complete.\n")

def play_game(model, tokenizer, engine, results, game_number):
    board = chess.Board()
    input_string = "<bos>"
    print(f"\nGame {game_number} Start")
    engine.configure({"Skill Level": 1})

    while not board.is_game_over() and len(input_string.split(" ")) < n_positions:
        try:
            if board.turn:  # White's turn (Model)
                lastmove = input_string.split(" ")[-1]
                input_string = model.predict(
                    input_string,
                    stop_at_next_move=True,
                    temperature=0.2,
                )

                move = input_string.split(" ")[-1]
                if(move=="<eos>"):
                    if(lastmove==move):
                        if not board.is_game_over():
                            print("Unknown error")
                            print(f"debug: {input_string}")
                            results["Unknown"]["Unknown"] += 1
                            return "Unknown Result"
                        else:
                            print("Result: Checkmate")
                            if board.turn == chess.BLACK:
                                results["White Wins"]["Checkmate"] += 1
                                return "White Wins (Checkmate)"
                            else:
                                results["Black Wins"]["Checkmate"] += 1
                                return "Black Wins (Checkmate)"
                    else:
                        print("Game Over <eos>")
                        results['EOS']['EOS'] += 1
                        move = input_string.split(" ")[-2]
                        if not board.is_game_over():
                            print("Unknown error")
                            print(f"debug: {input_string}")
                        else:
                            print(f"EOS: {input_string}")
                            print("Result: Checkmate")
                            if board.turn == chess.BLACK:
                                results["White Wins"]["Checkmate"] += 1
                                return "White Wins (Checkmate)"
                            else:
                                results["Black Wins"]["Checkmate"] += 1
                                return "Black Wins (Checkmate)"
                try:
                    san_move_obj = board.parse_san(move)
                    if san_move_obj in board.legal_moves:
                        board.push(san_move_obj)
                    else:
                        raise ValueError(f"Illegal move: {move}")
                except (chess.IllegalMoveError, ValueError) as e:
                    print(f"Model predicted an invalid move: {e}")
                    break
            else:  # Black's turn (Engine)
                result = engine.play(board, chess.engine.Limit(time=0.005))
                san_move = board.san(result.move)
                input_string = input_string + " " + san_move
                board.push(result.move)

        except Exception as e:
            print(f"An error occurred during the game loop: {e}")
            break
        if board.is_repetition():
            print("Result: Draw (Threefold Repetition)")
            results["Draw"]["Threefold Repetition"] += 1
            return "Draw (Threefold Repetition)"

    print("\nFinal Board Position:")
    print(board)



    analyze_game(board, engine, results, game_number)

    if(len(input_string.split(" ")) >= n_positions):
        results["Draw"]["80-move Rule"] += 1
        return "Draw (80+ moves)"

    if board.is_checkmate():
        print("Result: Checkmate")
        if board.turn == chess.BLACK:
            results["White Wins"]["Checkmate"] += 1
            return "White Wins (Checkmate)"
        else:
            results["Black Wins"]["Checkmate"] += 1
            return "Black Wins (Checkmate)"
    elif board.is_stalemate():
        print("Result: Stalemate")
        results["Draw"]["Stalemate"] += 1
        return "Draw (Stalemate)"
    elif board.is_insufficient_material():
        print("Result: Draw (Insufficient Material)")
        results["Draw"]["Insufficient Material"] += 1
        return "Draw (Insufficient Material)"
    elif board.is_seventyfive_moves():
        print("Result: Draw (80-move Rule)")
        results["Draw"]["80-move Rule"] += 1
        return "Draw (80-move Rule)"

    results["Unknown"]["Unknown"] += 1
    print(f"Unknown: {input_string}")
    return "Unknown Result"

def main(args):
    tokenizer = Tokenizer(args.tokenizer)
    model = Transformer(
        tokenizer,
        num_tokens=tokenizer.vocab_size(),
        dim_model=dim_model,
        d_hid=d_hid,
        num_heads=num_heads,
        num_layers=num_layers,
        dropout_p=dropout_p,
        n_positions=n_positions,
    )
    model.load_state_dict(torch.load(args.load_model, map_location=torch.device('cpu')))

    results = {
        "White Wins": {"Checkmate": 0},
        "Black Wins": {"Checkmate": 0},
        "Draw": {"Stalemate": 0, "Insufficient Material": 0, "80-move Rule": 0, "Threefold Repetition": 0},
        "Unknown": {"Unknown": 0},
        "EOS": {"EOS": 0},
        "White Best Moves": 0,
        "White Good Moves": 0,
        "White Mistakes": 0,
        "White Blunders": 0,
        "Black Best Moves": 0,
        "Black Good Moves": 0,
        "Black Mistakes": 0,
        "Black Blunders": 0,
    }

    with chess.engine.SimpleEngine.popen_uci(args.engine_path) as engine:
        for game_number in range(1, args.games + 1):
            result = play_game(model, tokenizer, engine, results, game_number)

            print(f"\nGame {game_number} Result: {result}")
            print("\nCumulative Results:")
            print(f"  White Wins: {results['White Wins']['Checkmate']}")
            print(f"  Black Wins: {results['Black Wins']['Checkmate']}")
            print(f"  Draws: {sum(results['Draw'].values())}")
            print(f"  Unknown: {results['Unknown']['Unknown']}")
            print(f"EOS: {results['EOS']['EOS']}")
            print("\nDetailed Draw Breakdown:")
            for reason, count in results["Draw"].items():
                print(f"    {reason}: {count}")

    total_games = sum(
        [results["White Wins"]["Checkmate"], results["Black Wins"]["Checkmate"]] +
        list(results["Draw"].values())
    )
    white_wins = results["White Wins"]["Checkmate"]
    black_wins = results["Black Wins"]["Checkmate"]
    draws = sum(results["Draw"].values())

    print("\nFinal Game Statistics:")
    print(f"Total Games: {total_games}")
    print(f"White Wins: {white_wins}")
    print(f"Black Wins: {black_wins}")
    print(f"Draws: {draws}")
    print(f"  Unknown: {results['Unknown']['Unknown']}")
    print("\nDetailed Draw Breakdown:")
    for reason, count in results["Draw"].items():
        print(f"  {reason}: {count}")

    win_percentage = (white_wins / total_games) * 100
    draw_percentage = (draws / total_games) * 100
    loss_percentage = (black_wins / total_games) * 100

    print(f"\nWin Percentage: {win_percentage:.2f}%")
    print(f"Draw Percentage: {draw_percentage:.2f}%")
    print(f"Loss Percentage: {loss_percentage:.2f}%")

    # Total analysis after all games
    print("\nTotal Analysis After All Games:")
    print(f"Total White Best Moves: {results['White Best Moves']}")
    print(f"Total White Good Moves: {results['White Good Moves']}")
    print(f"Total White Mistakes: {results['White Mistakes']}")
    print(f"Total White Blunders: {results['White Blunders']}")
    print(f"Total Black Best Moves: {results['Black Best Moves']}")
    print(f"Total Black Good Moves: {results['Black Good Moves']}")
    print(f"Total Black Mistakes: {results['Black Mistakes']}")
    print(f"Total Black Blunders: {results['Black Blunders']}")

    White_move_accuracy=(results['White Good Moves']+results['White Best Moves'])/(results['White Good Moves']+results['White Best Moves']+results['White Mistakes']+results['White Blunders'])
    Black_move_accuracy=(results['Black Good Moves']+results['Black Best Moves'])/(results['Black Good Moves']+results['Black Best Moves']+results['Black Mistakes']+results['Black Blunders'])
    print(f"\nWhite Move Accuracy: {White_move_accuracy*100:.2f}%")
    print(f"Black Move Accuracy: {Black_move_accuracy*100:.2f}%")

if __name__ == "__main__":
    args = _parse_args()
    main(args)


  model.load_state_dict(torch.load(args.load_model, map_location=torch.device('cpu')))



Game 1 Start




[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Move 1: d2d4, Turn: White, Score: 30, Quality: Best Move
Move 2: f7f5, Turn: Black, Score: 117, Quality: Blunder
Move 3: g2g3, Turn: White, Score: 31, Quality: Best Move
Move 4: e7e6, Turn: Black, Score: 31, Quality: Blunder
Move 5: f1g2, Turn: White, Score: 56, Quality: Best Move
Move 6: g8f6, Turn: Black, Score: 56, Quality: Blunder
Move 7: c2c4, Turn: White, Score: -21, Quality: Uncategorized
Move 8: b8c6, Turn: Black, Score: -11, Quality: Blunder
Move 9: g1f3, Turn: White, Score: 10, Quality: Best Move
Move 10: f8b4, Turn: Black, Score: 33, Quality: Blunder
Move 11: c1d2, Turn: White, Score: 9, Quality: Best Move
Move 12: b4d2, Turn: Black, Score: 53, Quality: Blunder
Move 13: b1d2, Turn: White, Score: 37, Quality: Best Move
Move 14: h7h5, Turn: Black, Score: 107, Quality: Blunder
Move 15: h2h4, Turn: White, Score: 58, Quality: Best Move
Move 16: b7b6, Turn: Black, Score: 60, Quality: Blunder
Move 17: e1g1, Turn: Whit

#Model description

In [12]:
n_positions = 80
dim_model = 768
d_hid = 3072
num_heads = 12
num_layers = 12
dropout_p = 0.1
tokenizer = Tokenizer("/content/vocabs/kaggle2_vocab.txt")
model = Transformer(
    tokenizer,
    num_tokens=tokenizer.vocab_size(),
    dim_model=dim_model,
    d_hid=d_hid,
    num_heads=num_heads,
    num_layers=num_layers,
    dropout_p=dropout_p,
    n_positions=n_positions,
)
model.load_state_dict(torch.load("/content/chessformer_epoch_13.pth", map_location=torch.device('cpu')))

  model.load_state_dict(torch.load("/content/chessformer_epoch_13.pth", map_location=torch.device('cpu')))


<All keys matched successfully>

In [13]:
model

Transformer(
  (positional_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (embedding): Embedding(10049, 768, padding_idx=0)
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-11): 12 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
        )
        (linear1): Linear(in_features=768, out_features=3072, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=3072, out_features=768, bias=True)
        (norm1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (out): Linear(in_features=768, out_features=10049, bias=True)
)

In [14]:
import torch

# Total and trainable parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Total parameters: {total_params}")
print(f"Trainable parameters: {trainable_params}")


Total parameters: 100499777
Trainable parameters: 100499777
