In [None]:
import torch
import math
from torch import nn
import torch.nn.functional as F

In [None]:
def scaled_dot_product(q,k,v,mask=None):
  """Scaled dot-product attention.

  Scores are `q @ k^T` over the last two dimensions, divided by the square
  root of the size of q's last dimension, and turned into weights by a
  softmax along the last dimension.

  Args:
    q, k, v: tensors whose last two dimensions are multiplied as matrices.
    mask: optional tensor added in place to the scaled scores before the
      softmax (e.g. -inf entries remove the corresponding positions).

  Returns:
    A `(values, attention)` tuple: the attention-weighted combination of `v`
    and the attention weights themselves.
  """
  key_dim = q.shape[-1]
  scores = (q @ k.transpose(-2,-1)) / math.sqrt(key_dim)
  if mask is not None:
    # in-place add, so the mask must broadcast to the shape of the scores
    scores.add_(mask)
  weights = scores.softmax(dim=-1)
  return weights @ v, weights

In [None]:
class MultiHeadAttention(nn.Module):
  """Multi-head self-attention over a single input sequence.

  One linear layer produces queries, keys and values for all heads at once;
  the per-head attention results are concatenated per position and passed
  through a final linear layer.

  Args:
    d_model: size of the last dimension of the input and of the output.
    num_heads: number of attention heads; must divide `d_model`.

  Raises:
    ValueError: if `d_model` is not divisible by `num_heads`.
  """
  def __init__(self,d_model,num_heads):
    super().__init__()
    if d_model % num_heads != 0:
      # otherwise the reshape in forward() fails later with an opaque size error
      raise ValueError(f"d_model ({d_model}) must be divisible by num_heads ({num_heads})")
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model//num_heads
    self.qkv_layer = nn.Linear(d_model, 3*d_model)
    self.linear_layer = nn.Linear(d_model,d_model)

  def forward(self,x,mask=None):
    """Apply self-attention to `x`.

    Args:
      x: tensor of shape (batch, sequence, d_model).
      mask: optional tensor handed to `scaled_dot_product`, which adds it to
        the attention scores of shape (batch, heads, sequence, sequence).

    Returns:
      Tensor of shape (batch, sequence, d_model).
    """
    batch_size, sequence_length, _ = x.size()
    qkv = self.qkv_layer(x)
    # split the projection per head: (batch, seq, heads, 3*head_dim) -> (batch, heads, seq, 3*head_dim)
    qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3*self.head_dim)
    qkv = qkv.permute(0,2,1,3)
    q,k,v = qkv.chunk(3,dim=-1)
    values,_ = scaled_dot_product(q,k,v,mask)
    # Move the head axis back next to head_dim BEFORE flattening. Reshaping the
    # (batch, heads, seq, head_dim) tensor directly would interleave features
    # of different positions (and break causal masking).
    values = values.permute(0,2,1,3)
    values = values.reshape(batch_size,sequence_length,self.num_heads*self.head_dim)
    return self.linear_layer(values)

In [None]:
class LayerNormalization(nn.Module):
  """Layer normalization with a learnable scale (`gamma`) and shift (`beta`).

  Statistics are computed over the last `len(parameters_shape)` dimensions of
  the input.

  Args:
    parameters_shape: shape of `gamma` and `beta`; its length selects how
      many trailing dimensions are normalized.
    eps: constant added to the variance before taking the square root.
  """
  def __init__(self,parameters_shape,eps=1e-5):
    super().__init__()
    self.parameters_shape = parameters_shape
    self.eps = eps
    self.gamma = nn.Parameter(torch.ones(parameters_shape))
    self.beta = nn.Parameter(torch.zeros(parameters_shape))

  def forward(self,inputs):
    """Normalize `inputs` over the trailing dimensions, then scale and shift."""
    # -1, -2, ... one entry per dimension of the parameter shape
    trailing_dims = list(range(-1, -len(self.parameters_shape)-1, -1))
    mean = inputs.mean(dim=trailing_dims,keepdim=True)
    centered = inputs-mean
    # biased variance (plain mean of the squared deviations)
    variance = (centered**2).mean(dim=trailing_dims,keepdim=True)
    normalized = centered/torch.sqrt(variance+self.eps)
    return self.gamma*normalized + self.beta

In [None]:
class PositionwiseFeedForward(nn.Module):
  """Two linear layers with a ReLU and dropout in between.

  Args:
    d_model: input and output feature size.
    hidden: feature size between the two linear layers.
    drop_prob: dropout probability applied after the ReLU.
  """
  def __init__(self,d_model,hidden,drop_prob=0.1):
    super().__init__()
    self.linear1 = nn.Linear(d_model,hidden)
    self.linear2 = nn.Linear(hidden,d_model)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=drop_prob)

  def forward(self,x):
    """Return linear2(dropout(relu(linear1(x))))."""
    hidden_activation = self.relu(self.linear1(x))
    return self.linear2(self.dropout(hidden_activation))

