<a href="https://colab.research.google.com/github/RoseVZ/Papers_to_code/blob/main/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## On colab since I got the student Pro :))
Free Compute Yayyy!

## Imports

In [6]:
import os
import re
import math
import time
import random
import zipfile
import urllib.request

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from collections import Counter
from typing import List, Tuple




In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")



Using device: cuda


### 0.1 Dataset

In [8]:
def preprocess(s: str) -> str:
    s = s.lower()
    s = re.sub(r"([?.!,¿])", r" \1 ", s)
    s = re.sub(r'[" "]+', " ", s)
    s = re.sub(r"[^a-zA-Z?.!,¿]+", " ", s)
    return s.strip()

In [9]:
# Load a small subset of sentence pairs
def load_sentence_pairs(path="fra.txt", max_sentences=10000):
    pairs = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            eng, fra, *_ = line.strip().split("\t")
            eng = preprocess(eng)
            fra = preprocess(fra)
            pairs.append((eng, fra))
            if len(pairs) >= max_sentences:
                break
    return pairs

In [10]:
load_sentence_pairs()[:5]

[('go .', 'va !'),
 ('go .', 'marche .'),
 ('go .', 'en route !'),
 ('go .', 'bouge !'),
 ('hi .', 'salut !')]

#Purpose of Vocab Class
###The Vocab class takes in a list of sentences and builds:

A word-to-index mapping: stoi (string → integer)

An index-to-word mapping: itos (integer → string)

Plus some special tokens like <pad\>, <sos\>, etc.

These tokens are essential:

<pad\>: for padding short sequences to equal length

<sos\>: start-of-sentence (decoder input)

<eos\>: end-of-sentence (decoder supervision)

<unk\>: unknown word (out-of-vocabulary)

In [11]:
# Tokenizer and vocabulary
class Vocab:

    def __init__(self, sentences: List[str], min_freq=2):
        self.pad_token = "<pad>"
        self.sos_token = "<sos>"
        self.eos_token = "<eos>"
        self.unk_token = "<unk>"

        #Create a list of these 4 special tokens to always include at the beginning of the vocabulary.
        special_tokens = [self.pad_token, self.sos_token, self.eos_token, self.unk_token]

        #put/flatten all words into a single 1d list
        words = [word for s in sentences for word in s.split()]
        #Calculate the frequency of the word in the vocabulary
        freq = Counter(words)

        #store the words with frequency greater than the minimum (here 2)
        vocab = [w for w, f in freq.items() if f >= min_freq]

        #mapping from int to string including the special tokens and all the words in vocab
        self.itos = special_tokens + sorted(vocab)
        #reverse map: string → index
        self.stoi = {word: i for i, word in enumerate(self.itos)}

    def encode(self, sentence: str) -> List[int]:
        #first convert the sentence to a 1D list-> look up the values of each token on the string to int map(stoi)
        #if not found return the <unk> token
        return [self.stoi.get(w, self.stoi[self.unk_token]) for w in sentence.split()]

    def decode(self, indices: List[int]) -> str:
        #same as encode but takes a list of indices as converts to words/tokens
        #eg. [8,4,5]->"I love pasta" :) [I do]
        return " ".join([self.itos[i] for i in indices if self.itos[i] not in [self.pad_token, self.sos_token, self.eos_token]])

    def __len__(self):
      #send the size of vocab: helpful while building NN
        return len(self.itos)



In [12]:
#get the eng-french pair
pairs = load_sentence_pairs()
#separate them
eng_sentences = [p[0] for p in pairs]
fra_sentences = [p[1] for p in pairs]

#create the Vocab
eng_vocab = Vocab(eng_sentences)
fra_vocab = Vocab(fra_sentences)

print(f"English vocab size: {len(eng_vocab)}, French vocab size: {len(fra_vocab)}")

English vocab size: 1404, French vocab size: 1940


### Dataset prep :(

In [13]:
#defining a size for each input sentence
MAX_LEN = 20

#pad using <pad> the i/p sentence if its shorter or truncate if longer
def pad_seq(seq, max_len, pad_idx):
    return seq[:max_len] + [pad_idx] * (max_len - len(seq))


