In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pickle
import random
import numpy as np

In [2]:
# Load the pickled source-side and target-side corpora. They are iterated in
# parallel and concatenated with strings below, so they are presumably
# equal-length sequences of strings — TODO confirm against the data files.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
# Context managers make sure the file handles are closed after loading.
with open('../input/translation/DS_5_train_input', 'rb') as input_file:
    input_text = pickle.load(input_file)
with open('../input/translation/DS_5_train_output', 'rb') as output_file:
    output_text = pickle.load(output_file)

In [3]:
# Pair every source string with its target string wrapped in "[start] " and
# "[end]" markers. A comprehension keeps the loop variables local, so nothing
# named `input`/`output` leaks into the notebook namespace and shadows the
# `input` builtin for the rest of the session.
# NOTE(review): there is no space before "[end]"; if a target string does not
# end in whitespace the marker fuses with its last word — confirm against the
# data before changing, since it affects the learned vocabulary.
text_pairs = [
    (source_text, "[start] " + target_text + "[end]")
    for source_text, target_text in zip(input_text, output_text)
]

In [4]:
# Sanity check: show one randomly chosen (source, marked-up target) pair.
print(random.choice(text_pairs))

In [5]:
# Shuffle the pairs in place, then carve them into train / validation / test
# slices. The validation and test sizes are fixed; training gets the remainder.
# NOTE(review): the shuffle is unseeded, so the split differs on every run.
random.shuffle(text_pairs)
num_val_samples = 995
num_test_samples = 5
num_train_samples = len(text_pairs) - num_val_samples - num_test_samples
# A non-positive remainder would turn the slice bounds below negative and
# silently produce empty or wrong splits, so fail loudly instead.
if num_train_samples <= 0:
    raise ValueError(
        f"Need more than {num_val_samples + num_test_samples} pairs to split, "
        f"got {len(text_pairs)}"
    )
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples:num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples:]

In [None]:
# Report the size of the full corpus and of each split.
for split_label, split_pairs in (
    ("total", text_pairs),
    ("training", train_pairs),
    ("validation", val_pairs),
    ("test", test_pairs),
):
    print(f"{len(split_pairs)} {split_label} pairs")

In [None]:
vocab_size = 50
sequence_length = 213

# The two vectorizers differ only in their padded/truncated output length.
shared_vectorizer_kwargs = {"max_tokens": vocab_size, "output_mode": "int"}

source_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length, **shared_vectorizer_kwargs)
# The target side gets one extra position: format_dataset slices it into two
# views shifted by one step, each `sequence_length` long.
target_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length + 1, **shared_vectorizer_kwargs)

# Learn each vocabulary from the training split only.
train_input_texts = [source_text for source_text, _ in train_pairs]
train_output_texts = [target_text for _, target_text in train_pairs]
source_vectorization.adapt(train_input_texts)
target_vectorization.adapt(train_output_texts)

In [None]:
# Number of text pairs per batch; read by make_dataset below.
batch_size = 64

def format_dataset(inp, out):
    """Vectorize a batch of text pairs into model features and labels.

    Args:
        inp: batch of source strings.
        out: batch of target strings.

    Returns:
        A `(features, labels)` tuple. `features` maps "input_lang" to the
        vectorized source and "output_lang" to the vectorized target without
        its last position; `labels` is the vectorized target without its
        first position, i.e. the decoder input shifted one step ahead.
    """
    source_ids = source_vectorization(inp)
    target_ids = target_vectorization(out)
    decoder_input_ids = target_ids[:, :-1]
    next_token_ids = target_ids[:, 1:]
    features = {
        "input_lang": source_ids,
        "output_lang": decoder_input_ids,
    }
    return (features, next_token_ids)

def make_dataset(pairs):
    """Build a batched, vectorized tf.data pipeline from (source, target) pairs.

    Args:
        pairs: non-empty sequence of (source_text, target_text) tuples.

    Returns:
        A `tf.data.Dataset` yielding the `(features, labels)` tuples produced
        by `format_dataset`, in batches of `batch_size`.
    """
    inp_texts, out_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices(
        (list(inp_texts), list(out_texts)))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    # Cache BEFORE shuffling: a cache placed after shuffle() replays the very
    # same element order every epoch, which defeats the shuffle. Caching the
    # vectorized batches first keeps the expensive work cached while the order
    # is re-drawn each epoch. Prefetch goes last so it overlaps with training.
    # Note the shuffle operates on whole batches, since batching happens first.
    return dataset.cache().shuffle(2048).prefetch(16)

