## Setup

In [None]:
import tensorflow as tf
# Ask TensorFlow for the name of a GPU device (the saved output shows '/device:GPU:0').
tf.test.gpu_device_name()

'/device:GPU:0'

In [None]:
# List the GPUs visible to this runtime.
!nvidia-smi -L

GPU 0: Tesla T4 (UUID: GPU-7e0b796c-84c5-00e2-d7e5-3bba01e3d086)


In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset and saved models live under it.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install tensorflow

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pathlib
import random
import string
import re
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

## Loading the Data

In [None]:
# Load the sentence-pair table from the mounted Drive.
# NOTE(review): in the saved outputs the column named 'Spanish' contains Italian
# sentences - confirm which language pair this dataset really is.
text_file = pd.read_csv('/content/drive/MyDrive/Document Translation/data.csv')

In [None]:
# Drop the 'Attribution' column. errors='ignore' keeps this cell re-runnable:
# because the result is bound back to `text_file`, a second run would otherwise
# raise KeyError on the already-removed column.
text_file = text_file.drop(columns='Attribution', errors='ignore')
text_file

Unnamed: 0,English,Spanish
0,Hi.,Ciao!
1,Hi.,Ciao.
2,Run!,Corri!
3,Run!,Corra!
4,Run!,Correte!
...,...,...
362856,I know that adding sentences only in your nati...,So che aggiungere frasi soltanto nella sua lin...
362857,I know that adding sentences only in your nati...,So che aggiungere frasi solamente nella sua li...
362858,I know that adding sentences only in your nati...,So che aggiungere frasi solamente nella sua li...
362859,Doubtless there exists in this world precisely...,Senza dubbio esiste in questo mondo proprio la...


In [None]:
# Materialize both columns as plain Python lists (same elements, same order as
# iterating the Series and appending one by one).
eng_text = list(text_file['English'])
spa_text = list(text_file['Spanish'])

eng_text[:10], spa_text[:10]

(['Hi.',
  'Hi.',
  'Run!',
  'Run!',
  'Run!',
  'Who?',
  'Wow!',
  'Duck!',
  'Duck!',
  'Duck!'],
 ['Ciao!',
  'Ciao.',
  'Corri!',
  'Corra!',
  'Correte!',
  'Chi?',
  'Wow!',
  'Amore!',
  'Tesoro!',
  'Immergiti!'])

In [None]:
# Build (source, target) pairs; the target is wrapped in [start] / [end] sentinels.
# The sentinels MUST be separated from the sentence by spaces: without them they
# fuse with the neighbouring words (e.g. "[start]tom", "boston[end]") and never
# become standalone tokens, so decode_sequence() - which seeds with "[start]" and
# stops on the token "[end]" - could never hit its stop condition.
text_pairs = [
    (spa, '[start] ' + eng + ' [end]')
    for eng, spa in zip(eng_text, spa_text)
]

text_pairs[:10]

[('Ciao!', '[start]Hi.[end]'),
 ('Ciao.', '[start]Hi.[end]'),
 ('Corri!', '[start]Run![end]'),
 ('Corra!', '[start]Run![end]'),
 ('Correte!', '[start]Run![end]'),
 ('Chi?', '[start]Who?[end]'),
 ('Wow!', '[start]Wow![end]'),
 ('Amore!', '[start]Duck![end]'),
 ('Tesoro!', '[start]Duck![end]'),
 ('Immergiti!', '[start]Duck![end]')]

## Split into Training, Validation and Testing Sets

In [None]:
# Seed the RNG before shuffling so that a fresh "Restart & Run All" produces the
# same train / validation / test split every time.
random.seed(42)
random.shuffle(text_pairs)
text_pairs[:10]

[('Tom ha trascorso gli ultimi anni della sua vita a Boston.',
  '[start]Tom spent the last years of his life in Boston.[end]'),
 ('Aveva un bambino sano.', '[start]She had a healthy baby.[end]'),
 ('Tom parla con le sue piante?', '[start]Does Tom talk to his plants?[end]'),
 ('La scuola inizia alle nove.', '[start]School starts at nine.[end]'),
 ('Tom si stava preparando per il lavoro.',
  '[start]Tom was getting ready for work.[end]'),
 ('Perché vorresti aiutarmi?', '[start]Why would you want to help me?[end]'),
 ('Devo parlarvi.', '[start]I must speak to you.[end]'),
 ("Il re regnò sul suo popolo per quarant'anni.",
  '[start]The king reigned over his people for forty years.[end]'),
 ('È bello essere tornata', "[start]It's nice to be back.[end]"),
 ('Non sto più lavorando per lei.',
  "[start]I'm not working for you anymore.[end]")]

