<h1><strong><u>RNN Text Model</u></strong></h1>

In [12]:
import numpy as np
import tensorflow as tf
from nltk.tokenize import word_tokenize
import re
from keras import Input, activations
from keras.callbacks import ModelCheckpoint
from keras.layers import SimpleRNN, Dense, LSTM, Dropout, Embedding
from keras.losses import CategoricalCrossentropy
from keras.models import Sequential
from sklearn.preprocessing import OrdinalEncoder

<h2><strong><u>Data Preprocessing Methods</u></strong></h2>

In [14]:
def tokenize_text(text):
    """Normalize a raw string and split it into word tokens.

    The text is lowercased, every character that is neither a word character
    nor whitespace is removed, and the remainder is handed to NLTK's
    ``word_tokenize``.

    Args:
        text: The raw string to tokenize.

    Returns:
        The list of tokens produced by ``word_tokenize``.
    """
    # Lowercase first, then strip punctuation in the same expression.
    without_punctuation = re.sub(r"([^\w\s])", "", text.lower())
    return word_tokenize(without_punctuation)

In [15]:
# Ordinal Encoding

def encode_text(text):
    """Tokenize a raw string and ordinal-encode it word by word.

    Each token becomes one sample in a single-column array, which is then
    fitted and transformed by a fresh ``OrdinalEncoder``.

    Args:
        text: The raw text to encode (a string, not a pre-tokenized list).

    Returns:
        A ``(result, encoder)`` tuple. ``result`` is the encoder output for
        the ``(n_tokens, 1)`` column of tokens; ``encoder`` is the fitted
        ``OrdinalEncoder``, needed to map codes back to words.

    Raises:
        ValueError: If ``text`` contains no tokens after cleaning.
    """
    # Reuse the shared tokenizer so every caller cleans text the same way.
    text_array = tokenize_text(text)
    if not text_array:
        # Fail early with a readable message instead of an encoder error.
        raise ValueError("Cannot encode text that contains no words.")
    info("Encoding inputs...")
    debug(f"{text_array}")
    encoder = OrdinalEncoder()
    # The encoder expects a 2-D array: one row per token, one feature column.
    result = encoder.fit_transform(np.reshape(text_array, (len(text_array), 1)))
    return result, encoder


# encoded_array, encoder = encode_text(text)
# features, targets = time_delayed(encoded_array, 5)
# print("Features:")
# print(features.shape)
# print("Targets:")
# print(targets.shape)

In [16]:
# max_value = max(encoded_array)
# print(max_value[0])

In [17]:
# set(encoded_array.flatten())

<h2><u>RNN Class</u></h2>

In [18]:
# Switches for the two print-based logging helpers below.
PRINT_DEBUG = False
PRINT_INFO = True


def _print_if(enabled, values):
    """Print ``values`` (space-separated, like ``print``) when ``enabled`` is truthy."""
    if enabled:
        print(*values)


def debug(*args):
    """Print ``args`` only while ``PRINT_DEBUG`` is switched on."""
    _print_if(PRINT_DEBUG, args)


def info(*args):
    """Print ``args`` only while ``PRINT_INFO`` is switched on."""
    _print_if(PRINT_INFO, args)

In [19]:
def time_delayed(seq, delay):
    """Build sliding-window features and their next-step targets.

    For every position ``p`` from ``delay`` up to ``len(seq) - 1``, the
    feature is the slice ``seq[p - delay:p]`` and the target is ``seq[p]``.

    Args:
        seq: An indexable, sliceable sequence (list or NumPy array).
        delay: Number of preceding elements in each feature window.

    Returns:
        A ``(features, targets)`` tuple of NumPy arrays with one entry per
        target position.
    """
    target_positions = range(delay, len(seq))
    windows = [seq[pos - delay:pos] for pos in target_positions]
    followers = [seq[pos] for pos in target_positions]
    return np.array(windows), np.array(followers)

In [20]:
# # this will need to be modified to handle words instead of letters
# def encode_sequence(sequence):
#     """Given a string, I encode it letter by letter (each letter is a sample). I return the
#     result and the encoder."""
#     info("Encoding inputs...")
#     debug(f"{sequence}")
#     encoder = OrdinalEncoder(sparse=False)
#     result = encoder.fit_transform(np.reshape(sequence, (len(sequence), 1)))
#     info("Number of input characters:", len(encoder.categories_[0]))
#     debug("Input categories:", encoder.categories_[0])
#     info(f"{result.shape=}")
#     debug(result)
#     return result, encoder

