# Set configs


In [10]:
#!/usr/bin/env python3
"""
Load and test a local Hugging Face model in GPT-2 format.
Supports basic text generation and BLIMP linguistic testing.
"""

import typing as t
from pathlib import Path
import argparse
import json

import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    pipeline,
    set_seed,
)


def load_model(
    model_path: str,
    use_cuda: bool = torch.cuda.is_available(),
) -> tuple[AutoModelForCausalLM, AutoTokenizer]:
    """
    Load a local causal language model and its tokenizer.

    Works with GPT-2 and other causal language models that
    ``AutoModelForCausalLM`` / ``AutoTokenizer`` can load.

    Args:
        model_path: Path to the local model directory.
        use_cuda: Whether to place the model on CUDA. The default is
            evaluated once, when this function is defined. If CUDA is
            requested but not available, the model is placed on the CPU
            instead of failing.

    Returns:
        Tuple of (model, tokenizer).

    Raises:
        FileNotFoundError: If ``model_path`` does not exist.
    """
    # Keep the caller's string untouched; work on a Path copy.
    model_dir = Path(model_path)

    if not model_dir.exists():
        raise FileNotFoundError(f"Model path {model_dir} does not exist")

    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = AutoModelForCausalLM.from_pretrained(model_dir)

    # An explicit use_cuda=True on a CPU-only machine would otherwise crash
    # inside model.to("cuda"); degrade to CPU and say so.
    if use_cuda and not torch.cuda.is_available():
        print("CUDA was requested but is not available; using CPU instead")
        use_cuda = False

    device = torch.device("cuda" if use_cuda else "cpu")
    model.to(device)

    print(f"Model loaded successfully to {device}")
    return model, tokenizer


def generate_text(
    model: AutoModelForCausalLM,
    tokenizer: AutoTokenizer,
    prompt: str,
    max_length: int = 100,
    num_return_sequences: int = 1,
    temperature: float = 1.0,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.0,
    seed: int = 42,
) -> list[str]:
    """
    Generate text continuations of ``prompt`` by seeded sampling.

    Args:
        model: Loaded causal language model.
        tokenizer: Tokenizer matching ``model``.
        prompt: Input text to continue from.
        max_length: Maximum length of the generated sequence, forwarded
            unchanged to the generation pipeline.
        num_return_sequences: Number of sequences to generate.
        temperature: Sampling temperature.
        top_p: Nucleus sampling parameter.
        top_k: Top-k sampling parameter.
        repetition_penalty: Penalty for repeating tokens.
        seed: Random seed, set before every call for reproducibility.

    Returns:
        List of generated texts; each one includes the prompt
        (``return_full_text=True``).
    """
    set_seed(seed)

    # Run on whatever device the model already lives on. Choosing the device
    # from torch.cuda.is_available() would silently move a model that was
    # deliberately loaded on the CPU.
    generator = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        device=model.device,
    )

    outputs = generator(
        prompt,
        # temperature / top_p / top_k and the seed only matter when sampling
        # is on, so request it explicitly instead of relying on the model's
        # stored generation defaults.
        do_sample=True,
        max_length=max_length,
        num_return_sequences=num_return_sequences,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
        return_full_text=True,
    )

    return [output["generated_text"] for output in outputs]


