In [16]:
import math
import torch
import numpy as np

In [17]:
# Vocabulary / sequence sizes
n_words = 1000
n_sequence_words = 10

# Model dimensions
n_head = 8
d_input_embedding = 32
d_output_embedding = 32
d_hidden = 64

# Stand-alone layers (created in this order so weight-initialisation draws are unchanged)
embedding_layer = torch.nn.Linear(in_features=n_words, out_features=d_input_embedding)
qkv_projection_layer = torch.nn.Linear(in_features=d_input_embedding, out_features=3 * d_output_embedding)

# Additive causal mask: 0 on and below the diagonal, -inf strictly above it
mask_matrix = torch.full((n_sequence_words, n_sequence_words), float('-inf')).triu(diagonal=1)

In [18]:
# Positional encoding equation: 
# PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) 
# PE(pos, 2i + 1) = cos(pos / 10000^(2i/d_model))
class PositionalEndoding(torch.nn.Module):
    """Add fixed sinusoidal positional encodings to a batch-first input, then apply dropout.

    The encoding table follows
        PE(pos, 2i)     = sin(pos / 10000^(2i / d_model))
        PE(pos, 2i + 1) = cos(pos / 10000^(2i / d_model))
    and is stored as a non-trainable buffer ``pe`` of shape (max_len, 1, d_model).

    Args:
        d_model: size of the last (embedding) dimension of the input.
        dropout: dropout probability applied to ``x + encoding``.
        max_len: longest sequence length for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout: float = 0.1, max_len = 5000):
        super().__init__()

        self.dropout = torch.nn.Dropout(p=dropout)
        # (max_len, 1) column of positions so it broadcasts against the frequency row below
        position = torch.arange(max_len).unsqueeze(1)
        # 1 / 10000^(2i / d_model) for every even index 2i, computed in log space
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd slot than even slots, so trim the angles to fit.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Buffer: saved in the state_dict and moved with the module, but never trained.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + pe)`` for ``x`` of shape (batch_size, n_sequence_words, d_model).

        Raises:
            ValueError: if the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(0):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(0)}"
            )
        # Index the table by sequence position (dim 1 of x), not by batch index, and
        # reshape (seq_len, 1, d_model) -> (1, seq_len, d_model) to broadcast over the batch.
        positional = self.pe[:seq_len].transpose(0, 1)
        return self.dropout(x + positional)  # (batch_size, n_sequence_words, d_model)

Layer Normalization (https://arxiv.org/pdf/1607.06450.pdf):

$$ y = \frac{x - \mathrm{E}[x]}{ \sqrt{\mathrm{Var}[x] + \epsilon}} * \gamma + \beta $$

In [25]:

# https://pytorch.org/docs/stable/_modules/torch/nn/modules/normalization.html#LayerNorm
class LayerNorm(torch.nn.Module):
    """Layer normalization over the trailing dimension(s) of the input.

    Computes ``y = (x - E[x]) / sqrt(Var[x] + eps) * gamma + beta`` where the mean and
    the (biased) variance are taken over the normalized dimensions.

    Args:
        d_model: an int (normalize over the last dimension) or a sequence of ints
            (normalize over that many trailing dimensions); also the shape of the
            learnable ``gamma`` and ``beta``.
        eps: constant added to the variance for numerical stability.
    """

    def __init__(self, d_model, eps = 1e-6):
        super().__init__()
        self.d_model = d_model
        self.eps = eps
        self.gamma = torch.nn.Parameter(torch.ones(d_model))   # learnable scale
        self.beta = torch.nn.Parameter(torch.zeros(d_model))   # learnable shift

    def forward(self, x):
        """Normalize ``x`` over its trailing dimension(s); the output has the shape of ``x``."""
        if isinstance(self.d_model, int):
            dims = [-1]
        else:
            dims = [-(i+1) for i in range(len(self.d_model))]
        mean = x.mean(dim=dims, keepdim=True)
        centered = x - mean
        # Biased variance over the same dims as the mean. An unbiased std would be NaN
        # for a single element and would not match the formula above.
        var = centered.pow(2).mean(dim=dims, keepdim=True)
        x_transformed = centered / torch.sqrt(var + self.eps)
        return self.gamma * x_transformed + self.beta

In [26]:
"""
    self attention function from the transformer model
    Q, K, V: (batch_size, n_head, n_sequence_words, d_output_embedding)
    mask: (n_sequence_words, n_sequence_words), None if no mask
    return: (batch_size, n_head, n_sequence_words, d_output_embedding) for the output of the self-attention
            (batch_size, n_head, n_sequence_words, n_sequence_words) for the attention matrix

    Example in matrix with batch size 3, n_head 2, n_sequence_words 2, d_output_embedding 3:
    Q (or K, V) = [
            // batch 1
            [
                // head 1
                [
                    // word 1
                    [1, 2, 3],
                    // word 2 
                    [4, 5, 6]
                ],
                // head 2
                [
                    [7, 8, 9],
                    [10, 11, 12]
                ],
            ], 
            // batch 2
            [
                [
                    [13, 14, 15],
                    [16, 17, 18]
                ],
                [
                    [19, 20, 21],
                    [22, 23, 24]
                ]
            ], 
            [
                [
                    [13, 14, 15], 
                    [16, 17, 18]
                ],
                [
                    [19, 20, 21],
                    [22, 23, 24]
                ]
            ]
        ]
"""
def self_attention(Q, K, V, mask = None):
    """Scaled dot-product attention: ``softmax(Q K^T / sqrt(d_k) + mask) V``.

    Args:
        Q, K, V: tensors of shape (batch_size, n_head, n_sequence_words, d_k), where
            d_k is the size of the last dimension (the per-head width).
        mask: optional additive mask broadcastable to
            (batch_size, n_head, n_sequence_words, n_sequence_words); it is added to
            the scores before the softmax. None means no masking.

    Returns:
        A tuple ``(output, attention)``:
        output has shape (batch_size, n_head, n_sequence_words, d_k) and
        attention has shape (batch_size, n_head, n_sequence_words, n_sequence_words).
    """
    # Scale by the width of the vectors actually being dotted (last dim of Q), rather
    # than a module-level constant that may not match the per-head size.
    d_k = Q.size(-1)
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(d_k)
    if mask is not None:
        scores = scores + mask
    attention = torch.nn.functional.softmax(scores, dim=-1)
    return torch.matmul(attention, V), attention

class MultiHeadAttention(torch.nn.Module):
    """Multi-head self-attention over a batch-first sequence.

    One linear layer projects every token to ``3 * d_output_embedding`` values. Those
    values are viewed as ``n_head`` groups, each holding that head's query, key and
    value side by side, attention runs per head, and the heads are concatenated and
    passed through a final linear layer.

    Args:
        d_input_embedding: size of the last dimension of the input.
        d_output_embedding: size of the last dimension of the output; must be
            divisible by ``n_head``.
        n_head: number of attention heads.

    Raises:
        ValueError: if ``d_output_embedding`` is not divisible by ``n_head``.
    """

    def __init__(self, d_input_embedding, d_output_embedding, n_head):
        super().__init__()
        if d_output_embedding % n_head != 0:
            raise ValueError(
                f"d_output_embedding ({d_output_embedding}) must be divisible by n_head ({n_head})"
            )
        self.d_input_embedding = d_input_embedding
        self.d_output_embedding = d_output_embedding
        self.n_head = n_head
        self.qkv_projection_layer = torch.nn.Linear(d_input_embedding, 3 * d_output_embedding)
        self.linear = torch.nn.Linear(d_output_embedding, d_output_embedding)

    def forward(self, X, mask = None):
        """Apply self-attention to ``X`` of shape (batch_size, n_sequence_words, d_input_embedding).

        ``mask`` is forwarded unchanged to ``self_attention``. Returns a tensor of
        shape (batch_size, n_sequence_words, d_output_embedding).
        """
        batch_size, n_sequence_words, _ = X.shape
        # Joint Q, K, V projection: (batch_size, n_sequence_words, 3 * d_output_embedding)
        qkv = self.qkv_projection_layer(X)
        # Use the module's own head count, not a global of the same name.
        qkv = qkv.reshape(batch_size, n_sequence_words, self.n_head, -1)  # (..., n_head, 3 * d_head)
        qkv = qkv.permute(0, 2, 1, 3)  # (batch_size, n_head, n_sequence_words, 3 * d_head)
        q, k, v = qkv.chunk(3, dim=-1)  # each (batch_size, n_head, n_sequence_words, d_head)
        value, _ = self_attention(q, k, v, mask)  # (batch_size, n_head, n_sequence_words, d_head)
        # Move the head axis back next to d_head BEFORE flattening; reshaping the
        # head-major layout directly would interleave different positions and heads.
        value = value.permute(0, 2, 1, 3)  # (batch_size, n_sequence_words, n_head, d_head)
        value = value.reshape(batch_size, n_sequence_words, self.d_output_embedding)  # concat heads
        return self.linear(value)


In [27]:
# Positionwise feed forward
# It is a fully connected feed-forward network
# which is applied to each position separately and identically. This
# consists of two linear transformations with a ReLU activation in between.
# FFN(x) = ReLU(x*W1 + b1)*W2 + b2

class PositionwiseFeedForward(torch.nn.Module):
    """Two-layer fully connected network applied to the last dimension of its input.

    Computes ``linear2(dropout(relu(linear1(x))))``: an expansion from ``d_model`` to
    ``d_hidden``, a ReLU, dropout, then a projection back to ``d_model``.

    Args:
        d_model: size of the input and output feature dimension.
        d_hidden: size of the intermediate feature dimension.
        dropout: dropout probability applied after the activation.
    """

    def __init__(self, d_model, d_hidden, dropout = 0.1):
        super().__init__()
        # Sub-modules are registered in the same order as before.
        self.linear1 = torch.nn.Linear(in_features=d_model, out_features=d_hidden)
        self.linear2 = torch.nn.Linear(in_features=d_hidden, out_features=d_model)
        self.relu = torch.nn.ReLU()
        self.dropout = torch.nn.Dropout(p=dropout)

    def forward(self, x):
        """Return the feed-forward transform of ``x``; the output has the shape of ``x``."""
        hidden = self.linear1(x)
        activated = self.relu(hidden)
        return self.linear2(self.dropout(activated))

In [28]:
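# Encoder layer
# Represents a single encoder layer ("residual" is the tensor saved before each sub-layer):
# 1. X = positionalEncoding(X)
# 2. X = self_attention(X)
# 3. X = residual + layer_norm(X)   (residual: the layer input, saved before step 1)
# 4. X = positionwise_feed_forward(X)
# 5. X = residual + layer_norm(X)   (residual: the output of step 3)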

class EncoderLayer(torch.nn.Module):
    """A single encoder layer: positional encoding, self-attention and feed-forward.

    Each sub-layer output is layer-normalized, passed through dropout and added to the
    tensor saved before that sub-layer (``residual + dropout(layer_norm(sublayer(X)))``).

    Notes:
        * Positional encoding is applied inside this layer, and the first residual is
          saved before it, so the skip path carries the input without positions.
        * ``self.relu`` is not used by ``forward``; it is kept so existing attribute
          access keeps working.

    Args:
        d_model: feature size of the input and output.
        d_hidden: hidden size of the position-wise feed-forward network.
        n_head: number of attention heads.
        dropout: dropout probability shared by the sub-modules of this layer.
    """

    def __init__(self, d_model, d_hidden, n_head, dropout = 0.1):
        super().__init__()
        self.positional_encoding = PositionalEndoding(d_model, dropout)
        self.self_attention = MultiHeadAttention(d_model, d_model, n_head)
        self.positionwise_feed_forward = PositionwiseFeedForward(d_model, d_hidden, dropout)
        self.layer_norm1 = LayerNorm(d_model)
        self.layer_norm2 = LayerNorm(d_model)
        self.dropout = torch.nn.Dropout(dropout)
        self.relu = torch.nn.ReLU()

    def forward(self, X, mask = None):
        """Run the layer on ``X``; ``mask`` is forwarded to the attention sub-layer.

        The output has the same shape as ``X``.
        """
        # --- attention sub-layer ---
        residual_X = X
        X = self.positional_encoding(X)
        X = self.self_attention(X, mask)
        # Dropout regularizes the sub-layer branch; the skip path stays an exact
        # identity so the residual connection is never randomly zeroed.
        X = residual_X + self.dropout(self.layer_norm1(X))

        # --- feed-forward sub-layer ---
        residual_X = X
        X = self.positionwise_feed_forward(X)
        X = residual_X + self.dropout(self.layer_norm2(X))
        return X


In [29]:
# Random stand-in input of shape (8, n_sequence_words, d_input_embedding)
X = torch.randn((8, n_sequence_words, d_input_embedding))

In [33]:
X.shape

torch.Size([8, 10, 32])

In [31]:
# Smoke run: build one encoder layer and push the random batch through it with the causal mask
encoderLayer = EncoderLayer(d_model=d_output_embedding, d_hidden=d_hidden, n_head=n_head)
output = encoderLayer(X, mask=mask_matrix)

In [32]:
output.shape

torch.Size([8, 10, 32])