In [None]:
!pip install -q transformers datasets accelerate bitsandbytes

In [None]:
!pip install -q -U bitsandbytes  # Upgrade bitsandbytes to the latest version

In [None]:
# Mount Google Drive at /content/drive. The parquet dataset read later and the
# fine-tuning output directories all live under /content/drive/MyDrive, so this
# cell has to run before the data-loading and training cells.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from datasets import load_dataset
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
    BitsAndBytesConfig,
    TrainerCallback
)
import torch
from datetime import datetime
import os

In [None]:
class CustomLoggingCallback(TrainerCallback):
    """Trainer callback that prints a short summary each time an epoch ends."""

    def on_epoch_end(self, args, state, control, **kwargs):
        """Print the current epoch and the newest entry of ``state.log_history``.

        When nothing has been logged yet an empty dict is shown instead.
        ``control`` is handed back unchanged.
        """
        history = state.log_history
        if history:
            latest_metrics = history[-1]
        else:
            latest_metrics = {}
        print(f"\n>> Epoch {state.epoch:.2f} completed. Logged Metrics: {latest_metrics}")
        return control

In [None]:
def prepare_pgn_dataset(
    tokenizer,
    data_path="/content/drive/MyDrive/Colab_Data/data/chess_pgn_dataset_10k.parquet",
    num_examples=4000,
    max_length=512,
):
    """
    Loads a chess PGN dataset from a parquet file and tokenizes it.

    Args:
        tokenizer: Tokenizer used to encode the PGN strings. If it has no pad
            token, its pad token is set to its EOS token (the tokenizer object
            is modified in place).
        data_path: Path to the parquet file. Every row must provide a "pgn"
            column holding the raw PGN moves.
        num_examples: Maximum number of leading rows to keep for quick
            experiments. Clamped to the dataset size; ``None`` keeps all rows.
        max_length: Token length every example is truncated/padded to. The
            start of the PGN is kept; anything beyond is dropped.

    Returns:
        The tokenized dataset, containing only the tokenizer's output columns.

    Raises:
        ValueError: If the loaded data has no "pgn" column.
    """
    print("Loading chess PGN dataset...")
    # Ask for the "train" split directly so we get a single Dataset back
    # instead of a dict of splits (a lone data file is exposed as "train").
    full_dataset = load_dataset("parquet", data_files=data_path, split="train")

    if "pgn" not in full_dataset.column_names:
        raise ValueError(
            f"Expected a 'pgn' column in {data_path}, "
            f"found columns: {full_dataset.column_names}"
        )

    # Clamp the subset size so datasets smaller than `num_examples` still work.
    total_rows = len(full_dataset)
    if num_examples is None:
        subset_size = total_rows
    else:
        subset_size = max(0, min(num_examples, total_rows))
    print(f"Using {subset_size} of {total_rows} examples.")
    small_dataset = full_dataset.select(range(subset_size))

    def format_pgn(example):
        # Remap the PGN string to the "text" field for tokenization.
        return {"text": example["pgn"]}

    # Only the selected subset is reformatted; the rest is never used.
    small_dataset = small_dataset.map(format_pgn)

    # Preview up to the first 5 examples.
    for i in range(min(5, len(small_dataset))):
        print(small_dataset[i])

    # Padding to a fixed length requires a pad token; fall back to EOS.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    print("Tokenizing the PGN dataset...")

    def tokenize_function(examples):
        return tokenizer(
            examples["text"],
            truncation=True,
            max_length=max_length,
            padding="max_length",
        )

    tokenized_dataset = small_dataset.map(
        tokenize_function,
        batched=True,
        remove_columns=small_dataset.column_names,
    )

    print(f"Prepared tokenized dataset with {len(tokenized_dataset)} examples.")
    return tokenized_dataset



