This is a companion notebook for the book [Deep Learning with Python, Second Edition](https://www.manning.com/books/deep-learning-with-python-second-edition?a_aid=keras&a_bid=76564dff). For readability, it only contains runnable code blocks and section titles, and omits everything else in the book: text paragraphs, figures, and pseudocode.

**If you want to be able to follow what's going on, I recommend reading the notebook side by side with your copy of the book.**

This notebook was generated for TensorFlow 2.6.

## Beyond text classification: Sequence-to-sequence learning

### A machine translation example

In [None]:
# Fetch the English-Spanish sentence pairs over HTTPS.
# -nc (no-clobber) skips the download when spa-eng.zip is already present,
# instead of saving a second copy as spa-eng.zip.1 on every re-run.
!wget -nc https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
# -n never overwrites already-extracted files, so re-running this cell does
# not stop on an interactive "replace file?" prompt.
!unzip -q -n spa-eng.zip

In [None]:
text_file = "spa-eng/spa.txt"
# Decode explicitly as UTF-8: without `encoding`, open() falls back to the
# platform's locale encoding, which can mis-decode the non-ASCII Spanish text.
with open(text_file, encoding="utf-8") as f:
    # Drop empty lines (normally just the one after the final newline) rather
    # than blindly discarding the last element, which would lose a real pair
    # if the file did not end with a newline.
    lines = [line for line in f.read().split("\n") if line]

# Each line is "<english>\t<spanish>". The Spanish side is wrapped in
# "[start]" / "[end]" markers so the decoder has explicit sequence boundaries.
text_pairs = []
for line in lines:
    english, spanish = line.split("\t")
    spanish = "[start] " + spanish + " [end]"
    text_pairs.append((english, spanish))

In [None]:
import random # importing the random module
# Spot-check the parsing: show one (english, spanish) pair picked at random.
print(random.choice(text_pairs))

In [None]:
import random # importing the random module
# Shuffle in place, then carve the list into three contiguous slices:
# 15% validation, 15% test, and whatever remains (about 70%) for training.
random.shuffle(text_pairs)
num_val_samples = int(len(text_pairs) * 0.15)
num_train_samples = len(text_pairs) - num_val_samples * 2

train_pairs = text_pairs[:num_train_samples]
held_out_pairs = text_pairs[num_train_samples:]
val_pairs = held_out_pairs[:num_val_samples]
test_pairs = held_out_pairs[num_val_samples:]

**Vectorizing the English and Spanish text pairs**

In [None]:
import tensorflow as tf # importing the tensorflow module
import string # importing the string module
import re # importing the re module
from tensorflow import keras # importing the keras module
from tensorflow.keras import layers # importing the layers module

# Characters removed from the Spanish text: ASCII punctuation plus "¿",
# except the square brackets, which must survive so that the "[start]" and
# "[end]" markers stay intact.
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in "[]")


def custom_standardization(input_string):
    """Lowercase a string tensor and delete every character in `strip_chars`.

    Args:
        input_string: string tensor to normalize.

    Returns:
        The lowercased tensor with all `strip_chars` characters removed.
    """
    # re.escape keeps characters such as "\" or "^" literal inside the
    # regex character class.
    return tf.strings.regex_replace(
        tf.strings.lower(input_string),
        f"[{re.escape(strip_chars)}]",
        "")

vocab_size = 15000
sequence_length = 20

# English side: default standardization, padded/truncated to `sequence_length`.
source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length)

# Spanish side: one token longer, because format_dataset() later slices the
# sequence into decoder inputs ([:-1]) and targets ([1:]) offset by one step.
# Uses the custom standardization so "[start]" / "[end]" keep their brackets.
target_vectorization = layers.TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1)

# Build both vocabularies from the training split only.
train_english_texts = [english for english, _ in train_pairs]
train_spanish_texts = [spanish for _, spanish in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)

**Preparing datasets for the translation task**

In [None]:
batch_size = 64


