## Transformer architecture from attention is all you need paper

In [62]:
#Step1:Positional Encoding implementation

import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        # create a matrix of shape (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # shape (0, max_len) ----> shape (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # captures the positional relationships 

        # fill the positional encoding matrix with sine for even  and consine for odd indices
        pe[:, 0::2] = torch.sin(position * div_term) 
        pe[:, 1::2] = torch.cos(position * div_term)

        # add batch dimension for broadcasting
        pe = pe.unsqueeze(0) # shape (1, max_len, d_model)
        
        # Register as buffer so it's saved with the model but not considered a parameter
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape (batch_size, seq_len, d_model)
        # get the positional encoding for the input sequence length
        return x + self.pe[:, :x.size(1), :].to(x.device) # add positional encoding to the input

In [63]:
# step2: scaled dot product attention
def sclaed_dot_product_attention(q, k, v, mask=None):
    # q, k, v shape (batch_size, head, seq_len, depth = d_k = d_model/num_heads)
    d_k = q.size(-1) # get the last dimention size, which is the depth of the each head
    scores = torch.matmul(q, k.transpose(-2, -1)) /math.sqrt(d_k) # calculate the attention scores
    
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf')) # apply the mask to the scores
    
    atten = torch.softmax(scores, dim = -1) # apply softmax to the scores
    output = torch.matmul(atten, v) # calculate the output
    return output # output shape (batch_size, seq_len, d_model)

In [64]:
# Step3: Multi-head attention imlementation

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        # projection matrices for Q, K and V and output
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        # q, k, v shape (batch_size, seq_len, d_model)
        batch_size = q.size(0)

        # linear projection and reshape into (batch_size, num_heads, seq_len, d_k)
        q = self.q_linear(q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2) # shape (batch_size, num_heads, seq_len, d_k) # calulates Q=x⋅WQ+bQ and perform reshape
        k = self.k_linear(k).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)
        v = self.v_linear(v).view(batch_size, -1, self.num_heads, self.d_k).transpose(1,2)

        # apply scaled dot product attention
        attn = sclaed_dot_product_attention(q, k, v, mask)

        # concatenate heads and project back
        attn = attn.transpose(1, 2).contiguous().view(batch_size, -1, self.num_heads * self.d_k)
        output = self.out_linear(attn) # shape (batch_size, seq_len, d_model)
        return output

In [65]:
# Step4: Position-wise feed-forward network implementation
# FFN is shared accross all positions, but applied individually to each token. This is called position-wise feed-forward network.
import torch.nn.functional as F

class FeedForward(nn.Module): # input and output dimension are same but is transformed by a linear layer/ It helps transform the attended info non-linearly before going to the next layer
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff) # expand
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x))) # apply ReLU (adds non-linearity) activation and then linear transformation
    

In [66]:
# Example usage
batch_size = 2
seq_len = 3
d_model = 4
d_ff = 8  # Hidden layer size (usually 4×d_model in real Transformer)

# Sample input (2 sequences of 3 tokens, each with embedding size 4)
x = torch.tensor([
    [[1.0, 2.0, 3.0, 4.0],
     [4.0, 3.0, 2.0, 1.0],
     [0.0, 1.0, 0.0, 1.0]],
     
    [[1.0, 1.0, 1.0, 1.0],
     [2.0, 2.0, 2.0, 2.0],
     [3.0, 3.0, 3.0, 3.0]],
])

# Initialize and apply FFN
ffn = FeedForward(d_model=d_model, d_ff=d_ff)
output = ffn(x)

print("Output shape:", output.shape)
print("First token transformed:", output[0, 0])


Output shape: torch.Size([2, 3, 4])
First token transformed: tensor([ 0.3409, -1.7106,  0.0177, -0.8115], grad_fn=<SelectBackward0>)


In [67]:
# Step5: Trasformer encoder layer implementaion
# here multihead attention - learn dependencies between tokens in the sequence
# residual connections - help gradients flow through the network(prevent vanishing gradients problems)
# normalization - stabilize the training process and improve convergence ( nomalize features for stable learning)
# feed-forward network - apply non-linear transformations to the attended information
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)  # Multi-head attention
        self.ffn = FeedForward(d_model, d_ff)  # Position-wise feed-forward network
        self.norm1 = nn.LayerNorm(d_model)  # Layer normalization
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)  # Dropout for attention
        self.dropout2 = nn.Dropout(dropout)  # Dropout for feed-forward network

    def forward(self, x, mask=None):
        # 1. self-attention + residual connection + normalization
        attn_output = self.self_attn(x, x, x, mask)
        x =  self.norm1(x + self.dropout1(attn_output)) # residual connection and normalization

        # 2. FFN + residual connection + normalization
        ffn_output = self.ffn(x) # FFN output
        x = x + self.dropout2(ffn_output) # residual connection
        x = self.norm2(x) # normalization 

        return x  # Output shape (batch_size, seq_len, d_model)

