In [30]:
import os ## interaction with operating system
import numpy as np
import pandas as pd
import tensorflow as tf ##function that build models
from tensorflow.keras import layers , activations , models , preprocessing , utils

In [32]:
data_path='./dataset.txt'

In [33]:
input_texts = []  
target_texts = [] 

with open(data_path) as f:  # Opens the file located at data_path to read
    lines = f.read().split('\n')  # Reads the entire file and splits it into lines

for line in lines[: min(600, len(lines) - 1)]:  # Iterates through the first 600 lines or all lines except the last one
    split_line = line.split('\t')  # Splits the line into parts using the tab  as a splitter
    if len(split_line) >= 2:  # Checks if the line contains at least two words to avoid index errors
        input_text = split_line[0]  # Extracts the first part(question) as the input text
        target_text = split_line[1]  # Extracts the second part(answr) as the target text
        input_texts.append(input_text)  # Appends(add) the input text to the list of input texts
        target_texts.append(target_text)  # Appends(add) the target text to the list of target texts
    else:
        print(f"Skipping malformed line: {line}")  # warning for lines that don't have at least two words


In [34]:
print('type of input_text', type(input_text))  
#input_text is a string because it represents one input line

print('type of target_texts', type(target_texts))  
#target_texts is a list because it collects multiple target lines


type of input_text <class 'str'>
type of target_texts <class 'list'>


In [35]:
# Converting the lists `input_texts` and `target_texts` into a Pandas 2d DataFrame
zippedList = list(zip(input_texts, target_texts))  
# Combines the elements of `input_texts` and `target_texts` into pairs (tuples) to create a list of tuples

lines = pd.DataFrame(zippedList, columns=['input', 'output'])  
# Creates a 2d DataFrame with the paired data, adding column names 'input' and 'output' to represent the input and target text

lines.head()  



Unnamed: 0,input,output
0,What is my name,Your name is Amr Emara
1,How old am I,You are twenty years old
2,When was I born,You were born in 2004
3,What do I study,You study Artificial Intelligence
4,Where do I study,You study at the Arab Academy


In [36]:
lines.shape

(576, 2)

## Preparing input data for the Encoder

In [37]:
input_lines = list()  
# Initializes an empty list to store the input text lines

for line in lines.input:  
    input_lines.append(line)  
    # Iterates through the 'input' column of the DataFrame and add each line to input_lines

tokenizer = preprocessing.text.Tokenizer()  
# Creates a tokenizer  to convert text into numerical values

tokenizer.fit_on_texts(input_lines)  
#fits the tokenizer on the input text, creating a list of all unique words and assigning each word a number

tokenized_input_lines = tokenizer.texts_to_sequences(input_lines)  
# Converts each text line into a numeric values based on the tokenizer's word index

length_list = list()  
# Initializes a list to store the lengths of each tokenized sequence

for token_seq in tokenized_input_lines:  
    length_list.append(len(token_seq))  
    # adds the length of each tokenized sequence to `length_list`

max_input_length = np.array(length_list).max()  
# Finds the maximum sequence length from the list of lengths

print('Input max length is {}'.format(max_input_length))  
# Prints the maximum length of the input sequences

padded_input_lines = preprocessing.sequence.pad_sequences(tokenized_input_lines, maxlen=max_input_length, padding='post')  
# Pads the tokenized input sequences to ensure all sequences are of the same length (using post-padding)

encoder_input_data = np.array(padded_input_lines)  
# Converts the padded sequences into a Numpy array

print('Encoder input data shape -> {}'.format(encoder_input_data.shape))  
# Prints the shape of the encoder input data to verify its dimensions

input_word_dict = tokenizer.word_index  
# Retrieves the word-to-index mapping created by the tokenizer

num_input_tokens = len(input_word_dict) + 1  
# (index zero is reserved for a purpose ex padding)Calculates the total number of unique tokens, adding 1 to account for zero-based indexing

print('Number of Input tokens = {}'.format(num_input_tokens))  
# Prints the total number of unique tokens in the input text


Input max length is 22
Encoder input data shape -> (576, 22)
Number of Input tokens = 530


In [39]:
encoder_input_data

array([[  3,   2,  86, ...,   0,   0,   0],
       [ 13, 211,  87, ...,   0,   0,   0],
       [ 32,  41,  15, ...,   0,   0,   0],
       ...,
       [510, 511,   9, ...,   0,   0,   0],
       [  9, 209, 210, ...,   0,   0,   0],
       [522, 194,   5, ...,   0,   0,   0]], dtype=int32)

## Preparing input data for the Decoder


In [40]:
output_lines = list()  
# Initializes an empty list to store the modified output text lines

for line in lines.output:  
    output_lines.append('<START> ' + line + ' <END>')  
    #<START> token as the initial input. the decoder generates tokens step-by-step until the <END> token is predicted

tokenizer = preprocessing.text.Tokenizer()  
# Creates a tokenizer object to convert the output text into numerical values

tokenizer.fit_on_texts(output_lines)  
# Fits the tokenizer on the output text lines and creates vocab then adding an integer index to each word
 
