<a href="https://colab.research.google.com/github/gis2010/AgentGPT/blob/main/emotion_GPT2_as_text_generator_LoRA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Import required libraries
import json
import random
import gzip
import requests
import torch
from peft import get_peft_model, LoraConfig, TaskType
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.optim import AdamW
from tqdm import tqdm
import re

# Set random seeds for reproducibility
def set_seed(seed):
    """Seed the Python and PyTorch random number generators with *seed*.

    Seeds ``random``, the default torch generator and all CUDA generators,
    then sets the cuDNN ``deterministic`` flag to True and the ``benchmark``
    flag to False.

    Args:
        seed: Value passed unchanged to every seeding function.
    """
    for seeder in (random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Build prompt for emotion classification
def build_prompt(text):
    """Wrap *text* in the emotion-prediction instruction template.

    Args:
        text: The passage whose emotion should be predicted.

    Returns:
        The instruction line containing *text*, followed by a newline and
        the ``Emotion:`` cue (no trailing space).
    """
    prompt = f"Predict the emotion for the following text: {text}\nEmotion:"
    return prompt

# Encode text using tokenizer
def encode_text(tokenizer, text, return_tensor=False):
    """Encode *text* with ``tokenizer.encode`` without adding special tokens.

    Args:
        tokenizer: Object exposing an ``encode(text, **kwargs)`` method.
        text: The string to encode.
        return_tensor: When true, ``return_tensors="pt"`` is also passed to
            the tokenizer; otherwise that argument is omitted.

    Returns:
        Whatever ``tokenizer.encode`` returns for the chosen options.
    """
    options = {"add_special_tokens": False}
    if return_tensor:
        options["return_tensors"] = "pt"
    return tokenizer.encode(text, **options)

# Decode token IDs back to text
def decode_text(tokenizer, token_ids):
    """Decode *token_ids* to a string, skipping special tokens.

    Args:
        tokenizer: Object exposing a ``decode(ids, skip_special_tokens=...)`` method.
        token_ids: The token ids to decode.

    Returns:
        The result of ``tokenizer.decode`` with ``skip_special_tokens=True``.
    """
    text = tokenizer.decode(token_ids, skip_special_tokens=True)
    return text

# Define HDC operations
class HDCOperations:
    """Elementwise binding and bundling primitives.

    ``dim`` is stored for reference only; none of the operations below read it.
    """

    def __init__(self, dim=10000):
        self.dim = dim

    def bind(self, a, b):
        """
        Binding operation: elementwise XOR of ``a`` and ``b``.
        """
        return a ^ b

    def unbind(self, a, b):
        """
        Unbinding operation: XOR again (XOR is its own inverse, so
        ``unbind(a, bind(a, b))`` gives back ``b``).
        """
        return a ^ b

    def bundle(self, vectors):
        """
        Bundling operation: elementwise sum, thresholded at half the count.

        A position is 1 in the result when the summed value there is
        strictly greater than ``len(vectors) / 2``, otherwise 0.

        Raises:
            ValueError: If ``vectors`` is empty (there is nothing to sum).
        """
        vectors = list(vectors)  # accept any iterable; we need both sum and len
        if not vectors:
            raise ValueError("bundle() requires at least one vector.")
        return (sum(vectors) > len(vectors) / 2).int()

# HDC-based memory storage
class HDCMemory:
    """Key-value store that keeps each value XOR-bound to its key.

    Tensor keys are indexed by their *contents*, not by object identity, so a
    freshly built tensor with the same elements finds the stored entry and
    storing the same key twice overwrites instead of adding an entry.
    Keys and values of different shapes are flattened and zero-padded to a
    common length before binding; ``retrieve`` trims the padding again.
    """

    def __init__(self, dim=10000):
        self.dim = dim
        self.memory = {}
        # Original shape of values that had to be padded before binding.
        self._value_shapes = {}
        self.hdc_ops = HDCOperations(dim)

    @staticmethod
    def _slot(key):
        """Return a hashable, content-based dictionary key for ``key``."""
        if torch.is_tensor(key):
            # Tensors hash by identity; a tuple of elements hashes by value.
            return tuple(key.reshape(-1).tolist())
        return key

    @staticmethod
    def _pad(vector, width):
        """Flatten ``vector`` and right-pad it with zeros up to ``width``."""
        flat = vector.reshape(-1)
        missing = width - flat.numel()
        if missing <= 0:
            return flat
        return torch.cat([flat, flat.new_zeros(missing)])

    def store(self, key, value):
        """
        Store a key-value pair using HDC binding.

        Storing under a key whose contents are already present replaces the
        previous entry.
        """
        slot = self._slot(key)
        needs_alignment = (
            torch.is_tensor(key)
            and torch.is_tensor(value)
            and key.shape != value.shape
        )
        if needs_alignment:
            # XOR needs equally sized operands; zero padding is neutral for
            # XOR, so the value survives the round trip unchanged.
            width = max(key.numel(), value.numel())
            bound = self.hdc_ops.bind(self._pad(key, width), self._pad(value, width))
            self._value_shapes[slot] = value.shape
        else:
            bound = self.hdc_ops.bind(key, value)
            self._value_shapes.pop(slot, None)
        self.memory[slot] = bound

    def retrieve(self, key):
        """
        Retrieve a value using HDC unbinding.

        Raises:
            KeyError: If nothing was stored under a key with these contents.
        """
        slot = self._slot(key)
        try:
            bound = self.memory[slot]
        except KeyError:
            raise KeyError("Key not found in memory.") from None

        shape = self._value_shapes.get(slot)
        if shape is None:
            return self.hdc_ops.unbind(key, bound)

        # The pair was padded on store: pad the key the same way, unbind,
        # then drop the padding and restore the value's original shape.
        unbound = self.hdc_ops.unbind(self._pad(key, bound.numel()), bound)
        return unbound[: shape.numel()].reshape(shape)

    def bundle(self, keys):
        """
        Bundle the stored (bound) vectors of multiple keys into a single vector.

        The stored vectors are summed elementwise, so they must have
        compatible shapes.

        Raises:
            KeyError: If any key has no stored entry.
            ValueError: If ``keys`` is empty.
        """
        vectors = [self.memory[self._slot(key)] for key in keys]
        return self.hdc_ops.bundle(vectors)

# Dataset class with HDC integration
class PromptCompletionDataset(Dataset):
    """Dataset of prompt/completion records tokenized on access.

    Every ``__getitem__`` call also writes the tokenized pair into the shared
    ``hdc_memory`` and immediately reads it back, returning the decoded
    read-back alongside the training fields.
    """

    def __init__(self, data, tokenizer, hdc_memory):
        self.data, self.tokenizer, self.hdc_memory = data, tokenizer, hdc_memory

    def __len__(self):
        """Return the number of records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Build the training example for record *idx*.

        Returns:
            A dict with ``input_ids`` (prompt + completion + EOS), ``labels``
            (same length, prompt positions set to -100), the raw ``prompt``
            and ``expected_completion`` strings, and ``retrieved_completion``
            (the decoded value read back from the HDC memory).
        """
        record = self.data[idx]
        prompt_text, completion_text = record["prompt"], record["completion"]

        # Token-id lists for both halves; no special tokens are inserted.
        tokenize = self.tokenizer.encode
        prompt_ids = tokenize(prompt_text, add_special_tokens=False)
        completion_ids = tokenize(completion_text, add_special_tokens=False)

        # Round-trip the pair through the HDC memory (demonstration only).
        memory_key = torch.tensor(prompt_ids, dtype=torch.int)
        self.hdc_memory.store(memory_key, torch.tensor(completion_ids, dtype=torch.int))
        recalled = self.hdc_memory.retrieve(memory_key)

        # The completion plus EOS is the only part that contributes to the loss.
        target_ids = completion_ids + [self.tokenizer.eos_token_id]
        example = {
            "input_ids": prompt_ids + target_ids,
            "labels": [-100] * len(prompt_ids) + target_ids,
            "prompt": prompt_text,
            "expected_completion": completion_text,
            "retrieved_completion": self.tokenizer.decode(recalled.tolist(), skip_special_tokens=True),
        }
        return example

# Collate function for DataLoader
def collate_fn(batch, pad_token_id=None):
    """Pad a list of dataset items to a common length and tensorize them.

    Args:
        batch: List of dicts with ``input_ids``, ``labels``, ``prompt``,
            ``expected_completion`` and ``retrieved_completion`` entries.
        pad_token_id: Id used to right-pad ``input_ids``. When omitted, the
            module-level ``tokenizer.pad_token_id`` is used, which keeps the
            one-argument call made by ``DataLoader`` working; pass it
            explicitly to use this function without that global.

    Returns:
        A tuple ``(input_ids, attention_mask, labels, prompts,
        expected_completions, retrieved_completions)``. The first three are
        tensors of shape (batch, longest item); ``labels`` is padded with
        -100 and ``attention_mask`` is 0 over padding. The last three are
        lists of strings in batch order.
    """
    if pad_token_id is None:
        # Backward-compatible fallback to the script's global tokenizer.
        pad_token_id = tokenizer.pad_token_id

    max_length = max(len(item["input_ids"]) for item in batch)

    input_ids, labels, attention_mask = [], [], []
    for item in batch:
        length = len(item["input_ids"])
        padding = max_length - length
        input_ids.append(item["input_ids"] + [pad_token_id] * padding)
        labels.append(item["labels"] + [-100] * (max_length - len(item["labels"])))
        attention_mask.append([1] * length + [0] * padding)

    prompts = [item["prompt"] for item in batch]
    expected_completions = [item["expected_completion"] for item in batch]
    retrieved_completions = [item["retrieved_completion"] for item in batch]

    return (
        torch.tensor(input_ids),
        torch.tensor(attention_mask),
        torch.tensor(labels),
        prompts,
        expected_completions,
        retrieved_completions
    )

# Download and prepare dataset
def download_and_prepare_data(data_url, tokenizer, batch_size, test_ratio=0.1, timeout=60):
    """Download the gzipped JSON-lines dataset and build train/test loaders.

    Each non-blank line of the decompressed payload is parsed as a JSON
    object with ``text`` and ``label`` entries. The records are shuffled with
    the global ``random`` state and split so that the last ``test_ratio``
    fraction becomes the test set.

    Args:
        data_url: URL of the gzip-compressed JSON-lines file.
        tokenizer: Tokenizer handed to both datasets.
        batch_size: Batch size for both loaders.
        test_ratio: Fraction of records held out for testing.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the script indefinitely.

    Returns:
        ``(train_loader, test_loader, hdc_memory)`` where ``hdc_memory`` is
        the single ``HDCMemory`` instance shared by both datasets.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(data_url, timeout=timeout)
    # Fail here with a clear HTTP error rather than later inside gzip/json
    # when the body is an error page.
    response.raise_for_status()
    content = gzip.decompress(response.content).decode()

    # Skip blank lines (e.g. a trailing newline) that json.loads would reject.
    records = (json.loads(line) for line in content.splitlines() if line.strip())
    dataset = [
        {"prompt": build_prompt(entry["text"]), "completion": entry["label"].strip()}
        for entry in records
    ]
    random.shuffle(dataset)
    split_index = int(len(dataset) * (1 - test_ratio))
    train_data = dataset[:split_index]
    test_data = dataset[split_index:]

    # Initialize HDC memory (shared by the train and test datasets)
    hdc_memory = HDCMemory()

    # Create datasets
    train_dataset = PromptCompletionDataset(train_data, tokenizer, hdc_memory)
    test_dataset = PromptCompletionDataset(test_data, tokenizer, hdc_memory)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    return train_loader, test_loader, hdc_memory

# Calculate accuracy
def calculate_accuracy(model, tokenizer, loader):
    """Return the exact-match accuracy of generated completions over *loader*.

    For every prompt yielded by the loader, text is generated with
    ``generate_text`` and compared to the expected completion after
    stripping whitespace and lower-casing both sides.

    The model is put in eval mode for the duration of the call and is put
    back in whichever mode (train or eval) it was in on entry, even if
    generation raises.

    Args:
        model: Model passed through to ``generate_text``.
        tokenizer: Tokenizer passed through to ``generate_text``.
        loader: Iterable of 6-tuples as produced by ``collate_fn``; only the
            prompts and expected completions are used.

    Returns:
        ``correct / total``, or 0 when the loader yields no prompts.
    """
    # Remember the caller's mode; default to "training" if the attribute is absent.
    was_training = getattr(model, "training", True)
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for _, _, _, prompts, expected_completions, _ in loader:
                for prompt, expected_completion in zip(prompts, expected_completions):
                    generated_text = generate_text(model, tokenizer, prompt)
                    if generated_text.strip().lower() == expected_completion.strip().lower():
                        correct += 1
                    total += 1
    finally:
        model.train(was_training)
    return correct / total if total > 0 else 0

# Generate text using the model
def generate_text(model, tokenizer, prompt, max_new_tokens=50):
    """Generate a continuation of *prompt* and return only the new text.

    Args:
        model: Model exposing ``device`` and ``generate``.
        tokenizer: Callable tokenizer that also provides ``pad_token_id``
            and ``eos_token_id``.
        prompt: Text to continue.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded tokens that follow the prompt in the first returned
        sequence, with surrounding whitespace stripped.
    """
    encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
    sequences = model.generate(
        input_ids=encoded["input_ids"],
        attention_mask=encoded["attention_mask"],
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    first_sequence = sequences[0]

    # The output starts with the prompt tokens; keep only what comes after.
    prompt_length = encoded["input_ids"].shape[1]
    continuation = first_sequence[prompt_length:]
    return decode_text(tokenizer, continuation).strip()

# Test the model
def test_model(model_path, test_input, hdc_memory):
    """Load a saved model, predict the emotion of *test_input*, and print it.

    After printing the prediction, the prompt is also looked up in
    ``hdc_memory``. A prompt that was never stored there is reported with a
    message instead of letting the ``KeyError`` from ``retrieve`` abort the
    run.

    Args:
        model_path: Directory to load the model and tokenizer from.
        test_input: Raw text to classify.
        hdc_memory: Object exposing ``retrieve(key)`` that raises
            ``KeyError`` for unknown keys.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Load saved model and tokenizer
    model = AutoModelForCausalLM.from_pretrained(model_path).to(device)
    tokenizer = AutoTokenizer.from_pretrained(model_path)

    # Configure padding token
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = tokenizer.pad_token_id

    # Generate and display prediction
    prompt = build_prompt(test_input)
    generated_text = generate_text(model, tokenizer, prompt)

    print(f"Input: {test_input}")
    print(f"Generated emotion: {generated_text}")

    # Retrieve from HDC memory (for demonstration)
    key = torch.tensor(tokenizer.encode(prompt, add_special_tokens=False), dtype=torch.int)
    try:
        retrieved_value = hdc_memory.retrieve(key)
    except KeyError:
        # An arbitrary test sentence is usually not a stored prompt.
        print("No completion stored in HDC memory for this prompt.")
    else:
        print(f"Retrieved completion from HDC memory: {tokenizer.decode(retrieved_value.tolist(), skip_special_tokens=True)}")

# Main training script
# Script entry point: fine-tunes the model with LoRA on the emotion dataset,
# reports test accuracy after every epoch, saves the result and runs a single
# demo prediction.
if __name__ == "__main__":
    # Seed before anything random happens (dataset shuffling, loader order).
    set_seed(42)

    # Configure basic training parameters
    data_url = "https://www.thelmbook.com/data/emotions"
    model_name = "openai-community/gpt2"
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Initialize tokenizer
    # NOTE: `tokenizer` must keep this name at module level; collate_fn reads
    # it as a global to obtain the padding id.
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    # Reuse the end-of-sequence token as the padding token.
    tokenizer.pad_token = tokenizer.eos_token

    # Configure LoRA parameters
    peft_config = LoraConfig(
        task_type=TaskType.CAUSAL_LM,
        inference_mode=False,
        r=16,
        lora_alpha=32
    )

    # Load model and apply LoRA configuration
    model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
    model = get_peft_model(model, peft_config)

    # Get hyperparameters and prepare data
    num_epochs, batch_size, learning_rate = 18, 16, 5e-5
    train_loader, test_loader, hdc_memory = download_and_prepare_data(data_url, tokenizer, batch_size)

    # Initialize optimizer
    optimizer = AdamW(model.parameters(), lr=learning_rate)

    # Training loop
    for epoch in range(num_epochs):
        # Running totals used for the average loss shown in the progress bar.
        total_loss = 0
        num_batches = 0
        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")

        # The three string lists produced by collate_fn are not needed for training.
        for input_ids, attention_mask, labels, _, _, _ in progress_bar:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            # Update metrics
            total_loss += loss.item()
            num_batches += 1
            progress_bar.set_postfix({"Loss": total_loss / num_batches})

        # Evaluate on test set
        # calculate_accuracy generates text for every test prompt individually.
        test_acc = calculate_accuracy(model, tokenizer, test_loader)
        print(f"Epoch {epoch+1} - Average loss: {total_loss / num_batches:.4f}, Test accuracy: {test_acc:.4f}")

    # Save the model and tokenizer
    # NOTE(review): `model` is the PEFT-wrapped model here; confirm that what
    # save_pretrained writes is loadable by test_model's from_pretrained call.
    model.save_pretrained("./finetuned_model")
    tokenizer.save_pretrained("./finetuned_model")

    # Test the finetuned model
    test_input = "I'm so happy to be able to finetune an LLM!"
    test_model("./finetuned_model", test_input, hdc_memory)

Using device: cuda


Epoch 1/18: 100%|██████████| 1125/1125 [00:53<00:00, 20.98it/s, Loss=0.613]


Epoch 1 - Average loss: 0.6127, Test accuracy: 0.7605


Epoch 2/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.62it/s, Loss=0.353]


