In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        # Ensure the model dimension can be evenly split across the heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads # Dimension of each head

        # Linear projections for Query, Key, and Value
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

        # Final output linear projection
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # 1. Multiply Q and K^T, then scale down by the square root of d_k
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        # 2. Apply the mask (if provided) to hide future tokens during training
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # 3. Apply softmax to get the attention probabilities
        attention_weights = F.softmax(scores, dim=-1)

        # 4. Multiply by V to get the final context vectors
        output = torch.matmul(attention_weights, V)
        return output

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)

        # 1. Pass inputs through linear layers and reshape to split into multiple heads
        # New shape: (batch_size, num_heads, seq_length, d_k)
        Q = self.W_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        # 2. Calculate attention for all heads simultaneously
        attention_output = self.scaled_dot_product_attention(Q, K, V, mask)

        # 3. Concatenate the heads back together
        # Reshape back to: (batch_size, seq_length, d_model)
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # 4. Pass through the final linear layer
        return self.W_o(attention_output)

In [2]:
# 1. Define our model dimensions
batch_size = 2      # Processing 2 sentences at a time
seq_length = 10     # Each sentence has 10 words (tokens)
d_model = 512       # Each word is represented by a 512-dimensional vector
num_heads = 8       # We will split the 512 dimensions into 8 parallel attention heads

# 2. Instantiate our Multi-Head Attention block
mha_layer = MultiHeadAttention(d_model=d_model, num_heads=num_heads)

# 3. Create a dummy input tensor full of random numbers
# In self-attention, the Queries (Q), Keys (K), and Values (V) all come from the exact same input
dummy_input = torch.randn(batch_size, seq_length, d_model)

# 4. Pass the dummy data through the layer
output = mha_layer(Q=dummy_input, K=dummy_input, V=dummy_input)

# 5. Print the shapes to verify
print(f"Input shape : {dummy_input.shape} -> [Batch, Sequence Length, D_Model]")
print(f"Output shape: {output.shape} -> [Batch, Sequence Length, D_Model]")

Input shape : torch.Size([2, 10, 512]) -> [Batch, Sequence Length, D_Model]
Output shape: torch.Size([2, 10, 512]) -> [Batch, Sequence Length, D_Model]


In [3]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        # d_ff is typically 4x larger than d_model (e.g., 512 -> 2048)
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        # Pass through first layer, apply ReLU activation, dropout, then second layer
        return self.fc2(self.dropout(F.relu(self.fc1(x))))

class EncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)

        # PyTorch has a built-in LayerNorm we can use
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # 1. Self-Attention part
        attn_output = self.self_attn(Q=x, K=x, V=x, mask=mask)
        # Add residual connection and normalize
        x = self.norm1(x + self.dropout(attn_output))

        # 2. Feed-Forward part
        ff_output = self.feed_forward(x)
        # Add residual connection and normalize
        x = self.norm2(x + self.dropout(ff_output))

        return x

In [4]:
# 1. Setup dimensions (d_ff is usually 4 * d_model)
d_ff = 2048

# 2. Instantiate the complete Encoder Block
encoder_block = EncoderBlock(d_model=d_model, num_heads=num_heads, d_ff=d_ff)

# 3. Pass our previous dummy input through the block
# We use the same dummy_input from the previous step
encoder_output = encoder_block(dummy_input)

# 4. Verify shapes
print(f"Encoder Input shape : {dummy_input.shape}")
print(f"Encoder Output shape: {encoder_output.shape}")

Encoder Input shape : torch.Size([2, 10, 512])
Encoder Output shape: torch.Size([2, 10, 512])


In [5]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Create a long matrix of zeros: shape (max_len, d_model)
        pe = torch.zeros(max_len, d_model)

        # Create a vector of positions: 0, 1, 2, 3... (max_len)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # Calculate the denominator of the sine/cosine argument
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Apply sine to even indices (0, 2, 4...)
        pe[:, 0::2] = torch.sin(position * div_term)

        # Apply cosine to odd indices (1, 3, 5...)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Add a batch dimension: shape becomes (1, max_len, d_model)
        pe = pe.unsqueeze(0)

        # register_buffer tells PyTorch this isn't a parameter that requires gradients
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x shape: (batch_size, seq_length, d_model)
        # We slice the pre-computed pe matrix to match the actual sequence length of x
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

In [6]:
# 1. Setup our vocabulary and input parameters
vocab_size = 10000  # Imagine we have a dictionary of 10,000 unique words
d_model = 512
seq_length = 10
batch_size = 2

