# Preprocessing
The code for this part is largely taken from https://huggingface.co/docs/transformers/tasks/question_answering and https://huggingface.co/course/chapter7/7?fw=tf, with minor changes and additional annotations/comments for our own understanding.

In [None]:
!pip install datasets
!pip install transformers

import datasets
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
import torch
from torch import nn
from torch.nn.utils import prune
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import pickle
import pandas as pd
from time import time

# TO CHANGE MODEL USED, CHANGE HERE
# Hugging Face checkpoint name. The cells below pass it to
# AutoTokenizer.from_pretrained and AutoModelForQuestionAnswering.from_pretrained.
model_used = "distilbert-base-uncased"

In [None]:
# TO CHANGE THE TRAIN SIZE, CHANGE HERE
# Only the first `n_train` rows of the SQuAD "train" split are requested.
n_train = 10000
train_dataset_raw = datasets.load_dataset("squad",  split=f"train[:{n_train}]")

# Likewise, only the first `n_val` rows of the "validation" split are requested.
n_val = 1000
val_dataset_raw = datasets.load_dataset("squad", split=f"validation[:{n_val}]")

In [None]:
# Tokenizer loaded from the same checkpoint name (`model_used`) as the model.
tokenizer = AutoTokenizer.from_pretrained(model_used)

In [None]:
# Display the first raw example as the cell output.
train_dataset_raw[0]

In [None]:
# Demo: tokenize one (question, context) pair with a deliberately small window
# (max_length=100, stride=50) so the context is split into overlapping chunks,
# then print every chunk decoded back to text.
first_example = train_dataset_raw[0]
question, context = first_example['question'], first_example['context']

demo_tokenizer_options = dict(
    max_length=100,
    truncation="only_second",  # only the context (second sequence) is truncated
    stride=50,
    return_overflowing_tokens=True,
    return_offsets_mapping=True,
)
inputs = tokenizer(question, context, **demo_tokenizer_options)

for chunk_text in map(tokenizer.decode, inputs["input_ids"]):
    print(chunk_text)

In [None]:
# Window size (in tokens) of every tokenized feature, and the number of context
# tokens shared by two consecutive windows of the same example.
max_length = 384
stride = 128

def preprocess_data_point(data_point):
    """Tokenize a batch of SQuAD examples and attach answer token positions.

    Intended for `Dataset.map(..., batched=True)`: `data_point` is a batch, i.e.
    a mapping whose "question", "context" and "answers" entries are lists.
    Relies on the notebook-level `tokenizer`, `max_length` and `stride`.

    A long context is split into several overlapping windows, so one example
    may produce several features. For each feature, `start_positions` and
    `end_positions` hold the token indices of the first and last answer token,
    or (0, 0) when the answer is not fully inside that window. Only the first
    listed answer of each example is used.

    Returns:
        The tokenizer output with "offset_mapping" and
        "overflow_to_sample_mapping" removed and "start_positions" /
        "end_positions" added (exactly one entry per feature).
    """
    questions = [q.strip() for q in data_point["question"]]
    inputs = tokenizer(
        questions,
        data_point["context"],
        max_length=max_length,
        truncation="only_second",
        stride=stride,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,
        padding="max_length",
    )

    # [[(0, 0), (0, 2), (3, 7), ...], ...]
    # one list per feature; each element is the (start_char, end_char) of a token
    offset_ls = inputs.pop("offset_mapping")
    # [0, 0, 0, 0, 1, 1, 1, 1]
    # for each feature, the index of the example (within this batch) it came from
    original_indices = inputs.pop("overflow_to_sample_mapping")
    # [{'text': ['Saint Bernadette Soubirous'], 'answer_start': [515]}, ...]
    answers = data_point["answers"]

    starts = []
    ends = []

    for i, offsets in enumerate(offset_ls):
        ans = answers[original_indices[i]]
        ans_start_ind = ans["answer_start"][0]
        ans_end_ind = ans_start_ind + len(ans["text"][0])
        # [None, 0, 0, ..., 0, None, 1, 1, 1 ...]
        # None => special/padding token, 0 => question, 1 => context
        sequence_ids = inputs.sequence_ids(i)

        # First and last token index of the context part of this window.
        context_start = sequence_ids.index(1)
        context_end = context_start
        while (context_end + 1 < len(sequence_ids)
               and sequence_ids[context_end + 1] == 1):
            context_end += 1

        # Character span of the context covered by this window.
        context_char_start = offsets[context_start][0]
        context_char_end = offsets[context_end][1]

        # Answer not fully inside this window => label it (0, 0).
        if context_char_start > ans_start_ind or context_char_end < ans_end_ind:
            starts.append(0)
            ends.append(0)
            continue

        # Start token: the last context token that begins at or before the
        # answer's first character. The guard above ensures the loop advances
        # at least once, so exactly one value is always appended (also when
        # the answer begins in the middle of a token).
        p = context_start
        while p <= context_end and offsets[p][0] <= ans_start_ind:
            p += 1
        starts.append(p - 1)

        # End token: scanning from the right, the left-most token that still
        # ends at or after the answer's last character.
        p = context_end
        while p >= context_start and offsets[p][1] >= ans_end_ind:
            p -= 1
        ends.append(p + 1)

    inputs["start_positions"] = starts
    inputs["end_positions"] = ends
    return inputs

