In [6]:
from datasets import Dataset
from tokenizers import Tokenizer, models, trainers, pre_tokenizers
from transformers import AutoTokenizer, GPT2Config, GPT2LMHeadModel, Trainer, TrainingArguments, DataCollatorForLanguageModeling
import zstandard as zstd
import chess.pgn
import io
import re
from torch.utils.data import DataLoader
import math
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
import stockfish
import chess
import chess.engine

# READ DATA

Note to reader: the amount of training data is set here, via the `n` argument passed to `read_first_n_games_as_strings` (the number of games read from the start of the archive).

In [2]:
# Ensure special moves and pieces are followed by a space
def format_pgn(text):
    """Reduce a PGN game to a flat, space-separated move string.

    Steps, in order:
      1. Drop everything inside square brackets (header tags, and bracketed
         annotations inside comments).
      2. Drop `{ ... }` comments.
      3. Put a space after every move-number dot and every piece letter, and
         spaces around castling, check, mate, capture and promotion markers.
      4. Collapse all runs of whitespace to single spaces.

    Because step 3 splits after the first dot of a move number, a black move
    marker such as "1..." comes out as "1. ..".

    Args:
        text: PGN text of a single game (headers and comments allowed).

    Returns:
        The normalized move text, e.g. "1. e4 1. .. N f6 2. B x f7 + 0-1".
    """
    # Remove metadata lines (anything within square brackets)
    text = re.sub(r'\[.*?\]', '', text)

    # Flatten to one line first so the comment pattern below never has to
    # cross a newline ('.' does not match newlines).
    text = ' '.join(text.split())

    text = re.sub(r'\{.*?\}', '', text)  # Remove `{ ... }`
    text = ' '.join(text.split())  # Normalize spaces

    # Define patterns for different components
    move_number_pattern = re.compile(r'(\d+\.)')  # Move numbers (e.g., "1.")
    piece_pattern = re.compile(r'([KQRBN])')  # Chess pieces (e.g., "N", "K")
    # Castling, check, capture, promotions. "O-O-O" must be listed before
    # "O-O": alternation takes the first alternative that matches, so the
    # short form would otherwise split queenside castling into "O-O" + "-O".
    special_move_pattern = re.compile(r'(O-O-O|O-O|\+|#|x|=Q|=R|=B|=N)')

    # Ensure move numbers, pieces, and special moves are space-separated
    text = move_number_pattern.sub(r'\1 ', text)  # Move number spacing
    text = piece_pattern.sub(r'\1 ', text)  # Piece spacing
    text = special_move_pattern.sub(r' \1 ', text)  # Special moves spacing

    return ' '.join(text.split())


def read_first_n_games_as_strings(file_path, n=100):
    """Read up to `n` games from a zstd-compressed PGN file as formatted strings.

    The archive is decompressed as a stream and parsed one game at a time, so
    only the games actually requested are read.

    Args:
        file_path: Path to a `.pgn.zst` file.
        n: Maximum number of games to read. Fewer are returned if the file
            ends first.

    Returns:
        A list of move strings, one per game, each passed through `format_pgn`.
    """
    games = []
    with open(file_path, 'rb') as f:
        dctx = zstd.ZstdDecompressor()
        decompressed = dctx.stream_reader(f)
        pgn_text = io.TextIOWrapper(decompressed, encoding='utf-8')

        for _ in range(n):
            game = chess.pgn.read_game(pgn_text)
            if game is None:
                break  # Stop if no more games are available

            # str(game) is the PGN text of the game; format_pgn then strips
            # headers/comments and spaces out the moves. No separate exporter
            # pass is needed.
            games.append(format_pgn(str(game)))

    return games


# PREP DATA FOR TOKENIZER

In [None]:
# Replace with your file path
file_path = "/Users/nourfahmy/Documents/GitHub/llm-playbooks/lichess_db_standard_rated_2025-02.pgn.zst"
# Replace with number of games
number_of_games = 1000
games_as_strings = read_first_n_games_as_strings(file_path, n=number_of_games)

