### Seed

In [None]:
import matplotlib.pyplot as plt
from time import time
import matplotlib.pyplot as plt
import pandas as pd
import cv2 as cv

In [None]:
# Single seed shared by every random number generator used in this notebook
# (also passed to the Dropout layers in get_model).
seed_value= 0

# NOTE(review): PYTHONHASHSEED is normally read when the interpreter starts;
# setting it here presumably only affects child processes — confirm.
import os
os.environ['PYTHONHASHSEED']=str(seed_value)

# Seed Python's built-in RNG (used by random.shuffle / random.randint below).
import random
random.seed(seed_value)

# Seed NumPy's global RNG.
import numpy as np
np.random.seed(seed_value)

# Seed TensorFlow's RNG. tf.set_random_seed is the TF 1.x spelling.
import tensorflow as tf
tf.set_random_seed(seed_value)

import keras
from keras.models import Sequential, Model
from keras.layers import Input, Flatten, Dense, Dropout, Convolution2D, Conv2D, MaxPooling2D, Lambda, GlobalMaxPooling2D, GlobalAveragePooling2D, BatchNormalization, Activation, AveragePooling2D, Concatenate
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.utils import np_utils
from keras import backend as K
# Create a TF session limited to one intra-op and one inter-op thread and
# register it as the session Keras uses.
# presumably single-threading is meant to remove scheduling nondeterminism — TODO confirm
session_conf = tf.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
sess = tf.Session(graph=tf.get_default_graph(), config=session_conf)
K.set_session(sess)

### Read Dataset

In [None]:
def getOneHot(a, num_classes=None):
    """Return the one-hot encoding of an array of integer class labels.

    Args:
        a: 1-D NumPy array of non-negative integer labels.
        num_classes: Number of columns in the result. When omitted it is
            ``a.max() + 1``; pass it explicitly when the highest class may
            be missing from ``a`` so the width stays fixed.

    Returns:
        Float array of shape ``(a.size, num_classes)`` holding a single 1
        in each row, at the column given by that row's label.
    """
    a = np.asarray(a)
    if a.size == 0:
        # An empty array has no maximum (a.max() would raise), so return an
        # empty matrix of the requested width instead.
        return np.zeros((0, num_classes or 0))
    if num_classes is None:
        num_classes = a.max() + 1
    b = np.zeros((a.size, num_classes))
    b[np.arange(a.size), a] = 1
    return b

In [None]:
def getData(path, folders):
    """Load every image below ``path/<folder>`` and return it shuffled.

    Args:
        path: Directory that contains one sub-directory per class.
        folders: Sub-directory names; the index of a name in this list is
            the integer label given to the images it contains.

    Returns:
        Tuple ``(images, labels)``: ``images`` is an array of the frames
        returned by ``cv.imread`` and ``labels`` is a float one-hot matrix
        of shape ``(len(images), len(folders))``, row-aligned with
        ``images``.
    """
    samples = []
    for label, folder in enumerate(folders):
        folder_path = os.path.join(path, folder)
        # os.listdir order is arbitrary; sorting gives random.shuffle the
        # same starting order on every run so the seeded shuffle repeats.
        for file_name in sorted(os.listdir(folder_path)):
            frame = cv.imread(os.path.join(folder_path, file_name))
            if frame is None:
                # cv.imread returns None for files it cannot decode; keeping
                # them would turn `images` into an object array.
                continue
            samples.append((label, frame))
    random.shuffle(samples)

    label_ids = np.asarray([label for label, _ in samples], dtype=int)
    images = np.asarray([frame for _, frame in samples])

    # Encode against len(folders) rather than the largest label seen, so the
    # width does not shrink when the last folder is empty, and an empty data
    # set yields an empty matrix instead of an error.
    labels = np.zeros((label_ids.size, len(folders)))
    labels[np.arange(label_ids.size), label_ids] = 1
    return images, labels

In [None]:
def preprocess(a, lower_color=(0, 10, 60), upper_color=(20, 150, 255)):
    """Black out every pixel whose HSV value lies outside a colour range.

    Each frame is converted from BGR to HSV, a mask of the pixels inside
    ``[lower_color, upper_color]`` is built, and the frame is AND-ed with
    itself under that mask so pixels outside the range become zero.

    Args:
        a: Iterable of BGR frames.
        lower_color: Inclusive lower HSV bound (H, S, V).
        upper_color: Inclusive upper HSV bound (H, S, V).
            NOTE(review): the defaults are presumably tuned for skin
            tones — confirm.

    Returns:
        List of masked frames, one per input frame, in input order.
    """
    # The bounds do not change between frames, so build them once.
    lower = np.array(lower_color)
    upper = np.array(upper_color)
    b = []
    for action_frame in a:
        hsv = cv.cvtColor(action_frame, cv.COLOR_BGR2HSV)
        mask = cv.inRange(hsv, lower, upper)
        b.append(cv.bitwise_and(action_frame, action_frame, mask=mask))
    return b

