In [2]:
import torch
import torch.nn as nn
import math

In [1]:
# First we build the input embeddings.
# Each word is mapped to an input ID, and each input ID is mapped to a learned vector of 512 floating-point numbers called the embedding vector.

In [None]:
# Words are converted into IDs (the position of each word in the vocabulary).

In [3]:
class InputEmbeddings(nn.Module):
    """Map token IDs to learned embedding vectors scaled by sqrt(d_model).

    ``nn.Embedding`` acts as a lookup table: a given ID always returns the
    same row of the (learned) weight matrix.

    Args:
        d_model: Size of each embedding vector (e.g. 512).
        vocab_size: Number of distinct token IDs the table holds.
    """

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()
        self.d_model = d_model
        self.vocab_size = vocab_size
        # Lookup table of shape (vocab_size, d_model); rows are learned by the model.
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Return the embeddings of the IDs in ``x`` multiplied by sqrt(d_model).

        The output has the shape of ``x`` with an extra trailing dimension of
        size ``d_model``.
        """
        # Fixed: the attribute is `embedding`; the original referenced the
        # non-existent `self.embeddings` and raised AttributeError.
        # In the paper the embedding weights are multiplied by sqrt(d_model).
        return self.embedding(x) * math.sqrt(self.d_model)
        
        

In [5]:
# Positional Encoding: each word of the sentence has already been mapped to a 512-dimensional embedding vector.
# We also want to tell the model the position of each word in the sentence.
# To do so we add another vector, computed with the sine and cosine formulas, to the embedding vector.
# This gives the model the position of each word in the sentence.
# In short: create a position vector of the same size as the embedding vector and add it to the embedding vector.

In [7]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to embeddings, then apply dropout.

    A table ``pe`` of shape (1, seq_len, d_model) is pre-computed once and
    registered as a buffer, so it is saved with the module's state but is not
    a learnable parameter.

    Args:
        d_model: Size of each positional vector (must match the embedding size).
        seq_len: Maximum sentence length; one encoding is built per position.
        dropout: Dropout probability applied to the summed output.
    """

    def __init__(self, d_model, seq_len: int, dropout: float):
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # Table of shape (seq_len, d_model): one row per position.
        pe = torch.zeros(seq_len, d_model)

        # Column vector of positions, shape (seq_len, 1). unsqueeze(1) adds the
        # trailing dimension so it broadcasts against div_term below.
        # float64 is what the original `dtype=float` resolved to; kept explicit.
        position = torch.arange(0, seq_len, dtype=torch.float64).unsqueeze(1)

        # 1 / 10000^(2i / d_model), computed in log space for numerical stability.
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Shape (seq_len, ceil(d_model / 2)).
        angles = position * div_term

        # Sine on even columns, cosine on odd columns.
        pe[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer odd column than even columns,
        # so trim the angles to fit (no-op when d_model is even).
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Add a leading batch dimension -> (1, seq_len, d_model) so the same
        # table broadcasts over every sentence in a batch.
        pe = pe.unsqueeze(0)

        # A buffer is stored in the module's state_dict without being a Parameter.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encodings for the first ``x.shape[1]`` positions to ``x``.

        ``x`` is indexed as (batch, seq_len, d_model); its second dimension
        must not exceed the ``seq_len`` the module was built with.
        """
        # Fixed: the method is `requires_grad_`; the original `require_grad_`
        # raised AttributeError. The encodings are fixed, never learned.
        x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)
        # Dropout to reduce overfitting.
        return self.dropout(x)


In [8]:
# Layer Normalization (the "Norm" in "Add & Norm")
# Suppose we have a batch of 3 items and each item has 512 features.
# For each item in the batch we calculate the mean and standard deviation over its 512 features.
# We compute the new value of each feature by subtracting the mean and dividing by the standard deviation.
# We also introduce learnable parameters gamma and beta: gamma is multiplied with the new value and beta is added to it.
# The model must be able to amplify values when needed; it learns gamma (multiplicative) and beta (additive) so that it can rescale and shift the normalized values.

class LayerNormalization(nn.Module):
    """Normalize over the last dimension with a learnable scale and shift.

    Args:
        epsilon: Small constant added to the standard deviation so the
            division never hits zero.
    """

    def __init__(self, epsilon: float = 1e-6):
        super().__init__()
        self.eps = epsilon
        # Learnable scalar multiplier (starts at 1).
        self.alpha = nn.Parameter(torch.ones(1))
        # Learnable scalar offset (starts at 0).
        self.bias = nn.Parameter(torch.zeros(1))

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` along the last dim."""
        # keepdim=True keeps the reduced dimension (size 1) so the statistics
        # broadcast back against x.
        mu = x.mean(dim=-1, keepdim=True)
        sigma = x.std(dim=-1, keepdim=True)
        centered_and_scaled = self.alpha * (x - mu)
        return centered_and_scaled / (sigma + self.eps) + self.bias

In [9]:
# Build the feed-forward block: a fully connected network that the model uses in both the encoder and the decoder.
# It consists of two linear transformations with a ReLU activation function in between.
# Two matrices W1 and W2: x is multiplied by W1 and bias b1 is added; after ReLU the result is multiplied by W2 and bias b2 is added.
# The input and output dimension d_model is 512.
# The inner layer has a dimensionality of 2048 (d_ff).
# FFN(x) = max(0, xW1 + b1)W2 + b2
# Here max(0, .) is the ReLU activation function applied to xW1 + b1.

class FeedForwardBlock(nn.Module):
    """Two linear layers with ReLU and dropout in between.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden (inner) feature size.
        dropout: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float):
        super().__init__()
        # First projection (W1, b1): d_model -> d_ff.
        self.linear_1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        # Second projection (W2, b2): d_ff -> d_model.
        self.linear_2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Apply linear_1 -> ReLU -> dropout -> linear_2 to ``x``.

        The last dimension goes d_model -> d_ff -> d_model; leading
        dimensions are untouched.
        """
        hidden = torch.relu(self.linear_1(x))
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)

