In [1]:
import tensorflow as tf
import time
import numpy as np 
import matplotlib.pyplot as plt

from tensorflow.keras.layers import MultiHeadAttention, Embedding, Dense, Input, Dropout, LayerNormalization
from transformers import DistilBertTokenizerFast 
from transformers import TFDistilBertForTokenClassification

2024-06-15 06:42:48.748243: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-06-15 06:42:48.748354: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-06-15 06:42:48.882398: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# Positional Encodings 
We calculate the positional encodings and add them to the input embeddings of both the encoder and the decoder, in order to give the model information about the specific position of each word in the sentence.

The formula being:
$$
PE_{(pos, 2i)}= sin\left(\frac{pos}{{10000}^{\frac{2i}{d}}}\right)
\tag{1}$$
<br>
$$
PE_{(pos, 2i+1)}= cos\left(\frac{pos}{{10000}^{\frac{2i}{d}}}\right)
\tag{2}$$

* $d$ is the dimension of the word embedding and positional encoding
* $pos$ is the position of the word.
* $k$ refers to each of the different dimensions in the positional encodings, with $i$ equal to $k$ $//$ $2$.

In [2]:
# Function to get the angles
def get_angles(pos,k,d):
    """Compute the angle arguments of the sinusoidal positional encoding.

    Arguments:
        pos -- position(s) in the sequence (scalar or NumPy array)
        k   -- index/indices of the encoding dimension (scalar or NumPy integer array)
        d   -- size of the encoding (embedding dimension)

    Returns:
        pos / 10000 ** (2 * (k // 2) / d), broadcast over pos and k
    """
    # Dimensions 2i and 2i + 1 share one frequency, hence the floor division.
    pair_index = k // 2
    exponent = (2 * pair_index) / np.float32(d)
    return pos / np.power(10000, exponent)

In [3]:
# Function to calculate the positional encodings
def positional_encodings(positions,d):
    """Build the sinusoidal positional-encoding table.

    Arguments:
        positions -- number of positions to encode
        d         -- size of each encoding vector

    Returns:
        float32 tensor of shape (1, positions, d); even columns hold sines and
        odd columns hold cosines of the angles produced by get_angles
    """
    position_column = np.arange(positions)[:, np.newaxis]
    dimension_row = np.arange(d)[np.newaxis, :]
    encoding = get_angles(position_column, dimension_row, d)

    # Overwrite the raw angles in place: sine on even columns, cosine on odd ones.
    encoding[:, 0::2] = np.sin(encoding[:, 0::2])
    encoding[:, 1::2] = np.cos(encoding[:, 1::2])

    # A leading axis of size 1 lets the table broadcast over a batch.
    return tf.cast(encoding[np.newaxis, ...], dtype=tf.float32)


In [4]:
# Sanity check: 50 positions with 512-dimensional encodings give a (1, 50, 512) tensor.
pe = positional_encodings(50,512)
print(pe.shape)

(1, 50, 512)


## 2. Masking

We will use 2 types of masks which are:
- Padding Mask : This marks which entries of a padded sequence are real tokens (1) and which are padding zeros (0), so that the attention can ignore the padded positions.
- Look Ahead Mask : This helps the model pretend that it correctly predicted a part of the output and see if, without looking ahead, it can correctly predict the next output. 

In [5]:
def padding_mask(decoder_token_ids):
    """Create a mask that flags the non-padding entries of a batch of token ids.

    Arguments:
        decoder_token_ids -- 2-D tensor of token ids in which 0 denotes padding

    Returns:
        float32 tensor with an extra axis inserted at position 1; entries are
        1.0 where the id is non-zero and 0.0 where the id equals 0
    """
    is_padding = tf.math.equal(decoder_token_ids, 0)
    keep = 1 - tf.cast(is_padding, dtype=tf.float32)

    # The inserted axis lets the mask broadcast over the query dimension.
    return keep[:, tf.newaxis, :]

# Example batch of three sequences; zeros are the padding entries that should map to 0 in the mask.
x = tf.constant([[7., 6., 0., 0., 1.], [1., 2., 3., 0., 0.], [0., 0., 0., 4., 5.]])
print(padding_mask(x))