## EncoderLayer Block Diagram

```
Input: x (batch_size, seq_len, d_model)
                    ↓
    ┌─────────────────────────────────────────┐
    │         MULTI-HEAD ATTENTION            │
    │  Query = x, Key = x, Value = x          │
    │  attn_output = self_attn(x, x, x, mask) │
    └─────────────────────────────────────────┘
                    ↓
              [Dropout1 Applied]
                    ↓
    ┌─────────────────────────────────────────┐
    │         RESIDUAL + LAYER NORM           │
    │    x = norm1(x + dropout1(attn_output)) │
    └─────────────────────────────────────────┘
                    ↓
    ┌─────────────────────────────────────────┐
    │         FEED FORWARD NETWORK            │
    │      ffn_output = ffn(x)                │
    │  (Linear → ReLU → Linear transformation)│
    └─────────────────────────────────────────┘
                    ↓
              [Dropout2 Applied]
                    ↓
    ┌─────────────────────────────────────────┐
    │         RESIDUAL + LAYER NORM           │
    │    x = norm2(x + dropout2(ffn_output))  │
    └─────────────────────────────────────────┘
                    ↓
Output: x (batch_size, seq_len, d_model)
```

### Component Breakdown:

| Component | Purpose | Implementation Details |
|-----------|---------|----------------------|
| **Multi-Head Attention** | Learn token dependencies | `self_attn(x, x, x, mask)` - Self-attention with Q=K=V=x |
| **Dropout1** | Regularization | `dropout1(attn_output)` - Prevent overfitting |
| **Residual + LayerNorm** | Gradient flow & stability | `norm1(x + dropout1(attn_output))` - Add input to output |
| **Feed Forward** | Non-linear transformation | `ffn(x)` - Linear → ReLU → Linear layers |
| **Dropout2** | Regularization | `dropout2(ffn_output)` - Prevent overfitting |
| **Residual + LayerNorm** | Gradient flow & stability | `norm2(x + dropout2(ffn_output))` - Add input to output |

### Key Features:
- **Two Sub-layers**: Self-attention + Feed-forward network
- **Two Residual Connections**: Help gradients flow through deep networks
- **Two Layer Normalizations**: Stabilize training and improve convergence
- **Two Dropout Layers**: Regularization to prevent overfitting
- **Consistent Shape**: Input and output have same dimensions

In [68]:
# Step6: Full Encoder block implementation

class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)  # token embeddings
        self.pos_encoder = PositionalEncoding(d_model, max_len)  # positional encodings
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # x: input token IDs → shape (batch_size, seq_len)
        x = self.embedding(x) * math.sqrt(x.size(-1))  # shape: (batch, seq_len, d_model)
        x = self.pos_encoder(x)
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(x, mask)

        return x  # shape: (batch_size, seq_len, d_model)



## Transformer Decoder Architecture Flow

```
Input (shifted target embeddings)
              ↓
┌─────────────────────────────────┐
│   Masked Multi-Head Self-Attn  │
└─────────────────────────────────┘
              ↓
     + Residual → LayerNorm
              ↓
┌─────────────────────────────────┐
│   Encoder-Decoder Cross-Attn   │
└─────────────────────────────────┘
              ↓
     + Residual → LayerNorm
              ↓
┌─────────────────────────────────┐
│         Feed Forward            │
└─────────────────────────────────┘
              ↓
     + Residual → LayerNorm
              ↓
            Output
```

**Alternative Compact Version:**

| Step | Component | Operation |
|------|-----------|-----------|
| 1 | **Input** | Shifted target embeddings |
| 2 | **Self-Attention** | Masked Multi-Head Self-Attention |
| 3 | **Residual** | Add & LayerNorm |
| 4 | **Cross-Attention** | Encoder-Decoder Cross-Attention |
| 5 | **Residual** | Add & LayerNorm |
| 6 | **Feed Forward** | Position-wise Feed Forward |
| 7 | **Residual** | Add & LayerNorm |
| 8 | **Output** | Final decoder output |


In [69]:
# Step7: Transformer Decoder Layer Implementation

class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_output, tgt_mask=None, memory_mask=None):
        # Step 1: Masked self-attention
        _x = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout1(_x))

        # Step 2: Cross-attention with encoder output as key/value
        _x = self.cross_attn(x, enc_output, enc_output, memory_mask)
        x = self.norm2(x + self.dropout2(_x))

        # Step 3: Feed-forward
        _x = self.ffn(x)
        x = self.norm3(x + self.dropout3(_x))

        return x


