In [None]:
import chess.pgn
import chess.engine
import io
import re
import zstandard as zstd
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, GPT2Config, GPT2LMHeadModel, Trainer, TrainingArguments, DataCollatorForLanguageModeling
import tarfile
import torch

In [2]:
def filter_high_quality_games(games_list, min_rating=1800, max_moves=150):
    filtered_games = []
    
    for game_text in games_list:
        # Extract ratings from PGN headers (if available)
        white_elo_match = re.search(r'\[WhiteElo "(\d+)"\]', game_text)
        black_elo_match = re.search(r'\[BlackElo "(\d+)"\]', game_text)
        
        # Check if both players have minimum rating
        if white_elo_match and black_elo_match:
            white_elo = int(white_elo_match.group(1))
            black_elo = int(black_elo_match.group(1))
            
            if white_elo >= min_rating and black_elo >= min_rating:
                # Count moves to avoid ultra-long games
                move_count = len(re.findall(r'\d+\.', game_text))
                if move_count <= max_moves:
                    filtered_games.append(game_text)
    
    return filtered_games

In [3]:
def enhanced_format_pgn(text):
    # Remove metadata lines
    text = re.sub(r'\[.*?\]', '', text)
    
    # Remove game result
    text = re.sub(r'\s*(1-0|0-1|1/2-1/2)\s*', '', text)
    
    # Remove comments and analysis
    text = re.sub(r'\{.*?\}', '', text)
    text = re.sub(r'\(.*?\)', '', text)  # Remove variations
    
    # Clean up spaces
    text = ' '.join(text.split())
    
    # Enhanced tokenization patterns
    move_number_pattern = re.compile(r'(\d+\.)')
    piece_pattern = re.compile(r'([KQRBN])')
    square_pattern = re.compile(r'([a-h][1-8])')
    special_move_pattern = re.compile(r'(O-O-O|O-O|\+|#|x|=Q|=R|=B|=N)')
    
    # Apply spacing
    text = move_number_pattern.sub(r'\1 ', text)
    text = piece_pattern.sub(r'\1 ', text)
    text = special_move_pattern.sub(r' \1 ', text)
    
    # Add game start and end tokens
    text = '<GAME_START> ' + text + ' <GAME_END>'
    
    return ' '.join(text.split())

In [4]:
def read_and_filter_games(file_path, n=100000, min_rating=1800):
    """Read and filter high-quality games"""
    with open(file_path, 'rb') as f:
        dctx = zstd.ZstdDecompressor()
        decompressed = dctx.stream_reader(f)
        pgn_text = io.TextIOWrapper(decompressed, encoding='utf-8')

        all_games = []
        games_read = 0
        
        while games_read < n * 2:  # Read more to filter later
            game = chess.pgn.read_game(pgn_text)
            if game is None:
                break
            
            # Get full game text including headers
            game_str = str(game)
            all_games.append(game_str)
            games_read += 1

    # Filter for high quality
    filtered_games = filter_high_quality_games(all_games, min_rating=min_rating)
    
    # Take only the number we need
    filtered_games = filtered_games[:n]
    
    # Format the games
    formatted_games = [enhanced_format_pgn(game) for game in filtered_games]
    
    return formatted_games

In [5]:
def create_large_model_config(tokenizer, model_size="medium"):
    """Create larger model configurations"""
    
    configs = {
        "small": {
            "n_embd": 768,
            "n_layer": 12,
            "n_head": 12
        },
        "medium": {
            "n_embd": 1024,
            "n_layer": 24,
            "n_head": 16
        },
        "large": {
            "n_embd": 1280,
            "n_layer": 36,
            "n_head": 20
        }
    }
    
    config_params = configs[model_size]
    
    config = GPT2Config(
        vocab_size=tokenizer.vocab_size,
        n_positions=256,  # Increased context length
        n_embd=config_params["n_embd"],
        n_layer=config_params["n_layer"],
        n_head=config_params["n_head"],
        dropout=0.1,
        attn_dropout=0.1
    )
    
    return config


