<a href="https://colab.research.google.com/github/ajayvallabh/PytorchTutorial/blob/main/TransformerEncoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import math
from torch import nn
import torch.nn.functional as F

In [None]:
def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    The raw scores ``q @ k^T`` are divided by ``sqrt(d_k)``, where ``d_k`` is
    the size of the last dimension of ``q``, optionally shifted by an additive
    ``mask``, and normalised with a softmax over the last dimension.

    Args:
        q: Query tensor; its last dimension is the per-head feature size.
        k: Key tensor; its last two dimensions are swapped before the matmul.
        v: Value tensor that the attention weights are applied to.
        mask: Optional tensor added in place to the scaled scores, so it must
            broadcast to the shape of the scores.

    Returns:
        A ``(values, attention)`` tuple: the attention-weighted values and the
        softmax attention weights.
    """
    d_k = q.shape[-1]
    scores = q @ k.transpose(-2, -1)
    scaled = scores / math.sqrt(d_k)
    print(f"scaled.size() : {scaled.size()}")
    if mask is not None:
        print(f"-- ADDING MASK of shape {mask.size()} --")
        # In-place broadcasting add: only the trailing dimensions of the mask
        # need to line up with the scores.
        scaled += mask
    attention = F.softmax(scaled, dim=-1)
    values = attention @ v
    return values, attention

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention over a ``(batch, sequence, d_model)`` input.

    A single linear layer produces the queries, keys and values for every
    head; attention is computed per head by ``scaled_dot_product`` and the
    heads are concatenated and projected back to ``d_model`` features.

    Args:
        d_model: Feature size of the input and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        if d_model % num_heads != 0:
            # Otherwise the reshape in forward() fails with an opaque error.
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        # One projection yields q, k and v for all heads at once.
        self.qkv_layer = nn.Linear(d_model, 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch, sequence, d_model)``.
            mask: Optional mask passed through to ``scaled_dot_product``.

        Returns:
            Tensor of shape ``(batch, sequence, d_model)``.
        """
        batch_size, max_sequence_length, _ = x.size()
        print(f"x.size(): {x.size()}")
        qkv = self.qkv_layer(x)  # (B, S, 3 * d_model)
        print(f"qkv.size(): {qkv.size()}")
        qkv = qkv.reshape(
            batch_size, max_sequence_length, self.num_heads, 3 * self.head_dim
        )  # (B, S, H, 3 * head_dim)
        print(f"qkv.size(): {qkv.size()}")
        qkv = qkv.permute(0, 2, 1, 3)  # (B, H, S, 3 * head_dim)
        print(f"qkv.size(): {qkv.size()}")
        q, k, v = qkv.chunk(3, dim=-1)  # each (B, H, S, head_dim)
        print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}, ")
        # values: (B, H, S, head_dim); attention: (B, H, S, S)
        values, attention = scaled_dot_product(q, k, v, mask)
        print(f"values.size(): {values.size()}, attention.size:{ attention.size()} ")
        # Bring the head axis back next to head_dim BEFORE flattening.
        # Reshaping (B, H, S, head_dim) directly to (B, S, H * head_dim) would
        # interleave data from different heads and sequence positions.
        values = values.permute(0, 2, 1, 3).reshape(
            batch_size, max_sequence_length, self.num_heads * self.head_dim
        )  # (B, S, d_model)
        print(f"values.size(): {values.size()}")
        out = self.linear_layer(values)
        print(f"out.size(): {out.size()}")
        return out

In [None]:
class LayerNormalization(nn.Module):
    """Layer normalisation with a learnable scale and shift.

    The input is normalised over its trailing ``len(parameters_shape)``
    dimensions using the mean and the biased variance (mean of squared
    deviations), then scaled by ``gamma`` and shifted by ``beta``.

    Args:
        parameters_shape: Shape of the trailing dimensions to normalise over,
            e.g. ``[d_model]``; ``gamma`` and ``beta`` take this shape.
        eps: Constant added to the variance before the square root.
    """

    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        # Scale starts at one and shift at zero.
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        """Normalise ``inputs`` over its trailing dimensions and return it."""
        # Negative indices of the trailing dimensions, e.g. (-1,) for [d_model].
        dims = tuple(range(-len(self.parameters_shape), 0))
        mean = inputs.mean(dim=dims, keepdim=True)
        print(f"Mean ({mean.size()})")
        centered = inputs - mean
        var = (centered ** 2).mean(dim=dims, keepdim=True)
        std = (var + self.eps).sqrt()
        print(f"Standard Deviation  ({std.size()})")
        y = centered / std
        print(f"y: {y.size()}")
        out = y * self.gamma + self.beta
        print(f"self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}")
        print(f"out: {out.size()}")
        return out


In [None]:
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension.

    Computes ``linear2(dropout(relu(linear1(x))))``.

    Args:
        d_model: Feature size of the input and of the output.
        hidden: Width of the intermediate layer.
        drop_prob: Dropout probability applied after the ReLU.
    """

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Map ``x`` of shape ``(..., d_model)`` to the same shape."""
        x = self.linear1(x)  # (..., hidden)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)  # (..., d_model)
        return x

In [None]:

class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input (residual connection) and is then layer-normalised.

    Args:
        d_model: Feature size of the input and of the output.
        ffn_hidden: Hidden width of the feed-forward sub-layer.
        num_heads: Number of attention heads.
        drop_prob: Dropout probability used throughout the layer.
    """

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(
            d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob
        )
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Transform ``x``; the output has the same shape as the input."""
        # Self-attention sub-layer (no mask is applied in the encoder).
        residual_x = x
        x = self.attention(x, mask=None)
        x = self.dropout1(x)
        x = self.norm1(x + residual_x)
        # Feed-forward sub-layer.
        residual_x = x
        x = self.ffn(x)
        x = self.dropout2(x)
        x = self.norm2(x + residual_x)
        return x


class Encoder(nn.Module):
    """Stack of ``num_layers`` identical ``EncoderLayer`` modules.

    Args:
        d_model: Feature size of the input and of the output.
        ffn_hiddne: Hidden width of each layer's feed-forward network. The
            misspelled name is kept so existing keyword callers keep working.
        num_heads: Number of attention heads per layer.
        drop_prob: Dropout probability used in every layer.
        num_layers: Number of encoder layers to stack.
    """

    def __init__(self, d_model, ffn_hiddne, num_heads, drop_prob, num_layers):
        super().__init__()
        # num_layers only controls the depth of the stack; it is not an
        # argument of EncoderLayer itself.
        self.layers = nn.Sequential(
            *(
                EncoderLayer(d_model, ffn_hiddne, num_heads, drop_prob)
                for _ in range(num_layers)
            )
        )

    def forward(self, x):
        """Run ``x`` through every encoder layer in order."""
        return self.layers(x)

In [None]:
# Hyperparameters for the encoder demo. Sizes must be ints: they are used as
# layer and tensor dimensions.
d_model = 512  # embedding dimension of each token
num_heads = 8  # number of attention heads that d_model is split across
drop_prob = 0.1  # dropout probability, used to reduce overfitting
batch_size = 30  # number of sentences per batch
max_sequence_length = 200  # maximum number of tokens in a sentence
ffn_hidden = 2048  # hidden width of the feed-forward network
num_layers = 5  # number of stacked encoder layers

In [None]:
# Build the encoder stack and push one random batch through it.
encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)
# Random stand-in for a batch of token embeddings (per the original note,
# these are meant to already include positional information).
x = torch.randn(batch_size, max_sequence_length, d_model)
out = encoder(x)