tf.Tensor(
[[[1. 1. 0. 0. 1.]]

 [[1. 1. 1. 0. 0.]]

 [[0. 0. 0. 1. 1.]]], shape=(3, 1, 5), dtype=float32)


In [6]:
def look_ahead_mask(sequence_length):
    """Create a lower-triangular mask that hides future positions.

    Arguments:
        sequence_length -- length of the sequence to mask

    Returns:
        float32 tensor of shape (1, sequence_length, sequence_length) with ones
        on and below the main diagonal and zeros above it
    """
    all_ones = tf.ones((1, sequence_length, sequence_length))
    # num_lower=-1 keeps the whole lower triangle; num_upper=0 drops everything above the diagonal.
    return tf.linalg.band_part(all_ones, -1, 0)

In [7]:
# Build the look-ahead mask for a dummy sequence of length 3 and display it.
x = tf.random.uniform((1, 3))
temp = look_ahead_mask(x.shape[1])
temp

<tf.Tensor: shape=(1, 3, 3), dtype=float32, numpy=
array([[[1., 0., 0.],
        [1., 1., 0.],
        [1., 1., 1.]]], dtype=float32)>

In [8]:
def scaled_dot_product_attention(q,v,k, mask):
    """Compute scaled dot-product attention: softmax(q . k^T / sqrt(dk)) . v.

    NOTE: the parameter order is (q, v, k, mask) -- values come BEFORE keys.
    It is kept as-is so existing positional callers are not broken.

    Arguments:
        q    -- query tensor; its last dimension must match that of k
        v    -- value tensor; its second-to-last dimension must match that of k
        k    -- key tensor
        mask -- float tensor broadcastable to the attention logits, with 1 for
                positions that may be attended to and 0 for positions to hide
                (the convention used by padding_mask and look_ahead_mask),
                or None to apply no masking

    Returns:
        outputs           -- attention-weighted sum of the values
        attention_weights -- softmax-normalised attention weights
    """
    # Similarity between every query and every key.
    matmul_qk = tf.matmul(q, k, transpose_b=True)

    # Scale by sqrt(dk) so the logits do not grow with the key depth.
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # Masked positions (mask == 0) get a large negative logit, i.e. ~0 weight after softmax.
    if mask is not None:
        scaled_attention_logits += (1.0 - mask) * -1e9

    # Normalise over the key axis.
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)

    outputs = tf.matmul(attention_weights, v)

    return outputs, attention_weights

# The Encoder

In [9]:
# This is the feed forward network 
def FullyConnected(embedding_dim, fully_connected_dim):
    """Build the position-wise feed-forward network used inside each layer.

    Arguments:
        embedding_dim       -- number of units of the output Dense layer
        fully_connected_dim -- number of units of the hidden ReLU Dense layer

    Returns:
        tf.keras.Sequential made of Dense(fully_connected_dim, relu) followed
        by Dense(embedding_dim)
    """
    hidden = tf.keras.layers.Dense(fully_connected_dim, activation='relu')
    projection = tf.keras.layers.Dense(embedding_dim)
    return tf.keras.Sequential([hidden, projection])