In [6]:
def create_enhanced_tokenizer(dataset, vocab_size=8000):
    """Create a chess-specific tokenizer with more vocabulary"""
    
    # Start with base tokenizer
    old_tokenizer = AutoTokenizer.from_pretrained("gpt2")
    
    # Define chess-specific special tokens
    special_tokens = {
        "eos_token": "<GAME_END>",
        "bos_token": "<GAME_START>",
        "pad_token": "[PAD]",
        "additional_special_tokens": [
            "<GAME_START>", "<GAME_END>", 
            "<CHECK>", "<CHECKMATE>", "<CASTLING>",
            "<CAPTURE>", "<PROMOTION>"
        ]
    }
    
    # Add special tokens
    old_tokenizer.add_special_tokens(special_tokens)
    
    # Train new tokenizer with larger vocabulary
    tokenizer = old_tokenizer.train_new_from_iterator(dataset, vocab_size)
    
    return tokenizer

In [None]:
def train_improved_chess_model(
    file_path, 
    num_games=200000,  # Increased dataset size
    model_size="medium",
    min_rating=1800,
    epochs=5,
    batch_size=16
):
    
    print("Reading and filtering games...")
    games = read_and_filter_games(file_path, n=num_games, min_rating=min_rating)
    print(f"Loaded {len(games)} high-quality games")
    
    # Create dataset
    dataset = Dataset.from_dict({"text": games})
    
    print("Training enhanced tokenizer...")
    tokenizer = create_enhanced_tokenizer(dataset["text"], vocab_size=8000)
    
    # Split data
    train_data, val_data = train_test_split(dataset["text"], test_size=0.15, random_state=42)
    train_dataset = Dataset.from_dict({"text": train_data})
    val_dataset = Dataset.from_dict({"text": val_data})
    
    dataset_dict = DatasetDict({
        "train": train_dataset,
        "validation": val_dataset
    })
    
    # Tokenize with increased max length
    def tokenize_function(examples):
        return tokenizer(
            examples['text'],
            truncation=True,
            padding='max_length',
            max_length=256,  # Increased from 128
            return_special_tokens_mask=True
        )
    
    print("Tokenizing datasets...")
    tokenized_datasets = dataset_dict.map(
        tokenize_function,
        batched=True,
        remove_columns=["text"]
    )
    
    # Create larger model
    print(f"Creating {model_size} model...")
    config = create_large_model_config(tokenizer, model_size=model_size)
    model = GPT2LMHeadModel(config)
    
    print(f"Model has {model.num_parameters():,} parameters")
    
    # Enhanced training arguments
    training_args = TrainingArguments(
        output_dir=f"./improved-chess-{model_size}",
        eval_steps=1000,
        save_strategy="steps",
        save_steps=2000,
        logging_dir="./logs",
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        gradient_accumulation_steps=2,  # Effective batch size = batch_size * 2
        num_train_epochs=epochs,
        weight_decay=0.01,
        learning_rate=5e-5,
        lr_scheduler_type="cosine",
        warmup_steps=1000,
        logging_steps=100,
        save_total_limit=3,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        dataloader_num_workers=4,
        fp16=True,  # Mixed precision training
        gradient_checkpointing=True,  # Save memory
    )
    
    # Data collator
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False
    )
    
    # Create trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        tokenizer=tokenizer,
        data_collator=data_collator
    )
    
    print("Starting training...")
    trainer.train()
    
    # Save final model
    trainer.save_model()
    tokenizer.save_pretrained(f"./improved-chess-{model_size}")
    
    return model, tokenizer

