In [47]:
import torch
from torch import nn 
import pandas as pd
import math 
import torch.nn.functional as F

In [9]:
# Load the English/French sentence pairs and preview the first five rows.
df = pd.read_csv(filepath_or_buffer="eng_-french.csv")
df.head(n=5)

Unnamed: 0,English words/sentences,French words/sentences
0,Hi.,Salut!
1,Run!,Cours !
2,Run!,Courez !
3,Who?,Qui ?
4,Wow!,Ça alors !


## Embedding block 

In [14]:
class InputEmbedding(nn.Module):
    """Token-embedding lookup whose output is scaled by sqrt(d_model).

    Args:
        d_model: Size of each embedding vector.
        vocab_size: Number of rows in the embedding table.
    """

    def __init__(self, d_model: int, vocab_size: int):
        super().__init__()

        self.d_model = d_model
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

    def forward(self, x):
        """Look up the embeddings for the indices in ``x`` and scale them by sqrt(d_model)."""
        # The scaling helps keep the variance of the input embeddings appropriate.
        scale = math.sqrt(self.d_model)
        embedded = self.embedding(x)
        return embedded * scale

# Sanity check: build a tiny embedding layer and print its structure.
model = InputEmbedding(d_model=10, vocab_size=10)
print(model)


InputEmbedding(
  (embedding): Embedding(10, 10)
)


## Positional encoding block 

For even embedding dimensions (index $2i$):
$$
PE(pos,2i) = \sin\left(\frac{pos}{10000^{\frac{2i}{d_{model}}}}\right)
$$
For odd embedding dimensions (index $2i+1$):
$$
PE(pos,2i+1) = \cos\left(\frac{pos}{10000^{\frac{2i}{d_{model}}}}\right)
$$

However, the divisor is computed in log space, i.e. as $\exp\left(-\frac{2i}{d_{model}}\ln 10000\right)$, for numerical stability.