In [10]:
# Multi-head attention takes the input and uses it three times: once as the Query, once as the Key and once as the Value.
# Input sequence: (seq_len, d_model)
# The input sequence is projected into 3 matrices (Q', K', V').
# h is the number of heads.
# The matrices are split ALONG THE EMBEDDING DIMENSION, NOT ALONG THE SEQ_LEN DIMENSION.
# Each HEAD sees the FULL SENTENCE BUT A DIFFERENT PART OF THE EMBEDDINGS.
# Attention is applied to each head separately.
# Attention(Q,K,V) = softmax(QK^T/sqrt(d_k))V
# MultiHead(Q,K,V) = Concat(head_1, head_2, head_3, ..., head_h) Wo


# Q (seq_len, d_model) * Wq (d_model, d_model)  ===> Q'(seq_len, d_model) --split into h matrices of shape (seq_len, d_k)-->
# K (seq_len, d_model) * Wk (d_model, d_model)  ===> K'(seq_len, d_model) --split into h matrices of shape (seq_len, d_k)-->
# V (seq_len, d_model) * Wv (d_model, d_model)  ===> V'(seq_len, d_model) --split into h matrices of shape (seq_len, d_k)-->
# head_i = Attention(Q'_i, K'_i, V'_i), where Q' = QWq, K' = KWk, V' = VWv
# H = Concat(head_1, head_2, head_3, ..., head_h)
# H * Wo = MH-A
# Here Wo is the output weight matrix of shape (h * d_v, d_model) = (d_model, d_model).
# Here d_v = d_model / h = d_k