# 2. Instantiate standard Token Embeddings and our new Positional Encoding
embedding_layer = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
pos_encoder = PositionalEncoding(d_model=d_model)

# 3. Create dummy raw input (Imagine these are token IDs, like word indexes in our dictionary)
# Shape: (Batch, Sequence Length). Filled with random integers between 0 and vocab_size
raw_token_ids = torch.randint(low=0, high=vocab_size, size=(batch_size, seq_length))

# 4. Pass the raw tokens through the pipeline
# Step A: Turn token IDs into 512-dimensional vectors
embedded_tokens = embedding_layer(raw_token_ids)
# Step B: Inject the positional sine/cosine waves
encoded_input = pos_encoder(embedded_tokens)

# 5. Verify shapes
print(f"Raw Token IDs shape: {raw_token_ids.shape} -> [Batch, Seq_Length]")
print(f"Embedded Tokens shape: {embedded_tokens.shape}")
print(f"Final Encoded Input shape: {encoded_input.shape}")

Raw Token IDs shape: torch.Size([2, 10]) -> [Batch, Seq_Length]
Embedded Tokens shape: torch.Size([2, 10, 512])
Final Encoded Input shape: torch.Size([2, 10, 512])


In [7]:
class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super().__init__()
        # 1. The Input Pipeline
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
        self.pos_encoder = PositionalEncoding(d_model=d_model, max_len=max_len, dropout=dropout)

        # 2. The Stack of Encoder Blocks
        # We use a list comprehension to create 'num_layers' separate blocks
        self.layers = nn.ModuleList([
            EncoderBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # 3. Final Layer Normalization (Standard practice to stabilize the final output)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        # Step 1: Turn token IDs into dense vectors
        x = self.embedding(x)

        # Step 2: Inject positional "timestamps"
        x = self.pos_encoder(x)

        # Step 3: Pass the data sequentially through all N blocks
        for layer in self.layers:
            x = layer(x, mask)

        # Step 4: Normalize and return the final abstract representation
        return self.norm(x)

In [8]:
# 1. Standard Transformer Parameters
vocab_size = 10000
d_model = 512
num_heads = 8
d_ff = 2048
num_layers = 6  # We are stacking 6 blocks!
batch_size = 2
seq_length = 10

# 2. Instantiate the massive Encoder stack
full_encoder = TransformerEncoder(vocab_size, d_model, num_heads, d_ff, num_layers)

# 3. Generate raw mock data (e.g., a batch of 2 sentences, 10 words each)
raw_input_tokens = torch.randint(low=0, high=vocab_size, size=(batch_size, seq_length))

# 4. Do the forward pass through the entire stack
# Note: In a real training loop, you might wait a second for this to compute!
final_memory_representation = full_encoder(raw_input_tokens)

# 5. Verify the final shape
print(f"Raw Input: {raw_input_tokens.shape}")
print(f"Final Encoder Stack Output: {final_memory_representation.shape}")

Raw Input: torch.Size([2, 10])
Final Encoder Stack Output: torch.Size([2, 10, 512])


In [9]:
class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        # 1. Self-Attention (for the text being generated)
        self.self_attn = MultiHeadAttention(d_model, num_heads)

        # 2. Cross-Attention (to look back at the Encoder's output)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)

        # 3. Feed-Forward
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)

        # Three normalization layers for the three sub-layers
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        # Step 1: Masked Self-Attention (Q, K, V all come from the Decoder)
        # We pass tgt_mask here to prevent looking into the future
        attn_output = self.self_attn(Q=x, K=x, V=x, mask=tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))

        # Step 2: Cross-Attention (Q comes from Decoder, K & V come from Encoder)
        # We pass src_mask here in case the input sequence had padding we need to ignore
        attn_output = self.cross_attn(Q=x, K=enc_output, V=enc_output, mask=src_mask)
        x = self.norm2(x + self.dropout(attn_output))

        # Step 3: Feed-Forward
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))

        return x

In [10]:
# 1. Set up dimensions
d_model = 512
num_heads = 8
d_ff = 2048
batch_size = 2

# Let's say our input prompt had 10 words, but our generated response has 15 words
seq_length_enc = 10
seq_length_dec = 15

# 2. Instantiate the Decoder Block
decoder_block = DecoderBlock(d_model, num_heads, d_ff)

# 3. Create dummy tensors
# This represents the output from the Encoder (K and V)
encoder_memory = torch.randn(batch_size, seq_length_enc, d_model)

