<a href="https://colab.research.google.com/github/YuanLongPeng/RosettaStone/blob/main/%E3%80%8CTransformer_Based%E3%80%8D.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

The source code was adapted from https://colab.research.google.com/github/keras-team/keras-io/blob/master/examples/nlp/ipynb/neural_machine_translation_with_transformer.ipynb

In [None]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

Download the training and validation datasets.

In [None]:
def loadDataset(language):
    """Download one gzipped Multi30k file and return its sentences wrapped in markers.

    The archive is fetched with ``keras.utils.get_file`` into ``/content/datasets/``,
    decompressed next to itself as ``<language>.txt``, and read back line by line.

    Args:
        language: File name inside the Multi30k ``task1/raw`` directory,
            e.g. ``'train.cs.gz'``.

    Returns:
        A list of strings, one per line of the decompressed file, each formatted
        as ``"[start] <sentence> [end]"``.
    """
    import gzip
    import shutil

    keras.utils.get_file(
        fname=language,
        origin="https://github.com/multi30k/dataset/raw/master/data/task1/raw/"+language,
        extract=True,
        cache_dir="/content/",
    )

    archive_path = '/content/datasets/' + language
    text_path = archive_path + '.txt'

    # Context managers guarantee that both handles are closed (the original
    # leaked the gzip stream and both plain file objects).
    with gzip.open(archive_path, "rb") as compressed, open(text_path, "wb") as plain:
        shutil.copyfileobj(compressed, plain)

    # Read explicitly as UTF-8 so the non-ASCII text does not depend on the
    # platform's default locale encoding.
    with open(text_path, encoding="utf-8") as f:
        lines = f.read().split("\n")

    # Only drop the final element when it is the empty string produced by a
    # trailing newline; otherwise the last sentence would be lost.
    if lines and lines[-1] == "":
        lines.pop()

    return ["[start] " + line + " [end]" for line in lines]

In [None]:
# Load the four parallel training corpora in the order cs, de, en, fr.
train_cs, train_de, train_en, train_fr = [
    loadDataset(f'train.{lang}.gz') for lang in ('cs', 'de', 'en', 'fr')
]

Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/train.cs.gz
Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/train.de.gz
Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/train.en.gz
Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/train.fr.gz


In [None]:
# Load the four parallel validation corpora in the order cs, de, en, fr.
val_cs, val_de, val_en, val_fr = [
    loadDataset(f'val.{lang}.gz') for lang in ('cs', 'de', 'en', 'fr')
]

Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/val.cs.gz
Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/val.de.gz
Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/val.en.gz
Downloading data from https://github.com/multi30k/dataset/raw/master/data/task1/raw/val.fr.gz


Count the distinct words in each language (used to size the embedding layers) and find the maximum sentence length.

In [None]:

# For every language: number of distinct space-separated tokens in the training
# corpus plus one, and the token count of the longest training sentence.

# --- Czech ---
len_train_cs = 1 + len(set(' '.join(train_cs).split(' ')))
sequence_length_cs = max(len(sentence.split(' ')) for sentence in train_cs)
print(f'The number of embedding words for Czech: {len_train_cs}')
print(f'The maximum length of sentence for Czech: {sequence_length_cs}')

# --- Deutsch ---
len_train_de = 1 + len(set(' '.join(train_de).split(' ')))
sequence_length_de = max(len(sentence.split(' ')) for sentence in train_de)
print(f'The number of embedding words for Deutsch: {len_train_de}')
print(f'The maximum length of sentence for Deutsch: {sequence_length_de}')

# --- English ---
len_train_en = 1 + len(set(' '.join(train_en).split(' ')))
sequence_length_en = max(len(sentence.split(' ')) for sentence in train_en)
print(f'The number of embedding words for English: {len_train_en}')
print(f'The maximum length of sentence for English: {sequence_length_en}')

# --- French ---
len_train_fr = 1 + len(set(' '.join(train_fr).split(' ')))
sequence_length_fr = max(len(sentence.split(' ')) for sentence in train_fr)
print(f'The number of embedding words for French: {len_train_fr}')
print(f'The maximum length of sentence for French: {sequence_length_fr}')


The number of embedding words for Czech: 30177
The maximum length of sentence for Czech: 35
The number of embedding words for Deutsch: 24910
The maximum length of sentence for Deutsch: 41
The number of embedding words for English: 15460
The maximum length of sentence for English: 39
The number of embedding words for French: 17007
The maximum length of sentence for French: 45


Prepare TextVectorization layer for encoders and decoders

In [None]:
# Every ASCII punctuation character except "[" and "]", which are kept so the
# "[start]" / "[end]" markers are not stripped.
strip_chars = string.punctuation.replace("[", "").replace("]", "")


def custom_standardization(input_string):
    """Lower-case ``input_string`` and delete every character listed in ``strip_chars``."""
    pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, pattern, "")



# One shared sequence length for all languages: the longest sentence seen in any
# of the four training corpora.
sequence_length = max([sequence_length_cs, sequence_length_de, sequence_length_en, sequence_length_fr])
batch_size = 128

# Encoder-side layers use the default standardization and emit `sequence_length`
# tokens. Decoder-side layers use `custom_standardization` and emit one extra
# token, so that the input (`[:, :-1]`) and target (`[:, 1:]`) slices taken in
# format_dataset each have `sequence_length` tokens.

# TextVectorization layer for the encoder of Czech
cs_vectorization = TextVectorization(
    max_tokens=len_train_cs, output_mode="int", output_sequence_length=sequence_length,
)

# TextVectorization layer for the decoder of Czech
auto_cs_vectorization = TextVectorization(
    max_tokens=len_train_cs, output_mode="int", output_sequence_length=sequence_length+1, standardize=custom_standardization,
)

