### Let's import the required packages.

In [13]:
import tensorflow as tf
import matplotlib.pyplot as plt
import re
import numpy as np
import os
import sys
import pickle
from time import time
# Reference point for the "total time taken to run this code" report at the end.
start_time = time()

# Keras tokenizer: maps words to integer ids.
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow import keras
# Layers used by the models below.
from tensorflow.keras.layers import Embedding, LSTM, Dense,SimpleRNN, GRU, Dropout, Bidirectional
# Converts integer class ids to one-hot encoded vectors.
from tensorflow.keras.utils import to_categorical
# Nadam optimizer (Adam with Nesterov momentum).
from tensorflow.keras.optimizers import Nadam
# Pads/truncates id sequences to a common length.
from tensorflow.keras.preprocessing.sequence import pad_sequences
# Callbacks for saving the best model and stopping training early.
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
# Reloads a model saved by ModelCheckpoint.
from tensorflow.keras.models import load_model
# Progress bar for long-running loops inside the notebook.
from tqdm.notebook import tqdm


# Record the interpreter version used for this run.
print(sys.version)

### Read file data

In [14]:
# Source of the data: https://www.gutenberg.org/files/1497/1497-h/1497-h.htm#link2H_4_0004
# Read the book inside a context manager so the file handle is always closed,
# even if reading fails.
with open(".your-path-here/republic_clean.txt", mode="r", encoding="utf8") as file:
    # Collapse the file into one long string, one space between original lines.
    lines = ' '.join(file.read().splitlines())

# Treat a double dash as a word separator before punctuation is stripped.
lines = lines.replace('--', ' ')
# Remove punctuation: anything that is neither a word character nor whitespace.
lines = re.sub(r'[^\w\s]', '', lines)
# split() without arguments already collapses runs of whitespace, so no
# separate "squeeze repeated spaces" pass is needed.
lines = lines.split()

# Skip the first two tokens (the 'BOOK I' title). If the text does not start
# with that title, comment the next line out.
lines = lines[2:]
# Keep purely alphabetic words only.
lines = [word for word in lines if word.isalpha()]
# Hold out a 101-word slice: 100 words of context plus the word to predict.
test_lines = lines[550:651]
print(f"number of words in the text file after cleaning is : {len(lines)}")
print(f"number of words in the test file after cleaning is : {len(test_lines)}")

# Back to lower-cased, space-separated text for the tokenizer.
lines = ' '.join(lines).lower()
test_lines = ' '.join(test_lines).lower()

### Continue in preprocessing the text data

In [15]:
# Instantiate the tokenizer and learn the vocabulary from the cleaned corpus.
tokenizer = Tokenizer()
tokenizer.fit_on_texts([lines])
# Encode the whole corpus as one flat array of word ids.
IDs = np.array(tokenizer.texts_to_sequences([lines])[0])
# Store the ids compactly, but only use int16 when every id fits: casting a
# larger id to int16 would silently wrap around to a negative number.
largest_id = len(tokenizer.word_index)
IDs = IDs.astype('int16' if largest_id <= np.iinfo(np.int16).max else 'int32')
print(f"number of IDs is: {len(IDs)}")

### check the number of unique words

In [16]:
# One slot per known word, plus one extra slot for the unused index 0.
distinct_words = tokenizer.word_index
vocab_size = 1 + len(distinct_words)
print(f"number of unique words is: {vocab_size}")

### Organize the data into sequences of 51 word IDs each (50 input words followed by the word to predict).

In [17]:
# Slide a 51-id window over the corpus: the window that ends at position
# `end` covers IDs[end-50] .. IDs[end].
sequences = [IDs[end - 50:end + 1] for end in tqdm(range(50, len(IDs)))]
print('the total number of sequences is:',len(sequences))

# Stack the windows into a single 2-D array (one row per window).
sequences = np.array(sequences)
print(f"The sequences shape is {sequences.shape}")
print(f"The number of words in each sequence is: {sequences.shape[1]}")

## X will hold the 50 input words and y will hold the output words.

