In [8]:
import math
from pathlib import Path
from copy import deepcopy

import torch
import torch.nn as nn
from transformers import BertTokenizer

# from datasets import load_dataset
# from tokenizers import Tokenizer
# from tokenizers.models import WordLevel
# from tokenizers.trainers import WordLevelTrainer
# from tokenizers.pre_tokenizers import Whitespace

# Tokenization
Us humans love text (semantic) inputs and outputs, but the machine (regrettably) does not. Tokenization is the process of converting text into numerical symbols to facilitate calculations on it.

The paper uses BPE (Byte-Pair Encoding). I will use the pre-trained BertTokenizer from HuggingFace Transformers Library.

Bert was pretrained with two objectives:
1. **Masked Language Modeling (MLM)**: BERT randomly masks 15% of the input tokens and trains to predict the masked words. This allows for bidirectional learning.
2. **Next Sentence Prediction (NSP)**: Allows BERT to understand the coherence between sentences. Model has to predict if these two sentences were next to each other in the original text or not. 

This way, the model learns an inner representation of the English language that can then be used to extract features useful for downstream tasks.

There are a few special 'tokens' which should be noted:
1. **`[CLS]`** (Classifier): This token is inserted at the beginning of the sequence. In NSP, this represents the entire sequence, capturing the overall meaning of the combined input.
2. **`[SEP]`** (Separator): This token acts as a separator between the two sentences in a pair during NSP. It helps the model differentiate between the first and second sentences within the combined sequence.
3. **`[MASK]`** (Mask): This token replaces a certain percentage of words in the input sentence. The model's objective is to predict the original masked word based on the context provided by the surrounding words.
4. **`[UNK]`** (Unknown): Represents an unknown word.


In [9]:
tok = BertTokenizer.from_pretrained("bert-base-uncased")

# Embedding
Embedding is the conversion of any word token in to its vector representation. For example, the words 'boat' and 'ship' are closer to each other than, say, the word 'racoon'.     
Each dimension of the vector representation nudges the entity in some semantic direction. So in the language of vectors, the cosine similarity of two entities insinuates semantic closeness.

The weights for these embeddings are multiplied by 	$$\sqrt{d_m}$$      
Where d<sub>m</sub> == num. of dimensions of model

In [10]:
class Embed(nn.Module):
    def __init__(self, vocab: int, d_model: int = 512):
        super(Embed, self).__init__()
        self.d_model = d_model
        self.vocab = vocab
        self.emb = nn.Embedding(self.vocab, self.d_model)
        self.scaling = torch.sqrt(self.d_model)

    def forward(self, x):
        return self.emb(x) * self.scaling

# Positional Encoding
Unlike RNNs, the Transformer model has no idea of relative word positions in a sentence. For the model, the phrases    
`Avengers beat Thanos` and `Thanos beat Avengers`    
Mean the same, even though these two sentences are catastrophically different. 

Therefore, this information is injected by adding a 'positional encoding' in the input embeddings of the encoder and the decoder. The Transformer Architecture uses sine and cosine transformations to achieve this

> PE<sub>(pos, 2i)</sub> = sin($pos \over 10000^{(2i/d_m)}$)
<br>

> PE<sub>(pos, 2i+1)</sub> = cos($pos \over 10000^{(2i/d_m)}$)


In [11]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int = 512, dropout: float = .1, max_len: int = 5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(dropout)
        
        # Computing the positional encodings in log space to avoid numerical overflow
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(torch.log(torch.Tensor([10000.0])) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)
        return self.dropout(x)


# Attention
Attention mechanism is the very thing that pushed this paper to the forefront of NLP research (apparantly Attention IS all you need).    
Here we basically do the following:
1. Find weights by multipying (dot product) the initial embedding of the first word with the embeddings of all other words.  
2. These weights are normalized (sum = 1)  
3. Weights are again multiplied with the embeddings of all words  


Attention(Q, K, V) = $softmax($ $QK^T \over \sqrt(d_k)$ $)V$