In [None]:
# Class names; each is also the name of the sub-directory holding that
# class's images, and its position in the list is the integer label.
labelNames = ['next', 'previous', 'stop']
# Load the training and validation sets from their own root directories.
x_train, y_train = getData('finalImageGeneric', labelNames)
x_val, y_val = getData('finalImageGenericVal', labelNames)

In [None]:
# Replace the raw images with their colour-masked versions; preprocess
# returns a list, so convert back to an array for Keras.
x_train = np.asarray(preprocess(x_train))
x_val = np.asarray(preprocess(x_val))

### Show images

In [None]:
def showImg(images, labels, labelNames):
    """Plot a 5x5 grid of randomly chosen images with their class names.

    Args:
        images: Indexable collection of BGR images as produced by
            ``cv.imread`` (samples may repeat, they are drawn with
            replacement).
        labels: One-hot label rows aligned with ``images``.
        labelNames: Class names indexed by label id.

    Raises:
        ValueError: If ``images`` is empty.
    """
    plt.figure(figsize=(10,10))
    for i in range(25):
        plt.subplot(5,5,i+1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        # randrange excludes its upper bound; random.randint(0, len(images))
        # could return len(images) and index past the end.
        n = random.randrange(len(images))
        # OpenCV stores channels as BGR while matplotlib expects RGB, so
        # convert before drawing to show the true colours.
        plt.imshow(cv.cvtColor(images[n], cv.COLOR_BGR2RGB))
        plt.xlabel(labelNames[np.argmax(labels[n])])
    plt.show()

In [None]:
# Preview a random sample grid for each split, training first.
for _caption, _images, _labels in (
    ("Training data:", x_train, y_train),
    ("Validation data:", x_val, y_val),
):
    print(_caption)
    showImg(_images, _labels, labelNames)

### CNN architecture

In [None]:
def get_model(layer1Filter = 16, layer2Filter = 32, layer3Filter = 64, layer4Output = 400, optim = 'adam'):
    """Build and compile the 3-class CNN for 50x50 3-channel images.

    The network is three identical stages (batch normalisation, 3x3
    convolution with ReLU, max pooling, dropout 0.5) followed by a dense
    ReLU layer with dropout and a 3-way softmax output.

    Args:
        layer1Filter: Number of filters in the first convolution.
        layer2Filter: Number of filters in the second convolution.
        layer3Filter: Number of filters in the third convolution.
        layer4Output: Number of units in the dense hidden layer.
        optim: ``'adam'`` selects Adam; any other value selects RMSprop.

    Returns:
        The compiled Keras ``Model``.
    """
    inputs = Input((50, 50, 3))
    net = inputs
    for n_filters in (layer1Filter, layer2Filter, layer3Filter):
        net = BatchNormalization(axis = 3)(net)
        net = Conv2D(filters = n_filters, kernel_size = (3,3), activation='relu')(net)
        net = MaxPooling2D()(net)
        net = Dropout(0.5, seed = seed_value)(net)

    net = Flatten()(net)
    net = Dense(layer4Output , activation = 'relu')(net)
    net = Dropout(0.5, seed = seed_value)(net)
    net = Dense(3, activation = 'softmax')(net)

    # `inputs` / `outputs` are the Keras 2 keyword names; `input` / `output`
    # are legacy spellings.
    model = Model(inputs = inputs, outputs = net)

    # Both optimizers use the `learning_rate` keyword so the two branches
    # target the same Keras API.
    if optim == 'adam':
        opt = keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0)
    else:
        opt = keras.optimizers.RMSprop(learning_rate=0.001, rho=0.9)

    # The output is a softmax over mutually exclusive classes with one-hot
    # targets, so the matching loss is categorical cross-entropy. With
    # binary cross-entropy Keras also reports per-element binary accuracy,
    # which overstates how often the predicted class is right.
    model.compile(opt, loss='categorical_crossentropy', metrics=['accuracy'])
    return model

