We're going to finetune GPT-2 to classify emotions in text. Before that, we'll create and score a baseline emotion classifier.

In [73]:
from datetime import datetime, timedelta
import gzip
import json
import random
import re
import requests
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import torch
from torch.optim import AdamW 
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForCausalLM
from tqdm import tqdm

In [14]:
# Seed every RNG the notebook draws from so that a fresh "Restart & Run All"
# is repeatable: `random` drives the train/test shuffles, while torch drives
# DataLoader shuffling and dropout.
random_state = 42
random.seed(random_state)
torch.manual_seed(random_state);  # trailing ';' hides the returned Generator repr

## Utilities

In [84]:
# Source: https://github.com/bryanesmith/notes/blob/master/The%20Hundred-Page%20Language%20Models%20Book/Chap%203/RNN.ipynb
def get_device_label():
    """
    Picks the torch device string to run on.

    Apple's MPS backend is checked first, then CUDA; the CPU is the fallback.

    Returns:
        str: "mps", "cuda" or "cpu"
    """
    # Ordered by preference; the first backend that reports itself available wins
    candidates = (
        ("mps", torch.backends.mps.is_available),
        ("cuda", torch.cuda.is_available),
    )
    for label, is_available in candidates:
        if is_available():
            return label
    return "cpu"


# Source: https://github.com/bryanesmith/notes/blob/master/The%20Hundred-Page%20Language%20Models%20Book/Chap%204/decoder-only-transformer.ipynb
def print_training_progress(batch_idx, start, train_dataloader, epoch):
    """
    Prints a one-line progress report for the current epoch.

    The time estimate extrapolates the time elapsed since `start` over the
    share of `train_dataloader` processed so far, so it is reported as
    'unknown' until at least one batch is done.

    Args:
        batch_idx (int): Index of the current batch within the epoch
        start (datetime): Moment from which elapsed time is measured
        train_dataloader: Sized collection of batches (only its length is used)
        epoch (int): Current epoch number, shown in the message prefix
    """
    elapsed_seconds = (datetime.now() - start).total_seconds()
    total_batches = len(train_dataloader)
    percent_done = 100 * batch_idx / total_batches

    if percent_done > 0:
        # elapsed / fraction done = projected duration, converted to hours
        projected_hours = elapsed_seconds / (percent_done / 100) / (60 * 60)
        estimated_total_time = f'{projected_hours:.1f} hr'
    else:
        estimated_total_time = 'unknown'

    print(
        f'[epoch={epoch}] batch {batch_idx} of {total_batches} - {percent_done:.2f}% done - '
        f'{timedelta(seconds=elapsed_seconds)} ellapsed, est. total time: {estimated_total_time}'
    )



# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_classifier_LR.ipynb
def download_and_split_data(data_url, test_ratio=0.1):
    """
    Downloads emotion classification dataset from URL and splits into train/test sets.
    Handles decompression and JSON parsing of the raw data.

    The split is produced by shuffling with the global `random` module, so it
    depends on that module's current state.

    Args:
        data_url (str): URL of the gzipped JSON-lines dataset
        test_ratio (float): Proportion of data to use for testing (default: 0.1)

    Returns:
        tuple: (X_train, y_train, X_test, y_test) containing:
            - X_train, X_test: Lists of text examples for training and testing
            - y_train, y_test: Lists of corresponding emotion labels

    Raises:
        ValueError: If test_ratio is outside [0, 1]
        requests.HTTPError: If the server answers with an error status
    """
    if not 0 <= test_ratio <= 1:
        raise ValueError(f"test_ratio must be between 0 and 1, got {test_ratio}")

    # Download the dataset. Fail loudly on an HTTP error instead of trying to
    # gunzip an error page, and do not hang forever on a stalled connection.
    response = requests.get(data_url, timeout=60)
    response.raise_for_status()
    content = gzip.decompress(response.content).decode()

    # Parse JSON lines into a list of dictionaries, skipping blank lines
    # (json.loads would raise on them)
    dataset = [json.loads(line) for line in content.splitlines() if line.strip()]

    # Shuffle dataset for random split
    random.shuffle(dataset)

    # Split into train and test sets
    split_index = int(len(dataset) * (1 - test_ratio))
    train, test = dataset[:split_index], dataset[split_index:]

    # Separate text and labels
    X_train = [item["text"] for item in train]
    y_train = [item["label"] for item in train]
    X_test = [item["text"] for item in test]
    y_test = [item["label"] for item in test]

    return X_train, y_train, X_test, y_test


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def download_and_prepare_data(data_url, tokenizer, batch_size, test_ratio=0.1):
    """
    Downloads and prepares dataset for training.

    Each JSON line is turned into a prompt/completion pair, the pairs are
    shuffled with the global `random` module and split, and both parts are
    wrapped in DataLoaders that batch with `collate_fn`.

    Args:
        data_url (str): URL of the gzipped JSON-lines dataset
        tokenizer: Tokenizer for text processing
        batch_size (int): Batch size for DataLoader
        test_ratio (float): Proportion of data for testing

    Returns:
        tuple: (train_loader, test_loader)

    Raises:
        ValueError: If test_ratio is outside [0, 1]
        requests.HTTPError: If the server answers with an error status
    """
    if not 0 <= test_ratio <= 1:
        raise ValueError(f"test_ratio must be between 0 and 1, got {test_ratio}")

    # Download compressed dataset. Fail loudly on an HTTP error instead of
    # trying to gunzip an error page, and do not hang on a stalled connection.
    response = requests.get(data_url, timeout=60)
    response.raise_for_status()
    # Decompress and decode the content
    content = gzip.decompress(response.content).decode()

    # Parse each non-blank line as JSON and format into prompt-completion pairs
    # (json.loads would raise on a blank line)
    dataset = [
        {
            "prompt": build_prompt(entry['text']),
            "completion": entry["label"].strip()
        }
        for entry in map(json.loads, filter(str.strip, content.splitlines()))
    ]

    # Randomly shuffle dataset for better split
    random.shuffle(dataset)
    # Calculate split index based on test ratio
    split_index = int(len(dataset) * (1 - test_ratio))
    # Split into train and test sets
    train_data = dataset[:split_index]
    test_data = dataset[split_index:]

    # Create dataset objects
    train_dataset = PromptCompletionDataset(train_data, tokenizer)
    test_dataset = PromptCompletionDataset(test_data, tokenizer)

    # Create data loaders with appropriate settings
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,         # Shuffle training data
        collate_fn=collate_fn  # Custom collation for padding
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,        # Don't shuffle test data
        collate_fn=collate_fn
    )

    return train_loader, test_loader


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def build_prompt(text):
    """
    Wraps a piece of text in the fixed emotion-classification prompt.

    Args:
        text (str): Input text to classify

    Returns:
        str: The instruction, the text, and a trailing "Emotion:" cue on a new line
    """
    # Task instruction first, answer cue last; the model is expected to
    # continue right after the cue
    instruction = "Predict the emotion for the following text: "
    answer_cue = "\nEmotion:"
    return f"{instruction}{text}{answer_cue}"


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
class PromptCompletionDataset(Dataset):
    """
    PyTorch Dataset over prompt-completion pairs.
    Tokenizes each pair on access and builds the matching loss labels.

    Args:
        data (list): List of dictionaries with "prompt" and "completion" keys
        tokenizer: Hugging Face tokenizer
    """
    def __init__(self, data, tokenizer):
        # Keep references only; tokenization happens lazily in __getitem__
        self.tokenizer = tokenizer
        self.data = data

    def __len__(self):
        # One example per stored prompt-completion pair
        return len(self.data)

    def __getitem__(self, idx):
        """
        Builds the training example at position `idx`.

        Args:
            idx (int): Index of the example to fetch

        Returns:
            dict: Contains input_ids, labels, prompt, and expected completion
        """
        example = self.data[idx]
        prompt, completion = example["prompt"], example["completion"]

        # Tokenize the two parts separately so the prompt length is known
        prompt_ids = encode_text(self.tokenizer, prompt)
        # The target is the completion followed by the end-of-sequence token
        target_ids = encode_text(self.tokenizer, completion) + [self.tokenizer.eos_token_id]

        return {
            # Model input: prompt tokens, then the target tokens
            "input_ids": prompt_ids + target_ids,
            # -100 masks the prompt positions out of the loss; only the target is learned
            "labels": [-100] * len(prompt_ids) + target_ids,
            "prompt": prompt,
            "expected_completion": completion,
        }


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def collate_fn(batch, pad_token_id=None):
    """
    Collates batch of examples into training-ready format.
    Handles padding and conversion to tensors.

    Args:
        batch: List of examples from Dataset (dicts with "input_ids", "labels",
            "prompt" and "expected_completion")
        pad_token_id (int, optional): Token ID used to right-pad input_ids.
            When omitted, falls back to the notebook-level `tokenizer`'s
            pad_token_id, which must then be defined before the first batch
            is collated.

    Returns:
        tuple: (input_ids, attention_mask, labels, prompts, expected_completions)
    """
    if pad_token_id is None:
        # Backward-compatible default: DataLoader calls collate_fn(batch) with
        # a single argument, so the pad ID comes from the global tokenizer
        pad_token_id = tokenizer.pad_token_id

    # Find the longest sequence in the batch for padding
    max_length = max(len(item["input_ids"]) for item in batch)

    input_ids, attention_mask, labels = [], [], []
    for item in batch:
        n_real = len(item["input_ids"])
        n_pad = max_length - n_real
        # Right-pad inputs with the pad token
        input_ids.append(item["input_ids"] + [pad_token_id] * n_pad)
        # Attention mask: 1 for real tokens, 0 for padding
        attention_mask.append([1] * n_real + [0] * n_pad)
        # Pad labels with -100 (ignored in loss calculation)
        labels.append(item["labels"] + [-100] * (max_length - len(item["labels"])))

    # Keep original prompts and completions for evaluation
    prompts = [item["prompt"] for item in batch]
    expected_completions = [item["expected_completion"] for item in batch]

    # Convert everything to PyTorch tensors except text
    return (
        torch.tensor(input_ids),
        torch.tensor(attention_mask),
        torch.tensor(labels),
        prompts,
        expected_completions
    )


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def encode_text(tokenizer, text, return_tensor=False):
    """
    Turns text into token IDs without adding special tokens.

    Args:
        tokenizer: Hugging Face tokenizer
        text (str): Text to encode
        return_tensor (bool): Whether to return PyTorch tensor

    Returns:
        List or tensor of token IDs
    """
    # Special tokens are never added; callers append EOS themselves
    encode_kwargs = {"add_special_tokens": False}
    if return_tensor:
        # Ask the tokenizer for a PyTorch tensor instead of a plain list
        encode_kwargs["return_tensors"] = "pt"
    return tokenizer.encode(text, **encode_kwargs)


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def decode_text(tokenizer, token_ids):
    """
    Turns token IDs back into text, dropping special tokens.

    Args:
        tokenizer: Hugging Face tokenizer
        token_ids: List or tensor of token IDs

    Returns:
        str: Decoded text
    """
    # skip_special_tokens=True asks the tokenizer to leave special tokens out
    decoded = tokenizer.decode(token_ids, skip_special_tokens=True)
    return decoded


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def calculate_accuracy(model, tokenizer, loader):
    """
    Calculates prediction accuracy on a dataset.

    For every prompt in `loader`, a completion is generated and compared with
    the expected completion after text normalization. The model is switched to
    evaluation mode for the duration of the call and put back into whatever
    mode it was in before.

    Args:
        model: Fine-tuned model
        tokenizer: Associated tokenizer
        loader: DataLoader containing evaluation examples; each batch is a
            5-tuple whose last two items are the prompts and the expected
            completions

    Returns:
        float: Accuracy score (0 when the loader yields no examples)
    """
    # Initialize counters for accuracy calculation
    correct = 0
    total = 0

    # Remember the current mode so it can be restored afterwards
    was_training = getattr(model, "training", False)
    model.eval()
    try:
        # Disable gradient computation for efficiency
        with torch.no_grad():
            # Only the text fields of each batch are needed here
            for _, _, _, prompts, expected_completions in loader:
                for prompt, expected_completion in zip(prompts, expected_completions):
                    # Generate model's prediction for this prompt
                    generated_text = generate_text(model, tokenizer, prompt)
                    # Compare normalized versions of prediction and expected completion
                    if normalize_text(generated_text) == normalize_text(expected_completion):
                        correct += 1
                    total += 1
    finally:
        # Restore the mode the caller had, even if generation raised
        model.train(was_training)

    # Calculate accuracy, handling empty dataset case
    return correct / total if total > 0 else 0


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def generate_text(model, tokenizer, prompt, max_new_tokens=50):
    """
    Greedily generates a completion for a prompt and returns only the new text.

    Args:
        model: Fine-tuned model
        tokenizer: Associated tokenizer
        prompt (str): Input prompt
        max_new_tokens (int): Maximum number of tokens to generate

    Returns:
        str: Generated completion, stripped of surrounding whitespace
    """
    # Tokenize the prompt and move the tensors to the model's device
    encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_ids = encoded["input_ids"]

    generation_options = dict(
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
        use_cache=True,        # Use KV cache for faster generation
        num_beams=1,           # Use greedy decoding
        do_sample=False,       # Don't use sampling
    )
    sequences = model.generate(
        input_ids=prompt_ids,
        attention_mask=encoded["attention_mask"],
        **generation_options,
    )

    # The output holds prompt + continuation; keep what comes after the prompt
    first_sequence = sequences[0]
    prompt_length = prompt_ids.shape[1]
    completion_ids = first_sequence[prompt_length:]
    return decode_text(tokenizer, completion_ids).strip()


