# Deep Learning Lab - 4. Transformers

This lab introduces Transformers using TensorFlow and the Keras API.

**Transformers** are a type of deep learning model introduced in the paper ["Attention Is All You Need"](https://arxiv.org/abs/1706.03762) by Vaswani et al. in 2017. They revolutionized natural language processing (NLP) tasks by using self-attention mechanisms to capture dependencies between different parts of a sentence efficiently.

**Key Concepts:**
* **Self-Attention:** Helps the model focus on different parts of the input sequence while processing it.

* **Multi-Head Attention:** Multiple attention mechanisms run in parallel to focus on different parts of the sentence at once.

* **Positional Encoding:** Since transformers don’t have a built-in notion of word order (like RNNs), positional encodings are added to input embeddings to give the model information about the order of words.

**Components**

* **Encoder and Decoder:** The transformer consists of an encoder that processes input data and a decoder that generates output data. The encoder processes the input sentence, and the decoder uses the encoder’s representation to predict the output sentence.

* **Stacked Layers:** Multiple encoder and decoder layers are stacked on top of each other to build deeper models.

* **Feed-Forward Networks:** Each encoder and decoder layer has a fully connected feed-forward neural network after the multi-head attention.

<div style="text-align: center;">
<img src="https://machinelearningmastery.com/wp-content/uploads/2021/10/transformer_1.png" alt="Image description" width="300" height="200">
</div>

One of the key innovations of transformers is their ability to process entire sequences of data in parallel, instead of relying on a recurrent approach that processes data one step at a time. This is achieved through the use of self-attention mechanisms, which allow the model to selectively focus on different parts of the input sequence as it processes the data.

In this tutorial, we will provide a detailed overview of transformers, starting with an explanation of how they work and what makes them unique. We will then walk through a step-by-step guide on how to implement a transformer model in Python, using the TensorFlow deep learning library.

## Implementation

### Step 0: Install dependencies
In this lab, we use [TensorFlow](https://www.tensorflow.org/), an open-source platform developed by Google for machine learning and deep learning. If you are using a Kaggle or Google Colab environment, TensorFlow is already available. To run the code locally, install TensorFlow with pip:

In [1]:
! pip install tensorflow==2.16.1

In [2]:
import tensorflow as tf
print(tf.version.VERSION)

2.16.1


In [3]:
import numpy as np

### Step 1: Build Multi Head Attention Sub Layer



<div style="text-align: center;">
<img src="https://raw.githubusercontent.com/amanchadha/coursera-deep-learning-specialization/0242d1ffe79086d97b0f210f9664c84ac564abd1/C5%20-%20Sequence%20Models/Week%204/Transformer%20Network/self-attention.png" alt="Image description" width="500" height="300">
</div>

The scaled dot-product attention function computes attention weights for a sequence of queries (Q), keys (K), and values (V). The attention weights measure how much focus each query position gives to each value position, based on the similarity between the query and the corresponding key. The function first computes the dot product between the query and key vectors, then scales these attention logits by dividing them by the square root of the depth of the key vectors. It then applies an optional mask to the logits and a softmax to obtain the attention weights. Finally, it computes the weighted sum of the value vectors using the attention weights and returns both the output and the attention weights. Attention can be represented by the following equation, where $M$ is an optional additive mask (0 where attention is allowed, a large negative value where it is blocked):

$$\text{Attention}(Q, K, V)=\operatorname{softmax}\left(\frac{Q K^{T}}{\sqrt{d_{k}}}+M\right) V$$

In [4]:
def scaled_dot_product_attention(Q, K, V, mask=None):
    
    matmul_QK = tf.matmul(Q,K,transpose_b=True)  # dot-product of shape (..., Tq, Tv)

    dk = K.shape[-1]
    scaled_attention_logits = matmul_QK/np.sqrt(dk) # scaled dot-product of shape (..., Tq, Tv)

    if mask is not None: 
        scaled_attention_logits += (1. - mask) *(-1e9)

    # Compute the Softmax
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # weights of shape (..., Tq, Tv)

    #Multiply with V
    output = tf.matmul(attention_weights,V)  # Attention representation of shape (..., Tq, dv)
    
    return output, attention_weights
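
As a quick sanity check of the function above, the following sketch re-implements the same computation in plain NumPy (an assumption made so that the example runs without TensorFlow) and applies it to a tiny random input with a look-ahead mask. The helper name `attention_np` and the toy shapes are illustrative and not part of the lab code. It follows the same mask convention as the cell above: 1 marks a position that may be attended to, 0 a position that is blocked.

```python
import numpy as np

def attention_np(Q, K, V, mask=None):
    """NumPy version of scaled dot-product attention (illustrative helper)."""
    dk = K.shape[-1]
    logits = Q @ np.swapaxes(K, -1, -2) / np.sqrt(dk)      # (..., Tq, Tv)
    if mask is not None:
        logits = logits + (1.0 - mask) * (-1e9)             # blocked positions get a huge negative logit
    logits = logits - logits.max(axis=-1, keepdims=True)    # subtract the row max for numerical stability
    weights = np.exp(logits)
    weights = weights / weights.sum(axis=-1, keepdims=True) # softmax over the key axis
    return weights @ V, weights

rng = np.random.default_rng(0)
T, dk, dv = 4, 8, 5
Q = rng.normal(size=(T, dk))
K = rng.normal(size=(T, dk))
V = rng.normal(size=(T, dv))
mask = np.tril(np.ones((T, T)))   # position t may only attend to positions <= t

out, w = attention_np(Q, K, V, mask)
print(out.shape, w.shape)
print(np.round(w, 3))
```

Each row of the printed weight matrix sums to 1 and is zero above the diagonal. Because the first query can only see the first key, the first output row equals the first row of `V`.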

## MultiHeadAttention
The scaled dot product attention is a powerful mechanism that enables a neural network to selectively focus on relevant parts of a sequence when performing tasks such as language translation or image captioning. However, in many cases, there may be multiple aspects or features that an input element wants to attend to, and a single weighted average is not a sufficient way to capture this information. To address this, we can extend the attention mechanism to multiple heads.

<div style="text-align: center;">
<img src="https://www.researchgate.net/publication/380427185/figure/fig3/AS:11431281241719045@1715204009536/The-overall-structure-of-multi-head-attention.jpg" alt="Image description" width="300" height="200">
</div>

The idea behind multi-head attention is to perform several different attention operations in parallel, with each attention head focusing on a different aspect of the input sequence. In other words, instead of a single query-key-value triplet, we use multiple such triplets in parallel, each one focusing on a different aspect of the input sequence.

To achieve this, we first split the query, key, and value matrices into several submatrices, each of which represents a different aspect of the input sequence. Then, for each submatrix, we apply the scaled dot product attention mechanism independently. This results in several different attention outputs, each one representing a different aspect of the input sequence.

Finally, we concatenate the attention outputs and apply a linear transformation using a weight matrix to obtain a combined output. This combined output represents the final attention output that captures all the relevant aspects of the input sequence.

Mathematically, we can express the multi-head attention operation as a matrix operation involving the query, key, and value matrices, as well as a set of learnable weight matrices used to combine the attention outputs.
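
The description above can be sketched in a few lines of NumPy. This is a minimal illustration under assumed toy sizes, not the lab's TensorFlow layer: the function name `multi_head_attention_np` and the `softmax` helper are hypothetical, and the weights are random rather than learned. Note that the head axis is moved next to the feature axis before flattening, so that each output row concatenates the H head outputs for a single position.

```python
import numpy as np

def softmax(x, axis=-1):
    x = x - x.max(axis=axis, keepdims=True)
    e = np.exp(x)
    return e / e.sum(axis=axis, keepdims=True)

def multi_head_attention_np(Q, K, V, WQ, WK, WV, WO):
    """Q: (B, Tq, d_model); K, V: (B, Tv, d_model).
    WQ, WK: (H, d_model, dk); WV: (H, d_model, dv); WO: (H*dv, d_model)."""
    # Project into H heads: results have shape (B, H, T, dk) or (B, H, T, dv)
    Qh = np.einsum("btd,hdk->bhtk", Q, WQ)
    Kh = np.einsum("btd,hdk->bhtk", K, WK)
    Vh = np.einsum("btd,hdv->bhtv", V, WV)

    # Scaled dot-product attention, computed independently for each head
    logits = Qh @ np.swapaxes(Kh, -1, -2) / np.sqrt(Kh.shape[-1])  # (B, H, Tq, Tv)
    Ah = softmax(logits) @ Vh                                        # (B, H, Tq, dv)

    # Concatenate heads: bring H next to dv, then flatten the two axes
    B, H, Tq, dv = Ah.shape
    A = Ah.transpose(0, 2, 1, 3).reshape(B, Tq, H * dv)             # (B, Tq, H*dv)
    return A @ WO                                                    # (B, Tq, d_model)

rng = np.random.default_rng(0)
B, T, d_model, H, dk, dv = 2, 5, 16, 4, 4, 4
x = rng.normal(size=(B, T, d_model))
WQ = rng.normal(size=(H, d_model, dk))
WK = rng.normal(size=(H, d_model, dk))
WV = rng.normal(size=(H, d_model, dv))
WO = rng.normal(size=(H * dv, d_model))

out = multi_head_attention_np(x, x, x, WQ, WK, WV, WO)
print(out.shape)  # (2, 5, 16)
```

The output has the same shape as the input, which is what allows the residual connections used later in the encoder and decoder layers.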

In [5]:
class Multihead_Attention(tf.keras.layers.Layer):
    def __init__(self, H, d_model, dk, dv):  
        super(Multihead_Attention, self).__init__()
        
        initializer = tf.keras.initializers.GlorotUniform()
        self.WQ = tf.Variable(initializer(shape=(H, d_model, dk)), trainable=True)
        self.WK = tf.Variable(initializer(shape=(H, d_model, dk)), trainable=True)
        self.WV = tf.Variable(initializer(shape=(H, d_model, dv)), trainable=True)
        self.WO = tf.Variable(initializer(shape=(H*dv,d_model)), trainable=True)

    
    def call(self, Q, K, V, mask=None):
        #Projecting Q,K,V to Qh, Kh, Vh. The H projections are stacked on the penultimate axis
        Qh= tf.experimental.numpy.dot(Q, self.WQ) #of shape (batch_size, Tq, H, dk)
        Kh= tf.experimental.numpy.dot(K, self.WK) #of shape (batch_size, Tv, H, dk)
        Vh= tf.experimental.numpy.dot(V, self.WV) #of shape (batch_size, Tv, H, dv)
        
        #Transposition
        Qh=tf.transpose(Qh, [0,2,1,3]) #of shape (batch_size, H, Tq, dk)
        Kh=tf.transpose(Kh, [0,2,1,3]) #of shape (batch_size, H, Tv, dk)
        Vh=tf.transpose(Vh, [0,2,1,3]) #of shape (batch_size, H, Tv, dv)
        
        # Computing the dot-product attention
        Ah,_=scaled_dot_product_attention(Qh, Kh, Vh, mask=mask) #of shape (batch_size, H, Tq, dv)
        
        #Moving the H axis next to dv, then flattening both and projecting back to d_model
        #(reshaping without this transpose would mix values from different positions)
        Ah = tf.transpose(Ah, [0,2,1,3]) #of shape (batch_size, Tq, H, dv)
        s=Ah.shape
        A = tf.reshape(Ah,(s[0],s[1],s[2]*s[3])) #of shape (batch_size, Tq, H*dv)
        A= tf.experimental.numpy.dot(A, self.WO) #of shape (batch_size, Tq, d_model)
        
        return A

### Step 2: Build Feed Forward Neural Network Sub Layer

In [6]:
class FNNLayer(tf.keras.layers.Layer):
    def __init__(self, d_model, dff):
        super(FNNLayer, self).__init__()

        self.layer1 = tf.keras.layers.Conv1D(filters=dff, kernel_size=1,activation="relu")
        self.layer2 = tf.keras.layers.Conv1D(filters=d_model, kernel_size=1)


    def call(self, x):
        x=self.layer1(x)
        fnn_layer_out=self.layer2(x)
 
        return fnn_layer_out

### Step 3: Implement Positional Encoding

When working with sequence to sequence tasks, the order of the data is crucial. While training RNNs, the input order is preserved automatically. However, when training Transformer networks, all data is input at once, and there's no inherent order information. To overcome this, positional encoding is used to specify the position of each input in the sequence. This encoding is achieved through sine and cosine formulas as follows:

$$
PE_{(pos, 2i)} = \sin\left(\frac{pos}{10000^{\frac{2i}{d}}}\right)
\tag{1}$$

<br>
$$PE_{(pos, 2i+1)} = \cos\left(\frac{pos}{10000^{\frac{2i}{d}}}\right)
\tag{2}$$

Here, $pos$ is the position of the input in the sequence, $i$ indexes the sine/cosine pairs so that $2i$ and $2i+1$ are the even and odd dimensions of the embedding vector, and $d$ is the dimensionality of the model.
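
To make formulas (1) and (2) concrete, the short sketch below evaluates individual entries directly with Python's `math` module for a toy model dimension of `d = 4`. The helper name `pe_entry` is hypothetical and exists only for this illustration.

```python
import math

def pe_entry(pos, k, d):
    """Positional-encoding value at position `pos` and embedding dimension `k`."""
    i = k // 2                                  # index of the sine/cosine pair
    angle = pos / (10000 ** (2 * i / d))
    # even dimensions use sine (formula 1), odd dimensions use cosine (formula 2)
    return math.sin(angle) if k % 2 == 0 else math.cos(angle)

d = 4
for pos in range(3):
    row = [round(pe_entry(pos, k, d), 4) for k in range(d)]
    print(pos, row)
```

At position 0 every angle is 0, so the row alternates between $\sin(0)=0$ and $\cos(0)=1$. For later positions, the first pair of dimensions changes quickly with `pos`, while the second pair uses a 100 times smaller angle and changes slowly.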

In [7]:
def positional_encoding(positions, d):
    # initialize a matrix angle_rads of all the angles
    pos=np.arange(positions)[:, np.newaxis] #Column vector containing the position span [0,1,..., positions-1]
    k= np.arange(d)[np.newaxis, :]  #Row vector containing the dimension span [[0, 1, ..., d-1]]
    i = k//2
    angle_rads = pos/(10000**(2*i/d)) #Matrix of angles indexed by (pos,i)
    
    # apply sin to even indices in the array; 2i
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
  
    # apply cos to odd indices in the array; 2i+1
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    #adds batch axis
    pos_encoding = angle_rads[np.newaxis, ...] 
    
    return tf.cast(pos_encoding, dtype=tf.float32)

### Step 4: Build Encoder


The Encoder consists of a stack of identical layers, where each layer has two sub-layers: a multi-head self-attention mechanism and a position-wise fully connected feedforward network.

<div style="text-align: center;">
<img src="https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcSyz2j58Hk1uFj_b_sQh6ecy3gP4bGMAgs1_q4zPivrcAkfp4kd8XEqTOQn0TQYdpjJ1vU&usqp=CAU" alt="Image description" width="200" height="200">
</div>


In the self-attention mechanism, the Encoder attends to all positions of the input sequence to compute a weighted sum of the values at each position, where the weights are determined by the similarity between the query and key vectors for each position. This allows the Encoder to capture dependencies between all positions of the input sequence in parallel, and to assign more weight to the positions that are most relevant to the current position.

The fully connected feedforward network consists of two linear transformations with a ReLU activation function in between, which is applied to each position in the sequence independently. This lets the Encoder apply a non-linear transformation to the hidden state at each position; mixing information across positions is left to the attention sub-layer.
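
The phrase "applied to each position independently" can be checked numerically. The NumPy sketch below (toy sizes and random weights are assumptions, and `position_wise_ffn` is a hypothetical helper) shows that running the network on the whole sequence gives the same result as running it row by row, and that shuffling the positions simply shuffles the outputs.

```python
import numpy as np

def position_wise_ffn(x, W1, b1, W2, b2):
    """x: (T, d_model). The same two linear maps are applied to every row."""
    return np.maximum(x @ W1 + b1, 0.0) @ W2 + b2   # linear -> ReLU -> linear

rng = np.random.default_rng(0)
T, d_model, dff = 6, 8, 32
x = rng.normal(size=(T, d_model))
W1, b1 = rng.normal(size=(d_model, dff)), np.zeros(dff)
W2, b2 = rng.normal(size=(dff, d_model)), np.zeros(d_model)

full = position_wise_ffn(x, W1, b1, W2, b2)
row_by_row = np.stack([position_wise_ffn(x[t:t + 1], W1, b1, W2, b2)[0] for t in range(T)])
print(np.allclose(full, row_by_row))

perm = rng.permutation(T)
print(np.allclose(position_wise_ffn(x[perm], W1, b1, W2, b2), full[perm]))
```

This is also why a `Conv1D` layer with `kernel_size=1` can be used for this sub-layer: it applies one shared dense map at every time step.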

In addition to the sub-layers, each layer in the Encoder also has residual connections and layer normalization, which help to mitigate the vanishing gradient problem and improve training stability.

The residual connection allows the output of the sub-layer to be added to the original input sequence, which preserves information from the input sequence and helps to propagate gradients through the network. The layer normalization normalizes the output of the sub-layer with respect to the mean and variance of the hidden states, which helps to reduce the effects of covariate shift and improve convergence.
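
The "add and normalize" step can be sketched in NumPy as follows. This is a simplified illustration: the learnable scale and shift of a real layer-normalization layer are omitted, and the sub-layer is a toy stand-in rather than attention or the feedforward network. The names `layer_norm` and `add_and_norm` are hypothetical.

```python
import numpy as np

def layer_norm(x, eps=1e-6):
    """Normalize each position's feature vector to zero mean and unit variance."""
    mean = x.mean(axis=-1, keepdims=True)
    var = x.var(axis=-1, keepdims=True)
    return (x - mean) / np.sqrt(var + eps)

def add_and_norm(x, sublayer):
    """Residual connection followed by layer normalization."""
    return layer_norm(x + sublayer(x))

rng = np.random.default_rng(0)
x = rng.normal(loc=3.0, scale=2.0, size=(2, 5, 8))        # (batch, T, d_model)
W = rng.normal(size=(8, 8))
out = add_and_norm(x, lambda h: np.maximum(h @ W, 0.0))  # toy sub-layer

print(out.shape)
print(np.round(out.mean(axis=-1), 6))   # per-position means are approximately 0
```

Whatever the scale of the sub-layer output, every position leaves this step with features of zero mean and roughly unit variance, which keeps activations in a similar range from layer to layer.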

The output of the Encoder is a sequence of hidden states that contains information about the context of each input token. This sequence is then passed to the Transformer Decoder for further processing, where it is used to generate an output sequence.


In [8]:
class EncoderLayer(tf.keras.layers.Layer):

    def __init__(self, H, d_model, dk, dv, dff, dropout_rate=0.1, layernorm_eps=1e-6):
        super(EncoderLayer, self).__init__()
        
        self.mha = Multihead_Attention(H, d_model, dk, dv)
        self.ffn = FNNLayer(d_model, dff)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=layernorm_eps)
        self.dropout_mha = tf.keras.layers.Dropout(dropout_rate)
        self.dropout_ffn = tf.keras.layers.Dropout(dropout_rate)
    
    def call(self, x, training=False, mask=None):
        A = self.mha(x,x,x,mask=mask) # Self attention (batch_size, Tq, d_model)
        A = self.dropout_mha(A, training=training) #Apply Dropout during training
        
        
        #  Residual connection + Layer normalization
        out1 = self.layernorm1(x+A)  # (batch_size, Tq, d_model)

        # Pointwise ffn
        ffn_output = self.ffn(out1) # (batch_size, Tq, d_model)
        ffn_output = self.dropout_ffn(ffn_output, training=training) # Apply Dropout during training
        
        # Residual connection + Layer normalization
        encoder_layer_out = self.layernorm2(ffn_output+out1)  # (batch_size, Tq, d_model)
        
        return encoder_layer_out

In [9]:
class Encoder(tf.keras.layers.Layer):

    def __init__(self, N, H, d_model, dk, dv, dff, dropout_rate=0.1, layernorm_eps=1e-6):
        super(Encoder, self).__init__()
        
        self.layers=[EncoderLayer(H, d_model, dk, dv, dff, 
                                  dropout_rate=dropout_rate, 
                                  layernorm_eps=layernorm_eps)
                                  for i in range(N)]
    
    def call(self, x, training=False, mask=None):
        for layer in self.layers:
            x = layer(x, training=training, mask=mask)
                                  
        return x
                                  

### Step 5: Build Decoder

The decoder takes in the encoded input sequence along with the previous generated output sequence. The output sequence is first passed through an embedding layer, which maps each token to a high-dimensional vector space. The embedding output is then added with a positional encoding, which allows the model to encode the sequential order of the input/output sequence. The positional encoding is added to the embeddings through a simple addition operation.
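
The embedding and positional-encoding step can be sketched in NumPy as below. The vocabulary size, token ids, and random embedding matrix are assumptions made for illustration. The example also shows that multiplying a one-hot encoded sequence by the embedding matrix is the same as looking up the corresponding rows, which is how this lab's `Transformer` class embeds its inputs.

```python
import numpy as np

vocab_size, d_model, T = 10, 8, 4
rng = np.random.default_rng(0)
embedding = rng.normal(size=(vocab_size, d_model))   # one row per token

token_ids = np.array([3, 1, 4, 1])
one_hot = np.eye(vocab_size)[token_ids]               # (T, vocab_size)
embedded = one_hot @ embedding                        # same as embedding[token_ids]

# Sinusoidal positional encoding: sine on even dimensions, cosine on odd ones
pos = np.arange(T)[:, None]
k = np.arange(d_model)[None, :]
angles = pos / (10000 ** (2 * (k // 2) / d_model))
pe = np.where(k % 2 == 0, np.sin(angles), np.cos(angles))   # (T, d_model)

decoder_input = embedded + pe                         # simple element-wise addition
print(np.allclose(embedded, embedding[token_ids]))
print(decoder_input.shape)
```

The two occurrences of token 1 share the same embedding row but receive different positional encodings, so the decoder can tell them apart.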

<div style="text-align: center;">
<img src="https://encrypted-tbn1.gstatic.com/images?q=tbn:ANd9GcRuKZ3AKs-84k2hE7vxgQW8tFgk2w7fX2o5vyl0-HmGvdXjoAiH" alt="Image description" width="200" height="200">
</div>

Next, each decoder layer applies two attention sub-layers. The first is a masked multi-head self-attention over the output sequence: a look-ahead mask prevents each position from attending to later positions, so the prediction for a position depends only on the tokens before it. This matters during training, when the whole target sequence is fed to the decoder at once. The second is an encoder-decoder attention, in which the queries come from the decoder and the keys and values come from the encoder output. During inference the output is generated token by token, so future tokens do not exist yet; the same look-ahead mask is usually still applied so that the computation matches training.
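
A look-ahead mask for this purpose can be built as a lower-triangular matrix. The sketch below uses NumPy and follows the convention of the `scaled_dot_product_attention` function in this lab, where 1 means "may attend" and 0 means "blocked". The helper name `look_ahead_mask` is hypothetical.

```python
import numpy as np

def look_ahead_mask(T):
    """(T, T) matrix with 1 where query position t may attend to key position s <= t, else 0."""
    return np.tril(np.ones((T, T), dtype=np.float32))

mask = look_ahead_mask(4)
print(mask)

# Effect on uniform logits: each position spreads its attention over itself and earlier positions only
logits = np.zeros((4, 4), dtype=np.float32)
masked_logits = logits + (1.0 - mask) * (-1e9)
weights = np.exp(masked_logits - masked_logits.max(axis=-1, keepdims=True))
weights /= weights.sum(axis=-1, keepdims=True)
print(np.round(weights, 2))
```

The resulting array has shape `(T, T)`, so it broadcasts over the batch and head axes when passed as `look_ahead_mask` to the decoder.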

The decoder then applies a feedforward neural network to each position in the sequence. The output of the feedforward neural network is passed through a residual connection, followed by layer normalization. The residual connection allows the model to learn the difference between the input and output of the layer, while layer normalization helps to stabilize the training process.

The output of the decoder is then passed through a linear layer, which maps the high-dimensional vector space to the output vocabulary size. A softmax activation function is applied to the output to obtain the final probability distribution over the output vocabulary. The model then samples the token from this distribution and repeats the process until an end-of-sequence token is generated.
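
The generation loop described above can be sketched as follows. This is a minimal illustration, not the lab's inference code: `next_token_probs` is a hypothetical callable standing in for a trained model, the toy model and token ids are made up, and the loop picks the most likely token (greedy decoding) instead of sampling.

```python
import numpy as np

def greedy_decode(next_token_probs, start_id, end_id, max_len=20):
    """Generate tokens one at a time until `end_id` is produced or `max_len` is reached.

    `next_token_probs(tokens)` is a hypothetical callable returning a probability
    vector over the vocabulary for the next token.
    """
    tokens = [start_id]
    while len(tokens) < max_len:
        probs = next_token_probs(tokens)
        next_id = int(np.argmax(probs))   # greedy choice; sampling from `probs` is the alternative
        tokens.append(next_id)
        if next_id == end_id:
            break
    return tokens

# Toy stand-in for a trained model: strongly prefers (last token + 1) modulo the vocabulary size
VOCAB, START, END = 6, 0, 5

def toy_model(tokens):
    probs = np.full(VOCAB, 0.01)
    probs[(tokens[-1] + 1) % VOCAB] = 1.0
    return probs / probs.sum()

print(greedy_decode(toy_model, START, END))  # [0, 1, 2, 3, 4, 5]
```

With a real Transformer, `next_token_probs` would run the decoder on the tokens generated so far and return the softmax output at the last position.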

In [10]:
class DecoderLayer(tf.keras.layers.Layer):

    def __init__(self, H, d_model, dk, dv, dff, dropout_rate=0.1, layernorm_eps=1e-6):
        super(DecoderLayer, self).__init__()
        
        self.mha1 = Multihead_Attention(H, d_model, dk, dv)
        self.mha2 = Multihead_Attention(H, d_model, dk, dv)
        self.ffn = FNNLayer(d_model, dff)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=layernorm_eps)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=layernorm_eps)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=layernorm_eps)
        self.dropout_mha1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout_mha2 = tf.keras.layers.Dropout(dropout_rate)                                     
        self.dropout_ffn = tf.keras.layers.Dropout(dropout_rate)
    
    def call(self, x, encoder_output, training=False, look_ahead_mask=None, padding_mask=None):
        # 1st Masked MultiHead attention                                     
        A1 = self.mha1(x,x,x,mask=look_ahead_mask) # Self attention (batch_size, Tq, d_model)
        A1 = self.dropout_mha1(A1, training=training) #Apply Dropout during training
        
        #  Residual connection + Layer normalization
        out1 = self.layernorm1(x+A1)  # (batch_size, Tq, d_model)

        # 2nd MultiHead attention: encoder-decoder (cross) attention
        A2 = self.mha2(out1,encoder_output,encoder_output,mask=padding_mask) # Queries from the decoder, keys/values from the encoder (batch_size, Tq, d_model)
        A2 = self.dropout_mha2(A2, training=training) #Apply Dropout during training
        
        
        #  Residual connection + Layer normalization
        out2 = self.layernorm2(out1+A2)  # (batch_size, Tq, d_model)
                                             
        # Pointwise ffn
        ffn_output = self.ffn(out2) # (batch_size, Tq, d_model)
        ffn_output = self.dropout_ffn(ffn_output, training=training) # Apply Dropout during training
        
        # Residual connection + Layer normalization
        decoder_layer_out = self.layernorm3(ffn_output+out2)  # (batch_size, Tq, d_model)
        
        return decoder_layer_out

In [11]:
class Decoder(tf.keras.layers.Layer):

    def __init__(self, N, H, d_model, dk, dv, dff, dropout_rate=0.1, layernorm_eps=1e-6):
        super(Decoder, self).__init__()
        
        self.layers=[DecoderLayer(H, d_model, dk, dv, dff, 
                                  dropout_rate=dropout_rate, 
                                  layernorm_eps=layernorm_eps)
                                  for i in range(N)]
    
    def call(self, x, encoder_output, training=False, look_ahead_mask=None, padding_mask=None):
        for layer in self.layers:
            x = layer(x, encoder_output, training=training, look_ahead_mask=look_ahead_mask, padding_mask=padding_mask)
                                  
        return x

### Step 6: Define the Model

The Transformer is composed of two main components: the encoder and the decoder. The encoder takes an input sequence and produces a sequence of hidden representations, while the decoder takes this sequence of hidden representations and generates an output sequence. Both the encoder and decoder are composed of several layers of multi-headed self-attention and point-wise feed-forward neural networks.

<div style="text-align: center;">
<img src="https://machinelearningmastery.com/wp-content/uploads/2021/10/transformer_1.png" alt="Image description" width="300" height="200">
</div>


In [12]:
class Transformer(tf.keras.Model):
    
    def __init__(self, N, H, d_model, dk, dv, dff, 
                 vocab_size, max_positional_encoding, 
                 dropout_rate=0.1, layernorm_eps=1e-6):

        super(Transformer, self).__init__()
        
        initializer = tf.keras.initializers.GlorotUniform()
        self.embedding = tf.Variable(initializer(shape=(vocab_size, d_model)), trainable=True)
        self.PE = positional_encoding(max_positional_encoding, d_model)
        
        self.dropout_encoding_input = tf.keras.layers.Dropout(dropout_rate)
        self.dropout_decoding_input = tf.keras.layers.Dropout(dropout_rate)
        
        self.encoder = Encoder(N, H, d_model, dk, dv, dff, dropout_rate=dropout_rate, layernorm_eps=layernorm_eps)
        self.decoder = Decoder(N, H, d_model, dk, dv, dff, dropout_rate=dropout_rate, layernorm_eps=layernorm_eps)

        

    def call(self, x, y, training=False, enc_padding_mask=None, look_ahead_mask=None, dec_padding_mask=None):
        
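        x = tf.matmul(x,self.embedding)  # (batch_size, Tx, d_model)
        x = x + self.PE[:, :tf.shape(x)[1], :]  # slice so sequences shorter than max_positional_encoding also work
        x =  self.dropout_encoding_input(x,training=training)
        
        encoder_output = self.encoder(x,training=training, mask=enc_padding_mask)
        
        y = tf.matmul(y,self.embedding)  # (batch_size, Ty, d_model)
        y = y + self.PE[:, :tf.shape(y)[1], :]
        y = self.dropout_decoding_input(y,training=training)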
        
        dec_output = self.decoder(y, encoder_output, training=training, 
                                  look_ahead_mask=look_ahead_mask, padding_mask=dec_padding_mask)
        
        
        # Project back to the vocabulary with the shared embedding matrix
        pred = tf.matmul(dec_output,self.embedding,transpose_b=True)  # (batch_size, T, vocab_size)
        pred = tf.nn.softmax(pred, axis=-1)  # probability distribution over the vocabulary at each position
        
        return pred

In [13]:
N, H, d_model, dk, dv, dff = 6, 8, 512, 64, 64, 2048
vocab_size, T = 29, 11
batch_size = 3


transformer = Transformer(N, H, d_model, dk, dv, dff, 
                 vocab_size, T)

# Random stand-ins for one-hot encoded input and target sequences
x = tf.random.uniform((batch_size, T, vocab_size))
y = tf.random.uniform((batch_size, T, vocab_size))

pred = transformer(x,y,training=True)
print(pred.shape)

transformer.summary()

(3, 11, 29)


In [14]:
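# Note: a learning rate of 0.1 is high for Adam; "Attention Is All You Need" uses a warmup schedule with beta_2=0.98 and epsilon=1e-9
optimizer = tf.keras.optimizers.Adam(learning_rate=0.1, epsilon=1e-09)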

transformer.compile(loss='crossentropy',optimizer=optimizer,metrics=['accuracy'])