def format_dataset(eng, spa):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        eng: batch of English strings.
        spa: batch of Spanish strings (with "[start]" / "[end]" markers).

    Returns:
        A `(features, targets)` tuple. `features` is a dict with keys
        "english" (source token ids) and "spanish" (target token ids without
        the last position). `targets` is the target token ids without the
        first position, i.e. the same sequence shifted one step ahead.
    """
    source_ids = source_vectorization(eng)
    target_ids = target_vectorization(spa)
    decoder_inputs = target_ids[:, :-1]
    next_tokens = target_ids[:, 1:]
    features = {"english": source_ids, "spanish": decoder_inputs}
    return features, next_tokens

def make_dataset(pairs):
    """Build a batched, vectorized `tf.data.Dataset` from sentence pairs.

    Args:
        pairs: non-empty sequence of `(english, spanish)` string tuples.

    Returns:
        A dataset yielding `(features, targets)` batches as produced by
        `format_dataset`, cached after vectorization and reshuffled at the
        batch level on every iteration.
    """
    eng_texts, spa_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices(
        (list(eng_texts), list(spa_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    # Order matters: cache() replays exactly the elements it saw on the first
    # pass, so it must come BEFORE shuffle(). With shuffle() ahead of cache(),
    # the first epoch's shuffled order is frozen and replayed every epoch.
    # Caching here also stores the already-vectorized batches, so the text
    # preprocessing runs only once. prefetch() goes last so it overlaps the
    # whole pipeline with model execution.
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [None]:
# Pull a single batch from the training dataset and print the shape of each
# model input and of the targets as a sanity check.
for inputs, targets in train_ds.take(1):
    print(f"inputs['english'].shape: {inputs['english'].shape}")
    print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
    print(f"targets.shape: {targets.shape}")

### Sequence-to-sequence learning with RNNs

**GRU-based encoder**

In [None]:
from tensorflow import keras # importing the keras module
from tensorflow.keras import layers # importing the layers module

embed_dim = 256
latent_dim = 1024

# Variable-length English token ids. The input is named "english" to match
# the key produced by format_dataset().
source = keras.Input(shape=(None,), dtype="int64", name="english")
# mask_zero=True makes index 0 (the padding id) a masked position downstream.
x = layers.Embedding(vocab_size, embed_dim, mask_zero=True)(source)
# One GRU per direction; merge_mode="sum" adds the two outputs so the encoded
# source has width `latent_dim`, matching the decoder GRU it will initialize.
encoder_gru = layers.GRU(latent_dim)
encoded_source = layers.Bidirectional(encoder_gru, merge_mode="sum")(x)

**GRU-based decoder and the end-to-end model**

In [None]:
# Spanish token ids generated so far. The input is named "spanish" to match
# the key produced by format_dataset().
past_target = keras.Input(shape=(None,), dtype="int64", name="spanish")
target_embedding = layers.Embedding(vocab_size, embed_dim, mask_zero=True)
# return_sequences=True: the decoder emits one output per target position.
decoder_gru = layers.GRU(latent_dim, return_sequences=True)
# The encoded source sentence is the decoder's initial state.
x = decoder_gru(target_embedding(past_target), initial_state=encoded_source)
x = layers.Dropout(0.5)(x)
# Per-position probability distribution over the target vocabulary.
target_next_step = layers.Dense(vocab_size, activation="softmax")(x)
seq2seq_rnn = keras.Model(
    inputs=[source, past_target], outputs=target_next_step)

**Training our recurrent sequence-to-sequence model**

In [None]:
# Targets are integer token ids and the model ends in a softmax over the
# vocabulary, hence sparse categorical crossentropy.
seq2seq_rnn.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
# Train for 15 epochs, using val_ds as the validation data.
seq2seq_rnn.fit(train_ds, epochs=15, validation_data=val_ds)

**Translating new sentences with our RNN encoder and decoder**

In [None]:
import numpy as np # importing the numpy module
# Map each integer token id back to its Spanish vocabulary string.
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(enumerate(spa_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    """Greedily translate one English sentence with the GRU seq2seq model.

    Starting from "[start]", the sentence decoded so far is re-vectorized and
    fed back to the model at each step, and the highest-probability token at
    the current position is appended. Decoding stops at "[end]" or after
    `max_decoded_sentence_length` steps.

    Args:
        input_sentence: English sentence as a Python string.

    Returns:
        The decoded string, beginning with "[start]" and ending with "[end]"
        if the model produced it within the step limit.
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_tokens = ["[start]"]
    for step in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [" ".join(decoded_tokens)])
        next_token_predictions = seq2seq_rnn.predict(
            [tokenized_input_sentence, tokenized_target_sentence])
        # Position `step` holds the prediction for the token after the
        # `step + 1` tokens decoded so far.
        best_index = np.argmax(next_token_predictions[0, step, :])
        next_token = spa_index_lookup[best_index]
        decoded_tokens.append(next_token)
        if next_token == "[end]":
            break
    return " ".join(decoded_tokens)

