In [None]:
!pip install transformers datasets evaluate peft torch


In [None]:
import torch
import numpy as np
import pandas as pd
import json
from transformers import (
    GPT2LMHeadModel,
    GPT2Tokenizer,
    GPT2Config,
    DataCollatorWithPadding,
    TrainingArguments,
    Trainer
)
from peft import LoraConfig, get_peft_model, TaskType
from datasets import Dataset, DatasetDict
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')


## Initialize GPT-2 Model with LoRA Configuration


In [None]:
model_checkpoint = "gpt2"

# Load the tokenizer. GPT-2 ships without a pad token, so EOS is reused for
# padding, and two extra special tokens are registered for the QA prompt format.
tokenizer = GPT2Tokenizer.from_pretrained(model_checkpoint)
tokenizer.pad_token = tokenizer.eos_token
tokenizer.add_special_tokens({
    'sep_token': '<SEP>',
    'cls_token': '<CLS>'
})

# Load the base model and grow its embedding matrix to cover the new tokens.
model = GPT2LMHeadModel.from_pretrained(model_checkpoint)
model.resize_token_embeddings(len(tokenizer))

# NOTE(review): the embedding rows added for <SEP>/<CLS> are freshly initialised
# and this LoRA config does not list the embeddings in `modules_to_save`, so they
# presumably stay frozen during fine-tuning — confirm whether they should be trained.

# Configure LoRA
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    inference_mode=False,
    r=16,  # rank
    lora_alpha=32,  # scaling parameter
    lora_dropout=0.1,
    target_modules=["c_attn", "c_proj", "c_fc"],  # attention + MLP projections
    # GPT-2 implements these projections as Conv1D, which stores weights as
    # (fan_in, fan_out); state that explicitly instead of relying on PEFT's
    # auto-correction (which only emits a warning).
    fan_in_fan_out=True,
    bias="none"
)

# Apply LoRA to the model
model = get_peft_model(model, lora_config)

# Print trainable parameters
model.print_trainable_parameters()

print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"Trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")


## Load and Explore SQuAD Dataset from Kaggle Files


In [None]:
# Load the JSON files.
# SQuAD is distributed as UTF-8; pass the encoding explicitly so the read does
# not depend on the platform's default locale encoding.
with open('train-v1.1.json', 'r', encoding='utf-8') as f:
    train_data = json.load(f)

with open('dev-v1.1.json', 'r', encoding='utf-8') as f:
    dev_data = json.load(f)

# Function to convert SQuAD format to flat structure for GPT-2 QA format
def squad_json_to_dataframe(squad_data):
    """Flatten a nested SQuAD-style dict into column-oriented lists.

    Despite the name, the result is a plain dict of parallel lists (not a
    pandas DataFrame) with one entry per question.

    Args:
        squad_data: Dict with a 'data' list of articles; each article has
            'paragraphs', each paragraph has 'context' and 'qas', and each qa
            has 'id', 'question' and optionally 'answers'.

    Returns:
        Dict with keys 'id', 'context', 'question', 'answers'. Each 'answers'
        entry is ``{'text': [...], 'answer_start': [...]}``; both lists are
        empty when the question has no (or an empty) 'answers' field.
    """
    flat = {'id': [], 'context': [], 'question': [], 'answers': []}

    for article in squad_data['data']:
        for paragraph in article['paragraphs']:
            passage = paragraph['context']
            for qa in paragraph['qas']:
                # The paragraph text is repeated once per question.
                flat['context'].append(passage)
                flat['question'].append(qa['question'])
                flat['id'].append(qa['id'])

                # A missing or empty 'answers' field yields two empty lists.
                gold = qa.get('answers') or []
                starts = [entry['answer_start'] for entry in gold]
                texts = [entry['text'] for entry in gold]
                flat['answers'].append({'text': texts, 'answer_start': starts})

    return flat

