In [1]:
import math
import torch
from torch import nn
import torch.nn.functional as F

In [14]:
def scaled_dot_product(q, k, v, mask=None):
  """Compute scaled dot-product attention.

  The scores ``q @ k^T`` are divided by the square root of the size of
  ``q``'s last dimension, optionally offset by an additive ``mask``, and
  normalized with a softmax over the last dimension. The resulting weights
  are then used to mix the rows of ``v``.

  Args:
    q: Query tensor; its last dimension sets the scaling factor.
    k: Key tensor; its last two dimensions are swapped before the product.
    v: Value tensor, multiplied by the attention weights.
    mask: Optional tensor added in place to the scaled scores before the
      softmax. ``None`` disables masking.

  Returns:
    A ``(values, attention)`` tuple: the weighted values and the softmax
    attention weights.
  """
  key_width = q.shape[-1]
  raw_scores = q @ k.transpose(-1, -2)
  scores = raw_scores / math.sqrt(key_width)
  # Shape trace emitted on every call.
  print(f"scaled.size() : {scores.size()}")
  if mask is not None:
    # In-place add: the result keeps the dtype and shape of the scores.
    scores.add_(mask)
  weights = scores.softmax(dim=-1)
  return weights @ v, weights

class MultiHeadAttention(nn.Module):
  """Multi-head self-attention over a ``(batch, sequence, d_model)`` input.

  A single linear layer produces the queries, keys and values for all heads
  at once. Attention is computed independently per head, the per-head
  results are concatenated back to ``d_model`` features, and a final linear
  layer mixes them.

  Args:
    d_model: Size of the input and output feature dimension.
    num_heads: Number of attention heads; must divide ``d_model`` evenly.

  Raises:
    ValueError: If ``d_model`` is not divisible by ``num_heads``.
  """

  def __init__(self, d_model, num_heads):
    super().__init__()
    if d_model % num_heads != 0:
      # Otherwise the reshape in forward() fails with an opaque size error.
      raise ValueError(
        f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
      )
    self.d_model = d_model
    self.num_heads = num_heads
    self.head_dim = d_model // num_heads
    self.qkv_layer = nn.Linear(d_model , 3 * d_model)
    self.linear_layer = nn.Linear(d_model, d_model)

  def forward(self, x, mask=None):
    """Apply self-attention to ``x``.

    Args:
      x: Tensor of shape ``(batch, sequence, d_model)``.
      mask: Optional mask forwarded unchanged to ``scaled_dot_product``.

    Returns:
      Tensor of shape ``(batch, sequence, num_heads * head_dim)``.
    """
    batch_size, max_sequence_length, _ = x.size()
    qkv = self.qkv_layer(x)
    # Split the projection per head: (batch, seq, heads, 3 * head_dim).
    qkv = qkv.reshape(batch_size, max_sequence_length, self.num_heads, 3 * self.head_dim)
    # Put heads before the sequence axis: (batch, heads, seq, 3 * head_dim).
    qkv = qkv.permute(0, 2, 1, 3)
    q, k, v = qkv.chunk(3, dim=-1)
    values, _ = scaled_dot_product(q, k, v, mask)
    # values is (batch, heads, seq, head_dim). Move the heads back next to
    # head_dim before merging; reshaping directly would mix features of
    # different sequence positions into the same output row.
    values = values.permute(0, 2, 1, 3)
    values = values.reshape(batch_size, max_sequence_length, self.num_heads * self.head_dim)
    return self.linear_layer(values)

class LayerNormalization(nn.Module):
  """Layer normalization with a learnable per-element scale and shift.

  Statistics are taken over the trailing ``len(parameters_shape)``
  dimensions of the input. The variance is the mean squared deviation, and
  ``eps`` is added to it before the square root.

  Args:
    parameters_shape: Shape of the trailing dimensions to normalize over;
      also the shape of ``gamma`` and ``beta``.
    eps: Constant added to the variance before taking the square root.
  """

  def __init__(self, parameters_shape, eps=1e-5):
    super().__init__()
    self.parameters_shape = parameters_shape
    self.eps = eps
    # Scale starts at one and shift at zero.
    self.gamma = nn.Parameter(torch.ones(parameters_shape))
    self.beta = nn.Parameter(torch.zeros(parameters_shape))

  def forward(self, inputs):
    """Normalize ``inputs`` and apply ``gamma`` and ``beta``."""
    rank = len(self.parameters_shape)
    # Trailing axes as negative indices: -1, -2, ..., -rank.
    dims = list(range(-1, -rank - 1, -1))
    mean = inputs.mean(dim=dims, keepdim=True)
    centered = inputs - mean
    var = centered.pow(2).mean(dim=dims, keepdim=True)
    normalized = centered / torch.sqrt(var + self.eps)
    return self.gamma * normalized + self.beta


