In [1]:
import torch
from torch import nn
import numpy as np
import tensorflow as tf

***Encoder part:***


```
      1. Multi-head attention
      2. Positional encoding
      3. Layer normalization
      4. Feed-forward neural network

      1, 2, 3, 4 --> encoding layer

      encoder --> num_encoding_layers >= 1
```



**Multihead Attention**
```
Linearly project the input into query, key, and value vectors.
Split them across the heads.
Apply scaled dot-product attention in each head in parallel.
Concatenate the attention vectors from all heads.
Apply a final linear transformation.

The goal is to obtain the attention matrix.
```




In [36]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Args:
        input_dim: size of the last axis of the tensor passed to ``forward``.
        d_model: total width of the attention output; split evenly across heads.
        num_heads: number of attention heads; must divide ``d_model``.

    ``forward`` performs self-attention (projection, head split, attention,
    head merge, output projection). ``forward_cross_attention`` attends with
    caller-supplied ``q``, ``k``, ``v``.
    """

    def __init__(self, input_dim, d_model, num_heads):
        super().__init__()
        # Validate first so a bad configuration fails before any layer is built.
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads!"
        self.input_dim = input_dim
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(input_dim, 3 * d_model)  # joint projection for query, key, value
        self.linear_layer = nn.Linear(d_model, d_model)  # final output projection

    def __Softmax(self, x):  # double underscore: name-mangled, private to this class
        """Softmax over the last axis.

        ``torch.softmax`` is already numerically stable, and softmax is
        invariant to adding a constant to every logit, so no manual
        max-subtraction or epsilon is needed.
        """
        return torch.softmax(x, dim=-1)

    def __use_causal_mask(self, mask):
        """Return True only when ``mask`` equals True/1; anything else disables masking."""
        return bool(mask == 1)

    def __Scaled_Dot_Product_Attention(self, q, k, v, mask=False):
        """Compute ``softmax(q @ k^T / sqrt(d_k)) @ v``.

        When ``mask`` is truthy, a causal (lower-triangular) mask is applied so
        that query position ``i`` can only attend to key positions ``<= i``.

        Returns:
            (attention_matrix, attention_weights)
        """
        d_k = k.size(-1)
        scores = torch.matmul(q, k.transpose(-2, -1)) / (d_k ** 0.5)
        if mask:
            # diagonal=0 keeps the main diagonal and everything below it; a
            # positive diagonal would let each position see future tokens.
            allowed = torch.tril(
                torch.ones(scores.size(-2), scores.size(-1), device=scores.device)
            )
            # Large negative value instead of -inf, so softmax never sees an all -inf row.
            scores = scores.masked_fill(allowed == 0, -1e9)
        attention_weights = self.__Softmax(scores)
        attention_matrix = torch.matmul(attention_weights, v)
        return attention_matrix, attention_weights

    def forward(self, x, mask=False):
        """Self-attention over ``x``.

        Args:
            x: 3-D tensor unpacked as (batch_size, seq_len, input_dim).
            mask: True/1 enables causal masking; any other value disables it.

        Returns:
            Tensor of shape (batch_size, seq_len, d_model).
        """
        batch_size, seq_len, _ = x.size()
        qkv = self.qkv_layer(x)
        # (batch, seq, heads, 3*head_dim) -> (batch, heads, seq, 3*head_dim)
        qkv = qkv.reshape(batch_size, seq_len, self.num_heads, 3 * self.head_dim)
        qkv = qkv.permute(0, 2, 1, 3)
        q, k, v = qkv.chunk(3, dim=-1)
        attention, _ = self.__Scaled_Dot_Product_Attention(
            q, k, v, mask=self.__use_causal_mask(mask)
        )
        # Move heads back next to head_dim before flattening; reshaping the
        # (batch, heads, seq, head_dim) tensor directly would interleave
        # different sequence positions into a single output row.
        attention = attention.permute(0, 2, 1, 3)
        attention = attention.reshape(batch_size, seq_len, self.num_heads * self.head_dim)
        return self.linear_layer(attention)

    def forward_cross_attention(self, q, k, v, mask=False):
        """Attention with caller-supplied query, key and value tensors.

        NOTE(review): unlike ``forward``, this path does not apply
        ``qkv_layer`` and does not split heads; ``q``, ``k`` and ``v`` are used
        exactly as given, followed by the output projection. Confirm whether
        learned projections were intended here.

        Args:
            q: 3-D tensor unpacked as (batch_size, seq_len, feature).
            k, v: key and value tensors compatible with ``q`` for matmul.
            mask: True/1 enables causal masking; any other value disables it.

        Returns:
            Output of ``linear_layer`` applied to the attention result.
        """
        batch_size, seq_len, _ = q.size()
        attention, _ = self.__Scaled_Dot_Product_Attention(
            q, k, v, mask=self.__use_causal_mask(mask)
        )
        attention = attention.reshape(batch_size, seq_len, self.num_heads * self.head_dim)
        return self.linear_layer(attention)






**Positional Encoding**

```
Create a positional encoding matrix with the same sequence length and width as the input embedding, then add it to the input embedding.

```

In [3]:
class PositionalEncoding(nn.Module):
    """Adds sinusoidal positional encodings to a batch of embeddings.

    For position ``pos`` and even feature index ``i``:
        PE[pos, i]     = sin(pos / 10000 ** (i / d_model))
        PE[pos, i + 1] = cos(pos / 10000 ** (i / d_model))

    Args:
        input_dim: width of the encoding matrix (used as ``d_model``).
    """

    def __init__(self, input_dim):
        super().__init__()
        self.input_dim = input_dim

    def forward(self, x):
        """Return ``x`` plus the positional encoding matrix.

        Args:
            x: tensor whose second axis is the sequence axis and whose last
               axis must be broadcastable with ``input_dim``.

        Returns:
            ``x + PE`` where PE has shape (seq_len, input_dim). The encoding is
            built on ``x``'s device, so the result stays on that device.
        """
        d_model = self.input_dim
        # A tensor batch already has uniform sequence length, so no padding is needed.
        seq_len = x.size(1)

        positions = torch.arange(seq_len, dtype=torch.float32, device=x.device).unsqueeze(1)
        even_indices = torch.arange(0, d_model, 2, dtype=torch.float32, device=x.device)
        angles = positions / torch.pow(10000.0, even_indices / d_model)  # (seq_len, ceil(d_model / 2))

        positional_matrix = torch.zeros(seq_len, d_model, device=x.device)
        positional_matrix[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns.
        positional_matrix[:, 1::2] = torch.cos(angles[:, : d_model // 2])

        return x + positional_matrix

**Addition and layer normalization**

```
The output of the attention layer and the data from the residual/skip connection
are added together and then normalized.
```

In [4]:
class AddNorm(nn.Module):
    """Residual addition followed by layer normalization over the last axis.

    Args:
        d_model: length of the last axis; sizes the learnable scale and shift.
        epsilon: constant added to the variance before the square root.
    """

    def __init__(self, d_model, epsilon = 1e-9):
        super().__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))   # learnable scale
        self.beta = nn.Parameter(torch.zeros(d_model))   # learnable shift
        self.eps = epsilon

    def forward(self, x1, x2):
        """Return ``gamma * normalize(x1 + x2) + beta``.

        Both inputs must have exactly the same shape; otherwise an
        AssertionError is raised.
        """
        assert x1.size()==x2.size(),"dimensionality of attention output and residual connection data should be strictly same!"
        summed = x1 + x2
        centered = summed - summed.mean(dim=-1, keepdim=True)
        # correction=0 gives the population variance (divide by n, not n - 1).
        variance = summed.var(dim=-1, keepdim=True, correction=0)
        normalized = centered / torch.sqrt(variance + self.eps)
        return self.gamma * normalized + self.beta


**Feed Forward Neural Network**
```
transform the data to higher dimension to capture complex information.
relu activation to introduce non-linearity.
transform back to lower dimension of d_model.
```

In [5]:
class FeedForwardNetwork(nn.Module):
    """Position-wise feed-forward block: Linear -> ReLU -> Linear.

    Args:
        d_model: input and output width.
        d_ffn: width of the hidden (expanded) layer.
    """

    def __init__(self,d_model, d_ffn):
        super().__init__()
        self.d_model = d_model
        self.d_ffn = d_ffn
        expand = nn.Linear(d_model, d_ffn)      # project up to the hidden width
        contract = nn.Linear(d_ffn, d_model)    # project back down to d_model
        self.ffn = nn.Sequential(expand, nn.ReLU(), contract)

    def forward(self, x):
        """Apply the feed-forward stack to ``x`` and return the result."""
        transformed = self.ffn(x)
        return transformed


**Encoding layer**
```
structuring the
1.positional encoding
2.multihead attention
3.add and normalization
4.feed forward neural network

```

In [6]:
class EncodingLayer(nn.Module):
    """One encoder layer: positional encoding, self-attention, add & norm,
    feed-forward network, add & norm.

    Args:
        input_dim: width of the incoming embedding.
        d_model: width of the attention output and of both AddNorm blocks.
        num_heads: number of attention heads.
        d_ffn: hidden width of the feed-forward network.
    """

    def __init__(self, input_dim, d_model, num_heads, d_ffn):
        super().__init__()
        self.input_dim = input_dim
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ffn = d_ffn

        self.positional_encoder = PositionalEncoding(input_dim = self.input_dim)
        self.multihead_attention = MultiHeadAttention(input_dim = self.input_dim, d_model = self.d_model, num_heads = self.num_heads)
        self.add_norm1 = AddNorm(d_model = self.d_model)
        self.ff_network = FeedForwardNetwork(d_model = self.d_model, d_ffn = self.d_ffn)
        self.add_norm2 = AddNorm(d_model = self.d_model)

    def forward(self, x, mask = False):
        """Run the layer on the input embedding ``x``.

        Args:
            x: input embedding.
            mask: forwarded to the self-attention block.

        Returns:
            The layer output after the second add & norm.
        """
        # Sub-modules are invoked through __call__ (not .forward) so that any
        # registered forward hooks run.
        # NOTE(review): the positional encoding is added inside every layer, so
        # a stack of these layers re-adds it at each depth — confirm intended.
        x = self.positional_encoder(x)
        mha_x = self.multihead_attention(x, mask)
        addnorm1 = self.add_norm1(x1 = x, x2 = mha_x)
        ffnn_x = self.ff_network(x = addnorm1)
        encoder_output = self.add_norm2(x1 = addnorm1, x2 = ffnn_x)
        return encoder_output




**Transformer Encoder**


one or more encoder layers

In [7]:
class Encoder(nn.Module):
    """A stack of ``num_enc_layers`` EncodingLayer modules applied in sequence."""

    def __init__(self, num_enc_layers, input_dim, d_model, num_heads, d_ffn):
        super().__init__()

        stack = []
        for _ in range(num_enc_layers):
            stack.append(EncodingLayer(input_dim, d_model, num_heads, d_ffn))
        self.layers = nn.ModuleList(stack)

    def forward(self, x, mask = False):
        """Feed ``x`` through every layer in order, passing ``mask`` to each."""
        hidden = x
        for encoding_layer in self.layers:
            hidden = encoding_layer(hidden , mask)
        return hidden

**Decoder layer**
```
Structure:
1. Positional encoding of the decoder input.
2. Multi-head self-attention with a mask.
3. Layer normalization.
4. Multi-head attention without a mask, taking the encoder output and the decoder data as input.
5. Layer normalization.
6. Feed-forward network.
7. Layer normalization.
```


In [8]:
class DecodingLayer(nn.Module):
    """One decoder layer: positional encoding, self-attention, add & norm,
    cross-attention over the encoder output, add & norm, feed-forward
    network, add & norm.
    """

    def __init__(self, input_dim, d_model, num_heads, d_ffn):
        super().__init__()
        self.positional_encoder = PositionalEncoding(input_dim)
        self.self_attention = MultiHeadAttention(input_dim, d_model, num_heads)
        self.add_norm1 = AddNorm(d_model)
        self.cross_attention = MultiHeadAttention(input_dim, d_model, num_heads)
        self.add_norm2 = AddNorm(d_model)
        self.ff_network = FeedForwardNetwork(d_model, d_ffn)
        self.add_norm3 = AddNorm(d_model)

    def forward(self, x, enc_output, src_mask = False, tgt_mask = False):
        """Run the decoder layer.

        Args:
            x: decoder input.
            enc_output: encoder output, used as both key and value in cross-attention.
            src_mask: mask flag forwarded to the cross-attention block.
            tgt_mask: mask flag forwarded to the self-attention block.

        Returns:
            The layer output after the third add & norm.
        """
        positioned = self.positional_encoder(x)

        # Self-attention sub-layer with its residual connection.
        self_attended = self.self_attention(positioned, tgt_mask)
        after_self = self.add_norm1(positioned, self_attended)

        # Cross-attention sub-layer: queries from the decoder, keys/values from the encoder.
        cross_attended = self.cross_attention.forward_cross_attention(after_self, enc_output, enc_output, src_mask)
        after_cross = self.add_norm2(after_self, cross_attended)

        # Feed-forward sub-layer with its residual connection.
        fed_forward = self.ff_network(after_cross)
        return self.add_norm3(after_cross, fed_forward)

**Decoder**

In [9]:
class Decoder(nn.Module):
    """A stack of ``num_dec_layers`` DecodingLayer modules applied in sequence."""

    def __init__(self,num_dec_layers, input_dim, d_model, num_heads, d_ffn):
        super().__init__()
        stack = []
        for _ in range(num_dec_layers):
            stack.append(DecodingLayer(input_dim, d_model, num_heads, d_ffn))
        self.layers = nn.ModuleList(stack)

    def forward(self, x, enc_output, src_mask = False, tgt_mask = False):
        """Feed ``x`` through every layer in order; each layer receives the
        same ``enc_output`` and mask flags."""
        hidden = x
        for decoding_layer in self.layers:
            hidden = decoding_layer(hidden, enc_output, src_mask, tgt_mask)
        return hidden

**Transformer Structure**

```
structuring encoder,decoder
```

In [21]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer with token embeddings and a vocabulary head.

    Args:
        vocab: mapping that contains the '<START>' and '<END>' token ids.
        src_vocab_size: number of rows in the source embedding table.
        tgt_vocab_size: number of rows in the target embedding table and width
            of the output distribution.
        input_dim: forwarded to the encoder and decoder.
        d_model: embedding width and model width.
        num_heads: number of attention heads.
        d_ffn: hidden width of the feed-forward networks.
        num_enc_layers: number of encoder layers.
        num_dec_layers: number of decoder layers.
    """

    def __init__(self, vocab, src_vocab_size, tgt_vocab_size, input_dim, d_model, num_heads, d_ffn, num_enc_layers = 1, num_dec_layers = 1,):
        super().__init__()

        self.encoder = Encoder(num_enc_layers = num_enc_layers, input_dim = input_dim, d_model = d_model, num_heads = num_heads, d_ffn = d_ffn)
        self.decoder = Decoder(num_dec_layers= num_dec_layers, input_dim = input_dim, d_model = d_model, num_heads = num_heads, d_ffn = d_ffn)

        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)

        # Project to the target vocabulary so that argmax over the last axis
        # yields a token id (projecting to d_model would cap ids at d_model).
        self.final_linear = nn.Linear(d_model, tgt_vocab_size)

        self.start_token = vocab['<START>']
        self.end_token = vocab['<END>']

    @staticmethod
    def _embed_if_token_ids(inputs, embedding):
        """Embed integer token ids; pass already-embedded float tensors through."""
        if torch.is_floating_point(inputs):
            return inputs
        return embedding(inputs)

    def forward(self, src, tgt):
        """Return a probability distribution over the target vocabulary for
        every target position.

        Args:
            src: source token ids.
            tgt: target token ids.
        """
        memory = self.encode(src)
        return self.decode(tgt, memory)

    def encode(self, src):
        """Embed ``src`` (if it holds token ids) and run the encoder."""
        src = self._embed_if_token_ids(src, self.src_embedding)
        return self.encoder(src)

    def decode(self, tgt, memory):
        """Embed ``tgt`` (if it holds token ids), run the decoder with the
        target mask enabled, and return softmax probabilities over the last
        axis."""
        tgt = self._embed_if_token_ids(tgt, self.tgt_embedding)
        dec_out = self.decoder(tgt, memory, tgt_mask = True)
        dec_out = self.final_linear(dec_out)
        return nn.functional.softmax(dec_out, dim = -1)

    def generate(self, src, max_len):
        """Greedy decoding.

        Starts every sequence with the start token and appends the most likely
        next token until every sequence in the batch has produced the end
        token or ``max_len`` tokens have been generated.

        Args:
            src: source token ids; the first axis is treated as the batch axis.
            max_len: maximum length of the returned sequences, including the
                start token.

        Returns:
            Integer tensor of generated token ids with one row per batch item.
        """
        with torch.no_grad():  # inference only; no autograd graph needed
            memory = self.encode(src)
            batch_size = src.size(0)
            ys = torch.full((batch_size, 1), self.start_token, dtype = torch.long, device = src.device)
            finished = torch.zeros(batch_size, dtype = torch.bool, device = src.device)
            for _ in range(max_len - 1):
                out = self.decode(ys, memory)
                next_word = out[:, -1].argmax(dim = -1)
                # Sequences that already ended keep emitting the end token.
                next_word = next_word.masked_fill(finished, self.end_token)
                ys = torch.cat([ys, next_word.unsqueeze(1)], dim = 1)
                finished |= next_word == self.end_token
                if finished.all():
                    break
        return ys