In other words, each word from Query Vector ($1$ x $k$) is multiplied with the Key matrix ($k$ x $k$) and normalized. This is then multiplied with the Value Vector. We explored single head attention here.

# Multi-Head Attention
In Multi-Head Attention, we have multiple $Q, K, V$ matrices split from the input. These matrices are fed through multiple Attention Blocks. The outputs are then concatenated to give us the final Attention Output.

In [12]:
class Attention:
    def __init__(self, dropout: float = 0.):
        super(Attention, self).__init__()
        self.dropout = nn.Dropout(dropout)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, query, key, value, mask=None):
        d_k = query.size(-1)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        p_attn = self.dropout(self.softmax(scores))
        return torch.matmul(p_attn, value)
    
    def __call__(self, query, key, value, mask=None):
        return self.forward(query, key, value, mask)

In [13]:
class MultiHeadAttention(nn.Module):
    def __init__(self, h: int = 8, d_model: int = 512, dropout: float = 0.1):
        super(MultiHeadAttention, self).__init__()
        self.d_k = d_model // h
        self.h = h
        self.attn = Attention(dropout)
        self.lindim = (d_model, d_model)
        self.linears = nn.ModuleList([deepcopy(nn.Linear(*self.lindim)) for _ in range(4)])
        self.final_linear = nn.Linear(*self.lindim, bias=False)
        self.dropout = nn.Dropout(p=dropout)
        
    def forward(self, query, key, value, mask=None):
        if mask is not None:
            mask = mask.unsqueeze(1)
        
        query, key, value = [l(x).view(query.size(0), -1, self.h, self.d_k).transpose(1, 2) for l, x in zip(self.linears, (query, key, value))]
        nbatches = query.size(0)
        x = self.attn(query, key, value, mask=mask)
        
        # Concatenate and multiply by W^O
        x = x.transpose(1, 2).contiguous().view(nbatches, -1, self.h * self.d_k)
        return self.final_linear(x)


# Add & Norm
* It is known that techniques like Normalization (Mean = 0, Var = 1) and Residual Connections improve training time and performance. Hence, there is a layer of Add & Norm after every attention and feed-forward layer in both Encoder and Decoder Blocks.      
* Residual Connections refer to adding the output of the previous layer to the current layer's output.     
* Additionally, Dropouts are added too (help in generalization).

Here, we take a residual connection of the original word embedding, add it to the embedding from the multi-head attention, and then normalize it.  

Final output of each layer will then be:   
$$ResidualConnection(x) = x+Dropout(SubLayer(LayerNorm(x)))$$

In [14]:
class LayerNorm(nn.Module):
    def __init__(self, features: int, eps: float = 1e-6):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(features))
        self.beta = nn.Parameter(torch.zeros(features))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.gamma * (x - mean) / (std + self.eps) + self.beta

class ResidualConnection(nn.Module):
    def __init__(self, size: int = 512, dropout: float = .1):
        super(ResidualConnection,  self).__init__()
        self.norm = LayerNorm(size)
        self.dropout = nn.Dropout(dropout)


    def forward(self, x, sublayer):
        return x + self.dropout(sublayer(self.norm(x)))


# Feed Forward Layer
Feed-Forward networks are essential as they help provide non-linearity and complexity to the neural network. The Transformer model has a ReLU (Rectified LInear Unit) and a Dropout layer.


In [15]:
from torch import nn
class FeedForward(nn.Module):
    def __init__(self, d_model: int = 512, d_ff: int = 2048, dropout: float = .1):
        super(FeedForward, self).__init__()
        self.l1 = nn.Linear(d_model, d_ff)
        self.l2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)


    def forward(self, x):
        return self.l2(self.dropout(self.relu(self.l1(x))))

# Encoder Block
This block takes whole sentences as input. After the input sentence is through Input Embedding and Positional Embedding, the multi-head attention and feed-forward blocks are repeated $n$ times (hyperparameters), in the encoder block.