class TranslationDataset(Dataset):
    def __init__(self, pairs: List[Tuple[str, str]], src_vocab: Vocab, tgt_vocab: Vocab):
        """
        Inputs:
              pairs: list of tuples (source_sentence, target_sentence) both as strings
              src_vocab: vocabulary object for source language (e.g., English)
              tgt_vocab: vocabulary object for target language (e.g., French).
        """
        self.src_vocab = src_vocab
        self.tgt_vocab = tgt_vocab
        self.data = []
        for src, tgt in pairs:
            #pad the encoded english (source vals)
            src_ids = pad_seq(src_vocab.encode(src), MAX_LEN, src_vocab.stoi[src_vocab.pad_token])
            #convert target = <strt> token + target_vals+ <endofsentence> token
            tgt_ids = [tgt_vocab.stoi[tgt_vocab.sos_token]] + tgt_vocab.encode(tgt) + [tgt_vocab.stoi[tgt_vocab.eos_token]]
            #pad to MAX_LEN+2 for sos and eos tokens
            tgt_ids = pad_seq(tgt_ids, MAX_LEN + 2, tgt_vocab.stoi[tgt_vocab.pad_token])
            #append the src and target to data
            self.data.append((src_ids, tgt_ids))

    def __len__(self):
      #return len of data
        return len(self.data)

    def __getitem__(self, idx):
      #Given an index idx, returns the source and target sequences as PyTorch tensors.
        return torch.tensor(self.data[idx][0]), torch.tensor(self.data[idx][1])


In [14]:
dataset = TranslationDataset(pairs, eng_vocab, fra_vocab)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

### Model Building

### Building Blocks


In [15]:
class PositionalEncoding(nn.Module):
    """
    Input:
        d_model: embedding dimension
        max_len: maximum length of the input sequence
    """
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        #initializing a matrix of zeros for positonal embeddings with shape (max_len)x embedding_dim
        pe = torch.zeros(max_len, d_model)
        #Creates a column vector [0, 1, ..., max_len-1] to represent positions with shape max_lenx1
        position = torch.arange(0, max_len).unsqueeze(1)

        #Computes a scaling term that varies across dimensions.
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))

        #each position gets a unique encoding.
        #even gets sin and odd gets cosine
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        self.pe = pe.unsqueeze(0)  # shape: (1, max_len, d_model) adds batch dimension

    def forward(self, x):
      #Adds positional encodings to the input embeddings.
        return x + self.pe[:, :x.size(1)].to(x.device)



In [47]:
# 5) Multi-head Attention
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads):
        """
        Input:
            d_model: embedding dimension
            n_heads: number of attention heads
        """
        super().__init__()
        # Number of heads must evenly divide the embedding dimension
        assert d_model % n_heads == 0
        # Dimension of each head
        self.d_k = d_model // n_heads
        self.n_heads = n_heads

        # Linear layers to project inputs to queries, keys and values
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        # Final linear layer after concatenating heads
        self.out = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        # Get batch size and sequence lengths of q, k, v and embedding dims
        B, Lq, Dq = q.size()
        B, Lk, Dk = k.size()
        B, Lv, Dv = v.size()

        # Project queries, keys, values and reshape to (B, n_heads, Seq_len, d_k)
        q = self.q_linear(q).view(B, Lq, self.n_heads, self.d_k).transpose(1, 2)
        k = self.k_linear(k).view(B, Lk, self.n_heads, self.d_k).transpose(1, 2)
        v = self.v_linear(v).view(B, Lv, self.n_heads, self.d_k).transpose(1, 2)

        # Compute scaled dot-product attention scores
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        # Apply mask if provided (e.g., padding or future tokens)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float("-inf"))
        # Normalize attention scores to probabilities
        attn = torch.softmax(scores, dim=-1)
        # Compute weighted sum of values according to attention
        context = torch.matmul(attn, v)
        # Concatenate all heads and reshape back to (B, Lq, Dq)
        context = context.transpose(1, 2).contiguous().view(B, Lq, Dq)

        # Final linear projection
        return self.out(context)

In [17]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.net = nn.Sequential(
            #expands to size d_ff
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            #reduces back to d_model
            nn.Linear(d_ff, d_model)
        )

    def forward(self, x):
        return self.net(x)

In [18]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        #Self-attention + Feed-forward sublayer.
        self.attn = MultiHeadAttention(d_model, n_heads)
        self.ff = FeedForward(d_model, d_ff)

        #two normalization layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
      #Self-attention + residual + norm
        x2 = self.norm1(x + self.attn(x, x, x, mask))

      # Feed-forward + residual + norm
        x3 = self.norm2(x2 + self.ff(x2))
        return x3



