# mT5 Question Answering System with LoRA Fine-tuning
## Bangla NCTB Textbook Dataset

This notebook fine-tunes the mT5-base model using LoRA for Bangla question answering.

## 1. Install Required Packages

In [None]:
!pip install transformers peft datasets sentencepiece accelerate matplotlib pandas torch scikit-learn

## 2. Import Libraries

In [None]:
import json
import os
import random
import re
import string
import unicodedata
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from peft import (
    get_peft_model,
    LoraConfig,
    TaskType,
    PeftModel
)
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
from transformers import (
    MT5ForConditionalGeneration,
    MT5Tokenizer,
    get_linear_schedule_with_warmup
)

# Set random seeds
def set_seed(seed=42):
    """Seed every random number generator this notebook can draw from.

    Args:
        seed: Integer seed applied to Python's ``random`` module, NumPy,
            and PyTorch (CPU generator and all CUDA devices).
    """
    # Python's own generator was previously left unseeded, so any library
    # code relying on ``random`` produced different values on every run.
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Per the PyTorch docs this call is silently ignored when CUDA is absent.
    torch.cuda.manual_seed_all(seed)

set_seed(42)

# Check device
# Use the CUDA backend when PyTorch can see a GPU, otherwise fall back to CPU.
# The device string must name the same backend the availability check tests:
# requesting 'mps' on a CUDA machine fails as soon as tensors are moved.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

## 3. Load and Explore Dataset

In [None]:
# Load dataset - Update this path to your CSV file location
# The relative path is resolved against the notebook's working directory.
df = pd.read_csv('Textbook_Dataset_from_NCTB.csv')

# Report the row count and column names as a quick sanity check on the load.
print(f"Total samples: {len(df)}")
print(f"\nColumns: {df.columns.tolist()}")
print(f"\nFirst few samples:")
# Trailing expression: as the last statement of the cell, the first rows
# are rendered as the cell's output.
df.head()

In [None]:
# Clean data
# Only the three columns this notebook actually reads must be present in a
# row. A bare dropna() would also discard rows whose only missing value sits
# in some unrelated column of the CSV.
required_columns = ['Passage', 'Question', 'AnsText']
df = df.dropna(subset=required_columns)
df = df.reset_index(drop=True)
print(f"Samples after cleaning: {len(df)}")

# Analyze text lengths
# Lengths are character counts of the raw strings, not token counts.
df['passage_length'] = df['Passage'].str.len()
df['question_length'] = df['Question'].str.len()
df['answer_length'] = df['AnsText'].str.len()

print(f"\nText Statistics:")
print(df[['passage_length', 'question_length', 'answer_length']].describe())

In [None]:
# Split dataset (80% train, 10% validation, 10% test)
# Rows are taken in file order without shuffling: the first 80% train, the
# next 10% validate, and everything left over (including rounding remainders)
# becomes the test set.
train_size, val_size = (int(fraction * len(df)) for fraction in (0.8, 0.1))

train_df = df.iloc[:train_size].reset_index(drop=True)
val_df = df.iloc[train_size:train_size + val_size].reset_index(drop=True)
test_df = df.iloc[train_size + val_size:].reset_index(drop=True)

print(f"Train size: {len(train_df)}")
print(f"Validation size: {len(val_df)}")
print(f"Test size: {len(test_df)}")

## 4. Create Dataset Class