In [None]:
# Tokenize the training examples in batches. The raw columns are removed
# because preprocess_data_point may emit several features per example
# (overflowing windows), so the output rows no longer line up with the input.
train_dataset = train_dataset_raw.map(
    preprocess_data_point,
    batched=True,
    remove_columns=train_dataset_raw.column_names,
)
# Number of raw examples vs. number of tokenized features.
len(train_dataset_raw), len(train_dataset)

In [None]:
# input_ids => a list of token ids (integers). To see the words, decode them with the same tokenizer
# token_type_ids => 0: question, 1: context
#   NOTE(review): this column only exists if the tokenizer returns it; DistilBERT tokenizers presumably do not - confirm
# attention_mask => 0: padding, 1: non-padding
# start_positions => token index of the start of the answer (0 if the answer is not inside the window)
# end_positions => token index of the end of the answer (0 if the answer is not inside the window)
train_dataset

In [None]:
# Tokenize the validation examples with the same function used for training,
# so they also carry start_positions / end_positions labels.
val_dataset = val_dataset_raw.map(
    preprocess_data_point,
    batched=True,
    remove_columns=val_dataset_raw.column_names,
)

# Dataloader, Model, Train & Test Loop (baseline)
From this point onwards, most of the code is written by us, with reference to the standard structure of deep-learning training code.

In [None]:
# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print(f"Using {device} device")

In [None]:
# TO CHANGE BATCH SIZE, CHANGE HERE
BATCH_SIZE = 4

# No shuffling is requested for the training loader. The knowledge-distillation
# train_loop looks up the saved teacher outputs by batch index, so the batch
# order and BATCH_SIZE must match the ones used when those outputs were saved.
train_ds = train_dataset.with_format("torch")
train_dataloader = DataLoader(train_ds, batch_size=BATCH_SIZE)

val_ds = val_dataset.with_format("torch")
val_dataloader = DataLoader(val_ds, batch_size=BATCH_SIZE)

In [None]:
# TO CHANGE MODEL, CHANGE HERE
class CustomBERTModel(nn.Module):
    """Baseline: the pretrained question-answering model used as-is.

    `forward` returns the first two entries of the wrapped model's output,
    which this notebook treats as the start and end logits.
    """

    def __init__(self):
        super().__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(
            model_used,
            output_hidden_states=True,
        )

    def forward(self, ids, mask):
        """Run the wrapped model on token ids `ids` with attention mask `mask`."""
        qa_output = self.bert(ids, attention_mask=mask)
        start_logits = qa_output[0]
        end_logits = qa_output[1]
        return start_logits, end_logits

# Instantiate the baseline model; the pretrained weights are loaded in __init__.
model = CustomBERTModel()

In [None]:
def train_loop(dataloader, criterion, model, optimizer, epoch, time_dict, val_dataloader=None):
    """Train `model` for one pass over `dataloader` under a wall-clock budget.

    Reads the notebook-level globals `device`, `epochs` (for the progress
    message only) and `scheduler` (stepped once when the pass completes).

    Args:
        dataloader: yields dicts with 'input_ids', 'attention_mask',
            'start_positions' and 'end_positions' tensors.
        criterion: loss applied separately to the start and the end logits.
        model: called as `model(input_ids, attention_mask)` and expected to
            return `(start_logits, end_logits)`.
        optimizer: optimizer over the model's parameters.
        epoch: zero-based epoch index; used in messages and passed to test_loop.
        time_dict: mutable dict with 'total_time' (training seconds already
            accounted for) and 'previous_time' (timestamp of the last
            checkpoint). Updated in place.
        val_dataloader: if given, `test_loop` runs on it at every checkpoint.

    Once 20 minutes of training time have accumulated the function returns
    immediately, without printing the epoch accuracy or stepping the scheduler.
    """
    n_total_steps = len(dataloader)
    correct = 0
    total = 0
    losses = []
    accs = []
    # Make sure dropout etc. are in training mode.
    model.train()
    for i, dic in enumerate(tqdm(dataloader)):
        # Time limited training: stop after 20 minutes
        if time_dict['total_time'] + time() - time_dict['previous_time'] > 20 * 60:
            return

        input_ids = dic['input_ids'].to(device)
        attention_mask = dic['attention_mask'].to(device)
        starts = dic['start_positions'].to(device)
        ends = dic['end_positions'].to(device)

        start_logits, end_logits = model(input_ids, attention_mask)

        start_loss = criterion(start_logits, starts)
        end_loss = criterion(end_logits, ends)
        combined_loss = (start_loss + end_loss) / 2

        # A feature counts as correct only if both start and end match.
        with torch.no_grad():
            start_pred = torch.argmax(start_logits, dim=-1)
            end_pred = torch.argmax(end_logits, dim=-1)
            exact = (start_pred == starts) & (end_pred == ends)
        correct += int(exact.sum())
        total += exact.numel()

        # Store a plain float: keeping the tensor would keep every step's
        # autograd graph alive until the next checkpoint.
        losses.append(combined_loss.item())

        combined_loss.backward()

        optimizer.step()
        optimizer.zero_grad()

        # Time limited training: evaluate every 4 minutes (can change)
        # evaluation time is not counted towards total time
        current_time = time()
        if current_time - time_dict['previous_time'] > 4 * 60:
            time_dict['total_time'] += current_time - time_dict['previous_time']
            # Report the mean loss since the last checkpoint, not just the
            # loss of the most recent batch.
            avg_loss = sum(losses) / len(losses)
            losses = []
            window_acc = correct / total * 100
            accs.append(window_acc)
            tqdm.write(f'epoch {epoch + 1} / {epochs}, step {i + 1}/{n_total_steps}, loss = {avg_loss:.4f}, acc = {window_acc}%')
            correct = 0
            total = 0
            if val_dataloader is not None:
                test_loop(val_dataloader, model, criterion, epoch)
                # Evaluation may have switched the model to eval mode.
                model.train()
            time_dict['previous_time'] = time()

    # `total` is 0 when a checkpoint fired on the very last step or when the
    # dataloader was empty; skip the division in that case.
    if total:
        accs.append(correct / total * 100)

    if accs:
        print(f'Epoch accuracy: {sum(accs) / len(accs)}%')
    scheduler.step()
    