In [10]:
class EncodeLayer(tf.keras.layers.Layer):
    """Single encoder block: multi-head self-attention followed by a feed-forward network.

    Each of the two sub-layers is followed by a residual connection and layer
    normalization.
    """
    def __init__(self,embedding_dim,num_heads, fully_connected_dim,
                dropout_rate=0.6, layernorm_eps = 1e-6):
        """
        Arguments:
            embedding_dim       -- key dimension of the attention heads and output size of the block
            num_heads           -- number of attention heads
            fully_connected_dim -- hidden size of the feed-forward network
            dropout_rate        -- dropout rate for the attention and the feed-forward output
            layernorm_eps       -- epsilon of the layer normalizations

        NOTE(review): the default dropout_rate here is 0.6 whereas the other
        layers in this notebook default to 0.1 -- confirm this is intended.
        """
        # Zero-argument super() does not depend on the class name.
        super().__init__()

        self.mha = MultiHeadAttention(num_heads = num_heads,
                                     key_dim = embedding_dim,
                                     dropout = dropout_rate)

        self.ffn = FullyConnected(embedding_dim,fully_connected_dim)

        self.layernorm1 = LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = LayerNormalization(epsilon=layernorm_eps)

        self.dropout_ffn = Dropout(rate=dropout_rate)

    def call(self,x,training,mask):
        """Run the encoder block.

        Arguments:
            x        -- input tensor (assumed (batch, seq_len, embedding_dim) -- TODO confirm)
            training -- whether dropout is active
            mask     -- attention mask forwarded to the self-attention layer
                        (1 = attend, 0 = ignore)

        Returns:
            tensor with the same shape as x
        """
        # Self-attention: query, value and key are all the block input.
        self_mha_output = self.mha(x, x, x, attention_mask=mask, training=training)

        # Add & norm around the attention sub-layer.
        skip_x_attention = self.layernorm1(x + self_mha_output)

        # Position-wise feed-forward network, with dropout on its output.
        ffn_output = self.ffn(skip_x_attention)
        ffn_output = self.dropout_ffn(ffn_output, training=training)

        # Add & norm around the feed-forward sub-layer.
        encoder_layer_output = self.layernorm2(ffn_output + skip_x_attention)

        return encoder_layer_output
        

In [11]:
# Now the full Encoding layer using positional encodings
class Encoder(tf.keras.layers.Layer):
    """Full encoder: token embedding + positional encoding followed by a stack of EncodeLayer blocks."""
    def __init__(self,num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size,
               maximum_position_encoding, dropout_rate=0.1, layernorm_eps=1e-6):
        """
        Arguments:
            num_layers                -- number of stacked EncodeLayer blocks
            embedding_dim             -- size of the token embeddings
            num_heads                 -- number of attention heads per block
            fully_connected_dim       -- hidden size of each feed-forward network
            input_vocab_size          -- size of the input vocabulary
            maximum_position_encoding -- number of positions to pre-compute encodings for
            dropout_rate              -- dropout rate used throughout the encoder
            layernorm_eps             -- epsilon of the layer normalizations
        """
        super().__init__()
        self.embedding_dim = embedding_dim
        self.fully_connected_dim = fully_connected_dim
        self.num_layers = num_layers

        self.embedding = Embedding(input_dim = input_vocab_size,output_dim = self.embedding_dim)

        self.encode_layers = [EncodeLayer(self.embedding_dim,
                                    num_heads = num_heads,
                                    fully_connected_dim = fully_connected_dim,
                                    dropout_rate = dropout_rate,
                                    layernorm_eps = layernorm_eps)
                            for _ in range(self.num_layers)]

        # Pre-computed table of shape (1, maximum_position_encoding, embedding_dim).
        self.pos_encoding = positional_encodings(maximum_position_encoding,
                                                 self.embedding_dim)

        self.dropout = Dropout(dropout_rate)

    def call(self,x,training,mask):
        """Encode a batch of token ids.

        Arguments:
            x        -- token ids (assumed (batch, seq_len) -- TODO confirm)
            training -- whether dropout is active
            mask     -- attention mask forwarded to every encoder block

        Returns:
            output of the last encoder block
        """
        seq_len = tf.shape(x)[1]

        x = self.embedding(x)

        # Scale the embeddings by sqrt(embedding_dim) before adding the positional encodings.
        x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))

        # Only the first seq_len positions of the pre-computed table are needed.
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for encode_layer in self.encode_layers:
            x = encode_layer(x, training=training, mask=mask)

        return x
        
        