# Flatten both SQuAD splits into column-oriented dicts
train_df = squad_json_to_dataframe(train_data)
dev_df = squad_json_to_dataframe(dev_data)

# Wrap each split in a Dataset and bundle them under their split names
dataset = DatasetDict({
    split_name: Dataset.from_dict(columns)
    for split_name, columns in (('train', train_df), ('validation', dev_df))
})

# Overview of the splits
print("Dataset Structure:")
print(dataset)
train_count = len(dataset['train'])
validation_count = len(dataset['validation'])
print("\nTraining examples:", train_count)
print("Validation examples:", validation_count)

# Peek at the first training record (context shortened to 200 characters)
print("\nSample training example:")
sample = dataset['train'][0]
context_preview = sample['context'][:200]
print(f"Context: {context_preview}...")
print(f"Question: {sample['question']}")
print(f"Answer: {sample['answers']}")


## Preprocessing for GPT-2 QA Format


In [None]:
max_length = 512


def _trim_context_and_mask(input_ids, sep_id, limit):
    """Fit one encoded QA example into `limit` tokens and build its labels.

    The expected layout is ``<CLS> question <SEP> context <SEP> answer <EOS>``.
    When the sequence is too long, tokens are removed from the END OF THE
    CONTEXT (just before the second separator) so that the closing separator,
    the answer and EOS are kept.

    Returns:
        (input_ids, labels): labels equal input_ids with every position up to
        and including the second separator replaced by -100. If fewer than two
        separators are present, nothing is masked.
    """
    input_ids = list(input_ids)
    sep_positions = [pos for pos, token_id in enumerate(input_ids) if token_id == sep_id]

    answer_start = 0
    if len(sep_positions) >= 2:
        first_sep, second_sep = sep_positions[0], sep_positions[1]
        overflow = len(input_ids) - limit
        if overflow > 0:
            # Never cut into the question: the earliest cut point is right
            # after the first separator.
            cut_from = max(second_sep - overflow, first_sep + 1)
            input_ids = input_ids[:cut_from] + input_ids[second_sep:]
            second_sep = cut_from
        answer_start = second_sep + 1

    # Safety net for the rare case where question + answer alone exceed the limit.
    input_ids = input_ids[:limit]
    masked = min(answer_start, len(input_ids))
    labels = [-100] * masked + input_ids[masked:]
    return input_ids, labels


def preprocess_function(examples, qa_tokenizer=None, max_tokens=None):
    """
    Preprocess examples for GPT-2 question answering.
    Format: <CLS> Question <SEP> Context <SEP> Answer

    Unlike plain right-side truncation, over-long examples lose context tokens
    rather than the answer, so every example keeps a supervised answer span.

    Args:
        examples: Batch dict with parallel 'question', 'context' and 'answers'
            lists; each answers entry has a 'text' list. The first answer text
            is the target, or "No answer found." when the list is empty.
        qa_tokenizer: Optional tokenizer override; defaults to the module-level
            `tokenizer`.
        max_tokens: Optional length limit override; defaults to the
            module-level `max_length`.

    Returns:
        Dict with 'input_ids', 'attention_mask' and 'labels' (lists of lists).
        Labels are -100 everywhere except on the answer and EOS tokens.
    """
    tok = tokenizer if qa_tokenizer is None else qa_tokenizer
    limit = max_length if max_tokens is None else max_tokens

    input_ids_list = []
    attention_mask_list = []
    labels_list = []

    for question, context, answers in zip(
        examples['question'], examples['context'], examples['answers']
    ):
        question = question.strip()
        context = context.strip()

        # Get the first answer (if available)
        if answers['text']:
            answer = answers['text'][0].strip()
        else:
            answer = "No answer found."

        # Create input text in QA format
        input_text = f"{tok.cls_token} {question} {tok.sep_token} {context} {tok.sep_token}"
        full_text = f"{input_text} {answer}{tok.eos_token}"

        # Tokenize WITHOUT truncation; the length limit is enforced below by
        # trimming the context so the answer is never cut off.
        encoded = tok(
            full_text,
            truncation=False,
            padding=False,
            return_tensors=None
        )

        input_ids, labels = _trim_context_and_mask(
            encoded["input_ids"], tok.sep_token_id, limit
        )

        input_ids_list.append(input_ids)
        # No padding happens here, so every kept token is attended to.
        attention_mask_list.append([1] * len(input_ids))
        labels_list.append(labels)

    return {
        "input_ids": input_ids_list,
        "attention_mask": attention_mask_list,
        "labels": labels_list
    }


