In [None]:
import tensorflow as tf
from sklearn.cross_validation import train_test_split
import sys
from sklearn.utils import shuffle
import h5py
import time
import seaborn as ses

from IPython import display
import pylab as pl

%matplotlib inline

# Model of a one-layer LSTM


### Load the X and Y data from the `Data/` folder

In [None]:
def restore(nameOfFile, nameOfDataset):
    """Read one whole dataset out of an HDF5 file.

    Args:
        nameOfFile: path of the HDF5 file; it is opened read-only.
        nameOfDataset: key of the dataset inside the file.

    Returns:
        The result of a full slice (`[:]`) of the dataset, taken while the
        file is still open.
    """
    # The context manager closes the file even if the dataset lookup or the
    # read raises; with an explicit close() an exception leaked the handle.
    with h5py.File(nameOfFile, 'r') as h5f:
        return h5f[nameOfDataset][:]

In [None]:
# Load the complete input and target matrices from their HDF5 files.
X = restore("Data/data_matrix_X.h5", "X")
Y = restore("Data/data_matrix_Y.h5", "Y")

# First split: 60% for training, 40% held out.
x_train, x_holdout, y_train, y_holdout = train_test_split(
    X, Y, test_size=0.40, random_state=42)

# Second split: the held-out part is halved into validation and test sets.
# The fixed random_state keeps both splits reproducible.
x_validate, x_test, y_validate, y_test = train_test_split(
    x_holdout, y_holdout, test_size=0.50, random_state=42)

In [None]:
# Show the shape of every split, in the order:
# train inputs/targets, test inputs/targets, validation inputs/targets.
for split in (x_train, y_train, x_test, y_test, x_validate, y_validate):
    print(split.shape)

### Model tuning parameters

In [None]:
# --- network dimensions ---
input_size = 27       # length of each input vector
hidden_size = 80      # number of hidden units
output_size = 2       # length of the output vector

# --- optimisation settings ---
learn_rate = 1e-3     # learning rate
batch_size = 700      # number of examples per mini-batch
epoch_number = 1200   # number of passes of the training loop

## Design of LSTM cell

