# Implement Transformer in PyTorch

## Problem Statement

Your task is to **implement a Transformer model**, including the **encoder and decoder architecture**, in PyTorch. This model follows the design outlined in the **Attention Is All You Need** paper (Vaswani et al., 2017).

You will build a **Transformer-based sequence-to-sequence model** and apply it to a **machine translation task** using the **Multi30k dataset** (English-German sentence pairs).

---

## 📌 Background

The **Transformer model** consists of:
1. **Encoder**:
   - Embeds the input sequence.
   - Applies multiple layers of **self-attention** and **feed-forward networks**.
   
2. **Decoder**:
   - Takes the encoder output.
   - Uses **masked self-attention** to process its own input.
   - Applies **encoder-decoder attention** to attend to encoder outputs.
   - Passes through a feed-forward network.

3. **Final Linear & Softmax Layer**:
   - Produces word probabilities for translation.

### **Mathematical Formulation**
**Scaled dot-product attention**, the building block of each attention head, is defined as:

$$\text{Attention}(Q, K, V) = \text{softmax} \left(\frac{QK^T}{\sqrt{d_k}}\right) V$$

where:
- \( Q, K, V \) are the Query, Key, and Value matrices.
- \( d_k \) is the dimension of the queries and keys; dividing by \( \sqrt{d_k} \) keeps the dot products from growing with that dimension.

The **multi-head attention** mechanism applies multiple attention layers in parallel:

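$$\text{MultiHead}(Q, K, V) = \text{Concat}(\text{head}_1, \dots, \text{head}_h) W^O$$

where:
- Each **head** independently computes attention using its own learnable projections: \( \text{head}_i = \text{Attention}(Q W_i^Q, K W_i^K, V W_i^V) \).
- \( W^O \) is the learnable output projection applied to the concatenated heads.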
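The attention formula can be checked with a few lines of PyTorch. The following is a minimal sketch rather than the implementation used later in this notebook; the function name `scaled_dot_product_attention`, the optional `mask` argument, and the tensor sizes are assumptions chosen for illustration.

```python
import math
import torch

def scaled_dot_product_attention(q, k, v, mask=None):
    """Compute softmax(q k^T / sqrt(d_k)) v.

    mask is an optional boolean tensor (an assumption of this sketch):
    True marks positions that must not be attended to.
    """
    d_k = q.size(-1)
    scores = q @ k.transpose(-2, -1) / math.sqrt(d_k)  # (..., T_q, T_k)
    if mask is not None:
        scores = scores.masked_fill(mask, float("-inf"))
    weights = torch.softmax(scores, dim=-1)  # each row sums to 1
    return weights @ v, weights

torch.manual_seed(0)
q = torch.randn(2, 5, 64)  # (B, T, d_k), illustrative sizes
k = torch.randn(2, 5, 64)
v = torch.randn(2, 5, 64)
out, weights = scaled_dot_product_attention(q, k, v)
print(out.shape)      # torch.Size([2, 5, 64])
print(weights.shape)  # torch.Size([2, 5, 5])
```

Multi-head attention runs this computation once per head on separately projected inputs and concatenates the results before applying \( W^O \).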

---

## **Task Requirements**

### 1️⃣ Implement a **Transformer Encoder**
- Implement:
  - **Token embeddings** and **positional encodings**.
  - **Multi-head self-attention**.
  - **Feed-forward network** with layer normalization.

### 2️⃣ Implement a **Transformer Decoder**
- Implement:
  - **Masked multi-head attention** to prevent peeking.
  - **Encoder-decoder attention** to attend to encoder outputs.

### 3️⃣ Implement the full **Transformer model**
- Combine the embedding layer, positional encoding, encoder stack, decoder stack, and output projection.
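
For reference, the sinusoidal positional encoding from Vaswani et al. (2017) assigns each position \(pos\) and each dimension-pair index \(i\) the values below. This is the scheme described in the paper; learned position embeddings are a common alternative.

$$PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)$$

The encoding is added to the token embeddings so that the attention layers, which are otherwise order-agnostic, can tell positions apart.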

---

## **Constraints**
- The input tensors hold integer token IDs and should be shaped as:

$$X \in \mathbb{Z}^{B \times T}$$

where:
- **\(B\)** is the batch size.
- **\(T\)** is the sequence length.

- The Transformer model should follow:
  - **Embedding dimension**: \(d_{model} = 512\)
  - **Number of heads**: \(h = 8\)
  - **Feed-forward dimension**: \(d_{ff} = 2048\)
  - **Layers**: 6 Encoder + 6 Decoder
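
To make these shapes concrete, the sketch below embeds a batch of token IDs and derives the per-head dimension from the hyperparameters above. The batch size, sequence length, and `vocab_size` are illustrative assumptions, not values fixed by the task.

```python
import torch
import torch.nn as nn

d_model, num_heads, d_ff, num_layers = 512, 8, 2048, 6

# The embedding dimension must split evenly across the heads
assert d_model % num_heads == 0
d_k = d_model // num_heads  # per-head query/key dimension

B, T, vocab_size = 4, 10, 1000  # illustrative sizes (assumptions)
token_ids = torch.randint(0, vocab_size, (B, T))  # integer IDs, shape (B, T)
embedding = nn.Embedding(vocab_size, d_model)
x = embedding(token_ids)  # shape (B, T, d_model)

print(d_k)      # 64
print(x.shape)  # torch.Size([4, 10, 512])
```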

---

## **💡 Hints**
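1. **Use PyTorch's `nn.MultiheadAttention`**:
   - Example: `self.attn = nn.MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True)`
   - Without `batch_first=True`, the module expects inputs shaped `(T, B, d_model)` rather than `(B, T, d_model)`.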

2. **Implement Positional Encoding**:
   - Use sinusoidal functions to encode word positions.

3. **Apply Layer Normalization & Residual Connections**:
   - `nn.LayerNorm(d_model)`

4. **Mask Future Tokens in the Decoder**:
   - Use `torch.triu(torch.ones(T, T), diagonal=1).bool()`
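
The mask from hint 4 can be passed to `nn.MultiheadAttention` through its `attn_mask` argument, where `True` marks a position that may not be attended to. The sketch below uses small illustrative sizes (assumptions) and checks that no attention weight lands on a future position.

```python
import torch
import torch.nn as nn

T, d_model, num_heads = 4, 32, 4  # small illustrative sizes (assumptions)

# Row t is True at every column greater than t, i.e. at future positions
causal_mask = torch.triu(torch.ones(T, T), diagonal=1).bool()

attn = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, batch_first=True)
x = torch.randn(2, T, d_model)  # (B, T, d_model)
out, weights = attn(x, x, x, attn_mask=causal_mask)

print(out.shape)      # torch.Size([2, 4, 32])
print(weights.shape)  # torch.Size([2, 4, 4]), averaged over heads

# Weights above the diagonal are exactly zero: no position sees the future
print(torch.triu(weights, diagonal=1).abs().max().item())  # 0.0
```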

---

## **📌 Example Implementation**



In [28]:
import torch
import torch.nn as nn
import math
from torch.utils.data import Dataset, DataLoader

########################################
# 1) Transformer Encoder Block
########################################

# One encoder layer (post-norm): self-attention and a feed-forward network,
# each followed by dropout, a residual connection, and layer normalization.
class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model=512, num_heads=8, d_ff=2048, dropout=0.1):
        super().__init__()
        # batch_first=True so that inputs are shaped (B, T, d_model)
        self.attention_layer = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, batch_first=True)
        self.feed_forward_layer = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.layer_norm_1 = nn.LayerNorm(d_model)
        self.layer_norm_2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, src_mask=None):
        # 1. multi-head self attention
        attn_output, _ = self.attention_layer(src, src, src, attn_mask=src_mask)
        # 2. add + norm
        output = self.layer_norm_1(src + self.dropout(attn_output))
        # 3. feed forward
        ffn_output = self.feed_forward_layer(output)
        # 4. add + norm
        output = self.layer_norm_2(output + self.dropout(ffn_output))
        return output

########################################
# 2) Transformer Decoder Block
########################################

# One decoder layer (post-norm): masked self-attention, encoder-decoder (cross)
# attention, and a feed-forward network, each followed by dropout, a residual
# connection, and layer normalization.
class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model=512, num_heads=8, d_ff=2048, dropout=0.1):
        super().__init__()
        # batch_first=True so that inputs are shaped (B, T, d_model)
        self.self_attn_layer = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, batch_first=True)
        self.cross_attn_layer = nn.MultiheadAttention(embed_dim=d_model, num_heads=num_heads, batch_first=True)
        self.feed_forward_layer = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.layer_norm = nn.LayerNorm(d_model)
        self.layer_norm_2 = nn.LayerNorm(d_model)
        self.layer_norm_3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):
        # 1. multi-head self attention (tgt_mask hides future positions)
        self_attn_output, _ = self.self_attn_layer(tgt, tgt, tgt, attn_mask=tgt_mask)
        # 2. add + norm
        tgt = self.layer_norm(tgt + self.dropout(self_attn_output))
        # 3. multi-head cross attention over the encoder output (memory)
        cross_attn_output, _ = self.cross_attn_layer(tgt, memory, memory, attn_mask=memory_mask)
        # 4. add + norm
        output = self.layer_norm_2(tgt + self.dropout(cross_attn_output))
        # 5. feed forward
        ffn_output = self.feed_forward_layer(output)
        # 6. add + norm
        output = self.layer_norm_3(output + self.dropout(ffn_output))
        return output

########################################
# Positional Encoding
########################################

class PositionalEncoding(nn.Module):
    def __init__(self, seq_len, d_model):
        super().__init__()
        # Precompute the sinusoidal table once; d_model is assumed to be even
        pe = torch.zeros(1, seq_len, d_model)
        position = torch.arange(0, seq_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[0, :, 0::2] = torch.sin(position * div_term)
        pe[0, :, 1::2] = torch.cos(position * div_term)
        # A buffer moves with the module across devices but is not trained
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x: (B, T, d_model); add the first T rows of the table
        return x + self.pe[:, :x.size(1)]

########################################
# 3) Implement Transformer Model
########################################

# Full encoder-decoder model. Source and target share one embedding table,
# so a single vocab_size covers both sides.
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model=512, num_heads=8, d_ff=2048, num_layers=6):
        super().__init__()
        # 1. embedding layer
        self.embedding = nn.Embedding(vocab_size, d_model)
        # 2. positional encoder
        self.pe = PositionalEncoding(seq_len=5000, d_model=d_model)  # arbitrary large seq_len
        # 3. encoder
        self.encoder = nn.ModuleList([
            TransformerEncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        # 4. decoder
        self.decoder = nn.ModuleList([
            TransformerDecoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        # 5. output layer
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, src, tgt):
        # encoder: embed the source tokens, add positions, run the encoder stack
        src = self.embedding(src)
        src = self.pe(src)
        for encoder_layer in self.encoder:
            src = encoder_layer(src)

        # causal mask: True above the diagonal blocks attention to future target positions
        T = tgt.size(1)
        tgt_mask = torch.triu(torch.ones(T, T, device=tgt.device), diagonal=1).bool()

        # decoder: embed the target tokens, add positions, attend to the encoder output
        tgt = self.embedding(tgt)
        tgt = self.pe(tgt)
        for decoder_layer in self.decoder:
            tgt = decoder_layer(tgt, src, tgt_mask=tgt_mask)

        # logits over the vocabulary, shape (B, T, vocab_size)
        return self.fc_out(tgt)
        



In [29]:
def test_transformer():
    batch_size = 16
    seq_len = 10
    vocab_size = 100
    model = Transformer(vocab_size=vocab_size, d_model=32, num_heads=4, d_ff=64, num_layers=2)

    # Create random integer tensors for src and tgt
    src = torch.randint(0, vocab_size, (batch_size, seq_len))  # (B, T)
    tgt = torch.randint(0, vocab_size, (batch_size, seq_len))

    # Pass them through the model
    out = model(src, tgt)  # (B, T, vocab_size)
    print("Output shape:", out.shape)  # Expect (16, 10, 100)

    # The model is basically untrained, so we just ensure it runs.

test_transformer()

Output shape: torch.Size([16, 10, 100])


In [30]:
# Training Transformer Model with Toy Dataset
def train_transformer_with_toy_dataset():
    vocab_size = 10000
    d_model = 512
    seq_len = 20
    batch_size = 16
    num_heads = 8
    num_layers = 6
    d_ff = 2048
    max_len = 5000
    num_epochs = 50
    learning_rate = 0.001
    
    model = Transformer(vocab_size, d_model, num_heads, d_ff, num_layers)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    
    # Initialize Toy Dataset and DataLoader
    class ToySequenceDataset(Dataset):
        def __init__(self, num_samples=500, sequence_length=20, vocab_size=10000):
            self.sequence_length = sequence_length
            self.vocab_size = vocab_size
            self.data = torch.randint(0, vocab_size, (num_samples, sequence_length))

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            return self.data[idx]

    toy_dataset = ToySequenceDataset(num_samples=500, sequence_length=seq_len, vocab_size=vocab_size)
    train_loader = DataLoader(toy_dataset, batch_size=batch_size, shuffle=True)

    # Training Loop
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
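        for sequences in train_loader:
            # Toy setup: the source, the decoder input, and the labels are all the same
            # random sequence, so this loop only checks that training runs end to end.
            # A real translation setup would feed tgt[:, :-1] to the decoder and score
            # the predictions against tgt[:, 1:].
            src, tgt = sequences, sequences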
            optimizer.zero_grad()
            output = model(src, tgt)
            loss = criterion(output.view(-1, vocab_size), tgt.view(-1))
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        if epoch % 10 == 0:
            print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {total_loss / len(train_loader):.4f}")

    # Testing the trained model
    model.eval()
    with torch.no_grad():
        test_sequences = next(iter(train_loader))
        test_outputs = model(test_sequences, test_sequences)
        print("Transformer Output Shape:", test_outputs.shape)  # Expected: (batch_size, seq_len, vocab_size)

train_transformer_with_toy_dataset()

Epoch [1/50], Loss: 9.4341
Epoch [11/50], Loss: 8.7746
Epoch [21/50], Loss: 8.7186
Epoch [31/50], Loss: 8.7036
Epoch [41/50], Loss: 8.6840
Transformer Output Shape: torch.Size([16, 20, 10000])
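
The toy loop above uses the same sequence as decoder input and as label. For a translation task, the decoder input is usually the target shifted right by one token (teacher forcing). The sketch below shows only the slicing and the loss call; the special-token IDs `BOS` and `EOS`, the tiny vocabulary, and the random `logits` standing in for `model(src, tgt_in)` are assumptions made for illustration.

```python
import torch
import torch.nn as nn

vocab_size = 10   # illustrative size (assumption)
BOS, EOS = 1, 2   # hypothetical special-token IDs
tgt = torch.tensor([[BOS, 5, 6, 7, EOS],
                    [BOS, 8, 9, 3, EOS]])  # (B, T)

tgt_in = tgt[:, :-1]   # decoder input: everything except the last token
tgt_out = tgt[:, 1:]   # labels: everything except the first token

# Stand-in for model(src, tgt_in), shape (B, T - 1, vocab_size)
logits = torch.randn(tgt_in.size(0), tgt_in.size(1), vocab_size)

# reshape (not view) because the sliced label tensor is not contiguous
loss = nn.CrossEntropyLoss()(logits.reshape(-1, vocab_size), tgt_out.reshape(-1))

print(tgt_in.tolist())   # [[1, 5, 6, 7], [1, 8, 9, 3]]
print(tgt_out.tolist())  # [[5, 6, 7, 2], [8, 9, 3, 2]]
```

With this shift, position \(t\) of the decoder sees tokens up to \(t\) and is scored on token \(t + 1\), which is what the causal mask is meant to enforce.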