In [18]:
# Every row of `sequences` is the input ids followed by the target id, so the
# split is two array slices; no Python-level loop over every row is needed.
# Copies are taken so X and y do not share memory with `sequences`.
X = np.ascontiguousarray(sequences[:, :-1])
y = sequences[:, -1].copy()
print(f"The number of sequences in the input variable is: {X.shape[0]} and the number of input words in each sequence is {X.shape[1]}")
print(f"The number of sequences in the output variable is: {y.shape[0]} and each sequence has one output word\n")

print(f"First sequence is: {X[0]}, and it has {len(X[0])} words")
print(f"and the response is: {y[0]}")

In [19]:
# One-hot encode the target ids: each id becomes a vector with vocab_size entries.
# NOTE(review): the result has one row per sequence and vocab_size columns, which
# may be large for a big corpus -- confirm it fits in memory.
y = to_categorical(y, num_classes=vocab_size)

### Function to plot the performance

In [20]:
def _plot_metric(training_curve, validation_curve, name, ylabel, batch_size):
    """Show one 8x8 figure comparing a metric's training and validation curves."""
    plt.figure(figsize=(8, 8))
    plt.title(f"Training {name} & validation {name} with batch size {batch_size}")
    plt.xlabel('epoch')
    plt.ylabel(ylabel)
    # Epochs are numbered from 1 on the x axis.
    plt.plot(np.arange(1, len(validation_curve) + 1), validation_curve, label=f"validation {name}")
    plt.plot(np.arange(1, len(training_curve) + 1), training_curve, label=f"training {name}")
    plt.legend(loc="upper left")
    plt.show()


def plot_performance(history, batch_size=256):
    """
    Plot training vs. validation loss and training vs. validation accuracy.

    Two separate figures are shown, one for the loss and one for the accuracy.

    params:

    history: object returned by model.fit; its ``history`` dict must contain the
        keys 'loss', 'val_loss', 'categorical_accuracy' and
        'val_categorical_accuracy'.

    batch_size: batch size reported in the figure titles. Defaults to 256, the
        value used by the fit() calls in this notebook (the titles used to
        hard-code 200, which did not match the training runs).

    return:

    None
    """
    # Read every curve up front so a missing key fails before anything is drawn.
    records = history.history
    val_loss_per_epoch = records['val_loss']
    loss_per_epoch = records['loss']
    val_accuracy_per_epoch = records['val_categorical_accuracy']
    accuracy_per_epoch = records['categorical_accuracy']

    _plot_metric(loss_per_epoch, val_loss_per_epoch, 'loss', 'loss function', batch_size)
    _plot_metric(accuracy_per_epoch, val_accuracy_per_epoch, 'accuracy', 'accuracy', batch_size)

In [21]:
def predict_next_word_proba(recurrent_model, tokenizer, data, context_size=50):
    """
    Predict the word that follows a text and report the three likeliest candidates.

    The last word of `data` is treated as the true next word; the
    `context_size` words right before it are fed to the model.

    params:

    recurrent_model: object whose predict(sequence) returns a mapping with a
        'next_word' entry; the first row of that entry holds one probability
        per word id.

    tokenizer: fitted tokenizer providing texts_to_sequences() and word_index.

    data: space-separated text made of the context words followed by the
        word to predict.

    context_size: number of words fed to the model (default 50, the window
        length used to build the training sequences).

    return:

    A human-readable report (str) with the correct word and the three most
    probable next words together with their probabilities. A candidate whose
    id has no entry in tokenizer.word_index is reported as None.
    """
    words = data.split()
    # The last word is the ground truth.
    correct_word = words[-1]
    # Take the `context_size` words immediately before the target. The former
    # slice, [50:-1], only yielded 50 words for an input of exactly 101 words.
    context = ' '.join(words[-(context_size + 1):-1])

    # Encode the context with the already-fitted tokenizer.
    sequence = np.array(tokenizer.texts_to_sequences([context]))

    # Run the model once and reuse the result (it used to be evaluated twice).
    proba = np.asarray(recurrent_model.predict(sequence)['next_word'])[0]

    # Ids of the three largest probabilities, most likely first. Indices and
    # probabilities come from the same ordering, so they can never disagree.
    first_id, second_id, third_id = (int(i) for i in np.argsort(-proba, kind='stable')[:3])
    largest_proba = float(proba[first_id])
    second_largest_proba = float(proba[second_id])
    third_largest_proba = float(proba[third_id])

    # Reverse the word -> id mapping once instead of scanning it per candidate.
    index_to_word = {index: word for word, index in tokenizer.word_index.items()}
    next_word_1 = index_to_word.get(first_id)
    next_word_2 = index_to_word.get(second_id)
    next_word_3 = index_to_word.get(third_id)

    return (
        f"Correct word is [{correct_word}]\n\n"
        "the predict next word will be one of these three words (the higher probability the higher chance to be the next word)\n\n"
        f"the predicted next word is [{next_word_1}] with the largest probability {round(largest_proba,3)}\n"
        f"the next word could be [{next_word_2}] with the second largest probability {round(second_largest_proba,3)}\n"
        f"and it could be [{next_word_3}] with the third largest probability {round(third_largest_proba,3)}"
    )