tokenized_output_lines = tokenizer.texts_to_sequences(output_lines)  
# Converts each output text line into a sequence of numeric numbers

length_list = list()  
# Initializes a list to store the lengths of each tokenized output sequence

for token_seq in tokenized_output_lines:  
    length_list.append(len(token_seq))  
    # adds the length of each tokenized sequence to length_list

max_output_length = np.array(length_list).max()  
# gets the maximum sequence length from the list of lengths

print('Output max length is {}'.format(max_output_length))  

padded_output_lines = preprocessing.sequence.pad_sequences(tokenized_output_lines, maxlen=max_output_length, padding='post')  
# Pads the tokenized output sequences to make all the sequences are the same length (using post-padding)

decoder_input_data = np.array(padded_output_lines)  
# Converts the padded sequences into a NumPy array

print('Decoder input data shape -> {}'.format(decoder_input_data.shape))  
# Prints the shape of the decoder input data to verify its dimensions

output_word_dict = tokenizer.word_index  
# gets the word-to-index mapping created by the tokenizer for the output text

num_output_tokens = len(output_word_dict) + 1  
# Calculates the total number of unique tokens in the output text, adding 1 to account for zero-based indexing

print('Number of Output tokens = {}'.format(num_output_tokens))  
# Prints the total number of unique tokens in the output text


Output max length is 74
Decoder input data shape -> (576, 74)
Number of Output tokens = 1707


## Preparing target data for the Decoder 

In [22]:
decoder_target_data = list()  
# Initializes an empty list to store the target data for the decoder

for token_seq in tokenized_output_lines:  
    decoder_target_data.append(token_seq[1:])  
   # creates the decoder target data by removing the first token (<START>) from each tokenized output sequence
    # The target sequence starts from the second token as the model predicts the next word at each step

padded_output_lines = preprocessing.sequence.pad_sequences(decoder_target_data, maxlen=max_output_length, padding='post')  
# Pads the target sequences to ensure they are the same length as max_output_length (using post-padding)

onehot_output_lines = utils.to_categorical(padded_output_lines, num_output_tokens)  
# Converts the padded target sequences into one-hot encoded format, where each word is represented as a vector
# with a 1 in the position corresponding to the word's index and 0 elsewhere.
# The `num_output_tokens' tells the size of the one-hot vector.

decoder_target_data = np.array(onehot_output_lines)  
# Converts the one-hot encoded target data into a NumPy array.

print('Decoder target data shape -> {}'.format(decoder_target_data.shape))  



Decoder target data shape -> (576, 74, 1707)


## Defining the Model


In [41]:
# Encoder Inputs
encoder_inputs = tf.keras.layers.Input(shape=(None,))  
# Defines the input layer for the encoder where the input shape is a sequence of variable length (None ashan yt3amel maa kza length)

# Encoder Embedding
encoder_embedding = tf.keras.layers.Embedding(num_input_tokens, 256, mask_zero=True)(encoder_inputs)  
# Embedding layer maps input tokens (num_input_tokens) into dense 256-dimensional(embedding dim) vectors
# mask_zero=True allows the model to ignore padding tokens during training

# Encoder LSTM
encoder_outputs, state_h, state_c = tf.keras.layers.LSTM(
    256, return_state=True, recurrent_dropout=0.2, dropout=0.2
)(encoder_embedding)
 
# LSTM layer processes the embedding sequences and outputs the hidden state and the cell state
# recurrent_dropout=0.2 adds dropout to the recurrent connections, and dropout=0.2, 20% of the input features are randomly set to zero during training
# return_state=True ensures the final hidden and cell states are returned

encoder_states = [state_h, state_c]  
# Stores the encoder's hidden and cell states for initializing the decoder

# Decoder Inputs
decoder_inputs = tf.keras.layers.Input(shape=(None,))  
# Defines the input layer for the decoder, where the input shape is a sequence of variable length (None)

# Decoder Embedding
decoder_embedding = tf.keras.layers.Embedding(num_output_tokens, 256, mask_zero=True)(decoder_inputs)  
# Embedding layer maps output tokens (num_output_tokens) into dense 256-dimensional vectors.

# Decoder LSTM
decoder_lstm = tf.keras.layers.LSTM(
    256, return_state=True, return_sequences=True, recurrent_dropout=0.2, dropout=0.2
)  
# LSTM layer processes the decoder's embedding sequences.
# return_sequences=True ensures the LSTM outputs a sequence for each time step
# return_state=True ensures the final hidden and cell states are returned

decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)    
# The decoder LSTM is initialized with the encoder's final hidden and cell states

# Dense Layer
decoder_dense = tf.keras.layers.Dense(num_output_tokens, activation=tf.keras.activations.softmax)  
# Fully connected layer applies a softmax activation to predict the next token for each time step

output = decoder_dense(decoder_outputs)  
# Applies the dense layer to the decoder outputs, producing the final probabilities for each token

# Define the Model
model = tf.keras.models.Model([encoder_inputs, decoder_inputs], output)  
# Creates the sequence-to-sequence model with the encoder and decoder as inputs and the token probabilities as output

