In [1]:
import json
import random
import gzip
import requests
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.optim import AdamW
from tqdm import tqdm
import re

In [2]:
def encode_text(tokenizer, text, retunr_tensor=False):
  if retunr_tensor:
    return tokenizer.encode(text, add_special_tokens=False, return_tensors="pt")
  else:
    return tokenizer.encode(text,add_special_tokens=False)

def decode_text(tokenizer, token_ids):
    return tokenizer.decode(token_ids, skip_special_tokens=True)


class PromptCompletionDataset(Dataset):

  def __init__(self, data, tokenizer ):
    self.data = data
    self.tokenizer = tokenizer

  def __len__(self):
    return len(self.data)

  def __getitem__(self, idx):
    item = self.data[idx]
    prompt = item["prompt"]
    completion = item["completion"]

    encoded_prompt = encode_text(self.tokenizer, prompt)
    encoded_completion = encode_text(self.tokenizer, completion)
    eos_token = self.tokenizer.eos_token_id

    input_ids = encoded_prompt + encoded_completion + [eos_token]

    labels = [-100] * len(encoded_prompt) + encoded_completion + [eos_token]

    return {
        "input_ids": input_ids,
        "labels": labels,
        "prompt": prompt,
        "expected_completion": completion
    }

def collate_fun(batch):
  max_length = max([len(item["input_ids"]) for item in batch])

  input_ids = [
      item["input_ids"] + [tokenizer.pad_token_id] * (max_length - len(item["input_ids"]))
      for item in batch
  ]

  labels = [
      item["labels"] + [-100] * (max_length - len(item["labels"]))
      for item in batch
  ]

  attention_mask = [[1] * len(item["input_ids"]) + [0] * (max_length - len(item["input_ids"])) for item in batch]

  prompts = [item["prompt"] for item in batch]
  expected_completions = [item["expected_completion"] for item in batch]

  return {
      "input_ids": torch.tensor(input_ids),
      "labels": torch.tensor(labels),
      "attention_mask": torch.tensor(attention_mask),
      "prompts": prompts,
      "expected_completions": expected_completions
  }

In [3]:
def get_hyperparameters():
  return 2, 16, 5e-5

def build_prompt(text):
  return f"Predict the emotion for the following text: {text}\nEmotion:"

def download_and_prepare_data(data_url, tokenizer, batch_size, test_ratio=0.1):
  response = requests.get(data_url)
  content = gzip.decompress(response.content).decode()

  dataset = []
  for entry in map(json.loads, content.splitlines()):
    dataset.append({
        "prompt": build_prompt(entry['text']),
        "completion": entry["label"].strip()
    })
  random.shuffle(dataset)
  split_index = int(len(dataset)* (1-test_ratio))
  train_data = dataset[:split_index]
  test_data = dataset[split_index:]

  train_dataset = PromptCompletionDataset(train_data, tokenizer)
  test_dataset = PromptCompletionDataset(test_data, tokenizer)

  train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fun)
  test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fun)

  return train_dataloader, test_dataloader


#copied

def set_seed(seed):
    """
    Sets random seeds for reproducibility across different libraries.

    Args:
        seed (int): Seed value for random number generation
    """
    # Set Python's built-in random seed
    random.seed(seed)
    # Set PyTorch's CPU random seed
    torch.manual_seed(seed)
    # Set seed for all available GPUs
    torch.cuda.manual_seed_all(seed)
    # Request cuDNN to use deterministic algorithms
    torch.backends.cudnn.deterministic = True
    # Disable cuDNN's auto-tuner for consistent behavior
    torch.backends.cudnn.benchmark = False

def normalize_text(text):
    """
    Normalizes text for consistent comparison.

    Args:
        text (str): Input text

    Returns:
        str: Normalized text
    """
    # Remove leading/trailing whitespace and convert to lowercase
    text = text.strip().lower()
    # Replace multiple whitespace characters with single space
    text = re.sub(r'\s+', ' ', text)
    return text

def generate_text(model, tokenizer, prompt, max_new_tokens=50):
    """
    Generates text completion for a given prompt.

    Args:
        model: Fine-tuned model
        tokenizer: Associated tokenizer
        prompt (str): Input prompt
        max_new_tokens (int): Maximum number of tokens to generate

    Returns:
        str: Generated completion
    """
    # Encode prompt and move to model's device
    input_ids = tokenizer(prompt, return_tensors="pt").to(model.device)

    # Generate completion using model
    output_ids = model.generate(
        input_ids=input_ids["input_ids"],
        attention_mask=input_ids["attention_mask"],
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
        use_cache=True,        # Use KV cache for faster generation
        num_beams=1,           # Use greedy decoding
        do_sample=False,       # Don't use sampling
    )[0]

    # Extract and decode only the generated part (excluding prompt)
    generated_text = decode_text(tokenizer, output_ids[input_ids["input_ids"].shape[1]:])
    return generated_text.strip()

