<a href="https://colab.research.google.com/github/erinijapranckeviciene/MF54609_18981_1_20241/blob/main/FC_Chapter11_Transformers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### BTW, Recalling the effect of validation error being less than training

https://pyimagesearch.com/2019/10/14/why-is-my-validation-loss-lower-than-my-training-loss/#:~:text=Regularization%20methods%20often%20sacrifice%20training,lower%20than%20your%20training%20loss.

### Prepare IMDB data as before

In [None]:
!wget https://github.com/erinijapranckeviciene/MF54609_18981_1_20241/raw/refs/heads/main/datasets/RNN/aclImdb.zip
!unzip -qq aclImdb.zip

In [None]:
import os, pathlib, shutil, random
from tensorflow import keras

batch_size = 32
train_ds = keras.utils.text_dataset_from_directory("aclImdb/train", batch_size=batch_size )
val_ds = keras.utils.text_dataset_from_directory("aclImdb/val", batch_size=batch_size)
test_ds = keras.utils.text_dataset_from_directory("aclImdb/test", batch_size=batch_size)

### text only ds

In [None]:
# This function returns only data part without target
# to create a new dataset that will be used to create dictionary
text_only_train_ds = train_ds.map(lambda x, y: x)
for inputs in text_only_train_ds:
    print("inputs.shape:", inputs.shape)
    print("inputs.dtype:", inputs.dtype)
    print("inputs[0]:", inputs[0])
    break

### Prepare int representation dataset

In [None]:
import tensorflow as tf
from tensorflow.keras import layers

# max_length is a length of the vector that encodes text
# 250 size gave ~0.85 on test data
# 600 with 4 LSTM units does not train at all
# the reviews are about 300 words
max_length = 300
max_tokens = 20000
text_vectorization = layers.TextVectorization( max_tokens=max_tokens, output_mode="int", output_sequence_length=max_length)

text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)
int_val_ds = val_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)
int_test_ds = test_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)

### Verify

In [None]:
for inputs, targets in int_train_ds:
    print(inputs.shape)
    print(inputs[0])
    print(targets[0])
    # With this test_input variable verify tf.one_hot() transformation
    test_input=inputs[0]
    break

## Transformers  F.Chollet Chapter 11

#### 11.4 The Transformer architecture

..."Transformers were introduced in the seminal paper “Attention is all you need” by Vaswani et al. The gist of the paper is right there in the title: as it turned out, a simple mechanism called “neural attention” could be used to build powerful sequence models that didn’t feature any recurrent layers or convolution layers."...

### Listing 11.21 Transformer encoder implemented as a subclassed Layer

In [None]:
import tensorflow as tf
from tensorflow import keras
from keras import layers

class TransformerEncoder(layers.Layer):
  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    self.embed_dim = embed_dim
    self.dense_dim = dense_dim
    self.num_heads = num_heads
    self.attention = layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_dim)
    self.dense_proj = keras.Sequential(
        [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
    )
    self.layernorm_1 = layers.LayerNormalization()
    self.layernorm_2 = layers.LayerNormalization()

  def call(self, inputs, mask=None):
    if mask is not None:
        mask=mask[:, tf.newaxis, :]
    attention_output = self.attention(
        inputs, inputs, attention_mask=mask
    )
    proj_input = self.layernorm_1(inputs + attention_output)
    proj_output = self.dense_proj(proj_input)
    return self.layernorm_2(proj_input + proj_output)

  def get_config(self):
    config = super().get_config()
    config.update({
        "embed_dim": self.embed_dim,
        "num_heads": self.num_heads,
        "dense_dim": self.dense_dim,
    })
    return config


### Listing 11.22 Using the Transformer encoder for text classification

In [None]:
vocab_size = 20000
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = layers.Embedding(vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)

model.compile(optimizer="rmsprop", loss="binary_crossentropy", metrics=["accuracy"])
model.summary()


### Listing 11.23 Training and evaluating the Transformer encoder based model

In [None]:
callbacks = [ keras.callbacks.ModelCheckpoint("transformer_encoder.keras", save_best_only=True) ]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callbacks)

model = keras.models.load_model("transformer_encoder.keras", custom_objects={"TransformerEncoder": TransformerEncoder})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")


### Listing 11.24 Implementing positional embedding as a subclassed layer

