In [9]:
import torch, torch.nn as nn, torch.nn.functional as F
import math

# Transformer Architecture
- Built on encoder - decoder model
- Built entirely on self attention + feed forward network layers (No Recurrence and convolution)

<img src="media/Transformer%20Architecture.png" alt="Transformer Architecture" width="400"/>

### Encoder (N times)
1. Input Embedding
2. Positional Encoding
3. MultiHead Attention + Add & Norm
4. Feed Forward + Add & Norm

### Decoder (N times)
1. Output Embedding
2. Positional Encoding
3. *Masked* MultiHead Attention + Add & Norm
4. Cross MultiHead Attention (Learnings from Encoder are embedded) + Add & Norm
5. Feed Forward + Add & Norm

### Output
1. Linear
2. Softmax

# Implementing components for Transformers

## Building Positional Encoding Module

Self-attention has no built-in notion of token order, so we need a way to inject positional information into the embeddings. The Transformer paper suggests using periodic (sine and cosine) functions of the position, which give every position a unique, deterministic encoding that is added to the token embedding.

Reference: "Attention Is All You Need"

$$
\mathrm{PE}_{(pos,\,2i)} = \sin\left(\frac{pos}{10000^{\frac{2i}{d_\text{model}}}}\right)
$$
$$
\mathrm{PE}_{(pos,\,2i+1)} = \cos\left(\frac{pos}{10000^{\frac{2i}{d_\text{model}}}}\right)
$$

- $pos$: position of the token in the sequence (0, 1, 2, …)
- $i$: index of the sine/cosine pair (0, 1, 2, …, $d_\text{model}/2 - 1$)
- $d_\text{model}$: the total embedding dimension (e.g., 512)
- $2i$ and $2i+1$ split sine and cosine across the even and odd embedding dimensions