Epoch 2 - Average loss: 0.3532, Test accuracy: 0.7970


Epoch 3/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.61it/s, Loss=0.237]


Epoch 3 - Average loss: 0.2375, Test accuracy: 0.8530


Epoch 4/18: 100%|██████████| 1125/1125 [00:55<00:00, 20.45it/s, Loss=0.184]


Epoch 4 - Average loss: 0.1843, Test accuracy: 0.8985


Epoch 5/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.68it/s, Loss=0.146]


Epoch 5 - Average loss: 0.1457, Test accuracy: 0.9175


Epoch 6/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.63it/s, Loss=0.121]


Epoch 6 - Average loss: 0.1208, Test accuracy: 0.9215


Epoch 7/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.50it/s, Loss=0.103]


Epoch 7 - Average loss: 0.1028, Test accuracy: 0.9260


Epoch 8/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.62it/s, Loss=0.0927]


Epoch 8 - Average loss: 0.0927, Test accuracy: 0.9260


Epoch 9/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.59it/s, Loss=0.0887]


Epoch 9 - Average loss: 0.0887, Test accuracy: 0.9330


Epoch 10/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.58it/s, Loss=0.079]


Epoch 10 - Average loss: 0.0790, Test accuracy: 0.9315


Epoch 11/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.69it/s, Loss=0.0771]