# Source: https://github.com/aburkov/theLMbook/blob/main/emotion_GPT2_as_text_generator.ipynb
def normalize_text(text):
    """
    Canonicalizes text so that two strings can be compared loosely.

    Args:
        text (str): Input text

    Returns:
        str: Trimmed, lowercased text with each whitespace run collapsed to one space
    """
    # Trim and lowercase first, then squeeze any run of whitespace to a single space
    whitespace_run = re.compile(r'\s+')
    return whitespace_run.sub(' ', text.strip().lower())

## Baseline Emotion Classifier

We'll use a multinomial logistic regression as a baseline model, with the expectation that if we finetune our LLM properly, it should outperform the logistic regression.

### Dataset
We'll download a gzip-compressed JSONL dataset in which each line is a JSON object with the following format:

```
{"text": "i slammed the door and yelled", "label": "anger"}
```

We will split this labeled data into training and test datasets. We then transform the text into a binary **bag-of-words** representation, where each feature records whether a vocabulary word appears in the example.

In [30]:
data_url = "https://www.thelmbook.com/data/emotions"

# Re-seed right before the shuffle-based split so the split does not depend on
# how much of the global RNG earlier cells (or re-runs of this one) consumed.
random.seed(random_state)
X_train_text, y_train, X_test_text, y_test = download_and_split_data(data_url, test_ratio=0.1)
print(f'|X_train_text| = {len(X_train_text)} ; |X_test_text| = {len(X_test_text)}')