## Prepare Datasets


In [None]:
# Use a smaller subset for faster training (optional).
# The subset is taken BEFORE tokenization: preprocessing produces one output
# row per input row, so this yields the same rows as tokenizing everything and
# selecting afterwards, without tokenizing examples that are then discarded.
train_size = min(10000, len(dataset["train"]))
val_size = min(1000, len(dataset["validation"]))

train_subset = dataset["train"].select(range(train_size))
validation_subset = dataset["validation"].select(range(val_size))

# Preprocess datasets
tokenized_train = train_subset.map(
    preprocess_function,
    batched=True,
    remove_columns=train_subset.column_names,
    num_proc=2
)

tokenized_validation = validation_subset.map(
    preprocess_function,
    batched=True,
    remove_columns=validation_subset.column_names,
    num_proc=2
)

print(f"Training examples: {len(tokenized_train)}")
print(f"Validation examples: {len(tokenized_validation)}")


## Training Setup with LoRA


In [None]:
# Disable wandb logging.
# The environment variable is set here, before the Trainer is constructed
# further down in this cell.
import os
os.environ["WANDB_DISABLED"] = "true"

# Custom data collator to handle our specific case
from dataclasses import dataclass
from typing import Any, Dict, List, Union
import torch

@dataclass
class CustomDataCollatorForCausalLM:
    """Pad pre-tokenized causal-LM features into rectangular LongTensors.

    Every sequence in a batch is clipped/padded to the length of the longest
    `input_ids` in that batch, capped at `max_length`.

    Attributes:
        tokenizer: Object exposing `pad_token_id`, used to pad `input_ids`.
        max_length: Upper bound on the padded sequence length.
    """
    tokenizer: Any
    max_length: int = 512

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]:
        """Collate a list of feature dicts into a batch.

        Args:
            features: Dicts holding list-valued 'input_ids', 'attention_mask'
                and 'labels'.

        Returns:
            Dict with the same three keys as `torch.long` tensors. Padding uses
            the tokenizer's pad id for input_ids, 0 for attention_mask and
            -100 for labels.
        """
        # Longest sequence in this batch, never exceeding the configured cap
        target_len = min(max(len(f["input_ids"]) for f in features), self.max_length)

        # (feature key, fill value) in the order the batch dict is populated
        fill_values = (
            ("input_ids", self.tokenizer.pad_token_id),
            ("attention_mask", 0),
            ("labels", -100),
        )

        batch = {}
        for key, fill in fill_values:
            rows = []
            for feature in features:
                # Slicing copies, so the caller's feature lists are untouched
                clipped = feature[key][:target_len]
                rows.append(clipped + [fill] * (target_len - len(clipped)))
            batch[key] = torch.tensor(rows, dtype=torch.long)

        return batch

# Training arguments optimized for LoRA fine-tuning
training_args = TrainingArguments(
    output_dir="./gpt2-lora-qa",
    num_train_epochs=3,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=4,  # effective batch size of 16 per device
    warmup_steps=100,
    learning_rate=5e-4,  # Higher LR for LoRA
    # Mixed precision needs a CUDA device; fall back to full precision on
    # CPU-only machines instead of failing when the arguments are built.
    fp16=torch.cuda.is_available(),
    logging_steps=50,
    eval_strategy="steps",
    eval_steps=500,
    save_strategy="steps",
    save_steps=500,  # kept equal to eval_steps so the best checkpoint can be restored
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    report_to=[],  # Explicitly disable all reporting
    remove_unused_columns=False,
)

