In [1]:
import numpy as np
import pandas as pd
import pickle
from tensorflow.keras import layers, activations, models, preprocessing
from tensorflow.keras import preprocessing, utils
import os
import yaml
import tensorflow as tf

In [2]:
# Mount Google Drive at /content/drive; the corpus directory used below lives
# under this mount point. `google.colab` is only importable inside Colab.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Directory on the mounted Drive that holds the conversation files, and the
# names of every entry found directly inside it.
dir_path = '/content/drive/MyDrive/My_Work/First_Chat_Bot/chatt'
files_list = os.listdir(os.path.join(dir_path, ''))

In [4]:
class Tokenize:
    """
    Loads conversational data from YAML files and builds a tokenizer from it.

    Each file is expected to contain a top-level ``conversations`` key whose
    value is a list of conversations; the first entry of a conversation is
    treated as the question and the remaining entries as the answer.

    Attributes:
        files_list (list): List of file names containing conversational data.
        dir_path (str): Directory path where the files are stored.
    """

    def __init__(self, files_list, dir_path):
        """
        Stores the file names and the directory they live in.

        Args:
            files_list (list): List of YAML file names containing conversations.
            dir_path (str): Path to the directory containing the YAML files.
        """
        self.files_list = files_list
        self.dir_path = dir_path

    def separate_question_answers(self):
        """
        Extracts parallel question/answer lists from every file.

        Conversations with a single entry are skipped. When a conversation
        has more than one reply, the replies are concatenated, each preceded
        by a single space. If any of those replies is not a string, the raw
        list of replies is stored as the answer instead, so the caller can
        recognise and discard it rather than crashing on concatenation.

        Returns:
            tuple: ``(questions, answers)`` - two lists of equal length.
            Answers may contain non-string entries.
        """
        questions = []
        answers = []
        for filename in self.files_list:
            # The context manager closes the handle even if parsing fails.
            with open(os.path.join(self.dir_path, filename), 'rb') as stream:
                docs = yaml.safe_load(stream)

            for con in docs['conversations']:
                if len(con) > 2:
                    replies = con[1:]
                    questions.append(con[0])
                    if all(isinstance(rep, str) for rep in replies):
                        answers.append("".join(" " + rep for rep in replies))
                    else:
                        # Not joinable as text; left non-string on purpose.
                        answers.append(replies)
                elif len(con) > 1:
                    questions.append(con[0])
                    answers.append(con[1])

        return questions, answers

    def remove_not_str_statements(self):
        """
        Drops every pair whose answer is not a string and tags the rest.

        Questions and answers are filtered together as pairs so they stay
        aligned; removing from one list by the index of the other would
        shift later positions and drop the wrong question.

        Returns:
            tuple: ``(questions, answers)`` where each answer is wrapped as
            ``" <start> " + answer + " <end> "``.
        """
        raw_questions, raw_answers = self.separate_question_answers()

        questions = []
        answers = []
        for question, answer in zip(raw_questions, raw_answers):
            if isinstance(answer, str):
                questions.append(question)
                answers.append(" <start> " + answer + " <end> ")

        return questions, answers

    def tokenization(self):
        """
        Fits a Keras tokenizer on the cleaned questions and tagged answers.

        Returns:
            tuple: ``(tokenizer, vocab_size)`` where ``vocab_size`` is the
            number of distinct words plus one (index 0 is not assigned to
            any word in ``word_index``).
        """
        questions, answers = self.remove_not_str_statements()
        tokenizer = preprocessing.text.Tokenizer()
        tokenizer.fit_on_texts(questions + answers)
        vocab_size = len(tokenizer.word_index) + 1
        return tokenizer, vocab_size


In [5]:
# Build the shared tokenizer and the cleaned question/answer lists from the
# same Tokenize helper.
_corpus = Tokenize(files_list, dir_path)
tokenizer, vocab_size = _corpus.tokenization()
questions, answers = _corpus.remove_not_str_statements()


In [6]:
# from gensim.models import Word2Vec
import re