def evaluate_blimp(
    model: AutoModelForCausalLM,
    tokenizer: AutoTokenizer,
    blimp_path: str,
    num_samples: int = 100,
) -> dict[str, float]:
    """
    Evaluate model on BLiMP (Benchmark of Linguistic Minimal Pairs) tests.

    Every JSON line must provide ``sentence_good`` and ``sentence_bad``. A
    pair counts as correct when ``calculate_perplexity`` is strictly lower
    for the good sentence than for the bad one.

    Args:
        model: Loaded causal language model.
        tokenizer: Tokenizer matching ``model``.
        blimp_path: Path to a BLiMP directory (its ``*.jsonl`` files are
            evaluated in sorted order) or to one specific test file.
        num_samples: Maximum number of samples to evaluate per file.

    Returns:
        Dictionary mapping each test name (file stem) to its accuracy.
        Files that yield no samples are reported and left out.
    """
    blimp_root = Path(blimp_path)
    results: dict[str, float] = {}

    # Process single file or directory; sort for a stable evaluation order.
    if blimp_root.is_file():
        files = [blimp_root]
    else:
        files = sorted(blimp_root.glob("*.jsonl"))

    for file_path in files:
        test_name = file_path.stem
        print(f"Evaluating {test_name}...")

        # Load up to num_samples records, tolerating blank lines.
        samples = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                if len(samples) >= num_samples:
                    break
                if not line.strip():
                    continue
                samples.append(json.loads(line))

        total = len(samples)
        if total == 0:
            # Avoid dividing by zero for empty files or num_samples <= 0.
            print(f"{test_name}: no samples found, skipping")
            continue

        correct = 0
        for sample in samples:
            good_perplexity = calculate_perplexity(model, tokenizer, sample["sentence_good"])
            bad_perplexity = calculate_perplexity(model, tokenizer, sample["sentence_bad"])

            # A linguistically aware model should assign lower perplexity
            # to the good sentence.
            if good_perplexity < bad_perplexity:
                correct += 1

        accuracy = correct / total
        results[test_name] = accuracy
        print(f"{test_name}: {accuracy:.2%} ({correct}/{total})")

    return results


def calculate_perplexity(
    model: AutoModelForCausalLM, 
    tokenizer: AutoTokenizer, 
    text: str
) -> float:
    """
    Return the perplexity the model assigns to ``text``.

    The text is tokenized, scored by the model with the tokens themselves
    as labels, and the resulting loss is exponentiated.

    Args:
        model: Loaded causal language model
        tokenizer: Tokenizer matching ``model``
        text: Input text

    Returns:
        Perplexity score (lower is better)
    """
    # Tokenize and place the ids on the same device as the model.
    token_ids = tokenizer(text, return_tensors="pt").input_ids.to(model.device)

    # Scoring only: no gradients are needed.
    with torch.no_grad():
        scored = model(token_ids, labels=token_ids)

    # perplexity = exp(loss)
    loss_value = scored.loss.item()
    return torch.tensor(loss_value).exp().item()



In [11]:
# Load the model and tokenizer from a local checkpoint directory, forcing CPU
model_path = "/Users/jliu/Desktop/checkpoints/1e6_finetuned/1e6_seed_1_entropy_001_lm_loss_001/best_reward"

model, tokenizer = load_model(model_path,use_cuda=False)

Some weights of the model checkpoint at /Users/jliu/Desktop/checkpoints/1e6_finetuned/1e6_seed_1_entropy_001_lm_loss_001/best_reward were not used when initializing GPT2LMHeadModel: ['v_head.summary.bias', 'v_head.summary.weight']
- This IS expected if you are initializing GPT2LMHeadModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing GPT2LMHeadModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Model loaded successfully to cpu


In [13]:
# Continue a short prompt using generate_text's default arguments
prompt = "i am "
generate_text(model,tokenizer,prompt)

['i am  i going to put it in there?']

In [14]:
# Score a fixed sentence (the continuation shown in the previous cell's output)
text = 'i am  i going to put it in there?'
calculate_perplexity(model,tokenizer,text)

7.053261756896973

In [None]:
#!/usr/bin/env python3
"""
Fine-tuning script for decoder-only transformer language models.
Loads text files as training data and fine-tunes a pre-trained model.
"""

import typing as t
from pathlib import Path
import argparse
import logging
import math
import os
import random
from dataclasses import dataclass, field

import numpy as np
import torch
from torch.utils.data import Dataset
from datasets import load_dataset

import transformers
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    HfArgumentParser,
    DataCollatorForLanguageModeling,
    Trainer,
    TrainingArguments,
    set_seed,
)


# Configure logging
# Root logger: timestamped records, INFO level and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
# Module-level logger shared by the classes and functions below.
logger = logging.getLogger(__name__)


