### Install packages

### Import libraries

In [62]:
import string
import random
import matplotlib
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from nltk import word_tokenize

### Get SQuAD data from GitHub

In [63]:
csv_url = 'https://raw.githubusercontent.com/davisjrule/SQuAD-bot/main/train-squad.csv'
df = pd.read_csv(csv_url)
df.drop('id', axis=1, inplace=True)
df.drop('answer_start', axis=1, inplace=True)
df.drop('Unnamed: 0', axis=1, inplace=True)
df.drop('context', axis=1, inplace=True)
df.rename(columns={"text": "answer"}, inplace = True)
df = df.iloc[0:500]


### Get chat data from GitHub

In [64]:
csv_url = 'https://raw.githubusercontent.com/davisjrule/SQuAD-bot/main/topical_chat.csv'
df2 = pd.read_csv(csv_url)
df2.drop('conversation_id', axis=1, inplace=True)
df2.drop('sentiment', axis=1, inplace=True)

questions = df2[0:5000:2]
responses = df2[1:5000:2]

questions.head()

# create dataframe
data = pd.DataFrame()
data['question'] = df2[0:5000:2]
data['answer'] = df2[1:5000:2].values
data = data.iloc[0:700]
data = pd.concat([data, df])

data = data.sample(frac=1).reset_index(drop=True)


### Process data

In [65]:
def clean_text(text):
  text = str(text).lower()
  text = text.replace("can't", "cannot")
  text = text.replace("won't", "will not")
  text = text.replace("i'm", "i am")
  text = text.replace("'ve", " have")
  text = text.replace("'ll", " will")
  text = text.replace("n't", " not")
  text = text.replace("'d", " would")
  text = text.replace("'s", " is")
  text = text.replace("'re", " are")
  for char in string.punctuation:
    if char != '-':
      text = text.replace(char, '')
  return text

In [66]:
pairs = list(zip(data['question'],data['answer']))
pairs