Basically, we use two formulas depending on the index of the embedding dimension (not on the token's position): even dimensions use sine and odd dimensions use cosine, and each sine/cosine pair shares the same frequency.

In [7]:
class Sinusoidal_Positional_Encoding(nn.Module):
    """
    Fixed (non-learned) sinusoidal positional encoding.

    Args:
        d_model: Embedding dimension which is same as the input embedding dimension
        max_seq_len: the maximum length of the input sequence

    The encoding table is precomputed once and stored in the buffer ``pe`` of
    shape (max_seq_len, d_model). Even columns hold the sine terms and odd
    columns hold the cosine terms.
    """
    def __init__(self, d_model: int, max_seq_len: int = 1000):
        super().__init__()
        self.d_model = d_model
        self.max_seq_len = max_seq_len
        # Positional encoding matrix: one row per position, one column per embedding dimension
        pe = torch.zeros(max_seq_len, d_model)
        # Column vector of positions, shape (max_seq_len, 1)
        position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
        # 1 / 10000^(2i / d_model), computed in log space; one entry per even column
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000) / d_model))
        angles = position * div_term                              # (max_seq_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)                           # for even index
        # For an odd d_model there is one fewer odd column than even columns,
        # so drop the surplus frequency before filling the cosine terms.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])        # for odd index
        # A buffer is saved in the state_dict and moved with the module
        # (.to / .cuda) but is not a trainable parameter.
        self.register_buffer("pe", pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Add the positional encoding of the first ``seq_len`` positions to ``x``.

        x(input): (batch_size, seq_len, d_model)
        returns : (batch_size, seq_len, d_model)

        Raises:
            ValueError: if seq_len is larger than max_seq_len
        """
        seq_len = x.size(1)
        if seq_len > self.max_seq_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {self.max_seq_len}"
            )
        # (seq_len, d_model) broadcasts over the batch dimension
        return x + self.pe[:seq_len]

In [8]:
# For ablation study, we can use the learned positional encoding as well
class Learned_Positional_Encoding(nn.Module):
    """
    Learned positional encoding: one trainable embedding vector per position.

    Args:
        d_model: Embedding dimension
        max_seq_len: the maximum length of the input sequence
    """
    def __init__(self, d_model: int, max_seq_len: int = 1000):
        super().__init__()
        # NOTE: the attribute name (sic) is kept as-is so that attribute access
        # and state_dict keys stay unchanged.
        self.embeddding = nn.Embedding(max_seq_len, d_model)

    def forward(self, x: torch.Tensor)->torch.Tensor:
        """
        Add the learned embedding of positions 0..seq_len-1 to ``x``.

        x(input): (batch_size, seq_len, d_model)
        returns : (batch_size, seq_len, d_model)
        """
        seq_len = x.size(1)
        pos = torch.arange(seq_len, device=x.device).unsqueeze(0)   # (1, seq_len)
        pos_emb = self.embeddding(pos)                              # (1, seq_len, d_model)
        # The leading dimension of size 1 broadcasts over the batch; no further
        # unsqueeze is needed (it would make the result 4-D).
        return x + pos_emb

## Single Head Self Attention

Masking is used in the (masked) multi-head attention module in the decoder part of the transformer model.
It prevents each position from attending to later tokens in the sequence, so the model cannot "peek" at the tokens it is supposed to predict during training.

In [None]:
def casual_mask(T: int, device: torch.device)->torch.Tensor:
    """
    Build a causal (look-ahead) attention mask.

    Returns a bool mask where True means *masked*: entry [0, 0, i, j] is True
    exactly when j > i, i.e. when key position j lies in the future of query
    position i.

    Args:
        T: sequence length
        device: device on which the mask is created
    Returns:
        bool tensor of shape (1, 1, T, T), broadcastable over batch and heads
    """
    # dtype must be bool: Tensor.masked_fill only accepts boolean masks.
    m = torch.triu(torch.ones(T, T, dtype=torch.bool, device=device), diagonal=1)
    return m.view(1, 1, T, T)

Reference: "Attention Is All You Need"

Here the attention head created is based on the scaled dot-product attention formula presented in the transformers paper  
**Scaled Dot-Product Attention Formula** : The scaled dot-product attention computes attention scores as:

$$
\text{Attention}(Q, K, V) = \text{softmax}\left( \frac{Q K^T}{\sqrt{d_k}} \right) V
$$

where:
- $Q$ = queries,
- $K$ = keys,
- $V$ = values,
- $d_k$ = dimensionality of the keys.


<img src="media/attention.png" alt="Scaled dot product" width="400"/>  

We are implementing the Scaled Dot Product (Left Diagram)

In [None]:
class Single_Head_Self_Attention(nn.Module):
    """
    Single-Head Attention (scaled dot-product self attention)

    args:
        d_model: embedding dimension
        d_k: dimension of the key, value and query projections
        dropout: dropout rate applied to the attention weights
        causal: whether to mask out future positions (key index > query index)
        trace_shapes: whether to trace the shapes of the tensors for debugging
    """
    def __init__(self, d_model: int, d_k: int, dropout: float = 0.0, causal: bool = True, trace_shapes: bool = False):
        super().__init__()
        self.d_k = d_k  # Store d_k as instance variable for use in forward
        self.q = nn.Linear(d_model, d_k, bias=False)
        self.k = nn.Linear(d_model, d_k, bias=False)
        self.v = nn.Linear(d_model, d_k, bias=False)
        self.dropout = nn.Dropout(dropout)
        self.causal = causal
        self.trace_shapes = trace_shapes

    def forward(self, x: torch.Tensor):  # (B, T, d_model)
        """
        x : (Batch_size, Seq_len, d_model)
        returns : tuple (out, w) where
            out : (Batch_size, Seq_len, d_k)     attention output
            w   : (Batch_size, Seq_len, Seq_len) attention weights (after dropout)
        """
        batch_size, seq_len, _ = x.shape
        q = self.q(x)    # (Batch_size, Seq_len, d_k)
        k = self.k(x)    # (Batch_size, Seq_len, d_k)
        v = self.v(x)    # (Batch_size, Seq_len, d_k)
        if self.trace_shapes:
            print(f"q: {tuple(q.shape)}")
            print(f"k: {tuple(k.shape)}")
            print(f"v: {tuple(v.shape)}")

        # Applying the scaled dot-product attention formula
        scale = 1.0 / math.sqrt(self.d_k)
        attention = torch.matmul(q, k.transpose(-2, -1)) * scale   # (Batch_size, Seq_len, Seq_len)

        # Masking the upper triangle of the attention matrix
        # Why? To prevent the model from attending to future tokens during training
        if self.causal:
            mask = casual_mask(seq_len, device=x.device)           # (1, 1, Seq_len, Seq_len)
            # Drop the head dimension and make sure the mask is boolean:
            # masked_fill rejects non-bool masks.
            mask = mask.squeeze(1).to(torch.bool)                  # (1, Seq_len, Seq_len)
            attention = attention.masked_fill(mask, float("-inf"))
        w = F.softmax(attention, dim=-1)   # -inf scores become weight 0
        w = self.dropout(w)
        out = torch.matmul(w, v)   # (Batch_size, Seq_len, d_k)
        if self.trace_shapes:
            print(f"Weights: {tuple(w.shape)} Out {out.shape}")
        return out, w
        

## Building multi head attention module

<img src="media/attention.png" alt="Scaled dot product" width="400"/>  

We are now implementing multi-head attention: several scaled dot-product attention heads run in parallel, and their outputs are concatenated and projected (right diagram).

In [None]:
class Multi_Head_Self_Attention(nn.Module):
    """
    Multi-Head Attention
    This represent one multi head self attention block

    args:
        d_model: embedding dimension
        n_heads: number of attention heads
        dropout: dropout rate applied to the attention weights
        causal: whether to mask out future positions (key index > query index)
        trace_shapes: whether to trace the shapes of the tensors for debugging

    note:
        d_k (d_head here) ie, dimension of the key, value and query is same as d_model / n_heads
    """
    def __init__(self, d_model: int, n_heads: int, dropout: float = 0.0, causal: bool = True, trace_shapes: bool = False):
        super().__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_head = d_model // n_heads
        self.dropout = nn.Dropout(dropout)
        self.causal = causal
        self.trace_shapes = trace_shapes

        # One fused projection produces query, key and value at once
        # (3 * d_model outputs that are split in forward).
        self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
        # After the heads are concatenated, project back to d_model.
        self.proj = nn.Linear(d_model, d_model, bias=False)

    def forward(self, x: torch.Tensor):
        """
        x : (Batch_size, Seq_len, d_model)
        returns : (Batch_size, Seq_len, d_model)
        """
        batch_size, seq_len, C = x.shape   # C is the embedding dimension
        qkv = self.qkv(x)                                                   # (B, T, 3 * d_model)
        qkv = qkv.view(batch_size, seq_len, 3, self.n_heads, self.d_head)   # (B, T, 3, H, d_head)
        q, k, v = qkv.unbind(dim=2)                                         # each (B, T, H, d_head)
        # Move the head dimension in front of the sequence dimension
        q = q.transpose(1, 2)                                               # (B, H, T, d_head)
        k = k.transpose(1, 2)
        v = v.transpose(1, 2)

        if self.trace_shapes:
            print(f"q: {tuple(q.shape)}")
            print(f"k: {tuple(k.shape)}")
            print(f"v: {tuple(v.shape)}")

        scale = 1.0 / math.sqrt(self.d_head)
        attention = torch.matmul(q, k.transpose(-2, -1)) * scale            # (B, H, T, T)

        # mask out the upper triangle of the attention matrix
        if self.causal:
            # (1, 1, T, T) broadcasts over batch and heads; masked_fill
            # rejects non-bool masks, so make sure the mask is boolean.
            mask = casual_mask(seq_len, device=x.device).to(torch.bool)
            attention = attention.masked_fill(mask, float("-inf"))

        w = F.softmax(attention, dim=-1)   # -inf scores become weight 0
        w = self.dropout(w)
        ctx = torch.matmul(w, v)                                            # (B, H, T, d_head)
        if self.trace_shapes:
            print(f"ctx: {tuple(ctx.shape)}")
        ctx = ctx.transpose(1, 2)                                           # (B, T, H, d_head)
        # transpose only changes strides, so ctx is no longer contiguous in
        # memory; view needs a compatible memory layout and would raise here,
        # hence the explicit contiguous() copy before merging the heads.
        ctx = ctx.contiguous().view(batch_size, seq_len, C)
        out = self.proj(ctx)
        return out
        

In [None]:
## Building simple feed forward network with GELU activation
class FeedForward(nn.Module):
    """
    Position-wise feed forward network: Linear -> GELU -> Dropout -> Linear.

    The same two-layer MLP is applied to the last dimension of the input, so
    every position is transformed independently.

    Args:
        d_model: input/output dimension
        d_hd : hidden dimension
        dropout: dropout rate applied after the activation
    """

    def __init__(self, d_model: int, d_hd: int, dropout: float = 0.1):
        super().__init__()
        # Expand to the hidden width, then project back to the model width.
        self.linear1 = nn.Linear(d_model, d_hd)
        self.linear2 = nn.Linear(d_hd, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor):
        """
        x : (Batch_size, Seq_len, d_model)
        returns : (Batch_size, Seq_len, d_model)
        """
        hidden = F.gelu(self.linear1(x))
        return self.linear2(self.dropout(hidden))

## Building Layer normalization
mean of all features
$$
\mu = \frac{1}{D} \sum_{i=1}^{D} x_i, \quad
$$

variance of all features
$$
\sigma^2 = \frac{1}{D} \sum_{i=1}^{D} (x_i - \mu)^2
$$

$$
\text{LayerNorm}(x_i) = \gamma \cdot \frac{x_i - \mu}{\sqrt{\sigma^2 + \epsilon}} + \beta
$$

In [18]:
class LayerNorm(nn.Module):
    """
    Layer normalization over the last dimension.

    Each feature vector is shifted to zero mean and scaled to unit (biased)
    variance, then multiplied by a learnable weight and, optionally, shifted
    by a learnable bias.

    Args:
        d_model : dimension to normalize over
        eps : small value added to the variance to avoid division by zero
        bias : boolean to include the learnable bias or not
    """

    def __init__(self, d_model: int, eps: float = 1e-5, bias: bool = True):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(d_model))
        if bias:
            self.bias = nn.Parameter(torch.zeros(d_model))
        else:
            self.bias = None

    def forward(self, x: torch.Tensor):
        """
        x : (Batch_size, Seq_len, d_model)
        returns : (Batch_size, Seq_len, d_model)
        """
        mu = x.mean(dim=-1, keepdim=True)
        # unbiased=False divides by D (not D - 1), matching the formula above
        sigma_sq = x.var(dim=-1, keepdim=True, unbiased=False)
        scaled = (x - mu) / torch.sqrt(sigma_sq + self.eps) * self.weight
        if self.bias is None:
            return scaled
        return scaled + self.bias
        


We can also use `nn.LayerNorm` from PyTorch to apply layer normalization directly.

In [20]:
# LayerNorm = nn.LayerNorm(d_model, eps=1e-5, elementwise_affine=True) 
# elementwise_affine = True means that the layer normalization will have learnable parameters ie, weights and biases
# if we set it to False, the layer normalization will not have any learnable parameters and it will be a simple layer normalization

In [21]:
class Residual_Connection(nn.Module):
    """
    Residual (skip) connection combined with layer normalization.

    Two arrangements are supported, selected by ``pre_norm``:
        pre-norm  : x + dropout(sublayer(norm(x)))
        post-norm : norm(x + dropout(sublayer(x)))

    Args:
        d_model: size of the last dimension, normalized by the layer norm
        dropout: dropout rate applied to the sublayer output
        bias: passed to nn.LayerNorm as ``elementwise_affine``, so False
            disables both the learnable scale and the learnable shift
        pre_norm: True for pre-norm, False for post-norm
    """
    def __init__(self, d_model: int, dropout: float = 0.1, bias: bool = True, pre_norm: bool = True):
        super().__init__()
        self.norm = nn.LayerNorm(d_model, eps=1e-5, elementwise_affine=bias)
        self.dropout = nn.Dropout(dropout)
        self.pre_norm = pre_norm

    def forward(self, x, sublayer) -> torch.Tensor:
        """
        Wrap ``sublayer`` with the residual connection.

        Args:
            x: input tensor (batch_size, seq_len, d_model)
            sublayer: function that takes x and returns tensor of same shape
        Returns:
            output: (batch_size, seq_len, d_model)
        """
        if not self.pre_norm:
            # Post-norm: normalize the sum of input and sublayer output.
            return self.norm(x + self.dropout(sublayer(x)))
        # Pre-norm: normalize only what the sublayer sees; the skip path
        # carries x unchanged.
        return x + self.dropout(sublayer(self.norm(x)))
            

In [22]:
class SublayerConnection(nn.Module):
    """
    Pre-norm residual wrapper around a sublayer.

    The input is layer-normalized, passed through the sublayer, regularized
    with dropout and finally added back to the original input:
        x + dropout(sublayer(norm(x)))

    Args:
        d_model: size of the last dimension, normalized by the layer norm
        dropout: dropout rate applied to the sublayer output
    """
    def __init__(self, d_model: int, dropout: float = 0.1):
        super().__init__()
        self.norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, sublayer) -> torch.Tensor:
        """
        x : (batch_size, seq_len, d_model)
        sublayer : function that takes a tensor and returns one of the same shape
        returns : (batch_size, seq_len, d_model)
        """
        branch = sublayer(self.norm(x))
        return x + self.dropout(branch)


# Encoder

```
Input: x (batch_size, seq_len, d_model)
    ↓
1. Attention + Residual:
   x = x + dropout(self_attention(layer_norm(x)))
    ↓  
2. Feed Forward + Residual:
   x = x + dropout(feed_forward(layer_norm(x)))
    ↓
Output: x (batch_size, seq_len, d_model)
```