### Imports

In [None]:
import re
import os
import json
import spacy
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_datasets as tfds

### Reproducibility

In [None]:
# One seed shared by every random number generator this notebook touches.
C_Seed = 1

# Seed TensorFlow, NumPy and the standard library generators.
tf.random.set_seed(C_Seed)
np.random.seed(C_Seed)
random.seed(C_Seed)

# NOTE(review): Python normally reads PYTHONHASHSEED at interpreter start-up,
# so setting it here presumably only influences child processes - confirm.
os.environ.update({'PYTHONHASHSEED': str(C_Seed)})

### Initializations

In [None]:
# Custom Parameters

# --- Data ------------------------------------------------------------------
# CSV that DataUtilities reads; it must provide 'Questions' and 'Answers'.
C_FilePath = "/content/drive/MyDrive/Jarvis/Data/MarvelCinematicUniverse_02_CaptainAmericaFirstAvenger.csv"
# Optional cap on the rows kept by DataUtilities; a falsy value keeps all rows.
C_max_convos = None
# Length every tokenised question / answer is padded to.
C_max_length = 40
# Number of examples per training batch.
C_batch_size = 64
# Not read anywhere in the visible code.
C_Random_State = 0
# spaCy pipeline used by DataUtilities.preprocess (stop words and lemmas).
C_nlp = spacy.load('en_core_web_sm')

# --- Vocabulary ------------------------------------------------------------
# Not read anywhere in the visible code.
C_max_vocab_size = 2**15
# Start-up value for the vocabulary size; read by the embedding,
# positional-encoding and output layers at the moment the model is built.
C_actual_vocab_size = 16

# --- Model -----------------------------------------------------------------
# Number of stacked encoder layers and of stacked decoder layers.
C_num_layers = 2
# Width of the hidden feed-forward layer inside each encoder / decoder layer.
C_num_units = 512
# Embedding size and width of every attention projection.
C_d_model = 256
# Attention heads per multi-head attention layer.
C_num_heads = 8
# Presumably the feed-forward activation - verify that the layers read it.
C_activation = 'relu'
# Dropout rate used throughout the encoder and decoder.
C_dropout = 0.1

# --- Training --------------------------------------------------------------
# Upper bound on the number of epochs passed to fit().
C_epochs = 40

# Utilities