# This represents the text we are currently generating in the Decoder (Q)
decoder_input = torch.randn(batch_size, seq_length_dec, d_model)

# 4. Pass them through the Decoder Block
# Note: We are ignoring masks for this specific shape test
decoder_output = decoder_block(x=decoder_input, enc_output=encoder_memory)

# 5. Verify shapes
print(f"Encoder Memory (Prompt) Shape: {encoder_memory.shape}")
print(f"Decoder Input (Generation) Shape: {decoder_input.shape}")
print(f"Decoder Block Output Shape: {decoder_output.shape}")

Encoder Memory (Prompt) Shape: torch.Size([2, 10, 512])
Decoder Input (Generation) Shape: torch.Size([2, 15, 512])
Decoder Block Output Shape: torch.Size([2, 15, 512])


In [11]:
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super().__init__()
        # 1. The Input Pipeline (for the target text being generated)
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)
        self.pos_encoder = PositionalEncoding(d_model=d_model, max_len=max_len, dropout=dropout)

        # 2. The Stack of Decoder Blocks
        self.layers = nn.ModuleList([
            DecoderBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])

        # 3. Final Normalization
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        # Step 1: Embed and add positional timestamps
        x = self.embedding(x)
        x = self.pos_encoder(x)

        # Step 2: Pass through all N decoder blocks
        for layer in self.layers:
            x = layer(x, enc_output, src_mask, tgt_mask)

        # Step 3: Normalize and return
        return self.norm(x)

In [12]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8,
                 d_ff=2048, num_layers=6, max_len=5000, dropout=0.1):
        super().__init__()

        # The left half of the architecture
        self.encoder = TransformerEncoder(
            src_vocab_size, d_model, num_heads, d_ff, num_layers, max_len, dropout
        )

        # The right half of the architecture
        self.decoder = TransformerDecoder(
            tgt_vocab_size, d_model, num_heads, d_ff, num_layers, max_len, dropout
        )

        # The final projection layer to predict the next word
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        # 1. Encode the source prompt
        enc_output = self.encoder(x=src, mask=src_mask)

        # 2. Decode using the prompt's memory and the current generated text
        dec_output = self.decoder(x=tgt, enc_output=enc_output, src_mask=src_mask, tgt_mask=tgt_mask)

        # 3. Project to vocabulary size to get raw word probabilities (logits)
        logits = self.fc_out(dec_output)

        return logits

In [13]:
# 1. Setup global parameters
src_vocab_size = 10000  # e.g., English vocabulary
tgt_vocab_size = 15000  # e.g., French vocabulary (often different sizes!)
batch_size = 2
src_seq_length = 10     # Length of the English prompt
tgt_seq_length = 15     # Length of the current French translation

# 2. Instantiate the full Transformer!
model = Transformer(src_vocab_size=src_vocab_size, tgt_vocab_size=tgt_vocab_size)

# 3. Create raw dummy input tokens
source_prompt = torch.randint(0, src_vocab_size, (batch_size, src_seq_length))
target_generation = torch.randint(0, tgt_vocab_size, (batch_size, tgt_seq_length))

# 4. Do a complete forward pass
output_logits = model(src=source_prompt, tgt=target_generation)

# 5. Verify the final shape
print(f"Source Prompt Shape: {source_prompt.shape}")
print(f"Target Generation Shape: {target_generation.shape}")
print(f"Final Output Logits Shape: {output_logits.shape} -> [Batch, Target Sequence Length, Target Vocab Size]")

Source Prompt Shape: torch.Size([2, 10])
Target Generation Shape: torch.Size([2, 15])
Final Output Logits Shape: torch.Size([2, 15, 15000]) -> [Batch, Target Sequence Length, Target Vocab Size]


In [14]:
def create_masks(src, tgt, pad_idx=0):
    """
    Creates the masking tensors for the Encoder and Decoder.
    We assume the token ID for <PAD> is 0.
    """
    # 1. Source Mask: Hide <PAD> tokens in the prompt
    # Shape: (batch_size, 1, 1, src_len)
    # We use unsqueeze to add dimensions so it broadcasts correctly across the attention heads
    src_mask = (src != pad_idx).unsqueeze(1).unsqueeze(2)

    # 2. Target Padding Mask: Hide <PAD> tokens in the target generation
    tgt_pad_mask = (tgt != pad_idx).unsqueeze(1).unsqueeze(2)

    # 3. Target Look-Ahead Mask: Hide future tokens
    tgt_len = tgt.size(1)
    # torch.tril generates a lower triangular matrix of 1s (True) and upper triangle of 0s (False)
    tgt_look_ahead_mask = torch.tril(torch.ones((tgt_len, tgt_len), device=tgt.device)).bool()

    # 4. Combine them: A token is valid if it's NOT a pad AND it's NOT in the future
    tgt_mask = tgt_pad_mask & tgt_look_ahead_mask

    return src_mask, tgt_mask

