References:
- https://www.youtube.com/watch?v=Nw_PJdmydZY
- https://www.youtube.com/watch?v=Xg5JG30bYik

In [1]:
import math

import numpy as np

import torch
from torch import nn
import torch.nn.functional as F

In [2]:
# Record the library versions this notebook was executed with.
for lib_label, lib in (('NumPy version:', np), ('PyTorch version:', torch)):
    print(lib_label, lib.__version__)

NumPy version: 1.22.4
PyTorch version: 2.0.0+cu118


In [3]:
def get_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)

## Transformer

### Scaled Dot-Product Attention

In [4]:
def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    Args:
        q: query tensor; the size of its last dimension is used as ``d_k``.
        k: key tensor; its last two dimensions are transposed before the
            matrix product with ``q``.
        v: value tensor that is weighted by the attention distribution.
        mask: optional additive mask. When given, the scores must be 4-D:
            their first two axes are swapped, the mask is added by
            broadcasting, and the axes are swapped back.
            NOTE(review): this looks designed for a (batch, seq, seq) mask
            applied to (batch, heads, seq, seq) scores -- confirm with callers.

    Returns:
        Tuple ``(values, attention)``: the attention-weighted values and the
        softmax-normalised attention weights (softmax over the last axis).
    """
    scale = math.sqrt(q.shape[-1])
    scores = torch.matmul(q, k.transpose(-2, -1)) / scale
    if mask is not None:
        # Swap axes 0 and 1 so the mask broadcasts over them, then restore.
        scores = (scores.permute(1, 0, 2, 3) + mask).permute(1, 0, 2, 3)
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, v), weights

### Positional Encoding

In [5]:
class PositionalEncoding(nn.Module):
    """Sinusoidal positional-encoding table.

    Even feature columns hold ``sin(pos / 10000^(i / d_model))`` and odd
    columns hold the matching ``cos`` term, where ``i`` runs over the even
    feature indices.

    Args:
        d_model: number of feature columns in the returned table.
        max_sequence_length: number of positions (rows) in the returned table.
    """
    def __init__(self, d_model, max_sequence_length):
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self):
        """Build and return the ``(max_sequence_length, d_model)`` table.

        The table is recomputed on every call and created on the default
        (CPU) device; callers move it where they need it.
        """
        even_i = torch.arange(0, self.d_model, 2).float()
        denominator = torch.pow(10000, even_i / self.d_model)
        position = (torch.arange(self.max_sequence_length)
                         .reshape(self.max_sequence_length, 1))
        even_PE = torch.sin(position / denominator)
        odd_PE = torch.cos(position / denominator)
        # Interleave sin/cos pairs: (positions, pairs, 2) -> (positions, 2 * pairs)
        stacked = torch.stack([even_PE, odd_PE], dim=2)
        PE = torch.flatten(stacked, start_dim=1, end_dim=2)
        # For an odd d_model the interleave yields d_model + 1 columns; drop
        # the surplus cosine column so the width always equals d_model.
        # This is a no-op when d_model is even.
        return PE[:, :self.d_model]

### Sentence Embedding

In [6]:
class SentenceEmbedding(nn.Module):
    """For a given batch of sentences, create position-aware embeddings.

    Each sentence is split into its individual items (characters, for a
    string), mapped to integer ids through ``language_to_index``, optionally
    wrapped in start/end tokens, padded to ``max_sequence_length``, embedded,
    summed with the positional encoding and passed through dropout (p=0.1).

    Args:
        max_sequence_length: length every tokenized sentence is padded to.
        d_model: embedding width.
        language_to_index: mapping from token to integer id; must contain
            the three special tokens below.
        START_TOKEN: token inserted at the front when requested.
        END_TOKEN: token appended at the end when requested.
        PADDING_TOKEN: token used to fill up to ``max_sequence_length``.
    """
    def __init__(self, max_sequence_length, d_model, language_to_index,
                 START_TOKEN, END_TOKEN, PADDING_TOKEN):
        super().__init__()
        self.vocab_size = len(language_to_index)
        self.max_sequence_length = max_sequence_length
        self.embedding = nn.Embedding(self.vocab_size, d_model)
        self.language_to_index = language_to_index
        self.position_encoder = PositionalEncoding(d_model,
                                                   max_sequence_length)
        self.dropout = nn.Dropout(p=0.1)
        self.START_TOKEN = START_TOKEN
        self.END_TOKEN = END_TOKEN
        self.PADDING_TOKEN = PADDING_TOKEN

    def batch_tokenize(self, batch, start_token, end_token):
        """Convert a batch of sentences into a 2-D tensor of token ids.

        Args:
            batch: sequence of sentences; each sentence is iterated item by
                item and every item is looked up in ``language_to_index``.
            start_token: when truthy, prepend the START_TOKEN id.
            end_token: when truthy, append the END_TOKEN id.

        Returns:
            Integer tensor with one row per sentence, placed on the same
            device as the embedding weights.

        Note:
            Sentences are padded but never truncated, so a sentence longer
            than ``max_sequence_length`` yields a longer row.
        """
        def tokenize(sentence, start_token, end_token):
            sentence_word_indicies = [self.language_to_index[token]
                                      for token in sentence]
            if start_token:
                sentence_word_indicies.insert(
                    0, self.language_to_index[self.START_TOKEN])
            if end_token:
                sentence_word_indicies.append(
                    self.language_to_index[self.END_TOKEN])
            missing = self.max_sequence_length - len(sentence_word_indicies)
            if missing > 0:
                padding_index = self.language_to_index[self.PADDING_TOKEN]
                sentence_word_indicies.extend([padding_index] * missing)
            return torch.tensor(sentence_word_indicies)

        tokenized = torch.stack([tokenize(sentence, start_token, end_token)
                                 for sentence in batch])
        # Follow the module's own device instead of a globally guessed one,
        # so the ids always match the embedding table they index into.
        return tokenized.to(self.embedding.weight.device)

    def forward(self, x, start_token, end_token): # sentence
        """Tokenize, embed, add positional encoding and apply dropout."""
        x = self.batch_tokenize(x, start_token, end_token)
        x = self.embedding(x)
        # Keep the positional table on the same device as the embeddings.
        pos = self.position_encoder().to(x.device)
        x = self.dropout(x + pos)
        return x

### Multi-Head Attention

In [7]:
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention.

    A single linear layer produces queries, keys and values for all heads;
    the per-head attention outputs are concatenated and projected back to
    ``d_model``.

    Args:
        d_model: input/output feature width.
        num_heads: number of attention heads; ``head_dim`` is
            ``d_model // num_heads``.
    """
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model , 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask):
        """Apply self-attention to ``x``.

        Args:
            x: tensor of shape ``(batch, sequence, d_model)``.
            mask: additive attention mask forwarded to
                ``scaled_dot_product`` (may be ``None``).

        Returns:
            Tensor of shape ``(batch, sequence, d_model)``.
        """
        batch_size, sequence_length, d_model = x.size()
        qkv = self.qkv_layer(x)
        # (batch, seq, 3 * d_model) -> (batch, seq, heads, 3 * head_dim)
        qkv = qkv.reshape(batch_size, sequence_length,
                          self.num_heads, 3 * self.head_dim)
        # -> (batch, heads, seq, 3 * head_dim)
        qkv = qkv.permute(0, 2, 1, 3)
        q, k, v = qkv.chunk(3, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask)
        # Bring the sequence axis back in front of the head axis BEFORE
        # merging heads. Reshaping (batch, heads, seq, head_dim) directly
        # would interleave different positions and heads in each output row.
        values = values.permute(0, 2, 1, 3).reshape(
            batch_size, sequence_length, self.num_heads * self.head_dim)
        out = self.linear_layer(values)
        return out

