# Test Generation using a Bidirectional LSTM Network

<br/>

In [9]:
from __future__ import print_function
#import Keras library
from keras.models import Sequential, Model
from keras.layers import Dense, Activation, Dropout
from keras.layers import LSTM, Input, Bidirectional
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping
from keras.metrics import categorical_accuracy

#import spacy, and spacy french model
# spacy is used to work on text
import spacy
nlp = spacy.load('es_core_news_sm')

#import other libraries
import numpy as np
import random
import sys
import os
import time
import codecs
import collections
from six.moves import cPickle
import pandas as pd

In [40]:
#define parameters used in the tutorial
data_dir = 'data/'# data directory containing raw texts
save_dir = 'vocab' # directory to store trained NN models
file_list = ["spanish_emojis"]
vocab_file = os.path.join(save_dir, "words_vocab.pkl")
words_list_file = os.path.join(save_dir, "words_list.pkl")
sequences_step = 1 #step to create sequences
seq_length = 10 # sequence length
preprocess = False

In [151]:
def read_file(filename):
    items = []
    if os.path.exists(filename):
        # try:
        with open(filename, 'rb') as fname:
            while True:
                try:
                    items = cPickle.load(fname)
                except EOFError:
                    print(EOFError)
                    break
    else:
        items = []
    
    return items

In [11]:
def create_wordlist(doc):
    wl = []
    for word in doc:
        if word.text not in ("\n","\n\n",'\u2009','\xa0'):
            wl.append(word.text.lower())
    return wl

In [24]:
if(preprocess):
    wordlist = []
    input_file = pd.read_csv(os.path.join(data_dir, file_list[0] + ".csv"))
    lines = input_file['observations']
    for data in lines:
        #create sentences
        doc = nlp(data)
        wl = create_wordlist(doc)
        wordlist = wordlist + wl

In [41]:
if(preprocess):
    # count the number of words
    word_counts = collections.Counter(wordlist)

    # Mapping from index to word : that's the vocabulary
    vocabulary_inv = [x[0] for x in word_counts.most_common()]
    vocabulary_inv = list(sorted(vocabulary_inv))

    # Mapping from word to index
    vocab = {x: i for i, x in enumerate(vocabulary_inv)}
    words = [x[0] for x in word_counts.most_common()]

    #size of the vocabulary
    vocab_size = len(words)
    print("vocab size: ", vocab_size)

    #save the words and vocabulary
    with open(os.path.join(vocab_file), 'wb') as f:
        cPickle.dump((words, vocab, vocabulary_inv), f)
    with open(os.path.join(words_list_file), 'wb') as f:
        cPickle.dump((wordlist), f)

vocab size:  66846


In [None]:
if(!preprocess):
    wordlist = read_file(words_list_file)
    (words, vocab, vocabulary_inv) = read_file(vocab_file)
    
    # count the number of words
    word_counts = collections.Counter(wordlist)
    
    #size of the vocabulary
    vocab_size = len(words)
    print("vocab size: ", vocab_size)

In [42]:
#create sequences
sequences = []
next_words = []
for i in range(0, len(wordlist) - seq_length, sequences_step):
    sequences.append(wordlist[i: i + seq_length])
    next_words.append(wordlist[i + seq_length])

print('nb sequences:', len(sequences))

nb sequences: 1443918


In [43]:
X = np.zeros((len(sequences), seq_length, vocab_size), dtype=np.bool)
y = np.zeros((len(sequences), vocab_size), dtype=np.bool)
for i, sentence in enumerate(sequences):
    for t, word in enumerate(sentence):
        X[i, t, vocab[word]] = 1
    y[i, vocab[next_words[i]]] = 1

MemoryError: 

In [None]:
def bidirectional_lstm_model(seq_length, vocab_size):
    print('Build LSTM model.')
    model = Sequential()
    model.add(Bidirectional(LSTM(rnn_size, activation="relu"),input_shape=(seq_length, vocab_size)))
    model.add(Dropout(0.6))
    model.add(Dense(vocab_size))
    model.add(Activation('softmax'))
    
    optimizer = Adam(lr=learning_rate)
    callbacks=[EarlyStopping(patience=2, monitor='val_loss')]
    model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=[categorical_accuracy])
    return model

In [None]:
rnn_size = 256 # size of RNN
batch_size = 32 # minibatch size
seq_length = 30 # sequence length
num_epochs = 50 # number of epochs
learning_rate = 0.001 #learning rate
sequences_step = 1 #step to create sequences

In [None]:
md = bidirectional_lstm_model(seq_length, vocab_size)
md.summary()

In [None]:
#fit the model
callbacks=[EarlyStopping(patience=4, monitor='val_loss'),
           ModelCheckpoint(filepath=save_dir + "/" + 'my_model_gen_sentences_lstm.{epoch:02d}-{val_loss:.2f}.hdf5',\
                           monitor='val_loss', verbose=0, mode='auto', period=2)]
history = md.fit(X, y,
                 batch_size=batch_size,
                 shuffle=True,
                 epochs=num_epochs,
                 callbacks=callbacks,
                 validation_split=0.01)

In [None]:
#save the model
md.save(save_dir + "/" + 'my_model_gen_sentences_lstm.final.hdf5')

In [None]:
#load vocabulary
print("loading vocabulary...")
vocab_file = os.path.join(save_dir, "words_vocab.pkl")

with open(os.path.join(save_dir, 'words_vocab.pkl'), 'rb') as f:
        words, vocab, vocabulary_inv = cPickle.load(f)

vocab_size = len(words)

In [None]:
from keras.models import load_model
# load the model
print("loading model...")
model = load_model(save_dir + "/" + 'my_model_gen_sentences_lstm.final.hdf5')

In [None]:
def sample(preds, temperature=1.0):
    # helper function to sample an index from a probability array
    preds = np.asarray(preds).astype('float64')
    preds = np.log(preds) / temperature
    exp_preds = np.exp(preds)
    preds = exp_preds / np.sum(exp_preds)
    probas = np.random.multinomial(1, preds, 1)
    return np.argmax(probas)

In [None]:
#initiate sentences
seed_sentences = "nolan avance sur le chemin de pierre et grimpe les marches ."
generated = ''
sentence = []
for i in range (seq_length):
    sentence.append("a")

seed = seed_sentences.split()

for i in range(len(seed)):
    sentence[seq_length-i-1]=seed[len(seed)-i-1]

generated += ' '.join(sentence)
print('Generating text with the following seed: "' + ' '.join(sentence) + '"')

print ()

In [None]:
words_number = 100
#generate the text
for i in range(words_number):
    #create the vector
    x = np.zeros((1, seq_length, vocab_size))
    for t, word in enumerate(sentence):
        x[0, t, vocab[word]] = 1.
    #print(x.shape)

    #calculate next word
    preds = model.predict(x, verbose=0)[0]
    next_index = sample(preds, 0.34)
    next_word = vocabulary_inv[next_index]

    #add the next word to the text
    generated += " " + next_word
    # shift the sentence by one, and and the next word at its end
    sentence = sentence[1:] + [next_word]

print(generated)