In [None]:
class DataUtilities:
    """Load the question/answer CSV, clean the text and encode it for training.

    Attributes:
        data: DataFrame read from ``C_FilePath``. ``tokenize`` adds the cleaned
            ``Q`` and ``A`` columns next to ``Questions`` and ``Answers``.
        questions, answers: cleaned strings, one per row.
        tokenized_questions, tokenized_answers: integer id arrays padded to
            ``C_max_length``.
        tokenizer: the fitted Keras ``Tokenizer``.
        dataset: the batched ``tf.data.Dataset`` returned by ``tokenize``.
    """

    def __init__(self):
        """Read the CSV and initialise the attributes that ``tokenize`` fills in."""
        self.data = pd.read_csv(C_FilePath)
        if C_max_convos:
            # Keep the FIRST C_max_convos rows. The previous ``[1:...]`` slice
            # silently discarded row 0 and returned one row too few.
            self.data = self.data[:C_max_convos]
        self.tokenized_questions = []
        self.tokenized_answers = []
        self.tokenizer = None
        self.dataset = None
        self.questions = []
        self.answers = []

    def preprocess(self, string):
        """Lower-case, strip non-alphanumerics, drop stop words and lemmatise.

        Args:
            string: raw text of one question or answer. Missing CSV cells
                (NaN / None) are treated as empty text instead of raising.

        Returns:
            The remaining lemmas joined by single spaces.
        """
        if not isinstance(string, str):
            # pandas hands over float NaN for empty cells; ``.lower()`` would fail.
            string = '' if pd.isna(string) else str(string)
        string = string.lower()
        string = re.sub(r'[^a-z0-9]', ' ', string)
        string = re.sub(r'\s+', ' ', string).strip()
        doc = C_nlp(string)
        stop_words = C_nlp.Defaults.stop_words
        lemmas = [token.lemma_ for token in doc if token.text not in stop_words]
        return ' '.join(lemmas)

    def _encode(self, texts):
        """Convert cleaned texts to start/stop-wrapped id rows of length ``C_max_length``."""
        # Reserve two slots for the markers and cut over-long rows at the END,
        # so neither marker is lost (the default pad_sequences truncation
        # removes the front of a row).
        room = max(C_max_length - 2, 0)
        wrapped = [C_start_token + ids[:room] + C_stop_token
                   for ids in self.tokenizer.texts_to_sequences(texts)]
        return tf.keras.preprocessing.sequence.pad_sequences(
            wrapped, maxlen=C_max_length, padding='post')

    def tokenize(self):
        """Tokenise, encode and pad the data, then build the training dataset.

        Side effects (module globals):
            C_start_token, C_stop_token: one-element id lists placed directly
                after the word ids. These are the names the inference code at
                the end of the file reads.
            C_actual_vocab_size: number of distinct ids the model must embed
                and predict (padding id 0 + words + the two markers). It was
                previously assigned as a local, so the global kept its
                placeholder value.

        Returns:
            ``(dataset, tokenizer)``. The dataset yields
            ``({'XfIn': questions, 'XfDeIn': answers[:, :-1]}, answers[:, 1:])``
            batches of size ``C_batch_size``.
        """
        global C_actual_vocab_size, C_start_token, C_stop_token

        self.data['Q'] = self.data['Questions'].apply(self.preprocess)
        self.data['A'] = self.data['Answers'].apply(self.preprocess)

        self.questions = self.data['Q'].tolist()
        self.answers = self.data['A'].tolist()

        self.tokenizer = tf.keras.preprocessing.text.Tokenizer()
        self.tokenizer.fit_on_texts(self.questions + self.answers)

        # Word ids run from 1 to word_count; id 0 is the padding value.
        word_count = len(self.tokenizer.word_index)
        C_start_token = [word_count + 1]
        C_stop_token = [word_count + 2]
        C_actual_vocab_size = word_count + 3

        self.tokenized_questions = self._encode(self.questions)
        self.tokenized_answers = self._encode(self.answers)

        # Teacher forcing: the decoder sees the answer without its last id and
        # is trained to predict the answer shifted left by one position.
        self.dataset = tf.data.Dataset.from_tensor_slices(({'XfIn': self.tokenized_questions,
                                                            'XfDeIn': self.tokenized_answers[:, :-1]},
                                                           self.tokenized_answers[:, 1:]))
        self.dataset = self.dataset.cache()
        self.dataset = self.dataset.shuffle(len(self.tokenized_questions))
        self.dataset = self.dataset.batch(C_batch_size)
        self.dataset = self.dataset.prefetch(tf.data.experimental.AUTOTUNE)
        return self.dataset, self.tokenizer


class ScaledDotProductAttention:
    """Scaled dot-product attention over a query / key / value triple.

    ``attention()`` evaluates ``softmax(Q . K^T / sqrt(depth)) . V`` where
    ``depth`` is the size of the last axis of the key. ``mask`` may be ``None``;
    otherwise positions where it equals 1 are pushed towards zero weight.
    """

    def __init__(self, q, k, v, mask):
        """Store the query, key, value and (optional) mask tensors."""
        self.q, self.k, self.v = q, k, v
        self.mask = mask

    def attention(self):
        """Return the attention-weighted combination of the value tensor."""
        key_depth = tf.cast(tf.shape(self.k)[-1], tf.float32)
        scores = tf.matmul(self.q, self.k, transpose_b=True) / tf.math.sqrt(key_depth)

        if self.mask is not None:
            # A large negative score makes the softmax weight effectively zero.
            scores += (self.mask * -1e9)

        weights = tf.nn.softmax(scores, axis=-1)
        return tf.matmul(weights, self.v)


