In [1]:
import json
from transformers import DistilBertTokenizerFast
from typing import List, Dict, Tuple

def extract_squad_data(json_path: str) -> Tuple[List[str], List[str], List[Dict]]:
    """
    Extracts context, questions and answers from SQuAD format dataset
    """
    # Initialize storage lists
    extracted_contexts = []
    extracted_questions = []
    extracted_answers = []
    
    # Parse JSON file
    try:
        with open(json_path, 'rb') as squad_file:
            raw_data = json.load(squad_file)
    except Exception as e:
        raise Exception(f"Failed to load dataset from {json_path}: {str(e)}")

    # Iterate through data structure
    for data_entry in raw_data['data']:
        for para in data_entry['paragraphs']:
            current_context = para['context']
            
            # Process QA pairs
            for qa in para['qas']:
                # Handle both regular and plausible answers
                ans_field = 'plausible_answers' if 'plausible_answers' in qa else 'answers'
                
                # Extract QA pairs
                for ans in qa[ans_field]:
                    extracted_contexts.append(current_context)
                    extracted_questions.append(qa['question'])
                    extracted_answers.append(ans)
    
    return extracted_contexts, extracted_questions, extracted_answers

# Dataset paths
TRAIN_PATH = 'Spoken-SQuAD-master/spoken_train-v1.1.json'
VAL_PATH = 'Spoken-SQuAD-master/spoken_test-v1.1.json'

# Extract training and validation data
training_contexts, training_questions, training_answers = extract_squad_data(TRAIN_PATH)
validation_contexts, validation_questions, validation_answers = extract_squad_data(VAL_PATH)

In [2]:
def calculate_answer_boundaries(answer_data: List[Dict], context_data: List[str]) -> None:
   """
   Calculates and adds ending indices for answers, handling potential offsets
   Args:
       answer_data: List of answer dictionaries containing text and start position
       context_data: List of context strings corresponding to answers
   """
   MAX_OFFSET = 2
   
   for ans, ctx in zip(answer_data, context_data):
       # Extract base answer information
       target_text = ans['text']
       start_idx = ans['answer_start']
       predicted_end = start_idx + len(target_text)
       
       # Check exact match first
       text_slice = ctx[start_idx:predicted_end]
       if text_slice == target_text:
           ans['answer_end'] = predicted_end
           continue
           
       # Try small offsets if exact match fails
       for shift in range(1, MAX_OFFSET + 1):
           adjusted_start = start_idx - shift
           adjusted_end = predicted_end - shift
           
           if ctx[adjusted_start:adjusted_end] == target_text:
               # Update indices with offset correction
               ans['answer_start'] = adjusted_start 
               ans['answer_end'] = adjusted_end
               break

# Process training and validation sets
calculate_answer_boundaries(training_answers, training_contexts)
calculate_answer_boundaries(validation_answers, validation_contexts)

In [3]:
def setup_bert_tokenization(
   context_data: List[str], 
   question_data: List[str],
   model_name: str = 'distilbert-base-uncased'
) -> Tuple[dict, DistilBertTokenizerFast]:
   """
   Initializes BERT tokenizer and encodes context-question pairs
   
   Args:
       context_data: List of context paragraphs
       question_data: List of questions
       model_name: Pre-trained model identifier
       
   Returns:
       Tuple containing encoded data and tokenizer instance
   """
   # Initialize tokenizer with specified model
   bert_tokenizer = DistilBertTokenizerFast.from_pretrained(model_name)
   
   # Generate encodings for context-question pairs
   encoded_data = bert_tokenizer(
       context_data,
       question_data,
       truncation=True,
       padding=True,
       return_tensors='pt'  # PyTorch tensors
   )
   
   return encoded_data, bert_tokenizer

# Model configuration
PRETRAINED_MODEL = 'distilbert-base-uncased'

# Process datasets
train_encoded, bert_tokenizer = setup_bert_tokenization(
   training_contexts, 
   training_questions,
   PRETRAINED_MODEL
)

val_encoded, _ = setup_bert_tokenization(
   validation_contexts,
   validation_questions,
   PRETRAINED_MODEL
)