In [14]:
class DecodeLayer(tf.keras.layers.Layer):
    """Single decoder block: masked self-attention, encoder-decoder attention and a feed-forward network.

    Each of the three sub-layers is followed by a residual connection and layer
    normalization.
    """
    def __init__(self,embedding_dim,full_connected_dim,num_heads,dropout_rate=0.1,layernorm_eps = 1e-6):
        """
        Arguments:
            embedding_dim      -- key dimension of the attention heads and output size of the block
            full_connected_dim -- hidden size of the feed-forward network
                                  (parameter spelling kept for backward compatibility)
            num_heads          -- number of attention heads
            dropout_rate       -- dropout rate for the attention layers and the feed-forward output
            layernorm_eps      -- epsilon of the layer normalizations
        """
        # Keras requires the base Layer to be initialised before sub-layers are assigned.
        super().__init__()

        self.mha1 = MultiHeadAttention(num_heads,
                                      key_dim = embedding_dim,
                                      dropout = dropout_rate)

        self.mha2 = MultiHeadAttention(num_heads,
                                      key_dim = embedding_dim,
                                      dropout = dropout_rate)

        self.ffn = FullyConnected(embedding_dim = embedding_dim,
                                 fully_connected_dim = full_connected_dim)

        self.layernorm1 = LayerNormalization(epsilon = layernorm_eps)
        self.layernorm2 = LayerNormalization(epsilon = layernorm_eps)
        self.layernorm3 = LayerNormalization(epsilon = layernorm_eps)

        self.dropout_ffn = Dropout(dropout_rate)

    def call(self,x,encoder_output,training,padding_mask,look_ahead_mask):
        """Run the decoder block.

        Arguments:
            x               -- decoder input (assumed (batch, target_seq_len, embedding_dim) -- TODO confirm)
            encoder_output  -- output of the encoder, used as value and key of the second attention
            training        -- whether dropout is active
            padding_mask    -- attention mask for the encoder-decoder attention
            look_ahead_mask -- attention mask for the decoder self-attention

        Returns:
            out3           -- output of the block
            attn_weights_1 -- attention scores of the self-attention
            attn_weights_2 -- attention scores of the encoder-decoder attention
        """
        # Block 1: masked self-attention over the decoder input.
        mha_out_1, attn_weights_1 = self.mha1(x, x, x,
                                              attention_mask=look_ahead_mask,
                                              return_attention_scores=True,
                                              training=training)

        # Add & norm; the result is the query of the second attention.
        Q1 = self.layernorm1(x + mha_out_1)

        # Block 2: queries from the decoder, values and keys from the encoder output.
        mha_out_2, attn_weights_2 = self.mha2(Q1, encoder_output, encoder_output,
                                              attention_mask=padding_mask,
                                              return_attention_scores=True,
                                              training=training)

        mha_out_2 = self.layernorm2(Q1 + mha_out_2)

        # Block 3: feed-forward network with dropout, then add & norm.
        ffn = self.ffn(mha_out_2)
        ffn = self.dropout_ffn(ffn, training=training)

        out3 = self.layernorm3(ffn + mha_out_2)

        return out3, attn_weights_1, attn_weights_2

In [15]:
class Decoder(tf.keras.layers.Layer):
    """Full decoder: token embedding + positional encoding followed by a stack of DecodeLayer blocks."""
    def __init__(self,num_layers,num_heads,embedding_dim,fully_connected_dim,target_vocab_size,
                max_position_encoding,dropout_rate=0.1,layer_eps=1e-6):
        """
        Arguments:
            num_layers            -- number of stacked DecodeLayer blocks
            num_heads             -- number of attention heads per block
            embedding_dim         -- size of the token embeddings
            fully_connected_dim   -- hidden size of each feed-forward network
            target_vocab_size     -- size of the target vocabulary
            max_position_encoding -- number of positions to pre-compute encodings for
            dropout_rate          -- dropout rate used throughout the decoder
            layer_eps             -- epsilon of the layer normalizations
        """
        # Keras requires the base Layer to be initialised before sub-layers are assigned.
        super().__init__()
        self.num_layers = num_layers
        self.embedding_dim = embedding_dim

        self.embedding = Embedding(target_vocab_size,embedding_dim)
        # Pre-computed table of shape (1, max_position_encoding, embedding_dim).
        self.positional_encoding = positional_encodings(max_position_encoding,embedding_dim)

        # The first three arguments are positional: embedding dim, feed-forward dim, number of heads.
        self.dec_layers = [
            DecodeLayer(self.embedding_dim,
                        fully_connected_dim,
                        num_heads,
                        dropout_rate=dropout_rate,
                        layernorm_eps = layer_eps)
            for _ in range(self.num_layers)
        ]

        self.dropout = Dropout(dropout_rate)

    def call(self,x,enc_output,training,padding_mask,look_ahead_mask):
        """Decode a batch of target token ids.

        Arguments:
            x               -- target token ids (assumed (batch, target_seq_len) -- TODO confirm)
            enc_output      -- output of the encoder
            training        -- whether dropout is active
            padding_mask    -- mask for the encoder-decoder attention of every block
            look_ahead_mask -- mask for the self-attention of every block

        Returns:
            x                 -- output of the last decoder block
            attention_weights -- dict with the attention scores of both attention
                                 blocks of every layer
        """
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)

        # Scale the embeddings by sqrt(embedding_dim) before adding the positional encodings.
        x *= tf.math.sqrt(tf.cast(self.embedding_dim,tf.float32))

        # Only the first seq_len positions of the pre-computed table are needed.
        x += self.positional_encoding[:,:seq_len,:]

        x = self.dropout(x,training=training)

        for layer_number, dec_layer in enumerate(self.dec_layers, start=1):
            # Masks are passed by keyword so they cannot be swapped by position.
            x, block1, block2 = dec_layer(x, enc_output,
                                          training=training,
                                          padding_mask=padding_mask,
                                          look_ahead_mask=look_ahead_mask)

            attention_weights['decoder_layer{}_block1_self_att'.format(layer_number)] = block1
            attention_weights['decoder_layer{}_block2_decenc_att'.format(layer_number)] = block2

        return x , attention_weights

