<a href="https://colab.research.google.com/github/Codebmk/mt_writing_skills_aid/blob/main/nmt_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install seaborn
!pip install nltk
!pip install datasets
!pip install keras_nlp
!pip install rouge-score

import logging
import time
import nltk
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import datasets
import tensorflow as tf
import keras_nlp
import pathlib
import random
import json
from tensorflow import keras
from tensorflow_text.tools.wordpiece_vocab import (
    bert_vocab_from_dataset as bert_vocab,
)
from keras_nlp.metrics import Bleu
from IPython import display
display.clear_output()

In [2]:
# define our parameters/hyperparameters
BATCH_SIZE = 64
EPOCHS = 1
MAX_SEQUENCE_LENGTH = 128
ENG_VOCAB_SIZE = 15000
LUG_VOCAB_SIZE = 15000

EMBED_DIM = 256
INTERMEDIATE_DIM = 2048
NUM_HEADS = 8

In [3]:
import pandas as pd
from datasets import load_dataset

# Load the SALT dataset for English and Luganda
def _read_jsonl(path):
    """Parse a JSON Lines file into a list of decoded records.

    Args:
        path: Location of a text file holding one JSON document per line.

    Returns:
        A list with one decoded object per non-blank line, in file order.
    """
    # Decode explicitly as UTF-8 so the text does not depend on the platform's
    # default encoding, and skip blank lines (e.g. a trailing newline) instead
    # of crashing in `json.loads`.
    with open(path, "r", encoding="utf-8") as file:
        return [json.loads(line) for line in file if line.strip()]


def _pairs_from_records(records):
    """Return `(english, luganda)` string tuples taken from `record['text']`."""
    return [
        (str(record["text"]["eng"]), str(record["text"]["lug"]))
        for record in records
    ]


def _lowercase_pairs(pairs):
    """Return a copy of `pairs` with both sentences lower-cased."""
    return [(eng.lower(), lug.lower()) for eng, lug in pairs]


# Raw records of each split, exactly as stored in the JSONL files.
train_data = _read_jsonl('./sample_data/salt-train-v1.1.jsonl')
test_data = _read_jsonl('./sample_data/salt-test-v1.1.jsonl')
val_data = _read_jsonl('./sample_data/salt-dev-v1.1.jsonl')

# Access the train, test and validation split of the dataset
train_set = _pairs_from_records(train_data)
test_set = _pairs_from_records(test_data)
val_set = _pairs_from_records(val_data)

# Extract the sentence pairs from the dataset split
train_pairs = _lowercase_pairs(train_set)
test_pairs = _lowercase_pairs(test_set)
val_pairs = _lowercase_pairs(val_set)

print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")

23947 training pairs
500 validation pairs
500 test pairs


In [4]:
import tensorflow as tf

# tokenize the data
def train_word_piece(text_samples, vocab_size, reserved_tokens):
    """Learn a WordPiece vocabulary from raw sentences.

    Args:
        text_samples: Sentences to learn from; sliced into a `tf.data.Dataset`.
        vocab_size: Forwarded as `vocabulary_size` to the vocabulary trainer.
        reserved_tokens: Forwarded as `reserved_tokens` to the vocabulary trainer.

    Returns:
        Whatever `keras_nlp.tokenizers.compute_word_piece_vocabulary` returns
        for the batched samples.
    """
    batched_samples = (
        tf.data.Dataset.from_tensor_slices(text_samples).batch(1000).prefetch(2)
    )
    return keras_nlp.tokenizers.compute_word_piece_vocabulary(
        batched_samples,
        vocabulary_size=vocab_size,
        reserved_tokens=reserved_tokens,
    )

In [5]:
# Special tokens reserved in both vocabularies.
reserved_tokens = ["[PAD]", "[UNK]", "[START]", "[END]"]

