# Conversational Chatbot with a sequence-to-sequence Transformer

**Reference:** [fchollet](https://twitter.com/fchollet)<br>

In [None]:
# Colab-only: mount Google Drive at /content/drive so files stored there can be
# opened with ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Introduction

In this example, we'll build a sequence-to-sequence Transformer model, which
we'll train on a conversational chatbot task.

You'll learn how to:

- Vectorize text using the Keras `TextVectorization` layer.
- Implement a `TransformerEncoder` layer, a `TransformerDecoder` layer,
and a `PositionalEmbedding` layer.
- Prepare data for training a sequence-to-sequence model.
- Use the trained model to generate a response sentence for each input sentence
in a conversational chatbot (sequence-to-sequence inference).

The code featured here is adapted from the book
[Deep Learning with Python, Second Edition](https://www.manning.com/books/deep-learning-with-python-second-edition)
(chapter 11: Deep learning for text).
The present example is fairly barebones, so for detailed explanations of
how each building block works, as well as the theory behind Transformers,
I recommend reading the book.

## Setup

In [None]:
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization

## Preparing the data

In [None]:
# Ordered (pattern, replacement) rules used to expand contractions.
# Order matters: the specific "won't" / "can't" forms must run before the
# generic "n't" rule, and "n't" must run before the dropped-g rule.
_CONTRACTION_RULES = (
    (r"i'm", "i am"),
    (r"he's", "he is"),
    (r"she's", "she is"),
    (r"it's", "it is"),
    (r"that's", "that is"),
    (r"what's", "what is"),
    (r"where's", "where is"),
    (r"how's", "how is"),
    (r"'ll", " will"),
    (r"'ve", " have"),
    (r"'re", " are"),
    (r"'d", " would"),
    (r"won't", "will not"),
    (r"can't", "cannot"),
    (r"n't", " not"),
    # Dropped-g forms such as "goin'" -> "going". The lookahead keeps the rule
    # away from possessives like "john's", which would otherwise become
    # "johngs".
    (r"n'(?![a-z])", "ng"),
    (r"'bout", "about"),
)


def preprocess_sentence(sentence):
    """Normalize one raw utterance into lowercase, space-separated tokens.

    Steps, in order:
      1. lowercase and strip surrounding whitespace;
      2. put spaces around ``?``, ``.``, ``!`` and ``,`` so each becomes its
         own token (e.g. ``"he is a boy."`` -> ``"he is a boy ."``);
      3. expand common English contractions;
      4. replace every run of characters other than letters and ``? . ! ,``
         with a single space.

    Args:
        sentence: the raw utterance string.

    Returns:
        The cleaned string (possibly empty).
    """
    sentence = sentence.lower().strip()
    # Isolate sentence punctuation as separate tokens.
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    # Collapse runs of spaces and double quotes into one space.
    sentence = re.sub(r'[" ]+', " ", sentence)
    for pattern, replacement in _CONTRACTION_RULES:
        sentence = re.sub(pattern, replacement, sentence)
    # Anything that is not a letter or kept punctuation becomes a space.
    sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
    return sentence.strip()

In [None]:
def read_dataset(path):
    """Read a dialogue file and flatten it into alternating prompt/reply strings.

    Each line of the file is treated as one dialogue whose utterances are
    separated by the literal marker ``__eou__``. Every pair of consecutive
    utterances contributes two entries to the result: the preprocessed prompt
    (even index) followed by the preprocessed reply wrapped as
    ``"[start] ... [end]"`` (odd index).

    Args:
        path: path of the text file to read.

    Returns:
        A flat list ``[prompt_0, reply_0, prompt_1, reply_1, ...]``.
    """
    text = []
    # Explicit encoding so the result does not depend on the platform default.
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            utterances = line.rstrip().split('__eou__')
            # A line terminated by "__eou__" leaves an empty trailing piece;
            # drop it so it is never used as a reply. Lines without the
            # trailing marker keep their last utterance.
            if not utterances[-1].strip():
                utterances.pop()
            # Clean each utterance once, then pair neighbours.
            cleaned = [preprocess_sentence(u) for u in utterances]
            for prompt, reply in zip(cleaned, cleaned[1:]):
                text.append(prompt)
                text.append("[start] " + reply + " [end]")

    return text

# Load the train / test / validation splits. Each result is a flat list that
# alternates prompt and "[start] ... [end]" reply strings.
train_text = read_dataset('/content/drive/MyDrive/dataset/daily dialog/train/dialogues_train.txt')
test_text = read_dataset('/content/drive/MyDrive/dataset/daily dialog/test/dialogues_test.txt')
valid_text = read_dataset('/content/drive/MyDrive/dataset/daily dialog/validation/dialogues_validation.txt')


In [None]:
# Full corpus (all three splits concatenated, in train / test / validation order).
dataset = [*train_text, *test_text, *valid_text]

In [None]:
# Even positions are taken as model inputs, odd positions as the matching outputs.
index = len(train_text)
train_input = train_text[0:index:2]
train_output = train_text[1:index:2]
train_pairs = list(zip(train_input, train_output))

In [None]:
# Even positions are taken as model inputs, odd positions as the matching outputs.
index = len(test_text)
test_input = test_text[0:index:2]
test_output = test_text[1:index:2]
test_pairs = list(zip(test_input, test_output))

In [None]:
# Even positions are taken as model inputs, odd positions as the matching outputs.
index = len(valid_text)
val_input = valid_text[0:index:2]
val_output = valid_text[1:index:2]
val_pairs = list(zip(val_input, val_output))

## Vectorizing the text data

We'll use an instance of the `TextVectorization` layer to vectorize the text
data,
that is to say, to turn the original strings into integer sequences
where each integer represents the index of a word in a vocabulary.

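We use a custom string standardization (lowercase the text and strip punctuation
characters, except `?`, `!`, `.` and `,` — and, for the target text, also `[` and `]`)
and the default splitting scheme (split on whitespace).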



In [None]:
vocab_size = 15000  # passed as max_tokens to both TextVectorization layers
sequence_length = 20  # output length of the input vectorizer (the output one uses +1)
batch_size = 64  # batch size used when building the tf.data datasets

# Characters removed from the input text: all of string.punctuation except the
# sentence marks "?", "!", "." and ",".
strip_chars_one = "".join(ch for ch in string.punctuation if ch not in "?!.,")


def custom_standardization_encoder(input_string):
    """Lowercase ``input_string`` and delete every character listed in ``strip_chars_one``."""
    removal_pattern = "[" + re.escape(strip_chars_one) + "]"
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, removal_pattern, "")


# Vectorizer for the input sentences: integer token ids, padded or truncated to
# `sequence_length`.
inp_vectorization = TextVectorization(
    standardize=custom_standardization_encoder,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)

# Characters removed from the output text: all of string.punctuation except
# "[" and "]" and the sentence marks "?", "!", "." and ",".
strip_chars = "".join(ch for ch in string.punctuation if ch not in "[]?!.,")


def custom_standardization(input_string):
    """Lowercase ``input_string`` and delete every character listed in ``strip_chars``."""
    removal_pattern = "[" + re.escape(strip_chars) + "]"
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, removal_pattern, "")

# Vectorizer for the output sentences. It emits one token more than
# `sequence_length`.
out_vectorization = TextVectorization(
    standardize=custom_standardization,
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
)

# Split the combined corpus by position parity, then fit each vocabulary on
# its own half.
index = len(dataset)
inp_texts = dataset[0:index:2]
out_texts = dataset[1:index:2]

inp_vectorization.adapt(inp_texts)
out_vectorization.adapt(out_texts)


Next, we'll format our datasets.

At each training step, the model will seek to predict target words N+1 (and beyond)
using the source sentence and the target words 0 to N.

As such, the training dataset will yield a tuple `(inputs, targets)`, where:

- `inputs` is a dictionary with the keys `encoder_inputs` and `decoder_inputs`.
`encoder_inputs` is the vectorized source sentence and `decoder_inputs` is the target sentence "so far",
that is to say, the words 0 to N used to predict word N+1 (and beyond) in the target sentence.
- `target` is the target sentence offset by one step:
it provides the next words in the target sentence -- what the model will try to predict.

## Decoder input:

![](https://drive.google.com/uc?export=view&id=10oBrq4YjCuB-bw2ovUp-hCJK0p_8gV2C)



In [None]:
def format_dataset(inp, out):
    """Vectorize a batch of (input, output) strings into model inputs and targets.

    Args:
        inp: batch of input sentences (string tensor).
        out: batch of output sentences (string tensor).

    Returns:
        A tuple ``(inputs, targets)`` where ``inputs`` is a dict with keys
        ``"encoder_inputs"`` (vectorized input) and ``"decoder_inputs"``
        (vectorized output without its last token), and ``targets`` is the
        vectorized output without its first token, i.e. shifted by one step.
    """
    # The layers defined in this file are `inp_vectorization` and
    # `out_vectorization`; the previous `enc_`/`dec_` names were undefined.
    inp = inp_vectorization(inp)
    out = out_vectorization(out)
    return ({"encoder_inputs": inp, "decoder_inputs": out[:, :-1]}, out[:, 1:])


def make_dataset(pairs):
    """Build a batched, vectorized ``tf.data.Dataset`` from (input, output) pairs.

    Args:
        pairs: sequence of ``(input_sentence, output_sentence)`` string tuples.

    Returns:
        A dataset yielding ``(inputs, targets)`` batches as produced by
        ``format_dataset``.
    """
    inp_texts, out_texts = zip(*pairs)
    ds = tf.data.Dataset.from_tensor_slices((list(inp_texts), list(out_texts)))
    ds = ds.batch(batch_size)
    ds = ds.map(format_dataset)
    # Cache the vectorized batches first, so the shuffle below is re-drawn on
    # every epoch instead of being frozen into the cache, and keep prefetch as
    # the final stage. Batching happens before the shuffle, so whole batches
    # are shuffled.
    return ds.cache().shuffle(2048).prefetch(16)


# One dataset per split, built in train / validation / test order.
train_ds, val_ds, test_ds = (
    make_dataset(split_pairs)
    for split_pairs in (train_pairs, val_pairs, test_pairs)
)

In [None]:
train_ds

<CacheDataset shapes: ({encoder_inputs: (None, 20), decoder_inputs: (None, 20)}, (None, 20)), types: ({encoder_inputs: tf.int64, decoder_inputs: tf.int64}, tf.int64)>

Let's take a quick look at the sequence shapes
(we have batches of 64 pairs, and all sequences are 20 steps long):

In [None]:
# Pull a single batch and print the shape of each tensor the model will see.
for inputs, targets in train_ds.take(1):
    print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
    print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
    
    print(f"targets.shape: {targets.shape}")

inputs["encoder_inputs"].shape: (64, 20)
inputs["decoder_inputs"].shape: (64, 20)
targets.shape: (64, 20)


## Building the model

Our sequence-to-sequence Transformer consists of a `TransformerEncoder`
and a `TransformerDecoder` chained together. To make the model aware of word order,
we also use a `PositionalEmbedding` layer.

The source sequence will be passed to the `TransformerEncoder`,
which will produce a new representation of it.
This new representation will then be passed
to the `TransformerDecoder`, together with the target sequence so far (target words 0 to N).
The `TransformerDecoder` will then seek to predict the next words in the target sequence (N+1 and beyond).

A key detail that makes this possible is causal masking
(see method `get_causal_attention_mask()` on the `TransformerDecoder`).
The `TransformerDecoder` sees the entire sequences at once, and thus we must make
sure that it only uses information from target tokens 0 to N when predicting token N+1
(otherwise, it could use information from the future, which would
result in a model that cannot be used at inference time).

## Model Architecture:

![](https://drive.google.com/uc?export=view&id=1_BX9YvY-E_7ttPOiY2wIOG6KW4FkmdAm)



In [None]:
class TransformerEncoder(layers.Layer):
    """Single Transformer encoder block.

    Self-attention followed by a two-layer feed-forward projection, each
    wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the input/output feature dimension.
        dense_dim: hidden size of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim)]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        """Apply the encoder block.

        Args:
            inputs: sequence features to encode.
            mask: optional rank-2 mask over the sequence; when given it is
                expanded to rank 4 and used as the attention mask.

        Returns:
            The encoded sequence, same shape as ``inputs``.
        """
        # Default to "no mask" so calling the layer without a mask no longer
        # fails on an unbound local variable.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
        attention_output = self.attention(
            query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
        )
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)


In [None]:
class PositionalEmbedding(layers.Layer):
    """Token embedding summed with a learned embedding of each position index."""

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        # One table indexed by token id, one indexed by position.
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )

    def call(self, inputs):
        """Return token embeddings plus position embeddings for ``inputs``."""
        # Positions 0 .. L-1, where L is the size of the last axis of `inputs`.
        position_ids = tf.range(tf.shape(inputs)[-1])
        token_part = self.token_embeddings(inputs)
        position_part = self.position_embeddings(position_ids)
        return token_part + position_part

    def compute_mask(self, inputs, mask=None):
        """Mark every entry of ``inputs`` that is not 0 as valid."""
        return tf.math.not_equal(inputs, 0)




In [None]:
class TransformerDecoder(layers.Layer):
    """Single Transformer decoder block.

    Causal self-attention over the target sequence, attention over the
    encoder outputs, then a two-layer feed-forward projection; each stage is
    wrapped in a residual connection and layer normalization.

    Args:
        embed_dim: size of the input/output feature dimension.
        latent_dim: hidden size of the feed-forward projection.
        num_heads: number of attention heads.
    """

    def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.latent_dim = latent_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.dense_proj = keras.Sequential(
            [layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim)]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, mask=None):
        """Apply the decoder block.

        Args:
            inputs: target-sequence features decoded so far.
            encoder_outputs: encoded source sequence to attend over.
            mask: optional rank-2 mask for ``inputs``.

        Returns:
            The decoded sequence, same shape as ``inputs``.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        # Default to "no mask" so calling the layer without a mask no longer
        # fails on an unbound local variable.
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        # NOTE(review): `padding_mask` is built from the mask of `inputs`
        # combined with the causal mask, yet it is applied here where the keys
        # are `encoder_outputs`. Its last two axes both have the target
        # length, so this presumably relies on source and target sequences
        # having the same length. Behavior is kept as is; confirm the intent.
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise, where
        ``batch`` and ``seq`` are the first two dimensions of ``inputs``.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0,
        )
        return tf.tile(mask, mult)

## Attention:

![](https://drive.google.com/uc?export=view&id=158scEbQNyJg1TxMhrDCWAN925wRCHjfe)



Next, we assemble the end-to-end model.

In [None]:
embed_dim = 256  # embedding / feature size used throughout the model
latent_dim = 2048  # hidden size of the feed-forward projections
num_heads = 8  # attention heads per MultiHeadAttention layer

# Encoder: token ids -> positional embedding -> encoder block.
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)

# Decoder: built as its own model that takes the target token ids plus an
# already-encoded source sequence.
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
# Per-position probability distribution over the vocabulary.
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)

# End-to-end model: feed the encoder's output into the decoder model.
# `decoder_outputs` is rebound here to the output of the chained graph.
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
    [encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)

## Training our model

We'll use accuracy as a quick way to monitor training progress on the validation data.
Note that machine translation typically uses BLEU scores as well as other metrics, rather than accuracy.

Here we train for 30 epochs, which is about the minimum needed
for the model to actually converge.

In [None]:
epochs = 30  # number of passes over train_ds

# Print the layer/parameter summary, then compile and train.
transformer.summary()
# The model ends in a softmax and the targets are integer token ids, hence the
# sparse categorical cross-entropy loss.
transformer.compile(
    "adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)

## Decoding test sentences

Finally, let's demonstrate how to reply to a conversation.
We simply feed into the model the vectorized input sentence
as well as the target token `"[start]"`, then we repeatedly generate the next token, until
we hit the token `"[end]"`.

In [None]:
# Upper bound on the number of tokens generated per reply.
max_decoded_sentence_length = 20
# Map each token id back to its vocabulary string.
out_vocab = out_vectorization.get_vocabulary()
out_index_lookup = dict(enumerate(out_vocab))


def decode_sequence(input_sentence):
    """Greedily generate a reply to ``input_sentence``.

    Starting from ``"[start]"``, the model is called once per step and the
    highest-scoring token at the current position is appended, until
    ``"[end]"`` is produced or ``max_decoded_sentence_length`` steps have run.

    Returns:
        The generated string, including the leading ``"[start]"`` and, when
        produced, the trailing ``"[end]"``.
    """
    encoder_tokens = inp_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for step in range(max_decoded_sentence_length):
        # Re-vectorize the text generated so far and drop the last position.
        decoder_tokens = out_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer([encoder_tokens, decoder_tokens])

        # Greedy choice: best-scoring vocabulary entry at this step.
        next_index = np.argmax(predictions[0, step, :])
        next_token = out_index_lookup[next_index]
        decoded_sentence = decoded_sentence + " " + next_token

        if next_token == "[end]":
            return decoded_sentence
    return decoded_sentence





In [None]:
# Hand-picked prompts for a qualitative check of the trained model. They are
# already lowercase with punctuation separated by spaces.
input_sentences = ['bob ! i hear your team won the match .',
                   'how do you do ?',
                   'do you think you are introverted or extroverted ?',
                   'yes , you are right . after all , the quality of your air conditioners is good . the only problem is price .',
                   'what ? he cannot do this to you .',
                   'thank you , lisa .',
                   'oh , she can make her own decisions .',
                   'but if i do not pass , will you call me ?',
                   'what happened , john ?',
                   'nice to meet you , mr . wilson .',
                   'hello , is sue there ?',
                   'hi ! i am happy you could make it .',
                   'what foods do you eat now ?',
                   'ok . is the plane on schedule ?']

In [None]:
# Print each fixed prompt, the generated reply, and a separator line.
for input_sentence in input_sentences:
    translated = decode_sequence(input_sentence)
    print(input_sentence, translated, '***************', sep="\n")


In [None]:
# Same demo on 30 prompts drawn at random (with replacement) from the test inputs.
for _ in range(30):
    input_sentence = random.choice(test_input)
    translated = decode_sequence(input_sentence)
    print(input_sentence, translated, '***************', sep="\n")