In [23]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal positional encodings to embeddings, then applies dropout.

    The encoding table is built once in ``__init__`` and stored as a buffer of
    shape ``(1, seq_len, d_model)`` so it is saved with the model but never trained.

    Args:
        d_model: Embedding dimension.
        seq_len: Maximum sequence length the table is built for.
        dropout: Dropout probability applied after adding the encodings.
    """

    def __init__(self,d_model:int, seq_len:int,dropout:float):
        super(PositionalEncoding,self).__init__()

        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = nn.Dropout(dropout)

        # table of shape (seq_len, d_model)
        Pos_enc = torch.zeros(seq_len,d_model)
        # column vector of positions: (seq_len, 1)
        position = torch.arange(0,seq_len,dtype = torch.float).unsqueeze(1)
        # 1 / 10000^(2i/d_model), computed in log space: exp(-2i * ln(10000) / d_model).
        # The scaling must happen INSIDE the exp, otherwise the frequencies are wrong.
        div_term = torch.exp(torch.arange(0,d_model,2).float() * (-math.log(10000.0)/d_model))

        Pos_enc[:,0::2] = torch.sin(position * div_term)
        # for an odd d_model there is one fewer odd column than even columns
        Pos_enc[:,1::2] = torch.cos(position * div_term[: d_model // 2])

        # add a leading batch dimension -> (1, seq_len, d_model)
        Pos_enc = Pos_enc.unsqueeze(0)
        # a buffer is saved with the model but is not a learnable parameter
        self.register_buffer("Pos_enc",Pos_enc)

    def forward(self,x):
        """Add the positional encodings to ``x`` and apply dropout.

        Args:
            x: Tensor whose dimension 1 is the sequence length and whose last
                dimension is ``d_model`` (it is broadcast against the
                ``(1, seq_len, d_model)`` table).

        Returns:
            Tensor of the same shape as ``x``.

        Raises:
            ValueError: If the sequence is longer than the ``seq_len`` the table was built for.
        """
        length = x.shape[1]
        if length > self.seq_len:
            raise ValueError(
                f"sequence length {length} exceeds the maximum of {self.seq_len}"
            )
        # Keep only the first `length` positions; a buffer never requires grad,
        # so the encoding stays fixed during training.
        x = x + self.Pos_enc[:, :length, :]
        return self.dropout(x)
    
# Sanity check: build a small positional-encoding module and print its structure.
model = PositionalEncoding(d_model=10, seq_len=10, dropout=0.55)
print(model)


PositionalEncoding(
  (dropout): Dropout(p=0.55, inplace=False)
)


## Add and Norm block 
These blocks incorporate two essential components: a residual connection and a LayerNormalization layer.

In [39]:
class AddNorm (nn.Module):
    """Residual connection around a layer-normalised sub-layer output.

    Computes ``x + dropout(LayerNorm(sublayer_output))``.

    Args:
        d_model: Size of the last dimension, normalised by the LayerNorm.
        dropout: Dropout probability applied to the normalised sub-layer output.
    """

    def __init__(self, d_model,dropout:float):
        super(AddNorm,self).__init__()
        self.dropout = nn.Dropout(dropout)

        # norm block
        self.normlayer = nn.LayerNorm(d_model)

    def forward(self, x, sublayer):
        """Apply the residual connection.

        Args:
            x: Input of the sub-layer (the residual branch).
            sublayer: Either the already-computed sub-layer output (a tensor),
                or a callable (module / lambda) that is applied to ``x`` to
                produce it.

        Returns:
            ``x + dropout(LayerNorm(sublayer_output))``.
        """
        # Tensors are not callable, so this distinguishes "output" from "layer".
        sublayer_output = sublayer(x) if callable(sublayer) else sublayer
        return x + self.dropout(self.normlayer(sublayer_output))

## Position-Wise Feed-Forward Network (FFN):

FFN consists of two fully connected layers. Number of dimensions in the hidden layer $d_{ff}$, is generally set to around four times that of the token embedding $d_{model}$. So it is sometimes also called the expand-and-contract network.
An activation function, usually ReLU, is applied at the hidden layer.

The FFN transforms the features of each position in the input sequence independently.
By processing each position separately, the FFN enables the model to capture position-specific information and learn different representations for different parts of the sequence.

The **expanding** action increases the dimensionality of the representations, allowing the model to capture more complex features and interactions in the data, while the **contracting** action compresses these representations, preserving the most relevant information and reducing computational complexity, thereby improving the model's efficiency and capacity to capture intricate patterns.

$$FFN(x,W_1,W_2,b_1,b_2)=max(0,xW_1+b_1)W_2+b_2$$
where $W_1, W_2, b_1$ and $b_2$ are learnable parameters.








In [41]:
class FFN(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: Input and output feature size.
        dropout: Dropout probability applied after the ReLU activation.
        d_ff: Hidden-layer size. Defaults to ``4 * d_model`` when omitted.
    """

    def __init__(self,d_model, dropout, d_ff=None):
        super(FFN,self).__init__()

        self.d_model = d_model
        # expand-and-contract: the hidden layer is 4x wider unless told otherwise
        self.d_ff = d_model * 4 if d_ff is None else d_ff
        self.dropout = nn.Dropout(dropout)

        #FFN block
        self.linear1 = nn.Linear(self.d_model,self.d_ff) # W_1 and b_1
        self.linear2 = nn.Linear(self.d_ff, self.d_model) # W_2 and b_2

    def forward(self,x):
        """Return ``linear2(dropout(relu(linear1(x))))``; the last dimension of ``x`` must be ``d_model``."""
        hidden = F.relu(self.linear1(x))
        return self.linear2(self.dropout(hidden))
    
# model = FFN([1,102,10],512,0.5)
# model = Encoder()
# print(model)
        

## Multihead self-attention 

Attention mechanisms were introduced to give access to all sequence elements at each time step. The key is to be selective and determine which words are most important in a specific context.

Self-attention is a mechanism used in deep learning models, that enhances the information content of an input embedding by incorporating information about the input's context. It allows the model to assign different weights to different words in a sequence, focusing more on relevant parts and less on irrelevant ones, thus enriching the representation of the input sequence. 

Each word in a sequence is transformed into three vectors: Query (Q), Key (K), and Value (V) by multiplying the word's embedding by learnable weights. This process is done to capture various information about the word for Q, K, and V, which are then fed into our attention layer:
- Q-> The query vector that represents the word for which we want to calculate the attention scores. It's the vector that we will compare with other words in the sequence to determine their relevance to the current word.

- K -> The key vector represents the other words in the sequence. Each word has its own key vector. These key vectors are compared with the query vector to determine how relevant each word is to the query word.
- V -> The value vector that carries information about the word itself. After determining the relevance of each word (using keys and queries), these values are combined to create the output. 

