In [2]:
import torch.nn as nn
import torch 
import torch.nn.functional as F
import math, copy, re
import warnings
import pandas as pd
import numpy as np
import seaborn as sns
import torchtext
import matplotlib.pyplot as plt
# Ignore every Python warning from here on so warnings do not clutter cell outputs.
warnings.simplefilter("ignore")
# Show the installed PyTorch version so the environment is visible in the notebook.
print(torch.__version__)

2.0.0


**Creating Embedding Vectors**

- Each embedding vector has size 512. With a vocabulary of 100 tokens, the embedding matrix is 100x512. For a batch of 64 sequences of length 10, the embedding output is therefore 64x10x512.

In [3]:
class EmbeddingLayer(nn.Module):
    """Lookup table that maps integer token ids to dense embedding vectors."""

    def __init__(self, vocab_size, embedding_dimension):
        """
        Parameters:
            vocab_size: number of rows in the embedding table
            embedding_dimension: length of each embedding vector
        """
        super().__init__()
        self.embed = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dimension,
        )

    def forward(self, x):
        """
        Parameters:
            x: tensor of token indices
        Returns:
            the rows of the embedding table selected by x
        """
        return self.embed(x)

**Positional Encoding**
- In the paper, the authors build the positional encoding from sin() and cos(). Sine is used for the even embedding dimensions and cosine for the odd embedding dimensions (the split is over dimensions, not time steps).
- $PE_{(pos,\,2i)} = \sin\left(pos / 10000^{2i/d_{model}}\right)$
- $PE_{(pos,\,2i+1)} = \cos\left(pos / 10000^{2i/d_{model}}\right)$
- Here pos is the token's position in the sentence, i indexes the sine/cosine dimension pair, and $d_{model}$ is the embedding dimension. With a batch size of 32, a sequence length of 10 and an embedding dimension of 512, the positional encoding is broadcast to 32x10x512 and added element-wise to the embedding tensor of the same shape.

