In [1]:
import numpy as np
import tensorflow as tf
from sklearn import datasets
from sklearn.model_selection import train_test_split
import pylab as pl
from IPython import display
import sys
%matplotlib inline

In [2]:
class LSTM_cell(object):

    """
    Reduced LSTM-style recurrent cell expressed as a TensorFlow graph.

    Compared with a textbook LSTM this cell has no input gate and no
    forget gate: at every step a tanh candidate is simply added to the
    running memory, and an output gate scales what is exposed as the
    hidden state.

    Constructor arguments:
    input_size = Input Vector size
    hidden_layer_size = Hidden layer size
    target_size = Output vector size

    """

    def __init__(self, input_size, hidden_layer_size, target_size):

        # Remember the dimensions the graph is built for.
        self.input_size = input_size
        self.hidden_layer_size = hidden_layer_size
        self.target_size = target_size

        # Shapes shared by the recurrent parameters below.
        input_to_hidden = [self.input_size, self.hidden_layer_size]
        hidden_to_hidden = [self.hidden_layer_size, self.hidden_layer_size]
        hidden_bias = [self.hidden_layer_size]

        # Candidate-memory parameters (all start at zero).
        self.Wc = tf.Variable(tf.zeros(input_to_hidden))
        self.Uc = tf.Variable(tf.zeros(hidden_to_hidden))
        self.bc = tf.Variable(tf.zeros(hidden_bias))

        # Output-gate parameters (all start at zero).
        self.Wog = tf.Variable(tf.zeros(input_to_hidden))
        self.Uog = tf.Variable(tf.zeros(hidden_to_hidden))
        self.bog = tf.Variable(tf.zeros(hidden_bias))

        # Output-layer parameters, drawn from a narrow truncated normal.
        self.Wo = tf.Variable(tf.truncated_normal(
            [self.hidden_layer_size, self.target_size], mean=0, stddev=.01))
        self.bo = tf.Variable(tf.truncated_normal(
            [self.target_size], mean=0, stddev=.01))

        # Input placeholder; the first two dimensions are left open and the
        # last one is the per-step input vector.
        self._inputs = tf.placeholder(
            tf.float32,
            shape=[None, None, self.input_size],
            name='inputs')

        # Rearranged copy of the inputs that tf.scan iterates over.
        self.processed_input = process_batch_input_for_RNN(self._inputs)

        # The initial state has to have shape [batch_size, hidden_layer_size].
        # A [1, hidden_layer_size] state would rely on broadcasting in the
        # first step, which was found to trigger a low-level error during
        # back-propagation. The batch size is not known when the graph is
        # built, so the zeros are obtained by multiplying the first time
        # slice of the inputs with an all-zero matrix.
        first_step = self._inputs[:, 0, :]
        zero_state = tf.matmul(
            first_step, tf.zeros([input_size, hidden_layer_size]))

        # Hidden state and memory are carried together as one stacked tensor.
        self.initial_hidden = tf.stack([zero_state, zero_state])

    # Single recurrence step.
    def Lstm(self, previous_hidden_memory_tuple, x):
        """
        Advance the cell by one step.

        previous_hidden_memory_tuple = stacked [hidden state, memory] from
        the previous step
        x = input for the current step

        Returns the stacked [hidden state, memory] for the current step.
        """

        h_prev, c_prev = tf.unstack(previous_hidden_memory_tuple)

        # Candidate memory content
        candidate_pre = (tf.matmul(x, self.Wc)
                         + tf.matmul(h_prev, self.Uc) + self.bc)
        c_ = tf.tanh(candidate_pre)

        # New memory: the candidate is accumulated without any gating.
        c = c_prev + c_

        # Output gate
        gate_pre = (tf.matmul(x, self.Wog)
                    + tf.matmul(h_prev, self.Uog) + self.bog)
        o = tf.sigmoid(gate_pre)

        # Hidden state exposed to the output layer
        h = tf.tanh(c) * o

        return tf.stack([h, c])

    # Unroll the recurrence over the whole sequence.
    def get_states(self):
        """
        Scan the cell over the processed input and return the hidden state
        of every step (the memory half of each stacked state is dropped).
        """

        stacked_states = tf.scan(self.Lstm,
                                 self.processed_input,
                                 initializer=self.initial_hidden,
                                 name='states')

        # Index 0 along the second axis selects the hidden state.
        return stacked_states[:, 0, :, :]

    # Output layer for one step.
    def get_output(self, hidden_state):
        """
        Apply the output layer to a hidden state and return the result.
        """
        return tf.matmul(hidden_state, self.Wo) + self.bo

    # Output layer for every step.
    def get_outputs(self):
        """
        Compute the hidden state of every step and map the output layer
        over them, returning one output per step.
        """
        return tf.map_fn(self.get_output, self.get_states())


# Function to convert batch input data to use scan ops of tensorflow.
def process_batch_input_for_RNN(batch_input):
    """
    Swap the first two axes of a rank-3 tensor so that tf.scan, which
    iterates over the leading axis, walks along what was the second axis.

    Example: a tensor of size [5,3,2] becomes [3,5,2].
    """
    # One permutation gives the same result as transposing with
    # perm=[2, 0, 1] and then reversing all axes.
    return tf.transpose(batch_input, perm=[1, 0, 2])


In [3]:
# Parity task: the network reads a sequence of bits and must predict whether
# the number of ones is odd. One scalar input per step, one hidden unit, one
# output logit.
hidden_layer_size = 1
input_size = 1
target_size = 1

