# Deep Learning for Sequential Data (Transformers)

# Section 1: Set Random Seeds and Preprocess Data

## Install Required Packages

In [1]:
!pip install datasets transformers -q

## Import Libraries

In [2]:
import os
import torch
import random
import requests
import pandas as pd
import numpy as np
import torch.nn as nn
import math
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from transformers import BertTokenizer, AutoModel, AutoTokenizer
from torch.optim import AdamW
from datasets import Dataset
import os
from six.moves.urllib.request import urlretrieve
from sklearn import preprocessing
import matplotlib.pyplot as plt
plt.style.use('ggplot')

## Set Random Seeds

In [3]:
def seed_all(seed=1029):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

seed_all(seed=1234)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


## 1.2 DataManager Class

In [4]:
class DataManager:
    """
    This class manages and preprocesses a simple text dataset for a sentence classification task.
    """

    def __init__(self, verbose=True, random_state=6789):
        self.verbose = verbose
        self.max_sentence_len = 0
        self.str_questions = list()
        self.str_labels = list()
        self.numeral_labels = list()
        self.numeral_data = list()
        self.random_state = random_state
        self.random = np.random.RandomState(random_state)

    @staticmethod
    def maybe_download(dir_name, file_name, url, verbose=True):
        if not os.path.exists(dir_name):
            os.mkdir(dir_name)
        if not os.path.exists(os.path.join(dir_name, file_name)):
            urlretrieve(url + file_name, os.path.join(dir_name, file_name))
        if verbose:
            print("Downloaded successfully {}".format(file_name))

    def read_data(self, dir_name, file_names):
        self.str_questions = list()
        self.str_labels = list()
        for file_name in file_names:
            file_path= os.path.join(dir_name, file_name)
            with open(file_path, "r", encoding="latin-1") as f:
                for row in f:
                    row_str = row.split(":")
                    label, question = row_str[0], row_str[1]
                    question = question.lower()
                    self.str_labels.append(label)
                    self.str_questions.append(question[0:-1])
                    if self.max_sentence_len < len(self.str_questions[-1]):
                        self.max_sentence_len = len(self.str_questions[-1])

        # turns labels into numbers
        le = preprocessing.LabelEncoder()
        le.fit(self.str_labels)
        self.numeral_labels = np.array(le.transform(self.str_labels))
        self.str_classes = le.classes_
        self.num_classes = len(self.str_classes)
        if self.verbose:
            print("\nSample questions and corresponding labels... \n")
            print(self.str_questions[0:5])
            print(self.str_labels[0:5])

    def manipulate_data(self):
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        vocab = self.tokenizer.get_vocab()
        self.word2idx = {w: i for i, w in enumerate(vocab)}
        self.idx2word = {i:w for w,i in self.word2idx.items()}
        self.vocab_size = len(self.word2idx)

        token_ids = []
        num_seqs = []
        for text in self.str_questions:
          text_seqs = self.tokenizer.tokenize(str(text))
          token_ids = self.tokenizer.convert_tokens_to_ids(text_seqs)
          seq_tensor = torch.LongTensor(token_ids)
          num_seqs.append(seq_tensor)

        if num_seqs:
          self.numeral_data = pad_sequence(num_seqs, batch_first=True)
          self.num_sentences, self.max_seq_len = self.numeral_data.shape

    def train_valid_test_split(self, train_ratio=0.8, test_ratio = 0.1):
        train_size = int(self.num_sentences*train_ratio) +1
        test_size = int(self.num_sentences*test_ratio) +1
        valid_size = self.num_sentences - (train_size + test_size)
        data_indices = list(range(self.num_sentences))
        random.shuffle(data_indices)
        self.train_str_questions = [self.str_questions[i] for i in data_indices[:train_size]]
        self.train_numeral_labels = self.numeral_labels[data_indices[:train_size]]
        train_set_data = self.numeral_data[data_indices[:train_size]]
        train_set_labels = self.numeral_labels[data_indices[:train_size]]
        train_set_labels = torch.from_numpy(train_set_labels)
        train_set = torch.utils.data.TensorDataset(train_set_data, train_set_labels)
        self.test_str_questions = [self.str_questions[i] for i in data_indices[-test_size:]]
        self.test_numeral_labels = self.numeral_labels[data_indices[-test_size:]]
        test_set_data = self.numeral_data[data_indices[-test_size:]]
        test_set_labels = self.numeral_labels[data_indices[-test_size:]]
        test_set_labels = torch.from_numpy(test_set_labels)
        test_set = torch.utils.data.TensorDataset(test_set_data, test_set_labels)
        self.valid_str_questions = [self.str_questions[i] for i in data_indices[train_size:-test_size]]
        self.valid_numeral_labels = self.numeral_labels[data_indices[train_size:-test_size]]
        valid_set_data = self.numeral_data[data_indices[train_size:-test_size]]
        valid_set_labels = self.numeral_labels[data_indices[train_size:-test_size]]
        valid_set_labels = torch.from_numpy(valid_set_labels)
        valid_set = torch.utils.data.TensorDataset(valid_set_data, valid_set_labels)
        self.train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
        self.test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
        self.valid_loader = DataLoader(valid_set, batch_size=64, shuffle=False)