# TextVectorization layer for the encoder of Deutsch
de_vectorization = TextVectorization(
    max_tokens=len_train_de, output_mode="int", output_sequence_length=sequence_length,
)

# TextVectorization layer for the decoder of Deutsch
de_de_vectorization = TextVectorization(
    max_tokens=len_train_de, output_mode="int", output_sequence_length=sequence_length+1, standardize=custom_standardization,
)

# TextVectorization layer for the decoder of English (configured like the other
# decoder-side layers; it feeds "en_decoder_inputs" and the "en" targets)
en_vectorization = TextVectorization(
    max_tokens=len_train_en, output_mode="int", output_sequence_length=sequence_length+1, standardize=custom_standardization,
)

# TextVectorization layer for the encoder of French
fr_vectorization = TextVectorization(
    max_tokens=len_train_fr, output_mode="int", output_sequence_length=sequence_length,
)

# Build each layer's vocabulary from its own training corpus.
cs_vectorization.adapt(train_cs)
auto_cs_vectorization.adapt(train_cs)
de_vectorization.adapt(train_de)
de_de_vectorization.adapt(train_de)
en_vectorization.adapt(train_en)
fr_vectorization.adapt(train_fr)

Prepare data format for Czech to Deutsch and Czech.

In [None]:

def format_dataset(cs, de):
    """Vectorize a batch of Czech/Deutsch sentences into model inputs and targets.

    Args:
        cs: Batch of Czech sentences.
        de: Batch of Deutsch sentences.

    Returns:
        A ``(inputs, targets)`` tuple of dicts. Decoder inputs drop the last
        token and targets drop the first, so each target is the next token
        after the corresponding decoder input.
    """
    encoder_tokens = cs_vectorization(cs)
    cs_tokens = auto_cs_vectorization(cs)
    de_tokens = de_de_vectorization(de)

    model_inputs = {
        "encoder_inputs": encoder_tokens,
        "decoder_inputs": de_tokens[:, :-1],
        "autodecoder_inputs": cs_tokens[:, :-1],
    }
    model_targets = {"de": de_tokens[:, 1:], "cs": cs_tokens[:, 1:]}
    return (model_inputs, model_targets)


