# Transformer From Scratch

Implement the [Attention Is All You Need](https://arxiv.org/abs/1706.03762) paper from scratch using PyTorch, focusing on building a Seq2Seq Transformer architecture for translating text from English to Italian.

In [None]:
!pip install -r requirements.txt

In [None]:
import math
from dataclasses import dataclass, asdict
from time import time

import torch
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader
from datasets import load_dataset
from transformers import AutoTokenizer
from sklearn.metrics import f1_score
import pandas as pd

### Embedding
Mapping input token indices to vectors, transforming discrete tokens into continuous numerical representations.

* **Initialization**
  * `vocab_size`: the number of unique tokens in the vocabulary.
  * `d_model`: the embedding dimension.

* **Forward**:
  * `sequence`: input token indices.
  * `returns`: embedding vector corresponding to the input tokens.


In [None]:
class Embedding(nn.Module):
  def __init__(self, vocab_size, d_model):
    super().__init__()

    self.embedding = nn.Embedding(vocab_size, d_model)

  def forward(self, sequence):
    return self.embedding(sequence)

### Positional Encoding
Injects positional information into the input embeddings to capture the order of tokens in the sequence.

* **Initialization**:
  * `max_sequence_length`: the maximum length of the input sequences.
  * `d_model`: the embedding dimension.
  
* **Forward**:
  * `embedding`: the input embedded tokens.
  * `returns`: the sum of the input embeddings & the positional encodings.

In [None]:
class PositionalEncoding(nn.Module):
  def __init__(self,max_sequence_length, d_model):
    super().__init__()

    pe = torch.zeros(max_sequence_length, d_model)
    position = torch.arange(0, max_sequence_length, dtype=torch.float).unsqueeze(1)

    _2i = torch.arange(0, d_model, 2, dtype=torch.float)
    scaler = torch.exp(-_2i * (math.log(10000) / d_model))

    pe[:, 0::2] = torch.sin(position * scaler)
    pe[:, 1::2] = torch.cos(position * scaler)

    self.register_buffer("pe", pe.unsqueeze(0))


  def forward(self, embedding):
      return embedding + self.pe[:, :embedding.size(1)]

### Multi Head Self Attention


Implements the multi-head attention mechanism, allowing the model to focus on different parts of the input sequence in parallel.

* **Initialization**:
  * `d_model`: the input & output vectors dimensionality.
  * `h`: the number of attention heads.

* **Methods**:
  * `split()`: splits the input matrix into multiple heads.
  * `attention()`: computes the scaled dot-product attention.
  * `concatenate()`: concatenates the attention heads.
    
* **Forward**:
    * `Q`: the queries matrix.
    * `K`: the keys matrix.
    * `V`: the values matrix.
    * `mask`: the optional attention mask.
    * `returns`: output matrix after applying multi-head attention & linear projection.


In [None]:
class MultiHeadAttention(nn.Module):
  def __init__(self, d_model, h) -> None:
    super().__init__()

    assert d_model % h == 0, "d_model must be divisible by h"

    self.d_model = d_model
    self.h = h
    self.d_k = d_model // h

    self.W_q = nn.Linear(d_model, d_model)
    self.W_k = nn.Linear(d_model, d_model)
    self.W_v = nn.Linear(d_model, d_model)
    self.W_o = nn.Linear(d_model, d_model)


  def split(self, M):
    batch_size, sequence_length, d_model = M.size()

    splitted_heads = M.view(batch_size, sequence_length, self.h, self.d_k).transpose(1, 2)

    return splitted_heads


  def attention(self, Q, K, V, mask=None):
    scaled_attention = torch.matmul(input=Q, other=K.transpose(-1, -2))

    scaled_attention_score = scaled_attention / math.sqrt(self.d_k)

    if mask is not None:
      scaled_attention_score.masked_fill(mask == 0, float("-inf"))

    attention_weights = torch.softmax(input=scaled_attention_score, dim=-1)

    weighted_sum = torch.matmul(input=attention_weights, other=V)

    return weighted_sum


  @staticmethod
  def concatenate(M):
    batch_size, h, sequence_length, d_k = M.size()

    concatenated_heads = M.transpose(1, 2).contiguous().view(batch_size, sequence_length, h * d_k)

    return concatenated_heads


  def forward(self, Q, K, V, mask=None):
    Q = self.W_q(Q)
    K = self.W_k(K)
    V = self.W_v(V)

    Q = self.split(Q)
    K = self.split(K)
    V = self.split(V)

    weighted_sum = self.attention(Q, K, V, mask)

    concatenated_heads = self.concatenate(weighted_sum)

    output = self.W_o(concatenated_heads)

    return output

### Position Wise Feed Forward Network

Implements a feed-forward network applied to each position separately.

* **Initialization**:
  * `d_model`: the input & output vectors dimensions.
  * `d_ff`: the feed-forward layer's dimension.

* **Forward**:
  * `x`: the input matrix.
  * `returns`: output matrix with the same shape as input after applying the feed-forward network.


In [None]:
class PositionWiseFeedForward(nn.Module):
  def __init__(self, d_model, d_ff):
    super().__init__()

    self.layer1 = nn.Linear(d_model, d_ff)
    self.layer2 = nn.Linear(d_ff, d_model)

    self.activation = nn.ReLU()


  def forward(self, x):
    x = self.layer1(x)
    x = self.activation(x)
    x = self.layer2(x)

    return x

## Encoder

Gathering the encoder component, which processes input sequences through multi-head self-attention & position-wise feed-forward networks.

* **Initialization**:
  * `d_model`: the input & output vectors dimensions.
  * `d_ff`:the feed-forward layer's dimension.
  * `h`: the number of attention heads.
  * `dropout`: the dropout rate for regularization.

* **Components**:
  * `self_attention`: the multi-head self-attention sub-layer.
  * `feed_forward`: the position-wise feed forward sub-layer.
  * `norm1`: a normalization layer applied after the self-attention sub-layer.
  * `norm2`: a normalization layer applied after the feed-forward sub-layer.
  * `dropout`: the dropout layer applied to the outputs of each sub-layer.

* **Forward**:
  * `x`: the input matrix.
  * `mask`: the optional attention mask.
  * `returns`: output matrix with the same shape as the input, after applying self-attention, feed-forward, dropout, and normalization.


In [None]:
class Encoder(nn.Module):
  def __init__(self, d_model, d_ff, h, dropout):
    super().__init__()

    self.self_attention = MultiHeadAttention(d_model, h)
    self.feed_forward = PositionWiseFeedForward(d_model, d_ff)

    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)


  def forward(self, x, mask):
    self_attention_output = self.dropout(self.self_attention(x, x, x, mask))
    x = self.norm1(x + self_attention_output)

    feed_forward_output = self.dropout(self.feed_forward(x))
    x = self.norm2(x + feed_forward_output)

    return x

