# CSCE 636 PROJECT 2 - ISHAAN MAHENDROO - UIN:327002775

In [1]:
import pickle
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from keras import layers
import numpy as np
import string
import random

In [2]:
# Load the raw source/target sequences. The files are opened in context
# managers so the handles are closed as soon as the data has been read.
# NOTE: pickle.load can execute arbitrary code while unpickling -- only load
# files that come from a trusted source.
with open('/content/Train_input', 'rb') as input_file:
  train_input = pickle.load(input_file)
with open('/content/Train_output', 'rb') as output_file:
  train_output = pickle.load(output_file)

In [3]:
# Sequence-length statistics, measured in whitespace-separated tokens, used to
# pick a sensible sequence length for the vectorization layers.
input_lengths = [len(sequence.split()) for sequence in train_input]
output_lengths = [len(sequence.split()) for sequence in train_output]

# Longest sequence and its index. max() keeps the first maximum on ties, which
# matches a left-to-right scan that only updates on a strictly larger length.
max_len_tr = max(input_lengths, default=0)
max_ind_tr = max(range(len(input_lengths)), key=input_lengths.__getitem__, default=0)
max_len_ts = max(output_lengths, default=0)
max_ind_ts = max(range(len(output_lengths)), key=output_lengths.__getitem__, default=0)

# Average lengths: divide by the real number of samples instead of a
# hard-coded dataset size, so the numbers stay right if the data changes.
avg_tr = sum(input_lengths) / len(input_lengths)
avg_ts = sum(output_lengths) / len(output_lengths)

print(max_len_tr)
print(train_input[max_ind_tr])
print(avg_tr)
print(max_len_ts)
print(train_output[max_ind_ts])
print(avg_ts)

32
a h a f a f c d c e a e b d b d a g a d c f b d a e a g b d c g 
22.096375
47
c d c e a f d e b d a e g a f f h b d c f b d a d k l b d c g a g ed ee a e ef a g m eg a h i j eh 
32.1445625


I look at the average and maximum sequence length in each dataset to decide how long my sequence_length should be. Each length is the number of tokens in the "sentence", i.e. not counting the whitespace.

In [4]:
# Wrap every target sequence in explicit "[start]" / "[end]" marker tokens.
# The sequence is stripped first and the markers are joined with explicit
# spaces, so "[end]" is always its own whitespace-separated token even when a
# raw sequence does not happen to end with a trailing space.
test_output = [f"[start] {sequence.strip()} [end]" for sequence in train_output]


In [5]:
# Show the first wrapped target sequence to check the [start]/[end] markers.
test_output[0]

'[start] b d b d c d c f a h e f g b d a d h i a f d j [end]'

Update each output language sequence to include "[start] " and " [end]".

In [6]:
# Vectorizer for the source language: integer token ids, fixed length 40.
source_vectorization = layers.TextVectorization(
    max_tokens=1000,
    standardize="lower_and_strip_punctuation",
    output_mode="int",
    output_sequence_length=40,
)

# Vectorizer for the target language. Only lower-casing is applied so the
# square brackets of the "[start]" / "[end]" markers are not stripped. The
# length is 40 + 1 because the target is later sliced into a decoder input
# (all but the last token) and a label (all but the first token).
# NOTE(review): the length statistics printed earlier report a longest output
# of 47 tokens; with the two marker tokens that exceeds 41, so such sequences
# would be cut off here (losing "[end]") -- confirm this is acceptable.
output_vectorization = layers.TextVectorization(
    max_tokens=1000,
    standardize="lower",
    output_mode="int",
    output_sequence_length=40 + 1,
)

# Learn each vocabulary from its own corpus.
source_vectorization.adapt(train_input)
output_vectorization.adapt(test_output)

I made my vocabulary size 1000, I assumed there could be 26 + (26*26) combinations if I account for pairs of letters and then each letter by itself, so I rounded to 1000 in case there were some other combinations I didn't see while looking at the data.