@dataclass
class ModelArguments:
    """Arguments pertaining to which model/config/tokenizer we are going to fine-tune.

    ``main`` hands this dataclass to ``HfArgumentParser``; each field carries
    its description in ``metadata["help"]``.
    """

    # Required: the only field without a default.
    model_name_or_path: str = field(
        metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models"}
    )
    # When left unset, finetune_model falls back to model_name_or_path.
    # NOTE(review): annotated `str` although the default is None.
    tokenizer_name_or_path: str = field(
        default=None,
        metadata={"help": "Path to pretrained tokenizer or tokenizer identifier (defaults to model_name_or_path)"}
    )
    # Forwarded as cache_dir to both from_pretrained calls in finetune_model.
    cache_dir: str = field(
        default=None,
        metadata={"help": "Where to store the pretrained models downloaded from huggingface.co"}
    )
    # Forwarded as use_fast to AutoTokenizer.from_pretrained.
    use_fast_tokenizer: bool = field(
        default=True,
        metadata={"help": "Whether to use one of the fast tokenizer implementations"}
    )
    # finetune_model honours "auto", "float16" and "float32"; any other value
    # is treated as "auto".
    torch_dtype: str = field(
        default="auto",
        metadata={"help": "Floating-point format in which the model weights should be initialized and trained"}
    )


@dataclass
class DataArguments:
    """Arguments pertaining to what data we are going to input our model for training and evaluation.

    File and directory options can be combined: finetune_model gathers the
    .txt files from every option that is set.
    """

    # Single training file; get_file_paths ignores it unless it ends in .txt.
    train_file: str = field(
        default=None,
        metadata={"help": "Path to training file (.txt)"}
    )
    # Directory searched recursively for .txt training files.
    train_dir: str = field(
        default=None,
        metadata={"help": "Directory containing training files (.txt)"}
    )
    # Optional single validation file.
    validation_file: str = field(
        default=None,
        metadata={"help": "Path to validation file (.txt)"}
    )
    # Optional directory searched recursively for .txt validation files.
    validation_dir: str = field(
        default=None,
        metadata={"help": "Directory containing validation files (.txt)"}
    )
    # Used as the block_size of TextDataset, i.e. tokens per training example.
    max_seq_length: int = field(
        default=1024,
        metadata={"help": "Maximum sequence length that the model might handle"}
    )
    # NOTE(review): not read by any code visible in this file.
    preprocessing_num_workers: int = field(
        default=None,
        metadata={"help": "Number of processes for preprocessing"}
    )
    # NOTE(review): not read by any code visible in this file.
    overwrite_cache: bool = field(
        default=False,
        metadata={"help": "Overwrite the cached training and evaluation sets"}
    )
    # When true, finetune_model returns right after building the datasets.
    preprocessing_only: bool = field(
        default=False,
        metadata={"help": "Only run the preprocessing script to be cached for future use"}
    )
    # Forwarded to TextDataset; when false, newlines are replaced by spaces
    # before tokenization.
    keep_linebreaks: bool = field(
        default=True,
        metadata={"help": "Whether to keep line breaks when processing input files"}
    )


# NOTE: this class reuses the name `TrainingArguments`, so from here on it
# shadows the class of the same name imported from `transformers` above.
@dataclass
class TrainingArguments(transformers.TrainingArguments):
    """Custom training arguments.

    Extends ``transformers.TrainingArguments`` and declares the two fields
    below with this script's defaults.
    """
    
    # Reporting integration; this script defaults to TensorBoard.
    report_to: str = field(
        default="tensorboard",
        metadata={"help": "The integration to report the results and logs to."}
    )
    # NOTE(review): the parent class may already declare this field with the
    # same default — confirm against the installed transformers version.
    gradient_checkpointing: bool = field(
        default=False,
        metadata={"help": "Enable gradient checkpointing to save memory at the expense of slower backward pass."}
    )


