<a href="https://colab.research.google.com/github/ankita2002/LLMS/blob/main/HW3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Homework 3: Transformers, T5, Machine Translation

This homework guides you through implementing a complete T5 (Text-To-Text Transfer Transformer) encoder-decoder architecture from scratch for machine translation. You will build the encoder stack, decoder stack with cross-attention, and train the model on English-German translation.

**Total Points: 20**

**Instructions:**
1. Complete all tasks in this notebook
2. Ensure your code runs without errors
3. Submit both this notebook and any additional files created
4. Write clear explanations for your approach
5. Train your model on the English-German translation dataset

## Setup

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import math
from typing import Optional, Tuple, List, Dict

# Set random seeds
torch.manual_seed(42)
np.random.seed(42)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

## Dataset Preparation

We use a 50,000-pair subset of the WMT16 English-German translation dataset. It contains parallel English and German sentences, which makes it suitable for training a sequence-to-sequence translation model.

In [None]:
!pip install -q datasets

In [None]:
import os
from collections import Counter
from datasets import load_dataset
import tqdm

print("Loading the WMT16 (de-en) dataset...")

# 1. Load the Real WMT Dataset
# We use WMT16 (German-English), a standard benchmark.
# We take the first 50,000 examples to keep RAM usage efficient while ensuring variety.
# If you want the full dataset (4.5M pairs), remove the [:50000] slice.
dataset = load_dataset("wmt16", "de-en", split="train[:50000]")

print(f"Successfully loaded {len(dataset)} pairs from WMT16.")

# 2. Extract pairs into memory
translation_pairs = []
for item in dataset:
    # WMT data structure is usually {'translation': {'de': '...', 'en': '...'}}
    en_text = item['translation']['en']
    de_text = item['translation']['de']
    translation_pairs.append((en_text, de_text))

# 3. Build Vocabulary (Character-level)
# We use a character-level vocabulary to keep tokenization simple.
# Note: competitive WMT systems usually use subword tokenization such as
# BPE (Byte Pair Encoding), but character-level tokens are enough for learning the fundamentals.
print("Building vocabulary from dataset...")

counter = Counter()
for en, de in translation_pairs:
    counter.update(en)  # Case is preserved (no lowercasing)
    counter.update(de)

# Filter rare characters to keep vocab clean (optional, but good for real web data)
MIN_FREQ = 5
common_chars = [char for char, count in counter.items() if count >= MIN_FREQ]

# Add special tokens
special_tokens = ['<pad>', '<sos>', '<eos>', '<unk>']
chars = special_tokens + sorted(common_chars)
vocab_size = len(chars)

print(f'Shared vocabulary size: {vocab_size}')

# 4. Mappings
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}

PAD_IDX = stoi['<pad>']
SOS_IDX = stoi['<sos>']
EOS_IDX = stoi['<eos>']
UNK_IDX = stoi['<unk>']

# 5. Encoding/Decoding Functions
def encode(s):
    """Encode string to token IDs (Character level)."""
    # Case is preserved: the input is not lowercased, so 'A' and 'a' are distinct tokens
    return [stoi.get(c, UNK_IDX) for c in s]

def decode(l):
    """Decode token IDs to string."""
    return ''.join([itos[i] for i in l if i != PAD_IDX])

print(f"Special tokens: PAD={PAD_IDX}, SOS={SOS_IDX}, EOS={EOS_IDX}, UNK={UNK_IDX}")

# 6. Train/Val Split (80/20)
n_train = int(0.8 * len(translation_pairs))
train_pairs = translation_pairs[:n_train]
val_pairs = translation_pairs[n_train:]

print("-" * 40)
print(f"Train pairs: {len(train_pairs)}")
print(f"Val pairs:   {len(val_pairs)}")
print("-" * 40)
print("Sample Data (Real WMT):")
print(f"  EN: {train_pairs[5][0]}") # Fixed, arbitrarily chosen index to show a real sentence pair
print(f"  DE: {train_pairs[5][1]}")
print(f"  Encoded: {encode(train_pairs[5][0])}")
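
As a quick sanity check of the character-level scheme, the sketch below rebuilds the same kind of mapping on a two-sentence toy corpus. The corpus and the `toy_` names are illustrative assumptions, not part of the assignment; the point is only to show the encode/decode round trip and how unseen characters fall back to `<unk>`.

```python
# Toy corpus; illustrative only, not taken from WMT16.
toy_pairs = [("Hello world", "Hallo Welt"), ("Good day", "Guten Tag")]

# Same layout as the notebook: special tokens first, then sorted characters.
toy_special = ['<pad>', '<sos>', '<eos>', '<unk>']
toy_chars = toy_special + sorted({c for en, de in toy_pairs for c in en + de})
toy_stoi = {ch: i for i, ch in enumerate(toy_chars)}
toy_itos = {i: ch for ch, i in toy_stoi.items()}

def toy_encode(s):
    """Map each character to its id, falling back to <unk>."""
    return [toy_stoi.get(c, toy_stoi['<unk>']) for c in s]

def toy_decode(ids):
    """Map ids back to characters, skipping padding."""
    return ''.join(toy_itos[i] for i in ids if i != toy_stoi['<pad>'])

print(toy_decode(toy_encode("Hallo")))   # round trip: Hallo
print(toy_decode(toy_encode("Hallo!")))  # '!' is unseen: Hallo<unk>
```

Note that `decode` in the notebook behaves the same way: an unknown character is rendered as the literal string `<unk>`.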

## TASK 1: T5 Encoder Implementation (6 points)

Build the encoder stack that processes source sequences with bidirectional attention. The encoder is similar to BERT but designed for the T5 architecture.

**1.1**: *Encoder Block* — Implement T5 encoder block with bidirectional self-attention, feed-forward network, layer normalization, and residual connections

**1.2**: *Encoder Stack* — Stack multiple encoder blocks with positional encoding and token embeddings

**1.3**: *Encoder Testing* — Test encoder implementation and verify bidirectional attention works correctly


In [None]:
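# Helper function: sinusoidal positional encoding
# (a simplification for this homework; the original T5 uses relative position biases instead)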
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (batch_size, seq_len, d_model)
        return x + self.pe[:x.size(1), :].unsqueeze(0)

In [None]:
class FeedForward(nn.Module):
    def __init__(self, n_embd, hidden_size, dropout=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embd, hidden_size),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, n_embd),
            nn.Dropout(dropout),
        )
    def forward(self, x): return self.net(x)

In [None]:
# Helper function: Scaled Dot-Product Attention
def scaled_dot_product_attention(query, key, value, mask=None, dropout=None):
    """
    Compute scaled dot-product attention.

    Args:
        query: [batch_size, seq_len, d_k] or [batch_size, n_heads, seq_len, d_k]
        key: [batch_size, seq_len, d_k] or [batch_size, n_heads, seq_len, d_k]
        value: [batch_size, seq_len, d_v] or [batch_size, n_heads, seq_len, d_v]
        mask: [batch_size, seq_len, seq_len] or None (0s for masked positions)
        dropout: Dropout layer or None
    Returns:
        output: [batch_size, seq_len, d_v] or [batch_size, n_heads, seq_len, d_v]
        attention_weights: [batch_size, seq_len, seq_len] or [batch_size, n_heads, seq_len, seq_len]
    """
    d_k = query.size(-1)

    # Compute attention scores: Q @ K^T / sqrt(d_k)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)

    # Apply mask if provided (set masked positions to a large negative value before softmax)
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)

    # Apply softmax to get attention weights
    attention_weights = F.softmax(scores, dim=-1)

    # Apply dropout if provided
    if dropout is not None:
        attention_weights = dropout(attention_weights)

    # Multiply attention weights by values
    output = torch.matmul(attention_weights, value)

    return output, attention_weights
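
The `mask` argument can also hide padding tokens. The sketch below is a minimal illustration under assumed values: the padding id `PAD = 0`, the toy token ids, and the helper name `sdpa` are hypothetical. It repeats the same computation as the function above without dropout and shows that padded key positions receive essentially zero attention weight.

```python
import math
import torch
import torch.nn.functional as F

def sdpa(query, key, value, mask=None):
    """Same computation as scaled_dot_product_attention above, without dropout."""
    scores = query @ key.transpose(-2, -1) / math.sqrt(query.size(-1))
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)
    weights = F.softmax(scores, dim=-1)
    return weights @ value, weights

torch.manual_seed(0)
PAD = 0  # assumed padding id for this toy example
token_ids = torch.tensor([[5, 7, 9, PAD, PAD]])  # (batch=1, seq_len=5)
x = torch.randn(1, 5, 8)                         # stand-in embeddings

# 1 where the key is a real token, 0 where it is padding.
# Shape (1, 1, 5) broadcasts across all query positions.
pad_mask = (token_ids != PAD).unsqueeze(1).long()

_, weights = sdpa(x, x, x, mask=pad_mask)
print(weights[0].sum(dim=-1))   # every row still sums to 1
print(weights[0, :, 3:].max())  # weight placed on padded keys
```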

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, n_embd, n_head, dropout=0.1):
        super().__init__()
        assert n_embd % n_head == 0
        self.n_head = n_head
        self.head_dim = n_embd // n_head
        self.scale = self.head_dim ** -0.5
        self.query = nn.Linear(n_embd, n_embd)
        self.key = nn.Linear(n_embd, n_embd)
        self.value = nn.Linear(n_embd, n_embd)
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        B, T, C = q.shape
        q = self.query(q).view(B, -1, self.n_head, self.head_dim).transpose(1, 2)
        k = self.key(k).view(B, -1, self.n_head, self.head_dim).transpose(1, 2)
        v = self.value(v).view(B, -1, self.n_head, self.head_dim).transpose(1, 2)
        scores = (q @ k.transpose(-2, -1)) * self.scale
        if mask is not None:
            # mask must be broadcastable to (B, n_head, T_q, T_k); 0 marks masked positions
            scores = scores.masked_fill(mask == 0, float('-inf'))
        attn = self.dropout(F.softmax(scores, dim=-1))
        out = (attn @ v).transpose(1, 2).contiguous().view(B, -1, C)
        return self.proj(out), attn

### Task 1.1 Implement T5 encoder block

Implement a T5 encoder block with:
- Bidirectional multi-head self-attention (all positions can attend to all positions)
- Feed-forward network
- Layer normalization (pre-norm architecture: normalize before sub-layers)
- Residual connections around each sub-layer

The encoder block should follow this structure:
1. Self-attention: `x = x + dropout(attention(layer_norm(x)))`
2. Feed-forward: `x = x + dropout(ffn(layer_norm(x)))`
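
The pre-norm residual pattern in the two steps above can be sketched generically. This is only an illustration of the pattern, not the expected solution: `PreNormResidual` is a hypothetical helper name, and a plain `nn.Linear` stands in for the attention or feed-forward sub-layer.

```python
import torch
import torch.nn as nn

class PreNormResidual(nn.Module):
    """Hypothetical helper computing x + dropout(sublayer(layer_norm(x)))."""
    def __init__(self, d_model, sublayer, dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)
        self.sublayer = sublayer
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Normalize first, apply the sub-layer, then add the untouched input back.
        return x + self.dropout(self.sublayer(self.norm(x)))

# nn.Linear is a stand-in for attention or the feed-forward network.
block = PreNormResidual(d_model=16, sublayer=nn.Linear(16, 16))
block.eval()  # disable dropout for a deterministic check
x = torch.randn(2, 5, 16)
y = block(x)
print(y.shape)
```

Because the residual path bypasses the normalization, the input passes through unchanged whenever the sub-layer outputs zeros, which is one reason pre-norm stacks tend to be easier to train.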

In [None]:
class T5EncoderBlock(nn.Module):
    """T5 Encoder Block with bidirectional self-attention."""
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        ## YOUR_CODE_STARTS_HERE
        # TODO: Initialize:
        # - Multi-head self-attention (bidirectional, no masking)
        # - Feed-forward network
        # - Two layer normalization layers (one for attention, one for FFN)
        # - Dropout layer

        ## YOUR_CODE_ENDS_HERE

    def forward(self, x, mask=None):
        ## YOUR_CODE_STARTS_HERE
        # TODO: Implement encoder block with residual connections (pre-norm)
        # 1. Self-attention: x = x + dropout(attention(layer_norm(x)))
        # 2. Feed-forward: x = x + dropout(ffn(layer_norm(x)))
        # Note: Apply layer norm BEFORE the sub-layer (pre-norm architecture)

        ## YOUR_CODE_ENDS_HERE

        return x

In [None]:
# Test your implementation
encoder_block = T5EncoderBlock(d_model=512, n_heads=8, d_ff=2048)
x = torch.randn(2, 10, 512)
output = encoder_block(x)
print(f"Encoder block output shape: {output.shape}")
## RESULT_CHECKING_POINT -> torch.Size([2, 10, 512])

### Task 1.2 Build encoder stack


In [None]:
class T5Encoder(nn.Module):
    """T5 Encoder: Stack of encoder blocks with embeddings and positional encoding."""
    def __init__(self, vocab_size, d_model, n_heads, n_layers, d_ff, max_len=5000, dropout=0.1):
        super().__init__()
        ## YOUR_CODE_STARTS_HERE
        # TODO: Initialize:
        # - Token embedding layer
        # - Positional encoding
        # - Stack of encoder blocks (n_layers)
        # - Final layer normalization (optional, but common in T5)

        ## YOUR_CODE_ENDS_HERE

    def forward(self, x, mask=None):
        ## YOUR_CODE_STARTS_HERE
        # TODO: Implement encoder forward pass
        # 1. Token embeddings
        # 2. Add positional encoding
        # 3. Apply dropout
        # 4. Pass through encoder blocks
        # 5. Apply final layer norm

        ## YOUR_CODE_ENDS_HERE

        return x

# Test your implementation
encoder = T5Encoder(vocab_size=1000, d_model=512, n_heads=8, n_layers=6, d_ff=2048)
x = torch.randint(0, 1000, (2, 10))
output = encoder(x)
print(f"Encoder output shape: {output.shape}")
## RESULT_CHECKING_POINT -> torch.Size([2, 10, 512])

### Task 1.3 Test encoder implementation

In [None]:
def test_encoder(encoder, sample_text, max_len=50):
    """Test encoder and visualize attention patterns."""
    encoder.eval()

    # Encode text and place the tensor on the same device as the encoder's parameters
    enc_device = next(encoder.parameters()).device
    encoded = torch.tensor(encode(sample_text[:max_len]), device=enc_device).unsqueeze(0)

    # Forward pass
    with torch.no_grad():
        output = encoder(encoded)

    print(f"Input text: {sample_text[:max_len]}")
    print(f"Input shape: {encoded.shape}")
    print(f"Encoder output shape: {output.shape}")
    print(f"Encoder output mean: {output.mean().item():.4f}")
    print(f"Encoder output std: {output.std().item():.4f}")

    return output
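
One possible way to check bidirectionality is a perturbation test: change only the last input position and see whether the output at position 0 changes. The sketch below demonstrates the idea on a simplified single-head attention with identity projections (the `self_attention` helper is an illustrative stand-in, not your encoder); the same check could be applied to an encoder by perturbing the final token.

```python
import math
import torch
import torch.nn.functional as F

def self_attention(x, mask=None):
    """Single-head self-attention with identity projections (illustrative stand-in)."""
    scores = x @ x.transpose(-2, -1) / math.sqrt(x.size(-1))
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))
    return F.softmax(scores, dim=-1) @ x

torch.manual_seed(0)
x = torch.randn(1, 6, 8)
x_perturbed = x.clone()
x_perturbed[0, -1] += 1.0  # change only the last position

causal = torch.tril(torch.ones(6, 6))

# Bidirectional: position 0 attends to the last position, so its output changes.
bi_changed = not torch.allclose(self_attention(x)[0, 0],
                                self_attention(x_perturbed)[0, 0])
# Causal: position 0 attends only to itself, so its output is unchanged.
causal_changed = not torch.allclose(self_attention(x, causal)[0, 0],
                                    self_attention(x_perturbed, causal)[0, 0])

print(f"bidirectional output at position 0 changed: {bi_changed}")
print(f"causal output at position 0 changed: {causal_changed}")
```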

## TASK 2: T5 Decoder with Cross-Attention (7 points)

Build the decoder stack with causal self-attention and cross-attention to encoder outputs. The decoder uses masked self-attention (causal) for autoregressive generation and cross-attention to attend to the encoder's output.

**2.1**: *Cross-Attention Mechanism* — Implement cross-attention where queries come from decoder states, and keys/values come from encoder outputs

**2.2**: *T5 Decoder Block* — Implement decoder block with masked self-attention, cross-attention, and feed-forward network

**2.3**: *Decoder Stack* — Stack multiple decoder blocks with positional encoding and token embeddings

### Task 2.1 Implement cross-attention mechanism

In [None]:
class CrossAttentionHead(nn.Module):
    """Single head of cross-attention (decoder attends to encoder)."""
    def __init__(self, head_size, n_embd, dropout):
        super().__init__()
        ## YOUR_CODE_STARTS_HERE
        # Query comes from decoder, Key and Value come from encoder

        ## YOUR_CODE_ENDS_HERE

    def forward(self, decoder_states, encoder_states):
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE
        return out

In [None]:
class CrossMultiHeadAttention(nn.Module):
    """Multi-head cross-attention for T5."""
    def __init__(self, n_head, head_size, n_embd, dropout):
        super().__init__()
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE

    def forward(self, decoder_states, encoder_states):
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE
        return out

In [None]:
# Test your implementation
cross_attn = CrossMultiHeadAttention(n_head=8, head_size=64, n_embd=512, dropout=0.1)
decoder_states = torch.randn(2, 10, 512)  # (batch, decoder_seq_len, d_model)
encoder_states = torch.randn(2, 15, 512)  # (batch, encoder_seq_len, d_model)
output = cross_attn(decoder_states, encoder_states)
print(f"Cross-attention output shape: {output.shape}")
## RESULT_CHECKING_POINT -> torch.Size([2, 10, 512])
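
To see why the expected output has the decoder's length rather than the encoder's, the sketch below runs the same shapes through PyTorch's built-in `nn.MultiheadAttention`. The built-in module is used here only as a shape reference; the assignment still asks for your own implementation, and the variable names `dec_h` and `enc_h` are illustrative.

```python
import torch
import torch.nn as nn

# Built-in stand-in, used only to illustrate shapes.
mha = nn.MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True)

dec_h = torch.randn(2, 10, 512)  # queries come from the decoder
enc_h = torch.randn(2, 15, 512)  # keys and values come from the encoder

out, attn = mha(query=dec_h, key=enc_h, value=enc_h)
print(out.shape)   # torch.Size([2, 10, 512]): one vector per decoder position
print(attn.shape)  # torch.Size([2, 10, 15]): head-averaged weights over encoder positions
```

Each decoder position produces one query, so the output keeps the decoder length (10), while each attention row is a distribution over the 15 encoder positions.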

### Task 2.2 Implement T5 decoder block

Implement the decoder block with:
- Masked self-attention (causal, for autoregressive generation)
- Cross-attention (decoder attends to encoder)
- Feed-forward network
- Three layer normalization layers (pre-norm)
- Residual connections

In [None]:
def generate_causal_mask(seq_len):
    return torch.tril(torch.ones(seq_len, seq_len))
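
As a small illustration of what this mask does, the sketch below builds the mask for a sequence of length 4 and applies it to uniform scores. The all-zero scores are an assumption chosen so that the resulting weights are easy to read.

```python
import torch
import torch.nn.functional as F

# Same construction as generate_causal_mask(4): lower-triangular ones.
mask = torch.tril(torch.ones(4, 4))
print(mask)

# Uniform scores, for illustration only.
scores = torch.zeros(4, 4)
weights = F.softmax(scores.masked_fill(mask == 0, float('-inf')), dim=-1)
print(weights)
# Row i spreads its weight evenly over positions 0..i:
# [1, 0, 0, 0], [1/2, 1/2, 0, 0], [1/3, 1/3, 1/3, 0], [1/4, 1/4, 1/4, 1/4]
```

Since the mask is created on the CPU, it may need to be moved to the same device as the attention scores before use.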

In [None]:
class T5DecoderBlock(nn.Module):
    def __init__(self, n_embd, n_head, block_size, dropout):
        super().__init__()
        head_size = n_embd // n_head
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE

    def forward(self, x, encoder_output):
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE
        return x

### Task 2.3 Build decoder stack


In [None]:
class T5Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, n_layers, d_ff, block_size, max_len, dropout):
        super().__init__()
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE

    def forward(self, x, encoder_output):
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE
        return x

In [None]:
print("Testing T5DecoderBlock...")
decoder_block = T5DecoderBlock(n_embd=512, n_head=8, block_size=128, dropout=0.1)
x = torch.randn(2, 10, 512)
encoder_output = torch.randn(2, 15, 512)
output = decoder_block(x, encoder_output)
print(f"Decoder block output shape: {output.shape}")
## RESULT_CHECKING_POINT -> torch.Size([2, 10, 512])

## TASK 3: Full T5 Model and Machine Translation (7 points)

Combine encoder and decoder into a complete T5 model and train it on English-German machine translation.

**3.1**: *Complete T5 Model* — Combine encoder and decoder stacks with shared token embeddings and language modeling head

**3.2**: *Translation Dataset Preparation* — Prepare English-German translation dataset with proper batching and padding

**3.3**: *Training and Testing* — Train model on translation task and evaluate performance

### Task 3.1 Build complete T5 model


In [None]:
class T5Model(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, n_layers, d_ff, block_size, max_len=5000, dropout=0.1):
        super().__init__()
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE

    def forward(self, src, tgt):
        ## YOUR_CODE_STARTS_HERE

        ## YOUR_CODE_ENDS_HERE
        return logits

### Task 3.2 Prepare translation dataset


In [None]:
def get_translation_batch(split, batch_size=4):
    """Get a batch of English-German translation pairs."""
    pairs = train_pairs if split == 'train' else val_pairs

    # Random sampling
    indices = torch.randint(0, len(pairs), (batch_size,))

    src_batch = []
    tgt_batch = []

    for idx in indices:
        en_text, de_text = pairs[idx]
        src_ids = encode(en_text)
        # German: Add SOS at start, EOS at end
        tgt_ids = [SOS_IDX] + encode(de_text) + [EOS_IDX]

        src_batch.append(torch.tensor(src_ids, dtype=torch.long))
        tgt_batch.append(torch.tensor(tgt_ids, dtype=torch.long))

    # Pad
    src_batch = torch.nn.utils.rnn.pad_sequence(src_batch, batch_first=True, padding_value=PAD_IDX)
    tgt_batch = torch.nn.utils.rnn.pad_sequence(tgt_batch, batch_first=True, padding_value=PAD_IDX)

    # Teacher forcing split
    tgt_input = tgt_batch[:, :-1] # Input: SOS ... Token
    tgt_output = tgt_batch[:, 1:] # Target: Token ... EOS

    return src_batch.to(device), tgt_input.to(device), tgt_output.to(device)
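
The teacher-forcing split can be traced by hand on a toy batch. The token ids below are made up for illustration; only the special-token ordering (`PAD=0`, `SOS=1`, `EOS=2`) follows the vocabulary built earlier.

```python
import torch
from torch.nn.utils.rnn import pad_sequence

PAD, SOS, EOS = 0, 1, 2  # same ordering as special_tokens in the notebook
targets = [
    torch.tensor([SOS, 10, 11, 12, EOS]),
    torch.tensor([SOS, 20, EOS]),
]

tgt = pad_sequence(targets, batch_first=True, padding_value=PAD)
tgt_input, tgt_output = tgt[:, :-1], tgt[:, 1:]

print(tgt_input.tolist())   # [[1, 10, 11, 12], [1, 20, 2, 0]]
print(tgt_output.tolist())  # [[10, 11, 12, 2], [20, 2, 0, 0]]
```

At every position the decoder input is the previous target token. For the shorter sequence, the positions after `EOS` have `PAD` as their target, so a loss created with `ignore_index=PAD_IDX` skips them.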

In [None]:
BLOCK_SIZE = 512 # Maximum sequence length in characters; longer sequences are truncated
BATCH_SIZE = 16

model = T5Model(
    vocab_size=vocab_size,
    d_model=128,
    n_heads=4,
    n_layers=2,
    d_ff=512,
    block_size=BLOCK_SIZE
).to(device)

print(f"Model initialized with block_size={BLOCK_SIZE}")

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# Batch function that truncates sequences to at most BLOCK_SIZE tokens
def get_safe_batch(split, batch_size=4, max_len=BLOCK_SIZE):
    src_batch, tgt_in, tgt_out = get_translation_batch(split, batch_size)

    # Truncate if they exceed block_size
    if src_batch.size(1) > max_len:
        src_batch = src_batch[:, :max_len]
    if tgt_in.size(1) > max_len:
        tgt_in = tgt_in[:, :max_len]
        tgt_out = tgt_out[:, :max_len] # Output must match input length

    return src_batch, tgt_in, tgt_out

print("Starting training...")
model.train()

### Task 3.3 Train and test translation model


In [None]:
PRINT_EVERY = 50

for epoch in range(5):   # increase the number of epochs for longer training
    for step in range(len(train_pairs)//BATCH_SIZE):
        src, tgt_in, tgt_out = get_safe_batch('train', batch_size=BATCH_SIZE)

        logits = model(src, tgt_in)
        loss = criterion(logits.reshape(-1, vocab_size), tgt_out.reshape(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print every few iterations
        if step % PRINT_EVERY == 0:
            print(f"Epoch {epoch} | Step {step}: Loss = {loss.item():.4f}")

print("✓ Training completed successfully!")
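
The loss call in the loop above flattens the logits to `(batch * seq_len, vocab_size)` and relies on `ignore_index` to skip padded targets. The sketch below, using made-up logits and targets, suggests that this is equivalent to averaging the cross-entropy over the non-pad positions only.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

PAD = 0    # assumed padding id, matching PAD_IDX in the notebook
vocab = 5  # toy vocabulary size
torch.manual_seed(0)

logits = torch.randn(2, 4, vocab)  # (batch, seq_len, vocab)
targets = torch.tensor([[3, 1, 4, 2],
                        [2, 4, PAD, PAD]])

criterion = nn.CrossEntropyLoss(ignore_index=PAD)
loss_ignore = criterion(logits.reshape(-1, vocab), targets.reshape(-1))

# Same loss computed by hand over the non-pad positions only.
keep = targets.reshape(-1) != PAD
loss_manual = F.cross_entropy(logits.reshape(-1, vocab)[keep],
                              targets.reshape(-1)[keep])

print(torch.allclose(loss_ignore, loss_manual))
```

Without `ignore_index`, the model would also be trained to predict padding, which would make the reported loss look better than the actual translation quality.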


In [None]:
# Set model to evaluation mode (disables dropout)
model.eval()

def translate_sentence(sentence, model, max_length=100):
    """
    Translates an English sentence to German using the trained T5Model.
    Uses greedy decoding.
    """
    # 1. Prepare Source (English)
    # Encode and add batch dimension [1, seq_len]
    src_ids = encode(sentence)
    src_tensor = torch.tensor(src_ids, dtype=torch.long).unsqueeze(0).to(device)

    # 2. Encoder Pass
    # We encode the source sequence once
    with torch.no_grad():
        encoder_output = model.encoder(src_tensor)

    # 3. Decoder Loop (Autoregressive)
    # Start with the Start-Of-Sequence token
    decoder_input = torch.tensor([[SOS_IDX]], dtype=torch.long).to(device)

    generated_tokens = []

    for _ in range(max_length):
        with torch.no_grad():
            # Pass current sequence and encoder output to decoder
            # decoder_input shape: [1, current_seq_len]
            decoder_output = model.decoder(decoder_input, encoder_output)

            # Project to vocabulary
            logits = model.lm_head(decoder_output)

            # Get the token with highest probability for the last position
            # logits shape: [1, current_seq_len, vocab_size]
            next_token_logits = logits[:, -1, :]
            next_token_id = torch.argmax(next_token_logits, dim=-1).item()

        # Stop if End-Of-Sequence token is generated
        if next_token_id == EOS_IDX:
            break

        generated_tokens.append(next_token_id)

        # Append prediction to decoder input for next step
        next_token_tensor = torch.tensor([[next_token_id]], dtype=torch.long).to(device)
        decoder_input = torch.cat([decoder_input, next_token_tensor], dim=1)

    # 4. Decode to string
    translated_text = decode(generated_tokens)
    return translated_text

# --- Test Translations ---
print("--- Translation Demo ---")
test_sentences = [
    "Hello",
    "Good morning",
    "I love you",
    "Where is the bathroom?",
    "This is difficult"
]

for s in test_sentences:
    trans = translate_sentence(s, model)
    print(f"EN: {s:25} -> DE: {trans}")

# Try a custom one
custom = "I am hungry"
print(f"\nCustom: {custom} -> {translate_sentence(custom, model)}")

## BONUS (2 points): Solve any task with an LLM

**Goal.**  
Pick **one** of the homework tasks (Task 1, Task 2, or Task 3) and solve it using an **LLM**. Provide the **LLM** with the **task description** and any **starter/prerequisite code** it depends on, and ask it to generate a complete **code solution**. Then run the **generated code** in this notebook yourself. Finally, document what you did and **compare** the LLM’s result to your own solution.

**What to deliver below.**
1) **LLM used** (name + version, e.g., “Llama-3-8B-Instruct”, “GPT-x”, “Claude-x”, “Mistral-x”, etc.).  
2) **Prompt(s)** you used.  
3) **LLM output** — copy and paste the generated code.  
4) **Comparison** to your solution: what matches or differs (quantitative or qualitative).  
5) **Reflection**: what the LLM was **good at** vs **bad at**, what it got **right** vs **wrong**.

> **No extra code required.** You do **not** need to run, share, or submit any code used to query the LLM (for example, API scripts). Provide only the deliverables listed above.
> You may use any LLMs through any interface (API, web UI, local inference).