In [15]:
# 1. Setup dummy data (Let's pretend 0 is our <PAD> token)
pad_idx = 0
batch_size = 2
src_len = 10
tgt_len = 15

# Create dummy source/target tensors with some padding at the end
dummy_src = torch.randint(1, 100, (batch_size, src_len))
dummy_src[:, -2:] = pad_idx # Make the last 2 tokens of the source be padding

dummy_tgt = torch.randint(1, 100, (batch_size, tgt_len))
dummy_tgt[:, -3:] = pad_idx # Make the last 3 tokens of the target be padding

# 2. Generate the masks
src_mask, tgt_mask = create_masks(dummy_src, dummy_tgt, pad_idx)

# 3. Verify
print(f"Source Tensor Shape: {dummy_src.shape}")
print(f"Source Mask Shape: {src_mask.shape} -> [Batch, 1, 1, Src_Len]")
print(f"Target Mask Shape: {tgt_mask.shape} -> [Batch, 1, Tgt_Len, Tgt_Len]")

# Let's visualize the look-ahead mask for the first sequence in the batch!
print("\nTarget Mask (First sequence, First head):")
# It should print a triangle of True/False values
print(tgt_mask[0, 0, :5, :5])

Source Tensor Shape: torch.Size([2, 10])
Source Mask Shape: torch.Size([2, 1, 1, 10]) -> [Batch, 1, 1, Src_Len]
Target Mask Shape: torch.Size([2, 1, 15, 15]) -> [Batch, 1, Tgt_Len, Tgt_Len]

Target Mask (First sequence, First head):
tensor([[ True, False, False, False, False],
        [ True,  True, False, False, False],
        [ True,  True,  True, False, False],
        [ True,  True,  True,  True, False],
        [ True,  True,  True,  True,  True]])


In [16]:
import torch.optim as optim

# 1. Setup the Optimizer and Loss Function
learning_rate = 1e-4
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# We tell the loss function to completely ignore the <PAD> tokens so they don't skew the math
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)

# 2. Put the model in training mode (activates Dropout layers)
model.train()

print("Initiating Training Loop...\n")

epochs = 10
for epoch in range(epochs):
    # Step A: Clear out the gradients from the previous step
    optimizer.zero_grad()

    # Step B: Shift the target data for Teacher Forcing
    # decoder_input gets everything EXCEPT the last token
    # labels gets everything EXCEPT the first token
    dec_input = dummy_tgt[:, :-1]
    labels = dummy_tgt[:, 1:]

    # Step C: Generate our masks based on this shifted input
    src_mask, tgt_mask = create_masks(dummy_src, dec_input, pad_idx)

    # Step D: The Forward Pass (Make predictions)
    # Output shape: [Batch, Seq_Len - 1, Vocab_Size]
    predictions = model(src=dummy_src, tgt=dec_input, src_mask=src_mask, tgt_mask=tgt_mask)

    # Step E: Calculate the Loss
    # PyTorch's CrossEntropyLoss requires us to flatten the 3D predictions into 2D,
    # and the 2D labels into 1D
    loss = criterion(
        predictions.contiguous().view(-1, tgt_vocab_size),
        labels.contiguous().view(-1)
    )

    # Step F: The Backward Pass (Calculate the gradients / the "mistakes")
    loss.backward()

    # Step G: Update the model's internal weights
    optimizer.step()

    print(f"Epoch {epoch + 1:02d}/{epochs} | Loss: {loss.item():.4f}")

Initiating Training Loop...

Epoch 01/10 | Loss: 9.7312
Epoch 02/10 | Loss: 7.5653
Epoch 03/10 | Loss: 6.5314
Epoch 04/10 | Loss: 5.9153
Epoch 05/10 | Loss: 5.4009
Epoch 06/10 | Loss: 4.9200
Epoch 07/10 | Loss: 4.5943
Epoch 08/10 | Loss: 4.2634
Epoch 09/10 | Loss: 3.8391
Epoch 10/10 | Loss: 3.4319