Computing the dot product of the Query vector of one word with the Key vector of another word, divided by the square root of the dimensionality of the vectors, produces a score that represents the importance of the relationship between the two words, which is then passed through a softmax function to get attention weights, and finally, these attention weights are used to compute a weighted sum of the Value vectors, providing the context vector.
Finally, the model uses this weighted sum to create a new representation for each word that takes into account its relationship with all the other words in the sentence. This representation captures the context in which the word appears
$$
\mathrm{Attention}(Q,K,V) = \mathrm{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right) V
$$
Multi-head allows the model to focus on different aspects of the input simultaneously, improving its ability to capture complex relationships within the sequence.
1.	Splitting into Heads: In multi-head self-attention, the input is transformed into multiple smaller representations, called "heads". Each head has its own set of learned weight matrices for query (Q), key (K), and value (V) transformations. These weight matrices are learned during training.
2.	Parallel Computations: Each head performs its own attention calculation independently, resulting in multiple sets of attention scores.
3.	Concatenation and Linear Transformation: After the attention scores are calculated for each head, they are concatenated together and multiplied by a learned weight matrix. This linear transformation ensures that the outputs from different heads are combined appropriately.
$$
MultiHead(Q, K, V ) = Concat(head_1, ..., head_h) W_O
$$
$$
\quad  \textrm{where} \quad  head_i = Attention(QW^Q_i, KW^K_i, VW^V_i)
$$


In [42]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        h: Number of attention heads; must divide ``d_model``.
        d_model: Feature size of the inputs and of the output.
        dropout: Dropout probability applied to the attention weights.

    Attributes:
        d_k: Per-head feature size (``d_model // h``).
        attention_values: Attention weights of the most recent ``forward`` call,
            shape ``(batch_size, h, query_len, key_len)``.
    """

    def __init__(self, h: int , d_model:int, dropout:float):
        super(MultiHeadAttention,self).__init__()

        self.h = h
        self.d_model = d_model

        # check that d_model can be divided evenly amongst the h heads
        assert d_model  % h == 0, "d_model is not divisible by h"
        # per-head size; stored on self because forward() needs it
        self.d_k = d_model// h

        # linear transformation matrices
        self.w_q = nn.Linear(d_model,d_model)
        self.w_k = nn.Linear(d_model,d_model)
        self.w_v = nn.Linear(d_model,d_model)
        self.w_o = nn.Linear(d_model,d_model)

        self.dropout = nn.Dropout(dropout)

    @staticmethod
    def attention(Q,K,V,mask,dropout: nn.Dropout):
        """Scaled dot-product attention.

        Args:
            Q, K, V: Tensors whose last two dimensions are ``(seq_len, d_k)``.
            mask: Optional tensor broadcastable to the score matrix; positions
                where it equals 0 are excluded from attention. ``None`` disables masking.
            dropout: Optional dropout module applied to the attention weights.

        Returns:
            Tuple ``(context, weights)``: the weighted sum of ``V`` and the
            softmax-normalised attention weights.
        """
        d_k = Q.shape[-1]
        # (..., query_len, key_len)
        attention_values = (Q @ K.transpose(-2,-1))/ math.sqrt(d_k)

        # Masked positions get a very large negative score so that the softmax
        # assigns them (numerically) zero weight, e.g. future tokens in the decoder.
        if mask is not None:
            attention_values = attention_values.masked_fill(mask == 0, -1e9)

        # dim=-1 so that the softmax normalizes the scores for each query across all keys
        attention_values = attention_values.softmax(dim = -1)

        if dropout is not None:
            attention_values = dropout(attention_values)

        attention_f_values = attention_values @ V

        return  attention_f_values, attention_values

    def forward(self,Q,K,V,mask = None):
        """Project the inputs, attend per head, and merge the heads.

        Args:
            Q, K, V: Tensors of shape ``(batch_size, seq_length, d_model)``
                (as required by the ``view`` calls below).
            mask: Optional mask forwarded to :meth:`attention`.

        Returns:
            Tensor of shape ``(batch_size, query_len, d_model)``.
        """
        # linear transformation -> (batch_size, seq_length, d_model)
        Q = self.w_q(Q)
        K = self.w_k(K)
        V = self.w_v(V)

        # split into heads: (batch_size, seq_length, h, d_k) -> (batch_size, h, seq_length, d_k)
        Q = Q.view(Q.shape[0],Q.shape[1], self.h , self.d_k).transpose(1,2)
        K = K.view(K.shape[0],K.shape[1], self.h , self.d_k).transpose(1,2)
        V = V.view(V.shape[0],V.shape[1], self.h , self.d_k).transpose(1,2)

        x, self.attention_values = MultiHeadAttention.attention(Q,K,V,mask,self.dropout)

        # back to (batch_size, seq_length, h, d_k), then concatenate the heads:
        # (batch_size, seq_length, d_model)
        x = x.transpose(1,2).contiguous().view(x.shape[0],-1,self.h*self.d_k)

        x = self.w_o(x)

        return x


## Encoder wrapper 

In [43]:
class EncoderBlock(nn.Module):
    """One encoder layer: self-attention followed by a feed-forward network,
    each wrapped in an ``AddNorm`` residual block.

    Args:
        SelfAttention_block: Attention module called as ``block(x, x, x, mask)``.
        FFN_block: Feed-forward module called as ``block(x)``.
        dropout: Dropout probability used by the two ``AddNorm`` blocks.
    """

    def __init__(self,SelfAttention_block :MultiHeadAttention, FFN_block :FFN ,dropout:float) :
        super(EncoderBlock,self).__init__()

        self.SelfAttention_block = SelfAttention_block
        self.FFN_block = FFN_block
        # AddNorm needs the model width for its LayerNorm; read it from the
        # attention block so the constructor signature stays unchanged.
        d_model = SelfAttention_block.d_model
        #ModuleList for storing and iterating over a list of modules.
        self.AddNorm_block = nn.ModuleList([AddNorm(d_model, dropout) for _ in range(2)])

    def forward(self ,x , mask):
        """Run self-attention and the feed-forward network, each with a residual AddNorm."""
        x = self.AddNorm_block[0](x,self.SelfAttention_block(x,x,x,mask))
        x = self.AddNorm_block[1](x,self.FFN_block(x))

        return x
    
class Encoder(nn.Module):
    """Stack of encoder layers followed by a final LayerNorm.

    Args:
        layers: Modules applied in order, each called as ``layer(x, mask)``.
        d_model: Size of the last dimension, normalised by the final LayerNorm.
    """

    def __init__(self, layers: nn.ModuleList, d_model):
        super().__init__()
        self.layers = layers
        self.normlayer = nn.LayerNorm(d_model)

    def forward(self, x, mask):
        """Feed ``x`` through every layer in turn, then normalise the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, mask)
        normalised = self.normlayer(hidden)
        return normalised


