# Simple Sequence-to-Sequence Model - Step by Step

This notebook provides a simplified, easy-to-understand implementation of a seq2seq model for translation.

**What we'll build:** A German → English translator

**Architecture:** Encoder-Decoder with RNN (GRU cells)

## Table of Contents
1. [Setup & Installation](#setup)
2. [Understanding Seq2Seq Models](#understanding)
3. [Data Preparation](#data)
4. [Building the Encoder](#encoder)
5. [Building the Decoder](#decoder)
6. [Complete Seq2Seq Model](#model)
7. [Training](#training)
8. [Translation & Evaluation](#evaluation)

---
## Step 1: Setup & Installation <a id='setup'></a>

In [None]:
# Download the spaCy tokenizer models (run once).
# Assumes torch, torchtext, spacy, and nltk are already installed.
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm

In [None]:
# Import all libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

import spacy
import random
import math
from collections import Counter
from torchtext.datasets import Multi30k

# Set random seed for reproducibility
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

---
## Step 2: Understanding Seq2Seq Models <a id='understanding'></a>

### What is a Sequence-to-Sequence Model?

A seq2seq model transforms one sequence (input) into another sequence (output). The two sequences may differ in length.

**Architecture:**
```
Input Sentence → [ENCODER] → Context Vector → [DECODER] → Output Sentence
```

**Example:**
- Input: "Hallo Welt" (German)
- Encoder: Reads and compresses the meaning
- Context: Fixed-size representation of the input
- Decoder: Generates "Hello World" (English)

**Key Components:**
1. **Encoder**: Reads input sequence and creates a context vector
2. **Decoder**: Uses context to generate output sequence
3. **Hidden State**: Carries information through the network
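
To make the idea of a fixed-size context concrete, here is a toy sketch in plain Python. It is not the model built later in this notebook: the function `encode`, its weights `w_x` and `w_h`, and the hidden size of 3 are all made up for illustration. It only shows that a recurrent update folds inputs of any length into a state of the same size.

```python
import math

def encode(inputs, w_x=0.5, w_h=0.8, hidden_size=3):
    """Toy recurrent encoder: folds a sequence of numbers into a fixed-size state."""
    hidden = [0.0] * hidden_size
    for x in inputs:
        # The same update runs at every step: mix the current input with the old state.
        # (i + 1) just gives each hidden unit a different made-up input weight.
        hidden = [math.tanh(w_x * x * (i + 1) + w_h * h) for i, h in enumerate(hidden)]
    return hidden

short_context = encode([1.0, 2.0])
long_context = encode([1.0, 2.0, 3.0, 4.0, 5.0])

# Both contexts have the same size even though the inputs differ in length
print(len(short_context), len(long_context))  # prints: 3 3
```

A real encoder replaces the hand-written update with learned GRU weights, but the shape argument is the same.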

---
## Step 3: Data Preparation <a id='data'></a>

We'll use the Multi30k dataset, which contains German-English sentence pairs.

In [None]:
# Load tokenizers for German and English
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')

# Tokenizer functions
def tokenize_de(text):
    """Convert German text to list of tokens"""
    return [tok.text for tok in spacy_de.tokenizer(text)]

def tokenize_en(text):
    """Convert English text to list of tokens"""
    return [tok.text for tok in spacy_en.tokenizer(text)]

# Test tokenizers
print("German:", tokenize_de("Hallo Welt"))
print("English:", tokenize_en("Hello World"))

In [None]:
# Step 3.1: Load the dataset
print("Loading Multi30k dataset...")
train_data = list(Multi30k(split='train', language_pair=('de', 'en')))
valid_data = list(Multi30k(split='valid', language_pair=('de', 'en')))
test_data = list(Multi30k(split='test', language_pair=('de', 'en')))

print(f"Train size: {len(train_data)}")
print(f"Valid size: {len(valid_data)}")
print(f"Test size: {len(test_data)}")

# Show example
print("\nExample pair:")
print(f"German: {train_data[0][0]}")
print(f"English: {train_data[0][1]}")

In [None]:
# Step 3.2: Build vocabularies
class Vocabulary:
    def __init__(self, freq_threshold=2):
        # Special tokens
        self.itos = {0: "<pad>", 1: "<sos>", 2: "<eos>", 3: "<unk>"}
        self.stoi = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}
        self.freq_threshold = freq_threshold
    
    def build_vocabulary(self, sentence_list, tokenizer):
        """Build vocabulary from list of sentences"""
        frequencies = Counter()
        idx = 4  # Start after special tokens
        
        # Count word frequencies
        for sentence in sentence_list:
            for word in tokenizer(sentence):
                frequencies[word] += 1
        
        # Add words that appear at least freq_threshold times
        for word, count in frequencies.items():
            if count >= self.freq_threshold:
                self.stoi[word] = idx
                self.itos[idx] = word
                idx += 1
    
    def numericalize(self, text, tokenizer):
        """Convert text to list of indices"""
        tokenized = tokenizer(text)
        return [self.stoi.get(token, self.stoi["<unk>"]) for token in tokenized]
    
    def __len__(self):
        return len(self.itos)

# Create vocabularies
print("Building vocabularies...")
german_vocab = Vocabulary(freq_threshold=2)
english_vocab = Vocabulary(freq_threshold=2)

# Build from training data
german_sentences = [pair[0] for pair in train_data]
english_sentences = [pair[1] for pair in train_data]

german_vocab.build_vocabulary(german_sentences, tokenize_de)
english_vocab.build_vocabulary(english_sentences, tokenize_en)

print(f"German vocabulary size: {len(german_vocab)}")
print(f"English vocabulary size: {len(english_vocab)}")
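
The effect of the frequency threshold and the `<unk>` fallback is easier to see on a tiny corpus. The sketch below is a condensed, standalone version of the same idea. It assumes whitespace tokenization instead of spaCy, and the helper names `build_stoi` and `numericalize` are illustrative only.

```python
from collections import Counter

SPECIALS = ["<pad>", "<sos>", "<eos>", "<unk>"]

def build_stoi(sentences, freq_threshold=2):
    """Map each sufficiently frequent whitespace token to an integer index."""
    counts = Counter(word for s in sentences for word in s.split())
    stoi = {tok: i for i, tok in enumerate(SPECIALS)}
    for word, count in counts.items():
        if count >= freq_threshold:
            stoi[word] = len(stoi)
    return stoi

def numericalize(text, stoi):
    """Replace every token with its index, falling back to <unk>."""
    return [stoi.get(tok, stoi["<unk>"]) for tok in text.split()]

sentences = ["a dog runs", "a cat runs", "a bird sings"]
stoi = build_stoi(sentences)

# "a" and "runs" occur at least twice; every other word maps to <unk> (index 3)
print(numericalize("a dog sings", stoi))  # prints: [4, 3, 3]
```

Words seen only once in training are treated exactly like words never seen at all, which keeps the vocabulary small at the cost of losing rare words.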

In [None]:
# Step 3.3: Create DataLoader
class TranslationDataset(torch.utils.data.Dataset):
    def __init__(self, data, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer):
        self.data = data
        self.src_vocab = src_vocab
        self.trg_vocab = trg_vocab
        self.src_tokenizer = src_tokenizer
        self.trg_tokenizer = trg_tokenizer
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        src_text, trg_text = self.data[idx]
        
        # Convert to indices
        src_indices = [self.src_vocab.stoi["<sos>"]] + \
                     self.src_vocab.numericalize(src_text, self.src_tokenizer) + \
                     [self.src_vocab.stoi["<eos>"]]
        
        trg_indices = [self.trg_vocab.stoi["<sos>"]] + \
                     self.trg_vocab.numericalize(trg_text, self.trg_tokenizer) + \
                     [self.trg_vocab.stoi["<eos>"]]
        
        return torch.tensor(src_indices), torch.tensor(trg_indices)

def collate_fn(batch):
    """Pad sequences in a batch to the same length; each result has shape (max_len, batch_size)"""
    src_batch, trg_batch = zip(*batch)
    src_batch = pad_sequence(src_batch, padding_value=0)  # pad_idx = 0
    trg_batch = pad_sequence(trg_batch, padding_value=0)
    return src_batch, trg_batch

# Create datasets
train_dataset = TranslationDataset(train_data, german_vocab, english_vocab, 
                                  tokenize_de, tokenize_en)
valid_dataset = TranslationDataset(valid_data, german_vocab, english_vocab, 
                                  tokenize_de, tokenize_en)

# Create data loaders
BATCH_SIZE = 128
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, 
                         shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, 
                         shuffle=False, collate_fn=collate_fn)

print(f"Number of training batches: {len(train_loader)}")
print(f"Number of validation batches: {len(valid_loader)}")
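
With its default `batch_first=False`, `pad_sequence` returns a tensor laid out as `(max_len, batch_size)`, so each row holds one time step across the whole batch. The plain-Python sketch below mimics that layout on two made-up index lists. `pad_time_major` is a hypothetical helper written for this illustration, not part of PyTorch.

```python
PAD_IDX = 0

def pad_time_major(sequences, pad_idx=PAD_IDX):
    """Pad variable-length index lists and return rows indexed by time step."""
    max_len = max(len(seq) for seq in sequences)
    padded = [seq + [pad_idx] * (max_len - len(seq)) for seq in sequences]
    # Transpose from (batch, seq_len) to (seq_len, batch)
    return [list(step) for step in zip(*padded)]

# Two sentences of length 4 and 3 (1 = <sos>, 2 = <eos>)
batch = [[1, 7, 8, 2], [1, 5, 2]]
for row in pad_time_major(batch):
    print(row)
# prints:
# [1, 1]
# [7, 5]
# [8, 2]
# [2, 0]
```

The shorter sentence is padded with index 0 at the end, which is why the loss function later needs to ignore that index.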

---
## Step 4: Building the Encoder <a id='encoder'></a>

**What does the Encoder do?**
- Takes input sentence (as word indices)
- Converts words to embeddings
- Processes them through RNN (GRU)
- Outputs: final hidden state (context vector)
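
For reference, the update that a single GRU layer applies at each time step can be written as follows. This follows the formulation given in the PyTorch `nn.GRU` documentation; other texts sometimes swap the roles of $z_t$ and $1 - z_t$. Here $x_t$ is the embedding of the current word, $h_{t-1}$ the previous hidden state, $\sigma$ the sigmoid function, and $\odot$ the element-wise product.

$$
\begin{aligned}
r_t &= \sigma(W_{ir} x_t + b_{ir} + W_{hr} h_{t-1} + b_{hr}) \\
z_t &= \sigma(W_{iz} x_t + b_{iz} + W_{hz} h_{t-1} + b_{hz}) \\
n_t &= \tanh\bigl(W_{in} x_t + b_{in} + r_t \odot (W_{hn} h_{t-1} + b_{hn})\bigr) \\
h_t &= (1 - z_t) \odot n_t + z_t \odot h_{t-1}
\end{aligned}
$$

The reset gate $r_t$ controls how much of the old state feeds into the candidate $n_t$, and the update gate $z_t$ controls how much of the old state is kept. The hidden state after the last time step is what the encoder returns.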

In [None]:
class Encoder(nn.Module):
    def __init__(self, input_size, embedding_size, hidden_size, num_layers, dropout):
        """
        Args:
            input_size: Size of source vocabulary
            embedding_size: Dimension of word embeddings
            hidden_size: Dimension of hidden state
            num_layers: Number of RNN layers
            dropout: Dropout probability
        """
        super(Encoder, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # Embedding layer: converts word indices to dense vectors
        self.embedding = nn.Embedding(input_size, embedding_size)
        
        # GRU layer: processes sequence
        self.rnn = nn.GRU(embedding_size, hidden_size, num_layers, 
                         dropout=dropout if num_layers > 1 else 0)
        
        # Dropout for regularization
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x):
        """
        Args:
            x: Input tensor of shape (seq_len, batch_size)
        
        Returns:
            hidden: Final hidden state (num_layers, batch_size, hidden_size)
        """
        # x shape: (seq_len, batch_size)
        
        # Step 1: Convert indices to embeddings
        embedding = self.dropout(self.embedding(x))
        # embedding shape: (seq_len, batch_size, embedding_size)
        
        # Step 2: Pass through RNN
        outputs, hidden = self.rnn(embedding)
        # outputs shape: (seq_len, batch_size, hidden_size)
        # hidden shape: (num_layers, batch_size, hidden_size)
        
        return hidden

# Test the encoder
encoder = Encoder(
    input_size=len(german_vocab),
    embedding_size=256,
    hidden_size=512,
    num_layers=2,
    dropout=0.5
).to(device)

print("Encoder created successfully!")
print(f"Total parameters: {sum(p.numel() for p in encoder.parameters())}")
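
The printed parameter count can be checked by hand. The sketch below assumes the parameter layout of a unidirectional `nn.GRU` (per layer: input weights, hidden weights, and two bias vectors for each of the three gate blocks). The vocabulary size of 8000 is a made-up placeholder, since the real value depends on `len(german_vocab)`.

```python
def gru_param_count(input_size, hidden_size, num_layers):
    """Count weights and biases in a unidirectional multi-layer GRU."""
    total = 0
    for layer in range(num_layers):
        # Only the first layer sees the embeddings; later layers see hidden states
        layer_input = input_size if layer == 0 else hidden_size
        # Three blocks (reset, update, candidate), each with
        # input weights, hidden weights, and two bias vectors
        total += 3 * (layer_input * hidden_size
                      + hidden_size * hidden_size
                      + 2 * hidden_size)
    return total

def encoder_param_count(vocab_size, embedding_size, hidden_size, num_layers):
    """Embedding table plus GRU parameters (dropout has no parameters)."""
    return vocab_size * embedding_size + gru_param_count(
        embedding_size, hidden_size, num_layers)

VOCAB_SIZE = 8000  # hypothetical; substitute len(german_vocab)

print(gru_param_count(256, 512, 2))                  # prints: 2758656
print(encoder_param_count(VOCAB_SIZE, 256, 512, 2))  # prints: 4806656
```

With these sizes the GRU itself accounts for about 2.8 million parameters, and the embedding table grows linearly with the vocabulary.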

---
## Step 5: Building the Decoder <a id='decoder'></a>

**What does the Decoder do?**
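- Takes: the previous word and the current hidden state (at the first step, the encoder's final hidden state)
- Predicts: a score for every word in the target vocabulary, from which the next word is chosen
- Repeats until the `<eos>` token is generated or a length limit is reached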

In [None]:
class Decoder(nn.Module):
    def __init__(self, output_size, embedding_size, hidden_size, num_layers, dropout):
        """
        Args:
            output_size: Size of target vocabulary
            embedding_size: Dimension of word embeddings
            hidden_size: Dimension of hidden state (must match encoder)
            num_layers: Number of RNN layers (must match encoder)
            dropout: Dropout probability
        """
        super(Decoder, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        
        # Embedding layer for target language
        self.embedding = nn.Embedding(output_size, embedding_size)
        
        # GRU layer
        self.rnn = nn.GRU(embedding_size, hidden_size, num_layers,
                         dropout=dropout if num_layers > 1 else 0)
        
        # Output layer: projects hidden state to vocabulary size
        self.fc = nn.Linear(hidden_size, output_size)
        
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, hidden):
        """
        Args:
            x: Input tensor (1, batch_size) - one word at a time
            hidden: Hidden state from encoder or previous step
        
        Returns:
            prediction: Output logits (batch_size, output_size)
            hidden: Updated hidden state
        """
        # x shape: (1, batch_size)
        
        # Step 1: Get embedding
        embedding = self.dropout(self.embedding(x))
        # embedding shape: (1, batch_size, embedding_size)
        
        # Step 2: Pass through RNN
        output, hidden = self.rnn(embedding, hidden)
        # output shape: (1, batch_size, hidden_size)
        # hidden shape: (num_layers, batch_size, hidden_size)
        
        # Step 3: Project to vocabulary size
        prediction = self.fc(output.squeeze(0))
        # prediction shape: (batch_size, output_size)
        
        return prediction, hidden

# Test the decoder
decoder = Decoder(
    output_size=len(english_vocab),
    embedding_size=256,
    hidden_size=512,
    num_layers=2,
    dropout=0.5
).to(device)

print("Decoder created successfully!")
print(f"Total parameters: {sum(p.numel() for p in decoder.parameters())}")
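
The decoder returns raw scores (logits), not probabilities. The sketch below, using made-up scores for a four-word vocabulary, shows how softmax turns logits into probabilities and why taking the argmax of the logits directly gives the same predicted word. The `softmax` helper is written here for illustration only.

```python
import math

def softmax(logits):
    """Convert raw scores to probabilities that sum to 1."""
    peak = max(logits)
    # Subtracting the maximum avoids overflow and does not change the result
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

logits = [2.0, 0.5, -1.0, 0.1]  # made-up scores for a 4-word vocabulary
probs = softmax(logits)

# Softmax preserves ordering, so both argmax calls pick the same index
best_from_logits = max(range(len(logits)), key=lambda i: logits[i])
best_from_probs = max(range(len(probs)), key=lambda i: probs[i])

print([round(p, 3) for p in probs])
print(best_from_logits, best_from_probs)  # prints: 0 0
```

This is why the model never applies softmax explicitly: `nn.CrossEntropyLoss` expects logits during training, and greedy decoding only needs the argmax.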

---
## Step 6: Complete Seq2Seq Model <a id='model'></a>

**How it works:**
1. Encoder processes source sentence
2. Decoder generates target sentence word-by-word
3. Teacher forcing: at each step during training, with probability `teacher_forcing_ratio`, the decoder is fed the true target word instead of its own prediction
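
The choice of decoder input can be sketched without any neural network. In the plain-Python example below, `predictions` is a made-up list standing in for what a model might output at each step, and `decoder_inputs` is a hypothetical helper that records which token would be fed in next.

```python
import random

def decoder_inputs(target, predictions, teacher_forcing_ratio, rng):
    """Return the token fed to the decoder at each step."""
    inputs = [target[0]]  # the first input is always <sos>
    for t in range(1, len(target) - 1):
        # Flip a biased coin: true word or the model's own guess?
        use_truth = rng.random() < teacher_forcing_ratio
        inputs.append(target[t] if use_truth else predictions[t])
    return inputs

target = ["<sos>", "a", "man", "walks", "<eos>"]
predictions = [None, "the", "dog", "runs", "<eos>"]  # made-up model guesses

rng = random.Random(42)
print(decoder_inputs(target, predictions, 1.0, rng))
# prints: ['<sos>', 'a', 'man', 'walks']
print(decoder_inputs(target, predictions, 0.0, rng))
# prints: ['<sos>', 'the', 'dog', 'runs']
```

A ratio of 1.0 always feeds the ground truth, a ratio of 0.0 always feeds the model's own output, and values in between mix the two.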

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """
        Args:
            src: Source sequence (src_len, batch_size)
            trg: Target sequence (trg_len, batch_size)
            teacher_forcing_ratio: Probability of using true target word
        
        Returns:
            outputs: Predictions (trg_len, batch_size, output_size)
        """
        batch_size = src.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_size
        
        # Store decoder outputs
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        
        # Step 1: Encode source sentence
        hidden = self.encoder(src)
        
        # Step 2: First input to decoder is <sos> token
        input = trg[0, :].unsqueeze(0)  # Shape: (1, batch_size)
        
        # Step 3: Generate output sequence word by word
        for t in range(1, trg_len):
            # Get prediction from decoder
            output, hidden = self.decoder(input, hidden)
            
            # Store prediction
            outputs[t] = output
            
            # Decide next input: true word or predicted word?
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            
            # Next input is either true word or predicted word
            input = trg[t].unsqueeze(0) if teacher_force else top1.unsqueeze(0)
        
        return outputs

# Create the complete model
model = Seq2Seq(encoder, decoder, device).to(device)

print("\nSeq2Seq Model created!")
print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")

---
## Step 7: Training <a id='training'></a>

**Training Process:**
1. Forward pass: Get predictions
2. Calculate loss: Compare predictions to true targets
3. Backward pass: Compute gradients
4. Update weights: Use optimizer
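
Step 2 deserves a closer look, because padded positions must not count toward the loss. The sketch below computes an average cross-entropy over non-padding targets in plain Python. It takes probabilities as input for readability, whereas `nn.CrossEntropyLoss` takes logits, and the numbers are made up.

```python
import math

PAD_IDX = 0

def masked_cross_entropy(probabilities, targets, pad_idx=PAD_IDX):
    """Average -log p(target) over positions whose target is not padding."""
    losses = [
        -math.log(probs[target])
        for probs, target in zip(probabilities, targets)
        if target != pad_idx
    ]
    return sum(losses) / len(losses)

# One probability distribution per position over a 3-word vocabulary
probabilities = [
    [0.1, 0.7, 0.2],
    [0.1, 0.2, 0.7],
    [0.9, 0.05, 0.05],
]
targets = [1, 2, 0]  # the last position is padding and is skipped

loss = masked_cross_entropy(probabilities, targets)
print(round(loss, 4))
```

Both counted positions assign probability 0.7 to the correct word, so the loss equals `-log(0.7)` regardless of what the model predicts at the padded position.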

In [None]:
# Initialize optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=0)  # Ignore padding

def train_epoch(model, iterator, optimizer, criterion, clip):
    """Train for one epoch"""
    model.train()
    epoch_loss = 0
    
    for i, (src, trg) in enumerate(iterator):
        src = src.to(device)
        trg = trg.to(device)
        
        # Zero gradients
        optimizer.zero_grad()
        
        # Forward pass
        output = model(src, trg)
        
        # Calculate loss
        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)  # Ignore <sos> token
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        
        # Backward pass
        loss.backward()
        
        # Clip gradients to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        
        # Update weights
        optimizer.step()
        
        epoch_loss += loss.item()
    
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    """Evaluate the model"""
    model.eval()
    epoch_loss = 0
    
    with torch.no_grad():
        for i, (src, trg) in enumerate(iterator):
            src = src.to(device)
            trg = trg.to(device)
            
            # No teacher forcing during evaluation
            output = model(src, trg, teacher_forcing_ratio=0)
            
            # Calculate loss
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            
            epoch_loss += loss.item()
    
    return epoch_loss / len(iterator)

print("Training functions defined!")
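
Gradient clipping by norm rescales all gradients together so that their combined length does not exceed a limit. The plain-Python sketch below shows the arithmetic on a flat list of made-up gradient values. The `1e-6` term mirrors the small constant that `clip_grad_norm_` adds to the denominator, and `clip_by_global_norm` is a hypothetical helper, not a PyTorch function.

```python
import math

def clip_by_global_norm(grads, max_norm):
    """Rescale gradients so their combined L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    # Never scale up: gradients already within the limit are left unchanged
    scale = min(1.0, max_norm / (total_norm + 1e-6))
    return [g * scale for g in grads], total_norm

large, norm_before = clip_by_global_norm([3.0, 4.0], max_norm=1.0)
norm_after = math.sqrt(sum(g * g for g in large))
print(norm_before)           # prints: 5.0
print(round(norm_after, 3))  # prints: 1.0

small, _ = clip_by_global_norm([0.3, 0.4], max_norm=1.0)
print(small)                 # prints: [0.3, 0.4]
```

The direction of the gradient is preserved; only its length changes, which limits the size of any single update step.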

In [None]:
# Train the model
N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')

print("Starting training...\n")

for epoch in range(N_EPOCHS):
    train_loss = train_epoch(model, train_loader, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_loader, criterion)
    
    # Save best model
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pt')
    
    # Calculate perplexity
    train_ppl = math.exp(train_loss)
    valid_ppl = math.exp(valid_loss)
    
    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {train_ppl:7.3f}')
    print(f'\tValid Loss: {valid_loss:.3f} | Valid PPL: {valid_ppl:7.3f}')

print("\nTraining complete!")
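
Perplexity is the exponential of the average cross-entropy loss, and it can be read as the effective number of words the model is choosing between at each step. The sketch below checks two reference points. The vocabulary size of 5000 is a made-up placeholder.

```python
import math

def perplexity(avg_cross_entropy):
    """Perplexity is exp of the average per-token cross-entropy (in nats)."""
    return math.exp(avg_cross_entropy)

vocab_size = 5000  # hypothetical vocabulary size

# A model that guesses uniformly has loss log(V), hence perplexity V
uniform_loss = math.log(vocab_size)
print(round(perplexity(uniform_loss)))  # prints: 5000

# A model that is always certain and correct has loss 0, hence perplexity 1
print(perplexity(0.0))  # prints: 1.0
```

Note that the training loop above averages per-batch mean losses, so the reported value is an approximation of the true per-token perplexity when batches contain different numbers of non-padding tokens.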

---
## Step 8: Translation & Evaluation <a id='evaluation'></a>

Now let's use our trained model to translate sentences!

In [None]:
def translate_sentence(model, sentence, src_vocab, trg_vocab, src_tokenizer, max_len=50):
    """
    Translate a German sentence to English
    
    Args:
        model: Trained seq2seq model
        sentence: German sentence (string)
        src_vocab: German vocabulary
        trg_vocab: English vocabulary
        src_tokenizer: German tokenizer
        max_len: Maximum length of translation
    
    Returns:
        translation: English sentence (string)
    """
    model.eval()
    
    # Tokenize and convert to indices
    tokens = [src_vocab.stoi["<sos>"]] + \
             src_vocab.numericalize(sentence, src_tokenizer) + \
             [src_vocab.stoi["<eos>"]]
    
    # Convert to tensor
    src_tensor = torch.LongTensor(tokens).unsqueeze(1).to(device)
    
    with torch.no_grad():
        # Encode source
        hidden = model.encoder(src_tensor)
        
        # Start with <sos> token
        trg_indices = [trg_vocab.stoi["<sos>"]]
        
        # Generate translation word by word
        for _ in range(max_len):
            trg_tensor = torch.LongTensor([trg_indices[-1]]).unsqueeze(0).to(device)
            
            # Get prediction
            output, hidden = model.decoder(trg_tensor, hidden)
            
            # Get most likely next word
            pred_token = output.argmax(1).item()
            trg_indices.append(pred_token)
            
            # Stop if <eos> token is generated
            if pred_token == trg_vocab.stoi["<eos>"]:
                break
    
    # Convert indices to words
    trg_tokens = [trg_vocab.itos[i] for i in trg_indices]
    
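    # Drop <sos>, and drop the final token only if it is <eos>
    # (decoding can also stop because max_len was reached)
    trg_tokens = trg_tokens[1:]
    if trg_tokens and trg_tokens[-1] == "<eos>":
        trg_tokens = trg_tokens[:-1]
    translation = ' '.join(trg_tokens)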
    
    return translation

print("Translation function ready!")
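
The decoding loop above is a greedy search: at every step it keeps only the single most likely word. Its control flow can be sketched with a stand-in for the model. In the example below, `canned_model` is a hypothetical function that returns a fixed sequence, used only to show the two stopping conditions.

```python
SOS, EOS = "<sos>", "<eos>"

def greedy_decode(next_token, max_len=50):
    """Repeatedly ask next_token for a continuation until <eos> or max_len."""
    tokens = [SOS]
    for _ in range(max_len):
        token = next_token(tokens)
        if token == EOS:
            break  # stopping condition 1: the model ended the sentence
        tokens.append(token)
    # stopping condition 2 is the loop bound: max_len steps were generated
    return tokens[1:]  # drop <sos>

CANNED = ["a", "man", "walks", EOS]

def canned_model(tokens):
    """Hypothetical stand-in for the decoder: replays a fixed sentence."""
    return CANNED[len(tokens) - 1]

print(greedy_decode(canned_model))
# prints: ['a', 'man', 'walks']

# A model that never emits <eos> is cut off by max_len
print(greedy_decode(lambda tokens: "la", max_len=3))
# prints: ['la', 'la', 'la']
```

Greedy search is simple and fast, but it can miss a better overall sentence because it never revisits an earlier choice.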

In [None]:
# Load best model
# map_location lets a checkpoint saved on GPU load on a CPU-only machine
model.load_state_dict(torch.load('best_model.pt', map_location=device))

# Test translations
test_sentences = [
    "Ein Mann geht die Straße entlang.",
    "Eine Frau spielt mit einem Kind.",
    "Der Hund läuft im Park.",
    "Zwei Männer spielen Fußball."
]

print("=" * 70)
print("TRANSLATIONS")
print("=" * 70)

for sentence in test_sentences:
    translation = translate_sentence(
        model, sentence, german_vocab, english_vocab, tokenize_de
    )
    print(f"\nGerman:  {sentence}")
    print(f"English: {translation}")

### Calculate BLEU Score

In [None]:
from nltk.translate.bleu_score import corpus_bleu

def calculate_bleu(model, data, src_vocab, trg_vocab, src_tokenizer, trg_tokenizer, max_samples=100):
    """
    Calculate corpus-level BLEU on the first `max_samples` pairs of `data`
    """
    references = []
    hypotheses = []
    
    for src, trg in data[:max_samples]:
        # Generate translation
        translation = translate_sentence(
            model, src, src_vocab, trg_vocab, src_tokenizer
        )
        
        # Tokenize reference and hypothesis
        ref = trg_tokenizer(trg)
        hyp = translation.split()
        
        references.append([ref])
        hypotheses.append(hyp)
    
    # Calculate corpus BLEU score
    bleu_score = corpus_bleu(references, hypotheses)
    return bleu_score

# Calculate BLEU score on test set
bleu = calculate_bleu(model, test_data, german_vocab, english_vocab, 
                     tokenize_de, tokenize_en, max_samples=100)
print(f"\nBLEU Score: {bleu:.4f}")
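
BLEU is built on clipped n-gram precision: the fraction of n-grams in the translation that also appear in the reference, where each n-gram is credited at most as often as it occurs in the reference. The sketch below implements only that building block in plain Python. The full score also combines several n-gram orders and applies a brevity penalty, both of which are omitted here. The helper names are illustrative.

```python
from collections import Counter

def ngrams(tokens, n):
    """All contiguous n-token windows, as tuples."""
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

def clipped_precision(hypothesis, reference, n):
    """Fraction of hypothesis n-grams found in the reference, with clipped counts."""
    hyp_counts = Counter(ngrams(hypothesis, n))
    ref_counts = Counter(ngrams(reference, n))
    # Each n-gram is credited at most as many times as the reference contains it
    matched = sum(min(count, ref_counts[gram]) for gram, count in hyp_counts.items())
    total = sum(hyp_counts.values())
    return matched / total if total else 0.0

reference = "the cat is on the mat".split()
degenerate = "the the the the the the the".split()

# "the" occurs twice in the reference, so only 2 of the 7 copies count
print(clipped_precision(degenerate, reference, 1))

# A perfect match scores 1.0 at every n-gram order
print(clipped_precision(reference, reference, 2))  # prints: 1.0
```

Clipping is what stops a translation from scoring well by repeating a common word.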

---
## Summary

**What we built:**
1. ✅ Encoder: Processes source sentence
2. ✅ Decoder: Generates target sentence
3. ✅ Seq2Seq Model: Combines both
4. ✅ Training Loop: Learns from data
5. ✅ Translation Function: Generates translations

**Key Concepts:**
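- **Embedding**: Maps each word index to a dense vector
- **RNN/GRU**: Processes a sequence one step at a time
- **Hidden State**: Carries information from one time step to the next
- **Teacher Forcing**: Feeds the true target word to the decoder during training instead of its own prediction
- **Perplexity**: Exponential of the average cross-entropy loss; lower means the model is less uncertain
- **BLEU Score**: Measures n-gram overlap between a translation and a reference translation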

**Next Steps:**
- Try different hyperparameters
- Add attention mechanism
- Use LSTM instead of GRU
- Train on larger dataset

---
## Exercise: Translate Your Own Sentences!

Try translating your own German sentences:

In [None]:
# Your turn! Try translating these:
my_sentences = [
    "Guten Morgen!",
    "Wie geht es dir?",
    "Ich liebe Programmieren.",
    # Add your own sentences here!
]

for sentence in my_sentences:
    translation = translate_sentence(
        model, sentence, german_vocab, english_vocab, tokenize_de
    )
    print(f"German:  {sentence}")
    print(f"English: {translation}")
    print()