# Split the training pairs into one list of sentences per language.
eng_samples = [eng_sentence for eng_sentence, _ in train_pairs]
lug_samples = [lug_sentence for _, lug_sentence in train_pairs]

# Learn a separate WordPiece vocabulary for each language.
eng_vocab = train_word_piece(eng_samples, ENG_VOCAB_SIZE, reserved_tokens)
lug_vocab = train_word_piece(lug_samples, LUG_VOCAB_SIZE, reserved_tokens)

In [6]:
# Show the same slice of each vocabulary as a quick sanity check.
for language, vocabulary in (("English", eng_vocab), ("Luganda", lug_vocab)):
    print(f"{language} Tokens: ", vocabulary[100:110])

English Tokens:  ['by', 'been', 'can', 'do', 'one', 'all', 'school', 'how', 'our', 'an']
Luganda Tokens:  ['okukola', 'za', 'abakulembeze', 'bwa', 'wange', 'eggwanga', 'ka', 'waliwo', 'ttiimu', 'gye']


In [7]:
# One WordPiece tokenizer per language, built from the learned vocabularies.
# The sentence pairs are already lower-cased where they are built, and the
# tokenizers are created with `lowercase=False`.
eng_tokenizer, lug_tokenizer = (
    keras_nlp.tokenizers.WordPieceTokenizer(vocabulary=vocabulary, lowercase=False)
    for vocabulary in (eng_vocab, lug_vocab)
)

In [8]:
# Round-trip the first training pair through each tokenizer to eyeball the result.
eng_input_ex, lug_input_ex = train_pairs[0]
eng_tokens_ex = eng_tokenizer.tokenize(eng_input_ex)
lug_tokens_ex = lug_tokenizer.tokenize(lug_input_ex)

roundtrip_examples = (
    ("English", eng_input_ex, eng_tokens_ex, eng_tokenizer),
    ("Luganda", lug_input_ex, lug_tokens_ex, lug_tokenizer),
)
for position, (language, sentence, tokens, tokenizer) in enumerate(roundtrip_examples):
    if position:
        print()  # blank line between the two languages
    print(f"{language} sentence: ", sentence)
    print("Tokens: ", tokens)
    print(
        "Recovered text after detokenizing: ",
        tokenizer.detokenize(tokens),
    )

English sentence:  it was not a ghost refugee camp.
Tokens:  tf.Tensor([  91   69   76   24   30  497 2967  182  531 1453   10], shape=(11,), dtype=int32)
Recovered text after detokenizing:  tf.Tensor(b'it was not a ghost refugee camp .', shape=(), dtype=string)

Luganda sentence:  enkambi y'abanoonyiboobubudamu teyaliiwo mu bulimba.
Tokens:  tf.Tensor([2030   48    6  212 2959 2082   52   66  173    9], shape=(10,), dtype=int32)
Recovered text after detokenizing:  tf.Tensor(b"enkambi y ' abanoonyiboobubudamu teyaliiwo mu bulimba .", shape=(), dtype=string)


In [9]:
def preprocess_batch(eng, lug):
    """Tokenize and pack one batch of sentence pairs for teacher forcing.

    Args:
        eng: Batch of English sentences (model source side).
        lug: Batch of Luganda sentences (model target side).

    Returns:
        A `(features, targets)` tuple where `features` is a dict with
        `"encoder_inputs"` (packed English ids) and `"decoder_inputs"` (packed
        Luganda ids without the last position), and `targets` is the packed
        Luganda ids without the first position, i.e. the decoder inputs
        shifted left by one token.
    """
    eng = eng_tokenizer(eng)
    lug = lug_tokenizer(lug)

    # Pad `eng` to `MAX_SEQUENCE_LENGTH`; no start/end markers on the source side.
    eng_start_end_packer = keras_nlp.layers.StartEndPacker(
        sequence_length=MAX_SEQUENCE_LENGTH,
        pad_value=eng_tokenizer.token_to_id("[PAD]"),
    )
    eng = eng_start_end_packer(eng)

    # Add special tokens (`"[START]"` and `"[END]"`) to `lug` and pad it as well.
    # One extra position is kept so that slicing off either end below still
    # leaves `MAX_SEQUENCE_LENGTH` tokens.
    lug_start_end_packer = keras_nlp.layers.StartEndPacker(
        sequence_length=MAX_SEQUENCE_LENGTH + 1,
        start_value=lug_tokenizer.token_to_id("[START]"),
        end_value=lug_tokenizer.token_to_id("[END]"),
        pad_value=lug_tokenizer.token_to_id("[PAD]"),
    )
    lug = lug_start_end_packer(lug)

    return (
        {
            "encoder_inputs": eng,
            "decoder_inputs": lug[:, :-1],
        },
        lug[:, 1:],
    )