class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention layer sized by ``C_d_model`` and ``C_num_heads``.

    Query, key and value are projected with separate dense layers, split into
    ``C_num_heads`` heads of ``C_d_model // C_num_heads`` features, attended
    with ``ScaledDotProductAttention`` and merged again through a final dense
    layer.
    """

    def __init__(self, **kwargs):
        """Create the projection layers.

        Args:
            **kwargs: standard Keras ``Layer`` arguments (``name``, ``dtype``,
                ``trainable``). Accepting them lets ``from_config`` rebuild
                the layer from the dictionary produced by ``get_config``.

        Raises:
            ValueError: if ``C_d_model`` is not a multiple of ``C_num_heads``;
                the head split in ``split_heads`` needs an exact division.
        """
        super(MultiHeadAttention, self).__init__(**kwargs)
        if C_d_model % C_num_heads != 0:
            raise ValueError(
                'C_d_model ({}) must be divisible by C_num_heads ({})'.format(C_d_model, C_num_heads))
        self.depth = C_d_model // C_num_heads
        self.wq = tf.keras.layers.Dense(C_d_model)
        self.wk = tf.keras.layers.Dense(C_d_model)
        self.wv = tf.keras.layers.Dense(C_d_model)
        self.dense = tf.keras.layers.Dense(C_d_model)

    def get_config(self):
        """Return the layer config, recording the head count and model width."""
        config = super(MultiHeadAttention, self).get_config()
        config.update({'num_heads': C_num_heads, 'd_model': C_d_model})
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from ``get_config`` output.

        ``num_heads`` and ``d_model`` are informational only (the layer always
        reads the module constants), so they are removed before the
        constructor is called.
        """
        config = dict(config)
        config.pop('num_heads', None)
        config.pop('d_model', None)
        return cls(**config)

    def split_heads(self, inputs, batch_size):
        """Reshape to ``(batch, -1, heads, depth)`` and move the head axis to position 1."""
        inputs = tf.reshape(inputs, shape=(batch_size, -1, C_num_heads, self.depth))
        return tf.transpose(inputs, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply multi-head attention.

        Args:
            inputs: dict with the keys ``'query'``, ``'key'``, ``'value'`` and
                ``'mask'``. The mask is handed to ``ScaledDotProductAttention``
                unchanged, where entries equal to 1 suppress a position.

        Returns:
            The output of the final dense layer applied to the re-merged heads.
        """
        query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
        batch_size = tf.shape(query)[0]
        query = self.wq(query)
        key = self.wk(key)
        value = self.wv(value)

        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        scaled_attention = ScaledDotProductAttention(query, key, value, mask).attention()
        # Undo the head transpose, then fold the heads back into one feature axis.
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, C_d_model))
        outputs = self.dense(concat_attention)

        return outputs


class PositionalEncoding(tf.keras.layers.Layer):
    """Add a fixed sinusoidal position signal to a sequence of embeddings.

    The transformer has no recurrence, so without this signal it would
    effectively see a bag of words with no information about word order.
    """

    def __init__(self, **kwargs):
        """Pre-compute the encoding table.

        Args:
            **kwargs: standard Keras ``Layer`` arguments, accepted so that
                ``from_config`` can rebuild the layer.
        """
        super(PositionalEncoding, self).__init__(**kwargs)
        # The table needs at least one row per sequence position. Taking the
        # larger of the two constants keeps the original table size whenever
        # the vocabulary is large, and still covers C_max_length positions
        # when C_actual_vocab_size is smaller than that.
        self.position = max(C_actual_vocab_size, C_max_length)
        self.pos_encoding = self.positional_encoding()

    def get_config(self):
        """Return the layer config, recording the table size and model width."""
        config = super(PositionalEncoding, self).get_config()
        # The previous code referenced an undefined name ``d_model`` here.
        config.update({'position': self.position, 'd_model': C_d_model})
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from ``get_config`` output (extra keys are informational)."""
        config = dict(config)
        config.pop('position', None)
        config.pop('d_model', None)
        return cls(**config)

    def get_angles(self, pos, i):
        """Return ``pos / 10000 ** (2 * (i // 2) / C_d_model)``."""
        angles = 1 / tf.pow(10000, (2 * (i // 2) / tf.cast(C_d_model, tf.float32)))
        return pos * angles

    def positional_encoding(self):
        """Build the ``(1, positions, C_d_model)`` encoding table.

        Sines are taken from the even-indexed feature columns and cosines from
        the odd-indexed ones. The two halves are concatenated (all sines
        first), not interleaved.
        """
        angle_rads = self.get_angles(pos=tf.cast(tf.range(self.position)[:, tf.newaxis], dtype=tf.float32),
                                     i=tf.cast(tf.range(C_d_model)[tf.newaxis, :], dtype=tf.float32))
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[tf.newaxis, ...]
        return pos_encoding

    def call(self, inputs):
        """Add the first ``inputs.shape[1]`` rows of the table to ``inputs``."""
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]


class Encoder:
    """Builder for the encoder half of the transformer.

    ``encoder()`` embeds the input ids, scales and position-encodes them, and
    passes the result through ``C_num_layers`` models built by
    ``encoder_layer()``. The intermediate Keras tensors are kept on the
    instance.
    """

    def __init__(self):
        """Initialise the attributes that the builder methods fill in."""
        self.en_inputs, self.el_inputs, self.en_outputs, self.el_outputs = None, None, None, None
        self.en_padding_mask, self.el_padding_mask = None, None
        self.en_embeddings, self.el_attention = None, None

    def encoder_layer(self):
        """Build one encoder layer as a Keras model.

        The layer is self-attention followed by a two-layer feed-forward
        network. Each part is followed by dropout, a residual addition and
        layer normalisation.

        Returns:
            ``tf.keras.Model`` taking ``[inputs, padding_mask]``.
        """
        self.el_inputs = tf.keras.Input(shape=(None, C_d_model))
        self.el_padding_mask = tf.keras.Input(shape=(1, 1, None))
        self.el_attention = MultiHeadAttention()({'query': self.el_inputs,
                                                  'key': self.el_inputs,
                                                  'value': self.el_inputs,
                                                  'mask': self.el_padding_mask,
                                                  })
        self.el_attention = tf.keras.layers.Dropout(C_dropout)(self.el_attention)
        self.el_attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(self.el_inputs + self.el_attention)
        # Use the configured activation; it was hard-coded to 'relu' before,
        # so changing C_activation had no effect.
        self.el_outputs = tf.keras.layers.Dense(C_num_units, activation=C_activation)(self.el_attention)
        self.el_outputs = tf.keras.layers.Dense(C_d_model)(self.el_outputs)
        self.el_outputs = tf.keras.layers.Dropout(C_dropout)(self.el_outputs)
        self.el_outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(self.el_attention + self.el_outputs)
        return tf.keras.Model(inputs=[self.el_inputs, self.el_padding_mask], outputs=self.el_outputs)

    def encoder(self):
        """Build the full encoder as a Keras model.

        Returns:
            ``tf.keras.Model`` taking ``[token_ids, padding_mask]`` and
            returning the output of the last encoder layer.
        """
        self.en_inputs = tf.keras.Input(shape=(None,))
        self.en_padding_mask = tf.keras.Input(shape=(1, 1, None))
        self.en_embeddings = tf.keras.layers.Embedding(
            C_actual_vocab_size, C_d_model)(self.en_inputs)
        # Scale the embeddings by sqrt(C_d_model) before adding the position signal.
        self.en_embeddings *= tf.math.sqrt(tf.cast(C_d_model, tf.float32))
        self.en_embeddings = PositionalEncoding()(self.en_embeddings)
        self.en_outputs = tf.keras.layers.Dropout(C_dropout)(self.en_embeddings)
        for _ in range(C_num_layers):
            self.en_outputs = self.encoder_layer()([self.en_outputs, self.en_padding_mask])
        return tf.keras.Model(inputs=[self.en_inputs, self.en_padding_mask], outputs=self.en_outputs)


class Decoder:
    """Builder for the decoder half of the transformer.

    ``decoder()`` embeds the decoder input ids, scales and position-encodes
    them, and passes the result together with the encoder output through
    ``C_num_layers`` models built by ``decoder_layer()``. It returns a Keras
    model. Turning predictions into text is left to the caller.
    """

    def __init__(self):
        """Initialise the attributes that the builder methods fill in."""
        self.de_padding_mask, self.dl_padding_mask, self.dl_en_outputs, self.de_en_outputs = None, None, None, None
        self.de_inputs, self.dl_inputs, self.de_outputs, self.dl_outputs = None, None, None, None
        self.de_embeddings, self.dl_attn1, self.dl_attn2 = None, None, None
        self.de_foresight_mask, self.dl_foresight_mask = None, None

    def decoder_layer(self):
        """Build one decoder layer as a Keras model.

        The layer has three parts, each followed by a residual addition and
        layer normalisation: self-attention masked with the foresight mask,
        attention over the encoder output masked with the padding mask, and a
        two-layer feed-forward network.

        Returns:
            ``tf.keras.Model`` taking
            ``[inputs, encoder_outputs, foresight_mask, padding_mask]``.
        """
        self.dl_inputs = tf.keras.Input(shape=(None, C_d_model))
        self.dl_en_outputs = tf.keras.Input(shape=(None, C_d_model))
        self.dl_foresight_mask = tf.keras.Input(shape=(1, None, None))
        self.dl_padding_mask = tf.keras.Input(shape=(1, 1, None))
        self.dl_attn1 = MultiHeadAttention()(inputs={'query': self.dl_inputs,
                                                     'key': self.dl_inputs,
                                                     'value': self.dl_inputs,
                                                     'mask': self.dl_foresight_mask
                                                     })
        self.dl_attn1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(self.dl_attn1 + self.dl_inputs)
        # Queries come from the decoder; keys and values from the encoder output.
        self.dl_attn2 = MultiHeadAttention()(inputs={'query': self.dl_attn1,
                                                     'key': self.dl_en_outputs,
                                                     'value': self.dl_en_outputs,
                                                     'mask': self.dl_padding_mask
                                                     })
        self.dl_attn2 = tf.keras.layers.Dropout(C_dropout)(self.dl_attn2)
        self.dl_attn2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)(self.dl_attn2 + self.dl_attn1)
        # Use the configured activation; it was hard-coded to 'relu' before,
        # so changing C_activation had no effect.
        self.dl_outputs = tf.keras.layers.Dense(C_num_units, activation=C_activation)(self.dl_attn2)
        self.dl_outputs = tf.keras.layers.Dense(C_d_model)(self.dl_outputs)
        self.dl_outputs = tf.keras.layers.Dropout(C_dropout)(self.dl_outputs)
        self.dl_outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(self.dl_outputs + self.dl_attn2)
        return tf.keras.Model(inputs=[self.dl_inputs, self.dl_en_outputs, self.dl_foresight_mask, self.dl_padding_mask],
                              outputs=self.dl_outputs)

    def decoder(self):
        """Build the full decoder as a Keras model.

        Returns:
            ``tf.keras.Model`` taking
            ``[token_ids, encoder_outputs, foresight_mask, padding_mask]`` and
            returning the output of the last decoder layer.
        """
        self.de_inputs = tf.keras.Input(shape=(None,))
        self.de_en_outputs = tf.keras.Input(shape=(None, C_d_model))
        self.de_foresight_mask = tf.keras.Input(shape=(1, None, None))
        self.de_padding_mask = tf.keras.Input(shape=(1, 1, None))
        self.de_embeddings = tf.keras.layers.Embedding(
            C_actual_vocab_size, C_d_model)(self.de_inputs)
        # Scale the embeddings by sqrt(C_d_model) before adding the position signal.
        self.de_embeddings *= tf.math.sqrt(tf.cast(C_d_model, tf.float32))
        self.de_embeddings = PositionalEncoding()(self.de_embeddings)
        self.de_outputs = tf.keras.layers.Dropout(C_dropout)(self.de_embeddings)
        for _ in range(C_num_layers):
            self.de_outputs = self.decoder_layer()(
                inputs=[self.de_outputs, self.de_en_outputs, self.de_foresight_mask, self.de_padding_mask])
        return tf.keras.Model(inputs=[self.de_inputs, self.de_en_outputs, self.de_foresight_mask, self.de_padding_mask],
                              outputs=self.de_outputs)


class Transformer:
    """Assemble the encoder, the decoder and their masks into one Keras model."""

    def __init__(self):
        """Initialise the attributes that the builder methods fill in."""
        self.xf_inputs = self.xf_de_inputs = None
        self.xf_en_outputs = self.xf_de_outputs = None
        self.xf_en_padding_mask = self.xf_foresight_mask = self.xf_de_padding_mask = None
        self.pad_mask = self.foresight_mask = self.seq_len = self.pad_mask2 = None
        self.xf_outputs = None

    def post_padding(self, x):
        """Return a mask that is 1.0 wherever ``x`` equals 0, with two singleton axes inserted after the batch axis."""
        is_padding = tf.math.equal(x, 0)
        self.pad_mask = tf.cast(is_padding, tf.float32)
        return self.pad_mask[:, tf.newaxis, tf.newaxis, :]

    def block_foresight(self, x):
        """Return a mask hiding later positions as well as padding in ``x``."""
        self.seq_len = tf.shape(x)[1]
        # The lower triangle (diagonal included) marks the positions a step may see.
        visible = tf.linalg.band_part(tf.ones((self.seq_len, self.seq_len)), -1, 0)
        self.foresight_mask = 1 - visible
        self.pad_mask2 = self.post_padding(x)
        # A position is masked when either mask marks it.
        return tf.maximum(self.foresight_mask, self.pad_mask2)

    def transformer(self):
        """Build the model mapping ``[XfIn, XfDeIn]`` id sequences to one score per vocabulary id at every decoder position."""
        self.xf_inputs = tf.keras.Input(shape=(None,), name='XfIn')
        self.xf_de_inputs = tf.keras.Input(shape=(None,), name='XfDeIn')

        # Both padding masks are computed from the encoder input ids; the
        # foresight mask is computed from the decoder input ids.
        self.xf_en_padding_mask = tf.keras.layers.Lambda(
            self.post_padding, output_shape=(1, 1, None), name='XfEnPM')(self.xf_inputs)
        self.xf_foresight_mask = tf.keras.layers.Lambda(
            self.block_foresight, output_shape=(1, None, None), name='XfFm')(self.xf_de_inputs)
        self.xf_de_padding_mask = tf.keras.layers.Lambda(
            self.post_padding, output_shape=(1, 1, None), name='XfDePm')(self.xf_inputs)

        encoder_model = Encoder().encoder()
        self.xf_en_outputs = encoder_model(inputs=[self.xf_inputs, self.xf_en_padding_mask])

        decoder_model = Decoder().decoder()
        self.xf_de_outputs = decoder_model(inputs=[self.xf_de_inputs,
                                                   self.xf_en_outputs,
                                                   self.xf_foresight_mask,
                                                   self.xf_de_padding_mask])

        output_layer = tf.keras.layers.Dense(units=C_actual_vocab_size)
        self.xf_outputs = output_layer(self.xf_de_outputs)
        return tf.keras.Model(inputs=[self.xf_inputs, self.xf_de_inputs], outputs=self.xf_outputs)

# Build and train the model

In [None]:
# Necessary Helper Utilities

def plot_generator(metrics=('Sparse_Categorical_Accuracy', 'Loss')):
    """Plot the training curves recorded in the module-level ``history``.

    Args:
        metrics: display names of the series to plot, one subplot each. The
            lower-cased name is the key looked up in ``history.history``. The
            default matches what the model is compiled with in this file
            (``sparse_categorical_accuracy`` plus the loss). The previous
            hard-coded ``Root_Mean_Squared_Error`` is never recorded, so it
            raised ``KeyError``.

    A validation curve is added only when a ``val_<metric>`` series exists.
    ``fit`` is called without validation data in this file, so indexing those
    keys unconditionally also raised ``KeyError``.

    NOTE(review): ``plt`` is not imported in the visible part of this file -
    confirm that ``import matplotlib.pyplot as plt`` is executed before this
    function is called.
    """
    linetype = ['-', '--']
    recorded = history.history
    # squeeze=False keeps ``axes`` two-dimensional even for a single metric.
    _, axes = plt.subplots(1, len(metrics), figsize=(18, 5), squeeze=False)
    for index, (axis, metric) in enumerate(zip(axes[0], metrics)):
        key = metric.lower()
        style = linetype[index % len(linetype)]
        axis.plot(range(len(recorded[key])), recorded[key], 'r' + style, label='Training')
        val_key = 'val_' + key
        if val_key in recorded:
            axis.plot(range(len(recorded[val_key])), recorded[val_key], 'b' + style, label='Validation')
        axis.set_title(metric, fontsize=16)
        axis.grid(True)
        axis.set_xlabel('Epoch')
        axis.set_ylabel(metric)
        axis.legend()

def lrscheduler(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    On the last epoch of every block of ``C_Lr_Interval`` epochs (counted from
    zero) the incoming rate is multiplied by 0.1; on every other epoch it is
    returned unchanged.
    """
    last_epoch_of_interval = epoch % C_Lr_Interval == C_Lr_Interval - 1
    if not last_epoch_of_interval:
        return lr
    return lr * 0.1

def cb_def(monitor='loss'):
    """Return the Keras callbacks used for training.

    Args:
        monitor: name of the logged quantity that early stopping watches. It
            defaults to the training loss because ``fit`` is called without
            validation data in this file, so the previously hard-coded
            ``'val_loss'`` was never logged and early stopping (including
            ``restore_best_weights``) could not trigger. Pass ``'val_loss'``
            when validation data is supplied to ``fit``.

    Returns:
        A list with an ``EarlyStopping`` callback (patience ``C_Patience``)
        and a ``LearningRateScheduler`` driven by ``lrscheduler``.
    """
    return [
        tf.keras.callbacks.EarlyStopping(monitor=monitor, patience=C_Patience,
                                         restore_best_weights=True, min_delta=0.0001),
        tf.keras.callbacks.LearningRateScheduler(lrscheduler),
    ]

In [None]:
# Load and encode the data, then report how many distinct words were found.
data_util = DataUtilities()
dataset, tokenizer = data_util.tokenize()
print(len(tokenizer.word_counts))

# The embedding and output layers are sized from C_actual_vocab_size when the
# model is built, so it has to cover every id the tokenizer can emit (word ids
# start at 1; 0 is the padding value). max() never shrinks a value that is
# already large enough.
C_actual_vocab_size = max(C_actual_vocab_size, len(tokenizer.word_index) + 1)

# Settings read by cb_def() and lrscheduler() while fit() runs.
C_Patience = 5
C_Lr_Interval = 5

jarvis = Transformer().transformer()
# The final Dense layer has no activation, i.e. the model outputs raw logits.
# The string loss 'sparse_categorical_crossentropy' assumes probabilities, so
# the loss object is created explicitly with from_logits=True.
jarvis.compile(optimizer='sgd',
               loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
               metrics=['sparse_categorical_accuracy'])
history = jarvis.fit(dataset, epochs=C_epochs, verbose=0, callbacks=cb_def())

In [None]:
# Greedy decoding of one query.
#
# NOTE(review): C_start_token and C_stop_token are read here but are not
# assigned anywhere in the visible original code. They are expected to be
# one-element lists holding ids that the trained model knows - confirm where
# they are defined before running this cell.
txt = 'who is the father of tony'

# ``tokenizer`` is the Keras Tokenizer returned by DataUtilities.tokenize(). It
# has no encode()/decode()/vocab_size members, so texts_to_sequences() and
# index_word are used instead. The query is cleaned with the same preprocess()
# that was applied to the training questions.
query_ids = tokenizer.texts_to_sequences([data_util.preprocess(txt)])[0]
txt = tf.expand_dims(C_start_token + query_ids + C_stop_token, axis=0)
outs = tf.expand_dims(C_start_token, axis=0)

# Generate one id per step until the stop id (or the padding id 0) is
# predicted, or the answer reaches C_max_length ids.
for _ in range(C_max_length):
    predictions = jarvis([txt, outs], training=False)
    predictions = predictions[:, -1:, :]
    predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
    if int(predicted_id[0, 0]) in (0, C_stop_token[0]):
        break
    outs = tf.concat([outs, predicted_id], axis=-1)

output = tf.squeeze(outs, axis=0)
# Only ids that map to a word are printed; this drops the start marker.
ansz = ' '.join(tokenizer.index_word[int(i)] for i in output.numpy() if int(i) in tokenizer.index_word)
print(ansz)