# **Encoder Architecture**

![](https://media.geeksforgeeks.org/wp-content/uploads/20240110165738/Transformer-python.webp)

In [1]:
# Hyperparameters shared by the cells below.
d_model = 512               # feature width of every token vector (last dim of the input)
num_heads = 8               # attention heads; each head gets d_model // num_heads features
drop_prob = 0.1             # probability handed to every nn.Dropout
batch_size = 30             # first dim of the demo input tensor
max_sequence_length = 200   # second dim of the demo input tensor
ffn_hidden = 2048           # hidden width of the position-wise feed-forward block
num_layers = 5              # requested number of encoder layers (passed to Encoder)

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [3]:
import math

def scaled_attention(q, k, v, mask):
  """Compute scaled dot-product attention.

  Args:
    q: Query tensor whose last two dimensions are (query_len, d_k).
    k: Key tensor whose last two dimensions are (key_len, d_k).
    v: Value tensor whose last two dimensions are (key_len, d_v).
    mask: Truthy to apply a causal (lower-triangular) mask, so that query
      position i only attends to key positions <= i; falsy for no masking.

  Returns:
    Tensor with the leading dimensions of the inputs and last two
    dimensions (query_len, d_v).
  """
  d_k = q.size(-1)
  scaled = (q @ k.transpose(-2, -1)) / math.sqrt(d_k)   # ... x query_len x key_len

  if mask:
    # Size the mask from the scores themselves (not a global constant) and
    # build it on the same device, so any sequence length / device works.
    query_len, key_len = scaled.shape[-2:]
    allowed = torch.tril(torch.ones(query_len, key_len, device=scaled.device))
    # -inf scores become exactly 0 after the softmax.
    scaled = scaled.masked_fill(allowed == 0, float('-inf'))

  attention = F.softmax(scaled, dim=-1)                 # ... x query_len x key_len
  values = attention @ v                                # ... x query_len x d_v

  return values

In [4]:
class MultiheadAttention(nn.Module):
  """Multi-head self-attention over a (batch, sequence, d_model) input.

  A single linear layer produces the queries, keys and values for all heads;
  the per-head attention outputs are concatenated along the feature axis and
  passed through an output projection.

  Args:
    d_model: Feature width of the input and output.
    num_heads: Number of attention heads; must divide ``d_model`` evenly.

  Raises:
    ValueError: If ``d_model`` is not divisible by ``num_heads``.
  """

  def __init__(self, d_model, num_heads):
    super().__init__()
    if d_model % num_heads != 0:
      # Otherwise the per-head reshape in forward() cannot succeed.
      raise ValueError(
          f"d_model ({d_model}) must be divisible by num_heads ({num_heads})")
    self.d_model = d_model                           # e.g. 512
    self.num_heads = num_heads                       # e.g. 8
    self.head_dim = d_model // num_heads             # e.g. 64
    self.qkv_layer = nn.Linear(d_model, 3 * d_model) # 512 -> 1536
    self.lin_layer = nn.Linear(d_model, d_model)     # 512 -> 512

  def forward(self, x, mask = False):
    """Apply self-attention to ``x``.

    Args:
      x: Tensor of shape (batch, sequence, d_model).
      mask: Forwarded to ``scaled_attention``; truthy enables masking there.

    Returns:
      Tensor of shape (batch, sequence, d_model).
    """
    batch_size, sequence_len, _ = x.size()           # 30 x 200 x 512
    qkv = self.qkv_layer(x)                          # 30 x 200 x 1536
    qkv = qkv.reshape(batch_size, sequence_len,
                self.num_heads, 3 * self.head_dim)   # 30 x 200 x 8 x 192
    qkv = qkv.permute(0, 2, 1, 3)                    # 30 x 8 x 200 x 192
    q, k, v = qkv.chunk(3, dim = -1)                 # (30 x 8 x 200 x 64) * 3
    values = scaled_attention(q, k, v, mask)         # 30 x 8 x 200 x 64
    # Move the head axis back next to the feature axis before flattening;
    # reshaping directly would interleave heads with sequence positions.
    values = values.permute(0, 2, 1, 3)              # 30 x 200 x 8 x 64
    values = values.reshape(batch_size, sequence_len,
                    self.num_heads * self.head_dim)  # 30 x 200 x 512
    out = self.lin_layer(values)                     # 30 x 200 x 512
    return out

In [5]:
class LayerNorm(nn.Module):
  """Layer normalization with a learnable scale (gamma) and shift (beta).

  Statistics are taken over the trailing ``len(params_shape)`` dimensions of
  the input, using the biased (population) variance.

  Args:
    params_shape: Shape of the normalized trailing dimensions, e.g. ``[512]``.
    eps: Constant added to the variance before the square root.
  """

  def __init__(self, params_shape, eps = 1e-5):
    super().__init__()
    self.params_shape = params_shape
    self.eps = eps
    self.gamma = nn.Parameter(torch.ones(params_shape))   # scale, starts at 1
    self.beta = nn.Parameter(torch.zeros(params_shape))   # shift, starts at 0

  def forward(self, input):
    """Return ``input`` normalized over its trailing dims, then scaled and shifted."""
    # One negative index per normalized dimension: (-1, -2, ...).
    reduce_dims = tuple(range(-1, -len(self.params_shape) - 1, -1))
    mu = input.mean(dim = reduce_dims, keepdim = True)
    centered = input - mu
    variance = centered.pow(2).mean(dim = reduce_dims, keepdim = True)
    std = torch.sqrt(variance + self.eps)
    return self.gamma * (centered / std) + self.beta

In [6]:
class PositionwiseFeedForward(nn.Module):
  """Two-layer feed-forward block: Linear -> ReLU -> Dropout -> Linear.

  Args:
    d_model: Input and output feature width.
    hidden: Width of the intermediate layer.
    prob: Dropout probability applied after the ReLU.
  """

  def __init__(self, d_model, hidden, prob):
    super().__init__()
    self.linear1 = nn.Linear(d_model, hidden)   # expand: d_model -> hidden
    self.linear2 = nn.Linear(hidden, d_model)   # project back: hidden -> d_model
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(prob)

  def forward(self, x):
    """Map ``x`` (..., d_model) to a tensor of the same shape."""
    hidden_act = self.relu(self.linear1(x))     # ... x hidden
    return self.linear2(self.dropout(hidden_act))

In [7]:
class EncoderLayer(nn.Module):
  """One encoder layer: self-attention and feed-forward sub-layers.

  Each sub-layer's output goes through dropout, is added to the sub-layer's
  input (residual connection) and is then layer-normalized.

  Args:
    d_model: Feature width of the input and output.
    num_heads: Number of attention heads.
    drop_prob: Dropout probability used in both sub-layers.
    ffn_hidden: Hidden width of the feed-forward sub-layer.
  """

  def __init__(self, d_model, num_heads, drop_prob, ffn_hidden):
    super().__init__()
    self.attention = MultiheadAttention(d_model, num_heads)
    self.norm1 = LayerNorm([d_model])
    self.dropout1 = nn.Dropout(drop_prob)
    self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, drop_prob)
    self.dropout2 = nn.Dropout(drop_prob)
    self.norm2 = LayerNorm([d_model])

  def forward(self, x):
    """Return the transformed tensor; the output shape equals the input shape."""
    # Attention sub-layer with residual connection.
    attended = self.dropout1(self.attention(x))
    x = self.norm1(attended + x)
    # Feed-forward sub-layer with residual connection.
    transformed = self.dropout2(self.ffn(x))
    return self.norm2(transformed + x)

