In [1]:
##################################
#
# Initial implementation of linear logic recurrent neural network
#
# Many elements borrowed from "A noob’s guide to implementing RNN-LSTM using Tensorflow"
# see https://gist.github.com/monikkinom/e97d518fe02a79177b081c028a83ec1c
#
# The architecture is as described in deepll.pdf, the input space is one-dimensional
# (plus the bias) and we learn the function which maps a binary sequence of length N
# to the sum of its digits.

# Global flags

# -- data --
N = 12               # length of each binary input sequence
NUM_EXAMPLES = 1000  # number of sequences used for training
input_size = 1       # dimension of the input at each time step

# -- model --
state_size = 24      # dimension of the RNN state (number of hidden units)

# -- training --
batch_size = 10      # number of sequences fed per optimisation step
epoch = 20           # number of passes over the training examples

In [2]:
# The next three lines are recommended by TF
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import numpy as np
import collections
import six
import math

from random import shuffle
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.ops.rnn_cell import _linear
from tensorflow.python.ops.rnn_cell import RNNCell
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops.math_ops import sigmoid
from tensorflow.python.ops.math_ops import tanh
from tensorflow.python.ops.math_ops import matmul

In [3]:
# In tf.nn.rnn_cell there is a private function _linear that 
# we have modified here to be used in our LinearLogicRNNCell

# A 2D tensor of shape [X,Y] means a matrix with X rows and Y cols
# The row index here is interpreted as indexing into a batch.
    
class LinearLogicRNNCell(RNNCell):
    """RNN cell whose update also sees weighted degree-two monomials of (input | state).

    At each step the input I (shape [B, U]) and the state S (shape [B, V]) are
    concatenated into h = (I|S). For every batch row, the outer product of h
    with itself is flattened and multiplied elementwise by the trainable
    "Program" vector. The result is appended to [I, S] before the affine map
    ("Matrix", "Bias") and the activation are applied.
    """

    def __init__(self, num_units, input_size=None, activation=tanh):
        """Store the cell configuration.

        Args:
            num_units: dimension of the state (and of the output).
            input_size: deprecated and ignored; a warning is logged if given.
            activation: nonlinearity applied to the affine output.
        """
        if input_size is not None:
            # `logging` is not imported at notebook level, so import it here;
            # previously this branch raised NameError instead of warning.
            import logging
            logging.warning("%s: The input_size parameter is deprecated.", self)
        self._num_units = num_units
        self._activation = activation

    @property
    def state_size(self):
        """Dimension of the state vector."""
        return self._num_units

    @property
    def output_size(self):
        """Dimension of the output vector (same as the state)."""
        return self._num_units

    def __call__(self, inputs, state, scope=None):
        """Run one step of the cell.

        Args:
            inputs: 2D tensor of shape [B, U] (B = batch size, U = input dimension).
            state: 2D tensor of shape [B, V] with V == num_units.
            scope: optional variable scope name; defaults to "Linear".

        Returns:
            A pair (output, new_state); both are the same [B, num_units] tensor.

        Raises:
            ValueError: if the input dimension U is not statically known.
        """
        # the scope business gives a namespace to our weight variable matrix names
        # e.g. "Program", "Matrix" and "Bias"
        with vs.variable_scope(scope or "Linear"):  # "BasicRNNCell"
            # Our input is a pair of 2D tensors of shapes [B, U] and [B, V]
            # respectively, where B is the batch size, U is the dimension of the
            # input space and V is the dimension of the state space (e.g. U = 1)

            # array_ops.concat(1,args) is a Tensor of shape [B,U+V]
            # which is obtained by stacking the BxU and BxV matrices
            # together horizontally to get a Bx(U+V) matrix. We first modify
            # args by adding on the end a new matrix which contains higher
            # degree monomials in the entries of our input/state tensors. Let
            # us write (I) for the input matrix (BxU) and (S) for the state
            # matrix (BxV)

            args = [inputs, state]
            output_size = self._num_units

            # Size the "Program" weights from this cell's own dimensions rather
            # than from notebook-level globals, so the cell stays correct when
            # it is built with a different input or state size.
            input_dim = inputs.get_shape().as_list()[1]
            if input_dim is None:
                raise ValueError(
                    "LinearLogicRNNCell needs a statically known input dimension, "
                    "got shape %s" % inputs.get_shape())
            concat_dim = input_dim + self._num_units

            # One trainable weight per entry of the flattened outer product h^T h.
            prog_weights = vs.get_variable("Program", [concat_dim**2])

            # x is one batch row as a [1, U+V] matrix; x^T x is the (U+V)x(U+V)
            # matrix of all pairwise products, flattened and weighted.
            f = lambda x: tf.mul(tf.reshape(tf.matmul(tf.transpose(x), x), [-1]), prog_weights)
            h = array_ops.concat(1, args)  # (I|S)
            h_rows = tf.expand_dims(h, 1)  # [B, 1, U+V], so map_fn sees one row at a time
            monomials = tf.map_fn(f, h_rows)

            # Comment out the following line to not add higher monomials
            args = args + [monomials]

            # Calculate the total size of arguments on dimension 1.
            total_arg_size = 0
            shapes = [a.get_shape().as_list() for a in args]

            for shape in shapes:
                total_arg_size += shape[1]

            matrix = vs.get_variable("Matrix", [total_arg_size, output_size])  # P
            res = matmul(array_ops.concat(1, args), matrix)
            bias_term = vs.get_variable("Bias", [output_size], initializer=init_ops.constant_initializer(0.0))

            output = self._activation(res + bias_term)
        return output, output
        # note that as currently written the RNN emits its internal state at
        # each time step