### Load Data

In [None]:
print('Loading data...')
try:
    DataManager.maybe_download("data", "train_2000.label", "http://cogcomp.org/Data/QA/QC/")
    dm = DataManager()
    dm.read_data("data/", ["train_2000.label"])
except:
    print("Using local file...")
    dm = DataManager()
    dm.read_data("", ["train_2000_practice.label"])

Loading data...
Downloaded successfully train_2000.label

Sample questions and corresponding labels... 

['manner how did serfdom develop in and then leave russia ?', 'cremat what films featured the character popeye doyle ?', "manner how can i find a list of celebrities ' real names ?", 'animal what fowl grabs the spotlight after the chinese year of the monkey ?', 'exp what is the full form of .com ?']
['DESC', 'ENTY', 'DESC', 'ENTY', 'ABBR']


In [6]:
dm.manipulate_data()
dm.train_valid_test_split(train_ratio=0.8, test_ratio = 0.1)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [7]:
for x, y in dm.train_loader:
    print("Batch input shape:", x.shape)
    print("Batch label shape:", y.shape)
    break

Batch input shape: torch.Size([64, 36])
Batch label shape: torch.Size([64])


## 1.3 BaseTrainer Class

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class BaseTrainer:
    def __init__(self, model, criterion, optimizer, train_loader, val_loader):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.val_loader = val_loader

    def fit(self, num_epochs):
        self.num_batches = len(self.train_loader)

        for epoch in range(num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            train_loss, train_accuracy = self.train_one_epoch()
            val_loss, val_accuracy = self.validate_one_epoch()
            print(
                f'{self.num_batches}/{self.num_batches} - train_loss: {train_loss:.4f} - train_accuracy: {train_accuracy*100:.4f}% \
                - val_loss: {val_loss:.4f} - val_accuracy: {val_accuracy*100:.4f}%')

    def train_one_epoch(self):
        self.model.train()
        running_loss, correct, total = 0.0, 0, 0
        for i, data in enumerate(self.train_loader):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            self.optimizer.zero_grad()
            outputs = self.model(inputs)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        train_accuracy = correct / total
        train_loss = running_loss / self.num_batches
        return train_loss, train_accuracy

    def evaluate(self, loader):
        self.model.eval()
        loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for data in loader:
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = self.model(inputs)
                batch_loss = self.criterion(outputs, labels)
                loss += batch_loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = correct / total
        loss = loss / len(loader)
        return loss, accuracy

    def validate_one_epoch(self):
      val_loss, val_accuracy = self.evaluate(self.val_loader)
      return val_loss, val_accuracy

print("BaseTrainer class defined successfully!")

BaseTrainer class defined successfully!


# Section 2: Transformer-based Models for Sequence Modeling

## 2.1 Transformer Components

### MultiHeadAttention

In [9]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Initialize dimensions
        self.d_model = d_model # Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value

        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query transformation
        self.W_k = nn.Linear(d_model, d_model) # Key transformation
        self.W_v = nn.Linear(d_model, d_model) # Value transformation
        self.W_o = nn.Linear(d_model, d_model) # Output transformation

    def scaled_dot_product_attention(self, Q, K, V):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)

        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output

    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))

        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V)

        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output

print("MultiHeadAttention defined successfully!")

MultiHeadAttention defined successfully!


### PositionWiseFeedForward

In [10]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

print("PositionWiseFeedForward defined successfully!")

PositionWiseFeedForward defined successfully!


### PositionalEncoding

In [11]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()

        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

print("PositionalEncoding defined successfully!")

PositionalEncoding defined successfully!


### EncoderLayer

In [12]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        attn_output = self.self_attn(x, x, x)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