## Decoder

Gathering the decoder component, which processes the target sequences through multi-head self-attention & cross-attention with the encoder output, & position-wise feed-forward networks.

* **Initialization**:
  * `d_model`: the input & output vectors dimensions.
  * `d_ff`: the feed-forward layer's dimension.
  * `h`: the number of attention heads.
  * `dropout`: the dropout rate for regularization.

* **Components**:
  * `self_attention`: the multi-head self-attention sub-layer.
  * `cross_attention`: the multi-head attention sub-layer that attends to the encoder's output.
  * `feed_forward`: the position-wise feed forward sub-layer.
  * `norm1`: a normalization layer applied after the self-attention sub-layer.
  * `norm2`: a normalization layer applied after the cross-attention sub-layer.
  * `norm3`: a normalization layer applied after the feed-forward sub-layer.
  * `dropout`: the dropout layer applied to the outputs of each sub-layer.

* **Forward**:
  * `x`: the input matrix (decoder input).
  * `encoder_output`: the output matrix from the encoder.
  * `encoder_mask`: the attention mask for the encoder's output.
  * `decoder_mask`: the attention mask for the decoder's self-attention.
  * `returns`: output matrix with the same shape as the input, after applying self-attention, cross-attention, feed-forward, dropout, and normalization.


In [None]:
class Decoder(nn.Module):
  def __init__(self, d_model, d_ff, h, dropout):
    super().__init__()

    self.self_attention = MultiHeadAttention(d_model, h)
    self.cross_attention = MultiHeadAttention(d_model, h)
    self.feed_forward = PositionWiseFeedForward(d_model, d_ff)

    self.norm1 = nn.LayerNorm(d_model)
    self.norm2 = nn.LayerNorm(d_model)
    self.norm3 = nn.LayerNorm(d_model)
    self.dropout = nn.Dropout(dropout)


  def forward(self, x, encoder_output, encoder_mask, decoder_mask):
    self_attention_output = self.dropout(self.self_attention(x, x, x, decoder_mask))
    x = self.norm1(x + self_attention_output)

    cross_attention_output = self.dropout(self.self_attention(x, encoder_output, encoder_output, encoder_mask))
    x = self.norm2(x + cross_attention_output)

    feed_forward_output = self.dropout(self.feed_forward(x))
    x = self.norm3(x + feed_forward_output)

    return x