# Build the training and validation pipelines (in that order).
train_ds, val_ds = map(make_dataset, (train_pairs, val_pairs))

In [None]:
# Peek at a single training batch and print the shape of every tensor in it.
for batch_features, batch_labels in train_ds.take(1):
    for feature_name in ("input_lang", "output_lang"):
        print(f"inputs['{feature_name}'].shape: {batch_features[feature_name].shape}")
    print(f"targets.shape: {batch_labels.shape}")

In [None]:
class TransformerEncoder(layers.Layer):
    """One Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer dense
    projection; each sub-block is wrapped in a residual connection and
    layer normalization.

    Args:
        embed_dim: size of the attention keys and of the block's output.
        dense_dim: width of the hidden (ReLU) layer in the dense projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize the layer.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Run the block on `inputs`, optionally restricted by `mask`."""
        # Insert an axis so the per-position mask broadcasts over queries.
        attn_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(inputs, inputs, attention_mask=attn_mask)
        normed_attention = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normed_attention)
        return self.layernorm_2(normed_attention + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        }

In [None]:
class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    Runs causal self-attention over the target sequence, then attention over
    the encoder outputs, then a two-layer dense projection. Each of the three
    sub-blocks is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the attention keys and of the block's output.
        dense_dim: width of the hidden (ReLU) layer in the dense projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize the layer.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry [b, i, j] is 1 when i >= j and 0 otherwise, so position i can
        only attend to itself and to earlier positions.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block.

        Args:
            inputs: embedded target sequence.
            encoder_outputs: output of the encoder stack.
            mask: optional mask for `inputs`; when given it is combined with
                the causal mask.

        Returns:
            The layer-normalized output of the dense projection sub-block.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        # Default to "no mask": previously `padding_mask` was only assigned
        # inside the `if`, so calling the layer without a mask raised
        # UnboundLocalError at the second attention call.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        # NOTE(review): the combined padding+causal mask is applied to the
        # encoder attention below while self-attention only gets the causal
        # mask. This looks unusual — confirm it is intended. Left unchanged so
        # behavior stays consistent with already-trained checkpoints.
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

In [None]:
class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions that can be embedded.
        input_dim: number of distinct token ids that can be embedded.
        output_dim: size of each embedding vector.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize the layer.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)

    def call(self, inputs):
        """Embed token ids and add the embedding of each position index."""
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        token_vectors = self.token_embeddings(inputs)
        position_vectors = self.position_embeddings(position_ids)
        return token_vectors + position_vectors

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is not 0 as valid."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        }

In [None]:
embed_dim = 128
dense_dim = 512
num_heads = 8

# Encoder branch: token ids -> positional embedding -> encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="input_lang")
encoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(
    embed_dim, dense_dim, num_heads)(encoder_embedded)

# Decoder branch: token ids -> positional embedding -> decoder block (which
# also reads the encoder outputs) -> dropout -> softmax over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="output_lang")
decoder_embedded = PositionalEmbedding(
    sequence_length, vocab_size, embed_dim)(decoder_inputs)
decoder_hidden = TransformerDecoder(
    embed_dim, dense_dim, num_heads)(decoder_embedded, encoder_outputs)
decoder_hidden = layers.Dropout(0.5)(decoder_hidden)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(decoder_hidden)

transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

# Keep only the checkpoint with the lowest validation loss.
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    filepath="./translator_model.keras",
    save_best_only=True,
    monitor="val_loss",
    mode="min",
)
# Stop once validation loss has not improved for 5 consecutive epochs.
early_stopping_callback = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=5,
    verbose=0,
    mode="min",
)
callbacks = [checkpoint_callback, early_stopping_callback]

transformer.fit(
    train_ds,
    epochs=50,
    validation_data=val_ds,
    callbacks=callbacks,
)

