In [1]:
!pip install torch==2.3.1 torchtext==0.18.0
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm
!pip install datasets

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from datasets import load_dataset
from torchtext.vocab import build_vocab_from_iterator
import spacy
import math
import random
import numpy as np

# Device settings (once again we must pick a GPU if avaliable)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Device: {device}")

# Stabilize Randomness
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

# Preperation of data and Tokenizer
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')

def tokenize_de(text):
    return [tok.text for tok in spacy_de.tokenizer(text)] # Transformer has no inherent direction

def tokenize_en(text):
    return [tok.text for tok in spacy_en.tokenizer(text)]

dataset = load_dataset("bentrevett/multi30k")

def yield_tokens(data_iter, language):
    for example in data_iter:
        if language == 'de':
            yield tokenize_de(example['de'])
        else:
            yield tokenize_en(example['en'])

special_symbols = ['<unk>', '<pad>', '<sos>', '<eos>']
vocab_de = build_vocab_from_iterator(yield_tokens(dataset['train'], 'de'), min_freq=2, specials=special_symbols)
vocab_en = build_vocab_from_iterator(yield_tokens(dataset['train'], 'en'), min_freq=2, specials=special_symbols)

vocab_de.set_default_index(vocab_de['<unk>'])
vocab_en.set_default_index(vocab_en['<unk>'])

PAD_IDX = vocab_de['<pad>']
SOS_IDX = vocab_de['<sos>']
EOS_IDX = vocab_de['<eos>']
BATCH_SIZE = 32 # Transformer consumes too much memory

# Batch Preperation
def collate_batch(batch):
    src_list, tgt_list = [], []
    for item in batch:
        src_tensor = torch.tensor([SOS_IDX] + [vocab_de[token] for token in tokenize_de(item['de'])] + [EOS_IDX], dtype=torch.long)
        tgt_tensor = torch.tensor([SOS_IDX] + [vocab_en[token] for token in tokenize_en(item['en'])] + [EOS_IDX], dtype=torch.long)
        src_list.append(src_tensor)
        tgt_list.append(tgt_tensor)

    src_batch = pad_sequence(src_list, padding_value=PAD_IDX, batch_first=True) # Transformer batch_first=True diğerini de dene
    tgt_batch = pad_sequence(tgt_list, padding_value=PAD_IDX, batch_first=True)
    return src_batch.to(device), tgt_batch.to(device)

train_loader = DataLoader(dataset['train'], batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
val_loader = DataLoader(dataset['validation'], batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)
test_loader = DataLoader(dataset['test'], batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)

Collecting torch==2.3.1
  Downloading torch-2.3.1-cp312-cp312-manylinux1_x86_64.whl.metadata (26 kB)
Collecting torchtext==0.18.0
  Downloading torchtext-0.18.0-cp312-cp312-manylinux1_x86_64.whl.metadata (7.9 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch==2.3.1)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch==2.3.1)
  Downloading nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch==2.3.1)
  Downloading nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch==2.3.1)
  Downloading nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch==2.3.1)
  Downloading nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collec



Device: cuda


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

train.jsonl: 0.00B [00:00, ?B/s]

val.jsonl: 0.00B [00:00, ?B/s]

test.jsonl: 0.00B [00:00, ?B/s]

Generating train split:   0%|          | 0/29000 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1014 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1000 [00:00<?, ? examples/s]

In [2]:
class PositionalEncoding(nn.Module):
    def __init__(self, emb_size: int, dropout: float, maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(0)

        self.dropout = nn.Dropout(dropout)
        # save as buffer
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding):
        # Add position information to word vector
        return self.dropout(token_embedding + self.pos_embedding[:, :token_embedding.size(1), :])