In [None]:
class EncoderLayer(nn.Module):
  """One encoder block: self-attention then feed-forward.

  Each sub-layer output goes through dropout, is added to the sub-layer's
  input and is then layer-normalized.

  Args:
    d_model: feature size handed to every sub-module.
    ffn_hidden: hidden size of the feed-forward sub-layer.
    num_heads: number of attention heads.
    drop_prob: dropout probability for both dropout modules and the FFN.
  """
  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob):
    super().__init__()
    self.attention = MultiHeadAttention(d_model=d_model,num_heads=num_heads)
    self.norm1 = LayerNormalization(parameters_shape=[d_model])
    self.dropout1 = nn.Dropout(p=drop_prob)
    self.ffn = PositionwiseFeedForward(d_model=d_model,hidden=ffn_hidden,drop_prob=drop_prob)
    self.norm2 = LayerNormalization(parameters_shape=[d_model])
    self.dropout2 = nn.Dropout(p=drop_prob)

  def forward(self,x):
    """Run `x` through the attention and feed-forward sub-layers."""
    # attention sub-layer (called without a mask) + residual + norm
    attended = self.dropout1(self.attention(x,mask=None))
    x = self.norm1(attended+x)
    # feed-forward sub-layer + residual + norm
    transformed = self.dropout2(self.ffn(x))
    return self.norm2(transformed+x)

In [None]:
class Encoder(nn.Module):
  """Stack of `num_layers` EncoderLayer modules applied one after another.

  Args:
    d_model, ffn_hidden, num_heads, drop_prob: forwarded unchanged to every
      EncoderLayer.
    num_layers: how many EncoderLayer instances to stack.
  """
  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob,num_layers):
    super().__init__()
    stack = []
    for _ in range(num_layers):
      stack.append(EncoderLayer(d_model,ffn_hidden,num_heads,drop_prob))
    self.layers = nn.Sequential(*stack)

  def forward(self,x):
    """Feed `x` through every layer in order and return the final output."""
    return self.layers(x)

In [None]:
# defining the parameters
d_model = 512 # size of every single vector throughout the encoder
num_heads = 8 # number of attention heads (sets of q, k, v vectors) in multi-head attention
drop_prob = 0.1 # dropout probability used for regularization
batch_size = 30 # number of examples passed through the model at the same time (one batch)
max_sequence_length = 200 # maximum length of a sentence
ffn_hidden = 2048 # number of hidden units in the position-wise feed-forward network
num_layers = 5 # number of layers stacked in the encoder and in the decoder

# build the encoder stack from the hyperparameters above
encoder = Encoder(d_model,ffn_hidden,num_heads,drop_prob,num_layers)

In [None]:
# Show the module tree of the encoder (the cell's last expression is rendered as its output).
encoder