def compute_f1(pred, truth):
    """Return the token-level F1 score between two spans of token ids.

    Args:
        pred: 1-D tensor with the token ids of the predicted span.
        truth: 1-D tensor with the token ids of the ground-truth span.

    Returns:
        A float in [0, 1]; 0.0 when the spans share no token (including the
        case where either span is empty).

    Overlap is counted as a multiset, so a token that occurs twice in both
    spans counts twice. A plain set intersection would score two identical
    spans containing repeated tokens below 1.
    """
    from collections import Counter

    pred_tokens = pred.tolist()
    truth_tokens = truth.tolist()

    overlap = sum((Counter(pred_tokens) & Counter(truth_tokens)).values())
    if overlap == 0:
        return 0.0

    precision = overlap / len(pred_tokens)
    recall = overlap / len(truth_tokens)

    return 2 * precision * recall / (precision + recall)

def _best_span(start_probs, end_probs, max_answer_len=10):
    """Return the (start, end) pair maximizing start_probs[start] * end_probs[end].

    Only pairs with start <= end < start + max_answer_len are considered.
    Both inputs are 1-D tensors of the same length.
    """
    seq_len = start_probs.size(0)
    positions = torch.arange(seq_len, device=start_probs.device)
    # span_offset[j, k] = k - j
    span_offset = positions.unsqueeze(0) - positions.unsqueeze(1)
    valid = (span_offset >= 0) & (span_offset < max_answer_len)
    scores = start_probs.unsqueeze(1) * end_probs.unsqueeze(0)
    scores = scores.masked_fill(~valid, float('-inf'))
    best = int(torch.argmax(scores))
    return best // seq_len, best % seq_len


def test_loop(dataloader, model, criterion, epoch):
    """Evaluate `model` on `dataloader`, print metrics and save the model.

    Reads the notebook-level global `device` and calls `compute_f1`.

    Args:
        dataloader: yields dicts with 'input_ids', 'attention_mask',
            'start_positions' and 'end_positions' tensors.
        model: called as `model(input_ids, attention_mask)` and expected to
            return `(start_logits, end_logits)`.
        criterion: loss applied separately to the start and the end logits.
        epoch: appended to the file name the model is saved under.

    For every feature the predicted span is the arg-max start/end pair; when
    that pair is inverted (start after end), the best valid span of at most
    10 tokens is used instead. Prints exact-match accuracy, average loss and
    mean token F1, then pickles the whole model to 'model<epoch>'.
    The model is put in eval mode for the evaluation and its previous
    train/eval mode is restored afterwards.
    """
    test_loss, correct, total = 0.0, 0, 0
    f1s = []

    was_training = model.training
    # Disable dropout so the metrics are deterministic.
    model.eval()
    try:
        with torch.no_grad():
            for dic in dataloader:
                input_ids = dic['input_ids'].to(device)
                attention_mask = dic['attention_mask'].to(device)
                starts = dic['start_positions'].to(device)
                ends = dic['end_positions'].to(device)

                start_logits, end_logits = model(input_ids, attention_mask)

                # The loss is computed on the raw logits. The criterion used in
                # this notebook (CrossEntropyLoss) applies log-softmax itself,
                # so feeding it probabilities would distort the value.
                start_loss = criterion(start_logits, starts)
                end_loss = criterion(end_logits, ends)
                test_loss += ((start_loss + end_loss) / 2).item()

                # Probabilities are only needed to score candidate spans.
                start_probs = torch.nn.functional.softmax(start_logits, dim=1)
                end_probs = torch.nn.functional.softmax(end_logits, dim=1)

                start_pred = torch.argmax(start_probs, dim=-1)
                end_pred = torch.argmax(end_probs, dim=-1)

                for row in range(len(start_pred)):
                    ground_truth = input_ids[row, starts[row]:ends[row] + 1]
                    if start_pred[row] <= end_pred[row]:
                        pred = input_ids[row, start_pred[row]:end_pred[row] + 1]
                    else:
                        # Inverted arg-max pair: fall back to the best valid span.
                        j_ind, k_ind = _best_span(start_probs[row], end_probs[row])
                        pred = input_ids[row, j_ind:k_ind + 1]

                    f1s.append(compute_f1(pred, ground_truth))

                    if torch.equal(ground_truth, pred):
                        correct += 1
                    total += 1
    finally:
        model.train(was_training)

    # max(..., 1) avoids a ZeroDivisionError when the dataloader is empty.
    n_batches = max(len(dataloader), 1)
    accuracy = 100 * correct / max(total, 1)
    mean_f1 = sum(f1s) / max(len(f1s), 1)
    print(f"Accuracy: {accuracy:>0.1f}%, Avg loss: {test_loss / n_batches:>8f}, F1: {mean_f1}\n")
    # NOTE: this pickles the entire module object, not just its state_dict.
    torch.save(model, 'model' + str(epoch))

