# LSTM by Example using Tensorflow
https://towardsdatascience.com/lstm-by-example-using-tensorflow-feb0c1968537

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.contrib import rnn
import random
import collections
import time

In [2]:
def elapsed(sec):
    """Render a duration given in seconds as a short human-readable string.

    Values below one minute are reported in seconds, values below one hour
    in minutes, and everything else in hours.  The number itself is the
    plain ``str()`` of the (possibly fractional) value.
    """
    one_minute = 60
    one_hour = 60 * 60

    if sec < one_minute:
        return str(sec) + " sec"
    if sec < one_hour:
        return str(sec / one_minute) + " min"
    return str(sec / one_hour) + " hr"

In [3]:
def get_config():
    """Return the configuration used throughout the notebook.

    The returned object is the ``LSTMConfig`` class itself (not an
    instance); its settings are read as class attributes.
    """
    active_config = LSTMConfig
    return active_config

class LSTMConfig:
    """Settings for the LSTM example, stored as plain class attributes."""

    # ---- Files ----
    # Text corpus that is read for training.
    training_file = 'LSTM_by_Example_data/belling_the_cat.txt'
    # Directory for saved state; model_file is the path prefix inside it.
    output_dir = 'LSTM_by_Example_output'
    model_file = output_dir + '/LSTM_by_Example_model'

    # ---- Training ----
    # Learning rate handed to the optimizer.
    learning_rate = 0.001
    # Total number of training steps.
    training_iters = 50000
    # Report progress (and save state) every this many steps.
    display_step = 1000
    # Number of consecutive words fed to the network per example.
    n_input = 3

    # ---- Network ----
    # Number of units in each RNN cell.
    n_hidden = 512

    # When True, stack two LSTM cells instead of using a single one.
    two_layer = False

    # Name given to the output op so it can be looked up in a restored graph.
    predictor_name = "rnn_predictor"

In [4]:
def read_data(fname):
    """Read a text file and return all of its words as a flat array.

    Args:
        fname: Path of the text file to read.

    Returns:
        A 1-D NumPy array containing every whitespace-separated token of
        the file, in reading order.
    """
    with open(fname) as f:
        # Flatten while reading.  Building one token list per line and
        # reshaping afterwards only works when every line has the same
        # number of tokens; ragged or blank lines either raise or produce
        # an array of Python lists instead of words.
        words = [word for line in f for word in line.split()]
    return np.array(words)

def build_dataset(words):
    """Create word -> id and id -> word lookup tables.

    Ids are assigned in the order returned by ``Counter.most_common()``,
    so the most frequent word receives id 0.

    Args:
        words: Iterable of hashable tokens.

    Returns:
        A ``(dictionary, reverse_dictionary)`` tuple: the first maps each
        distinct word to its id, the second maps each id back to its word.
    """
    ranked = collections.Counter(words).most_common()
    dictionary = {word: idx for idx, (word, _) in enumerate(ranked)}
    reverse_dictionary = {idx: word for word, idx in dictionary.items()}
    return dictionary, reverse_dictionary

In [5]:
def RNN(config, x, weights, biases):
    """Build the LSTM graph and return the output (logits) tensor.

    Args:
        config: Settings object; ``n_input``, ``n_hidden``, ``two_layer``
            and ``predictor_name`` are read from it.
        x: Input tensor; it is reshaped to ``[-1, config.n_input]``.
        weights: Dict whose ``'out'`` entry is the output projection matrix.
        biases: Dict whose ``'out'`` entry is the output bias.

    Returns:
        ``matmul(last_output, weights['out']) + biases['out']`` as a tensor
        named ``config.predictor_name``.
    """
    seq_len = config.n_input

    # Flatten to [-1, n_input] ...
    flat = tf.reshape(x, [-1, seq_len])

    # ... then cut along axis 1 into a list of n_input tensors, one per word
    # (eg. [had] [a] [general] -> [20] [6] [33])
    steps = tf.split(flat, seq_len, 1)

    if not config.two_layer:
        # 1-layer LSTM with n_hidden units but with lower accuracy.
        # Average Accuracy= 90.60% 50k iter
        cell = rnn.BasicLSTMCell(config.n_hidden)
    else:
        # 2-layer LSTM, each layer has n_hidden units.
        # Average Accuracy= 95.20% at 50k iter
        layers = [rnn.BasicLSTMCell(config.n_hidden) for _ in range(2)]
        cell = rnn.MultiRNNCell(layers)

    # Unroll the cell over the input steps; the final states are not used.
    outputs, _ = rnn.static_rnn(cell, steps, dtype=tf.float32)

    # One output per input step is produced; only the last one is projected.
    projected = tf.matmul(outputs[-1], weights['out'])
    return tf.add(projected, biases['out'], name=config.predictor_name)