In [21]:
class RNNTextModel:
    """Word-level next-word predictor: Embedding followed by two stacked LSTMs.

    The training text is ordinal-encoded with ``encode_text``, cut into
    sliding windows with ``time_delayed``, and fed to a Keras ``Sequential``
    model that outputs a probability for every word in the vocabulary.

    Attributes set by ``__init__``:
        encoder: The fitted encoder returned by ``encode_text``.
        time_steps: Number of words in each input window.
        X_delayed: Integer array of shape ``(samples, time_steps)``.
        y_delayed: Integer array of shape ``(samples,)`` holding the index of
            the word that follows each window.
        model: The compiled Keras model.
    """

    def __init__(self, training_string, delay_length=100):
        """Encode the training text, build the windows, and create the model.

        Args:
            training_string: Raw training text.
            delay_length: Number of preceding words used to predict the next.

        Raises:
            ValueError: If ``delay_length`` is smaller than 1, or the text has
                too few words to form a single (window, target) pair.
        """
        if delay_length < 1:
            raise ValueError("delay_length must be at least 1")
        encoded_training_data, self.encoder = encode_text(training_string)
        self.time_steps = delay_length
        # The encoder holds one category per distinct word in its only column.
        max_vocabulary_size = len(self.encoder.categories_[0])
        info("Number of distinct words:", max_vocabulary_size)
        debug("encoded_training_data:", encoded_training_data)
        # Flatten the (n_words, 1) codes into a 1-D integer sequence so the
        # windows come out as (samples, time_steps): the 2-D integer layout an
        # Embedding layer consumes. Keeping the trailing axis would hand the
        # LSTM a 4-D tensor.
        word_ids = np.asarray(encoded_training_data).astype(np.int64).ravel()
        if len(word_ids) <= self.time_steps:
            raise ValueError(
                f"Training text has {len(word_ids)} words; need more than "
                f"delay_length={self.time_steps} to build a training window."
            )
        self.X_delayed, self.y_delayed = time_delayed(word_ids, self.time_steps)
        self.model = self.create_model(
            max_vocabulary_size,
            self.X_delayed.shape,
            self.y_delayed.shape,
            delay_length=self.time_steps,
        )

    # Declared static: the signature never took ``self``, so calling it through
    # an instance used to shift every argument by one position.
    @staticmethod
    def create_model(max_vocabulary_size, input_shape, output_shape, delay_length=100):
        """Build and compile the Embedding + LSTM network.

        Args:
            max_vocabulary_size: Number of distinct words. Used as the
                embedding's ``input_dim`` and as the width of the softmax
                output layer.
            input_shape: Shape of the feature array; everything after the
                first (sample) axis defines the model input.
            output_shape: Shape of the target array. Kept for interface
                compatibility; targets are integer class indices, so the
                output width comes from ``max_vocabulary_size`` instead.
            delay_length: Kept for interface compatibility. The sequence
                length is taken from ``input_shape``, so it is not passed to
                the Embedding layer.

        Returns:
            The compiled Keras ``Sequential`` model.
        """
        info("Creating model...")
        info("Input shape:", input_shape[1:])
        model = Sequential(
            [Input(shape=input_shape[1:]),
             # No ``input_length``: the Input layer already fixes the sequence
             # length, and newer Keras releases deprecate that argument.
             Embedding(input_dim=max_vocabulary_size, output_dim=64),
             LSTM(256, return_sequences=True, activation=activations.tanh),
             Dropout(0.2),
             LSTM(256, activation=activations.tanh),
             Dropout(0.2),
             # One probability per vocabulary word.
             Dense(max_vocabulary_size, activation=activations.softmax)]
        )
        model.summary()
        # Sparse loss and metric: targets are integer word indices, which
        # avoids materializing a (samples, vocabulary) one-hot matrix.
        model.compile(optimizer="adam", loss="sparse_categorical_crossentropy",
                      metrics=["sparse_categorical_accuracy"])
        return model

    def encode_input_string(self, string):
        """Encode a string with the encoder fitted on the training text.

        Args:
            string: Raw text to encode.

        Returns:
            A 1-D ``int64`` array of word indices, empty when ``string``
            contains no words.

        Raises:
            ValueError: Raised by the encoder when ``string`` contains a word
                that was not in the training text (the encoder is created
                with its default unknown-category handling).
        """
        tokens = tokenize_text(string)
        if not tokens:
            return np.empty(0, dtype=np.int64)
        # transform(), not fit_transform(): indices must match the vocabulary
        # the model was trained on.
        codes = self.encoder.transform(np.reshape(tokens, (len(tokens), 1)))
        return codes.astype(np.int64).ravel()

    def fit(self, prefix, epochs=2, batch_size=1000):
        """Train the model on the windows built in ``__init__``.

        Args:
            prefix: Filename prefix for checkpoints, or ``None`` to disable
                checkpointing.
            epochs: Number of training epochs.
            batch_size: Number of samples per gradient update.
        """
        info("Fitting...")
        callbacks = []
        if prefix is not None:
            # NOTE(review): newer Keras releases may reject the ``.hdf5``
            # suffix in favour of ``.keras`` / ``.weights.h5`` - confirm
            # against the installed version.
            checkpoint = ModelCheckpoint(prefix + "-{epoch:03d}-{loss:.4f}.hdf5", monitor='loss', verbose=1,
                                         save_best_only=True, mode='min')
            callbacks = [checkpoint]
        self.model.fit(self.X_delayed, self.y_delayed, epochs=epochs, verbose=True, callbacks=callbacks,
                       batch_size=batch_size)

    def load_weights(self, filename):
        """Load previously saved weights from ``filename`` into the model."""
        info(f"Loading weights from {filename}...")
        self.model.load_weights(filename)

    def predict_from_seed(self, seed, prediction_count):
        """Greedily generate ``prediction_count`` words that continue ``seed``.

        Each step feeds the last ``time_steps`` word indices to the model,
        takes the most probable next word, and appends it to the window.

        Args:
            seed: Starting text. It must contain at least ``time_steps`` words
                and only words seen during training.
            prediction_count: Number of words to generate.

        Returns:
            ``seed`` followed by the generated words, separated by spaces.

        Raises:
            ValueError: If ``seed`` has fewer than ``time_steps`` words.
        """
        info("Predicting output sequence...")
        word_ids = list(self.encode_input_string(seed))
        if len(word_ids) < self.time_steps:
            raise ValueError(
                f"seed must contain at least {self.time_steps} words, got {len(word_ids)}"
            )
        vocabulary = self.encoder.categories_[0]
        generated = []
        for _ in range(prediction_count):
            # The model input has a fixed length, so keep only the newest words.
            window = np.array(word_ids[-self.time_steps:], dtype=np.int64).reshape(1, self.time_steps)
            debug(f"{window=}")
            probabilities = self.model.predict(window, verbose=0)
            debug(f"{probabilities.shape=}")
            # The softmax row is indexed by word id; take the most likely one.
            next_id = int(np.argmax(probabilities[0]))
            next_word = str(vocabulary[next_id])
            info(f"Predicted word: {next_word}")
            generated.append(next_word)
            word_ids.append(next_id)
        return " ".join([seed] + generated)

In [None]:
def main(text, seed, delay_length=5, epochs=10, prediction_count=10):
    """Train an ``RNNTextModel`` on ``text`` and print text generated from ``seed``.

    Args:
        text: Raw training text.
        seed: Starting text for generation.
        delay_length: Number of preceding words the model sees per prediction.
        epochs: Number of training epochs.
        prediction_count: Number of words to generate after the seed.
    """
    rnn_model = RNNTextModel(text, delay_length=delay_length)
    rnn_model.fit(prefix="rnn_text_model", epochs=epochs)
    output = rnn_model.predict_from_seed(seed, prediction_count)
    # predict_from_seed returns the seed followed by the generated words, so
    # printing the seed in front of it again would show it twice.
    print(f"Generated text: {output}")

In [22]:
# Path of the training corpus, named once so the error message cannot drift
# away from the file that is actually opened.
TEXT_PATH = "./the_sunken_world.txt"

# Read the whole corpus into `text`. On failure a message is printed and
# `text` is not assigned by this cell.
try:
    with open(TEXT_PATH, "r", encoding="utf-8") as file:
        text = file.read()
except FileNotFoundError:
    print(f"Error: The file '{TEXT_PATH}' was not found.")
except Exception as e:
    print(f"An error occurred: {e}")