In [None]:
class PositionalEmbedding(layers.Layer):
  def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
    super().__init__(**kwargs)
    self.token_embeddings = layers.Embedding(
        input_dim=input_dim, output_dim=output_dim)
    self.position_embeddings = layers.Embedding(
        input_dim=sequence_length, output_dim=output_dim)
    self.sequence_length = sequence_length
    self.input_dim = input_dim
    self.output_dim = output_dim

  def call(self, inputs):
    length = tf.shape(inputs)[-1]
    positions = tf.range(start=0, limit=length, delta=1)
    embedded_tokens = self.token_embeddings(inputs)
    embedded_positions = self.position_embeddings(positions)
    return embedded_tokens + embedded_positions

  #def compute_mask(self, inputs, mask=None):
  #  return tf.math.not_equal(inputs, 0) #it was a mistake correction below
    #return layers.Lambda(lambda x: tf.math.not_equal(x, 0))(inputs)

  def get_config(self):
    config = super().get_config()
    config.update({
      "output_dim": self.output_dim,
      "sequence_length": self.sequence_length,
      "input_dim": self.input_dim,
      })
    return config

### Listing 11.25 Combining the Transformer encoder with positional embedding

In [None]:
vocab_size = 20000
sequence_length = 600
embed_dim = 256
num_heads = 2
dense_dim = 32

inputs = keras.Input(shape=(None,), dtype="int64")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(inputs)
x = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)
x = layers.GlobalMaxPooling1D()(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = keras.Model(inputs, outputs)

model.compile(optimizer="rmsprop", loss="binary_crossentropy", metrics=["accuracy"])
model.summary()


### Change the dataset - the sequence length is 600


In [None]:
text_vectorization = layers.TextVectorization( max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length)

text_vectorization.adapt(text_only_train_ds)

int_train_ds = train_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)
int_val_ds = val_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)
int_test_ds = test_ds.map(lambda x, y: (text_vectorization(x), y), num_parallel_calls=4)

### Execute the model

In [None]:
callbacks = [keras.callbacks.ModelCheckpoint("full_transformer_encoder.keras", save_best_only=True)]
model.fit(int_train_ds, validation_data=int_val_ds, epochs=20, callbacks=callbacks)



In [None]:
model = keras.models.load_model("full_transformer_encoder.keras", custom_objects={"TransformerEncoder": TransformerEncoder, "PositionalEmbedding": PositionalEmbedding})
print(f"Test acc: {model.evaluate(int_test_ds)[1]:.3f}")

### 11.5.1 A machine translation example

In [None]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip

### Parse the file


In [None]:
text_file = "spa-eng/spa.txt"
with open(text_file) as f:
  lines = f.read().split("\n")[:-1]
text_pairs = []

for line in lines:
  eng, spa = line.split("\t")
  spa = "[start] " + spa + " [end]"
  text_pairs.append((eng, spa))

import random
print(random.choice(text_pairs))


### Shuffle and create the training, validation and test sets

In [None]:
import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

### Listing 11.26 Vectorizing the English and Spanish text pairs

Note that for a non-toy translation model, we would treat punctuation characters as sep-
arate tokens rather than stripping them, since we would want to be able to generate cor-
rectly punctuated sentences. In our case, for simplicity, we’ll get rid of all punctuation.

In [None]:
import tensorflow as tf
import string
import re
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
  lowercase = tf.strings.lower(input_string)
  return tf.strings.regex_replace(lowercase, f"[{re.escape(strip_chars)}]", "")

#To keep things simple, we’ll only look at
#the top 15,000 words in each language,
#and we’ll restrict sentences to 20 words.

vocab_size = 15000
sequence_length = 20

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)


### Listing 11.27 Preparing datasets for the translation task

Finally, we can turn our data into a tf.data pipeline. We want it to return a tuple
(inputs, target) where inputs is a dict with two keys, “encoder_inputs” (the English
sentence) and “decoder_inputs” (the Spanish sentence), and target is the Spanish
sentence offset by one step ahead.

In [None]:
batch_size = 64
def format_dataset(eng, spa):
  eng = source_vectorization(eng)
  spa = target_vectorization(spa)
  return ({"english": eng,"spanish": spa[:, :-1],}, spa[:, 1:])

def make_dataset(pairs):
  eng_texts, spa_texts = zip(*pairs)
  eng_texts = list(eng_texts)
  spa_texts = list(spa_texts)
  dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(format_dataset, num_parallel_calls=4)
  return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
#


### Dataset outputs


In [None]:
for inputs, targets in train_ds.take(1):
  print(f"inputs['english'].shape: {inputs['english'].shape}")
  print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
  print(f"targets.shape: {targets.shape}")


## 11.5.2 Sequence-to-sequence learning with RNNs

### Listing 11.28 GRU-based encoder

Let’s implement this in Keras with GRU-based encoders and decoders. The choice
of GRU rather than LSTM makes things a bit simpler, since GRU only has a single
state vector, whereas LSTM has multiple. Let’s start with the encoder.

Q: Difference in GRU and LSTM layers

In [None]:
from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024

source = keras.Input(shape=(None,), dtype="int64", name="english")
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
encoded_source = layers.Bidirectional(
    layers.GRU(latent_dim), merge_mode="sum")(x)

### Listing 11.29 GRU-based decoder and the end-to-end model

