# Question Answering 

In [1]:
!pip install datasets
import os
os.environ['TRANSFORMERS_NO_ADVISORY_WARNINGS'] = 'true'




# IMPORT LIBRARIES

In [2]:
import torch
import torch.nn as nn
from tqdm import tqdm
import pandas as pd
from datasets import load_dataset
from transformers import BertTokenizerFast
from torch.utils.data import DataLoader
from transformers import BertModel
from sklearn.metrics import accuracy_score, precision_score, recall_score,f1_score

# LOAD DATA

In [3]:
class LoadFormatData:
    """
    Class to load and format data from the SWAG dataset.
    """
    def __init__(self, train_data_size=1000, batch_size=32):
        """
        Initialize the DataLoader with specified data size and batch size.

        :param train_data_size: Size of the dataset to load.
        :param batch_size: Batch size for DataLoader.
        """
        self.batch_size = batch_size
        self.tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased', use_fast=True)
        self.dataset = load_dataset('swag', 'regular')

        # Limit dataset size
        self.dataset['train'] = self.dataset['train'].select(range(train_data_size))
        validation_data_size = int(train_data_size*0.2)
        self.dataset['validation'] = self.dataset['validation'].select(range(validation_data_size))

        # Tokenize and encode the dataset
        self.encoded_data = self.dataset.map(self.tokenize_data_batched, batched=True)
        self.train_data = self.encoded_data['train']
        self.validation_data = self.encoded_data['validation']

    def tokenize_data_batched(self, batch):
        """
        Tokenize a batch of data from the SWAG dataset.

        :param batch: Batch object containing the data.
        :return: Tokenized data.
        """
        swag_data = {
            'sent1': batch['sent1'],
            'sent2': batch['sent2'],
            'ending0': batch['ending0'],
            'ending1': batch['ending1'],
            'ending2': batch['ending2'],
            'ending3': batch['ending3'],
            'label': batch['label']
        }

        # Convert dictionary to DataFrame
        swag_data_df = pd.DataFrame(swag_data)

        # Expand sentence headers to match the number of choices
        ending_columns = ['ending0', 'ending1', 'ending2', 'ending3']
        first_sentences = [context for context in swag_data_df['sent1'] for _ in range(4)]
        second_sentences = [
            "{} {}".format(header, swag_data_df[ending].iloc[i])
            for i, header in enumerate(swag_data_df['sent2'])
            for ending in ending_columns
        ]

        # Tokenize the sentences using the tokenizer
        tokenized_batch = self.tokenizer(first_sentences, second_sentences, truncation=True)

        # Reshape tokenized examples
        reshaped_tokenized = {}
        for key, value in tokenized_batch.items():
            reshaped_tokenized[key] = [value[i:i + 4] for i in range(0, len(value), 4)]

        return reshaped_tokenized

    @staticmethod
    def collate_batch(batch):
        """
        Collate a batch of tokenized data.

        :param batch: List of tokenized data.
        :return: Padded and reshaped tokenized data with labels.
        """
        batch_size = len(batch)
        num_choices = 4

        # Flatten features
        flattened_features = [
            {k: v[i] for k, v in feature.items() if k in ["input_ids", "attention_mask"]}
            for feature in batch
            for i in range(num_choices)
        ]

        # Padding using tokenizer
        tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')
        flattened_features_padded = tokenizer.pad(
            flattened_features,
            padding=True,
            return_tensors="pt",
        )

        # Reshape tokenized examples
        for k, v in flattened_features_padded.items():
            flattened_features_padded[k] = v.view(batch_size, num_choices, -1)

        # Convert labels to tensor
        labels = [feature['label'] for feature in batch]
        flattened_features_padded["label"] = torch.tensor(labels, dtype=torch.int64)

        return flattened_features_padded

    def get_train_loader(self):
        """
        Get the DataLoader for the training dataset.

        :return: Training DataLoader.
        """
        return DataLoader(
            self.train_data, batch_size=self.batch_size, collate_fn=LoadFormatData.collate_batch, shuffle=True
        )

    def get_validation_loader(self):
        """
        Get the DataLoader for the training dataset.

        :return: Validation DataLoader.
        """
        return DataLoader(
            self.validation_data, batch_size=self.batch_size, collate_fn=LoadFormatData.collate_batch, shuffle=True
        )

# CUSTOM MODEL