In [17]:
!pip install transformers datasets



In [18]:
from transformers import AutoTokenizer
import torch
from torch.utils.data import Dataset, DataLoader

# 1. Load pre-trained tokenizers (We will use BERT tokenizers for this example)
# bert-base-uncased for English, and a standard French tokenizer
tokenizer_en = AutoTokenizer.from_pretrained("bert-base-uncased")
tokenizer_fr = AutoTokenizer.from_pretrained("dbmdz/bert-base-french-europeana-cased")

# 2. Get the special token IDs
# We need to know the exact integer IDs for Padding, Start of Sentence, and End of Sentence
PAD_IDX = tokenizer_en.pad_token_id
SOS_IDX = tokenizer_en.cls_token_id  # BERT uses CLS for the start token
EOS_IDX = tokenizer_en.sep_token_id  # BERT uses SEP for the end token

print(f"English Vocab Size: {tokenizer_en.vocab_size}")
print(f"French Vocab Size: {tokenizer_fr.vocab_size}")
print(f"PAD ID: {PAD_IDX} | SOS ID: {SOS_IDX} | EOS ID: {EOS_IDX}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/420 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/83.0 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

English Vocab Size: 30522
French Vocab Size: 32000
PAD ID: 0 | SOS ID: 101 | EOS ID: 102


In [19]:
class TranslationDataset(Dataset):
    def __init__(self, english_sentences, french_sentences, tokenizer_en, tokenizer_fr):
        self.english_sentences = english_sentences
        self.french_sentences = french_sentences
        self.tokenizer_en = tokenizer_en
        self.tokenizer_fr = tokenizer_fr

    def __len__(self):
        return len(self.english_sentences)

    def __getitem__(self, idx):
        # 1. Grab the raw text strings
        eng_text = self.english_sentences[idx]
        fra_text = self.french_sentences[idx]

        # 2. Convert text strings into lists of integers (Token IDs)
        # add_special_tokens=False because we will manually add SOS and EOS
        eng_tokens = self.tokenizer_en.encode(eng_text, add_special_tokens=False)
        fra_tokens = self.tokenizer_fr.encode(fra_text, add_special_tokens=False)

        # 3. Wrap the sentences in <SOS> and <EOS> tokens
        eng_tensor = torch.tensor([SOS_IDX] + eng_tokens + [EOS_IDX], dtype=torch.long)
        fra_tensor = torch.tensor([SOS_IDX] + fra_tokens + [EOS_IDX], dtype=torch.long)

        return eng_tensor, fra_tensor

# Let's create a tiny dummy dataset to test our pipeline
dummy_english = ["Hello world.", "How are you?", "Machine learning is fascinating.", "I am an engineer."]
dummy_french = ["Bonjour le monde.", "Comment allez-vous?", "L'apprentissage automatique est fascinant.", "Je suis ingénieur."]

# Instantiate the dataset
dataset = TranslationDataset(dummy_english, dummy_french, tokenizer_en, tokenizer_fr)

# Test it on the first sentence
sample_eng, sample_fra = dataset[0]
print(f"Raw English text: '{dummy_english[0]}' -> Token IDs: {sample_eng}")
print(f"Raw French text:  '{dummy_french[0]}' -> Token IDs: {sample_fra}")

Raw English text: 'Hello world.' -> Token IDs: tensor([ 101, 7592, 2088, 1012,  102])
Raw French text:  'Bonjour le monde.' -> Token IDs: tensor([ 101, 3092,  916,  354, 1442,   18,  102])


In [20]:
from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    # batch is a list of tuples: [(eng_tensor1, fra_tensor1), (eng_tensor2, fra_tensor2), ...]
    eng_batch, fra_batch = [], []

    for eng_item, fra_item in batch:
        eng_batch.append(eng_item)
        fra_batch.append(fra_item)

    # pad_sequence automatically adds PAD_IDX to the end of shorter tensors
    # batch_first=True ensures our shape is [Batch, Seq_Len] instead of [Seq_Len, Batch]
    eng_padded = pad_sequence(eng_batch, batch_first=True, padding_value=PAD_IDX)
    fra_padded = pad_sequence(fra_batch, batch_first=True, padding_value=PAD_IDX)

    return eng_padded, fra_padded

# Wrap our dataset in a PyTorch DataLoader
batch_size = 2
dataloader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn
)