def make_dataset(pairs):
    """Build a batched, tokenized `tf.data` pipeline from sentence pairs.

    Args:
        pairs: Non-empty sequence of `(english, luganda)` string tuples.

    Returns:
        A `tf.data.Dataset` yielding the `(features, targets)` batches produced
        by `preprocess_batch`, cached after tokenization, shuffled at batch
        granularity and prefetched.
    """
    eng_texts, lug_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(lug_texts)))
    dataset = dataset.batch(BATCH_SIZE)
    dataset = dataset.map(preprocess_batch, num_parallel_calls=tf.data.AUTOTUNE)
    # Cache the (deterministic, expensive) tokenized batches first and shuffle
    # afterwards. Caching *after* the shuffle would store the first shuffled
    # order, so every later epoch would replay exactly the same batch order.
    # Prefetch goes last so it overlaps input preparation with training.
    return dataset.cache().shuffle(2048).prefetch(16)


# Build the training and validation pipelines (in that order).
train_ds, val_ds = map(make_dataset, (train_pairs, val_pairs))

In [10]:
# Pull a single batch and report the shape of every tensor fed to the model.
for inputs, targets in train_ds.take(1):
    for feature_name in ("encoder_inputs", "decoder_inputs"):
        print(f'inputs["{feature_name}"].shape: {inputs[feature_name].shape}')
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 128)
inputs["decoder_inputs"].shape: (64, 128)
targets.shape: (64, 128)


In [11]:
## Building the model
# An encoder-decoder Transformer assembled from three Keras models:
# `encoder`, `decoder`, and the end-to-end `transformer` that chains them.

# Encoder: variable-length English token ids -> token+position embeddings ->
# one TransformerEncoder layer.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")

# `mask_zero=True` asks the embedding to mask id 0
# (presumably "[PAD]", the first reserved token - TODO confirm).
x = keras_nlp.layers.TokenAndPositionEmbedding(
    vocabulary_size=ENG_VOCAB_SIZE,
    sequence_length=MAX_SEQUENCE_LENGTH,
    embedding_dim=EMBED_DIM,
    mask_zero=True,
)(encoder_inputs)

encoder_outputs = keras_nlp.layers.TransformerEncoder(
    intermediate_dim=INTERMEDIATE_DIM, num_heads=NUM_HEADS
)(inputs=x)
encoder = keras.Model(encoder_inputs, encoder_outputs)


# Decoder: a standalone model taking the Luganda token ids plus an encoded
# source sequence whose last dimension is EMBED_DIM.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, EMBED_DIM), name="decoder_state_inputs")

x = keras_nlp.layers.TokenAndPositionEmbedding(
    vocabulary_size=LUG_VOCAB_SIZE,
    sequence_length=MAX_SEQUENCE_LENGTH,
    embedding_dim=EMBED_DIM,
    mask_zero=True,
)(decoder_inputs)

