In [None]:
import copy
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
# Input embedding class: wraps PyTorch's built-in nn.Embedding layer, which is
# sized by the vocabulary size and the length of each embedding vector (d_model).
class InputEmbedding(nn.Module):
  """Embedding lookup whose output is multiplied by sqrt(d_model).

  Args:
    vocab_size: number of entries in the embedding table.
    d_model: length of each embedding vector.
  """

  def __init__(self, vocab_size: int, d_model: int):
    super().__init__()
    self.vocab_size = vocab_size
    self.d_model = d_model
    self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=d_model)

  def forward(self, x):
    """Return the embeddings of the indices in ``x``, scaled by sqrt(d_model)."""
    scale = math.sqrt(self.d_model)
    looked_up = self.embedding(x)
    return looked_up * scale


In [None]:
# Positional Encoding class
# The positional encoding is fixed: it does not change during training.
# It is computed from fixed formulas: sine for the even embedding dimensions and cosine for the odd ones.
class PositionalEncoding(nn.Module):
  """Adds fixed sinusoidal positional encodings to the input, then applies dropout.

  The table is computed once in the constructor and stored as the buffer ``pe``
  with shape (1, max_len, d_model), so it is not a trainable parameter.

  Args:
    d_model: size of the last dimension of the inputs.
    dropout: dropout probability applied after the addition.
    max_len: number of positions in the precomputed table.
  """

  def __init__(self, d_model: int, dropout: float =0.1,max_len: int=5000):
    super().__init__()
    self.d_model = d_model
    self.max_len = max_len
    self.dropout = nn.Dropout(dropout)

    position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
    # One frequency per even dimension index: exp(-i * ln(10000) / d_model).
    dim_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
    angles = position * dim_term  # (max_len, ceil(d_model / 2))

    pe = torch.zeros(max_len, d_model)
    pe[:, 0::2] = torch.sin(angles)
    # For odd d_model there is one fewer odd column than even columns, so
    # the surplus frequency is dropped for the cosine half.
    pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])

    # Leading dimension of size 1 so the table broadcasts over the batch.
    self.register_buffer("pe", pe.unsqueeze(0))

  def forward(self, x):
    """Add the encodings for the first ``x.shape[1]`` positions to ``x`` and apply dropout.

    ``x`` must broadcast against (1, x.shape[1], d_model), and ``x.shape[1]``
    must not exceed ``max_len``.
    """
    # Buffers never require gradients, so no extra detach is needed.
    return self.dropout(x + self.pe[:, : x.shape[1], :])





In [None]:
# Layer Normalization class
class Layer_Normalization(nn.Module):
  """Normalizes over the last dimension with a learnable scalar scale and shift.

  Computes ``alpha * (x - mean) / (std + eps) + bias``, where ``mean`` and
  ``std`` are taken over the last dimension of ``x``.

  Args:
    eps: small constant added to the standard deviation so the division
      stays finite when the standard deviation is zero.
  """

  def __init__(self, eps: float = 10**-6):
    super().__init__()
    self.eps = eps
    # nn.Parameter (capital P) registers these tensors as trainable.
    self.alpha = nn.Parameter(torch.ones(1))   # multiplicative scale
    self.bias = nn.Parameter(torch.zeros(1))   # additive shift

  def forward(self, x):
    """Return ``x`` normalized along its last dimension."""
    mean = x.mean(dim=-1, keepdim=True)
    std = x.std(dim=-1, keepdim=True)
    # eps is added, not subtracted: subtracting could make the denominator
    # zero or negative for near-constant inputs.
    return self.alpha * (x - mean) / (std + self.eps) + self.bias





In [None]:
# Feed Forward Layer
class FeedForwardLayer(nn.Module):
  """Two linear layers with a ReLU and dropout in between.

  Args:
    d_model: input and output feature size.
    d_ff: hidden feature size between the two linear layers.
    dropout: dropout probability applied after the ReLU.
  """

  def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
    super().__init__()
    self.Linear_1 = nn.Linear(in_features=d_model, out_features=d_ff)
    self.Dropout = nn.Dropout(p=dropout)
    self.Linear_2 = nn.Linear(in_features=d_ff, out_features=d_model)

  def forward(self, x):
    """Apply Linear_1 -> ReLU -> Dropout -> Linear_2 to ``x``."""
    hidden = F.relu(self.Linear_1(x))
    return self.Linear_2(self.Dropout(hidden))


