# RECURRENT NEURAL NETWORK FOR TEXT PREDICTION

### 1. Mount Drive

In [0]:
# mount Google Drive inside the Colab runtime at /content/gdrive
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
# switch to the notebook folder on Drive so the relative file paths used below resolve there
cd ~/../content/gdrive/My Drive/Colab Notebooks

/content/gdrive/My Drive/Colab Notebooks


### 2. Load Modules

In [0]:
from keras.callbacks import Callback, ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from keras.models import Sequential, load_model
from keras.layers import Dense, LSTM, SimpleRNN
from keras.regularizers import l2
from keras.optimizers import RMSprop
import numpy as np
import sys
from matplotlib import pyplot as plt
import io
from random import randint

### 3. Define Parameters

In [0]:
# hyper parameters
train_path = 'shakespeare_train.txt'    # training text file, read by load_data
val_path = 'shakespeare_valid.txt'      # validation text file, read by load_data
sequence_len = 40                       # number of characters in every input sequence
step = 3                                # stride between the starts of consecutive sequences
learning_rate = 0.001                   # learning rate handed to the RMSprop optimizer
batch_size = 128                        # batch size passed to model.fit
num_epochs = 5                          # number of epochs passed to model.fit

# flags
model_type = 0                      # 0 -> Simple RNN, 1 -> LSTM
verbose = 1                         # 0 -> no verbosity, 1 -> verbosity (forwarded to model.fit)
output_text = 0                     # 0 -> text output callback off, 1 -> text output callback on

### 4. Load Dataset

4.1 Load Data from Drive

In [0]:
def load_text(path):
    """Return the full contents of the UTF-8 encoded text file at `path`."""
    with io.open(path, mode='r', encoding='utf8') as text_file:
        contents = text_file.read()
    return contents


def load_data(train_path, val_path):
    """Load both corpora and build the character vocabulary shared by them.

    Returns a dict with the raw texts ('train_text', 'val_text'), the sorted
    list of distinct characters ('vocab') and the two lookup tables
    'vocab_to_int' (character -> index) and 'int_to_vocab' (index -> character).
    """
    train_text = load_text(train_path)
    val_text = load_text(val_path)

    # every distinct character of either corpus, in sorted order
    vocab = sorted(set(train_text) | set(val_text))
    vocab_to_int = {char: index for index, char in enumerate(vocab)}
    int_to_vocab = dict(enumerate(vocab))

    return {
        'train_text': train_text,
        'val_text': val_text,
        'vocab': vocab,
        'vocab_to_int': vocab_to_int,
        'int_to_vocab': int_to_vocab
    }

# read both corpora and build the shared character vocabulary and lookup tables
data = load_data(train_path, val_path)

4.2 Pre-Process Data

In [0]:
def create_sentences(text, sequence_len, step):
    """Cut `text` into overlapping windows and collect the character after each.

    A window of `sequence_len` characters starts every `step` characters, as
    long as one more character follows it in `text`.

    Returns a tuple (sentences, next_chars) of two equally long lists.
    """
    starts = range(0, len(text) - sequence_len, step)
    sentences = [text[start: start + sequence_len] for start in starts]
    next_chars = [text[start + sequence_len] for start in starts]
    return sentences, next_chars


def encode_sequences(sentences, next_chars, data, sequence_len, vocab_len):
    """One-hot encode character sequences and their target characters.

    Args:
        sentences: list of strings, each at most `sequence_len` characters long.
        next_chars: list with the target character of every sentence.
        data: dict providing the 'vocab_to_int' character -> index table.
        sequence_len: size of the time axis of the input array.
        vocab_len: size of the one-hot axis.

    Returns:
        (X, y): boolean arrays of shape (len(sentences), sequence_len, vocab_len)
        and (len(sentences), vocab_len).

    Raises:
        KeyError: if a character is missing from data['vocab_to_int'].
    """
    num_sequences = len(sentences)
    # the builtin bool is used as dtype: the np.bool alias was deprecated in
    # NumPy 1.20 and removed in 1.24, where it raises AttributeError
    X = np.zeros((num_sequences, sequence_len, vocab_len), dtype=bool)
    y = np.zeros((num_sequences, vocab_len), dtype=bool)
    vocab_to_int = data['vocab_to_int']
    for i, sentence in enumerate(sentences):
        for t, char in enumerate(sentence):
            X[i, t, vocab_to_int[char]] = 1
        y[i, vocab_to_int[next_chars[i]]] = 1
    return X, y


def preprocess_data(data, sequence_len, step):
    """Turn the training and validation texts into one-hot encoded arrays.

    Each text is cut into semi-redundant sequences of fixed length
    (create_sentences) which are then one-hot encoded (encode_sequences).

    Returns the tuple (X_train, y_train, X_val, y_val).
    """
    vocab_len = len(data['vocab'])
    encoded = []
    for text_key in ('train_text', 'val_text'):
        # cut the text in semi-redundant sequences of fixed length
        sentences, next_chars = create_sentences(data[text_key], sequence_len, step)
        # one hot encode sequences
        encoded.extend(encode_sequences(sentences, next_chars, data, sequence_len, vocab_len))

    X_train, y_train, X_val, y_val = encoded
    return X_train, y_train, X_val, y_val


