
# Libraries

In [None]:
import tensorflow as tf
import string
import re
from tensorflow import keras
import random
import numpy as np

We are going to implement a sequence-to-sequence model (a Transformer) for an English-to-Spanish machine translation task.

# Loading the dataset

First, we download an English-to-Spanish translation dataset from the following link:

In [None]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip

--2023-09-15 15:59:03--  http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.219.207, 209.85.146.207, 209.85.147.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.219.207|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2023-09-15 15:59:03 (240 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]



To complete the download, we unzip the .zip file:

In [None]:
!unzip -q spa-eng.zip

# Parse the text file

The text file contains one example per line:

**[an English sentence] [tab character] [corresponding Spanish sentence]**

Let's parse the .txt file:

In [None]:
text_file = "spa-eng/spa.txt"
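# read the file as UTF-8 so that accented Spanish characters decode correctly on any platform
with open(text_file, encoding="utf-8") as f: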
  lines = f.read().split("\n")[:-1]

text_pairs = []

# iterate over the lines in the file
for line in lines:
  # each line contains an English phrase and its Spanish translation
  # a Tab separates them
  english, spanish = line.split("\t")
  # prepend [start] and append [end] to the Spanish sentence
  spanish = "[start] " + spanish + " [end]"
  text_pairs.append((english, spanish))
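
To make the line format concrete, here is a minimal sketch that applies the same parsing steps to a single hard-coded line. The sample sentence is made up for illustration and is not taken from the dataset.

```python
# A made-up sample line in the same "English<TAB>Spanish" format (not from spa.txt).
sample_line = "Good morning.\tBuenos días."

# Split on the tab character into the source and the target sentence.
english, spanish = sample_line.split("\t")
# Wrap the target sentence in the [start] / [end] markers, as in the loop above.
spanish = "[start] " + spanish + " [end]"

pair = (english, spanish)
print(pair)
```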

Let's print a random sentence pair to see what it looks like:

In [None]:
random_example = random.choice(text_pairs)
print(random_example)

('Perhaps you have misunderstood the aim of our project.', '[start] Puede que hayas entendido mal el objetivo de nuestro proyecto. [end]')


# Prepare the dataset

Let's shuffle the dataset and split it into training, validation, and test sets (70%, 15%, and 15% of the pairs, respectively):

In [None]:
# shuffle
random.shuffle(text_pairs)
# calculate number of validation samples
num_val_samples = int(0.15 * len(text_pairs))
# calculate number of training samples
num_train_samples = len(text_pairs) - 2 * num_val_samples
# training set
train_pairs = text_pairs[:num_train_samples]
# validation set
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
# test set
test_pairs = text_pairs[num_train_samples + num_val_samples:]
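
As a quick sanity check of the slicing arithmetic, the sketch below applies the same split to a hypothetical list of 1,003 dummy items. Because the validation size is rounded down and the training size is whatever remains after reserving two such chunks, the test set always ends up the same size as the validation set.

```python
# Hypothetical stand-in for text_pairs: 1,003 dummy items.
dummy_pairs = list(range(1003))

num_val = int(0.15 * len(dummy_pairs))        # 150 (150.45 rounded down)
num_train = len(dummy_pairs) - 2 * num_val    # 703

train = dummy_pairs[:num_train]
val = dummy_pairs[num_train:num_train + num_val]
test = dummy_pairs[num_train + num_val:]

print(len(train), len(val), len(test))
# The three slices together cover every item exactly once.
assert train + val + test == dummy_pairs
```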

## Vectorize the English and Spanish text pairs

We create two TextVectorization layers: one for English and one for Spanish.

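By default, a TextVectorization layer lowercases the text and strips punctuation, which would also remove the brackets of the [start] and [end] tokens that we inserted previously. We therefore give the Spanish layer a custom standardization function that preserves "[" and "]". Keep in mind that punctuation differs between languages: if we strip punctuation characters in the Spanish TextVectorization layer, we also need to strip the character "¿". Normally we would not strip punctuation at all, but for the sake of simplicity we do so here.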

In [None]:
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

# a custom string standardization function
# for the Spanish TextVectorization layer:
# it preserves [ and ] but strips ¿ as well
# as the other characters from string.punctuation
def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")
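
To see what this standardization does to a sentence, here is a plain-Python sketch of the same idea using `re.sub`. The helper name `standardize_py` is hypothetical and exists only for this illustration. One caveat: Python's `str.lower` also lowercases non-ASCII letters, whereas `tf.strings.lower` with its default `encoding` argument treats the input as ASCII, which would be consistent with the capitalized "Él" in the sample translations near the end of this notebook.

```python
import re
import string

# Same character set as above: ASCII punctuation plus "¿", minus "[" and "]".
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "").replace("]", "")

def standardize_py(text):
    # Plain-Python stand-in for custom_standardization (hypothetical helper).
    return re.sub(f"[{re.escape(strip_chars)}]", "", text.lower())

example = standardize_py("[start] ¿Cómo estás? [end]")
print(example)
```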

In [None]:
# to keep things simple, we'll only look at the
# top 15000 words in each language
vocab_size = 15000
# we'll also restrict sentences to 20 words
sequence_length = 20

# define the English TextVectorization layer
source_vectorization = keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
# define the Spanish TextVectorization layer
target_vectorization = keras.layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    # recall that each Spanish sentence starts with the "[start]" token,
    # so we generate one extra token to allow offsetting the sentence by one step during training
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
# invoke adapt to learn the vocabulary of each language
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)
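
Conceptually, `adapt` builds a frequency-ranked vocabulary, and calling the layer maps each token to its index and pads or truncates to a fixed length. The pure-Python sketch below mimics that behavior on a toy corpus. It is a simplification under stated assumptions: the helpers `build_vocab` and `vectorize` are hypothetical, no lowercasing or punctuation stripping is applied, and the alphabetical tie-breaking is chosen only to make the toy result deterministic.

```python
from collections import Counter

def build_vocab(texts, max_tokens):
    # Count whitespace-separated tokens across the corpus.
    counts = Counter(tok for text in texts for tok in text.split())
    # Sort by descending count, then alphabetically (tie-breaking is an assumption).
    ranked = sorted(counts, key=lambda tok: (-counts[tok], tok))
    # Index 0 is reserved for padding and index 1 for out-of-vocabulary tokens.
    return ["", "[UNK]"] + ranked[:max_tokens - 2]

def vectorize(text, vocab, length):
    index = {tok: i for i, tok in enumerate(vocab)}
    ids = [index.get(tok, 1) for tok in text.split()][:length]  # truncate
    return ids + [0] * (length - len(ids))                      # pad with 0s

toy_vocab = build_vocab(["the cat sat", "the dog sat", "the cat ran"], max_tokens=6)
print(toy_vocab)
print(vectorize("the cat flew", toy_vocab, length=5))
```

The unknown word "flew" maps to index 1, and the two unused positions are filled with the padding index 0.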

## Turn data into a tf.data pipeline

We want each dataset element to be a tuple (inputs, targets), where inputs is a dict with two keys, "english" (the English sentence, fed to the encoder) and "spanish" (the Spanish sentence, fed to the decoder), and targets is the Spanish sentence offset by one step ahead:

In [None]:
batch_size = 64

def format_dataset(eng, spa):
  eng = source_vectorization(eng)
  spa = target_vectorization(spa)
  return ({
      "english": eng,
      # note: the input Spanish sentence doesn't include the last token
      #       to keep inputs and targets at the same length
      "spanish": spa[:, :-1],},
          # the target Spanish sentence is one step ahead; both are still the same length (20 tokens)
          spa[:, 1:])

def make_dataset(pairs):
  eng_texts, spa_texts = zip(*pairs)
  eng_texts = list(eng_texts)
  spa_texts = list(spa_texts)
  dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(format_dataset, num_parallel_calls=4)

  # cache the preprocessed batches in memory, then shuffle and prefetch;
  # caching before shuffling lets the batch order change on every epoch
  return dataset.cache().shuffle(2048).prefetch(16)

# create the Datasets for training and validation
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

Let's check what our dataset outputs look like:

In [None]:
for inputs, targets in train_ds.take(1):
  print(f"inputs['english'].shape: {inputs['english'].shape}")
  print(f"inputs['spanish'].shape: {inputs['spanish'].shape}")
  print(f"targets.shape: {targets.shape}")

inputs['english'].shape: (64, 20)
inputs['spanish'].shape: (64, 20)
targets.shape: (64, 20)
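
The one-step offset between decoder inputs and targets is easiest to see on a tiny example. The NumPy sketch below uses made-up token ids and a shorter sequence (6 tokens instead of 21) purely for illustration.

```python
import numpy as np

# Hypothetical token ids for one vectorized Spanish sentence of length 6:
# [start] w1 w2 [end] pad pad
spa = np.array([[2, 15, 27, 3, 0, 0]])

decoder_inputs = spa[:, :-1]   # everything except the last token
targets = spa[:, 1:]           # the same sequence shifted one step ahead

print(decoder_inputs)
print(targets)
```

At every position t, the target is the token that follows the decoder input at position t, so the model learns to predict the next token from the tokens seen so far.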


# Create a Transformer

## Encoder

We implement the encoder part of the Transformer as a subclassed Layer.

In [None]:
class TransformerEncoder(keras.layers.Layer):
  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    # set the size of the input token vectors
    self.embed_dim = embed_dim
    # set the size of the inner dense layer
    self.dense_dim = dense_dim
    # set the number of attention heads
    self.num_heads = num_heads
    # create the multi-head self-attention layer
    self.attention = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    # create the feed-forward block: a Dense layer with ReLU activation,
    # followed by a linear projection back to embed_dim
    self.dense_proj = keras.Sequential([keras.layers.Dense(dense_dim, activation="relu"), keras.layers.Dense(embed_dim),])
    # create normalization layers
    self.layernorm_1 = keras.layers.LayerNormalization()
    self.layernorm_2 = keras.layers.LayerNormalization()

  def call(self, inputs, mask=None):
    # expand the rank of the mask generated by the Embedding layer
    if mask is not None:
      mask = mask[:, tf.newaxis, :]
    attention_output = self.attention(inputs, inputs, attention_mask=mask)
    proj_input = self.layernorm_1(inputs + attention_output)
    proj_output = self.dense_proj(proj_input)

    return self.layernorm_2(proj_input + proj_output)

  # for serialization; so that we can save the model
  def get_config(self):
    config = super().get_config()
    config.update({
        "embed_dim": self.embed_dim,
        "num_heads": self.num_heads,
        "dense_dim": self.dense_dim,
        })
    return config
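
To illustrate what the attention layer does with the padding mask, here is a minimal NumPy sketch of single-head scaled dot-product attention. It is a simplification and not what `MultiHeadAttention` does internally: there are no learned projections and only one head, and the function `masked_attention` is a hypothetical helper for this example.

```python
import numpy as np

def masked_attention(query, key, value, mask):
    # query, key, value: (seq_len, dim); mask: (seq_len, seq_len) of 0s and 1s.
    scores = query @ key.T / np.sqrt(query.shape[-1])
    # Masked-out positions get a large negative score, so softmax gives them ~0 weight.
    scores = np.where(mask == 1, scores, -1e9)
    scores = scores - scores.max(axis=-1, keepdims=True)   # numerical stability
    weights = np.exp(scores)
    weights = weights / weights.sum(axis=-1, keepdims=True)
    return weights @ value, weights

rng = np.random.default_rng(0)
x = rng.normal(size=(4, 8))                      # 4 tokens, 8-dimensional embeddings
padding = np.array([1, 1, 1, 0])                 # the last token is padding
mask = np.tile(padding[np.newaxis, :], (4, 1))   # every query ignores padded keys

output, weights = masked_attention(x, x, x, mask)
print(weights.round(3))
```

Each row of `weights` sums to 1, and the column that belongs to the padded token receives no attention.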

## Decoder

In [None]:
class TransformerDecoder(keras.layers.Layer):
  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    self.embed_dim = embed_dim
    self.dense_dim = dense_dim
    self.num_heads = num_heads
    self.attention_1 = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    self.attention_2 = keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    self.dense_proj = keras.Sequential([keras.layers.Dense(dense_dim, activation="relu"), keras.layers.Dense(embed_dim),])
    self.layernorm_1 = keras.layers.LayerNormalization()
    self.layernorm_2 = keras.layers.LayerNormalization()
    self.layernorm_3 = keras.layers.LayerNormalization()
    # ensure that the layer will propagate its input mask to its outputs
    self.supports_masking = True

  def get_config(self):
    config = super().get_config()
    config.update({
        "embed_dim": self.embed_dim,
        "num_heads": self.num_heads,
        "dense_dim": self.dense_dim,
    })

    return config

  def get_causal_attention_mask(self, inputs):
    input_shape = tf.shape(inputs)
    batch_size, sequence_length = input_shape[0], input_shape[1]
    i = tf.range(sequence_length)[:, tf.newaxis]
    j = tf.range(sequence_length)
    # generate a matrix of shape (sequence_length, sequence_length) with 1s on and below the diagonal and 0s above it
    mask = tf.cast(i >= j, dtype="int32")
    # replicate it along the batch axis to get a matrix of shape (batch_size, sequence_length, sequence_length)
    mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
    mult = tf.concat( [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], axis=0)
    return tf.tile(mask, mult)

  def call(self, inputs, encoder_outputs, mask=None):
    # get the causal mask
    causal_mask = self.get_causal_attention_mask(inputs)
    # prepare the input mask which describes padding locations in the target sequence
    if mask is not None:
      padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
      # merge the two masks together
      padding_mask = tf.minimum(padding_mask, causal_mask)
    else:
      padding_mask = mask

    # pass causal mask to the 1st attention layer which applies self-attention over the target sequence
    attention_output_1 = self.attention_1( query=inputs, value=inputs, key=inputs, attention_mask=causal_mask)
    attention_output_1 = self.layernorm_1(inputs + attention_output_1)
    # pass the combined mask to the second attention layer which relates the source sequence to the target sequence
    attention_output_2 = self.attention_2(query=attention_output_1, value=encoder_outputs,key=encoder_outputs, attention_mask=padding_mask,)
    attention_output_2 = self.layernorm_2(attention_output_1 + attention_output_2)
    proj_output = self.dense_proj(attention_output_2)

    return self.layernorm_3(attention_output_2 + proj_output)
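
The causal mask is what prevents a target position from attending to later positions. The NumPy sketch below reproduces the `i >= j` construction for a toy sequence of length 4 and then merges it with a made-up padding mask using an element-wise minimum, mirroring the `tf.minimum` call above.

```python
import numpy as np

seq_len = 4
i = np.arange(seq_len)[:, np.newaxis]   # query positions, as a column
j = np.arange(seq_len)                  # key positions, as a row
causal_mask = (i >= j).astype("int32")  # lower-triangular matrix of 1s
print(causal_mask)

# Hypothetical padding mask for one target sequence whose last token is padding.
padding_mask = np.array([1, 1, 1, 0], dtype="int32")[np.newaxis, :]
# A position is attended to only if both masks allow it.
combined = np.minimum(padding_mask, causal_mask)
print(combined)
```

Row t of the causal mask has 1s only in columns 0 to t, so position t can see itself and earlier positions but nothing that comes after it.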

## Create a Positional Embedding layer

We implement positional embedding as a subclassed layer.

In [None]:
class PositionalEmbedding(keras.layers.Layer):
  # note: we have to know the sequence_length in advance
  def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
    super().__init__(**kwargs)
    # Embedding layer for the token indices
    self.token_embeddings = keras.layers.Embedding(input_dim=input_dim, output_dim=output_dim)
    # Embedding layer for the token positions
    self.position_embeddings = keras.layers.Embedding(input_dim=sequence_length, output_dim=output_dim)
    self.sequence_length = sequence_length
    self.input_dim = input_dim
    self.output_dim = output_dim

  def call(self, inputs):
    length = tf.shape(inputs)[-1]
    positions = tf.range(start=0, limit=length, delta=1)
    embedded_tokens = self.token_embeddings(inputs)
    embedded_positions = self.position_embeddings(positions)
    # add both embedding vectors together
    return embedded_tokens + embedded_positions

  # like the Embedding layer, this layer should be able to generate a mask
  # so we can ignore padding 0s in the inputs.
  def compute_mask(self, inputs, mask=None):
    return tf.math.not_equal(inputs, 0)

  # for serialization; so that we can save the model
  def get_config(self):
    config = super(PositionalEmbedding, self).get_config()
    config.update({
        "output_dim": self.output_dim,
        "sequence_length": self.sequence_length,
        "input_dim": self.input_dim,
        })
    return config
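
The following NumPy sketch walks through the same computation with random lookup tables in place of trained `Embedding` layers. All sizes and token ids are toy values chosen only for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
vocab, seq_len, dim = 10, 5, 4                     # toy sizes
token_table = rng.normal(size=(vocab, dim))        # stands in for token_embeddings
position_table = rng.normal(size=(seq_len, dim))   # stands in for position_embeddings

token_ids = np.array([[3, 7, 2, 0, 0],
                      [5, 1, 0, 0, 0]])            # batch of 2, 0 = padding

embedded_tokens = token_table[token_ids]                        # (2, 5, 4)
positions = np.arange(token_ids.shape[-1])                      # 0, 1, ..., 4
embedded_positions = position_table[positions]                  # (5, 4)
# The position vectors broadcast across the batch axis.
embedded = embedded_tokens + embedded_positions
pad_mask = token_ids != 0                                       # as in compute_mask

print(embedded.shape)
print(pad_mask)
```

Because the position table has exactly `seq_len` rows, inputs longer than that cannot be embedded, which is why the layer needs to know the sequence length in advance.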

## Putting it all together

In [None]:
def build_transformer(embed_dim=256, dense_dim=2048, num_heads=8):
  encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
  x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
  # encode the source sentence
  encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

  decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
  x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
  # encode the target sequence and combine it with the encoded source sentence
  x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
  x = keras.layers.Dropout(0.5)(x)
  # predict a word for each output position
  decoder_outputs = keras.layers.Dense(vocab_size, activation="softmax")(x)

  return keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
transformer = build_transformer()

## Compile and Train the Transformer

In [None]:
transformer.compile(optimizer="rmsprop",loss="sparse_categorical_crossentropy",metrics=["accuracy"])
transformer.fit(train_ds, epochs=30, validation_data=val_ds)


# Testing our Transformer
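
The decoding loop in this section is greedy: starting from "[start]", it repeatedly asks the model for the most likely next token, appends it, and stops at "[end]" or after a maximum number of steps. The pure-Python sketch below shows that control flow in isolation. The function `toy_next_token_scores` is a hypothetical stand-in for the trained model and simply follows a fixed script.

```python
# Hypothetical stand-in for the trained model: maps the tokens generated so far
# to a score for every candidate next token.
def toy_next_token_scores(tokens):
    script = ["hola", "mundo", "[end]"]
    step = len(tokens) - 1                 # tokens[0] is "[start]"
    best = script[min(step, len(script) - 1)]
    return {tok: (1.0 if tok == best else 0.0) for tok in script}

def greedy_decode(next_token_scores, max_length=20):
    tokens = ["[start]"]
    for _ in range(max_length):
        scores = next_token_scores(tokens)
        # Greedy choice: always take the single highest-scoring token.
        next_token = max(scores, key=scores.get)
        tokens.append(next_token)
        if next_token == "[end]":
            break
    return " ".join(tokens)

decoded = greedy_decode(toy_next_token_scores)
print(decoded)
```

Greedy decoding is simple, but it commits to one token per step and never revisits earlier choices, so a single poor prediction can affect the rest of the sentence.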

In [None]:
spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
  tokenized_input_sentence = source_vectorization([input_sentence])
  decoded_sentence = "[start]"
  for i in range(max_decoded_sentence_length):
    tokenized_target_sentence = target_vectorization([decoded_sentence])[:, :-1]
    predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
    sampled_token_index = np.argmax(predictions[0, i, :])
    sampled_token = spa_index_lookup[sampled_token_index]
    decoded_sentence += " " + sampled_token
    if sampled_token == "[end]":
      break
  return decoded_sentence

test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
  input_sentence = random.choice(test_eng_texts)
  print("-")
  print(input_sentence)
  print(decode_sequence(input_sentence))

-
What you don't want to do is make Tom angry.
[start] qué no quieres hacer enfadar a tom [end]
-
God created the world in six days.
[start] dios no he [UNK] en seis días [end]
-
Rosa Parks refused to give up her seat for a white passenger.
[start] [UNK] cada [UNK] que le dio un libro a un cara blanco en blanco [end]
-
He proposed that we should play baseball.
[start] Él mató a jugar al al béisbol [end]
-
Why did you learn German?
[start] por qué [UNK] alemán [end]
-
It's not too early.
[start] no es demasiado pronto [end]
-
He is our teacher and a person we should respect.
[start] Él es nuestro profesor y deberíamos [UNK] a los desayuno [end]
-
He has two boys and a girl.
[start] tiene dos hijos ni una chica [end]
-
Keep them.
[start] [UNK] [end]
-
The radio was plugged in.
[start] la radio estaba [UNK] [end]
-
You learn something new every day.
[start] tú [UNK] algo nuevo coche [end]
-
Tom called Mary up yesterday.
[start] tom llamó a mary ayer [end]
-
To drive a car, you need a lice