|X_train_text| = 18000 ; |X_test_text| = 2000


In [31]:
# Binary bag-of-words capped at 10,000 features
vectorizer = CountVectorizer(binary=True, max_features=10_000)

# The vocabulary is learned from the training text only and reused for the test text
X_train = vectorizer.fit_transform(X_train_text)
X_test = vectorizer.transform(X_test_text)

print('|X_train| = {} ; |X_test| = {}'.format(X_train.shape, X_test.shape))

|X_train| = (18000, 10000) ; |X_test| = (2000, 10000)


### Build & Train

Next we train and test the model. We'll use accuracy as our metric for comparison.

In [54]:
# Fit the baseline classifier on the bag-of-words features
model = LogisticRegression(max_iter=1000, random_state=random_state)
model.fit(X_train, y_train)

# Score the model on the data it was trained on
y_train_pred = model.predict(X_train)
train_accuracy = accuracy_score(y_train, y_train_pred)
print('Training accuracy: {:.2f}%'.format(train_accuracy * 100))

Training accuracy: 98.43%


### Evaluate

In [50]:
# Score the baseline on the held-out test split
y_test_pred = model.predict(X_test)
test_accuracy = accuracy_score(y_test, y_test_pred)
print('Test accuracy: {:.2f}%'.format(test_accuracy * 100))

