*23 Sep 2024 : 21BAI1133 - Mukundh J*
#  Speech and Natural Language Processing Lab 9
- Implement a machine translation system.
- Use the following link for guidance: https://keras.io/examples/nlp/neural_machine_translation_with_transformer
- Use any dataset other than the one in the example.
- Don't worry about understanding the model components at the moment.
  - We will eventually cover them in class.
  - Alternatively, you can use any other sequence-to-sequence architecture of your choice, that is appropriate for translation.
- Focus on getting the preprocessing right, and getting some meaningful outputs.


In [None]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import pathlib
import random
import string
import re
import numpy as np

import tensorflow.data as tf_data
import tensorflow.strings as tf_strings

import keras
from keras import layers
from keras import ops
from keras.layers import TextVectorization

In [None]:
!wget https://www.manythings.org/anki/ita-eng.zip -O ita-eng.zip

--2024-09-29 16:03:20--  https://www.manythings.org/anki/ita-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8326901 (7.9M) [application/zip]
Saving to: ‘ita-eng.zip’


2024-09-29 16:03:22 (4.55 MB/s) - ‘ita-eng.zip’ saved [8326901/8326901]



In [None]:
!unzip /content/ita-eng.zip

Archive:  /content/ita-eng.zip
  inflating: ita.txt                 
  inflating: _about.txt              


In [None]:
# Location of the sentence-pair file (`ita.txt`) extracted from the downloaded archive.
text_file = "/content/ita.txt"

#### Data Parsing

In [None]:
# Load the corpus, one sentence pair per line.
# - The encoding is stated explicitly because the Italian side contains accented
#   characters and the platform default encoding is not guaranteed to be UTF-8.
# - Blank lines (e.g. the one produced by a trailing newline) are filtered out
#   instead of unconditionally discarding the last element, which would silently
#   drop a real pair if the file did not end with a newline.
with open(text_file, encoding="utf-8") as f:
    lines = [line for line in f.read().split("\n") if line]

In [None]:
lines

['Hi.\tCiao!\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #607364 (Cero)',
 'Hi.\tCiao.\tCC-BY 2.0 (France) Attribution: tatoeba.org #538123 (CM) & #4522287 (Guybrush88)',
 'Run!\tCorri!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #906347 (Guybrush88)',
 'Run!\tCorra!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #906348 (Guybrush88)',
 'Run!\tCorrete!\tCC-BY 2.0 (France) Attribution: tatoeba.org #906328 (papabear) & #906350 (Guybrush88)',
 'Who?\tChi?\tCC-BY 2.0 (France) Attribution: tatoeba.org #2083030 (CK) & #2126402 (Guybrush88)',
 'Wow!\tWow!\tCC-BY 2.0 (France) Attribution: tatoeba.org #52027 (Zifre) & #1922050 (Guybrush88)',
 'Duck!\tAmore!\tCC-BY 2.0 (France) Attribution: tatoeba.org #280158 (CM) & #5502518 (Guybrush88)',
 'Duck!\tTesoro!\tCC-BY 2.0 (France) Attribution: tatoeba.org #280158 (CM) & #5502519 (Guybrush88)',
 'Duck!\tImmergiti!\tCC-BY 2.0 (France) Attribution: tatoeba.org #280158 (CM) & #5502520 (Guybrush88)

In [None]:
# Build (english, italian) pairs from the tab-separated lines.
# Each line carries the English sentence, the Italian sentence and, in the data
# shown above, a third attribution field that is not needed for training.
# The Italian side is wrapped in the "[start]" / "[end]" markers that the decoder
# is trained to begin and finish with.
text_pairs = []
for line in lines:
    fields = line.split("\t")
    if len(fields) < 2:
        # Malformed or empty line: there is no pair to extract, so skip it
        # rather than aborting the whole parse.
        continue
    eng, ita = fields[0], fields[1]
    ita = "[start] " + ita + " [end]"
    text_pairs.append((eng, ita))

In [None]:
# Sanity check: print five randomly chosen (english, italian) pairs.
for _ in range(5):
    print(random.choice(text_pairs))

("That's Tom's fork.", '[start] Quella è la forchetta di Tom. [end]')
("We carried out the captain's order to the letter.", "[start] Noi eseguimmo l'ordine del capitano alla lettera. [end]")
('Tom has been very friendly.', '[start] Tom è stato molto amichevole. [end]')
('Am I hurting you?', '[start] Vi sto facendo del male? [end]')
('There is nothing like a walk.', "[start] Non c'è nulla di meglio di una camminata. [end]")


In [None]:
# Shuffle the pairs in place, then reserve 15% for validation and 15% for test;
# everything that remains is used for training.
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
# The test split is whatever follows the validation slice.
test_pairs = text_pairs[num_train_samples + num_val_samples :]

# Report the size of each split.
print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

377937 total pairs
264557 training pairs
56690 validation pairs
56690 test pairs


#### Vectorizing the data




In [None]:
# Characters deleted by the target-side standardization: all ASCII punctuation
# plus "¿", except the square brackets, which must survive so that the
# "[start]" / "[end]" markers stay intact.
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in "[]"
)