### Layer Normalization

In [8]:
class LayerNormalization(nn.Module):
    """Layer normalisation over the trailing ``len(parameters_shape)`` axes.

    Args:
        parameters_shape: shape of the learnable scale (``gamma``, initialised
            to ones) and shift (``beta``, initialised to zeros); its length
            decides how many trailing axes are normalised.
        eps: constant added to the variance before the square root.
    """
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        """Normalise ``inputs`` to zero mean / unit variance, then scale and shift."""
        # Trailing axes -1, -2, ..., one per entry of parameters_shape.
        trailing_axes = list(range(-1, -len(self.parameters_shape) - 1, -1))
        mean = inputs.mean(dim=trailing_axes, keepdim=True)
        centered = inputs - mean
        # Biased (population) variance over the normalised axes.
        var = centered.pow(2).mean(dim=trailing_axes, keepdim=True)
        normalized = centered / torch.sqrt(var + self.eps)
        return self.gamma * normalized + self.beta

### Position-wise Feed Forward Network

In [9]:
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward network: Linear -> ReLU -> Dropout -> Linear.

    Args:
        d_model: input and output feature width.
        hidden: width of the intermediate layer.
        drop_prob: dropout probability applied after the ReLU.
    """
    def __init__(self, d_model, hidden, drop_prob=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Expand to ``hidden`` features, activate, drop out, project back."""
        activated = self.relu(self.linear1(x))
        return self.linear2(self.dropout(activated))

