<a href="https://colab.research.google.com/github/ayushk7102/Transformer_Model_TF2/blob/main/Transformer_TF2_Chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tf

Collecting tf
  Downloading tf-1.0.0.tar.gz (620 bytes)
Building wheels for collected packages: tf
  Building wheel for tf (setup.py) ... [?25l[?25hdone
  Created wheel for tf: filename=tf-1.0.0-py3-none-any.whl size=1285 sha256=b7c455fe63dbc286ff5f9e010f029313ebb9f22ae6977a70d8cdbda7cfb34274
  Stored in directory: /root/.cache/pip/wheels/db/c7/58/cca67875b41ff853d3fdaa20b54a780ef2e045fbcacaef1ee3
Successfully built tf
Installing collected packages: tf
Successfully installed tf-1.0.0


In [2]:
import tensorflow as tf; print(tf.__version__)
import argparse
import os
import re
import tensorflow_datasets as tfds
from tensorflow.keras import layers

2.6.0


**Loading dataset, preprocessing**
---



In [3]:
#PREPROCESSING

# Contraction expansions, applied in this order. Order matters: the specific
# "won't"/"can't" rules must run before the generic "n't" rule, and "n't" must
# run before the dropped-g rule so that "don't" is never read as "dong t".
_CONTRACTION_RULES = (
    (r"i'm", "i am"),
    (r"he's", "he is"),
    (r"she's", "she is"),
    (r"it's", "it is"),
    (r"that's", "that is"),
    (r"what's", "what is"),
    (r"where's", "where is"),
    (r"how's", "how is"),
    (r"\'ll", " will"),
    (r"\'ve", " have"),
    (r"\'re", " are"),
    (r"\'d", " would"),
    (r"won't", "will not"),
    (r"can't", "cannot"),
    (r"n't", " not"),
    # Dropped-g forms such as "goin'" -> "going". The lookahead keeps the rule
    # from firing on possessives/contractions such as "john's".
    (r"n'(?![a-z])", "ng"),
    (r"'bout", "about"),
)


def preprocess_sentence(sentence):
  """Normalize one utterance into lowercase, space-separated tokens.

  Steps, in order:
    1. lowercase and strip surrounding whitespace;
    2. surround each of ``? . ! ,`` with spaces so it becomes its own token;
    3. collapse runs of spaces and double quotes into a single space;
    4. expand common English contractions (see ``_CONTRACTION_RULES``);
    5. replace every run of characters other than letters and ``? . ! ,``
       with a single space, then strip again.

  Args:
    sentence: raw text (str).

  Returns:
    The cleaned sentence (str); empty if nothing survives the filtering.
  """
  sentence = sentence.lower().strip()

  # Isolate sentence punctuation, then squeeze repeated spaces/quotes.
  sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
  sentence = re.sub(r'[" "]+', " ", sentence)

  for pattern, replacement in _CONTRACTION_RULES:
    sentence = re.sub(pattern, replacement, sentence)

  # Anything that is not a letter or kept punctuation becomes a separator.
  sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
  return sentence.strip()


In [4]:
# Change the delimiter to match the dataset; the Cornell Movie-Dialogs files use ' +++$+++ '.

def load_conversations(hparams, lines_filename, conversations_filename):
  """Read the corpus files and return aligned (question, answer) lists.

  Every pair of consecutive utterances in a conversation yields one sample:
  the earlier utterance is the question and the later one the answer. Both
  are passed through ``preprocess_sentence``.

  Args:
    hparams: namespace providing ``max_samples``; loading stops as soon as
      that many pairs have been collected.
    lines_filename: path to the file whose rows hold a line ID in field 0 and
      the utterance text in field 4.
    conversations_filename: path to the file whose rows hold, in field 3, a
      bracketed list of quoted line IDs.

  Returns:
    A tuple ``(questions, answers)`` of equal-length lists of strings.

  Raises:
    KeyError: if a conversation references a line ID missing from the lines
      file.
    IndexError: if a non-blank row has fewer fields than expected.
  """
  delim = ' +++$+++ '

  # dictionary of line id to text
  id2line = {}
  with open(lines_filename, errors='ignore') as file:
    for row in file:
      if not row.strip():
        continue  # tolerate blank lines (e.g. a trailing newline at EOF)
      parts = row.rstrip('\n').split(delim)
      id2line[parts[0]] = parts[4]

  questions, answers = [], []
  with open(conversations_filename, errors='ignore') as file:
    for row in file:
      if not row.strip():
        continue
      parts = row.rstrip('\n').split(delim)
      # Field 3 looks like "['L1', 'L2', ...]": drop the brackets, split on
      # ", " and drop the quotes around each ID.
      conversation = [
          line_id[1:-1] for line_id in parts[3][1:-1].split(', ')
      ]
      for question_id, answer_id in zip(conversation, conversation[1:]):
        questions.append(preprocess_sentence(id2line[question_id]))
        answers.append(preprocess_sentence(id2line[answer_id]))
        if len(questions) >= hparams.max_samples:
          return questions, answers
  return questions, answers

In [5]:

def tokenize_and_filter(hparams, tokenizer, questions, answers):
  """Encode question/answer pairs, drop over-long pairs, and pad the rest.

  Each sentence is encoded as ``start_token + tokenizer.encode(text) +
  end_token``. A pair is kept only when both encoded sequences are at most
  ``hparams.max_length`` tokens long.

  Args:
    hparams: namespace providing ``start_token``, ``end_token`` (lists of
      token ids) and ``max_length``.
    tokenizer: object exposing ``encode(str) -> list``.
    questions: iterable of question strings.
    answers: iterable of answer strings, aligned with ``questions``.

  Returns:
    A tuple ``(padded_questions, padded_answers)`` as produced by
    ``pad_sequences`` with ``maxlen=hparams.max_length`` and post-padding.
  """
  limit = hparams.max_length

  def wrap(text):
    # Surround the encoded text with the start/end marker ids.
    return hparams.start_token + tokenizer.encode(text) + hparams.end_token

  kept_questions, kept_answers = [], []
  for question, answer in zip(questions, answers):
    question_ids = wrap(question)
    answer_ids = wrap(answer)

    # Skip the pair as soon as either side exceeds the length budget.
    if len(question_ids) > limit or len(answer_ids) > limit:
      continue
    kept_questions.append(question_ids)
    kept_answers.append(answer_ids)

  pad = tf.keras.preprocessing.sequence.pad_sequences
  padded_questions = pad(kept_questions, maxlen=limit, padding='post')
  padded_answers = pad(kept_answers, maxlen=limit, padding='post')
  return padded_questions, padded_answers

In [6]:

def get_dataset(hparams):
  """Download the Cornell corpus and build the batched training dataset.

  Side effect: sets ``hparams.start_token``, ``hparams.end_token`` and
  ``hparams.vocab_size`` from the tokenizer built here (the two marker ids
  are placed directly after the tokenizer's own vocabulary).

  Args:
    hparams: namespace providing ``max_samples``, ``max_length`` and
      ``batch_size``; mutated as described above.

  Returns:
    A tuple ``(dataset, tokenizer)``. Each dataset element is
    ``({'inputs': question, 'dec_inputs': answer[:-1]}, answer[1:])``, i.e.
    the decoder input and the target are the answer shifted by one position.
  """
  # Fetch and unpack the corpus archive via tf.keras.utils.get_file.
  path_to_zip = tf.keras.utils.get_file(
      'cornell_movie_dialogs.zip',
      origin=
      'http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip',
      extract=True)
  corpus_dir = os.path.join(
      os.path.dirname(path_to_zip), "cornell movie-dialogs corpus")

  # Locate movie_lines.txt and movie_conversations.txt inside the corpus.
  lines_filename = os.path.join(corpus_dir, 'movie_lines.txt')
  conversations_filename = os.path.join(corpus_dir, 'movie_conversations.txt')

  raw_questions, raw_answers = load_conversations(
      hparams, lines_filename, conversations_filename)
  print('Loaded dataset')

  # Build a subword vocabulary over both sides of the conversations.
  tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
      raw_questions + raw_answers, target_vocab_size=2**13)

  # Reserve two extra ids, right after the vocabulary, for start/end markers.
  base_vocab = tokenizer.vocab_size
  hparams.start_token = [base_vocab]
  hparams.end_token = [base_vocab + 1]
  hparams.vocab_size = base_vocab + 2

  questions, answers = tokenize_and_filter(
      hparams, tokenizer, raw_questions, raw_answers)

  features = {
      'inputs': questions,
      'dec_inputs': answers[:, :-1],
  }
  targets = answers[:, 1:]

  dataset = (
      tf.data.Dataset.from_tensor_slices((features, targets))
      .cache()
      .shuffle(len(questions))
      .batch(hparams.batch_size)
      .prefetch(tf.data.experimental.AUTOTUNE))

  return dataset, tokenizer

## **Model begins here**



In [7]:
def scaled_dot_product_attention(query, key, value, mask):
  """Compute ``softmax(Q K^T / sqrt(d_k) + mask * -1e9) V``.

  Args:
    query: query tensor.
    key: key tensor; the size of its last axis is used as ``d_k``.
    value: value tensor.
    mask: tensor broadcastable to the attention logits with 1.0 at positions
      to suppress, or ``None`` to skip masking.

  Returns:
    The attention-weighted combination of ``value``.
  """
  key_depth = tf.cast(tf.shape(key)[-1], tf.float32)
  scores = tf.matmul(query, key, transpose_b=True) / tf.math.sqrt(key_depth)

  # Push masked positions towards -inf so softmax gives them ~0 weight.
  if mask is not None:
    scores = scores + (mask * -1e9)

  weights = tf.nn.softmax(scores, axis=-1)
  return tf.matmul(weights, value)

In [8]:

class MultiHeadAttention(layers.Layer):
  """Multi-head attention built from four Dense projections.

  The layer is called with a dict holding ``'query'``, ``'key'``, ``'value'``
  and ``'mask'`` entries. ``d_model`` must be divisible by ``num_heads``.
  """

  def __init__(self, hparams, name="multi_head_attention"):
    """Create the projections.

    Args:
      hparams: namespace providing ``num_heads`` and ``d_model``.
      name: Keras layer name.
    """
    super().__init__(name=name)
    self.num_heads = hparams.num_heads
    self.d_model = hparams.d_model

    assert self.d_model % self.num_heads == 0

    # Width of each individual head.
    self.depth = self.d_model // self.num_heads

    # Input projections for Q, K and V.
    self.query_dense = layers.Dense(self.d_model)
    self.key_dense = layers.Dense(self.d_model)
    self.value_dense = layers.Dense(self.d_model)

    # Output projection applied after the heads are merged again.
    self.dense = layers.Dense(self.d_model)

  def get_config(self):
    """Return the base layer config extended with num_heads and d_model."""
    # NOTE(review): __init__ takes `hparams`, not these two keys, so rebuilding
    # the layer from this config looks unsupported — confirm before relying on it.
    config = super().get_config()
    config['num_heads'] = self.num_heads
    config['d_model'] = self.d_model
    return config

  def split_heads(self, inputs, batch_size):
    """Reshape to (batch, seq, heads, depth), then move heads ahead of seq."""
    per_head = tf.reshape(
        inputs, shape=(batch_size, -1, self.num_heads, self.depth))
    return tf.transpose(per_head, perm=[0, 2, 1, 3])

  def call(self, inputs, **kwargs):
    """Apply attention to the ``query``/``key``/``value``/``mask`` dict."""
    mask = inputs['mask']
    batch_size = tf.shape(inputs['query'])[0]

    # Project, then split every projection into heads.
    projected_query = self.query_dense(inputs['query'])
    projected_key = self.key_dense(inputs['key'])
    projected_value = self.value_dense(inputs['value'])

    head_query = self.split_heads(projected_query, batch_size)
    head_key = self.split_heads(projected_key, batch_size)
    head_value = self.split_heads(projected_value, batch_size)

    # Attend per head, then bring the sequence axis back in front of heads.
    attended = scaled_dot_product_attention(
        head_query, head_key, head_value, mask)
    attended = tf.transpose(attended, perm=[0, 2, 1, 3])

    # Merge the heads back into a single d_model-wide axis and project.
    merged = tf.reshape(attended, (batch_size, -1, self.d_model))
    return self.dense(merged)

In [9]:
#CREATE MASKS: Padding and look-ahead

def create_padding_mask(x):
  """Return a float mask that is 1.0 wherever ``x`` equals 0.

  For a 2-D ``x`` the result has two singleton axes inserted after the first
  axis, so it broadcasts against 4-D attention logits.
  """
  is_pad = tf.math.equal(x, 0)
  pad_mask = tf.cast(is_pad, dtype=tf.float32)
  return pad_mask[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(x):
  """Return a mask hiding both future positions and positions where x is 0.

  The result is the element-wise maximum of a strictly-upper-triangular
  (seq_len x seq_len) matrix of ones and the padding mask of ``x``.
  """
  seq_len = tf.shape(x)[1]
  # band_part(-1, 0) keeps the lower triangle (including the diagonal);
  # subtracting from 1 leaves ones only strictly above the diagonal.
  lower_triangle = tf.linalg.band_part(
      tf.ones((seq_len, seq_len), dtype=tf.float32), -1, 0)
  future_mask = 1 - lower_triangle
  return tf.maximum(future_mask, create_padding_mask(x))

In [10]:
class PositionalEncoding(layers.Layer):
  """Adds a fixed sinusoidal position signal to its input.

  The table is computed once in ``__init__`` for ``position`` rows and
  ``d_model`` columns; ``call`` adds the first ``seq_len`` rows to the input.
  """

  def __init__(self, position, d_model):
    """Precompute the encoding table.

    Args:
      position: number of positions (rows) to precompute.
      d_model: width of the encoding (columns).
    """
    super().__init__()
    self.position = position
    self.d_model = d_model
    self.pos_encoding = self.positional_encoding(position, d_model)

  def get_config(self):
    """Return the base layer config extended with position and d_model."""
    config = super().get_config()
    config['position'] = self.position
    config['d_model'] = self.d_model
    return config

  def get_angles(self, position, i, d_model):
    """Return ``position / 10000^(2 * (i // 2) / d_model)``."""
    exponent = (2 * (i // 2)) / d_model
    inverse_frequency = 1 / tf.pow(10000, exponent)
    return position * inverse_frequency

  def positional_encoding(self, position, d_model):
    """Build the table with a leading singleton batch axis."""
    row_index = tf.cast(tf.range(position)[:, tf.newaxis], dtype=tf.float32)
    column_index = tf.cast(tf.range(d_model)[tf.newaxis, :], dtype=tf.float32)
    angle_rads = self.get_angles(
        position=row_index,
        i=column_index,
        d_model=tf.cast(d_model, dtype=tf.float32))

    # Sine of the even columns, cosine of the odd columns. The two halves are
    # concatenated (all sines first, then all cosines), not interleaved.
    even_sines = tf.math.sin(angle_rads[:, 0::2])
    odd_cosines = tf.math.cos(angle_rads[:, 1::2])
    table = tf.concat([even_sines, odd_cosines], axis=-1)

    return table[tf.newaxis, ...]

  def call(self, inputs, **kwargs):
    """Add the encodings for the first ``seq_len`` positions to ``inputs``."""
    seq_len = tf.shape(inputs)[1]
    return inputs + self.pos_encoding[:, :seq_len, :]



In [11]:
def encoder(hparams, name="encoder"):
  """Build the encoder stack as a functional Keras model.

  Token ids are embedded, scaled by ``sqrt(d_model)``, given positional
  encodings and dropout, then passed through ``num_layers`` encoder layers.

  Args:
    hparams: namespace providing ``vocab_size``, ``d_model``, ``dropout`` and
      ``num_layers`` (plus whatever ``encoder_layer`` reads).
    name: name of the returned model.

  Returns:
    ``tf.keras.Model`` mapping ``[inputs, padding_mask]`` to the output of the
    last encoder layer.
  """
  print('hparams passed successfully to encoder')

  inputs = tf.keras.Input(shape=(None,), name="inputs")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  embedding_scale = tf.math.sqrt(tf.cast(hparams.d_model, dtype=tf.float32))
  hidden = layers.Embedding(hparams.vocab_size, hparams.d_model)(inputs)
  hidden = hidden * embedding_scale
  # The positional table is sized by vocab_size here.
  hidden = PositionalEncoding(hparams.vocab_size, hparams.d_model)(hidden)
  hidden = layers.Dropout(hparams.dropout)(hidden)

  for layer_index in range(hparams.num_layers):
    layer = encoder_layer(
        hparams, name="encoder_layer_{}".format(layer_index))
    hidden = layer([hidden, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=hidden, name=name)



In [12]:
#tf.keras.utils.plot_model()

In [13]:

def decoder_layer(hparams, name="decoder_layer"):
  """Build one Transformer decoder layer as a functional Keras model.

  The layer applies, in order: masked self-attention over the decoder
  inputs, attention over the encoder outputs, and a two-layer feed-forward
  network. Each sub-block ends with exactly one residual connection followed
  by layer normalization.

  Args:
    hparams: namespace providing ``d_model``, ``num_heads``, ``num_units``,
      ``activation`` and ``dropout``.
    name: name of the returned model.

  Returns:
    ``tf.keras.Model`` mapping
    ``[inputs, encoder_outputs, look_ahead_mask, padding_mask]`` to a tensor
    whose last axis has size ``d_model``.
  """
  inputs = tf.keras.Input(shape=(None, hparams.d_model), name="inputs")
  enc_outputs = tf.keras.Input(
      shape=(None, hparams.d_model), name="encoder_outputs")
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name="look_ahead_mask")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  # Block 1: self-attention, with future positions hidden by the mask.
  attention1 = MultiHeadAttention(
      hparams, name="attention_1")(inputs={
          'query': inputs,
          'key': inputs,
          'value': inputs,
          'mask': look_ahead_mask
      })
  attention1 += tf.cast(inputs, dtype=tf.float32)
  attention1 = layers.LayerNormalization(epsilon=1e-6)(attention1)

  # Block 2: queries come from the decoder, keys/values from the encoder.
  attention2 = MultiHeadAttention(
      hparams, name="attention_2")(inputs={
          'query': attention1,
          'key': enc_outputs,
          'value': enc_outputs,
          'mask': padding_mask
      })
  attention2 = layers.Dropout(hparams.dropout)(attention2)
  # Add the residual exactly once; adding attention1 both before and inside
  # the normalization call would feed (attention2 + 2 * attention1) to it.
  attention2 = layers.LayerNormalization(epsilon=1e-6)(attention2 + attention1)

  # Block 3: position-wise feed-forward network.
  outputs = layers.Dense(
      hparams.num_units, activation=hparams.activation)(attention2)
  outputs = layers.Dense(hparams.d_model)(outputs)
  outputs = layers.Dropout(hparams.dropout)(outputs)
  outputs += attention2
  outputs = layers.LayerNormalization(epsilon=1e-6)(outputs)

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=outputs,
      name=name)

In [14]:

def encoder_layer(hparams, name="encoder_layer"):
  """Build one Transformer encoder layer as a functional Keras model.

  Self-attention and a two-layer feed-forward network, each followed by
  dropout, a residual connection and layer normalization.

  Args:
    hparams: namespace providing ``d_model``, ``num_heads``, ``num_units``,
      ``activation`` and ``dropout``.
    name: name of the returned model.

  Returns:
    ``tf.keras.Model`` mapping ``[inputs, padding_mask]`` to a tensor whose
    last axis has size ``d_model``.
  """
  inputs = tf.keras.Input(shape=(None, hparams.d_model), name="inputs")
  padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")

  # Self-attention sub-block.
  attended = MultiHeadAttention(hparams, name="attention")({
      'query': inputs,
      'key': inputs,
      'value': inputs,
      'mask': padding_mask,
  })
  attended = layers.Dropout(hparams.dropout)(attended)
  attended = attended + tf.cast(inputs, dtype=tf.float32)
  attended = layers.LayerNormalization(epsilon=1e-6)(attended)

  # Feed-forward sub-block: expand to num_units, project back to d_model.
  expanded = layers.Dense(
      hparams.num_units, activation=hparams.activation)(attended)
  projected = layers.Dense(hparams.d_model)(expanded)
  projected = layers.Dropout(hparams.dropout)(projected)
  outputs = layers.LayerNormalization(epsilon=1e-6)(projected + attended)

  return tf.keras.Model(
      inputs=[inputs, padding_mask], outputs=outputs, name=name)

def decoder(hparams, name='decoder'):
  """Build the decoder stack as a functional Keras model.

  Token ids are embedded, scaled by ``sqrt(d_model)``, given positional
  encodings and dropout, then passed through ``num_layers`` decoder layers
  that also receive the encoder outputs and both masks.

  Args:
    hparams: namespace providing ``vocab_size``, ``d_model``, ``dropout`` and
      ``num_layers`` (plus whatever ``decoder_layer`` reads).
    name: name of the returned model.

  Returns:
    ``tf.keras.Model`` mapping
    ``[inputs, encoder_outputs, look_ahead_mask, padding_mask]`` to the output
    of the last decoder layer.
  """
  inputs = tf.keras.Input(shape=(None,), name='inputs')
  enc_outputs = tf.keras.Input(
      shape=(None, hparams.d_model), name='encoder_outputs')
  look_ahead_mask = tf.keras.Input(
      shape=(1, None, None), name='look_ahead_mask')
  padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')

  embedding_scale = tf.math.sqrt(tf.cast(hparams.d_model, dtype=tf.float32))
  hidden = layers.Embedding(hparams.vocab_size, hparams.d_model)(inputs)
  hidden = hidden * embedding_scale
  # The positional table is sized by vocab_size here.
  hidden = PositionalEncoding(hparams.vocab_size, hparams.d_model)(hidden)
  hidden = layers.Dropout(hparams.dropout)(hidden)

  for layer_index in range(hparams.num_layers):
    layer = decoder_layer(
        hparams, name='decoder_layer_{}'.format(layer_index))
    hidden = layer(
        inputs=[hidden, enc_outputs, look_ahead_mask, padding_mask])

  return tf.keras.Model(
      inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
      outputs=hidden,
      name=name)

In [15]:
def transformer(hparams, name="transformer"):
  """Assemble the full encoder-decoder model.

  Masks are derived from the raw token ids inside the model via Lambda
  layers, so callers only supply ``inputs`` and ``dec_inputs``.

  Args:
    hparams: namespace providing ``vocab_size`` plus everything ``encoder``
      and ``decoder`` read.
    name: name of the returned model.

  Returns:
    ``tf.keras.Model`` mapping ``[inputs, dec_inputs]`` to per-position
    outputs of width ``vocab_size`` (no softmax is applied here).
  """
  print('hparams passed successfully to transformer')
  inputs = tf.keras.Input(shape=(None,), name="inputs")
  dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")

  def padding_mask_layer(layer_name):
    # Both padding masks are computed from the encoder-side token ids.
    return layers.Lambda(
        create_padding_mask, output_shape=(1, 1, None), name=layer_name)

  # Hides padded encoder positions inside the encoder's self-attention.
  enc_padding_mask = padding_mask_layer('enc_padding_mask')(inputs)

  # Hides future (and padded) tokens in the decoder's first attention block.
  look_ahead_mask = layers.Lambda(
      create_look_ahead_mask,
      output_shape=(1, None, None),
      name='look_ahead_mask')(dec_inputs)

  # Hides padded encoder positions in the decoder's second attention block.
  dec_padding_mask = padding_mask_layer('dec_padding_mask')(inputs)

  enc_outputs = encoder(hparams)(inputs=[inputs, enc_padding_mask])
  dec_outputs = decoder(hparams)(
      inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])

  logits = layers.Dense(hparams.vocab_size, name="outputs")(dec_outputs)

  return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=logits, name=name)

**DRIVER**

In [16]:
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Warm-up then inverse-square-root learning-rate schedule.

  ``lr(step) = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)``
  """

  def __init__(self, hparams, warmup_steps=4000):
    """Store the schedule parameters.

    Args:
      hparams: namespace providing ``d_model``.
      warmup_steps: number of steps over which the rate grows linearly.
    """
    super(CustomSchedule, self).__init__()
    self.d_model = tf.cast(hparams.d_model, dtype=tf.float32)
    self.warmup_steps = warmup_steps

  def __call__(self, step):
    """Return the learning rate for ``step``."""
    # The optimizer may hand over an integer step counter; rsqrt and the
    # float multiplication below need a floating-point value.
    step = tf.cast(step, dtype=tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * self.warmup_steps**-1.5
    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)


In [None]:
def inference(hparams, model, tokenizer, sentence):
  """Greedily decode a reply to ``sentence``, one token at a time.

  Args:
    hparams: namespace providing ``start_token``, ``end_token`` and
      ``max_length`` (the maximum number of decoding steps).
    model: callable taking ``inputs=[encoder_ids, decoder_ids]``.
    tokenizer: object exposing ``encode(str) -> list``.
    sentence: raw input text.

  Returns:
    1-D tensor of generated ids, beginning with the start token and without
    the end token.
  """
  cleaned = preprocess_sentence(sentence)
  encoder_ids = (
      hparams.start_token + tokenizer.encode(cleaned) + hparams.end_token)
  encoder_input = tf.expand_dims(encoder_ids, axis=0)

  # The decoder is seeded with just the start token.
  decoded = tf.expand_dims(hparams.start_token, 0)

  for _ in range(hparams.max_length):
    logits = model(inputs=[encoder_input, decoded], training=False)

    # Only the prediction at the last sequence position is needed.
    last_step = logits[:, -1:, :]
    next_id = tf.cast(tf.argmax(last_step, axis=-1), tf.int32)

    # Stop as soon as the model emits the end token.
    if tf.equal(next_id, hparams.end_token[0]):
      break

    # Feed the prediction back in as part of the next decoder input.
    decoded = tf.concat([decoded, next_id], axis=-1)

  return tf.squeeze(decoded, axis=0)

def predict(hparams, model, tokenizer, sentence):
  """Generate a reply to ``sentence`` and decode it back to text.

  Ids at or above ``tokenizer.vocab_size`` are dropped before decoding.
  """
  generated_ids = inference(hparams, model, tokenizer, sentence)
  subword_ids = [
      token_id for token_id in generated_ids
      if token_id < tokenizer.vocab_size
  ]
  return tokenizer.decode(subword_ids)

def evaluate(hparams, model, tokenizer):
  """Print sample replies, including a five-turn self-conversation.

  Two fixed prompts are answered once each; a third prompt starts a loop in
  which every reply becomes the next prompt.
  """

  def respond(prompt, template):
    # Print one exchange using the given template and return the reply.
    reply = predict(hparams, model, tokenizer, prompt)
    print(template.format(prompt, reply))
    return reply

  print('\nEvaluate')
  respond('where have you been?', 'input: {}\noutput: {}')
  respond("it's a trap!", '\ninput: {}\noutput: {}')

  sentence = 'I am not crazy, my mother had me tested'
  for _ in range(5):
    sentence = respond(sentence, '\ninput: {}\noutput: {}')


def main(hparams):
  """Build the dataset and model, train, then print sample replies.

  Args:
    hparams: namespace with the command-line hyperparameters;
      ``get_dataset`` adds the tokenizer-derived fields to it.
  """
  dataset, tokenizer = get_dataset(hparams)
  print('hparams : ', hparams)
  model = transformer(hparams)

  per_token_loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')

  def loss_function(y_true, y_pred):
    # Cross-entropy with positions whose label is 0 zeroed out; the mean is
    # taken over all positions, zeroed ones included.
    labels = tf.reshape(y_true, shape=(-1, hparams.max_length - 1))
    token_loss = per_token_loss(labels, y_pred)
    non_padding = tf.cast(tf.not_equal(labels, 0), dtype=tf.float32)
    return tf.reduce_mean(tf.multiply(token_loss, non_padding))

  def accuracy(y_true, y_pred):
    # Per-token accuracy; no padding mask is applied here.
    labels = tf.reshape(y_true, shape=(-1, hparams.max_length - 1))
    return tf.keras.metrics.sparse_categorical_accuracy(labels, y_pred)

  optimizer = tf.keras.optimizers.Adam(
      CustomSchedule(hparams), beta_1=0.9, beta_2=0.98, epsilon=1e-9)

  model.compile(optimizer, loss=loss_function, metrics=[accuracy])
  model.fit(dataset, epochs=hparams.epochs)

  evaluate(hparams, model, tokenizer)

if __name__ == '__main__':
  parser = argparse.ArgumentParser()

  # Data options.
  parser.add_argument(
      '--max_samples',
      type=int,
      default=25000,
      help='maximum number of conversation pairs to use')
  parser.add_argument(
      '--max_length',
      type=int,
      default=40,
      help='maximum sentence length')
  parser.add_argument('--batch_size', type=int, default=64)

  # Model options.
  parser.add_argument('--num_layers', type=int, default=2)
  parser.add_argument('--num_units', type=int, default=512)
  parser.add_argument('--d_model', type=int, default=256)
  parser.add_argument('--num_heads', type=int, default=8)
  parser.add_argument('--dropout', type=float, default=0.1)
  parser.add_argument('--activation', type=str, default='relu')

  # Training options.
  parser.add_argument('--epochs', type=int, default=250)

  print('reached here')
  # parse_known_args() is used instead of parse_args() so unrecognized argv
  # entries are returned rather than rejected — presumably to tolerate the
  # arguments a notebook kernel is launched with; confirm.
  hparams, unknown = parser.parse_known_args()
  main(hparams)




reached here
Loaded dataset
hparams :  Namespace(activation='relu', batch_size=64, d_model=256, dropout=0.1, end_token=[8102], epochs=250, max_length=40, max_samples=25000, num_heads=8, num_layers=2, num_units=512, start_token=[8101], vocab_size=8103)
hparams passed successfully to transformer
hparams passed successfully to encoder
Epoch 1/250
Epoch 2/250
Epoch 3/250
Epoch 4/250
Epoch 5/250
Epoch 6/250
Epoch 7/250
Epoch 8/250
Epoch 9/250
Epoch 10/250
Epoch 11/250
Epoch 12/250
Epoch 13/250
Epoch 14/250
Epoch 15/250
Epoch 16/250
Epoch 17/250
Epoch 18/250
Epoch 19/250
Epoch 20/250
Epoch 21/250
Epoch 22/250
Epoch 23/250
Epoch 24/250
Epoch 25/250
Epoch 26/250

In [None]:
# Generate one more reply and make it the next prompt.
# NOTE(review): `model`, `tokenizer` and `sentence` are not assigned at module
# scope anywhere in the visible notebook (they are locals of main() and
# evaluate()), so this cell appears to depend on state from elsewhere — confirm.
output = predict(hparams, model, tokenizer, sentence)
print('\ninput: {}\noutput: {}'.format(sentence, output))
sentence = output


In [None]:
# Model hyperparameters used by the transformer(...) call in the next cell.
NUM_LAYERS = 2  # number of stacked encoder/decoder layers
D_MODEL = 256  # embedding / hidden width
NUM_HEADS = 8  # attention heads
UNITS = 512  # feed-forward inner width
DROPOUT = 0.1  # dropout rate

# NOTE(review): the training output above reports vocab_size=8103 from the
# tokenizer; confirm that 220000 is intended here.
VOCAB_SIZE = 220000

In [None]:
# transformer() takes a single hparams namespace (not keyword arguments), so
# pack the constants into one. 'relu' matches the --activation default used
# by the command-line driver.
model = transformer(
    argparse.Namespace(
        vocab_size=VOCAB_SIZE,
        num_layers=NUM_LAYERS,
        num_units=UNITS,
        d_model=D_MODEL,
        num_heads=NUM_HEADS,
        dropout=DROPOUT,
        activation='relu'))

In [None]:
def loss_function(y_true, y_pred):
  """Sparse cross-entropy with positions whose label is 0 zeroed out.

  Labels are reshaped to ``(-1, MAX_LENGTH - 1)``. The mean is taken over all
  positions, zeroed ones included.
  """
  # NOTE(review): MAX_LENGTH is not defined in the visible notebook — confirm.
  labels = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))

  sparse_ce = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction='none')
  token_loss = sparse_ce(labels, y_pred)

  non_padding = tf.cast(tf.not_equal(labels, 0), tf.float32)
  return tf.reduce_mean(tf.multiply(token_loss, non_padding))

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
  """Warm-up then inverse-square-root learning-rate schedule.

  ``lr(step) = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5)``
  """

  def __init__(self, d_model, warmup_steps=4000):
    """Store the schedule parameters.

    Args:
      d_model: model width; kept as a float32 tensor.
      warmup_steps: number of steps over which the rate grows linearly.
    """
    super(CustomSchedule, self).__init__()

    self.d_model = tf.cast(d_model, tf.float32)
    self.warmup_steps = warmup_steps

  def __call__(self, step):
    """Return the learning rate for ``step``."""
    # The optimizer may hand over an integer step counter; rsqrt and the
    # float multiplication below need a floating-point value.
    step = tf.cast(step, tf.float32)
    arg1 = tf.math.rsqrt(step)
    arg2 = step * (self.warmup_steps**-1.5)

    return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# Learning-rate schedule parameterized by the model width.
learning_rate = CustomSchedule(D_MODEL)

# Adam driven by the schedule above instead of a fixed learning rate.
optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

def accuracy(y_true, y_pred):
  """Per-token sparse categorical accuracy for use as a Keras metric.

  The stateless function is used on purpose: building a
  ``SparseCategoricalAccuracy`` object inside the metric function would
  create metric state that Keras neither tracks nor resets. Keras averages
  the per-token values returned here itself.
  """
  # ensure labels have shape (batch_size, MAX_LENGTH - 1)
  y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
  return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)

# Attach the optimizer, masked loss and accuracy metric to the model.
model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])

EPOCHS = 20

# NOTE(review): `dataset` is only a local of main() in the visible notebook;
# this cell appears to need a module-level dataset — confirm.
model.fit(dataset, epochs=EPOCHS)

In [None]:

def evaluate(sentence):
  """Greedily decode a reply to ``sentence`` using module-level state.

  Reads the globals ``model``, ``tokenizer``, ``START_TOKEN``, ``END_TOKEN``
  and ``MAX_LENGTH``. Returns a 1-D tensor of generated ids, beginning with
  the start token and without the end token.
  """
  # NOTE(review): START_TOKEN, END_TOKEN and MAX_LENGTH are not defined in the
  # visible notebook, and this name is also used by an earlier
  # evaluate(hparams, model, tokenizer) — confirm both.
  cleaned = preprocess_sentence(sentence)
  encoder_input = tf.expand_dims(
      START_TOKEN + tokenizer.encode(cleaned) + END_TOKEN, axis=0)

  # The decoder is seeded with just the start token.
  decoded = tf.expand_dims(START_TOKEN, 0)

  for _ in range(MAX_LENGTH):
    logits = model(inputs=[encoder_input, decoded], training=False)

    # Only the prediction at the last sequence position is needed.
    last_step = logits[:, -1:, :]
    next_id = tf.cast(tf.argmax(last_step, axis=-1), tf.int32)

    # Stop as soon as the model emits the end token.
    if tf.equal(next_id, END_TOKEN[0]):
      break

    # Feed the prediction back in as part of the next decoder input.
    decoded = tf.concat([decoded, next_id], axis=-1)

  return tf.squeeze(decoded, axis=0)

def predict(sentence):
  """Generate a reply to ``sentence`` and decode it back to text.

  Ids at or above ``tokenizer.vocab_size`` are dropped before decoding.
  """
  generated_ids = evaluate(sentence)
  subword_ids = [
      token_id for token_id in generated_ids
      if token_id < tokenizer.vocab_size
  ]
  return tokenizer.decode(subword_ids)