In [7]:
def vectorization(inputs, outputs):
  """Turn a batch of raw source/target strings into model features and labels.

  Returns a tuple ``(features, labels)`` where ``features`` is a dict with the
  vectorized source under "input_lang" and the vectorized target without its
  last position under "output_lang", and ``labels`` is the vectorized target
  without its first position (i.e. shifted one step ahead of "output_lang").
  """
  source_tokens = source_vectorization(inputs)
  target_tokens = output_vectorization(outputs)
  features = {
      "input_lang": source_tokens,
      "output_lang": target_tokens[:, :-1],
  }
  labels = target_tokens[:, 1:]
  return (features, labels)

def make_dataset(inputs, outputs, batch_size=64, seed=42):
  """Build a batched, vectorized tf.data pipeline with one fixed shuffle.

  Args:
    inputs: sequence of raw source strings.
    outputs: sequence of raw target strings (already wrapped in markers).
    batch_size: number of rows per batch.
    seed: seed for the one-off shuffle of the batches.

  Returns:
    A dataset of ``(features, labels)`` batches whose order is shuffled once
    and then stays the same on every iteration, so that ``take`` / ``skip``
    on the result always select the same batches.
  """
  dataset = tf.data.Dataset.from_tensor_slices((inputs, outputs))
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(vectorization, num_parallel_calls=tf.data.AUTOTUNE)
  # Cache right after the expensive, deterministic vectorization step and
  # BEFORE shuffling, so the cache stores the data rather than one particular
  # shuffled order.
  dataset = dataset.cache()
  # Buffer covers every batch, giving a full (not windowed) shuffle.
  num_batches = max(1, -(-len(inputs) // batch_size))
  dataset = dataset.shuffle(num_batches, seed=seed, reshuffle_each_iteration=False)
  # Prefetch goes last so it overlaps the whole pipeline with training.
  return dataset.prefetch(tf.data.AUTOTUNE)

data = make_dataset(train_input, test_output)
val_size = 200 #since I batched the data, doing .take or .skip considers batches not individual rows
# `data` has a fixed order, so take/skip give disjoint, stable splits. Shuffling
# `data` again here with the default reshuffle_each_iteration=True would pick
# different batches for validation on every epoch and leak them into training.
val_data = data.take(val_size)
# Only the training split is reshuffled each epoch (buffer >= number of batches).
train_data = data.skip(val_size).shuffle(1750)

I create a tensorflow dataset by passing the function vectorization to the input and output datasets. It returns the output_lang twice because the target is the output lang but the output lang is also an input during the transformer decoder portion of the model. So I need output lang as an input and output

In [8]:
# Number of batches in each split (cardinality counts batches, not rows).
print(val_data.cardinality().numpy()) # validation set: 200 batches * 64 rows = 12800 rows, a bit over 10% of the data
# Bare expression: shown as the cell result when run in a notebook.
train_data.cardinality().numpy()

200


1550

In [9]:
# Peek at a single training batch and report the shape of every tensor in it.
for features, labels in train_data.take(1):
    for feature_name in ("input_lang", "output_lang"):
        print(f"{feature_name} shape: {features[feature_name].shape}")
    print(f"targets shape: {labels.shape}")

input_lang shape: (64, 40)
output_lang shape: (64, 40)
targets shape: (64, 40)


Now the datasets are made where for both train and validation, each input is a batch with size 64 containing sequences where each word (letter/letters in this case) is represented by an int and each sequence is of length 40. The dataset has input_lang,output_lang without [end] for the training phase, output_lang without the [start] for the ground truth.

In [10]:
class PositionalEmbedding(layers.Layer):
  """Learned token embedding plus learned position embedding.

  Args:
    sequence_length: number of distinct positions the position table holds.
    input_dim: size of the token vocabulary.
    output_dim: dimensionality of both embeddings.
  """

  def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
    super().__init__(**kwargs)
    # Keep the constructor arguments so get_config() can report them.
    self.sequence_length = sequence_length
    self.input_dim = input_dim
    self.output_dim = output_dim
    # One lookup table for token ids, one for position indices.
    self.token_embeddings = layers.Embedding(input_dim=input_dim, output_dim=output_dim)
    self.position_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=output_dim)

  def call(self, inputs):
    """Return token embeddings with the position embeddings added on."""
    token_vectors = self.token_embeddings(inputs)
    # Positions 0 .. L-1, where L is the size of the last axis of `inputs`.
    num_positions = tf.shape(inputs)[-1]
    position_ids = tf.range(start=0, limit=num_positions, delta=1)
    position_vectors = self.position_embeddings(position_ids)
    return token_vectors + position_vectors

  def compute_mask(self, inputs, mask=None):
    """Mask that is True wherever the token id is not 0."""
    return tf.math.not_equal(inputs, 0)

  def get_config(self):
    """Base layer config extended with this layer's constructor arguments."""
    config = super().get_config()
    config.update(
        {
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
            "output_dim": self.output_dim,
        }
    )
    return config

I wanted to embed my tensors to hopefully extract the meaning of each sentence. I felt doing positional embedding would be the best since I am doing text. Even though it is a made up language I still think order matters.

I used the book author's method for positional embedding rather than the sinusoidal (cosine) method. I simply have 2 embedding layers, one for the original word and one for its position; the layer is configured with the length of the input sequence, the input_dim (vocab_size), and the output_dim. Both the words and the positions are passed through a Keras embedding layer separately, and the two results are added together.

Also a compute mask function so we can ignore padding 0s in the inputs.

In [11]:
class TransformerEncoder(layers.Layer):
  """Single encoder block: self-attention and a two-layer dense projection,
  each followed by a residual connection and layer normalization.

  Args:
    embed_dim: width of the block's input/output vectors (also the key_dim).
    dense_dim: width of the hidden layer of the dense projection.
    num_heads: number of attention heads.
  """

  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    self.embed_dim, self.dense_dim, self.num_heads = embed_dim, dense_dim, num_heads

    self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
    self.dense = keras.Sequential(
        [
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ]
    )
    self.layernormal1 = layers.LayerNormalization()
    self.layernormal2 = layers.LayerNormalization()

  def call(self, inputs, mask=None):
    """Run the block on `inputs`; `mask`, when given, limits the attention."""
    # Insert an axis so the per-position mask broadcasts over the query axis.
    attention_mask = None if mask is None else mask[:, tf.newaxis, :]

    attended = self.attention(inputs, inputs, attention_mask=attention_mask)
    normalized = self.layernormal1(attended + inputs)   # residual 1
    projected = self.dense(normalized)
    return self.layernormal2(normalized + projected)    # residual 2

  def get_config(self):
    """Base layer config extended with this layer's constructor arguments."""
    config = super().get_config()
    config.update(
        {
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        }
    )
    return config

For the TransformerEncoder, I followed the textbook's general outline of the structure as well, doing:

multihead attention layer => residual_1(multihead_output + inputs) => dense_projection => residual_2(residual_1_output + dense_proj_output).

So the init and call functions follow that outline. I needed the embed_dim, dense_dim, and num_heads then included the required 5 layers. Then the call function is just passing through the layers in the above order.

In [51]:
class TransformerDecoder(layers.Layer):
  """Single decoder block.

  Applies self-attention over the decoder input, then attention whose keys and
  values are the encoder outputs, then a two-layer dense projection. Each of
  the three steps is followed by a residual connection and layer normalization.

  Args:
    embed_dim: width of the block's vectors (also used as the key_dim).
    dense_dim: width of the hidden layer of the dense projection.
    num_heads: number of attention heads.
  """

  def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
    super().__init__(**kwargs)
    # Kept so get_config() can report the constructor arguments.
    self.embed_dim = embed_dim
    self.dense_dim = dense_dim
    self.num_heads = num_heads

    # attention1: self-attention; attention2: attention over encoder outputs.
    self.attention1 = layers.MultiHeadAttention(num_heads = num_heads, key_dim = embed_dim)
    self.attention2 = layers.MultiHeadAttention(num_heads = num_heads, key_dim = embed_dim)
    self.dense = keras.Sequential([layers.Dense(dense_dim, activation = 'relu'), layers.Dense(embed_dim),])
    self.layernormal1 = layers.LayerNormalization()
    self.layernormal2 = layers.LayerNormalization()
    self.layernormal3 = layers.LayerNormalization()

    # Let Keras pass an incoming mask through this layer.
    self.supports_masking = True

  def get_causal_attention_mask(self, inputs):
    """Return an int32 lower-triangular mask tiled to (batch, seq, seq).

    Entry [b, i, j] is 1 when i >= j and 0 otherwise, where batch and seq are
    the first two dimensions of `inputs`.
    """
    input_shape = tf.shape(inputs)
    batch_size, sequence_length = input_shape[0], input_shape[1]
    i = tf.range(sequence_length)[:, tf.newaxis]  # column of row indices
    j = tf.range(sequence_length)                 # row of column indices
    mask = tf.cast(i >= j, dtype = "int32")
    mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
    # Tile multiples [batch_size, 1, 1]: repeat the mask once per batch row.
    mult = tf.concat([tf.expand_dims(batch_size, -1), tf.constant([1,1], dtype = tf.int32)], axis = 0)
    return tf.tile(mask, mult)

  def call(self, inputs, encoder_outputs, mask = None):
    """Run the block on the decoder `inputs` and the `encoder_outputs`.

    `mask`, when given, is combined (element-wise minimum) with the causal
    mask; the combined mask is passed to the second attention layer, while the
    first attention layer receives the causal mask alone.
    """
    causal_mask = self.get_causal_attention_mask(inputs)
    if mask is not None:
      padding_mask = tf.cast(mask[:,tf.newaxis,:], dtype = "int32")
      padding_mask = tf.minimum(padding_mask, causal_mask)
    else:
      padding_mask = mask

    # NOTE(review): padding_mask is derived from the decoder-side `mask` and the
    # (seq, seq) causal mask, yet it is applied below to attention over
    # `encoder_outputs`. That presumably only lines up when source and target
    # have the same length, and it also restricts each target position to
    # source positions up to its own index -- confirm this is intended.
    attention_layer1 = self.attention1(query = inputs, value = inputs, key = inputs, attention_mask = causal_mask)
    attention_layer1 = self.layernormal1(inputs + attention_layer1)
    attention_layer2 = self.attention2(query = attention_layer1, value = encoder_outputs, key = encoder_outputs, attention_mask = padding_mask)
    attention_layer2 = self.layernormal2(attention_layer1 + attention_layer2)
    dense_layer = self.dense(attention_layer2)
    dense_output = self.layernormal3(attention_layer2 + dense_layer)
    return dense_output

  def get_config(self):
    """Base layer config extended with this layer's constructor arguments."""
    config = super().get_config()
    config.update({"embed_dim": self.embed_dim, "dense_dim": self.dense_dim, "num_heads": self.num_heads,})
    return config

The decoder follows a pretty similar structure to the encoder however there are a few key differences. There are now 7 layers in the decoder, an additional multihead attention layer that takes in the encoder's outputs as its input and an additional layer normalization layer to account for this additional multihead attention layer.

So in the init function I similarly needed the embed_dim, dense_dim, and num_heads again; below that are the 7 layers I need, being 2 multihead_attentions, 3 layer_normalizations, and 2 Dense layers.

The main difference between this and the encoder, at least code-wise, is that I needed to account for causal masking. The TransformerDecoder is "order agnostic", meaning that unlike the RNN, which looks at its inputs one at a time, the decoder looks at the whole target sequence at the same time. This would be cheating, and I would have perfect train accuracy with terrible validation accuracy if I didn't include this, because the decoder would be able to see the answer (the n+1th key would just be copied to location n in the output because it can see the next key already; it doesn't need to predict, it will just map). So get_causal_attention_mask masks the outputs we shouldn't know yet with a matrix of the form 1,0,0,... (next row) 1,1,0,0,0,... (next row) 1,1,1,0,0,0,... and so on.

