In [1]:
import os
import pandas as pd
import ast
import re
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np

In [2]:
#Read movie_lines.txt
lines = pd.read_csv('Dataa/movie_lines.txt', sep='\+\+\+\$\+\+\+', header=None, engine='python', names=['lineID', 'characterID', 'movieID', 'character', 'text'], encoding='latin-1')

# Read movie_conversations.txt
conversations = pd.read_csv('Dataa/movie_conversations.txt', sep='\+\+\+\$\+\+\+', header=None, engine='python', names=['character1ID', 'character2ID', 'movieID', 'utteranceIDs'], encoding='latin-1')

In [3]:
print(lines.head(1))
print(conversations.head(1))
print(len(lines))
print(len(conversations))

   lineID characterID movieID character           text
0  L1045          u0      m0    BIANCA    They do not!
  character1ID character2ID movieID                       utteranceIDs
0          u0           u2      m0    ['L194', 'L195', 'L196', 'L197']
304713
83097


In [4]:
# Create a dictionary to map each line's ID to its text
id2line = {line.lineID: line.text for line in lines.itertuples()}

# This will be the sample output:
# {
#    1: 'Hello, how are you?',
#    2: "I'm fine, thank you.",
#    3: 'Good morning!',
#    4: 'Hi there.'
# }

# Fixed an error, there was an extra space after each lineID
id2line_new = {key.replace(" ", "") if " " in key else key: value for key, value in id2line.items()}
id2line = id2line_new

In [5]:
# Extract conversations
conversations_data = []

for conv in conversations.itertuples():
    # Convert the string representation of list to a list of strings
    utterance_ids = ast.literal_eval(conv.utteranceIDs)
    # Get the text corresponding to each line ID
    conv_texts = [id2line.get(uid, '') for uid in utterance_ids if uid in id2line]
    if conv_texts:  # Only add non-empty conversations
        conversations_data.append(conv_texts)

In [6]:
def preprocess_text(text):
    # Convert text to str (for some reason the text was being considered float😒)
    text = str(text)
    # Convert to lowercase
    text = text.lower()
    # Remove punctuation and non-alphabetic characters
    text = re.sub(r"[^a-zA-Z0-9]+", " ", text)
     # Replace contractions and common abbreviations with full forms
    text = re.sub(r"i'm", "i am", text)
    text = re.sub(r"he's", "he is", text)
    text = re.sub(r"she's", "she is", text)
    text = re.sub(r"that's", "that is", text)
    text = re.sub(r"what's", "what is", text)
    text = re.sub(r"where's", "where is", text)
    text = re.sub(r"\'ll", " will", text)
    text = re.sub(r"\'ve", " have", text)
    text = re.sub(r"\'re", " are", text)
    text = re.sub(r"\'d", " would", text)
    text = re.sub(r"won't", "will not", text)
    text = re.sub(r"can't", "can not", text)
    text = re.sub(r"[-()\"#/@;:<>{}`+=~|.!?,]", "", text)
    # Remove leading and trailing spaces
    return text.strip()
# Apply preprocessing to each line in the conversations
preprocessed_conversations = []
for conv in conversations_data:
    # Preprocess each line in the conversation
    preprocessed_conversations.append([preprocess_text(line) for line in conv])

In [7]:
input_texts = []
target_texts = []

#Iterating through each conversation that is preprocessed
for conv in preprocessed_conversations:
    #Iterating through each line in the conversation
    for i in range(len(conv) - 1):
        #Appending the input and target texts
        input_texts.append(conv[i]) #input text
        target_texts.append(conv[i + 1])  #target text

In [8]:
#adding the start and end tags to the sentence to know where to stop generating the text
target_texts = ["<start> " + text + " <end>" for text in target_texts]

In [9]:
# Tokenize the input and target texts
tokenizer = Tokenizer()
#Fit the tokenizer on the input and target texts
#This will create the vocabulary of words used in the texts
#This will split the words
#This will also assign a unique integer to each word
#This will also count the frequency of each word
# 1: i (i occured most for example) 2: am (am occured 2nd most for example)
tokenizer.fit_on_texts(input_texts + target_texts)

# Convert the input texts to sequences of integers
# 'hello how are you' becomes [6, 7, 8, 5]
input_sequences = tokenizer.texts_to_sequences(input_texts)

# Convert the target texts to sequences of integers
# 'i am fine thank you' becomes [1, 2, 3, 4, 5].
target_sequences = tokenizer.texts_to_sequences(target_texts)


#finding max of both sequences
#max_length = max(max(len(seq) for seq in input_sequences), max(len(seq) for seq in target_sequences))
#pad zeros in small length input sequences at the end
max_length = 70
input_sequences = pad_sequences(input_sequences, maxlen=max_length, padding='post')
#pad zeros in small length target sequences at the end
target_sequences = pad_sequences(target_sequences, maxlen=max_length, padding='post')