In [None]:
  # 70 % of the pairs are used for training and 15 % for validation; whatever
  # remains after those two slices becomes the test split.
  num_train_samples, num_val_samples = (
      int(fraction * len(text_pairs)) for fraction in (0.70, 0.15)
  )

  num_train_samples, num_val_samples

(254002, 54429)

In [None]:
# Consecutive, non-overlapping slices of the shuffled pairs:
# [0, train) -> train, [train, train + val) -> validation, the rest -> test.
train_pairs, val_pairs, test_pairs = (
    text_pairs[:num_train_samples],
    text_pairs[num_train_samples:num_train_samples + num_val_samples],
    text_pairs[num_train_samples + num_val_samples:],
)

In [None]:
# Sizes of the three splits, in (train, validation, test) order.
tuple(len(split) for split in (train_pairs, val_pairs, test_pairs))

(254002, 54429, 54430)

## Vectorizing the Data

In [None]:
# Characters removed during standardization: ASCII punctuation plus "¿",
# except the square brackets, which are kept so the [start] / [end] markers survive.
strip_char = (string.punctuation + "¿").translate(str.maketrans("", "", "[]"))

In [None]:
# Shared preprocessing hyper-parameters.
vocab_size = 15_000     # vocabulary cap handed to the vectorizers, embeddings and output layer
sequence_length = 20    # token positions per sentence (the English vectorizer uses one more)
batch_size = 64         # sentence pairs per tf.data batch

In [None]:
def custom_standardization(input_string):
  """Lower-case the input and delete every character listed in `strip_char`."""
  # Character class built from the escaped punctuation set.
  strip_pattern = "[%s]" % re.escape(strip_char)
  lowered = tf.strings.lower(input_string)
  return tf.strings.regex_replace(lowered, strip_pattern, "")

In [None]:
# Peek at one raw (source, target) pair.
text_pairs[0]

('Tom ha trascorso gli ultimi anni della sua vita a Boston.',
 '[start]Tom spent the last years of his life in Boston.[end]')

In [None]:
# Sanity check: run the standardizer on both strings of one pair.
custom_standardization(text_pairs[0])

<tf.Tensor: shape=(2,), dtype=string, numpy=
array([b'tom ha trascorso gli ultimi anni della sua vita a boston',
       b'[start]tom spent the last years of his life in boston[end]'],
      dtype=object)>

In [None]:
# Source-side vectorizer (CSV column 'Spanish'): integer token ids with a fixed
# output length of `sequence_length`.
spa_vectorization = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)

# Target-side (English) vectorizer: one extra position, because format_datasets()
# slices it into decoder inputs (eng[:, :-1]) and targets (eng[:, 1:]).
eng_vectorization = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
)

In [None]:
# Split the training pairs back into parallel source / target lists.
train_spa_text = [source for source, _ in train_pairs]
train_eng_text = [target for _, target in train_pairs]

# Learn each vocabulary from the training split only.
spa_vectorization.adapt(train_spa_text)
eng_vectorization.adapt(train_eng_text)

In [None]:
# Token ids for one source sentence (zero-padded in the saved output).
spa_vectorization(train_spa_text[1])

<tf.Tensor: shape=(20,), dtype=int64, numpy=
array([ 167,   11,  600, 3151,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0])>

In [None]:
# Token ids for the matching target sentence (one position longer than the source).
eng_vectorization(train_eng_text[1])

<tf.Tensor: shape=(21,), dtype=int64, numpy=
array([  54,   86,    7, 3150, 2423,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0])>