# Test the DataLoader by pulling one batch
for eng_batch, fra_batch in dataloader:
    print(f"English Batch Shape: {eng_batch.shape} -> [Batch, Seq_Len]")
    print(f"French Batch Shape:  {fra_batch.shape} -> [Batch, Seq_Len]")
    print("\nPadded English Tensor:")
    print(eng_batch)
    break # We only want to look at the first batch

English Batch Shape: torch.Size([2, 7]) -> [Batch, Seq_Len]
French Batch Shape:  torch.Size([2, 10]) -> [Batch, Seq_Len]

Padded English Tensor:
tensor([[  101,  1045,  2572,  2019,  3992,  1012,   102],
        [  101,  3698,  4083,  2003, 17160,  1012,   102]])


In [21]:
# 1. Re-initialize the model with the REAL vocabulary sizes
src_vocab_size = tokenizer_en.vocab_size
tgt_vocab_size = tokenizer_fr.vocab_size
d_model = 512

# Note: We are using a smaller Transformer here so it trains quickly in Colab
real_model = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    d_model=d_model,
    num_heads=8,
    d_ff=2048,
    num_layers=2 # Reduced to 2 layers for fast testing
)

# Move model to GPU if available (highly recommended!)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
real_model = real_model.to(device)
print(f"Training on device: {device}\n")

# 2. Setup Optimizer and Loss
optimizer = torch.optim.Adam(real_model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# 3. The Real Training Loop
real_model.train()
epochs = 20

for epoch in range(epochs):
    epoch_loss = 0

    # We now iterate over our actual dataloader!
    for eng_batch, fra_batch in dataloader:
        # Move data to GPU
        eng_batch = eng_batch.to(device)
        fra_batch = fra_batch.to(device)

        optimizer.zero_grad()

        # Shift targets for Teacher Forcing
        dec_input = fra_batch[:, :-1]
        labels = fra_batch[:, 1:]

        # Create masks
        src_mask, tgt_mask = create_masks(eng_batch, dec_input, PAD_IDX)
        src_mask, tgt_mask = src_mask.to(device), tgt_mask.to(device)

        # Forward pass
        predictions = real_model(src=eng_batch, tgt=dec_input, src_mask=src_mask, tgt_mask=tgt_mask)

        # Calculate loss
        loss = criterion(
            predictions.contiguous().view(-1, tgt_vocab_size),
            labels.contiguous().view(-1)
        )

        # Backward pass
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    # Print average loss for the epoch
    print(f"Epoch {epoch + 1:02d}/{epochs} | Average Loss: {epoch_loss / len(dataloader):.4f}")

Training on device: cpu

Epoch 01/20 | Average Loss: 10.4383
Epoch 02/20 | Average Loss: 8.8226
Epoch 03/20 | Average Loss: 7.8442
Epoch 04/20 | Average Loss: 7.0379
Epoch 05/20 | Average Loss: 6.5801
Epoch 06/20 | Average Loss: 6.1647
Epoch 07/20 | Average Loss: 5.7332
Epoch 08/20 | Average Loss: 5.2911
Epoch 09/20 | Average Loss: 4.9339
Epoch 10/20 | Average Loss: 4.5103
Epoch 11/20 | Average Loss: 3.9567
Epoch 12/20 | Average Loss: 3.6563
Epoch 13/20 | Average Loss: 3.1490
Epoch 14/20 | Average Loss: 2.7675
Epoch 15/20 | Average Loss: 2.5109
Epoch 16/20 | Average Loss: 2.1276
Epoch 17/20 | Average Loss: 1.8161
Epoch 18/20 | Average Loss: 1.5506
Epoch 19/20 | Average Loss: 1.3404
Epoch 20/20 | Average Loss: 1.1168


In [22]:
def translate(model, sentence, tokenizer_en, tokenizer_fr, max_length=50, device=device):
    # 1. Put the model in evaluation mode (turns off dropout layers)
    model.eval()

    # 2. Tokenize the English input and add special tokens
    tokens = tokenizer_en.encode(sentence, add_special_tokens=False)
    src_tensor = torch.tensor([SOS_IDX] + tokens + [EOS_IDX], dtype=torch.long).unsqueeze(0).to(device)

    # 3. Start the French target sequence with just the <SOS> token
    tgt_indices = [SOS_IDX]

    # We don't need to calculate gradients for generation
    with torch.no_grad():
        for _ in range(max_length):
            # Convert our growing list of tokens into a tensor
            tgt_tensor = torch.tensor(tgt_indices, dtype=torch.long).unsqueeze(0).to(device)

            # Generate the anti-cheating masks
            src_mask, tgt_mask = create_masks(src_tensor, tgt_tensor, PAD_IDX)
            src_mask, tgt_mask = src_mask.to(device), tgt_mask.to(device)

            # Forward pass through the Transformer
            # Shape: [Batch (1), Sequence Length, Vocab Size]
            predictions = model(src=src_tensor, tgt=tgt_tensor, src_mask=src_mask, tgt_mask=tgt_mask)

            # 4. Grab the predictions for the VERY LAST token in the sequence
            next_token_logits = predictions[0, -1, :]

            # 5. Greedy Search: Pick the token ID with the highest mathematical probability
            next_token_id = next_token_logits.argmax().item()

            # 6. Append the predicted word to our running sequence
            tgt_indices.append(next_token_id)

            # 7. If the model predicts the End of Sentence token, stop the loop!
            if next_token_id == EOS_IDX:
                break

    # 8. Convert the list of predicted IDs back into human-readable French text
    translated_text = tokenizer_fr.decode(tgt_indices, skip_special_tokens=True)

    return translated_text

In [23]:
# Let's test it on one of the sentences from our training data
test_sentence = "Machine learning is fascinating."

print(f"English: {test_sentence}")
print(f"French Translation: {translate(real_model, test_sentence, tokenizer_en, tokenizer_fr)}")

English: Machine learning is fascinating.
French Translation: © L ' apprentissage automatique est fascinant. ª


In [1]:
from transformers import AutoTokenizer
from datasets import load_dataset
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# 1. Re-initialize the tokenizers so Colab remembers them
print("Loading tokenizers (this is fast)...")
tokenizer_en = AutoTokenizer.from_pretrained("bert-base-uncased")
tokenizer_fr = AutoTokenizer.from_pretrained("dbmdz/bert-base-french-europeana-cased")

# 2. Download and prep the dataset
print("Loading dataset...")
raw_datasets = load_dataset("opus_books", "en-fr")

# Grab the first 10,000 sentences for training
train_dataset = raw_datasets["train"].select(range(10000))
print(f"Loaded {len(train_dataset)} sentence pairs for training.")

# 3. Fix our Tokenizer Quirk: Grab the exact IDs for each language
PAD_IDX = tokenizer_en.pad_token_id # Usually 0 for both

# English Special Tokens
SOS_EN = tokenizer_en.cls_token_id
EOS_EN = tokenizer_en.sep_token_id

# French Special Tokens (This stops the weird symbols!)
SOS_FR = tokenizer_fr.cls_token_id
EOS_FR = tokenizer_fr.sep_token_id

print("\nSpecial Token IDs configured. Memory restored!")

Loading tokenizers (this is fast)...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]



tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/420 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/83.0 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

Loading dataset...


README.md: 0.00B [00:00, ?B/s]

en-fr/train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/127085 [00:00<?, ? examples/s]

Loaded 10000 sentence pairs for training.

Special Token IDs configured. Memory restored!


In [3]:
class ProductionTranslationDataset(Dataset):
    def __init__(self, hf_dataset, tokenizer_en, tokenizer_fr, max_len=50):
        self.dataset = hf_dataset
        self.tokenizer_en = tokenizer_en
        self.tokenizer_fr = tokenizer_fr
        self.max_len = max_len

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        pair = self.dataset[idx]['translation']
        eng_text = pair['en']
        fra_text = pair['fr']

        # Tokenize and truncate to max_len - 2
        eng_tokens = self.tokenizer_en.encode(eng_text, add_special_tokens=False)[:self.max_len-2]
        fra_tokens = self.tokenizer_fr.encode(fra_text, add_special_tokens=False)[:self.max_len-2]

        eng_tensor = torch.tensor([SOS_EN] + eng_tokens + [EOS_EN], dtype=torch.long)
        fra_tensor = torch.tensor([SOS_FR] + fra_tokens + [EOS_FR], dtype=torch.long)

        return eng_tensor, fra_tensor

def collate_fn(batch):
    eng_batch, fra_batch = zip(*batch)
    eng_padded = pad_sequence(eng_batch, batch_first=True, padding_value=PAD_IDX)
    fra_padded = pad_sequence(fra_batch, batch_first=True, padding_value=PAD_IDX)
    return eng_padded, fra_padded

# Instantiate dataset and dataloader
real_train_data = ProductionTranslationDataset(train_dataset, tokenizer_en, tokenizer_fr)
production_dataloader = DataLoader(real_train_data, batch_size=32, shuffle=True, collate_fn=collate_fn)