Here, $n = 6$

In [16]:
class EncoderLayer(nn.Module):
    def __init__(self, size: int, self_attn: MultiHeadAttention, feed_forward: FeedForward, dropout: float = .1):
        super(EncoderLayer, self).__init__()
        self.self_attn = self_attn
        self.feed_forward = feed_forward
        self.sub1 = ResidualConnection(size, dropout)
        self.sub2 = ResidualConnection(size, dropout)
        self.size = size

    def forward(self, x, mask):
        x = self.sub1(x, lambda x: self.self_attn(x, x, x, mask))
        return self.sub2(x, self.feed_forward)


class Encoder(nn.Module):
    def __init__(self, layer, n: int = 6):
        super(Encoder, self).__init__()
        self.layers = nn.ModuleList([deepcopy(layer) for _ in range(n)])
        self.norm = LayerNorm(layer.size)
        
    def forward(self, x, mask):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)

# Decoder Block
This block recieves two main inputs: 
* Output of previous decoder: Can be a single/ series of tokens. This will be referred to as `prev_op`.
* Output from Encoder: Gives context

The prev_op is first passed through Embedding and positional encoding. Then, a Masked Multi-Head Attention system is applied (as output value should only depend on previously fed inputs. The future is masked.)

The output from Masked Multi-Head Attention layer, along with output of Encoder Block are send into a Multi-Head Attention layer. This then goes through a Feed Forward Layer.

In [17]:
class DecoderLayer(nn.Module):
    def __init__(self, size: int, self_attn: MultiHeadAttention, src_attn: MultiHeadAttention, 
                 feed_forward: FeedForward, dropout: float = .1):
        super(DecoderLayer, self).__init__()
        self.size = size
        self.self_attn = self_attn
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        self.sub1 = ResidualConnection(size, dropout)
        self.sub2 = ResidualConnection(size, dropout)
        self.sub3 = ResidualConnection(size, dropout)
 
    def forward(self, x, memory, src_mask, tgt_mask):
        x = self.sub1(x, lambda x: self.self_attn(x, x, x, tgt_mask))
        x = self.sub2(x, lambda x: self.src_attn(x, memory, memory, src_mask))
        return self.sub3(x, self.feed_forward)

In [18]:
class Decoder(nn.Module):
    def __init__(self, layer: DecoderLayer, n: int = 6):
        super(Decoder, self).__init__()
        self.layers = nn.ModuleList([deepcopy(layer) for _ in range(n)])
        self.norm = LayerNorm(layer.size)
        
    def forward(self, x, memory, src_mask, tgt_mask):
        for layer in self.layers:
            x = layer(x, memory, src_mask, tgt_mask)
        return self.norm(x)

In [19]:
class EncoderDecoder(nn.Module):
    def __init__(self, encoder: Encoder, decoder: Decoder, 
                 src_embed: Embed, tgt_embed: Embed, final_layer: Output):
        super(EncoderDecoder, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.src_embed = src_embed
        self.tgt_embed = tgt_embed
        self.final_layer = final_layer
        
    def forward(self, src, tgt, src_mask, tgt_mask):
        return self.final_layer(self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask))
    
    def encode(self, src, src_mask):
        return self.encoder(self.src_embed(src), src_mask)
    
    def decode(self, memory, src_mask, tgt, tgt_mask):
        return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)

NameError: name 'Output' is not defined

# Output
The vector output from the decoder has to be transformed to a final output. This is done by creating a probability distribution over the whole vocabulary for each token. A SoftMax Function is used to define the probability dictribution, but here I will use LogSoftmax as it is apparantly faster.


In [None]:
class Output(nn.Module):
    def __init__(self, input_dim: int, output_dim: int):
        super(Output, self).__init__()
        self.l1 = nn.Linear(input_dim, output_dim)
        self.log_softmax = nn.LogSoftmax(dim=-1)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        op = self.l1(x)
        return self.log_softmax(op)