In [16]:
class Transformer(tf.keras.Model):
    """
    Complete transformer with an Encoder and a Decoder
    """
    def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size, 
               target_vocab_size, max_positional_encoding_input,
               max_positional_encoding_target, dropout_rate=0.1, layernorm_eps=1e-6):
        """
        Arguments:
            num_layers                     -- number of encoder blocks and of decoder blocks
            embedding_dim                  -- size of the token embeddings
            num_heads                      -- number of attention heads per block
            fully_connected_dim            -- hidden size of the feed-forward networks
            input_vocab_size               -- size of the input vocabulary
            target_vocab_size              -- size of the target vocabulary
            max_positional_encoding_input  -- number of positions encoded for the encoder
            max_positional_encoding_target -- number of positions encoded for the decoder
            dropout_rate                   -- dropout rate used throughout the model
            layernorm_eps                  -- epsilon of the layer normalizations
        """
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers=num_layers,
                               embedding_dim=embedding_dim,
                               num_heads=num_heads,
                               fully_connected_dim=fully_connected_dim,
                               input_vocab_size=input_vocab_size,
                               maximum_position_encoding=max_positional_encoding_input,
                               dropout_rate=dropout_rate,
                               layernorm_eps=layernorm_eps)

        # Decoder names these two parameters `max_position_encoding` and `layer_eps`.
        self.decoder = Decoder(num_layers=num_layers, 
                               num_heads=num_heads,
                               embedding_dim=embedding_dim,
                               fully_connected_dim=fully_connected_dim,
                               target_vocab_size=target_vocab_size, 
                               max_position_encoding=max_positional_encoding_target,
                               dropout_rate=dropout_rate,
                               layer_eps=layernorm_eps)

        self.final_layer = Dense(target_vocab_size, activation='softmax')
    
    def call(self, input_sentence, output_sentence, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        """Run the full encoder-decoder forward pass.

        Arguments:
            input_sentence   -- token ids fed to the encoder
            output_sentence  -- target token ids fed to the decoder
            training         -- whether dropout is active
            enc_padding_mask -- attention mask for the encoder
            look_ahead_mask  -- mask for the decoder self-attention
            dec_padding_mask -- mask for the encoder-decoder attention

        Returns:
            final_output      -- softmax output of the final Dense layer
                                 (last dimension is target_vocab_size)
            attention_weights -- dict of attention scores returned by the decoder
        """
        enc_output = self.encoder(input_sentence,training,enc_padding_mask)
        
        # Masks are passed by keyword so each one reaches the decoder parameter of the same role.
        dec_output, attention_weights = self.decoder(output_sentence, enc_output, training,
                                                     padding_mask=dec_padding_mask,
                                                     look_ahead_mask=look_ahead_mask)
        
        # Project the decoder output onto the target vocabulary and apply softmax.
        final_output = self.final_layer(dec_output)

        return final_output, attention_weights