# Problem: Build a Transformer Model from Scratch

## Objective
Implement a **Transformer model** in PyTorch for sequence processing and prediction. The model should include an embedding layer, a Transformer encoder, and an output projection layer.

## Tasks

1. Implement Positional Encoding to inject sequence order into embeddings  
Create sinusoidal positional encodings that are added to input embeddings to provide order information.

2. Implement Multi-Head Self Attention mechanism  
Apply attention in parallel across multiple heads to capture different representation subspaces.

3. Linear projection of queries, keys, and values  
Use a single linear layer to project input into concatenated Q, K, V tensors.

4. Scaled dot-product attention  
Compute attention scores by scaled dot product of queries and keys, followed by softmax and application to values.

5. Output projection after head concatenation  
Concatenate the outputs of all heads and project back to the original embedding dimension.

6. Implement FeedForward layer used within Transformer blocks  
Build a two-layer MLP with a ReLU activation in between to process each token independently.

7. Connect components in a TransformerEncoderLayer with proper layer normalization and residual connections  
Apply residual connections and layer normalization around the attention and feedforward sublayers.


## Requirements

- Support padded input sequences for variable-length data.
- Ensure the model handles batched inputs with correct tensor shapes.


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

In [2]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)  # Even indices
        pe[:, 1::2] = torch.cos(position * div_term)  # Odd indices

        pe = pe.unsqueeze(0)  # Shape: [1, max_len, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len]


class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"

        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        B, T, D = x.shape
        qkv = self.qkv_proj(x)  # Shape: (B, T, 3*D)
        qkv = qkv.reshape(B, T, 3, self.num_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]  # Each is (B, num_heads, T, head_dim)

        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)  # (B, num_heads, T, T)
        attn_weights = torch.softmax(scores, dim=-1)
        attn_output = attn_weights @ v  # (B, num_heads, T, head_dim)

        attn_output = attn_output.transpose(1, 2).reshape(B, T, D)
        return self.out_proj(attn_output)


class FeedForward(nn.Module):
    def __init__(self, embed_dim, ff_dim):
        super().__init__()
        self.linear1 = nn.Linear(embed_dim, ff_dim)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(ff_dim, embed_dim)

    def forward(self, x):
        return self.linear2(self.relu(self.linear1(x)))


class TransformerEncoderLayer(nn.Module):
    def __init__(self, embed_dim, num_heads, ff_dim):
        super().__init__()
        self.attn = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = FeedForward(embed_dim, ff_dim)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        x = x + self.attn(self.norm1(x))
        x = x + self.ffn(self.norm2(x))
        return x


class TransformerModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, ff_dim, output_dim):
        super().__init__()

        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoding = PositionalEncoding(embed_dim)
        self.layers = nn.ModuleList([
            TransformerEncoderLayer(embed_dim, num_heads, ff_dim)
            for _ in range(num_layers)
        ])
        self.output_proj = nn.Linear(embed_dim, output_dim)

    def forward(self, x):
        x = self.embedding(x)
        x = self.pos_encoding(x)

        # Prepend CLS token to the sequence
        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)

        for layer in self.layers:
            x = layer(x)

        return self.output_proj(x[:, 0])  # Use CLS token for classification


# Symmetry Detection Test
A synthetic task where the model must determine if a sequence is a mirror of itself. It's a good test for positional encoding. If everything is working correctly, you should see test accuracy above 90% after 100 epochs.

In [3]:
torch.manual_seed(42)

seq_length = 10
num_samples = 10000
vocab_size = seq_length  # tokens are 0..N-1

def create_mirror_data(num_samples, seq_length, vocab_size):
    half_len = seq_length // 2
    X = torch.randint(0, vocab_size, (num_samples, seq_length))
    y = torch.zeros(num_samples, dtype=torch.long)

    for i in range(num_samples):
        if torch.rand(1) > 0.5:
            # Create a mirror: [a, b, c, c, b, a]
            first_half = X[i, :half_len]
            X[i, seq_length-half_len:] = torch.flip(first_half, dims=[0])
            y[i] = 1 # Match
        else:
            # Keep random (usually not a mirror)
            y[i] = 0 # No match
    return X, y

X, y = create_mirror_data(num_samples, seq_length, vocab_size)
X_test, y_test = create_mirror_data(1000, seq_length, vocab_size)


embed_dim = 64
num_heads = 4
num_layers = 2
ff_dim = 128

model = TransformerModel(vocab_size, embed_dim, num_heads, num_layers, ff_dim, output_dim=2)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=2e-4, weight_decay=1e-2)

epochs = 100
batch_size = 64
for epoch in range(epochs):
    avg_loss = 0.0
    for i in range(0, num_samples, batch_size):
        X_batch = X[i:i+batch_size]
        y_batch = y[i:i+batch_size]

        # Forward pass
        predictions = model(X_batch)
        loss = criterion(predictions, y_batch)
        avg_loss += loss.item()

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch == 0 or (epoch + 1) % 10 == 0:
        model.eval()
        with torch.no_grad():
            test_predictions = model(X_test)
            test_loss = criterion(test_predictions, y_test)
            _, predicted_classes = torch.max(test_predictions, 1)
            accuracy = (predicted_classes == y_test).float().mean().item()
        model.train()

        print(f"Epoch [{epoch + 1}/{epochs}], Avg Train Loss: {avg_loss/(num_samples//batch_size):.4f}, Test Loss: {test_loss.item():.4f}, Test Accuracy: {accuracy:.4f}")


