In [1]:
import os
from google.colab import drive

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_probability as tfp

import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.optimizers import Adam


from tensorflow.keras.layers import TextVectorization, Dense, MaxPooling1D, Conv1D, LSTM, MultiHeadAttention, Flatten, Layer, LayerNormalization, Dropout, Embedding
from tensorflow.keras.preprocessing.sequence import pad_sequences

### Dataset

In [2]:
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
datasets_path = "/content/drive/My Drive/Machine Learning Datasets/Hindi-English"

In [4]:
Hindi_embedding_matrix = np.load(datasets_path + '/Hindi_embedding_matrix.npy')
English_embedding_matrix = np.load(datasets_path + '/English_embedding_matrix.npy')

Hindi_vectorized_text = np.load(datasets_path + '/Hindi_vectorized.npy')
English_vectorized_text = np.load(datasets_path + '/English_vectorized.npy')

In [5]:
print(
    f"""
    Hindi Embeddings Shape =  {Hindi_embedding_matrix.shape}
    English Embeddings shape = {English_embedding_matrix.shape}

    Hindi_vectors_shape = {Hindi_vectorized_text.shape}
    English_vectors_shape = {English_vectorized_text.shape}
    """
)


    Hindi Embeddings Shape =  (84469, 300)
    English Embeddings shape = (76776, 300)

    Hindi_vectors_shape = (127575, 417)
    English_vectors_shape = (127575, 398)
    


##### Note-
    -> Embeddings Dimensions for both Hindi and English texts = 300
    -> English texts have a vocabulary of 76776, while Hindi texts have a vocabulary of 84469
    -> The maximum number of words in English texts is 398, while for Hindi, it's 417
    -> There are 127_575 samples in parallel for both the features(English) and targets(Hindi)

In [6]:
BATCH_SIZE = 1024
MAX_SEQ_LEN = 450
EMBEDDING_DIMS = 300
HINDI_VOCAB_SIZE = Hindi_embedding_matrix.shape[0]
ENGLISH_VOCAB_SIZE = English_embedding_matrix.shape[0]

In [9]:
dataset = tf.data.Dataset.from_tensor_slices((English_vectorized_text, Hindi_vectorized_text))
dataset = dataset.shuffle(200_000)
train = dataset.take(100_000)
test = dataset.skip(100_000)
train = train.batch(BATCH_SIZE)
test = test.batch(1024)


## <center> Masking </center>
<p aligh = "justify" >Masking is a very simple but important factor in improving the accuracy of a model. It helps to tell a model, which parts of the input data to consider, at a particular Timestep. </p>

<br>

### <center> Types Of Masks </center>

####  Padding Mask
<p align = "justify">
We have already padded our sequences towards the end, to keep the sequence size consistent over the training examples. But, if this data is fed directly to the model, it might get something from the padded portions, which don't even exist in the sequence. So, we send along with our inputs, a padding mask function, that masks the padded region.
</p>

####  Look Ahead Mask
<p align = "justify">
The Transformer Network has an Encoder-Decoder Architecture. The decoder architecture tries to predict the text for next timestep, only considering the past timesteps. Look Ahead Mask allows us to mask the upcoming sections of the text.
</p>


In [11]:
def create_padding_mask(seq):
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    ## Adding required dimensions to the padding
    return seq[:, tf.newaxis, tf.newaxis, :]        # (Batch_size, 1, 1, seq_len)


# Working Example
x = tf.constant([[7, 6, 0, 0, 1], [1, 2, 3, 0, 0], [0, 0, 0, 4, 5]])
create_padding_mask(x)

<tf.Tensor: shape=(3, 1, 1, 5), dtype=float32, numpy=
array([[[[0., 0., 1., 1., 0.]]],


       [[[0., 0., 0., 1., 1.]]],


       [[[1., 1., 1., 0., 0.]]]], dtype=float32)>

In [20]:
def create_lookahead_mask(size):
    n = int(size * (size+1) / 2)
    mask = tfp.math.fill_triangular(tf.ones((n,), dtype=tf.int32), upper=False)
    return tf.cast(mask, tf.float32)

# Working Example
x = tf.random.uniform((1, 3))
temp = create_lookahead_mask(x.shape[1])
print(x)
print(temp)

tf.Tensor([[0.8253505  0.84161127 0.8758038 ]], shape=(1, 3), dtype=float32)
tf.Tensor(
[[1. 0. 0.]
 [1. 1. 0.]
 [1. 1. 1.]], shape=(3, 3), dtype=float32)