In [70]:
# Step8: Full Transformer Decoder Stack Implementation
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len)
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        # tgt shape: (batch_size, tgt_seq_len)
        x = self.embedding(tgt) * math.sqrt(tgt.size(-1))
        x = self.pos_encoder(x)
        x = self.dropout(x)

        for layer in self.layers:
            x = layer(x, memory, tgt_mask, memory_mask)

        return x  # shape: (batch_size, tgt_seq_len, d_model)


In [71]:
# Step9: Full Transformer Model (Encoder + Decoder)
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8, d_ff=2048, 
                 num_layers=6, max_len=5000, dropout=0.1):
        super().__init__()
        self.encoder = TransformerEncoder(src_vocab_size, d_model, num_heads, d_ff, num_layers, max_len, dropout)
        self.decoder = TransformerDecoder(tgt_vocab_size, d_model, num_heads, d_ff, num_layers, max_len, dropout)
        self.output_layer = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None, memory_mask=None):
        memory = self.encoder(src, src_mask)              # Encoder output
        dec_output = self.decoder(tgt, memory, tgt_mask, memory_mask)  # Decoder output
        logits = self.output_layer(dec_output)            # Final token logits
        return logits  # shape: (batch_size, tgt_seq_len, tgt_vocab_size)


In [72]:
# Step 10: Training and testing the Transformer model

# Manual toy dataset
raw_data = [
    ("I am a student", "Ich bin ein Schüler"),
    ("He is a teacher", "Er ist ein Lehrer"),
    ("She is reading a book", "Sie liest ein Buch"),
    ("They are playing", "Sie spielen"),
    ("We love learning", "Wir lieben das Lernen"),
]

def tokenize(sentence):
    return sentence.lower().split()

specials = ['<pad>', '<sos>', '<eos>', '<unk>']

# Build vocab
def build_vocab(sentences):
    tokens = set()
    for s in sentences:
        tokens.update(tokenize(s))
    return {tok: i for i, tok in enumerate(specials + sorted(tokens))}

# Tokenize and vocab
src_vocab = build_vocab([src for src, _ in raw_data])
tgt_vocab = build_vocab([tgt for _, tgt in raw_data])
inv_tgt_vocab = {v: k for k, v in tgt_vocab.items()}

# Encode sentences using the vocab
# Add <sos> at the beginning and <eos> at the end of each
# sentence, and replace unknown tokens with <unk>
# Also, pad sequences to a fixed length
# with <pad> token
# The <sos> and <eos> tokens are used to indicate the start and end of a sequence, respectively.
# The <unk> token is used for unknown words that are not in the vocabulary.
# The <pad> token is used to pad sequences to a fixed length
# The vocabulary is built from the sentences, and each token is assigned a unique index.
# The encode function converts a sentence into a sequence of indices based on the vocabulary.
# The pad function pads the sequence to a fixed length with the <pad> token.
# The src_vocab and tgt_vocab dictionaries map tokens to their corresponding indices.
def encode(sentence, vocab):
    return [vocab['<sos>']] + [vocab.get(tok, vocab['<unk>']) for tok in tokenize(sentence)] + [vocab['<eos>']]

def pad(seq, max_len, pad_id):
    return seq + [pad_id] * (max_len - len(seq))

src_encoded = []
tgt_encoded = []

max_src_len = 10
max_tgt_len = 12

for src, tgt in raw_data:
    src_seq = pad(encode(src, src_vocab), max_src_len, src_vocab['<pad>'])
    tgt_seq = pad(encode(tgt, tgt_vocab), max_tgt_len, tgt_vocab['<pad>'])
    src_encoded.append(src_seq)
    tgt_encoded.append(tgt_seq)

import torch
src_batch = torch.tensor(src_encoded)
tgt_batch = torch.tensor(tgt_encoded)

# For training
tgt_input = tgt_batch[:, :-1]
tgt_output = tgt_batch[:, 1:]

In [73]:
# Defining model, Loss, and Optimizer 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# Use CPU to avoid CUDA device interface issues
device = torch.device("cpu")  # Force CPU for stable execution

vocab_size_src = len(src_vocab)
vocab_size_tgt = len(tgt_vocab)

# Initialize model
model = Transformer(
    src_vocab_size=vocab_size_src,
    tgt_vocab_size=vocab_size_tgt,
    d_model=512,
    num_heads=8,
    d_ff=2048,
    num_layers=2,
).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=tgt_vocab['<pad>'])
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Move data to device
src_batch = src_batch.to(device)
tgt_input = tgt_input.to(device)
tgt_output = tgt_output.to(device)

In [74]:
#Training Loop 

num_epochs = 30

for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    # Forward pass
    logits = model(src_batch, tgt_input)  # (batch, tgt_len-1, tgt_vocab_size)
    loss = criterion(logits.view(-1, vocab_size_tgt), tgt_output.reshape(-1))

    # Backward + update
    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")