### Encoder Layer

In [10]:
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation (post-norm arrangement).

    Args:
        d_model: feature width of the layer input and output.
        ffn_hidden: hidden width of the feed-forward sub-layer.
        num_heads: number of attention heads.
        drop_prob: dropout probability used throughout the layer.
    """
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model,
                                            num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model,
                                           hidden=ffn_hidden,
                                           drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, self_attention_mask):
        """Run the attention and feed-forward sub-layers on ``x``."""
        # Sub-layer 1: self-attention + residual + norm.
        skip = x.clone()
        attended = self.dropout1(self.attention(x, mask=self_attention_mask))
        x = self.norm1(attended + skip)

        # Sub-layer 2: feed-forward + residual + norm.
        skip = x.clone()
        transformed = self.dropout2(self.ffn(x))
        return self.norm2(transformed + skip)

### Sequential Encoder

In [11]:
class SequentialEncoder(nn.Sequential):
    """``nn.Sequential`` variant whose layers also receive the attention mask."""
    def forward(self, *inputs):
        """Feed ``x`` through every layer, passing the same mask each time.

        Expects exactly two positional inputs: ``(x, self_attention_mask)``.
        """
        hidden, self_attention_mask = inputs
        for layer in self:
            hidden = layer(hidden, self_attention_mask)
        return hidden

### Encoder

In [12]:
class Encoder(nn.Module):
    """Sentence embedding followed by a stack of ``num_layers`` encoder layers.

    Args:
        d_model: feature width used throughout the encoder.
        ffn_hidden: hidden width of each layer's feed-forward network.
        num_heads: number of attention heads per layer.
        drop_prob: dropout probability inside the encoder layers.
        num_layers: how many ``EncoderLayer`` blocks to stack.
        max_sequence_length: padded length of every tokenized sentence.
        language_to_index: token -> id mapping of the source language.
        START_TOKEN, END_TOKEN, PADDING_TOKEN: special tokens handed to the
            sentence embedding.
    """
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(
            max_sequence_length, d_model, language_to_index,
            START_TOKEN, END_TOKEN, PADDING_TOKEN)
        encoder_layers = [
            EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialEncoder(*encoder_layers)

    def forward(self, x, self_attention_mask, start_token, end_token):
        '''
        x: batch of source sentences (English, in this notebook)
        self_attention_mask: mask handed to every encoder layer; only a
            padding mask is needed here because the encoder may attend both
            forwards and backwards (no look-ahead mask)
        start_token / end_token: whether to add the start / end token while
            tokenizing

        Returns one context-aware vector per token position.
        '''
        # Map every sentence to token ids and then to embedding vectors.
        embedded = self.sentence_embedding(x, start_token, end_token)

        # Run the embeddings through the stacked encoder layers.
        return self.layers(embedded, self_attention_mask)

### Multi-Head Cross Attention

In [13]:
class MultiHeadCrossAttention(nn.Module):
    """Multi-head cross-attention.

    Keys and values are projected from ``x`` and queries from ``y``; the
    per-head outputs are concatenated and projected back to ``d_model``.

    Args:
        d_model: input/output feature width.
        num_heads: number of attention heads; ``head_dim`` is
            ``d_model // num_heads``.
    """
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = nn.Linear(d_model , 2 * d_model)
        self.q_layer = nn.Linear(d_model , d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y, mask):
        """Attend from ``y`` (queries) over ``x`` (keys and values).

        Args:
            x: tensor of shape ``(batch, kv_length, d_model)``.
            y: tensor of shape ``(batch, q_length, d_model)``; ``q_length``
                may differ from ``kv_length``.
            mask: additive attention mask forwarded to
                ``scaled_dot_product`` (may be ``None``).

        Returns:
            Tensor of shape ``(batch, q_length, d_model)``.
        """
        batch_size, kv_length, d_model = x.size()
        # Queries follow y's own length, so source and target sequences are
        # not required to be padded to the same size.
        q_length = y.size(1)

        kv = self.kv_layer(x)
        q = self.q_layer(y)
        kv = kv.reshape(batch_size, kv_length, self.num_heads,
                        2 * self.head_dim)
        q = q.reshape(batch_size, q_length, self.num_heads,
                      self.head_dim)
        # -> (batch, heads, length, features)
        kv = kv.permute(0, 2, 1, 3)
        q = q.permute(0, 2, 1, 3)
        k, v = kv.chunk(2, dim=-1)
        values, attention = scaled_dot_product(q, k, v, mask)

        # Bring the sequence axis back in front of the head axis BEFORE
        # merging heads. Reshaping (batch, heads, seq, head_dim) directly
        # would interleave different positions and heads in each output row.
        values = values.permute(0, 2, 1, 3).reshape(batch_size, q_length,
                                                    d_model)
        out = self.linear_layer(values)
        return out

### Decoder Layer

In [14]:
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention and feed-forward.

    Every sub-layer is followed by dropout, a residual connection and layer
    normalisation (post-norm arrangement).

    Args:
        d_model: feature width of the layer input and output.
        ffn_hidden: hidden width of the feed-forward sub-layer.
        num_heads: number of heads in both attention sub-layers.
        drop_prob: dropout probability used throughout the layer.
    """
    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model,
                                                 num_heads=num_heads)
        self.layer_norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.encoder_decoder_attention = MultiHeadCrossAttention(
            d_model=d_model, num_heads=num_heads)
        self.layer_norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model,
                                           hidden=ffn_hidden,
                                           drop_prob=drop_prob)
        self.layer_norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, self_attention_mask, cross_attention_mask):
        """Update the decoder stream ``y`` using the encoder output ``x``."""
        # Sub-layer 1: masked self-attention over the decoder stream.
        skip = y.clone()
        attended = self.dropout1(
            self.self_attention(y, mask=self_attention_mask))
        y = self.layer_norm1(attended + skip)

        # Sub-layer 2: cross-attention (keys/values from x, queries from y).
        skip = y.clone()
        crossed = self.dropout2(
            self.encoder_decoder_attention(x, y, mask=cross_attention_mask))
        y = self.layer_norm2(crossed + skip)

        # Sub-layer 3: position-wise feed-forward network.
        skip = y.clone()
        transformed = self.dropout3(self.ffn(y))
        return self.layer_norm3(transformed + skip)