x = keras_nlp.layers.TransformerDecoder(
    intermediate_dim=INTERMEDIATE_DIM, num_heads=NUM_HEADS
)(decoder_sequence=x, encoder_sequence=encoded_seq_inputs)
x = keras.layers.Dropout(0.5)(x)
# Softmax head: the model emits a probability distribution over
# LUG_VOCAB_SIZE entries per position (probabilities, not raw logits).
decoder_outputs = keras.layers.Dense(LUG_VOCAB_SIZE, activation="softmax")(x)
decoder = keras.Model(
    [
        decoder_inputs,
        encoded_seq_inputs,
    ],
    decoder_outputs,
)
# Rebind `decoder_outputs` to the decoder applied to the real encoder output,
# which wires the encoder and decoder graphs together.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])

# End-to-end model: (English ids, Luganda ids) -> per-position token
# probabilities.
transformer = keras.Model(
    [encoder_inputs, decoder_inputs],
    decoder_outputs,
    name="transformer",
)

In [12]:
from tensorflow.keras.callbacks import EarlyStopping

# Stop once the validation loss has failed to improve for three epochs in a
# row, rolling the model back to the best weights seen.
early_stopping = EarlyStopping(
    monitor="val_loss",
    patience=3,
    restore_best_weights=True,
)

In [13]:
transformer.summary()

# Integer targets with a softmax output layer, hence the sparse categorical
# cross-entropy loss.
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Train the model and keep the per-epoch history for later inspection.
history = transformer.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[early_stopping],
)