In [None]:
class ImprovedChessSimulator:
    def __init__(self, model, tokenizer, stockfish_path, skill_level=10):
        self.model = model
        self.tokenizer = tokenizer
        self.stockfish_path = stockfish_path
        self.skill_level = skill_level
        
    def evaluate_model(self, num_games=10):
        wins = 0
        
        for game_num in range(num_games):
            print(f"Playing game {game_num + 1}/{num_games}")
            
            try:
                result = self.play_single_game()
                if result == 1:  # Model wins or draws
                    wins += 1
                    print(f"Game {game_num + 1}: Model won/drew")
                else:
                    print(f"Game {game_num + 1}: Model lost")
                    
            except Exception as e:
                print(f"Game {game_num + 1}: Error - {e}")
                
        win_rate = wins / num_games
        print(f"\nModel performance: {wins}/{num_games} wins/draws ({win_rate:.1%})")
        return win_rate
    
    def play_single_game(self):
        board = chess.Board()
        engine = chess.engine.SimpleEngine.popen_uci(self.stockfish_path)
        
        # Set Stockfish skill level (1-20, where 20 is strongest)
        engine.configure({"Skill Level": self.skill_level})
        
        prompt = "<GAME_START> "
        move_number = 1
        model_is_white = True
        
        try:
            while not board.is_game_over() and len(board.move_stack) < 200:
                if model_is_white:
                    # Model's turn (white)
                    move = self.generate_model_move(prompt, board, move_number)
                    if move is None:
                        return 0  # Model failed to make valid move
                    
                    board.push(move)
                    san_move = board.san(move)
                    prompt += f"{move_number}. {san_move} "
                else:
                    # Stockfish's turn (black)
                    result = engine.play(board, chess.engine.Limit(time=1.0))
                    board.push(result.move)
                    san_move = board.san(result.move)
                    prompt += f"{move_number}. .. {san_move} "
                    move_number += 1
                
                model_is_white = not model_is_white
                
        finally:
            engine.quit()
        
        # Determine result
        if board.is_checkmate():
            return 1 if board.turn == chess.BLACK else 0  # Model wins if black is checkmated
        else:
            return 0.5  # Draw or other result
    
    def generate_model_move(self, prompt, board, move_number, max_attempts=5):
        for attempt in range(max_attempts):
            try:
                # Add move number to prompt
                current_prompt = prompt + f"{move_number}. "
                
                # Tokenize and generate
                inputs = self.tokenizer(current_prompt, return_tensors="pt")
                
                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=10,
                        temperature=0.8,
                        do_sample=True,
                        top_p=0.9,
                        pad_token_id=self.tokenizer.pad_token_id
                    )
                
                # Decode response
                generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                
                # Extract move
                move_text = generated_text[len(current_prompt):].split()[0]
                
                # Try to parse move
                move = board.parse_san(move_text.strip())
                
                if move in board.legal_moves:
                    return move
                    
            except Exception as e:
                continue
        
        # If all attempts fail, return a random legal move
        import random
        legal_moves = list(board.legal_moves)
        return random.choice(legal_moves) if legal_moves else None

In [None]:
def download_chess_data():
    import urllib.request
    import os
    
    print("Downloading chess data from Lichess...")
    
    # Download a month of lichess games (smaller file for testing)
    url = "https://database.lichess.org/standard/lichess_db_standard_rated_2024-01.pgn.zst"
    filename = "lichess_games.pgn.zst"
    
    if not os.path.exists(filename):
        print(f"Downloading {filename}...")
        urllib.request.urlretrieve(url, filename)
        print("Download complete!")
    else:
        print(f"{filename} already exists")
    
    return filename

def setup_stockfish():
    import platform
    import urllib.request
    import zipfile
    import os
    
    system = platform.system().lower()
    print(f"Setting up Stockfish for {system}...")
    

    url = "https://github.com/official-stockfish/Stockfish/releases/download/sf_17.1/stockfish-ubuntu-x86-64-avx2.tar"
    filename = "stockfish-linux.tar"
    exe_name = "stockfish"
    is_tar = True
    
    if not os.path.exists("stockfish"):
        os.makedirs("stockfish")
    
    stockfish_path = os.path.join("stockfish", exe_name)
    
    if not os.path.exists(stockfish_path):
        print(f"Downloading Stockfish...")
        urllib.request.urlretrieve(url, filename)
        
        print("Extracting Stockfish...")
        if is_tar:
            # Handle tar files
            with tarfile.open(filename, 'r') as tar_ref:
                tar_ref.extractall("temp_stockfish")
        else:
            # Handle zip files (Windows)
            with zipfile.ZipFile(filename, 'r') as zip_ref:
                zip_ref.extractall("temp_stockfish")
        
        # Find and move the executable
        found = False
        for root, dirs, files in os.walk("temp_stockfish"):
            for file in files:
                if file == exe_name or file.startswith("stockfish"):
                    import shutil
                    src_path = os.path.join(root, file)
                    shutil.move(src_path, stockfish_path)
                    if system != "windows":
                        os.chmod(stockfish_path, 0o755)  # Make executable
                    found = True
                    break
            if found:
                break
        
        # Cleanup
        import shutil
        shutil.rmtree("temp_stockfish")
        os.remove(filename)
        print("Stockfish setup complete!")
    else:
        print("Stockfish already exists")
    
    return stockfish_path