# one-hot encoded training and validation arrays that are later passed to fit_model
X_train, y_train, X_val, y_val = preprocess_data(data, sequence_len, step)

### 5. Build/Load Model

In [0]:
def build_LSTM(input_shape):
    """Build the (uncompiled) stacked LSTM next-character model.

    Two LSTM layers of 32 units each (the first one L2-regularised on its
    recurrent weights and returning the full sequence) feed a softmax layer
    with `input_shape[1]` outputs.
    """
    num_classes = input_shape[1]
    layers = [
        LSTM(32, recurrent_regularizer=l2(0.01), return_sequences=True, input_shape=input_shape),
        LSTM(32),
        Dense(num_classes, activation='softmax'),
    ]
    return Sequential(layers)


def build_RNN(input_shape):
    """Build the (uncompiled) stacked SimpleRNN next-character model.

    Three SimpleRNN layers of 32 units each (the first one L2-regularised on
    its recurrent weights; the first two return the full sequence) feed a
    softmax layer with `input_shape[1]` outputs.
    """
    num_classes = input_shape[1]
    layers = [
        SimpleRNN(32, recurrent_regularizer=l2(0.01), return_sequences=True, input_shape=input_shape),
        SimpleRNN(32, return_sequences=True),
        SimpleRNN(32),
        Dense(num_classes, activation='softmax'),
    ]
    return Sequential(layers)


# build model: each sample is `sequence_len` one-hot vectors over the vocabulary
input_shape = (sequence_len, len(data['vocab']))
if model_type:
    model = build_LSTM(input_shape)
else:
    model = build_RNN(input_shape)

# load model
# model = load_model('model.h5') if not model_type else load_model('model.h5')

### 6. Train Model

6.1 Define Callbacks

In [0]:
def sample(predictions, temperature=1.0):
    """Sample an index from a probability array after temperature scaling.

    Args:
        predictions: array-like of class probabilities.
        temperature: values below 1 sharpen the distribution, values above 1
            flatten it.

    Returns:
        The sampled index (as returned by np.argmax).
    """
    predictions = np.asarray(predictions).astype('float64')
    # zero probabilities legitimately map to -inf; no need to warn about it
    with np.errstate(divide='ignore'):
        logits = np.log(predictions) / temperature
    # shift by the maximum before exponentiating so that a low temperature
    # cannot underflow every term to 0 (which would give 0/0 = NaN below)
    logits = logits - np.max(logits)
    weights = np.exp(logits)
    probabilities = weights / np.sum(weights)
    return np.argmax(np.random.multinomial(1, probabilities, 1))


class PrintCallback(Callback):
    """Callback that generates sample text at the end of every fifth epoch.

    A random seed of `sequence_len` characters is cut from `text` and the
    model extends it by 400 characters, each drawn with `sample` at
    temperature 3. The generated characters are echoed to stdout and
    appended to 'sentence.txt'.

    Args:
        model: model used for the predictions.
        model_name: file name of the model (stored, not used by the callback).
        int_to_vocab: index -> character lookup table.
        vocab_to_int: character -> index lookup table.
        sequence_len: number of characters the model takes as input.
        vocab_len: size of the one-hot axis of the model input.
        text: corpus the seed is cut from.
    """

    def __init__(self, model, model_name, int_to_vocab, vocab_to_int, sequence_len, vocab_len, text):
        super().__init__()
        self.model = model
        self.model_name = model_name
        self.vocab_to_int = vocab_to_int
        self.int_to_vocab = int_to_vocab
        self.sequence_len = sequence_len
        self.vocab_len = vocab_len
        self.text = text

    def on_epoch_end(self, epoch, logs=None):
        """Generate and log 400 characters when `epoch + 1` is a multiple of 5."""
        if (epoch + 1) % 5 != 0:
            return

        # all state is read from the instance: the notebook defines no global
        # `text`, `vocab_len`, `vocab_to_int` or `int_to_vocab`
        start_index = randint(0, len(self.text) - self.sequence_len - 1)
        sentence = self.text[start_index: start_index + self.sequence_len]
        # sentence = 'juliet'
        seed = sentence

        # the corpus is read as UTF-8, so the log is written as UTF-8 as well;
        # the context manager closes the file even if prediction fails
        with open('sentence.txt', 'a', encoding='utf8') as out_file:
            sys.stdout.write(seed)
            print('Generating with seed: "' + seed + '"')
            sys.stdout.write(seed)
            for _ in range(400):
                # one-hot encode the current window of characters
                x_pred = np.zeros((1, self.sequence_len, self.vocab_len))
                for t, char in enumerate(sentence):
                    x_pred[0, t, self.vocab_to_int[char]] = 1.
                predictions = self.model.predict(x_pred, verbose=0)[0]
                next_index = sample(predictions, 3)
                next_char = self.int_to_vocab[next_index]
                # slide the window: drop the oldest character, append the new one
                sentence = sentence[1:] + next_char
                sys.stdout.write(next_char)
                out_file.write(next_char)
                sys.stdout.flush()
            print()

