In [30]:
import os
import pickle
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Concatenate, Bidirectional
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import re
import unicodedata

In [45]:
!python --version
print("TensorFlow version:", tf.__version__)
print("TensorFlow Keras version:", tf.keras.__version__)

Python 3.10.14
TensorFlow version: 2.10.0
TensorFlow Keras version: 2.10.0


### Load the Tokenizer

In [32]:
# Get the current working directory
cwd = os.getcwd()

# Join the paths
tokenizer_path = os.path.join(cwd, 'data', 'tokenizer_dd_tf210.pickle')
# Load tokenizer from pickle
with open(tokenizer_path, 'rb') as handle:
    tokenizer = pickle.load(handle)

### Load the Model

In [33]:
# keras_file_path = os.path.join(cwd, 'data', 's2s_model_dd_tf210.keras')
# model = tf.keras.models.load_model(keras_file_path, custom_objects=None, compile=True)
# model.summary()

### Save the Weights

In [34]:
# weights_path = os.path.join(cwd, 'data', 's2s_model_dd_tf210_weights_app.h5')
# model.save_weights(weights_path)

### Define the Model

In [35]:
# Define model parameters
latent_dim = 200
num_encoder_tokens = len(tokenizer.word_index) + 1
num_decoder_tokens = len(tokenizer.word_index) + 1
learning_rate = 0.001

# Define encoder
encoder_inputs = Input(shape=(None,))
encoder_embedding = Embedding(num_encoder_tokens, latent_dim, mask_zero=True)(encoder_inputs)
# Make the LSTM layer bidirectional
encoder_lstm = Bidirectional(LSTM(latent_dim, return_state=True, dropout=0.2)) #, dropout=0.2, kernel_regularizer=l2(0.01)))  # , recurrent_dropout=0.2) Removed recurrent_dropout for cuDNN compatibility
encoder_outputs, forward_h, forward_c, backward_h, backward_c = encoder_lstm(encoder_embedding)
state_h = Concatenate()([forward_h, backward_h])
state_c = Concatenate()([forward_c, backward_c])
encoder_states = [state_h, state_c]

# Update latent_dim to match the concatenated states
latent_dim *= 2

# Define decoder
decoder_inputs = Input(shape=(None,))
decoder_embedding = Embedding(num_decoder_tokens, latent_dim, mask_zero=True)
decoder_embedded = decoder_embedding(decoder_inputs)
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True, dropout=0.2) #, dropout=0.2, kernel_regularizer=l2(0.01))  # , recurrent_dropout=0.2) Removed recurrent_dropout for cuDNN compatibility
decoder_outputs, _, _ = decoder_lstm(decoder_embedded, initial_state=encoder_states)
decoder_dense = Dense(num_decoder_tokens, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

# Define the model
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# Compile the model
model.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy')

### Load Weights

In [36]:
# weights_path = os.path.join(cwd, 'data', 's2s_model_dd_tf210_weights_app.h5')
weights_path = os.path.join(cwd, 'data', 's2s_model_dd_tf210_weights.h5')
model.load_weights(weights_path)

### Define models for inference

In [37]:
# Define encoder model for inference
encoder_model = Model(encoder_inputs, encoder_states)

# Define decoder model for inference
decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

decoder_inference_inputs = Input(shape=(None,))
decoder_embedding_inference = decoder_embedding(decoder_inference_inputs)
decoder_outputs, state_h, state_c = decoder_lstm(
    decoder_embedding_inference, initial_state=decoder_states_inputs
)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)

decoder_model = Model(
    [decoder_inference_inputs] + decoder_states_inputs,
    [decoder_outputs] + decoder_states
)

# Save token index mappings
target_token_index = tokenizer.word_index
reverse_target_token_index = {v: k for k, v in target_token_index.items()}

### Define preprocess functions