# Fail with an actionable message instead of an IndexError on the print below
# when the archive yields no games (e.g. wrong file or number_of_games <= 0).
if not games_as_strings:
    raise ValueError(f"No games were read from {file_path!r}; check the path and number_of_games.")

# Print the first game's PGN as a string
print(games_as_strings[0])

# Hold out 20% of the games for validation. The fixed random_state keeps the
# split identical between runs. The plain list is split directly; wrapping it
# in a Dataset first only to read the column back adds nothing.
train_data, val_data = train_test_split(games_as_strings, test_size=0.2, random_state=42)

# Convert to Hugging Face Dataset format
train_dataset = Dataset.from_dict({"text": train_data})
val_dataset = Dataset.from_dict({"text": val_data})

dataset = DatasetDict({
    "train": train_dataset,
    "validation": val_dataset
})


1. d4 1. .. N f6 2. f3 2. .. d5 3. c4 3. .. c6 4. e3 4. .. g6 5. N c3 5. .. B g7 6. B d3 6. .. O-O 7. N ge2 7. .. N a6 8. a3 8. .. N c7 9. b4 9. .. R e8 10. O-O 10. .. B d7 11. c5 11. .. e5 12. h3 12. .. e x d4 13. N x d4 13. .. N e6 14. R b1 14. .. N x d4 15. e x d4 15. .. N h5 16. N e2 16. .. Q f6 17. B c2 17. .. B f5 18. B x f5 18. .. Q x f5 19. B b2 19. .. N f4 20. N x f4 20. .. Q x f4 21. Q c2 21. .. R e3 22. B c1 22. .. B x d4 23. B x e3 23. .. B x e3 + 24. K h1 24. .. h5 25. Q e2 25. .. d4 26. R be1 26. .. R e8 27. Q d3 27. .. R e6 28. R e2 28. .. Q g3 29. R x e3 29. .. d x e3 30. Q d8 + 30. .. K g7 31. Q d3 31. .. e2 32. R g1 32. .. e1 =Q 33. R x e1 33. .. R x e1 + 0-1


# TRAIN TOKENIZER

Note to reader: if you experiment with a different model size, change `tokenizer_name` in the cell below to match.

In [5]:
# Note to reader: if experimenting with model size, adjust the tokenizer name accordingly below.
tokenizer_name = "gpt2"
old_tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

# train_new_from_iterator expects an iterable of strings (or of batches of
# strings). Iterating a Dataset yields row dicts ({"text": ...}) rather than
# the game strings themselves, so pass the text column explicitly.
# NOTE(review): a target vocab_size of 120 is smaller than the 256-symbol
# byte-level alphabet the GPT-2 tokenizer starts from, so presumably few or no
# merges are learned -- confirm this is intended.
tokenizer = old_tokenizer.train_new_from_iterator(dataset['train']['text'], vocab_size=120)






# TOKENIZE DATA

In [None]:
# Register the padding token before any encoding happens; the
# padding='max_length' option used below pads with this token.
tokenizer.add_special_tokens({'pad_token': '[PAD]'})


def tokenize_function(examples):
    """Encode a batch of game strings as fixed-length model inputs.

    Reads only the 'text' entry of `examples`. Every sequence is truncated or
    padded to exactly 128 tokens, and a special-tokens mask is returned
    alongside the tokenizer's other outputs.
    """
    encoding_options = {
        'truncation': True,
        'padding': 'max_length',
        'max_length': 128,
        'return_special_tokens_mask': True,
    }
    return tokenizer(examples['text'], **encoding_options)


# Encode both splits in batches; the raw "text" column is dropped so only the
# tokenizer outputs remain.
tokenized_datasets = dataset.map(tokenize_function, batched=True, remove_columns=["text"])