# Translate 20 randomly chosen English sentences from the held-out test split.
test_eng_texts = [english for english, _ in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

### Sequence-to-sequence learning with Transformer

#### The Transformer decoder

**The `TransformerDecoder`**

In [None]:
class TransformerDecoder(layers.Layer):
    """Transformer decoder block.

    Applies, in order: causal self-attention over the target sequence,
    cross-attention from the target onto the encoder outputs, and a two-layer
    dense projection. Each sub-layer is followed by a residual connection and
    layer normalization.

    Args:
        embed_dim: width of the token representations; also used as the
            per-head `key_dim` of both attention layers.
        dense_dim: width of the hidden layer in the dense projection.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        # attention_1: self-attention on the target sequence.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        # attention_2: cross-attention, target queries over encoder outputs.
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        # Let the incoming Keras mask propagate through this layer.
        self.supports_masking = True

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build a lower-triangular mask that hides future positions.

        Args:
            inputs: tensor whose first two dimensions are (batch, sequence).

        Returns:
            int32 tensor of shape (batch, sequence, sequence) where entry
            [b, i, j] is 1 if i >= j (position i may attend to j), else 0.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the decoder block.

        Args:
            inputs: target-side sequence, indexed as (batch, target_len, ...).
            encoder_outputs: encoded source sequence to attend over.
            mask: optional Keras mask for `inputs`, indexed as
                (batch, target_len); truthy entries are real tokens.

        Returns:
            Tensor with the same leading shape as `inputs`.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # Combine the causal constraint with target-side padding: a key
            # position is visible only if it is not in the future AND is not
            # padding.
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            self_attention_mask = tf.minimum(padding_mask, causal_mask)
        else:
            self_attention_mask = causal_mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        # Cross-attention deliberately takes no attention mask. `mask` and the
        # causal mask are indexed by TARGET positions, so applying them here
        # (as this layer previously did) hid source tokens based on target
        # padding and target position, and only fit when source and target
        # lengths were equal. The encoder's own padding mask is not available
        # through this signature.
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

#### Putting it all together: A Transformer for machine translation

**PositionalEmbedding layer**

In [None]:
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions that can be embedded;
            inputs must not be longer than this.
        input_dim: size of the token vocabulary.
        output_dim: width of both embeddings.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)

    def call(self, inputs):
        """Embed token ids and add the embedding of each token's position."""
        # Positions are 0 .. length-1 along the last axis of `inputs`.
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(num_positions)
        token_vectors = self.token_embeddings(inputs)
        position_vectors = self.position_embeddings(position_ids)
        return token_vectors + position_vectors

    def compute_mask(self, inputs, mask=None):
        """Mark every non-zero token id as a real (unmasked) position."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config["output_dim"] = self.output_dim
        config["sequence_length"] = self.sequence_length
        config["input_dim"] = self.input_dim
        return config

**End-to-end Transformer**

In [None]:
embed_dim = 256
dense_dim = 2048
num_heads = 8


class TransformerEncoder(layers.Layer):
    """Transformer encoder block.

    Self-attention followed by a two-layer dense projection, each with a
    residual connection and layer normalization.

    Defined in this cell because the model below instantiates
    `TransformerEncoder`, but the notebook never defines or imports it, so a
    fresh kernel would otherwise fail with NameError.

    Args:
        embed_dim: width of the token representations; also used as the
            per-head `key_dim` of the attention layer.
        dense_dim: width of the hidden layer in the dense projection.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Run the encoder block; `mask` is the optional padding mask."""
        if mask is not None:
            # (batch, seq) -> (batch, 1, seq): the same key-padding mask is
            # broadcast to every query position.
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config


# Encoder: embed the English token ids (with positions) and encode them.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

# Decoder: embed the Spanish tokens generated so far, attend over the encoder
# outputs, and predict a distribution over the vocabulary at every position.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

**Training the sequence-to-sequence Transformer**

In [None]:
# Targets are integer token ids and the model ends in a softmax over the
# vocabulary, hence sparse categorical crossentropy.
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
# Train for 30 epochs, using val_ds as the validation data.
transformer.fit(train_ds, epochs=30, validation_data=val_ds)

**Translating new sentences with our Transformer model**

In [None]:
import numpy as np # importing the numpy module
# Map each integer token id back to its Spanish vocabulary string.
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(enumerate(spa_vocab))
max_decoded_sentence_length = 20


def decode_sequence(input_sentence):
    """Greedily translate one English sentence with the Transformer model.

    Starting from "[start]", the sentence decoded so far is re-vectorized and
    fed back to the model at each step, and the highest-probability token at
    the current position is appended. Decoding stops at "[end]" or after
    `max_decoded_sentence_length` steps.

    Args:
        input_sentence: English sentence as a Python string.

    Returns:
        The decoded string, beginning with "[start]" and ending with "[end]"
        if the model produced it within the step limit.
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_tokens = ["[start]"]
    for step in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input has the same length it
        # had during training (format_dataset feeds spa[:, :-1]).
        tokenized_target_sentence = target_vectorization(
            [" ".join(decoded_tokens)])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        # Position `step` holds the prediction for the token after the
        # `step + 1` tokens decoded so far.
        best_index = np.argmax(predictions[0, step, :])
        next_token = spa_index_lookup[best_index]
        decoded_tokens.append(next_token)
        if next_token == "[end]":
            break
    return " ".join(decoded_tokens)

# Translate 20 randomly chosen English sentences from the held-out test split.
test_eng_texts = [english for english, _ in test_pairs]
for _ in range(20):
    input_sentence = random.choice(test_eng_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

## Summary