# Label placeholder. It is named 'targets' so that it does not share the
# 'inputs' name already used by the model's own input placeholder.
y = tf.placeholder(tf.float32, shape=[None, target_size], name='targets')

rnn = LSTM_cell(input_size, hidden_layer_size, target_size)
outputs = rnn.get_outputs()

# Only the output of the final step is trained on and scored.
last_output = outputs[-1]
output = tf.nn.sigmoid(last_output)
cross_entropy = tf.reduce_mean(
    tf.nn.sigmoid_cross_entropy_with_logits(labels=y, logits=last_output))

# The learning rate is fed at run time so it can be changed between steps.
lr = tf.placeholder(dtype=tf.float32, shape=[])
train_step = tf.train.AdamOptimizer(lr).minimize(cross_entropy)

correct_prediction = tf.equal(y, tf.round(output))
accuracy = (tf.reduce_mean(tf.cast(correct_prediction, tf.float32))) * 100

sess = tf.Session()
sess.run(tf.global_variables_initializer())


def sample_parity_batch(batch_size, seq_len):
    """
    Draw a batch of random bit sequences together with their parity labels.

    Returns (bits, parity) where bits has shape (batch_size, seq_len, 1) and
    parity has shape (batch_size, 1), holding the sum of each sequence mod 2.
    """
    bits = np.random.binomial(n=1, p=0.5, size=(batch_size, seq_len, 1))
    return bits, np.sum(bits, 1) % 2


# Curriculum: start with sequences of length 2 and move to the next length
# each time the loss on a fresh batch drops below a threshold.
k = 2                   # current sequence length
j = 0                   # training iterations performed so far
lear = 0.1              # learning rate fed to the optimizer
final_k = 9             # stop once the curriculum reaches this length
max_iterations = 20000  # safety cap so the cell always terminates

while k < final_k and j < max_iterations:
    j += 1

    # One optimisation step on a fresh training batch.
    X, Y = sample_parity_batch(64, k)
    sess.run(train_step, feed_dict={rnn._inputs: X, y: Y, lr: lear})

    # Score the model on an independent batch of the same length.
    X, Y = sample_parity_batch(128, k)
    Loss, acc = sess.run([cross_entropy, accuracy],
                         feed_dict={rnn._inputs: X, y: Y})

    # The required loss gets stricter as the sequences get longer.
    if Loss < 0.2 - k/100:
        print(k, acc)
        k += 1

    if j % 100 == 1:
        print(j, Loss)

if k < final_k:
    print('Stopped at iteration cap:', j, 'sequence length reached:', k)

# Generalisation check on sequences longer than any seen during training.
X_test, y_test = sample_parity_batch(2**16, 16)
Test_accuracy = str(sess.run(accuracy, feed_dict={rnn._inputs: X_test, y: y_test}))
print('Result:', Test_accuracy, 'Iteration:', j)

1 0.6982485
2 100.0
101 0.42723772
201 0.33087647
301 0.4047866
401 0.35388651
501 0.3178196
601 0.4065681
701 0.2765196
801 0.35845658
901 0.35256487
1001 0.4313743
1101 0.36228025
1201 0.4522351
1301 0.39326778
1401 0.31574863
1501 0.4174074
1601 0.35581315
1701 0.35421175
1801 0.39961103
1901 0.33618307
2001 0.4348814
2101 0.49007154
2201 0.41718745
2301 0.38069308
2401 0.33455876
2501 0.35336936
2601 0.34370902
2701 0.38219827
2801 0.34979284
2901 0.35394895
3001 0.3626743
3101 0.41827506
3201 0.3084761
3301 0.33906388
3401 0.24848443
3501 0.5218151
3601 0.28230435
3701 0.346217
3801 0.33703518
3901 0.36471295
4001 0.4004224
4101 0.4054565
4201 0.42708907
4301 0.32133722
4401 0.38045573
4501 0.25710842
4601 0.31316334
4701 0.38142636
4801 0.3590058
4901 0.33005422
5001 0.41663903
5101 0.34386888
5201 0.28880507
5301 0.30088627
5401 0.3380674
5501 0.40648806
5601 0.34006226
5701 0.3512537
5801 0.33192486
5901 0.29930905
6001 0.34555155
6101 0.4567657
6201 0.35253054
6301 0.3671794
6

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt
def plot_output_trace(bits, title):
    """
    Feed one bit sequence through the trained network and plot the
    output-layer value produced at every time step.

    bits = array of shape (1, seq_len, 1) fed to the model's input
    title = text shown above the figure

    Returns the raw array fetched from the `outputs` tensor.
    """
    trace = sess.run(outputs, feed_dict={rnn._inputs: bits})
    plt.plot(np.reshape(trace, [-1, 1]))
    plt.title(title)
    plt.xlabel('time step')
    plt.ylabel('output logit')
    plt.show()
    return trace


# Case 1: twenty ones.
X_test = np.ones((1, 20, 1), dtype=int)
hs = plot_output_trace(X_test, 'All ones')

# Case 2: ones with a run of zeros at steps 5-14.
X_test = np.ones((1, 20, 1), dtype=int)
X_test[0, 5:15, 0] = 0
hs = plot_output_trace(X_test, 'Ones with zeros at steps 5-14')

# Case 3: zeros with a run of ones at steps 5-14.
X_test = np.zeros((1, 20, 1), dtype=int)
X_test[0, 5:15, 0] = 1
hs = plot_output_trace(X_test, 'Zeros with ones at steps 5-14')