Encoder(
  (layers): Sequential(
    (0): EncoderLayer(
      (attention): MultiHeadAttention(
        (qkv_layer): Linear(in_features=512, out_features=1536, bias=True)
        (linear_layer): Linear(in_features=512, out_features=512, bias=True)
      )
      (norm1): LayerNormalization()
      (dropout1): Dropout(p=0.1, inplace=False)
      (ffn): PositionwiseFeedForward(
        (linear1): Linear(in_features=512, out_features=2048, bias=True)
        (linear2): Linear(in_features=2048, out_features=512, bias=True)
        (relu): ReLU()
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (norm2): LayerNormalization()
      (dropout2): Dropout(p=0.1, inplace=False)
    )
    (1): EncoderLayer(
      (attention): MultiHeadAttention(
        (qkv_layer): Linear(in_features=512, out_features=1536, bias=True)
        (linear_layer): Linear(in_features=512, out_features=512, bias=True)
      )
      (norm1): LayerNormalization()
      (dropout1): Dropout(p=0.1, inplace=False)
 

In [None]:
class MultiHeadCrossAttention(nn.Module):
  """Multi-head attention where keys/values and queries come from different inputs.

  Keys and values are projected from `x`, queries from `y`; the per-head
  results are concatenated per query position and passed through a final
  linear layer.

  Args:
    d_model: size of the last dimension of both inputs and of the output.
    num_heads: number of attention heads; must divide `d_model`.

  Raises:
    ValueError: if `d_model` is not divisible by `num_heads`.
  """
  def __init__(self,d_model,num_heads):
    super().__init__()
    if d_model % num_heads != 0:
      # otherwise the reshape in forward() fails later with an opaque size error
      raise ValueError(f"d_model ({d_model}) must be divisible by num_heads ({num_heads})")
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model//num_heads
    self.kv_layer = nn.Linear(d_model,2*d_model)
    self.q_layer = nn.Linear(d_model,d_model)
    self.linear_layer = nn.Linear(d_model,d_model)

  def forward(self,x,y,mask=None):
    """Attend from `y` (queries) to `x` (keys and values).

    Args:
      x: tensor of shape (batch, source_length, d_model).
      y: tensor of shape (batch, target_length, d_model); the two lengths
        may differ.
      mask: optional tensor handed to `scaled_dot_product`, which adds it to
        the scores of shape (batch, heads, target_length, source_length).

    Returns:
      Tensor of shape (batch, target_length, d_model).
    """
    batch_size, source_length, _ = x.size()
    # queries follow y's length, not x's, so the two sequences may differ in length
    target_length = y.size(1)
    kv = self.kv_layer(x)
    q = self.q_layer(y)
    kv = kv.reshape(batch_size, source_length, self.num_heads, 2*self.head_dim)
    q = q.reshape(batch_size, target_length, self.num_heads, self.head_dim)
    # (batch, seq, heads, dim) -> (batch, heads, seq, dim)
    kv = kv.permute(0,2,1,3)
    q = q.permute(0,2,1,3)
    k,v = kv.chunk(2,dim=-1)
    values,_ = scaled_dot_product(q,k,v,mask)
    # Move the head axis back next to head_dim BEFORE flattening; reshaping the
    # (batch, heads, seq, head_dim) tensor directly would interleave features
    # of different query positions.
    values = values.permute(0,2,1,3)
    values = values.reshape(batch_size,target_length,self.num_heads*self.head_dim)
    return self.linear_layer(values)

In [None]:
class DecoderLayer(nn.Module):
  """One decoder block: masked self-attention, cross-attention, feed-forward.

  Each sub-layer output goes through dropout, is added to the sub-layer's
  input and is then layer-normalized. While `verbose` is true a progress
  line is printed for every stage of forward().

  Args:
    d_model: feature size handed to every sub-module.
    ffn_hidden: hidden size of the feed-forward sub-layer.
    num_heads: number of heads for both attention sub-layers.
    drop_prob: dropout probability for the three dropout modules and the FFN.
    verbose: print the stage trace in forward(). Defaults to True (the
      original behavior); pass False, or set `layer.verbose = False`, to
      silence it.
  """
  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob,verbose=True):
    super().__init__()
    self.verbose = verbose
    self.self_attention = MultiHeadAttention(d_model=d_model,num_heads=num_heads)
    self.norm1 = LayerNormalization(parameters_shape=[d_model])
    self.dropout1 = nn.Dropout(p=drop_prob)
    self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model,num_heads=num_heads)
    self.norm2 = LayerNormalization(parameters_shape=[d_model])
    self.dropout2 = nn.Dropout(p=drop_prob)
    self.ffn = PositionwiseFeedForward(d_model=d_model,hidden=ffn_hidden,drop_prob=drop_prob)
    self.norm3 = LayerNormalization(parameters_shape=[d_model])
    self.dropout3 = nn.Dropout(p=drop_prob)

  def _trace(self,stage):
    """Print the name of the stage about to run when tracing is enabled."""
    if self.verbose:
      print(stage)

  def forward(self, x, y, decoder_mask):
    """Run `y` through the three sub-layers.

    Args:
      x: tensor the cross-attention sub-layer takes keys/values from.
      y: tensor fed to self-attention; also supplies the cross-attention queries.
      decoder_mask: mask forwarded to the self-attention sub-layer.

    Returns:
      The transformed `y`; every sub-layer is wrapped in a residual addition,
      so the shape of `y` is preserved (batch x sequence x d_model).
    """
    # 1) masked self-attention + residual + norm
    residual = y
    self._trace("MASKED SELF ATTENTION")
    y = self.self_attention(y, mask=decoder_mask)
    self._trace("DROP OUT 1")
    y = self.dropout1(y)
    self._trace("ADD + LAYER NORMALIZATION 1")
    y = self.norm1(y + residual)

    # 2) cross-attention over x (called without a mask) + residual + norm
    residual = y
    self._trace("CROSS ATTENTION")
    y = self.encoder_decoder_attention(x, y, mask=None)
    self._trace("DROP OUT 2")
    y = self.dropout2(y)
    self._trace("ADD + LAYER NORMALIZATION 2")
    y = self.norm2(y + residual)

    # 3) position-wise feed-forward + residual + norm
    residual = y
    self._trace("FEED FORWARD 1")
    y = self.ffn(y)
    self._trace("DROP OUT 3")
    y = self.dropout3(y)
    self._trace("ADD + LAYER NORMALIZATION 3")
    y = self.norm3(y + residual)
    return y



In [None]:
class SequentialDecoder(nn.Sequential):
  """Sequential container whose layers take `(x, y, mask)` and return a new `y`.

  `x` and `mask` are handed unchanged to every layer; only `y` is threaded
  from one layer to the next.
  """
  def forward(self,*inputs):
    """Expects exactly three positional inputs `(x, y, mask)`; returns the final `y`."""
    context, state, attention_mask = inputs
    # iterating the container yields the registered layers in order
    for layer in self:
      state = layer(context, state, attention_mask)
    return state

In [None]:
class Decoder(nn.Module):
  """Stack of `num_layers` DecoderLayer modules held in a SequentialDecoder.

  Args:
    d_model, ffn_hidden, num_heads, drop_prob: forwarded unchanged to every
      DecoderLayer.
    num_layers: how many DecoderLayer instances to stack (default 1).
  """
  def __init__(self,d_model,ffn_hidden,num_heads,drop_prob,num_layers=1):
    super().__init__()
    stack = []
    for _ in range(num_layers):
      stack.append(DecoderLayer(d_model,ffn_hidden,num_heads,drop_prob))
    self.layers = SequentialDecoder(*stack)

  def forward(self,x,y,mask):
    """Pass `(x, y, mask)` through the layer stack and return the final `y`.

    In the demo at the bottom of this notebook x and y are 30*200*512 and
    mask is 200*200.
    """
    return self.layers(x,y,mask)

In [None]:
# defining the parameters
# NOTE(review): these repeat the values already assigned in the earlier parameter cell;
# keep the two cells in sync (or drop one of them).
d_model = 512 # size of every single vector throughout the model
num_heads = 8 # number of attention heads (sets of q, k, v vectors) in multi-head attention
drop_prob = 0.1 # dropout probability used for regularization
batch_size = 30 # number of examples passed through the model at the same time (one batch)
max_sequence_length = 200 # maximum length of a sentence
ffn_hidden = 2048 # number of hidden units in the position-wise feed-forward network
num_layers = 5 # number of layers stacked in the encoder and in the decoder

In [None]:
# random stand-ins for the encoded English (x) and Hindi (y) sentences
x = torch.randn(batch_size, max_sequence_length, d_model)
y = torch.randn(batch_size, max_sequence_length, d_model)
# look-ahead mask: -inf strictly above the diagonal, 0 on and below it
mask = torch.triu(
    torch.full((max_sequence_length, max_sequence_length), float('-inf')),
    diagonal=1,
)
decoder = Decoder(
    d_model=d_model,
    ffn_hidden=ffn_hidden,
    num_heads=num_heads,
    drop_prob=drop_prob,
    num_layers=num_layers,
)
out = decoder(x, y, mask)

MASKED SELF ATTENTION
DROP OUT 1
ADD + LAYER NORMALIZATION 1
CROSS ATTENTION
DROP OUT 2
ADD + LAYER NORMALIZATION 2
FEED FORWARD 1
DROP OUT 3
ADD + LAYER NORMALIZATION 3
MASKED SELF ATTENTION
DROP OUT 1
ADD + LAYER NORMALIZATION 1
CROSS ATTENTION
DROP OUT 2
ADD + LAYER NORMALIZATION 2
FEED FORWARD 1
DROP OUT 3
ADD + LAYER NORMALIZATION 3
MASKED SELF ATTENTION
DROP OUT 1
ADD + LAYER NORMALIZATION 1
CROSS ATTENTION
DROP OUT 2
ADD + LAYER NORMALIZATION 2
FEED FORWARD 1
DROP OUT 3
ADD + LAYER NORMALIZATION 3
MASKED SELF ATTENTION
DROP OUT 1
ADD + LAYER NORMALIZATION 1
CROSS ATTENTION
DROP OUT 2
ADD + LAYER NORMALIZATION 2
FEED FORWARD 1
DROP OUT 3
ADD + LAYER NORMALIZATION 3
MASKED SELF ATTENTION
DROP OUT 1
ADD + LAYER NORMALIZATION 1
CROSS ATTENTION
DROP OUT 2
ADD + LAYER NORMALIZATION 2
FEED FORWARD 1
DROP OUT 3
ADD + LAYER NORMALIZATION 3