In [4]:
def map_character_positions_to_tokens(
   encoded_data: dict, 
   answer_data: List[Dict], 
   sequence_length: int
) -> None:
   """
   Maps character-level answer positions to token positions in the encoded data
   
   Args:
       encoded_data: Dictionary containing tokenized input data
       answer_data: List of answer dictionaries with start/end positions
       sequence_length: Maximum sequence length for the model
   """
   token_starts = []
   token_ends = []
   
   for sample_idx in range(len(answer_data)):
       current_answer = answer_data[sample_idx]
       
       # Map start position to token
       start_token_idx = encoded_data.char_to_token(
           sample_idx, 
           current_answer['answer_start']
       )
       
       # Map end position to token
       end_token_idx = encoded_data.char_to_token(
           sample_idx, 
           current_answer['answer_end']
       )
       
       # Handle truncation case
       if start_token_idx is None:
           start_token_idx = sequence_length
       
       # Handle whitespace ending by backtracking
       backtrack = 1
       while end_token_idx is None:
           end_token_idx = encoded_data.char_to_token(
               sample_idx, 
               current_answer['answer_end'] - backtrack
           )
           backtrack += 1
       
       # Store mapped positions
       token_starts.append(start_token_idx)
       token_ends.append(end_token_idx)
   
   # Update encodings with token positions
   encoded_data.update({
       'start_positions': token_starts,
       'end_positions': token_ends
   })

# Process datasets
MAX_SEQ_LENGTH = bert_tokenizer.model_max_length

map_character_positions_to_tokens(
   train_encoded, 
   training_answers, 
   MAX_SEQ_LENGTH
)

map_character_positions_to_tokens(
   val_encoded, 
   validation_answers, 
   MAX_SEQ_LENGTH
)

In [5]:
import torch
from torch.utils.data import Dataset
from typing import Dict

class QuestionAnsweringDataset(Dataset):
   """
   Custom Dataset class for Question Answering task
   Wraps encoded data for use with PyTorch DataLoader
   """
   
   def __init__(self, encoded_features: Dict) -> None:
       """
       Initialize dataset with encoded features
       
       Args:
           encoded_features: Dictionary containing tokenized and encoded data
       """
       self.features = encoded_features

   def __getitem__(self, index: int) -> Dict:
       """
       Get a single sample from the dataset
       
       Args:
           index: Index of the sample to retrieve
           
       Returns:
           Dictionary with tensor versions of all features at given index
       """
       return {
           feature_name: torch.tensor(feature_values[index]) 
           for feature_name, feature_values in self.features.items()
       }

   def __len__(self) -> int:
       """
       Get dataset size
       
       Returns:
           Number of samples in dataset
       """
       return len(self.features.input_ids)

# Initialize train and validation datasets
training_data = QuestionAnsweringDataset(train_encoded)
validation_data = QuestionAnsweringDataset(val_encoded)

In [6]:
from transformers import DistilBertForQuestionAnswering
from typing import Union, Optional

def create_bert_qa_model(
   model_name: str = "distilbert-base-uncased",
   device: Optional[str] = None
) -> DistilBertForQuestionAnswering:
   """
   Initializes a DistilBERT model for question answering task
   
   Args:
       model_name: Name of the pre-trained model to use
       device: Target device for model deployment (optional)
       
   Returns:
       Initialized DistilBERT QA model
   """
   qa_model = DistilBertForQuestionAnswering.from_pretrained(model_name)
   
   if device:
       qa_model = qa_model.to(device)
       
   return qa_model

# Model configuration
PRETRAINED_MODEL_NAME = "distilbert-base-uncased"

# Initialize QA model
bert_qa_model = create_bert_qa_model(PRETRAINED_MODEL_NAME)