In [None]:
class QADataset(Dataset):
    """Map-style dataset that turns QA rows into tokenized seq2seq examples.

    Every row must provide ``Question``, ``Passage`` and ``AnsText`` entries.
    The model input is ``"question: <question> context: <passage>"`` and the
    target is the answer text.

    Args:
        dataframe: Table supporting ``len()`` and positional ``.iloc[idx]``.
        tokenizer: Callable tokenizer that returns ``input_ids`` and
            ``attention_mask`` tensors and exposes ``pad_token_id``.
        max_source_length: Fixed token length of the encoded input.
        max_target_length: Fixed token length of the encoded answer.
    """

    def __init__(self, dataframe, tokenizer, max_source_length=512, max_target_length=128):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_source_length = max_source_length
        self.max_target_length = max_target_length

    def __len__(self):
        return len(self.data)

    def _encode(self, text, max_length):
        """Tokenize ``text`` into tensors padded/truncated to ``max_length``."""
        return self.tokenizer(
            text,
            max_length=max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``idx``."""
        row = self.data.iloc[idx]

        # Format: "question: <question> context: <passage>"
        source_text = f"question: {row['Question']} context: {row['Passage']}"
        # The f-string above already stringifies its fields; do the same for
        # the answer so a value parsed as a number cannot crash the tokenizer.
        target_text = str(row['AnsText'])

        source_encoding = self._encode(source_text, self.max_source_length)
        target_encoding = self._encode(target_text, self.max_target_length)

        # squeeze(0) removes only the leading batch dimension; a bare squeeze()
        # would also collapse the sequence dimension when its length is 1.
        labels = target_encoding['input_ids'].squeeze(0)
        # Replace padding token id with -100 for loss computation
        labels[labels == self.tokenizer.pad_token_id] = -100

        return {
            'input_ids': source_encoding['input_ids'].squeeze(0),
            'attention_mask': source_encoding['attention_mask'].squeeze(0),
            'labels': labels
        }

## 5. Define Evaluation Metrics

In [None]:
def normalize_answer(s):
    """Normalize answer text for evaluation.

    Lowercases the text, deletes punctuation, removes the English articles
    "a", "an" and "the", and collapses runs of whitespace.

    Punctuation covers the ASCII set in ``string.punctuation`` plus every
    character whose Unicode general category starts with "P". The ASCII set
    alone leaves the Bangla danda and typographic quotes or dashes in place,
    so otherwise identical Bangla answers would fail to match.

    Args:
        s: Answer string to normalize.

    Returns:
        The normalized string (possibly empty).
    """
    ascii_punctuation = set(string.punctuation)

    def is_punctuation(ch):
        return ch in ascii_punctuation or unicodedata.category(ch).startswith('P')

    text = ''.join(ch for ch in s.lower() if not is_punctuation(ch))
    # Article removal only affects Latin-script tokens; Bangla text passes through.
    text = re.sub(r'\b(a|an|the)\b', ' ', text)
    return ' '.join(text.split())

def exact_match_score(prediction, ground_truth):
    """Return 1 if both strings are equal after normalization, otherwise 0."""
    normalized_prediction = normalize_answer(prediction)
    normalized_truth = normalize_answer(ground_truth)
    return 1 if normalized_prediction == normalized_truth else 0

def f1_score(prediction, ground_truth):
    """Token-overlap F1 between a prediction and its reference answer.

    Both strings are normalized and split on whitespace; overlap is counted
    as a multiset intersection, so repeated tokens only match as often as
    they occur on both sides.

    Returns:
        1 or 0 (int) when either side has no tokens, 0 when nothing
        overlaps, otherwise the harmonic mean of precision and recall.
    """
    predicted = normalize_answer(prediction).split()
    expected = normalize_answer(ground_truth).split()

    # With an empty side there is no overlap to measure: score a match only
    # when both sides are empty.
    if not predicted or not expected:
        return int(predicted == expected)

    overlap = sum((Counter(predicted) & Counter(expected)).values())
    if not overlap:
        return 0

    precision = overlap / len(predicted)
    recall = overlap / len(expected)
    return 2 * (precision * recall) / (precision + recall)

def compute_metrics(predictions, ground_truths):
    """Compute exact match and F1 scores for a list of predictions.

    Args:
        predictions: Iterable of predicted answer strings.
        ground_truths: Iterable of reference answer strings, paired with
            ``predictions`` by position.

    Returns:
        Dict with ``'exact_match'`` and ``'f1'`` as plain Python floats on a
        0-100 percentage scale. Both are 0.0 when there are no pairs.
    """
    pairs = list(zip(predictions, ground_truths))
    if not pairs:
        # np.mean([]) would emit a RuntimeWarning and return nan, which then
        # leaks into logs, plots and the saved JSON results.
        return {'exact_match': 0.0, 'f1': 0.0}

    em_scores = [exact_match_score(pred, truth) for pred, truth in pairs]
    f1_scores = [f1_score(pred, truth) for pred, truth in pairs]

    return {
        'exact_match': float(np.mean(em_scores) * 100),
        'f1': float(np.mean(f1_scores) * 100)
    }

## 6. Load Model and Apply LoRA

In [None]:
# Load tokenizer
# The tokenizer and the model weights are both resolved from this checkpoint id.
model_name = "google/mt5-base"
print(f"Loading tokenizer: {model_name}")
tokenizer = MT5Tokenizer.from_pretrained(model_name)

# Load base model
# Weights are requested in float32 precision.
print(f"Loading model: {model_name}")
base_model = MT5ForConditionalGeneration.from_pretrained(
    model_name,
    torch_dtype=torch.float32
)

# Parameter count of the base model, taken before the LoRA adapters are attached.
print(f"\nTotal parameters: {sum(p.numel() for p in base_model.parameters()):,}")

In [None]:
# Configure LoRA
# The task type is sequence-to-sequence, matching the conditional-generation
# model loaded as `base_model`.
lora_config = LoraConfig(
    task_type=TaskType.SEQ_2_SEQ_LM,
    inference_mode=False,
    r=16,  # LoRA rank
    lora_alpha=32,  # LoRA alpha parameter
    lora_dropout=0.1,
    target_modules=["q", "v"]  # Apply LoRA to query and value matrices
)

# Apply LoRA to the model
# `model` is the adapter-wrapped network that the rest of the notebook trains.
model = get_peft_model(base_model, lora_config)
# Prints a summary of trainable versus total parameters.
model.print_trainable_parameters()

# Move the wrapped model to the device chosen earlier in the notebook.
model = model.to(device)

## 7. Create DataLoaders

In [None]:
# Create dataloaders
# Only the training loader shuffles; validation and test keep row order so
# their outputs can be matched back to the source dataframes.
batch_size = 8

# Create datasets
# One tokenizing dataset per split, all using the default sequence lengths.
train_dataset, val_dataset, test_dataset = (
    QADataset(split_frame, tokenizer) for split_frame in (train_df, val_df, test_df)
)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=batch_size, shuffle=False)
    for split_dataset in (val_dataset, test_dataset)
)

print(f"Batch size: {batch_size}")
print(f"Training batches: {len(train_loader)}")
print(f"Validation batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")

## 8. Setup Training

In [None]:
# Training configuration
num_epochs = 5
learning_rate = 5e-4

# Hand the optimizer only the parameters that can receive gradients (the LoRA
# adapter weights); the frozen base weights never get a gradient, so tracking
# them in the optimizer serves no purpose.
trainable_params = [param for param in model.parameters() if param.requires_grad]
optimizer = torch.optim.AdamW(trainable_params, lr=learning_rate)

# The scheduler steps once per batch, so the schedule length is batches x epochs.
total_steps = len(train_loader) * num_epochs
# Computed once so the value printed below is exactly the value the scheduler uses.
warmup_steps = int(0.1 * total_steps)
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps
)

print(f"Number of epochs: {num_epochs}")
print(f"Learning rate: {learning_rate}")
print(f"Total training steps: {total_steps}")
print(f"Warmup steps: {warmup_steps}")

## 9. Training and Evaluation Functions

In [None]:
def train_epoch(model, dataloader, optimizer, scheduler, device):
    """Run one optimization pass over ``dataloader``.

    Each batch goes through a forward pass with labels, a backward pass,
    gradient-norm clipping at 1.0, an optimizer step and a scheduler step.

    Returns:
        The mean of the per-batch losses for the epoch.
    """
    model.train()
    running_loss = 0
    batches = tqdm(dataloader, desc="Training")

    for batch in batches:
        # Move exactly the three tensors the model consumes onto the device.
        inputs = {
            key: batch[key].to(device)
            for key in ('input_ids', 'attention_mask', 'labels')
        }

        loss = model(**inputs).loss
        batch_loss = loss.item()
        running_loss += batch_loss

        # Clear stale gradients, backpropagate, clip, then update.
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

        batches.set_postfix({'loss': batch_loss})

    return running_loss / len(dataloader)

def evaluate(model, dataloader, tokenizer, device, max_length=128, num_beams=4):
    """Evaluate the model on a dataloader.

    For every batch this computes the teacher-forced loss and generates
    answers with beam search. The generated answers are then scored against
    the decoded label sequences.

    Args:
        model: Model providing ``eval()``, a forward call returning ``.loss``
            and ``generate()``.
        dataloader: Yields dicts with ``input_ids``, ``attention_mask`` and
            ``labels`` tensors, where ignored label positions hold -100.
        tokenizer: Tokenizer used to decode generated ids and labels.
        device: Device the batch tensors are moved to.
        max_length: Maximum length passed to ``generate``. Defaults to the
            previously hard-coded 128.
        num_beams: Beam width passed to ``generate``. Defaults to the
            previously hard-coded 4.

    Returns:
        Tuple ``(avg_loss, metrics, predictions, ground_truths)``: the mean
        batch loss, the dict from ``compute_metrics``, and the decoded
        prediction and reference strings in dataloader order.
    """
    model.eval()
    total_loss = 0
    predictions = []
    ground_truths = []

    progress_bar = tqdm(dataloader, desc="Evaluating")

    with torch.no_grad():
        for batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            # Compute loss
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )
            total_loss += outputs.loss.item()

            # Generate predictions
            generated_ids = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_length=max_length,
                num_beams=num_beams,
                early_stopping=True
            )

            # Decode predictions and labels
            pred_texts = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)
            # -100 is not a valid token id: restore the pad id on a copy so the
            # labels decode cleanly and the batch tensor stays untouched.
            label_ids = labels.clone()
            label_ids[label_ids == -100] = tokenizer.pad_token_id
            label_texts = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

            predictions.extend(pred_texts)
            ground_truths.extend(label_texts)

    avg_loss = total_loss / len(dataloader)
    metrics = compute_metrics(predictions, ground_truths)

    return avg_loss, metrics, predictions, ground_truths

## 10. Train the Model

In [None]:
# Training loop
# Per-epoch history, consumed later by the plotting and summary cells.
train_losses = []
val_losses = []
val_em_scores = []
val_f1_scores = []
# Start below any reachable score so the first epoch always writes a
# checkpoint. With 0 as the floor and a strict '>' comparison, a run whose F1
# never rose above 0 saved nothing, and the later reload of './best_model'
# had no checkpoint to load.
best_f1 = float('-inf')

print("="*70)
print("Starting Training")
print("="*70)

for epoch in range(num_epochs):
    print(f"\nEpoch {epoch + 1}/{num_epochs}")
    print("-" * 70)

    # Train
    train_loss = train_epoch(model, train_loader, optimizer, scheduler, device)
    train_losses.append(train_loss)
    print(f"Training Loss: {train_loss:.4f}")

    # Validate
    val_loss, val_metrics, _, _ = evaluate(model, val_loader, tokenizer, device)
    val_losses.append(val_loss)
    val_em_scores.append(val_metrics['exact_match'])
    val_f1_scores.append(val_metrics['f1'])

    print(f"Validation Loss: {val_loss:.4f}")
    print(f"Exact Match: {val_metrics['exact_match']:.2f}%")
    print(f"F1 Score: {val_metrics['f1']:.2f}%")

    # Save best model
    # Checkpoint selection is by validation F1; only a strict improvement
    # overwrites the saved adapter and tokenizer.
    if val_metrics['f1'] > best_f1:
        best_f1 = val_metrics['f1']
        print("✓ New best F1 score! Saving model...")
        model.save_pretrained('./best_model')
        tokenizer.save_pretrained('./best_model')

print("\n" + "="*70)
print("Training Completed!")
print("="*70)

## 11. Evaluate on Test Set

In [None]:
# Load best model
# Rebuild the base model from the original checkpoint and attach the adapter
# that the training loop saved under './best_model'.
print("Loading best model...")
best_model = MT5ForConditionalGeneration.from_pretrained(model_name)
best_model = PeftModel.from_pretrained(best_model, './best_model')
best_model = best_model.to(device)

# Evaluate on test set
# The predictions and references returned here are reused by later cells
# (saved to CSV and shown as samples).
test_loss, test_metrics, test_predictions, test_ground_truths = evaluate(
    best_model, test_loader, tokenizer, device
)

print("\nTest Results:")
print("="*70)
print(f"Test Loss: {test_loss:.4f}")
print(f"Exact Match: {test_metrics['exact_match']:.2f}%")
print(f"F1 Score: {test_metrics['f1']:.2f}%")
print("="*70)

## 12. Visualize Results

In [None]:
# Create visualizations
# 2x2 grid: loss curves, exact match, F1, and both metrics overlaid.
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 1-based epoch numbers for the x-axis; assumes one history entry per
# configured epoch (i.e. the training loop ran to completion).
epochs_range = range(1, num_epochs + 1)

# Plot 1: Training and Validation Loss
axes[0, 0].plot(epochs_range, train_losses, 'b-o', label='Training Loss', linewidth=2)
axes[0, 0].plot(epochs_range, val_losses, 'r-o', label='Validation Loss', linewidth=2)
axes[0, 0].set_xlabel('Epoch', fontsize=12)
axes[0, 0].set_ylabel('Loss', fontsize=12)
axes[0, 0].set_title('Training and Validation Loss per Epoch', fontsize=14, fontweight='bold')
axes[0, 0].legend(fontsize=10)
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: Exact Match Score
axes[0, 1].plot(epochs_range, val_em_scores, 'g-o', label='Exact Match', linewidth=2, markersize=8)
axes[0, 1].set_xlabel('Epoch', fontsize=12)
axes[0, 1].set_ylabel('Exact Match (%)', fontsize=12)
axes[0, 1].set_title('Exact Match Score per Epoch', fontsize=14, fontweight='bold')
axes[0, 1].legend(fontsize=10)
axes[0, 1].grid(True, alpha=0.3)
# Metrics are percentages, so the y-axis is pinned to the full 0-100 range.
axes[0, 1].set_ylim([0, 100])

# Plot 3: F1 Score
axes[1, 0].plot(epochs_range, val_f1_scores, 'm-o', label='F1 Score', linewidth=2, markersize=8)
axes[1, 0].set_xlabel('Epoch', fontsize=12)
axes[1, 0].set_ylabel('F1 Score (%)', fontsize=12)
axes[1, 0].set_title('F1 Score per Epoch', fontsize=14, fontweight='bold')
axes[1, 0].legend(fontsize=10)
axes[1, 0].grid(True, alpha=0.3)
axes[1, 0].set_ylim([0, 100])

# Plot 4: Combined Metrics
axes[1, 1].plot(epochs_range, val_em_scores, 'g-o', label='Exact Match', linewidth=2, markersize=8)
axes[1, 1].plot(epochs_range, val_f1_scores, 'm-o', label='F1 Score', linewidth=2, markersize=8)
axes[1, 1].set_xlabel('Epoch', fontsize=12)
axes[1, 1].set_ylabel('Score (%)', fontsize=12)
axes[1, 1].set_title('Exact Match vs F1 Score Comparison', fontsize=14, fontweight='bold')
axes[1, 1].legend(fontsize=10)
axes[1, 1].grid(True, alpha=0.3)
axes[1, 1].set_ylim([0, 100])

# Save the figure to disk before showing it.
plt.tight_layout()
plt.savefig('training_metrics.png', dpi=300, bbox_inches='tight')
plt.show()

print("Training metrics saved to 'training_metrics.png'")

## 13. Save Results

In [None]:
# Save training results
# Per-epoch histories, final test metrics, and the hyperparameters that
# produced them, bundled into one JSON document.
results = {
    'train_losses': train_losses,
    'val_losses': val_losses,
    'val_em_scores': val_em_scores,
    'val_f1_scores': val_f1_scores,
    'test_loss': test_loss,
    'test_exact_match': test_metrics['exact_match'],
    'test_f1': test_metrics['f1'],
    'num_epochs': num_epochs,
    'batch_size': batch_size,
    'learning_rate': learning_rate
}

with open('training_results.json', 'w', encoding='utf-8') as f:
    json.dump(results, f, indent=4)

print("Training results saved to 'training_results.json'")

# Save test predictions
# The test loader does not shuffle, so predictions line up row-for-row with
# test_df. 'ground_truth' is the decoded label text returned by evaluate(),
# not the raw AnsText column.
test_results_df = pd.DataFrame({
    'question': test_df['Question'].values,
    'context': test_df['Passage'].values,
    'ground_truth': test_ground_truths,
    'prediction': test_predictions
})
test_results_df.to_csv('test_predictions.csv', index=False, encoding='utf-8')

print("Test predictions saved to 'test_predictions.csv'")

## 14. Show Sample Predictions

In [None]:
# Display sample predictions
print("\nSample Predictions from Test Set:")
print("="*70)

# Show at most 10 rows, drawn without replacement so no row repeats.
num_samples = min(10, len(test_results_df))
sample_indices = np.random.choice(len(test_results_df), num_samples, replace=False)

for idx in sample_indices:
    row = test_results_df.iloc[idx]
    print(f"\nQuestion: {row['question']}")
    print(f"Ground Truth: {row['ground_truth']}")
    print(f"Prediction: {row['prediction']}")
    
    # Calculate match
    # Per-example scores: EM is 0/1, F1 is a fraction (not a percentage).
    em = exact_match_score(row['prediction'], row['ground_truth'])
    f1 = f1_score(row['prediction'], row['ground_truth'])
    print(f"EM: {em}, F1: {f1:.2f}")
    print("-" * 70)

## 15. Test Inference Function

In [None]:
def answer_question(question, context, model, tokenizer, device):
    """Generate an answer for a single question/context pair.

    Args:
        question: Question text.
        context: Passage the answer should be drawn from.
        model: Model providing ``eval()`` and ``generate()``.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        device: Device the input tensors are moved to.

    Returns:
        The decoded answer string with special tokens removed.
    """
    model.eval()

    # Same prompt layout the training dataset uses.
    input_text = f"question: {question} context: {context}"

    encoding = tokenizer(
        input_text,
        max_length=512,
        truncation=True,
        return_tensors='pt'
    )
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    # Pass everything to generate() by keyword, as evaluate() does. PEFT's
    # seq2seq wrapper takes keyword arguments only, so a positional input_ids
    # raises TypeError when `model` is an adapter-wrapped model.
    with torch.no_grad():
        output_ids = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_length=128,
            num_beams=4,
            early_stopping=True
        )

    return tokenizer.decode(output_ids[0], skip_special_tokens=True)

In [None]:
# Test the inference function
# Smoke test on the first test row using the reloaded best model.
sample = test_df.iloc[0]

print("Test Inference:")
print("="*70)
print(f"Question: {sample['Question']}")
# Only the first 200 characters of the passage are printed to keep the output short.
print(f"Context: {sample['Passage'][:200]}...")
print(f"\nGround Truth: {sample['AnsText']}")

# The full, untruncated passage is what gets passed to the model.
predicted_answer = answer_question(
    sample['Question'], 
    sample['Passage'], 
    best_model, 
    tokenizer, 
    device
)
print(f"Prediction: {predicted_answer}")
print("="*70)

## 16. Print Final Summary

In [None]:
# Recap of the whole run: data sizes, hyperparameters, first/last epoch
# validation numbers, and test-set results.
print("\n" + "="*70)
print("FINAL SUMMARY")
print("="*70)
print(f"\nDataset:")
print(f"  Total samples: {len(df)}")
print(f"  Train: {len(train_df)}, Val: {len(val_df)}, Test: {len(test_df)}")
print(f"\nTraining:")
print(f"  Epochs: {num_epochs}")
print(f"  Batch size: {batch_size}")
print(f"  Learning rate: {learning_rate}")
print(f"\nInitial Performance (Epoch 1):")
print(f"  Train Loss: {train_losses[0]:.4f}")
print(f"  Val Loss: {val_losses[0]:.4f}")
print(f"  Exact Match: {val_em_scores[0]:.2f}%")
print(f"  F1 Score: {val_f1_scores[0]:.2f}%")
# "Final" means the last epoch, which is not necessarily the epoch whose
# checkpoint was saved as the best model.
print(f"\nFinal Performance (Epoch {num_epochs}):")
print(f"  Train Loss: {train_losses[-1]:.4f}")
print(f"  Val Loss: {val_losses[-1]:.4f}")
print(f"  Exact Match: {val_em_scores[-1]:.2f}%")
print(f"  F1 Score: {val_f1_scores[-1]:.2f}%")
print(f"\nTest Set Performance:")
print(f"  Test Loss: {test_loss:.4f}")
print(f"  Exact Match: {test_metrics['exact_match']:.2f}%")
print(f"  F1 Score: {test_metrics['f1']:.2f}%")
print(f"\nImprovement:")
print(f"  Loss reduction: {((train_losses[0] - train_losses[-1]) / train_losses[0] * 100):.1f}%")
# The '+' format flag prints the real sign; a hard-coded "+" prefix rendered a
# regression as "+-1.2%".
print(f"  EM improvement: {val_em_scores[-1] - val_em_scores[0]:+.1f}%")
print(f"  F1 improvement: {val_f1_scores[-1] - val_f1_scores[0]:+.1f}%")
print("\n" + "="*70)
print("Training completed successfully!")
print("Model saved in './best_model' directory")
print("="*70)