In [1]:
import tensorflow as tf
import pickle
import string
import re
import numpy as np
from keras.models import Model
from keras.layers import Input, LSTM, Dense, Embedding
from keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow import keras
from tensorflow.keras import layers
from keras.optimizers import Adam

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Load the input and output data
train_input_texts = pickle.load(open('/content/drive/MyDrive/Colab Notebooks/DL/Project2/Train_input', 'rb'))
train_output_texts = pickle.load(open('/content/drive/MyDrive/Colab Notebooks/DL/Project2/Train_output', 'rb'))

In [4]:
#Checking length of input and output max
input_lengthMax = max([len(txt) for txt in train_input_texts])
output_lengthMax = max([len(txt) for txt in train_output_texts])
print(input_lengthMax)
print(output_lengthMax)

64
99


In [5]:
#Like in the github for Ch11, added both input and output in one list seperated by comma and added a start and end token for output
language = []
for line_ip, line_op in zip(train_input_texts, train_output_texts):
    output_lng = "[start] " + line_op + " [end]"
    language.append((line_ip, output_lng))

In [6]:
language[1]

('a d a d b d a e b d a g c g a g c f c f ',
 '[start] b d b d a e e a d d f c g c f c f a g i j a g h k a d g l  [end]')

In [7]:
#split into training and validation set
import random
random.shuffle(language)
val_set_num = int(0.15 * len(language))
train_set_num = len(language) - val_set_num - val_set_num//6
train_set = language[:train_set_num]
val_set = language[train_set_num:train_set_num + val_set_num]
test_set = language[train_set_num+val_set_num:]

In [8]:
print(val_set_num)
print(train_set_num)
print(len(test_set))

16800
92400
2800


Vectorizing the input language and output language text pairs

In [None]:
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")

def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(
        lowercase, f"[{re.escape(strip_chars)}]", "")

vocab_size = 40 #total unique words being 37 approx 40
sequence_length = 300 #max length of output being about 211 so approx kept it as 300

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
    standardize=custom_standardization,
)
train_inp_lang = [pair[0] for pair in train_set]
train_otp_lang = [pair[1] for pair in train_set]
source_vectorization.adapt(train_inp_lang)
target_vectorization.adapt(train_otp_lang)

Preparing datasets for the translation task

In [None]:
batch_size = 64

def format_dataset(inp, otp):
    inp = source_vectorization(inp)
    otp = target_vectorization(otp)
    return ({
        "language_1": inp,
        "language_2": otp[:, :-1],
    }, otp[:, 1:])

def make_dataset(pairs):
    inp_lang, otp_lang = zip(*pairs)
    inp_lang = list(inp_lang)
    otp_lang = list(otp_lang)
    dataset = tf.data.Dataset.from_tensor_slices((inp_lang, otp_lang))
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(format_dataset, num_parallel_calls=4)
    return dataset.shuffle(2048).prefetch(16).cache()

train_dataset = make_dataset(train_set)
val_dataset = make_dataset(val_set)

In [None]:
for inputs, targets in train_dataset.take(1):
    print(f"inputs['language_1'].shape: {inputs['language_1'].shape}")
    print(f"inputs['language_2'].shape: {inputs['language_2'].shape}")
    print(f"targets.shape: {targets.shape}")

inputs['language_1'].shape: (64, 300)
inputs['language_2'].shape: (64, 300)
targets.shape: (64, 300)


Transformer encoder implemented as a subclassed Layer

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        #MultiHeadAttention layer that implements the multi-head attention mechanism
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        #dense projection layer, applies two dense layers with ReLU activation and then a final dense layer with no activation function
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        #Two LayerNormalization layers that normalize the input vectors and the output vectors, respectively
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    #dictionary containing the configuration of the layer
    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim, #dimension of the input vector, and also the output vector
            "num_heads": self.num_heads, #number of attention heads to use in the Multi-Head Attention mechanism
            "dense_dim": self.dense_dim, #dimension of the intermediate vector produced by the dense projection layer
        })
        return configv

Transformer Decoder

In [None]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim #dimensionality of the input embedding and output embeddings
        self.dense_dim = dense_dim #dimensionality of the hidden layer in the feedforward network inside the decoder
        self.num_heads = num_heads #number of attention heads to use in the multi-head attention layers
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)
#implements the forward pass of the Decoder layer
    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)