In [38]:
contractions = {
    "’": "'",
    "‘": "'",
    "“": '"',
    "”": '"',
    "can't": "cannot",
    "won't": "will not",
    "n't": " not",
    "i'm": "i am",
    "i'd": "i would",
    "thats's": "that is",
    "it's": "it is",
    "he's": "he is",
    "she's": "she is",
    "you're": "you are",
    "they're": "they are",
    "we're": "we are",
    "i've": "i have",
    "you've": "you have",
    "they've": "they have",
    "we've": "we have",
    "isn't": "is not",
    "aren't": "are not",
    "wasn't": "was not",
    "weren't": "were not",
    "doesn't": "does not",
    "don't": "do not",
    "didn't": "did not",
    "hasn't": "has not",
    "haven't": "have not",
    "hadn't": "had not",
    "i'll": "i will",
    "you'll": "you will",
    "he'll": "he will",
    "she'll": "she will",
    "we'll": "we will",
    "they'll": "they will",
    "wouldn't": "would not",
    "shouldn't": "should not",
    "couldn't": "could not",
    "mightn't": "might not",
    "mustn't": "must not",
    "she'd": "she would",
    "he'd": "he would",
    "they'd": "they would",
    "we'd": "we would",
    "that'll": "that will",
    "there'll": "there will",
    "who'll": "who will",
    "it'll": "it will",
    "that'd": "that would",
    "there'd": "there would",
    "who'd": "who would",
    "when's": "when is",
    "where's": "where is",
    "why's": "why is",
    "how's": "how is",
    "y'all": "you all",
    "let's": "let us",
    "ma'am": "madam",
    "o'clock": "of the clock",
    "ain't": "is not",
    "could've": "could have",
    "should've": "should have",
    "would've": "would have",
    "might've": "might have",
    "must've": "must have",
    "who've": "who have",
    "oughtn't": "ought not",
    "daren't": "dare not",
    "needn't": "need not",
    "what's": "what is",
    "usedn't": "used not"
}

def normalize_text(text: str) -> str:
    # Normalize Unicode string to NFKD form, remove non-ASCII characters, and then decode it back to a UTF-8 string
    text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8')
    # Convert to lowercase
    text = text.lower()
    # Remove spaces around apostrophes
    text = re.sub(r"\s*'\s*", "'", text)
    # Add a space before and after any punctuation mark (., !, or ?)
    text = re.sub(r"\s*([.!?])\s*", r" \1 ", text)
    # Correct contractions
    for contraction, replacement in contractions.items():
        text = re.sub(re.escape(contraction), replacement, text)
    # Replace any sequence of characters that are not letters, basic punctuation
    text = re.sub(r"[^a-z' ]", ' ', text) # re.sub(r"[^a-z.,'!? ]", ' ', text)
    # Replace any sequence of whitespace characters with a single space and remove leading and trailing whitespace
    text = re.sub(r"\s+", ' ', text).strip()
    return text

def remove_names(text: str) -> str:
    # Use spaCy to detect and remove names from the text
    doc = nlp(text)
    filtered_text = ' '.join([token.text for token in doc if token.ent_type_ != 'PERSON']) # Takes really long time, exlude from chatbot input preprocessing
    return filtered_text

def preprocess_text(text: str) -> str:
    # Normalize text
    text = normalize_text(text)
    # Remove names using spaCy's NER
    if initial_preprocessing:
        text = remove_names(text)
    # # Remove punctuation
    # text = text.translate(str.maketrans('', '', string.punctuation))
    # Remove stopwords and tokenize
    # words = word_tokenize(text) # More intelligent splitting
    # filtered_words = [word for word in words if word not in stop_words]
    # # Lemmatize words
    # lemmatized_words = [lemmatizer.lemmatize(word) for word in filtered_words]
    # Add <SOS> and <EOS> tokens, and join the list into a single string
    # return ' '.join(['sofs'] + lemmatized_words + ['eofs'])
        # Trim the text to the desired length
    words = text.split()[:max_length]
    trimmed_text = ' '.join(words)  # Consider to remove trimming, if you want pad later on max length
    return trimmed_text

# def generate_response(input_seq: np.ndarray, max_decoder_seq_length: int) -> str:
#     states_value = encoder_model.predict(input_seq, verbose=0)
#     target_seq = np.ones((1, 1)) # Was np.zeros((1, 1))
#     target_seq[0, 0] = tokenizer.word_index['<START>']
#     stop_condition = False
#     decoded_sentence = ''
#     while not stop_condition:
#         output_tokens, h, c = decoder_model.predict([target_seq] + states_value, verbose=0)
#         sampled_token_index = np.argmax(output_tokens[0, -1, :])
#         sampled_char = reverse_target_token_index[sampled_token_index]
#         decoded_sentence += ' ' + sampled_char
#         if (sampled_char == '<END>' or len(decoded_sentence.split()) > max_decoder_seq_length):
#             stop_condition = True
#         target_seq = np.ones((1, 1)) # Was np.zeros((1, 1))
#         target_seq[0, 0] = sampled_token_index
#         states_value = [h, c]
#     return decoded_sentence.strip().replace('<START>', '').replace('<END>', '').strip()

### Generate response

In [39]:
# Function to generate responses
def generate_response(input_seq: np.ndarray, max_decoder_seq_length: int) -> str:
    # Encode the input sequence to get the internal states
    states_value = encoder_model.predict(input_seq, verbose=0)

    # Generate empty target sequence of length 1 with only the start token
    target_seq = np.ones((1, 1)) # Was np.zeros((1, 1))
    target_seq[0, 0] = tokenizer.word_index['<START>']

    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value, verbose=0)

        # Sample a token and add the corresponding character to the decoded sentence
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_token_index[sampled_token_index]
        decoded_sentence += ' ' + sampled_char

        # Exit condition: either hit max length or find stop token
        if (sampled_char == '<END>' or len(decoded_sentence.split()) > max_decoder_seq_length):
            stop_condition = True

        # Update the target sequence (of length 1)
        target_seq = np.ones((1, 1)) # Was np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index

        # Update states
        states_value = [h, c]

    return decoded_sentence.strip().replace('<START>', '').replace('<END>', '').strip()

