This code is based on the Keras example script for LSTM text generation: https://github.com/fchollet/keras/blob/master/examples/lstm_text_generation.py

The key differences are
- Generation is not based on a sequence from the input text, it's based on a random starting character.
- Various possible hidden architectures
- Save best models throughout training
- Patience/early stopping while training

In [34]:
# Various required inputs
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras.layers import LSTM, GRU
from keras.optimizers import RMSprop, Adam
import numpy as np
import random
import sys
import string
import os

In [35]:
# Text generations are written to this file
out_file = "output/onelayer-full-gru.txt"

# Where to save the best weights
weights_path = "weights/gru_full_weights.h5"

# Create the destination folders up front so that neither opening the log
# below nor saving weights during training fails on a missing directory.
# (`or "."` covers a bare file name that has no directory component.)
os.makedirs(os.path.dirname(out_file) or ".", exist_ok=True)
os.makedirs(os.path.dirname(weights_path) or ".", exist_ok=True)

# Deliberately left open: the generation helpers and the training loop keep
# writing to this handle, and it is closed after the final generations.
out = open(out_file, 'w')

# The file to use for training
file_name = "data/full.txt"

# Read the whole corpus and release the file handle straight away.
# Line breaks are stripped and everything is lower-cased so the model does
# not have to learn case or line structure.
with open(file_name) as corpus:
    text = corpus.read().replace('\r', '').replace('\n', '').lower()

# Sorted list of the distinct characters in the text (the model's vocabulary)
chars = sorted(set(text))

In [36]:
# Mapping from char -> int and back
char_indices = {c: i for i, c in enumerate(chars)}
indices_char = {i: c for i, c in enumerate(chars)}

# Split the text into overlapping windows of max_length characters,
# advancing by `step` characters between consecutive windows
max_length = 40
step = 3
sentences = [] # Input windows
next_chars = [] # Character that follows each window (the prediction target)

for i in range(0, len(text) - max_length, step):
    sentences.append(text[i: i + max_length])
    next_chars.append(text[i + max_length])

# One-hot encode the inputs and outputs.
# The builtin `bool` is used as dtype: the `np.bool` alias was deprecated in
# NumPy 1.20 and is missing from several later releases.
x = np.zeros((len(sentences), max_length, len(chars)), dtype=bool)
y = np.zeros((len(sentences), len(chars)), dtype=bool)

for i, sentence in enumerate(sentences):
    for t, char in enumerate(sentence):
        x[i, t, char_indices[char]] = 1
    y[i, char_indices[next_chars[i]]] = 1

In [None]:
# Hidden layer(s) options
def onelayer_LSTM():
    """Append a single 128-unit LSTM hidden layer to the global ``model``.

    The layer expects inputs of shape ``(max_length, len(chars))`` and uses
    the module-level ``dropout_rate``.
    """
    window_shape = (max_length, len(chars))
    hidden = LSTM(128, input_shape=window_shape, dropout=dropout_rate)
    model.add(hidden)

def threelayer_LSTM():
    """Append a stack of three 32-unit LSTM layers to the global ``model``.

    The first two layers return full sequences so the next LSTM receives one
    vector per time step; the last layer returns only its final output.
    All layers use the module-level ``dropout_rate``.
    """
    window_shape = (max_length, len(chars))
    stack = (
        LSTM(32, return_sequences=True, input_shape=window_shape, dropout=dropout_rate),
        LSTM(32, return_sequences=True, dropout=dropout_rate),
        LSTM(32, dropout=dropout_rate),
    )
    for hidden in stack:
        model.add(hidden)

def onelayer_GRU():
    """Append a single 128-unit GRU hidden layer to the global ``model``.

    The layer expects inputs of shape ``(max_length, len(chars))`` and uses
    the module-level ``dropout_rate``.
    """
    window_shape = (max_length, len(chars))
    hidden = GRU(128, input_shape=window_shape, dropout=dropout_rate)
    model.add(hidden)

def threelayer_GRU():
    """Append a stack of three 32-unit GRU layers to the global ``model``.

    The first two layers return full sequences so the next GRU receives one
    vector per time step; the last layer returns only its final output.
    All layers use the module-level ``dropout_rate``.
    """
    window_shape = (max_length, len(chars))
    stack = (
        GRU(32, return_sequences=True, input_shape=window_shape, dropout=dropout_rate),
        GRU(32, return_sequences=True, dropout=dropout_rate),
        GRU(32, dropout=dropout_rate),
    )
    for hidden in stack:
        model.add(hidden)

In [37]:
# Assemble the network: the chosen recurrent hidden layer(s) followed by a
# softmax output over the character vocabulary.

# Dropout rate read by the hidden-layer builder functions. Zero disables dropout.
dropout_rate = 0.2

model = Sequential()

# Select the hidden architecture by leaving exactly one of these calls uncommented
#onelayer_LSTM()
onelayer_GRU()
#threelayer_LSTM()
#threelayer_GRU()

# Output layer: one unit per character, normalised into probabilities by softmax
model.add(Dense(len(chars)))
model.add(Activation('softmax'))

# The optimizer is given by name, which uses its default settings.
# An explicitly configured instance can be passed instead, e.g.:
#optimizer = RMSprop(lr=0.1)
#optimizer = Adam(lr=0.01)
model.compile(optimizer="adam", loss='categorical_crossentropy')