In [None]:
def prepare_pgn_finetuning():
    """
    Sets up and runs PGN-based domain-adaptive fine-tuning.

    Key Points:
      - Uses raw PGN strings for next-token prediction (i.e. domain adaptation).
      - Uses a lower learning rate (1e-5) for gentle, full-model updates.
      - Designed to update the model's entire latent space (hence no PEFT/LoRA).
      - The train/eval split is seeded so repeated runs evaluate on the same
        examples.

    Checkpoints are written to a timestamped ``run_<timestamp>`` directory on
    Google Drive; the final model and its tokenizer are written next to it in
    ``run_<timestamp>_final``.

    Returns:
        None.
    """
    # Local import: only needed for the TrainingArguments version check below.
    import inspect

    seed = 42

    # Create a structured output directory in Google Drive.
    base_dir = "/content/drive/MyDrive/Colab_Data/fine_tuned_models/chess_pgn_finetuning"
    os.makedirs(base_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = os.path.join(base_dir, f"run_{timestamp}")
    print(f"Output directory: {output_dir}")

    print("Loading instruct model and tokenizer for chess PGN fine-tuning...")
    # Use the instruct model that has shown better PGN prediction performance.
    model_id = "meta-llama/Llama-3.2-1B-Instruct"
    tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

    # The weights are loaded without quantization or an explicit dtype; when
    # fp16 is enabled below, the Trainer's autocast handles mixed precision.
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        trust_remote_code=True,
        device_map="auto",  # Automatically allocate layers to available GPU(s) or CPU.
    )

    # Prepare the tokenized PGN dataset (also sets a pad token on the tokenizer
    # if it has none).
    tokenized_dataset = prepare_pgn_dataset(tokenizer)

    # Split the tokenized dataset into train (90%) and eval (10%) subsets.
    split_dataset = tokenized_dataset.train_test_split(test_size=0.1, seed=seed)
    train_dataset = split_dataset["train"]
    eval_dataset = split_dataset["test"]

    print("Configuring training arguments...")
    # `evaluation_strategy` was renamed to `eval_strategy` in newer transformers
    # releases and the old name was later removed; use whichever this install accepts.
    accepted_args = inspect.signature(TrainingArguments.__init__).parameters
    if "eval_strategy" in accepted_args:
        eval_strategy_key = "eval_strategy"
    else:
        eval_strategy_key = "evaluation_strategy"

    training_args = TrainingArguments(
        output_dir=output_dir,            # Directory to store model checkpoints.
        num_train_epochs=5,               # Number of training epochs.
        per_device_train_batch_size=4,    # Batch size per device.
        gradient_accumulation_steps=4,    # To simulate a larger batch size.
        save_steps=500,                   # Must stay a multiple of eval_steps for load_best_model_at_end.
        save_total_limit=2,               # Only keep the last 2 checkpoints.
        logging_steps=10,                 # Log training metrics every 10 steps.
        learning_rate=1e-5,               # Lower learning rate for gentle fine-tuning.
        fp16=torch.cuda.is_available(),   # 16-bit mixed precision needs a CUDA device.
        warmup_steps=50,                  # Warmup steps.
        report_to="none",                 # Disable external logging.
        eval_steps=100,                   # Evaluate every 100 steps.
        load_best_model_at_end=True,
        seed=seed,
        **{eval_strategy_key: "steps"},
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=DataCollatorForLanguageModeling(
            tokenizer=tokenizer,
            mlm=False,  # Causal language modeling (next-token prediction).
        ),
        callbacks=[CustomLoggingCallback()],
    )

    print("Starting PGN fine-tuning...")
    trainer.train()

    final_output_dir = f"{output_dir}_final"
    print(f"Saving final model to {final_output_dir}...")
    trainer.save_model(final_output_dir)
    # The Trainer was not given the tokenizer, so save it explicitly; this makes
    # the final directory loadable on its own.
    tokenizer.save_pretrained(final_output_dir)
    print("PGN fine-tuning completed!")


In [None]:
# Execute the fine-tuning process: loads the model and tokenizer, prepares the
# PGN dataset, trains, and saves the final model. All output paths are under
# /content/drive, so the Drive-mount cell must have been run first.
prepare_pgn_finetuning()