In [None]:
class LstmCell(object):
    """Single-layer LSTM assembled from basic TensorFlow graph ops.

    Constructing an instance creates the trainable variables, the input
    placeholder, its time-major view and the all-zero initial state.
    The methods add the recurrence and the output layer to the graph.
    """

    def __init__(self, input_size, hidden_size, output_size):
        """Create the variables, the input placeholder and the initial state.

        Args:
            input_size: length of each input vector.
            hidden_size: number of hidden units.
            output_size: length of each output vector.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        def small_normal(rows, cols):
            # Weight matrix drawn from a normal with mean 0 and stddev 0.01.
            return tf.Variable(tf.random_normal([rows, cols], mean=0, stddev=0.01))

        def ones_bias(width):
            # Bias vector initialised to ones.
            return tf.Variable(tf.ones([width]))

        n_in = self.input_size
        n_hid = self.hidden_size

        # Input gate: input weights, recurrent weights, bias.
        self.wig = small_normal(n_in, n_hid)
        self.uig = small_normal(n_hid, n_hid)
        self.big = ones_bias(n_hid)

        # Forget gate.
        self.wfg = small_normal(n_in, n_hid)
        self.ufg = small_normal(n_hid, n_hid)
        self.bfg = ones_bias(n_hid)

        # Output gate.
        self.wog = small_normal(n_in, n_hid)
        self.uog = small_normal(n_hid, n_hid)
        self.bog = ones_bias(n_hid)

        # Candidate ("new memory") values.
        self.wcg = small_normal(n_in, n_hid)
        self.ucg = small_normal(n_hid, n_hid)
        self.bcg = ones_bias(n_hid)

        # Output layer applied to the hidden state.
        self.vo = small_normal(n_hid, output_size)
        self.bo = ones_bias(self.output_size)

        # Input batch laid out as [m, t, n]:
        #   m = number of sentences, t = number of time steps,
        #   n = length of the input vector.
        self.inputs_matrix = tf.placeholder(dtype=tf.float32, shape=[None, None, n_in])

        # Time-major view [t, m, n], so the scan iterates over time steps.
        self.feed_marix = tf.transpose(self.inputs_matrix, perm=[1, 0, 2])

        # Multiplying the first time step by a zero matrix yields a zero
        # tensor of shape (m, hidden_size) without knowing m when the graph
        # is built.
        first_step = self.feed_marix[0]
        self.initial_state_ = tf.matmul(first_step, tf.zeros([n_in, n_hid]))

        # The recurrence carries memory and hidden state stacked together;
        # both start from the same zeros.
        self.initial_state = tf.stack([self.initial_state_, self.initial_state_])

    def lstm(self, previous_hidden_memory_tuple, x):
        """Advance the cell by one time step.

        Args:
            previous_hidden_memory_tuple: stacked tensor holding
                (memory at t-1, hidden state at t-1), in that order.
            x: input for the current time step.

        Returns:
            Stacked tensor holding (memory at t, hidden state at t).
        """
        prev_memory, prev_hidden = tf.unstack(previous_hidden_memory_tuple)

        def pre_activation(w, u, b):
            # x . w + h(t-1) . u + b, shared by the gates and the candidate.
            return tf.matmul(x, w) + tf.matmul(prev_hidden, u) + b

        input_gate = tf.sigmoid(pre_activation(self.wig, self.uig, self.big))
        forget_gate = tf.sigmoid(pre_activation(self.wfg, self.ufg, self.bfg))
        output_gate = tf.sigmoid(pre_activation(self.wog, self.uog, self.bog))
        candidate = tf.nn.tanh(pre_activation(self.wcg, self.ucg, self.bcg))

        # Keep part of the old memory and add the gated candidate.
        memory = forget_gate * prev_memory + input_gate * candidate
        # The hidden state is the gated, squashed memory.
        hidden = output_gate * tf.nn.tanh(memory)

        return tf.stack([memory, hidden])

    def get_states(self):
        """Run the recurrence over every time step of the batch.

        Conceptually:
            S0 = lstm(initial_state, x0)
            S1 = lstm(S0, x1)
            ...
            Sn = lstm(Sn-1, xn)

        Returns:
            Tensor with one stacked (memory, hidden state) pair per time
            step, i.e. shape (time_steps, 2, batch, hidden_size).
        """
        return tf.scan(self.lstm, self.feed_marix,
                       initializer=self.initial_state, name="stats")

    def get_output(self, hidden_stat):
        """Compute the output layer for one time step.

        Args:
            hidden_stat: stacked (memory at t, hidden state at t) tensor.

        Returns:
            ReLU of the affine output layer applied to the hidden state,
            shape (batch, output_size).
        """
        _memory, state = tf.unstack(hidden_stat)
        projected = tf.matmul(state, self.vo) + self.bo
        return tf.nn.relu(projected)

    def get_outputs(self):
        """Compute the output layer for every time step.

        Returns:
            Tensor of shape (time_steps, batch, output_size).
        """
        # Map the per-step output layer over all scanned states.
        return tf.map_fn(self.get_output, self.get_states())

## Define Model

In [None]:
# Build the LstmCell graph: variables, input placeholder and initial state.
rnn = LstmCell(hidden_size=hidden_size, input_size=input_size, output_size=output_size)

# Outputs for every time step; the first axis is time.
outputs = rnn.get_outputs()

# Only the output at the last time step is used for classification.
last_output = outputs[-1]

# Class probabilities for every example in the batch.
output = tf.nn.softmax(last_output)

# Targets: one row per example, output_size columns.
y = tf.placeholder(tf.float32, shape=[None, output_size])

# Cross-entropy averaged over the examples actually fed.
#  * log_softmax is used instead of log(softmax(...)) so a probability that
#    underflows to 0 cannot produce -inf/NaN.
#  * The divisor is the real number of rows in the fed batch rather than the
#    batch_size constant, so a smaller batch is normalised correctly; the
#    floor of 1 keeps an empty feed at a loss of 0 instead of 0/0.
examples_in_batch = tf.maximum(tf.cast(tf.shape(y)[0], tf.float32), 1.0)
cross_entropy = -tf.reduce_sum(y * tf.nn.log_softmax(last_output)) / examples_in_batch

# Adam, now driven by the learn_rate hyperparameter (it was previously
# defined but never passed to the optimizer).
optmizer_step = tf.train.AdamOptimizer(learning_rate=learn_rate).minimize(cross_entropy)

# `accuracy` is (number of correct predictions) * 100; callers divide it by
# the number of examples they fed to obtain a percentage.
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(output, 1))
accuracy = tf.reduce_sum(tf.cast(correct_prediction, tf.float32)) * 100

## Train Model

In [None]:
# Open the session used by every later cell and give all variables their
# initial values. tf.global_variables_initializer() is the supported
# replacement for the deprecated tf.initialize_all_variables().
sess = tf.Session()
sess.run(tf.global_variables_initializer())

In [None]:
# Wall-clock reference taken before training starts.
start_time = time.time()

# Per-epoch history buffers; each name gets its own independent empty list.
(loss_fig,
 test_accuracy_fig,
 train_accuracy_fig,
 validate_accuracy_fig,
 epoch_fig) = ([] for _ in range(5))

def print_info():
    """Plot the recorded training history and print the latest metrics.

    Takes no arguments and returns nothing. It reads module-level names:
    the history lists epoch_fig, loss_fig, train_accuracy_fig,
    validate_accuracy_fig and test_accuracy_fig, and the latest scalar
    values epoch, loss, train_accuracy, validate_acuuracy and test_acuuracy.
    All of them must exist before the call.
    """
    pl.figure(figsize=(15,6))
    # All four curves share one axis, so each gets a label and the figure a
    # legend; without them the lines cannot be told apart.
    pl.plot(epoch_fig,loss_fig,'b',label='loss')
    pl.plot(epoch_fig,train_accuracy_fig,'r',label='train accuracy')
    pl.plot(epoch_fig,validate_accuracy_fig,'g',label='validate accuracy')
    pl.plot(epoch_fig,test_accuracy_fig,'y',label='test accuracy')
    pl.xlabel('epoch')
    pl.title('Training history')
    pl.legend(loc='best')
    pl.show()

    # Flush around the print so the text is emitted right after the figure.
    sys.stdout.flush()
    print("\r         Iteration:      %s \n" 
          "               Loss:      %s \n"  
          "     Train-Accuracy:      %s \n"
          "  Validate-Accuracy:      %s \n"
          "      test-Accuracy:      %s \n"%(epoch,loss,train_accuracy,validate_acuuracy,test_acuuracy))
    sys.stdout.flush()



# Interactive plotting mode for the per-epoch figures.
pl.ion()

# Batching is derived from the training split only. Previously the batch
# count came from the full dataset X, and X/Y were then rebound to the
# current mini-batch: slices past the end of x_train were empty, and from
# the second epoch on the batch count was computed from the last (usually
# empty) mini-batch, so no further training steps ran.
train_size = len(x_train)

for epoch in range(epoch_number):
    # One pass over the training set in consecutive mini-batches; the final
    # batch may be shorter than batch_size.
    for batch_start in range(0, train_size, batch_size):
        batch_end = batch_start + batch_size
        x_batch = x_train[batch_start:batch_end]
        y_batch = y_train[batch_start:batch_end]
        sess.run(optmizer_step, feed_dict={rnn.inputs_matrix: x_batch, y: y_batch})

    # Loss on the last mini-batch of the epoch, scaled by 100.
    loss = sess.run(cross_entropy, feed_dict={rnn.inputs_matrix: x_batch, y: y_batch}) * 100

    # `accuracy` evaluates to (number correct) * 100, so dividing by the set
    # size gives a percentage.
    train_accuracy = sess.run(accuracy, feed_dict={rnn.inputs_matrix: x_train, y: y_train}) / len(x_train)
    validate_acuuracy = sess.run(accuracy, feed_dict={rnn.inputs_matrix: x_validate, y: y_validate}) / len(x_validate)
    test_acuuracy = sess.run(accuracy, feed_dict={rnn.inputs_matrix: x_test, y: y_test}) / len(x_test)

    # Record this epoch. print_info() reads these lists and the scalar names
    # above from module scope, so their (misspelled) names are kept.
    epoch_fig.append(epoch)
    loss_fig.append(loss)
    train_accuracy_fig.append(train_accuracy)
    validate_accuracy_fig.append(validate_acuuracy)
    test_accuracy_fig.append(test_acuuracy)

    print_info()
    # wait=True defers clearing until the next output arrives.
    display.clear_output(wait=True)


# Final plot and summary, then the total elapsed time.
print_info()
print("\n--- %s minute ---" % ((time.time() - start_time)/60))

## Test model 

In [None]:
# Reuse the output tensors built in the model-definition cell. Calling
# rnn.get_outputs() again would add a second copy of the whole recurrence to
# the graph each time this cell runs, and the former sess.run(...) here
# computed a full forward pass whose result was thrown away.
test_outputs = outputs
# Output at the last time step; it is evaluated on the test inputs when fed.
last_test_output = last_output


In [None]:
# Softmax over the last-step outputs, evaluated with the test inputs fed in.
test_probabilities = tf.nn.softmax(last_test_output)
y_predict = sess.run(test_probabilities, feed_dict={rnn.inputs_matrix: x_test})

In [None]:
# Test-set accuracy computed directly on the arrays.
# This cell used to rebind `correct_prediction` and `accuracy`, replacing the
# graph tensors that the training cell feeds, so re-running training after
# this cell silently used the wrong tensor. Separate names avoid that, and
# no new graph ops are created.
# Assumes y_predict and y_test are NumPy arrays with one row per example.
predicted_classes = y_predict.argmax(axis=1)
true_classes = y_test.argmax(axis=1)
test_set_accuracy = (predicted_classes == true_classes).sum() * 100.0 / len(x_test)
print("Accuracy of test set :: %s "%(test_set_accuracy))


In [None]:
# Spot-check one test example: its target row first, then the prediction.
num = 10
for row in (y_test[num], y_predict[num]):
    print(row)