# MNIST LSTMConv2D Example
Adapted from [github: TensorFlow Examples](https://github.com/aymericdamien/TensorFlow-Examples/blob/master/notebooks/3_NeuralNetworks/recurrent_network.ipynb).

Uses a custom LSTM cell that implements a convolutional LSTM (the LSTMConv2D op), similar to the one in Keras.


# TODO

- Batching of input data
- Visualization (tt.visualization), e.g. draw image
- Metric: Round values, percentage of correctly "classified"
- RNNConv2dCell: Initial-state: conv_step ?

In [None]:
from __future__ import print_function

import os
import sys

# add path to libraries for ipython
sys.path.append(os.path.expanduser("~/libs"))

import random
import numpy as np
import tensorflow as tf
import tensortools as tt

In [None]:
# Number of sequences fed to the optimizer per training step.
BATCH_SIZE = 128
# Training stops once step * BATCH_SIZE reaches this value.
# NOTE: the misspelled name is referenced by the training cell, so it is kept.
NUM_EXAMPES = 10000
# Learning rate handed to the Adam optimizer.
# NOTE: the misspelled name is referenced by the optimizer cell, so it is kept.
LEARGNING_RATE = 0.001
# Loss and accuracy are printed every DISPLAY_STEP training steps.
DISPLAY_STEP = 10

# Number of input frames per generated sequence.
TIME_STEPS = 10
# NOTE(review): KERNEL_FILTERS, KERNEL_SIZE and EPOCHS are not referenced by
# any code visible in this notebook - confirm whether they are still needed.
KERNEL_FILTERS = 1
KERNEL_SIZE = 3
# Height and width (in pixels) of every generated frame.
IMAGE_SIZE = 28
# Size of the trailing channel axis of the input/target placeholders.
CHANNELS = 1
EPOCHS = 10

### Input data

In [None]:
def print_image(image):
    """Print a coarse ASCII rendering of channel 0 of an image to stdout.

    A separator line is printed first, then one text line per image row.
    Every pixel is mapped to exactly one character:

    * ``'X'`` for values above 0.5
    * ``'x'`` for values above 0.25
    * ``'O'`` for values below -0.5
    * ``'o'`` for values below -0.25
    * ``'.'`` for everything else (values close to zero)

    Args:
        image: Array with a ``shape`` attribute that can be indexed as
            ``image[y, x, 0]``, e.g. a numpy array of shape
            (height, width, channels).
    """
    height, width = image.shape[0], image.shape[1]

    print('----------')
    for y in range(height):
        row = []
        for x in range(width):
            val = image[y, x, 0]
            # The buckets are mutually exclusive. Independent `if` checks on
            # `< 0.25` / `< 0.5` would overwrite the result of the positive
            # checks and render every value below 0.5 as 'O'.
            if val > 0.5:
                sign = 'X'
            elif val > 0.25:
                sign = 'x'
            elif val < -0.5:
                sign = 'O'
            elif val < -0.25:
                sign = 'o'
            else:
                sign = '.'
            row.append(sign)
        print(''.join(row))

In [None]:
def generate_image(positions):
    """Render one frame that contains a filled square for every position.

    Args:
        positions: Sequence of integer ``(x, y)`` pairs. Each pair produces a
            square covering columns ``x - 2 .. x + 3`` and rows
            ``y - 2 .. y + 3``; the parts that fall outside the frame are
            dropped.

    Returns:
        Array of shape ``(IMAGE_SIZE, IMAGE_SIZE, 1)`` that is 1.0 inside the
        squares and 0.0 everywhere else.
    """
    frame = np.zeros((IMAGE_SIZE, IMAGE_SIZE, 1))

    def _clamp(coord):
        # Keep slice bounds inside [0, IMAGE_SIZE]; a negative bound would
        # otherwise be interpreted as counting from the far edge.
        return min(max(coord, 0), IMAGE_SIZE)

    for pos in positions:
        cols = slice(_clamp(pos[0] - 2), _clamp(pos[0] + 4))
        rows = slice(_clamp(pos[1] - 2), _clamp(pos[1] + 4))
        frame[rows, cols, 0] = 1.0

    return frame

In [None]:
def generate_single_sequence():
    """Create one random sequence of three moving squares plus its target.

    Every square gets a random start position and a constant per-step
    displacement. ``TIME_STEPS`` consecutive frames form the input sequence;
    the frame that follows them is the label.

    Returns:
        Tuple ``(frames, label)``: ``frames`` stacks the ``TIME_STEPS`` input
        frames into one array, ``label`` is the frame after the last input.
    """
    # One [x, y, dx, dy] record per square. The draw order (x, y, dx, dy for
    # the first square, then the second, then the third) is kept so that a
    # seeded random generator yields the same data as before.
    squares = []
    for _ in range(3):
        x_pos = random.randint(0, IMAGE_SIZE)
        y_pos = random.randint(0, IMAGE_SIZE)
        x_step = random.randint(-2, 3)
        y_step = random.randint(-2, 3)
        squares.append([x_pos, y_pos, x_step, y_step])

    def _snapshot():
        # Render the squares at their current positions.
        return generate_image([(sq[0], sq[1]) for sq in squares])

    frames = []
    for _ in range(TIME_STEPS):
        frames.append(_snapshot())
        for sq in squares:
            sq[0] += sq[2]
            sq[1] += sq[3]

    # After the final move the squares sit where the frame to predict is.
    label = _snapshot()
    return np.array(frames), label

In [None]:
def generate_data(nb_train=600, nb_test=100):
    """Generate random training and test sets of moving-square sequences.

    Args:
        nb_train: Number of training sequences to generate.
        nb_test: Number of test sequences to generate.

    Returns:
        ``((train_data, train_labels), (test_data, test_labels))`` where each
        element is a numpy array built from the results of
        ``generate_single_sequence()``.
    """
    def _make_split(count):
        # Draw `count` samples and separate sequences from their labels.
        samples = [generate_single_sequence() for _ in range(count)]
        data = np.asarray([sample[0] for sample in samples])
        labels = np.asarray([sample[1] for sample in samples])
        return data, labels

    # The training samples are drawn first, then the test samples.
    train_split = _make_split(nb_train)
    test_split = _make_split(nb_test)

    return train_split, test_split

In [None]:
def inputs():
    """Generate the dataset, print a short summary of it and return it.

    Returns:
        ``((X_train, Y_train), (X_test, Y_test))`` exactly as produced by
        ``generate_data()`` with its default sizes.
    """
    # The data is freshly generated on every call (not loaded from disk).
    train_set, test_set = generate_data()
    train_x, train_y = train_set
    test_x, test_y = test_set

    print('')
    print('X_train shape:', train_x.shape)
    print('y_train shape:', train_y.shape)
    print('X_test shape:', test_x.shape)
    print('y_test shape:', test_y.shape)
    print(train_x.shape[0], 'train samples')
    print(test_x.shape[0], 'test samples')

    print(train_y.shape[0], 'train labels')
    print(test_y.shape[0], 'test labels')

    return (train_x, train_y), (test_x, test_y)

### Graph construction

In [None]:
# Dedicated graph; the following cells add their ops to it via `g.as_default()`.
g = tf.Graph()

In [None]:
def RNN(x):
    """Run the convolutional LSTM over a frame sequence; return the last output.

    The shape of the tensor is printed after every preprocessing stage.

    Args:
        x: Rank-5 tensor (it is transposed with permutation [1, 0, 2, 3, 4]).
            The caller in this notebook passes a placeholder of shape
            (batch, TIME_STEPS, IMAGE_SIZE, IMAGE_SIZE, CHANNELS).

    Returns:
        The last element of the outputs returned by
        ``tt.recurrent.rnn_conv2d``.
    """
    print(x.get_shape())

    # Swap the first two axes so that the time axis comes first.
    time_major = tf.transpose(x, [1, 0, 2, 3, 4])
    print(time_major.get_shape())

    # Cut the tensor along axis 0 into TIME_STEPS pieces ...
    step_inputs = tf.split(0, TIME_STEPS, time_major)
    print(step_inputs[0].get_shape())

    # ... and remove the leftover leading axis from every piece.
    step_inputs = [tf.squeeze(piece, (0,)) for piece in step_inputs]
    print(step_inputs[0].get_shape())

    # NOTE(review): the meaning of the positional arguments is defined by
    # tensortools - presumably kernel height, kernel width, number of filters
    # and the frame height/width; confirm against the library.
    cell = tt.recurrent.BasicLSTMConv2DCell(3, 3, 32,
                                            IMAGE_SIZE, IMAGE_SIZE,
                                            forget_bias=1.0)

    # Only the per-step outputs are needed; the returned states are ignored.
    outputs, _ = tt.recurrent.rnn_conv2d(cell, step_inputs, dtype=tf.float32)

    return outputs[-1]

In [None]:
with g.as_default():
    # Input sequences; the leading (batch) dimension is left unspecified.
    x = tf.placeholder(tf.float32, [None, TIME_STEPS, IMAGE_SIZE, IMAGE_SIZE, CHANNELS], "X")
    # Target frames, one per input sequence.
    y_ = tf.placeholder(tf.float32, [None, IMAGE_SIZE, IMAGE_SIZE, CHANNELS], "Y_")

    # Last output of the recurrent network built by RNN().
    out = RNN(x)
    
    # NOTE(review): the positional arguments are defined by tensortools -
    # presumably 1 output filter, a 3x3 kernel and stride 1, which would map
    # `out` to a single-channel frame; confirm against the library.
    pred = tt.network.conv2d_transpose("Deconv", out, 1, 3, 3, 1, 1)

In [None]:
with g.as_default():
    with tf.name_scope("Train"):
        # Mean squared error between the predicted and the target frame.
        cost = tf.reduce_mean(tf.square(pred - y_))
        optimizer = tf.train.AdamOptimizer(learning_rate=LEARGNING_RATE).minimize(cost)

    with tf.name_scope("Accuracy"):
        # The target frames contain only 0.0 and 1.0, so a pixel counts as
        # correct when the rounded prediction equals the target value.
        # (Comparing argmax over axis 1 would compare row indices, which is
        # a classification metric and says nothing about the frame quality.)
        correct_pred = tf.equal(tf.round(pred), y_)
        # Fraction of correctly predicted pixels.
        accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

In [None]:
with g.as_default():
    # Launch the graph
    with tf.Session() as sess:
        sess.run(tf.initialize_all_variables())

        tt.visualization.show_graph(sess.graph_def)

        train, test = inputs()
        train_x, train_y = train
        num_train = train_x.shape[0]
        # Number of mini-batches needed to see every training sample once
        # (ceiling division; the last batch of a pass may be smaller).
        batches_per_epoch = max(1, -(-num_train // BATCH_SIZE))

        step = 1
        # Keep training until step * BATCH_SIZE reaches NUM_EXAMPES.
        while step * BATCH_SIZE < NUM_EXAMPES:
            # Walk through the training set in consecutive mini-batches and
            # start over at the beginning once the end is reached, instead of
            # fitting the same first BATCH_SIZE samples in every step.
            offset = ((step - 1) % batches_per_epoch) * BATCH_SIZE
            batch_x = train_x[offset:offset + BATCH_SIZE]
            batch_y = train_y[offset:offset + BATCH_SIZE]
            feed = {x: batch_x, y_: batch_y}

            # Run optimization op (backprop)
            sess.run(optimizer, feed_dict=feed)
            if step % DISPLAY_STEP == 0:
                # Fetch batch loss and batch accuracy in a single graph run.
                loss, acc = sess.run([cost, accuracy], feed_dict=feed)
                print("@{}: Minibatch Loss= {:.6f}, Training Accuracy= {:.5f}".format(
                        str(step*BATCH_SIZE), loss, acc))

            step += 1
        print("Optimization Finished!")

        # Qualitative check on the first test sample only.
        test_x = test[0][0]
        test_y = test[1][0]

        print('  --> IN:')
        for i in range(TIME_STEPS):
            print_image(test_x[i])
        print('  --> SOLL-OUT:')
        print_image(test_y)
        print('  --> IS-OUT:')
        # The placeholders expect a leading batch axis, so add one of size 1.
        test_x = np.expand_dims(test_x, axis=0)
        test_y = np.expand_dims(test_y, axis=0)
        prediction, acc = sess.run([pred, accuracy], feed_dict={x: test_x, y_: test_y})
        print('Test accuracy', acc)
        # Drop the batch axis again before rendering the predicted frame.
        prediction_squeezed = np.squeeze(prediction, axis=0)
        print_image(prediction_squeezed)