print(f"Production DataLoader ready. {len(production_dataloader)} batches per epoch.")

Production DataLoader ready. 313 batches per epoch.


In [5]:
import torch
import torch.nn as nn
import math

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        Q = self.W_q(Q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        attention_weights = torch.softmax(scores, dim=-1)
        output = torch.matmul(attention_weights, V)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.W_o(output)

class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()
    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
    def forward(self, x):
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

class EncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x, mask):
        attn_output = self.self_attn(Q=x, K=x, V=x, mask=mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len, dropout)
        self.layers = nn.ModuleList([EncoderBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
    def forward(self, x, mask):
        x = self.embedding(x)
        x = self.pos_encoder(x)
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(Q=x, K=x, V=x, mask=tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(Q=x, K=enc_output, V=enc_output, mask=src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, d_ff, num_layers, max_len=5000, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len, dropout)
        self.layers = nn.ModuleList([DecoderBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
    def forward(self, x, enc_output, src_mask, tgt_mask):
        x = self.embedding(x)
        x = self.pos_encoder(x)
        for layer in self.layers:
            x = layer(x, enc_output, src_mask, tgt_mask)
        return self.norm(x)

class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=512, num_heads=8, d_ff=2048, num_layers=6, max_len=5000, dropout=0.1):
        super().__init__()
        self.encoder = TransformerEncoder(src_vocab_size, d_model, num_heads, d_ff, num_layers, max_len, dropout)
        self.decoder = TransformerDecoder(tgt_vocab_size, d_model, num_heads, d_ff, num_layers, max_len, dropout)
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)
    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        enc_output = self.encoder(x=src, mask=src_mask)
        dec_output = self.decoder(x=tgt, enc_output=enc_output, src_mask=src_mask, tgt_mask=tgt_mask)
        return self.fc_out(dec_output)

def create_masks(src, tgt, pad_idx):
    src_mask = (src != pad_idx).unsqueeze(1).unsqueeze(2)
    tgt_pad_mask = (tgt != pad_idx).unsqueeze(1).unsqueeze(2)
    tgt_len = tgt.size(1)
    tgt_look_ahead_mask = torch.tril(torch.ones((tgt_len, tgt_len), device=tgt.device)).bool()
    tgt_mask = tgt_pad_mask & tgt_look_ahead_mask
    return src_mask, tgt_mask

print("Architecture loaded into memory successfully!")

Architecture loaded into memory successfully!


In [8]:
import torch.nn as nn
from tqdm import tqdm

# 1. Setup the Device and Instantiate the Model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Building model on: {device}...")

production_model = Transformer(
    src_vocab_size=tokenizer_en.vocab_size,
    tgt_vocab_size=tokenizer_fr.vocab_size,
    d_model=256,
    num_heads=8,
    d_ff=1024,
    num_layers=3
).to(device)

# 2. Fast 1-Epoch Training Loop
optimizer = torch.optim.Adam(production_model.parameters(), lr=3e-4)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

print("Running a quick training pass...")
production_model.train()
loop = tqdm(production_dataloader, leave=True)

for eng_batch, fra_batch in loop:
    eng_batch, fra_batch = eng_batch.to(device), fra_batch.to(device)
    optimizer.zero_grad()

    dec_input = fra_batch[:, :-1]
    labels = fra_batch[:, 1:]

    src_mask, tgt_mask = create_masks(eng_batch, dec_input, PAD_IDX)
    src_mask, tgt_mask = src_mask.to(device), tgt_mask.to(device)

    predictions = production_model(src=eng_batch, tgt=dec_input, src_mask=src_mask, tgt_mask=tgt_mask)
    loss = criterion(predictions.contiguous().view(-1, tokenizer_fr.vocab_size), labels.contiguous().view(-1))

    loss.backward()
    optimizer.step()
    loop.set_description("Training")
    loop.set_postfix(loss=loss.item())

# 3. The Grand Finale: Translation!
print("\nTraining complete! Testing translation...")
test_sentence = "I am a student."
print(f"English: {test_sentence}")
print(f"French Translation: {translate_production(production_model, test_sentence, tokenizer_en, tokenizer_fr)}")

Building model on: cuda...
Running a quick training pass...


Training: 100%|██████████| 313/313 [00:24<00:00, 13.00it/s, loss=5.08]



Training complete! Testing translation...
English: I am a student.
French Translation: Je ne pas.
