In [None]:
import torch
import torch.nn as nn

In [None]:
def scaled_dot_product(q, k, v, mask=None):
  """Scaled dot-product attention.

  Args:
    q, k, v: tensors whose last two axes are (sequence, head_dim); any
      leading axes are treated as batch axes by ``torch.matmul``.
    mask: optional additive mask that broadcasts against the score matrix
      (use ``-inf`` at positions that must not be attended to).

  Returns:
    (output, attention): ``attention`` is the softmax over the last axis of
    the scaled scores, and ``output`` is ``attention @ v``.
  """
  d_k = q.size(-1)
  # d_k is a plain Python int; torch.sqrt only accepts tensors, so scale
  # with the ** operator instead.
  scaled = torch.matmul(q, k.transpose(-1, -2)) / (d_k ** 0.5)
  if mask is not None:
    # Out-of-place add so the mask only has to be broadcastable.
    scaled = scaled + mask
  attention = torch.softmax(scaled, dim=-1)
  output = torch.matmul(attention, v)
  return output, attention

In [None]:
class MultiHeadAttention(nn.Module):
  """Multi-head self-attention over a (batch, sequence, d_model) input.

  A single linear layer produces the queries, keys and values for all heads
  at once; the per-head attention results are merged back into ``d_model``
  features and passed through a final linear layer.

  Args:
    d_model: size of the last axis of the input and of the output.
    num_heads: number of attention heads; must divide ``d_model``.

  Raises:
    ValueError: if ``d_model`` is not divisible by ``num_heads``.
  """

  def __init__(self, d_model, num_heads):
    super(MultiHeadAttention, self).__init__()
    if d_model % num_heads != 0:
      raise ValueError(
          f"d_model ({d_model}) must be divisible by num_heads ({num_heads})")
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model // num_heads
    # One projection yields q, k and v for every head: d_model -> 3 * d_model.
    self.qkv_layer = nn.Linear(d_model, 3 * d_model)
    self.linear_layer = nn.Linear(d_model, d_model)

  # NOTE: forward must live at class level (not nested inside __init__),
  # otherwise nn.Module never sees it and calling the module fails.
  def forward(self, x, mask=None):
    """Apply self-attention to ``x``.

    Args:
      x: tensor of shape (batch, sequence, d_model).
      mask: optional additive mask forwarded to ``scaled_dot_product``.

    Returns:
      Tensor of shape (batch, sequence, d_model).
    """
    batch_size, max_sequence_length, _ = x.size()
    print(f"x.size(): {x.size()}")
    qkv = self.qkv_layer(x)  # (batch, seq, 3 * d_model)
    print(f"qkv.size(): {qkv.size()}")
    # Split the feature axis into heads; each head carries its q, k and v.
    qkv = qkv.reshape(batch_size, max_sequence_length, self.num_heads, 3 * self.head_dim)
    print(f"qkv.size(): {qkv.size()}")
    qkv = qkv.permute(0, 2, 1, 3)  # (batch, heads, seq, 3 * head_dim)
    print(f"qkv.size(): {qkv.size()}")
    q, k, v = qkv.chunk(3, dim=-1)  # each (batch, heads, seq, head_dim)
    print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}, ")
    values, attention = scaled_dot_product(q, k, v, mask)
    print(f"values.size(): {values.size()}, attention.size:{ attention.size()} ")
    # Move the head axis back next to head_dim BEFORE merging. Reshaping the
    # (batch, heads, seq, head_dim) tensor directly would mix values that
    # belong to different tokens and heads.
    values = values.permute(0, 2, 1, 3)  # (batch, seq, heads, head_dim)
    values = values.reshape(batch_size, max_sequence_length, self.num_heads * self.head_dim)
    print(f"values.size(): {values.size()}")
    out = self.linear_layer(values)
    print(f"out.size(): {out.size()}")
    return out

