<a href="https://colab.research.google.com/github/404saugat404/transformer/blob/main/transformer_encoder_complete_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [27]:
# First, define the hyperparameters that the rest of the notebook needs.
d_model = 512      # width of the model dimension (last axis of the activations)
num_head = 8       # number of attention heads; head width is d_model // num_head
drop_prob = 0.1    # dropout probability used by every dropout layer
batch_size = 30    # number of sequences in the demo batch
max_seq_len = 200  # number of positions per sequence in the demo batch
ffn_hidden = 2048  # hidden width of the position-wise feed-forward network
num_layer = 5      # number of stacked encoder layers

import torch
import torch.nn as nn
import math

# Before moving on to multi-head attention, let's code scaled dot-product attention.

In [28]:
#scaled dot product

def ScaledDotProduct(q,k,v,mask=None):
  """Compute scaled dot-product attention.

  Args:
    q: query tensor whose last two dimensions are (seq_q, d_k).
    k: key tensor whose last two dimensions are (seq_k, d_k).
    v: value tensor whose last two dimensions are (seq_k, d_v).
    mask: optional additive mask that broadcasts against the
      (..., seq_q, seq_k) score tensor. It is added to the scores before
      the softmax, so 0 keeps a position and -inf hides it.

  Returns:
    A tuple ``(values, attention)`` where ``attention`` is the softmax over
    the last dimension of the (masked) scores and ``values`` is
    ``attention @ v``.
  """
  d_k = q.size(-1)
  # Divide by sqrt(d_k) so the scores do not grow with the head width.
  scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(d_k)
  if mask is not None:
    # Out-of-place add: an in-place `+=` raises when the mask has more
    # broadcast dimensions than the scores (e.g. a batched mask applied to
    # unbatched q/k), because an in-place result cannot change shape.
    scores = scores + mask
  attention = torch.softmax(scores, dim=-1)
  values = torch.matmul(attention, v)
  return values, attention


# Let's create a class for multi-head attention.

In [6]:
#multihead attention
class MultiHeadAttention(nn.Module):
  """Multi-head self-attention over a (batch, seq, d_model) input.

  One linear layer produces the queries, keys and values for every head,
  attention is computed per head with ``ScaledDotProduct``, and the heads are
  concatenated and passed through a final linear layer.

  Args:
    d_model: size of the last dimension of the input and of the output.
    num_head: number of attention heads; must divide ``d_model``.

  Raises:
    ValueError: if ``d_model`` is not divisible by ``num_head``.
  """
  def __init__(self,d_model,num_head):
    super().__init__()
    if d_model % num_head != 0:
      # Fail early with a clear message instead of a reshape error in forward.
      raise ValueError(
        f"d_model ({d_model}) must be divisible by num_head ({num_head})")
    self.d_model=d_model
    self.num_head=num_head
    self.head_dim=d_model//num_head
    # A single projection yields q, k and v for all heads at once.
    self.qkv_layer=nn.Linear(d_model,3*d_model)
    self.linear_layer=nn.Linear(d_model,d_model)

  def forward(self,x,mask=None):
    """Apply self-attention to ``x``.

    Args:
      x: tensor of shape (batch, seq, d_model).
      mask: optional additive mask forwarded to ``ScaledDotProduct``.

    Returns:
      Tensor of shape (batch, seq, d_model).
    """
    batch_size,max_seq_len,d_model=x.size()
    print(f"x.size(): {x.size()}")

    qkv=self.qkv_layer(x)
    print(f"qkv.size(): {qkv.size()}")

    # Split the projection into heads: (batch, seq, head, 3 * head_dim).
    qkv=qkv.reshape(batch_size,max_seq_len,self.num_head,3*self.head_dim)
    print(f"qkv.size(): {qkv.size()}")

    # Move the head axis ahead of the sequence axis: (batch, head, seq, 3 * head_dim).
    qkv = qkv.permute(0, 2, 1, 3)
    print(f"qkv.size(): {qkv.size()}")

    q, k, v = qkv.chunk(3, dim=-1)
    print(f"q size: {q.size()}, k size: {k.size()}, v size: {v.size()}, ")

    values, attention = ScaledDotProduct(q, k, v, mask)
    print(f"values.size(): {values.size()}, attention.size:{ attention.size()} ")

    # Bring the sequence axis back in front of the head axis before merging
    # heads. Reshaping (batch, head, seq, head_dim) directly would mix data
    # from different sequence positions into one output row.
    values = values.permute(0, 2, 1, 3).reshape(batch_size, max_seq_len, d_model)
    print(f"values.size(): {values.size()}")

    output = self.linear_layer(values)
    print(f"output.size(): {output.size()}")

    return output