In [40]:
def beam_search_decode(input_seq, beam_width=3, max_decoder_seq_length=15):
    # Encode the input sequence to get the internal states
    states_value = encoder_model.predict(input_seq, verbose=0)

    # Initialize the beams
    start_token_index = tokenizer.word_index['<START>']
    end_token_index = tokenizer.word_index['<END>']
    beams = [(np.array([[start_token_index]]), states_value, 0.0)]  # (sequence, states, cumulative_probability)

    for _ in range(max_decoder_seq_length):
        all_candidates = []
        for seq, states, score in beams:
            if seq[0, -1] == end_token_index:
                # If the beam already ended with the end token, add it to the candidates
                all_candidates.append((seq, states, score))
                continue
            
            # Predict the next token
            output_tokens, h, c = decoder_model.predict([seq[:, -1:]] + states, verbose=0)
            # Get the top beam_width predictions
            top_k_indices = np.argsort(output_tokens[0, -1, :])[-beam_width:]
            
            # Create new beams for each prediction
            for idx in top_k_indices:
                new_seq = np.hstack([seq, np.array([[idx]])])
                new_score = score + np.log(output_tokens[0, -1, idx])  # Use log to prevent underflow
                all_candidates.append((new_seq, [h, c], new_score))
        
        # Select the top beam_width beams
        beams = sorted(all_candidates, key=lambda x: x[2], reverse=True)[:beam_width]

        # Check if all beams end with the end token
        if all(seq[0, -1] == end_token_index for seq, _, _ in beams):
            break

    # Choose the best beam (highest score)
    best_seq, _, _ = beams[0]
    decoded_sentence = ' '.join([reverse_target_token_index[idx] for idx in best_seq[0] if idx != start_token_index and idx != end_token_index])
    return decoded_sentence


In [44]:
# initial_preprocessing = False
# max_length = 15

def chat():
    print("Chatbot is ready! Type 'exit' to end the conversation.")
    while True:
        user_input = input("You: ")
        if user_input.lower() == 'exit':
            print("Argama: Goodbye!")
            print("Beamara: Goodbye!")
            break
        input_text = preprocess_text(user_input)
        input_sequence = [tokenizer.texts_to_sequences([input_text])[0]]
        padded_input_sequence = pad_sequences(input_sequence, maxlen=max_length, padding='pre', truncating='post')
        response = generate_response(np.array(padded_input_sequence), max_length)
        response_2 = beam_search_decode(np.array(padded_input_sequence), beam_width=3, max_decoder_seq_length=max_length)
        print(f"Argama: {response}")
        print(f"Beamara: {response}")

if __name__ == "__main__":
    chat()

Chatbot is ready! Type 'exit' to end the conversation.


You:  hello bot


Argama: hello is this the way what is the fare
Beamara: hello is this the way what is the fare


You:  maybe we can discuss some issue?


Argama: yes we have a lot of vegetables
Beamara: yes we have a lot of vegetables


You:  are you vegetarian?


Argama: i am not sure i am not sure
Beamara: i am not sure i am not sure


You:  have you finished at least school


Argama: no i am not sure
Beamara: no i am not sure


You:  what do you like?


Argama: i like the novels of dickens very much
Beamara: i like the novels of dickens very much


You:  do you like sports?


Argama: yes i have a lot of friends
Beamara: yes i have a lot of friends


You:  name your friends


Argama: yes
Beamara: yes


You:  what is the weather outside?


Argama: it is a fruit with a <OOV> of the city
Beamara: it is a fruit with a <OOV> of the city


You:  is it cloudy?


Argama: no not at all
Beamara: no not at all


You:  it is sunny?


Argama: i am sorry sir i am afraid i can not i am afraid i can not
Beamara: i am sorry sir i am afraid i can not i am afraid i can not


You:  would you like to go with me in cinema


Argama: sure what do you want to go
Beamara: sure what do you want to go


You:  movie to watch


Argama: i am sorry to hear that
Beamara: i am sorry to hear that


You:  read books


Argama: and that 's the leading cause of lung cancer and <OOV> <OOV> speakers <OOV> <OOV>
Beamara: and that 's the leading cause of lung cancer and <OOV> <OOV> speakers <OOV> <OOV>


You:  exit


Argama: Goodbye!
Beamara: Goodbye!