In [38]:
# helper function to sample an index from a probability array
def sample(preds, temperature=1.0):
    """Draw one index from a probability array after temperature scaling.

    The probabilities are re-weighted as ``p ** (1 / temperature)`` and
    renormalised, then a single index is drawn from the result. Temperatures
    below 1 sharpen the distribution towards its largest entry; temperatures
    above 1 flatten it.

    Args:
        preds: 1-D array-like of probabilities.
        temperature: Scaling factor. ``0`` is treated as the greedy limit and
            returns the index of the largest probability without sampling.

    Returns:
        The sampled index into ``preds``.
    """
    preds = np.asarray(preds).astype('float64')

    if temperature == 0:
        # Dividing by zero would only yield NaNs; the limit of temperature -> 0
        # is a deterministic pick of the most likely entry.
        return np.argmax(preds)

    # log(0) = -inf is a valid result here (that entry gets probability zero),
    # so the divide-by-zero warning is silenced.
    with np.errstate(divide='ignore'):
        scaled = np.log(preds) / temperature

    # Shift so the largest exponent is 0. Mathematically a no-op after
    # normalisation, but it stops every term underflowing to zero (and the
    # normalisation producing NaN) at very low temperatures.
    scaled = scaled - np.max(scaled)
    exp_preds = np.exp(scaled)
    preds = exp_preds / np.sum(exp_preds)

    # A single multinomial draw gives a one-hot row; its argmax is the sampled index
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

def generate_text(model, diversity, start, length):
    """Extend ``start`` by ``length`` sampled characters, then log and print it.

    Each step one-hot encodes the trailing ``max_length`` characters of the
    text so far, asks ``model`` for the next-character distribution and draws
    from it with ``sample`` at temperature ``diversity``. The finished string
    is written to the ``out`` file and printed; nothing is returned.
    """
    generated = start
    print('----- Generating with starting token: "' + generated + '"')

    vocab_size = len(chars)
    for _ in range(length):
        # Only the most recent max_length characters feed the prediction
        window = generated[-max_length:]

        # One-hot encode the window; positions beyond its length stay all-zero
        x_pred = np.zeros((1, max_length, vocab_size))
        for position, symbol in enumerate(window):
            x_pred[0, position, char_indices[symbol]] = 1.

        # Predicted distribution for the next character, then draw from it
        distribution = model.predict(x_pred, verbose=0)[0]
        generated += indices_char[sample(distribution, diversity)]

    out.write("\t\t" + generated + "\n")
    print(generated)
    print()
    
def generate(model, diversity, length):
    """Generate text from a randomly chosen lowercase starting letter.

    The starting letter is drawn only from letters that occur in the
    vocabulary (``char_indices``); a letter absent from the training text
    would raise ``KeyError`` when ``generate_text`` encodes it. If the
    vocabulary contains no ASCII lowercase letter, any known character is
    used as the seed instead.

    Args:
        model: Model passed through to ``generate_text``.
        diversity: Sampling temperature passed through to ``generate_text``.
        length: Number of characters to generate after the seed.
    """
    known_letters = [letter for letter in string.ascii_lowercase if letter in char_indices]
    start = random.choice(known_letters or chars)
    generate_text(model, diversity, start, length)
    
def generate_diverse(model, length):
    """Generate one sample of ``length`` characters at each fixed diversity.

    For every diversity value a header is printed and written to the ``out``
    file, then ``generate`` produces the text itself.
    """
    diversities = (0.01, 0.2, 0.5, 1.0)
    for diversity in diversities:
        print()
        print('----- diversity:', diversity)
        out.write(f"\tDiversity: {diversity}\n")

        generate(model, diversity, length)

In [None]:
epochs = 500 # Maximum number of epochs to run
best_loss = float('inf') # Lowest training loss seen so far, start at infinity
patience = 20 # Number of consecutive iterations without improvement before stopping
no_improvement = 0 # Current count of consecutive iterations without improvement

# Train the model one epoch at a time, checkpointing the best weights and
# writing sample generations every 10 iterations.
# Be sure to give enough iterations to adequately learn (20+ minimum)
for iteration in range(1, epochs + 1):
    print()
    print('-' * 50)
    print('Iteration', iteration)

    # Only one epoch per fit call: the outer loop drives the epochs so that
    # checkpointing and early stopping can run between them
    hist = model.fit(x, y,
              batch_size=128,
              epochs=1)

    loss = hist.history['loss'][-1]

    if loss < best_loss:
        # New best training loss: checkpoint the weights and reset the counter
        best_loss = loss
        model.save_weights(weights_path)
        no_improvement = 0
    else:
        no_improvement += 1

    # Stop early once `patience` consecutive iterations brought no improvement.
    # (`>=` rather than `>`: otherwise training runs one iteration longer than
    # the documented patience.)
    if no_improvement >= patience:
        # Could look at trying other things such as decreasing learning rate
        break

    # Unlikely - but if we achieve a zero loss we are finished
    if loss == 0:
        break

    # Write some text generations every 10 iterations, so we can monitor how the model is progressing
    if iteration % 10 == 0:
        out.write("Iteration: " + str(iteration) + "\n")

        for _ in range(5):
            generate_diverse(model, 140)

        # Push the samples to disk now so they can be inspected while training continues
        out.flush()
        os.fsync(out.fileno())

In [None]:
# Restore the weights stored at weights_path. The training loop saves there
# each time the training loss reaches a new minimum, so this loads the
# lowest-loss model rather than the state after the final iteration.
model.load_weights(weights_path)

In [None]:
# Write a final batch of samples from the current model to the output file
try:
    out.write("\n----------------\n")
    out.write("Final Generations\n")

    for _ in range(50):
        generate_diverse(model, 140)
finally:
    # Close the file even if generation fails part-way, so that everything
    # written so far is flushed and the handle is released
    out.close()