In [1]:
import os
import keras
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.contrib import lite
from keras.models import Sequential, load_model, Model
from keras.layers import Dense, Dropout, Flatten, Add, Concatenate, Conv1D, Conv2D, MaxPooling1D, MaxPooling2D
from keras import backend as K
from scipy.signal import stft
from sklearn import preprocessing
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix
import pickle as pkl

os.environ["CUDA_VISIBLE_DEVICES"] = ""


Using TensorFlow backend.


In [2]:
# saved files names
img_model_name = "keras_cnn_img.h5"
emg_model_name = "keras_cnn_emg.h5"
fus_model_name = "keras_cnn_fus.h5"

# hyper-parameters definition
split_p = 0.66
batch_size = 128
num_classes = 5
img_epochs = 30
emg_epochs = 30
fus_epochs = 30
img_rows, img_cols = 60, 60
emg_rows, emg_cols = 8, 3
seed = 7
np.random.seed(seed)

In [3]:
data_events = pkl.load(open('full_dataset_events.pkl', 'rb'))
dvs = data_events['frames']
lbl = data_events['labels']
emg = data_events['emg']
print(dvs.shape)
print(lbl.shape)
print(emg.shape)

dvs = dvs / 255.

print(dvs.shape)
print(lbl.shape)

(6000, 60, 60)
(6000,)
(6000, 24)
(6000, 60, 60)
(6000,)


In [None]:
n_train = int(len(dvs) * 0.8)

dvs_train = dvs[:n_train]
dvs_test = dvs[n_train:]
emg_train = emg[:n_train]
emg_test = emg[n_train:]
lbl_train = lbl[:n_train]
lbl_test = lbl[n_train:]

print(dvs_train.shape)
print(lbl_train.shape)
print(emg_train.shape)

In [None]:
# data format convertion
dvs_train = dvs_train.reshape(-1, img_rows, img_cols, 1)
dvs_test = dvs_test.reshape(-1, img_rows, img_cols, 1)
emg_train = emg_train.reshape(-1, emg_rows * emg_cols, 1)
emg_test = emg_test.reshape(-1, emg_rows * emg_cols, 1)


lbl_train = keras.utils.to_categorical(lbl_train, num_classes)
lbl_test = keras.utils.to_categorical(lbl_test, num_classes)

dvs_input_shape = (img_rows, img_cols, 1)
emg_input_shape = (emg_rows * emg_cols, 1)

In [None]:
plt.imshow(np.squeeze(dvs_train[500][:, ::-1]))
print(np.squeeze(dvs_train[0][40]))

In [None]:
# cnn for img
# create the cnn model
model_img = Sequential()
model_img.add(Conv2D(filters=8, kernel_size=(3, 3), activation='relu', input_shape=dvs_input_shape))
model_img.add(Conv2D(filters=8, kernel_size=(3, 3), activation='relu'))
model_img.add(MaxPooling2D(pool_size=(2, 2)))
model_img.add(Dropout(0.25))
model_img.add(Flatten())
model_img.add(Dense(50, activation='relu'))
model_img.add(Dropout(0.5))
model_img.add(Dense(num_classes, activation='softmax'))
                    
model_img.summary()

In [None]:
# compile model
model_img.compile(loss=keras.losses.categorical_crossentropy,
            optimizer=keras.optimizers.Adadelta(),
            metrics=['accuracy'])

# fit the model
model_img.fit(dvs_train, lbl_train,
        batch_size=batch_size,
        epochs=25,
        verbose=1,
        validation_data=(dvs_test, lbl_test))

# evaluate the model
scores = model_img.evaluate(dvs_test, lbl_test, verbose=1)
print("%s: %.2f%%" % (model_img.metrics_names[1], scores[1]*100))



In [None]:
# convert h5 model to tflite model
model_img.save('cnn_dvs_keras.h5')   
# converter = lite.TFLiteConverter.from_keras_model_file('cnn_dvs_keras.h5')
# model_h5 = converter.convert()
# file = open('cnn_dvs.tflite','wb')
# file.write(model_h5)

In [None]:
# cnn for emg