In [None]:
# Fresh model instance for this run, moved to the selected device.
model = CustomBERTModel()
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 2e-5)
# train_loop calls scheduler.step() at the end of every completed epoch.
scheduler = ExponentialLR(optimizer, gamma=0.9)

# Shared timing state: train_loop reads and updates it to enforce the
# training-time budget and to trigger the periodic evaluations.
time_dict = {'total_time': 0.0, 'previous_time': time()}

# NOTE: train_loop reads `epochs` and `scheduler` as notebook-level globals,
# so these names must stay as they are.
epochs = 4

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, criterion, model, optimizer, t, time_dict, val_dataloader)

# Final evaluation after training; `t` is the index of the last epoch.
test_loop(val_dataloader, model, criterion, t)

print("Done!")

# Knowledge Distillation

In [None]:
# THIS STEP CAN TAKE A LONG TIME.
# I'M EXTRACTING THE PREDICTION FROM AN EXISTING MODEL. IF THE EXISTING MODEL'S PREDICTION HAS ALREADY BEEN SAVED AS A PICKLE FILE, JUST USE THAT ONE
# MAKE SURE THAT YOU USE THE SAME TOKENIZER (E.G. IF YOU PREDICT USING BERT-UNCASED, YOU SHOULD TOKENIZE USING THE UNCASED TOKENIZER)

# trained_start_all = []
# trained_end_all = []

# def populate_trained():
#     # If you want to run this, make sure you use the uncased tokenizer
#     fully_trained = AutoModelForQuestionAnswering.from_pretrained("bert-large-uncased-whole-word-masking-finetuned-squad").to(device)
#     for i, dic in enumerate(tqdm(train_dataloader)):
#         input_ids = dic['input_ids'].to(device)
#         attention_mask = dic['attention_mask'].to(device)

#         trained_output = fully_trained(input_ids, attention_mask)

#         trained_start_all.append(trained_output[0].tolist())
#         trained_end_all.append(trained_output[1].tolist())
        
# populate_trained()

In [None]:
# UNCOMMENT THIS IF YOU WANT TO SAVE PREDICTION RESULTS FROM A FINETUNED MODEL

# with open("trained_start_bert_uncased", "wb") as fp:   #Pickling
#     pickle.dump(trained_start_all, fp)

# with open("trained_end_bert_uncased", "wb") as fp:   #Pickling
#     pickle.dump(trained_end_all, fp)

In [None]:
# Load previously saved teacher predictions (comment this cell out if you computed them above instead).
# CHANGE THE PATH TO WHERE YOU PUT THE PICKLE FILE.
# The distillation train_loop expects one entry per training batch, in dataloader order.
# NOTE(review): the file names suggest these are pre-softmax logits - confirm.
# SECURITY: pickle.load can execute arbitrary code; only load files you created yourself.

with open("trained_start_bert_uncased_presoftmax", "rb") as fp:   # Unpickling
    trained_start_all = pickle.load(fp)

with open("trained_end_bert_uncased_presoftmax", "rb") as fp:   # Unpickling
    trained_end_all = pickle.load(fp)

