In [None]:
from __future__ import print_function
from keras.callbacks import LambdaCallback, ModelCheckpoint, EarlyStopping
from keras.models import Sequential
from keras.optimizers import RMSprop, Adam
from keras.utils.data_utils import get_file
from keras import preprocessing
from keras import backend as K
from keras.utils import Sequence
from keras.models import load_model
from keras.metrics import categorical_accuracy
#from sklearn import model_selection
import numpy as np
import random
import sys
import io
from utils import *

In [None]:
# Raw corpus; passed to text_cleaner as its first argument.
in_path = "text/your_text.txt"
# Passed to text_cleaner as its third argument and later re-opened as the clean corpus
# (presumably text_cleaner writes its output here -- confirm in utils).
out_path = "text/your_text_clean.txt"

In [None]:
# Characters allowed in the corpus: newline, space, basic punctuation,
# digits and lower-case ASCII letters (one list entry per character).
charset = list("\n !',-.0123456789?abcdefghijklmnopqrstuvwxyz")

# text_cleaner comes from the local utils module; presumably it filters the
# text at in_path down to `charset` and writes it to out_path -- confirm.
text_cleaner(in_path, charset, out_path)

In [None]:
# Load the clean text from out_path (decoded as UTF-8) and lower-case it,
# then report how many characters the corpus contains.
with io.open(out_path, encoding='utf-8') as f:
    textclean = f.read().lower()
print('corpus length:', len(textclean))

In [None]:
# Character-level tokenisation: every single character of the corpus is one
# "word"; the dictionary is the sorted collection of distinct characters.
text2words = list(textclean)
wordlist = sorted(set(text2words))
print("Total words in corpus: ", len(text2words))
print("Dictionary length: ", len(wordlist))

In [None]:
# Two-way lookup between dictionary entries and their integer positions
word_indices = {word: position for position, word in enumerate(wordlist)}
indices_word = dict(enumerate(wordlist))

In [None]:
# Slide a window of `maxlen` words over the corpus, advancing `step` words at
# a time. Each window is one training sentence; its target ("y") is the word
# that immediately follows the window.
maxlen = 40
step = 1
window_starts = range(0, len(text2words) - maxlen, step)
sentences = [text2words[start: start + maxlen] for start in window_starts]
next_words = [text2words[start + maxlen] for start in window_starts]
print('number of sentences:', len(sentences))

In [None]:
def train_test_splitter(X, Y, percentage):
    """Randomly split paired samples into a training and a test subset.

    The positions 0..len(X)-1 are shuffled with NumPy's global random state;
    the first ``int(percentage * len(X))`` shuffled positions form the test
    subset and the rest form the training subset. X and Y are indexed with
    the same positions, so pairs stay aligned.

    Args:
        X: indexable collection of samples.
        Y: indexable collection of targets, aligned with X.
        percentage: fraction of the samples to hold out for testing.

    Returns:
        Four lists, in this order: X_train, Y_train, X_test, Y_test.
    """
    order = np.arange(len(X))
    np.random.shuffle(order)

    n_test = int(percentage * len(X))
    held_out, kept = order[:n_test], order[n_test:]

    def pick(seq, positions):
        # Gather the elements of seq found at the given positions
        return [seq[p] for p in positions]

    return pick(X, kept), pick(Y, kept), pick(X, held_out), pick(Y, held_out)

In [None]:
# Hold out 1% of the (sentence, next word) pairs as a test/validation set
(sentences_train, next_words_train,
 sentences_test, next_words_test) = train_test_splitter(X=sentences, Y=next_words, percentage=0.01)

In [None]:
# Keyword arguments later unpacked into DataGenerator(...)
params = dict(
    dim=(maxlen, len(wordlist)),  # (window length, dictionary size)
    batch_size=128,
    n_classes=6,   # not used here
    n_channels=1,  # not used here
    shuffle=True,
)

In [None]:
# Based on: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