In [19]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff):
        super().__init__()
        #Self-attention for target tokens and attention over encoder output
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.enc_attn = MultiHeadAttention(d_model, n_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x, enc_out, tgt_mask=None, src_mask=None):
      #Masked self-attention on decoder inputs
        x = self.norm1(x + self.self_attn(x, x, x, tgt_mask))
      #Encoder-decoder attention (cross-attn)
        x = self.norm2(x + self.enc_attn(x, enc_out, enc_out, src_mask))
        #Feed-forward
        x = self.norm3(x + self.ff(x))
        return x

### Assembling the blocks

In [20]:
class Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len):
        """
        Input:
            vocab_size: size of the input
            d_model: embedding dimension
            n_heads: number of attention heads
            d_ff: dimension of the feed-forward network
            num_layers: number of encoder layers
            max_len: maximum length of the input sequence
        """
        super().__init__()
        #embedding lookup table: maps word indices to d_model-dimensional vectors.
        self.embedding = nn.Embedding(vocab_size, d_model)
        #positional encoding
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        #a list of encoder layers (self-attention + feedforward blocks)
        self.layers = nn.ModuleList([
            EncoderLayer(d_model, n_heads, d_ff) for _ in range(num_layers)
        ])
        #prevent overfitting
        self.dropout = nn.Dropout(0.1)

    def forward(self, src, mask=None):
      #Embeds the input and scales it by √d_model

        x = self.embedding(src) * math.sqrt(self.embedding.embedding_dim)
        print("Embedding output shape:", x.shape)
        #gets positional encoding
        x = self.pos_encoding(x)
        x = self.dropout(x)
        #creates encoding layers
        for layer in self.layers:
            x = layer(x, mask)
        return x

In [21]:
class Decoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, d_ff, num_layers, max_len):
        """
        Input:
            vocab_size: size of the input
            d_model: embedding dimension
            n_heads: number of attention heads
            d_ff: dimension of the feed-forward network
            num_layers: number of decoder layers
            max_len: maximum length of the input sequence
        """
        super().__init__()
        #Embeds target token indices
        self.embedding = nn.Embedding(vocab_size, d_model)
        #adds positional info
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        #list of decoding layers
        self.layers = nn.ModuleList([
            DecoderLayer(d_model, n_heads, d_ff) for _ in range(num_layers)
        ])
        #avoid overfitting
        self.dropout = nn.Dropout(0.1)

    def forward(self, tgt, enc_out, tgt_mask=None, src_mask=None):
      #Embedding + scaling + positional encoding + dropout.
        x = self.embedding(tgt) * math.sqrt(self.embedding.embedding_dim)
        x = self.pos_encoding(x)
        x = self.dropout(x)
        for layer in self.layers:
            x = layer(x, enc_out, tgt_mask, src_mask)
        return x

In [22]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, n_heads=8,
                 d_ff=2048, num_layers=6, max_len=100):
        """
        Input:
            src_vocab_size: size of the source vocabulary
            tgt_vocab_size: size of the target vocabulary
            d_model: embedding dimension
            n_heads: number of attention heads
            d_ff: dimension of the feed-forward network
            num_layers: number of encoder and decoder layers
            max_len: maximum length of the input sequence
        """
        super().__init__()
        #encoder
        self.encoder = Encoder(src_vocab_size, d_model, n_heads, d_ff, num_layers, max_len)
        #decoder
        self.decoder = Decoder(tgt_vocab_size, d_model, n_heads, d_ff, num_layers, max_len)
        #fully connected feed forward network
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        enc_out = self.encoder(src, src_mask)
        dec_out = self.decoder(tgt, enc_out, tgt_mask, src_mask)
        return self.fc_out(dec_out)

#### some sort of Masking to prevent transformers from attending to padding and other stuff

In [23]:
def create_pad_mask(matrix, pad_token=0):
  #Returns a boolean mask where True indicates non-padding positions.
    return (matrix != pad_token).unsqueeze(1).unsqueeze(2)

In [52]:
def create_look_ahead_mask(size, device):
    # Create upper triangular matrix of shape (1,1,size,size) on the given device
    mask = torch.triu(torch.ones((1, 1, size, size), device=device), diagonal=1).bool()
    # Invert mask: True means allowed, False means masked (future tokens)
    return ~mask  # shape: (1, 1, size, size)

In [35]:
def combine_masks(tgt_seq, pad_token=0):
    pad_mask = create_pad_mask(tgt_seq, pad_token)  # shape: (B, 1, 1, T)
    look_ahead_mask = create_look_ahead_mask(tgt_seq.size(1)).to(tgt_seq.device)  # shape: (1, 1, T, T)
    return pad_mask & ~look_ahead_mask  # valid tokens