class TextDataset(Dataset):
    """Dataset of fixed-size token blocks for causal language modeling.

    Every file is read whole, tokenized in one call, and cut into
    consecutive, non-overlapping blocks of ``block_size`` tokens. Trailing
    tokens that do not fill a complete block are dropped, so a file shorter
    than ``block_size`` contributes no examples.
    """

    def __init__(
        self,
        tokenizer: AutoTokenizer,
        file_paths: list[str | Path],
        block_size: int,
        keep_linebreaks: bool = True,
    ):
        """Read, tokenize and chunk all files eagerly.

        Args:
            tokenizer: Callable tokenizer; must return an object exposing
                ``input_ids`` when called with ``return_tensors="pt"``.
            file_paths: Text files to load (UTF-8).
            block_size: Number of tokens per example.
            keep_linebreaks: If False, newlines are replaced by spaces and
                runs of spaces are collapsed to one before tokenization.
        """
        self.tokenizer = tokenizer
        self.file_paths = [Path(path) for path in file_paths]
        self.block_size = block_size
        self.keep_linebreaks = keep_linebreaks

        # Count the materialized list so any iterable of paths works.
        logger.info("Loading and tokenizing %d text files...", len(self.file_paths))
        self.examples = self._load_and_tokenize()
        logger.info("Created %d training examples of size %d", len(self.examples), block_size)

    def _load_and_tokenize(self) -> list[torch.Tensor]:
        """Load all text files and tokenize them into block-sized chunks."""
        tokenized_examples: list[torch.Tensor] = []

        for file_path in self.file_paths:
            # Best effort: a file that cannot be read or tokenized is logged
            # and skipped so the remaining files still load.
            try:
                with open(file_path, encoding="utf-8") as f:
                    text = f.read()

                if not self.keep_linebreaks:
                    text = text.replace("\n", " ")
                    # One replace("  ", " ") pass leaves runs of spaces behind
                    # (e.g. from blank lines); repeat until none remain.
                    while "  " in text:
                        text = text.replace("  ", " ")

                tokenized_text = self.tokenizer(text, return_tensors="pt", add_special_tokens=True)
                input_ids = tokenized_text.input_ids[0]

                # Full blocks only; the incomplete tail is discarded.
                last_start = len(input_ids) - self.block_size
                blocks = [
                    input_ids[start:start + self.block_size]
                    for start in range(0, last_start + 1, self.block_size)
                ]
                if not blocks:
                    logger.warning(
                        "File %s yielded no examples (%d tokens, block size %d)",
                        file_path, len(input_ids), self.block_size,
                    )
                tokenized_examples.extend(blocks)

            except Exception as e:
                logger.warning(f"Error processing file {file_path}: {e}")

        return tokenized_examples

    def __len__(self) -> int:
        """Return the number of block-sized examples."""
        return len(self.examples)

    def __getitem__(self, idx: int) -> dict[str, torch.Tensor]:
        """Return one example; ``labels`` is the same tensor as ``input_ids``."""
        return {"input_ids": self.examples[idx], "labels": self.examples[idx]}


def get_file_paths(path: str | Path) -> list[Path]:
    """Get all .txt file paths from a file or directory.

    Args:
        path: A single file, or a directory that is searched recursively.

    Returns:
        For a file: a one-element list if its suffix is ``.txt``
        (case-insensitive), otherwise an empty list. For a directory: all
        regular files below it with a ``.txt`` suffix (case-insensitive), in
        sorted order. For a path that does not exist: an empty list. Ignored
        and missing paths are reported with a warning.
    """
    path = Path(path)

    if path.is_file():
        if path.suffix.lower() == ".txt":
            return [path]
        logger.warning(f"Ignoring non-txt file: {path}")
        return []

    if path.is_dir():
        # Apply the same case-insensitive suffix rule as the single-file
        # branch, skip directories that merely end in ".txt", and sort so the
        # training data order does not depend on filesystem enumeration.
        return sorted(
            candidate
            for candidate in path.rglob("*")
            if candidate.is_file() and candidate.suffix.lower() == ".txt"
        )

    logger.warning(f"Path does not exist: {path}")
    return []