## Decoder wrapper

In [45]:
class DecoderBlock (nn.Module):
    """One decoder layer: masked self-attention, cross-attention over the
    encoder output, and a feed-forward network, each wrapped in ``AddNorm``.

    Args:
        MaskedSelfAtt: Attention module used for (masked) self-attention.
        CrossAttention: Attention module whose keys/values are the encoder output.
        FNN_block: Feed-forward module called as ``block(x)``.
        dropout: Dropout probability used by the three ``AddNorm`` blocks.
    """

    def __init__(self,MaskedSelfAtt: MultiHeadAttention,CrossAttention:MultiHeadAttention, FNN_block : FFN, dropout:float):
        super(DecoderBlock,self).__init__()
        self.MaskedSelfAtt = MaskedSelfAtt
        self.CrossAttention = CrossAttention
        self.FNN_block = FNN_block
        # AddNorm needs the model width for its LayerNorm; read it from the
        # attention block so the constructor signature stays unchanged.
        d_model = MaskedSelfAtt.d_model
        self.AddNorm = nn.ModuleList([AddNorm(d_model, dropout) for _ in range(3)])

    def forward(self, x , encoder_output , encoder_mask , decoder_mask ):
        """Apply the three sub-layers in order.

        Args:
            x: Decoder input.
            encoder_output: Encoder output, used as keys and values in cross-attention.
            encoder_mask: Mask passed to the cross-attention.
            decoder_mask: Mask passed to the self-attention.
        """
        # Each sub-layer output is computed here and handed to AddNorm as a
        # tensor (no lambda needed), the same way EncoderBlock does it.
        x = self.AddNorm[0](x, self.MaskedSelfAtt(x,x,x,decoder_mask))
        x = self.AddNorm[1](x, self.CrossAttention(x,encoder_output,encoder_output,encoder_mask))
        # The feed-forward network must be applied to x; passing the module
        # itself would hand AddNorm a layer instead of an output.
        x = self.AddNorm[2](x, self.FNN_block(x))
        return x
    