In [None]:
def format_datasets(spa, eng):
  """Vectorize a batch of sentence pairs into model inputs and targets.

  Args:
    spa: batch of source sentences (strings).
    eng: batch of target sentences (strings).

  Returns:
    A tuple `(inputs, targets)`. `inputs` is a dict with the keys
    "encoder_inputs" (source ids) and "decoder_inputs" (target ids without
    the last position); `targets` holds the target ids without the first
    position, i.e. the decoder inputs shifted by one step.
  """
  # Invoke the layers through __call__ rather than .call(): calling the layer
  # object is the supported entry point and runs Keras' own input handling.
  spa = spa_vectorization(spa)
  eng = eng_vectorization(eng)
  return ({"encoder_inputs": spa, "decoder_inputs": eng[:, :-1]}, eng[:, 1:])

format_datasets([train_spa_text[0]], [train_eng_text[0]])



({'encoder_inputs': <tf.Tensor: shape=(1, 20), dtype=int64, numpy=
  array([[   2,   12, 1042,   93, 3073,  186,   92,   63,  227,    5,   70,
             0,    0,    0,    0,    0,    0,    0,    0,    0]])>,
  'decoder_inputs': <tf.Tensor: shape=(1, 20), dtype=int64, numpy=
  array([[  5, 467,   6, 129, 390,  12,  43, 683,  10, 112,   0,   0,   0,
            0,   0,   0,   0,   0,   0,   0]])>},
 <tf.Tensor: shape=(1, 20), dtype=int64, numpy=
 array([[467,   6, 129, 390,  12,  43, 683,  10, 112,   0,   0,   0,   0,
           0,   0,   0,   0,   0,   0,   0]])>)

In [None]:
  # Vectorize the whole training split once to confirm the output shapes.
  s = spa_vectorization(train_spa_text)
  e = eng_vectorization(train_eng_text)

  s.shape, e.shape

(TensorShape([254002, 20]), TensorShape([254002, 21]))

In [None]:
def make_datasets(pairs):
  """Build a batched, vectorized `tf.data` pipeline from sentence pairs.

  Args:
    pairs: sequence of `(source_sentence, target_sentence)` string tuples.

  Returns:
    A `tf.data.Dataset` yielding `(inputs, targets)` batches as produced by
    `format_datasets`.
  """
  spa_text, eng_text = zip(*pairs)
  spa_texts = list(spa_text)
  eng_texts = list(eng_text)
  dataset = tf.data.Dataset.from_tensor_slices((spa_texts, eng_texts))
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(format_datasets)
  # Cache the (deterministic) vectorized batches first, then shuffle and
  # prefetch. With cache() last, the order drawn during the first epoch was
  # frozen in the cache and replayed unchanged on every following epoch.
  return dataset.cache().shuffle(2048).prefetch(16)

In [None]:
# One pipeline per split, built in (train, validation) order.
train_ds, val_ds = (make_datasets(split) for split in (train_pairs, val_pairs))

In [None]:
# Pull a single batch and print the shape of each model input and of the targets.
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


## Building the model