# Vocabulary cap, tokens per sequence, and examples per batch.
vocab_size, sequence_length, batch_size = 15000, 20, 64

In [None]:
def custom_standardization(input_string):
    """Lowercase the text and delete every character listed in `strip_chars`.

    Used as the `standardize` callable of the target-side `TextVectorization`
    layer.

    Args:
        input_string: String tensor handed over by the vectorization layer.

    Returns:
        The lowercased string tensor with all `strip_chars` characters removed.
    """
    # Lowercase as UTF-8: without an explicit encoding only ASCII letters are
    # lowercased, so accented capitals such as "È" would remain distinct
    # vocabulary entries from their lowercase forms.
    lowercase = tf_strings.lower(input_string, encoding="utf-8")
    # `strip_chars` is escaped so that punctuation is matched literally inside
    # the character class.
    return tf_strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

In [None]:
# Training sentences used to fit the two vocabularies. The `jpn_` names hold the
# target-language side, which in this notebook is the Italian text.
train_eng_texts = [source for source, _ in train_pairs]
train_jpn_texts = [target for _, target in train_pairs]

# Source-side vectorizer: relies on the layer's default standardization.
eng_vectorization = TextVectorization(
    output_sequence_length=sequence_length,
    output_mode="int",
    max_tokens=vocab_size,
)
# Target-side vectorizer: custom standardization, and one extra position so that
# dropping the first or the last token still leaves `sequence_length` tokens.
jpn_vectorization = TextVectorization(
    standardize=custom_standardization,
    output_sequence_length=sequence_length + 1,
    output_mode="int",
    max_tokens=vocab_size,
)

# Learn both vocabularies from the training split only.
eng_vectorization.adapt(train_eng_texts)
jpn_vectorization.adapt(train_jpn_texts)

In [None]:
def format_dataset(eng, jpn):
    """Vectorize a batch of sentence pairs into model inputs and targets.

    Args:
        eng: Batch of source sentences.
        jpn: Batch of target sentences.

    Returns:
        A `(inputs, targets)` tuple. `inputs` is a dict with the vectorized
        source under "encoder_inputs" and the vectorized target without its
        last token under "decoder_inputs"; `targets` is the vectorized target
        without its first token, i.e. the decoder input shifted by one.
    """
    source_ids = eng_vectorization(eng)
    target_ids = jpn_vectorization(jpn)
    model_inputs = {
        "encoder_inputs": source_ids,
        "decoder_inputs": target_ids[:, :-1],
    }
    next_token_targets = target_ids[:, 1:]
    return model_inputs, next_token_targets


In [None]:
def make_dataset(pairs):
    """Turn a list of (source, target) sentence pairs into a batched dataset.

    Args:
        pairs: Sequence of `(source_sentence, target_sentence)` tuples.

    Returns:
        A dataset of `format_dataset` outputs, batched by `batch_size`, cached,
        shuffled with a buffer of 2048 batches and prefetched.
    """
    # Split the pairs into two parallel lists of sentences.
    eng_texts, jpn_texts = map(list, zip(*pairs))
    return (
        tf_data.Dataset.from_tensor_slices((eng_texts, jpn_texts))
        .batch(batch_size)
        .map(format_dataset)
        .cache()
        .shuffle(2048)
        .prefetch(16)
    )

In [None]:
# Batched, vectorized datasets for the training and validation splits.
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)

In [None]:
# Take a single batch and print the shapes of both model inputs and of the targets.
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


#### Model Building