## Transformer

Implements the transformer model, integrating encoder and decoder components with embeddings, positional encodings, and masking.

* **Initialization**:
  * `source_pad_idx`: padding token index for source sequences.
  * `target_pad_idx`: padding token index for target sequences.
  * `source_vocab_size`: vocabulary size for source language.
  * `target_vocab_size`: vocabulary size for target language.
  * `h`: the number of attention heads.
  * `n`: the number of encoder and decoder layers.
  * `d_model`: the input & output vectors dimensions.
  * `d_ff`: the feed-forward layer's dimension.
  * `max_sequence_length`: the maximum sequence length of the input sequences for positional encoding.
  * `dropout`: the dropout rate for regularization.

* **Components**:
  * `encoder_embedding`: embedding layer for source sequences.
  * `decoder_embedding`: embedding layer for target sequences.
  * `positional_encoding`: adds positional information to embeddings.
  * `encoders`: stack of encoder layers.
  * `decoders`: stack of decoder layers.
  * `pre_softmax`: linear projection layer that produces logits.
  * `dropout`: the dropout layer applied to embeddings.

* **Methods**:
  * `masking()`: generates source and target masks for attention.
    
* **Forward**:
  * `source_sequence`: the source input sequence.
  * `target_sequence`: the target input sequence.
  * `returns`: logits.


In [None]:
class Transformer(nn.Module):
  def __init__(self, source_pad_idx, target_pad_idx, source_vocab_size, target_vocab_size, h, n, d_model, d_ff, max_sequence_length, dropout):
    super().__init__()

    self.source_pad_idx = source_pad_idx
    self.target_pad_idx = target_pad_idx

    self.encoder_embedding = Embedding(source_vocab_size, d_model)
    self.decoder_embedding = Embedding(target_vocab_size, d_model)

    self.positional_encoding = PositionalEncoding(max_sequence_length, d_model)

    self.encoders = nn.ModuleList([Encoder(d_model, d_ff, h, dropout) for _ in range(n)])
    self.decoders = nn.ModuleList([Decoder(d_model, d_ff, h, dropout) for _ in range(n)])

    self.pre_softmax = nn.Linear(d_model, target_vocab_size)
    self.dropout = nn.Dropout(dropout)


  def masking(self, source_sequence, target_sequence):
    device = source_sequence.device

    source_mask = (source_sequence != self.source_pad_idx).unsqueeze(1).unsqueeze(2).to(device)
    target_mask = (target_sequence != self.target_pad_idx).unsqueeze(1).unsqueeze(3).to(device)

    sequence_length = target_sequence.size(1)
    ones = torch.ones(sequence_length, sequence_length, device=device)

    future_mask = torch.tril(ones, diagonal=0).bool()
    target_mask = target_mask & future_mask

    return source_mask, target_mask


  def forward(self, source_sequence, target_sequence):
    encoder_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(source_sequence)))
    decoder_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(target_sequence)))

    encoder_mask, decoder_mask = self.masking(source_sequence, target_sequence)

    encoder_output, decoder_output = encoder_embedded, decoder_embedded

    for encoder in self.encoders:
       encoder_output = encoder(encoder_output, encoder_mask)

    for decoder in self.decoders:
       decoder_output = decoder(decoder_output, encoder_output, encoder_mask, decoder_mask)

    logits = self.pre_softmax(decoder_output)

    return logits

## Training


### Loading the dataset

