In [27]:
import torch
import math
from torch import nn
import torch.nn.functional as F

In [28]:
def scaled_dot_product(q, k, v, mask = None):
  dim_k = q.size()[-1]
  scaled = torch.matmul(q, k.transpose(-1,-2)) / math.sqrt(dim_k)
  print(f"scaled.size() : {scaled.size()}")

  if mask != None:
    print("Adding mask - boradcasting add - last N dim must be same")
    scaled +=  mask

  attention = F.softmax(scaled, dim = -1)
  values = torch.matmul(attention, v)
  return values, attention

In [29]:
class MultiHeadAttention(nn.Module):

    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model , 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        batch_size, max_sequence_length, d_model = x.size()
        print(f"x.size(): {x.size()}")
        qkv = self.qkv_layer(x)
        print(f"qkv.size(): {qkv.size()}")
        qkv = qkv.reshape(batch_size, max_sequence_length, self.num_heads, 3 * self.head_dim)
        print(f"qkv.size(): {qkv.size()}")
        qkv = qkv.permute(0, 2, 1, 3)
        print(f"qkv.size(): {qkv.size()}")
        q, k, v = qkv.chunk(3, dim=-1)
        print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}, ")
        values, attention = scaled_dot_product(q, k, v, mask)
        print(f"values.size(): {values.size()}, attention.size:{ attention.size()} ")
        values = values.reshape(batch_size, max_sequence_length, self.num_heads * self.head_dim)
        print(f"values.size(): {values.size()}")
        out = self.linear_layer(values)
        print(f"out.size(): {out.size()}")
        return out

In [30]:
class LayerNormalization(nn.Module):
  def __init__(self, parameters_shape, eps = 1e-5) -> None:
    super().__init__()
    self.parameters_shape = parameters_shape
    self.eps = eps
    self.gamma = nn.Parameter(torch.ones(parameters_shape))
    self.beta = nn.Parameter(torch.zeros(parameters_shape))

  def forward(self, inputs):
    dims = [-(i+1) for i in range(len(self.parameters_shape))]
    mean = inputs.mean(dim = dims, keepdim = True)
    print(f"mean size = {mean.size()}")

    var = ((inputs - mean)**2).mean(dim = dims, keepdim = True)
    std = (var + self.eps).sqrt()
    print(f"Standard deviation : {std}")

    y = (inputs - mean) / std
    print(f"y: {y}")

    out = self.gamma * y + self.beta
    print(f"self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}")
    print(f"out: {out.size()}")
    return out


In [31]:
class PositionwiseFeedForward(nn.Module):
  def __init__(self, d_model, hidden, drop_prob=0.1):
    super(PositionwiseFeedForward, self).__init__()
    self.linear1 = nn.Linear(d_model, hidden)
    self.linear2 = nn.Linear(hidden, d_model)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=drop_prob)

  def forward(self, x):
    x = self.linear1(x)
    print(f"x after first linear layer: {x.size()}")
    x = self.relu(x)
    print(f"x after activation: {x.size()}")
    x = self.dropout(x)
    print(f"x after dropout: {x.size()}")
    x = self.linear2(x)
    print(f"x after 2nd linear layer: {x.size()}")
    return x



In [32]:
class EncoderLayer(nn.Module):

  def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
    super(EncoderLayer, self).__init__()
    self.attention = MultiHeadAttention(d_model = d_model, num_heads = num_heads)
    self.norm1 = LayerNormalization(parameters_shape = [d_model])
    self.dropout1 = nn.Dropout(p = drop_prob)
    self.ffn = PositionwiseFeedForward(d_model = d_model, hidden = ffn_hidden, drop_prob = drop_prob)
    self.norm2 = LayerNormalization(parameters_shape = [d_model])
    self.dropout2 = nn.Dropout(p = drop_prob)

  def forward(self, x):
    residual_x = x
    print("Attention 1 ----------")
    x = self.attention(x, mask = None)
    print("Dropout 1 ---------- ")
    x = self.dropout1(x)
    print("add and layer normalization 1 ----------")
    x = self.norm1(x+residual_x)
    residual_x = x
    print("attention 2 ----------")
    x = self.ffn(x)
    print("dropout 2 ----------")
    x = self.dropout2(x)
    print("add and norm 2 ----------")
    x = self.norm2(x +residual_x)
    return x


In [33]:
class Encoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob) for _ in range(num_layers)]
        )

    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x


In [34]:
d_model = 512
num_heads = 8
drop_prob = 0.1
batch_size = 32
max_sequence_length = 200
ffn_hidden = 2048
num_layers = 6

encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)

In [35]:
x = torch.randn( (batch_size, max_sequence_length, d_model) ) # includes positional encoding
out = encoder(x)

Attention 1 ----------
x.size(): torch.Size([32, 200, 512])
qkv.size(): torch.Size([32, 200, 1536])
qkv.size(): torch.Size([32, 200, 8, 192])
qkv.size(): torch.Size([32, 8, 200, 192])
q size: torch.Size([32, 8, 200, 64]), k size: torch.Size([32, 8, 200, 64]), v size: torch.Size([32, 8, 200, 64]), 
scaled.size() : torch.Size([32, 8, 200, 200])
values.size(): torch.Size([32, 8, 200, 64]), attention.size:torch.Size([32, 8, 200, 200]) 
values.size(): torch.Size([32, 200, 512])
out.size(): torch.Size([32, 200, 512])
Dropout 1 ---------- 
add and layer normalization 1 ----------
mean size = torch.Size([32, 200, 1])
Standard deviation : tensor([[[0.9631],
         [0.9738],
         [1.0439],
         ...,
         [1.0037],
         [0.9882],
         [0.9767]],

        [[1.0033],
         [1.0077],
         [0.9886],
         ...,
         [0.9464],
         [1.0212],
         [1.0458]],

        [[1.0211],
         [1.0036],
         [1.0193],
         ...,
         [0.9524],
         [0.