
## Beyond text classification: Sequence-to-sequence learning

### A machine translation example

In [None]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip

--2024-01-13 09:24:04--  http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.197.207, 74.125.135.207, 74.125.142.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.197.207|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2024-01-13 09:24:04 (191 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]



The text file contains one example per line: an English sentence, followed by a tab
character, followed by the corresponding Spanish sentence.

In [None]:
text_file = "spa-eng/spa.txt"
with open(text_file) as f:
    lines = f.read().split("\n")[:-1]
text_pairs = []
# Iterate over the lines in the file
for line in lines:
    # Each line contains an English phrase and its Spanish translation, tab-separated
    english, spanish = line.split("\t")
    # prepend "[start]" and append "[end]" to the Spanish sentence
    spanish = "[start] " + spanish + " [end]"
    text_pairs.append((english, spanish))

In [None]:
import random
print(random.choice(text_pairs))

("Tom shouldn't have eaten so much.", '[start] Tom no debería haber comido tanto. [end]')


Shuffle the data and split it into the usual training, validation, and test sets.

In [None]:
import random
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]
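
As a quick sanity check of the split arithmetic, the sketch below applies the same slicing to a toy list of 100 items. The list, its size, and the seed are made up for illustration; the point is only that the three slices cover the data without overlapping.

```python
import random

# Toy stand-in for text_pairs: 100 dummy items (illustrative only).
toy_pairs = list(range(100))
random.Random(0).shuffle(toy_pairs)  # seeded so the run is repeatable

num_val = int(0.15 * len(toy_pairs))       # 15% for validation
num_train = len(toy_pairs) - 2 * num_val   # what is left after val + test

train = toy_pairs[:num_train]
val = toy_pairs[num_train:num_train + num_val]
test = toy_pairs[num_train + num_val:]

print(len(train), len(val), len(test))
```

Because the test slice simply takes everything after the validation slice, it has the same size as the validation set, which is why the training size subtracts `2 * num_val`.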

**Vectorizing the English and Spanish text pairs**

Prepare two separate TextVectorization layers: one for **English** and one for **Spanish**.
- We need to preserve the **"[start]"** and **"[end]"** tokens that we’ve inserted. By
default, the characters **[ and ]** would be stripped, but we want to keep them
so we can tell the word **“start”** apart from the start token **"[start]"**.
- Punctuation differs from language to language. In the Spanish TextVectorization
layer, if we’re going to **strip punctuation** characters, we also need to
strip the character **¿**.

Note that for a non-toy translation model, we would treat punctuation characters as separate
tokens rather than stripping them, since we would want to be able to generate correctly
punctuated sentences.
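
To make the effect of this standardization concrete, here is a minimal pure-Python sketch of the same idea using the standard `re` module instead of TensorFlow string ops. The helper name `standardize_py` and the sample sentence are assumptions for illustration, not part of the notebook.

```python
import re
import string

# All ASCII punctuation plus "¿", minus the square brackets
# that delimit the special tokens.
strip_chars = (string.punctuation + "¿").replace("[", "").replace("]", "")
pattern = re.compile(f"[{re.escape(strip_chars)}]")

def standardize_py(text):
    """Lowercase the text and drop every character listed in strip_chars."""
    return pattern.sub("", text.lower())

sample = "[start] ¿Dónde estás? [end]"
print(standardize_py(sample))  # prints: [start] dónde estás [end]
```

The brackets survive, so "[start]" stays distinct from the ordinary word "start", while both "¿" and "?" are removed.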

In [None]:
import tensorflow as tf
import string
import re
from tensorflow import keras
from tensorflow.keras import layers

# Prepare a custom string standardization function for the Spanish TextVectorization layer:
# it preserves [ and ] but strips ¿ (as well as all other characters from string.punctuation)
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")

# only look at the top 15,000 words in each language,
# and we’ll restrict sentences to 20 words.
vocab_size = 15000
sequence_length = 20

# The English layer
source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
# The Spanish layer
target_vectorization = layers.TextVectorization(
    # Generate Spanish sentences that have one extra token,
    # since we’ll need to offset the sentence by one step during training
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
# Learn the vocabulary of each language
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)

**Preparing datasets for the translation task**

Turn the data into a **tf.data** pipeline. We want it to return a tuple (inputs, target) where:

- **inputs** is a dict with two keys, **"english"** (the encoder input: the English sentence) and **"spanish"** (the decoder input: the Spanish sentence), and

- **target** is the Spanish sentence offset by one step ahead.
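
The one-step offset can be illustrated on a single toy sequence. The token IDs below are hypothetical (chosen only for readability); the slicing is the same as the `spa[:, :-1]` and `spa[:, 1:]` used in the pipeline, applied to one sentence instead of a batch.

```python
# Hypothetical token IDs for one padded Spanish sentence of length 6:
# 2 = "[start]", 3 = "[end]", 0 = padding, the rest are ordinary words.
spa = [2, 17, 45, 8, 3, 0]

decoder_input = spa[:-1]  # everything except the last token
target = spa[1:]          # the same sequence shifted one step ahead

for step, expected in enumerate(target):
    seen = decoder_input[:step + 1]
    print(f"step {step}: decoder sees {seen} -> should predict {expected}")
```

Both slices have the same length, and at every step the target is the token that follows the prefix the decoder has seen so far.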

In [None]:
batch_size = 64

# spa[:, :-1] - The input Spanish sentence doesn’t include the last token to keep inputs and targets at the same length
# spa[:, 1:] - The target Spanish sentence is one step ahead. Both are still the same length (20 words)
def format_dataset(eng, spa):
    eng = source_vectorization(eng)
    spa = target_vectorization(spa)
    return ({
        "english": eng,
        "spanish": spa[:, :-1],
    }, spa[:, 1:])

def make_dataset(pairs):
    eng_texts, spa_texts = zip(*pairs)
    eng_texts = list(eng_texts)
    spa_texts = list(spa_texts)
    dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    # Cache the preprocessed batches in memory first, then reshuffle them every epoch
    return dataset.cache().shuffle(2048).prefetch(16)

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [None]:
for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['english'].shape: (64, 20)
inputs['spanish'].shape: (64, 20)
targets.shape: (64, 20)


### Sequence-to-sequence learning with RNNs

- ***Standard sequence-to-sequence models*** work by ***reading the entire source sentence before starting to translate it***. This is especially important if you’re dealing with languages that have wildly different word ordering, like English and Japanese.
- ***Proper sequence-to-sequence setup***:
    - First use an ***RNN (the encoder) to turn the entire source sequence into a single vector (or set of vectors)***. This could be the last output of the RNN, or alternatively, its final internal state vectors.
    - Then you would ***use this vector (or vectors) as the initial state of another RNN (the decoder)***, ***which would look at elements 0…N in the target sequence, and try to predict step N+1 in the target sequence.***

**GRU-based encoder**

- Implement this in Keras with a **GRU-based encoder**:
    - Choosing **GRU** rather than **LSTM** makes things a bit simpler, since a **GRU** has only a **single state** vector, whereas an **LSTM** has **multiple**.

In [None]:
from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024

# The English source sentence goes here.
# Specifying the name of the input enables us to fit() the model with a dict of inputs
source = keras.Input(shape=(None,), dtype="int64", name="english")
# mask_zero=True - mask the pad (zero)
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
# Encoded source sentence is the last output of a bidirectional GRU.
encoded_source = layers.Bidirectional(
    layers.GRU(latent_dim), merge_mode="sum")(x)

**GRU-based decoder and the end-to-end model**

- Implement **GRU-based decoders and the end-to-end model**:
    - a simple **GRU layer** that takes as its initial state the encoded source sentence.
    - On top of it, add a **Dense layer** that produces for each output step a probability distribution over the Spanish vocabulary.

In [None]:
# The Spanish target sentence goes here.
past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")
# mask_zero=True - mask the pad (zero)
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
# The encoded source sentence serves as the initial state of the decoder GRU.
x = decoder_gru(x, initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
# Predicts the next token
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
# End-to-end model: maps the source sentence and the target sentence to the target sentence one step in the future
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

**Training the recurrent sequence-to-sequence model**

In [None]:
seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)

Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15


<keras.src.callbacks.History at 0x7afb0425ead0>

**Translating new sentences with our RNN encoder and decoder**

Use the model for inference.
* Pick a few sentences in the test set and check how the model translates them.
* Start from the seed token, "[start]", and feed it into the decoder model, together with the encoded English source sentence.
* Retrieve the next-token prediction and re-inject it into the decoder repeatedly, sampling one new target token at each iteration, until the model produces "[end]" or the maximum sentence length is reached.
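
The steps above can be sketched independently of any model. In this minimal example, `next_token_fn` is a hypothetical stand-in for "run the model and take the most likely next token", and the lookup table is a made-up toy, not a trained translator.

```python
def greedy_decode(next_token_fn, max_len=20, start="[start]", end="[end]"):
    """Greedy decoding: repeatedly append the predicted next token."""
    tokens = [start]
    for _ in range(max_len):
        next_token = next_token_fn(tokens)  # prediction given the prefix so far
        tokens.append(next_token)
        if next_token == end:               # stop once the end token appears
            break
    return " ".join(tokens)

# Toy "model": maps the last generated token to the next one (made up).
toy_table = {"[start]": "nadie", "nadie": "me", "me": "entiende",
             "entiende": "[end]"}

def toy_model(tokens):
    return toy_table[tokens[-1]]

print(greedy_decode(toy_model))  # prints: [start] nadie me entiende [end]
```

If the end token is never produced, the loop bound is what stops generation, so the output contains at most `max_len` generated tokens after the seed.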

In [None]:
import numpy as np
# Prepare a dict to convert token index predictions to string tokens
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    # seed token
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        # sample the next token
        next_token_predictions = seq2seq_rnn.predict(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(next_token_predictions[0, i, :])
        # Convert the next token prediction to a string and append it to the generated sentence.
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        # Exit condition: stop once the end token is sampled (the loop bound caps the length)
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

-
We saw nothing strange.
[start] vimos nada sin decir [end]
-
His mother didn't allow him to ride a motorbike.
[start] su madre no fue capaz de un [UNK] [end]
-
He ran outside naked.
[start] Él se quedó corriendo [end]
-
Nobody understands me.
[start] nadie me entiende [end]
-
Which did you eat, fish or meat?
[start] qué comiste pescado o carne [end]
-
I don't like to drink coffee.
[start] no me gusta el café [end]
-
She sat down and crossed her legs.
[start] se sentó y se [UNK] los ojos [end]
-
It is completely natural for her to be mad.
[start] es muy [UNK] que su peso se debe estar [end]
-
He was in critical condition.
[start] Él estaba en la [UNK] [end]
-
He kept me waiting on purpose.
[start] Él me hizo esperando a propósito [end]
-
Fill in this form.
[start] ponte este formulario [end]
-
The older we grow, the poorer our memory becomes.
[start] los jóvenes empeora más que nos [UNK] los muebles [end]
-
He was in the process of making a final decision.
[start] Él estaba en la mism

Limitations of the RNN approach to sequence-to-sequence learning:
- The source sequence representation has to be held entirely in the encoder state vector(s), which puts significant limitations on the size and complexity of the sentences you can translate.
- RNNs have trouble dealing with very long sequences, since they tend to progressively forget about the past—by the time you’ve reached the ***100th token*** in either sequence, ***little information remains about the start of the sequence***.
    
That means RNN-based models can’t hold onto long-term context, which can be essential for translating long documents.

### Sequence-to-sequence learning with Transformer

The limitations of the RNN approach are what led the machine learning community to embrace the Transformer architecture for sequence-to-sequence problems.

- An ***RNN looks at its input one step at a time***, and thus only has access to steps 0...N when generating output step N (which is token N+1 in the target sequence). In contrast, ***the Transformer decoder is order-agnostic: it looks at the entire target sequence at once.***
    - **There is an issue here**:
        - ***If it were allowed to use its entire input, it would simply learn to copy input step N+1 to location N in the output***. The model would thus achieve perfect training accuracy, but of course, when running inference, it would be completely useless, since input steps beyond N aren’t available.
    - **Here is the fix**:
        - ***Mask the upper half of the pairwise attention matrix to prevent the model from paying any attention to information from the future***—only information from tokens 0...N in the target sequence should be used when generating target token N+1.
    - To achieve this, add a ***get_causal_attention_mask(self, inputs)*** method to our ***TransformerDecoder*** to retrieve an attention mask that we can pass to our ***MultiHeadAttention*** layers.
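
To see what masking the upper half of the attention matrix does, here is a small framework-free sketch. The helper names and the all-zero scores are assumptions for illustration; a real attention layer applies the same idea to learned query-key scores.

```python
import math

def causal_mask(n):
    """n x n mask: entry [i][j] is 1 when position i may attend to position j."""
    return [[1 if i >= j else 0 for j in range(n)] for i in range(n)]

def masked_softmax(scores, mask_row):
    """Softmax over one row of scores, with zero weight on masked positions."""
    exps = [math.exp(s) if m else 0.0 for s, m in zip(scores, mask_row)]
    total = sum(exps)
    return [e / total for e in exps]

mask = causal_mask(4)
for row in mask:
    print(row)
# [1, 0, 0, 0]
# [1, 1, 0, 0]
# [1, 1, 1, 0]
# [1, 1, 1, 1]

# With equal raw scores, position 1 splits its attention over positions 0 and 1 only.
print(masked_softmax([0.0, 0.0, 0.0, 0.0], mask[1]))  # [0.5, 0.5, 0.0, 0.0]
```

Row i of the mask has ones only up to column i, so no attention weight can flow from a later token back to an earlier position.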

#### The Transformer decoder

- Add a **get_causal_attention_mask(self, inputs)** method to the TransformerDecoder

In [None]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        # This attribute ensures that the layer will propagate its input mask to its outputs;
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        # Generate matrix of shape (sequence_length, sequence_length) with 1s in one half and 0s in the other.
        mask = tf.cast(i >= j, dtype="int32")
        # Replicate it along the batch axis to get a matrix of shape (batch_size, sequence_length, sequence_length)
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        # Retrieve the causal mask
        causal_mask = self.get_causal_attention_mask(inputs)
        # Prepare the input mask (that describes padding locations in the target sequence).
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            # Merge the two masks together
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        # Pass the causal mask to the first attention layer,
        # which performs self-attention over the target sequence
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        # Pass the combined mask to the second attention layer,
        # which relates the source sequence to the target sequence
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)
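
The mask-merging step in `call()` takes an elementwise minimum of the padding mask (broadcast across rows) and the causal mask. The sketch below reproduces that operation with plain Python lists for a single sequence; the helper names and the toy padding pattern are assumptions for illustration.

```python
def causal_mask(n):
    """Lower-triangular n x n mask of 1s and 0s."""
    return [[1 if i >= j else 0 for j in range(n)] for i in range(n)]

def merge_masks(padding_row, causal):
    """Elementwise minimum of a padding row (reused for every row) and a causal mask."""
    return [[min(p, c) for p, c in zip(padding_row, causal_row)]
            for causal_row in causal]

# Hypothetical sequence of length 4 whose last position is padding.
padding = [1, 1, 1, 0]
combined = merge_masks(padding, causal_mask(4))
for row in combined:
    print(row)
# [1, 0, 0, 0]
# [1, 1, 0, 0]
# [1, 1, 1, 0]
# [1, 1, 1, 0]
```

A position stays visible only if both masks allow it, so the padded column is zeroed out in every row while the causal structure is kept.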

### Putting it all together: A Transformer for machine translation

- The end-to-end Transformer is the model we will train. It maps the source sequence and the target sequence to the target sequence one step in the future.
- It straightforwardly combines the pieces of:
    - ***PositionalEmbedding*** layers,
    - the ***TransformerEncoder***, and
    - the ***TransformerDecoder***.
- Note that both the ***TransformerEncoder*** and the ***TransformerDecoder*** are shape-invariant, so you could ***stack many of them to create a more powerful encoder or decoder***.

#### PositionalEmbedding layer

In [None]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config
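
As a rough illustration of what this layer computes, the sketch below adds a token vector and a position vector using tiny hand-written tables. The table values and helper names are made up; the real layer learns both tables during training.

```python
# Tiny made-up embedding tables (embedding size 2), for illustration only.
token_table = {0: [0.0, 0.0], 5: [1.0, 2.0], 9: [3.0, -1.0]}  # token id -> vector
position_table = [[0.1, 0.1], [0.2, 0.2], [0.3, 0.3]]          # position -> vector

def positional_embed(token_ids):
    """Add each token's vector to the vector of the position it occupies."""
    return [
        [t + p for t, p in zip(token_table[tok], position_table[pos])]
        for pos, tok in enumerate(token_ids)
    ]

def padding_mask(token_ids):
    """Same rule as compute_mask: True wherever the token id is not 0."""
    return [tok != 0 for tok in token_ids]

embedded = positional_embed([5, 5, 0])
print(embedded)
print(padding_mask([5, 5, 0]))
```

The same token (id 5) gets a different vector at position 0 than at position 1, which is how word order reaches an otherwise order-agnostic attention layer.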

#### The Transformer Encoder

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerEncoder(layers.Layer):
    # initialize variables
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # size of the input token vector - embedding token vector representation
        self.embed_dim = embed_dim
        # size of the inner dense layer - use for dense projection
        self.dense_dim = dense_dim
        # number of attention heads
        self.num_heads = num_heads

        # initialize multi-head attention
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        # dense projection - independently learned linear projections
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )

        # layer normalization - help gradients flow better during backpropagation
        # normalizes each sequence independently from other sequences in the batch
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    # computation goes in call()
    # the call() method is called automatically when the layer is used in a Keras model
    def call(self, inputs, mask=None):
        # The mask generated by the embedding layer is 2D,
        # but the attention layer expects it to be 3D or 4D, so we expand its rank
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    # implement serialization so we can save the model
    # get_config method: this enables the layer to be reinstantiated from its config dict,
    # which is useful during model saving and loading.
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

**End-to-end Transformer**

In [None]:
embed_dim = 256
dense_dim = 2048
num_heads = 8

# inputs for encoder
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
# process with Positional embedding before passing to TransformerEncoder class
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
# Encode the source sentence; the encoder outputs are later passed to the TransformerDecoder
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

# inputs for decoder
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
# process with Positional embedding before passing to TransformerDecoder class
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
# take the decoder inputs (x) and encoder outputs (encoder_outputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
# Predict a word for each output position
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

**Training the sequence-to-sequence Transformer**

In [None]:
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
transformer.fit(train_ds, epochs=30, validation_data=val_ds)

Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


<keras.src.callbacks.History at 0x7afafd45d6f0>

**Translating new sentences with our Transformer model**

In [None]:
import numpy as np
# Prepare a dict to convert token index predictions to string tokens
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        # sample the next token
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        # Convert the next token prediction to a string, and append it to the generated sentence
        sampled_token = spa_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        # Exit condition
        if sampled_token == "[end]":
            break
    return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

-
Why haven't you called?
[start] por qué no te ha sido te llamó [end]
-
You need a lot of water.
[start] necesitas mucho agua [end]
-
We searched everywhere.
[start] todos los [UNK] [end]
-
What on earth is going on here?
[start] qué te [UNK] hay aquí [end]
-
I know I'm going to learn a lot.
[start] sé que voy a aprender mucho [end]
-
I remember what I saw.
[start] recuerdo lo que vi [end]
-
He stood up.
[start] se puso la cama [end]
-
I can't move my legs.
[start] no puedo [UNK] [end]
-
Tom always says that.
[start] tom siempre dice eso [end]
-
I can see what you mean.
[start] tengo que ver lo que te [UNK] [end]
-
Is it still raining?
[start] está lloviendo todavía [end]
-
Tom said goodnight.
[start] tom dijo buenas noches [end]
-
Be still.
[start] todavía hay [end]
-
I was about to suggest the same thing.
[start] estaba a punto de [UNK] la misma cosa [end]
-
The lion is eating meat.
[start] el león es una carne [end]
-
When I told him that, he was very much embarrassed.
[start] cuan