# Compile the Model
model.compile(optimizer=tf.keras.optimizers.Adam(), loss='categorical_crossentropy')  
# Compiles the model using the Adam optimizer(adaptive learning and mostly used) and used for multi-class prediction.

# Model Summary
model.summary()  



In [42]:
history = model.fit(
    [encoder_input_data, decoder_input_data],  # Input data for the encoder and decoder
    decoder_target_data,  # Target output for the decoder, one-hot encoded
    batch_size=124,  # number of samples per batch for gradient updates
    epochs=500# number of epochs
)
# Trains the model using the provided data, updating weights to minimize the loss function
# The `history` object stores training metrics, such as loss, for each epoch

model.save('model.h5')  
# Saves the trained model to a file named 'model.h5'
# The saved model includes the architecture, weights, and optimizer state, allowing it to be reloaded when used


Epoch 1/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 384ms/step - loss: 7.4368
Epoch 2/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 406ms/step - loss: 7.3280
Epoch 3/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 373ms/step - loss: 6.5430
Epoch 4/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 361ms/step - loss: 5.8277
Epoch 5/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 359ms/step - loss: 5.7628
Epoch 6/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 371ms/step - loss: 5.6999
Epoch 7/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 368ms/step - loss: 5.5920
Epoch 8/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 401ms/step - loss: 5.5417
Epoch 9/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 369ms/step - loss: 5.4931
Epoch 10/500
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 373ms/step - loss: 5.4467



## Inference models

In [43]:
def make_inference_models():
    # Create the encoder model for inference
    encoder_model = tf.keras.models.Model(encoder_inputs, encoder_states)
    # Extracts the input layer and outputs the encoder states (hidden and cell states)

    # Create input layers for the decoder's hidden and cell states during inference
    decoder_state_input_h = tf.keras.layers.Input(shape=(256,))
    decoder_state_input_c = tf.keras.layers.Input(shape=(256,))
    # These inputs correspond to the initial states passed to the decoder during inference

    decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
    # Groups the hidden and cell states into a list for easier handling

    # Use the existing decoder LSTM layer, but with the new initial states
    decoder_outputs, state_h, state_c = decoder_lstm(
        decoder_embedding, initial_state=decoder_states_inputs
    )
    # Processes the decoder embedding with the provided states and returns the next output and states

    decoder_states = [state_h, state_c]
    # Groups the new hidden and cell states into a list

    decoder_outputs = decoder_dense(decoder_outputs)
    # Applies the dense (softmax) layer to the LSTM output to generate token probabilities

    # Define the decoder model for inference
    decoder_model = tf.keras.models.Model(
        [decoder_inputs] + decoder_states_inputs,  # Inputs tokens and previous states
        [decoder_outputs] + decoder_states  # Outputs predicted tokens and updated states
    )

    # Return the inference models
    return encoder_model, decoder_model


In [44]:
def str_to_tokens(sentence: str):
   
    words = sentence.lower().split()  
    # Converts the input sentence to lwercase and splits it into words

    tokens_list = list()  
    # Initializes an empty list to store the tokenized words

    for word in words:  
        tokens_list.append(input_word_dict[word])  
        # Converts each word to its corresponding integer token using the input word dictionary

    return preprocessing.sequence.pad_sequences(
        [tokens_list], maxlen=max_input_length, padding='post'
    )  
    # Pads the tokenized sequence to the `max_input_length` with zeros at the end, make sure same sequence length


In [51]:
# Load inference models
enc_model, dec_model = make_inference_models()

# Interactive loop for sequence-to-sequence translation
for epoch in range(encoder_input_data.shape[0]):
    # Encode user input
    states_values = enc_model.predict(str_to_tokens(input('User: ')), verbose=False)
    
    # Initialize target sequence with the <START> token
    empty_target_seq = np.zeros((1, 1))
    empty_target_seq[0, 0] = output_word_dict['start']
    
    stop_condition = False  # Flag to indicate when to stop decoding
    decoded_translation = ''  # Initialize the translation string
    
    while not stop_condition:
        # Predict the next token and states using the decoder
        dec_outputs, h, c = dec_model.predict([empty_target_seq] + states_values, verbose=False)
        
        # Get the token with the highest probability (argmax)
        sampled_word_index = np.argmax(dec_outputs[0, -1, :])
        
        # Convert token index to word
        sampled_word = None
        for word, index in output_word_dict.items():
            if sampled_word_index == index:
                decoded_translation += ' {}'.format(word)  # Append the word to the translation
                sampled_word = word
        
        # Stop if the <END> token is generated or the translation exceeds max length
        if sampled_word == 'end' or len(decoded_translation.split()) > max_output_length:
            stop_condition = True
        
        # Update the target sequence for the next time step
        empty_target_seq = np.zeros((1, 1))
        empty_target_seq[0, 0] = sampled_word_index
        
        # Update the decoder states
        states_values = [h, c]
    
    # Output the final translation
    print("Bot:" + decoded_translation.replace(' end', ''))
    print()


KeyboardInterrupt: Interrupted by user