Epoch 11 - Average loss: 0.0771, Test accuracy: 0.9325


Epoch 12/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.60it/s, Loss=0.0699]


Epoch 12 - Average loss: 0.0699, Test accuracy: 0.9345


Epoch 13/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.60it/s, Loss=0.0663]


Epoch 13 - Average loss: 0.0663, Test accuracy: 0.9265


Epoch 14/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.61it/s, Loss=0.064]


Epoch 14 - Average loss: 0.0640, Test accuracy: 0.9375


Epoch 15/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.60it/s, Loss=0.0633]


Epoch 15 - Average loss: 0.0633, Test accuracy: 0.9380


Epoch 16/18: 100%|██████████| 1125/1125 [00:54<00:00, 20.75it/s, Loss=0.0605]


Epoch 16 - Average loss: 0.0605, Test accuracy: 0.9390


Epoch 17/18: 100%|██████████| 1125/1125 [00:53<00:00, 20.85it/s, Loss=0.0571]


Epoch 17 - Average loss: 0.0571, Test accuracy: 0.9370


Epoch 18/18: 100%|██████████| 1125/1125 [00:55<00:00, 20.26it/s, Loss=0.0574]


Epoch 18 - Average loss: 0.0574, Test accuracy: 0.9420
Training accuracy: 0.9424
Test accuracy: 0.9420
Using device: cuda
Input: I'm so happy to be able to finetune an LLM!
Generated emotion: joy