# the checkpoint file is named after the selected architecture
if model_type:
    model_name = 'LSTM_model.h5'
else:
    model_name = 'SimpleRNN_model.h5'

# all three built-in callbacks watch the validation accuracy ('val_acc')
callbacks = [
    # keep only the best model seen so far
    ModelCheckpoint(model_name, save_best_only=True, monitor='val_acc', mode='max'),
    # stop when there was no improvement for 5 epochs
    EarlyStopping(monitor='val_acc', mode='max', patience=5, verbose=1),
    # divide the learning rate by 10 after 2 epochs without improvement
    ReduceLROnPlateau(monitor='val_acc', factor=0.1, patience=2, min_lr=0.000001, verbose=1),
]

# optional text generation callback, controlled by the output_text flag
if output_text:
    callbacks.append(
        PrintCallback(
            model,
            model_name,
            data['int_to_vocab'],
            data['vocab_to_int'],
            sequence_len,
            len(data['vocab']),
            data['train_text'],
        )
    )

6.2 Run Training Process

In [0]:
def fit_model(model, X_train, y_train, X_val, y_val, learning_rate, batch_size, num_epochs, callbacks, verbose):
    """Compile `model` and train it, validating after every epoch.

    The model is compiled with categorical cross-entropy, an RMSprop optimizer
    using `learning_rate`, and the accuracy metric.

    Args:
        model: model to compile and fit.
        X_train, y_train: training inputs and targets.
        X_val, y_val: validation inputs and targets.
        learning_rate: learning rate of the RMSprop optimizer.
        batch_size: batch size passed to model.fit.
        num_epochs: number of epochs passed to model.fit.
        callbacks: list of callbacks passed to model.fit.
        verbose: verbosity passed to model.fit.

    Returns:
        The history object returned by model.fit.
    """
    optimizer = RMSprop(learning_rate)
    # the metric is requested as 'acc' because the rest of the notebook reads
    # 'acc' / 'val_acc' (callback monitors, history dumps, plots); since
    # Keras 2.3 metrics are reported under the exact name given here, so
    # 'accuracy' would produce 'accuracy' / 'val_accuracy' instead
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['acc'])
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        batch_size=batch_size,
        epochs=num_epochs,
        callbacks=callbacks,
        verbose=verbose,
        shuffle=True
    )
    return history


# train the model; the returned history feeds the result files and plots below
history = fit_model(
    model=model,
    X_train=X_train,
    y_train=y_train,
    X_val=X_val,
    y_val=y_val,
    learning_rate=learning_rate,
    batch_size=batch_size,
    num_epochs=num_epochs,
    callbacks=callbacks,
    verbose=verbose,
)

In [0]:
# use this cell for saving model and learning curves
import os

model.save('model.h5')

# the curve files are written into results/; create the folder first so the
# writes below do not fail with FileNotFoundError when it does not exist yet
os.makedirs('results', exist_ok=True)

# one text file per recorded curve: (file name, key in history.history)
for file_name, history_key in (
    ('accuracy.txt', 'acc'),
    ('val_accuracy.txt', 'val_acc'),
    ('loss.txt', 'loss'),
    ('val_loss.txt', 'val_loss'),
):
    with open(os.path.join('results', file_name), 'w') as f:
        f.write(str(history.history[history_key]))

### 7. Plot Learning Curves

In [0]:
def plot_learning_curves(history):
    """Plot, save and show the loss and accuracy curves of a training run.

    Reads 'loss', 'val_loss', 'acc' and 'val_acc' from `history.history`.
    The figures are saved as 'loss_plot.png' and 'acc_plot.png'; training
    curves are drawn in red, validation curves in blue.
    """
    # (training key, validation key, legend entries, y label, title, output file)
    figures = (
        ('loss', 'val_loss', ['Training loss', 'Validation Loss'],
         'Loss', 'Loss Curves', 'loss_plot.png'),
        ('acc', 'val_acc', ['Training Accuracy', 'Validation Accuracy'],
         'Accuracy', 'Accuracy Curves', 'acc_plot.png'),
    )
    for train_key, val_key, legend_labels, y_label, title, out_file in figures:
        plt.figure(figsize=[8, 6])
        plt.plot(history.history[train_key], 'r', linewidth=3.0)
        plt.plot(history.history[val_key], 'b', linewidth=3.0)
        plt.legend(legend_labels, fontsize=18)
        plt.xlabel('Epochs ', fontsize=16)
        plt.ylabel(y_label, fontsize=16)
        plt.title(title, fontsize=16)
        # save before showing the figure
        plt.savefig(out_file)
        plt.show()
    plt.close()


# show and save the loss and accuracy curves of the training run above
plot_learning_curves(history)