In [10]:
#From here on split data into training and testing data
# chose network of the choice LSTM, Transformer

# **Natural Language Understanding**

NLU is a subfield of AI that focuses on enabling computers to understand the meaning and intent behind human language.

NLU is used in this project to:
1.  Understand the user's goals and intentions.
2.  Extract relevant information from user input.
3.  Respond in a way that aligns with the identified intent and context

In [11]:
import spacy
import json
import random

# Intent Recognition

In [12]:
INTENT_PATH = 'Dataa/intents.json'

# Open the file and load the JSON data into the 'intents'
with open(INTENT_PATH) as f:
    intents = json.load(f)

# Extract the 'intents' key from the loaded JSON data
intents = intents['intents']

In [13]:
# Intents.json file has various intent dictionaries each following the following structure
# {
#     "tag": "unique_intent_identifier",  # e.g., "greeting", "goodbye"
#     "patterns": [ List of user phrases that trigger this intent ],
#     "responses": [ List of possible responses the chatbot can give ],
#     "context_set": ""   used for context-aware chatbots
# }


def get_intent(user_input):
    """
    Searches for a matching intent based on user input.

    This function iterates through all intents in the loaded `intents` list.
    For each intent, it checks if any patterns (user phrases) match the lowercased user input.
    If a match is found, the corresponding intent dictionary is returned. Otherwise, the function returns None.

    Args:   user_input (str): The user's input phrase.

    Returns:  dict or None: The matching intent dictionary if found, otherwise None.
    """

    for intent in intents:
        for pattern in intent['patterns']:
            if pattern.lower() in user_input.lower():
                return intent
    return None


def get_response(intent):
    """
    Selects a random response from the provided intent.

    This function assumes the `intent` dictionary has a "responses" key containing a list of possible chatbot responses.
    It randomly chooses one of these responses and returns it.

    Args:   intent (dict): The intent dictionary containing user patterns and responses.

    Returns:  str: A random response chosen from the intent's "responses" list.
    """
    return random.choice(intent['responses'])

# **Name Entity Recognition**

In [14]:
# Load the spaCy English small model
nlp = spacy.load('en_core_web_sm')

def extract_entities(user_input):
    """
    Extracts named entities from user input using spaCy.

    This function takes a string of user input and processes it using the loaded spaCy model (`nlp`).
    The model identifies and classifies named entities within the text. The function then iterates through the identified entities,
    extracting their text and label (type of entity) and returning a list of tuples containing these values.

    Args:   user_input (str): The user's input text.

    Returns:  list: A list of tuples where each tuple contains (entity_text, entity_label).
    """

    # for example
    # User Input: Barack Obamma was the president of United States of America
    # Entities:
    #         1) Barack Obamma -> Person
    #         2) United States of America -> GPE (GeoPolitical Entity)

    doc = nlp(user_input)
    entities = [(ent.text, ent.label_) for ent in doc.ents]
    return entities

# **Context Management**

In [15]:
current_context = ""

# Getter and Setter functions for context
def get_context():
    return current_context

def set_context(context):
    global current_context
    current_context = context

In [16]:
# Dictionary to store context information for intents that will be used to set context
intent_context_map = {}
for intent in intents:
    if intent['context_set']:
        intent_context_map[intent['tag']] = intent['context_set']
    else:
        intent_context_map[intent['tag']] = None

In [17]:
def handle_input(user_input):
    """
    Processes user input, returning a response and extracted entities.

    This function takes the user's input as a string. It first calls `get_intent` to identify the intent (purpose)
    behind the user's input based on the loaded intents and their patterns.

    If an intent is found:
        - `get_response` is called to retrieve a random response from the corresponding intent's list of responses.
        - `extract_entities` is used to identify and extract named entities from the user's input using the spaCy library.
        - The `intent_context_map` is used to look up the context associated with the identified intent tag. If context exists, the `set_context`
          function is called to set the current conversational context. Otherwise, the context is set to None.

    If no intent is found, a default response indicating confusion is returned, and an empty list of entities is provided.

    Args:   user_input (str): The user's input text.

    Returns:  tuple: A tuple containing (response_text, list_of_entities).
    """

    intent = get_intent(user_input)
    if intent:
        response = get_response(intent)
        entities = extract_entities(user_input)
        context = intent_context_map[intent['tag']]
        if context:
            set_context(context)
        else:
            set_context(None)
    else:
        response = retrieve_response(user_input)
        if(response):
            response = response
            entities = extract_entities(user_input)
        else:
            response = generate_reponse(user_input)
            entities = extract_entities(user_input)

    return response, entities

# Response Generator
<ul>
<li> 1. Rule Based Reponse </li>
    - As Rule Based is already define which is in the form if intent file
<li> 2. Retrieval Based Reponse </li>
    - Retrieval Based is Implemented below where I have to retrieve the answer from the already given data like
    - input sequences, targeted sequences