### First trial: Simple recurrent language model

In [41]:
# Start the clock for the first experiment.
first_start_time = time()

# Reset the Keras global state before building a new model.
keras.backend.clear_session()

# Each sample is a window of 50 word ids.
in_text = keras.Input(batch_shape=(None, 50))
# The embedding layer learns a dense 50-dimensional vector per word id, so
# words used in similar contexts can end up with similar representations.
embedded = Embedding(vocab_size, 50)(in_text)

# Plain recurrent layer. No activation is passed, so the layer's default is used.
hidden = SimpleRNN(units=1000)(embedded)
hidden = Dropout(rate=0.2)(hidden)
hidden = Dense(128, activation='relu')(hidden)
# One softmax probability per vocabulary entry.
output = Dense(vocab_size, activation='softmax')(hidden)

rnn_model = keras.Model(inputs={'previous_words': in_text}, outputs={'next_word': output})
rnn_model.summary()
tf.keras.utils.plot_model(rnn_model, to_file='first model.png', show_shapes=True)

In [None]:
# Configure training. Loss, loss weight and metric are keyed by the model's
# output name, 'next_word'.
rnn_model.compile(
    optimizer=Nadam(learning_rate=0.001),
    loss={'next_word': "categorical_crossentropy"},
    loss_weights={'next_word': 1},
    metrics={'next_word': ["categorical_accuracy"]},
)