In [4]:
# Create a shuffled list of all binary sequences of length N.
# Each integer in [0, 2**N) is rendered as a zero-padded bit string of width N.
bit_pattern = '{0:0' + str(N) + 'b}'
bit_strings = [bit_pattern.format(value) for value in range(2**N)]
shuffle(bit_strings)

# Convert every bit string into an array with one row per time step and a
# single column holding that step's digit.
train_input = [np.array([[int(bit)] for bit in bits]) for bits in bit_strings]

print("Number of sequences: " + str(len(train_input)))
print(train_input[0])
# A typical element of train_input at this point will be an array like
# array([[1],[0],[1],[1],[0]])

Number of sequences: 4096
[[1]
 [0]
 [1]
 [1]
 [1]
 [0]
 [0]
 [0]
 [1]
 [1]
 [0]
 [0]]


In [5]:
# Training output
#
# Every sequence in train_input is paired with the one-hot encoding of its
# class: the vector e_i of length N+1, where 0 <= i <= N is the number of 1s
# that appear in the sequence.
train_output = []

for sequence in train_input:
    ones = sum(1 for step in sequence if step[0] == 1)
    one_hot = [0] * (N + 1)
    one_hot[ones] = 1
    train_output.append(one_hot)

print(train_output[0])

[0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]


In [6]:
# The first NUM_EXAMPLES sequences are used for training; everything after
# them is held out for testing. Each right-hand side is evaluated in full
# before the names are rebound, so both slices come from the untruncated list.
# NOTE: this cell rebinds train_input/train_output to their own prefixes, so
# running it a second time leaves the test set empty.
test_input, train_input = train_input[NUM_EXAMPLES:], train_input[:NUM_EXAMPLES]
test_output, train_output = train_output[NUM_EXAMPLES:], train_output[:NUM_EXAMPLES]

In [7]:
# Definition of the model

# inputs, we create N of them (one per time step), each of shape [None,1]
inputs = [tf.placeholder(tf.float32, [None,1]) for _ in range(N)]
# one-hot targets, one row per sequence, N+1 classes (digit sums 0..N)
target = tf.placeholder(tf.float32, [None,N+1], name='target')

# We use tf.nn.rnn rather than dynamic_rnn because there appears to
# be a problem with tf.map_fn and the latter, at least in 0.10
# state_size is the number of hidden neurons in each layer
cell = LinearLogicRNNCell(state_size)

# tf.nn.rnn returns a pair, the first is a list of the
# outputs from each step, the second is the final internal state.
# The way our RNN cell is written, val will be a list of the
# internal states at each time step.
val, last_state = tf.nn.rnn(cell,inputs,dtype=tf.float32)