class Seq2SeqTransformer(nn.Module):
    def __init__(self, num_encoder_layers, num_decoder_layers, emb_size, nhead, src_vocab_size, tgt_vocab_size, dim_feedforward, dropout=0.1):
        super(Seq2SeqTransformer, self).__init__()

        # Embedding
        self.src_tok_emb = nn.Embedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = nn.Embedding(tgt_vocab_size, emb_size)

        # Position information
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

        # Main Transformer block (Encoder + Decoder)
        self.transformer = nn.Transformer(d_model=emb_size,
                                          nhead=nhead,
                                          num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers,
                                          dim_feedforward=dim_feedforward,
                                          dropout=dropout,
                                          batch_first=True) # Data in (Batch, Seq) format

        # Output layer (Word prediction)
        self.generator = nn.Linear(emb_size, tgt_vocab_size)

    def forward(self, src, trg, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask):
        # Embedding + Positional Encoding
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))

        # Flow through Transformer
        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                src_padding_mask, tgt_padding_mask, memory_key_padding_mask)

        return self.generator(outs)

    def encode(self, src, src_mask):
        return self.transformer.encoder(self.positional_encoding(self.src_tok_emb(src)), src_mask)

    def decode(self, tgt, memory, tgt_mask):
        return self.transformer.decoder(self.positional_encoding(self.tgt_tok_emb(tgt)), memory, tgt_mask)

In [3]:
def generate_square_subsequent_mask(sz):
    # Masking
    mask = (torch.triu(torch.ones((sz, sz), device=device)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

def create_mask(src, tgt):
    src_seq_len = src.shape[1]
    tgt_seq_len = tgt.shape[1]

    # Target mask
    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)
    src_mask = torch.zeros((src_seq_len, src_seq_len), device=device).type(torch.bool) # Encoder can see everything

    # Padding mask (Don't see spaces)
    src_padding_mask = (src == PAD_IDX)
    tgt_padding_mask = (tgt == PAD_IDX)
    return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

In [4]:
# Hyperparametrs
SRC_VOCAB_SIZE = len(vocab_de)
TGT_VOCAB_SIZE = len(vocab_en)
EMB_SIZE = 512
NHEAD = 8               # Num of attention heads
FFN_HID_DIM = 512       # Feed Forward layer dimension
BATCH_SIZE = 32
NUM_ENCODER_LAYERS = 3  # Changed here for Ablation (3-e)
NUM_DECODER_LAYERS = 3

transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,
                                 NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)

# Use Xavier Initialization for weights for faster learning
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(device)

loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

print(f"Model Parametre Count: {sum(p.numel() for p in transformer.parameters() if p.requires_grad)}")


Model Parametre Count: 23073839


In [5]:
import time
import torch.optim as optim
from torch.utils.data import Subset

# HYPERPARAMETRS
#DATA_LIMIT = 1500         # DATA_LIMIT must be increased
N_EPOCHS = 5              # Epoch must be at least 5
LEARNING_RATE = 0.0005

# Model Architecture
# Dimension were chosen so that it can be compared with RNN
ENC_LAYERS = 3       # Num of Encoder layers (3 layers are better for it to compete with RNN)
DEC_LAYERS = 3       # Num of Decoder layers
EMB_SIZE = 512       # Embedding layers
NHEAD = 8            # num of Attention heads
FFN_HID_DIM = 512    # Feedforward layer dimension


train_loader = DataLoader(dataset['train'], batch_size=32, shuffle=True, collate_fn=collate_batch)
val_loader = DataLoader(dataset['validation'], batch_size=32, shuffle=False, collate_fn=collate_batch)

print(f"Training set dimension: {len(dataset['train'])} sentence")

# Setting up Model
transformer_model = Seq2SeqTransformer(
    num_encoder_layers=ENC_LAYERS,
    num_decoder_layers=DEC_LAYERS,
    emb_size=EMB_SIZE,
    nhead=NHEAD,
    src_vocab_size=len(vocab_de),
    tgt_vocab_size=len(vocab_en),
    dim_feedforward=FFN_HID_DIM
).to(device)

# Xavier Initialization for weights
for p in transformer_model.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

