In [1]:
import tensorflow as tf
import numpy as np
import cv2, glob, os
from sklearn.model_selection import train_test_split

In [2]:
dataset_path = 'dataset/'
classes = 4
num_epochs = 10
learning_rate = 0.0001
width = 128
height = 128
Y = None
Z = None
tf.reset_default_graph()

In [3]:
def load_frames():
    global Z
    global Y
    label = None
    X = []
    filenum = 0
    Y = np.zeros((40,4))
    for file in glob.glob(dataset_path + '**', recursive=True):
        num_frame = 0
        if file == dataset_path:
            continue
        if os.path.isdir(file):
            x_list = []
            label = file[len(dataset_path):] #label   
        if os.path.isfile(file):
            x_list = []
            vidcap = cv2.VideoCapture(file)
            while(vidcap.isOpened()):
                ret, frame = vidcap.read()
                if ret == True:
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                    each_frame = cv2.resize(frame, (height, width))
                    x_list.append(np.array(each_frame))
                    num_frame +=1
                else:
                    X.append(np.array(x_list))
                    vidcap.release()
            Y[filenum, int(label)] = 1.0
            filenum += 1

   
    Z = np.array(X)

In [4]:
load_frames()

In [5]:
X, X_test, Y, Y_test = train_test_split(Z,Y, test_size=0.0, random_state=30)

In [6]:
class LSTM_RNN:
    
    def lstm_gates(self, prev_state, x):

        self.Fgt_sigmoid_W = tf.get_variable('Fgt_sigmoid_W', shape=[height*width+100,100], dtype=tf.float64)
        self.Fgt_sigmoid_b = tf.get_variable('Fgt_sigmoid_b', shape=[1,100], dtype=tf.float64)
        
        self.in_sigmoid_W = tf.get_variable('In_sigmoid_W', shape=[height*width+100,100], dtype=tf.float64)
        self.in_sigmoid_b = tf.get_variable('In_sigmoid_b', shape=[1,100], dtype=tf.float64)
        
        self.in_tan_W = tf.get_variable('In_tan_W', shape=[height*width+100,100], dtype=tf.float64)
        self.in_tan_b = tf.get_variable('In_tan_b', shape=[1,100], dtype=tf.float64)
        
        self.out_sigmoid_W = tf.get_variable('Out_sigmoid_W', shape=[height*width+100,100], dtype=tf.float64)
        self.out_sigmoid_b = tf.get_variable('Out_sigmoid_b', shape=[1,100], dtype=tf.float64)
        
        self.out_tan_W = tf.get_variable('Out_tan_W', shape=[100,100], dtype=tf.float64)
        self.out_tan_b = tf.get_variable('Out_tan_b', shape=[1,100], dtype=tf.float64)
        
        vectorized_input = tf.reshape(x, (1,height*width))
        
        self.conveyor_belt,self.output_belt = prev_state
        self.concatenation = tf.concat([vectorized_input, self.output_belt], 1)
        
        self.forget_gate = tf.nn.sigmoid(tf.matmul(self.concatenation, self.Fgt_sigmoid_W) + self.Fgt_sigmoid_b)
        self.forget_conveyor = tf.multiply(self.conveyor_belt, self.forget_gate)
        
        self.input_sig = tf.nn.sigmoid(tf.matmul(self.concatenation, self.in_sigmoid_W) + self.in_sigmoid_b)
        self.input_tan = tf.nn.tanh(tf.matmul(self.concatenation, self.in_tan_W) + self.in_tan_b)
        
        self.input_concatenation = tf.multiply(self.input_sig, self.input_tan)
        
        self.conveyor_input = tf.add(self.forget_conveyor, self.input_concatenation)
        
        self.out_conveyor_sigmoid = tf.nn.sigmoid(tf.matmul(self.concatenation, self.out_sigmoid_W) + self.out_sigmoid_b)
        self.out_conveyor_tan = tf.nn.tanh(tf.matmul(self.conveyor_input, self.out_tan_W) + self.out_tan_b)
        
        self.conveyor_output = tf.multiply(self.out_conveyor_sigmoid, self.out_conveyor_tan)
        
        return [self.conveyor_input, self.conveyor_output]
        
        

    
    def __init__(self):
        self.X_placeholder = tf.placeholder(tf.float64, [None, width, height])
        self.Y_placeholder = tf.placeholder(tf.int64, [1, classes])
    
        self.init_conveyor = tf.placeholder(tf.float64, [1,100])
        self.init_output_conveyor = tf.placeholder(tf.float64, [1,100])
        
        self.O_w = tf.Variable(np.random.rand(100,4), dtype=tf.float64)
        self.O_b = tf.Variable(np.random.rand(1,4), dtype=tf.float64)
        
        self.states = tf.scan(self.lstm_gates, self.X_placeholder, initializer=[self.init_conveyor, self.init_output_conveyor], name='states')
        
        
        logit = tf.matmul(self.states[-1][1], self.O_w) + self.O_b
        self.prediction = tf.nn.softmax(logit)
       
        self.loss = tf.nn.softmax_cross_entropy_with_logits_v2(logits=logit,labels=self.Y_placeholder)
        self.train_step = tf.train.AdamOptimizer(learning_rate).minimize(self.loss)

In [7]:
with tf.Session() as sess:
    loss = 0.0
    
    model = LSTM_RNN()
    sess.run(tf.global_variables_initializer())
    
    for epoch in range(num_epochs):
        
        for num_data in range(Y.shape[0]):
            _current_state = np.zeros((1,100))
            _current_state2 = np.zeros((1,100))
            label = np.reshape(Y[num_data], (1,4))
            
            _loss, _train_step, _states, _prediction = sess.run(
                [model.loss, model.train_step, model.states, model.prediction],
                feed_dict = {
                    model.X_placeholder:X[num_data],
                    model.Y_placeholder:label,
                    model.init_conveyor:_current_state,
                    model.init_output_conveyor:_current_state2
                })
            loss += _loss
        print("Step %d | Loss %g"%(epoch, loss))
        loss = 0.0  

Step 0 | Loss 71.1258
Step 1 | Loss 56.6312
Step 2 | Loss 56.1183
Step 3 | Loss 55.9545
Step 4 | Loss 55.9033
Step 5 | Loss 55.8883
Step 6 | Loss 55.8847
Step 7 | Loss 55.8845
Step 8 | Loss 55.8851
Step 9 | Loss 55.8858
