# Transformers

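Selections from the [Chapter 16](https://github.com/ageron/handson-ml3/blob/main/16_nlp_with_rnns_and_attention.ipynb) notebook that accompanies *Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow* (3rd edition). Much like the textbook author, I gave up trying to make it work with Keras 3, so the setup cell below switches Colab to the legacy Keras 2 implementation (`tf_keras`).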

In [None]:
# Choose where trained models are cached: Google Drive when running on Colab
# (so they persist across sessions), otherwise a local folder so the notebook
# can still be run elsewhere.
from pathlib import Path

try:
    from google.colab import drive
except ImportError:
    # Not on Colab: keep saved models next to the notebook.
    model_root = Path("SavedModels")
else:
    drive.mount("/content/drive")
    model_root = Path("/content/drive/MyDrive/SavedModels/")

# Make sure the cache directory exists so that saving a freshly trained model
# does not fail on a missing path.
model_root.mkdir(parents=True, exist_ok=True)

Mounted at /content/drive


In [None]:
import sys
# True when the google.colab package has already been imported into this kernel.
IS_COLAB = "google.colab" in sys.modules
if IS_COLAB:
    import os
    # Must be set before TensorFlow is imported below; presumably this makes
    # tf.keras resolve to the legacy Keras 2 package (tf_keras) - confirm
    # against the TensorFlow release notes.
    os.environ["TF_USE_LEGACY_KERAS"] = "1"
    import tf_keras

from packaging import version
import tensorflow as tf

# Fail fast on TensorFlow releases older than 2.8.0.
assert version.parse(tf.__version__) >= version.parse("2.8.0")

In [None]:
# Warn when TensorFlow sees no GPU, and print the hint that matches the
# hosting platform (detected from the modules already loaded in the kernel).
gpu_devices = tf.config.list_physical_devices("GPU")
if len(gpu_devices) == 0:
    print("No GPU was detected. Neural nets can be very slow without a GPU.")
    platform_hints = {
        "google.colab": "Go to Runtime > Change runtime and select a GPU hardware accelerator.",
        "kaggle_secrets": "Go to Settings > Accelerator and select GPU.",
    }
    for module_name, hint in platform_hints.items():
        if module_name in sys.modules:
            print(hint)


We'll continue on with the English-Spanish translation task, so let's re-download and prepare the data.

In [None]:
from pathlib import Path
# Even loading the data has to change for keras 2/3
url = "https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip"
path = tf.keras.utils.get_file("spa-eng.zip", origin=url, cache_dir="datasets",
                               extract=True)
# The extracted "spa-eng" folder is looked up next to the returned path.
# Decode explicitly as UTF-8: the corpus contains accented Spanish characters,
# and the platform default encoding is not UTF-8 everywhere.
text = (Path(path).with_name("spa-eng") / "spa.txt").read_text(encoding="utf-8")

Downloading data from https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


In [None]:
import numpy as np

# Drop the inverted Spanish punctuation marks from the raw corpus.
for inverted_mark in ("¡", "¿"):
    text = text.replace(inverted_mark, "")

# Every corpus line holds tab-separated fields; keep them together as one pair.
pairs = list(map(lambda corpus_line: corpus_line.split("\t"), text.splitlines()))

np.random.seed(42)  # fixed seed so the shuffle below is reproducible
np.random.shuffle(pairs)

# Transpose the shuffled pairs into two parallel tuples (English, Spanish).
sentences_en, sentences_es = zip(*pairs)

In [None]:
vocab_size = 1000
max_length = 50
batch_size = 32

# Both languages use the same vectorizer settings, so declare them once.
vectorizer_kwargs = dict(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=max_length,
    pad_to_max_tokens=True,
)
text_vec_layer_en = tf.keras.layers.TextVectorization(**vectorizer_kwargs)
text_vec_layer_es = tf.keras.layers.TextVectorization(**vectorizer_kwargs)

text_vec_layer_en.adapt(sentences_en)
# The Spanish vocabulary must also learn the start/end-of-sequence markers.
spanish_with_markers = ["startofseq " + s + " endofseq" for s in sentences_es]
text_vec_layer_es.adapt(spanish_with_markers)

In [None]:
# The first n_train shuffled pairs are used for training, the rest for validation.
n_train = 100_000
train_en, valid_en = sentences_en[:n_train], sentences_en[n_train:]
train_es, valid_es = sentences_es[:n_train], sentences_es[n_train:]

# Encoder inputs: raw English sentences.
X_train = tf.constant(train_en)
X_valid = tf.constant(valid_en)
# Decoder inputs: Spanish sentences prefixed with the start marker.
X_train_dec = tf.constant(["startofseq " + s for s in train_es])
X_valid_dec = tf.constant(["startofseq " + s for s in valid_es])
# Targets: Spanish token ids, with the end marker appended to each sentence.
Y_train = text_vec_layer_es([s + " endofseq" for s in train_es])
Y_valid = text_vec_layer_es([s + " endofseq" for s in valid_es])

## RNN with Attention
We'll define a bidirectional LSTM model, but this time add an `Attention` layer.

The following cell is the same as in the vanilla LSTM examples.

In [None]:
# Build the encoder-decoder graph: a bidirectional LSTM encoder whose final
# states initialise a single-direction LSTM decoder.
tf.random.set_seed(42)  # extra code – ensures reproducibility on CPU
encoder = tf.keras.layers.Bidirectional(
    tf.keras.layers.LSTM(256, return_sequences=True, return_state=True))

# Both model inputs are raw strings; tokenisation happens inside the model
# through the TextVectorization layers applied below.
encoder_inputs = tf.keras.layers.Input(shape=(1,), dtype=tf.string)
decoder_inputs = tf.keras.layers.Input(shape=(1,), dtype=tf.string)

# fairly arbitrary size for the word embeddings
# You could probably sub in pre-trained embeddings here
embed_size = 128
encoder_input_ids = text_vec_layer_en(encoder_inputs)
decoder_input_ids = text_vec_layer_es(decoder_inputs)
# mask_zero=True: token id 0 is treated as padding and masked downstream.
encoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size,
                                                    mask_zero=True)
decoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size,
                                                    mask_zero=True)
encoder_embeddings = encoder_embedding_layer(encoder_input_ids)
decoder_embeddings = decoder_embedding_layer(decoder_input_ids)

# The starred target collects the four state tensors returned after the
# sequence output. They are paired as (0, 2) and (1, 3) below - presumably the
# forward/backward halves of each state; confirm against the Bidirectional docs.
encoder_outputs, *encoder_state = encoder(encoder_embeddings)
encoder_state = [tf.keras.layers.Concatenate(axis=-1)([encoder_state[0], encoder_state[2]]),  # short-term
                 tf.keras.layers.Concatenate(axis=-1)([encoder_state[1], encoder_state[3]])]  # long-term

# 512 = 2 x 256, so the decoder state size matches the concatenated encoder states.
decoder = tf.keras.layers.LSTM(512, return_sequences=True)
decoder_outputs = decoder(decoder_embeddings, initial_state=encoder_state)

Instead of just connecting the encoder and decoder directly, we'll put an `Attention` layer in the middle.

In [None]:
# Let every decoder position attend over the encoder's output sequence.
attention_layer = tf.keras.layers.Attention()
# query = decoder_outputs, value = encoder_outputs
attention_outputs = attention_layer([decoder_outputs, encoder_outputs])
# Softmax over vocab_size units: one probability per output token at each position.
output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax")
Y_proba = output_layer(attention_outputs)

In [None]:
# Shape check of the attention inputs and output, one per line
# (left over from chasing a shape mismatch error that was never resolved).
print(*(t.shape for t in (decoder_outputs, encoder_outputs, attention_outputs)),
      sep="\n")

(None, 50, 512)
(None, 50, 512)
(None, 50, 512)


**Warning**: the following cell will take a while to run (possibly a couple hours if you are not using a GPU).

In [None]:
# Train the attention model once and cache it under model_root; later runs
# reload the cached file instead of retraining.
attn_path = model_root / "attention_model.keras"
if attn_path.exists():
    attention_model = tf.keras.models.load_model(attn_path)
else:
    attention_model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=[Y_proba])
    attention_model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
                metrics=["accuracy"])
    # Create the target directory *before* the long training run, so a missing
    # folder cannot make save() fail after the epochs have already been spent.
    attn_path.parent.mkdir(parents=True, exist_ok=True)
    attention_model.fit((X_train, X_train_dec), Y_train, epochs=10,
            validation_data=((X_valid, X_valid_dec), Y_valid))
    attention_model.save(attn_path)

attention_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 1)]                  0         []                            
                                                                                                  
 text_vectorization (TextVe  (None, 50)                   0         ['input_1[0][0]']             
 ctorization)                                                                                     
                                                                                                  
 input_2 (InputLayer)        [(None, 1)]                  0         []                            
                                                                                                  
 embedding (Embedding)       (None, 50, 128)              128000    ['text_vectorization[0][0]

In [None]:
def translate(model, sentence_en):
    """Greedily translate an English sentence, predicting one word per step.

    At each step the words predicted so far (prefixed with ``startofseq``) are
    fed back in as the decoder input, and the most probable token at the
    current position is appended. Decoding stops when ``endofseq`` is
    predicted or after ``max_length`` steps.

    Args:
        model: Model whose ``predict`` accepts an ``(encoder_text,
            decoder_text)`` pair of string tensors and returns token
            probabilities indexed as ``[sample, position]``.
        sentence_en: The English sentence to translate.

    Returns:
        The predicted words joined by spaces, with surrounding whitespace
        stripped.
    """
    # Loop-invariant work, done once instead of on every decoding step.
    vocabulary = text_vec_layer_es.get_vocabulary()
    X = tf.constant([sentence_en])  # encoder input

    translation = ""
    for word_idx in range(max_length):
        X_dec = tf.constant(["startofseq " + translation])  # decoder input
        # Probabilities for the token at the position being decoded.
        y_proba = model.predict((X, X_dec), verbose=False)[0, word_idx]
        predicted_word_id = np.argmax(y_proba)
        predicted_word = vocabulary[predicted_word_id]
        if predicted_word == "endofseq":
            break
        translation += " " + predicted_word
    return translation.strip()

In [None]:
# Qualitative check of the attention model on a short question.
translate(attention_model, "Is class time over yet?")

'ya hace clases hora'

In [None]:
# A longer, two-clause sentence to see how the attention model copes with length.
translate(attention_model, "I love to go to the beach do you know where I can find it?")

'me encanta ir a la playa hace [UNK] dónde puedo encontrar'

In [None]:
# Score the model on the validation split. With the compile settings used when
# the model is trained in this notebook, the result is [loss, accuracy].
attention_model.evaluate((X_valid, X_valid_dec), Y_valid)



[1.318494439125061, 0.7055259346961975]

## Attention Is All You Need: The Transformer Architecture
### Positional encodings

In [None]:
# Same values as assigned earlier in the notebook, restated for this section.
max_length = 50  # max length in the whole training set
embed_size = 128
tf.random.set_seed(42)  # extra code – ensures reproducibility on CPU
# Trainable positional embeddings: one embed_size-dimensional vector per position index.
pos_embed_layer = tf.keras.layers.Embedding(max_length, embed_size)
# The sequence length is read from the static shape of the embedded tensor.
batch_max_len_enc = encoder_embeddings.shape[1]
# Position vectors for indices 0..length-1 are added to the token embeddings.
encoder_in = encoder_embeddings + pos_embed_layer(tf.range(batch_max_len_enc))
batch_max_len_dec = decoder_embeddings.shape[1]
decoder_in = decoder_embeddings + pos_embed_layer(tf.range(batch_max_len_dec))

Alternatively, we can use fixed, non-trainable positional encodings:

In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds fixed (non-trainable) sine/cosine positional encodings to its input.

    For position ``p`` and even feature index ``i``, feature ``i`` holds
    ``sin(p / 10000 ** (i / embed_size))`` and feature ``i + 1`` holds the
    matching cosine. The full table is precomputed once in ``__init__``.

    Args:
        max_length: Number of positions to precompute encodings for.
        embed_size: Size of the feature axis; must be even.
        dtype: Layer dtype; the encoding table is cast to it.
        **kwargs: Forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, max_length, embed_size, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        assert embed_size % 2 == 0, "embed_size must be even"
        # Kept so get_config() can report the constructor arguments.
        self.max_length = max_length
        self.embed_size = embed_size
        p, i = np.meshgrid(np.arange(max_length),
                           2 * np.arange(embed_size // 2))
        # Shared sin/cos argument, computed once: (max_length, embed_size // 2).
        angles = (p / 10_000 ** (i / embed_size)).T
        pos_emb = np.empty((1, max_length, embed_size))
        pos_emb[0, :, ::2] = np.sin(angles)   # even feature indices
        pos_emb[0, :, 1::2] = np.cos(angles)  # odd feature indices
        self.pos_encodings = tf.constant(pos_emb.astype(self.dtype))
        self.supports_masking = True

    def call(self, inputs):
        """Return ``inputs`` plus the encodings for its first ``inputs.shape[1]`` positions."""
        batch_max_length = tf.shape(inputs)[1]
        return inputs + self.pos_encodings[:, :batch_max_length]

    def get_config(self):
        """Return the constructor arguments so a saved model can rebuild this layer.

        Without this, reloading a saved model that contains the layer depends
        on Keras being able to infer the ``__init__`` arguments by itself.
        """
        config = super().get_config()
        config.update({
            "max_length": self.max_length,
            "embed_size": self.embed_size,
        })
        return config

In [None]:
# Switch to the fixed sine/cosine encodings. This rebinds pos_embed_layer,
# encoder_in and decoder_in, replacing the trainable-embedding versions
# assigned in the earlier cell.
pos_embed_layer = PositionalEncoding(max_length, embed_size)
encoder_in = pos_embed_layer(encoder_embeddings)
decoder_in = pos_embed_layer(decoder_embeddings)

### Multi-Head Attention

In [None]:
N = 2  # instead of 6
num_heads = 8
dropout_rate = 0.1
n_units = 128  # for the first Dense layer in each Feed Forward block
encoder_pad_mask = tf.math.not_equal(encoder_input_ids, 0)[:, tf.newaxis]


def _residual_norm(sublayer_output, residual):
    """Add the residual to the sub-layer output, then layer-normalize the sum."""
    norm_layer = tf.keras.layers.LayerNormalization()
    summed = tf.keras.layers.Add()([sublayer_output, residual])
    return norm_layer(summed)


# Stack N encoder blocks: self-attention, then a two-layer feed-forward
# network, each wrapped in a residual connection and layer normalization.
Z = encoder_in
for _ in range(N):
    attn_layer = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)
    attended = attn_layer(Z, value=Z, attention_mask=encoder_pad_mask)
    Z = _residual_norm(attended, Z)

    ff_hidden = tf.keras.layers.Dense(n_units, activation="relu")(Z)
    ff_output = tf.keras.layers.Dense(embed_size)(ff_hidden)
    ff_output = tf.keras.layers.Dropout(dropout_rate)(ff_output)
    Z = _residual_norm(ff_output, Z)

In [None]:
# True wherever the decoder token id is non-zero (id 0 is treated as padding).
# The inserted axis lets the mask combine with the 2-D causal mask by broadcasting.
decoder_pad_mask = tf.math.not_equal(decoder_input_ids, 0)[:, tf.newaxis]
# Square boolean mask keeping the diagonal and everything below it; presumably
# rows index the querying position and columns the attended position, so each
# position only sees itself and earlier ones - confirm against the
# MultiHeadAttention attention_mask docs.
causal_mask = tf.linalg.band_part(  # creates a lower triangular matrix
    tf.ones((batch_max_len_dec, batch_max_len_dec), tf.bool), -1, 0)

In [None]:
encoder_outputs = Z  # let's save the encoder's final outputs
Z = decoder_in  # the decoder starts with its own inputs
for _ in range(N):
    # 1) Masked self-attention: the causal mask is combined with the padding mask.
    skip = Z
    attn_layer = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)
    Z = attn_layer(Z, value=Z, attention_mask=causal_mask & decoder_pad_mask)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))
    # 2) Cross-attention: decoder positions query the encoder's final outputs.
    skip = Z
    attn_layer = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)
    Z = attn_layer(Z, value=encoder_outputs, attention_mask=encoder_pad_mask)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))
    # 3) Feed-forward block.
    skip = Z
    Z = tf.keras.layers.Dense(n_units, activation="relu")(Z)
    Z = tf.keras.layers.Dense(embed_size)(Z)
    # Dropout before the residual add, matching the encoder's feed-forward
    # block (it was missing here, so the two stacks were regularized differently).
    Z = tf.keras.layers.Dropout(dropout_rate)(Z)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))

**Warning**: the following cell will take a while to run (possibly 2 or 3 hours if you are not using a GPU).

In [None]:
# Train the transformer once and cache it under model_root; later runs reload
# the cached file instead of retraining.
transformer_path = model_root / "transformer.keras"
if transformer_path.exists():
    # The custom layer class has to be supplied so the saved graph can be rebuilt.
    transformer_model = tf.keras.models.load_model(transformer_path, custom_objects={'PositionalEncoding': PositionalEncoding})
else:
    Y_proba = tf.keras.layers.Dense(vocab_size, activation="softmax")(Z)
    transformer_model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs],
                        outputs=[Y_proba])
    transformer_model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
                metrics=["accuracy"])
    # Create the target directory *before* the long training run, so a missing
    # folder cannot make save() fail after the epochs have already been spent.
    transformer_path.parent.mkdir(parents=True, exist_ok=True)
    transformer_model.fit((X_train, X_train_dec), Y_train, epochs=10,
            validation_data=((X_valid, X_valid_dec), Y_valid))
    transformer_model.save(transformer_path)

transformer_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 1)]                  0         []                            
                                                                                                  
 input_1 (InputLayer)        [(None, 1)]                  0         []                            
                                                                                                  
 text_vectorization_1 (Text  (None, 50)                   0         ['input_2[0][0]']             
 Vectorization)                                                                                   
                                                                                                  
 text_vectorization (TextVe  (None, 50)                   0         ['input_1[0][0]']       

In [None]:
# Qualitative check of the transformer using the same greedy decoding helper.
translate(transformer_model, "I like soccer and also going to the beach")

'me gusta el fútbol y iba a la playa'

In [None]:
# Score the transformer on the validation split. With the compile settings used
# when the model is trained in this notebook, the result is [loss, accuracy].
transformer_model.evaluate((X_valid, X_valid_dec), Y_valid)



[1.0695241689682007, 0.7320123910903931]