# Use our custom data collator
data_collator = CustomDataCollatorForCausalLM(
    tokenizer=tokenizer,
    max_length=max_length
)

# Initialize trainer
# NOTE(review): newer transformers releases rename the `tokenizer` argument to
# `processing_class` — confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_validation,
    data_collator=data_collator,
    tokenizer=tokenizer,
)


## Train Model with LoRA


In [None]:
# Run the fine-tuning loop
print("Starting LoRA fine-tuning...")
trainer.train()

# Persist the adapter weights and the tokenizer side by side in one directory
adapter_dir = "./gpt2-lora-qa-final"
model.save_pretrained(adapter_dir)
tokenizer.save_pretrained(adapter_dir)
print("LoRA adapters saved!")


## Plot Training History


In [None]:
# Plot training history
log_history = trainer.state.log_history


def _loss_points(history, key):
    """Return (step, value) pairs for every log entry carrying both `key` and 'step'."""
    return [(entry['step'], entry[key]) for entry in history if key in entry and 'step' in entry]


def _draw_loss_panel(position, steps, losses, color, title, empty_message, max_annotated=20):
    """Draw one single-series loss panel, or a placeholder message when it has no data.

    Value labels are only drawn for short series so dense curves stay readable.
    """
    plt.subplot(1, 3, position)
    plt.title(title)
    if not losses:
        plt.text(0.5, 0.5, empty_message,
                 horizontalalignment='center', verticalalignment='center',
                 transform=plt.gca().transAxes)
        return
    plt.plot(steps, losses, f'{color}-', linewidth=2, marker='o', markersize=8)
    plt.xlabel('Steps')
    plt.ylabel('Loss')
    plt.grid(True, alpha=0.3)
    if len(steps) <= max_annotated:
        for x, y in zip(steps, losses):
            plt.annotate(f'{y:.4f}', (x, y), textcoords="offset points", xytext=(0, 10), ha='center')


# The Trainer records the periodic training loss under 'loss'; 'train_loss'
# only appears once, in the end-of-training summary entry. Use the periodic
# values for the curve and fall back to the summary if none were logged.
# (Distinct names are used so the raw SQuAD `train_data` loaded earlier is not
# overwritten by this cell.)
train_points = _loss_points(log_history, 'loss')
if not train_points:
    train_points = _loss_points(log_history, 'train_loss')
eval_points = _loss_points(log_history, 'eval_loss')

# Separate steps and losses (always lists, even when a series is empty)
train_steps = [step for step, _ in train_points]
train_losses = [loss for _, loss in train_points]
eval_steps = [step for step, _ in eval_points]
eval_losses = [loss for _, loss in eval_points]

plt.figure(figsize=(15, 5))

# Combined plot showing both training and validation losses
plt.subplot(1, 3, 1)
if train_losses:
    plt.plot(train_steps, train_losses, 'bo-', linewidth=2, markersize=8, label='Training Loss')
if eval_losses:
    plt.plot(eval_steps, eval_losses, 'ro-', linewidth=2, markersize=8, label='Validation Loss')

plt.title('Training Progress')
plt.xlabel('Steps')
plt.ylabel('Loss')
plt.grid(True, alpha=0.3)
if train_losses or eval_losses:
    # A legend with no labelled artists only produces a warning
    plt.legend()

# Training loss only / validation loss only
_draw_loss_panel(2, train_steps, train_losses, 'b', 'Training Loss',
                 'No training loss data available')
_draw_loss_panel(3, eval_steps, eval_losses, 'r', 'Validation Loss',
                 'No validation loss data available')

plt.tight_layout()
plt.show()

