In [1]:
import numpy as np
import keras as K
import keras.layers as KL
from keras.utils import np_utils
import pickle
import tqdm
import os
import cv2
from matplotlib import pyplot as plt

#!pip3 install -U tensorflow-addons
import tensorflow_addons as tfa
import tensorflow as tf

In [2]:
# Ask TensorFlow once per device type and reuse the result for both the
# count and the printed device list.
gpu_devices = tf.config.list_physical_devices('GPU')
cpu_devices = tf.config.list_physical_devices('CPU')
GPUs_num = len(gpu_devices)
CPUs_num = len(cpu_devices)

print("Num GPUs Available: ", GPUs_num)
print("gpu_devices: ", gpu_devices)
print("Num CPUs Available: ", CPUs_num)
print("cpu_devices: ", cpu_devices)

# Use the first GPU when at least one is visible, otherwise the CPU.
if GPUs_num > 0:
    device = '/GPU:0'
else:
    device = '/CPU:0'
device

# Dataset processing
* data partitioning (train / validation / test)
* label tokenization
* data sample enumeration

In [15]:
# --- Dataset location: one sub-directory per split under root_path ---
root_path = "../input/hand-gestures/dataset"
train_dir, valid_dir, test_dir = "train", "valid", "test"

# --- Output artefacts ---
save_dir = "./model"
tflite_model_name = "tflite_model"

# --- Label bookkeeping ---
# Class names; a class's position in this list is its integer token.
labels_names = [
    "down", "up", "left", "right",
    "flip_front", "flip_back", "flip_left", "flip_right",
    "land", "nothing",
]
labels_num = len(labels_names)

# Lookup tables, created empty here.
token_to_label = {}
label_to_token = {}
id_to_label = {}

# Per-class sample counters, all starting at zero.
num_of_class_samples = dict.fromkeys(labels_names, 0)

In [16]:
# Fill both lookup tables from the position of each class name in labels_names:
# token -> name and name -> token.
token_to_label.update(enumerate(labels_names))
label_to_token.update((name, token) for token, name in enumerate(labels_names))

In [17]:
# Display the token -> label lookup table.
token_to_label

In [18]:
# Display the label -> token lookup table.
label_to_token

In [19]:
# Load every landmark sample from the train/valid/test directories into one
# array (`all_data`) with a parallel array of class tokens (`labels`), and
# record in `partition` which global sample indices belong to which split.
#
# Samples are collected in Python lists and converted to arrays once at the
# end: calling np.append per sample copies the whole array every time, which
# makes loading quadratic in the number of samples.
#
# NOTE(review): pickle.load can execute arbitrary code embedded in a file;
# only run this cell on dataset files that come from a trusted source.
landmark_samples = []  # one (21, 3) landmark array per sample
label_tokens = []      # integer class token per sample, same order
partition = {"test": [], "train": [], "valid": []}

# Reset the per-class counters so that re-running this cell does not
# double count samples.
for counted_class in num_of_class_samples:
    num_of_class_samples[counted_class] = 0

count = 0
for split_dir in [train_dir, valid_dir, test_dir]:
    full_dir_path = os.path.join(root_path, split_dir)
    print(f"Processing {full_dir_path} direcotry.")
    # os.listdir returns entries in arbitrary order; sort them so the global
    # sample indices are reproducible between runs.
    for file_name in sorted(os.listdir(full_dir_path)):
        # The class name is the part of the file name before the first dot.
        class_name = file_name.split('.')[0]
        file_path = os.path.join(full_dir_path, file_name)
        with open(file_path, "rb") as handle:
            print(f"Openning {file_path} file.")
            data = pickle.load(handle)
        token = label_to_token[class_name]
        for landmark in data:
            partition[split_dir].append(count)
            landmark_samples.append(landmark)
            label_tokens.append(token)
            num_of_class_samples[class_name] += 1
            count += 1

if landmark_samples:
    all_data = np.asarray(landmark_samples, dtype=np.float64)
else:
    # Keep the (N, 21, 3) layout even when no sample was found.
    all_data = np.zeros((0, 21, 3))
labels = np.asarray(label_tokens, dtype=int)

assert all_data.shape[1:] == (21, 3), f"unexpected sample shape {all_data.shape[1:]}"
assert len(all_data) == len(labels)