In [None]:
# Save the full model whenever the validation accuracy reaches a new best.
checkpoint = ModelCheckpoint(
    "simple_rnn_model.h5",
    monitor='val_categorical_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)
# Stop training when the validation accuracy has stopped improving (patience 5).
early_stopping = EarlyStopping(monitor='val_categorical_accuracy', patience=5)
# Log the device each operation is placed on, so GPU usage can be verified.
# This only enables logging; it does not select a device by itself.
tf.debugging.set_log_device_placement(True)
# Train the first model, holding out 20% of the samples for validation.
rnn_history = rnn_model.fit(
    x={'previous_words': X},
    y={'next_word': y},
    validation_split=0.2,
    epochs=50,
    batch_size=256,
    callbacks=[checkpoint, early_stopping],
)

### plotting first model performance

In [None]:
# Draw the loss and accuracy curves recorded while training the first model.
plot_performance(rnn_history)

### Start predicting the next word probability using the first model.

In [None]:
# Load the checkpoint with the best validation accuracy to predict the test data.
rnn_model = load_model('simple_rnn_model.h5')
# Show the test text without its final word, which is the word to predict.
# Dropping the last three characters (the former [:-3]) only hid that word
# when it happened to be exactly three letters long.
print('input data: ' + ' '.join(test_lines.split()[:-1]) + ' .....')
print('\n')
print(predict_next_word_proba(rnn_model, tokenizer, test_lines))

### Calculate the total time taken to train and test the first model.

In [None]:
first_end_time = time()
total_time = first_end_time - first_start_time
# Truncate to whole seconds before splitting, so the seconds field can never
# round up to "60" (59.6 s used to print as "00 minutes and 60 seconds").
minutes, seconds = divmod(int(total_time), 60)
result = f"{minutes:02d} minutes and {seconds:02d} seconds"
print(f"The total time taken to train and test the first model was: {result}")

### Second trial: an LSTM layer followed by a GRU layer to memorize the sequences.

In [40]:
# Start the clock for the second experiment.
second_start_time = time()

# Reset the Keras global state before building a new model.
keras.backend.clear_session()

# Each sample is a window of 50 word ids.
in_text = keras.Input(batch_shape=(None, 50))
# The embedding layer learns a dense 50-dimensional vector per word id, so
# words used in similar contexts can end up with similar representations.
embedded = Embedding(vocab_size, 50)(in_text)

# LSTM that returns its output at every time step, feeding the GRU stacked on top.
hidden = LSTM(units=1000, return_sequences=True)(embedded)
hidden = GRU(units=736)(hidden)
hidden = Dropout(rate=0.2)(hidden)
hidden = Dense(128, activation='relu')(hidden)
# One softmax probability per vocabulary entry.
output = Dense(vocab_size, activation='softmax')(hidden)

second_model = keras.Model(inputs={'previous_words': in_text}, outputs={'next_word': output})
second_model.summary()
tf.keras.utils.plot_model(second_model, to_file='second model.png', show_shapes=True)

In [None]:
# Configure training. Loss, loss weight and metric are keyed by the model's
# output name, 'next_word'.
second_model.compile(
    optimizer=Nadam(learning_rate=0.001),
    loss={'next_word': "categorical_crossentropy"},
    loss_weights={'next_word': 1},
    metrics={'next_word': ["categorical_accuracy"]},
)

In [None]:
# Save the full model whenever the validation accuracy reaches a new best.
checkpoint = ModelCheckpoint(
    "second_model.h5",
    monitor='val_categorical_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)
# Stop training when the validation accuracy has stopped improving (patience 5).
early_stopping = EarlyStopping(monitor='val_categorical_accuracy', patience=5)
# Log the device each operation is placed on, so GPU usage can be verified.
# This only enables logging; it does not select a device by itself.
tf.debugging.set_log_device_placement(True)
# Train the second model, holding out 20% of the samples for validation.
second_history = second_model.fit(
    x={'previous_words': X},
    y={'next_word': y},
    validation_split=0.2,
    epochs=50,
    batch_size=256,
    callbacks=[checkpoint, early_stopping],
)

### Plotting second model performance

In [None]:
# Draw the loss and accuracy curves recorded while training the second model.
plot_performance(second_history)

### Start predicting the next word probability using the second model.

In [None]:
# Load the checkpoint with the best validation accuracy to predict the test data.
second_model = load_model('second_model.h5')
# Show the test text without its final word, which is the word to predict.
# Dropping the last three characters (the former [:-3]) only hid that word
# when it happened to be exactly three letters long.
print('input data: ' + ' '.join(test_lines.split()[:-1]) + ' .....')
print('\n')
print(predict_next_word_proba(second_model, tokenizer, test_lines))

### Calculate the total time taken to train and test the second model.

In [None]:
second_end_time = time()
total_time = second_end_time - second_start_time
# Truncate to whole seconds before splitting, so the seconds field can never
# round up to "60" (59.6 s used to print as "00 minutes and 60 seconds").
minutes, seconds = divmod(int(total_time), 60)
result = f"{minutes:02d} minutes and {seconds:02d} seconds"
print(f"The total time taken to train and test the second model was: {result}")

### Third trial: a GRU layer followed by an LSTM layer

In [33]:
# Start the clock for the third experiment.
third_start_time = time()

# Reset the Keras global state before building a new model.
keras.backend.clear_session()

# Each sample is a window of 50 word ids.
in_text = keras.Input(batch_shape=(None, 50))
# The embedding layer learns a dense 50-dimensional vector per word id, so
# words used in similar contexts can end up with similar representations.
embedded = Embedding(vocab_size, 50)(in_text)

# GRU that returns its output at every time step, feeding the LSTM stacked on top.
hidden = GRU(units=1000, return_sequences=True)(embedded)
hidden = LSTM(units=736)(hidden)
hidden = Dropout(rate=0.2)(hidden)
hidden = Dense(128, activation='relu')(hidden)
# One softmax probability per vocabulary entry.
output = Dense(vocab_size, activation='softmax')(hidden)

third_model = keras.Model(inputs={'previous_words': in_text}, outputs={'next_word': output})
third_model.summary()
tf.keras.utils.plot_model(third_model, to_file='third model.png', show_shapes=True)

In [34]:
# Configure training. Loss, loss weight and metric are keyed by the model's
# output name, 'next_word'.
third_model.compile(
    optimizer=Nadam(learning_rate=0.001),
    loss={'next_word': "categorical_crossentropy"},
    loss_weights={'next_word': 1},
    metrics={'next_word': ["categorical_accuracy"]},
)

In [35]:
# Save the full model whenever the validation accuracy reaches a new best.
checkpoint = ModelCheckpoint(
    "third_model.h5",
    monitor='val_categorical_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)
# Stop training when the validation accuracy has stopped improving (patience 5).
early_stopping = EarlyStopping(monitor='val_categorical_accuracy', patience=5)
# Log the device each operation is placed on, so GPU usage can be verified.
# This only enables logging; it does not select a device by itself.
tf.debugging.set_log_device_placement(True)
# Train the third model, holding out 20% of the samples for validation.
third_history = third_model.fit(
    x={'previous_words': X},
    y={'next_word': y},
    validation_split=0.2,
    epochs=50,
    batch_size=256,
    callbacks=[checkpoint, early_stopping],
)

### Plotting third model performance

In [36]:
# Draw the loss and accuracy curves recorded while training the third model.
plot_performance(third_history)

### Start predicting the next word probability using the third model.

In [37]:
# Load the checkpoint with the best validation accuracy to predict the test data.
third_model = load_model('third_model.h5')
# Show the test text without its final word, which is the word to predict.
# Dropping the last three characters (the former [:-3]) only hid that word
# when it happened to be exactly three letters long.
print('input data: ' + ' '.join(test_lines.split()[:-1]) + ' .....')
print('\n')
print(predict_next_word_proba(third_model, tokenizer, test_lines))

### Calculate the total time taken to train and test the third model.

In [39]:
third_end_time = time()
total_time = third_end_time - third_start_time
# Truncate to whole seconds before splitting, so the seconds field can never
# round up to "60" (59.6 s used to print as "00 minutes and 60 seconds").
minutes, seconds = divmod(int(total_time), 60)
result = f"{minutes:02d} minutes and {seconds:02d} seconds"
print(f"The total time taken to train and test the third model was: {result}")

### Fourth trial: Bi-directional LSTM

In [28]:
# Start the clock for the fourth experiment.
fourth_start_time = time()

# Reset the Keras global state before building a new model.
keras.backend.clear_session()

# Each sample is a window of 50 word ids.
in_text = keras.Input(batch_shape=(None, 50))
# The embedding layer learns a dense 50-dimensional vector per word id, so
# words used in similar contexts can end up with similar representations.
embedded = Embedding(vocab_size, 50)(in_text)

# LSTM wrapped in Bidirectional.
hidden = Bidirectional(LSTM(units=1000))(embedded)
hidden = Dropout(rate=0.2)(hidden)
hidden = Dense(128, activation='relu')(hidden)
# One softmax probability per vocabulary entry.
output = Dense(vocab_size, activation='softmax')(hidden)

fourth_model = keras.Model(inputs={'previous_words': in_text}, outputs={'next_word': output})
fourth_model.summary()
tf.keras.utils.plot_model(fourth_model, to_file='fourth model.png', show_shapes=True)

In [29]:
# Configure training. Loss, loss weight and metric are keyed by the model's
# output name, 'next_word'.
fourth_model.compile(
    optimizer=Nadam(learning_rate=0.001),
    loss={'next_word': "categorical_crossentropy"},
    loss_weights={'next_word': 1},
    metrics={'next_word': ["categorical_accuracy"]},
)

In [30]:
# Save the full model whenever the validation accuracy reaches a new best.
checkpoint = ModelCheckpoint(
    "fourth_model.h5",
    monitor='val_categorical_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)
# Stop training when the validation accuracy has stopped improving (patience 5).
early_stopping = EarlyStopping(monitor='val_categorical_accuracy', patience=5)
# Log the device each operation is placed on, so GPU usage can be verified.
# This only enables logging; it does not select a device by itself.
tf.debugging.set_log_device_placement(True)
# Train the fourth model, holding out 20% of the samples for validation.
fourth_history = fourth_model.fit(
    x={'previous_words': X},
    y={'next_word': y},
    validation_split=0.2,
    epochs=50,
    batch_size=256,
    callbacks=[checkpoint, early_stopping],
)

### Plotting the fourth model performance

In [31]:
# Draw the loss and accuracy curves recorded while training the fourth model.
plot_performance(fourth_history)

### Start predicting the next word probability using the fourth model.

In [32]:
# Load the checkpoint with the best validation accuracy to predict the test data.
fourth_model = load_model('fourth_model.h5')
# Show the test text without its final word, which is the word to predict.
# Dropping the last three characters (the former [:-3]) only hid that word
# when it happened to be exactly three letters long.
print('input data: ' + ' '.join(test_lines.split()[:-1]) + ' .....')
print('\n')
print(predict_next_word_proba(fourth_model, tokenizer, test_lines))

### Calculate the total time taken to train and test the fourth model.

In [38]:
fourth_end_time = time()
total_time = fourth_end_time - fourth_start_time
# Truncate to whole seconds before splitting, so the seconds field can never
# round up to "60" (59.6 s used to print as "00 minutes and 60 seconds").
minutes, seconds = divmod(int(total_time), 60)
result = f"{minutes:02d} minutes and {seconds:02d} seconds"
print(f"The total time taken to train and test the fourth model was: {result}")

### Fifth trial: Bi-directional GRU to memorize the sequences.

In [22]:
# Start the clock for the fifth experiment.
fifth_start_time = time()

# Reset the Keras global state before building a new model.
keras.backend.clear_session()

# Each sample is a window of 50 word ids.
in_text = keras.Input(batch_shape=(None, 50))
# The embedding layer learns a dense 50-dimensional vector per word id, so
# words used in similar contexts can end up with similar representations.
embedded = Embedding(vocab_size, 50)(in_text)

# GRU wrapped in Bidirectional.
hidden = Bidirectional(GRU(units=1000))(embedded)
hidden = Dropout(rate=0.2)(hidden)
hidden = Dense(128, activation='relu')(hidden)
# One softmax probability per vocabulary entry.
output = Dense(vocab_size, activation='softmax')(hidden)

fifth_model = keras.Model(inputs={'previous_words': in_text}, outputs={'next_word': output})
fifth_model.summary()
tf.keras.utils.plot_model(fifth_model, to_file='fifth model.png', show_shapes=True)

In [23]:
# Configure training. Loss, loss weight and metric are keyed by the model's
# output name, 'next_word'.
fifth_model.compile(
    optimizer=Nadam(learning_rate=0.001),
    loss={'next_word': "categorical_crossentropy"},
    loss_weights={'next_word': 1},
    metrics={'next_word': ["categorical_accuracy"]},
)

### Starting training the fifth model.

In [24]:
# Save the full model whenever the validation accuracy reaches a new best.
checkpoint = ModelCheckpoint(
    "fifth_model.h5",
    monitor='val_categorical_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)
# Stop training when the validation accuracy has stopped improving (patience 7).
early_stopping = EarlyStopping(monitor='val_categorical_accuracy', patience=7)
# Log the device each operation is placed on, so GPU usage can be verified.
# This only enables logging; it does not select a device by itself.
tf.debugging.set_log_device_placement(True)
# Train the fifth model, holding out 20% of the samples for validation.
fifth_history = fifth_model.fit(
    x={'previous_words': X},
    y={'next_word': y},
    validation_split=0.2,
    epochs=50,
    batch_size=256,
    callbacks=[checkpoint, early_stopping],
)

### Plotting the fifth model performance.

In [None]:
# Draw the loss and accuracy curves recorded while training the fifth model.
plot_performance(fifth_history)

### Start predicting the next word probability using the fifth model.

In [26]:
# Load the checkpoint with the best validation accuracy to predict the test data.
fifth_model = load_model('fifth_model.h5')
# Show the test text without its final word, which is the word to predict.
# Dropping the last three characters (the former [:-3]) only hid that word
# when it happened to be exactly three letters long.
print('input data: ' + ' '.join(test_lines.split()[:-1]) + ' .....')
print('\n')
print(predict_next_word_proba(fifth_model, tokenizer, test_lines))

### Calculate the total time taken to train and test the fifth model.

In [27]:
fifth_end_time = time()
total_time = fifth_end_time - fifth_start_time
# Truncate to whole seconds before splitting, so the seconds field can never
# round up to "60" (59.6 s used to print as "00 minutes and 60 seconds").
minutes, seconds = divmod(int(total_time), 60)
result = f"{minutes:02d} minutes and {seconds:02d} seconds"
print(f"The total time taken to train and test the fifth model was: {result}")

### Last trial: Bi-directional GRU connected with Bi-directional LSTM to memorize the sequences. 

In [25]:
# Start the clock for the last experiment.
last_start_time = time()

# Reset the Keras global state before building a new model.
keras.backend.clear_session()

# Each sample is a window of 50 word ids.
in_text = keras.Input(batch_shape=(None, 50))
# The embedding layer learns a dense 50-dimensional vector per word id, so
# words used in similar contexts can end up with similar representations.
embedded = Embedding(vocab_size, 50)(in_text)

# Bidirectional GRU that returns its output at every time step, feeding the
# bidirectional LSTM stacked on top.
hidden = Bidirectional(GRU(units=1000, return_sequences=True))(embedded)
hidden = Bidirectional(LSTM(units=736))(hidden)
hidden = Dropout(rate=0.2)(hidden)
hidden = Dense(128, activation='relu')(hidden)
# One softmax probability per vocabulary entry.
output = Dense(vocab_size, activation='softmax')(hidden)

last_model = keras.Model(inputs={'previous_words': in_text}, outputs={'next_word': output})
last_model.summary()
tf.keras.utils.plot_model(last_model, to_file='last model.png', show_shapes=True)

In [None]:
# Configure training. Loss, loss weight and metric are keyed by the model's
# output name, 'next_word'.
last_model.compile(
    optimizer=Nadam(learning_rate=0.001),
    loss={'next_word': "categorical_crossentropy"},
    loss_weights={'next_word': 1},
    metrics={'next_word': ["categorical_accuracy"]},
)

### Start training the last model.

In [None]:
# Save the full model whenever the validation accuracy reaches a new best.
checkpoint = ModelCheckpoint(
    "last_model.h5",
    monitor='val_categorical_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=False,
)
# Stop training when the validation accuracy has stopped improving (patience 7).
early_stopping = EarlyStopping(monitor='val_categorical_accuracy', patience=7)
# Log the device each operation is placed on, so GPU usage can be verified.
# This only enables logging; it does not select a device by itself.
tf.debugging.set_log_device_placement(True)
# Train the last model, holding out 20% of the samples for validation.
last_history = last_model.fit(
    x={'previous_words': X},
    y={'next_word': y},
    validation_split=0.2,
    epochs=50,
    batch_size=256,
    callbacks=[checkpoint, early_stopping],
)

### Plotting the last model performance.

In [None]:
# Draw the loss and accuracy curves recorded while training the last model.
plot_performance(last_history)

### Start predicting the next word probability using the last model.

In [None]:
# Load the checkpoint with the best validation accuracy to predict the test data.
last_model = load_model('last_model.h5')
# Show the test text without its final word, which is the word to predict.
# Dropping the last three characters (the former [:-3]) only hid that word
# when it happened to be exactly three letters long.
print('input data: ' + ' '.join(test_lines.split()[:-1]) + ' .....')
print('\n')
print(predict_next_word_proba(last_model, tokenizer, test_lines))

### Calculate the total time taken to train and test the last model.

In [None]:
last_end_time = time()
total_time = last_end_time - last_start_time
# Truncate to whole seconds before splitting, so the seconds field can never
# round up to "60" (59.6 s used to print as "00 minutes and 60 seconds").
minutes, seconds = divmod(int(total_time), 60)
result = f"{minutes:02d} minutes and {seconds:02d} seconds"
print(f"The total time taken to train and test the last model was: {result}")

### Calculate the total time taken to run the whole code.

In [None]:
end_time = time()
total_time = end_time - start_time
# Truncate to whole seconds before splitting, so the seconds field can never
# round up to "60" (59.6 s used to print as "00 minutes and 60 seconds").
minutes, seconds = divmod(int(total_time), 60)
result = f"{minutes:02d} minutes and {seconds:02d} seconds"
print(f"The total time taken to run this code was: {result}")