print("EncoderLayer defined successfully!")

EncoderLayer defined successfully!


## 2.2 TransformerClassifier Implementation

In [13]:
class TransformerClassifier(nn.Module):
    def __init__(self, embed_dim, num_heads, ff_dim, num_layers, dropout_rate=0.2, data_manager=None):
        super(TransformerClassifier, self).__init__()
        self.vocab_size = data_manager.vocab_size
        self.num_classes = data_manager.num_classes
        self.embed_dim = embed_dim
        self.max_seq_len = data_manager.max_seq_len
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.num_layers = num_layers
        self.dropout_rate = dropout_rate

    def build(self):
        """Build the Transformer architecture"""
        # Embedding layer
        self.embedding = nn.Embedding(self.vocab_size, self.embed_dim)

        # Positional encoding
        self.pos_encoding = PositionalEncoding(self.embed_dim, self.max_seq_len)

        # Dropout
        self.dropout = nn.Dropout(self.dropout_rate)

        # Encoder layers
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(self.embed_dim, self.num_heads, self.ff_dim, self.dropout_rate)
            for _ in range(self.num_layers)
        ])

        # Classification head
        self.classifier = nn.Linear(self.embed_dim, self.num_classes)

    def forward(self, x):
        """
        Forward pass through the Transformer
        Args:
            x: [batch_size, seq_len]
        Returns:
            logits: [batch_size, num_classes]
        """
        # Embedding: [batch_size, seq_len] -> [batch_size, seq_len, embed_dim]
        x = self.embedding(x)

        # Add positional encoding
        x = self.pos_encoding(x)

        # Apply dropout
        x = self.dropout(x)

        # Pass through encoder layers
        for encoder_layer in self.encoder_layers:
            x = encoder_layer(x)

        # Average pooling across sequence dimension: [batch_size, seq_len, embed_dim] -> [batch_size, embed_dim]
        x = torch.mean(x, dim=1)

        # Classification: [batch_size, embed_dim] -> [batch_size, num_classes]
        logits = self.classifier(x)

        return logits

print("TransformerClassifier defined successfully!")

TransformerClassifier defined successfully!


### Train TransformerClassifier

In [14]:
print("\n=== Training Transformer Classifier ===")
transformer = TransformerClassifier(embed_dim=512, num_heads=8, ff_dim=2048, num_layers=12, dropout_rate=0.1, data_manager=dm)
transformer.build()
transformer = transformer.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(transformer.parameters(), lr=1e-4, betas=(0.9, 0.98), eps=1e-9)
trainer = BaseTrainer(model=transformer, criterion=criterion, optimizer=optimizer, train_loader=dm.train_loader, val_loader=dm.valid_loader)
trainer.fit(num_epochs=30)


=== Training Transformer Classifier ===
Epoch 1/30
26/26 - train_loss: 2.0085 - train_accuracy: 21.6115%                 - val_loss: 1.7013 - val_accuracy: 12.6263%
Epoch 2/30
26/26 - train_loss: 1.7242 - train_accuracy: 18.7383%                 - val_loss: 1.6465 - val_accuracy: 12.6263%
Epoch 3/30
26/26 - train_loss: 1.6846 - train_accuracy: 21.0493%                 - val_loss: 1.6669 - val_accuracy: 27.2727%
Epoch 4/30
26/26 - train_loss: 1.6868 - train_accuracy: 22.4859%                 - val_loss: 1.6176 - val_accuracy: 27.2727%
Epoch 5/30
26/26 - train_loss: 1.6984 - train_accuracy: 20.7995%                 - val_loss: 1.5911 - val_accuracy: 27.2727%
Epoch 6/30
26/26 - train_loss: 1.6799 - train_accuracy: 21.2367%                 - val_loss: 1.5783 - val_accuracy: 27.2727%
Epoch 7/30
26/26 - train_loss: 1.6893 - train_accuracy: 24.2349%                 - val_loss: 1.6105 - val_accuracy: 27.2727%
Epoch 8/30
26/26 - train_loss: 1.4551 - train_accuracy: 34.2286%                 - v

## 2.3 Create BERT Dataset

In [15]:
model_name = "bert-base-uncased"  # BERT or any similar model

# Tokenize input and prepare model inputs
tokenizer = AutoTokenizer.from_pretrained(model_name)

dataset = Dataset.from_dict({"text": dm.str_questions, "label": dm.numeral_labels})

