In [None]:
import numpy as np
import tensorflow as tf
import samples as ls
import math


#number of dimensions used and steps when unrolling
input_dim = 6
input_steps = 5

hidden_layers = [50,10]

#straight has to be number 0!
labels = ['straight','curve_m','wall','object']
classes = len(labels)

#percentage used for test data
test_rate = 0.1

learning_rate = 0.01

In [None]:
samples = ls.Samples(labels)

for label in labels:
    samples.load_samples(label = label)
    
samples.convert_to_input(size = 20)
samples.unroll(input_steps)
samples.split_test(test_rate)

In [None]:
tf.logging.set_verbosity(tf.logging.ERROR)
tf.reset_default_graph()

x = tf.placeholder(tf.float32, [None, input_steps, input_dim])
y = tf.placeholder(tf.float32, [None, classes])

Ws = []
for i in range(len(hidden_layers)-1):
    W = tf.Variable(tf.random_normal([hidden_layers[i], hidden_layers[i+1]]))
    b = tf.Variable(tf.zeros(hidden_layers[i+1]))
    Ws.append((W,b))
W = tf.Variable(tf.random_normal([hidden_layers[-1], classes]))
b = tf.Variable(tf.zeros([classes]))
Ws.append((W,b))

#x = [tf.placeholder(tf.float32, [None, input_dim]) for _ in range(input_steps)]
#y = tf.placeholder(tf.float32, [None, classes])
x2 = tf.unstack(x, input_steps, 1)

#rnn_cell = tf.contrib.rnn.BasicRNNCell(hidden_layer)
rnn_cell = tf.contrib.rnn.BasicLSTMCell(hidden_layers[0], forget_bias=1.0)
outputs, state = tf.contrib.rnn.static_rnn(rnn_cell, x2, dtype=tf.float32)
#outputs, state = tf.nn.dynamic_rnn(rnn_cell, x, dtype=tf.float32)
#rnn = tf.matmul(outputs[-1],Ws[0][0]) + bs[0][0]

preactivations = []
activations = []
act = outputs[-1]
for W,b in Ws:
    preact = tf.matmul(act, W) + b
    preactivations.append(preact)
    act = tf.nn.relu(preact)
    #act = tf.sigmoid(preact)
    activations.append(act)
rnn = tf.nn.softmax(preact)
activations[-1] = rnn
rnn_pred = tf.argmax(rnn, 1, name='RNN_pred')

cross_entropy = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=rnn))

optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cross_entropy)
#optimizer = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)

correct_prediction = tf.equal(tf.argmax(y,1), rnn_pred)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

In [None]:
sess = tf.Session()
saver = tf.train.Saver()

#summaries
def var_summary(var, name):
    mean = tf.reduce_mean(var)
    tf.summary.scalar(name + '_mean', mean)
    stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean)))
    tf.summary.scalar(name + '_stddev', stddev)
    tf.summary.scalar(name + '_max', tf.reduce_max(var))
    tf.summary.scalar(name + '_min', tf.reduce_min(var))
    tf.summary.histogram(name + '_histogram', var)
i = 0
for W,b in Ws: 
    var_summary(W, 'weights_%s' % i)
    var_summary(b, 'bias_%s' % i)
    i += 1
for i in range(len(activations)):
    tf.summary.histogram('preactivations_%s' % i, preactivations[i])
    tf.summary.histogram('activations_%s' % i, activations[i])
tf.summary.scalar('cross_entropy', cross_entropy)
tf.summary.scalar('accuracy', accuracy)
step_counter = 0

merged = tf.summary.merge_all()
train_writer = tf.summary.FileWriter('tmp/rnn/summary', sess.graph)

#init or load from checkpoint
init = tf.global_variables_initializer()
sess.run(init)
#saver.restore(sess, 'tmp/rnn/model/rnn')


In [None]:
def convert_to_onehot(labels):
    new_labels = []
    for y in labels:
        label = [0]*classes
        label[y] = 1
        new_labels.append(label)
    return new_labels

def test_all():
    xtest,ytest = samples.get_all()
    xtest = np.array(xtest)
    ytest = np.array(convert_to_onehot(ytest))
    acc = sess.run(accuracy, feed_dict={x: xtest, y: ytest})
    print("Accuracy: %s" % acc)
    
def test():
    xtest,ytest = samples.get_test()
    xtest = np.array(xtest)
    ytest = np.array(convert_to_onehot(ytest))
    acc = sess.run(accuracy, feed_dict={x: xtest, y: ytest})
    print("Accuracy: %s" % acc)

def train(steps, batch_size):
    global step_counter
    progress = 5
    for i in range(steps):
        xtrain,ytrain = samples.get_batch(batch_size)
        xtrain = np.array(xtrain)
        ytrain = np.array(convert_to_onehot(ytrain))
        summary,_ = sess.run([merged, optimizer], feed_dict={x: xtrain, y:ytrain})
        train_writer.add_summary(summary, i+step_counter)
        if i>=progress/100.0*steps-1:
            print("Progress: %s%%" % progress)
            progress += 5
            test()
            saver.save(sess, 'tmp/rnn/model/rnn', global_step=step_counter)
    step_counter += steps

In [None]:
train(10000,8)

In [None]:
def feed_data(data):
    if len(data) < input_steps: raise ValueError('Data too short')
    results = []
    for i in range(len(data) - input_steps + 1):
        batch = np.array(data[:input_steps])
        res = sess.run(rnn_pred, feed_dict={x:[batch]})
        results.append([list(res), list(batch)])
        data.pop(0)
    return results

In [None]:
#prove of memory
a = [0,0,0,0,0,0]
b = [0,2,0,5,-100,0]
res_t = []
for i in range(109):
    data1 = [b,b,b,b,b,a,a,a,a,a,a,a,a,a,a,a]
    res = feed_data(data1)
    te = []
    for r in res:
        te.append(r[0])
    res_t.append(te)
print(res_t[1:] == res_t[:-1])
print(res_t)

In [None]:
test_all()