# We use only the final internal state, which is a 2D tensor
# of shape [batch_size,state_size]
weight = tf.Variable(tf.truncated_normal([state_size,N+1]))
bias = tf.Variable(tf.constant(0.1, shape=[N+1]))

# note that + works so that the bias is added to each row
# that is, it is the same for each element of the batch
prediction = tf.nn.softmax(tf.matmul(last_state, weight) + bias)

# Clip the probabilities away from zero before taking the log: a softmax
# output that underflows to exactly 0 would otherwise give log(0) = -inf and
# turn the loss (and every gradient) into NaN.
clipped_prediction = tf.clip_by_value(prediction, 1e-10, 1.0)
cross_entropy = -tf.reduce_sum(target * tf.log(clipped_prediction))

optimizer = tf.train.AdamOptimizer()
minimize = optimizer.minimize(cross_entropy)

# fraction of sequences whose most likely predicted class differs from the target
mistakes = tf.not_equal(tf.argmax(target, 1), tf.argmax(prediction, 1))
error = tf.reduce_mean(tf.cast(mistakes, tf.float32))
                                                        

In [8]:
# Initialise the model
init_op = tf.initialize_all_variables()

# Training
# Only whole batches are used; a trailing partial batch is dropped.
no_of_batches = len(train_input) // batch_size
print("Number of batches: " + str(no_of_batches))


def _build_feed_dict(sequences, labels):
    """Build the feed dict for a list of sequences and their one-hot labels.

    An annoying thing here is that we cannot use a list as a key in a
    dictionary, so the N per-time-step placeholders are fed one by one.
    The workaround we found on StackOverflow here:
    http://stackoverflow.com/questions/33684657/issue-feeding-a-list-into-feed-dict-in-tensorflow

    Args:
        sequences: list of arrays, each of shape [N, 1].
        labels: list of one-hot label lists, one per sequence.

    Returns:
        A dict mapping `target` and each placeholder in `inputs` to its value.
    """
    # Stack into [len(sequences), N, 1]; the explicit reshape keeps the rank
    # right even when `sequences` is empty.
    stacked = np.array(sequences).reshape(len(sequences), N, 1)
    feed = {target: labels}
    for d in range(N):
        # inputs[d] is the TF graph node for time step d; it receives the 2D
        # slice of shape [len(sequences), 1] obtained by fixing the second
        # coordinate to d.
        feed[inputs[d]] = stacked[:, d, :]
    return feed


# The context manager closes the session even if training raises.
with tf.Session() as sess:
    sess.run(init_op)

    # epoch is a global var
    for i in range(epoch):
        for j in range(no_of_batches):
            start = j * batch_size
            stop = start + batch_size
            feed_dict = _build_feed_dict(train_input[start:stop], train_output[start:stop])
            sess.run(minimize, feed_dict)
        print("Epoch - " + str(i))

    # Evaluate once on the held-out sequences after the last epoch.
    incorrect = sess.run(error, _build_feed_dict(test_input, test_output))
    # Report the configured epoch count rather than the leaked loop variable,
    # which is only meaningful if the training loop ran at least once.
    print('Epoch {:2d} error {:3.1f}%'.format(epoch, 100 * incorrect))

# NOTE(review): the commented-out example below refers to a `data` placeholder
# that is not defined in the visible notebook, and it would need an open session.
#print("Example ----")
#print(train_input[0])
#print("Correct classification is:")
#print(train_output[0])
#print("We predicted:")
#print(sess.run(prediction,{data: [train_input[0]]}))

Number of batches: 100
Epoch - 0
Epoch - 1
Epoch - 2
Epoch - 3
Epoch - 4
Epoch - 5
Epoch - 6
Epoch - 7
Epoch - 8
Epoch - 9
Epoch - 10
Epoch - 11
Epoch - 12
Epoch - 13
Epoch - 14
Epoch - 15
Epoch - 16
Epoch - 17
Epoch - 18
Epoch - 19
Epoch 20 error 1.0%


In [9]:
# Very initial experiments
# No higher monomials, 20 epochs, ~15%
# Higher monomials, 20 epochs, 0.4%