In [2]:
import math

import torch
import torch.nn as nn
import torch.nn.functional as F

In [2]:
# Run on the GPU when CUDA is available to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


## Embedding Layer
*The idea is to augment the token embeddings with a position-dependent pattern of values.*
##
*If the pattern is characteristic for each position, then other layers can learn to incorporate positional information into their transformations.*
##
***In Other words, if each position has a unique encoding, the model can infer order and distance between tokens.*** 

### nn.Embedding: maps discrete input tokens to dense vectors (embeddings) — a learnable look-up table

*Open question: what if we did not use a learned embedding to transform tokens into a richer representation, and instead used the raw tokens as they are — could a linear layer be applied to them directly?*

In [7]:
# Example: a learnable look-up table holding one dense vector per token id.
Num_Unique_Tokens = 1000        # number of rows in the table (vocabulary size)
Size_Of_Vector_to_map = 64      # width of each row (embedding dimension)
enmbedding_layer = nn.Embedding(Num_Unique_Tokens, Size_Of_Vector_to_map)

In [23]:
# Position ids 0..9 with a leading axis of size 1, giving shape (1, 10).
positional_indices = torch.arange(10)[None, :]
print(positional_indices.shape)

torch.Size([1, 10])


In [29]:
# Select every second column starting at column 1 (start=1, step=2), i.e. the odd positions.
positional_indices[:, 1::2]

tensor([[1, 3, 5, 7, 9]])

*For positional information, `max_length` defines the longest input sequence the model will be able to see.*

In [32]:
# Adds positional information through learned absolute position indices.
# Absolute indices carry no explicit notion of relative distance between tokens.
class EmbeddingLayer(nn.Module):
    """Token embedding plus a learned positional embedding, followed by LayerNorm.

    Args:
        vocab_size: number of rows in the token look-up table.
        embed_size: width of every embedding vector.
        max_length: number of rows in the position look-up table, i.e. the
            longest sequence that can be embedded.
    """

    def __init__(self, vocab_size, embed_size, max_length):
        super(EmbeddingLayer, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)
        self.layer_norm = nn.LayerNorm(embed_size, eps=1e-12)

    def forward(self, x):
        """Embed a tensor of token ids.

        Args:
            x: integer token ids whose LAST axis is the sequence axis,
                e.g. shape (batch, seq_len).

        Returns:
            Normalized embeddings with one extra trailing axis of size embed_size,
            e.g. shape (batch, seq_len, embed_size).
        """
        word_embedding = self.embedding(x)                                                  # Shape: (batch, seq_len, embed_size)

        # The ids have no feature axis yet, so the sequence length is the last axis
        # (size(-2) would be the batch axis and produce a mis-sized position tensor).
        seq_len = x.size(-1)
        positional_indices = torch.arange(seq_len, device=x.device).unsqueeze(0)            # Shape: (1, seq_len)
        positional_embeddings = self.position_embedding(positional_indices)                 # Shape: (1, seq_len, embed_size)

        x = word_embedding + positional_embeddings                                          # Broadcast over the batch axis
        x = self.layer_norm(x)                                                              # Normalize over the embedding axis
        return x

In [35]:
## Usage With Config
class Config:
    """Hyper-parameters for the embedding-layer example."""
    max_length = 100    # Maximum length of the input sequence
    embed_size = 64     # Width of each embedding vector
    vocab_size = 1000   # Number of distinct token ids

## Usage ??