# Tokenize the dataset
def tokenize_function(examples):
    return tokenizer(examples["text"], padding="max_length", truncation=True, max_length=36)

dataset = dataset.map(tokenize_function, batched=True)
dataset.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
print(dataset)

Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

Dataset({
    features: ['text', 'label', 'input_ids', 'token_type_ids', 'attention_mask'],
    num_rows: 2000
})


## 2.4 Split BERT Dataset

In [16]:
def train_valid_test_split(dataset, train_ratio=0.8, test_ratio=0.1):
    num_sentences = len(dataset)
    train_size = int(num_sentences*train_ratio) +1
    test_size = int(num_sentences*test_ratio) +1
    valid_size = num_sentences - (train_size + test_size)
    train_set = dataset[:train_size]
    train_set = Dataset.from_dict(train_set)
    train_set.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    test_set = dataset[-test_size:]
    test_set = Dataset.from_dict(test_set)
    test_set.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    valid_set = dataset[train_size:-test_size]
    valid_set = Dataset.from_dict(valid_set)
    valid_set.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
    train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=64, shuffle=False)
    valid_loader = DataLoader(valid_set, batch_size=64, shuffle=False)
    return train_loader, test_loader, valid_loader

In [17]:
train_loader, test_loader, valid_loader = train_valid_test_split(dataset)

## 2.5 PrefixTuningForClassification Implementation

In [18]:
class PrefixTuningForClassification(nn.Module):
    def __init__(self, model_name, prefix_length=None, data_manager=None):
        super(PrefixTuningForClassification, self).__init__()

        # Load the pretrained transformer model (BERT-like model)
        self.model = AutoModel.from_pretrained(model_name).to(device)
        self.hidden_size = self.model.config.hidden_size
        self.prefix_length = prefix_length
        self.num_classes = data_manager.num_classes

        # Freeze all BERT parameters
        for param in self.model.parameters():
            param.requires_grad = False

        # Create prefix embeddings if prefix_length is specified
        if self.prefix_length is not None:
            # Learnable prefix embeddings
            self.prefix_embeddings = nn.Parameter(
                torch.randn(self.prefix_length, self.hidden_size)
            )
            print(f"Created learnable prefix embeddings of shape {self.prefix_embeddings.shape}")
        else:
            print("No prefix embeddings (prefix_length=None)")

        # Classification head (always trainable)
        self.classifier = nn.Linear(self.hidden_size, self.num_classes)

    def forward(self, input_ids, attention_mask):
        """
        Forward pass with optional prefix tuning
        Args:
            input_ids: [batch_size, seq_len]
            attention_mask: [batch_size, seq_len]
        Returns:
            logits: [batch_size, num_classes]
        """
        batch_size = input_ids.size(0)

        if self.prefix_length is not None:
            # Get embeddings from BERT's embedding layer
            embeddings = self.model.embeddings(input_ids)
            # embeddings: [batch_size, seq_len, hidden_size]

            # Expand prefix embeddings for the batch
            # [prefix_length, hidden_size] -> [batch_size, prefix_length, hidden_size]
            prefix_embeddings_expanded = self.prefix_embeddings.unsqueeze(0).expand(batch_size, -1, -1)

            # Concatenate prefix embeddings with input embeddings
            # [batch_size, seq_len + prefix_length, hidden_size]
            embeddings = torch.cat([prefix_embeddings_expanded, embeddings], dim=1)

            # Extend attention mask for prefix
            # Create attention mask for prefix (all ones)
            prefix_attention_mask = torch.ones(
                batch_size, self.prefix_length,
                dtype=attention_mask.dtype,
                device=attention_mask.device
            )
            # Concatenate prefix attention mask with original attention mask
            extended_attention_mask = torch.cat([prefix_attention_mask, attention_mask], dim=1)

            # Convert attention mask to the format expected by BERT
            # BERT expects: 1 for tokens to attend to, 0 for tokens to ignore
            # We need to expand it to [batch_size, 1, 1, seq_len] for broadcasting
            extended_attention_mask = extended_attention_mask[:, None, None, :]
            extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0

            # Pass through BERT encoder with custom embeddings
            encoder_outputs = self.model.encoder(
                embeddings,
                attention_mask=extended_attention_mask,
                return_dict=True
            )

            # Get the last hidden state
            last_hidden_state = encoder_outputs.last_hidden_state

        else:
            # No prefix - use standard BERT forward pass
            outputs = self.model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                return_dict=True
            )
            last_hidden_state = outputs.last_hidden_state

        # Average pooling across sequence dimension
        # Mask out padding tokens before averaging
        if self.prefix_length is not None:
            # Use the extended attention mask (without prefix mask formatting)
            mask_expanded = torch.cat([prefix_attention_mask, attention_mask], dim=1).unsqueeze(-1)
        else:
            mask_expanded = attention_mask.unsqueeze(-1)

        # Masked average pooling
        masked_hidden = last_hidden_state * mask_expanded
        sum_hidden = torch.sum(masked_hidden, dim=1)
        sum_mask = torch.sum(mask_expanded, dim=1)
        pooled_output = sum_hidden / sum_mask
        # pooled_output: [batch_size, hidden_size]

        # Classification
        logits = self.classifier(pooled_output)
        # logits: [batch_size, num_classes]

        return logits