# create the cnn model
model_emg = Sequential()
model_emg.add(Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=emg_input_shape))
model_emg.add(Conv1D(filters=32, kernel_size=3, activation='relu'))
model_emg.add(Dropout(0.5))
model_emg.add(MaxPooling1D(pool_size=2))
model_emg.add(Flatten())
model_emg.add(Dense(100, activation='relu'))
model_emg.add(Dense(num_classes, activation='softmax'))

model_emg.summary()

In [None]:
# compile model
model_emg.compile(loss=keras.losses.categorical_crossentropy,
            optimizer=keras.optimizers.Adadelta(),
            metrics=['accuracy'])

# fit the model
model_emg.fit(emg_train, lbl_train, epochs=emg_epochs, batch_size=batch_size, verbose=0)

model_emg.fit(emg_train, lbl_train,
        batch_size=batch_size,
        epochs=emg_epochs,
        verbose=1,
        validation_data=(emg_test, lbl_test))

# evaluate the model
scores = model_emg.evaluate(emg_test, lbl_test, verbose=0)
print("%s: %.2f%%" % (model_emg.metrics_names[1], scores[1]*100))

In [None]:
# convert h5 model to tflite model
model_emg.save('cnn_emg_keras.h5')   
# converter = lite.TFLiteConverter.from_keras_model_file('cnn_emg_keras.h5')
# model_h5 = converter.convert()
# file = open('cnn_emg.tflite','wb')
# file.write(model_h5)

In [None]:
from keras.models import load_model
 
# load model
model_emg = load_model('cnn_emg_keras.h5')
model_img = load_model('cnn_dvs_keras.h5')

In [None]:
# mlp for fus
mergedOut = Concatenate()([model_img.output,model_emg.output])
mergedOut = Dense(5, activation='softmax', name='otherD')(mergedOut)
model_fus = Model([model_img.input, model_emg.input], mergedOut)
model_fus.summary()

In [None]:
# freeze the layers except the last dense
for layer in model_fus.layers[:len(model_fus.layers)-1]:
    layer.trainable = False

# check the status of the layers
for layer in model_fus.layers:
    print(layer, layer.trainable)

In [None]:
# compile model
model_fus.compile(loss=keras.losses.categorical_crossentropy,
            optimizer=keras.optimizers.Adadelta(),
            metrics=['accuracy'])

# fit the model
model_fus.fit([dvs_train, emg_train], lbl_train,
        batch_size=batch_size,
        epochs=fus_epochs,
        verbose=1,
        validation_data=([dvs_test, emg_test], lbl_test))

# evaluate the model
scores = model_fus.evaluate([dvs_test, emg_test], lbl_test, verbose=1)
print("%s: %.2f%%" % (model_fus.metrics_names[1], scores[1]*100))

In [None]:
# confusion matrix for fus
pred_fus = model_fus.predict([dvs_test, emg_test])
print(confusion_matrix(lbl_test.argmax(axis=1), pred_fus.argmax(axis=1)))

In [None]:
# save keras model or weights on google drive
# create on colab directory
model_fus.save(fus_model_name)    
# model_file = drive.CreateFile({'title' : fus_model_name})
# model_file.SetContentFile(fus_model_name)
# model_file.Upload()

# # download to google drive
# drive.CreateFile({'id': model_file.get('id')})

In [4]:
# load model from google drive
model_cnn_fus = load_model(fus_model_name)
model_cnn_fus.summary()

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
Instructions for updating:
Use tf.cast instead.
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
conv2d_1_input (InputLayer)     (None, 60, 60, 1)    0                                            
__________________________________________________________________________________________________
conv2d_1 (Conv2D)               (None, 58, 58, 8)    80          conv2d_1_input[0][0]             
__________________________________________________________________________________________________
conv1d_1_input (InputLayer)     (None, 24, 1)        0                                            
________________________________________________________________________

In [5]:
# convert h5 model to tflite model
converter = lite.TFLiteConverter.from_keras_model_file(fus_model_name)
model_h5 = converter.convert()
file = open('cnn_fus.tflite','wb')
file.write(model_h5)


Instructions for updating:
Use tf.compat.v1.graph_util.convert_variables_to_constants
Instructions for updating:
Use tf.compat.v1.graph_util.extract_sub_graph
INFO:tensorflow:Froze 18 variables.
INFO:tensorflow:Converted 18 variables to const ops.


1406996