In [33]:
# Adds positional information using fixed sinusoidal functions
class SinusoidalEmbeddingLayer(nn.Module):
    """Token embedding plus a fixed sinusoidal positional encoding, followed by LayerNorm.

    Args:
        vocab_size: number of rows in the token look-up table.
        embed_size: width of every embedding vector (even or odd).
        max_length: number of positions pre-computed in the encoding table.
        device: device on which the encoding table is created.
    """

    def __init__(self, vocab_size, embed_size, max_length, device):
        super(SinusoidalEmbeddingLayer, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embed_size)

        # register_buffer => tensor that is not a parameter, but is part of the module's state.
        self.register_buffer("positional_embedding", self._get_positional_encoding(max_length, embed_size, device))
        self.layer_norm = nn.LayerNorm(embed_size, eps=1e-12)

    def _get_positional_encoding(self, max_length, embed_size, device):
        """Build the (1, max_length, embed_size) table of sin/cos position encodings."""
        # Create every intermediate on the target device so no cross-device copies are needed.
        position = torch.arange(0, max_length, dtype=torch.float, device=device).unsqueeze(1)       # Shape: (max_length, 1)
        div_term = torch.exp(
            torch.arange(0, embed_size, 2, dtype=torch.float, device=device) * (-math.log(10000.0) / embed_size)
        )                                                                                           # Shape: (ceil(embed_size / 2),)
        angles = position * div_term                                                                # Shape: (max_length, ceil(embed_size / 2))

        pe = torch.zeros(max_length, embed_size, device=device)
        pe[:, 0::2] = torch.sin(angles)                                                             # Even columns, start=0, step=2
        # For an odd embed_size there is one fewer odd column than even columns, so trim the angles.
        pe[:, 1::2] = torch.cos(angles[:, : embed_size // 2])                                       # Odd columns, start=1, step=2
        return pe.unsqueeze(0)                                                                      # Shape: (1, max_length, embed_size)

    def forward(self, x):
        """Embed a tensor of token ids.

        Args:
            x: integer token ids whose LAST axis is the sequence axis,
                e.g. shape (batch, seq_len).

        Returns:
            Normalized embeddings, e.g. shape (batch, seq_len, embed_size).
        """
        word_embedding = self.embedding(x)                                                  # Shape: (batch, seq_len, embed_size)

        # The ids have no feature axis yet, so the sequence length is the last axis
        # (size(-2) would be the batch axis and slice the wrong number of positions).
        seq_len = x.size(-1)
        positional_embeddings = self.positional_embedding[:, :seq_len, :].to(x.device)      # Shape: (1, seq_len, embed_size)
        x = word_embedding + positional_embeddings                                          # Broadcast over the batch axis
        x = self.layer_norm(x)                                                              # Normalize over the embedding axis
        return x

In [36]:
class Config:
    """Hyper-parameters for the sinusoidal embedding-layer example."""
    device = device     # Device to use (CPU or GPU); read from the notebook-level `device`
    max_length = 100    # Maximum length of the input sequence
    embed_size = 64     # Width of each embedding vector
    vocab_size = 1000   # Number of distinct token ids

## Multi-Head Attention
*Attention measures the similarity between tokens, which adds contextual meaning to the input stream of tokens.*
<br>
*Having several heads allows the model to focus on different aspects at once.*
<br>
*This works much like convolution filters: one filter looks for faces while another looks for **different features** such as cars. In the same way, one head may look at subject-verb interactions, another at adjectives, and others at different sentence relations.*

### Method 1: Implementing Each Head Separately

In [39]:
class Head(nn.Module):
    """A single scaled dot-product attention head.

    Args:
        embed_size: width of the incoming q/k/v vectors.
        head_dim: width of the projected query/key/value vectors.
    """

    def __init__(self, embed_size, head_dim):
        super(Head, self).__init__()

        self.WQ = nn.Linear(embed_size, head_dim)
        self.WK = nn.Linear(embed_size, head_dim)
        self.WV = nn.Linear(embed_size, head_dim)

    def forward(self, q, k, v, mask=False):
        """Attend from `q` over `k`/`v`.

        Args:
            q, k, v: tensors whose last axis has size embed_size,
                e.g. (batch_size, seq_len, embed_size).
            mask: False/None for no masking; True for a causal mask (each query
                position may only attend to key positions at or before it); or a
                tensor broadcastable to the score matrix in which entries equal
                to 0 mark positions that must not be attended to.

        Returns:
            (attention_output, attention_scores): the weighted values of shape
            (batch_size, q_len, head_dim) and the softmax weights of shape
            (batch_size, q_len, k_len).
        """
        Q = self.WQ(q)                                                                    # Shape: (batch_size, seq_len, head_dim)
        K = self.WK(k)                                                                    # Shape: (batch_size, seq_len, head_dim)
        V = self.WV(v)                                                                    # Shape: (batch_size, seq_len, head_dim)

        # Scaled Dot-Product Attention with masking
        score = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(K.size(-1))              # Shape: (batch_size, seq_len, seq_len)

        if isinstance(mask, bool):
            if mask:
                # Boolean flag: hide every key position strictly after the query position.
                q_len, k_len = score.size(-2), score.size(-1)
                future = torch.triu(
                    torch.ones(q_len, k_len, dtype=torch.bool, device=score.device), diagonal=1
                )
                score = score.masked_fill(future, float("-inf"))
        elif mask is not None:
            # Tensor mask: zeros mark blocked positions.
            score = score.masked_fill(mask == 0, float("-inf"))

        attention_scores = F.softmax(score, dim=-1)                                       # Shape: (batch_size, seq_len, seq_len)
        attention_output = torch.matmul(attention_scores, V)                              # Shape: (batch_size, seq_len, head_dim)

        return attention_output, attention_scores                                         # Return the attention output and weights

In [41]:
class Multi_head_attention(nn.Module):
    """Multi-head attention assembled from independent `Head` modules.

    Each head projects the inputs to `embed_dim // num_heads` features; the
    per-head results are concatenated on the last axis and passed through the
    output projection `WO`.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()

        assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"
        self.head_dim = embed_dim // num_heads

        head_modules = []
        for _ in range(num_heads):
            head_modules.append(Head(embed_dim, self.head_dim))
        self.heads = nn.ModuleList(head_modules)
        self.WO = nn.Linear(embed_dim, embed_dim)

    def forward(self, q, k, v, mask=False):
        """Run every head on (q, k, v, mask).

        Returns:
            (x, attention_scores): the projected concatenation of all head
            outputs, and a list with one attention-weight tensor per head.
        """
        attention_outputs = []
        attention_scores = []
        for head in self.heads:
            result = head(q, k, v, mask)
            attention_outputs.append(result[0])
            attention_scores.append(result[1])

        merged = torch.cat(attention_outputs, dim=-1)           # Concatenate outputs from all heads
        return self.WO(merged), attention_scores

### Method 2: Implementing All Heads in a Single Module

In [52]:
class Multi_Head_Attention(nn.Module):
    """Multi-head scaled dot-product attention with all heads computed in one batched pass.

    Args:
        embed_dim: width of the q/k/v inputs and of the output.
        num_heads: number of heads; must divide `embed_dim`.
    """

    def __init__(self, embed_dim, num_heads):
        super(Multi_Head_Attention, self).__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        # Linear projections
        self.WQ = nn.Linear(embed_dim, embed_dim)
        self.WK = nn.Linear(embed_dim, embed_dim)
        self.WV = nn.Linear(embed_dim, embed_dim)
        self.WO = nn.Linear(embed_dim, embed_dim)

    def forward(self, q, k, v, masked=False):
        """Attend from `q` over `k`/`v`.

        Args:
            q: queries of shape [B, L_q, E].
            k: keys of shape [B, L_k, E].
            v: values of shape [B, L_k, E].
            masked: when True, apply a causal mask so each query position only
                attends to key positions at or before it (decoder self-attention).

        Returns:
            (output, attention_scores): output of shape [B, L_q, E] and the
            softmax weights of shape [B, H, L_q, L_k].
        """
        batch_size = q.size(0)
        q_len, k_len, v_len = q.size(1), k.size(1), v.size(1)

        # Every key needs exactly one value; the query length is free to differ
        # (cross-attention over an encoder output of a different length).
        assert k_len == v_len, "Key and Value lengths must be equal"

        # Linear projections
        Q = self.WQ(q)  # [B, L_q, E]
        K = self.WK(k)  # [B, L_k, E]
        V = self.WV(v)  # [B, L_v, E]

        # Reshape for multi-head: [B, L, E] → [B, H, L, D]
        Q = Q.view(batch_size, q_len, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, k_len, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, v_len, self.num_heads, self.head_dim).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)  # Scaled dot-product attention [B, H, L_q, L_k]

        if masked:
            # The causal mask is only meaningful for self-attention, where both lengths agree.
            assert q_len == k_len, "Query and Key lengths must be equal for self-attention"
            mask = torch.triu(torch.ones(q_len, k_len, device=q.device), diagonal=1).bool()  # True above the diagonal = future positions
            mask = mask.unsqueeze(0).unsqueeze(1)  # [1, 1, L_q, L_k]
            scores = scores.masked_fill(mask, float('-inf'))

        attention_scores = F.softmax(scores, dim=-1)  # Attention weights [B, H, L_q, L_k]
        attention_output = torch.matmul(attention_scores, V)  # [B, H, L_q, D]

        # Concatenate heads: [B, H, L_q, D] → [B, L_q, E]
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, q_len, self.embed_dim)
        output = self.WO(attention_output)

        return output, attention_scores

In [51]:
class Config:
    """Hyper-parameters for the multi-head attention example."""
    num_heads = 8    # Number of attention heads
    embed_dim = 64   # Model width; must be divisible by num_heads

## Feed Forward Layer
*It processes each embedding separately instead of processing the whole sequence as a single vector, which is why it is also called the **Position-Wise Feed Forward Layer**.*

In [53]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Linear -> Dropout.

    Args:
        embed_dim: width of the input and of the output.
        ff_dim: width of the hidden layer (usually larger than embed_dim).
        droupout: dropout probability applied to the block's output.
    """

    def __init__(self, embed_dim, ff_dim, droupout=0.5):
        super().__init__()

        # Expand to the hidden width, then project back to the model width.
        self.fc1 = nn.Linear(in_features=embed_dim, out_features=ff_dim)
        self.fc2 = nn.Linear(in_features=ff_dim, out_features=embed_dim)
        self.relu = nn.ReLU()
        self.droupout = nn.Dropout(p=droupout)

    def forward(self, x):
        """Apply the block to the last axis of `x`; the output has the same shape as `x`."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        projected = self.fc2(activated)
        # Dropout is applied after the second linear layer to reduce overfitting.
        return self.droupout(projected)

In [54]:
class Config:
    """Hyper-parameters for the feed-forward example."""
    dropout = 0.1     # Dropout probability
    ff_dim = 2048     # Hidden width; usually 4 times embed_dim
    embed_dim = 512   # Model width

In [50]:
# The Config defined just above names the model width `embed_dim` (there is no
# `model_dim` attribute), so use that name to keep the cell runnable top to bottom.
feed_forward = FeedForward(Config.embed_dim, Config.ff_dim, Config.dropout).to(device)
print(feed_forward)

FeedForward(
  (fc1): Linear(in_features=512, out_features=2048, bias=True)
  (fc2): Linear(in_features=2048, out_features=512, bias=True)
  (relu): ReLU()
  (droupout): Dropout(p=0.1, inplace=False)
)


## Encoder Layer & Encoder

In [57]:
# This implementation follows "Attention Is All You Need" (Vaswani et al., 2017), but
# for ease of training it uses Pre-Normalization instead of Post-Normalization:
# each sub-layer computes x + sublayer(LayerNorm(x)).
class Encoder_Layer(nn.Module):
    """One pre-norm encoder block: self-attention followed by a feed-forward network.

    Args:
        config: object providing `embed_dim`, `n_heads`, `ff_dim` and `dropout`.
    """

    def __init__(self, config):
        super(Encoder_Layer, self).__init__()
        self.layer_norm1 = nn.LayerNorm(config.embed_dim, eps=1e-12)
        self.layer_norm2 = nn.LayerNorm(config.embed_dim, eps=1e-12)
        self.multi_head_attention = Multi_Head_Attention(config.embed_dim, config.n_heads)
        self.feed_forward = FeedForward(config.embed_dim, config.ff_dim, config.dropout)

    def forward(self, x, mask=False):
        """Apply the block.

        Args:
            x: input passed to LayerNorm over `embed_dim` and then to self-attention.
            mask: forwarded positionally as the 4th argument of the attention module.

        Returns:
            (x, attention_scores): the block output and the self-attention weights.
        """
        # Normalize only the sub-layer input; the residual branch keeps the
        # un-normalized x so the skip connection stays an identity path.
        normed = self.layer_norm1(x)
        attention_output, attention_scores = self.multi_head_attention(normed, normed, normed, mask)
        x = x + attention_output                                        # Residual connection
        x = x + self.feed_forward(self.layer_norm2(x))                  # Residual connection around the pre-normed FFN
        return x, attention_scores                                      # Return the output and attention scores

In [58]:
class Encoder(nn.Module):
    """A stack of `config.num_encoder_layers` encoder layers applied in sequence.

    Args:
        config: object providing `num_encoder_layers` plus everything
            `Encoder_Layer` reads from it.
    """

    def __init__(self, config):
        super(Encoder, self).__init__()
        self.encoder_layers = nn.ModuleList([Encoder_Layer(config) for _ in range(config.num_encoder_layers)])

    def forward(self, x):
        """Run `x` through every encoder layer and return the final hidden states."""
        for layer in self.encoder_layers:
            # Each layer returns (hidden_states, attention_scores); only the hidden
            # states are fed to the next layer.
            x, _ = layer(x)

        return x

In [59]:
class config:
    """Hyper-parameters for the encoder example."""
    dropout = 0.1             # Dropout probability of the feed-forward block
    ff_dim = 2048             # Hidden width of the feed-forward block
    n_heads = 8               # Number of attention heads
    embed_dim = 512           # Model width
    num_encoder_layers = 2    # Number of stacked encoder layers

## Decoder Layer & Decoder

In [3]:
# Here too, for ease of training, Pre-Normalization is used instead of Post-Normalization:
# each sub-layer computes x + sublayer(LayerNorm(x)).
class Decoder_Layer(nn.Module):
    """One pre-norm decoder block: causal self-attention, cross-attention, feed-forward.

    Args:
        config: object providing `embed_dim`, `n_heads`, `ff_dim` and `dropout`.
    """

    def __init__(self, config):
        super(Decoder_Layer, self).__init__()
        self.layer_norm1 = nn.LayerNorm(config.embed_dim, eps=1e-12)
        self.layer_norm2 = nn.LayerNorm(config.embed_dim, eps=1e-12)
        self.layer_norm3 = nn.LayerNorm(config.embed_dim, eps=1e-12)
        self.masked_multi_head_attention = Multi_Head_Attention(config.embed_dim, config.n_heads)
        self.multi_head_attention = Multi_Head_Attention(config.embed_dim, config.n_heads)
        self.feed_forward = FeedForward(config.embed_dim, config.ff_dim, config.dropout)

    def forward(self, x, encoder_output):
        """Apply the block.

        Args:
            x: decoder hidden states.
            encoder_output: encoder hidden states used as keys and values of the
                cross-attention.

        Returns:
            (x, attention_scores): the block output and the cross-attention weights.
        """
        # The masking flag is passed positionally: the attention module names its
        # 4th parameter `masked`, so a `mask=` keyword would raise a TypeError.
        normed = self.layer_norm1(x)
        attention_output, _ = self.masked_multi_head_attention(normed, normed, normed, True)
        x = x + attention_output                                        # Residual connection (un-normalized skip path)

        normed = self.layer_norm2(x)
        attention_output, attention_scores = self.multi_head_attention(normed, encoder_output, encoder_output, False)
        x = x + attention_output                                        # Residual connection

        x = x + self.feed_forward(self.layer_norm3(x))                  # Residual connection around the pre-normed FFN
        return x, attention_scores                                      # Return the output and attention scores

*What if we fed the encoder's input into every decoder layer, or fed each encoder layer's output into the corresponding decoder layer's input?*

In [5]:
# Every decoder layer receives the same `encoder_output` as keys/values of its cross-attention.
class Decoder(nn.Module):
    """A stack of `config.num_decoder_layers` decoder layers applied in sequence.

    Args:
        config: object providing `num_decoder_layers` plus everything
            `Decoder_Layer` reads from it.
    """

    def __init__(self, config):
        super(Decoder, self).__init__()
        self.decoder_layers = nn.ModuleList([Decoder_Layer(config) for _ in range(config.num_decoder_layers)])

    def forward(self, x, encoder_output):
        """Run `x` through every decoder layer and return the final hidden states."""
        for layer in self.decoder_layers:
            # Each layer returns (hidden_states, attention_scores); only the hidden
            # states are fed to the next layer.
            x, _ = layer(x, encoder_output)

        return x

In [None]:
class Config:
    """Hyper-parameters for the decoder example."""
    dropout = 0.1             # Dropout probability of the feed-forward block
    ff_dim = 2048             # Hidden width of the feed-forward block
    n_heads = 8               # Number of attention heads
    embed_dim = 512           # Model width
    num_decoder_layers = 2    # Number of stacked decoder layers

## Transformer
*Let us now define the Transformer seq2seq architecture.*

In [8]:
class Transformer(nn.Module):
    """Encoder-decoder (seq2seq) Transformer producing per-position vocabulary logits.

    Args:
        config: object providing `vocab_size`, `embed_dim`, `max_length`,
            `device`, `num_layers`, plus everything `Encoder_Layer` and
            `Decoder_Layer` read from it.
    """

    def __init__(self, config):
        super(Transformer, self).__init__()

        # A single embedding layer is shared by the source and the target sequence.
        self.embedding_layer = SinusoidalEmbeddingLayer(config.vocab_size, config.embed_dim, config.max_length, config.device)

        self.decoder_layers = nn.ModuleList([Decoder_Layer(config) for _ in range(config.num_layers)])
        self.encoder_layers = nn.ModuleList([Encoder_Layer(config) for _ in range(config.num_layers)])

        # Final projection from the model width to one logit per vocabulary entry.
        self.fc_out = nn.Linear(config.embed_dim, config.vocab_size)

    def forward(self, source_input, target_input, Coupled=False):
        """Encode the source, decode the target against it, and project to logits.

        Args:
            source_input: token ids of the source sequence.
            target_input: token ids of the target sequence.
            Coupled: when False, the full encoder stack runs first and its final
                output feeds every decoder layer. When True, encoder layer i and
                decoder layer i run in lock-step, so decoder layer i attends to
                the output of encoder layer i.

        Returns:
            Logits with the target's leading axes and a last axis of size vocab_size.
        """
        src = self.embedding_layer(source_input)                                    # Shape: (batch, src_len, embed_dim)
        tgt = self.embedding_layer(target_input)                                    # Shape: (batch, tgt_len, embed_dim)

        if Coupled:
            for encoder_layer, decoder_layer in zip(self.encoder_layers, self.decoder_layers):
                src, _ = encoder_layer(src)                                         # Encoder layer i
                tgt, _ = decoder_layer(tgt, src)                                    # Decoder layer i attends to encoder layer i

        else:
            for encoder_layer in self.encoder_layers:
                src, _ = encoder_layer(src)

            for decoder_layer in self.decoder_layers:
                tgt, _ = decoder_layer(tgt, src)

        output = self.fc_out(tgt)                                                   # Shape: (batch, tgt_len, vocab_size)
        return output