# Optimizer
optimizer = optim.Adam(transformer_model.parameters(), lr=LEARNING_RATE, betas=(0.9, 0.98), eps=1e-9)
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

print(f"Number of Transformer Model Parametres: {sum(p.numel() for p in transformer_model.parameters() if p.requires_grad):,}")

# Standard Training Function
def train_epoch(model, iterator, optimizer, criterion, device):
    model.train()
    epoch_loss = 0

    for src, tgt in iterator:
        src = src.to(device)
        tgt = tgt.to(device)

        tgt_input = tgt[:, :-1] # Final token except for (<eos>)

        # Create Masks
        src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        # Run the Model
        logits = model(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)

        optimizer.zero_grad()

        tgt_out = tgt[:, 1:] # First token except for (<sos>)
        loss = criterion(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))

        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

# Standard evaluation function
def evaluate_epoch(model, iterator, criterion, device):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for src, tgt in iterator:
            src = src.to(device)
            tgt = tgt.to(device)

            tgt_input = tgt[:, :-1]
            src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

            logits = model(src, tgt_input, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, src_padding_mask)

            tgt_out = tgt[:, 1:]
            loss = criterion(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)

# Training Loop
for epoch in range(1, N_EPOCHS+1):
    start_time = time.time()

    train_loss = train_epoch(transformer_model, train_loader, optimizer, loss_fn, device)
    val_loss = evaluate_epoch(transformer_model, val_loader, loss_fn, device)

    end_time = time.time()

    print(f"Epoch: {epoch:02} | Train Loss: {train_loss:.3f} | Val Loss: {val_loss:.3f} | Süre: {(end_time - start_time):.1f}s")

Training set dimension: 29000 sentence
Number of Transformer Model Parametres: 23,073,839




Epoch: 01 | Train Loss: 4.601 | Val Loss: 3.929 | Süre: 43.9s
Epoch: 02 | Train Loss: 3.862 | Val Loss: 3.636 | Süre: 39.5s
Epoch: 03 | Train Loss: 3.658 | Val Loss: 3.523 | Süre: 39.3s
Epoch: 04 | Train Loss: 3.546 | Val Loss: 3.406 | Süre: 39.2s
Epoch: 05 | Train Loss: 3.454 | Val Loss: 3.363 | Süre: 39.3s


In [6]:
from torchtext.data.metrics import bleu_score

# Greedy Decoding (batch_first=True)
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    # src: [1, seq_len] (Batch size 1, Row length N)
    src = src.to(device)
    src_mask = src_mask.to(device)

    memory = model.encode(src, src_mask)

    # Starting token (Batch=1, Seq=1)
    ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(device)

    for i in range(max_len-1):
        memory = memory.to(device)

        # ys.size(1) is the length of sequence
        tgt_mask = (generate_square_subsequent_mask(ys.size(1))
                    .type(torch.bool)).to(device)

        out = model.decode(ys, memory, tgt_mask)

        # no need to transpose since batch_first=True
        # Take the probabilities of final words: out[:, -1] -> [Batch, Dim]
        prob = model.generator(out[:, -1])

        _, next_word = torch.max(prob, dim=1)
        next_word = next_word.item()

        ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=1)

        if next_word == EOS_IDX:
            break

    return ys

# Translation Function
def translate_sentence_transformer(model, sentence, vocab_de, vocab_en):
    model.eval()

    # Tokenize and convert to ID
    # unsqueeze(0) -> [1, Seq_Len]
    src = torch.tensor([vocab_de[token] for token in tokenize_de(sentence)]).long().unsqueeze(0).to(device)
    num_tokens = src.shape[1]

    # Encoder mask
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool).to(device)

    # Predict
    tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=SOS_IDX).flatten()

    # Convert to word
    return [vocab_en.lookup_token(i) for i in list(tgt_tokens.cpu().numpy())]