In [None]:
import keras.ops as ops

In [None]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Self-attention followed by a two-layer feed-forward projection; each
    sub-block is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: Size of the input/output feature axis (also used as the
            attention `key_dim`).
        dense_dim: Width of the hidden feed-forward layer.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Kept so that get_config() can report them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        feed_forward_layers = [
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
        self.dense_proj = keras.Sequential(feed_forward_layers)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply self-attention and the feed-forward projection to `inputs`.

        Args:
            inputs: Input tensor to encode.
            mask: Optional mask for `inputs`; when given, an extra axis is
                inserted at position 1 and it is passed on as the attention mask.

        Returns:
            The output of the second layer normalization.
        """
        padding_mask = (
            None if mask is None else ops.cast(mask[:, None, :], dtype="int32")
        )
        attended = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        # Residual + norm around attention, then around the feed-forward part.
        normed_attention = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normed_attention)
        return self.layernorm_2(normed_attention + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        }

In [None]:
class PositionalEmbedding(layers.Layer):
    """Token embedding summed with a learned position embedding.

    Args:
        sequence_length: Number of entries in the position-embedding table;
            inputs must not be longer than this.
        vocab_size: Number of entries in the token-embedding table.
        embed_dim: Size of both embedding vectors.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        # Kept so that get_config() can report them.
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim

    def call(self, inputs):
        """Embed token ids and add the embedding of each position index.

        Args:
            inputs: Integer tensor of token ids; the last axis is the sequence.

        Returns:
            Token embeddings plus position embeddings (broadcast over the
            leading axes).
        """
        length = ops.shape(inputs)[-1]
        positions = ops.arange(0, length, 1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0.

        The mask is produced unconditionally. This layer sits directly on the
        model inputs, which carry no incoming mask, so returning None whenever
        `mask` is None meant that no padding mask was ever created for the
        downstream attention layers.

        Args:
            inputs: Integer tensor of token ids.
            mask: Incoming mask; unused, since id 0 alone defines the padding.

        Returns:
            Boolean tensor with the same shape as `inputs`.
        """
        return ops.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            {
                "sequence_length": self.sequence_length,
                "vocab_size": self.vocab_size,
                "embed_dim": self.embed_dim,
            }
        )
        return config

In [None]:
class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Causal self-attention over the target sequence, cross-attention over the
    encoder outputs, then a two-layer feed-forward projection; each sub-block
    is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: Size of the input/output feature axis (also used as the
            attention `key_dim`).
        latent_dim: Width of the hidden feed-forward layer.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Kept so that get_config() can report them.
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [
                layers.Dense(latent_dim, activation="relu"),
                layers.Dense(embed_dim),
            ]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Decode `inputs` while attending to `encoder_outputs`.

        Args:
            inputs: Embedded target sequence.
            encoder_outputs: Encoder output sequence to cross-attend to.
            mask: Optional padding mask for `inputs` (the target sequence).
                NOTE(review): whether Keras fills this in automatically for a
                layer called with two tensor arguments depends on the Keras
                version - confirm for the version in use.

        Returns:
            The output of the third layer normalization.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            # `mask` describes *target* positions, so it belongs to the
            # self-attention over the target sequence, combined with the
            # causal triangle.
            padding_mask = ops.cast(mask[:, None, :], dtype="int32")
            self_attention_mask = ops.minimum(padding_mask, causal_mask)
        else:
            self_attention_mask = causal_mask

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=self_attention_mask,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # Cross-attention keys are encoder positions. The target-side
        # padding/causal mask must not be applied here: it is indexed by target
        # positions and only matched in shape because source and target
        # sequences happen to share the same length.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Build a lower-triangular int32 mask of shape (batch, seq, seq).

        Entry (i, j) is 1 when j <= i, so each position can only attend to
        itself and to earlier positions.

        Args:
            inputs: Tensor whose first two axes are batch and sequence.

        Returns:
            The triangular mask tiled once per batch element.
        """
        input_shape = ops.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = ops.arange(sequence_length)[:, None]
        j = ops.arange(sequence_length)
        mask = ops.cast(i >= j, dtype="int32")
        mask = ops.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples: (batch_size, 1, 1).
        mult = ops.concatenate(
            [ops.expand_dims(batch_size, -1), ops.convert_to_tensor([1, 1])],
            axis=0,
        )
        return ops.tile(mask, mult)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            {
                "embed_dim": self.embed_dim,
                "latent_dim": self.latent_dim,
                "num_heads": self.num_heads,
            }
        )
        return config

In [None]:
# Model hyperparameters passed to the embedding, encoder and decoder layers below.
embed_dim = 256  # embedding / attention feature size
latent_dim = 2048  # hidden width of the feed-forward projections
num_heads = 8  # attention heads per attention layer

In [None]:
# Encoder: token ids -> token + position embeddings -> one TransformerEncoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

In [None]:
# Decoder: target token ids plus an encoded source sequence -> per-position
# probability distribution over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
# Placeholder for the encoder output that is fed into the decoder block.
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
# Softmax over `vocab_size` entries for every target position.
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

In [None]:
# End-to-end model: feed the encoder's output into the decoder sub-model and
# expose the source and target token ids as the two model inputs.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

#### Model Training

In [None]:
# Number of training epochs requested from fit().
epochs = 30

In [None]:
# Print the architecture, then train with RMSprop on the integer-encoded targets.
# NOTE(review): the recorded run reports val_accuracy of 1.0000 after a few epochs
# while greedy decoding further down yields unrelated words. That combination is
# suspicious - check how padded positions count toward the metric and whether
# target tokens can leak into the predictions before trusting these numbers.
transformer.summary()
transformer.compile(
    "rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

Epoch 1/30
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m230s[0m 52ms/step - accuracy: 0.7714 - loss: 1.5956 - val_accuracy: 0.9815 - val_loss: 0.1697
Epoch 2/30
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m240s[0m 49ms/step - accuracy: 0.9803 - loss: 0.1658 - val_accuracy: 0.9973 - val_loss: 0.0380
Epoch 3/30
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m200s[0m 48ms/step - accuracy: 0.9955 - loss: 0.0413 - val_accuracy: 0.9998 - val_loss: 0.0118
Epoch 4/30
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m202s[0m 49ms/step - accuracy: 0.9971 - loss: 0.0343 - val_accuracy: 0.9999 - val_loss: 0.0040
Epoch 5/30
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m201s[0m 48ms/step - accuracy: 0.9995 - loss: 0.0080 - val_accuracy: 0.9999 - val_loss: 0.0011
Epoch 6/30
[1m4134/4134[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m202s[0m 48ms/step - accuracy: 0.9993 - loss: 0.0089 - val_accuracy: 1.0000 - val_loss: 5.274

<keras.src.callbacks.history.History at 0x7f737c67f280>

#### Decoding Test Sentences

In [None]:
def decode_sequence(input_sentence):
    """Greedily translate one source sentence with the trained `transformer`.

    Starting from "[start]", the partial translation is re-vectorized at every
    step, the model is run, and the most probable token at the current position
    is appended. Decoding stops at "[end]", at a padding prediction, or after
    `max_decoded_sentence_length` steps.

    Args:
        input_sentence: Source sentence as a plain string.

    Returns:
        The decoded sentence as a string that begins with "[start]" and ends
        with "[end]" when the model emitted it.
    """
    tokenized_input_sentence = eng_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        # Drop the last position so the decoder input has the same length as
        # during training.
        tokenized_target_sentence = jpn_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])

        # ops.argmax(predictions[0, i, :]) is not a concrete value for jax here
        sampled_token_index = ops.convert_to_numpy(
            ops.argmax(predictions[0, i, :])
        ).item(0)
        sampled_token = jpn_index_lookup[sampled_token_index]

        if not sampled_token:
            # The padding entry of the vocabulary is the empty string. Appending
            # it only adds blanks, and because the number of real tokens then no
            # longer matches the position index `i`, later steps would read the
            # prediction at a padded position. Treat it as end of output.
            break

        decoded_sentence += " " + sampled_token

        if sampled_token == "[end]":
            break
    return decoded_sentence

In [None]:
# Vocabulary of the target-side vectorizer and the reverse mapping
# (token index -> token string) used while decoding.
jpn_vocab = jpn_vectorization.get_vocabulary()
jpn_index_lookup = dict(enumerate(jpn_vocab))
# Upper bound on the number of greedy decoding steps.
max_decoded_sentence_length = 20

In [None]:
# Source-language sentences of the held-out test split.
test_eng_texts = [source for source, _ in test_pairs]

In [None]:
test_eng_texts

["You're sneaky.",
 "I'm here now, Tom.",
 "It's something only Tom can do.",
 'What kept you interested?',
 "That's all for today.",
 'Your name is familiar to me.',
 'You may swim now.',
 'Do you think Tom is really sorry?',
 'My brother is looking for an apartment.',
 'Why does Tom want to see me?',
 'She picked flowers in the garden.',
 'My license was confiscated.',
 'Tom is the one who knows what to do.',
 'Any emotion, if it is sincere, is involuntary.',
 'I want to go and live in Australia.',
 "I'm missing a sock.",
 "You've already said that.",
 'We need to cross the river.',
 'My mother is busy as a bee every day.',
 'I appreciate what you did yesterday.',
 'I feel very happy.',
 'Was it fun?',
 'I think I can.',
 "Aren't you busy tomorrow afternoon?",
 "I don't have a choice.",
 'Are you a detective?',
 "They're absolutely certain.",
 'I almost never drink milk.',
 'You have my thanks.',
 "I regret that I didn't go there.",
 "Why doesn't he exercise with me anymore?",
 "It's

In [None]:
# Translate 30 randomly chosen test sentences and print each source sentence
# next to the decoded output, separated by a tab.
for _ in range(30):
    input_sentence = random.choice(test_eng_texts)
    translated = decode_sequence(input_sentence)
    print(input_sentence,"\t",translated)

The bird's wing was broken. 	 [start] basso cosse per riuscite [end]
You owe me a beer. 	 [start] piani ha a dovè [end]
As far as I know, he is kind. 	 [start] giusto al tom lui il è trovare             
The man who shot him was Sirhan Bishara Sirhan. 	 [start] notte vuole lascia vi per [UNK] [UNK] [UNK]            
Have you learned to drive a car? 	 [start] morto gentile non morto a [end]
Tom suggested I go with him to Boston. 	 [start] capace tom essere da vi non sua             
The company went bankrupt. 	 [start] legge hanno destro [end]
That was just the tip of the iceberg. 	 [start] per dei di contando ho di infastidisce             
I think I need a hearing aid. 	 [start] cosa tom piace a ketchup [end]
I thought that I dreamed it. 	 [start] veramente la tom aprite un               
He got lost while he was walking in the woods. 	 [start] sto settimana stare il per bicchiere sono di riescono           
Let's go now. 	 [start] essere questa                  
I insist upon that. 	

In [None]:
# Peek at one training batch. Each dataset element is the (inputs, targets) pair
# built by format_dataset, so `encoder_input` is actually the dict holding both
# model inputs and `decoder_output` holds the target token ids.
for encoder_input, decoder_output in train_ds.take(1):
  print("Encoder Input:", encoder_input)
  print("Decoder Output:", decoder_output)

Encoder Input: {'encoder_inputs': <tf.Tensor: shape=(64, 20), dtype=int64, numpy=
array([[   2,  100,   10, ...,    0,    0,    0],
       [  28,    9,    3, ...,    0,    0,    0],
       [   2,  490, 2646, ...,    0,    0,    0],
       ...,
       [  18,   93, 4515, ...,    0,    0,    0],
       [  78,    2,    5, ...,    0,    0,    0],
       [  21,    3,   15, ...,    0,    0,    0]])>, 'decoder_inputs': <tf.Tensor: shape=(64, 20), dtype=int64, numpy=
array([[  2,   4,  14, ...,   0,   0,   0],
       [  2,   9,  39, ...,   0,   0,   0],
       [  2,   4, 281, ...,   0,   0,   0],
       ...,
       [  2,   5,  15, ...,   0,   0,   0],
       [  2, 417,   7, ...,   0,   0,   0],
       [  2,  22,  18, ...,   0,   0,   0]])>}
Decoder Output: tf.Tensor(
[[   4   14   62 ...    0    0    0]
 [   9   39   89 ...    0    0    0]
 [   4  281 3452 ...    0    0    0]
 ...
 [   5   15   13 ...    0    0    0]
 [ 417    7    4 ...    0    0    0]
 [  22   18   57 ...    0    0    0]], sh

In [None]:
input_sentence

"Tom didn't want Mary to be alone."

In [None]:
translated

'[start] sta molto era non È [end]'