In [6]:
class LSTMModel(object):
    """Graph nodes needed to train and evaluate the LSTM predictor.

    Attributes:
        x: Float placeholder of shape ``[None, n_input, 1]`` named ``'x'``.
        y: Float placeholder of shape ``[None, vocab_size]`` for the labels.
        pred: Output tensor produced by ``RNN``.
        cost: Mean softmax cross-entropy between ``pred`` and ``y``.
        optimizer: RMSProp op that minimizes ``cost``.
        accuracy: Mean of ``argmax(pred) == argmax(y)`` over the batch.
    """

    def __init__(self, config, vocab_size):
        # Graph inputs.
        self.x = tf.placeholder("float", [None, config.n_input, 1], name='x')
        self.y = tf.placeholder("float", [None, vocab_size])

        # Output-layer parameters mapping n_hidden units to vocab_size scores.
        out_weights = tf.Variable(tf.random_normal([config.n_hidden, vocab_size]))
        out_biases = tf.Variable(tf.random_normal([vocab_size]))

        self.pred = RNN(config, self.x, {'out': out_weights}, {'out': out_biases})

        # Loss and optimizer.
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.pred, labels=self.y)
        self.cost = tf.reduce_mean(cross_entropy)
        rms_prop = tf.train.RMSPropOptimizer(learning_rate=config.learning_rate)
        self.optimizer = rms_prop.minimize(self.cost)

        # Evaluation: fraction of rows whose top-scoring class matches the label.
        predicted_ids = tf.argmax(self.pred, 1)
        expected_ids = tf.argmax(self.y, 1)
        hits = tf.equal(predicted_ids, expected_ids)
        self.accuracy = tf.reduce_mean(tf.cast(hits, tf.float32))

In [7]:
def train(session, model, config, training_data):
    """Train ``model`` on sliding word windows, saving state periodically.

    Every step feeds ``config.n_input`` consecutive words and trains the
    model to predict the word that follows them.  Every
    ``config.display_step`` steps the averaged loss/accuracy and one sample
    prediction are printed and the session state is saved.

    Note:
        ``dictionary``, ``reverse_dictionary`` and ``vocab_size`` are read
        from module-level globals and must be defined before this is called.

    Args:
        session: Session in which the variables are already initialised.
        model: Object exposing ``x``, ``y``, ``pred``, ``cost``,
            ``optimizer`` and ``accuracy``.
        config: Settings; ``n_input``, ``training_iters``, ``display_step``
            and ``model_file`` are read.
        training_data: Indexable sequence of words.

    Raises:
        ValueError: If ``training_data`` has fewer than ``n_input + 1`` words.
    """
    n_input = config.n_input
    window = n_input + 1  # n_input context words plus the target word
    last_start = len(training_data) - window
    if last_start < 0:
        raise ValueError(
            "training_data needs at least %d words, got %d"
            % (window, len(training_data)))
    # Cap the random start so a short corpus cannot push the window past
    # the end of the data; for longer corpora this is simply `window`.
    max_random_start = min(window, last_start)

    step = 0
    offset = random.randint(0, max_random_start)
    acc_total = 0
    loss_total = 0

    # save meta model and prepare saver for state
    tf.train.export_meta_graph(filename=config.model_file)
    saver = tf.train.Saver()

    start_time = time.time()

    while step < config.training_iters:
        # Generate a minibatch. Add some randomness on selection process.
        if offset > last_start:
            offset = random.randint(0, max_random_start)

        symbols_in_keys = [[dictionary[str(training_data[i])]]
                           for i in range(offset, offset + n_input)]
        symbols_in_keys = np.reshape(np.array(symbols_in_keys), [-1, n_input, 1])

        symbols_out_onehot = np.zeros([vocab_size], dtype=float)
        symbols_out_onehot[dictionary[str(training_data[offset + n_input])]] = 1.0
        symbols_out_onehot = np.reshape(symbols_out_onehot, [1, -1])

        _, acc, loss, onehot_pred = session.run(
            [model.optimizer, model.accuracy, model.cost, model.pred],
            feed_dict={model.x: symbols_in_keys, model.y: symbols_out_onehot})
        loss_total += loss
        acc_total += acc

        if (step + 1) % config.display_step == 0:
            print("Iter= {}, Average Loss= {:.6f}, Average Accuracy= {:.2f}%".format(
                step + 1,
                loss_total / config.display_step,
                100 * acc_total / config.display_step))
            acc_total = 0
            loss_total = 0
            symbols_in = [training_data[i] for i in range(offset, offset + n_input)]
            symbols_out = training_data[offset + n_input]
            # Take the argmax with NumPy: onehot_pred is already a fetched
            # array, and building tf.argmax(...) here would add a new op to
            # the graph on every report.
            symbols_out_pred = reverse_dictionary[int(np.argmax(onehot_pred, 1)[0])]
            print("%s - [%s] vs [%s]" % (symbols_in, symbols_out, symbols_out_pred))
            # save the current state
            saver.save(session, config.model_file, global_step=step + 1,
                       write_meta_graph=False)
        step += 1
        offset += window

    print("Optimization Finished!")
    print("Elapsed time: ", elapsed(time.time() - start_time))