In [11]:
class MultiHeadAttentionBlock(nn.Module):
    """Multi-head scaled dot-product attention.

    The embedding dimension is split into ``h`` heads of size ``d_k``; every
    head sees the full sequence but only its own slice of the embedding.

    Args:
        d_model: Embedding size; must be divisible by ``h``.
        h: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.

    After each ``forward`` call the attention weights are kept in
    ``self.attention_scores`` (for visualization).
    """

    def __init__(self, d_model: int, h: int, dropout: float):
        super().__init__()
        # Validate before building any state.
        assert d_model % h == 0, "d_model should be divisible by number of heads"
        self.d_model = d_model
        self.h = h
        # Per-head embedding size.
        self.d_k = d_model // h
        # nn.Linear(in_features, out_features) applies x @ W^T + b.
        self.w_q = nn.Linear(d_model, d_model)  # Wq
        self.w_k = nn.Linear(d_model, d_model)  # Wk
        self.w_v = nn.Linear(d_model, d_model)  # Wv
        # Output projection Wo: (h * d_k) -> d_model.
        self.w_o = nn.Linear(d_model, d_model)  # Wo
        # Fixed: the original assigned self.dropout twice; once is enough.
        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: nn.Dropout):
        """Compute softmax(Q K^T / sqrt(d_k)) V.

        Static so it can be called without an instance.

        Args:
            query, key, value: Tensors whose last dimension is ``d_k`` and
                whose second-to-last dimension is the sequence length.
            mask: Optional tensor broadcastable to the score matrix; positions
                where ``mask == 0`` are hidden. ``None`` disables masking.
            dropout: Optional dropout module applied to the attention weights.

        Returns:
            Tuple ``(output, attention_weights)``; the weights are returned so
            they can be visualized.
        """
        d_k = query.shape[-1]
        # Transposing the last two dims of key turns (..., seq_len, d_k) into
        # (..., d_k, seq_len), so the product is (..., seq_len, seq_len).
        attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            # -1e9 acts like -infinity: after softmax those weights are ~0.
            attention_scores.masked_fill_(mask == 0, -1e9)
        attention_scores = attention_scores.softmax(dim=-1)
        if dropout is not None:
            attention_scores = dropout(attention_scores)
        return (attention_scores @ value), attention_scores

    def forward(self, q, k, v, mask):
        """Project q/k/v, attend per head, merge the heads and project with Wo.

        Args:
            q, k, v: Tensors indexed as (batch, seq_len, d_model).
            mask: Passed to :meth:`attention`; use it to stop some positions
                from interacting with others (e.g. in the decoder).

        Returns:
            Tensor of shape (batch, seq_len of ``q``, d_model).
        """
        query = self.w_q(q)  # (batch, seq_len, d_model)
        key = self.w_k(k)    # (batch, seq_len, d_model)
        value = self.w_v(v)  # (batch, seq_len, d_model)

        # Split only the embedding dimension into (h, d_k); batch and sequence
        # dimensions are kept. The transpose moves the head dimension before
        # the sequence dimension so each head sees (seq_len, d_k):
        # (batch, seq_len, d_model) -> (batch, seq_len, h, d_k) -> (batch, h, seq_len, d_k)
        query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1, 2)
        key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1, 2)
        value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1, 2)

        x, self.attention_scores = MultiHeadAttentionBlock.attention(query, key, value, mask, self.dropout)

        # Merge the heads back:
        # (batch, h, seq_len, d_k) -> (batch, seq_len, h, d_k) -> (batch, seq_len, d_model)
        # Fixed: `contiguous` was misspelled `contigusous` (AttributeError).
        # contiguous() is required because view() cannot run on the
        # non-contiguous result of transpose().
        x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)

        # Multiply by the output matrix Wo.
        return self.w_o(x)
    
    
        
        

In [19]:
# Skip (residual) connections
# A skip connection links the Add & Norm step with the input of the preceding sublayer: the sublayer's output is added to its own input.
class ResidualConnection(nn.Module):
    """Skip connection around a sublayer: ``x + dropout(sublayer(norm(x)))``.

    Args:
        dropout: Dropout probability applied to the sublayer output.
    """

    def __init__(self, dropout: float) -> None:
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        # Fixed: torch.nn has no `LayerNormalization`; the original
        # `nn.LayerNormalization()` raised AttributeError. Use the
        # LayerNormalization class defined in this notebook.
        self.norm = LayerNormalization()

    def forward(self, x, sublayer):
        """Combine ``x`` with the output of ``sublayer``.

        Args:
            x: Input tensor.
            sublayer: Callable taking the normalized tensor and returning a
                tensor that can be added to ``x``.
        """
        # Normalize first, then run the sublayer, then add the result back to x.
        return x + self.dropout(sublayer(self.norm(x)))
        

In [20]:
# The encoder block is repeated N times.
# The output of each block is sent to the next one.
# The output of the last block is sent to the decoder.

