<a href="https://colab.research.google.com/github/TheAlchemist010/DeepLearning-Notebooks/blob/main/FIT3181/A2_Part3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# <font color="#0b486b">  FIT3181: Deep Learning (2024) - Assignment 2 (Transformers)</font>
***
*CE/Lecturer (Clayton):*  **Dr Trung Le** | trunglm@monash.edu <br/>
*Lecturer (Clayton):* **Prof Dinh Phung** | dinh.phung@monash.edu <br/>
*Lecturer (Malaysia):*  **Dr Arghya Pal** | arghya.pal@monash.edu <br/>
*Lecturer (Malaysia):*  **Dr Lim Chern Hong** | lim.chernhong@monash.edu <br/>  <br/>
*Head Tutor 3181:*  **Miss Vy Vo** |  \[v.vo@monash.edu \] <br/>
*Head Tutor 5215:*  **Dr Van Nguyen** |  \[van.nguyen1@monash.edu \]

<br/> <br/>
Faculty of Information Technology, Monash University, Australia
***

# <font color="#0b486b">  Student Information</font>
***
Surname: **Gallagher**  <br/>
Firstname: **Daniel**    <br/>
Student ID: **33094969**    <br/>
Email: **dgal0013@student.monash.edu**    <br/>
Your tutorial time: **Monday 4pm**    <br/>
***

# <font color="0b486b">Assignment 2 – Deep Learning for Sequential Data</font>
### Due: <font color="red">11:55pm Sunday, 27 October 2024</font> (FIT3181)

#### <font color="red">Important note:</font> This is an **individual** assignment. It contributes **15%** to your final mark. Read the assignment instructions carefully.

## <font color="#0b486b">Assignment 2's Organization</font>
This assignment 2 has two (2) sections:
- Section 1: Fundamentals of RNNs (10 marks).
- Section 2: Deep Learning for Sequential Data (90 marks). This section is further divided into 4 parts.