## Demo

In [None]:
def create_demo_chess_data():
    demo_games = [
        "1. e4 e5 2. Nf3 Nc6 3. Bb5 a6 4. Ba4 Nf6 5. O-O Be7 6. Re1 b5 7. Bb3 d6 8. c3 O-O 9. h3 Nb8 10. d4 Nbd7",
        "1. d4 d5 2. c4 c6 3. Nf3 Nf6 4. Nc3 dxc4 5. a4 Bf5 6. e3 e6 7. Bxc4 Bb4 8. O-O Nbd7 9. Qe2 Bg6",
        "1. e4 c5 2. Nf3 d6 3. d4 cxd4 4. Nxd4 Nf6 5. Nc3 a6 6. Be3 e6 7. f3 b5 8. Qd2 Bb7 9. O-O-O Nbd7",
        "1. Nf3 Nf6 2. g3 g6 3. Bg2 Bg7 4. O-O O-O 5. d3 d6 6. e4 e5 7. Nc3 c6 8. a4 Re8 9. h3 Nbd7",
        "1. c4 e5 2. Nc3 Nf6 3. Nf3 Nc6 4. g3 d5 5. cxd5 Nxd5 6. Bg2 Be7 7. O-O O-O 8. d3 Be6 9. Ng5 Bxg5"
    ] * 1000  # Repeat to make dataset larger
    
    # Format games properly
    formatted_games = []
    for game in demo_games:
        formatted_game = enhanced_format_pgn(game)
        formatted_games.append(formatted_game)
    
    return formatted_games

def train_demo_model():
    print("Creating demo chess dataset...")
    games = create_demo_chess_data()
    
    # Create dataset
    dataset = Dataset.from_dict({"text": games})
    
    print("Training tokenizer...")
    tokenizer = create_enhanced_tokenizer(dataset["text"], vocab_size=2000)
    
    # Split data
    train_data, val_data = train_test_split(dataset["text"], test_size=0.2, random_state=42)
    train_dataset = Dataset.from_dict({"text": train_data})
    val_dataset = Dataset.from_dict({"text": val_data})
    
    dataset_dict = DatasetDict({
        "train": train_dataset,
        "validation": val_dataset
    })
    
    # Tokenize
    def tokenize_function(examples):
        return tokenizer(
            examples['text'],
            truncation=True,
            padding='max_length',
            max_length=128,
            return_special_tokens_mask=True
        )
    
    tokenized_datasets = dataset_dict.map(
        tokenize_function,
        batched=True,
        remove_columns=["text"]
    )
    
    # Create small model for demo
    config = create_large_model_config(tokenizer, model_size="small")
    model = GPT2LMHeadModel(config)
    
    print(f"Demo model has {model.num_parameters():,} parameters")
    
    # Quick training
    training_args = TrainingArguments(
        output_dir="./demo-chess-model",
        eval_steps=100,
        save_strategy="steps",
        save_steps=200,
        per_device_train_batch_size=4,
        per_device_eval_batch_size=4,
        num_train_epochs=2,
        weight_decay=0.01,
        learning_rate=5e-5,
        warmup_steps=100,
        logging_steps=50,
        save_total_limit=2,
    )
    
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False
    )
    
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets["train"],
        eval_dataset=tokenized_datasets["validation"],
        tokenizer=tokenizer,
        data_collator=data_collator
    )
    
    print("Training demo model...")
    trainer.train()
    
    return model, tokenizer

