In [1]:
import torch
import torch.nn as nn
import math as mt

https://medium.com/@bavalpreetsinghh/transformer-from-scratch-using-pytorch-28a5d1b2e033

<p align="center">
  <img src="images/transformer.webp" alt="Transformer" width="500">
</p>

# Creating the fundamental blocks

## Input Embedding

Converts each token of the input sentence into a vector of d_model dimensions (the original Transformer model uses d_model = 512).

<p align="center">
  <img src="images/embedding.webp" alt="Embedding" width="500">
</p>

Purpose of the `__init__()` method
- initialize the state of an object (i.e. set up initial values)
- define the layers and components that the neural network will use
- ensure that any necessary setup or initialization code is executed when an object is created

The `super()` function is used to call a method of the parent class; here it calls the `__init__()` method of the `nn.Module` class.

In [11]:
class InputEmbeddings(nn.Module):

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        # (batch, seq_len) --> (batch, seq_len, d_model)
        # Multiply by sqrt(d_model) to scale the embeddings according to the paper
        return self.embedding(x) * mt.sqrt(self.d_model)

- `nn.Embedding(vocab_size, d_model)`: this creates an embedding layer that maps each token index to a d_model-dimensional vector. The embedding layer is initialized randomly and its vectors are learned during training.
- `self.embedding(x)`: here, x is a tensor of token indices. The embedding layer looks up the vector for each token index in x.
- `mt.sqrt(self.d_model)`: the paper multiplies the embeddings by $\sqrt{d_{model}}$. A common explanation is that this keeps their magnitude comparable to the positional encodings added next, which helps training stability.
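
The lookup-and-scale step described above can be sketched on its own. This is a minimal illustration, not the notebook's class: the sizes (`d_model = 8`, `vocab_size = 100`) and the token indices are made up for the example.

```python
import math
import torch
import torch.nn as nn

torch.manual_seed(0)

# Toy sizes chosen only for illustration (the paper uses d_model = 512)
d_model, vocab_size = 8, 100
embedding = nn.Embedding(vocab_size, d_model)

# A batch of 2 sentences with 4 token indices each: shape (batch, seq_len)
tokens = torch.tensor([[1, 5, 7, 2],
                       [3, 3, 0, 9]])

# Look up one vector per token, then scale by sqrt(d_model)
scaled = embedding(tokens) * math.sqrt(d_model)

print(scaled.shape)  # torch.Size([2, 4, 8])
# The same token index (3) always maps to the same vector
print(torch.equal(scaled[1, 0], scaled[1, 1]))  # True
```

The output gains a trailing `d_model` dimension, which is the `(batch, seq_len) --> (batch, seq_len, d_model)` mapping noted in the `forward` comment.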

## Positional Encoding

Helps the model understand the position of each word in a sentence, since transformers do not inherently process tokens in a sequential manner like RNNs.

In [None]:
class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, seq_len: int, dropout:float) -> None:
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)
        # Create a matrix of shape (seq_len, d_model)
        pe = torch.zeros(seq_len, d_model)
        # Create a column vector of positions, shape (seq_len, 1)
        position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # Create the frequency terms, shape (d_model / 2)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-mt.log(10000.0) / d_model))
        # Apply sin to even indices
        pe[:, 0::2] = torch.sin(position * div_term)
        # Apply cos to odd indices
        pe[:, 1::2] = torch.cos(position * div_term)
        # Add a batch dimension to the positional encoding
        pe = pe.unsqueeze(0)
        # Register the positional encoding as a buffer
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)
        return self.dropout(x)


- `pe = pe.unsqueeze(0)`: because there will be batch of sentences, we need to add the batch dimension to the tensor (seq_len, d_model) -> (1, seq_len, d_model).
- `self.register_buffer('pe', pe)`: we store pe without making it a learnable parameter.

`forward` method:
- we add the positional encodings to the input embeddings x (sliced to the actual sequence length, `x.shape[1]`) and apply dropout
- we tell the model not to learn the positional encodings, as they are fixed, using `requires_grad_(False)`
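
To make the sinusoidal table concrete, here is a small sketch that rebuilds it outside the class for toy sizes (`seq_len = 4`, `d_model = 6`, both chosen only for illustration). It spot-checks one entry against the closed form from the paper, $PE_{(pos, 2i)} = \sin(pos / 10000^{2i/d_{model}})$.

```python
import math
import torch

seq_len, d_model = 4, 6  # toy sizes for illustration

# Positions as a column, frequencies as a row, so the product broadcasts
position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)  # (seq_len, 1)
div_term = torch.exp(
    torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
)  # (d_model / 2,)

pe = torch.zeros(seq_len, d_model)
pe[:, 0::2] = torch.sin(position * div_term)  # even columns
pe[:, 1::2] = torch.cos(position * div_term)  # odd columns

# Position 0: sin(0) = 0 in the even columns, cos(0) = 1 in the odd columns
print(pe[0])  # tensor([0., 1., 0., 1., 0., 1.])

# Spot-check one entry against sin(pos / 10000^(2i / d_model))
pos, i = 3, 1
expected = math.sin(pos / 10000 ** (2 * i / d_model))
print(math.isclose(pe[pos, 2 * i].item(), expected, abs_tol=1e-5))  # True
```

Computing `div_term` through `exp` and `log` is numerically equivalent to dividing by $10000^{2i/d_{model}}$ directly.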

## Projection Layer

Converts the d_model-dimensional vectors produced by the decoder into logits over the vocabulary. It is typically the last layer of the model.

In [18]:
class ProjectionLayer(nn.Module):

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # (batch_size, seq_len, d_model) --> (batch_size, seq_len, vocab_size)
        return self.proj(x)

`self.proj`: an instance of `nn.Linear` that maps from d_model dimensions to vocab_size dimensions. The resulting logits can then be passed through a softmax to obtain a probability distribution over the vocabulary.
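
As a rough sketch of how the logits might be used downstream, the snippet below applies a softmax and picks the most likely token at the last position. The random `decoder_output` is only a stand-in for a real decoder output, and the greedy pick is one possible choice, not something the notebook prescribes.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

d_model, vocab_size = 8, 20  # toy sizes for illustration
proj = nn.Linear(d_model, vocab_size)

# Stand-in for the decoder output: (batch, seq_len, d_model)
decoder_output = torch.randn(2, 5, d_model)

logits = proj(decoder_output)          # (batch, seq_len, vocab_size)
probs = torch.softmax(logits, dim=-1)  # each position now sums to 1

# Greedy choice of the next token from the last position: shape (batch,)
next_token = probs[:, -1].argmax(dim=-1)

print(logits.shape)      # torch.Size([2, 5, 20])
print(next_token.shape)  # torch.Size([2])
```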

## Layer Normalization

Technique used to improve the training of NNs by normalizing inputs across the features for each training example.
(The dimensions of the word embeddings (e.g. 512 dimensions) are referred to as "features").

<p align="center">
  <img src="images/LayerNorm.webp" alt="Embedding" width="500">
</p>

Both Gamma & Beta are learnable parameters.

In [None]:
class LayerNormalization(nn.Module):

    def __init__(self, features: int, eps: float=1e-5) -> None:
        super().__init__()
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(features))
        self.beta = nn.Parameter(torch.zeros(features))


    def forward(self, x):
        # x : (batch, seq_len, hidden_size)
        mean = x.mean(dim = -1, keepdim = True)
        std = x.std(dim = -1, keepdim = True)
        return self.gamma * (x - mean) / (std + self.eps) + self.beta

- Initialize with a small epsilon value eps to prevent division by zero
- `nn.Parameter` makes gamma & beta learnable parameters
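
The normalization itself can be checked numerically. The sketch below repeats the arithmetic of `forward` with gamma = 1 and beta = 0 (their initial values) on a made-up input. Note that PyTorch's built-in `nn.LayerNorm` uses the biased variance and places `eps` inside the square root, so its output differs slightly from this formulation.

```python
import torch

torch.manual_seed(0)

# (batch, seq_len, features), deliberately shifted and scaled
x = torch.randn(2, 3, 8) * 5 + 10

eps = 1e-5
mean = x.mean(dim=-1, keepdim=True)
std = x.std(dim=-1, keepdim=True)  # unbiased estimate by default
normalized = (x - mean) / (std + eps)

# Each token's feature vector now has mean ~0 and standard deviation ~1
max_abs_mean = normalized.mean(dim=-1).abs().max().item()
print(max_abs_mean < 1e-5)  # True
print(torch.allclose(normalized.std(dim=-1), torch.ones(2, 3), atol=1e-4))  # True
```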

## Residual Connection

Used to help with the training of deep neural networks by allowing gradients to flow more easily through the network. See this paper for more details: [Deep Residual Learning for Image Recognition](https://arxiv.org/abs/1512.03385 "He et al., 2015")

In [None]:
class ResidualConnection(nn.Module):

    def __init__(self, features: int, dropout: float):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = LayerNormalization(features)
    
    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))

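`ResidualConnection` together with `LayerNormalization` forms the `Add & Norm` layer. Note that this implementation normalizes the input before the sublayer (`x + dropout(sublayer(norm(x)))`, often called pre-norm), whereas the original paper applies the normalization after the residual addition (`LayerNorm(x + Sublayer(x))`).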
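
A small sketch of why the skip path matters: if the sublayer contributes nothing, the input passes through unchanged. Here `nn.LayerNorm` is used as a stand-in for the `LayerNormalization` class, dropout is disabled, and the sublayer is a linear layer zeroed out on purpose; all of these are assumptions made for the illustration.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

features = 8
norm = nn.LayerNorm(features)  # stand-in for LayerNormalization
dropout = nn.Dropout(0.0)      # disabled so the result is deterministic

# A sublayer that outputs zeros, to isolate the skip path
sublayer = nn.Linear(features, features)
nn.init.zeros_(sublayer.weight)
nn.init.zeros_(sublayer.bias)

x = torch.randn(2, 3, features)
out = x + dropout(sublayer(norm(x)))  # same expression as ResidualConnection.forward

# The sublayer adds zeros, so the skip connection carries x through untouched
print(torch.equal(out, x))  # True
```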

## Feed Forward

A two-layer network applied to each position independently. Its ReLU activation adds non-linearity to the model, allowing it to learn more complex patterns.

In [15]:
class FeedForwardBlock(nn.Module):

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        # (batch_size, seq_len, d_model) --> (batch_size, seq_len, d_ff) --> (batch_size, seq_len, d_model)
        return self.linear_2(self.dropout(torch.relu(self.linear_1(x))))

- We pass in d_model, d_ff (the dimension of the hidden feed-forward layer) and the dropout rate
- We define two linear layers, `linear_1` (W1, b1) and `linear_2` (W2, b2), with a ReLU between them to introduce non-linearity into the model
- Dropout is used to reduce overfitting by randomly setting a fraction of the hidden units to zero during training
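
The two layers compute $FFN(x) = \max(0, xW_1 + b_1)W_2 + b_2$. The sketch below writes that formula out by hand and compares it with the `nn.Linear` version; dropout is left out so that the comparison is deterministic, and the sizes are toy values.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

d_model, d_ff = 8, 32  # toy sizes (the paper uses 512 and 2048)
linear_1 = nn.Linear(d_model, d_ff)
linear_2 = nn.Linear(d_ff, d_model)

x = torch.randn(2, 5, d_model)  # (batch, seq_len, d_model)
out = linear_2(torch.relu(linear_1(x)))

# Same computation written out. nn.Linear stores its weight as
# (out_features, in_features), hence the transposes.
hidden = torch.clamp(x @ linear_1.weight.T + linear_1.bias, min=0)
manual = hidden @ linear_2.weight.T + linear_2.bias

print(hidden.shape)  # torch.Size([2, 5, 32])
print(out.shape)     # torch.Size([2, 5, 8])
print(torch.allclose(out, manual, atol=1e-5))  # True
```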

## Attention mechanism

### 1. Self-Attention

<p align="center">
  <img src="images/attention.webp" alt="Embedding" width="500">
</p>

### 2. Multi-Head Attention

<p align="center">
  <img src="images/multi-head.webp" alt="Embedding" width="700">
</p>

In [None]:
class MultiHeadAttentionBlock(nn.Module):

    def __init__(self, d_model: int, h: int, dropout: float) -> None:
        super().__init__()
        self.d_model = d_model
        self.h = h
        assert d_model % h == 0, "d_model is not divisible by h" # To make sure d_model is divisible by the number of heads

        self.d_k = d_model // h # Dimension of vector seen by each head
        self.w_q = nn.Linear(d_model, d_model, bias=False)
        self.w_k = nn.Linear(d_model, d_model, bias=False)
        self.w_v = nn.Linear(d_model, d_model, bias=False)
        self.w_o = nn.Linear(d_model, d_model, bias=False)
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        d_k = query.shape[-1]
        # Applying the formula from the paper
        attention_scores = (query @ key.transpose(-2, -1) / mt.sqrt(d_k))
        if mask is not None:
            # Write a very low value (indicating -inf) to the positions where mask == 0
            attention_scores.masked_fill_(mask == 0, -1e9)
        attention_scores = attention_scores.softmax(dim=-1)
        if dropout is not None:
            attention_scores = dropout(attention_scores)
        return (attention_scores @ value), attention_scores
    
    def forward(self, q, k, v, mask):
        query = self.w_q(q)
        key = self.w_k(k)
        value = self.w_v(v)

        # (batch, seq_len, d_model) --> (batch, seq_len, h, d_k) --> (batch, h, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1, 2)

        # Calculate attention
        x, self.attention_scores = MultiHeadAttentionBlock.attention(query, key, value, mask, self.dropout)

        # Combine all the head together
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)

        # Multiply by Wo
        return self.w_o(x)

`__init__` method:
- Initializes $d_{model}$, $h$ (number of attention heads) and the $dropout$ rate
- Ensures $d_{model}$ is divisible by $h$
- Defines linear layers for the query $W_q$, key $W_k$, value $W_v$, and output $W_o$

`attention` static method:
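- Computes scaled dot-product attention: $\text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V$
- Applies the mask before the softmax and dropout after it, if provided
- Returns both the attention output and the attention weights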

`forward` method:
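- We apply linear transformations to the input tensors q, k and v using the learnable weight matrices $W_q$, $W_k$ and $W_v$
- We reshape each tensor to add a separate dimension for the heads, (batch, seq_len, h, d_k), then transpose it to (batch, h, seq_len, d_k) so that each head attends over the full sequence
- We compute attention for all heads in parallel, concatenate the heads back into (batch, seq_len, d_model) and multiply by $W_o$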
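
The shape bookkeeping is easier to follow on a toy example. The sketch below skips the learned projections and uses random tensors as stand-ins for the projected q, k and v; `split_heads` is a helper name introduced here for illustration only.

```python
import math
import torch

torch.manual_seed(0)

batch, seq_len, d_model, h = 2, 5, 16, 4  # toy sizes
d_k = d_model // h

def split_heads(t: torch.Tensor) -> torch.Tensor:
    # (batch, seq_len, d_model) -> (batch, seq_len, h, d_k) -> (batch, h, seq_len, d_k)
    return t.view(t.shape[0], t.shape[1], h, d_k).transpose(1, 2)

# Random stand-ins for the outputs of w_q, w_k and w_v
q = split_heads(torch.randn(batch, seq_len, d_model))
k = split_heads(torch.randn(batch, seq_len, d_model))
v = split_heads(torch.randn(batch, seq_len, d_model))

# Scaled dot-product attention, computed for all heads at once
scores = (q @ k.transpose(-2, -1)) / math.sqrt(d_k)  # (batch, h, seq_len, seq_len)
weights = scores.softmax(dim=-1)                     # each row sums to 1
context = weights @ v                                # (batch, h, seq_len, d_k)

# Concatenate the heads back into a single d_model-sized vector per token
merged = context.transpose(1, 2).contiguous().view(batch, seq_len, h * d_k)

print(scores.shape)  # torch.Size([2, 4, 5, 5])
print(merged.shape)  # torch.Size([2, 5, 16])
```

The `.contiguous()` call is needed because `transpose` returns a non-contiguous view, which `view` cannot reshape directly.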

### What is a Mask?

- **Padding Mask**: ensures that padding tokens in the input sequence do not influence the attention mechanism. When batching sequences of different lengths, all examples in a batch need the same length, so the shorter ones are padded with a special token (usually called `<PAD>`, often with token ID `0`). Used in both Encoder & Decoder blocks.

- **Look-Ahead Mask (Causal Mask)**: ensures that, during training and inference, each position in the output sequence can only attend to the current position and the positions before it, never to future positions. Used in the Decoder block.
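
One possible way to build both masks so that they work with the `mask == 0` convention used in `attention` is sketched below. The padding ID of `0`, the example token IDs and the mask shapes are assumptions made for this illustration; the notebook does not show how its masks are constructed.

```python
import torch

pad_id = 0  # assumed padding token ID
tokens = torch.tensor([[5, 7, 9, 0, 0],
                       [4, 2, 0, 0, 0]])  # (batch, seq_len)
seq_len = tokens.shape[1]

# Padding mask: 1 for real tokens, 0 for padding.
# Shape (batch, 1, 1, seq_len) broadcasts over heads and query positions.
padding_mask = (tokens != pad_id).unsqueeze(1).unsqueeze(2).int()

# Causal mask: lower-triangular, so position i sees positions 0..i only.
causal_mask = torch.tril(torch.ones(1, seq_len, seq_len, dtype=torch.int))

# Decoder-side mask: a key must be both a real token and not in the future.
tgt_mask = padding_mask & causal_mask  # (batch, 1, seq_len, seq_len)

# Apply it the same way as in `attention`, on dummy all-zero scores
scores = torch.zeros(2, 1, seq_len, seq_len)
weights = scores.masked_fill(tgt_mask == 0, -1e9).softmax(dim=-1)

print(tgt_mask.shape)    # torch.Size([2, 1, 5, 5])
print(weights[0, 0, 0])  # tensor([1., 0., 0., 0., 0.])
```

The first query position can only attend to itself, so all of its attention weight lands on position 0.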

# Encoder Block

Contains one multi-head self-attention layer and one feed-forward layer, each wrapped in an Add & Norm (residual connection plus layer normalization).

In [None]:
class EncoderBlock(nn.Module):

    def __init__(self, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, features:int, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(2)])

    def forward(self, x, src_mask):
        x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, src_mask))
        x = self.residual_connections[1](x, self.feed_forward_block)
        return x

- Here, the `self_attention_block` takes the input (x, x, x) as the (query, key, value), which is why it's called 'self-attention'
- Why `lambda x: self.self_attention_block(x, x, x, src_mask)`?
  - The `ResidualConnection` expects a **function** as its second argument (`sublayer`); it calls it internally as `sublayer(self.norm(x))`
  - If we just passed `self.self_attention_block` directly, Python would raise a `TypeError`, because its `forward` needs 4 arguments `(q, k, v, mask)` but `ResidualConnection` only calls it with `(x)`
  - So we wrap it in a **lambda** to “freeze” the extra arguments and call the attention block with (x, x, x, src_mask)
- Why `self.feed_forward_block` (and not `self.feed_forward_block.forward`)?
  - feed_forward_block is an instance of a PyTorch nn.Module (your class FeedForwardBlock)
  - In PyTorch, every nn.Module has a special behavior: calling the module automatically invokes its forward() method
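
Both points can be reproduced with a toy module. In the sketch below, `TwoArgBlock` and `apply_sublayer` are hypothetical names made up for the illustration: the first plays the role of a block whose `forward` needs an extra argument, the second mimics how `ResidualConnection` calls its sublayer with a single argument.

```python
import torch
import torch.nn as nn

class TwoArgBlock(nn.Module):
    """Hypothetical module whose forward needs an extra argument."""

    def forward(self, x, scale):
        return x * scale

def apply_sublayer(x, sublayer):
    # Mimics ResidualConnection: the sublayer is called with one argument
    return x + sublayer(x)

block = TwoArgBlock()
x = torch.ones(2, 3)

# The lambda fixes the extra argument, leaving a one-argument callable.
# Calling block(...) invokes block.forward(...) under the hood.
out = apply_sublayer(x, lambda t: block(t, 2.0))
print(out[0])  # tensor([3., 3., 3.])

# Passing the module directly fails: forward() is missing `scale`
try:
    apply_sublayer(x, block)
    failed = False
except TypeError:
    failed = True
print(failed)  # True
```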

In [None]:
class Encoder(nn.Module):

    def __init__(self, features:int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

We create this `Encoder` class because an encoder is usually a stack of N identical `EncoderBlock` layers (each with its own learnable weights), which gives the model more depth. A final layer normalization is applied to the output of the stack.

# Decoder Block

Contains a self-attention mechanism, a cross-attention mechanism (attending to the encoder's output), and a feed-forward network, all wrapped in residual connections and layer normalization.

In [None]:
class DecoderBlock(nn.Module):

    def __init__(self, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, features:int, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connections = nn.ModuleList([ResidualConnection(features, dropout) for _ in range(3)])

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
        x = self.residual_connections[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
        x = self.residual_connections[2](x, self.feed_forward_block)
        return x

- `src_mask`: source mask to prevent the model from attending to padding tokens in the source input
- `tgt_mask`: target mask to prevent the model from attending to future tokens in the target sequence

In [None]:
class Decoder(nn.Module):

    def __init__(self, features: int, layers: nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        for layer in self.layers:
            x = layer(x, encoder_output, src_mask, tgt_mask)
        return self.norm(x)

We create this `Decoder` class because a decoder is usually a stack of N identical `DecoderBlock` layers (each with its own learnable weights), which gives the model more depth. A final layer normalization is applied to the output of the stack.

# Transformer Class

Encapsulates the entire transformer model.

In [None]:
class Transformer(nn.Module):

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        src = self.src_embed(src)
        src = self.src_pos(src)
        return self.encoder(src, src_mask)
    
    def decode(self, encoder_output: torch.Tensor, src_mask: torch.Tensor, tgt: torch.Tensor, tgt_mask: torch.Tensor):
        tgt = self.tgt_embed(tgt)
        tgt = self.tgt_pos(tgt)
        return self.decoder(tgt, encoder_output, src_mask, tgt_mask)
    
    def project(self, x):
        return self.projection_layer(x)

`encode` method:
- apply source embeddings to the input tensor $src$
- add positional encodings to the embedded source tensor
- pass the resulting tensor through the encoder along with the source mask (to handle padding tokens)
- returns the encoded representation of the source sequence of shape (batch_size, seq_len, d_model)

`decode` method:
- apply target embeddings to the input tensor tgt
- add positional encodings to the embedded target tensor
- pass the resulting tensor through the decoder along with the encoder output and the respective masks
- returns the decoded representation of the target sequence of shape (batch_size, seq_len, d_model)

`project` method:
- apply the projection layer to map the d_model-dimensional output to vocab_size-dimensional logits

In [None]:
def build_transformer(src_vocab_size: int, tgt_vocab_size:int, src_seq_len:int, tgt_seq_len:int, d_model: int=512, N: int=6, h: int=8, dropout: float=0.1, d_ff: int=2048) -> Transformer:
    # Create the embedding layers
    src_embed = InputEmbeddings(d_model, src_vocab_size)
    tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)

    # Create the positional encodings
    src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    # Create the N encoder blocks
    encoder_blocks = []
    for _ in range(N):
        encoder_self_attention_block = MultiHeadAttentionBlock(d_model, h, dropout)
        feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
        encoder_block = EncoderBlock(encoder_self_attention_block, feed_forward_block, d_model, dropout)
        encoder_blocks.append(encoder_block)

    # Create the N decoder blocks
    decoder_blocks = []
    for _ in range(N):
        decoder_self_attention_block = MultiHeadAttentionBlock(d_model, h, dropout)
        decoder_cross_attention_block = MultiHeadAttentionBlock(d_model, h, dropout)
        feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
        decoder_block = DecoderBlock(decoder_self_attention_block, decoder_cross_attention_block, feed_forward_block, d_model, dropout)
        decoder_blocks.append(decoder_block)

    # Create the encoder and decoder
    encoder = Encoder(d_model, nn.ModuleList(encoder_blocks))
    decoder = Decoder(d_model, nn.ModuleList(decoder_blocks))

    # Create the projection layer
    projection_layer = ProjectionLayer(d_model, tgt_vocab_size)

    # Create the transformer
    transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)

    # Initialize the parameters
    for p in transformer.parameters():
        if p.dim() > 1:
            nn.init.xavier_uniform_(p)
    
    return transformer
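
The initialization loop at the end of `build_transformer` can be tried in isolation. The sketch below runs it, written with the in-place `nn.init.xavier_uniform_`, on a small `nn.Sequential` that is only a toy stand-in for the full model. It shows that the `p.dim() > 1` test selects the weight matrices and leaves the one-dimensional biases untouched.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Toy stand-in for the transformer: two weight matrices and two bias vectors
model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 2))
bias_before = model[0].bias.detach().clone()

for p in model.parameters():
    if p.dim() > 1:  # weight matrices only; biases have dim 1
        nn.init.xavier_uniform_(p)

# Xavier uniform samples from [-a, a] with a = sqrt(6 / (fan_in + fan_out))
bound = (6 / (4 + 8)) ** 0.5
within_bound = model[0].weight.abs().max().item() <= bound + 1e-6
bias_unchanged = torch.equal(model[0].bias, bias_before)

print(within_bound)    # True
print(bias_unchanged)  # True
```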