print("PrefixTuningForClassification defined successfully!")

PrefixTuningForClassification defined successfully!


## 2.6 FineTunedBaseTrainer Class

In [19]:
class FineTunedBaseTrainer:
    def __init__(self, model, criterion, optimizer, train_loader, val_loader):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.train_loader = train_loader
        self.val_loader = val_loader

    def fit(self, num_epochs):
        self.num_batches = len(self.train_loader)

        for epoch in range(num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            train_loss, train_accuracy = self.train_one_epoch()
            val_loss, val_accuracy = self.validate_one_epoch()
            print(
                f'{self.num_batches}/{self.num_batches} - train_loss: {train_loss:.4f} - train_accuracy: {train_accuracy*100:.4f}% \
                - val_loss: {val_loss:.4f} - val_accuracy: {val_accuracy*100:.4f}%')

    def train_one_epoch(self):
        self.model.train()
        running_loss, correct, total = 0.0, 0, 0
        for batch in self.train_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].to(device)
            self.optimizer.zero_grad()
            outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        train_accuracy = correct / total
        train_loss = running_loss / self.num_batches
        return train_loss, train_accuracy

    def evaluate(self, loader):
        self.model.eval()
        loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for batch in loader:
                input_ids = batch["input_ids"].to(device)
                labels = batch["label"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
                batch_loss = self.criterion(outputs, labels)
                loss += batch_loss.item()
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = correct / total
        loss = loss / len(loader)
        return loss, accuracy

    def validate_one_epoch(self):
      val_loss, val_accuracy = self.evaluate(self.val_loader)
      return val_loss, val_accuracy

print("FineTunedBaseTrainer defined successfully!")

FineTunedBaseTrainer defined successfully!


## 2.7 Train Prefix-Prompt Tuning Model

In [20]:
print("\n=== Training Prefix Tuning Model ===")
prefix_tuning_model = PrefixTuningForClassification(model_name="bert-base-uncased", prefix_length=5, data_manager=dm).to(device)


=== Training Prefix Tuning Model ===


model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Created learnable prefix embeddings of shape torch.Size([5, 768])


In [21]:
if prefix_tuning_model.prefix_length is not None:
    optimizer = torch.optim.Adam(list(prefix_tuning_model.classifier.parameters()) + [prefix_tuning_model.prefix_embeddings], lr=5e-5)
else:
    optimizer = torch.optim.Adam(prefix_tuning_model.classifier.parameters(), lr=1e-4)

criterion = nn.CrossEntropyLoss()
trainer = FineTunedBaseTrainer(model=prefix_tuning_model, criterion=criterion, optimizer=optimizer, train_loader=train_loader, val_loader=valid_loader)
trainer.fit(num_epochs=100)

Epoch 1/100
26/26 - train_loss: 1.7451 - train_accuracy: 23.2979%                 - val_loss: 1.7436 - val_accuracy: 22.7273%
Epoch 2/100
26/26 - train_loss: 1.7100 - train_accuracy: 26.3585%                 - val_loss: 1.7049 - val_accuracy: 26.7677%
Epoch 3/100
26/26 - train_loss: 1.6649 - train_accuracy: 26.6708%                 - val_loss: 1.6755 - val_accuracy: 28.2828%
Epoch 4/100
26/26 - train_loss: 1.6443 - train_accuracy: 29.5440%                 - val_loss: 1.6502 - val_accuracy: 31.3131%
Epoch 5/100
26/26 - train_loss: 1.6231 - train_accuracy: 31.4803%                 - val_loss: 1.6248 - val_accuracy: 33.3333%
Epoch 6/100
26/26 - train_loss: 1.6089 - train_accuracy: 33.2292%                 - val_loss: 1.6060 - val_accuracy: 34.8485%
Epoch 7/100
26/26 - train_loss: 1.5980 - train_accuracy: 33.8538%                 - val_loss: 1.5886 - val_accuracy: 36.8687%
Epoch 8/100
26/26 - train_loss: 1.5715 - train_accuracy: 36.7895%                 - val_loss: 1.5706 - val_accuracy: 3

## 2.8 Best Model Section

After analyzing all models trained across the three assignments, the **AttentionRNN model** achieved the highest test set accuracy. 

## Model Performance Comparison

Model | Test Accuracy |
-------|---------------|
TextCNN/Logistic Regression | 97.01% |
**AttentionRNN (GRU-based)** | **98.01%** ⭐ |
PrefixTuningForClassification | 82.59% |
TransformerClassifier | Not evaluated on test set |


#### Model Hyperparameters:
- **cell_type**: `'gru'` - GRU cells for recurrent layers
- **embed_size**: `128` - Dimensionality of word embeddings
- **state_sizes**: `[64, 128]` - Hidden state dimensions for the 2 GRU layers
  - First layer: 64 hidden units
  - Second layer: 128 hidden units
- **output_type**: `'mean'` - Aggregation method (mean pooling with attention)
- **vocab_size**: `30522` - BERT tokenizer vocabulary size
- **num_classes**: `6` - Number of output classes (ABBR, DESC, ENTY, HUM, LOC, NUM)

#### Training Hyperparameters:
- **optimizer**: `Adam`
- **learning_rate**: `0.001` (1e-3)
- **criterion**: `CrossEntropyLoss`
- **num_epochs**: `30`
- **batch_size**: `64`
- **train_ratio**: `0.8` (80% training data)
- **test_ratio**: `0.1` (10% test data)
- **validation_ratio**: `0.1` (10% validation data)
- **random_seed**: `1234`


The AttentionRNN class extends BaseRNN and implements a custom attention mechanism:

```python
class AttentionRNN(BaseRNN):
    def __init__(self, cell_type='gru', embed_size=128, state_sizes=[128, 128],
                 output_type="mean", data_manager=None):
        super().__init__(cell_type, embed_size, state_sizes, output_type, data_manager)

    def build(self):
        # Inherits embedding and RNN layers from BaseRNN
        super().build()

        # Add attention mechanism
        self.attention = MyAttention(output_length=self.state_sizes[-1])

    def forward(self, x):
        # x: [batch_size, seq_len]
        x = self.embedding(x)  # [batch_size, seq_len, embed_size]

        # Pass through RNN layers
        all_states, last_state = self.rnn(x)
        # all_states: [batch_size, seq_len, hidden_size]
        # last_state: [batch_size, hidden_size]

        # Apply attention
        context_vector = self.attention(all_states, last_state)
        # context_vector: [batch_size, hidden_size]

        # Classification
        logits = self.fc(context_vector)
        # logits: [batch_size, num_classes]

        return logits
```

### MyAttention Mechanism

```python
class MyAttention(nn.Module):
    def __init__(self, output_length):
        super(MyAttention, self).__init__()
        self.output_length = output_length
        self.U = None  # Initialized during first forward pass
        self.V = None

    def forward(self, all_states, last_state):
        # all_states: [batch_size, seq_len, state_size]
        batch_size, seq_len, state_size = all_states.shape

        # Lazy initialization
        if self.U is None:
            self.U = nn.Linear(state_size, self.output_length).to(all_states.device)
            self.V = nn.Linear(self.output_length, 1).to(all_states.device)

        # Compute attention scores
        scores = self.U(all_states)  # [batch_size, seq_len, output_length]
        scores = torch.tanh(scores)
        scores = self.V(scores).squeeze(-1)  # [batch_size, seq_len]

        # Attention weights
        attention_weights = F.softmax(scores, dim=1)  # [batch_size, seq_len]

        # Context vector (weighted sum)
        context_vector = torch.bmm(
            attention_weights.unsqueeze(1),  # [batch_size, 1, seq_len]
            all_states  # [batch_size, seq_len, state_size]
        ).squeeze(1)  # [batch_size, state_size]

        return context_vector
```


## Conclusion

The **AttentionRNN model achieves 98.01% test accuracy**, which:
- Outperforms all other models
- Demonstrates strong generalization with only 1.99% test error
- Successfully leverages attention mechanism for question classification