The call function works pretty similarly too as it is just following the structure of the decoder from the textbook. The main difference again is that one of the attention layers must use the encoder's outputs as its input. This is clear in the attention_layer_2 where the key and value of this attention_layer are the encoder_outputs. The first attention layer is taking the output_lang as input, the second attention layer is taking the (encoded) input_lang as the input. So it follows as:

multihead_attention1(output_lang) => layer_norm1(output_lang + multihead_attention_output) => multihead_attention2(encoded_input_lang) => layer_norm2(layer_norm_1_output + multihead_attention2(encoded_input_lang)) => dense_projection => layernorm3(dense_output + layernorm_2_output).

In [13]:
# Model hyperparameters.
embed_dim, dense_dim, num_heads = 256, 2048, 8
input_seq_len, output_seq_len = 40, 40
vocab_size = 1000

# Encoder side: token ids -> positional embedding -> one encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="input_lang")
x = PositionalEmbedding(
    sequence_length=input_seq_len, input_dim=vocab_size, output_dim=embed_dim
)(encoder_inputs)
encoder_outputs = TransformerEncoder(
    embed_dim=embed_dim, dense_dim=dense_dim, num_heads=num_heads
)(x)

# Decoder side: token ids -> positional embedding -> one decoder block that
# also receives the encoder outputs -> dropout -> softmax over the vocabulary.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="output_lang")
x = PositionalEmbedding(
    sequence_length=output_seq_len, input_dim=vocab_size, output_dim=embed_dim
)(decoder_inputs)
x = TransformerDecoder(
    embed_dim=embed_dim, dense_dim=dense_dim, num_heads=num_heads
)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)