def demo_model_generation(model, tokenizer):
    print("\nTesting model move generation:")
    
    test_positions = [
        "<GAME_START> 1. e4 e5 2. Nf3 Nc6 3. Bb5",
        "<GAME_START> 1. d4 d5 2. c4 c6 3. Nf3",
        "<GAME_START> 1. e4 c5 2. Nf3 d6 3. d4"
    ]
    
    for prompt in test_positions:
        print(f"\nPosition: {prompt}")
        
        inputs = tokenizer(prompt, return_tensors="pt")
        
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=20,
                temperature=0.8,
                do_sample=True,
                top_p=0.9,
                pad_token_id=tokenizer.pad_token_id
            )
        
        generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
        continuation = generated_text[len(prompt):].strip()
        print(f"Model continues: {continuation}")

In [11]:
def main():
    # import argparse
    
    # parser = argparse.ArgumentParser(description="Train improved chess model")
    # parser.add_argument("--mode", choices=["demo", "full"], default="demo",
    #                    help="Run demo with synthetic data or full training with real data")
    # parser.add_argument("--download", action="store_true",
    #                    help="Download chess data and Stockfish automatically")
    
    # args = parser.parse_args()
    
    # if args.mode == "demo":
    print("Running DEMO mode with synthetic chess data...")
    print("="*50)
    
    # Train demo model
    model, tokenizer = train_demo_model()
    
    # Demo generation
    demo_model_generation(model, tokenizer)
    
    print("\nDemo complete! To run with real data:")
    print("python script.py --mode full --download")
    
    # else:  # full mode
    #     if args.download:
    #         # Download data and Stockfish
    #         pgn_file = download_chess_data()
    #         stockfish_path = setup_stockfish()
    #     else:
    #         # Use existing files
    #         pgn_file = input("Enter path to PGN file: ")
    #         stockfish_path = input("Enter path to Stockfish executable: ")
        
    #     print("Running FULL training mode...")
    #     print("="*50)
        
    #     # Train models of different sizes
    #     model_sizes = ["small", "medium"]
    #     results = {}
        
    #     for size in model_sizes:
    #         print(f"\nTraining {size} model...")
            
    #         # Train model
    #         model, tokenizer = train_improved_chess_model(
    #             file_path=pgn_file,
    #             num_games=50000 if size == "small" else 100000,
    #             model_size=size,
    #             min_rating=1800,
    #             epochs=3,
    #             batch_size=8
    #         )
            
    #         # Evaluate model
    #         print(f"Evaluating {size} model...")
    #         simulator = ImprovedChessSimulator(model, tokenizer, stockfish_path, skill_level=5)
    #         win_rate = simulator.evaluate_model(num_games=10)
    #         results[size] = win_rate
            
    #         # Clean up memory
    #         del model, tokenizer
    #         if torch.cuda.is_available():
    #             torch.cuda.empty_cache()
        
    #     # Print final results
    #     print(f"\n{'='*50}")
    #     print("FINAL RESULTS")
    #     print(f"{'='*50}")
    #     for size, win_rate in results.items():
    #         print(f"{size.capitalize()} model: {win_rate:.1%} win rate")

In [12]:
if __name__ == "__main__":
    main()

Running DEMO mode with synthetic chess data...
Creating demo chess dataset...
Training tokenizer...


Map:   0%|          | 0/4000 [00:00<?, ? examples/s]

Map:   0%|          | 0/1000 [00:00<?, ? examples/s]

Demo model has 85,483,008 parameters
Training demo model...


  trainer = Trainer(
`loss_type=None` was set in the config but it is unrecognised.Using the default loss: `ForCausalLMLoss`.


Step,Training Loss
50,2.4362
100,0.0995
150,0.0527
200,0.0341
250,0.0288
300,0.0309
350,0.0273
400,0.0254
450,0.0258
500,0.0251



Testing model move generation:

Position: <GAME_START> 1. e4 e5 2. Nf3 Nc6 3. Bb5
Model continues: N f6 5. O-O B e7 6.

Position: <GAME_START> 1. d4 d5 2. c4 c6 3. Nf3
Model continues: 3 d x c4 5. a4 B f5 6

Position: <GAME_START> 1. e4 c5 2. Nf3 d6 3. d4
Model continues: x d4 N f6 5. N c3 a6

Demo complete! To run with real data:
python script.py --mode full --download
