### Transformer Network

The research paper "Attention Is All You Need" published in 2017 introduced Transformer neural networks, which has brought about a revolutionary change in the field of NLP. The key innovation of the Transformer is its **`attention mechanism`**, which allows it to process input sequences in parallel and capture long-range dependencies more effectively.


**Link to Research Paper:** https://arxiv.org/abs/1706.03762



**Why Trasnformer?**

Key things that makes transformer better than RNN and LSTM:
- Parellel processing of the inputs which can make the training process lot faster
- Better contextual representation of the input because of multi-head self attentions


**When to use Transformer?**

Transformers are best suited for tasks where context and long-range dependencies are important. Their attention mechanisms allow them to capture these features more effectively.

Some of the common NLP tasks that are well-suited for transformers include:
- Machine Translation
- Text Summarization
- Question Answering
- Named Entity Recognition etc.,


# Exploring Transformer Architecture through English to French Machine Translation



## Table of Contents

- [Python Libraries](#0)
- [1 - Positional Encoding](#1)
- [2 - Masking](#2)
    - [2.1 - Padding Mask](#2-1)
    - [2.2 - Look-ahead Mask](#2-2)
- [3 - Encoder](#4)
    - [4.1 Encoder Layer](#4-1)
    - [4.2 - Full Encoder](#4-2)
- [4 - Decoder](#5)
    - [5.1 - Decoder Layer](#5-1)
    - [5.2 - Full Decoder](#5-2)
- [5 - Transformer](#6)
- [6 - References](#7)

<a name='0'></a>
## Python Libraries

Loading all the required python packages.

In [5]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding, MultiHeadAttention, Dense, Input, Dropout, LayerNormalization

<a name='1'></a>
## 1 - Positional Encoding

Positional encoding to the input sequence is a critical step in the Transformer architecture, as it enables the model to understand the sequential order of the tokens in the input sequence.

The formula for calculating the positional encoding is as follows:

$${PE}{(pos,k)} = sin(pos/10000^{2i/dmodel})$$

$${PE}{(pos,k+1)} = cos(pos/10000^{2i/dmodel})$$

- $pos$ is the position of the token in the sequence
- $k = 2i$ is the index of the the each dimension in positional encoding. So, $i= k//2$
- $dmodel$ is the dimension of the embedding vector.

if $k=[0,1,2,3,4,5]$ indices of postional embedding, then $i=[0,0,1,1,2,2]$

In [None]:
def get_angles(pos, k, d_model:int):
  """
  Arguments:
  pos -- (an array of shape (position, 1) representing the positions in the sequence
  k -- k (an array of shape (1, d_model) representing the indices of the embedding dimensions
  d_model -- integer representing the dimensionality of the model
  
  Returns:
  angles -- an array of shape (position, d_model) representing the angles for the positional encoding.
  """
  i = k//2
  angles = pos/np.power(10_000, 2*i/d_model)
  return angles

In [None]:
def positional_encoding(position:int, d_model:int ):
  """
  Arguments:
  position: an integer indicating the maximum sequence length
  d_model: an integer indicating the dimensionality of the model
  
  Returns:
  encoding -- a 3D tensor of shape (1, position, d_model) representing the positional encoding for a sequence of length position
  """
  angle_radians = get_angles(np.arange(position)[: , np.newaxis],
                             np.arange(d_model)[np.newaxis, :],
                             d_model)
  # apply sin to even indices in the array; 2i
  angle_radians[:, 0::2] = np.sin(angle_radians[:, 0::2])

  # apply cos to odd indices in the array; 2i+1
  angle_radians[:,1:2] = np.cos(angle_radians[:, 1:2])

  pos_encoding = angle_radians[np.newaxis, ...]

  return tf.cast(pos_encoding, dtype=tf.float32)

In [None]:
positional_encoding(4, 5)

<tf.Tensor: shape=(1, 4, 5), dtype=float32, numpy=
array([[[ 0.0000000e+00,  1.0000000e+00,  0.0000000e+00,  0.0000000e+00,
          0.0000000e+00],
        [ 8.4147096e-01,  5.4030228e-01,  2.5116222e-02,  2.5118865e-02,
          6.3095731e-04],
        [ 9.0929741e-01, -4.1614684e-01,  5.0216600e-02,  5.0237730e-02,
          1.2619144e-03],
        [ 1.4112000e-01, -9.8999250e-01,  7.5285293e-02,  7.5356595e-02,
          1.8928709e-03]]], dtype=float32)>

<a name='2'></a>
## 2 - Masking

There are two types of masks used when building Transformer network: *padding mask* and *look-ahead mask*. 

<a name='2-1'></a>
### 2.1 - Padding Mask

It is important to feed sequences of uniform length to the transformer. We can pad the sequences with zeros, and truncate sequences that exceed maximum length of the model.

In [None]:
def create_padding_mask(seq):
  """
    Creates a mask tensor representing the padding positions in the input sequence.
    
    Arguments:
    seq -- a tensor of shape (batch_size, seq_len)

    Returns:
    mask -- a tensor of shape (batch_size, 1, seq_len), where each position is 0 if the corresponding position in
    the input sequence is a padding position, and 1 otherwise.
  """
  mask = 1 - tf.cast(tf.math.equal(seq, 0),dtype=tf.float32)

  # reshaping mask so that it has an additional dimension, 
  # which will be needed when applying the mask in the self-attention mechanism of the Transformer model. 
  return mask[:, tf.newaxis, :]


In [None]:
x = tf.constant([[7., 6., 1., 0., 0.], 
                 [1., 2., 3., 0., 0.], 
                 [4., 5., 0., 0., 0.]])
print(create_padding_mask(x))

tf.Tensor(
[[[1. 1. 1. 0. 0.]]

 [[1. 1. 1. 0. 0.]]

 [[1. 1. 0. 0. 0.]]], shape=(3, 1, 5), dtype=float32)


In [None]:
print(tf.keras.activations.softmax(x)) # softmax without padding
print(tf.keras.activations.softmax(x + (1 - create_padding_mask(x)) * -1.0e9)) #softmax with padding

tf.Tensor(
[[7.2876644e-01 2.6809821e-01 1.8064314e-03 6.6454901e-04 6.6454901e-04]
 [8.4437378e-02 2.2952460e-01 6.2391251e-01 3.1062774e-02 3.1062774e-02]
 [2.6502505e-01 7.2041273e-01 4.8541026e-03 4.8541026e-03 4.8541026e-03]], shape=(3, 5), dtype=float32)
tf.Tensor(
[[[0.72973627 0.26845497 0.00180884 0.         0.        ]
  [0.09003057 0.24472848 0.66524094 0.         0.        ]
  [0.26762316 0.72747517 0.00490169 0.         0.        ]]

 [[0.72973627 0.26845497 0.00180884 0.         0.        ]
  [0.09003057 0.24472848 0.66524094 0.         0.        ]
  [0.26762316 0.72747517 0.00490169 0.         0.        ]]

 [[0.7310586  0.26894143 0.         0.         0.        ]
  [0.26894143 0.7310586  0.         0.         0.        ]
  [0.26894143 0.7310586  0.         0.         0.        ]]], shape=(3, 3, 5), dtype=float32)


<a name='2-2'></a>
### 2.2 - Look-ahead Mask


In [None]:
def create_look_ahead_mask(sequence_length):
  mask = tf.linalg.band_part(tf.ones((1, sequence_length, sequence_length)), -1, 0)
  return mask 

In [None]:
create_look_ahead_mask(4)

<tf.Tensor: shape=(1, 4, 4), dtype=float32, numpy=
array([[[1., 0., 0., 0.],
        [1., 1., 0., 0.],
        [1., 1., 1., 0.],
        [1., 1., 1., 1.]]], dtype=float32)>

<a name='4'></a>
## 3 - Encoder

Encoder contains - multi-head self attention layers and feed forward neural network that is independently applied to every position.

In [None]:
def FeedForward(embedding_dim, full_connected_dim):
  return tf.keras.Sequential([
      tf.keras.layers.Dense(full_connected_dim, activation='relu'),
      tf.keras.layers.Dense(embedding_dim)
  ])

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self, embedding_dim, num_heads, full_connected_dim,dropout_rate=0.1, layernorm_eps=1e-6 ):
    super().__init__()
    self.mha = MultiHeadAttention(num_heads, key_dim=embedding_dim, dropout=dropout_rate)
    self.ffnn = FeedForward(embedding_dim, full_connected_dim)
    self.layer_norm1 = LayerNormalization(epsilon = layernorm_eps )
    self.layer_norm2 = LayerNormalization(epsilon = layernorm_eps)
    self.drop_out = Dropout(dropout_rate)
  
  def __call__(self, x, training, mask):
    self_mha_output = self.mha(x,x,x,mask) # if query, key, value are same, then self-attenstion will be computed
    out1 = self.layer_norm1(x + self_mha_output)
    ffn_output = self.ffnn(out1)
    ffn_output = self.drop_out(ffn_output, training=training)
    encoder_layer_out = self.layer_norm2(out1 + ffn_output)
    return encoder_layer_out


In [None]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, num_encoders, embedding_dim, num_heads, fully_connected_dim, input_vocab_size, 
               max_pos_encoding, dropout_rate=0.1, layernorm_eps=1e-6):
    super().__init__()
    self.embedding_dim = embedding_dim
    self.num_layers = num_encoders
    self.embedding = Embedding(input_vocab_size, embedding_dim)
    self.pos_encoding = positional_encoding(max_pos_encoding, embedding_dim)
    self.enc_layers = [EncoderLayer(embedding_dim, num_heads, fully_connected_dim, dropout_rate, layernorm_eps) for _ in range(num_encoders)]
    self.dropout = Dropout(dropout_rate)
  
  def __call__(self, x, training, mask):
    seq_len = tf.shape(x)[1]
    x = self.embedding(x)
    # scaling: This is done to prevent the dot product operation in the self-attention mechanism from getting too large or too small
    x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
    x += self.pos_encoding[:, :seq_len, :]
    x = self.dropout(x, training = training)
    for i in range(self.num_layers):
      x = self.enc_layers[i](x, training, mask)

    return x # tensor of shape (batch_size, input_seq_len, embedding_dim)

<a name='5'></a>
## 4 - Decoder


In [None]:
class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self, embedding_dim, num_heads, fully_connected_dim, dropout_rate=0.1, layernorm_eps=1e-6):
    super().__init__()
    self.masked_mha = MultiHeadAttention(num_heads, key_dim=embedding_dim, dropout=dropout_rate)
    self.mha = MultiHeadAttention(num_heads, key_dim=embedding_dim, dropout=dropout_rate)
    self.ffnn = FeedForward(embedding_dim, fully_connected_dim)
    self.layer_norm1 = LayerNormalization(epsilon=layernorm_eps)
    self.layer_norm2 = LayerNormalization(epsilon=layernorm_eps)
    self.layer_norm3 = LayerNormalization(epsilon=layernorm_eps)
    self.dropout = Dropout(dropout_rate)
  
  def __call__(self, x, enc_output, training, look_ahead_mask, padding_mask):
    mult_attn_out1, attn_weights_block1 = self.masked_mha(x, x, x, look_ahead_mask, return_attention_scores=True)
    Q1 = self.layer_norm1(mult_attn_out1 + x)
    mult_attn_out2, attn_weights_block2 = self.mha(Q1, enc_output, enc_output, padding_mask, return_attention_scores=True) 
    mult_attn_out2 = self.layer_norm2(mult_attn_out2 + Q1)
    ffn_output = self.ffnn(mult_attn_out2)
    ffn_output = self.dropout(ffn_output, training = training)
    out3 = self.layer_norm3(ffn_output + mult_attn_out2)
    return out3, attn_weights_block1, attn_weights_block2
    

In [None]:
class Decoder(tf.keras.layers.Layer):
  def __init__(self, num_decoders, embedding_dim, num_heads, fully_connected_dim, target_vocab_size, max_pos_encoding, dropout_rate=0.1, layernorm_eps=1e-6 ):
    super().__init__()
    self.embedding_dim = embedding_dim
    self.num_layers = num_decoders
    self.embedding = Embedding(target_vocab_size, embedding_dim)
    self.pos_encoding = positional_encoding(max_pos_encoding, embedding_dim)
    self.dec_layers = [DecoderLayer(embedding_dim, num_heads, fully_connected_dim) for _ in range(num_decoders)]
    self.dropout = Dropout(dropout_rate)

  def __call__(self,x, enc_output, training, look_ahead_mask, padding_mask):
    seq_len = tf.shape(x)[1]
    attention_weights = {}

    x = self.embedding(x)
    #scaling
    x *= tf.math.sqrt(tf.cast(self.embedding_dim, tf.float32))
    x += self.pos_encoding[:, :seq_len, :]
    x = self.dropout(x, training = training)
    for i in range(self.num_layers):
      x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
      attention_weights['decoder_layer{}_block1_self_att'.format(i+1)] = block1
      attention_weights['decoder_layer{}_block2_decenc_att'.format(i+1)] = block2
    return x, attention_weights

<a name='6'></a> 
## 5 - Transformer


In [None]:
class Transformer(tf.keras.Model):
  def __init__(self, num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size, target_vocab_size, 
               max_pos_encoding_input, max_pos_encoding_target):
    super().__init__()
    self.encoder = Encoder(num_layers, embedding_dim, num_heads, fully_connected_dim, input_vocab_size, max_pos_encoding_input)
    self.decoder = Decoder(num_layers, embedding_dim, num_heads, fully_connected_dim, target_vocab_size, max_pos_encoding_target)
    self.final_layer = Dense(target_vocab_size, activation='softmax')
  
  def __call__(self, input_sentence, output_sentence, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
    enc_output = self.encoder(input_sentence, training, enc_padding_mask)
    dec_output, attention_weights = self.decoder(output_sentence, enc_output, training, look_ahead_mask, dec_padding_mask )
    final_output = self.final_layer(dec_output)
    return final_output, attention_weights

Machine Translation (English to French) using the above transformer

Loading the dataset

In [None]:
!!curl -O http://www.manythings.org/anki/fra-eng.zip
!!unzip fra-eng.zip

['Archive:  fra-eng.zip',
 'replace _about.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: y',
 '  inflating: _about.txt              ',
 'replace fra.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: y',
 '  inflating: fra.txt                 ']

In [None]:
data_path = "fra.txt"
batch_size= 10_000
input_texts = []
target_texts = []
input_vocab = set()
target_vocab = set()

with open(data_path, "r", encoding="utf-8") as f:
    lines = f.read().split("\n")
    
for line in lines[: min(batch_size, len(lines) - 1)]:
  line = line.lower()
  input_text, target_text, _ = line.split("\t")
  input_texts.append(input_text)
  target_texts.append(target_text)

  for word in input_text.lower().split():
    if word not in input_vocab:
      input_vocab.add(word)

  for word in target_text.split():
    if word not in target_vocab:
      target_vocab.add(word)


In [None]:
input_vocab_sroted = sorted(list(input_vocab))
target_vocab_sroted = sorted(list(target_vocab))

input_vocab_sroted.append('pad')
target_vocab_sroted.append('pad')

ENCODER_VOCAB_SIZE = len(input_vocab_sroted)
TARGET_VOCAB_SIZE = len(target_vocab_sroted)

# needed for padding
max_encoder_seq_length = max([len(sentence.split()) for sentence in input_texts])
max_decoder_seq_length = max([len(sentence.split()) for sentence in target_texts])

print("Number of samples:", len(input_texts))
print("Number of unique input tokens:", ENCODER_VOCAB_SIZE)
print("Number of unique target tokens:", TARGET_VOCAB_SIZE)
print("Max sequence length for inputs:", max_encoder_seq_length)
print("Max sequence length for outputs:", max_decoder_seq_length)

Number of samples: 10000
Number of unique input tokens: 2727
Number of unique target tokens: 5391
Max sequence length for inputs: 4
Max sequence length for outputs: 10


In [None]:
# adding index to each character in the sorted list
input_token_index = dict([(word, i) for i, word in enumerate(input_vocab_sroted)])
output_token_index = dict([(word, i) for i, word in enumerate(target_vocab_sroted)])

In [None]:
# intializing one hot encode vectors

# encoder_input_data is a 3D array of shape (num_senteces, max_english_sentence_length, num_english_characters) containing a one-hot vectorization of the English sentences.
encoder_input_data = np.zeros(shape=(len(input_texts), max_decoder_seq_length), dtype=np.float32)

# decoder_input_data is a 3D array of shape (num_sentences, max_french_sentence_length, num_french_characters) containg a one-hot vectorization of the French sentences.
decoder_target_data = np.zeros(shape=(len(target_texts), max_decoder_seq_length), dtype=np.float32)


OneHot Encoding

In [None]:
for i, (input_sent, target_sent) in enumerate(zip(input_texts, target_texts)):
  for t, word in enumerate(input_sent.split()):
    encoder_input_data[i, t] =  input_token_index[word]
  encoder_input_data[i, t+1:] = input_token_index['pad'] # padding with spaces
  for t, word in enumerate(target_sent.split()):
    decoder_target_data[i, t] = output_token_index[word]
  decoder_target_data[i, t+1:] = output_token_index['pad']  # padding with spaces

Training the Transformer

In [33]:
tf.random.set_seed(10)

num_layers = 6
embedding_dim = 4
num_heads = 4
fully_connected_dim = 8
input_vocab_size = ENCODER_VOCAB_SIZE
target_vocab_size = TARGET_VOCAB_SIZE
max_positional_encoding_input = 10
max_positional_encoding_target = 11

transformer = Transformer(num_layers, embedding_dim, num_heads, fully_connected_dim, 
                          input_vocab_size, target_vocab_size, max_positional_encoding_input, 
                          max_positional_encoding_target)

enc_padding_mask = create_padding_mask(encoder_input_data)
dec_padding_mask = create_padding_mask(decoder_target_data)

look_ahead_mask = create_look_ahead_mask(max_positional_encoding_input)

translation, weights = transformer( encoder_input_data, decoder_target_data, True, enc_padding_mask, look_ahead_mask, dec_padding_mask )

In [1]:
def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = tf.keras.losses.sparse_categorical_crossentropy(real, pred, from_logits=True)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)

def compute_accuracy(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    accuracy_ = tf.keras.metrics.sparse_categorical_accuracy(real, pred)
    mask = tf.cast(mask, dtype=accuracy_.dtype)
    accuracy_ *= mask
    return tf.reduce_sum(accuracy_) / tf.reduce_sum(mask)


In [3]:
def train_step(model, input_sentence, output_sentence, target_sentence, enc_padding_mask, look_ahead_mask, dec_padding_mask, optimizer, loss_function):
    with tf.GradientTape() as tape:
        predictions, _ = model(input_sentence, output_sentence, True, enc_padding_mask, look_ahead_mask, dec_padding_mask)
        loss = loss_function(target_sentence, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss


In [None]:
def fit(model, input_dataset, output_dataset, trainable, enc_padding_mask, look_ahead_mask, dec_padding_mask, optimizer, loss_function, epochs):
    train_losses = []
    train_accs = []
    for epoch in range(epochs):
        epoch_loss = 0
        epoch_acc = 0
        for batch, (input_sentence, output_sentence, target_sentence) in enumerate(train_dataset):
            enc_padding_mask, look_ahead_mask, dec_padding_mask = create_masks(input_sentence, output_sentence)
            loss = train_step(model, input_sentence, output_sentence, target_sentence, enc_padding_mask, combined_mask, dec_padding_mask, optimizer, loss_function)
            epoch_loss += loss
            epoch_acc += compute_accuracy(target_sentence, model(input_sentence, output_sentence, False, enc_padding_mask, combined_mask, dec_padding_mask)[0])
        train_losses.append(epoch_loss / (batch + 1))
        train_accs.append(epoch_acc / (batch + 1))
        print(f'Epoch {epoch + 1}, Train Loss: {train_losses[-1]:.4f}, Train Accuracy: {train_accs[-1]:.4f}')
    return train_losses, train_accs


In [4]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
loss_function = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]

train_losses, train_accs = fit(transformer,encoder_input_data, decoder_target_data, True, 
                               enc_padding_mask, look_ahead_mask, dec_padding_mask, optimizer, loss_function, metrics )

NameError: ignored

In [34]:
# Reverse-lookup token index to decode sequences back to something readable.
reverse_input_word_index = dict((i, word) for word, i in input_token_index.items())
reverse_target_word_index = dict((i, word) for word, i in output_token_index.items())

In [37]:
for i, predicted_tf in enumerate(translation[:10]):
  print('-------------')
  french_pred = ' '.join([reverse_target_word_index.get(np.argmax(pred)) for pred in predicted_tf.numpy()])
  print(f'English:{input_texts[i]}')
  print(f'French:{target_texts[i]}')
  print(f'Predicted French:{french_pred}')

-------------
English:go.
French:va !
Predicted French:pourrais-tu moi. motivé. beignet. gavée. gavée. gavée. gavée. moi. trépasser
-------------
English:go.
French:marche.
Predicted French:pourrais-tu cessez normales. sucré. pourrais-tu pourrais-tu pourrais-tu gavée. moi. sucré.
-------------
English:go.
French:en route !
Predicted French:pourrais-tu mortel. compte. sucré. sucré. pourrais-tu pourrais-tu matériel motivé. pourrais-tu
-------------
English:go.
French:bouge !
Predicted French:pourrais-tu demande-lui. sucré. trépasser pourrais-tu synchronisés. gavée. préparé normales. renonçons.
-------------
English:hi.
French:salut !
Predicted French:pourrais-tu préparé normales. pourrais-tu pourrais-tu gavée. pourrais-tu pourrais-tu renonçons. sucré.
-------------
English:hi.
French:salut.
Predicted French:pourrais-tu matériel normales. compte. pourrais-tu pourrais-tu gavée. pourrais-tu normales. regarderai.
-------------
English:run!
French:cours !
Predicted French:pourrais-tu rappelle

<a name='7'></a> 
## 6 - References

Vaswani, A., Shazeer, N., Parmar, N., Uszkoreit, J., Jones, L., Gomez, A. N., Kaiser, Ł., & Polosukhin, I. (2017). Attention is all you need. arXiv preprint arXiv:1706.03762.


Francois Chollet. "A ten-minute introduction to sequence-to-sequence learning in Keras". Keras Blog, 14 September 2016, https://blog.keras.io/a-ten-minute-introduction-to-sequence-to-sequence-learning-in-keras.html.




Jalammar, J. (2018, August 24). The Illustrated Transformer. Retrieved from http://jalammar.github.io/illustrated-transformer/





# Comparing with LSTM and RNN

- The main idea behind Transformers is to replace recursive approach in RNNs by self-attnetion mechanism.

- Self-attention mechanism allows the model to encode the input sequence by including the context by attending to different parts of the input sequence (via Query, Key, and Value tensors)

- For example, if we want to translate the sentence "I am a student" to French, we would feed the input sequence one word at a time and update the hidden state at each time step. The final hidden state would be used to generate the output "je suis étudiant."

- In transformers, we would compute self-attention scores between all the words in the input sequence, and use these scores to compute a weighted sum of the input sequence to obtain a representation for each output element. This allows the model to consider all the words in the input sequence at once, and to capture long-range dependencies between them.


Advantages of Transformers:
- Parellel processing of the inputs; faster training
- Does not suffer from vanishing gradient or exploding gradient problem; makes it easy to train sequences