## Training

In [26]:
def masked_loss_fn(pred, target, pad_token=0):
   # padding tokens don’t contribute to loss.
    loss_fn = nn.CrossEntropyLoss(ignore_index=pad_token)
    return loss_fn(pred.view(-1, pred.size(-1)), target.view(-1))

In [27]:
src_vocab = Vocab([pair[0] for pair in pairs])
tgt_vocab = Vocab([pair[1] for pair in pairs])

In [28]:
src_vocab_size = len(src_vocab)
tgt_vocab_size = len(tgt_vocab)

In [29]:
model = Transformer(src_vocab_size, tgt_vocab_size, d_model=256, n_heads=4, d_ff=512, num_layers=2, max_len=100).to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, betas=(0.9, 0.98), eps=1e-9)


In [48]:
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for src_batch, tgt_batch in train_loader:
        src_batch = src_batch.to(device)
        tgt_batch = tgt_batch.to(device)

        tgt_input = tgt_batch[:, :-1]
        tgt_output = tgt_batch[:, 1:]

        src_mask = create_pad_mask(src_batch, src_vocab.stoi[src_vocab.pad_token]).to(device)
        tgt_mask = combine_masks(tgt_input, tgt_vocab.stoi[tgt_vocab.pad_token])

        preds = model(src_batch, tgt_input, src_mask=src_mask, tgt_mask=tgt_mask)
        loss = masked_loss_fn(preds, tgt_output, tgt_vocab.stoi[tgt_vocab.pad_token])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(train_loader):.4f}")

Epoch 1/10, Loss: 1.2369
Epoch 2/10, Loss: 1.1476
Epoch 3/10, Loss: 1.0621
Epoch 4/10, Loss: 0.9905
Epoch 5/10, Loss: 0.9192
Epoch 6/10, Loss: 0.8568
Epoch 7/10, Loss: 0.8010
Epoch 8/10, Loss: 0.7515
Epoch 9/10, Loss: 0.7023
Epoch 10/10, Loss: 0.6605


### INFERENCE

In [49]:
import torch

def translate_sentence(model, src_sentence, src_vocab, tgt_vocab, max_len=20, device='cpu'):
    model.eval()

    # Preprocess and encode the source sentence
    src_tokens = src_vocab.encode(src_sentence)
    src_tokens = src_tokens[:max_len]  # truncate if longer than max_len
    src_tokens += [src_vocab.stoi[src_vocab.pad_token]] * (max_len - len(src_tokens))  # pad

    src_tensor = torch.tensor([src_tokens], dtype=torch.long).to(device)  # (1, max_len)

    # Create source mask (pad tokens masked)
    src_mask = (src_tensor != src_vocab.stoi[src_vocab.pad_token]).unsqueeze(1).unsqueeze(2)  # (1,1,1,max_len)

    # Start with <sos> token as initial target input
    tgt_tokens = [tgt_vocab.stoi[tgt_vocab.sos_token]]

    for _ in range(max_len):
        tgt_tensor = torch.tensor([tgt_tokens], dtype=torch.long).to(device)  # (1, current_len)

        # Create target mask (pad + look-ahead mask)
        tgt_mask = create_look_ahead_mask(tgt_tensor.size(1), device).to(device)
        tgt_pad_mask = (tgt_tensor != tgt_vocab.stoi[tgt_vocab.pad_token]).unsqueeze(1).unsqueeze(2).to(device)
        combined_tgt_mask = tgt_pad_mask & tgt_mask  # (1,1,len,len)

        # Forward pass
        with torch.no_grad():
            outputs = model(src_tensor, tgt_tensor, src_mask=src_mask, tgt_mask=combined_tgt_mask)

        # Get probabilities of the last token
        last_token_logits = outputs[0, -1, :]  # (vocab_size,)
        predicted_id = torch.argmax(last_token_logits).item()

        # Stop if <eos> predicted
        if predicted_id == tgt_vocab.stoi[tgt_vocab.eos_token]:
            break

        tgt_tokens.append(predicted_id)

    # Decode to words, skip special tokens
    translated_sentence = tgt_vocab.decode(tgt_tokens[1:])  # remove <sos> token

    return translated_sentence

In [53]:
sentence = "go ."
translation = translate_sentence(model, sentence, eng_vocab, fra_vocab, max_len=20, device=device)
print("English:", sentence)
print("French:", translation)


English: go .
French: va !