def make_dataset(cs_texts, de_texts):
    """Build the batched tf.data pipeline for the Czech -> (Deutsch, Czech) model.

    Args:
        cs_texts: Iterable of Czech sentences.
        de_texts: Iterable of Deutsch sentences, aligned with ``cs_texts``.

    Returns:
        A ``tf.data.Dataset`` of ``(inputs, targets)`` batches produced by
        ``format_dataset``.
    """
    cs_texts = list(cs_texts)
    de_texts = list(de_texts)
    dataset = tf.data.Dataset.from_tensor_slices((cs_texts, de_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # cache() must come BEFORE shuffle(): caching after shuffling replays the
    # first epoch's order on every later epoch, and caching after prefetch()
    # defeats the prefetch. Batching happens before shuffling, so whole batches
    # are shuffled rather than individual sentences.
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds = make_dataset(train_cs, train_de)

In [None]:
# Peek at a single batch and report the shape of every input and target tensor.
for inputs, targets in train_ds.take(1):
    shape_report = (
        ('inputs["Czech encoder_inputs"].shape', inputs["encoder_inputs"].shape),
        ('inputs["Deutsch decoder_inputs"].shape', inputs["decoder_inputs"].shape),
        ('inputs["Czech decoder_inputs"].shape', inputs["autodecoder_inputs"].shape),
        ('Deutsch targets.shape', targets['de'].shape),
        ('Czech targets.shape', targets['cs'].shape),
    )
    for label, shape in shape_report:
        print(f'{label}: {shape}')

inputs["Czech encoder_inputs"].shape: (128, 45)
inputs["Deutsch decoder_inputs"].shape: (128, 45)
inputs["Czech decoder_inputs"].shape: (128, 45)
Deutsch targets.shape: (128, 45)
Czech targets.shape: (128, 45)


Define the encoder and decoder of Transformer model

In [None]:

class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Applies multi-head self-attention and a two-layer feed-forward projection,
    each followed by a residual connection and layer normalization.

    Args:
        embed_dim: Size of the last axis of the inputs; also used as the
            attention ``key_dim`` and as the output size of the projection.
        dense_dim: Hidden size of the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the block on ``inputs``; ``mask`` is the optional padding mask from Keras."""
        # Default to "no mask" so the layer also works when called without one
        # (previously `padding_mask` was unbound in that case and raised).
        padding_mask = None
        if mask is not None:
            # Insert two singleton axes so the mask broadcasts against the
            # attention scores.
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: Number of distinct positions the position table holds.
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Width of both embeddings.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed the token ids and add the embedding of each position index."""
        token_vectors = self.token_embeddings(inputs)
        # Position indices 0 .. (length of the last axis - 1).
        position_ids = tf.range(tf.shape(inputs)[-1])
        position_vectors = self.position_embeddings(position_ids)
        return token_vectors + position_vectors

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is True wherever the token id is not 0."""
        return tf.math.not_equal(inputs, 0)


class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Applies causal self-attention, attention over the encoder outputs and a
    two-layer feed-forward projection, each followed by a residual connection
    and layer normalization.

    Args:
        embed_dim: Size of the last axis of the inputs; also used as the
            attention ``key_dim`` and as the output size of the projection.
        latent_dim: Hidden size of the feed-forward projection.
        num_heads: Number of attention heads.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Decode ``inputs`` while attending to ``encoder_outputs``.

        ``mask`` is the optional padding mask Keras propagates for ``inputs``.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        # Default to "no mask" so the layer also works when called without one
        # (previously `padding_mask` was unbound in that case and raised).
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # NOTE(review): this mask is derived from the decoder-side mask and the
        # causal mask, yet it is applied to attention over the encoder outputs.
        # That presumably only lines up when encoder and decoder sequences have
        # the same length -- confirm before changing sequence lengths.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise, where batch
        and seq are the first two dimensions of ``inputs``.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)


Create the encoder of Czech, decoder of Deutsch and decoder of Czech.

Define the model of Czech to Deutsch and Czech.

In [None]:

# Hyper-parameters shared by every encoder and decoder built in this notebook.
embed_dim = 256
latent_dim = 2048
num_heads = 8

# ---- Encoder of Czech ----
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, len_train_cs, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)


# ---- Decoder of Deutsch ----
# Built as a stand-alone model with its own placeholder for the encoder output,
# so it can later be applied to the output of any encoder.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, len_train_de, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(len_train_de, activation="softmax")(x)
# The model name "de" matches the "de" key of the targets dict.
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs, name="de")


# ---- Decoder of Czech ----
autodecoder_inputs = keras.Input(shape=(None,), dtype="int64", name="autodecoder_inputs")
autoencoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="autodecoder_state_inputs")
x = PositionalEmbedding(sequence_length, len_train_cs, embed_dim)(autodecoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, autoencoded_seq_inputs)
x = layers.Dropout(0.5)(x)
autodecoder_outputs = layers.Dense(len_train_cs, activation="softmax")(x)
# The model name "cs" matches the "cs" key of the targets dict.
autodecoder = keras.Model([autodecoder_inputs, autoencoded_seq_inputs], autodecoder_outputs, name="cs")


# ---- Joint model: Czech -> Deutsch and Czech -> Czech ----
# Both decoders consume the same Czech encoder output. Note that
# `decoder_outputs` and `autodecoder_outputs` are rebound here to the outputs
# of the decoders applied to `encoder_outputs`.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
autodecoder_outputs = autodecoder([autodecoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs, autodecoder_inputs], [decoder_outputs, autodecoder_outputs], name="transformer"
)

Train the model of Czech to Deutsch and Czech.

In [None]:
# First training pass of the Czech -> (Deutsch, Czech) model with the "adam"
# optimizer; the same loss and metric are applied to both outputs.
epochs = 100

transformer.summary()
transformer.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
transformer.fit(train_ds, epochs=epochs)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   7736832     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

<keras.callbacks.History at 0x7fa74d2ee390>

Decrease the learning rate to train the model of Czech to Deutsch and Czech.

In [None]:
# Second training pass: recompile with Adam at learning rate 1e-4 and continue.
epochs = 50

transformer.summary()
opt = keras.optimizers.Adam(learning_rate=0.0001)
transformer.compile(
    optimizer=opt,
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
transformer.fit(train_ds, epochs=epochs)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   7736832     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

<keras.callbacks.History at 0x7fa74cee1890>

Prepare data format for Czech to English

Create the decoder of English.

Freeze encoder of Czech, decoder of Deutsch and decoder of Czech.

Define the translation model of Czech to English.

In [None]:

#Prepare data format for Czech to English
def format_dataset(cs, en):
    """Vectorize a batch of Czech/English sentences for the Czech -> English model.

    Returns:
        A ``(inputs, targets)`` tuple of dicts. The English decoder input drops
        the last token and the "en" target drops the first.
    """
    en_tokens = en_vectorization(en)
    features = {
        "encoder_inputs": cs_vectorization(cs),
        "en_decoder_inputs": en_tokens[:, :-1],
    }
    labels = {"en": en_tokens[:, 1:]}
    return (features, labels)


def make_dataset(cs_texts, en_texts):
    """Build the batched tf.data pipeline for the Czech -> English model.

    Args:
        cs_texts: Iterable of Czech sentences.
        en_texts: Iterable of English sentences, aligned with ``cs_texts``.

    Returns:
        A ``tf.data.Dataset`` of ``(inputs, targets)`` batches produced by
        ``format_dataset``.
    """
    cs_texts = list(cs_texts)
    en_texts = list(en_texts)
    dataset = tf.data.Dataset.from_tensor_slices((cs_texts, en_texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # cache() must come BEFORE shuffle(): caching after shuffling replays the
    # first epoch's order on every later epoch, and caching after prefetch()
    # defeats the prefetch. Batching happens before shuffling, so whole batches
    # are shuffled rather than individual sentences.
    return dataset.cache().shuffle(2048).prefetch(16)


# Build the Czech/English training dataset (this rebinds `train_ds`).
train_ds = make_dataset(train_cs, train_en)

# Create the decoder of English, structured like the Deutsch and Czech decoders.
en_decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="en_decoder_inputs")
en_encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="en_decoder_state_inputs")
x = PositionalEmbedding(sequence_length, len_train_en, embed_dim)(en_decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, en_encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
en_decoder_outputs = layers.Dense(len_train_en, activation="softmax")(x)
# The model name "en" matches the "en" key of the targets dict.
en_decoder = keras.Model([en_decoder_inputs, en_encoded_seq_inputs], en_decoder_outputs, name="en")

# Apply the English decoder to the output of the Czech encoder.
en_decoder_outputs = en_decoder([en_decoder_inputs, encoder_outputs])


# Freeze the encoder of Czech, the decoder of Deutsch and the decoder of Czech.
# This is done before compile(), so only the English decoder is trained.
encoder.trainable = False
decoder.trainable = False
autodecoder.trainable = False


# Define the translation model of Czech to English.
transformer_2en = keras.Model(
    [encoder_inputs, en_decoder_inputs], [en_decoder_outputs], name="transformer"
)

transformer_2en.summary()
transformer_2en.compile(
    "adam", loss=["sparse_categorical_crossentropy"], metrics=["accuracy"]
)


Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   7736832     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 en_decoder_inputs (InputLayer)  [(None, None)]      0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

Train the translation model of Czech to English.

In [None]:
# First training pass of the Czech -> English model (compiled in the previous cell).
epochs = 100
transformer_2en.fit(x=train_ds, epochs=epochs)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
 14/227 [>.............................] - ETA: 4:07 - loss: 0.0780 - accuracy: 0.9262

Decrease learning rate to train the translation model of Czech to English.

In [None]:
# Second training pass: recompile with Adam at learning rate 1e-4 and continue.
epochs = 100

transformer_2en.summary()
opt = keras.optimizers.Adam(learning_rate=0.0001)
transformer_2en.compile(
    optimizer=opt,
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
transformer_2en.fit(train_ds, epochs=epochs)

Prepare the data format for aligning French to Czech.

Create the encoder of French.


In [None]:
def format_dataset(cs, de, fr, zero):
    """Vectorize a Czech/Deutsch/French batch for the French-alignment model.

    Args:
        cs: Batch of Czech sentences.
        de: Batch of Deutsch sentences.
        fr: Batch of French sentences.
        zero: Batch of all-zero tensors, passed through as the "zero" target.

    Returns:
        A ``(inputs, targets)`` tuple of dicts. Decoder inputs drop the last
        token and the "de" / "cs" targets drop the first.
    """
    cs_decoder_tokens = auto_cs_vectorization(cs)
    de_decoder_tokens = de_de_vectorization(de)

    model_inputs = {
        "encoder_inputs": cs_vectorization(cs),
        "frencoder_inputs": fr_vectorization(fr),
        "decoder_inputs": de_decoder_tokens[:, :-1],
        "autodecoder_inputs": cs_decoder_tokens[:, :-1],
    }
    model_targets = {
        "de": de_decoder_tokens[:, 1:],
        "cs": cs_decoder_tokens[:, 1:],
        "zero": zero,
    }
    return (model_inputs, model_targets)


def make_dataset(cs_texts, de_texts, fr_texts):
    """Build the batched tf.data pipeline for the French-alignment model.

    Args:
        cs_texts: Iterable of Czech sentences.
        de_texts: Iterable of Deutsch sentences, aligned with ``cs_texts``.
        fr_texts: Iterable of French sentences, aligned with ``cs_texts``.

    Returns:
        A ``tf.data.Dataset`` of ``(inputs, targets)`` batches produced by
        ``format_dataset``; the targets include an all-zero "zero" tensor of
        shape ``(batch, sequence_length, embed_dim)``.
    """
    cs_texts = list(cs_texts)
    de_texts = list(de_texts)
    fr_texts = list(fr_texts)
    # float32 instead of NumPy's default float64: halves the memory of this
    # (num_sentences, sequence_length, embed_dim) array.
    zero = np.zeros((len(fr_texts), sequence_length, embed_dim), dtype="float32")
    dataset = tf.data.Dataset.from_tensor_slices((cs_texts, de_texts, fr_texts, zero))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # cache() must come BEFORE shuffle(): caching after shuffling replays the
    # first epoch's order on every later epoch, and caching after prefetch()
    # defeats the prefetch.
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds_fr = make_dataset(train_cs, train_de, train_fr)


# ---- Encoder of French (same structure as the Czech encoder) ----
frencoder_inputs = keras.Input(shape=(None,), dtype="int64", name="frencoder_inputs")
x = PositionalEmbedding(sequence_length, len_train_fr, embed_dim)(frencoder_inputs)
frencoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
frencoder = keras.Model(frencoder_inputs, frencoder_outputs)

# Element-wise difference between the French and Czech encoder outputs. The
# layer name "zero" matches the "zero" key of the targets dict.
subtracted = keras.layers.Subtract(name="zero")([frencoder_outputs, encoder_outputs])
# NOTE(review): `zeroLayer` is not referenced again in the visible part of the
# notebook -- confirm whether it is still needed.
zeroLayer = keras.Model([frencoder_outputs, encoder_outputs], subtracted, name="zero")


In [None]:
# Peek at a single batch and report the shape of every input and target tensor.
for inputs, targets in train_ds_fr.take(1):
    shape_report = (
        ('inputs["Czech encoder_inputs"].shape', inputs["encoder_inputs"].shape),
        ('inputs["Deutsch decoder_inputs"].shape', inputs["decoder_inputs"].shape),
        ('inputs["Czech decoder_inputs"].shape', inputs["autodecoder_inputs"].shape),
        ('inputs["French encoder_inputs"].shape', inputs["frencoder_inputs"].shape),
        ('targets.shape', targets['de'].shape),
        ('targets.shape', targets['cs'].shape),
        ('targets.shape', targets['zero'].shape),
    )
    for label, shape in shape_report:
        print(f'{label}: {shape}')

Define the loss function shown in Equation (1)

Define the model of aligning encoder of French to Czech.

Train the model of aligning encoder of French to Czech.

In [None]:
epochs = 100

# Take the backend from the `tensorflow.keras` module imported at the top of the
# notebook instead of importing the standalone `keras` package here, so every
# Keras object in this notebook comes from the same installation.
K = keras.backend


#Define the loss function shown in Equation (1)
def custom_loss(y_true, y_pred):
    """Element-wise alignment loss: a squared term plus a square-root term.

    Computes ``(d + eps)**2 + sqrt(|d| + eps)`` with ``d = y_pred - y_true``
    and ``eps = K.epsilon()``.
    """
    # NOTE(review): epsilon is added inside the square as well as inside the
    # square root -- confirm this matches Equation (1).
    difference = y_pred - y_true
    squared_term = K.square(difference + K.epsilon())
    root_term = K.sqrt(K.abs(difference) + K.epsilon())
    return squared_term + root_term

# Feed the French encoder output into the existing Deutsch and Czech decoders.
# This rebinds `decoder_outputs` and `autodecoder_outputs`.
decoder_outputs = decoder([decoder_inputs, frencoder_outputs])
autodecoder_outputs = autodecoder([autodecoder_inputs, frencoder_outputs])

# Keep the Czech encoder and both decoders frozen; the French encoder is not
# frozen here.
encoder.trainable = False
decoder.trainable = False
autodecoder.trainable = False


# Define the model of aligning encoder of French to Czech.
# Outputs, in order: the encoder-output difference ("zero"), the Deutsch decoder
# output ("de") and the Czech decoder output ("cs").
transformer_stage_1 = keras.Model(
    [encoder_inputs, frencoder_inputs, decoder_inputs, autodecoder_inputs], [subtracted, decoder_outputs, autodecoder_outputs], name="transformer"
)

transformer_stage_1.summary()
# The loss list follows the output order above.
# NOTE(review): metrics=["accuracy"] is also requested for the "zero" output,
# which is a difference of encoder outputs -- confirm that this is intended.
transformer_stage_1.compile(
    "adam", loss=[custom_loss, "sparse_categorical_crossentropy", "sparse_categorical_crossentropy"], metrics=["accuracy"]
)


# Train the model of aligning encoder of French to Czech.
transformer_stage_1.fit(train_ds_fr, epochs=epochs)

Decrease learning rate to train the model of aligning encoder of French to Czech.

In [None]:
# Second alignment pass: recompile with Adam at learning rate 1e-4 and continue.
epochs = 50

transformer_stage_1.summary()
opt = keras.optimizers.Adam(learning_rate=0.0001)
transformer_stage_1.compile(
    optimizer=opt,
    loss=[custom_loss, "sparse_categorical_crossentropy", "sparse_categorical_crossentropy"],
    metrics=["accuracy"],
)
transformer_stage_1.fit(train_ds_fr, epochs=epochs)


In [None]:
# Third alignment pass: recompile with Adam at learning rate 1e-5 and continue.
epochs = 50

transformer_stage_1.summary()
opt = keras.optimizers.Adam(learning_rate=0.00001)
transformer_stage_1.compile(
    optimizer=opt,
    loss=[custom_loss, "sparse_categorical_crossentropy", "sparse_categorical_crossentropy"],
    metrics=["accuracy"],
)
transformer_stage_1.fit(train_ds_fr, epochs=epochs)


Aligning French to Czech without mapping to Deutsch and Czech.

In [None]:
###############French
def format_dataset(cs, de, fr, zero):
    """Vectorize a batch for the alignment-only model (encoder outputs only).

    Args:
        cs: Batch of Czech sentences.
        de: Batch of Deutsch sentences; unused, kept so the signature matches
            the dataset's element structure.
        fr: Batch of French sentences.
        zero: Batch of all-zero tensors, passed through as the "zero" target.

    Returns:
        A ``(inputs, targets)`` tuple of dicts holding the two encoder inputs
        and the "zero" target.
    """
    # Only the two encoder inputs are needed here; the decoder-side
    # vectorizations of `cs` and `de` were computed but never used, so they
    # have been dropped.
    input_cs = cs_vectorization(cs)
    input_fr = fr_vectorization(fr)
    return ({"encoder_inputs": input_cs, "frencoder_inputs": input_fr,}, {"zero": zero})


def make_dataset(cs_texts, de_texts, fr_texts):
    """Build the batched tf.data pipeline for the alignment-only French model.

    Args:
        cs_texts: Iterable of Czech sentences.
        de_texts: Iterable of Deutsch sentences, aligned with ``cs_texts``.
        fr_texts: Iterable of French sentences, aligned with ``cs_texts``.

    Returns:
        A ``tf.data.Dataset`` of ``(inputs, targets)`` batches produced by
        ``format_dataset``; the targets hold an all-zero "zero" tensor of
        shape ``(batch, sequence_length, embed_dim)``.
    """
    cs_texts = list(cs_texts)
    de_texts = list(de_texts)
    fr_texts = list(fr_texts)
    # float32 instead of NumPy's default float64: halves the memory of this
    # (num_sentences, sequence_length, embed_dim) array.
    zero = np.zeros((len(fr_texts), sequence_length, embed_dim), dtype="float32")
    dataset = tf.data.Dataset.from_tensor_slices((cs_texts, de_texts, fr_texts, zero))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # cache() must come BEFORE shuffle(): caching after shuffling replays the
    # first epoch's order on every later epoch, and caching after prefetch()
    # defeats the prefetch.
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds_fr = make_dataset(train_cs, train_de, train_fr)


# Alignment-only pass: the model's single output is the difference between the
# French and Czech encoder outputs, trained against the "zero" target.
epochs = 100

transformer_stage_1 = keras.Model(
    inputs=[encoder_inputs, frencoder_inputs],
    outputs=[subtracted],
    name="transformer",
)

transformer_stage_1.summary()
opt = keras.optimizers.Adam(learning_rate=0.00001)
transformer_stage_1.compile(
    optimizer=opt,
    loss=[custom_loss],
    metrics=["accuracy"],
)
transformer_stage_1.fit(train_ds_fr, epochs=epochs)

Generate results of French to English.

In [None]:
# Map token index -> English word, for turning predictions back into text.
en_vocab = en_vectorization.get_vocabulary()
en_index_lookup = dict(enumerate(en_vocab))
max_decoded_sentence_length = sequence_length


# French -> English model: French encoder output fed into the English decoder.
decoder_outputs = en_decoder([en_decoder_inputs, frencoder_outputs])
transformer = keras.Model(
    inputs=[frencoder_inputs, en_decoder_inputs],
    outputs=[decoder_outputs],
    name="transformer",
)

def decode_sequence(input_sentence):
    """Greedily translate one French sentence to English.

    Starts from "[start]" and repeatedly appends the highest-scoring next
    token until "[end]" is produced or ``max_decoded_sentence_length`` tokens
    have been generated.

    Returns:
        The decoded sentence, including the "[start]" marker.
    """
    source_tokens = fr_vectorization([input_sentence])
    words = ["[start]"]
    for step in range(max_decoded_sentence_length):
        # Re-vectorize the partial translation on every step.
        target_tokens = en_vectorization([" ".join(words)])[:, :-1]
        predictions = transformer([source_tokens, target_tokens])

        best_index = np.argmax(predictions[0, step, :])
        next_word = en_index_lookup[best_index]
        words.append(next_word)

        if next_word == "[end]":
            break
    return " ".join(words)

# Translate every French validation sentence, echoing each result as it is produced.
alltext = []
for source_sentence in val_fr:
    translated = decode_sequence(source_sentence)
    print(translated)
    alltext.append(translated)


[start] a group of men are responding to an emergency [end]
[start] a man in a green tank top is putting up on a stool [end]
[start] boy wearing a helmet sitting on a bench [end]
[start] two men are shoveling cement on a skateboard while in a river [end]
[start] a man with a long vest is sitting in a small childs wagon [end]
[start] a woman in a red coat is walking with a male facepainted game and the exam [end]
[start] a brown dog is running a black and white dog [end]
[start] a young boy clings to the rope of the calf of the opposing of a softball game [end]
[start] a man in a dark suit is taking a nap in front of a building [end]
[start] a woman wearing a blue tank top is walking and talking to a pack of the determined sign [end]
[start] a young child stands in a rock climbing [end]
[start] a person is riding a piece of iron on the bench [end]
[start] three young kids stand around a white and light blue [end]
[start] a woman is sitting at a table with her hair asleep during a founta

Generate results of translation model of Czech to English.

In [None]:
# Map token index -> English word, for turning predictions back into text.
en_vocab = en_vectorization.get_vocabulary()
en_index_lookup = dict(enumerate(en_vocab))
max_decoded_sentence_length = sequence_length


# Czech -> English model: Czech encoder output fed into the English decoder.
decoder_outputs = en_decoder([en_decoder_inputs, encoder_outputs])
transformer = keras.Model(
    inputs=[encoder_inputs, en_decoder_inputs],
    outputs=[decoder_outputs],
    name="transformer",
)

def decode_sequence(input_sentence):
    """Greedily translate one Czech sentence to English.

    Starts from "[start]" and repeatedly appends the highest-scoring next
    token until "[end]" is produced or ``max_decoded_sentence_length`` tokens
    have been generated.

    Returns:
        The decoded sentence, including the "[start]" marker.
    """
    source_tokens = cs_vectorization([input_sentence])
    words = ["[start]"]
    for step in range(max_decoded_sentence_length):
        # Re-vectorize the partial translation on every step.
        target_tokens = en_vectorization([" ".join(words)])[:, :-1]
        predictions = transformer([source_tokens, target_tokens])

        best_index = np.argmax(predictions[0, step, :])
        next_word = en_index_lookup[best_index]
        words.append(next_word)

        if next_word == "[end]":
            break
    return " ".join(words)

# Translate every Czech validation sentence, printing each result prefixed with
# its index.
alltext = []
for i, source_sentence in enumerate(val_cs):
    translated = decode_sequence(source_sentence)
    alltext.append(translated)
    print(f"{i}{translated}")


0[start] a group of men are loading on a truck [end]
1[start] a man asleep on a couch in a green room [end]
2[start] a boy wearing a headset is sitting a woman on his shoulders [end]
3[start] two men are using a blue fishing net on a shore of an ocean [end]
4[start] a balding man is wearing a red life vest and sitting on a small rowboat [end]
5[start] a lady in a red coat holds a pair of a bag as they swing day traveling along a city lane on a diner [end]
6[start] a brown dog runs behind a black dog [end]
7[start] a young boy wearing a bandanna swings a baseball bat at a baseball [end]
8[start] a man in a crowded office is talking on a cellphone [end]
9[start] a smiling woman in a walkway of by a mountain bike [end]
10[start] a young child stands alone on to a hot rock [end]
11[start] a person on a ski slope in the middle of snowboarding [end]
12[start] three young children are standing around a metal barrel [end]
13[start] a woman sitting at a wedding flower market in a outdoor market

The following steps repeat the procedure used for the French encoder to train an encoder for Deutsch.

In [None]:
###############Deutsch
# Encoder of Deutsch (same structure as the Czech and French encoders).
de_encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="de_encoder_inputs")
x = PositionalEmbedding(sequence_length, len_train_de, embed_dim)(de_encoder_inputs)
de_encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
de_encoder = keras.Model(de_encoder_inputs, de_encoder_outputs)

# Element-wise difference between the Deutsch and Czech encoder outputs. This
# rebinds `subtracted` and `zeroLayer`, which previously referred to the French
# encoder's difference.
subtracted = keras.layers.Subtract(name="zero")([de_encoder_outputs, encoder_outputs])
zeroLayer = keras.Model([de_encoder_outputs, encoder_outputs], subtracted, name="zero")

def format_dataset(cs, de, fr, zero):
    """Vectorize a Czech/Deutsch batch for the Deutsch-alignment model.

    Args:
        cs: Batch of Czech sentences.
        de: Batch of Deutsch sentences (used for both the Deutsch encoder input
            and the Deutsch decoder input/target).
        fr: Batch of French sentences; not used by this function.
        zero: Batch of all-zero tensors, passed through as the "zero" target.

    Returns:
        A ``(inputs, targets)`` tuple of dicts. Decoder inputs drop the last
        token and the "de" / "cs" targets drop the first.
    """
    cs_decoder_tokens = auto_cs_vectorization(cs)
    de_encoder_tokens = de_vectorization(de)
    de_decoder_tokens = de_de_vectorization(de)

    model_inputs = {
        "encoder_inputs": cs_vectorization(cs),
        "de_encoder_inputs": de_encoder_tokens,
        "decoder_inputs": de_decoder_tokens[:, :-1],
        "autodecoder_inputs": cs_decoder_tokens[:, :-1],
    }
    model_targets = {
        "de": de_decoder_tokens[:, 1:],
        "cs": cs_decoder_tokens[:, 1:],
        "zero": zero,
    }
    return (model_inputs, model_targets)


def make_dataset(cs_texts, de_texts, fr_texts):
    """Build the batched tf.data pipeline for the Deutsch-alignment model.

    Args:
        cs_texts: Iterable of Czech sentences.
        de_texts: Iterable of Deutsch sentences, aligned with ``cs_texts``.
        fr_texts: Iterable of French sentences, aligned with ``cs_texts``; its
            length determines the number of zero targets.

    Returns:
        A ``tf.data.Dataset`` of ``(inputs, targets)`` batches produced by
        ``format_dataset``; the targets include an all-zero "zero" tensor of
        shape ``(batch, sequence_length, embed_dim)``.
    """
    cs_texts = list(cs_texts)
    de_texts = list(de_texts)
    fr_texts = list(fr_texts)
    # float32 instead of NumPy's default float64: halves the memory of this
    # (num_sentences, sequence_length, embed_dim) array.
    zero = np.zeros((len(fr_texts), sequence_length, embed_dim), dtype="float32")
    dataset = tf.data.Dataset.from_tensor_slices((cs_texts, de_texts, fr_texts, zero))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # cache() must come BEFORE shuffle(): caching after shuffling replays the
    # first epoch's order on every later epoch, and caching after prefetch()
    # defeats the prefetch.
    return dataset.cache().shuffle(2048).prefetch(16)


train_ds_de = make_dataset(train_cs, train_de, train_fr)

In [None]:
# Peek at a single batch and report the shape of every input and target tensor.
for inputs, targets in train_ds_de.take(1):
    shape_report = (
        ('inputs["encoder_inputs"].shape', inputs["encoder_inputs"].shape),
        ('inputs["de_encoder_inputs"].shape', inputs["de_encoder_inputs"].shape),
        ('inputs["decoder_inputs"].shape', inputs["decoder_inputs"].shape),
        ('inputs["autodecoder_inputs"].shape', inputs["autodecoder_inputs"].shape),
        ('targets.shape', targets['de'].shape),
        ('targets.shape', targets['cs'].shape),
        ('targets.shape', targets['zero'].shape),
    )
    for label, shape in shape_report:
        print(f'{label}: {shape}')

In [None]:
epochs = 100

# Take the backend from the `tensorflow.keras` module imported at the top of the
# notebook instead of importing the standalone `keras` package here, so every
# Keras object in this notebook comes from the same installation.
K = keras.backend

def custom_loss(y_true, y_pred):
    """Element-wise loss: squared error plus the square root of the absolute error.

    With `eps = K.epsilon()` and `diff = y_pred - y_true`, this returns
    `(diff + eps)**2 + sqrt(|diff| + eps)`. No reduction is applied here; the
    result has the same shape as the broadcast of the two arguments.

    NOTE(review): eps is also added inside the squared term, not only under the
    square root -- confirm that is intended.
    """
    eps = K.epsilon()
    diff = y_pred - y_true

    squared_term = K.square(diff + eps)
    # eps keeps the sqrt argument strictly positive.
    root_term = K.sqrt(K.abs(diff) + eps)

    return squared_term + root_term

# Drive both existing decoders from the German encoder's output.
decoder_outputs = decoder([decoder_inputs, de_encoder_outputs])
autodecoder_outputs = autodecoder([autodecoder_inputs, de_encoder_outputs])

# Freeze the previously built models; none of these three is updated by the fit below.
for frozen_model in (encoder, decoder, autodecoder):
    frozen_model.trainable = False

# Three outputs: the encoder-output difference plus the two decoder predictions.
transformer_stage_1 = keras.Model(
    inputs=[encoder_inputs, de_encoder_inputs, decoder_inputs, autodecoder_inputs],
    outputs=[subtracted, decoder_outputs, autodecoder_outputs],
    name="transformer",
)

transformer_stage_1.summary()
# Losses are listed in the same order as the model outputs above.
transformer_stage_1.compile(
    optimizer="adam",
    loss=[
        custom_loss,
        "sparse_categorical_crossentropy",
        "sparse_categorical_crossentropy",
    ],
    metrics=["accuracy"],
)
# No validation_data is passed to this fit.
transformer_stage_1.fit(train_ds_de, epochs=epochs)

In [None]:
# Continue training for 50 epochs with a new Adam optimizer at learning rate 1e-4.
epochs = 50

transformer_stage_1.summary()
opt = keras.optimizers.Adam(learning_rate=1e-4)
transformer_stage_1.compile(
    optimizer=opt,
    loss=[
        custom_loss,
        "sparse_categorical_crossentropy",
        "sparse_categorical_crossentropy",
    ],
    metrics=["accuracy"],
)
# No validation_data is passed to this fit.
transformer_stage_1.fit(train_ds_de, epochs=epochs)


In [None]:
# Continue training for 50 epochs with a new Adam optimizer at learning rate 1e-5.
epochs = 50

transformer_stage_1.summary()
opt = keras.optimizers.Adam(learning_rate=1e-5)
transformer_stage_1.compile(
    optimizer=opt,
    loss=[
        custom_loss,
        "sparse_categorical_crossentropy",
        "sparse_categorical_crossentropy",
    ],
    metrics=["accuracy"],
)
# No validation_data is passed to this fit.
transformer_stage_1.fit(train_ds_de, epochs=epochs)


In [None]:
###############French
def format_dataset(cs, de, fr, zero):
    """Vectorize a batch into the two encoder inputs and the "zero" target.

    Applied by make_dataset through `dataset.map` after batching. This
    redefinition feeds the two-input model that only outputs the encoder
    difference, so no decoder inputs or decoder targets are produced.

    Args:
        cs: batch of sentences passed to `cs_vectorization`.
        de: batch of sentences passed to `de_vectorization`.
        fr: accepted for signature compatibility; not used here.
        zero: tensor forwarded unchanged as the "zero" target.

    Returns:
        An `(inputs, targets)` tuple of dicts.
    """
    # Only the two encoder vectorizations are needed; the decoder-side
    # vectorizations were computed but never used, so they are omitted.
    input_cs = cs_vectorization(cs)
    input_de = de_vectorization(de)
    return ({"encoder_inputs": input_cs, "de_encoder_inputs": input_de,}, {"zero": zero})


def make_dataset(cs_texts, de_texts, fr_texts):
    """Build the batched, vectorized tf.data pipeline used for training.

    Args:
        cs_texts: iterable of sentences for the "cs" side.
        de_texts: iterable of sentences for the "de" side.
        fr_texts: iterable of sentences; only its length is used here (to size
            the all-zeros target), although the texts are still passed through
            to format_dataset.

    Returns:
        A `tf.data.Dataset` yielding whatever `format_dataset` returns for each
        batch of `batch_size` examples.
    """
    cs_texts = list(cs_texts)
    de_texts = list(de_texts)
    fr_texts = list(fr_texts)
    # One all-zeros target of shape (sequence_length, embed_dim) per example.
    zero = np.zeros((len(fr_texts), sequence_length, embed_dim))
    dataset = tf.data.Dataset.from_tensor_slices((cs_texts, de_texts, fr_texts, zero))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset)
    # Cache the vectorized batches first, then shuffle, then prefetch.
    # With cache() last, the first epoch's shuffled order is what gets cached,
    # so every later epoch replays that same order. Caching before shuffling
    # lets each epoch draw a new batch order from the cached data.
    return dataset.cache().shuffle(2048).prefetch(16)


# Rebuild the dataset with the format_dataset defined in this cell, which
# yields only the two encoder inputs and the "zero" target.
train_ds_de = make_dataset(train_cs, train_de, train_fr)

# Restore previously saved German-encoder weights from Google Drive.
de_encoder.load_weights('/content/drive/My Drive/de_encoder_clip.h5')

epochs = 100

# Two-input model whose single output is the encoder-output difference.
transformer_stage_1 = keras.Model(
    inputs=[encoder_inputs, de_encoder_inputs],
    outputs=[subtracted],
    name="transformer",
)

transformer_stage_1.summary()
opt = keras.optimizers.Adam(learning_rate=1e-5)
transformer_stage_1.compile(
    optimizer=opt,
    loss=[custom_loss],
    metrics=["accuracy"],
)
# No validation_data is passed to this fit.
transformer_stage_1.fit(train_ds_de, epochs=epochs)

In [None]:
# Map token id -> token string for the English vectorizer's vocabulary.
en_vocab = en_vectorization.get_vocabulary()
en_index_lookup = dict(enumerate(en_vocab))
max_decoded_sentence_length = sequence_length

# Inference model: German encoder output fed into the English decoder.
decoder_outputs = en_decoder([en_decoder_inputs, de_encoder_outputs])
transformer = keras.Model(
    inputs=[de_encoder_inputs, en_decoder_inputs],
    outputs=[decoder_outputs],
    name="transformer",
)

def decode_sequence(input_sentence):
    """Greedily decode a translation for one input sentence.

    The input is tokenized with `de_vectorization`. Starting from "[start]",
    each step re-tokenizes the text decoded so far with `en_vectorization`,
    runs `transformer`, and appends the highest-scoring token at the current
    position. Decoding stops once "[end]" is appended or after
    `max_decoded_sentence_length` steps.

    Args:
        input_sentence: a single source sentence string.

    Returns:
        The decoded string, beginning with "[start]".
    """
    source_tokens = de_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for step in range(max_decoded_sentence_length):
        # Drop the last position of the vectorized partial target sequence.
        target_tokens = en_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([source_tokens, target_tokens])

        # Take the arg-max token at the position being generated.
        next_token_id = np.argmax(predictions[0, step, :])
        next_token = en_index_lookup[next_token_id]
        decoded_sentence = f"{decoded_sentence} {next_token}"

        if next_token == "[end]":
            return decoded_sentence
    return decoded_sentence

# Decode every sentence in val_de, collecting the results and printing each
# one prefixed by its index.
alltext = []
for i, source_sentence in enumerate(val_de):
    translated = decode_sequence(source_sentence)
    alltext.append(translated)
    print(f"{i}{translated}")


0[start] a group of adults are finishing a dance floor place [end]
1[start] a man is sleeping in a green tank top of a book [end]
2[start] a young boy in sandals is sitting down on a bench [end]
3[start] two men are building gingerbread house on a cement car [end]
4[start] a man with a shirt is tossed in the air by a rubber bowl a green shirt [end]
5[start] a woman in a red coat is seen from a distance that has a down while an outdoor pole with spectators are being held by [end]
6[start] a brown dog runs over the white and white dog is running [end]
7[start] a young boy with a red white and yellow shirt is getting ready to cut down a ball [end]
8[start] a man is walking in an art gallery [end]
9[start] a smiling woman with a mural on a tank top of an ornate coffin [end]
10[start] a little girl walks by a large trampoline [end]
11[start] a person is driving a snowmobile the air [end]
12[start] three little kids standing around a wheel barrel [end]
13[start] a woman sits at an outdoor fa