## Load needed packages

In [37]:
import numpy as np
import polars as pl
from sklearn.model_selection import train_test_split
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer

## Load data

In [2]:
# Read the IMDB reviews CSV (columns: review, sentiment) into a Polars DataFrame.
df = pl.read_csv("../data/imdb.csv")

In [3]:
# Check class balance: number of rows for each sentiment label.
df.select(pl.col("sentiment").value_counts()).unnest(pl.col("sentiment"))

sentiment,count
str,u32
"""negative""",25000
"""positive""",25000


In [4]:
# Replace sentiment values with integers
# 1 for positive, 0 for negative
# NOTE(review): this rebinds `df`, so the cell is not idempotent -- re-running it
# without reloading the CSV operates on the already-converted Int8 column.
df = df.with_columns(
    pl.col("sentiment").replace("positive", 1).replace("negative", 0).cast(pl.Int8)
)

In [5]:
# Preview the first rows to confirm the sentiment column is now numeric.
df.head()

review,sentiment
str,i8
"""One of the other reviewers has…",1
"""A wonderful little production.…",1
"""I thought this was a wonderful…",1
"""Basically there's a family whe…",0
"""Petter Mattei's ""Love in the T…",1


In [6]:
# Hold out 20% of the reviews for testing.
# The split is stratified on the label so both partitions keep the original
# positive/negative ratio, and the seed is fixed so the partition is reproducible.
review_col, sentiment_col = df["review"], df["sentiment"]
X_train, X_test, y_train, y_test = train_test_split(
    review_col, sentiment_col, test_size=0.2, random_state=42, stratify=sentiment_col
)

In [7]:
# Confirm the stratified split kept the training labels balanced.
y_train.value_counts()

sentiment,count
i8,u32
1,20000
0,20000


In [8]:
# Convert the Polars Series into plain Python lists; the reviews are iterated
# one by one for tokenisation and the labels are turned into tensors later.
X_train, X_test = X_train.to_list(), X_test.to_list()
y_train, y_test = y_train.to_list(), y_test.to_list()

## Preprocessing

### Text cleaning (remove HTML tags, special characters)

In [9]:
# Matches HTML comments and opening/closing/self-closing tags, including tags
# that span several lines. A bare "<" or ">" in running text (e.g. "a < b")
# is not treated as a tag.
_HTML_TAG_RE = re.compile(r"<!--.*?-->|<[/!]?[a-zA-Z][^<>]*>", re.DOTALL)


def remove_html_tags(text):
    """Replace every HTML tag in ``text`` with a single space.

    A space (rather than the empty string) is substituted so that words
    separated only by a tag, such as ``"good<br />bad"``, are not glued
    together into one token.

    Args:
        text (str): Raw text that may contain HTML markup.

    Returns:
        str: The text with each tag or HTML comment replaced by one space.
    """
    return _HTML_TAG_RE.sub(" ", text)

In [10]:
def remove_special_characters(text):
    """Blank out disallowed characters, then lowercase the result.

    Letters, digits, whitespace and the punctuation marks . , ! ? " ' are
    kept; every other character is replaced by a single space.
    """
    allowed_only = re.sub(r"[^a-zA-Z0-9\s.,!?\"']", " ", text)
    return allowed_only.lower()

In [11]:
def clean_text(text):
    """Strip HTML tags first, then remove special characters from the result."""
    return remove_special_characters(remove_html_tags(text))

### Tokenisation

The tokenizer from HuggingFace handles vocabulary building, sequence padding, and truncation as well.

In [12]:
# Load a pre-trained tokenizer. 'bert-base-uncased' is a good general-purpose model.
# The 'uncased' means it expects lowercase input, which aligns with our cleaning.
# Setting `do_lower_case=False` because we already lowercased the text.
# NOTE(review): this relies on every input going through clean_text() first;
# raw mixed-case text passed straight to this tokenizer would not be lowercased.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased", do_lower_case=False)

In [13]:
# Cache the vocabulary size and special-token ids exposed by the tokenizer.
VOCAB_SIZE = tokenizer.vocab_size
UNK_TOKEN_ID = tokenizer.unk_token_id  # [UNK] token id
PAD_TOKEN_ID = tokenizer.pad_token_id  # [PAD] token id
CLS_TOKEN_ID = tokenizer.cls_token_id  # [CLS] token for classification tasks
SEP_TOKEN_ID = tokenizer.sep_token_id  # [SEP] token to separate sequences

In [14]:
# Display the cached tokenizer constants for reference.
VOCAB_SIZE, UNK_TOKEN_ID, PAD_TOKEN_ID, CLS_TOKEN_ID, SEP_TOKEN_ID

(30522, 100, 0, 101, 102)

In [15]:
# Token count (special tokens included) of every cleaned training review.
max_l = [
    len(tokenizer.encode(clean_text(review), add_special_tokens=True))
    for review in X_train
]

Token indices sequence length is longer than the specified maximum sequence length for this model (670 > 512). Running this sequence through the model will result in indexing errors


In [None]:
np.quantile(max_l, 0.90)  # 90th percentile of the sequence lengths
# This gives us an idea of the maximum sequence length we might need to handle.
# We can use this to set a maximum length for padding/truncation in our model.
# In the recorded run the 90th percentile is 561 tokens, which is above the
# 512-token limit reported in the tokenizer warning, so roughly one review in
# ten is longer than that limit.

np.float64(561.0)

In [25]:
# 512 matches the model maximum reported by the tokenizer warning when the
# sequence lengths were measured.
MAX_SEQ_LENGTH = 512  # Set a maximum sequence length for padding/truncation

In [26]:
def text_pipeline_hf(text, max_len, hf_tokenizer):
    """
    Clean one raw review and encode it with a Hugging Face tokenizer.

    Args:
        text: Raw review string.
        max_len: Fixed length every encoded sequence is padded or truncated to.
        hf_tokenizer: Hugging Face tokenizer, called directly on the cleaned text.

    Returns:
        The tokenizer's BatchEncoding (dict-like) holding PyTorch tensors,
        including 'input_ids' and 'attention_mask'.
    """
    encode_options = {
        "add_special_tokens": True,  # wrap the sequence in [CLS] ... [SEP]
        "max_length": max_len,  # target length for padding/truncation
        "padding": "max_length",  # always pad up to max_len
        "truncation": True,  # cut sequences longer than max_len
        "return_tensors": "pt",  # produce PyTorch tensors
    }
    return hf_tokenizer(clean_text(text), **encode_options)

In [None]:
# Encode every training review, collecting the token ids and the attention
# mask returned for each one.
all_input_ids, all_attention_masks = [], []
for review in X_train:
    encoding = text_pipeline_hf(review, MAX_SEQ_LENGTH, tokenizer)
    all_input_ids += [encoding["input_ids"]]
    all_attention_masks += [encoding["attention_mask"]]

In [None]:
# Encode every test review with the same pipeline and maximum length that
# were used for the training reviews.
all_test_ids, all_test_attention_masks = [], []
for review in X_test:
    encoding = text_pipeline_hf(review, MAX_SEQ_LENGTH, tokenizer)
    all_test_ids += [encoding["input_ids"]]
    all_test_attention_masks += [encoding["attention_mask"]]

In [36]:
# Stack the per-review tensors along the first dimension (torch.cat's default)
# so each split becomes one tensor with one row per review.
text_input_ids_tensor = torch.cat(all_input_ids)
text_attention_mask_tensor = torch.cat(all_attention_masks)
text_test_input_ids_tensor = torch.cat(all_test_ids)
text_test_attention_mask_tensor = torch.cat(all_test_attention_masks)

# Labels for both splits are stored as int8 tensors.
# NOTE(review): int8 labels will probably need a cast (e.g. .long() or .float())
# before being passed to a loss function -- confirm where they are consumed.
labels_tensor, labels_tensor_test = (
    torch.tensor(split_labels, dtype=torch.int8) for split_labels in (y_train, y_test)
)

## Transformer

In [None]:
# --- 1. MultiHeadSelfAttention Module ---
# This module implements the self-attention mechanism, allowing the model
# to weigh the importance of different words in the input sequence.


class MultiHeadSelfAttention(nn.Module):
    """
    Implements the Multi-Head Self-Attention mechanism.

    Args:
        d_model (int): The dimension of the input embeddings.
        num_heads (int): The number of attention heads. Must divide d_model.
        dropout (float): Dropout rate applied to the attention weights.
    """

    def __init__(self, d_model, num_heads, dropout=0.1):
        super().__init__()

        # Each head works on an equal slice of the model dimension.
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.embed_dim = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Linear layers for Query, Key, Value projections
        # These project the input into different spaces for each head.
        self.q_proj = nn.Linear(d_model, d_model)
        self.k_proj = nn.Linear(d_model, d_model)
        self.v_proj = nn.Linear(d_model, d_model)

        # Output linear layer to combine the outputs of all heads
        self.out_proj = nn.Linear(d_model, d_model)

        self.attn_dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, mask=None):
        """
        Forward pass for Multi-Head Self-Attention.

        Args:
            query (torch.Tensor): Input tensor for queries (batch_size, seq_len, d_model).
            key (torch.Tensor): Input tensor for keys (batch_size, seq_len, d_model).
            value (torch.Tensor): Input tensor for values (batch_size, seq_len, d_model).
            mask (torch.Tensor, optional): An optional mask tensor (batch_size, 1, 1, seq_len),
                                          or any shape broadcastable to
                                          (batch_size, num_heads, seq_len, seq_len),
                                          to prevent attention to padded tokens.
                                          0 for padded positions, non-zero for actual tokens.

        Returns:
            torch.Tensor: Output tensor after attention (batch_size, seq_len, d_model).
            torch.Tensor: Attention weights (batch_size, num_heads, seq_len, seq_len).
                If every key of a row is masked, that row's weights are uniform
                rather than NaN.
        """
        batch_size = query.shape[0]

        # 1. Linear projections for Q, K, V
        # Shape after projection: (batch_size, seq_len, d_model)
        Q = self.q_proj(query)
        K = self.k_proj(key)
        V = self.v_proj(value)

        # 2. Split into multiple heads and reshape
        # Reshape to (batch_size, seq_len, num_heads, head_dim)
        # Then permute to (batch_size, num_heads, seq_len, head_dim) for batch matrix multiplication
        Q = Q.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)

        # 3. Calculate scaled attention scores (Q @ K_T / sqrt(head_dim))
        # (batch_size, num_heads, seq_len, head_dim) @ (batch_size, num_heads, head_dim, seq_len)
        # -> (batch_size, num_heads, seq_len, seq_len)
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.head_dim)

        # 4. Apply mask (if provided)
        # Masking is typically used to ignore padding tokens.
        if mask is not None:
            # The mask is broadcast against the score tensor.
            # The most negative finite value is used instead of -inf: masked
            # positions still receive exactly zero weight whenever at least one
            # key is visible, but a row whose keys are all masked yields a
            # uniform distribution instead of NaN (softmax over all -inf is NaN).
            masked_value = torch.finfo(attention_scores.dtype).min
            attention_scores = attention_scores.masked_fill(mask == 0, masked_value)

        # 5. Apply softmax to get attention probabilities
        attention_weights = F.softmax(attention_scores, dim=-1)

        # 6. Apply dropout to attention weights
        attention_weights = self.attn_dropout(attention_weights)

        # 7. Multiply attention weights with V
        # (batch_size, num_heads, seq_len, seq_len) @ (batch_size, num_heads, seq_len, head_dim)
        # -> (batch_size, num_heads, seq_len, head_dim)
        context_layer = torch.matmul(attention_weights, V)

        # 8. Concatenate heads and reshape back to original embed_dim
        # Permute back to (batch_size, seq_len, num_heads, head_dim)
        # Then reshape to (batch_size, seq_len, embed_dim)
        # permute() makes the tensor non-contiguous and view() requires a
        # contiguous layout, so .contiguous() is called in between.
        context_layer = (
            context_layer.permute(0, 2, 1, 3)
            .contiguous()
            .view(batch_size, -1, self.embed_dim)
        )

        # 9. Final linear projection
        output = self.out_proj(context_layer)

        return output, attention_weights


# --- 2. PositionalEncoding Module ---
# Self-attention on its own is permutation-equivariant: shuffling the input tokens
# just shuffles the outputs, so the model has no inherent notion of word order.
# Positional Encoding adds information about the position of each token in the sequence.


class PositionalEncoding(nn.Module):
    """
    Implements the Positional Encoding mechanism.
    Adds sinusoidal positional encodings to the input embeddings.

    Args:
        d_model (int): The dimension of the input embeddings (even or odd).
        max_seq_len (int): The maximum sequence length the model is expected to handle.
        dropout (float): Dropout rate.
    """

    def __init__(self, d_model, max_seq_len, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

        # Create a positional encoding matrix
        # pe shape: (max_seq_len, d_model)
        pe = torch.zeros(max_seq_len, d_model)
        # position shape: (max_seq_len, 1)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        # div_term shape: (ceil(d_model / 2),)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model)
        )
        # angles shape: (max_seq_len, ceil(d_model / 2))
        angles = position * div_term

        # Apply sine to even indices in pe, cosine to odd indices.
        # For an odd d_model there is one fewer odd column than even columns,
        # so the cosine term drops the last frequency to match.
        pe[:, 0::2] = torch.sin(angles)
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Add an extra dimension for batch (1, max_seq_len, d_model)
        # This allows it to be broadcasted to input_embeddings (batch_size, seq_len, d_model)
        self.register_buffer("pe", pe.unsqueeze(0))  # 'pe' is not a learnable parameter

    def forward(self, x):
        """
        Forward pass for Positional Encoding.

        Args:
            x (torch.Tensor): Input tensor (batch_size, seq_len, d_model).
                seq_len must not exceed max_seq_len.

        Returns:
            torch.Tensor: Output tensor with positional encodings added
            (and dropout applied).
        """
        # x is (batch_size, seq_len, d_model)
        # self.pe is (1, max_seq_len, d_model)
        # Slice self.pe to the current sequence length of x before adding.
        x = x + self.pe[:, : x.size(1), :]
        return self.dropout(x)  # Apply dropout to the output