In [8]:
class Encoder(nn.Module):
  """A stack of ``num_layers`` identical encoder layers applied in sequence.

  Args:
    d_model: Feature width of the input and output.
    num_layers: Number of encoder layers to stack.
    num_heads: Number of attention heads in each layer.
    drop_prob: Dropout probability used inside each layer.
    ffn_hidden: Hidden width of each layer's feed-forward block.
  """

  def __init__(self, d_model, num_layers, num_heads, drop_prob, ffn_hidden):
    super().__init__()
    # Honour num_layers instead of always building two layers.
    self.layers = nn.Sequential(*[
        EncoderLayer(d_model, num_heads, drop_prob, ffn_hidden)
        for _ in range(num_layers)
    ])

  def forward(self, x):
    """Run ``x`` through every layer in order and return the result."""
    return self.layers(x)

In [9]:
# Smoke test: push a random batch through the encoder.
x = torch.randn(batch_size, max_sequence_length, d_model)  # stand-in for embedded tokens
encoder = Encoder(d_model, num_layers, num_heads, drop_prob, ffn_hidden)
y = encoder(x)  # same shape as x (see the shape check in the next cell)


In [10]:
x.shape, y.shape

(torch.Size([30, 200, 512]), torch.Size([30, 200, 512]))

In [11]:
x[0][0][:20]

tensor([ 1.4521, -0.8794,  1.7164, -1.2958, -0.2280, -0.0445,  1.3175,  0.9746,
        -0.0713, -2.6337, -0.6067,  1.6901, -1.1586,  0.2260, -0.4585, -0.1086,
         1.8267,  2.2390, -0.7567, -1.0699])

In [12]:
y[0][0][:20]

tensor([ 1.5077, -0.0504,  1.5386, -1.8080, -0.1344, -0.2638,  1.2649,  0.9424,
        -0.0458, -2.0284, -0.9581,  1.4612, -1.0095, -0.2887, -0.0793,  0.4107,
         1.6133,  1.6992, -0.3484, -1.3440], grad_fn=<SliceBackward0>)