# Let's create a class for layer normalization.

In [34]:
#layer normalization
class LayerNormalization(nn.Module):
  """Normalize the trailing dimensions of a tensor, then scale and shift.

  Args:
    parameters_shape: shape of the trailing dimensions to normalize over;
      also the shape of the learnable ``gamma`` and ``beta``.
    eps: constant added to the variance before the square root.
  """
  def __init__(self,parameters_shape,eps=1e-5):
    super().__init__()
    self.parameters_shape = parameters_shape
    self.eps = eps
    # gamma starts at one and beta at zero, so the initial output is just
    # the normalized input.
    self.gamma = nn.Parameter(torch.ones(parameters_shape))
    self.beta = nn.Parameter(torch.zeros(parameters_shape))

  def forward(self,inputs):
    """Return ``gamma * normalized(inputs) + beta`` and print the shapes."""
    # Reduce over the last len(parameters_shape) axes: -1, -2, ...
    n_trailing = len(self.parameters_shape)
    reduce_dims = [-axis for axis in range(1, n_trailing + 1)]

    mean = inputs.mean(dim=reduce_dims, keepdim=True)
    print(f"Mean ({mean.size()})")

    centered = inputs - mean
    var = (centered ** 2).mean(dim=reduce_dims, keepdim=True)
    std = (var + self.eps).sqrt()
    print(f"Standard Deviation  ({std.size()})")

    y = centered / std
    print(f"y: {y.size()}")

    output = self.gamma * y  + self.beta
    print(f"self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}")
    print(f"out: {output.size()}")
    return output


    # TODO: study this in more detail (e.g. with ChatGPT or YouTube tutorials)


# Let's create a class for the feed-forward network.

In [9]:
class PositionWiseFeedForward(nn.Module):
  """Two-layer feed-forward network applied along the last dimension.

  Args:
    d_model: size of the input and output features.
    hidden: size of the intermediate features.
    drop_prob: dropout probability applied after the ReLU.
  """
  def __init__(self,d_model,hidden,drop_prob):
    super().__init__()
    self.linear1 = nn.Linear(in_features=d_model, out_features=hidden)
    self.linear2 = nn.Linear(in_features=hidden, out_features=d_model)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=drop_prob)

  def forward(self,x):
    """Return ``linear2(dropout(relu(linear1(x))))``."""
    expanded = self.dropout(self.relu(self.linear1(x)))
    return self.linear2(expanded)



# Let's define an EncoderLayer.

In [32]:
#EncoderLayer