In [None]:
def train_loop(dataloader, criterion, model, optimizer, epoch, time_dict, val_dataloader=None, T=1, alpha=1):
    """Train `model` for one pass with knowledge distillation under a time budget.

    Reads the notebook-level globals `device`, `epochs` (for the progress
    message only), `scheduler` (stepped once when the pass completes) and the
    teacher outputs `trained_start_all` / `trained_end_all`.

    The teacher outputs are looked up by batch index, so `dataloader` must
    yield the same batches, in the same order, as when they were produced.
    `criterion` must accept class-probability targets (as `nn.CrossEntropyLoss`
    does in recent PyTorch versions).

    Args:
        dataloader: yields dicts with 'input_ids', 'attention_mask',
            'start_positions' and 'end_positions' tensors.
        criterion: loss applied separately to the start and the end logits.
        model: called as `model(input_ids, attention_mask)` and expected to
            return `(start_logits, end_logits)`.
        optimizer: optimizer over the model's parameters.
        epoch: zero-based epoch index; used in messages and passed to test_loop.
        time_dict: mutable dict with 'total_time' and 'previous_time';
            updated in place.
        val_dataloader: if given, `test_loop` runs on it at every checkpoint.
        T: distillation temperature applied to both teacher and student logits.
        alpha: weight of the distillation loss; (1 - alpha) weights the
            hard-label loss.

    Once 20 minutes of training time have accumulated the function returns
    immediately, without printing the epoch accuracy or stepping the scheduler.
    """
    n_total_steps = len(dataloader)
    correct = 0
    total = 0
    losses = []
    accs = []
    # Make sure dropout etc. are in training mode.
    model.train()
    for i, dic in enumerate(tqdm(dataloader)):
        # Time limited training: stop after 20 minutes
        if time_dict['total_time'] + time() - time_dict['previous_time'] > 20 * 60:
            return

        input_ids = dic['input_ids'].to(device)
        attention_mask = dic['attention_mask'].to(device)
        starts = dic['start_positions'].to(device)
        ends = dic['end_positions'].to(device)

        start_logits, end_logits = model(input_ids, attention_mask)

        # Make sure you have trained_start_all + trained_end_all loaded/computed
        teacher_start = torch.tensor(trained_start_all[i], dtype=torch.float32, device=device)
        teacher_end = torch.tensor(trained_end_all[i], dtype=torch.float32, device=device)
        # Soft targets: teacher distribution at temperature T.
        teacher_start = torch.nn.functional.softmax(teacher_start / T, dim=1)
        teacher_end = torch.nn.functional.softmax(teacher_end / T, dim=1)
        # The student is softened with the same temperature; the T * T factor
        # keeps the gradient scale comparable to the hard-label loss.
        # With the default T = 1 both adjustments are no-ops.
        start_loss_kd = criterion(start_logits / T, teacher_start) * T * T
        end_loss_kd = criterion(end_logits / T, teacher_end) * T * T

        start_loss_label = criterion(start_logits, starts)
        end_loss_label = criterion(end_logits, ends)

        start_loss = alpha * start_loss_kd + (1 - alpha) * start_loss_label
        end_loss = alpha * end_loss_kd + (1 - alpha) * end_loss_label
        combined_loss = (start_loss + end_loss) / 2

        # A feature counts as correct only if both start and end match the labels.
        with torch.no_grad():
            start_pred = torch.argmax(start_logits, dim=-1)
            end_pred = torch.argmax(end_logits, dim=-1)
            exact = (start_pred == starts) & (end_pred == ends)
        correct += int(exact.sum())
        total += exact.numel()

        # Store a plain float: keeping the tensor would keep every step's
        # autograd graph alive until the next checkpoint.
        losses.append(combined_loss.item())

        combined_loss.backward()

        optimizer.step()
        optimizer.zero_grad()

        # Time limited training: evaluate every 4 minutes (can change)
        # evaluation time is not counted towards total time
        current_time = time()
        if current_time - time_dict['previous_time'] > 4 * 60:
            time_dict['total_time'] += current_time - time_dict['previous_time']
            # Report the mean loss since the last checkpoint, not just the
            # loss of the most recent batch.
            avg_loss = sum(losses) / len(losses)
            losses = []
            window_acc = correct / total * 100
            accs.append(window_acc)
            tqdm.write(f'epoch {epoch + 1} / {epochs}, step {i + 1}/{n_total_steps}, loss = {avg_loss:.4f}, acc = {window_acc}%')
            correct = 0
            total = 0
            if val_dataloader is not None:
                test_loop(val_dataloader, model, criterion, epoch)
                # Evaluation may have switched the model to eval mode.
                model.train()
            time_dict['previous_time'] = time()

    # `total` is 0 when a checkpoint fired on the very last step or when the
    # dataloader was empty; skip the division in that case.
    if total:
        accs.append(correct / total * 100)

    if accs:
        print(f'Epoch accuracy: {sum(accs) / len(accs)}%')
    scheduler.step()

In [None]:
# Fresh student model for the distillation run, moved to the selected device.
model = CustomBERTModel()
model.to(device)

# The distillation train_loop passes class probabilities (teacher soft targets)
# as well as class indices to this criterion.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 2e-5)
# train_loop calls scheduler.step() at the end of every completed epoch.
scheduler = ExponentialLR(optimizer, gamma=0.9)

# Shared timing state: train_loop reads and updates it to enforce the
# training-time budget and to trigger the periodic evaluations.
time_dict = {'total_time': 0.0, 'previous_time': time()}

# NOTE: train_loop reads `epochs` and `scheduler` as notebook-level globals,
# so these names must stay as they are.
epochs = 4

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, criterion, model, optimizer, t, time_dict, val_dataloader)

# Final evaluation after training; `t` is the index of the last epoch.
test_loop(val_dataloader, model, criterion, t)

print("Done!")

# Appending CNN & LSTM-based architectures after DistilBERT's last hidden layer