# BLEU Calculation
def calculate_bleu_transformer(data, model):
    trgs = []
    pred_trgs = []

    # Calculate for first 100 examples of Test set (To speed up)
    for i, datum in enumerate(data):
        if i > 100: break
        src = datum['de']
        trg = datum['en']
        pred_tokens = translate_sentence_transformer(model, src, vocab_de, vocab_en)
        pred_str = [t for t in pred_tokens if t not in ['<sos>', '<eos>', '<pad>']]
        trg_tokens = tokenize_en(trg)
        pred_trgs.append(pred_str)
        trgs.append([trg_tokens])

    return bleu_score(pred_trgs, trgs)



In [7]:
# An example translation
print("--- Example Translation ---")
test_sentence = "Ein Mann spielt Fußball."
translation = translate_sentence_transformer(transformer_model, test_sentence, vocab_de, vocab_en)
print(f"Source: {test_sentence}")
print(f"Translation: {' '.join(translation).replace('<sos>', '').replace('<eos>', '')}")

# BLEU Score calculation
score = calculate_bleu_transformer(dataset['test'], transformer_model)
print(f"Model BLEU Score: {score*100:.2f}")

--- Example Translation ---
Source: Ein Mann spielt Fußball.
Translation:  A man is walking down a street . 
Model BLEU Score: 4.24


In [8]:
import pandas as pd
import time
import torch
import torch.nn as nn
import torch.optim as optim

# Ablation Configs (Task 3-e)
ablation_configs = [
    {"name": "Base Model",       "L": 3, "H": 8}, # Original
    {"name": "Fewer Layers (2)", "L": 2, "H": 8}, # Shallower
    {"name": "Fewer Heads (4)",  "L": 3, "H": 4}  # Less attention
]

results = []

print(f"{'='*40}\nSTARTING ABLATION STUDY\n{'='*40}")

for cfg in ablation_configs:
    print(f"\n>> Testing: {cfg['name']} (Layers={cfg['L']}, Heads={cfg['H']})")

    # Re-initialize Model with new config
    model = Seq2SeqTransformer(
        num_encoder_layers=cfg['L'],
        num_decoder_layers=cfg['L'],
        emb_size=512,
        nhead=cfg['H'],
        src_vocab_size=len(vocab_de),
        tgt_vocab_size=len(vocab_en),
        dim_feedforward=512
    ).to(device)

    # Init Weights & Optimizer
    for p in model.parameters():
        if p.dim() > 1: nn.init.xavier_uniform_(p)

    optimizer = optim.Adam(model.parameters(), lr=0.0005, betas=(0.9, 0.98), eps=1e-9)

    # Training Loop (5 Epochs)
    start = time.time()
    for ep in range(1, 6):
        train_loss = train_epoch(model, train_loader, optimizer, loss_fn, device)


    elapsed = time.time() - start

    # Calculate BLEU
    # Note: We use the test set to get the final score
    bleu = calculate_bleu_transformer(dataset['test'], model)

    print(f"\n   Done! Time: {elapsed:.1f}s | BLEU: {bleu*100:.2f}")

    # Save Results
    results.append({
        "Configuration": cfg['name'],
        "Layers": cfg['L'],
        "Heads": cfg['H'],
        "Time (s)": round(elapsed, 1),
        "BLEU": round(bleu * 100, 2)
    })

print("\n" + "="*40 + "\nABLATION RESULTS TABLE\n" + "="*40)
df = pd.DataFrame(results)
print(df)

STARTING ABLATION STUDY

>> Testing: Base Model (Layers=3, Heads=8)

   Done! Time: 194.2s | BLEU: 5.32

>> Testing: Fewer Layers (2) (Layers=2, Heads=8)

   Done! Time: 147.4s | BLEU: 7.96

>> Testing: Fewer Heads (4) (Layers=3, Heads=4)

   Done! Time: 193.0s | BLEU: 3.24

ABLATION RESULTS TABLE
      Configuration  Layers  Heads  Time (s)  BLEU
0        Base Model       3      8     194.2  5.32
1  Fewer Layers (2)       2      8     147.4  7.96
2   Fewer Heads (4)       3      4     193.0  3.24