In [30]:
# Sample counts: overall and per split.
print("Number of samples:", len(all_data))
print("Number of training samples:", len(partition[train_dir]))
print("Number of validation samples:", len(partition[valid_dir]))
print("Number of test samples:", len(partition[test_dir]))
print("==================")
print("Data distribution")

# Shortened names for the four flip gestures; every other class keeps its name.
short_names = {
    "flip_left": "flip_l",
    "flip_right": "flip_r",
    "flip_front": "flip_f",
    "flip_back": "flip_b",
}

# One list entry per counted sample, so each histogram bar shows a class count.
mylist = []
for gesture, n_samples in num_of_class_samples.items():
    mylist.extend([short_names.get(gesture, gesture)] * n_samples)

fig = plt.figure(1)
plt.hist(mylist, bins=10, rwidth=0.5)
plt.show()
fig.savefig("data_distribution.png")

# Keras data generator

In [9]:
class DataGenerator(K.utils.all_utils.Sequence):
    """Keras ``Sequence`` that serves batches of samples with one-hot labels.

    Samples are looked up by global index in ``data`` or, when ``data`` is not
    given, in the notebook-level ``all_data`` array. The matching class tokens
    are looked up in ``labels`` and one-hot encoded.

    Only full batches are produced: ``len(list_IDs) // batch_size`` batches per
    epoch, so a trailing partial batch is not served.

    Args:
        list_IDs: Global sample indices that belong to this generator's split.
        labels: NumPy array of integer class tokens, indexed by global sample
            index.
        batch_size: Number of samples per batch.
        dim: Stored on the instance; not used when building batches.
        n_channels: Stored on the instance; not used when building batches.
        shuffle: Whether to shuffle the sample order on construction and after
            every epoch.
        n_classes: Width of the one-hot label vectors.
        data: Optional NumPy array of samples indexed by global sample index.
            When ``None`` (the default) the global ``all_data`` is used.
    """

    def __init__(self, list_IDs, labels, batch_size=32, dim=(32,32,32), n_channels=1, shuffle=True, n_classes=9, data=None):
        """Store the configuration and build the initial sample order."""
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.data = data
        self.on_epoch_end()

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __len__(self):
        """Return the number of full batches per epoch."""
        return len(self.list_IDs) // self.batch_size

    def __data_generation(self, list_IDs_temp):
        """Return ``(X, y)`` for the given global sample indices.

        ``X`` holds the selected samples and ``y`` their one-hot encoded class
        tokens.
        """
        # Resolve the global lazily so a generator created before `all_data`
        # is (re)built still sees the current array.
        samples = all_data if self.data is None else self.data

        X = samples[list_IDs_temp]
        y = self.labels[list_IDs_temp]

        return X, K.utils.np_utils.to_categorical(y, num_classes=self.n_classes)

    def __getitem__(self, index):
        """Return batch number ``index`` as an ``(X, y)`` tuple."""
        # Positions (within list_IDs) that make up this batch.
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Translate positions into global sample indices.
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        X, y = self.__data_generation(list_IDs_temp)

        return X, y

In [10]:
# Input: one sample of shape (21, 3).
inp = KL.Input(shape=(21, 3), name='landmarks_in')

# Convolutional stack: (filters, kernel_size) per layer, all stride 1 with ReLU.
conv_stack = ((24, 3), (32, 3), (64, 3), (64, 2), (64, 2))
x = inp
for n_filters, width in conv_stack:
    x = KL.Convolution1D(filters=n_filters, kernel_size=width, strides=1, activation='relu')(x)

# Dense head: (units, activation) per hidden layer, each followed by dropout.
x = KL.Flatten(name='flattened')(x)
for n_units, act in ((100, 'linear'), (50, 'relu')):
    x = KL.Dense(units=n_units, activation=act)(x)
    x = KL.Dropout(rate=.2)(x)

# Softmax over the classes; the layer name 'loss' is the key used in the
# loss dictionary passed to compile() below.
clas = KL.Dense(units=labels_num, activation='softmax', name='loss')(x)

model = K.Model(inputs=[inp], outputs=[clas])

compile_options = dict(
    optimizer='adam',
    loss={'loss': 'categorical_crossentropy'},
    metrics=['categorical_accuracy'],
)
with tf.device(device):
    model.compile(**compile_options)