Using positional encoding to re-inject order information.
Implementing positional embedding as a subclassed layer

In [None]:
class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        config = super(PositionalEmbedding, self).get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config

Putting it all together

In [None]:
embed_dim = 256
dense_dim = 2048
num_heads = 8
#model takes two inputs: encoder_inputs and decoder_inputs
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="language_1")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs) # adds positional encoding to the input embeddings
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x) #output of the PositionalEmbedding layer is passed through a TransformerEncoder

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="language_2")
#decoder_inputs are also passed through a PositionalEmbedding layer, and then through a TransformerDecoder layer
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x) #Dropout layer is applied to the output of the TransformerDecoder layer to prevent overfitting
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x) #Dense layer with a softmax activation is applied to produce the output probability distribution over the vocabulary for each position in the output sequence
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
transformer.summary

<bound method Model.summary of <keras.src.engine.functional.Functional object at 0x7aa0cdff2680>>

In [None]:
#To save the model
callbacks =[keras.callbacks.ModelCheckpoint("final_transformer120.keras",save_best_only=True)]

In [None]:
transformer.compile(
    optimizer=Adam(),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
transformer.fit(train_dataset, epochs=120, validation_data=val_dataset,callbacks=callbacks)

Epoch 1/120
   2/1400 [..............................] - ETA: 16:46:26 - loss: 4.6123 - accuracy: 0.1332

Translating few lines with Transformer model to check output syntax

In [None]:
import numpy as np
output_vocab = target_vectorization.get_vocabulary()
output_index_lookup = dict(zip(range(len(output_vocab)), output_vocab))
max_decoded_sentence_length = 100

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = output_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    decoded_sentence = decoded_sentence.replace('[start] ','').replace(' [end]','')
    return decoded_sentence

test_input_train_texts = [pair[0] for pair in train_set]
for _ in range(5):
    input_sentence = random.choice(test_input_train_texts)
    print("-")
    print(input_sentence)
    print(decode_sequence(input_sentence))

In [None]:
from tensorflow.keras import models
import pickle
import tensorflow as tf

model = models.load_model("/content/drive/MyDrive/Colab Notebooks/DL/Project2/Rishabh_Bassi_532008692_Project2_Model.h5")
test_output = test_set

# Include your data preprocessing code if applicable
# <your data preprocessing code>
# Include your data preprocessing code if applicable

test_loss, test_acc = model.evaluate(test_output)
your_score = round(test_acc*1000) / 10
print(f"Your Score: {your_score}")

In [None]:
import numpy as np
output_vocab = target_vectorization.get_vocabulary()
output_index_lookup = dict(zip(range(len(output_vocab)), output_vocab))
max_decoded_sentence_length = sequence_length

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for i in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = transformer(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = output_index_lookup[sampled_token_index]
        decoded_sentence += " " + sampled_token
        if sampled_token == "[end]":
            break
    decoded_sentence = decoded_sentence.replace('[start] ','').replace(' [end]','')
    return decoded_sentence



test_input_train_texts = [pair[0] for pair in test_set]
test_output_train_texts = [pair[1] for pair in test_set]
total_sentences = len(test_set)
count=0
i=1
for pair in test_set:
    truesq = pair[1].replace('[start] ','').replace(' [end]','')
    outputseq=decode_sequence(pair[0]).strip()
    if(outputseq==truesq.strip()):
        count+=1
    if(i%200==0):
        print("Accuracy and I",i,count/i);
    i+=1
#     print("True Output ",truesq.strip())
#     print("Predicted ",outputseq)


test_acc = count / total_sentences
your_score = round(test_acc * 1000) / 10
print(f"Your Test Accuracy: {test_acc:.4f}")
print(f"Your Score: {your_score}")

# for _ in range(5):
#     input_sentence = random.choice(test_input_train_texts)
#     print("-")
#     print(input_sentence)
#     print(decode_sequence(input_sentence))



total_sentences = len(test_output)
total_correct_sentences = sum(pred_sentence.strip() == true_sentence.strip() for pred_sentence, true_sentence in zip(test_prediction, test_output))

test_acc = total_correct_sentences / total_sentences
your_score = round(test_acc * 1000) / 10
print(f"Your Test Accuracy: {test_acc:.4f}")
print(f"Your Score: {your_score}")