class DataGenerator(Sequence):
    """Keras ``Sequence`` that one-hot encodes (sentence, next word) batches on demand.

    Only the raw word lists are kept in memory; the one-hot arrays are built
    per batch in ``__getitem__``. Encoding relies on the notebook-level
    ``word_indices`` and ``wordlist``.

    Note: the default for ``dim`` is evaluated once, when the class is defined,
    from the notebook globals ``maxlen`` and ``wordlist``.
    """

    def __init__(self, sentences, next_words, batch_size=64, dim=(maxlen, len(wordlist)), n_channels=1,
                 n_classes=10, shuffle=True):
        """Store the dataset and build the initial sample order.

        Args:
            sentences: list of training sentences (each a sequence of words).
            next_words: list of target words, aligned with ``sentences``.
            batch_size: number of samples per batch.
            dim: shape of one encoded sentence, (timesteps, dictionary size).
            n_channels: stored but not used by this class.
            n_classes: stored but not used by this class.
            shuffle: whether to reshuffle the sample order after every epoch.
        """
        self.dim = dim
        self.batch_size = batch_size
        self.next_words = next_words
        self.sentences = sentences
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches per epoch (a trailing partial batch is dropped)."""
        return int(np.floor(len(self.sentences) / self.batch_size))

    def __getitem__(self, index):
        """Build and return batch number ``index`` as an ``(X, y)`` pair."""
        # Positions of the samples belonging to this batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        sentences_temp = [self.sentences[k] for k in indexes]
        next_words_temp = [self.next_words[k] for k in indexes]

        return self.__data_generation(sentences_temp, next_words_temp)

    def on_epoch_end(self):
        """Reset the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.sentences))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, sentences_temp, next_words_temp):
        """One-hot encode a batch.

        Returns:
            X: boolean array of shape (n_samples, *dim).
            y: boolean array of shape (n_samples, len(wordlist)).
        """
        # Size the arrays from the samples actually received rather than from
        # batch_size, so a short slice cannot index past the end of the lists.
        n_samples = len(sentences_temp)
        # The builtin `bool` replaces the `np.bool` alias, which NumPy 1.24 removed.
        X = np.zeros((n_samples, *self.dim), dtype=bool)
        y = np.zeros((n_samples, len(wordlist)), dtype=bool)

        for i, sentence in enumerate(sentences_temp):
            for t, w in enumerate(sentence):
                X[i, t, word_indices[w]] = 1
            # The target of sample i is simply the i-th next word of the batch
            y[i, word_indices[next_words_temp[i]]] = 1

        return X, y

In [None]:
# One-hot encoder

def encoder(X, Y, word_indices):
    """One-hot encode a whole set of sentences and their next words at once.

    The array sizes come from the notebook globals ``maxlen`` and ``wordlist``.

    Args:
        X: list of sentences (each a sequence of words).
        Y: list of target words, aligned with X.
        word_indices: mapping from word to its dictionary index.

    Returns:
        X_enc: boolean array of shape (len(X), maxlen, len(wordlist)).
        Y_enc: float array of shape (len(X), len(wordlist)).

    Raises:
        KeyError: if a word is missing from ``word_indices``.
        IndexError: if Y is shorter than X.
    """
    # The builtin `bool` replaces the `np.bool` alias, which NumPy 1.24 removed.
    X_enc = np.zeros((len(X), maxlen, len(wordlist)), dtype=bool)
    Y_enc = np.zeros((len(X), len(wordlist)))

    for i, sentence in enumerate(X):
        for t, w in enumerate(sentence):
            X_enc[i, t, word_indices[w]] = 1
        # The target of sentence i is the i-th entry of Y
        Y_enc[i, word_indices[Y[i]]] = 1

    return X_enc, Y_enc

In [None]:
# One-hot encode the held-out split up front; it is later passed to training
# as validation data.
sentences_test_oh, next_words_test_oh = encoder(
    X=sentences_test,
    Y=next_words_test,
    word_indices=word_indices,
)

In [None]:
# Build the model: two stacked LSTMs, each followed by dropout, then a softmax
# over the dictionary; trained with Adam on categorical cross-entropy.
# NOTE(review): LSTM, Dropout, Dense and Activation are not imported explicitly
# in the visible cells -- presumably they arrive via `from utils import *`; confirm.
print("Building the model")
model = Sequential([
    LSTM(128, return_sequences=True, input_shape=(maxlen, len(wordlist))),
    Dropout(0.3),
    LSTM(128),
    Dropout(0.3),
    Dense(len(wordlist)),
    Activation('softmax'),
])
optimizer = Adam(lr=0.001)
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=[categorical_accuracy],
)
print("Done!")

In [None]:
# Print the model's summary as a sanity check of the architecture built above
model.summary()

In [None]:
# Helper function to sample an index from the probability output
def sample(preds, temperature=1.0):
    """Draw one index from a probability vector after temperature rescaling.

    The probabilities are moved to log space, divided by ``temperature`` and
    renormalised; one index is then drawn from the resulting distribution
    using NumPy's global random state.

    Args:
        preds: sequence of probabilities.
        temperature: divisor applied in log space.

    Returns:
        The sampled index.
    """
    probs = np.asarray(preds).astype('float64')
    scaled_logits = np.log(probs) / temperature
    weights = np.exp(scaled_logits)
    probs = weights / weights.sum()
    # A single multinomial trial yields a one-hot row; its argmax is the draw
    draw = np.random.multinomial(1, probs, 1)
    return np.argmax(draw)