In [None]:
def acc_pred(y_true, y_pred):
    """Position-wise word accuracy between reference and predicted sentences.

    Each sentence is split on single spaces. A predicted word counts as
    correct when it equals the reference word at the same index. The
    denominator is the total number of reference words, so predictions that
    are too short are penalised and extra predicted words are ignored.

    Args:
        y_true: iterable of reference sentences (strings).
        y_pred: iterable of predicted sentences (strings), parallel to y_true.

    Returns:
        Fraction of reference words matched, as a Python float; 0.0 when
        there are no reference words at all.
    """
    total_acc_words = 0
    total_num_words = 0
    for true, pred in zip(y_true, y_pred):
        true_words = true.split(" ")
        pred_words = pred.split(" ")
        # zip() stops at the shorter sentence, like the original min-length loop.
        total_acc_words += sum(
            true_word == pred_word
            for true_word, pred_word in zip(true_words, pred_words)
        )
        total_num_words += len(true_words)
    if total_num_words == 0:
        # Empty input: avoid ZeroDivisionError.
        return 0.0
    # `np.float` was removed in NumPy 1.24; the builtin float is equivalent.
    return float(total_acc_words) / total_num_words

In [None]:
# Map target-side token ids back to token strings for decoding.
inp_vocab = target_vectorization.get_vocabulary()
inp_index_lookup = dict(enumerate(inp_vocab))
max_decoded_sentence_length = 211

# The custom layer classes must be supplied so the saved model can be rebuilt.
custom_layer_classes = {
    'PositionalEmbedding': PositionalEmbedding,
    "TransformerEncoder": TransformerEncoder,
    "TransformerDecoder": TransformerDecoder,
}
model = keras.models.load_model(
    'translator_model.keras', custom_objects=custom_layer_classes)

def decode_sequence(input_sentence):
    """Greedily decode the model's output for one source sentence.

    Starting from the "[start]" marker, the partial output is re-vectorized
    and fed to the model at every step; the highest-scoring token at the
    current step is appended. Decoding stops when the token "end" is produced
    or after `max_decoded_sentence_length` steps.

    Args:
        input_sentence: source sentence as a string.

    Returns:
        The decoded tokens joined by single spaces, without the start and end
        markers.
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    # Tokens are collected in a list instead of stripping markers from the
    # final string: the old `.replace(" end", "")` removed every " end"
    # substring, including the start of longer tokens, and left a bare "end"
    # behind when it was the very first prediction.
    decoded_tokens = []
    for i in range(max_decoded_sentence_length):
        decoded_sentence = " ".join(["[start]"] + decoded_tokens)
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = model(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = inp_index_lookup[sampled_token_index]
        if sampled_token == "end":
            break
        decoded_tokens.append(sampled_token)
    return " ".join(decoded_tokens)

test_inp_texts = [pair[0] for pair in test_pairs]
# Every target was built as "[start] " + text + "[end]". decode_sequence
# returns sentences WITHOUT those markers, so they must be removed from the
# references too; otherwise every word comparison in acc_pred is shifted by
# one position and the markers inflate the word count.
# NOTE(review): predictions consist of vectorizer vocabulary tokens; if the
# raw targets contain casing or punctuation that the vectorizer normalises
# away, those words can never match — confirm against the data.
test_out_texts = [
    pair[1][len("[start] "):-len("[end]")].strip() for pair in test_pairs
]
test_pred_texts = [decode_sequence(sentence) for sentence in test_inp_texts]

accuracy = acc_pred(test_out_texts, test_pred_texts)
print(f"Test accuracy: {accuracy:.3f}")

# Show one reference/prediction pair side by side.
print(f"Ground Truth: {test_out_texts[3]}")
print(f"Predicted: {test_pred_texts[3]}")

# Instructions to run the model:

In [None]:
# Step 1
# Provide the paths for 'DS_5_train_input' and 'DS_5_train_output' at the marked places
# Then run the entire cell

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pickle  # was `import pick`, which left `pickle` undefined on a fresh kernel
import random
import numpy as np

# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
# Context managers make sure the file handles are closed after loading.
with open(path_for_DS_5_train_input, 'rb') as input_file:  # path for 'DS_5_train_input'
    input_text = pickle.load(input_file)
with open(path_for_DS_5_train_output, 'rb') as output_file:  # path for 'DS_5_train_output'
    output_text = pickle.load(output_file)

# Pair every source string with its target string wrapped in "[start] " and
# "[end]" markers. A comprehension keeps the loop variables local, so nothing
# named `input`/`output` leaks into the notebook namespace and shadows the
# `input` builtin.
# NOTE(review): there is no space before "[end]"; the marker text is left
# untouched so tokenisation stays identical to what the saved model saw.
text_pairs = [
    (source_text, "[start] " + target_text + "[end]")
    for source_text, target_text in zip(input_text, output_text)
]

random.shuffle(text_pairs)

vocab_size = 50
sequence_length = 213
batch_size = 64

# The two vectorizers differ only in their padded/truncated output length.
shared_vectorizer_kwargs = {"max_tokens": vocab_size, "output_mode": "int"}

source_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length, **shared_vectorizer_kwargs)
target_vectorization = layers.TextVectorization(
    output_sequence_length=sequence_length + 1, **shared_vectorizer_kwargs)

# NOTE(review): the vocabularies are learned from every pair here; the loaded
# model presumably expects the same vocabulary it was trained with — confirm.
input_texts = [source_text for source_text, _ in text_pairs]
output_texts = [target_text for _, target_text in text_pairs]
source_vectorization.adapt(input_texts)
target_vectorization.adapt(output_texts)

class TransformerEncoder(layers.Layer):
    """One Transformer encoder block.

    Applies multi-head self-attention followed by a two-layer dense
    projection; each sub-block is wrapped in a residual connection and
    layer normalization.

    Args:
        embed_dim: size of the attention keys and of the block's output.
        dense_dim: width of the hidden (ReLU) layer in the dense projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize the layer.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        """Run the block on `inputs`, optionally restricted by `mask`."""
        # Insert an axis so the per-position mask broadcasts over queries.
        attn_mask = None if mask is None else mask[:, tf.newaxis, :]
        attended = self.attention(inputs, inputs, attention_mask=attn_mask)
        normed_attention = self.layernorm_1(inputs + attended)
        projected = self.dense_proj(normed_attention)
        return self.layernorm_2(normed_attention + projected)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        }
    