In [4]:
class PositionalEmbedding(nn.Module):
    """Adds fixed sinusoidal positional encodings to a batch of embeddings.

    The table is computed once in __init__ and stored as the non-trainable
    buffer ``pe`` of shape (1, max_seq_len, embed_model_dimension).
    """

    def __init__(self, max_seq_len, embed_model_dimension):
        """
        Parameters:
            max_seq_len: largest sequence length the table covers
            embed_model_dimension: size of each embedding vector
        """
        super(PositionalEmbedding, self).__init__()
        self.embed_dim = embed_model_dimension

        # Column vector of positions 0 .. max_seq_len-1.
        positions = torch.arange(max_seq_len, dtype=torch.float32).unsqueeze(1)
        # even_index already equals 2k, so the exponent is even_index / d
        # (multiplying by 2 again would square the intended wavelength).
        even_index = torch.arange(0, self.embed_dim, 2, dtype=torch.float32)
        inverse_freq = torch.exp(even_index * (-math.log(10000.0) / self.embed_dim))
        angles = positions * inverse_freq  # (max_seq_len, ceil(embed_dim / 2))

        positional_encoding = torch.zeros(max_seq_len, self.embed_dim)
        positional_encoding[:, 0::2] = torch.sin(angles)
        # An odd embed_dim has one fewer odd column than even columns.
        positional_encoding[:, 1::2] = torch.cos(angles[:, : self.embed_dim // 2])

        # Leading batch axis of size 1 lets the table broadcast over the batch.
        self.register_buffer('pe', positional_encoding.unsqueeze(0))

    def forward(self, x):
        """
        Parameters:
            x: embeddings of shape (batch, seq_len, embed_dim)
        Returns:
            x scaled by sqrt(embed_dim) plus the encodings of the first
            seq_len positions
        Raises:
            ValueError: if seq_len exceeds the max_seq_len given at construction
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_seq_len {self.pe.size(1)}"
            )
        # Scale the embeddings so the positional signal does not dominate them.
        x = x * math.sqrt(self.embed_dim)
        # Buffers never require grad, so no Variable wrapper is needed.
        return x + self.pe[:, :seq_len]

**MultiHeadAttention**

- Implementation of the multi-head attention mechanism, which takes *key*, *query* and *value* tensors, each of shape (*batch_size*, *sequence_length*, *embedding_dimension*).

- The embedding_dimension is split evenly across n_heads and attention is computed separately for each head. The per-head attention outputs are then concatenated and passed through a final linear layer.

- To allow for parallel computation, *key*, *query* and *value* tensors are reshaped so that the *embedding_dimension* is split into n_heads equally. Thus, the *key* and *query* tensors are used to compute attention scores.

- A mask could be passed in to mask out particular elements in the *key* tensor. It's a useful method when performing seq2seq tasks, where attention mechanism needs to attend only to the previous elements in the sequence.


In [5]:
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention computed independently over n_heads heads.

    The last dimension of key/query/value is split into n_heads slices of
    single_head_dim features. Each slice is projected by a bias-free linear
    map shared across heads, attended, and the per-head results are
    concatenated and passed through the output projection.
    """

    def __init__(self, embedding_dimension=512, n_heads=8):
        """
        Parameters:
            embedding_dimension: size of the last dimension of the inputs
            n_heads: number of attention heads
        Raises:
            ValueError: if embedding_dimension is not divisible by n_heads
        """
        super(MultiHeadAttention, self).__init__()

        if embedding_dimension % n_heads != 0:
            raise ValueError(
                f"embedding_dimension ({embedding_dimension}) must be divisible "
                f"by n_heads ({n_heads})"
            )

        self.embedding_dimension = embedding_dimension
        self.n_heads = n_heads
        self.single_head_dim = embedding_dimension // n_heads  # 512 / 8 = 64 per head

        # Per-head projections (64 x 64 for the defaults), applied to the last
        # axis after the inputs have been split into heads.
        self.query_matrix = nn.Linear(self.single_head_dim, self.single_head_dim, bias=False)
        self.key_matrix = nn.Linear(self.single_head_dim, self.single_head_dim, bias=False)
        self.value_matrix = nn.Linear(self.single_head_dim, self.single_head_dim, bias=False)
        # Output projection over the concatenated heads.
        self.out = nn.Linear(self.n_heads * self.single_head_dim, self.embedding_dimension)

    def forward(self, key, query, value, mask=None):
        """
        Parameters:
            key: tensor of shape (batch, key_len, embedding_dimension)
            query: tensor of shape (batch, query_len, embedding_dimension)
            value: tensor of shape (batch, key_len, embedding_dimension)
            mask: optional tensor broadcastable to
                (batch, n_heads, query_len, key_len); positions where
                mask == 0 are excluded from attention
        Returns:
            tensor of shape (batch, query_len, embedding_dimension)
        """
        batch_size = key.size(0)
        seq_length = key.size(1)
        seq_length_query = query.size(1)

        # Split the feature axis into heads: (batch, len, n_heads, head_dim).
        key = key.reshape(batch_size, seq_length, self.n_heads, self.single_head_dim)
        query = query.reshape(batch_size, seq_length_query, self.n_heads, self.single_head_dim)
        value = value.reshape(batch_size, seq_length, self.n_heads, self.single_head_dim)

        # Project, then move heads ahead of the sequence axis:
        # (batch, n_heads, len, head_dim). The projected tensors must be the
        # ones transposed, otherwise the learned weights are never used.
        key_ = self.key_matrix(key).transpose(1, 2)
        query_ = self.query_matrix(query).transpose(1, 2)
        value_ = self.value_matrix(value).transpose(1, 2)

        # Scaled dot product: (B, H, Lq, D) x (B, H, D, Lk) -> (B, H, Lq, Lk).
        product = torch.matmul(query_, key_.transpose(-2, -1))
        product = product / math.sqrt(self.single_head_dim)

        if mask is not None:
            # Fill after scaling with the most negative value the dtype can
            # represent, so masked logits vanish under softmax in any precision.
            product = product.masked_fill(mask == 0, torch.finfo(product.dtype).min)

        # Attention weights over the key positions.
        scores = F.softmax(product, dim=-1)

        # Weighted sum of the values: (B, H, Lq, D).
        context = torch.matmul(scores, value_)

        # Concatenate the heads back into one feature axis.
        concat = context.transpose(1, 2).contiguous().view(
            batch_size, seq_length_query, self.single_head_dim * self.n_heads
        )

        output = self.out(concat)  # (B, Lq, E) -> (B, Lq, E)

        return output

    # Backward-compatible alias for the method's original (misspelled) name.
    fforward = forward

In [6]:
class TransformerLayer(nn.Module):
    """One encoder block: self-attention followed by a position-wise
    feed-forward network, each wrapped in a residual connection and a
    LayerNorm (post-norm arrangement).
    """

    def __init__(self, embedding_dimension=512, n_heads=8, fforward_dimension=2048):
        """
        Parameters:
            embedding_dimension: feature size of the input and output
            n_heads: number of heads handed to MultiHeadAttention
            fforward_dimension: hidden width of the feed-forward network
        """
        super(TransformerLayer, self).__init__()

        self.attention = MultiHeadAttention(embedding_dimension, n_heads)
        self.norm1 = nn.LayerNorm(embedding_dimension)

        # Two linear maps with a ReLU in between, applied to the last axis.
        self.fforward = nn.Sequential(
            nn.Linear(embedding_dimension, fforward_dimension),
            nn.ReLU(),
            nn.Linear(fforward_dimension, embedding_dimension),
        )
        self.norm2 = nn.LayerNorm(embedding_dimension)

    def forward(self, x, mask=None):
        """
        Parameters:
            x: input tensor; passed as key, query and value to the attention
            mask: optional mask forwarded unchanged to the attention module
        Returns:
            output of the second LayerNorm
        """
        # Self-attention sub-layer with residual connection.
        attention_output = self.attention(x, x, x, mask)
        norm1_output = self.norm1(x + attention_output)

        # Feed-forward sub-layer; its residual is the normalized attention
        # result, not the raw input.
        fforward_output = self.fforward(norm1_output)
        norm2_output = self.norm2(norm1_output + fforward_output)

        return norm2_output

**Encoder**

In [7]:
class TransformerEncoder(nn.Module):
    """Stack of TransformerLayer blocks followed by a final LayerNorm."""

    def __init__(self, num_layers, embedding_dimension, n_heads, fforward_dimension, dropout_rate):
        """
        Parameters:
            num_layers: number of TransformerLayer blocks to stack
            embedding_dimension: feature size of the input and output
            n_heads: number of attention heads in every layer
            fforward_dimension: hidden width of each layer's feed-forward network
            dropout_rate: dropout probability applied to the output of every layer
        """
        super(TransformerEncoder, self).__init__()
        # TransformerLayer accepts (embedding_dimension, n_heads,
        # fforward_dimension) and has no dropout argument, so dropout is owned
        # by the encoder and applied to each layer's output in forward().
        self.layers = nn.ModuleList([
            TransformerLayer(embedding_dimension, n_heads, fforward_dimension)
            for _ in range(num_layers)
        ])
        self.dropout = nn.Dropout(dropout_rate)
        self.layer_norm = nn.LayerNorm(embedding_dimension)

    def forward(self, x, mask=None):
        """
        Parameters:
            x: input tensor handed to the first layer
            mask: optional attention mask forwarded to every layer
        Returns:
            layer-normalized output of the last layer
        """
        for layer in self.layers:
            # nn.Dropout is the identity in eval mode.
            x = self.dropout(layer(x, mask))
        x = self.layer_norm(x)
        return x