## Model 1
DistilBERT uncased + LSTM (1 layer, 768 hidden, bidirectional) + a linear layer applied at each time step (2x768 in, 2 out)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 1: pretrained encoder + 1-layer bidirectional LSTM (hidden 768) + linear head.

    The last hidden state of the wrapped model is fed through the LSTM, and a
    linear layer maps every time step to two values: the start logit and the
    end logit of that token.
    """

    def __init__(self):
        super(CustomBERTModel, self).__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.num_layers = 1
        self.bidirectional = True
        self.bidirectional_multiplier = 2 if self.bidirectional else 1
        self.hidden_size = 768
        self.lstm = nn.LSTM(input_size=768,
                            hidden_size=self.hidden_size,
                            num_layers=self.num_layers,
                            bidirectional=self.bidirectional,
                            batch_first=True)
        self.linear = nn.Linear(self.bidirectional_multiplier * self.hidden_size, 2)

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # output is indexed as (start_logits, end_logits, hidden_states)
        output = self.bert(ids, attention_mask=mask)
        # hidden_states holds one (batch, sequence_length, hidden) tensor per
        # layer; take the output of the last encoder layer
        last_encoder_layer_output = output[2][-1]
        self.lstm.flatten_parameters()
        # No initial state is passed: nn.LSTM then starts from all-zero hidden
        # and cell states. (Assigning a `hidden` attribute on the LSTM, as was
        # done before, has no effect on its forward pass.)
        lstm_output, _ = self.lstm(last_encoder_layer_output)
        # (batch, sequence_length, 2): one start and one end logit per token
        linear_output = self.linear(lstm_output)
        # No squeeze here: it would drop the sequence axis when its length is 1.
        return linear_output[:, :, 0], linear_output[:, :, 1]


# Instantiate Model 1; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

## Model 2

DistilBERT uncased + LSTM (1 layer, 64 hidden, bidirectional) + a linear layer applied at each time step (2x64 in, 2 out)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 2: pretrained encoder + 1-layer bidirectional LSTM (hidden 64) + linear head.

    The last hidden state of the wrapped model is fed through the LSTM, and a
    linear layer maps every time step to two values: the start logit and the
    end logit of that token.
    """

    def __init__(self):
        super(CustomBERTModel, self).__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.num_layers = 1
        self.bidirectional = True
        self.bidirectional_multiplier = 2 if self.bidirectional else 1
        self.hidden_size = 64
        self.lstm = nn.LSTM(input_size=768,
                            hidden_size=self.hidden_size,
                            num_layers=self.num_layers,
                            bidirectional=self.bidirectional,
                            batch_first=True)
        self.linear = nn.Linear(self.bidirectional_multiplier * self.hidden_size, 2)

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # output is indexed as (start_logits, end_logits, hidden_states)
        output = self.bert(ids, attention_mask=mask)
        # hidden_states holds one (batch, sequence_length, hidden) tensor per
        # layer; take the output of the last encoder layer
        last_encoder_layer_output = output[2][-1]
        self.lstm.flatten_parameters()
        # No initial state is passed: nn.LSTM then starts from all-zero hidden
        # and cell states. (Assigning a `hidden` attribute on the LSTM, as was
        # done before, has no effect on its forward pass.)
        lstm_output, _ = self.lstm(last_encoder_layer_output)
        # (batch, sequence_length, 2): one start and one end logit per token
        linear_output = self.linear(lstm_output)
        # No squeeze here: it would drop the sequence axis when its length is 1.
        return linear_output[:, :, 0], linear_output[:, :, 1]


# Instantiate Model 2; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

## Model 3

DistilBERT uncased + LSTM (2 layers, 64 hidden, bidirectional) + a linear layer applied at each time step (2x64 in, 2 out)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 3: pretrained encoder + 2-layer bidirectional LSTM (hidden 64) + linear head.

    The last hidden state of the wrapped model is fed through the LSTM, and a
    linear layer maps every time step to two values: the start logit and the
    end logit of that token.
    """

    def __init__(self):
        super(CustomBERTModel, self).__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.num_layers = 2
        self.bidirectional = True
        self.bidirectional_multiplier = 2 if self.bidirectional else 1
        self.hidden_size = 64
        self.lstm = nn.LSTM(input_size=768,
                            hidden_size=self.hidden_size,
                            num_layers=self.num_layers,
                            bidirectional=self.bidirectional,
                            batch_first=True)
        self.linear = nn.Linear(self.bidirectional_multiplier * self.hidden_size, 2)

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # output is indexed as (start_logits, end_logits, hidden_states)
        output = self.bert(ids, attention_mask=mask)
        # hidden_states holds one (batch, sequence_length, hidden) tensor per
        # layer; take the output of the last encoder layer
        last_encoder_layer_output = output[2][-1]
        self.lstm.flatten_parameters()
        # No initial state is passed: nn.LSTM then starts from all-zero hidden
        # and cell states. (Assigning a `hidden` attribute on the LSTM, as was
        # done before, has no effect on its forward pass.)
        lstm_output, _ = self.lstm(last_encoder_layer_output)
        # (batch, sequence_length, 2): one start and one end logit per token
        linear_output = self.linear(lstm_output)
        # No squeeze here: it would drop the sequence axis when its length is 1.
        return linear_output[:, :, 0], linear_output[:, :, 1]


# Instantiate Model 3; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

## Model 4

Distilbert uncased + CNN (Cin=1 Cout=64 kernel=(3, hidden_size) stride=1 pad=(1,0)) + relu + CNN (Cin=64 Cout=2 kernel=1 stride=1 pad=0)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 4: pretrained encoder + conv over 3-token windows + ReLU + 1x1 conv.

    The first convolution spans the full hidden dimension, so it produces 64
    features per token position; the 1x1 convolution turns them into the
    start and end logits.
    """

    def __init__(self):
        super().__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.hidden_size = 768
        self.sequence_length = 384
        # Kernel covers 3 neighbouring tokens x the whole hidden vector;
        # padding of 1 along the sequence axis keeps the sequence length.
        self.conv1 = nn.Conv2d(
            in_channels=1,
            out_channels=64,
            kernel_size=(3, self.hidden_size),
            stride=1,
            padding=(1, 0),
        )
        # Per-position projection from 64 channels to (start, end).
        self.conv2 = nn.Conv2d(
            in_channels=64,
            out_channels=2,
            kernel_size=1,
            stride=1,
            padding=0,
        )

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # Index 2 of the wrapped model's output holds the per-layer hidden states.
        hidden_states = self.bert(ids, attention_mask=mask)[2]
        # (batch, seq, hidden) -> (batch, 1, seq, hidden): add a channel axis
        token_features = hidden_states[-1].unsqueeze(1)
        activations = torch.relu(self.conv1(token_features))
        # (batch, 2, seq, 1) -> (batch, 2, seq)
        logits = self.conv2(activations).squeeze(3)
        return logits[:, 0, :], logits[:, 1, :]