In [None]:
#Multi Head Attention
class MultiHeadAttention(nn.Module):
  """Multi-head scaled dot-product attention.

  Queries, keys and values are projected with separate linear layers, split
  into ``h`` heads of size ``d_k = d_model // h``, attended per head, merged
  back and passed through an output projection.

  Args:
    d_model: feature size of the inputs and of the output.
    h: number of heads; must divide ``d_model``.
    dropout: dropout probability applied to the attention weights.
  """

  def __init__(self, d_model: int, h : int, dropout: float):
    super().__init__()
    self.d_model = d_model
    self.h = h
    assert d_model % h == 0, "d_model is not divisible by h"
    self.d_k = d_model // h

    self.w_q = nn.Linear(d_model, d_model)
    self.w_v = nn.Linear(d_model, d_model)
    self.w_k = nn.Linear(d_model, d_model)

    self.w_o = nn.Linear(d_model, d_model)
    # Plain assignment: a comparison here ("==") would leave the attribute
    # unset and raise AttributeError during construction.
    self.dropout = nn.Dropout(dropout)

  @staticmethod
  def attention(query, key, value, mask, dropout: nn.Dropout):
    """Scaled dot-product attention.

    Args:
      query, key, value: tensors whose last dimension is the per-head size.
      mask: optional tensor broadcastable to the score matrix; positions
        where ``mask == 0`` are excluded from attention.
      dropout: optional dropout module applied to the attention weights.

    Returns:
      A tuple ``(weights @ value, weights)``.
    """
    d_k = query.shape[-1]

    attention_scores = (query @ key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
      # A large negative score becomes (effectively) zero weight after softmax.
      attention_scores.masked_fill_(mask == 0, -1e9)
    attention_scores = attention_scores.softmax(dim=-1)
    if dropout is not None:
      attention_scores = dropout(attention_scores)

    return (attention_scores @ value), attention_scores

  def _split_heads(self, x):
    """Reshape (batch, seq, d_model) into (batch, h, seq, d_k)."""
    return x.view(x.shape[0], x.shape[1], self.h, self.d_k).transpose(1, 2)

  def forward(self, q, k, v, mask):
    """Attend from ``q`` to ``k``/``v`` and return a (batch, seq_q, d_model) tensor.

    The attention weights of the most recent call are kept on the module as
    ``self.attention_scores``.
    """
    query = self._split_heads(self.w_q(q))
    key = self._split_heads(self.w_k(k))
    value = self._split_heads(self.w_v(v))

    x, self.attention_scores = MultiHeadAttention.attention(query, key, value, mask, self.dropout)

    # (batch, h, seq, d_k) -> (batch, seq, h * d_k); contiguous() is needed
    # before view() because transpose() only changes strides.
    x = x.transpose(1, 2).contiguous()
    x = x.view(x.shape[0], -1, self.h * self.d_k)
    return self.w_o(x)




In [None]:
# Residual connection
class ResidualConnection(nn.Module):
  """Skip connection around a sub-layer, with normalization applied first.

  Computes ``x + dropout(sublayer(norm(x)))``.

  Args:
    dropout: dropout probability applied to the sub-layer output.
  """

  def __init__(self,dropout: float):
    super().__init__()
    self.dropout = nn.Dropout(dropout)
    self.norm = Layer_Normalization()

  def forward(self, x, sublayer):
    """Apply ``sublayer`` (a callable taking one tensor) to the normalized input and add ``x``."""
    return x + self.dropout(sublayer(self.norm(x)))

In [None]:
#EncoderLayer
class EncoderBlock(nn.Module):
  """One encoder layer: self-attention, then feed-forward, each in a residual connection.

  Args:
    self_attention_block: attention module called as ``(q, k, v, mask)``.
    feed_forward_layer: module applied after the attention sub-layer.
    dropout: dropout probability passed to both residual connections.
  """

  def __init__(self,self_attention_block: MultiHeadAttention, feed_forward_layer:FeedForwardLayer, dropout:float):
    super().__init__()
    self.self_attention_block = self_attention_block
    self.feed_forward_layer = feed_forward_layer
    wrappers = [ResidualConnection(dropout), ResidualConnection(dropout)]
    self.residual_connection = nn.ModuleList(wrappers)

  def forward(self, x, mask):
    """Run ``x`` through the attention sub-layer and then the feed-forward sub-layer."""
    def attend(normed):
      # Self-attention: the same tensor is used as query, key and value.
      return self.self_attention_block(normed, normed, normed, mask)

    attention_wrapper = self.residual_connection[0]
    feed_forward_wrapper = self.residual_connection[1]
    attended = attention_wrapper(x, attend)
    return feed_forward_wrapper(attended, self.feed_forward_layer)

class Encoder(nn.Module):
  """Applies a list of encoder layers in order, then a final normalization.

  Args:
    layers: modules that are each called as ``layer(x, mask)``.
  """

  def __init__(self, layers: nn.ModuleList):
    super().__init__()
    self.layers = layers
    self.norm = Layer_Normalization()

  def forward(self, x, mask):
    """Feed ``x`` through every layer with the same ``mask`` and normalize the result."""
    hidden = x
    for block in self.layers:
      hidden = block(hidden, mask)
    normalized = self.norm(hidden)
    return normalized



In [None]:
#Decoder

class DecoderBlock(nn.Module):
  """One decoder layer: self-attention, cross-attention, then feed-forward.

  Each of the three sub-layers is wrapped in its own ResidualConnection.

  Args:
    self_attention_block: attention module used on the decoder input itself.
    feed_forward_layer: module applied after the two attention sub-layers.
    dropout: dropout probability passed to the residual connections.
    cross_attention_block: attention module used to attend to the encoder
      output. When omitted, an independent copy of ``self_attention_block``
      is made so the two sub-layers do not share weights.
  """

  def __init__(self,self_attention_block: MultiHeadAttention, feed_forward_layer:FeedForwardLayer, dropout:float, cross_attention_block: MultiHeadAttention = None):
    super().__init__()
    self.self_attention_block = self_attention_block
    if cross_attention_block is None:
      # Reusing the very same module for both sub-layers would tie their
      # parameters together; a deep copy gives cross-attention its own.
      cross_attention_block = copy.deepcopy(self_attention_block)
    self.cross_attention_block = cross_attention_block
    self.feed_forward_layer = feed_forward_layer
    self.residual_connection = nn.ModuleList([ResidualConnection(dropout) for _ in range(3)])

  def forward(self, x,encoder_output, src_mask,trt_mask):
    """Run ``x`` through the three sub-layers.

    ``trt_mask`` is used for self-attention over ``x``; ``src_mask`` is used
    when attending to ``encoder_output``.
    """
    x = self.residual_connection[0](x, lambda x: self.self_attention_block(x, x, x, trt_mask))
    # Cross-attention: queries come from the decoder, keys and values from the encoder.
    x = self.residual_connection[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
    x = self.residual_connection[2](x, self.feed_forward_layer)
    return x

class Decoder(nn.Module):
  """Applies a list of decoder layers in order, then a final normalization.

  This class is named ``Decoder`` so that it does not replace the ``Encoder``
  class defined earlier in the notebook.

  Args:
    layers: modules that are each called as
      ``layer(x, encoder_output, src_mask, trt_mask)``.
  """

  def __init__(self, layers: nn.ModuleList):
    super().__init__()
    self.layers = layers
    self.norm = Layer_Normalization()

  def forward(self, x,encoder_output, src_mask,trt_mask):
    """Feed ``x`` through every layer with the same encoder output and masks, then normalize."""
    for layer in self.layers:
      x = layer(x, encoder_output, src_mask, trt_mask)
    return self.norm(x)

In [None]:
#ProjectionLayer
class projectionLayer(nn.Module):
  """Linear projection to the vocabulary followed by a log-softmax.

  Args:
    d_model: size of the last dimension of the input.
    vocab_size: size of the last dimension of the output.
  """

  def __init__(self,d_model: int, vocab_size: int):
    super().__init__()
    self.linear = nn.Linear(d_model, vocab_size)

  def forward(self,x):
    """Return log-probabilities over the last dimension (size ``vocab_size``)."""
    return F.log_softmax(self.linear(x), dim=-1)

In [None]:
#Transformer

class Transformer(nn.Module):
  """Encoder-decoder model assembled from already-built components.

  Args:
    encoder: module called as ``encoder(x, src_mask)``.
    decoder: module called as ``decoder(x, encoder_output, src_mask, trt_mask)``.
    projection_layer: module applied by ``project``.
    src_emb / trt_emb: embedding modules for the source / target inputs.
    src_pos / trt_pos: positional-encoding modules for the source / target.
  """

  def __init__(self, encoder: "Encoder", decoder: "Decoder", projection_layer: "projectionLayer", src_emb: "InputEmbedding", trt_emb: "InputEmbedding", src_pos: "PositionalEncoding", trt_pos: "PositionalEncoding"):
    super().__init__()
    self.encoder = encoder
    self.decoder = decoder
    self.projection_layer = projection_layer
    self.src_emb = src_emb
    self.trt_emb = trt_emb
    self.src_pos = src_pos
    self.trt_pos = trt_pos

  def encoder_forward(self, src, src_mask):
    """Embed ``src``, add positional encodings and run the encoder."""
    src = self.src_emb(src)
    src = self.src_pos(src)
    return self.encoder(src, src_mask)

  def trt_forward(self, trt, trt_mask, encoder_output=None, src_mask=None):
    """Embed ``trt``, add positional encodings and run the decoder.

    Args:
      trt: target-side input.
      trt_mask: mask for the decoder's self-attention.
      encoder_output: result of ``encoder_forward``; required.
      src_mask: mask used when attending to ``encoder_output``.

    Raises:
      ValueError: if ``encoder_output`` is not supplied.
    """
    if encoder_output is None:
      raise ValueError("trt_forward requires encoder_output (the result of encoder_forward)")
    trt = self.trt_emb(trt)
    trt = self.trt_pos(trt)
    return self.decoder(trt, encoder_output, src_mask, trt_mask)

  def project(self, x):
    """Apply the projection layer to ``x``."""
    return self.projection_layer(x)

  @staticmethod
  def build_Transformer(src_vocab_size: int, trt_vocab_size: int, d_model: int = 512, src_seq_len: int = 5000, tgt_seq_len: int = 5000, dropout: float = 0.01, d_ff: int = 2048, N: int = 6, h: int = 8) -> "Transformer":
    """Build a complete model with ``N`` encoder layers and ``N`` decoder layers.

    Args:
      src_vocab_size / trt_vocab_size: source / target vocabulary sizes.
      d_model: feature size used throughout the model.
      src_seq_len / tgt_seq_len: number of positions in the source / target
        positional-encoding tables. They carry defaults because they follow
        the defaulted ``d_model`` in the parameter list.
      dropout: dropout probability used by every component.
      d_ff: hidden size of the feed-forward layers.
      N: number of encoder layers and of decoder layers.
      h: number of attention heads.

    Returns:
      The assembled model, with every parameter of more than one dimension
      initialized by Xavier-uniform.
    """
    src_embed = InputEmbedding(src_vocab_size, d_model)
    trt_embed = InputEmbedding(trt_vocab_size, d_model)
    # Keywords are used because PositionalEncoding's positional order is
    # (d_model, dropout, max_len).
    src_pos = PositionalEncoding(d_model, dropout=dropout, max_len=src_seq_len)
    trt_pos = PositionalEncoding(d_model, dropout=dropout, max_len=tgt_seq_len)

    encoder_blocks = []
    for _ in range(N):
      encoder_self_attention_block = MultiHeadAttention(d_model, h, dropout)
      feed_forward_block = FeedForwardLayer(d_model, d_ff, dropout)
      encoder_blocks.append(EncoderBlock(encoder_self_attention_block, feed_forward_block, dropout))

    decoder_blocks = []
    for _ in range(N):
      decoder_attention_block = MultiHeadAttention(d_model, h, dropout)
      feed_forward_block = FeedForwardLayer(d_model, d_ff, dropout)
      decoder_blocks.append(DecoderBlock(decoder_attention_block, feed_forward_block, dropout))

    # The stacks are assembled once, after both loops have finished.
    encoder = Encoder(nn.ModuleList(encoder_blocks))
    decoder = Decoder(nn.ModuleList(decoder_blocks))
    projection_layer = projectionLayer(d_model, trt_vocab_size)

    model = Transformer(encoder, decoder, projection_layer, src_embed, trt_embed, src_pos, trt_pos)

    # Initialize weight matrices; 1-D parameters (biases, norm scalars) keep their defaults.
    for p in model.parameters():
      if p.dim() > 1:
        nn.init.xavier_uniform_(p)
    return model


# The class was originally declared as ``transformer`` while the builder
# referred to ``Transformer``; both names are kept available, and the builder
# is also exposed at module level.
transformer = Transformer
build_Transformer = Transformer.build_Transformer