In [21]:
# An encoder block has 2 Add & Norm blocks, one multi-head attention block and one feed-forward block.
# Self-attention:
#   Self-attention is multi-head attention. We call it self-attention because, in the encoder, it is applied to the same input
#   in three different roles: query, key and value.

class EncoderBlock(nn.Module):
    """One encoder layer: self-attention then feed-forward, each in a residual connection.

    Args:
        self_attention_block: Attention module used with the same tensor as
            query, key and value.
        feed_forward_block: Position-wise feed-forward module.
        dropout: Dropout probability for both residual connections.
    """

    def __init__(self, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float):
        super().__init__()
        self.self_attention_block = self_attention_block
        self.feed_forward_block = feed_forward_block
        self.residual_connection = nn.ModuleList([
            ResidualConnection(dropout),
            ResidualConnection(dropout)
        ])

    def forward(self, x, src_mask):
        """Run the block on ``x``.

        Args:
            x: Encoder input.
            src_mask: Mask applied inside self-attention, used to hide the
                interaction of padding words with other words.
        """
        # Self-attention: the sentence attends to itself, so x plays the role
        # of query, key and value. The lambda closes over src_mask because the
        # residual connection calls the sublayer with a single argument.
        x = self.residual_connection[0](x, lambda x: self.self_attention_block(x, x, x, src_mask))
        # Fixed: the original `lambda x: self.feed_forward_block` returned the
        # module itself instead of calling it on x.
        x = self.residual_connection[1](x, self.feed_forward_block)
        return x
        
    

In [22]:
# The encoder is made up of N encoder blocks.

In [23]:
class Encoder(nn.Module):
    """Stack of encoder layers followed by a final layer normalization.

    Args:
        layers: Layers applied one after another; each is called as
            ``layer(x, mask)``.
    """

    def __init__(self, layers: nn.ModuleList):
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, mask):
        """Feed ``x`` through every layer in order and normalize the result."""
        hidden = x
        for block in self.layers:
            # The output of one layer is the input of the next.
            hidden = block(hidden, mask)
        return self.norm(hidden)

In [26]:
class DecoderBlock(nn.Module):
    """One decoder layer: masked self-attention, cross-attention, feed-forward.

    Each of the three sublayers is wrapped in its own residual connection.

    * Self-attention: the decoder input is query, key and value at once.
    * Cross-attention: the query comes from the decoder, key and value come
      from the encoder output.

    Args:
        self_attention_block: Attention module for the decoder input.
        cross_attention_block: Attention module between decoder and encoder output.
        feed_forward_block: Position-wise feed-forward module.
        dropout: Dropout probability for the three residual connections.
    """

    def __init__(self, self_attention_block: MultiHeadAttentionBlock,
                 cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float):
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        # Three residual connections, one per sublayer.
        self.residual_connection = nn.ModuleList([
            ResidualConnection(dropout),
            ResidualConnection(dropout),
            ResidualConnection(dropout)
        ])

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run the block.

        Args:
            x: Decoder input (target language).
            encoder_output: Output of the encoder (source language).
            src_mask: Mask for the source sentence; applied in cross-attention,
                where keys and values come from the encoder.
            tgt_mask: Mask for the target sentence; applied in self-attention.
        """
        # Fixed: the attribute is `residual_connection` (as set in __init__);
        # the original `self.residual_connections` raised AttributeError.
        # Fixed: the masks were swapped. Self-attention works on the target
        # sequence, so it needs tgt_mask.
        x = self.residual_connection[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
        # Cross-attention: keys/values are the encoder output, so the source
        # mask is the one that matches them.
        x = self.residual_connection[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
        x = self.residual_connection[2](x, self.feed_forward_block)
        return x

In [25]:
class Decoder(nn.Module):
    """Stack of decoder layers followed by a final layer normalization.

    Args:
        layers: Layers applied one after another; each is called as
            ``layer(x, encoder_output, src_mask, tgt_mask)``.
    """

    def __init__(self, layers: nn.ModuleList):
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization()

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Feed ``x`` through every layer in order and normalize the result."""
        hidden = x
        for block in self.layers:
            # Arguments are passed in the order the layer's forward expects.
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return self.norm(hidden)
        