Some weights of DistilBertForQuestionAnswering were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['qa_outputs.bias', 'qa_outputs.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
from torch.utils.data import DataLoader
from transformers import AdamW, get_linear_schedule_with_warmup
from tqdm import tqdm
from accelerate import Accelerator
from typing import Tuple, Any

def configure_training_components(
   qa_model: DistilBertForQuestionAnswering,
   training_data: QuestionAnsweringDataset,
   epochs: int = 30,
   batch_size: int = 16,
   lr: float = 2e-6
) -> Tuple[Any, Any, DataLoader, Any, Accelerator]:
   """
   Configures all components needed for training
   
   Args:
       qa_model: The QA model to train
       training_data: Dataset containing training samples
       epochs: Number of training epochs
       batch_size: Batch size for training
       lr: Learning rate for optimization
   
   Returns:
       Tuple of prepared model, optimizer, dataloader, scheduler, and accelerator
   """
   # Initialize accelerator for distributed training
   training_accelerator = Accelerator()
   
   # Setup training components
   qa_model.train()
   model_optimizer = AdamW(qa_model.parameters(), lr=lr)
   data_loader = DataLoader(
       training_data, 
       batch_size=batch_size, 
       shuffle=True
   )
   
   # Configure learning rate scheduler
   total_steps = len(data_loader) * epochs
   lr_scheduler = get_linear_schedule_with_warmup(
       model_optimizer,
       num_warmup_steps=0,
       num_training_steps=total_steps
   )
   
   # Prepare components for distributed training
   prepared_components = training_accelerator.prepare(
       qa_model, 
       model_optimizer, 
       data_loader, 
       lr_scheduler
   )
   
   return (*prepared_components, training_accelerator)

def run_training_epoch(
   model: Any,
   data_loader: DataLoader,
   optimizer: AdamW,
   lr_scheduler: Any,
   accelerator: Accelerator,
   current_epoch: int
) -> None:
   """
   Executes one training epoch
   
   Args:
       model: Model being trained
       data_loader: DataLoader for training data
       optimizer: Optimization algorithm
       lr_scheduler: Learning rate scheduler
       accelerator: Training accelerator
       current_epoch: Current epoch number
   """
   progress_tracker = tqdm(data_loader, desc=f'Epoch {current_epoch}', leave=True)
   
   for batch_data in progress_tracker:
       # Clear gradients
       optimizer.zero_grad()
       
       # Forward pass
       model_output = model(
           input_ids=batch_data['input_ids'],
           attention_mask=batch_data['attention_mask'],
           start_positions=batch_data['start_positions'],
           end_positions=batch_data['end_positions']
       )
       
       # Backward pass
       batch_loss = model_output[0]
       accelerator.backward(batch_loss)
       
       # Update parameters
       optimizer.step()
       lr_scheduler.step()
       
       # Update progress bar
       progress_tracker.set_postfix(
           loss=batch_loss.item(),
           lr=optimizer.param_groups[0]['lr']
       )

# Training configuration
TRAINING_EPOCHS = 30

# Setup training environment
prepared_model, prepared_optimizer, prepared_loader, prepared_scheduler, training_accelerator = configure_training_components(
   bert_qa_model, 
   training_data
)

# Execute training loop
for epoch_num in range(TRAINING_EPOCHS):
   run_training_epoch(
       prepared_model,
       prepared_loader,
       prepared_optimizer,
       prepared_scheduler,
       training_accelerator,
       epoch_num
   )

Epoch 0: 100%|██████████| 2320/2320 [1:20:49<00:00,  2.09s/it, loss=1.99, lr=1.93e-6]
Epoch 1: 100%|██████████| 2320/2320 [1:20:38<00:00,  2.09s/it, loss=2.51, lr=1.87e-6]
Epoch 2: 100%|██████████| 2320/2320 [1:20:45<00:00,  2.09s/it, loss=2.45, lr=1.8e-6] 
Epoch 3: 100%|██████████| 2320/2320 [1:20:49<00:00,  2.09s/it, loss=1.82, lr=1.73e-6]
Epoch 4: 100%|██████████| 2320/2320 [1:20:44<00:00,  2.09s/it, loss=1.45, lr=1.67e-6] 
Epoch 5: 100%|██████████| 2320/2320 [1:20:39<00:00,  2.09s/it, loss=2.23, lr=1.6e-6]  
Epoch 6: 100%|██████████| 2320/2320 [1:20:38<00:00,  2.09s/it, loss=1.14, lr=1.53e-6] 
Epoch 7: 100%|██████████| 2320/2320 [1:20:45<00:00,  2.09s/it, loss=1.82, lr=1.47e-6] 
Epoch 8: 100%|██████████| 2320/2320 [1:20:52<00:00,  2.09s/it, loss=1.9, lr=1.4e-6]   
Epoch 9: 100%|██████████| 2320/2320 [1:21:11<00:00,  2.10s/it, loss=1.39, lr=1.33e-6] 
Epoch 10: 100%|██████████| 2320/2320 [1:21:21<00:00,  2.10s/it, loss=1.08, lr=1.27e-6] 
Epoch 11: 100%|██████████| 2320/2320 [1:22:02<

In [7]:
from pathlib import Path

# Device configuration 
DEVICE = 'cuda'

def export_qa_artifacts(
   qa_model,
   bert_tokenizer,
   output_dir: str = '../models',
   model_name: str = 'distilbert-custom'
) -> None:
   """
   Exports the fine-tuned model and its tokenizer
   
   Args:
       qa_model: Trained question answering model
       bert_tokenizer: Associated BERT tokenizer
       output_dir: Base directory for saving artifacts
       model_name: Name of the model directory
   """
   # Create output directory
   model_dir = Path(output_dir)
   model_dir.mkdir(parents=True, exist_ok=True)
   
   # Save artifacts
   model_save_path = model_dir / model_name
   qa_model.save_pretrained(model_save_path)
   bert_tokenizer.save_pretrained(model_save_path)

# Export model artifacts
export_qa_artifacts(bert_qa_model, bert_tokenizer)

In [8]:
def restore_qa_model(
   checkpoint_path: str,
   target_device: torch.device
) -> DistilBertForQuestionAnswering:
   """
   Loads a saved QA model from disk and moves it to specified device
   
   Args:
       checkpoint_path: Path to saved model checkpoint
       target_device: Device to load model onto
   
   Returns:
       Loaded QA model on specified device
   """
   loaded_model = DistilBertForQuestionAnswering.from_pretrained(checkpoint_path)
   return loaded_model.to(target_device)

# Load saved model
CHECKPOINT_DIR = "models/distilbert-custom"
bert_qa_model = restore_qa_model(CHECKPOINT_DIR, DEVICE)

In [None]:
from typing import Tuple, List

def compute_qa_performance(
   qa_model: DistilBertForQuestionAnswering,
   validation_data: QuestionAnsweringDataset,
   bert_tokenizer: DistilBertTokenizerFast,
   target_device: torch.device,
   eval_batch_size: int = 16
) -> Tuple[List[float], List[str], List[str]]:
   """
   Evaluates model performance on validation dataset
   
   Args:
       qa_model: Trained question answering model
       validation_data: Validation dataset
       bert_tokenizer: Tokenizer for text processing
       target_device: Device to run evaluation on
       eval_batch_size: Batch size for evaluation
       
   Returns:
       Tuple of (accuracy scores, predicted answers, ground truth answers)
   """
   qa_model.eval()
   eval_loader = DataLoader(validation_data, batch_size=eval_batch_size)
   progress_tracker = tqdm(eval_loader, desc='Evaluating Model')
   
   performance_scores = []
   model_predictions = []
   ground_truth = []
   
   with torch.no_grad():
       for batch_data in progress_tracker:
           # Move batch to device
           inputs = batch_data['input_ids'].to(target_device)
           attention = batch_data['attention_mask'].to(target_device)
           start_actual = batch_data['start_positions'].to(target_device)
           end_actual = batch_data['end_positions'].to(target_device)
           
           # Get model predictions
           model_output = qa_model(inputs, attention_mask=attention)
           
           # Extract predicted positions
           start_predicted = torch.argmax(model_output['start_logits'], dim=1)
           end_predicted = torch.argmax(model_output['end_logits'], dim=1)
           
           # Compute position accuracy
           start_accuracy = (start_predicted == start_actual).float().mean().item()
           end_accuracy = (end_predicted == end_actual).float().mean().item()
           performance_scores.extend([start_accuracy, end_accuracy])
           
           # Extract answer text
           for idx in range(len(start_predicted)):
               token_sequence = bert_tokenizer.convert_ids_to_tokens(batch_data['input_ids'][idx])
               
               predicted_span = token_sequence[start_predicted[idx]:end_predicted[idx] + 1]
               actual_span = token_sequence[start_actual[idx]:end_actual[idx] + 1]
               
               model_answer = bert_tokenizer.decode(bert_tokenizer.convert_tokens_to_ids(predicted_span))
               actual_answer = ' '.join(actual_span)
               
               model_predictions.append(model_answer)
               ground_truth.append(actual_answer)
   
   return performance_scores, model_predictions, ground_truth

# Evaluate model performance
accuracy_metrics, predicted_answers, reference_answers = compute_qa_performance(
   bert_qa_model, 
   validation_data, 
   bert_tokenizer, 
   DEVICE
)

Evaluating: 100%|██████████| 993/993 [15:00<00:00,  1.10it/s]


In [9]:
from collections import Counter
import string
import re
from typing import List, Dict, Union

class QAMetricsCalculator:
   """
   Calculates evaluation metrics for question answering model predictions
   """
   
   @staticmethod
   def standardize_answer(answer_text: str) -> str:
       """
       Standardizes answer text by removing articles, punctuation and extra whitespace
       """
       # Convert to lowercase
       normalized = answer_text.lower()
       
       # Remove articles
       normalized = re.sub(r'\b(a|an|the)\b', ' ', normalized)
       
       # Remove punctuation
       normalized = ''.join(char for char in normalized if char not in string.punctuation)
       
       # Normalize whitespace
       return ' '.join(normalized.split())
   
   @staticmethod
   def compute_f1_score(candidate: str, reference: str) -> float:
       """
       Calculates F1 score between predicted and ground truth answers
       """
       candidate_tokens = QAMetricsCalculator.standardize_answer(candidate).split()
       reference_tokens = QAMetricsCalculator.standardize_answer(reference).split()
       
       # Find common tokens
       overlap = Counter(candidate_tokens) & Counter(reference_tokens)
       matching_tokens = sum(overlap.values())
       
       # Return 0 if no overlap
       if matching_tokens == 0:
           return 0.0
       
       # Calculate precision and recall
       precision = matching_tokens / len(candidate_tokens)
       recall = matching_tokens / len(reference_tokens)
       
       # Return F1 score
       return 2 * (precision * recall) / (precision + recall)
   
   @staticmethod
   def is_exact_match(candidate: str, reference: str) -> bool:
       """
       Checks if predicted answer exactly matches ground truth after normalization
       """
       return QAMetricsCalculator.standardize_answer(candidate) == QAMetricsCalculator.standardize_answer(reference)
   
   @staticmethod
   def compute_metrics(
       model_predictions: List[str], 
       ground_truth_answers: List[str]
   ) -> Dict[str, float]:
       """
       Computes F1 and Exact Match metrics across all predictions
       
       Args:
           model_predictions: List of predicted answers
           ground_truth_answers: List of ground truth answers
           
       Returns:
           Dictionary containing F1 and Exact Match scores (as percentages)
       """
       if len(model_predictions) != len(ground_truth_answers):
           raise ValueError("Number of predictions must match number of ground truth answers")
       
       num_samples = len(model_predictions)
       f1_total = 0.0
       exact_match_count = 0
       
       # Calculate metrics for each prediction
       for pred, truth in zip(model_predictions, ground_truth_answers):
           f1_total += QAMetricsCalculator.compute_f1_score(pred, truth)
           exact_match_count += int(QAMetricsCalculator.is_exact_match(pred, truth))
       
       # Return percentage scores
       return {
           'f1': (f1_total * 100.0) / num_samples,
           'exact_match': (exact_match_count * 100.0) / num_samples
       }


In [None]:
# Calculate and display evaluation metrics
final_metrics = QAMetricsCalculator.compute_metrics(predicted_answers, reference_answers)

print(f"Model Performance Metrics:")
for metric_name, metric_value in final_metrics.items():
   print(f"{metric_name}: {metric_value:.2f}")

Evaluation Results:
f1: 52.54
exact_match: 39.02