[('Beyonce joined 2 other women on what list from Billboard magazine in 2011?',
  'Top 20 Hot 100 Songwriters'),
 ('What was Angela Merkel serving as in relation to the letter?',
  'the head of the G7 in Germany'),
 ('When did Beyoncé begin to manage the girl group?', '1995'),
 (" Before Google launched gmail, that was the name of a free email service offered by garfield's website. That actually seems more fun.",
  ' I know now everytime I open gmail I think of garfield'),
 (' I would do exactly the same as you.  How could you not pass that up the chance to set yourself up for life? lol',
  ' Yeah, that is quite lucky. Do you follow softball at all?'),
 ('How much did Beyonce raise for Obama at the 40/40 Club?', '4 million'),
 ('Who reported Beyonce to e the top earning woman in music?', 'Forbes'),
 (' Now that you mention it I do somewhat remember seeing how he did that. It was almost like a piece of metal that would lock into his shoe so he could almost lean 45 degrees. I believe he 

In [67]:
import numpy as np
import re

input_docs = []
target_docs = []
input_tokens = set()
target_tokens = set()

for line in pairs[:1200]:
  input_doc, target_doc = line[0], line[1]

  input_doc = clean_text(input_doc)
  target_doc = clean_text(target_doc)

  # Appending each input sentence to input_docs
  input_docs.append(input_doc)
  
  # Splitting words from punctuation  
  target_doc = " ".join(re.findall(r"[\w']+|[^\s\w]", target_doc))

  # Redefine target_doc below and append it to target_docs
  target_doc = '<START> ' + target_doc + ' <END>'
  target_docs.append(target_doc)

  
  # Now we split up each sentence into words and add each unique word to our vocabulary set
  for token in re.findall(r"[\w']+|[^\s\w]", input_doc):
    if token not in input_tokens:
      input_tokens.add(token)
       
  for token in target_doc.split():
    if token not in target_tokens:
      target_tokens.add(token)


input_tokens = sorted(list(input_tokens))
target_tokens = sorted(list(target_tokens))
num_encoder_tokens = len(input_tokens)
num_decoder_tokens = len(target_tokens)

# target_tokens

In [68]:
print(input_tokens)

['-', '001', '1', '10', '100', '10000', '11', '1110', '12', '15', '15000000', '16', '165', '17', '1853', '1895', '19', '1900', '1909', '1912', '1918', '1928', '1934', '1936', '1940', '1950s', '1958', '1963', '1977', '1980', '1991', '1992', '1993', '1994', '1995', '1998', '19th', '1mb', '2', '20', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2020', '20th', '224', '24', '2430', '26', '270', '2795', '29', '2nd', '3', '3000', '309', '30s', '31', '32000', '38', '39', '3rd', '4', '40', '4000', '4040', '43', '45', '4500', '46th', '49', '4th', '5', '50', '500k', '52nd', '55', '57th', '5th', '6', '622', '628', '63', '6331', '64', '7', '70k', '71', '75', '7500', '8', '80', '84', '85', '9', 'a', 'abc', 'abilities', 'ability', 'able', 'about', 'above', 'absentee', 'absolutely', 'acadian', 'accent', 'accept', 'acclaim', 'according', 'achieve', 'achieved', 'acoustic', 'acquittal', 'act', 'acting', 'active', 'activity

In [69]:
input_features_dict = dict(
    [(token, i) for i, token in enumerate(input_tokens)])
target_features_dict = dict(
    [(token, i) for i, token in enumerate(target_tokens)])

reverse_input_features_dict = dict(
    (i, token) for token, i in input_features_dict.items())
reverse_target_features_dict = dict(
    (i, token) for token, i in target_features_dict.items())


In [70]:
max_encoder_seq_length = max([len(re.findall(r"[\w']+|[^\s\w]", input_doc)) for input_doc in input_docs])
max_decoder_seq_length = max([len(re.findall(r"[\w']+|[^\s\w]", target_doc)) for target_doc in target_docs])

encoder_input_data = np.zeros(
    (len(input_docs), max_encoder_seq_length, num_encoder_tokens),
    dtype='float32')
decoder_input_data = np.zeros(
    (len(input_docs), max_decoder_seq_length, num_decoder_tokens),
    dtype='float32')
decoder_target_data = np.zeros(
    (len(input_docs), max_decoder_seq_length, num_decoder_tokens),
    dtype='float32')

for line, (input_doc, target_doc) in enumerate(zip(input_docs, target_docs)):
    for timestep, token in enumerate(re.findall(r"[\w']+|[^\s\w]", input_doc)):
        #Assign 1. for the current line, timestep, & word in encoder_input_data
        encoder_input_data[line, timestep, input_features_dict[token]] = 1.
    
    for timestep, token in enumerate(target_doc.split()):
        decoder_input_data[line, timestep, target_features_dict[token]] = 1.
        if timestep > 0:
            decoder_target_data[line, timestep - 1, target_features_dict[token]] = 1.



In [71]:
max_encoder_seq_length = max([len(re.findall(r"[\w']+|[^\s\w]", input_doc)) for input_doc in input_docs])
max_decoder_seq_length = max([len(re.findall(r"[\w']+|[^\s\w]", target_doc)) for target_doc in target_docs])

encoder_input_data = np.zeros(
    (len(input_docs), max_encoder_seq_length), dtype='int32'
)
decoder_input_data = np.zeros(
    (len(input_docs), max_decoder_seq_length), dtype='int32'
)
decoder_output_data = np.zeros(
    (len(input_docs), max_decoder_seq_length, num_decoder_tokens),
    dtype='int32'
  )
for line, (input_doc, target_doc) in enumerate(zip(input_docs, target_docs)):
  for timestep, token in enumerate(re.findall(r"[\w']+|[^\s\w]", input_doc)):
    encoder_input_data[line, timestep] = input_features_dict[token]
  for timestep, token in enumerate(target_doc.split()):
    decoder_input_data[line, timestep] = target_features_dict[token]
    if timestep > 0:
        decoder_output_data[line, timestep - 1, target_features_dict[token]] = 1.


In [72]:
from tensorflow.keras import layers , activations , models , preprocessing

In [74]:
import tensorflow as tf

encoder_inputs = tf.keras.layers.Input(shape=( max_encoder_seq_length, ))
encoder_embedding = tf.keras.layers.Embedding( num_encoder_tokens, 200 , mask_zero=True) (encoder_inputs)
encoder_outputs , state_h , state_c = tf.keras.layers.LSTM( 200 , return_state=True )( encoder_embedding )
encoder_states = [ state_h , state_c ]

decoder_inputs = tf.keras.layers.Input(shape=( None ,  ))
decoder_embedding = tf.keras.layers.Embedding( num_decoder_tokens, 200 , mask_zero=True) (decoder_inputs)
decoder_lstm = tf.keras.layers.LSTM( 200 , return_state=True , return_sequences=True )
decoder_outputs , _ , _ = decoder_lstm ( decoder_embedding , initial_state=encoder_states )
decoder_dense = tf.keras.layers.Dense( num_decoder_tokens , activation=tf.keras.activations.softmax ) 
output = decoder_dense ( decoder_outputs )

model = tf.keras.models.Model([encoder_inputs, decoder_inputs], output )
model.compile(optimizer=tf.keras.optimizers.Adam(), loss='categorical_crossentropy')

model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_3 (InputLayer)           [(None, 62)]         0           []                               
                                                                                                  
 input_4 (InputLayer)           [(None, None)]       0           []                               
                                                                                                  
 embedding_2 (Embedding)        (None, 62, 200)      578800      ['input_3[0][0]']                
                                                                                                  
 embedding_3 (Embedding)        (None, None, 200)    530400      ['input_4[0][0]']                
                                                                                              

In [75]:
model.fit([encoder_input_data , decoder_input_data], decoder_output_data, batch_size=50, epochs=200 ) 
model.save( 'model.h5' ) 

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

In [None]:
from keras.models import load_model
model = load_model('model.h5')

In [76]:
def make_inference_models():
    
    encoder_model = tf.keras.models.Model(encoder_inputs, encoder_states)
    
    decoder_state_input_h = tf.keras.layers.Input(shape=( 200 ,))
    decoder_state_input_c = tf.keras.layers.Input(shape=( 200 ,))
    
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
    
    decoder_outputs, state_h, state_c = decoder_lstm(
        decoder_embedding , initial_state=decoder_states_inputs)
    decoder_states = [state_h, state_c]
    decoder_outputs = decoder_dense(decoder_outputs)
    decoder_model = tf.keras.models.Model(
        [decoder_inputs] + decoder_states_inputs,
        [decoder_outputs] + decoder_states)
    
    return encoder_model , decoder_model

In [77]:
def str_to_tokens( sentence : str ):
    words = sentence.lower().split()
    tokens_list = list()
    for word in words:
        tokens_list.append( input_features_dict[word] ) 
    return preprocessing.sequence.pad_sequences( [tokens_list] , maxlen=max_encoder_seq_length , padding='post')

In [None]:
enc_model , dec_model = make_inference_models()

for _ in range(10):
    states_values = enc_model.predict( str_to_tokens( input( 'Enter question : ' ) ) )
    empty_target_seq = np.zeros( ( 1 , 1 ) )
    empty_target_seq[0, 0] = target_features_dict['<START>']
    stop_condition = False
    decoded_translation = ''
    while not stop_condition :
        dec_outputs , h , c = dec_model.predict([ empty_target_seq ] + states_values )
        sampled_word_index = np.argmax( dec_outputs[0, -1, :] )
        sampled_word = reverse_target_features_dict[sampled_word_index]
        decoded_translation += " " + sampled_word
        
        if sampled_word == '<END>' or len(decoded_translation.split()) > max_decoder_seq_length:
            stop_condition = True
            
        empty_target_seq = np.zeros( ( 1 , 1 ) )  
        empty_target_seq[ 0 , 0 ] = sampled_word_index
        states_values = [ h , c ] 

    print( decoded_translation )


In [None]:
from keras.models import load_model
training_model = load_model('model.h5')

encoder_model = tf.keras.models.Model(encoder_inputs, encoder_states)
    
decoder_state_input_h = tf.keras.layers.Input(shape=( 200 ,))
decoder_state_input_c = tf.keras.layers.Input(shape=( 200 ,))

decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

decoder_outputs, state_h, state_c = decoder_lstm(
    decoder_embedding , initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = tf.keras.models.Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs] + decoder_states)

def decode_response(test_input):
  states_values = encoder_model.predict(test_input)

  target_seq = np.zeros( ( 1 , 1 ) )
  target_seq[0, 0] = target_features_dict['<START>']
  stop_condition = False
  decoded_translation = ""
  while not stop_condition :
      dec_outputs , h , c = training_model.predict([ target_seq ] + states_values )

      sampled_word_index = np.argmax( dec_outputs[0, -1, :] )
      sampled_word = reverse_target_features_dict[sampled_word_index]
      decoded_translation += " " + sampled_word
      
      if sampled_word == '<END>' or len(decoded_translation.split()) > max_decoder_seq_length:
          stop_condition = True
          
      empty_target_seq = np.zeros( ( 1 , 1 ) )  
      empty_target_seq[ 0 , 0 ] = sampled_word_index
      states_values = [ h , c ] 

  return decoded_translation


In [None]:
def generate_response(input):
  input = clean_text(input)
  #tokens = re.findall(r"[\w']+|[^\s\w]", input)
  #input_matrix = np.zeros(
  #    (1, max_encoder_seq_length),
  #    dtype='int32')
  #for timestep, token in enumerate(tokens):
  #  if token in input_features_dict:
  #    input_matrix[0, timestep] = input_features_dict[token]
  #output = decode_response(input_matrix)
  output = decode_response(str_to_tokens(input))
  return output.replace("<START>","").replace("<END>","")


while True:
  reply = input()
  if reply == "STOP":
    break
  print(generate_response(reply))

In [None]:
def make_inference_models():
    
    encoder_model = tf.keras.models.Model(encoder_inputs, encoder_states)
    
    decoder_state_input_h = tf.keras.layers.Input(shape=( 200 ,))
    decoder_state_input_c = tf.keras.layers.Input(shape=( 200 ,))
    
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
    
    decoder_outputs, state_h, state_c = decoder_lstm(
        decoder_embedding , initial_state=decoder_states_inputs)
    decoder_states = [state_h, state_c]
    decoder_outputs = decoder_dense(decoder_outputs)
    decoder_model = tf.keras.models.Model(
        [decoder_inputs] + decoder_states_inputs,
        [decoder_outputs] + decoder_states)
    
    return encoder_model , decoder_model

Legacy Code:

In [None]:
from tensorflow import keras
from keras.layers import Input, LSTM, Dense
from keras.models import Model

#Dimensionality
dimensionality = 256

#The batch size and number of epochs
batch_size = 10
epochs = 10

#Encoder
encoder_inputs = Input(shape=(None, num_encoder_tokens))
encoder_lstm = LSTM(dimensionality, return_state=True)
encoder_outputs, state_hidden, state_cell = encoder_lstm(encoder_inputs)
encoder_states = [state_hidden, state_cell]

#Decoder
decoder_inputs = Input(shape=(None, num_decoder_tokens))
decoder_lstm = LSTM(dimensionality, return_sequences=True, return_state=True)
decoder_outputs, decoder_state_hidden, decoder_state_cell = decoder_lstm(decoder_inputs, initial_state=encoder_states)
decoder_dense = Dense(num_decoder_tokens, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

#Model
training_model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

#Compiling
training_model.compile(optimizer='rmsprop', loss='categorical_crossentropy', metrics=['accuracy'], sample_weight_mode='temporal')

#Training
training_model.fit([encoder_input_data, decoder_input_data], decoder_target_data, batch_size = batch_size, epochs = epochs, validation_split = 0.2)
training_model.save('training_model.h5')
     

In [None]:
from keras.models import load_model
training_model = load_model('training_model.h5')
encoder_inputs = training_model.input[0]
encoder_outputs, state_h_enc, state_c_enc = training_model.layers[2].output
encoder_states = [state_h_enc, state_c_enc]
encoder_model = Model(encoder_inputs, encoder_states)

latent_dim = 256
decoder_state_input_hidden = Input(shape=(latent_dim,))
decoder_state_input_cell = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_hidden, decoder_state_input_cell]
decoder_outputs, state_hidden, state_cell = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
decoder_states = [state_hidden, state_cell]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states)

def decode_response(test_input):
  #Getting the output states to pass into the decoder
  states_value = encoder_model.predict(test_input)
  
  #Generating empty target sequence of length 1
  target_seq = np.zeros((1, 1, num_decoder_tokens))
  
  #Setting the first token of target sequence with the start token
  target_seq[0, 0, target_features_dict['<START>']] = 1.
  
  #A variable to store our response word by word
  decoded_sentence = ''
    
  stop_condition = False

  while not stop_condition:
    #Predicting output tokens with probabilities and states
    output_tokens, hidden_state, cell_state = decoder_model.predict([target_seq] + states_value)
    
    #Choosing the one with highest probability
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_token = reverse_target_features_dict[sampled_token_index]
    decoded_sentence += " " + sampled_token
    
    #Stop if hit max length or found the stop token
    if (sampled_token == '' or len(decoded_sentence) > max_decoder_seq_length):
      stop_condition = True
    
    #Update the target sequence
    target_seq = np.zeros((1, 1, num_decoder_tokens))
    target_seq[0, 0, sampled_token_index] = 1.
    
    #Update states
    states_value = [hidden_state, cell_state]
  return decoded_sentence
     


In [None]:
class ChatBot:
  negative_responses = ("no", "nope", "nah", "naw", "not a chance", "sorry")
  exit_commands = ("quit", "pause", "exit", "goodbye", "bye", "later", "stop")
  
  #Method to start the conversation
  def start_chat(self):
    user_response = input("Hi, I'm a chatbot trained on random dialogs. Would you like to chat with me?\n")
    
    if user_response in self.negative_responses:
      print("Ok, have a great day!")
      return
    self.chat(user_response)
  
  #Method to handle the conversation
  def chat(self, reply):
    while not self.make_exit(reply):
      reply = input(self.generate_response(reply)+"\n")
    
  #Method to convert user input into a matrix
  def string_to_matrix(self, user_input):
    tokens = re.findall(r"[\w']+|[^\s\w]", user_input)
    user_input_matrix = np.zeros(
      (1, max_encoder_seq_length, num_encoder_tokens),
      dtype='float32')
    for timestep, token in enumerate(tokens):
      if token in input_features_dict:
        user_input_matrix[0, timestep, input_features_dict[token]] = 1.
    return user_input_matrix

  #Method that will create a response using seq2seq model we built
  def generate_response(self, user_input):
    input_matrix = self.string_to_matrix(user_input)  
    chatbot_response = decode_response(input_matrix)
    #Remove  and  tokens from chatbot_response
    chatbot_response = chatbot_response.replace("<START>",'')
    chatbot_response = chatbot_response.replace("<END>",'')
    return chatbot_response
  
  #Method to check for exit commands
  def make_exit(self, reply):
    for exit_command in self.exit_commands:
      if exit_command in reply:
        print("Ok, have a great day!")
        return True
    return False
  
chatbot = ChatBot()
chatbot.start_chat()

In [None]:
def generate_response(input):
  input = clean_text(input)
  tokens = re.findall(r"[\w']+|[^\s\w]", input)
  input_matrix = np.zeros(
      (1, max_encoder_seq_length, num_encoder_tokens),
      dtype='float32')
  for timestep, token in enumerate(tokens):
    if token in input_features_dict:
      input_matrix[0, timestep, input_features_dict[token]] = 1
  output = decode_response(input_matrix)
  return output.replace("<START>","").replace("<END>","")


while True:
  reply = input()
  if reply == "STOP":
    break
  print(generate_response(reply)+'\n')

In [None]:
def save_model(model):
  model.save('')
  return pick_model

In [None]:
def load_model():
  return model