## <center> Positional Encoding </center>
<p align = "justify">
Attention layers see their input as a set of vectors, with no sequential order. This model also doesn't contain any recurrent or convolutional layers. Because of this a "positional encoding" is added to give the model some information about the relative position of the tokens in the sentence.<br>
The positional encoding vector is added to the embedding vector. Embeddings represent a token in a d-dimensional space where tokens with similar meaning will be closer to each other. But the embeddings do not encode the relative position of tokens in a sentence. So after adding the positional encoding, tokens will be closer to each other based on the similarity of their meaning and their position in the sentence, in the d-dimensional space.

$$\Large{PE_{(pos, 2i)} = \sin(pos / 10000^{2i / d_{model}})} $$
$$\Large{PE_{(pos, 2i+1)} = \cos(pos / 10000^{2i / d_{model}})} $$
<center>
<img src = "https://camo.githubusercontent.com/c279dcae2225189217ab2827711b56b10919d179aead5584e87426de6ee27a67/68747470733a2f2f6a696e676c6573636f64652e6769746875622e696f2f6173736574732f696d672f706f7374732f696c6c75737472617465642d67756964652d7472616e73666f726d65722d31302e6a7067" width = "800"> </center>

In [55]:
def get_positional_encoding(num_positions:int, dimensions:int):
    """
    num_positions: Length Of Sequences in the dataset after padding
    dimensions: Number of dimensions used to represent each word in embedding matrix
    """
    # Create a column vector for positions
    pos_vec = np.arange(num_positions)[:, np.newaxis]
    
    # Create a row vector for dimensions
    dims_vec = np.arange(dimensions)[np.newaxis, :]
    
    i = dims_vec // 2
    angles = pos_vec * 1.0 / (np.power(10_000, 2 * i / np.float32(dimensions)))
    angles[:, 0::2] = np.sin(angles[:, 0::2])
    angles[:, 0::1] = np.cos(angles[:, 0::1])
    pos_encoding = angles[np.newaxis, ...]
    return tf.cast(pos_encoding, tf.float32)

get_positional_encoding(10, 4)

<tf.Tensor: shape=(1, 10, 4), dtype=float32, numpy=
array([[[ 1.        ,  1.        ,  1.        ,  1.        ],
        [ 0.66636676,  0.5403023 ,  0.99995   ,  0.99995   ],
        [ 0.6143003 , -0.41614684,  0.9998    ,  0.9998    ],
        [ 0.9900591 , -0.9899925 ,  0.99955016,  0.99955004],
        [ 0.7270351 , -0.6536436 ,  0.9992005 ,  0.9992001 ],
        [ 0.5744009 ,  0.2836622 ,  0.9987513 ,  0.99875027],
        [ 0.9612168 ,  0.96017027,  0.9982027 ,  0.99820054],
        [ 0.7918362 ,  0.75390226,  0.997555  ,  0.997551  ],
        [ 0.5492263 , -0.14550003,  0.9968085 ,  0.99680173],
        [ 0.9162743 , -0.91113025,  0.99596363,  0.9959527 ]]],
      dtype=float32)>

# <center> Transformer </center>
<center>
<img src = "https://miro.medium.com/max/642/1*oW9WHT_EAbcgSodkTMTTLA.png" height = 400></img>

#### Encoder
<p align = "justify">
The Transformer Encoder layer pairs self-attention and convolutional neural network style of processing to improve the speed of training and passes K and V matrices to the Decoder. The inputs are passed through multiple layers of similar structure, to get the outputs to be fed to Decoder. The dimensions of the inputs are secured as they are, because blocks are repeated.
</p>
###### Steps-
    -> Input Sequence is

<img src = "https://jalammar.github.io/images/t/transformer_resideual_layer_norm.png" height = "300"> </img>

In [72]:
class Encoder_Layer(Layer):
    def __init__(self, embedding_dims, num_heads, fully_connected_dim,
                 dropout_rate = 0.1, layernorm_eps = 1e-6):
        super(Encoder_Layer, self).__init__()

        self.mha = MultiHeadAttention(num_heads,
                                      key_dim = embedding_dims,
                                      dropout = dropout_rate)
        self.dense1 = Dense(fully_connected_dim, activation = 'relu')
        self.dense2 = Dense(embedding_dims)
        
        self.layer_norm1 = LayerNormalization(epsilon = layernorm_eps)
        self.layer_norm2 = LayerNormalization(epsilon = layernorm_eps)

        self.dropout_ffn = Dropout(dropout_rate)

    def call(self, x, mask):
        attnout = self.mha(x, x, x, mask)
        attnout = self.layer_norm1(x + attnout)

        ffn_out = self.dense1(attnout)
        ffn_out = self.dense2(attnout)
        ffn_out = self.dropout_ffn(ffn_out)
        encoder_layer_out = self.layer_norm2(ffn_out + attnout)
        return encoder_layer_out