Epoch [1/100], Avg Train Loss: 0.5794, Test Loss: 0.4546, Test Accuracy: 0.7660
Epoch [10/100], Avg Train Loss: 0.2077, Test Loss: 0.2483, Test Accuracy: 0.8790
Epoch [20/100], Avg Train Loss: 0.1738, Test Loss: 0.2336, Test Accuracy: 0.9050
Epoch [30/100], Avg Train Loss: 0.1447, Test Loss: 0.2191, Test Accuracy: 0.9130
Epoch [40/100], Avg Train Loss: 0.1183, Test Loss: 0.2238, Test Accuracy: 0.9060
Epoch [50/100], Avg Train Loss: 0.1072, Test Loss: 0.1811, Test Accuracy: 0.9290
Epoch [60/100], Avg Train Loss: 0.0881, Test Loss: 0.1907, Test Accuracy: 0.9330
Epoch [70/100], Avg Train Loss: 0.0774, Test Loss: 0.1928, Test Accuracy: 0.9320
Epoch [80/100], Avg Train Loss: 0.0735, Test Loss: 0.1798, Test Accuracy: 0.9410
Epoch [90/100], Avg Train Loss: 0.0692, Test Loss: 0.1692, Test Accuracy: 0.9450
Epoch [100/100], Avg Train Loss: 0.0730, Test Loss: 0.1823, Test Accuracy: 0.9370


# Sentiment Analysis Test
Real-world application of the Transformer model for sentiment analysis. Note: not a gret test for positional encoding, but a good sanity check for the overall model.

## Install required libraries
`pip install datasets transformers`

In [4]:
from datasets import load_dataset
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

# 1. Load a small subset of the SST-2 dataset
dataset = load_dataset("glue", "sst2", split="train[:5000]")

# 2. Tokenization (Turning words into numbers)
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def tokenize_function(examples):
    return tokenizer(examples["sentence"], padding="max_length", truncation=True, max_length=16)

tokenized_dataset = dataset.map(tokenize_function, batched=True)
tokenized_dataset.set_format(type='torch', columns=['input_ids', 'label'])

# 3. Create DataLoader
train_loader = DataLoader(tokenized_dataset, batch_size=32, shuffle=True)

In [5]:
# Updated Hyperparameters for Sentiment Analysis
vocab_size = tokenizer.vocab_size  # Usually ~30,522 for BERT
embed_dim = 32                     # Small embedding for speed
num_heads = 4
num_layers = 2
ff_dim = 128
output_dim = 2                     # 0 for Negative, 1 for Positive

# Initialize the model with the new vocab size
# Note: You'll need to update your TransformerModel class to include an nn.Embedding layer
model = TransformerModel(vocab_size, embed_dim, num_heads, num_layers, ff_dim, output_dim)

# Use CrossEntropyLoss for classification instead of MSELoss
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [6]:
# Training loop
model.train()
epochs = 50
for epoch in range(epochs):
    avg_loss = 0.0
    for batch in train_loader:
        X = batch['input_ids']  # Shape: (batch_size, seq_length)
        y = batch['label']      # Shape: (batch_size,)

        # Forward pass
        predictions = model(X)
        loss = criterion(predictions, y)
        avg_loss += loss.item()

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 5 == 0:
        print(f"Epoch [{epoch + 1}/{epochs}], Avg Loss: {avg_loss/len(train_loader):.4f}")

Epoch [5/50], Avg Loss: 0.6579
Epoch [10/50], Avg Loss: 0.5995
Epoch [15/50], Avg Loss: 0.5352
Epoch [20/50], Avg Loss: 0.4578
Epoch [25/50], Avg Loss: 0.3902
Epoch [30/50], Avg Loss: 0.3233
Epoch [35/50], Avg Loss: 0.2642
Epoch [40/50], Avg Loss: 0.1950
Epoch [45/50], Avg Loss: 0.1495
Epoch [50/50], Avg Loss: 0.1063


In [8]:
# 1. Put model in evaluation mode (disables dropout/batchnorm)
model.eval()

test_sequences = [
    "This was the worst film I have ever seen.",
    "I absolutely loved this movie!"
]

# 2. Tokenize (ensure return_tensors="pt" for PyTorch)
tokenized_test = tokenizer(test_sequences, padding="max_length", truncation=True, max_length=16, return_tensors="pt")
X_test = tokenized_test['input_ids']  # Shape: (2, seq_length)
with torch.no_grad():
    logits = model(X_test)
    probabilities = torch.softmax(logits, dim=-1)
    predictions = torch.argmax(probabilities, dim=-1)

print(f"Raw Logits: {logits.tolist()}")
print(f"Probabilities: {probabilities.tolist()}")
print(f"Predicted Classes (0 negative, 1 positive): {predictions.tolist()}") # [1, 0] (hopefully!)

Raw Logits: [[0.25765424966812134, -0.5338520407676697], [-0.4944272041320801, 0.17682135105133057]]
Probabilities: [[0.6881546378135681, 0.3118453025817871], [0.3382173180580139, 0.6617826819419861]]
Predicted Classes (0 negative, 1 positive): [0, 1]