def setup_tokenizer(tokenizer: AutoTokenizer) -> AutoTokenizer:
    """Make sure the tokenizer has a padding token and return it.

    An existing pad token is left alone. Otherwise the EOS token is reused
    as the pad token; if there is no EOS token either, a new "[PAD]"
    special token is registered. The tokenizer is modified in place.
    """
    # Nothing to do when a pad token is already configured.
    if tokenizer.pad_token is not None:
        return tokenizer

    # Prefer reusing EOS so no new token has to be registered.
    if tokenizer.eos_token is not None:
        tokenizer.pad_token = tokenizer.eos_token
        return tokenizer

    # Last resort: register a dedicated padding token.
    tokenizer.add_special_tokens({"pad_token": "[PAD]"})
    return tokenizer


def finetune_model(
    model_args: ModelArguments,
    data_args: DataArguments,
    training_args: TrainingArguments,
) -> None:
    """Fine-tune the model with the given arguments.

    Loads the tokenizer and model, builds block-sized datasets from the
    configured .txt files, trains with ``Trainer``, then saves the model,
    tokenizer, training metrics and trainer state to
    ``training_args.output_dir``.

    Args:
        model_args: Model, tokenizer, cache and dtype selection.
        data_args: Training/validation inputs and preprocessing options.
        training_args: Arguments handed to ``Trainer``.

    Raises:
        ValueError: If no training files are found, or if the training files
            are too short to produce a single block of
            ``data_args.max_seq_length`` tokens.
    """
    # Set seed for reproducibility
    set_seed(training_args.seed)

    # Load the tokenizer, defaulting to the one shipped with the model.
    tokenizer_name = model_args.tokenizer_name_or_path or model_args.model_name_or_path
    tokenizer = AutoTokenizer.from_pretrained(
        tokenizer_name,
        cache_dir=model_args.cache_dir,
        use_fast=model_args.use_fast_tokenizer,
    )
    tokenizer = setup_tokenizer(tokenizer)

    # Resolve the dtype. Named dtypes are converted to torch.dtype objects
    # instead of being passed as raw strings; unknown names fall back to
    # "auto" as before, but no longer silently.
    requested_dtype = model_args.torch_dtype
    if requested_dtype in ("float16", "float32"):
        torch_dtype = getattr(torch, requested_dtype)
    else:
        if requested_dtype != "auto":
            logger.warning("Unsupported torch_dtype %r; falling back to 'auto'", requested_dtype)
        torch_dtype = "auto"

    # Load model
    logger.info("Loading pretrained model from %s", model_args.model_name_or_path)
    model = AutoModelForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=model_args.cache_dir,
        torch_dtype=torch_dtype,
    )

    # Grow the embedding matrix only when the tokenizer holds more tokens
    # than the model has rows for (e.g. after adding "[PAD]"). Resizing
    # unconditionally could shrink a checkpoint whose embeddings are larger
    # than the tokenizer's vocabulary.
    embedding_size = model.get_input_embeddings().weight.shape[0]
    if len(tokenizer) > embedding_size:
        model.resize_token_embeddings(len(tokenizer))

    # Collect text files
    train_files = []
    if data_args.train_file:
        train_files.extend(get_file_paths(data_args.train_file))
    if data_args.train_dir:
        train_files.extend(get_file_paths(data_args.train_dir))

    validation_files = []
    if data_args.validation_file:
        validation_files.extend(get_file_paths(data_args.validation_file))
    if data_args.validation_dir:
        validation_files.extend(get_file_paths(data_args.validation_dir))

    if not train_files:
        raise ValueError("No training files found. Please specify --train_file or --train_dir")

    logger.info(
        "Found %d training files and %d validation files",
        len(train_files),
        len(validation_files),
    )

    # Create datasets
    train_dataset = TextDataset(
        tokenizer=tokenizer,
        file_paths=train_files,
        block_size=data_args.max_seq_length,
        keep_linebreaks=data_args.keep_linebreaks,
    )

    validation_dataset = None
    if validation_files:
        validation_dataset = TextDataset(
            tokenizer=tokenizer,
            file_paths=validation_files,
            block_size=data_args.max_seq_length,
            keep_linebreaks=data_args.keep_linebreaks,
        )

    # Exit if we only want to preprocess the data
    if data_args.preprocessing_only:
        logger.info("Preprocessing completed. Exiting.")
        return

    # Fail early with a clear message instead of handing an empty dataset to
    # the Trainer.
    if len(train_dataset) == 0:
        raise ValueError(
            "Training files produced no examples; every file is shorter than "
            f"max_seq_length={data_args.max_seq_length} tokens"
        )

    # Data collator
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer,
        mlm=False,  # We're doing causal language modeling, not masked
    )

    # Initialize Trainer
    # NOTE(review): newer transformers releases may prefer `processing_class=`
    # over `tokenizer=` — confirm against the installed version.
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=validation_dataset,
        tokenizer=tokenizer,
        data_collator=data_collator,
    )

    # Training
    train_result = trainer.train()

    # Save the model and the tokenizer side by side.
    trainer.save_model()
    tokenizer.save_pretrained(training_args.output_dir)

    # Log and save metrics
    metrics = train_result.metrics
    trainer.log_metrics("train", metrics)
    trainer.save_metrics("train", metrics)
    trainer.save_state()

    logger.info("Training completed!")