# Instantiate Model 4; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

## Model 5

DistilBERT uncased + 3 parallel CNNs (Cin=1 Cout=8 kernel=(k, hidden_size) for k = 3, 5 and 7, stride=1 pad=((k-1)/2, 0)) + relu + concatenate in channels dimension + CNN (Cin=24 Cout=2 kernel=1 stride=1 pad=0)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 5: pretrained encoder + three parallel convs (3/5/7 tokens) + 1x1 conv.

    Each branch produces 8 channels per token position; the branches are
    concatenated along the channel axis (24 channels) and a 1x1 convolution
    maps them to the start and end logits.
    """

    def __init__(self):
        super().__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.hidden_size = 768
        self.sequence_length = 384
        # Three branches looking at windows of 3, 5 and 7 tokens. Each kernel
        # spans the whole hidden vector, and the sequence-axis padding
        # (1, 2, 3) keeps the sequence length unchanged.
        self.conv1 = nn.Conv2d(
            in_channels=1, out_channels=8,
            kernel_size=(3, self.hidden_size), stride=1, padding=(1, 0),
        )
        self.conv2 = nn.Conv2d(
            in_channels=1, out_channels=8,
            kernel_size=(5, self.hidden_size), stride=1, padding=(2, 0),
        )
        self.conv3 = nn.Conv2d(
            in_channels=1, out_channels=8,
            kernel_size=(7, self.hidden_size), stride=1, padding=(3, 0),
        )
        # 1x1 convolution over the 3 * 8 concatenated channels.
        self.conv4 = nn.Conv2d(
            in_channels=24, out_channels=2,
            kernel_size=1, stride=1, padding=0,
        )

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # Index 2 of the wrapped model's output holds the per-layer hidden states.
        hidden_states = self.bert(ids, attention_mask=mask)[2]
        # (batch, seq, hidden) -> (batch, 1, seq, hidden): add a channel axis
        token_features = hidden_states[-1].unsqueeze(1)
        # Every branch sees the same input and is followed by a ReLU.
        branch_outputs = tuple(
            torch.relu(branch(token_features))
            for branch in (self.conv1, self.conv2, self.conv3)
        )
        # Stack the branches along the channel axis: (batch, 24, seq, 1)
        merged = torch.cat(branch_outputs, dim=1)
        # (batch, 2, seq, 1) -> (batch, 2, seq)
        logits = self.conv4(merged).squeeze(3)
        return logits[:, 0, :], logits[:, 1, :]


# Instantiate Model 5; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

## Model 6

DistilBERT uncased + 3 parallel CNNs (Cin=1 Cout=8 kernel=(k, hidden_size) for k = 3, 5 and 7, stride=1 pad=((k-1)/2, 0)) + relu + concatenate in features dimension + CNN (Cin=8 Cout=2 kernel=(1, 3) stride=1 pad=0)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 6: pretrained encoder + three parallel convs (3/5/7 tokens) + (1, 3) conv.

    Each branch produces 8 channels per token position. Unlike Model 5 the
    branches are concatenated along the last (feature) axis, giving 3 features
    per channel, and a (1, 3) convolution collapses them into the start and
    end logits.
    """

    def __init__(self):
        super().__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.hidden_size = 768
        self.sequence_length = 384
        # Three branches looking at windows of 3, 5 and 7 tokens. Each kernel
        # spans the whole hidden vector, and the sequence-axis padding
        # (1, 2, 3) keeps the sequence length unchanged.
        self.conv1 = nn.Conv2d(
            in_channels=1, out_channels=8,
            kernel_size=(3, self.hidden_size), stride=1, padding=(1, 0),
        )
        self.conv2 = nn.Conv2d(
            in_channels=1, out_channels=8,
            kernel_size=(5, self.hidden_size), stride=1, padding=(2, 0),
        )
        self.conv3 = nn.Conv2d(
            in_channels=1, out_channels=8,
            kernel_size=(7, self.hidden_size), stride=1, padding=(3, 0),
        )
        # The (1, 3) kernel mixes the three branch outputs of each position.
        self.conv4 = nn.Conv2d(
            in_channels=8, out_channels=2,
            kernel_size=(1, 3), stride=1, padding=0,
        )

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # Index 2 of the wrapped model's output holds the per-layer hidden states.
        hidden_states = self.bert(ids, attention_mask=mask)[2]
        # (batch, seq, hidden) -> (batch, 1, seq, hidden): add a channel axis
        token_features = hidden_states[-1].unsqueeze(1)
        # Every branch sees the same input and is followed by a ReLU.
        branch_outputs = tuple(
            torch.relu(branch(token_features))
            for branch in (self.conv1, self.conv2, self.conv3)
        )
        # Place the branches side by side on the last axis: (batch, 8, seq, 3)
        merged = torch.cat(branch_outputs, dim=3)
        # (batch, 2, seq, 1) -> (batch, 2, seq)
        logits = self.conv4(merged).squeeze(3)
        return logits[:, 0, :], logits[:, 1, :]