In [None]:
# Prints a test run after each epoch
# "Diversity" is the temperature, or how far the sampling can stray from the most likely choice
def on_epoch_end(epoch, _):
    """Print sample generations; used as the end-of-epoch Keras callback.

    A random window of ``maxlen`` words from the corpus serves as seed. For
    each diversity (sampling temperature) the model extends that same seed by
    40 words, which are streamed to stdout as they are produced.

    Args:
        epoch: epoch number, shown in the header line.
        _: second callback argument, unused.
    """
    print()
    print('----- Generating text after Epoch: %d' % epoch)

    vocab_size = len(wordlist)
    start_index = random.randint(0, len(text2words) - maxlen - 1)
    seed = text2words[start_index: start_index + maxlen]

    for diversity in [0.2, 0.3, 0.5, 1]:
        print('----- diversity:', diversity)

        # Every diversity starts again from a fresh copy of the same seed
        sentence = list(seed)
        generated = ''.join(sentence)

        print('----- Generating with seed: "' + generated + '"')
        sys.stdout.write(generated)

        for _step in range(40):
            # One-hot encode the current window
            x_pred = np.zeros((1, maxlen, vocab_size))
            for t, word in enumerate(sentence):
                x_pred[0, t, word_indices[word]] = 1.

            preds = model.predict(x_pred, verbose=0)[0]
            next_word = indices_word[sample(preds, diversity)]

            generated += str(next_word)
            # Slide the window: drop the oldest word, append the new one
            sentence = sentence[1:] + [next_word]

            sys.stdout.write(next_word)
            sys.stdout.flush()
        print()

In [None]:
# Print sample generations at the end of every epoch
print_callback = LambdaCallback(on_epoch_end=on_epoch_end)

# Save the model whenever the monitored training loss reaches a new minimum
checkpoint_callback = ModelCheckpoint(
    "your_model_name.h5",
    monitor='loss',
    mode='min',
    save_best_only=True,
    verbose=1,
)

# Batch generator over the training split
training_generator = DataGenerator(sentences_train, next_words_train, **params)

In [None]:
#model = load_model("your_model.h5")

In [None]:
# Train from the batch generator for 10 epochs, validating on the
# pre-encoded held-out split after each epoch.
model.fit_generator(
    generator=training_generator,
    validation_data=(sentences_test_oh, next_words_test_oh),
    epochs=10,
    callbacks=[print_callback, checkpoint_callback],
    workers=10,
    use_multiprocessing=False,
)

In [None]:
#model.save('your_model.h5')

In [None]:
# Feeds some input text to the trained model, and outputs its completion

def inputsample(input_text, div = [0.3], num=140):
    """Complete ``input_text`` with the trained model and print the result.

    For every diversity (sampling temperature) in ``div`` the seed text is
    extended by ``num`` sampled characters; the seed followed by the
    completion is printed. Nothing is returned.

    The model input is a window of ``maxlen`` characters. A shorter seed is
    padded on the LEFT with blank (all-zero) timesteps so that the seed sits
    directly before the position being predicted and the generated characters
    stay contiguous with it. A longer seed is cut down to its last ``maxlen``
    characters, since only those fit in the window.

    Args:
        input_text: seed text.
        div: iterable of diversities to try, one completion each.
        num: number of characters to generate per completion.

    Raises:
        KeyError: if ``input_text`` contains a character that is not in
            ``word_indices``.
    """
    # Only the most recent maxlen characters fit in the input window
    seed = list(input_text)[-maxlen:]
    # None marks a blank timestep; it can never collide with a real character
    padding = [None] * (maxlen - len(seed))

    for diversity in div:
        print("Diversity", diversity, "\n")

        sentence = padding + seed
        generated = ''.join(input_text)

        for _ in range(num):
            # One-hot encode the window, leaving blank timesteps all-zero
            x_pred = np.zeros((1, maxlen, len(wordlist)))
            for t, word in enumerate(sentence):
                if word is not None:
                    x_pred[0, t, word_indices[word]] = 1.

            preds = model.predict(x_pred, verbose=0)[0]
            next_word = indices_word[sample(preds, diversity)]

            generated += str(next_word)
            # Slide the window: drop the oldest entry, append the new character
            sentence = sentence[1:] + [next_word]

        print(generated)
        print("\n -------------------")

In [None]:
# Seed text for the model to complete (up to 40 characters including spaces)
input_text = "i believe that"
# Diversities to try; one completion is printed per value
div = [0.3]
# Number of characters to generate
length = 140
inputsample(input_text, div=div, num=length)