In [None]:
# Training cell: load the corpus, build the graph and run the training loop.
config = get_config()

#writer = tf.summary.FileWriter(config.output_dir)

# Read the corpus words and build the word <-> id lookup tables.
# NOTE: these stay module-level names because train() reads `dictionary`,
# `reverse_dictionary` and `vocab_size` as globals.
training_data = read_data(config.training_file)
dictionary, reverse_dictionary = build_dataset(training_data)

# One output class per distinct word.
vocab_size = len(dictionary)

# Constructing the model creates its placeholders, variables and ops.
model = LSTMModel(config, vocab_size)

# Initializing the variables
init = tf.global_variables_initializer()

# Launch the graph
with tf.Session() as session:
    session.run(init)
    
    #writer.add_graph(session.graph)
    
    train(session, model, config, training_data)

Iter= 1000, Average Loss= 5.949724, Average Accuracy= 3.60%
['nobody', 'spoke', '.'] - [then] vs [cat]


In [None]:
def test(session, config, dictionary, reverse_dictionary):
    """Interactively extend user-typed prompts with the trained model.

    Repeatedly asks for ``config.n_input`` words, then generates ten more
    words one at a time, each prediction being appended to the input window
    for the next one.  The growing sentence is printed after every word.
    The loop continues for as long as the user answers ``y``.

    Args:
        session: Session holding the trained variables.
        config: Settings; ``n_input`` and ``predictor_name`` are read.
        dictionary: Mapping from word to integer id.
        reverse_dictionary: Mapping from integer id back to word.
    """
    length_of_sentence_to_produce = 10

    graph = tf.get_default_graph()
    # Use the configured op name so this lookup cannot drift from the name
    # the output op is created with.
    pred = graph.get_tensor_by_name(config.predictor_name + ":0")
    x = graph.get_tensor_by_name("x:0")

    try_again = "y"

    while try_again == "y":
        prompt = "%s words: " % config.n_input
        # split() without an argument ignores leading, trailing and repeated
        # whitespace, so extra spaces are not counted as (empty) words.
        words = input(prompt).split()
        if len(words) != config.n_input:
            print("Wrong number of words")
            continue
        sentence = " ".join(words)

        try:
            symbols_in_keys = [dictionary[word] for word in words]
        except KeyError as e:
            print("Word not in dictionary: %s" % e)
        else:
            try:
                for _ in range(length_of_sentence_to_produce):
                    keys = np.reshape(np.array(symbols_in_keys), [-1, config.n_input, 1])
                    onehot_pred = session.run(pred, feed_dict={x: keys})
                    # NumPy argmax on the fetched array; tf.argmax(...).eval()
                    # would add a new op to the graph for every word.
                    onehot_pred_index = int(np.argmax(onehot_pred, 1)[0])
                    sentence = "%s %s" % (sentence, reverse_dictionary[onehot_pred_index])
                    # Slide the window: drop the oldest id, append the new one.
                    symbols_in_keys = symbols_in_keys[1:] + [onehot_pred_index]
                    print(sentence)
            except Exception as e:
                # Best effort: report the problem and keep the prompt loop alive.
                print(e)

        try_again = input("Type 'y' to try again ")

In [None]:
import tensorflow as tf
import numpy as np
import collections

# Restore cell: rebuild the lookup tables, reload the saved model and start
# the interactive test loop.
config = get_config()

# The word <-> id tables are rebuilt from the same training file.
training_data = read_data(config.training_file)
dictionary, reverse_dictionary = build_dataset(training_data)

new_graph = tf.Graph() # see https://github.com/tensorflow/tensorflow/issues/4603
with tf.Session(graph=new_graph) as session:
    # Load the graph definition from config.model_file (the file train()
    # passes to export_meta_graph), then restore the variables from the
    # latest checkpoint found in the output directory.
    # NOTE(review): test() uses tf.get_default_graph(); this presumably relies
    # on the session context making new_graph the default - confirm.
    saver = tf.train.import_meta_graph(config.model_file)
    saver.restore(session, tf.train.latest_checkpoint(config.output_dir))
    
    test(session, config, dictionary, reverse_dictionary)