### Sequential Decoder

In [15]:
class SequentialDecoder(nn.Sequential):
    """``nn.Sequential`` variant for layers taking ``(x, y, masks...)``."""
    def forward(self, *inputs):
        """Thread ``y`` through every layer; ``x`` and both masks stay fixed.

        Expects exactly four positional inputs:
        ``(x, y, self_attention_mask, cross_attention_mask)``.
        """
        x, target, self_attention_mask, cross_attention_mask = inputs
        for layer in self:
            target = layer(x, target, self_attention_mask,
                           cross_attention_mask)
        return target

### Decoder

In [16]:
class Decoder(nn.Module):
    """Sentence embedding followed by a stack of ``num_layers`` decoder layers.

    Args:
        d_model: feature width used throughout the decoder.
        ffn_hidden: hidden width of each layer's feed-forward network.
        num_heads: number of attention heads per layer.
        drop_prob: dropout probability inside the decoder layers.
        num_layers: how many ``DecoderLayer`` blocks to stack.
        max_sequence_length: padded length of every tokenized sentence.
        language_to_index: token -> id mapping of the target language.
        START_TOKEN, END_TOKEN, PADDING_TOKEN: special tokens handed to the
            sentence embedding.
    """
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(
            max_sequence_length, d_model, language_to_index,
            START_TOKEN, END_TOKEN, PADDING_TOKEN)
        decoder_layers = [
            DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialDecoder(*decoder_layers)

    def forward(self, x, y, self_attention_mask, cross_attention_mask,
                start_token, end_token):
        '''
        x: batch of encoder output vectors
        y: batch of target sentences
        self_attention_mask / cross_attention_mask: masks handed to every
            decoder layer
        start_token / end_token: whether to add the start / end token while
            tokenizing y

        Returns one vector per target token position (Kannada characters,
        in this notebook).
        '''
        # Tokenize and embed the target sentences.
        embedded_target = self.sentence_embedding(y, start_token, end_token)

        # Run the decoder stack with both masks.
        return self.layers(x, embedded_target, self_attention_mask,
                           cross_attention_mask)


### Transformer

In [17]:
class Transformer(nn.Module):
    """Encoder-decoder Transformer for English -> Kannada translation.

    Args:
        d_model: width of every character vector (typically 512).
        ffn_hidden: hidden width of the feed-forward networks
            (typically 2048).
        num_heads: number of heads in both multi-head attention and
            multi-head cross-attention.
        drop_prob: dropout probability, i.e. the chance of switching off a
            neuron to promote generalization (typically 0.1).
        num_layers: number of encoder layers and of decoder layers
            (for example 5).
        max_sequence_length: maximum number of characters in an input
            sentence (about 200 in this notebook).
        kn_vocab_size: number of distinct characters that can occur in a
            Kannada sentence; sets the width of the output projection.
        english_to_index: dictionary mapping each English character to a
            unique number.
        kannada_to_index: dictionary mapping each Kannada character to a
            unique number.
        START_TOKEN, END_TOKEN, PADDING_TOKEN: the start, end and padding
            tokens respectively.
    """
    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 kn_vocab_size,
                 english_to_index,
                 kannada_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        super().__init__()
        # Settings shared by the encoder and decoder stacks.
        stack_config = (d_model, ffn_hidden, num_heads, drop_prob,
                        num_layers, max_sequence_length)
        special_tokens = (START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.encoder = Encoder(*stack_config, english_to_index,
                               *special_tokens)
        self.decoder = Decoder(*stack_config, kannada_to_index,
                               *special_tokens)
        self.linear = nn.Linear(d_model, kn_vocab_size)
        self.device = get_device()

    def forward(self,
                x,
                y,
                encoder_self_attention_mask=None,
                decoder_self_attention_mask=None,
                decoder_cross_attention_mask=None,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=False,
                dec_end_token=False):
        '''
        Transformer call to the forward pass.

        x: batch of English sentences
        y: batch of target Kannada sentences
        encoder_self_attention_mask: incorporates the padding mask
        decoder_self_attention_mask: incorporates both the padding mask and
            the look-ahead mask
        decoder_cross_attention_mask: also incorporates the padding mask
        enc_start_token, enc_end_token, dec_start_token, dec_end_token:
            whether a start / end token is added to the encoder and decoder
            inputs; False means it is not included, True means it is.
            NOTE(review): the original author remarked that dec_start_token
            should probably be True -- confirm at the call site.

        Returns the decoder output projected to kn_vocab_size values per
        position. No softmax is applied here; it is expected to happen when
        the loss is computed.
        '''
        # Encode the source sentences into context-aware character vectors.
        encoded = self.encoder(x, encoder_self_attention_mask,
                               start_token=enc_start_token,
                               end_token=enc_end_token)

        # Decode the target sentences against the encoder output.
        decoded = self.decoder(encoded, y, decoder_self_attention_mask,
                               decoder_cross_attention_mask,
                               start_token=dec_start_token,
                               end_token=dec_end_token)

        # Project every position onto the Kannada vocabulary.
        return self.linear(decoded)

## Dependencies

In [18]:
!pip install session-info

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting session-info
  Downloading session_info-1.0.0.tar.gz (24 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting stdlib_list
  Downloading stdlib_list-0.8.0-py3-none-any.whl (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.5/63.5 kB[0m [31m972.1 kB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: session-info
  Building wheel for session-info (setup.py) ... [?25l[?25hdone
  Created wheel for session-info: filename=session_info-1.0.0-py3-none-any.whl size=8042 sha256=e6f8d64da0a887802a711689b7f9d0ccc4a6bf443a2d071ba7f3e8e34521cc7c
  Stored in directory: /root/.cache/pip/wheels/6a/aa/b9/eb5d4031476ec10802795b97ccf937b9bd998d68a9b268765a
Successfully built session-info
Installing collected packages: stdlib_list, session-info
Successfully installed session-info-1.0.0 stdlib_list-0.8.0


In [19]:
# Display a session/dependency report for this notebook run via session_info.
# NOTE(review): the exact contents of the report depend on the installed
# session_info version -- see its documentation.
import session_info

session_info.show()