<li> 3. Generative Reponse </li>
    - For Generative Response we will train a model using transformer givin below
</ul>    

In [19]:
from sklearn.metrics.pairwise import cosine_similarity

def retrieve_response(user_input):
    user_in = tokenizer.texts_to_sequences([user_input])
    input_seq = pad_sequences(user_in, maxlen=max_length, padding='post')
    similarities = cosine_similarity(input_seq, input_sequences)[0]
    
    similar_indices = np.where(similarities > 0.95)[0]
    if len(similar_indices) > 0:
        most_similar_index = similar_indices[0]  # Take the first one above threshold
        output_sequence = target_sequences[most_similar_index]
        output_text = tokenizer.sequences_to_texts([output_sequence])[0]
        return output_text
    else:
        return None

In [20]:
# In reponse Generation we have three type to generate out response 
# 1. Rule-Based in this we have to get user input and Check it in the intent file if found then it will be return to the as an
# answer by the bot if not found in the Rule-Based then switch to second type
# 2. Retrivel Based in this type we have to get input from the user and check it's simililarity with target_sequence as already
# define and if found or we will give it as an answer here we will also define a score from above that score we will get the result and 
# below that score we will then switch to third type
# 3. In this type a Generative reponse will be generate based on the trained model it might be RNN or transformer.

# Model Training Using (seq2seq) LSTM 

In [21]:
#converting data to training, validation, testing
from sklearn.model_selection import train_test_split
input_train, input_test, target_train, target_test = train_test_split(input_sequences, target_sequences, test_size=0.3, random_state=42)
input_train, input_val, target_train, target_val = train_test_split(input_train, target_train, test_size=0.2, random_state=42)

In [22]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Embedding

# Define the encoder
encoder_inputs = Input(shape=(max_length,))
encoder_embedding = Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=64)(encoder_inputs)
encoder_lstm = LSTM(128, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(encoder_embedding)
encoder_states = [state_h, state_c]

# Define the decoder
decoder_inputs = Input(shape=(max_length,))
decoder_embedding = Embedding(input_dim=len(tokenizer.word_index) + 1, output_dim=64)(decoder_inputs)
decoder_lstm = LSTM(128, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
decoder_dense = Dense(len(tokenizer.word_index) + 1, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

# Define the model that will turn
# encoder_input_data & decoder_input_data into decoder_target_data
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

In [23]:
# Prepare the target data shifted by one time step
target_train_shifted = np.zeros_like(target_train)
target_val_shifted = np.zeros_like(target_val)

target_train_shifted[:, :-1] = target_train[:, 1:]
target_val_shifted[:, :-1] = target_val[:, 1:]

# Train the model
history = model.fit(
    [input_train, target_train], target_train_shifted,
    batch_size=8,
    epochs=1,
    validation_data=([input_val, target_val], target_val_shifted)
)

[1m  128/15513[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m2:37:57[0m 616ms/step - accuracy: 0.7947 - loss: 6.1987

KeyboardInterrupt: 

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Make predictions
predictions = model.predict([input_test, target_test])

# Flatten the predictions and true values
preds_flat = np.argmax(predictions, axis=-1).flatten()
targets_flat = target_test.flatten()

# Calculate evaluation metrics
accuracy = accuracy_score(targets_flat, preds_flat)
precision = precision_score(targets_flat, preds_flat, average='weighted')
recall = recall_score(targets_flat, preds_flat, average='weighted')
f1 = f1_score(targets_flat, preds_flat, average='weighted')

print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')

In [None]:
encoder_model = Model(encoder_inputs, encoder_states)

decoder_state_input_h = Input(shape=(128,))
decoder_state_input_c = Input(shape=(128,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

decoder_lstm_outputs, state_h, state_c = decoder_lstm(
    decoder_embedding, initial_state=decoder_states_inputs
)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_lstm_outputs)

decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs] + decoder_states
)

In [None]:
def generate_response(input_text):
    input_seq = tokenizer.texts_to_sequences([input_text])
    input_seq = pad_sequences(input_seq, maxlen=max_length, padding='post')
    
    states_value = encoder_model.predict(input_seq)
    
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = tokenizer.word_index['<start>']
    
    generated_response = ''
    
    stop_condition = False
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)
        
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_word = tokenizer.index_word.get(sampled_token_index, '<unk>')
        
        if sampled_word == '<end>' or len(generated_response.split()) >= max_length:
            stop_condition = True
        else:
            generated_response += ' ' + sampled_word
        
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index
        
        states_value = [h, c]
    
    return generated_response.strip()

In [None]:
while True:
    user_input = input("You: ")
    if user_input.lower() == "bye":
        response, _ = handle_input(user_input)
        print("Bot:", response)
        break

    response, entities = handle_input(user_input)
    print("Bot:", response)
    print('Entities:', entities)
    print("Context:", get_context())
    print('\n'+'='*50+'\n')