train_subset, eval_subset = (
    tokenized_datasets[split_name] for split_name in ("train", "validation")
)

# SET UP AND CONFIGURE MODEL

In [None]:
# Create GPT model configuration
# note to reader: you can modify configuration to see how it affects chess performance
config = GPT2Config(
    # len(tokenizer) counts added tokens such as '[PAD]'; tokenizer.vocab_size
    # covers only the base vocabulary, which would leave the pad id outside
    # the embedding table.
    vocab_size=len(tokenizer),
    n_positions=128,  # same length the tokenized examples are padded/truncated to
    n_embd=768,
    n_layer=12,
    n_head=12,
    # GPT2Config defaults bos/eos to 50256 (the stock GPT-2 vocabulary), which
    # does not exist in this much smaller vocabulary; use the tokenizer's ids.
    bos_token_id=tokenizer.bos_token_id,
    eos_token_id=tokenizer.eos_token_id,
    pad_token_id=tokenizer.pad_token_id,
)

# Initialize model (randomly initialised from the config, not pretrained weights)
model = GPT2LMHeadModel(config)


# mlm=False selects causal (next-token) language modelling rather than masked LM.
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=False
)

# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` (and Trainer's `tokenizer` to `processing_class`) -- confirm
# against the installed version before changing.
# NOTE(review): in the recorded run below training lasts 300 steps, fewer than
# warmup_steps=500, so the learning rate is still ramping up when training
# ends (3e-05 at the final step) -- consider a shorter warmup.
training_args = TrainingArguments(
    output_dir="./gpt-chess",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    logging_dir="./logs",
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    save_total_limit=2,
    warmup_steps=500,
    logging_steps=10
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_subset,
    eval_dataset=eval_subset,
    tokenizer=tokenizer,
    data_collator=data_collator
)

trainer.train()



huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Map: 100%|██████████| 800/800 [00:00<00:00, 5239.28 examples/s]
Map: 100%|██████████| 200/200 [00:00<00:00, 5181.06 examples/s]
  trainer = Trainer(
  3%|▎         | 10/300 [00:08<01:51,  2.60it/s]

{'loss': 5.2487, 'grad_norm': 69.58563995361328, 'learning_rate': 1.0000000000000002e-06, 'epoch': 0.1}


  7%|▋         | 20/300 [00:11<01:28,  3.15it/s]

{'loss': 3.5979, 'grad_norm': 18.92144203186035, 'learning_rate': 2.0000000000000003e-06, 'epoch': 0.2}


 10%|█         | 30/300 [00:14<01:25,  3.17it/s]

{'loss': 2.7202, 'grad_norm': 10.164074897766113, 'learning_rate': 3e-06, 'epoch': 0.3}


 13%|█▎        | 40/300 [00:17<01:21,  3.19it/s]

{'loss': 2.2046, 'grad_norm': 6.215158939361572, 'learning_rate': 4.000000000000001e-06, 'epoch': 0.4}


 17%|█▋        | 50/300 [00:20<01:19,  3.16it/s]

{'loss': 1.8468, 'grad_norm': 10.298335075378418, 'learning_rate': 5e-06, 'epoch': 0.5}


 20%|██        | 60/300 [00:24<01:16,  3.13it/s]

{'loss': 1.582, 'grad_norm': 6.750159740447998, 'learning_rate': 6e-06, 'epoch': 0.6}


 23%|██▎       | 70/300 [00:27<01:13,  3.14it/s]

{'loss': 1.48, 'grad_norm': 19.53133201599121, 'learning_rate': 7.000000000000001e-06, 'epoch': 0.7}


 27%|██▋       | 80/300 [00:30<01:12,  3.04it/s]

{'loss': 1.3502, 'grad_norm': 8.607635498046875, 'learning_rate': 8.000000000000001e-06, 'epoch': 0.8}


 30%|███       | 90/300 [00:33<01:06,  3.15it/s]

{'loss': 1.2761, 'grad_norm': 9.880142211914062, 'learning_rate': 9e-06, 'epoch': 0.9}


 33%|███▎      | 100/300 [00:36<01:03,  3.14it/s]

{'loss': 1.2495, 'grad_norm': 14.392038345336914, 'learning_rate': 1e-05, 'epoch': 1.0}


                                                 
 33%|███▎      | 100/300 [00:39<01:03,  3.14it/s]

{'eval_loss': 1.1386611461639404, 'eval_runtime': 2.315, 'eval_samples_per_second': 86.394, 'eval_steps_per_second': 10.799, 'epoch': 1.0}


 37%|███▋      | 110/300 [00:43<01:09,  2.75it/s]

{'loss': 1.1964, 'grad_norm': 9.96692180633545, 'learning_rate': 1.1000000000000001e-05, 'epoch': 1.1}


 40%|████      | 120/300 [00:46<00:57,  3.16it/s]

{'loss': 1.0956, 'grad_norm': 7.692927837371826, 'learning_rate': 1.2e-05, 'epoch': 1.2}


 43%|████▎     | 130/300 [00:50<00:53,  3.18it/s]

{'loss': 1.0917, 'grad_norm': 13.373727798461914, 'learning_rate': 1.3000000000000001e-05, 'epoch': 1.3}


 47%|████▋     | 140/300 [00:53<00:50,  3.18it/s]

{'loss': 1.0676, 'grad_norm': 6.789460182189941, 'learning_rate': 1.4000000000000001e-05, 'epoch': 1.4}


 50%|█████     | 150/300 [00:56<00:47,  3.16it/s]

{'loss': 1.0619, 'grad_norm': 6.120540142059326, 'learning_rate': 1.5e-05, 'epoch': 1.5}


 53%|█████▎    | 160/300 [00:59<00:44,  3.15it/s]

{'loss': 1.0418, 'grad_norm': 9.279339790344238, 'learning_rate': 1.6000000000000003e-05, 'epoch': 1.6}


 57%|█████▋    | 170/300 [01:02<00:41,  3.16it/s]

{'loss': 1.055, 'grad_norm': 8.034231185913086, 'learning_rate': 1.7000000000000003e-05, 'epoch': 1.7}


 60%|██████    | 180/300 [01:05<00:37,  3.16it/s]

{'loss': 1.0599, 'grad_norm': 6.392711639404297, 'learning_rate': 1.8e-05, 'epoch': 1.8}


 63%|██████▎   | 190/300 [01:09<00:34,  3.17it/s]

{'loss': 0.9955, 'grad_norm': 5.997188091278076, 'learning_rate': 1.9e-05, 'epoch': 1.9}


 67%|██████▋   | 200/300 [01:12<00:31,  3.18it/s]

{'loss': 1.0119, 'grad_norm': 8.70809268951416, 'learning_rate': 2e-05, 'epoch': 2.0}


                                                 
 67%|██████▋   | 200/300 [01:14<00:31,  3.18it/s]

{'eval_loss': 0.9783404469490051, 'eval_runtime': 2.2752, 'eval_samples_per_second': 87.903, 'eval_steps_per_second': 10.988, 'epoch': 2.0}


 70%|███████   | 210/300 [01:18<00:31,  2.83it/s]

{'loss': 0.9443, 'grad_norm': 7.077550888061523, 'learning_rate': 2.1e-05, 'epoch': 2.1}


 73%|███████▎  | 220/300 [01:21<00:25,  3.17it/s]

{'loss': 0.9727, 'grad_norm': 4.930843353271484, 'learning_rate': 2.2000000000000003e-05, 'epoch': 2.2}


 77%|███████▋  | 230/300 [01:24<00:22,  3.12it/s]

{'loss': 0.9475, 'grad_norm': 4.891750812530518, 'learning_rate': 2.3000000000000003e-05, 'epoch': 2.3}


 80%|████████  | 240/300 [01:27<00:19,  3.14it/s]

{'loss': 0.9692, 'grad_norm': 5.357297897338867, 'learning_rate': 2.4e-05, 'epoch': 2.4}


 83%|████████▎ | 250/300 [01:31<00:15,  3.18it/s]

{'loss': 0.9084, 'grad_norm': 7.190113544464111, 'learning_rate': 2.5e-05, 'epoch': 2.5}


 87%|████████▋ | 260/300 [01:34<00:12,  3.17it/s]

{'loss': 0.9326, 'grad_norm': 8.515923500061035, 'learning_rate': 2.6000000000000002e-05, 'epoch': 2.6}


 90%|█████████ | 270/300 [01:37<00:09,  3.08it/s]

{'loss': 0.9426, 'grad_norm': 4.5157551765441895, 'learning_rate': 2.7000000000000002e-05, 'epoch': 2.7}


 93%|█████████▎| 280/300 [01:40<00:06,  3.13it/s]

{'loss': 0.9187, 'grad_norm': 6.070256233215332, 'learning_rate': 2.8000000000000003e-05, 'epoch': 2.8}


 97%|█████████▋| 290/300 [01:44<00:03,  3.11it/s]

{'loss': 0.8628, 'grad_norm': 5.270162105560303, 'learning_rate': 2.9e-05, 'epoch': 2.9}


100%|██████████| 300/300 [01:47<00:00,  3.11it/s]

{'loss': 0.8922, 'grad_norm': 3.7519924640655518, 'learning_rate': 3e-05, 'epoch': 3.0}


                                                 
100%|██████████| 300/300 [01:50<00:00,  3.11it/s]

{'eval_loss': 0.8538914322853088, 'eval_runtime': 2.293, 'eval_samples_per_second': 87.223, 'eval_steps_per_second': 10.903, 'epoch': 3.0}


100%|██████████| 300/300 [01:51<00:00,  2.70it/s]

{'train_runtime': 111.2398, 'train_samples_per_second': 21.575, 'train_steps_per_second': 2.697, 'train_loss': 1.4174757766723634, 'epoch': 3.0}





TrainOutput(global_step=300, training_loss=1.4174757766723634, metrics={'train_runtime': 111.2398, 'train_samples_per_second': 21.575, 'train_steps_per_second': 2.697, 'total_flos': 156775219200000.0, 'train_loss': 1.4174757766723634, 'epoch': 3.0})

# EVALUATE MODEL

In [None]:
class ChessSimulator:
    """Play a single game between the language model (White) and a UCI engine (Black).

    The game so far is kept in `self.prompt`, written in the same spaced
    notation `format_pgn` produces for the training data (e.g.
    "1. d4 1. .. N f6 2. "). On the model's turn the prompt is extended by the
    model and the first move of the continuation is played; on the engine's
    turn the engine's move is appended to the prompt.

    An instance is single-use: `simulateGame` shuts the engine down when it
    finishes.

    Args:
        model: Causal language model exposing `generate`, `config` and `device`.
        tokenizer: Tokenizer matching `model`.
        stockfish_filepath: Path to the UCI engine executable.
        max_new_tokens: Upper bound on tokens generated per model move. It
            must be large enough to hold one spaced move -- TODO confirm the
            default against the tokenizer in use.
        engine_time_limit: Seconds the engine may think per move.
    """

    def __init__(self, model, tokenizer, stockfish_filepath,
                 max_new_tokens=16, engine_time_limit=2.0):
        self.model = model
        self.tokenizer = tokenizer
        self.max_new_tokens = max_new_tokens
        self.engine_time_limit = engine_time_limit
        self.board = chess.Board()
        self.engine = chess.engine.SimpleEngine.popen_uci(stockfish_filepath)
        self.prompt = '1. '
        self.move_number = 1
        self.modelTurn = True  # the model moves first, i.e. plays White
        self.currentMove = None

    def simulateGame(self):
        """Play the game to completion.

        Returns:
            0 if the model loses (it is checkmated, or it produces a move that
            cannot be parsed or is illegal); 1 otherwise. Draws count as a win.
        """
        try:
            # is_game_over() also covers stalemate and the automatic draws,
            # not just checkmate, so drawn games terminate too.
            while not self.board.is_game_over():
                if self.modelTurn:
                    self.generateModelMove()
                else:
                    self.generateChessMove()
                self.modelTurn = not self.modelTurn
        except ValueError as error:
            # parse_san raises ValueError (or a subclass) for unparseable,
            # ambiguous or illegal moves: the model forfeits.
            print("Model forfeits:", error)
            return 0
        finally:
            # Always release the engine process, including on forfeits/errors.
            self.engine.quit()

        # White to move in a checkmated position means the model was mated.
        model_was_mated = self.board.is_checkmate() and self.board.turn == chess.WHITE
        return 0 if model_was_mated else 1  # counts draws as a win

    def parseModelMove(self, move):
        """Extract the model's move from generated text, play it and log it.

        Args:
            move: Text generated by the model. A leading move number
                ("12. ") is tolerated; anything from the next move number or
                game result onwards is ignored.

        Returns:
            The `chess.Move` that was pushed onto the board.

        Raises:
            ValueError: If the text does not contain a legal move.
        """
        text = re.sub(r'^\s*\d+\.\s*', '', move)
        # The model usually keeps writing past its own move; cut at the next
        # move-number marker or result token.
        text = re.split(r'\d+\.|1-0|0-1|1/2-1/2', text, maxsplit=1)[0]
        # Training text is spaced out ("N x d4 +"); SAN has no spaces ("Nxd4+").
        san = ''.join(text.split())
        print("Extracted Move:", san)

        parsed = self.board.parse_san(san)
        # Log the board's own SAN (computed before the push) rather than the
        # raw model text, so the prompt stays in the training format.
        self.prompt += format_pgn(self.board.san(parsed)) + ' '
        self.board.push(parsed)
        return parsed

    def parseChessMove(self, move):
        """Append the engine's move to the prompt and open the next move number.

        Must be called BEFORE `move` is pushed: SAN is derived from the
        position the move is played in.

        Args:
            move: The engine's `chess.Move` for the current position.
        """
        san = self.board.san(move)
        # format_pgn turns "1... Nf6" into "1. .. N f6", as in the training data.
        self.prompt += format_pgn(f'{self.move_number}... {san}') + ' '
        self.move_number += 1
        self.prompt += f'{self.move_number}. '

    def generateModelMove(self):
        """Let the model continue the prompt and play the move it writes."""
        encoded = self.tokenizer(self.prompt, return_tensors="pt")

        # Keep only the most recent tokens so that prompt + continuation fits
        # in the model's context window; long games would otherwise overflow it.
        max_positions = getattr(self.model.config, "max_position_embeddings", None)
        if max_positions is not None:
            keep = max(1, max_positions - self.max_new_tokens)
            encoded = {name: tensor[:, -keep:] for name, tensor in encoded.items()}

        inputs = {name: tensor.to(self.model.device) for name, tensor in encoded.items()}
        prompt_length = inputs["input_ids"].shape[1]

        # max_new_tokens bounds only the continuation; max_length would also
        # count the ever-growing prompt.
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=self.max_new_tokens,
            num_return_sequences=1,
        )
        continuation = self.tokenizer.decode(
            outputs[0][prompt_length:], skip_special_tokens=True
        )
        self.currentMove = self.parseModelMove(continuation)

    def generateChessMove(self):
        """Ask the engine for a move, log it in the prompt and play it."""
        result = self.engine.play(
            self.board, chess.engine.Limit(time=self.engine_time_limit)
        )  # Time limit for the move
        # Log first: parseChessMove needs the position before the move is made.
        self.parseChessMove(result.move)
        self.board.push(result.move)
