In [12]:
import math
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import pandas as pd

In [2]:
torch.sqrt(torch.Tensor([4]))

tensor([2.])

In [3]:
class FeedForward(nn.Module):
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)
        return x

In [4]:
class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape # [d_model]
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs): #  batch_size * max_length * d_model
        dims = [-(i+1) for i in range(len(self.parameters_shape))]
        print(dims)
        mean = inputs.mean(dim=dims, keepdim=True)
        var = ((inputs - mean)**2).mean(dim=dims, keepdim=True)
        std = (var + self.eps).sqrt()
        y = (inputs - mean) / std
        out = self.gamma * y + self.beta
        return out

In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        # 3 * d_model to simulate three independant matrix, we can consider these three matrices as concatenate together
        self.qkv_layer = nn.Linear(d_model, 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)
    
    def forward(self, x, mask=None):
        batch_size, sequence_length, _ = x.size()
        qkv = self.qkv_layer(x)
        # We create dimension for the heads to parallelize the process.
        # The last dimension contains the matrix q, k and v
        qkv = qkv.reshape(batch_size, sequence_length, self.num_heads, 3 * self.head_dim)
        # We move the head dimension to the second position and the sequence length dimension to the third place.
        # This allows us to parallelize the calculations of the dot products K and Q for each word and then for each head.
        qkv.permute(0, 2, 1, 3)
        # We retrieve independent q, k and v matrices by chuking the qkv matrix on the last dimension
        q, k, v = qkv.chunk(3, dim=-1)
        attention = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if mask is not None:
            attention += mask
        attention = F.softmax(attention, dim=-1)
        values = attention @ v
        # Concatenation of all the different head, strictly equivalent to (batch_size, sequence_length, self.d_model)
        values = values.reshape(batch_size, sequence_length, self.num_heads*self.head_dim)
        out = self.linear_layer(values)
        return out

In [6]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.ffn = FeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])


    def forward(self, x):
        residual_x = x
        x= self.attention(x, mask=None) # The encoder has to be able to look at any other word in the sentence
        x = self.dropout1(x)
        x = self.norm1(x + residual_x)
        residual_x = x
        x = self.ffn(x)
        x = self.dropout2(x)
        x = self.norm2(x + residual_x)
        return x


In [7]:
class Encoder(nn.Module):
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob, num_layers):
        super().__init__()
        self.layers = nn.Sequential(*(EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
                                      for _ in range(num_layers)))

    def forward(self,x):
        x = self.layers(x)
        return x

In [8]:
d_model = 512 # embedding dimension
max_length = 200 # maximum number of words for one translation
batch_size = 32 # number of "sentence" per batch
num_heads = 8 # number of heads during the self attention
drop_prob = 0.1 # probability of dropout for a better generalization
ffn_hidden = 2048 # expend 512 to 2048 during feed forward step
num_layers = 5 # number of sequential encoder

In [9]:
encoder = Encoder(d_model, ffn_hidden, num_heads, drop_prob, num_layers)

In [10]:
sum(p.numel() for p in encoder.parameters() if p.requires_grad)

15761920

In [11]:
x = torch.randn((batch_size, max_length, d_model))
x = encoder(x)
x

[-1]
[-1]
[-1]
[-1]
[-1]
[-1]
[-1]
[-1]
[-1]
[-1]


tensor([[[ 2.8273,  0.5176,  1.3205,  ...,  2.1729, -0.5244,  0.6641],
         [ 1.0610,  0.0607,  0.3337,  ...,  0.3078, -0.1650,  0.8738],
         [-0.5909, -0.7375,  1.8477,  ...,  1.4211, -2.1396, -0.2343],
         ...,
         [ 0.4979, -0.5947,  0.1580,  ...,  0.0441, -1.6795,  0.3166],
         [-0.1200,  1.1569,  0.5789,  ..., -1.0715, -1.3850, -0.8678],
         [-0.2148, -0.6836,  2.9846,  ...,  1.3972,  0.7459,  2.1555]],

        [[-0.8392, -0.9191,  1.8405,  ...,  0.0996,  1.0430,  0.1627],
         [-1.3963, -2.1196,  0.9659,  ..., -0.2220, -0.3350,  1.1400],
         [ 0.4950,  0.7012,  1.4349,  ..., -0.7722, -0.1003, -0.2914],
         ...,
         [ 0.3337,  0.0944,  0.3303,  ..., -0.2441, -1.3602,  0.1699],
         [ 0.8172, -1.8289,  0.7876,  ...,  1.3739, -1.2688,  0.8389],
         [ 0.4236,  0.6009,  0.9114,  ...,  0.8605, -0.4383, -0.5911]],

        [[ 0.1716,  0.1042,  0.2407,  ..., -0.2389, -0.0581,  0.5588],
         [-0.9467, -1.1659,  0.9371,  ...,  0