In [73]:
class Encoder(Layer):
    def __init__(self, num_layers, num_heads,
                 embedding_dims, fully_connencted_dim,
                 input_vocab_size, sequence_len,
                 dropout_rate = 0.1, layernorm_eps = 1e-6):
        super(Encoder, self).__init__()
        
        self.embedding_dims = embedding_dims
        self.num_layers = num_layers
        self.sequence_len = sequence_len
        self.embedding = Embedding(input_vocab_size, embedding_dims,
                            embeddings_initializer = tf.keras.initializers.Constant(English_embedding_matrix))
        
        self.positional_encoding = get_positional_encoding(self.sequence_len, embedding_dims)

        self.encoding_layers = [Encoder_Layer(
                                    embedding_dims = embedding_dims,
                                    num_heads = num_heads,                    
                                    fully_connected_dim = fully_connencted_dim,                                
                                    dropout_rate = dropout_rate, layernorm_eps = layernorm_eps
                                ) for i in range(self.num_layers)]
        self.dropout = Dropout(dropout_rate)


    def call(self, x, padding_mask):

        x = self.embedding(x)

        ## Scaling the embeddings
        x *= tf.math.sqrt(tf.cast(self.embedding_dims, tf.float32))

        ## Adding the positional encodings to embeddings
        x += self.positional_encoding[:, :self.sequence_len, :]
        
        x = self.dropout(x)

        # Passing x through series of encoder_layers:
        for i in range(self.num_layers):
            x = self.encoding_layers[i](x, mask = padding_mask)
        return x



#### Decoder
<p align = "justify">
Similar to the Encoder, Decoder has multiple decoder_layers.
</p>

<img src = "https://www.mihaileric.com/static/output_token-06e143fae69ea58572608d65a546255c-06aef.png" height = 400></img>

In [74]:
class Decoder_Layer(Layer):
    def __init__(self, num_heads, embedding_dims, fully_connected_dim, dropout_rate = 0.1, layernorm_eps = 1e-6):

        self.mha1 = MultiHeadAttention(num_heads, embedding_dims, dropout = dropout_rate)
        self.layernorm1 = LayerNormalization(epsilon = layernorm_eps)

        self.mha2 = MultiHeadAttention(num_heads, embedding_dims, dropout = dropout_rate)
        self.layernorm2 = LayerNormalization(epsilon = layernorm_eps)

        self.dense1 = Dense(fully_connected_dim, activation = 'relu')
        self.dense2 = Dense(embedding_dims)
        self.dropout_ = Dropout(dropout_rate)
        self.layernorm3 = LayerNormalization(epsilon = layernorm_eps)

    def call(self, x, encoder_out, padding_mask, lookahead_mask):
        attn1_out = self.mha1(x, x, x, mask = lookahead_mask)
        attn1_out = self.layernorm1(x + attn1_out)

        attn2_out = self.mha2(attn1_out, encoder_out, encoder_out, mask = padding_mask)
        attn2_out = self.layernorm2(attn1_out + attn2_out)

        dense_out = self.dense1(attn2_out)
        dense_out = self.dense2(dense_out)
        dense_out = self.dropout_(dense_out)

        decoder_out = self.layernorm3(dense_out + attn2_out)
        return decoder_out


In [75]:
class Decoder(Layer):
    def __init__(self, num_layers, num_heads,
                 embedding_dims, fully_connected_dim,
                 output_vocab_size, sequence_length,
                 dropout_rate = 0.1,
                 layernorm_eps = 1e-6
                 ):
        super(Decoder, self).__init__()
        self.num_layers = num_layers
        self.seq_len = sequence_length
        self.embedding_dims = embedding_dims

        self.embedding = Embedding(output_vocab_size, embedding_dims,
                            embeddings_initializer = tf.keras.initializers.Constant(English_embedding_matrix))
        self.positional_encoding = get_positional_encoding(sequence_length, embedding_dims)

        self.decoder_layers = [Decoder_Layer(num_heads, embedding_dims,
                                              fully_connected_dim, dropout_rate,
                                              layernorm_eps) for i in range(num_layers)]


        self.dropout = Dropout(dropout_rate)

    def call(self, x, encoder_out, padding_mask, lookahead_mask):
        x = self.embeddings(x)
        x *= tf.math.sqrt(tf.cast(self.embedding_dims, tf.float32))
        x += self.positional_encoding[:, :self.seq_len, :]
        x = self.dropout(x)

        for i in range(self.num_layers):
            x = self.decoder_layers[i]( x, encoder_out, padding_mask, lookahead_mask)

        return x
        
