### Web Intelligence - Exercise 12

In this final exercise, we explore the principles and practical use of the *Transformer* encoder, a key building block of modern natural language processing (NLP). The Transformer architecture, introduced in the paper "*Attention Is All You Need*", has become a cornerstone of NLP because it models long-range dependencies efficiently and allows computation to be parallelized across sequence positions.


Our focus will be on implementing the Transformer encoder for a sentiment analysis task using the [IMDB movie review](https://huggingface.co/datasets/stanfordnlp/imdb) dataset. This task will help you understand the core components of the Transformer encoder, including the self-attention mechanism, positional encodings, and feedforward layers, and how these contribute to processing sequential data effectively. To support you in this process, a Jupyter Notebook file is provided, guiding you through data preprocessing, model construction, training, and evaluation step by step.

**Question 1.** Sentiment Analysis

In [5]:
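import math
import re
from collections import Counter

import nltk
import torch
from datasets import load_dataset
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from torch.utils.data import Dataset, DataLoader

# Download the NLTK resources used below (stop words, tokenizer, lemmatizer data)
nltk.download("stopwords")
nltk.download("punkt")
nltk.download("wordnet")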

Load the dataset and prepare the training and testing sets

In [6]:
imdb = load_dataset("imdb")
train_data = imdb['train']["text"]
train_labels = imdb["train"]["label"]
test_data = imdb['test']["text"]
test_labels = imdb["test"]["label"]

Preprocess the data by lowercasing the text, removing punctuation, special characters, and stop words, and lemmatizing the remaining tokens.

In [8]:
stop_words = set(stopwords.words("english"))

# Preprocess raw text
def clean_data(texts):
    
    # Initialize the lemmatizer
    lemmatizer = WordNetLemmatizer()
    
    cleaned_texts = []
    for text in texts:
        
        
    return cleaned_texts

train_data = clean_data(train_data)
test_data = clean_data(test_data)
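
One possible shape for the cleaning step is sketched below. It is not the reference solution: `clean_text_sketch` and `DEMO_STOP_WORDS` are hypothetical names, the stop-word set is a tiny stand-in for NLTK's English list, and lemmatization is passed in as an optional callable (for example `WordNetLemmatizer().lemmatize`) so that the sketch runs without any NLTK data.

```python
import re

# Hypothetical, tiny stop-word set used only to keep this sketch self-contained;
# the notebook itself uses NLTK's English stop words.
DEMO_STOP_WORDS = {"a", "an", "the", "is", "was", "it", "this", "and", "of", "to"}


def clean_text_sketch(text, stop_words=DEMO_STOP_WORDS, lemmatize=None):
    """Lowercase, strip HTML tags and non-letters, drop stop words, optionally lemmatize."""
    text = text.lower()
    text = re.sub(r"<[^>]+>", " ", text)   # IMDB reviews contain tags such as <br />
    text = re.sub(r"[^a-z\s]", " ", text)  # drop digits, punctuation, special characters
    tokens = [tok for tok in text.split() if tok not in stop_words]
    if lemmatize is not None:
        tokens = [lemmatize(tok) for tok in tokens]
    return " ".join(tokens)


example = clean_text_sketch("This movie was GREAT!<br />I loved it, 10/10.")
print(example)
```

Returning a single string per review keeps the output compatible with the tokenization functions used later in the notebook.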


Build a vocabulary and map each word to a unique integer value.

**Note**. We keep only the top-$K$ most common words in the corpus and define an additional token (e.g., "$<$unk$>$") to represent the remaining "unknown" or discarded words. We also apply padding to obtain equal-length sequences: if an input sequence is shorter than the expected length, we append a dedicated padding token (e.g., "$<$pad$>$") until it reaches the desired length.

In [10]:
def tokenize(text):
    return text.split()

# Tokenizer and Vocabulary
def tokenize_and_build_vocab(texts, vocab_size=20000):
    
    
    return vocab

vocab = tokenize_and_build_vocab(train_data, vocab_size=20000)
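
A minimal sketch of the vocabulary construction described in the note, under a few assumptions that the notebook does not fix: the function name `build_vocab_sketch` is hypothetical, `<pad>` and `<unk>` are given indices 0 and 1, and `vocab_size` is taken to include these two special tokens.

```python
from collections import Counter


def build_vocab_sketch(texts, vocab_size=20000):
    """Map the most frequent whitespace-separated tokens to integer indices."""
    counts = Counter(token for text in texts for token in text.split())
    # Assumption: index 0 is reserved for padding and index 1 for unknown words.
    vocab = {"<pad>": 0, "<unk>": 1}
    for token, _ in counts.most_common(vocab_size - len(vocab)):
        vocab[token] = len(vocab)
    return vocab


demo_vocab = build_vocab_sketch(["good movie", "bad movie", "good good plot"], vocab_size=4)
print(demo_vocab)
```

Words outside the kept set can then be looked up with `demo_vocab.get(token, demo_vocab["<unk>"])`, which is the pattern used in the next cell.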



Convert the dataset into sequences of token indices.

In [None]:
# Convert the dataset into sequences of token indices.
def token2indices(texts, vocab):
    '''
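    Converts a list of texts to token indices. If tokens are not in vocab, use the <unk> token.
    :param texts: a list of preprocessed text strings
    :param vocab: a dictionary mapping each token to its integer index
    :return: a list of integer sequences, one per text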
    '''
    integer_sequences = []
    for text in texts:
        current_seq = [vocab.get(token, vocab["<unk>"]) for token in word_tokenize(text)]
        integer_sequences.append(current_seq)
            
    return integer_sequences

train_int_data = token2indices(train_data, vocab)
test_int_data = token2indices(test_data, vocab)

Define the dataset and data loaders required for the training procedure as we did in the previous exercises. We will also implement a function to pad/truncate sequences to a fixed length and set the *collate_fn* parameter of *PyTorch*'s *DataLoader* class.

In [None]:
batch_size = 32
seq_len = 500

class IMDBDataset(Dataset):
    def __init__(self, texts, label):
        self.texts = texts
        self.label = label
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, index):
        
        return self.texts[index], self.label[index]
        

# Data collator for padding
def collate_batch(batch, max_len):
    
    
    return padded_sequence, labels

# Define the datasets
train_dataset = IMDBDataset(train_int_data, train_labels)
test_dataset = IMDBDataset(test_int_data, test_labels)

# Define the data loader
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, collate_fn=lambda batch: collate_batch(batch, max_len=seq_len))
# Shuffling is not needed for evaluation
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size, collate_fn=lambda batch: collate_batch(batch, max_len=seq_len))
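
The pad/truncate step could look roughly like the following sketch. The name `collate_batch_sketch` and the `pad_index=0` default are assumptions (the padding index must match whatever index the vocabulary assigns to `<pad>`), and padding on the right is one choice among several.

```python
import torch


def collate_batch_sketch(batch, max_len, pad_index=0):
    """Pad or truncate every sequence in the batch to exactly max_len tokens."""
    sequences, labels = zip(*batch)
    # Start from a tensor filled with the padding index, then copy the tokens in.
    padded = torch.full((len(sequences), max_len), pad_index, dtype=torch.long)
    for row, seq in enumerate(sequences):
        trimmed = seq[:max_len]  # truncate sequences that are too long
        padded[row, :len(trimmed)] = torch.tensor(trimmed, dtype=torch.long)
    return padded, torch.tensor(labels, dtype=torch.long)


demo_batch = [([5, 6, 7], 1), ([8], 0)]
padded, labels = collate_batch_sketch(demo_batch, max_len=2)
print(padded, labels)
```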

Implement a positional encoding function to inject sequence order information into the token embeddings. Use sinusoidal or learned positional encodings.

In [None]:
class PositionalEncoding(torch.nn.Module):
    '''
    Positional encoding module
    
    '''
    def __init__(self, model_dim, seq_len, device='cpu'):
        """
        theta = p / 10000^(2d/model_dim) = p * exp(-(2d/model_dim)*log(10000))
        pe[:, 0::2] = sin(theta) and pe[:, 1::2] = cos(theta)
        
        :param model_dim: model dimension
        :param seq_len: sequence length
        :param device: device (default: cpu)
        """
        super().__init__()
        
        self.pos_emb = 

    def forward(self, current_emb):
        """
        Adds the positional encoding to the input embedding tensor
        :param current_emb: a tensor of shape (batch_size, seq_len, model_dim)
        :return: 
        """
        return current_emb + self.pos_emb
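
For the sinusoidal variant, the formula in the docstring above translates fairly directly into tensor operations. The sketch below uses the hypothetical class name `SinusoidalPositionalEncodingSketch`, assumes an even `model_dim`, and stores the table as a buffer so that it moves with the module; slicing in `forward` is an extra that also allows inputs shorter than `seq_len`.

```python
import math

import torch


class SinusoidalPositionalEncodingSketch(torch.nn.Module):
    def __init__(self, model_dim, seq_len):
        super().__init__()
        positions = torch.arange(seq_len, dtype=torch.float).unsqueeze(1)  # (seq_len, 1)
        # exp(-(2d / model_dim) * log(10000)) for every even index 2d
        div_term = torch.exp(
            torch.arange(0, model_dim, 2, dtype=torch.float) * (-math.log(10000.0) / model_dim)
        )
        pe = torch.zeros(seq_len, model_dim)
        pe[:, 0::2] = torch.sin(positions * div_term)  # even dimensions
        pe[:, 1::2] = torch.cos(positions * div_term)  # odd dimensions
        # A buffer is saved with the module but is not a trainable parameter.
        self.register_buffer("pos_emb", pe.unsqueeze(0))  # (1, seq_len, model_dim)

    def forward(self, current_emb):
        # Broadcasts over the batch dimension.
        return current_emb + self.pos_emb[:, : current_emb.size(1)]


encoded = SinusoidalPositionalEncodingSketch(model_dim=8, seq_len=10)(torch.zeros(2, 10, 8))
print(encoded.shape)
```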

- Calculate **query (Q)**, **key (K)**, and **value (V)** matrices by applying learned linear transformations to the input embeddings.
    
- Compute scaled dot-product attention for each token: 
$$
    \mathrm{Attention}(Q,K,V) = \mathrm{softmax}\left( \frac{QK^\top}{\sqrt{d_K}} \right)V
$$
where $d_K$ is the dimensionality of the key vectors.

- Implement the multi-head attention by splitting embeddings into multiple heads, applying the attention mechanism for each head, and concatenating the results.
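
The attention formula above can be sketched as a small function. The name `scaled_dot_product_attention_sketch` is hypothetical, and returning the attention weights alongside the output is a convenience for inspection rather than something the exercise requires.

```python
import math

import torch


def scaled_dot_product_attention_sketch(query, key, value):
    """Return softmax(Q K^T / sqrt(d_K)) V together with the attention weights."""
    d_k = query.size(-1)
    # Similarity of every query with every key, scaled to keep the softmax well-behaved.
    scores = query @ key.transpose(-2, -1) / math.sqrt(d_k)
    weights = torch.softmax(scores, dim=-1)  # each row sums to 1 over the keys
    return weights @ value, weights


q = k = v = torch.randn(2, 3, 5, 4)  # (batch_size, heads_num, seq_len, key_dim)
attended, attn_weights = scaled_dot_product_attention_sketch(q, k, v)
print(attended.shape, attn_weights.shape)
```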

In [None]:
class ScaledDotProductAttention(torch.nn.Module):
    """
    Implements Scaled Dot-Product Attention
    The forward method returns the scaled dot-product attention
    """
    def __init__(self):
        super().__init__()

    def forward(self, query, key, value):
        """
        Implements Scaled Dot-Product Attention
        :param query: a tensor of shape (batch_size, heads_num, seq_len, key_dim)
        :param key: a tensor of shape (batch_size, heads_num, seq_len, key_dim)
        :param value: a tensor of shape (batch_size, heads_num, seq_len, key_dim)
        :return: dot-product and attention
        """
        
        return 

class MultiHeadAttention(torch.nn.Module):
    def __init__(self, model_dim, num_heads, device="cpu"):
        super().__init__()
        
        assert model_dim % num_heads == 0, "model_dim must be divisible by num_heads"



    

    def forward(self, query, key, value):
        """
        Implements multi-head attention on top of scaled dot-product attention.
        Instead of computing the heads separately and concatenating them afterwards, the projected tensors can be reshaped so that all heads are processed at once.
        :param query: a tensor of shape (batch_size, seq_len, model_dim)
        :param key: a tensor of shape (batch_size, seq_len, model_dim)
        :param value: a tensor of shape (batch_size, seq_len, model_dim)
        :return: a tensor of shape (batch_size, seq_len, model_dim)
        """
        
        
        return output
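
One way to realize the "already concatenated" idea is to project to `model_dim` once and reshape into heads, as in the sketch below. The class name `MultiHeadAttentionSketch` and the attribute names are hypothetical, and the final output projection follows the original Transformer paper; the exercise text itself only asks for concatenation.

```python
import math

import torch


class MultiHeadAttentionSketch(torch.nn.Module):
    def __init__(self, model_dim, num_heads):
        super().__init__()
        assert model_dim % num_heads == 0, "model_dim must be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = model_dim // num_heads
        self.q_proj = torch.nn.Linear(model_dim, model_dim)
        self.k_proj = torch.nn.Linear(model_dim, model_dim)
        self.v_proj = torch.nn.Linear(model_dim, model_dim)
        self.out_proj = torch.nn.Linear(model_dim, model_dim)  # assumption: as in the paper

    def _split_heads(self, x):
        # (batch, seq_len, model_dim) -> (batch, num_heads, seq_len, head_dim)
        batch_size, seq_len, _ = x.shape
        return x.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, query, key, value):
        q = self._split_heads(self.q_proj(query))
        k = self._split_heads(self.k_proj(key))
        v = self._split_heads(self.v_proj(value))
        scores = q @ k.transpose(-2, -1) / math.sqrt(self.head_dim)
        attended = torch.softmax(scores, dim=-1) @ v
        # Merge the heads back: (batch, num_heads, seq_len, head_dim) -> (batch, seq_len, model_dim)
        batch_size, _, seq_len, _ = attended.shape
        merged = attended.transpose(1, 2).reshape(batch_size, seq_len, -1)
        return self.out_proj(merged)


x = torch.randn(3, 5, 8)
mha_output = MultiHeadAttentionSketch(model_dim=8, num_heads=2)(x, x, x)
print(mha_output.shape)
```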

Implement the encoder block, which consists of the following components:
- A multi-head attention mechanism.
- A feedforward network consisting of two linear layers separated by a ReLU activation.
- Residual connections around both the attention mechanism and the feedforward network.
- Layer normalization after each residual connection.

In [None]:
class EncoderBlock(torch.nn.Module):
    def __init__(self, model_dim, heads_num, hidden_dim, dropout=0.1, device="cpu"):
        """
        Defines the encoder block
        :param model_dim: the model dimension
        :param heads_num: the number of heads
        :param hidden_dim: the dimension of the feedforward network model
        :param dropout: the dropout rate (default: 0.1)
        :param device: the device (default: cpu)
        """
        super(EncoderBlock, self).__init__()
        

    

    def forward(self, x):


        
        return output
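
The four components listed above can be wired together as in the following sketch. To stay self-contained it swaps in PyTorch's built-in `torch.nn.MultiheadAttention` instead of the custom attention class from the previous cell; the class name `EncoderBlockSketch` and the placement of dropout are assumptions.

```python
import torch


class EncoderBlockSketch(torch.nn.Module):
    def __init__(self, model_dim, heads_num, hidden_dim, dropout=0.1):
        super().__init__()
        # Stand-in for the custom multi-head attention module of this exercise.
        self.attention = torch.nn.MultiheadAttention(model_dim, heads_num, batch_first=True)
        self.feed_forward = torch.nn.Sequential(
            torch.nn.Linear(model_dim, hidden_dim),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden_dim, model_dim),
        )
        self.norm1 = torch.nn.LayerNorm(model_dim)
        self.norm2 = torch.nn.LayerNorm(model_dim)
        self.dropout = torch.nn.Dropout(dropout)

    def forward(self, x):
        # Self-attention: query, key, and value are all the block input.
        attended, _ = self.attention(x, x, x, need_weights=False)
        x = self.norm1(x + self.dropout(attended))              # residual + layer norm
        x = self.norm2(x + self.dropout(self.feed_forward(x)))  # residual + layer norm
        return x


block = EncoderBlockSketch(model_dim=8, heads_num=2, hidden_dim=16).eval()
block_output = block(torch.randn(2, 5, 8))
print(block_output.shape)
```

Because the block maps a `(batch_size, seq_len, model_dim)` tensor to a tensor of the same shape, several blocks can be chained without any glue code.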

- Initialize the word embeddings and add position encodings to them.
- **Stack Multiple Encoder Blocks.** Arrange multiple encoder blocks sequentially to construct the full encoder.

In [None]:
class Encoder(torch.nn.Module):
    def __init__(self, vocab_size, model_dim, heads_num, layers_num, hidden_dim, seq_len, dropout=0.1, device="cpu"):
        """
        :param vocab_size: the vocabulary size
        :param model_dim: the model dimension
        :param heads_num: the number of heads
        :param layers_num: the number of layers
        :param hidden_dim: the dimension of the feedforward network model
        :param seq_len: the sequence length
        :param dropout: the dropout rate (default: 0.1)
        :param device: the device type (default: cpu)
        """
        super(Encoder, self).__init__()
        


    

    def forward(self, sequences):
        """
        Implements the forward pass of the encoder
        :param sequences: a tensor storing word indices of shape (batch_size, seq_len)
        :return: a tensor of shape (batch_size, seq_len, model_dim)
        """
        

        return x
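
As a rough illustration of the overall data flow (embeddings, positional information, stacked blocks), the sketch below uses learned positional embeddings and PyTorch's built-in `torch.nn.TransformerEncoderLayer` and `torch.nn.TransformerEncoder` as stand-ins for the hand-written blocks. The class name `EncoderSketch` and the use of `padding_idx=0` are assumptions, not part of the exercise.

```python
import torch


class EncoderSketch(torch.nn.Module):
    def __init__(self, vocab_size, model_dim, heads_num, layers_num, hidden_dim, seq_len, dropout=0.1):
        super().__init__()
        # Assumption: index 0 is the padding token.
        self.token_emb = torch.nn.Embedding(vocab_size, model_dim, padding_idx=0)
        # Learned positional embeddings, one vector per position.
        self.pos_emb = torch.nn.Embedding(seq_len, model_dim)
        layer = torch.nn.TransformerEncoderLayer(
            model_dim, heads_num, dim_feedforward=hidden_dim, dropout=dropout, batch_first=True
        )
        # Built-in stack of identical layers, standing in for the custom encoder blocks.
        self.blocks = torch.nn.TransformerEncoder(layer, num_layers=layers_num)

    def forward(self, sequences):
        positions = torch.arange(sequences.size(1), device=sequences.device)
        x = self.token_emb(sequences) + self.pos_emb(positions)  # broadcast over the batch
        return self.blocks(x)


encoder = EncoderSketch(vocab_size=50, model_dim=8, heads_num=2, layers_num=2, hidden_dim=16, seq_len=10).eval()
encoder_output = encoder(torch.randint(0, 50, (3, 10)))
print(encoder_output.shape)
```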

**Building a classifier**
- Average the encoder outputs over the sequence dimension and pass the result through a linear output layer with a sigmoid activation to predict the sentiment label (positive or negative).

In [None]:
embedding_dim = 100
heads_num = 2
layers_num = 3
hidden_dim = 128
dropout = 0.1
lr = 1e-3
epochs = 10
device = torch.device('cpu') 
#torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class Classifier(torch.nn.Module):
    def __init__(self, vocab_size, model_dim, heads_num, layers_num, hidden_dim, seq_len, dropout, device):
        super(Classifier, self).__init__()

        self.encoder = Encoder(vocab_size, model_dim, heads_num, layers_num, hidden_dim, seq_len, dropout, device)
        self.fc = torch.nn.Linear(model_dim, 1)

    def forward(self, x):
        encoder_output = self.encoder(x)



        return output
  
model = Classifier(
    len(vocab), model_dim=embedding_dim, 
    heads_num=heads_num, layers_num=layers_num, 
    hidden_dim=hidden_dim, seq_len=seq_len, 
    dropout=dropout, device=device
)

model.to(device)
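
The pooling and output step of the classifier might be sketched as follows. `MeanPoolingHeadSketch` is a hypothetical name, and the plain mean also averages over padded positions; a mean restricted to non-padding tokens would be a possible refinement.

```python
import torch


class MeanPoolingHeadSketch(torch.nn.Module):
    def __init__(self, model_dim):
        super().__init__()
        self.fc = torch.nn.Linear(model_dim, 1)

    def forward(self, encoder_output):
        pooled = encoder_output.mean(dim=1)    # (batch_size, model_dim)
        return torch.sigmoid(self.fc(pooled))  # (batch_size, 1), probability of the positive class


probabilities = MeanPoolingHeadSketch(model_dim=8)(torch.randn(4, 6, 8))
print(probabilities.shape)
```

Since the output is already a probability, it pairs with `BCELoss` as used in the training cell below.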

In [None]:
import torch.optim as optim
from torch.nn import BCELoss
from tqdm.notebook import tqdm

# Training Setup
loss_func = BCELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# Training Loop
def train_model(model, train_loader, loss_func, optimizer, epochs=5):
    model.train()
    for epoch in tqdm(range(epochs),desc='Epoch'):
        
        total_loss, total_count, correct = 0, 0, 0
        for texts, labels in tqdm(train_loader, desc='Batch'):
            texts, labels = texts.to(device), labels.to(device).view(-1, 1).to(torch.float)

            # Forward pass
            predictions = model(texts)
            loss = loss_func(predictions, labels)
            correct += ((predictions > 0.5) == labels).sum().item()

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            
            total_count += len(labels)
        
        accuracy = correct / total_count
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader)}, Accuracy: {accuracy}")
        
train_model(model, train_loader, loss_func, optimizer, epochs=epochs)


Implement an evaluation function to measure accuracy on the training and test sets.

In [None]:

def evaluate_model(model, data_loader):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():


    

    print(f"Accuracy: {correct/total:.2f}")

evaluate_model(model, train_loader)
evaluate_model(model, test_loader)
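
An evaluation loop along these lines could mirror the accuracy computation in the training loop. In this sketch the function name `evaluate_accuracy_sketch`, the `threshold` parameter, and returning the accuracy instead of printing it are all assumptions; the toy model and the list of batches exist only to make the example runnable on its own.

```python
import torch


def evaluate_accuracy_sketch(model, data_loader, device="cpu", threshold=0.5):
    """Return the fraction of examples whose thresholded prediction matches the label."""
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():  # no gradients are needed for evaluation
        for texts, labels in data_loader:
            texts = texts.to(device)
            labels = labels.to(device).view(-1, 1).float()
            predictions = (model(texts) > threshold).float()
            correct += (predictions == labels).sum().item()
            total += labels.size(0)
    return correct / total


class ToyModel(torch.nn.Module):
    """Hypothetical stand-in model: predicts the mean of the input values."""

    def forward(self, x):
        return x.float().mean(dim=1, keepdim=True)


toy_loader = [
    (torch.tensor([[1, 1], [0, 0]]), torch.tensor([1, 0])),
    (torch.tensor([[1, 1]]), torch.tensor([0])),
]
toy_accuracy = evaluate_accuracy_sketch(ToyModel(), toy_loader)
print(f"Accuracy: {toy_accuracy:.2f}")
```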