class EncoderLayer(nn.Module):
  """One encoder layer: self-attention and a feed-forward network.

  Each sublayer is followed by dropout, a residual addition and layer
  normalization (the normalization is applied after the addition).

  Args:
    d_model: size of the last dimension of the input and output.
    num_head: number of attention heads.
    drop_prob: dropout probability for both dropout layers and the
      feed-forward network.
    ffn_hidden: hidden width of the feed-forward network.
  """
  def __init__(self,d_model,num_head,drop_prob,ffn_hidden):
    super().__init__()
    self.attention=MultiHeadAttention(d_model=d_model,num_head=num_head)
    self.norm1=LayerNormalization(parameters_shape=[d_model])
    self.dropout1=nn.Dropout(p=drop_prob)

    self.ffn=PositionWiseFeedForward(d_model=d_model,hidden=ffn_hidden,drop_prob=drop_prob)
    self.norm2=LayerNormalization(parameters_shape=[d_model])
    self.dropout2=nn.Dropout(p=drop_prob)

  def forward(self,x,mask=None):
    """Run the layer on ``x``.

    Args:
      x: input tensor; passed to the attention sublayer, which unpacks it
        as (batch, seq, d_model).
      mask: optional mask forwarded to the attention sublayer. Defaults to
        ``None`` (no masking), so single-argument callers such as
        ``nn.Sequential`` keep working.

    Returns:
      Tensor with the same shape as ``x``.
    """
    residual_x=x

    print("--------attention 1--------")
    x=self.attention(x,mask=mask)

    print("--------dropout--------")
    x=self.dropout1(x)

    print("--------add and norm--------")
    x=self.norm1(x+residual_x)

    print("--------ffn--------")
    # The output of the first add & norm is the residual for the second one.
    residual_x=x
    x=self.ffn(x)

    print("--------dropout2--------")
    x=self.dropout2(x)

    print("--------add and norm2--------")
    x=self.norm2(x+residual_x)

    return x

# Let's define the Encoder class.


In [25]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
  """A stack of ``num_layers`` encoder layers applied one after another.

  Args:
    d_model: size of the last dimension of the input and output.
    num_head: number of attention heads in every layer.
    drop_prob: dropout probability used in every layer.
    ffn_hidden: hidden width of each layer's feed-forward network.
    num_layers: how many ``EncoderLayer`` modules to stack.
  """
  def __init__(self,d_model,num_head,drop_prob,ffn_hidden,num_layers):
    super().__init__()
    stack = []
    for _ in range(num_layers):
      stack.append(EncoderLayer(d_model, num_head, drop_prob, ffn_hidden))
    self.layers = nn.Sequential(*stack)

  def forward(self,x):
    """Pass ``x`` through every layer in order and return the result."""
    return self.layers(x)

In [35]:
# Now check the architecture: build the encoder and run a random batch through it.
encoder = Encoder(
    d_model,
    num_head,
    drop_prob,
    ffn_hidden,
    num_layer,
)
x = torch.rand(batch_size, max_seq_len, d_model)
output = encoder(x)
print(output.shape)

--------attention 1--------
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 8, 192])
qkv.size(): torch.Size([30, 8, 200, 192])
q size: torch.Size([30, 8, 200, 64]), k size: torch.Size([30, 8, 200, 64]), v size: torch.Size([30, 8, 200, 64]), 
values.size(): torch.Size([30, 8, 200, 64]), attention.size:torch.Size([30, 8, 200, 200]) 
values.size(): torch.Size([30, 200, 512])
output.size(): torch.Size([30, 200, 512])
--------dropout--------
--------add and norm--------
Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
y: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 200, 512])
--------ffn--------
--------dropout2--------
--------add and norm2--------
Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
y: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 20

In [36]:

# Random stand-in for an input batch that already includes positional encoding.
x = torch.randn(batch_size, max_seq_len, d_model)
out = encoder(x)

--------attention 1--------
x.size(): torch.Size([30, 200, 512])
qkv.size(): torch.Size([30, 200, 1536])
qkv.size(): torch.Size([30, 200, 8, 192])
qkv.size(): torch.Size([30, 8, 200, 192])
q size: torch.Size([30, 8, 200, 64]), k size: torch.Size([30, 8, 200, 64]), v size: torch.Size([30, 8, 200, 64]), 
values.size(): torch.Size([30, 8, 200, 64]), attention.size:torch.Size([30, 8, 200, 200]) 
values.size(): torch.Size([30, 200, 512])
output.size(): torch.Size([30, 200, 512])
--------dropout--------
--------add and norm--------
Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
y: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 200, 512])
--------ffn--------
--------dropout2--------
--------add and norm2--------
Mean (torch.Size([30, 200, 1]))
Standard Deviation  (torch.Size([30, 200, 1]))
y: torch.Size([30, 200, 512])
self.gamma: torch.Size([512]), self.beta: torch.Size([512])
out: torch.Size([30, 20