In [None]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer feed-forward
    projection; each sub-layer is wrapped in a residual connection and a
    layer normalization.

    Args:
        embed_dim: width of the final dense projection; also used as the
            attention `key_dim`.
        dense_dim: width of the hidden (ReLU) feed-forward layer.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Run the block on `inputs`; `mask` (if given) marks the non-padding positions."""
        # Default to "no mask" so the layer also works when Keras supplies no
        # mask (previously `padding_mask` was unbound in that case).
        padding_mask = None
        if mask is not None:
            # Insert an axis so the mask broadcasts over the query positions.
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        """Return the constructor arguments so Keras can serialize the layer."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "dense_dim": self.dense_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions the position embedding
            table holds.
        vocab_size: number of rows in the token embedding table.
        embed_dim: output width of both embeddings.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed the token ids and add the embedding of each position index."""
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is not 0 as valid."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the constructor arguments so Keras can serialize the layer."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Applies causal self-attention over the decoder inputs, attention over the
    encoder outputs, and a two-layer feed-forward projection; each sub-layer
    is wrapped in a residual connection and a layer normalization.

    Args:
        embed_dim: width of the final dense projection; also used as the
            attention `key_dim`.
        latent_dim: width of the hidden (ReLU) feed-forward layer.
        num_heads: number of attention heads.
        **kwargs: forwarded to `layers.Layer`.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block on decoder `inputs`, attending to `encoder_outputs`."""
        causal_mask = self.get_causal_attention_mask(inputs)
        # Default to "no mask" so the layer also works when Keras supplies no
        # mask (previously `padding_mask` was unbound in that case).
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # NOTE(review): `padding_mask` is built from this layer's own `mask` and
        # the causal mask, yet here it is applied over the encoder positions -
        # confirm this is intended before changing sequence lengths.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 (batch, seq, seq) mask that is 1 where column <= row."""
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

    def get_config(self):
        """Return the constructor arguments so Keras can serialize the layer."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [None]:
# Model hyper-parameters: embedding width, feed-forward hidden width, attention heads.
embed_dim, latent_dim, num_heads = 256, 2048, 8

In [None]:
# Encoder sub-model: token ids -> positional embedding -> one TransformerEncoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

In [None]:
# Decoder sub-model: takes the target token ids plus the encoded source sequence.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
# Softmax over the vocabulary at every decoder position.
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

In [None]:
# End-to-end model: feed the encoder outputs into the decoder sub-model.
# Note that `decoder_outputs` is rebound here to the output of the full model.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

In [None]:
# Inspect the symbolic output tensor of the full model.
decoder_outputs

<KerasTensor: shape=(None, None, 15000) dtype=float32 (created by layer 'model_1')>

## Training model

In [None]:
from tensorflow.keras.callbacks import EarlyStopping

epochs = 30  # This should be at least 30 for convergence
# Stop after 3 epochs without improvement and roll back to the best weights.
early_stopping = EarlyStopping(restore_best_weights=True, patience=3)

transformer.summary()
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)
transformer.fit(
    train_ds,
    validation_data=val_ds,
    epochs=epochs,
    callbacks=[early_stopping],
)


Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 positional_embedding (Position  (None, None, 256)   3845120     ['encoder_inputs[0][0]']         
 alEmbedding)                                                                                     
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   3155456     ['positional_embedding[

<keras.callbacks.History at 0x7fc1cdc85ed0>

In [None]:
# Save the trained model to Drive under a path without a file extension
# (presumably written in the TensorFlow SavedModel layout - this depends on the
# installed Keras version; verify).
transformer.save('/content/drive/MyDrive/Document Translation/saved_model')

# Save a second copy with the `.h5` extension, i.e. in HDF5 format.
# NOTE(review): the model contains custom layer classes; reloading it will
# presumably need them passed via `custom_objects` - confirm when loading.
transformer.save('/content/drive/MyDrive/Document Translation/saved_model.h5')




In [None]:
# Map each token id (its position in the vocabulary list) back to the token text.
eng_vocab = eng_vectorization.get_vocabulary()
eng_index_lookup = dict(enumerate(eng_vocab))
# Upper bound on the number of tokens generated per translation.
max_decoded_sentence_length = 20

In [None]:
def decode_sequence(input_sentence):
    """Greedily translate `input_sentence`, one target token per step.

    Starts from "[start]", repeatedly feeds the text decoded so far back into
    the model, appends the highest-scoring token for the current position and
    stops on "[end]" or after `max_decoded_sentence_length` steps.

    Returns:
        The decoded string, including the leading "[start]".
    """
    encoder_tokens = spa_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for step in range(max_decoded_sentence_length):
        # Re-vectorize the partial translation; drop the extra last position.
        decoder_tokens = eng_vectorization([decoded_sentence])[:, :-1]
        step_scores = transformer([encoder_tokens, decoder_tokens])[0, step, :]

        next_token = eng_index_lookup[np.argmax(step_scores)]
        decoded_sentence += " " + next_token

        if next_token == "[end]":
            break
    return decoded_sentence


In [None]:
# Parallel source / target lists for the held-out test split.
test_spa_texts = [source for source, _ in test_pairs]
test_eng_texts = [target for _, target in test_pairs]

# Translate one held-out sentence and show it next to its reference.
# NOTE(review): the label says "Spanish" but the sentences shown in the saved
# output are Italian - confirm the dataset language.
print("Spanish Sentence: " + test_spa_texts[1000])
print("English Original Sentence: " + test_eng_texts[1000])
print("English Translated Sentence: " + decode_sequence(test_spa_texts[1000]))

Spanish Sentence: Sembrava che Tom e Mary fossero esausti.
English Original Sentence: [start]It seemed Tom and Mary were exhausted.[end]
English Translated Sentence: [start] looked tom and mary was exhausted[end]              