In [11]:
# Callbacks used during training.
callbacks = [
        # Checkpoint to save_dir, keeping only the best model seen so far.
        K.callbacks.ModelCheckpoint(save_dir, save_best_only=True),
        # Stop when val_loss has not improved by at least min_delta for 20 epochs.
        K.callbacks.EarlyStopping(monitor='val_loss',
                                  min_delta=.0005,
                                  patience=20,
                                  verbose=True,
                                  mode='auto'),
        tfa.callbacks.TQDMProgressBar(),
        # Multiply the learning rate by 0.2 after 5 epochs without val_loss
        # improvement. min_lr must lie below the starting learning rate: the
        # model is compiled with optimizer='adam' (default learning rate
        # 0.001), so the previous floor of 0.001 meant the rate could never
        # be reduced.
        K.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                              patience=5, min_lr=1e-5),
        # TensorBoard logs in ./logs; profile_batch=(0, 10) requests profiling
        # for that batch range.
        K.callbacks.TensorBoard(log_dir='./logs', profile_batch=(0, 10))
    ]

# Settings shared by all three generators.
params = {'dim': (21, 3),
          'batch_size': 64,
          'n_channels': 0,
          'n_classes': labels_num}


with tf.device(device):
    training_generator = DataGenerator(partition['train'], labels, **params, shuffle=True)
    validation_generator = DataGenerator(partition['valid'], labels, **params, shuffle=False)
    # Evaluation must not shuffle: the generator serves only full batches, so
    # with shuffling a different subset of the test split would be scored on
    # every run.
    test_generator = DataGenerator(partition['test'], labels, **params, shuffle=False)

In [12]:
# Render a diagram of the model's layers, including their input/output shapes.
from tensorflow.keras.utils import plot_model
plot_model(model, show_shapes=True)

In [13]:
# Train on the training generator, validating on the validation generator
# after each epoch; batches are produced by 6 worker processes.
fit_options = dict(
    validation_data=validation_generator,
    use_multiprocessing=True,
    workers=6,
    callbacks=callbacks,
    epochs=100,
)
with tf.device(device):
    hist = model.fit(training_generator, **fit_options)

In [28]:
def _plot_history_curve(fig_num, metric, title):
    """Plot the training and validation curves of `metric` on figure `fig_num`.

    Reads the values from `hist.history` and returns the figure.
    """
    figure = plt.figure(fig_num)
    for series in (metric, 'val_' + metric):
        plt.plot(hist.history[series])
    plt.legend(['training', 'validation'])
    plt.title(title)
    plt.xlabel('epoch')
    return figure


loss = _plot_history_curve(1, 'loss', 'loss')
acc = _plot_history_curve(2, 'categorical_accuracy', 'categorical accuracy')
plt.show()

In [29]:
# Write both training-history figures to PNG files.
for history_figure, png_name in (
    (loss, "static_gesture_loss.png"),
    (acc, "static_gesture_accuracy.png"),
):
    history_figure.savefig(png_name)

In [15]:
# Evaluate on the test generator; the first two entries of the result are
# reported as the test score and the test accuracy.
score = model.evaluate(test_generator, verbose=0)
test_score, test_accuracy = score[0], score[1]
print("Test score:", test_score)
print("Test accuracy:", test_accuracy)

In [16]:
# Convert the Keras model to TFLite and write the result next to the notebook.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

tflite_output_path = tflite_model_name + '.tflite'
with open(tflite_output_path, 'wb') as tflite_file:
    tflite_file.write(tflite_model)

In [17]:
# Load the saved .tflite file into an interpreter, read its input/output
# tensor descriptions, then allocate its tensors.
tflite_input_path = f"{tflite_model_name}.tflite"
interpreter = tf.lite.Interpreter(model_path=tflite_input_path)

input_details, output_details = (
    interpreter.get_input_details(),
    interpreter.get_output_details(),
)

interpreter.allocate_tensors()

In [18]:
# Display the interpreter's input tensor description.
input_details

In [21]:
# Run one test sample through the TFLite interpreter and print the predicted
# class token followed by the true one.
sample_position = 300  # position within the test split
sample_id = partition['test'][sample_position]  # global sample index

# set_tensor takes a NumPy array; add the leading batch axis explicitly
# instead of wrapping the sample in a Python list.
sample_batch = np.expand_dims(all_data[sample_id].astype(np.float32), axis=0)
interpreter.set_tensor(input_details[0]['index'], sample_batch)

interpreter.invoke()

predict = interpreter.get_tensor(output_details[0]['index'])
print(np.argmax(predict))
print(labels[sample_id])