The assignment 2 is organized in three (3) notebooks.
- Notebook 1 ([link](https://colab.research.google.com/drive/1Rm2wWOJCjilpVf4T6RJZFtkjRULuTo0g?usp=sharing)) [Total: 30 marks] includes Section 1 as well as Part 1 and Part 2 of Section 2.
- Notebook 2 ([link](https://colab.research.google.com/drive/19-WnvLH24yUZ_eih3_8P_Fjn8cM8X2Gz?usp=sharing)) [Total: 40 marks] includes Part 3 of Section 2.
- Notebook 3 (this notebook) [Total: 30 marks] includes Part 4 of Section 2.


## <font color="#0b486b">What to submit</font>

This assignment is to be completed individually and submitted to Moodle unit site. **By the due date, you are required to submit one  <font color="red; font-weight:bold">single zip file, named xxx_assignment02_solution.zip</font> where `xxx` is your student ID, to the corresponding Assignment (Dropbox) in Moodle**. You can use Google Colab to do Assignment 2 but you need to save it to an `*.ipynb` file to submit to the unit Moodle.

**More importantly, if you use Google Colab to do this assignment, you need to first make a copy of this notebook on your Google drive**.

***For example, if your student ID is <font color="red; font-weight:bold">12356</font>, then gather all of your assignment solutions to a folder, create a zip file named <font color="red; font-weight:bold">123456_assignment02_solution.zip</font> and submit this file.***

Within this zip folder, you **must** submit the following files <u>for each part</u>:
1.	**`FIT3181_DeepLearning_Assignment2_Official[Main].ipynb`**:  this is your Python notebook solution source file.
1.	**`FIT3181_DeepLearning_Assignment2_Official[Main].html`**: this is the output of your Python notebook solution *exported* in HTML format.
1. **`FIT3181_DeepLearning_Assignment2_Official[RNNs].ipynb`**
1. **`FIT3181_DeepLearning_Assignment2_Official[RNNs].html`**
1. **`FIT3181_DeepLearning_Assignment2_Official[Transformers].ipynb`**
1. **`FIT3181_DeepLearning_Assignment2_Official[Transformers].html`**
1.	Any **extra files or folder** needed to complete your assignment (e.g., images used in your answers).



## Section 2: Deep Learning for Sequential Data

### <font color="#0b486b">Set random seeds</font>

We need to install the package datasets for creating BERT datasets.

In [None]:
!pip install datasets



We start with importing PyTorch and NumPy and setting random seeds for PyTorch and NumPy. You can use any seeds you prefer.

In [None]:
import os
import torch
import random
import requests
import pandas as pd
import numpy as np
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from transformers import BertTokenizer
import os
from six.moves.urllib.request import urlretrieve
from sklearn import preprocessing
import matplotlib.pyplot as plt
plt.style.use('ggplot')

In [None]:
def seed_all(seed=1029):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
seed_all(seed=1234)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## <font color="#0b486b">Download and preprocess the data</font>

<div style="text-align: right"><font color="red; font-weight:bold"><span></div>

The dataset we use for this assignment is a question classification dataset for which the training set consists of $5,500$ questions belonging to 6 coarse question categories including:
- abbreviation (ABBR),
- entity (ENTY),
- description (DESC),
- human (HUM),
- location (LOC) and
- numeric (NUM).

In this assignment, we will utilize a subset of this dataset, containing $2,000$ questions for training and validation. We will use 80% of those 2000 questions for trainning and the rest for validation.


Preprocessing data is a crucial initial step in any machine learning or deep learning project. The *TextDataManager* class simplifies the process by providing functionalities to download and preprocess data specifically designed for the subsequent questions in this assignment. It is highly recommended to gain a comprehensive understanding of the class's functionality by **carefully reading** the content provided in the *TextDataManager* class before proceeding to answer the questions.

In [None]:
class DataManager:
    """
    This class manages and preprocesses a simple text dataset for a sentence classification task.

    Attributes:
        verbose (bool): Controls verbosity for printing information during data processing.
        max_sentence_len (int): The maximum length of a sentence in the dataset.
        str_questions (list): A list to store the string representations of the questions in the dataset.
        str_labels (list): A list to store the string representations of the labels in the dataset.
        numeral_labels (list): A list to store the numerical representations of the labels in the dataset.
        maxlen (int): Maximum length for padding sequences. Sequences longer than this length will be truncated,
            and sequences shorter than this length will be padded with zeros. Defaults to 50.
        numeral_data (list): A list to store the numerical representations of the questions in the dataset.
        random_state (int): Seed value for random number generation to ensure reproducibility.
            Set this value to a specific integer to reproduce the same random sequence every time. Defaults to 6789.
        random (np.random.RandomState): Random number generator object initialized with the given random_state.
            It is used for various random operations in the class.

    Methods:
        maybe_download(dir_name, file_name, url, verbose=True):
            Downloads a file from a given URL if it does not exist in the specified directory.
            The directory and file are created if they do not exist.

        read_data(dir_name, file_names):
            Reads data from files in a directory, preprocesses it, and computes the maximum sentence length.
            Each file is expected to contain rows in the format "<label>:<question>".
            The labels and questions are stored as string representations.

        manipulate_data():
            Performs data manipulation by tokenizing, numericalizing, and padding the text data.
            The questions are tokenized and converted into numerical sequences using a tokenizer.
            The sequences are padded or truncated to the maximum sequence length.

        train_valid_test_split(train_ratio=0.9):
            Splits the data into training, validation, and test sets based on a given ratio.
            The data is randomly shuffled, and the specified ratio is used to determine the size of the training set.
            The string questions, numerical data, and numerical labels are split accordingly.
            TensorFlow `Dataset` objects are created for the training and validation sets.


    """

    def __init__(self, verbose=True, random_state=6789):
        self.verbose = verbose
        self.max_sentence_len = 0
        self.str_questions = list()
        self.str_labels = list()
        self.numeral_labels = list()
        self.numeral_data = list()
        self.random_state = random_state
        self.random = np.random.RandomState(random_state)

    @staticmethod
    def maybe_download(dir_name, file_name, url, verbose=True):
        if not os.path.exists(dir_name):
            os.mkdir(dir_name)
        if not os.path.exists(os.path.join(dir_name, file_name)):
            urlretrieve(url + file_name, os.path.join(dir_name, file_name))
        if verbose:
            print("Downloaded successfully {}".format(file_name))

    def read_data(self, dir_name, file_names):
        self.str_questions = list()
        self.str_labels = list()
        for file_name in file_names:
            file_path= os.path.join(dir_name, file_name)
            with open(file_path, "r", encoding="latin-1") as f:
                for row in f:
                    row_str = row.split(":")
                    label, question = row_str[0], row_str[1]
                    question = question.lower()
                    self.str_labels.append(label)
                    self.str_questions.append(question[0:-1])
                    if self.max_sentence_len < len(self.str_questions[-1]):
                        self.max_sentence_len = len(self.str_questions[-1])

        # turns labels into numbers
        le = preprocessing.LabelEncoder()
        le.fit(self.str_labels)
        self.numeral_labels = np.array(le.transform(self.str_labels))
        self.str_classes = le.classes_
        self.num_classes = len(self.str_classes)
        if self.verbose:
            print("\nSample questions and corresponding labels... \n")
            print(self.str_questions[0:5])
            print(self.str_labels[0:5])

    def manipulate_data(self):
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        vocab = self.tokenizer.get_vocab()
        self.word2idx = {w: i for i, w in enumerate(vocab)}
        self.idx2word = {i:w for w,i in self.word2idx.items()}
        self.vocab_size = len(self.word2idx)

        token_ids = []
        num_seqs = []
        for text in self.str_questions:  # iterate over the list of text
          text_seqs = self.tokenizer.tokenize(str(text))  # tokenize each text individually
          # Convert tokens to IDs
          token_ids = self.tokenizer.convert_tokens_to_ids(text_seqs)
          # Convert token IDs to a tensor of indices using your word2idx mapping
          seq_tensor = torch.LongTensor(token_ids)
          num_seqs.append(seq_tensor)  # append the tensor for each sequence

        # Pad the sequences and create a tensor
        if num_seqs:
          self.numeral_data = pad_sequence(num_seqs, batch_first=True)  # Pads to max length of the sequences
          self.num_sentences, self.max_seq_len = self.numeral_data.shape

    def train_valid_test_split(self, train_ratio=0.8, test_ratio = 0.1):
        train_size = int(self.num_sentences*train_ratio) +1
        test_size = int(self.num_sentences*test_ratio) +1
        valid_size = self.num_sentences - (train_size + test_size)
        data_indices = list(range(self.num_sentences))
        random.shuffle(data_indices)
        self.train_str_questions = [self.str_questions[i] for i in data_indices[:train_size]]
        self.train_numeral_labels = self.numeral_labels[data_indices[:train_size]]
        train_set_data = self.numeral_data[data_indices[:train_size]]
        train_set_labels = self.numeral_labels[data_indices[:train_size]]
        train_set_labels = torch.from_numpy(train_set_labels)
        train_set = torch.utils.data.TensorDataset(train_set_data, train_set_labels)
        self.test_str_questions = [self.str_questions[i] for i in data_indices[-test_size:]]
        self.test_numeral_labels = self.numeral_labels[data_indices[-test_size:]]
        test_set_data = self.numeral_data[data_indices[-test_size:]]
        test_set_labels = self.numeral_labels[data_indices[-test_size:]]
        test_set_labels = torch.from_numpy(test_set_labels)
        test_set = torch.utils.data.TensorDataset(test_set_data, test_set_labels)
        self.valid_str_questions = [self.str_questions[i] for i in data_indices[train_size:-test_size]]
        self.valid_numeral_labels = self.numeral_labels[data_indices[train_size:-test_size]]
        valid_set_data = self.numeral_data[data_indices[train_size:-test_size]]
        valid_set_labels = self.numeral_labels[data_indices[train_size:-test_size]]
        valid_set_labels = torch.from_numpy(valid_set_labels)
        valid_set = torch.utils.data.TensorDataset(valid_set_data, valid_set_labels)
        self.train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
        self.test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
        self.valid_loader = DataLoader(valid_set, batch_size=64, shuffle=False)

In [None]:
print('Loading data...')
DataManager.maybe_download("data", "train_2000.label", "http://cogcomp.org/Data/QA/QC/")

dm = DataManager()
dm.read_data("data/", ["train_2000.label"])

Loading data...
Downloaded successfully train_2000.label

Sample questions and corresponding labels... 

['manner how did serfdom develop in and then leave russia ?', 'cremat what films featured the character popeye doyle ?', "manner how can i find a list of celebrities ' real names ?", 'animal what fowl grabs the spotlight after the chinese year of the monkey ?', 'exp what is the full form of .com ?']
['DESC', 'ENTY', 'DESC', 'ENTY', 'ABBR']


In [None]:
dm.manipulate_data()
dm.train_valid_test_split(train_ratio=0.8, test_ratio = 0.1)

In [None]:
for x, y in dm.train_loader:
    print(x.shape, y.shape)
    break

torch.Size([64, 36]) torch.Size([64])


We now declare the `BaseTrainer` class, which will be used later to train the subsequent deep learning models for text data.

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class BaseTrainer:
    def __init__(self, model, criterion, optimizer, train_loader, val_loader):
        self.model = model
        self.criterion = criterion  #the loss function
        self.optimizer = optimizer  #the optimizer
        self.train_loader = train_loader  #the train loader
        self.val_loader = val_loader  #the valid loader

    #the function to train the model in many epochs
    def fit(self, num_epochs):
        self.num_batches = len(self.train_loader)

        for epoch in range(num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            train_loss, train_accuracy = self.train_one_epoch()
            val_loss, val_accuracy = self.validate_one_epoch()
            print(
                f'{self.num_batches}/{self.num_batches} - train_loss: {train_loss:.4f} - train_accuracy: {train_accuracy*100:.4f}% \
                - val_loss: {val_loss:.4f} - val_accuracy: {val_accuracy*100:.4f}%')

    #train in one epoch, return the train_acc, train_loss
    def train_one_epoch(self):
        self.model.train()
        running_loss, correct, total = 0.0, 0, 0
        for i, data in enumerate(self.train_loader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        train_accuracy = correct / total
        train_loss = running_loss / self.num_batches
        return train_loss, train_accuracy

    #evaluate on a loader and return the loss and accuracy
    def evaluate(self, loader):
        self.model.eval()
        loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for data in loader:
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = correct / total
        loss = loss / len(self.val_loader)
        return loss, accuracy

    #return the val_acc, val_loss, be called at the end of each epoch
    def validate_one_epoch(self):
      val_loss, val_accuracy = self.evaluate(self.val_loader)
      return val_loss, val_accuracy

## <font color="#0b486b">Part 4: Transformer-based models for sequence modeling and neural embedding</font>

<div style="text-align: right"><font color="red; font-weight:bold">[Total marks for this part: 30 marks]<span></div>

#### <font color="red">**Question 4.1**</font>

**Implement the multi-head attention module of the Transformer for the text classification problem. The provided code is from our tutorial. In this part, we only use the output of the Transformer encoder for the classification task. For further information on the Transformer model, refer to [this paper](https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf).**

<div style="text-align: right"><font color="red; font-weight:bold">[Total marks for this part: 10 marks]<span></div>


Below is the code of `MultiHeadSelfAttention`, `PositionWiseFeedForward`, `PositionalEncoding`, and `EncoderLayer`.

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Initialize dimensions
        self.d_model = d_model # Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value

        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query transformation
        self.W_k = nn.Linear(d_model, d_model) # Key transformation
        self.W_v = nn.Linear(d_model, d_model) # Value transformation
        self.W_o = nn.Linear(d_model, d_model) # Output transformation

    def scaled_dot_product_attention(self, Q, K, V):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Apply mask if provided (useful for preventing attention to certain parts like padding)
        #if mask is not None:
            #attn_scores = attn_scores.masked_fill(mask == 0, -1e9)

        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)

        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output

    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V)

        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output

In [None]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

In [None]:
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [None]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

Your task is to develop `TransformerClassifier` in which we input the embedding with the shape `[batch_size, seq_len, embed_dim]` to some `EncoderLayer` (i.e., num_layers specifies the number of EncoderLayer) and then compute the average of all token embeddings (i.e., `[batch_size, seq_len, embed_dim]`) across the `seq_len`. Finally, on the top of this average embedding, we build up a linear layer for making predictions.

In [None]:
class TransformerClassifier(nn.Module):
    def __init__(self, embed_dim, num_heads, ff_dim, num_layers, dropout_rate=0.2, data_manager = None):
        super(TransformerClassifier, self).__init__()
        self.vocab_size = data_manager.vocab_size
        self.num_classes = data_manager.num_classes
        self.embed_dim = embed_dim
        self.max_seq_len = data_manager.max_seq_len
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.num_layers = num_layers
        self.dropout_rate = dropout_rate

    def build(self):

        self.embedding = nn.Embedding(self.vocab_size, self.embed_dim)
        self.positional_encoding = PositionalEncoding(self.embed_dim, self.max_seq_len)

        self.encoder_layers = nn.ModuleList()
        for _ in range(self.num_layers):
            self.encoder_layers.append(EncoderLayer(self.embed_dim, self.num_heads, self.ff_dim, self.dropout_rate))


        self.fc = nn.Linear(self.embed_dim, self.num_classes)



    def forward(self, x):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.encoder_layers:
            x = layer(x)
        x = torch.mean(x, dim=1)

        return self.fc(x)


In [None]:
transformer = TransformerClassifier(embed_dim=512, num_heads=8, ff_dim=2048, num_layers=12, dropout_rate=0.1, data_manager= dm)
transformer.build()
transformer = transformer.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(transformer.parameters(), lr=1e-4, betas=(0.9, 0.98), eps=1e-9)
trainer = BaseTrainer(model= transformer, criterion=criterion, optimizer=optimizer, train_loader=dm.train_loader, val_loader=dm.valid_loader)
trainer.fit(num_epochs=30)

Epoch 1/30
26/26 - train_loss: 2.0195 - train_accuracy: 22.4235%                 - val_loss: 0.8066 - val_accuracy: 12.6263%
Epoch 2/30
26/26 - train_loss: 1.7232 - train_accuracy: 18.8007%                 - val_loss: 0.8273 - val_accuracy: 12.6263%
Epoch 3/30
26/26 - train_loss: 1.6846 - train_accuracy: 21.9238%                 - val_loss: 0.8405 - val_accuracy: 26.2626%
Epoch 4/30
26/26 - train_loss: 1.6877 - train_accuracy: 21.9863%                 - val_loss: 0.7704 - val_accuracy: 27.2727%
Epoch 5/30
26/26 - train_loss: 1.6988 - train_accuracy: 20.8620%                 - val_loss: 0.7289 - val_accuracy: 27.2727%
Epoch 6/30
26/26 - train_loss: 1.6803 - train_accuracy: 21.3616%                 - val_loss: 0.7095 - val_accuracy: 27.2727%
Epoch 7/30
26/26 - train_loss: 1.6396 - train_accuracy: 26.2961%                 - val_loss: 0.7326 - val_accuracy: 37.3737%
Epoch 8/30
26/26 - train_loss: 1.4001 - train_accuracy: 37.6640%                 - val_loss: 0.7161 - val_accuracy: 47.4747%


#### <font color="red">**Question 4.2**</font>
**Prefix prompt-tuning with Transformers: You need to implement the prefix prompt-tuning with Transformers. Basically, we base on a pre-trained Transformer, add prefix prompts, and do fine-tuning for a target dataset.**

<div style="text-align: right"><font color="red; font-weight:bold">[Total marks for this part: 10 marks]<span></div>

To implement prefix prompt-tuning with pretrained Transformers, we first need to create the Bert dataset.

In [None]:
from transformers import AutoModel, AutoTokenizer, AdamW
from datasets import Dataset

model_name = "bert-base-uncased"  # BERT or any similar model

# Tokenize input and prepare model inputs
tokenizer = AutoTokenizer.from_pretrained(model_name)

dataset = Dataset.from_dict({"text": dm.str_questions, "label": dm.numeral_labels})

# Tokenize the dataset
def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length= 36)

dataset = dataset.map(tokenize_function, batched=True)
dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
print(dataset)

The following function splits the BERT dataset `dataset` into three BERT datasets for training, valid, and testing.

In [None]:
def train_valid_test_split(dataset, train_ratio=0.8, test_ratio = 0.1):
    num_sentences = len(dataset)
    train_size = int(num_sentences*train_ratio) +1
    test_size = int(num_sentences*test_ratio) +1
    valid_size = num_sentences - (train_size + test_size)
    train_set = dataset[:train_size]
    train_set = Dataset.from_dict(train_set)
    train_set.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    test_set = dataset[-test_size:]
    test_set = Dataset.from_dict(test_set)
    test_set.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    valid_set = dataset[train_size:-test_size]
    valid_set = Dataset.from_dict(valid_set)
    valid_set.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
    valid_loader = DataLoader(valid_set, batch_size=64, shuffle=False)
    return train_loader, test_loader, valid_loader


In [None]:
train_loader, test_loader, valid_loader = train_valid_test_split(dataset)

You need to implement the class `PrefixTuningForClassification` for the prefix prompt fine-tuning. We first load a pre-trained BERT model specified by `model_name`. The parameter `prefix_length` specifies the length of the prefix prompts we add to the pre-trained BERT model. Specifically, given the input batch `[batch_size, seq_len]`, we input to the embedding layer of the pre-trained BERT model to obtain `[batch_size, seq_len, embed_size]`. We create the prefix prompts $P$ of the size `[prefix_length, embed_size]` and concatenate to the embeddings from the pre-trained BERT to obtain `[batch_size, seq_len + prefix_length, embed_size]`. This concatenation tensor will then be fed to the encoder layers of the pre-trained BERT layer to obtain the last `[batch_size, seq_len + prefix_length, embed_size]`.

We then take mean across the seq_len to obtain `[batch_size, embed_size]` on which we can build up a linear layer for making predictions. Please note that **the parameters to tune include the prefix prompts $P$** and **the output linear layer**, and you should freeze the parameters of the BERT pre-trained model. Moreover, your code should cover the edge case when `prefix_length=None`. In this case, we do not insert any prefix prompts and we only do fine-tuning for the output linear layer on top.  

In [None]:
class PrefixTuningForClassification(nn.Module):
    def __init__(self, model_name, prefix_length=None, data_manager = None):
        super(PrefixTuningForClassification, self).__init__()

        # Load the pretrained transformer model (BERT-like model)
        self.model = AutoModel.from_pretrained(model_name).to(device)
        self.hidden_size =  self.model.config.hidden_size
        self.prefix_length = prefix_length
        self.num_classes = data_manager.num_classes
        self.build()

    def build(self):
        self.prefix_embeddings = torch.empty(self.prefix_length, self.hidden_size).to(device)
        torch.nn.init.xavier_uniform_(self.prefix_embeddings)
        self.prefix_embeddings.requires_grad = True

        for param in self.model.parameters():
            param.requires_grad = False

        self.classifier = nn.Linear(self.hidden_size, self.num_classes)

    def forward(self, input_ids, attention_mask):
        embeddings = self.model.embeddings(input_ids)
        prefix_embeddings_expanded = self.prefix_embeddings.unsqueeze(0).expand(embeddings.shape[0], -1, -1)
        embeddings = torch.cat((prefix_embeddings_expanded, embeddings), dim=1)
        batch_size, seq_len = input_ids.shape
        extended_attention_mask = torch.cat(
            (torch.ones(batch_size, self.prefix_length, device=attention_mask.device), attention_mask),
            dim=1
        )
        extended_attention_mask = self.model.get_extended_attention_mask(extended_attention_mask, (batch_size, seq_len + self.prefix_length), attention_mask.device)
        x = self.model.encoder(embeddings, attention_mask=extended_attention_mask).last_hidden_state
        x = torch.mean(x, dim=1)
        x = self.classifier(x)

        return x



You can use the following `FineTunedBaseTrainer` to train the prompt fine-tuning models.

In [None]:
class FineTunedBaseTrainer:
    def __init__(self, model, criterion, optimizer, train_loader, val_loader):
        self.model = model
        self.criterion = criterion  #the loss function
        self.optimizer = optimizer  #the optimizer
        self.train_loader = train_loader  #the train loader
        self.val_loader = val_loader  #the valid loader

    #the function to train the model in many epochs
    def fit(self, num_epochs):
        self.num_batches = len(self.train_loader)

        for epoch in range(num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            train_loss, train_accuracy = self.train_one_epoch()
            val_loss, val_accuracy = self.validate_one_epoch()
            print(
                f'{self.num_batches}/{self.num_batches} - train_loss: {train_loss:.4f} - train_accuracy: {train_accuracy*100:.4f}% \
                - val_loss: {val_loss:.4f} - val_accuracy: {val_accuracy*100:.4f}%')

    #train in one epoch, return the train_acc, train_loss
    def train_one_epoch(self):
        self.model.train()
        running_loss, correct, total = 0.0, 0, 0
        for batch in self.train_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)
            self.optimizer.zero_grad()
            outputs = self.model(input_ids= input_ids, attention_mask= attention_mask)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        train_accuracy = correct / total
        train_loss = running_loss / self.num_batches
        return train_loss, train_accuracy

    #evaluate on a loader and return the loss and accuracy
    def evaluate(self, loader):
        self.model.eval()
        loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for batch in loader:
                input_ids = batch["input_ids"].to(device)
                labels = batch["label"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                outputs = self.model(input_ids= input_ids, attention_mask= attention_mask)
                loss = self.criterion(outputs, labels)
                loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = correct / total
        loss = loss / len(self.val_loader)
        return loss, accuracy

    #return the val_acc, val_loss, be called at the end of each epoch
    def validate_one_epoch(self):
      val_loss, val_accuracy = self.evaluate(self.val_loader)
      return val_loss, val_accuracy

We declare and train the prefix-prompt tuning model. In addition, you need to be patient with this model because it might converge slowly with many epochs.

In [None]:
prefix_tuning_model = PrefixTuningForClassification(model_name = "bert-base-uncased", prefix_length = 5, data_manager = dm).to(device)

In [None]:
if prefix_tuning_model.prefix_length is not None:
  optimizer = torch.optim.Adam(list(prefix_tuning_model.classifier.parameters()) + [prefix_tuning_model.prefix_embeddings], lr=5e-5)
else:
  optimizer = torch.optim.Adam(prefix_tuning_model.classifier.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()
trainer = FineTunedBaseTrainer(model= prefix_tuning_model, criterion=criterion, optimizer=optimizer, train_loader=train_loader, val_loader=valid_loader)
trainer.fit(num_epochs=100)

Epoch 1/100




26/26 - train_loss: 1.7959 - train_accuracy: 20.6121%                 - val_loss: 0.9391 - val_accuracy: 19.6970%
Epoch 2/100
26/26 - train_loss: 1.7557 - train_accuracy: 25.2967%                 - val_loss: 0.9187 - val_accuracy: 26.2626%
Epoch 3/100
26/26 - train_loss: 1.7322 - train_accuracy: 28.0450%                 - val_loss: 0.9033 - val_accuracy: 29.7980%
Epoch 4/100
26/26 - train_loss: 1.7085 - train_accuracy: 31.5428%                 - val_loss: 0.8883 - val_accuracy: 32.3232%
Epoch 5/100
26/26 - train_loss: 1.6836 - train_accuracy: 35.6027%                 - val_loss: 0.8740 - val_accuracy: 34.8485%
Epoch 6/100
26/26 - train_loss: 1.6524 - train_accuracy: 36.7270%                 - val_loss: 0.8642 - val_accuracy: 36.3636%
Epoch 7/100
26/26 - train_loss: 1.6373 - train_accuracy: 38.9756%                 - val_loss: 0.8553 - val_accuracy: 37.3737%
Epoch 8/100
26/26 - train_loss: 1.6101 - train_accuracy: 39.7252%                 - val_loss: 0.8450 - val_accuracy: 39.8990%
Epoc

#### <font color="red">**Question 4.3**</font>
**For any models defined in the previous questions (of all parts), you are free to fine-tune hyperparameters, e.g., `optimizer`, `learning_rate`, `state_sizes`, such that you get a best model, i.e., the one with the highest accuracy on the test set. You will need to report (i) what is your best model,  (ii) its accuracy on the test set, and (iii) the values of its hyperparameters. Note that you must report your best model's accuracy with rounding to 4 decimal places, i.e., 0.xxxx. You will also need to upload your best model (or provide us with the link to download your best model). The assessment will be based on your best model's accuracy, with up to 9 marks available, specifically:**
* The best accuracy $\ge$ 0.97: 10 marks
* 0.97 $>$ The best accuracy $\ge$ 0.92: 7 marks
* 0.92 $>$ The best accuracy $\ge$ 0.85: 4 marks
* The best accuracy $<$ 0.85: 0 mark

**For this question, you can put below the code to train the best model. In this case, you need to show your code and the evidence of running regarding the best model. Moreover, if you save the best model, you need to provide the link to download the best model, the code to load the best model, and then evaluate on the test set.**
<div style="text-align: right"><font color="red">[10 marks]</font></div>

# Give your answer here.

(i) What is your best model?

PrefixTuningForClassification from the base model bert-base-uncased

(ii) The accuracy of your best model on the test set

97.4747%

(iii) The values of the hyperparameters of your best model

prefix_length = 20, learning rate=4e-5, 543 epochs

(iv) The link to download your best model

https://github.com/TheAlchemist010/DeepLearning-Notebooks/raw/refs/heads/main/FIT3181/model.pth

Code to download and test below

In [None]:
#Initial training
prefix_tuning_model = PrefixTuningForClassification(model_name = "bert-base-uncased", prefix_length = 20, data_manager = dm).to(device)
optimizer = torch.optim.Adam(list(prefix_tuning_model.classifier.parameters()) + [prefix_tuning_model.prefix_embeddings], lr=4e-5)
criterion = nn.CrossEntropyLoss()
trainer = FineTunedBaseTrainer(model= prefix_tuning_model, criterion=criterion, optimizer=optimizer, train_loader=train_loader, val_loader=valid_loader)
trainer.fit(num_epochs=400)

#trainer.fit(num_epochs=400) is later ran again to reach the accuraccy

Epoch 1/400




26/26 - train_loss: 1.7645 - train_accuracy: 20.4872%                 - val_loss: 0.9119 - val_accuracy: 18.6869%
Epoch 2/400
26/26 - train_loss: 1.7526 - train_accuracy: 20.7370%                 - val_loss: 0.8934 - val_accuracy: 18.6869%
Epoch 3/400
26/26 - train_loss: 1.7244 - train_accuracy: 21.7364%                 - val_loss: 0.8753 - val_accuracy: 19.6970%
Epoch 4/400
26/26 - train_loss: 1.7243 - train_accuracy: 24.3598%                 - val_loss: 0.8630 - val_accuracy: 23.2323%
Epoch 5/400
26/26 - train_loss: 1.6869 - train_accuracy: 25.6715%                 - val_loss: 0.8545 - val_accuracy: 26.2626%
Epoch 6/400
26/26 - train_loss: 1.6720 - train_accuracy: 29.4191%                 - val_loss: 0.8469 - val_accuracy: 30.8081%
Epoch 7/400
26/26 - train_loss: 1.6759 - train_accuracy: 31.6677%                 - val_loss: 0.8416 - val_accuracy: 30.8081%
Epoch 8/400
26/26 - train_loss: 1.6481 - train_accuracy: 33.9163%                 - val_loss: 0.8342 - val_accuracy: 32.8283%
Epoc

In [None]:
#Save
model_save_path = "model.pth"

torch.save({
    'classifier_state_dict': prefix_tuning_model.classifier.state_dict(),
    'prefix_embeddings': prefix_tuning_model.prefix_embeddings
}, model_save_path)

In [None]:
#Download from my github
url = "https://github.com/TheAlchemist010/DeepLearning-Notebooks/raw/refs/heads/main/FIT3181/model.pth"
response = requests.get(url)
with open("downloaded_prefix_tuning.pth", "wb") as f:
    f.write(response.content)
checkpoint = torch.load("downloaded_prefix_tuning.pth")

dowloaded_model = PrefixTuningForClassification(
    model_name="bert-base-uncased",
    prefix_length=20,
    data_manager=dm
).to(device)

dowloaded_model.classifier.load_state_dict(checkpoint['classifier_state_dict'])
dowloaded_model.prefix_embeddings = checkpoint['prefix_embeddings']

  checkpoint = torch.load("downloaded_prefix_tuning.pth")


In [None]:
#Test on training set
trainer = FineTunedBaseTrainer(model= dowloaded_model, criterion=criterion, optimizer=optimizer, train_loader=train_loader, val_loader=valid_loader)
val_loss, val_accuracy = trainer.evaluate(trainer.val_loader)
print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy*100:.2f}%')

Validation Loss: 0.0165, Validation Accuracy: 97.47%


---
<div style="text-align: center"> <font color="green">GOOD LUCK WITH YOUR ASSIGNMENT 2!</font> </div>
<div style="text-align: center"> <font color="black">END OF ASSIGNMENT</font> </div>