Epoch 1, Loss: 3.1701
Epoch 2, Loss: 6.1985
Epoch 3, Loss: 4.4693
Epoch 4, Loss: 3.9004
Epoch 5, Loss: 4.5901
Epoch 6, Loss: 4.6560
Epoch 7, Loss: 3.7179
Epoch 8, Loss: 3.8246
Epoch 9, Loss: 3.3376
Epoch 10, Loss: 3.0011
Epoch 11, Loss: 2.9379
Epoch 12, Loss: 3.1550
Epoch 13, Loss: 2.9331
Epoch 14, Loss: 2.7485
Epoch 15, Loss: 2.8254
Epoch 16, Loss: 2.8021
Epoch 17, Loss: 2.8220
Epoch 18, Loss: 2.9300
Epoch 19, Loss: 2.8761
Epoch 20, Loss: 2.6689
Epoch 21, Loss: 2.7213
Epoch 22, Loss: 2.7654
Epoch 23, Loss: 2.6871
Epoch 24, Loss: 2.6781
Epoch 25, Loss: 2.6909
Epoch 26, Loss: 2.6994
Epoch 27, Loss: 2.6649
Epoch 28, Loss: 2.6430
Epoch 29, Loss: 2.6289
Epoch 30, Loss: 2.6103


In [75]:
# Greedy Decoding Function
def greedy_decode(model, src_seq, max_len=12):
    model.eval()
    src = torch.tensor([src_seq], dtype=torch.long).to(device)
    memory = model.encoder(src)

    ys = torch.tensor([[tgt_vocab['<sos>']]], dtype=torch.long).to(device)

    for _ in range(max_len):
        out = model.decoder(ys, memory)
        logits = model.output_layer(out[:, -1:])
        next_token = torch.argmax(logits, dim=-1)[:, -1].unsqueeze(1)

        ys = torch.cat([ys, next_token], dim=1)
        if next_token.item() == tgt_vocab['<eos>']:
            break
    return ys.squeeze().tolist()


In [76]:
# Testing a translation
def decode_tokens(token_ids, inv_vocab):
    return [inv_vocab[i] for i in token_ids if i not in [tgt_vocab['<pad>'], tgt_vocab['<sos>'], tgt_vocab['<eos>']]]

# Try translating the first sentence
src_sentence = "I am a student"
src_seq = pad(encode(src_sentence, src_vocab), max_src_len, src_vocab['<pad>'])

predicted_ids = greedy_decode(model, src_seq)
translated_tokens = decode_tokens(predicted_ids, inv_tgt_vocab)

print("Input:", src_sentence)
print("Translation:", " ".join(translated_tokens))


Input: I am a student
Translation: 


In [80]:
# Loading and Preprocessing data
from torchtext.datasets import Multi30k
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

import spacy

# Load spaCy tokenizers
spacy_de = spacy.load("de_core_news_sm")
spacy_en = spacy.load("en_core_web_sm")

def tokenize_de(text):
    return [tok.text.lower() for tok in spacy_de.tokenizer(text)]

def tokenize_en(text):
    return [tok.text.lower() for tok in spacy_en.tokenizer(text)]

In [82]:
# Build vacab and prepare subset 
from torch.utils.data import DataLoader

# Load full dataset, we’ll only use a few samples
train_iter = Multi30k(split='train')

# Tokenize and extract few samples
raw_data = []
for i, (de, en) in enumerate(train_iter):
    if i >= 10:  # small subset
        break
    raw_data.append((tokenize_en(en), tokenize_de(de)))

# Build vocab
def yield_tokens(data, tokenizer):
    for src, tgt in data:
        yield src
        yield tgt

vocab = build_vocab_from_iterator(yield_tokens(raw_data, None),
                                   specials=['<pad>', '<sos>', '<eos>', '<unk>'],
                                   special_first=True)
vocab.set_default_index(vocab['<unk>'])

print("Vocab size:", len(vocab))

# Encode sequences
def encode(tokens):
    return [vocab['<sos>']] + [vocab[tok] for tok in tokens] + [vocab['<eos>']]


✅ Created 15 translation pairs
✅ Tokenized all pairs
✅ Vocab size: 105
Test English: ['a', 'man', 'sits', 'on', 'a', 'bench'] → [1, 5, 22, 23, 10, 5, 24, 2]
Test German: ['ein', 'mann', 'sitzt', 'auf', 'einer', 'bank'] → [1, 11, 25, 26, 12, 27, 28, 2]

🎉 SUCCESS: Multi30k-style dataset ready without torchtext!


📊 PREPARING DATASET FOR TRAINING
✅ Source batch shape: torch.Size([15, 15])
✅ Target input shape: torch.Size([15, 14])
✅ Target output shape: torch.Size([15, 14])
✅ Dataset prepared with 15 samples