class PositionwiseFeedForward(nn.Module):
  """Two-layer feed-forward network applied to the last dimension.

  Expands ``d_model`` features to ``hidden``, applies ReLU and dropout, and
  projects back down to ``d_model``.

  Args:
    d_model: Input and output feature size.
    hidden: Feature size between the two linear layers.
    drop_prob: Dropout probability applied after the ReLU.
  """

  def __init__(self, d_model, hidden, drop_prob=0.1):
    super().__init__()
    self.linear1 = nn.Linear(in_features=d_model, out_features=hidden)
    self.linear2 = nn.Linear(in_features=hidden, out_features=d_model)
    self.relu = nn.ReLU()
    self.dropout = nn.Dropout(p=drop_prob)

  def forward(self, x):
    """Return ``linear2(dropout(relu(linear1(x))))``."""
    expanded = self.linear1(x)
    activated = self.dropout(self.relu(expanded))
    return self.linear2(activated)

class EncoderLayer(nn.Module):
  """One encoder block: self-attention followed by a feed-forward network.

  Each of the two sub-layers is followed by dropout, a residual connection
  to the sub-layer's input, and layer normalization over the last
  (``d_model``) dimension.

  Args:
    d_model: Feature size of the input and output.
    ffn_hidden: Hidden size of the feed-forward sub-layer.
    num_heads: Number of attention heads.
    drop_prob: Dropout probability used after both sub-layers and inside
      the feed-forward sub-layer.
  """

  def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
    super().__init__()
    self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
    self.norm1 = LayerNormalization(parameters_shape=[d_model])
    self.dropout1 = nn.Dropout(p=drop_prob)
    self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
    self.norm2 = LayerNormalization(parameters_shape=[d_model])
    self.dropout2 = nn.Dropout(p=drop_prob)

  def forward(self, x, mask=None):
    """Run the attention and feed-forward sub-layers on ``x``.

    Progress banners are printed before each step.

    Args:
      x: Input tensor handed to the attention sub-layer.
      mask: Optional mask forwarded to the attention sub-layer. Defaults to
        ``None`` (no masking), which matches calling the layer with ``x``
        alone.

    Returns:
      The output of the second layer normalization.
    """
    residual_x = x
    print("------- ATTENTION 1 ------")
    x = self.attention(x, mask=mask)
    print("------- DROPOUT 1 ------")
    x = self.dropout1(x)
    print("------- ADD AND LAYER NORMALIZATION 1 ------")
    x = self.norm1(x + residual_x)
    residual_x = x
    # This step is the feed-forward network, not a second attention pass.
    print("------- FEED FORWARD ------")
    x = self.ffn(x)
    print("------- DROPOUT 2 ------")
    x = self.dropout2(x)
    print("------- ADD AND LAYER NORMALIZATION 2 ------")
    x = self.norm2(x + residual_x)
    return x

class Encoder(nn.Module):
  """A stack of ``num_layers`` encoder layers applied one after another.

  Args:
    d_model: Feature size passed to every layer.
    ffn_hidden: Feed-forward hidden size passed to every layer.
    num_heads: Number of attention heads passed to every layer.
    drop_prob: Dropout probability passed to every layer.
    num_layers: How many encoder layers to stack.
  """

  def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
    super().__init__()
    stack = []
    for _ in range(num_layers):
      stack.append(EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob))
    self.layers = nn.Sequential(*stack)

  def forward(self, x):
    """Feed ``x`` through every layer in order and return the result."""
    return self.layers(x)

In [15]:
# Model hyperparameters.
d_model = 512        # feature size of every token vector
ffn_hidden = 2048    # hidden size of the feed-forward sub-layer
num_heads = 8        # attention heads per layer
num_layers = 5       # number of stacked encoder layers
drop_prob = 0.1      # dropout probability

# Dimensions of the demo input batch.
batch_size = 30
max_sequence_length = 200

encoder = Encoder(
  d_model=d_model,
  ffn_hidden=ffn_hidden,
  num_heads=num_heads,
  drop_prob=drop_prob,
  num_layers=num_layers,
)

In [None]:
# Random stand-in for a batch of inputs that already include positional encoding.
x = torch.randn(batch_size, max_sequence_length, d_model)
out = encoder(x)