# Instantiate Model 6; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

## Model 7

Distilbert uncased + FC(in=hidden_size out=64) + relu + FC(in=64 out=2)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 7: pretrained encoder + FC(hidden -> 64) + ReLU + FC(64 -> 2).

    The two-layer head is applied to every token of the last hidden state;
    its two outputs are the start and end logits of that token.
    """

    def __init__(self):
        super().__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.hidden_size = 768
        self.sequence_length = 384
        # Per-token classification head
        self.fc1 = nn.Linear(self.hidden_size, 64)
        self.fc2 = nn.Linear(64, 2)

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # Index 2 of the wrapped model's output holds the per-layer hidden
        # states; the last entry is the output of the final encoder layer.
        hidden_states = self.bert(ids, attention_mask=mask)[2]
        token_features = hidden_states[-1]
        # (batch, seq, hidden) -> (batch, seq, 64) -> (batch, seq, 2)
        logits = self.fc2(torch.relu(self.fc1(token_features)))
        return logits[:, :, 0], logits[:, :, 1]


# Instantiate Model 7; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

## Model 8

Distilbert uncased + DO(0.5) + FC(in=hidden_size out=64) + relu + FC(in=64 out=2)

In [None]:
class CustomBERTModel(nn.Module):
    """Model 8: pretrained encoder + Dropout(0.5) + FC(hidden -> 64) + ReLU + FC(64 -> 2).

    The head is applied to every token of the last hidden state; its two
    outputs are the start and end logits of that token. Dropout is only
    active in training mode.
    """

    def __init__(self):
        super(CustomBERTModel, self).__init__()
        self.bert = AutoModelForQuestionAnswering.from_pretrained(model_used, output_hidden_states=True)
        self.hidden_size = 768
        self.sequence_length = 384
        # Dropout followed by the per-token FC head
        self.do = nn.Dropout(0.5)
        self.fc1 = nn.Linear(self.hidden_size, 64)
        self.fc2 = nn.Linear(64, 2)

    def forward(self, ids, mask):
        """Return `(start_logits, end_logits)`, each of shape (batch, sequence_length)."""
        # output is indexed as (start_logits, end_logits, hidden_states)
        output = self.bert(ids,
                           attention_mask=mask)
        # hidden_states holds one (batch, sequence_length, hidden) tensor per
        # layer; take the output of the last encoder layer
        last_encoder_layer_output = output[2][-1]
        # Feed the *dropped-out* features into fc1. Previously fc1 was applied
        # to the original tensor, so the dropout layer had no effect.
        x = self.do(last_encoder_layer_output)
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        # Get the start and end logits
        start_logits = x[:, :, 0]
        end_logits = x[:, :, 1]
        return start_logits, end_logits


# Instantiate Model 8; this rebinds the notebook-level `model` name.
model = CustomBERTModel()

# Applying pruning to baseline DistilBERT

In [None]:
def magnitude_prune(model, pruning_ratio):
    """Apply L1 (magnitude-based) unstructured pruning to every Linear layer.

    Args:
        model: module whose `torch.nn.Linear` sub-modules (at any depth) are pruned.
        pruning_ratio: passed as `amount` to `prune.l1_unstructured` for the
            "weight" parameter of each Linear layer.

    Returns:
        The same `model` object, modified in place.
    """
    linear_layers = (
        layer for layer in model.modules()
        if isinstance(layer, torch.nn.Linear)
    )
    for layer in linear_layers:
        prune.l1_unstructured(layer, name="weight", amount=pruning_ratio)

    return model

In [None]:
class CustomBERTModel(nn.Module):
    """Baseline question-answering model with its Linear weights pruned.

    `magnitude_prune` is applied once, at construction time, with a ratio of
    0.3. `forward` returns the first two entries of the wrapped model's
    output, which this notebook treats as the start and end logits.
    """

    def __init__(self):
        super().__init__()
        pruning_ratio = 0.3  # Prune 30% of the smallest weights
        self.bert = magnitude_prune(
            AutoModelForQuestionAnswering.from_pretrained(
                model_used,
                output_hidden_states=True,
            ),
            pruning_ratio,
        )

    def forward(self, ids, mask):
        """Run the pruned model on token ids `ids` with attention mask `mask`."""
        qa_output = self.bert(ids, attention_mask=mask)
        start_logits = qa_output[0]
        end_logits = qa_output[1]
        return start_logits, end_logits

# Instantiate the pruned model; this rebinds the notebook-level `model` name.
model = CustomBERTModel()