def calculate_accuracy(model, tokenizer, loader):
    """
    Calculates prediction accuracy on a dataset.

    Args:
        model: Fine-tuned model
        tokenizer: Associated tokenizer
        loader: DataLoader containing evaluation examples

    Returns:
        float: Accuracy score
    """
    # Set model to evaluation mode (disables dropout, etc.)
    model.eval()
    # Initialize counters for accuracy calculation
    correct = 0
    total = 0

    # Disable gradient computation for efficiency
    with torch.no_grad():
        # Iterate through batches
        for input_ids, attention_mask, labels, prompts, expected_completions in loader:
            # Process each example in the batch
            for prompt, expected_completion in zip(prompts, expected_completions):
                # Generate model's prediction for this prompt
                generated_text = generate_text(model, tokenizer, prompt)
                # Compare normalized versions of prediction and expected completion
                if normalize_text(generated_text) == normalize_text(expected_completion):
                    correct += 1
                total += 1

    # Calculate accuracy, handling empty dataset case
    accuracy = correct / total if total > 0 else 0
    # Reset model to training mode
    model.train()
    return accuracy

def test_model(model_path, test_input):
    """
    Tests a saved model on a single input.

    Args:
        model_path (str): Path to saved model
        test_input (str): Text to classify
    """
    # Determine device (GPU if available, else CPU)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Load saved model and move to appropriate device
    model = AutoModelForCausalLM.from_pretrained(model_path).to(device)
    tokenizer = AutoTokenizer.from_pretrained(model_path)

    # Ensure model has proper padding token configuration
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = tokenizer.pad_token_id

    # Create prompt and generate prediction
    prompt = build_prompt(test_input)
    generated_text = generate_text(model, tokenizer, prompt)

    # Display results
    print(f"Input: {test_input}")
    print(f"Generated emotion: {generated_text}")

In [4]:
if __name__ == "__main__":
  set_seed(41)

  data_url = "https://www.thelmbook.com/data/emotions"
  model_name = "openai-community/gpt2"
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  print(f"Using device: {device}")

  tokenizer = AutoTokenizer.from_pretrained(model_name)
  tokenizer.pad_token = tokenizer.eos_token

  model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

  num_epochs, batch_size, learning_rate = get_hyperparameters()

  train_loader, test_loader = download_and_prepare_data(data_url, tokenizer, batch_size)

  optimizer = AdamW(model.parameters(), lr=learning_rate)

  for epoch in range(num_epochs):
    total_loss = 0
    num_batches = 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
    for batch in progress_bar:
      input_ids = batch["input_ids"].to(device)
      attention_mask = batch["attention_mask"].to(device)
      labels = batch["labels"].to(device)


      outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
      loss = outputs.loss

      loss.backward()
      optimizer.step()
      optimizer.zero_grad()

      total_loss += loss.item()
      num_batches += 1

      progress_bar.set_postfix({"loss": total_loss / num_batches})

    avg_loss = total_loss / num_batches
    test_acc = calculate_accuracy(model, tokenizer, test_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss}, test accuracy: {test_acc:.4f}")


# Calculate final model performance
  train_acc = calculate_accuracy(model, tokenizer, train_loader)
  print(f"Training accuracy: {train_acc:.4f}")
  print(f"Test accuracy: {test_acc:.4f}")

  # Save the trained model and tokenizer
  model.save_pretrained("./finetuned_model")
  tokenizer.save_pretrained("./finetuned_model")

  # Test model with a sample input
  test_input = "I'm so happy to be able to finetune an LLM!"
  generated_completion = generate_text(model, tokenizer, build_prompt(test_input))
  print(f"Prompt: {test_input}\nGenerated Completion: {generated_completion}")

Using device: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Epoch 1/2:   0%|          | 0/1125 [00:00<?, ?it/s]`loss_type=None` was set in the config but it is unrecognized. Using the default loss: `ForCausalLMLoss`.
Epoch 1/2: 100%|██████████| 1125/1125 [05:17<00:00,  3.54it/s, loss=0.123]


Epoch 1/2, Average Loss: 0.12278269641763634, test accuracy: 0.0000


Epoch 2/2: 100%|██████████| 1125/1125 [05:16<00:00,  3.56it/s, loss=0.0585]


Epoch 2/2, Average Loss: 0.05850858378454318, test accuracy: 0.0000
Training accuracy: 0.0000
Test accuracy: 0.0000
Prompt: I'm so happy to be able to finetune an LLM!
Generated Completion: joy


In [5]:
# work on accuracy fn