In [7]:
class Dataprepration:
    """
    Prepares encoder and decoder input/output arrays for training a
    sequence-to-sequence model.

    All three preparation methods use the same tokenizer, which is resolved
    once per instance instead of being rebuilt from the corpus on each call.

    Attributes:
        questions (list): List of question sentences.
        answers (list): List of answer sentences.
    """

    def __init__(self, questions, answers, tokenizer=None, vocab_size=None):
        """
        Stores the sentences and, optionally, an already fitted tokenizer.

        Args:
            questions (list): List of question sentences.
            answers (list): List of answer sentences.
            tokenizer (optional): A fitted Keras tokenizer to reuse. When
                omitted, one is built on first use from the module-level
                ``files_list`` and ``dir_path`` via ``Tokenize``.
            vocab_size (int, optional): Vocabulary size matching
                ``tokenizer``. When omitted it is derived as
                ``len(tokenizer.word_index) + 1``.
        """
        self.questions = questions
        self.answers = answers
        self._tokenizer = tokenizer
        self._vocab_size = vocab_size

    def _resolve_tokenizer(self):
        """Returns ``(tokenizer, vocab_size)``, building them at most once."""
        if self._tokenizer is None:
            self._tokenizer, self._vocab_size = Tokenize(files_list, dir_path).tokenization()
        elif self._vocab_size is None:
            self._vocab_size = len(self._tokenizer.word_index) + 1
        return self._tokenizer, self._vocab_size

    def encoder_input_data(self):
        """
        Tokenizes the questions and post-pads them to a common length.

        Returns:
            tuple: A tuple containing:
                - encoder_input_d (numpy array): Padded, tokenized questions.
                - maxlen_questions (int): Length of the longest tokenized question.
                - vocab_size (int): Vocabulary size of the tokenizer.
        """
        tokenizer, vocab_size = self._resolve_tokenizer()
        tokenized_questions = tokenizer.texts_to_sequences(self.questions)
        maxlen_questions = max(len(seq) for seq in tokenized_questions)
        encoder_input_d = preprocessing.sequence.pad_sequences(
            tokenized_questions, maxlen=maxlen_questions, padding='post'
        )
        return encoder_input_d, maxlen_questions, vocab_size

    def decoder_input_data(self):
        """
        Tokenizes the answers and post-pads them to a common length.

        Returns:
            tuple: A tuple containing:
                - decoder_input_d (numpy array): Padded, tokenized answers.
                - maxlen_answers (int): Length of the longest tokenized answer.
        """
        tokenizer, _ = self._resolve_tokenizer()
        tokenized_answers = tokenizer.texts_to_sequences(self.answers)
        maxlen_answers = max(len(seq) for seq in tokenized_answers)
        decoder_input_d = preprocessing.sequence.pad_sequences(
            tokenized_answers, maxlen=maxlen_answers, padding='post'
        )
        return decoder_input_d, maxlen_answers

    def decoder_output_data(self):
        """
        Builds the decoder targets: the tokenized answers shifted left by one
        time step, post-padded and one-hot encoded.

        ``maxlen_answers`` is measured before the shift so the targets have
        the same number of time steps as the decoder inputs.

        Returns:
            tuple: A tuple containing:
                - decoder_output_d (numpy array): One-hot encoded, padded, shifted answers.
                - maxlen_answers (int): Length of the longest tokenized answer.
        """
        tokenizer, vocab_size = self._resolve_tokenizer()
        tokenized_answers = tokenizer.texts_to_sequences(self.answers)
        maxlen_answers = max(len(seq) for seq in tokenized_answers)
        # Drop the first token of every answer so target[t] == input[t + 1].
        shifted_answers = [seq[1:] for seq in tokenized_answers]
        padded_answers = preprocessing.sequence.pad_sequences(
            shifted_answers, maxlen=maxlen_answers, padding='post'
        )
        decoder_output_d = utils.to_categorical(padded_answers, vocab_size)
        return decoder_output_d, maxlen_answers