In [None]:
class LayerNormalization(nn.Module):
    """Normalise the trailing axes of a tensor, then scale and shift.

    Args:
        parameters_shape: sizes of the trailing axes to normalise over; the
            learnable ``gamma`` (ones) and ``beta`` (zeros) have this shape.
        eps: constant added to the variance before taking the square root.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        """Return ``gamma * (inputs - mean) / sqrt(var + eps) + beta``."""
        # One negative axis index per entry of parameters_shape: -1, -2, ...
        num_axes = len(self.parameters_shape)
        dims = list(range(-1, -num_axes - 1, -1))

        mean = inputs.mean(dim=dims, keepdim=True)
        print(f"Mean ({mean.size()})")

        centered = inputs - mean
        # Mean of squared deviations (no Bessel correction).
        var = centered.pow(2).mean(dim=dims, keepdim=True)
        std = torch.sqrt(var + self.eps)
        print(f"Standard Deviation  ({std.size()})")

        y = centered / std
        print(f"y: {y.size()}")

        out = self.gamma * y + self.beta
        print(f"self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}")
        print(f"out: {out.size()}")
        return out


In [None]:
class PositionwiseFeedForward(nn.Module):
    """Two linear layers with a ReLU and dropout in between.

    Args:
        d_model: size of the last axis of the input and of the output.
        hidden: size of the intermediate representation.
        drop_prob: dropout probability applied after the activation.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(in_features=d_model, out_features=hidden)
        self.linear2 = nn.Linear(in_features=hidden, out_features=d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(drop_prob)

    def forward(self, x):
        """Expand to ``hidden`` features, apply ReLU and dropout, project back."""
        # Expansion: last axis d_model -> hidden.
        x = self.linear1(x)
        print(f"x after first linear layer: {x.size()}")

        # Non-linearity; shape unchanged.
        x = self.relu(x)
        print(f"x after activation: {x.size()}")

        # Regularisation; shape unchanged.
        x = self.dropout(x)
        print(f"x after dropout: {x.size()}")

        # Projection: last axis hidden -> d_model.
        x = self.linear2(x)
        print(f"x after 2nd linear layer: {x.size()}")
        return x

In [None]:
class EncoderLayer(nn.Module):
  """One encoder block: self-attention then feed-forward.

  Each sub-layer's output goes through dropout, is added to the sub-layer's
  input, and the sum is layer-normalised.

  Args:
    d_model: feature size of the input and output.
    num_heads: number of attention heads.
    drop_prob: dropout probability used after each sub-layer and inside the
      feed-forward network.
    ffn_hidden: hidden size of the feed-forward network.
  """

  def __init__(self, d_model, num_heads, drop_prob, ffn_hidden):
    super(EncoderLayer, self).__init__()
    self.attention = MultiHeadAttention(d_model, num_heads)
    self.norm1 = LayerNormalization(parameters_shape=[d_model])
    self.dropout1 = nn.Dropout(p=drop_prob)
    self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, drop_prob)
    self.norm2 = LayerNormalization(parameters_shape=[d_model])
    self.dropout2 = nn.Dropout(p=drop_prob)

  def forward(self, x, mask=None):
    """Run the block on ``x``.

    Args:
      x: input tensor; the output has the same shape.
      mask: optional attention mask passed through to the attention module.
    """
    # Attention sub-layer. The attention module derives q, k and v from a
    # single input, so it takes x once (not three times).
    residual_x = x
    x = self.attention(x, mask=mask)
    x = self.dropout1(x)
    x = self.norm1(x + residual_x)

    # Feed-forward sub-layer.
    residual_x = x
    x = self.ffn(x)
    x = self.dropout2(x)
    x = self.norm2(x + residual_x)
    return x

In [None]:
class Encoder(nn.Module):
  """A stack of ``num_layers`` EncoderLayer modules applied one after another.

  Args:
    d_model, num_heads, drop_prob, ffn_hidden: forwarded unchanged to every
      EncoderLayer.
    num_layers: number of EncoderLayer instances in the stack.
  """

  def __init__(self, d_model, num_heads, drop_prob, ffn_hidden, num_layers):
    super(Encoder, self).__init__()
    self.layers = nn.ModuleList(
        [EncoderLayer(d_model, num_heads, drop_prob, ffn_hidden)
         for _ in range(num_layers)])

  def forward(self, x):
    """Feed ``x`` through every layer in order and return the final output."""
    # The method must be spelled `forward` for nn.Module to dispatch to it,
    # and it must call the layers rather than itself.
    for layer in self.layers:
      x = layer(x)
    return x

In [None]:
# --- Encoder hyperparameters -------------------------------------------
d_model = 512              # feature size handed to every layer
num_heads = 8              # attention heads per layer
drop_prob = 0.1            # dropout probability
ffn_hidden = 2048          # hidden size of the feed-forward network
num_layers = 5             # number of stacked encoder layers

# --- Input dimensions --------------------------------------------------
# Not consumed by the constructor below; presumably used to build a sample
# input of shape (batch_size, max_sequence_length, d_model) -- TODO confirm.
batch_size = 30
max_sequence_length = 200

encoder = Encoder(
    d_model=d_model,
    num_heads=num_heads,
    drop_prob=drop_prob,
    ffn_hidden=ffn_hidden,
    num_layers=num_layers,
)