# Print the training metrics table
print("\nTraining Metrics Summary:")
print("Step\tTraining Loss\tValidation Loss")
print("-" * 40)

# Step -> loss lookups; reversing first keeps the earliest entry for a step
train_by_step = dict(reversed(train_points))
eval_by_step = dict(reversed(eval_points))

# Create a combined view of training and validation losses
all_steps = sorted(set(train_steps) | set(eval_steps))
for step in all_steps:
    train_loss = train_by_step.get(step)
    eval_loss = eval_by_step.get(step)

    train_str = f"{train_loss:.6f}" if train_loss is not None else "-"
    eval_str = f"{eval_loss:.6f}" if eval_loss is not None else "-"

    print(f"{step}\t{train_str}\t\t{eval_str}")

# Analysis of training progress
print(f"\nTraining Analysis:")
if eval_losses:
    first_eval, last_eval = eval_losses[0], eval_losses[-1]
    # Report the actual direction instead of always claiming an improvement
    trend = "improved" if last_eval <= first_eval else "worsened"
    print(f"• Validation loss {trend} from {first_eval:.4f} to {last_eval:.4f}")
    print(f"• Total validation loss reduction: {(first_eval - last_eval):.4f}")
    if first_eval != 0:
        print(f"• Improvement percentage: {((first_eval - last_eval) / first_eval * 100):.1f}%")

if train_losses:
    print(f"• Final training loss: {train_losses[-1]:.4f}")

if train_losses and eval_losses:
    # Check if there's overfitting by comparing the last available losses
    if train_losses[-1] < eval_losses[-1]:
        gap = eval_losses[-1] - train_losses[-1]
        print(f"• Generalization gap: {gap:.4f} (validation loss higher than training loss)")
    else:
        print(f"• Model seems to be generalizing well")


## Inference Function for GPT-2 QA


In [None]:
def answer_question_gpt2(question, context, model, tokenizer, max_new_tokens=50,
                         do_sample=False, temperature=0.7, max_input_tokens=None):
    """
    Generate answer using fine-tuned GPT-2 with LoRA

    The prompt is ``<CLS> question <SEP> context <SEP>``. If it is too long,
    tokens are dropped from the end of the context so the closing separator
    (which cues the model to start answering) is always kept.

    Args:
        question: Question text.
        context: Passage the answer should come from.
        model: Causal LM exposing `parameters()`, `eval()` and `generate()`.
        tokenizer: Tokenizer with cls/sep/eos tokens and `sep_token_id` /
            `eos_token_id`.
        max_new_tokens: Maximum number of tokens to generate.
        do_sample: Sample instead of greedy decoding. Defaults to False so
            repeated calls (and evaluation metrics) are reproducible.
        temperature: Sampling temperature; only used when `do_sample` is True.
        max_input_tokens: Prompt length budget. Defaults to the module-level
            `max_length` minus `max_new_tokens`.

    Returns:
        The generated answer text, stripped of EOS and of anything following a
        generated separator. May be an empty string.
    """
    # Format input
    input_text = f"{tokenizer.cls_token} {question.strip()} {tokenizer.sep_token} {context.strip()} {tokenizer.sep_token}"

    # Tokenize without truncation; the budget is enforced below
    prompt_ids = list(tokenizer(input_text, truncation=False)["input_ids"])

    budget = (max_length - max_new_tokens) if max_input_tokens is None else max_input_tokens
    budget = max(budget, 1)
    overflow = len(prompt_ids) - budget
    if overflow > 0:
        sep_id = tokenizer.sep_token_id
        sep_positions = [pos for pos, token_id in enumerate(prompt_ids) if token_id == sep_id]
        if len(sep_positions) >= 2:
            first_sep, closing_sep = sep_positions[0], sep_positions[1]
            # Remove context tokens just before the closing separator, but
            # never cut into the question.
            cut_from = max(closing_sep - overflow, first_sep + 1)
            prompt_ids = prompt_ids[:cut_from] + prompt_ids[closing_sep:]
        # Last resort if the question alone exceeds the budget
        prompt_ids = prompt_ids[:budget]

    # Build tensors on the same device as the model
    device = next(model.parameters()).device
    input_ids = torch.tensor([prompt_ids], dtype=torch.long, device=device)
    attention_mask = torch.ones_like(input_ids)

    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "num_return_sequences": 1,
        "do_sample": do_sample,
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    if do_sample:
        # Temperature is meaningless for greedy decoding, so only pass it here
        generation_kwargs["temperature"] = temperature

    # Generate
    model.eval()
    with torch.no_grad():
        outputs = model.generate(
            input_ids,
            attention_mask=attention_mask,
            **generation_kwargs
        )

    # Decode only the newly generated tokens; the prompt is not re-parsed, so
    # separator-like text inside the question/context cannot confuse extraction.
    new_tokens = outputs[0][input_ids.shape[1]:]
    generated = tokenizer.decode(new_tokens, skip_special_tokens=False)

    # Keep text up to the first generated separator and drop EOS/padding
    answer = generated.split(tokenizer.sep_token)[0].replace(tokenizer.eos_token, "").strip()

    return answer