def test_function() -> None:
    """
    Example showing how to use the script.

    Builds the three argument objects in code with small settings and runs
    one fine-tuning pass. It expects training texts under
    ``./data/train_texts`` and a validation file at
    ``./data/valid/sample.txt``, and writes to ``./results``.
    """
    # This is a simple test to show how to use the script
    # In practice, you would call this from the command line

    # Example command:
    # python finetune_lm.py \
    #     --model_name_or_path gpt2 \
    #     --train_dir ./data/train \
    #     --validation_file ./data/valid/test.txt \
    #     --output_dir ./results \
    #     --num_train_epochs 3 \
    #     --per_device_train_batch_size 4 \
    #     --gradient_accumulation_steps 8 \
    #     --save_steps 500 \
    #     --save_total_limit 2

    # For testing in a Python script:
    os.environ["WANDB_DISABLED"] = "true"

    model_args = ModelArguments(
        # The small GPT-2 checkpoint is published as "gpt2" (as in the
        # example command above); "gpt2-small" is not a Hub model id.
        model_name_or_path="gpt2",
    )

    data_args = DataArguments(
        train_dir="./data/train_texts",
        validation_file="./data/valid/sample.txt",
        max_seq_length=128,  # Small for testing
    )

    # NOTE(review): newer transformers releases may expect `eval_strategy`
    # instead of `evaluation_strategy` — confirm against the installed version.
    training_args = TrainingArguments(
        output_dir="./results",
        num_train_epochs=1,
        per_device_train_batch_size=2,
        save_steps=100,
        save_total_limit=2,
        evaluation_strategy="steps",
        eval_steps=100,
        logging_steps=10,
    )

    finetune_model(model_args, data_args, training_args)
    print("Test fine-tuning completed!")


def main() -> None:
    """Parse command-line arguments and run fine-tuning.

    Raises:
        ValueError: If neither a training file nor a training directory is
            given.
    """
    parser = HfArgumentParser((ModelArguments, DataArguments, TrainingArguments))
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    # Validate arguments
    if not data_args.train_file and not data_args.train_dir:
        raise ValueError("Need either a training file or directory")

    # The log format and handler come from the module-level
    # logging.basicConfig call. A second basicConfig here would be ignored,
    # because the root logger already has a handler, so only this module's
    # verbosity is adjusted: full output on the main process (local_rank -1
    # or 0), warnings only elsewhere.
    is_main_process = training_args.local_rank <= 0
    logger.setLevel(logging.INFO if is_main_process else logging.WARNING)
    logger.info("Training/evaluation parameters: %s", training_args)

    # Fine-tune model
    finetune_model(model_args, data_args, training_args)


# Run the command-line entry point only when this code is the top-level
# module, not when it is imported.
if __name__ == "__main__":
    main()