# Project 02

**Submitted by: Diego Sol**

CSCE 636-600: Deep Learning

Professor: Dr. Anxiao Jiang

----------------

**This project is on machine translation for two "artificial" languages: an "Input Language" and an "Output Language". We want to build a model to translate the texts in the "Input Language" to texts in the "Output Language". For example, a text in the "Input Language" can be "a g b f a f a e a k a j c f b f c d a k a k c e b g a h a k b d b f b f b d c d " , and its translation to the "Output Language" is "b f c f b f c d a j e f g c e b g a k i j b d b f a k l m b f b d a h ed ee ef a k k eg a k h eh a e ei c d a f ej ek a g d el".**

# Initialization

**This section includes mounting the session to Google Drive, importing necessary libraries to run the file, and downloading and reformatting the training datasets.**

## Mount Colab

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Import Libraries

In [2]:
import pickle
import tensorflow as tf
import numpy as np

from tensorflow import keras
from tensorflow.keras import layers
from matplotlib import pyplot as plt

rng = np.random.default_rng()

# RUN THIS SECTION

**This section evaluates the model and makes predictions.**

## Encoder

In [3]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

## Decoder

In [4]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

## Positional Embedding

In [5]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

## Vectorize vocabularies

In [6]:
encoder_vectorize = tf.keras.layers.TextVectorization(output_mode='int', output_sequence_length=200, vocabulary= ['', '[UNK]', 'a', 'b', 'c', 'e', 'g', 'd', 'f', 'k', 'i', 'j', 'h'])
decoder_vectorize = tf.keras.layers.TextVectorization(output_mode='int', output_sequence_length=200, vocabulary= ['', '[UNK]', 'a', 'b', 'c', 'e', 'g', 'd', 'f', 'k', 'i', 'j', 'h', 'start', 'm', 'l', 'end', 'eh', 'eg', 'ef', 'ee', 'ed', 'ei', 'ej', 'ek', 'el', 'em', 'fd', 'fe', 'ff', 'fg', 'fh', 'fi', 'fj', 'fk', 'fl', 'fm', 'gd'])
inverse_vectorize = dict(zip(range(38), ['', '[UNK]', 'a', 'b', 'c', 'e', 'g', 'd', 'f', 'k', 'i', 'j', 'h', 'start', 'm', 'l', 'end', 'eh', 'eg', 'ef', 'ee', 'ed', 'ei', 'ej', 'ek', 'el', 'em', 'fd', 'fe', 'ff', 'fg', 'fh', 'fi', 'fj', 'fk', 'fl', 'fm', 'gd']))

## Decode vocabulary

In [7]:
def decode_sequence(input_chunk):
  encoded_chunk = encoder_path(input_chunk)
  decoded_chunk = np.asarray(["start"])
  decoded_chunk = np.repeat(decoded_chunk,input_chunk.shape[0])
  for i in range(0, 200):
    predictions = decoder_path([decoder_vectorize(decoded_chunk),encoded_chunk])
    sampled_token_index = np.argmax(predictions[:, i, :], axis=1)
    sampled_token = np.vectorize(inverse_vectorize.get)(sampled_token_index)
    decoded_chunk = np.char.add(decoded_chunk, " ")
    decoded_chunk = np.char.add(decoded_chunk, sampled_token)
  return decoded_chunk

## Load transformer model

In [8]:
transformer = keras.models.load_model(
    "/content/drive/MyDrive/Colab Notebooks/CSCE_636_Project_2/transformer.keras",
    custom_objects={"TransformerEncoder": TransformerEncoder,
                    "PositionalEmbedding": PositionalEmbedding,
                    "TransformerDecoder": TransformerDecoder})

In [9]:
test_input = encoder_vectorize(pickle.load(open("/content/drive/MyDrive/Colab Notebooks/CSCE_636_Project_2/DS_5_test_input",'rb')))

In [10]:
encoder_input = transformer.get_layer("input_1").input
encoder_ouput = transformer.get_layer("transformer_encoder").output
encoder_path = keras.Model(encoder_input,encoder_ouput)

decoder_input = transformer.get_layer("input_2").input
x = transformer.get_layer("positional_embedding_1")(decoder_input)
encoder_output = transformer.get_layer("transformer_encoder").output
x = transformer.get_layer("transformer_decoder")(x,encoder_output)
x = transformer.get_layer("dropout")(x)
decoder_ouput = transformer.get_layer("dense_4")(x)
decoder_path = keras.Model([decoder_input,encoder_output],decoder_ouput)

## Predict an example

In [13]:
test_output = decode_sequence(test_input)

for i in range(0,test_output.shape[0]):
  end_index = test_output[i].find("end")
  if end_index != -1:
    test_output[i] = test_output[i][6:end_index-1]

ResourceExhaustedError: ignored

In [None]:
print(test_output[i])
print(ans[i][6:])