Model: "transformer"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 token_and_position_embedding (  (None, None, 256)   3872768     ['encoder_inputs[0][0]']         
 TokenAndPositionEmbedding)                                                                       
                                                                                                  
 decoder_inputs (InputLayer)    [(None, None)]       0           []                               
                                                                                                  
 transformer_encoder (Transform  (None, None, 256)   1315072     ['token_and_position_em

In [14]:
# Persist the trained model as a single HDF5 file (".h5" suffix); the next
# cell reloads it from this same path.
transformer.save("./best_models/nmt_transformer1.h5")

In [15]:
# Rebind `transformer` to the model stored in the HDF5 file written by the
# save cell, so inference below runs against the saved artifact.
transformer = tf.keras.models.load_model("./best_models/nmt_transformer1.h5")

def translate_sentence(input_sentence, max_length=40):
    """Greedily translate a batch of English sentences into Luganda.

    Args:
        input_sentence: Batch (rank-1 string tensor) of English sentences.
        max_length: Length, in tokens, of the decoder prompt that the sampler
            fills in, including the leading `"[START]"` token. Defaults to 40,
            the value that used to be hard-coded.

    Returns:
        The detokenized decoder output for each input sentence. Special tokens
        such as `"[PAD]"`, `"[START]"` and `"[END]"` are not removed here;
        callers strip them.
    """
    batch_size = tf.shape(input_sentence)[0]

    # Tokenize the encoder input and densify it to a fixed width.
    encoder_input_tokens = eng_tokenizer(input_sentence).to_tensor(
        shape=(None, MAX_SEQUENCE_LENGTH)
    )

    # Callback handed to the sampler: scores for the token at `index`, given
    # the prompt so far. Named so that it does not shadow the builtin `next`.
    def next_token_scores(prompt, cache, index):
        # The model ends in a softmax layer, so these are probabilities rather
        # than raw logits; the greedy arg-max choice is the same either way.
        scores = transformer([encoder_input_tokens, prompt])[:, index - 1, :]
        # Ignore hidden states for now; only needed for contrastive search.
        hidden_states = None
        return scores, hidden_states, cache

    # Build a prompt of `max_length` tokens: a start token followed by padding.
    start = tf.fill((batch_size, 1), lug_tokenizer.token_to_id("[START]"))
    pad = tf.fill((batch_size, max_length - 1), lug_tokenizer.token_to_id("[PAD]"))
    prompt = tf.concat((start, pad), axis=-1)

    generated_tokens = keras_nlp.samplers.GreedySampler()(
        next_token_scores,
        prompt,
        end_token_id=lug_tokenizer.token_to_id("[END]"),
        index=1,  # Start sampling after start token.
    )
    return lug_tokenizer.detokenize(generated_tokens)


test_eng_texts = [pair[0] for pair in test_pairs]

# Use a dedicated, seeded generator so the same example sentences are picked
# on every run (reproducible under Restart & Run All) without touching the
# global `random` state.
example_rng = random.Random(42)
for i in range(5):
    input_sentence = example_rng.choice(test_eng_texts)
    translated = translate_sentence(tf.constant([input_sentence]))
    translated = translated.numpy()[0].decode("utf-8")
    # Drop the special tokens left in the detokenized output.
    translated = (
        translated.replace("[PAD]", "")
        .replace("[START]", "")
        .replace("[END]", "")
        .strip()
    )
    print(f"** Example {i} **")
    print(input_sentence)
    print(translated)
    print()

** Example 0 **
every christian should contribute what he can afford to the construction of the church.
abantu baaaawuuuuutaase .

** Example 1 **
the impact is at their manufacturing and distribution points.
abantu baaaawuuuuutaase .

** Example 2 **
there is a lot of disagreement within the party.
abantu bangi baaawuuuuutaase .

** Example 3 **
the bishop cautioned the congregation against cheating in competitions.
abantu baaaaaawuuunuuuuuuuuuuuuuuun .

** Example 4 **
he came in third place with thirty votes.
abantu bangi baaawuuuuutamu .



In [16]:
# Unigram and bigram ROUGE, accumulated over the first 30 test pairs.
rouge_1 = keras_nlp.metrics.RougeN(order=1)
rouge_2 = keras_nlp.metrics.RougeN(order=2)

for input_sentence, reference_sentence in test_pairs[:30]:
    # Translate one sentence at a time and decode the result to a Python string.
    raw_translation = translate_sentence(tf.constant([input_sentence])).numpy()[0]
    translated_sentence = raw_translation.decode("utf-8")

    # Remove the special tokens before scoring.
    for special_token in ("[PAD]", "[START]", "[END]"):
        translated_sentence = translated_sentence.replace(special_token, "")
    translated_sentence = translated_sentence.strip()

    # Calling a metric updates its running state with this (reference, hypothesis) pair.
    for rouge_metric in (rouge_1, rouge_2):
        rouge_metric(reference_sentence, translated_sentence)

print("ROUGE-1 Score: ", rouge_1.result())
print("ROUGE-2 Score: ", rouge_2.result())

ROUGE-1 Score:  {'precision': <tf.Tensor: shape=(), dtype=float32, numpy=0.044444446>, 'recall': <tf.Tensor: shape=(), dtype=float32, numpy=0.012592593>, 'f1_score': <tf.Tensor: shape=(), dtype=float32, numpy=0.01902357>}
ROUGE-2 Score:  {'precision': <tf.Tensor: shape=(), dtype=float32, numpy=0.0>, 'recall': <tf.Tensor: shape=(), dtype=float32, numpy=0.0>, 'f1_score': <tf.Tensor: shape=(), dtype=float32, numpy=0.0>}


In [17]:
# Corpus-level BLEU, accumulated over the first 30 test pairs.
bleu = Bleu()

for input_sentence, reference_sentence in test_pairs[:30]:
    # Translate one sentence at a time and decode the result to a Python string.
    raw_translation = translate_sentence(tf.constant([input_sentence])).numpy()[0]
    translated_sentence = raw_translation.decode("utf-8")

    # Remove the special tokens before scoring.
    for special_token in ("[PAD]", "[START]", "[END]"):
        translated_sentence = translated_sentence.replace(special_token, "")
    translated_sentence = translated_sentence.strip()

    # A single reference per hypothesis.
    bleu.update_state([reference_sentence], [translated_sentence])

score = bleu.result()
print("BLEU Score:", score)

BLEU Score: tf.Tensor(0.0, shape=(), dtype=float32)
