In [None]:
import torch
import math
from torch import nn
import torch.nn.functional as F

In [None]:
def scaled_dot_product(q, k , v , mask = None):
  #q, k, v = 30 x 8 x200 x 64
  d_k = q.size()[-1] #64
  scaled = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k) #30 x 8 x 200 x 200
  print(f"scaled.size(): {scaled.size()}")
  if mask is not None:
    print(f"--Adding Mask of shape{mask.size()} --")
    scaled += mask
  attention =F.softmax(scaled, dim = -1)  # 30 x 8 x 200 x 200
  values = torch.matmul(attention, v)      # 30 x 8 x 200x 64(200x200 * 200x64)
  return values, attention


In [None]:
class MultiHeadAttention(nn.Module):

  def __init__(self, d_model, num_heads):
    super().__init__()
    self.d_model = d_model      #512
    self.num_heads = num_heads    #8
    self.head_dim = d_model // num_heads  #512/8 = 64
    self.qkv_layer = nn.Linear(d_model, 3*d_model)  # 512x 1536(512*3)
    self.linear_layer = nn.Linear(d_model, d_model)  #512 x 512

  def forward(self, x, mask = None):
    batch_size, max_sequence_length, d_model = x.size() #30 x 200 x 512
    print(f"x.size(): {x.size()}")

    qkv = self.qkv_layer(x)   #30 x 200 x 1536
    print(f"qkv.size(): {qkv.size()}")

    qkv = qkv.reshape(batch_size, max_sequence_length, self.num_heads, 3*self.head_dim)
    print(f"qkv.size(): {qkv.size()}") # 30 x 200 x 8 x 192(64*3)

    qkv = qkv.permute(0, 2, 1, 3) # 30 x 8 x 200 x 192
    print(f"qkv.size(): {qkv.size()}")

    q, k, v = qkv.chunk(3, dim = -1) #each 30 x 8 x 200 x 64
    print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}, ")

    values, attention = scaled_dot_product(q, k, v, mask)
    #attention = 30x 8 x 200 x 200 , values = 30x8x200x64
    print(f"values.size(): {values.size()}, attention.size:{ attention.size()} ")

    values = values.reshape(batch_size, max_sequence_length, self.num_heads * self.head_dim)
    print(f"values.size(): {values.size()}") # 30 x 200 x 512

    out = self.linear_layer(values)
    print(f"out.size(): {out.size()}")

    return out

In [None]:
class LayerNormalization(nn.Module):

  def __init__(self, parameters_shape, eps = 1e-5):
    super().__init__()
    self.parameters_shape = parameters_shape  #512
    self.eps = eps
    self.gamma = nn.Parameter(torch.ones(parameters_shape))  #[512]
    self.beta = nn.Parameter(torch.zeros(parameters_shape))   #[512]

  def forward(self, inputs):  #30x200x512
    dims = [-(i+1)for i in range(len(self.parameters_shape))]   #[-1]
    mean = inputs.mean(dim = dims, keepdim = True)              # 30 x200 x1
    print(f"Mean ({mean.size()})")

    var = ((inputs - mean)**2).mean(dim = dims, keepdim= True)   #30x200x1
    std = (var + self.eps).sqrt()                                #30x200x1
    print(f"Standard Deviation  ({std.size()})")

    y = (inputs - mean)/ std                                    #30x200x512
    print(f"y: {y.size()}")

    out = self.gamma*y + self.beta
    print(f"self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}")
    print(f"out: {out.size()}")

    return out



In [None]:
class PosionwiseFeedForward(nn.Module):

  def __init__(self, d_model, hidden, drop_prob = 0.1):
    super(PosionwiseFeedForward, self).__init__()
    self.linear1 = nn.Linear(d_model, hidden)
    self.linear2 = nn.Linear(hidden, d_model)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p = drop_prob)

  def forward(self, x):
    x = self.linear1(x)
    print(f"x after first linear layer: {x.size()}")

    x = self.relu(x)
    print(f"x after activation: {x.size()}")

    x = self.dropout(x)
    print(f"x after dropout: {x.size()}")

    x = self.linear2(x)
    print(f"x after 2nd linear layer: {x.size()}")

    return x

In [None]:
class EncoderLayer(nn.Module):
  def __init__(self, d_model ,ffn_hidden, num_heads, drop_prob):
    super(EncoderLayer, self).__init__()
    self.attention = MultiHeadAttention(d_model=d_model, num_heads= num_heads)
    self.norm1 = LayerNormalization(parameters_shape=[d_model])
    self.dropout1 = nn.Dropout(p=drop_prob)

    self.ffn = PosionwiseFeedForward(d_model = d_model, hidden= ffn_hidden, drop_prob= drop_prob)
    self.norm2 = LayerNormalization(parameters_shape=[d_model])
    self.dropout2 = nn.Dropout(p = drop_prob)

  def forward(self, x):
    residual_x = x      #30 x 200 x 512

    print("-----------ATTENTION 1-------------")
    x = self.attention(x, mask = None)     # 30 x200 x 512

    print("--------------DROPOUT 1 ------------")
    x = self.dropout1(x)                   #30 x 200 x 512

    print("----------ADD and NORM layer--------")
    x = self.norm1(x+ residual_x)

    print("------------ATTENTION 2-------------")
    x = self.ffn(x)

    print("------------DROPOUT 2 --------------")
    x = self.dropout2(x)

    print("----------ADD and NORM layer 2------")
    x = self.norm2(x + residual_x)

    return x


In [None]:
class Encoder(nn.Module):
  def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
    super().__init__()
    self.layers = nn.Sequential(*[EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
                                   for _ in range(num_layers)])

  def forward(self, x):
    x = self.layers(x)
    return x


In [None]:
d_model = 512
num_heads = 8
drop_prob = 0.1
batch_size = 30
max_sequence_length = 200
ffn_hidden = 2048
num_layers = 5

encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)

In [None]:
#test input - includes positional encoding

x = torch.randn( (batch_size, max_sequence_length, d_model))
out = encoder(x)

-----------ATTENTION 1-------------
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 8, 192])
qkv.size(): torch.Size([30, 8, 200, 192])
q size: torch.Size([30, 8, 200, 64]), k size: torch.Size([30, 8, 200, 64]), v size: torch.Size([30, 8, 200, 64]), 
scaled.size(): torch.Size([30, 8, 200, 200])
values.size(): torch.Size([30, 8, 200, 64]), attention.size:torch.Size([30, 8, 200, 200]) 
values.size(): torch.Size([30, 200, 512])
out.size(): torch.Size([30, 200, 512])
--------------DROPOUT 1 ------------
----------ADD and NORM layer--------
Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
y: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 200, 512])
------------ATTENTION 2-------------
------------DROPOUT 2 --------------
----------ADD and NORM layer 2------
Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
y: tor