In [None]:
def get_callbacks(name_weights, patience_lr):
    """Create the callbacks used while training.

    Args:
        name_weights: File path the checkpoint callback writes to.
        patience_lr: Number of epochs without improvement in
            ``val_accuracy`` before the learning rate is reduced.

    Returns:
        List ``[checkpoint, lr_scheduler]``: a ``ModelCheckpoint`` that
        keeps only the model with the lowest ``val_loss``, and a
        ``ReduceLROnPlateau`` that multiplies the learning rate by 0.75.
    """
    mcp_save = ModelCheckpoint(name_weights, save_best_only=True, monitor='val_loss', mode='min')
    # `min_delta` is the current name of the improvement threshold; the
    # older `epsilon` keyword is deprecated.
    reduce_lr_loss = ReduceLROnPlateau(monitor='val_accuracy', factor=0.75, patience=patience_lr, verbose=1, min_delta=1e-4)
    return [mcp_save, reduce_lr_loss]


In [None]:
def train_model(x_train, y_train, x_val, y_val, layer4Output, optim, epochs):
    """Build a fresh model, fit it, and return it with its training history.

    The checkpoint file name is derived from the optimizer name and the
    dense-layer width, so different configurations do not overwrite each
    other's weights.

    Returns:
        Tuple ``(model, history)`` where ``history`` is the object returned
        by ``model.fit``.
    """
    weights_file = "final_model_weights_{}_{}.h5".format(optim, str(layer4Output))
    network = get_model(layer4Output=layer4Output, optim=optim)
    fit_history = network.fit(
        x=x_train,
        y=y_train,
        validation_data=(x_val, y_val),
        epochs=epochs,
        callbacks=get_callbacks(name_weights=weights_file, patience_lr=2),
    )
    return network, fit_history

In [None]:
# layer4 = [200, 400, 500, 700, 1000, 1200]
# optimizers = ['adam', 'rmsprop']
# scores = []
# for optim in optimizers:
#     for nLayer in layer4:
#         temp = [optim, nLayer]
#         model, _ = train_model(x_train, y_train, x_val, y_val, nLayer, optim, 20)
#         val = model.evaluate(x_val, y_val)
#         temp.append(val)
#         scores.append(temp)

In [None]:
# Train the chosen configuration: 400 dense units, Adam, 20 epochs.
# `history` is used by the plots in the Visualisation section.
model, history = train_model(x_train, y_train, x_val, y_val, 400, 'adam', 20)

### Prediction

In [None]:
# Live demo: classify webcam frames until 'q' is pressed or capture fails.
cap = cv.VideoCapture(0)
font = cv.FONT_HERSHEY_SIMPLEX  # constant, so looked up once outside the loop

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Unable to capture video")
            break
        # Mirror the frame horizontally.
        frame = cv.flip( frame, 1 )
        # The network takes 50x50 inputs; apply the same colour masking
        # that was applied to the training images.
        frame2 = cv.resize(frame, (50, 50))
        a = np.array(preprocess([frame2]))
        prob = model.predict(a)
        pred = np.argmax(prob)
        # Predictions below 90% confidence are reported as background.
        if prob[0][pred] < 0.9:
            s = "Background " + str(prob[0])
        else:
            s = labelNames[pred] + " " + str(prob[0][pred])
        cv.putText(frame,s,(40,40),font,0.70,(0,0,255),2)
        cv.imshow('frame', frame)
        cv.imshow('frame2', a[0])
        if cv.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Release the camera and close the windows even if prediction or
    # drawing raises, so the device is not left locked.
    cap.release()
    cv.destroyAllWindows()

### Visualisation

In [None]:
# One figure per metric: the training curve is drawn first, then the
# validation curve, matching the legend order.
for _train_key, _val_key, _title, _ylabel in (
    ('accuracy', 'val_accuracy', 'Model accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model loss', 'Loss'),
):
    for _key in (_train_key, _val_key):
        plt.plot(history.history[_key])
    plt.title(_title)
    plt.ylabel(_ylabel)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    plt.show()

### Using trained model

In [None]:
def load_trained_model(weights_path):
    """Return a default-architecture model with weights read from a file.

    Args:
        weights_path: Path of the weights file to load; it must match the
            architecture produced by ``get_model()`` with default arguments.
    """
    restored = get_model()
    restored.load_weights(weights_path)
    return restored

In [None]:
# Rebind `model` to a network restored from saved weights.
# NOTE(review): this is an absolute path on one machine — adjust before running elsewhere.
model = load_trained_model("/home/ankit/Desktop/Acad/Sem/Sem5/COL780/Assn/4/weights/final_model_weights_adam_400.h5")