Test accuracy: 90.10%


## Finetuning GPT-2

In [77]:
# Workaround for "MPS backend out of memory": pin the run to the CPU.
# Set FORCE_CPU to False to use whatever get_device_label() picks instead.
FORCE_CPU = True
device = "cpu" if FORCE_CPU else get_device_label()

### Dataset

In [51]:
# Training hyperparameters
num_epochs = 8
batch_size = 16
learning_rate = 5e-5  # standard tuning rate for finetuning transformers

# Pretrained checkpoint and its tokenizer; the end-of-sequence token doubles
# as the padding token
model_name = "openai-community/gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token

In [64]:
# Re-seed right before the shuffle-based split so the split does not depend on
# how much of the global RNG earlier cells (or re-runs of this one) consumed.
random.seed(random_state)
train_loader, test_loader = download_and_prepare_data(
    data_url, tokenizer, batch_size
)

print(f'|train_loader| = {len(train_loader)} ; |test_loader| = {len(test_loader)}')

|train_loader| = 1125 ; |test_loader| = 125


### Build & Train

In [78]:
# Make a re-run of this cell repeatable (batch order, dropout masks)
torch.manual_seed(random_state)

model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
# from_pretrained() returns the model in evaluation mode; switch to training
# mode so dropout is active while finetuning
model.train()
optimizer = AdamW(model.parameters(), lr=learning_rate)
# Print an update roughly every 0.5% of an epoch. Clamp to 1 so a loader with
# fewer than 200 batches does not make the modulo below divide by zero.
update_batch_cnt = max(1, len(train_loader) // 200)

for epoch in range(num_epochs):
    # Restart the clock every epoch: the progress report extrapolates from the
    # share of the current epoch that is done, so elapsed time must be
    # measured from the start of that same epoch
    start = datetime.now()
    for batch_idx, (input_ids, attention_mask, labels, _, _) in enumerate(train_loader):
        if batch_idx % update_batch_cnt == 0:
            print_training_progress(batch_idx, start, train_loader, epoch)
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        labels = labels.to(device)
        # Passing labels makes the model compute the loss itself
        outputs = model(input_ids=input_ids, labels=labels, attention_mask=attention_mask)
        outputs.loss.backward()
        optimizer.step()
        optimizer.zero_grad()

[epoch=0] batch 0 of 1125 - 0.00% done - 0:00:00.042153 ellapsed, est. total time: unknown
[epoch=0] batch 5 of 1125 - 0.44% done - 0:00:08.362048 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 10 of 1125 - 0.89% done - 0:00:16.059798 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 15 of 1125 - 1.33% done - 0:00:24.980465 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 20 of 1125 - 1.78% done - 0:00:32.024190 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 25 of 1125 - 2.22% done - 0:00:39.219023 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 30 of 1125 - 2.67% done - 0:00:46.671640 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 35 of 1125 - 3.11% done - 0:00:53.991504 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 40 of 1125 - 3.56% done - 0:01:00.917847 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 45 of 1125 - 4.00% done - 0:01:09.041752 ellapsed, est. total time: 0.5 hr
[epoch=0] batch 50 of 1125 - 4.44% done - 0:01:16.398622 ellapsed, est. total time: 0.5 hr


In [85]:
# Put the model in evaluation mode before generating predictions
model.eval()
train_acc = calculate_accuracy(model, tokenizer, train_loader)
print('Training accuracy: {:.2f}%'.format(train_acc * 100))

Training accuracy: 98.31%


### Evaluate

In [86]:
# Accuracy of the finetuned model on the held-out test loader
test_acc = calculate_accuracy(model, tokenizer, test_loader)
print('Test accuracy: {:.2f}%'.format(test_acc * 100))

Test accuracy: 93.90%


In [87]:
# Try the finetuned model on a single hand-written example
prompt = build_prompt(text="I'm so happy to be able to finetune an LLM!")
generated_text = generate_text(model, tokenizer, prompt=prompt)
generated_text

'joy'