In [None]:
# The output of the decoder is fed into a linear layer.
# The decoder output has shape (seq_len, d_model) if we do not consider the batch dimension.
# We want to map each of these vectors back to a word in the vocabulary.
# For that we need a linear layer that projects from d_model to vocab_size.

In [27]:
# The linear layer converts an embedding into a position in the vocabulary.

In [28]:
class ProjectionLayer(nn.Module):
    """Linear projection from d_model to vocab_size followed by log-softmax.

    Args:
        d_model: Size of the incoming feature vectors.
        vocab_size: Number of output classes (vocabulary entries).
    """

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()
        self.proj = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        """Return log-probabilities over the vocabulary.

        The last dimension of ``x`` goes from d_model to vocab_size.
        """
        logits = self.proj(x)
        # log_softmax rather than softmax, for numerical stability.
        return torch.log_softmax(logits, dim=-1)

In [29]:
class Transformer(nn.Module):
    """Container wiring together the pieces of an encoder-decoder transformer.

    There is one embedding and one positional encoding for the source
    language and another pair for the target language.

    Args:
        encoder: Encoder stack.
        decoder: Decoder stack.
        src_embed: Embedding for source token IDs.
        tgt_embed: Embedding for target token IDs.
        src_pos: Positional encoding for the source sequence.
        tgt_pos: Positional encoding for the target sequence.
        projection_layer: Maps decoder output to vocabulary log-probabilities.
    """

    def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.src_pos = src_pos
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src, src_mask):
        """Embed ``src``, add positional encodings and run the encoder."""
        positioned = self.src_pos(self.src_embed(src))
        return self.encoder(positioned, src_mask)

    def decode(self, encoder_output, src_mask, tgt, tgt_mask):
        """Embed ``tgt``, add positional encodings and run the decoder."""
        positioned = self.tgt_pos(self.tgt_embed(tgt))
        return self.decoder(positioned, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        """Apply the projection layer to ``x``."""
        return self.projection_layer(x)

In [1]:
def build_transformer(src_vocab_size:int, tgt_vocab_size:int, src_seq_len:int, tgt_seq_len:int, d_model:int=512, N:int=6, h:int=8, dropout:float=.1,d_ff:int=2048):
    """Assemble a Transformer and Xavier-initialize its weight matrices.

    Args:
        src_vocab_size: Vocabulary size of the source language.
        tgt_vocab_size: Vocabulary size of the target language.
        src_seq_len: Maximum source sequence length.
        tgt_seq_len: Maximum target sequence length.
        d_model: Embedding / model dimension.
        N: Number of encoder blocks and of decoder blocks.
        h: Number of attention heads.
        dropout: Dropout probability used throughout.
        d_ff: Hidden size of the feed-forward blocks.

    Returns:
        The constructed ``Transformer``.
    """
    # Embeddings and positional encodings, one pair per language.
    src_embed = InputEmbeddings(d_model, src_vocab_size)
    tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)
    src_pos = PositionalEncoding(d_model, src_seq_len, dropout)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    # N encoder blocks; arguments are evaluated left to right, so each block
    # builds its attention module before its feed-forward module.
    encoder_blocks = [
        EncoderBlock(
            MultiHeadAttentionBlock(d_model, h, dropout),
            FeedForwardBlock(d_model, d_ff, dropout),
            dropout,
        )
        for _ in range(N)
    ]

    # N decoder blocks: self-attention, cross-attention, feed-forward.
    decoder_blocks = [
        DecoderBlock(
            MultiHeadAttentionBlock(d_model, h, dropout),
            MultiHeadAttentionBlock(d_model, h, dropout),
            FeedForwardBlock(d_model, d_ff, dropout),
            dropout,
        )
        for _ in range(N)
    ]

    encoder = Encoder(nn.ModuleList(encoder_blocks))
    decoder = Decoder(nn.ModuleList(decoder_blocks))

    projection_layer = ProjectionLayer(d_model, tgt_vocab_size)

    model = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)

    # Xavier-uniform init for every parameter with more than one dimension
    # (weight matrices); 1-D parameters such as biases keep their defaults.
    for weight in (p for p in model.parameters() if p.dim() > 1):
        nn.init.xavier_uniform_(weight)

    return model
    
    