In [8]:
class Encoder_Decoder:
    """
    Builds and trains an LSTM-based encoder-decoder (sequence-to-sequence) model.

    Attributes:
        questions (list): List of input questions.
        answers (list): List of corresponding answers.
    """

    def __init__(self, questions, answers):
        """
        Stores the training sentences.

        Args:
            questions (list): List of input question sentences.
            answers (list): List of target answer sentences.
        """
        self.questions = questions
        self.answers = answers

    def modeling(self, embedding_dim=200, latent_dim=200, batch_size=50, epochs=100, verbose=0):
        """
        Prepares the training arrays, defines the model, compiles it and fits it.

        Args:
            embedding_dim (int, optional): Output size of both embedding layers. Default is 200.
            latent_dim (int, optional): Number of units in both LSTM layers.
                Any inference code that creates state inputs must use the same
                size. Default is 200.
            batch_size (int, optional): Batch size passed to ``fit``. Default is 50.
            epochs (int, optional): Number of epochs passed to ``fit``. Default is 100.
            verbose (int, optional): Verbosity passed to ``fit``. Default is 0.

        Returns:
            tuple: A tuple containing:
                - model (tf.keras.Model): The trained encoder-decoder model.
                - decoder_embedding: Output tensor of the decoder embedding
                  layer applied to ``decoder_inputs`` (a tensor, not the layer).
                - decoder_dense (tf.keras.layers.Dense): The softmax output layer.
                - decoder_inputs: Input tensor for the decoder.
                - decoder_lstm (tf.keras.layers.LSTM): The LSTM layer used in the decoder.
                - encoder_inputs: Input tensor for the encoder.
                - encoder_states (list): Encoder final states ``[state_h, state_c]``.
        """
        preparation_object = Dataprepration(self.questions, self.answers)

        # Prepare input and output data
        encoder_input_da, maxlen_questions, VOCAB_SIZE = preparation_object.encoder_input_data()
        decoder_input_da, _ = preparation_object.decoder_input_data()
        decoder_output_da, _ = preparation_object.decoder_output_data()

        # Define encoder: only the final LSTM states are passed on to the decoder.
        encoder_inputs = tf.keras.layers.Input(shape=(maxlen_questions,))
        encoder_embedding = tf.keras.layers.Embedding(VOCAB_SIZE, embedding_dim, mask_zero=True)(encoder_inputs)
        _, state_h, state_c = tf.keras.layers.LSTM(latent_dim, return_state=True)(encoder_embedding)
        encoder_states = [state_h, state_c]

        # Define decoder. The time dimension is left unspecified so the same
        # input tensor accepts full padded answers during training and
        # one-token sequences when the layers are reused for step-by-step
        # decoding; a fixed length would mismatch those one-token inputs.
        decoder_inputs = tf.keras.layers.Input(shape=(None,))
        decoder_embedding = tf.keras.layers.Embedding(VOCAB_SIZE, embedding_dim, mask_zero=True)(decoder_inputs)
        decoder_lstm = tf.keras.layers.LSTM(latent_dim, return_state=True, return_sequences=True)
        decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
        decoder_dense = tf.keras.layers.Dense(VOCAB_SIZE, activation=tf.keras.activations.softmax)
        output = decoder_dense(decoder_outputs)

        # Create the model
        model = tf.keras.models.Model([encoder_inputs, decoder_inputs], output)
        model.compile(optimizer=tf.keras.optimizers.RMSprop(), loss='categorical_crossentropy')
        model.summary()

        # Train the model
        model.fit(
            [encoder_input_da, decoder_input_da],
            decoder_output_da,
            batch_size=batch_size,
            epochs=epochs,
            verbose=verbose,
        )

        return model, decoder_embedding, decoder_dense, decoder_inputs, decoder_lstm, encoder_inputs, encoder_states


In [None]:
# Train the network and keep the pieces needed later to assemble the
# inference-time encoder and decoder.
(
    model,
    decoder_embedding,
    decoder_dense,
    decoder_inputs,
    decoder_lstm,
    encoder_inputs,
    encoder_states,
) = Encoder_Decoder(questions, answers).modeling()