In [4]:
class CustomBertForMultipleChoice(nn.Module):
    """
    Custom BERT model for multiple choice tasks.
    """
    def __init__(self, model_name='bert-base-uncased'):
        """
        Initialize the CustomBertForMultipleChoice model.

        :param model_name: Name of the pretrained BERT model.
        """
        super(CustomBertForMultipleChoice, self).__init__()

        # Load pretrained BERT model
        self.bert = BertModel.from_pretrained(model_name)

        # Dropout layer for regularization
        self.dropout = nn.Dropout(0.1)

        # Classifier layer
        self.classifier = nn.Linear(self.bert.config.hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        """
        Forward pass of the CustomBertForMultipleChoice model.

        :param input_ids: Input token IDs.
        :param attention_mask: Attention mask.
        :return: Logits and predicted indices.
        """

        # Flatten the input to (batch_size*num_choices, seq_length)
        batch_size, num_choices, seq_length = input_ids.size()
        flat_input_ids = input_ids.view(batch_size * num_choices, seq_length)
        flat_attention_mask = attention_mask.view(batch_size * num_choices, seq_length)

        # Pass through BERT model
        outputs = self.bert(input_ids=flat_input_ids, attention_mask=flat_attention_mask)

        # Extract pooled output and apply dropout
        pooled_output = self.dropout(outputs.pooler_output)

        # Pass through classifier
        logits = self.classifier(pooled_output)

        # Reshape logits to batch_size x num_choices
        logits = logits.view(batch_size, num_choices)

        # Get predicted indices
        _, predicted = torch.max(logits, 1)

        return logits, predicted

# TEST TRAIN LOOP

In [5]:
class SwagTrainer:
    """
    Class for training and testing a model.
    """
    def __init__(self, model, train_loader,validation_loader, lr=1e-5):
        """
        Initialize the SwagTrainer.

        :param model: The model to train and test.
        :param train_loader: DataLoader for training data.
        :param validation_loader: Learning rate for the optimizer
        :param lr:
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = model.to(self.device)
        self.train_loader = train_loader
        self.validation_loader = validation_loader
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=lr)
        self.criterion = nn.CrossEntropyLoss()

    def train(self, epochs=5):
        """
        Train the model.

        :param epochs: Number of epochs
        :return: Number of training epochs
        """
        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            correct_predictions = 0
            total_samples = 0

            # Initialize tqdm to visualize progress
            pbar = tqdm(total=len(self.train_loader), desc=f'Epoch {epoch+1}', unit='batch')

            for batch in self.train_loader:
                input_ids = batch["input_ids"].to(self.device)
                attention_mask = batch["attention_mask"].to(self.device)
                labels = batch["label"].to(self.device)

                # Zero out gradients
                self.optimizer.zero_grad()

                # Forward pass
                logits, predicted = self.model(input_ids=input_ids, attention_mask=attention_mask)

                # Calculate loss
                loss = self.criterion(logits.view(-1, 4), labels.view(-1))

                # Backward pass and optimization
                loss.backward()
                self.optimizer.step()

                total_loss += loss.item()

                # Calculate accuracy
                correct_predictions += (predicted == labels).sum().item()
                total_samples += labels.size(0)

                pbar.update(1)

            pbar.close()

            avg_loss = total_loss / len(self.train_loader)
            avg_accuracy = correct_predictions / total_samples
            print(f'Epoch {epoch + 1}/{epochs},Training Loss: {avg_loss}, Training Accuracy: {avg_accuracy * 100:.2f}%')

    def validation(self):
        """
        Test the model on the test data.
        """
        self.model.eval()
        correct_predictions = 0
        total_samples = 0
        losses = []
        y_true = []
        y_pred = []

        with torch.no_grad():
            for batch in tqdm(self.validation_loader, desc="Validation"):
                input_ids = batch["input_ids"].to(self.device)
                attention_mask = batch["attention_mask"].to(self.device)
                labels = batch["label"].to(self.device)

                # Forward pass
                logits, predicted = self.model(input_ids=input_ids, attention_mask=attention_mask)

                # Calculate loss
                loss = self.criterion(logits.view(-1, 4), labels.view(-1))
                losses.append(loss.item())

                # Calculate accuracy
                correct_predictions += (predicted == labels).sum().item()
                total_samples += labels.size(0)

                # Append true labels and predicted labels
                y_true.extend(labels.cpu().numpy())
                y_pred.extend(predicted.cpu().numpy())

        val_accuracy = correct_predictions / total_samples
        val_loss = sum(losses)/len(losses)
        precision = precision_score(y_true, y_pred, average='macro')
        recall = recall_score(y_true, y_pred, average='macro')
        f1 = f1_score(y_true, y_pred, average='macro')

        print(f'Validation Accuracy: {val_accuracy * 100:.2f}%')
        print(f'Validation Precision: {precision * 100:.2f}%')
        print(f'Validation Recall: {recall * 100:.2f}%')
        print(f'Validation Loss: {val_loss * 100:.2f}')

# RUN CODE

In [6]:
# Define data size and batch size
dataset = load_dataset('swag', 'regular')
print("Total training size ",len(dataset['train']))

# Reducing the data size for training so as to do the training faster and it suits the memory restrictions as supported by Colab
train_data_size = len(dataset['train'])//5

batch_size = 32
num_epoch = 3

# Initialize and load the formatted data
loader = LoadFormatData(train_data_size, batch_size)

# Get DataLoader for training and validation
train_loader = loader.get_train_loader()
validation_loader = loader.get_validation_loader()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Total training size  73546


Map:   0%|          | 0/2941 [00:00<?, ? examples/s]

In [7]:

# Initialize the custom BERT model for multiple choice
model = CustomBertForMultipleChoice()

# Initialize the trainer and tester with the model and data loaders
swag_train_obj = SwagTrainer(model, train_loader,validation_loader)

# Train the model
swag_train_obj.train(epochs=num_epoch)

swag_train_obj.validation()

Epoch 1: 100%|██████████| 460/460 [09:23<00:00,  1.23s/batch]


Epoch 1/3,Training Loss: 0.9774736372025117, Training Accuracy: 59.60%


Epoch 2: 100%|██████████| 460/460 [09:22<00:00,  1.22s/batch]


Epoch 2/3,Training Loss: 0.6567545266255088, Training Accuracy: 74.88%


Epoch 3: 100%|██████████| 460/460 [09:24<00:00,  1.23s/batch]


Epoch 3/3,Training Loss: 0.435696672356647, Training Accuracy: 83.87%


Validation: 100%|██████████| 92/92 [00:46<00:00,  1.96it/s]

Validation Accuracy: 74.94%
Validation Precision: 74.95%
Validation Recall: 74.93%
Validation Loss: 68.68