Loads the [europarl_en-it](https://huggingface.co/datasets/g8a9/europarl_en-it) dataset from Hugging Face, specifying that it should be divided into training and validation sets.


* training set: 100,000 sentences.
* validation set: 20,000 sentences.

In [None]:
dataset = load_dataset("g8a9/europarl_en-it", split=["train", "validation"])

train = dataset[0].take(100000)
validation = dataset[1].take(20000)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Downloading readme:   0%|          | 0.00/1.18k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/577M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/64.0M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/335k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1709808 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/190069 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/996 [00:00<?, ? examples/s]

### Load the tokenizers for both English & Italian

Load and initialize tokenizers for `English-to-Italian` and `Italian-to-English` translation.

* **English-to-Italian Tokenizer**: used to tokenize english sentences.
  * Checkpoint: `"Helsinki-NLP/opus-mt-en-it"`


* **Italian-to-English Tokenizer**: used to tokenize italian sentences.
  * Checkpoint: `"Helsinki-NLP/opus-mt-it-en"`
  


In [None]:
english_checkpoint = "Helsinki-NLP/opus-mt-en-it"
english_tokenizer = AutoTokenizer.from_pretrained(english_checkpoint)

italian_checkpoint = "Helsinki-NLP/opus-mt-it-en"
italian_tokenizer = AutoTokenizer.from_pretrained(italian_checkpoint)

tokenizer_config.json:   0%|          | 0.00/42.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.38k [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/789k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/814k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.35M [00:00<?, ?B/s]



tokenizer_config.json:   0%|          | 0.00/42.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.38k [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/814k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/790k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.37M [00:00<?, ?B/s]

### Prepare the data loaders

* `collate_fn`: prepares batches of tokenized english and italian sentences for training by applying tokenization, truncation, and padding.

* `train_loader`: provides batches of tokenized sentences for training the model, with shuffling enabled to ensure varied mini-batches.

* `validation_loader`: provides batches of tokenized sentences for evaluating the model’s performance during validation.


In [None]:
def collate_fn(batch):
    english_sentences = [sample["sent_en"] for sample in batch]
    italian_sentences = [sample["sent_it"] for sample in batch]

    english_encoding = english_tokenizer.batch_encode_plus(english_sentences, truncation=True, padding="max_length", max_length=100, return_tensors="pt")
    italian_encoding = italian_tokenizer.batch_encode_plus(italian_sentences, truncation=True, padding="max_length", max_length=100, return_tensors="pt")

    english_batch = english_encoding["input_ids"]
    italian_batch = italian_encoding["input_ids"]

    return english_batch, italian_batch


train_loader = DataLoader(dataset=train, batch_size=64, collate_fn=collate_fn, shuffle=True)
validation_loader = DataLoader(dataset=validation, batch_size=64, collate_fn=collate_fn)

### Model configures and training hyperparameters

* `Config`: defines model hyperparameters and configuration details:
  * `source_pad_idx`: padding token index for source sequences.
  * `target_pad_idx`: padding token index for target sequences.
  * `source_vocab_size`: vocabulary size for source language.
  * `target_vocab_size`: vocabulary size for target language.
  * `h`: the number of attention heads.
  * `n`: the number of encoder and decoder layers.
  * `d_model`: the input & output vectors dimensions.
  * `d_ff`: the feed-forward layer's dimension.
  * `max_sequence_length`: the maximum sequence length of the input sequences for positional encoding.
  * `dropout`: the dropout rate for regularization.

* `device`: sets up the computation device, selecting GPU if available or falling back to CPU.

* `transformer`: initializes the Transformer model.

* `loss_function`: cross-entropy loss function.
* `optimizer`: Adam optimizer.



In [26]:
@dataclass(frozen=True)
class Config:
    source_pad_idx: int
    target_pad_idx: int
    source_vocab_size: int
    target_vocab_size: int
    h: int
    n: int
    d_model: int
    d_ff: int
    max_sequence_length: int
    dropout: float


source_pad_idx = english_tokenizer.pad_token_id
target_pad_idx = italian_tokenizer.pad_token_id

source_vocab_size = english_tokenizer.vocab_size
target_vocab_size = italian_tokenizer.vocab_size

cfg = Config(source_pad_idx, target_pad_idx, source_vocab_size, target_vocab_size, 8, 6, 512, 2048, 100, 0.1)
cfg = asdict(cfg)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transformer = Transformer(**cfg).to(device)

loss_function = nn.CrossEntropyLoss(ignore_index=italian_tokenizer.pad_token_id)
optimizer = Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer

Transformer(
  (encoder_embedding): Embedding(
    (embedding): Embedding(80035, 512)
  )
  (decoder_embedding): Embedding(
    (embedding): Embedding(80379, 512)
  )
  (positional_encoding): PositionalEncoding()
  (encoders): ModuleList(
    (0-5): 6 x Encoder(
      (self_attention): MultiHeadAttention(
        (W_q): Linear(in_features=512, out_features=512, bias=True)
        (W_k): Linear(in_features=512, out_features=512, bias=True)
        (W_v): Linear(in_features=512, out_features=512, bias=True)
        (W_o): Linear(in_features=512, out_features=512, bias=True)
      )
      (feed_forward): PositionWiseFeedForward(
        (layer1): Linear(in_features=512, out_features=2048, bias=True)
        (layer2): Linear(in_features=2048, out_features=512, bias=True)
        (activation): ReLU()
      )
      (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)


### Trainer

Trains the Transformer model on the provided data loader (train loader) by computing predictions, calculating the loss, and updating model parameters. It aggregates the loss and performance metrics (F1 score) over all batches and returns the average loss and F1 score.

In [27]:
def train(transformer, data_loader, loss_function, optimizer, device):
    transformer.train()

    total_loss = 0
    num_batches = len(data_loader)

    all_predictions = []
    all_targets = []

    for source_batch, target_batch in data_loader:
        source_batch, target_batch = source_batch.to(device), target_batch.to(device)

        target_input = target_batch[:, :-1]
        target_output = target_batch[:, 1:]

        optimizer.zero_grad()

        output = transformer(source_batch, target_input)
        loss = loss_function(output.view(-1, output.size(-1)), target_output.contiguous().view(-1))

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        predictions = output.argmax(dim=-1).cpu().numpy()
        targets = target_output.cpu().numpy()

        all_predictions.extend(predictions.flatten())
        all_targets.extend(targets.flatten())

    f1 = f1_score(all_targets, all_predictions, average="weighted")
    average_loss = total_loss / num_batches

    return average_loss, f1

### Evaluator
Evaluates the Transformer's performanceon a the validation dataset without updating weights. It calculates the average loss and F1 score based on the model’s predictions compared to the true values.

In [28]:
def evaluate(transformer, data_loader, loss_function, device):
    transformer.eval()

    total_loss = 0
    num_batches = len(data_loader)

    all_predictions = []
    all_targets = []

    with torch.no_grad():
        for source_batch, target_batch in data_loader:
            source_batch, target_batch = source_batch.to(device), target_batch.to(device)

            target_input = target_batch[:, :-1]
            target_output = target_batch[:, 1:]

            output = transformer(source_batch, target_input)
            loss = loss_function(output.view(-1, output.size(-1)), target_output.contiguous().view(-1))

            total_loss += loss.item()

            predictions = output.argmax(dim=-1).cpu().numpy()
            targets = target_output.cpu().numpy()

            all_predictions.extend(predictions.flatten())
            all_targets.extend(targets.flatten())

    f1 = f1_score(all_targets, all_predictions, average="weighted")
    average_loss = total_loss / num_batches

    return average_loss, f1

### Training and evaluation loops

iterates over a `5` epochs to train and evaluate the model

- **Training**: train the model on the training set using the `train` function.
- **Evaluation**: evaluate the model on the validation set using the `evaluate` function.


In [16]:
epochs = 5
results = []

start = time()

for epoch in range(epochs):
    train_loss, f1_train = train(transformer, train_loader, loss_function, optimizer, device)

    validation_loss, f1_validation = evaluate(transformer, validation_loader, loss_function, device)

    results.append({"Epochs": epoch + 1, "Loss(train)": train_loss, "Loss(validation)": validation_loss, "F1(train)": f1_train, "F1(validation)": f1_validation})

    print(f"Completed: {(((epoch + 1) / epochs) * 100):.2f}%")

end = time()

Completed: 20.00%
Completed: 40.00%
Completed: 60.00%
Completed: 80.00%
Completed: 100.00%


### Results

After completing the training and evaluation process, the results are summarized in a dataFrame and include the following:

- Total epochs number.
- Train loss.
- Validation loss.
- Train F1 score.
- Validation F1 score.
- Total training time taken in minutes.
- The device used for training.

In [30]:
results_df = pd.DataFrame(results).iloc[-1:]

training_time = round((end - start) / 60)
device_used = "1 x T4 GPU"

results_df.insert(5, "Time(minutes)", training_time)
results_df.insert(6, "Device", device_used)

results_df.style.hide(axis="index")

Epochs,Loss(train),Loss(validation),F1(train),F1(validation),Time(minutes),Device
5,0.116378,0.091345,0.262342,0.273207,170,1 x T4 GPU