class TransformerDecoder(layers.Layer):
    """One Transformer decoder block.

    Runs causal self-attention over the target sequence, then attention over
    the encoder outputs, then a two-layer dense projection. Each of the three
    sub-blocks is wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the attention keys and of the block's output.
        dense_dim: width of the hidden (ReLU) layer in the dense projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize the layer.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry [b, i, j] is 1 when i >= j and 0 otherwise, so position i can
        only attend to itself and to earlier positions.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block.

        Args:
            inputs: embedded target sequence.
            encoder_outputs: output of the encoder stack.
            mask: optional mask for `inputs`; when given it is combined with
                the causal mask.

        Returns:
            The layer-normalized output of the dense projection sub-block.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        # Default to "no mask": previously `padding_mask` was only assigned
        # inside the `if`, so calling the layer without a mask raised
        # UnboundLocalError at the second attention call.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        # NOTE(review): the combined padding+causal mask is applied to the
        # encoder attention below while self-attention only gets the causal
        # mask. This looks unusual — confirm it is intended. Left unchanged so
        # behavior stays consistent with already-trained checkpoints.
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)
    
class PositionalEmbedding(layers.Layer):
    """Sum of a learned token embedding and a learned position embedding.

    Args:
        sequence_length: number of distinct positions that can be embedded.
        input_dim: number of distinct token ids that can be embedded.
        output_dim: size of each embedding vector.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        # Hyper-parameters are kept so get_config() can serialize the layer.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)

    def call(self, inputs):
        """Embed token ids and add the embedding of each position index."""
        num_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=num_positions, delta=1)
        token_vectors = self.token_embeddings(inputs)
        position_vectors = self.position_embeddings(position_ids)
        return token_vectors + position_vectors

    def compute_mask(self, inputs, mask=None):
        """Mark every position whose token id is not 0 as valid."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        return {
            **super().get_config(),
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        }

In [None]:
# Step 2
# Provide the paths for 'DS_5_test_input' and 'DS_5_test_output' at the marked places
# Provide the path for the model at the marked place

# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
# Context managers make sure the file handles are closed after loading.
with open(path_for_DS_5_test_input, 'rb') as input_file:  # path for 'DS_5_test_input'
    input_text = pickle.load(input_file)
with open(path_for_DS_5_test_output, 'rb') as output_file:  # path for 'DS_5_test_output'
    output_text = pickle.load(output_file)

# Pair every source string with its target wrapped in "[start] "/"[end]"
# markers. A comprehension keeps the loop variables local, so nothing named
# `input`/`output` leaks out and shadows the `input` builtin.
test_pairs = [
    (source_text, "[start] " + target_text + "[end]")
    for source_text, target_text in zip(input_text, output_text)
]
    
# Map target-side token ids back to token strings for decoding.
inp_vocab = target_vectorization.get_vocabulary()
inp_index_lookup = dict(enumerate(inp_vocab))
max_decoded_sentence_length = 211

# The custom layer classes must be supplied so the saved model can be rebuilt.
custom_layer_classes = {
    'PositionalEmbedding': PositionalEmbedding,
    "TransformerEncoder": TransformerEncoder,
    "TransformerDecoder": TransformerDecoder,
}
model = keras.models.load_model(
    path_for_model, custom_objects=custom_layer_classes)  # path for model

def acc_pred(y_true, y_pred):
    """Position-wise word accuracy between reference and predicted sentences.

    Each sentence is split on single spaces. A predicted word counts as
    correct when it equals the reference word at the same index. The
    denominator is the total number of reference words, so predictions that
    are too short are penalised and extra predicted words are ignored.

    Args:
        y_true: iterable of reference sentences (strings).
        y_pred: iterable of predicted sentences (strings), parallel to y_true.

    Returns:
        Fraction of reference words matched, as a Python float; 0.0 when
        there are no reference words at all.
    """
    total_acc_words = 0
    total_num_words = 0
    for true, pred in zip(y_true, y_pred):
        true_words = true.split(" ")
        pred_words = pred.split(" ")
        # zip() stops at the shorter sentence, like the original min-length loop.
        total_acc_words += sum(
            true_word == pred_word
            for true_word, pred_word in zip(true_words, pred_words)
        )
        total_num_words += len(true_words)
    if total_num_words == 0:
        # Empty input: avoid ZeroDivisionError.
        return 0.0
    # `np.float` was removed in NumPy 1.24; the builtin float is equivalent.
    return float(total_acc_words) / total_num_words

def decode_sequence(input_sentence):
    """Greedily decode the model's output for one source sentence.

    Starting from the "[start]" marker, the partial output is re-vectorized
    and fed to the model at every step; the highest-scoring token at the
    current step is appended. Decoding stops when the token "end" is produced
    or after `max_decoded_sentence_length` steps.

    Args:
        input_sentence: source sentence as a string.

    Returns:
        The decoded tokens joined by single spaces, without the start and end
        markers.
    """
    tokenized_input_sentence = source_vectorization([input_sentence])
    # Tokens are collected in a list instead of stripping markers from the
    # final string: the old `.replace(" end", "")` removed every " end"
    # substring, including the start of longer tokens, and left a bare "end"
    # behind when it was the very first prediction.
    decoded_tokens = []
    for i in range(max_decoded_sentence_length):
        decoded_sentence = " ".join(["[start]"] + decoded_tokens)
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = model(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = inp_index_lookup[sampled_token_index]
        if sampled_token == "end":
            break
        decoded_tokens.append(sampled_token)
    return " ".join(decoded_tokens)

test_inp_texts = [pair[0] for pair in test_pairs]
# Every target was built as "[start] " + text + "[end]". decode_sequence
# returns sentences WITHOUT those markers, so they must be removed from the
# references too; otherwise every word comparison in acc_pred is shifted by
# one position and the markers inflate the word count.
# NOTE(review): predictions consist of vectorizer vocabulary tokens; if the
# raw targets contain casing or punctuation that the vectorizer normalises
# away, those words can never match — confirm against the data.
test_out_texts = [
    pair[1][len("[start] "):-len("[end]")].strip() for pair in test_pairs
]
test_pred_texts = [decode_sequence(sentence) for sentence in test_inp_texts]

accuracy = acc_pred(test_out_texts, test_pred_texts)
print(f"Test accuracy: {accuracy:.3f}")

# Show one reference/prediction pair side by side. The test file is supplied
# by the user, so only index into it when it has at least four examples.
if len(test_pred_texts) > 3:
    print(f"Ground Truth: {test_out_texts[3]}")
    print(f"Predicted: {test_pred_texts[3]}")