# End-to-end model; labels are integer token ids, hence the sparse loss.
transformer = keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=decoder_outputs)
transformer.compile(
    optimizer="Adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"],
)

For my initial attempt, I kept the structure of the model the same as the textbook again, meaning I have a pretty small model. I have the encoder inputs get passed through their positional embedding layer then through the transformer encoder. Then the output_lang gets passed into the decoder_inputs and through its respective positional embedding layer. Then the TransformerDecoder layer uses the outputs from the encoder and the positionally embedded output_lang tensors. After that I included a dropout even though it was my first attempt, because it seems like common practice to include a dropout layer for transformer models, so I tried it. Had the model seemed like it was underfitting, I could remove it in following attempts. Similarly, for the first attempt I followed the same embed_dim, num_heads, and dense_dim as the textbook to get an initial baseline model performance, then tune as I needed. This is also why I started with 1 TransformerEncoder and 1 TransformerDecoder layer; I could stack more of these layers together if necessary for a complex problem. However, as seen below, the model performed very well on this dataset. I believe it was able to get such high accuracy because the dataset was designed to be perfectly translated. Had it been a real language, the accuracy could be lower.

In [14]:
# Print the layer-by-layer summary of the assembled model.
transformer.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_lang (InputLayer)     [(None, None)]               0         []                            
                                                                                                  
 output_lang (InputLayer)    [(None, None)]               0         []                            
                                                                                                  
 positional_embedding (Posi  (None, None, 256)            266240    ['input_lang[0][0]']          
 tionalEmbedding)                                                                                 
                                                                                                  
 positional_embedding_1 (Po  (None, None, 256)            266240    ['output_lang[0][0]']     

In [15]:
# Train for 10 epochs on the training batches, evaluating on the validation
# batches after every epoch.
transformer.fit(train_data, epochs = 10, validation_data = val_data)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7b48dd537610>

In [16]:
# Save the trained model to an HDF5 file.
# NOTE(review): the model contains custom layers, so loading it back will
# presumably need them supplied via `custom_objects` -- confirm when reloading.
transformer.save('transformer.h5')

  saving_api.save_model(


# SIMPLE CODE FOR TESTING

In [17]:
# Lookup table from integer token id to target-language token, built from the
# target vectorizer's vocabulary (position in the list == token id).
sparse_vocab = output_vectorization.get_vocabulary()
sparse_index = dict(enumerate(sparse_vocab))
# Maximum number of decoding steps per sentence.
max_sent = 40

In [49]:
def decode_sentence(input):
  """Greedily translate one source sentence with the trained transformer.

  Starting from "[start]", the model is asked for the next token at most
  `max_sent` times; the highest-probability token is appended each time and
  decoding stops early when "[end]" is predicted.

  Args:
    input: the raw source sentence (a single string).

  Returns:
    The decoded target sentence without the leading "[start] " marker.
  """
  source_tokens = source_vectorization([input])
  translation = "[start]"
  for step in range(max_sent):
    # Re-vectorize what has been generated so far; drop the last position so
    # the decoder input has the same length it had during training.
    target_tokens = output_vectorization([translation])[:, :-1]
    probabilities = transformer([source_tokens, target_tokens])
    # Distribution for the token that follows position `step`; take the argmax.
    best_token_id = np.argmax(probabilities[0, step, :])
    next_token = sparse_index[best_token_id]

    if next_token == "[end]":
      break

    translation = translation + " " + next_token

  # Cut off the leading "[start] " marker.
  return translation[len("[start] "):]

In [50]:
# Sanity check on the first sample: print the source sentence, the model's
# decoded translation, and the reference translation.
sentence = train_input[0]
print(sentence)
print(decode_sentence(sentence))
print(train_output[0])

a f b d a d a h b d c d c f b d 
b d b d c d c f a h e f g b d a d h i a f d j
b d b d c d c f a h e f g b d a d h i a f d j 


In [43]:
# Spot-check five randomly chosen samples: source sentence, decoded
# translation, reference translation, then a separator line.
separator = '-' * 30
for _ in range(5):
  i = np.random.randint(len(train_input))
  sentence, reference = train_input[i], train_output[i]
  print(sentence)
  print(decode_sentence(sentence))
  print(reference)
  print(separator)

a f a f a d c f b d c e a g c d c g 
c f b d a d d e c e a f f g c d c g a g i j a f h k
c f b d a d d e c e a f f g c d c g a g i j a f h k 
------------------------------
a h b d a e a d a d a e b d b d a g b d c g a e c d 
b d b d a e e b d a d f g b d c g a g i j a d h k a e l c d a e ed a h d m ee
b d b d a e e b d a d f g b d c g a g i j a d h k a e l c d a e ed a h d m ee 
------------------------------
a f a g c e a g c g a d c d a e b d c f 
c e c g c d b d a e g a d f h a g e i a g d j c f a f k l
c e c g c d b d a e g a d f h a g e i a g d j c f a f k l 
------------------------------
a d a e a e a e a f c f b d a d b d c g 
c f b d a f d e a e f a e g a e h b d c g a d j k a d i l
c f b d a f d e a e f a e g a e h b d c g a d j k a d i l 
------------------------------
a e a g a f c e a h b d a d b d b d b d b d 
c e b d b d b d a d f g b d a h e h i a f d j b d a g k l a e m
c e b d b d b d a d f g b d a h e h i a f d j b d a g k l a e m 
------------------------------


As we can see, the model gets the translations correct. However since I have no test dataset yet this could be inaccurate of course. Since I randomly shuffled twice I do not know which samples were in my train or validation dataset so I just selected 5 random ones from the whole dataset. In practice if this was all the data I had it could be better to split this dataset into train,validate,and test. However since the test set is coming later I only did train and validate. And I feel seeing that it performed very well on the validation set is a strong indication of how it will perform on the unseen test set.