In [None]:
def make_inference_models():
    """
    Creates the inference models for the encoder and decoder parts of the
    trained sequence-to-sequence model.

    Relies on the module-level ``encoder_inputs``, ``encoder_states``,
    ``decoder_inputs``, ``decoder_embedding``, ``decoder_lstm`` and
    ``decoder_dense`` produced by training, so the trained weights are reused.

    Returns:
        tuple: A tuple containing:
            - encoder_model (tf.keras.Model): Maps an input sequence to the
              encoder states ``[state_h, state_c]``.
            - decoder_model (tf.keras.Model): Maps decoder tokens plus the two
              incoming states to ``[predictions, state_h, state_c]``.
    """
    # Define the encoder inference model
    encoder_model = tf.keras.models.Model(encoder_inputs, encoder_states)

    # Size the state inputs from the trained decoder LSTM rather than a
    # literal, so they always match the layer they are fed into.
    latent_dim = decoder_lstm.units
    decoder_state_input_h = tf.keras.layers.Input(shape=(latent_dim,))
    decoder_state_input_c = tf.keras.layers.Input(shape=(latent_dim,))
    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

    # Run the decoder LSTM starting from externally supplied states
    decoder_outputs, state_h, state_c = decoder_lstm(
        decoder_embedding, initial_state=decoder_states_inputs
    )
    decoder_states = [state_h, state_c]

    # Apply the dense layer to obtain predictions
    decoder_outputs = decoder_dense(decoder_outputs)

    # Define the decoder inference model
    decoder_model = tf.keras.models.Model(
        [decoder_inputs] + decoder_states_inputs,
        [decoder_outputs] + decoder_states
    )

    return encoder_model, decoder_model


In [None]:
def str_to_tokens(sentence: str):
    """
    Converts a sentence into a padded token sequence for the encoder model.

    The module-level ``tokenizer`` does the conversion, so the sentence goes
    through the same normalisation as the training data, and words that are
    not in the vocabulary are skipped instead of raising ``KeyError``.

    Args:
        sentence (str): The input sentence to be tokenized.

    Returns:
        numpy array: A single-row, post-padded sequence of token ids whose
        length is that of the longest tokenized training question.
    """
    # Pad length comes from the already fitted tokenizer and the in-memory
    # questions, which avoids re-reading the corpus on every call.
    maxlen_questions = max(len(seq) for seq in tokenizer.texts_to_sequences(questions))

    tokens_list = tokenizer.texts_to_sequences([sentence])[0]

    # Pad the tokenized sequence to match the maximum question length
    return preprocessing.sequence.pad_sequences([tokens_list], maxlen=maxlen_questions, padding='post')


In [None]:
def chatbot_inference(tokenizer, num_responses=10, max_translation_length=72):
    """
    Runs an interactive chat loop against the trained encoder-decoder model.

    The encoder and decoder inference models are built internally with
    ``make_inference_models``. For each question read from standard input,
    the reply is decoded greedily one token at a time and printed.

    Args:
        tokenizer (tf.keras.preprocessing.text.Tokenizer): The fitted tokenizer
            used for encoding and decoding text.
        num_responses (int, optional): Number of questions the chatbot will respond to. Default is 10.
        max_translation_length (int, optional): Decoding stops once the reply
            has more words than this. Default is 72.

    Returns:
        None: Each decoded reply is printed to the console; it includes the
        trailing ``end`` marker when the model generates one.
    """
    enc_model, dec_model = make_inference_models()
    start_token = tokenizer.word_index['start']

    for _ in range(num_responses):
        # Take user input
        user_input = input('Enter question: ')

        # Encode the input using the encoder model
        states_values = enc_model.predict(str_to_tokens(user_input))

        # Initialize the target sequence with the "start" token
        empty_target_seq = np.zeros((1, 1))
        empty_target_seq[0, 0] = start_token

        # Initialize variables for the decoding loop
        stop_condition = False
        decoded_translation = ''

        while not stop_condition:
            # Predict the next token and updated decoder states
            dec_outputs, h, c = dec_model.predict([empty_target_seq] + states_values)

            # Get the token index with the highest probability
            sampled_word_index = np.argmax(dec_outputs[0, -1, :])

            # Direct index -> word lookup; None when the index has no word
            # (e.g. the padding index 0), in which case nothing is appended.
            sampled_word = tokenizer.index_word.get(int(sampled_word_index))
            if sampled_word is not None:
                decoded_translation += ' {}'.format(sampled_word)

            # Check if the stop condition is met
            if sampled_word == 'end' or len(decoded_translation.split()) > max_translation_length:
                stop_condition = True

            # Update the target sequence and states for the next timestep
            empty_target_seq = np.zeros((1, 1))
            empty_target_seq[0, 0] = sampled_word_index
            states_values = [h, c]

        # Print the generated response
        print(decoded_translation)


# Start the interactive session: answer 10 questions, with decoding stopped
# once a reply exceeds 72 words.
chatbot_inference(tokenizer, num_responses=10, max_translation_length=72)