# Hand-written context/question pairs for a quick qualitative check
test_examples = [
    {
        "context": "The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France. It is named after the engineer Gustave Eiffel, whose company designed and built the tower. Constructed from 1887 to 1889, it was initially criticized by some of France's leading artists and intellectuals.",
        "question": "When was the Eiffel Tower built?"
    },
    {
        "context": "Machine learning is a branch of artificial intelligence (AI) and computer science which focuses on the use of data and algorithms to imitate the way that humans learn, gradually improving its accuracy. IBM has a rich history with machine learning.",
        "question": "What is machine learning?"
    }
]

print("GPT-2 LoRA Question Answering Examples:\n")
for example in test_examples:
    question, context = example["question"], example["context"]
    answer = answer_question_gpt2(question, context, model, tokenizer)
    # Only the first 100 characters of the context are echoed
    print(f"Context: {context[:100]}...")
    print(f"Question: {question}")
    print(f"Answer: {answer}")
    print("-" * 80 + "\n")


## Evaluation Metrics for GPT-2 QA


In [None]:
def compute_exact_match(prediction, ground_truth):
    """
    Score 1 when prediction and ground truth are the same string after
    trimming surrounding whitespace and lower-casing, otherwise 0.
    """
    normalized_prediction = prediction.strip().lower()
    normalized_truth = ground_truth.strip().lower()
    return 1 if normalized_prediction == normalized_truth else 0

def compute_f1(prediction, ground_truth):
    """
    Compute token-level F1 score between prediction and ground truth.
    F1 = 2 * (precision * recall) / (precision + recall)

    Tokens are the whitespace-separated words of the stripped, lower-cased
    strings. Overlap is counted as a multiset intersection, so repeated tokens
    are matched once per occurrence (identical strings always score 1.0).

    Args:
        prediction: Predicted answer text.
        ground_truth: Reference answer text.

    Returns:
        1 or 0 (int) when either side has no tokens (1 only if both are
        empty), 0 when there is no overlap, otherwise the F1 as a float.
    """
    from collections import Counter

    pred_tokens = prediction.strip().lower().split()
    truth_tokens = ground_truth.strip().lower().split()

    # Handle empty predictions/truths
    if len(pred_tokens) == 0 or len(truth_tokens) == 0:
        return int(pred_tokens == truth_tokens)

    # Count shared tokens with multiplicity; a plain set intersection would
    # under-count repeated words while the denominators still count them.
    overlap = sum((Counter(pred_tokens) & Counter(truth_tokens)).values())

    if overlap == 0:
        return 0

    # Calculate precision and recall
    precision = overlap / len(pred_tokens)
    recall = overlap / len(truth_tokens)

    # Calculate F1 score
    return 2 * (precision * recall) / (precision + recall)

