# Transformers

© Data Trainers LLC. GPL v 3.0.

Author: Axel Sirota


---



Heavily inspired by the tutorial [NMT with Transformers](https://www.tensorflow.org/text/tutorials/transformer), which implements the Transformer model originally proposed in ["Attention is all you need"](https://arxiv.org/abs/1706.03762) by Vaswani et al. (2017).

## Prep

In [None]:
!pip install -U nltk 'gensim==4.2.0' 'keras-nlp' 'keras-preprocessing' 'tensorflow-text>=2.11'

In [None]:
import multiprocessing
import tensorflow as tf
import sys
import keras.backend as K
from keras.models import Sequential
from keras.layers import Dense, Embedding, Lambda, ELU, Conv1D, MaxPooling1D, Dropout
from keras.preprocessing import sequence
from keras.preprocessing.text import Tokenizer
from sklearn.model_selection import train_test_split
from keras import preprocessing
from textblob import TextBlob, Word
from keras_preprocessing.sequence import pad_sequences
from keras.initializers import Constant
from tensorflow.keras.layers.experimental import preprocessing
from tensorflow.keras import Model, Input
import tensorflow_text as tf_text
import matplotlib.ticker as ticker
import matplotlib.pyplot as plt
import numpy as np
import re
import random
import os
import pandas as pd
import gensim
import warnings
import nltk
import time

TRACE = False

def set_seeds_and_trace():
  os.environ['PYTHONHASHSEED'] = '0'
  np.random.seed(42)
  tf.random.set_seed(42)
  random.seed(42)
  if TRACE:
    tf.debugging.set_log_device_placement(True)

def set_session_with_gpus_and_cores():
  cores = multiprocessing.cpu_count()
  gpus = len(tf.config.list_physical_devices('GPU'))
  config = tf.compat.v1.ConfigProto( device_count = {'GPU': gpus  , 'CPU': cores} , intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
  sess = tf.compat.v1.Session(config=config)
  tf.compat.v1.keras.backend.set_session(sess)

set_seeds_and_trace()
set_session_with_gpus_and_cores()
warnings.filterwarnings('ignore')
nltk.download('punkt')


## The Transformer Layers

In this demo we will build the Transformer architecture from scratch, with the same tools the original authors had. Why? To understand how it works, why it works, and exactly what is novel!

<table>
<tr>
  <th colspan=1>The original Transformer diagram</th>
  <th colspan=1>A representation of a 4-layer Transformer</th>
</tr>
<tr>
  <td>
   <img width=400 src="https://www.tensorflow.org/images/tutorials/transformer/transformer.png"/>
  </td>
  <td>
   <img width=307 src="https://www.tensorflow.org/images/tutorials/transformer/Transformer-4layer-compact.png"/>
  </td>
</tr>
</table>

Each of the components in these two diagrams will be explained as you progress through the demo.


### What did we have before?

Before, we used attention (cross attention or self attention) on top of a recurrent network, remember? For sequence data we basically used it like this:

<table>
<tr>
  <th colspan=1>Seq2Seq with attention</th>
</tr>
<tr>
  <td>
   <img src="https://www.dropbox.com/s/r6u7ll5nlt96t9f/seq2seq.png?raw=1"/>
  </td>
</tr>
</table>



There, we combined the attention output with the hidden state to create an updated hidden state that we fed into the next cell. This worked well on medium-sized sentences, but it was hard to train and unstable. With this in mind, the Transformer basically gets rid of the RNN by using **only** attention.
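
For reference, the operation the paper builds everything on is scaled dot-product attention. In the paper's notation, with queries $Q$, keys $K$, values $V$ and key dimension $d_k$:

$$
\mathrm{Attention}(Q, K, V) = \mathrm{softmax}\left(\frac{Q K^{\top}}{\sqrt{d_k}}\right) V
$$

Every attention layer in this demo is a variation on this formula; what changes is where $Q$, $K$ and $V$ come from and which positions are masked out.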

### The embedding and positional encoding layer

The inputs to both the encoder and decoder use the same embedding and positional encoding logic.

<table>
<tr>
  <th colspan=1>The embedding and positional encoding layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/PositionalEmbedding.png"/>
  </td>
</tr>
</table>

In [None]:
## This comes straight from the paper

def positional_encoding(length, depth):
  depth = depth/2

  positions = np.arange(length)[:, np.newaxis]     # (seq, 1)
  depths = np.arange(depth)[np.newaxis, :]/depth   # (1, depth)

  angle_rates = 1 / (10000**depths)         # (1, depth)
  angle_rads = positions * angle_rates      # (pos, depth)

  pos_encoding = np.concatenate(
      [np.sin(angle_rads), np.cos(angle_rads)],
      axis=-1)

  return tf.cast(pos_encoding, dtype=tf.float32)
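
For comparison, the paper defines the positional encoding as follows, where $pos$ is the position in the sequence and $i$ indexes pairs of dimensions:

$$
PE_{(pos,\,2i)} = \sin\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad
PE_{(pos,\,2i+1)} = \cos\left(\frac{pos}{10000^{2i/d_{model}}}\right)
$$

The function above uses the same angles, since `np.arange(depth)/depth` with `depth = d_model/2` equals $2i/d_{model}$. The difference is that it concatenates the sine and cosine halves instead of interleaving them. This only permutes the feature dimensions, so it should not matter to the layers that follow.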

In [None]:
class PositionalEmbedding(tf.keras.layers.Layer):
  def __init__(self, vocab_size, d_model):
    super().__init__()
    self.d_model = d_model
    self.embedding = None  # Use the Embedding layer
    self.pos_encoding = positional_encoding(length=2048, depth=d_model)

  def compute_mask(self, *args, **kwargs):
    return self.embedding.compute_mask(*args, **kwargs)

  def call(self, x):
    length = None  # Get length
    x = None  # Apply embedding
    # This factor sets the relative scale of the embedding and the positional encoding.
    x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
    x = x + self.pos_encoding[tf.newaxis, :length, :]
    return x

In [None]:
pos = PositionalEmbedding(5000, 100)

In [None]:
input = None # Set a random input to test this
response = pos(input)
response.shape

In [None]:
response._keras_mask

Do the shapes look ok?
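
As a way to sanity-check the shapes without Keras, here is a minimal NumPy sketch of the same three steps (look up, scale, add positions). It is an illustration and not the solution to the exercise: `table` is a random stand-in for trained embedding weights, `positional_encoding_np` is a hypothetical NumPy twin of `positional_encoding`, and treating id `0` as padding is an assumption borrowed from the tutorial this notebook follows.

```python
import numpy as np

def positional_encoding_np(length, depth):
    # Same angles as positional_encoding above, kept in NumPy.
    half = depth // 2
    positions = np.arange(length)[:, np.newaxis]       # (length, 1)
    depths = np.arange(half)[np.newaxis, :] / half     # (1, depth/2)
    angle_rads = positions / (10000 ** depths)         # (length, depth/2)
    return np.concatenate([np.sin(angle_rads), np.cos(angle_rads)], axis=-1)

vocab_size, d_model = 5000, 100
rng = np.random.default_rng(0)
table = rng.normal(size=(vocab_size, d_model))         # stand-in embedding weights
token_ids = rng.integers(1, vocab_size, size=(3, 26))  # (batch, seq_len)

embedded = table[token_ids] * np.sqrt(d_model)         # look up and scale
encoded = embedded + positional_encoding_np(2048, d_model)[np.newaxis, :26, :]
mask = token_ids != 0                                  # assumed: id 0 is padding

print(encoded.shape)  # (3, 26, 100)
print(mask.shape)     # (3, 26)
```

The output has one `d_model`-sized vector per token, and the mask has one boolean per token, which is what `response.shape` and `response._keras_mask` should report in the Keras version.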

### Add and normalize

<table>
<tr>
  <th colspan=2>Add and normalize</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/Add+Norm.png"/>
  </td>
</tr>
</table>

Note: use the `Add` layer instead of `+` so that Keras masks are propagated.

We will create a `BaseAttention` layer that holds the multi-head attention, `Add` and `LayerNormalization` components. Each attention subclass then implements its own `call` on top of them.

In [None]:
class BaseAttention(tf.keras.layers.Layer):
  def __init__(self, **kwargs):
    super().__init__()
    self.mha = None  # Set the multi headed attention taking any kwarg
    self.layernorm = tf.keras.layers.LayerNormalization()
    self.add = tf.keras.layers.Add()
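
To make the Add and Norm step concrete, here is a small NumPy sketch of what happens to the numbers. It is a simplification under stated assumptions: the learned gain and bias of `LayerNormalization` are omitted, masks are ignored, and `sublayer_output` is random data standing in for the output of an attention layer.

```python
import numpy as np

def layer_norm(x, eps=1e-3):
    # Normalize each feature vector (last axis) to zero mean and unit variance.
    mean = x.mean(axis=-1, keepdims=True)
    var = x.var(axis=-1, keepdims=True)
    return (x - mean) / np.sqrt(var + eps)

def add_and_norm(x, sublayer_output):
    # Skip connection first, normalization second.
    return layer_norm(x + sublayer_output)

rng = np.random.default_rng(0)
x = rng.normal(size=(3, 26, 100))                # (batch, seq_len, d_model)
sublayer_output = rng.normal(size=(3, 26, 100))  # stand-in for attention output
y = add_and_norm(x, sublayer_output)

print(y.shape)  # (3, 26, 100)
```

The skip connection requires the sublayer output to have the same shape as its input, which is why every attention layer in this demo returns a tensor shaped like its query.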

### Self Attention layer

<table>
<tr>
  <th colspan=1>The global self attention layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/SelfAttention.png"/>
  </td>
</tr>
</table>

In [None]:
class GlobalSelfAttention(BaseAttention):
  def call(self, x):
    # We need to compare everything with everything, therefore Q, K and V must be the input
    attn_output = None # Call self.mha with the correct Q, K and V!
    x = None  # Add the skip connection
    x = None  # Apply layer normalization
    return x

Let's test it!

In [None]:
embedding_dim = 100
vocab_size = 5000
input = None

# First we apply the PositionalEmbedding to embed into what the attention layer expects
pos = PositionalEmbedding(vocab_size, embedding_dim)

# Then we apply self attention; the value of num_heads is arbitrary
gsa = GlobalSelfAttention(num_heads=3, key_dim=embedding_dim)

# Test it
response = None
response.shape

Notice the shape is unchanged: MHA concatenates the 3 heads and projects the result back to the input dimension, and then we add the skip connection.
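
To see why self attention preserves the shape, here is a single-head NumPy sketch of scaled dot-product attention with `Q = K = V = x`. It is a sketch only: the learned projections and the multiple heads of the Keras layer are left out, and `softmax` and `scaled_dot_product_attention` are helper names introduced here for illustration.

```python
import numpy as np

def softmax(x, axis=-1):
    x = x - x.max(axis=axis, keepdims=True)  # subtract max for numerical stability
    e = np.exp(x)
    return e / e.sum(axis=axis, keepdims=True)

def scaled_dot_product_attention(q, k, v):
    # q: (batch, q_len, d); k and v: (batch, kv_len, d)
    d = q.shape[-1]
    scores = q @ k.transpose(0, 2, 1) / np.sqrt(d)  # (batch, q_len, kv_len)
    weights = softmax(scores, axis=-1)              # each row sums to 1
    return weights @ v, weights

rng = np.random.default_rng(0)
x = rng.normal(size=(3, 26, 100))                   # (batch, seq_len, d_model)
out, weights = scaled_dot_product_attention(x, x, x)

print(out.shape)      # (3, 26, 100)
print(weights.shape)  # (3, 26, 26)
```

Every position is compared with every other position, which gives the `(seq_len, seq_len)` weight matrix, and the output keeps one vector per input position.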

### The cross attention layer

This layer connects the encoder and decoder. It is the most straightforward use of attention in the model: it performs the same task as the attention block in the previous demo (and we will copy it).

<table>
<tr>
  <th colspan=1>The cross attention layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/CrossAttention.png"/>
  </td>
</tr>
</table>

In [None]:
class CrossAttention(BaseAttention):
  def call(self, x, context):
    attn_output, attn_scores = None # Call self.mha with the correct Q, K and V! We want to return the att scores

    # Cache the attention scores for plotting later.
    self.last_attn_scores = attn_scores

    x = None  # Add the skip connection
    x = None  # Apply layer normalization

    return x

In [None]:
embedding_dim_es = 100
vocab_size_es = 5000

embedding_dim_en = 512
vocab_size_en = 6000

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = None
input_en = None


pos_es = PositionalEmbedding(vocab_size_es, embedding_dim_es)
pos_en = PositionalEmbedding(vocab_size_en, embedding_dim_en)


gsa = GlobalSelfAttention(num_heads=3, key_dim=embedding_dim_es)
cross = CrossAttention(num_heads=3, key_dim=embedding_dim_en)


context = None # Forget about the feed forwards to test

response = None # Forget about masked attention for now, assume it is the identity

response.shape

Notice the shape is `(batch_size, target_seq_len, embedding_dim_en)`: the output follows the query (the English side), even though the Spanish context has a different number of words and a different embedding dimension. We are making good progress!
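
The shape bookkeeping of cross attention can be checked with a single-head NumPy sketch. All weights here are random and hypothetical (`w_q`, `w_k`, `w_v`, `w_o` and the head size `d_k = 64` are chosen only for illustration); the point is which tensor determines which dimension.

```python
import numpy as np

rng = np.random.default_rng(0)
batch, target_len, context_len = 3, 24, 26
d_target, d_context, d_k = 512, 100, 64   # d_k is an arbitrary head size

x = rng.normal(size=(batch, target_len, d_target))          # decoder side (queries)
context = rng.normal(size=(batch, context_len, d_context))  # encoder side (keys, values)

# Random stand-ins for the learned projection matrices.
w_q = rng.normal(size=(d_target, d_k)) / np.sqrt(d_target)
w_k = rng.normal(size=(d_context, d_k)) / np.sqrt(d_context)
w_v = rng.normal(size=(d_context, d_k)) / np.sqrt(d_context)
w_o = rng.normal(size=(d_k, d_target)) / np.sqrt(d_k)

q, k, v = x @ w_q, context @ w_k, context @ w_v
scores = q @ k.transpose(0, 2, 1) / np.sqrt(d_k)  # (batch, target_len, context_len)
scores = scores - scores.max(axis=-1, keepdims=True)
weights = np.exp(scores)
weights = weights / weights.sum(axis=-1, keepdims=True)
out = (weights @ v) @ w_o                         # back to the query's feature size

print(weights.shape)  # (3, 24, 26)
print(out.shape)      # (3, 24, 512)
```

The attention weights have one row per target word and one column per context word, and this is the kind of matrix that `last_attn_scores` caches for plotting.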

### The causal self attention layer (Masked Multi Headed Attention)

<table>
<tr>
  <th colspan=1>The causal self attention layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/CausalSelfAttention.png"/>
  </td>
</tr>
</table>



The only big difference in masked multi-head attention is that we cannot attend to words in the future. We use a causal mask so that each position can attend only to itself and earlier positions; this way the prediction of the `N`th word depends only on the first `N-1` words and not on the whole sentence.

In [None]:
class CausalSelfAttention(BaseAttention):
  def call(self, x):
    attn_output = self.mha(
        query=x,
        value=x,
        key=x,
        use_causal_mask = True)  # This is the key!
    x = None  # Add the skip connection
    x = None  # Apply layer normalization
    return x

<table>
<tr>
  <th colspan=1>The causal self attention layer</th>
</tr>
<tr>
  <td>
   <img width=330 src="https://www.tensorflow.org/images/tutorials/transformer/CausalSelfAttention-new-full.png"/>
  </td>
</tr>
</table>

Notice in the diagram above how each query can only attend to the keys and values from the past.
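
A causal mask is easy to inspect by hand. The NumPy sketch below builds a lower-triangular mask for a 5-token sequence and applies it to random scores before the softmax. It illustrates the idea only; how `use_causal_mask=True` is implemented inside Keras is not shown here.

```python
import numpy as np

seq_len = 5
# True where attention is allowed: position i may look at positions 0..i.
causal_mask = np.tril(np.ones((seq_len, seq_len), dtype=bool))

rng = np.random.default_rng(0)
scores = rng.normal(size=(seq_len, seq_len))     # stand-in attention scores

masked = np.where(causal_mask, scores, -np.inf)  # future positions get -inf
masked = masked - masked.max(axis=-1, keepdims=True)
weights = np.exp(masked)                         # exp(-inf) is exactly 0
weights = weights / weights.sum(axis=-1, keepdims=True)

print(causal_mask.astype(int))
print(np.round(weights, 2))
```

The first row puts all of its weight on the first token, and every entry above the diagonal is zero, so no information flows from future words.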

In [None]:
embedding_dim_en = 512
vocab_size_en = 6000

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_en = None


pos_en = PositionalEmbedding(vocab_size_en, embedding_dim_en)

csa = CausalSelfAttention(num_heads =3, key_dim=embedding_dim_en)

response = None

response.shape

### The feed forward network

The transformer also includes this point-wise feed-forward network in both the encoder and decoder:

<table>
<tr>
  <th colspan=1>The feed forward network</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/FeedForward.png"/>
  </td>
</tr>
</table>

In [None]:
class FeedForward(tf.keras.layers.Layer):
  def __init__(self, d_model, dff, dropout_rate=0.1):
    super().__init__()
    self.seq = tf.keras.Sequential([
      tf.keras.layers.Dense(dff, activation='relu'),
      tf.keras.layers.Dense(d_model),
      tf.keras.layers.Dropout(dropout_rate)
    ])
    self.add = tf.keras.layers.Add()
    self.layer_norm = tf.keras.layers.LayerNormalization()

  def call(self, x):
    x = None  # Add the skip connection from the seq
    x = None  # Apply layer normalization
    return x
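
"Point-wise" means the same two dense layers are applied to every position independently. The NumPy sketch below checks this with random, hypothetical weights (`w1`, `b1`, `w2`, `b2`); dropout, the skip connection and layer normalization are left out.

```python
import numpy as np

def feed_forward(x, w1, b1, w2, b2):
    hidden = np.maximum(0.0, x @ w1 + b1)  # Dense(dff, activation='relu')
    return hidden @ w2 + b2                # Dense(d_model)

rng = np.random.default_rng(0)
d_model, dff = 100, 512
w1 = rng.normal(size=(d_model, dff)) / np.sqrt(d_model)
b1 = np.zeros(dff)
w2 = rng.normal(size=(dff, d_model)) / np.sqrt(dff)
b2 = np.zeros(d_model)

x = rng.normal(size=(3, 26, d_model))      # (batch, seq_len, d_model)
full = feed_forward(x, w1, b1, w2, b2)
single = feed_forward(x[:, 5:6, :], w1, b1, w2, b2)  # only position 5

print(full.shape)                            # (3, 26, 100)
print(np.allclose(full[:, 5:6, :], single))  # True
```

Running the network on one position alone gives the same result as running it on the whole sequence, so the feed-forward block never mixes information across positions; that is the job of the attention layers.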


### The encoder layer

The encoder contains a stack of `N` encoder layers, where each `EncoderLayer` contains a `GlobalSelfAttention` and a `FeedForward` layer:

<table>
<tr>
  <th colspan=1>The encoder layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/EncoderLayer.png"/>
  </td>
</tr>
</table>

In [None]:
class EncoderLayer(tf.keras.layers.Layer):
  def __init__(self,*, d_model, num_heads, dff, dropout_rate=0.1):
    super().__init__()

    self.self_attention = None

    self.ffn = None

  def call(self, x):
    x = self.self_attention(x)
    x = self.ffn(x)
    return x

In [None]:
embedding_dim = 100
vocab_size = 5000
input = tf.constant(np.random.randint(1,vocab_size, size=(3,26)))
pos = PositionalEmbedding(vocab_size, embedding_dim)
sample_encoder_layer = EncoderLayer(d_model=embedding_dim, num_heads=3, dff=1012)
response = sample_encoder_layer(pos(input))
response.shape

### The encoder

Notice that we need to repeat the previous `EncoderLayer` `N` times, so we need another layer that does exactly that.

<table>
<tr>
  <th colspan=1>The encoder</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/Encoder.png"/>
  </td>
</tr>
</table>

In [None]:
class Encoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads,
               dff, vocab_size, dropout_rate=0.1):
    super().__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(
        vocab_size=vocab_size, d_model=d_model)

    self.enc_layers = [
        EncoderLayer(d_model=d_model,
                     num_heads=num_heads,
                     dff=dff,
                     dropout_rate=dropout_rate)
        for _ in range(num_layers)]
    self.dropout = tf.keras.layers.Dropout(dropout_rate)

  def call(self, x):
    # `x` is token-IDs shape: (batch, seq_len)
    x = self.pos_embedding(x)  # Shape `(batch_size, seq_len, d_model)`.

    # Add dropout.
    x = self.dropout(x)

    for i in range(self.num_layers):
      x = self.enc_layers[i](x)

    return x  # Shape `(batch_size, seq_len, d_model)`.

In [None]:
embedding_dim = 100
vocab_size = 5000
input = tf.constant(np.random.randint(1,vocab_size, size=(3,26)))
sample_encoder = Encoder(num_layers=4,
                         d_model=embedding_dim,
                         num_heads=3,
                         dff=512,
                         vocab_size=vocab_size)
response = sample_encoder(input)
response.shape

We got our Encoder!! Yahoo!!

### The decoder layer

As before, we need a `DecoderLayer` that uses the attention layers, and then another layer that stacks `N` of them.

<table>
<tr>
  <th colspan=1>The decoder layer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/DecoderLayer.png"/>
  </td>
</tr>
</table>

In [None]:
class DecoderLayer(tf.keras.layers.Layer):
  def __init__(self,
               *,
               d_model,
               num_heads,
               dff,
               dropout_rate=0.1):
    super().__init__()

    self.causal_self_attention = None

    self.cross_attention = None

    self.ffn = None

  def call(self, x, context):
    x = self.causal_self_attention(x=x)
    x = self.cross_attention(x=x, context=context)

    # Cache the last attention scores for plotting later
    self.last_attn_scores = self.cross_attention.last_attn_scores

    x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
    return x

In [None]:
embedding_dim_es = 100
vocab_size_es = 5000

embedding_dim_en = 512
vocab_size_en = 6000

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = tf.constant(np.random.randint(1,vocab_size_es, size=(3,26)))
input_en = tf.constant(np.random.randint(1,vocab_size_en, size=(3,24)))

pos_en = PositionalEmbedding(vocab_size_en, embedding_dim_en)


encoder =  Encoder(num_layers=2, d_model=embedding_dim_es, num_heads=3, dff=512, vocab_size=vocab_size_es)

context = encoder(input_es)

decoder_layer = DecoderLayer(d_model=embedding_dim_en, num_heads=3, dff=218, dropout_rate=0.2)

response = decoder_layer(pos_en(input_en), context=context)

response.shape

### The Decoder

Similar to the `Encoder`, the `Decoder` consists of a `PositionalEmbedding`, and a stack of `DecoderLayer`s:

<table>
<tr>
  <th colspan=1>The decoder</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/Decoder.png"/>
  </td>
</tr>
</table>

In [None]:
class Decoder(tf.keras.layers.Layer):
  def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
               dropout_rate=0.1):
    super(Decoder, self).__init__()

    self.d_model = d_model
    self.num_layers = num_layers

    self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                             d_model=d_model)
    self.dropout = tf.keras.layers.Dropout(dropout_rate)
    self.dec_layers = [
        DecoderLayer(d_model=d_model, num_heads=num_heads,
                     dff=dff, dropout_rate=dropout_rate)
        for _ in range(num_layers)]

    self.last_attn_scores = None

  def call(self, x, context):
    # `x` is token-IDs shape (batch, target_seq_len)
    x = self.pos_embedding(x)  # (batch_size, target_seq_len, d_model)

    x = self.dropout(x)

    for i in range(self.num_layers):
      x  = self.dec_layers[i](x, context)

    self.last_attn_scores = self.dec_layers[-1].last_attn_scores

    # The shape of x is (batch_size, target_seq_len, d_model).
    return x

In [None]:
embedding_dim_es = 100
vocab_size_es = 5000

embedding_dim_en = 512
vocab_size_en = 6000

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = tf.constant(np.random.randint(1,vocab_size_es, size=(3,26)))
input_en = tf.constant(np.random.randint(1,vocab_size_en, size=(3,24)))

encoder =  Encoder(num_layers=2, d_model=embedding_dim_es, num_heads=3, dff=512, vocab_size=vocab_size_es)

context = encoder(input_es)

decoder = Decoder(num_layers=3, d_model=embedding_dim_en, num_heads=5, dff=124, vocab_size=vocab_size_en)

response = decoder(input_en, context=context)

response.shape

## The Transformer Model

You now have `Encoder` and `Decoder`. To complete the `Transformer` model, you need to put them together and add a final linear (`Dense`) layer which converts the resulting vector at each location into logits over the target vocabulary (a softmax turns these into output token probabilities).

The output of the decoder is the input to this final linear layer.
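
As a rough picture of what this last step does to the shapes, the NumPy sketch below projects a random stand-in for the decoder output to vocabulary size, applies a softmax, and picks the most likely token at each position. The weights `w` and `b` are random and hypothetical, and greedy `argmax` decoding is only one possible way to read off a prediction.

```python
import numpy as np

rng = np.random.default_rng(0)
batch, target_len, d_model, target_vocab_size = 3, 24, 100, 6000

decoder_output = rng.normal(size=(batch, target_len, d_model))  # stand-in
w = rng.normal(size=(d_model, target_vocab_size)) * 0.01        # Dense kernel
b = np.zeros(target_vocab_size)                                 # Dense bias

logits = decoder_output @ w + b                     # (batch, target_len, vocab)
shifted = logits - logits.max(axis=-1, keepdims=True)
probs = np.exp(shifted)
probs = probs / probs.sum(axis=-1, keepdims=True)   # softmax over the vocabulary
predicted_ids = probs.argmax(axis=-1)               # one token id per position

print(logits.shape)         # (3, 24, 6000)
print(predicted_ids.shape)  # (3, 24)
```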

<table>
<tr>
  <th colspan=1>The transformer</th>
</tr>
<tr>
  <td>
   <img src="https://www.tensorflow.org/images/tutorials/transformer/transformer.png"/>
  </td>
</tr>
</table>

In [None]:
class Transformer(tf.keras.Model):
  def __init__(self, *, num_layers, d_model, num_heads, dff,
               input_vocab_size, target_vocab_size, dropout_rate=0.1):
    super().__init__()
    self.encoder = None

    self.decoder = None

    self.final_layer = None

  def call(self, inputs):
    # To use a Keras model with `.fit` you must pass all your inputs in the
    # first argument.
    context, x  = inputs

    context = self.encoder(context)  # (batch_size, context_len, d_model)

    x = self.decoder(x, context)  # (batch_size, target_len, d_model)

    # Final linear layer output.
    logits = self.final_layer(x)  # (batch_size, target_len, target_vocab_size)

    # Return the final output logits.
    return logits

In [None]:
embedding_dim = 100
vocab_size_es = 5000
vocab_size_en = 6000

num_layers = 4
dff = 512
num_heads = 8
dropout_rate = 0.1

# We are supposing the model will translate Spanish to English, so context for CrossAttention will be the spanish input.

input_es = tf.constant(np.random.randint(1,vocab_size_es, size=(3,26)))
input_en = tf.constant(np.random.randint(1,vocab_size_en, size=(3,24)))

transformer = Transformer(
    num_layers=num_layers,
    d_model=embedding_dim,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=vocab_size_es,
    target_vocab_size=vocab_size_en,
    dropout_rate=dropout_rate)

response = transformer((input_es, input_en))
response.shape

In [None]:
transformer.summary()


## Homework: Try to turn this Transformer into a translator and fit it on the `spa.txt` dataset from the NMT with Attention Lab!