During training, the decoder takes as input the entire target sequence, but thanks to
the step-by-step nature of RNNs, it only looks at tokens 0…N in the input to predict token N in the output (which corresponds to the next token in the sequence, since
the output is intended to be offset by one step). This means we only use information
from the past to predict the future, as we should; otherwise we’d be cheating, and our
model would not work at inference time.

In [None]:
past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")

x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)

x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)

# End-to-end model: maps the source sentence and the target sentence to the target sentence one step in the future
seq2seq_rnn = keras.Model([source, past_target], target_next_step)


### Listing 11.30 Training our recurrent sequence-to-sequence model

In [None]:
seq2seq_rnn.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)

### Listing 11.31 Translating new sentences with our RNN encoder and decoder

In [None]:
import numpy as np

#Prepare a dict to convert token
#index predictions to string tokens

spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
  tokenized_input_sentence = source_vectorization([input_sentence])
  decoded_sentence = "[start]"
  for i in range(max_decoded_sentence_length):
    tokenized_target_sentence = target_vectorization([decoded_sentence])
    next_token_predictions = seq2seq_rnn.predict(
        [tokenized_input_sentence, tokenized_target_sentence])
    sampled_token_index = np.argmax(next_token_predictions[0, i, :])
    sampled_token = spa_index_lookup[sampled_token_index]
    decoded_sentence += " " + sampled_token
    if sampled_token == "[end]":
      break
  return decoded_sentence

In [None]:
test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(30):
  input_sentence = random.choice(test_eng_texts)
  print("-")
  print(input_sentence)
  print(decode_sequence(input_sentence))
  print("-"*50)

### Listing 11.33 The TransformerDecoder

In [None]:
class TransformerDecoder(layers.Layer):
  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    self.embed_dim = embed_dim
    self.dense_dim = dense_dim
    self.num_heads = num_heads
    self.attention_1 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim)
    self.attention_2 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim)
    self.dense_proj = keras.Sequential([layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),] )
    self.layernorm_1 = layers.LayerNormalization()
    self.layernorm_2 = layers.LayerNormalization()
    self.layernorm_3 = layers.LayerNormalization()
    self.supports_masking = True

  def get_config(self):
    config = super().get_config()
    config.update({
        "embed_dim": self.embed_dim,
        "num_heads": self.num_heads,
        "dense_dim": self.dense_dim,
    })
    return config


### Listing 11.34 TransformerDecoder method that generates a causal mask


In [None]:
#Generate matrix of shape (sequence_length, sequence_length) with 1s in one half and 0s in the other.
def get_causal_attention_mask(self, inputs):
  input_shape = tf.shape(inputs)
  batch_size, sequence_length = input_shape[0], input_shape[1]
  i = tf.range(sequence_length)[:, tf.newaxis]
  j = tf.range(sequence_length)
  mask = tf.cast(i >= j, dtype="int32")

  #Replicate it along the batch axis to get a matrix of shape (batch_size, sequence_length, sequence_length)
  mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
  mult = tf.concat([tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], axis=0)
  return tf.tile(mask, mult)


### Listing 11.35 The forward pass of the TransformerDecoder

In [None]:
def call(self, inputs, encoder_outputs, mask=None):
  causal_mask = self.get_causal_attention_mask(inputs)
  if mask is not None:
    padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
    padding_mask = tf.minimum(padding_mask, causal_mask)
  attention_output_1 = self.attention_1(query=inputs, value=inputs, key=inputs, attention_mask=causal_mask)
  attention_output_1 = self.layernorm_1(inputs + attention_output_1)
  attention_output_2 = self.attention_2(query=attention_output_1, value=encoder_outputs, key=encoder_outputs, attention_mask=padding_mask,)
  attention_output_2 = self.layernorm_2(attention_output_1 + attention_output_2)
  proj_output = self.dense_proj(attention_output_2)
  return self.layernorm_3(attention_output_2 + proj_output)


### Listing 11.36 End-to-end Transformer

In [None]:
embed_dim = 256
dense_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)


### Listing 11.37 Training the sequence-to-sequence Transformer

In [None]:
transformer.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer.fit(train_ds, epochs=30, validation_data=val_ds)

### Listing 11.38 Translating new sentences with our Transformer model

In [None]:
import numpy as np
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
  tokenized_input_sentence = source_vectorization([input_sentence])
  decoded_sentence = "[start]"
  for i in range(max_decoded_sentence_length):
    tokenized_target_sentence = target_vectorization([decoded_sentence])[:, :-1]
    predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
    sampled_token_index = np.argmax(predictions[0, i, :])
    sampled_token = spa_index_lookup[sampled_token_index]
    decoded_sentence += " " + sampled_token
    if sampled_token == "[end]":
      break
  return decoded_sentence


In [None]:
test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
  input_sentence = random.choice(test_eng_texts)
  print("-")
  print(input_sentence)
  print(decode_sequence(input_sentence))