def evaluate_gpt2_qa(model, tokenizer, dataset, max_examples=None, max_new_tokens=50):
    """
    Score generated answers against reference answers with Exact Match and F1.

    Args:
        model: Model handed to `answer_question_gpt2`.
        tokenizer: Tokenizer handed to `answer_question_gpt2`.
        dataset: Iterable of examples with 'question', 'context' and
            'answers' (a dict holding a 'text' list); must support
            `select(...)` and `len(...)` when `max_examples` is given.
        max_examples: If set, only the first `max_examples` rows are used.
        max_new_tokens: Generation length passed through to the answer function.

    Returns:
        Dict with 'exact_match' and 'f1_score' (percentages over the scored
        examples), 'total_examples' (number scored) and 'predictions' (one
        record per scored example). Examples without reference answers are
        skipped. A generation error is printed and scored as an empty prediction.
    """
    print("Starting GPT-2 QA Evaluation...")

    # Optionally restrict to the leading rows of the dataset
    if max_examples is None:
        examples = dataset
    else:
        examples = dataset.select(range(min(max_examples, len(dataset))))

    em_sum = 0
    f1_sum = 0
    scored = 0
    predictions = []

    for i, example in enumerate(tqdm(examples, desc="Evaluating")):
        question = example['question']
        context = example['context']
        ground_truths = example['answers']['text']

        # Nothing to score against
        if not ground_truths:
            continue

        # Best-effort generation: a failure counts as an empty answer
        try:
            prediction_text = answer_question_gpt2(question, context, model, tokenizer, max_new_tokens)
        except Exception as e:
            print(f"Error generating answer for example {i}: {e}")
            prediction_text = ""

        # Keep a record for later inspection (context shortened to 200 chars)
        shown_context = context[:200] + "..." if len(context) > 200 else context
        predictions.append({
            "id": example.get('id', f'example_{i}'),
            "question": question,
            "context": shown_context,
            "prediction": prediction_text,
            "ground_truths": ground_truths
        })

        # Credit the prediction with its best score over all references
        em_sum += max(compute_exact_match(prediction_text, gt) for gt in ground_truths)
        f1_sum += max(compute_f1(prediction_text, gt) for gt in ground_truths)
        scored += 1

    # Convert the running sums to percentages
    if scored > 0:
        exact_match = (em_sum / scored) * 100
        f1_score = (f1_sum / scored) * 100
    else:
        exact_match = 0
        f1_score = 0

    return {
        "exact_match": exact_match,
        "f1_score": f1_score,
        "total_examples": scored,
        "predictions": predictions
    }


## Run Evaluation on Validation Set


In [None]:
# Score the model on the leading slice of the validation split
print("Evaluating GPT-2 LoRA model on validation set...")

# 100 examples keeps this quick; raise max_examples for a full evaluation
eval_results = evaluate_gpt2_qa(
    model=model,
    tokenizer=tokenizer,
    dataset=dataset["validation"],
    max_examples=100,
    max_new_tokens=50,
)

# Headline metrics
print(f"\nEvaluation Results (on {eval_results['total_examples']} examples):")
print(f"Exact Match: {eval_results['exact_match']:.2f}%")
print(f"F1 Score: {eval_results['f1_score']:.2f}%")

# First five predictions with their individual scores
print(f"\nSample Predictions:")
print("-" * 100)
for number, pred in enumerate(eval_results['predictions'][:5], start=1):
    print(f"\nExample {number}:")
    print(f"Question: {pred['question']}")
    print(f"Context: {pred['context']}")
    print(f"Ground Truth: {pred['ground_truths']}")
    print(f"Prediction: '{pred['prediction']}'")

    # Per-example score = best match over all reference answers
    best_em = max(compute_exact_match(pred['prediction'], gt) for gt in pred['ground_truths'])
    best_f1 = max(compute_f1(pred['prediction'], gt) for gt in pred['ground_truths'])
    print(f"EM: {best_em}, F1: {best_f1:.3f}")
    print("-" * 100)