class Decoder(nn.Module):
    """Stack of decoder layers followed by a final LayerNorm.

    Args:
        layers: Modules applied in order, each called as
            ``layer(x, encoder_output, encoder_mask, decoder_mask)``.
        d_model: Size of the last dimension, normalised by the final LayerNorm.
    """

    def __init__(self, layers: nn.ModuleList, d_model):
        super().__init__()

        self.layers = layers
        self.normlayer = nn.LayerNorm(d_model)

    def forward(self, x, encoder_output, encoder_mask, decoder_mask):
        """Feed ``x`` through every layer in turn, then normalise the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, encoder_mask, decoder_mask)
        normalised = self.normlayer(hidden)
        return normalised

## Classification head

In [48]:
class ClassificationHead(nn.Module):
    """Linear projection to vocabulary size followed by a softmax.

    Args:
        d_model: Size of the incoming feature dimension.
        vocab_size: Number of output classes (vocabulary entries).
    """

    def __init__(self, d_model, vocab_size):
        super().__init__()
        self.linear = nn.Linear(in_features=d_model, out_features=vocab_size)

    def forward(self, x):
        """Return the softmax over the last dimension of the projected input."""
        return F.softmax(self.linear(x), dim=-1)

## Transformer wrapper 

During inference, you can reuse the output of the encoder in a Transformer model. This is a common practice, especially in sequence-to-sequence tasks like machine translation, where the encoder processes the input sequence once and the decoder generates the output sequence token by token.

In [None]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer assembled from pre-built components.

    Args:
        encoder: Module called as ``encoder(x, mask)``.
        decoder: Module called as ``decoder(x, encoder_output, encoder_mask, decoder_mask)``.
        input_embedding: Embedding applied to the source sequence.
        output_embedding: Embedding applied to the target sequence.
        input_pos: Positional encoding applied to the embedded source.
        output_pos: Positional encoding applied to the embedded target.
        classification_head: Module mapping the decoder output to the vocabulary.
    """

    def __init__(self, encoder:Encoder, decoder:Decoder, input_embedding:InputEmbedding , output_embedding:InputEmbedding, 
                 input_pos : PositionalEncoding,output_pos:PositionalEncoding, classification_head:ClassificationHead) :
        super(Transformer, self).__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.input_embedding = input_embedding
        self.output_embedding = output_embedding
        self.input_pos = input_pos
        self.output_pos = output_pos
        self.classification_head = classification_head

    def encode(self, src, src_mask=None):
        """Embed the source sequence, add positions, and run the encoder.

        Exposed separately so the encoder output can be computed once and
        reused while decoding token by token.
        """
        # assumes src holds token indices accepted by input_embedding -- TODO confirm
        x = self.input_pos(self.input_embedding(src))
        return self.encoder(x, src_mask)

    def decode(self, tgt, encoder_output, src_mask=None, tgt_mask=None):
        """Embed the target sequence, add positions, and run the decoder over ``encoder_output``."""
        # assumes tgt holds token indices accepted by output_embedding -- TODO confirm
        x = self.output_pos(self.output_embedding(tgt))
        return self.decoder(x, encoder_output, src_mask, tgt_mask)

    def forward(self, src=None, tgt=None, src_mask=None, tgt_mask=None):
        """Run the full encoder-decoder pass.

        Args:
            src: Source sequence fed to the encoder.
            tgt: Target sequence fed to the decoder.
            src_mask: Optional mask for the encoder and for cross-attention.
            tgt_mask: Optional mask for the decoder self-attention.

        Returns:
            The output of ``classification_head`` for the decoded sequence, or
            ``None`` when called without inputs (the behaviour of the earlier stub).
        """
        if src is None and tgt is None:
            return None

        encoder_output = self.encode(src, src_mask)
        decoded = self.decode(tgt, encoder_output, src_mask, tgt_mask)
        return self.classification_head(decoded)