## Comparison with Original Results


In [None]:
# Compare with original DistilBERT results.
# The DistilBERT figures are fixed reference values quoted from the earlier
# notebook; the GPT-2 figures come from `eval_results`, i.e. from however many
# examples were scored above.

# Measure the parameter counts instead of quoting hard-coded figures, so the
# summary stays correct if the LoRA rank or target modules change.
lora_total_count = sum(p.numel() for p in model.parameters())
lora_trainable_count = sum(p.numel() for p in model.parameters() if p.requires_grad)
lora_trainable_pct = 100 * lora_trainable_count / lora_total_count

print("Performance Comparison:")
print("=" * 50)
print("Original DistilBERT (from question-answering-with-transformers.ipynb):")
print("  - Exact Match: 74.73%")
print("  - F1 Score: 84.81%")
print("  - Model Type: Extractive QA")
print("  - Parameters: ~66M")
print("  - Training: Full fine-tuning")

print(f"\nGPT-2 LoRA (Current):")
print(f"  - Exact Match: {eval_results['exact_match']:.2f}%")
print(f"  - F1 Score: {eval_results['f1_score']:.2f}%")
print(f"  - Model Type: Generative QA")
print(f"  - Parameters: ~{lora_total_count/1e6:.0f}M total (~{lora_trainable_count/1e6:.1f}M trainable)")
print(f"  - Training: LoRA fine-tuning")

print(f"\nKey Differences:")
print("• DistilBERT: Extracts exact spans from context")
print("• GPT-2: Generates answers based on understanding")
print(f"• LoRA: Only ~{lora_trainable_pct:.1f}% of parameters trained vs 100% for DistilBERT")
print("• GPT-2 can potentially provide more natural/creative answers")

# Performance analysis (skipped when F1 is zero)
if eval_results['f1_score'] > 0:
    f1_ratio = eval_results['f1_score'] / 84.81
    em_ratio = eval_results['exact_match'] / 74.73
    print(f"\nRelative Performance:")
    print(f"• F1 Score: {f1_ratio:.1%} of original DistilBERT")
    print(f"• Exact Match: {em_ratio:.1%} of original DistilBERT")

    if f1_ratio > 0.7:
        print("• Strong performance considering parameter efficiency!")
    elif f1_ratio > 0.5:
        print("• Decent performance with much fewer trainable parameters")
    else:
        print("• May need more training or hyperparameter tuning")


## Model Analysis and LoRA Statistics


In [None]:
# Echo the LoRA hyperparameters that were used
print("LoRA Configuration Analysis:")
print(f"Rank (r): {lora_config.r}")
print(f"Alpha: {lora_config.lora_alpha}")
print(f"Dropout: {lora_config.lora_dropout}")
print(f"Target modules: {lora_config.target_modules}")

# Tally parameter counts, listing every trainable LoRA tensor along the way
print("\nTrainable Parameters Breakdown:")
total_params = 0
trainable_params = 0

for name, param in model.named_parameters():
    size = param.numel()
    total_params += size
    if not param.requires_grad:
        continue
    trainable_params += size
    if 'lora' in name.lower():
        print(f"  {name}: {size:,} parameters")

trainable_pct = 100 * trainable_params / total_params

print(f"\nTotal parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
print(f"Percentage of trainable parameters: {trainable_pct:.2f}%")

# Note: the "Memory reduction" figure printed below is computed purely from
# parameter counts (share of parameters that are NOT trained).
frozen_pct = 100 * (1 - trainable_params / total_params)
print(f"\nMemory Efficiency:")
print(f"Original model would require training: {total_params:,} parameters")
print(f"With LoRA, only training: {trainable_params:,} parameters")
print(f"Memory reduction: {frozen_pct:.1f}%")
