# Motion Gesture Recognition

In [1]:
import os
import math
import gc
import pickle
from tqdm import tqdm
import cv2
import numpy as np
import pandas as pd

In [2]:
# classes label you want to use all labels 
targets_name = pd.read_csv('Labels.csv', header=None)
targets_name.drop([0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 15, 18, 21, 22, 23, 24, 25, 26], inplace=True)
targets_name = targets_name[0].tolist()
targets_name

['No gesture',
 'Stop Sign',
 'Swiping Left',
 'Swiping Right',
 'Thumb Down',
 'Thumb Up']

In [3]:
# training targets
targets = pd.read_csv('Train.csv', index_col=0).drop(columns=['frames', 'label_id', 'shape', 'format'])

for key, value in targets['label'].value_counts().items():
    if key in targets_name:
        print(value, key)

targets.sort_values('label')
targets = targets.squeeze().to_dict()
targets = {key:val for key, val in targets.items() if val in targets_name}

print('\n\nTotal items for {0} gestures: {1}'.format(len(targets_name), len(targets)))

1844 No gesture
1841 Thumb Up
1821 Stop Sign
1810 Thumb Down
1762 Swiping Left
1730 Swiping Right


Total items for 6 gestures: 10808


In [4]:
# validation targets
targets_validation = pd.read_csv('Validation.csv', index_col=0).drop(columns=['frames', 'label_id', 'shape', 'format'])

for key, value in targets_validation['label'].value_counts().items():
    if key in targets_name:
        print(value, key)

targets_validation = targets_validation.squeeze().to_dict()
targets_validation = {key:val for key, val in targets_validation.items() if val in targets_name}

print('\n\nTotal items for {0} gestures: {1}'.format(len(targets_name), len(targets_validation)))

259 Stop Sign
256 No gesture
250 Thumb Down
247 Swiping Left
238 Thumb Up
231 Swiping Right


Total items for 6 gestures: 1481


## Preprocessing the frames

1. Unify frames to be 30 in each folder.
2. Resize the frames to 64x64 for input.
3. Convert them to grayscale.
4. Convert the list of frames to an np array.

In [5]:
def release_list(a):
    """Function to empty the RAM."""
    del a[:]
    del a
    gc.collect()

In [6]:
hm_frames = 30  # number of frames
def get_unify_frames(path):
    """Unify number of frames for each training.
    
    Args:
        path: path to directory.
    """
    offset = 0

    # pick frames
    frames = os.listdir(path)
    frames_count = len(frames)

    if hm_frames > frames_count:
        # duplicate last frame if video is shorter than necessary
        frames += [frames[-1]] * (hm_frames - frames_count)
    elif hm_frames < frames_count:
        # if there are more frames, then sample starting offset
        frames = frames[0:hm_frames]
    return frames

In [7]:
def resize_frame(frame):
    """Resize frames.
    
    Args:
        frame: image to be resized.
    """
    frame = cv2.imread(frame)
    frame = cv2.resize(frame, (64, 64))
    return frame

In [8]:
# return gray image
def rgb2gray(rgb):
    return np.dot(rgb[...,:3], [0.2989, 0.5870, 0.1140])

In [9]:
# training directories
temp = {}
dirs = []
for key, val in targets.items():
    if val not in temp:
        temp[val] = [key]
    else:
        temp[val].append(key)

    # if len(temp[val]) <= 500:
    dirs.append(str(key))

# validation directories
temp = {}
dirs_cv = []
for key, val in targets_validation.items():
    if val not in temp:
        temp[val] = [key]
    else:
        temp[val].append(key)

    # if len(temp[val]) <= 62:
    dirs_cv.append(str(key))

# dirs = [str(i) for i in targets.keys()]
# dirs_cv = [str(i) for i in targets_validation.keys()]

print(len(dirs))
print(len(dirs_cv))

10808
1481


In [10]:
gc.collect()

96

In [11]:
# Adjust training data
counter_training = 0 # number of training
training_targets = [] # training targets 
new_frames = [] # training data after resize & unify

for directory in tqdm(dirs):
    new_frame = [] # one training
    # Frames in each folder
    frames = get_unify_frames('Train/' + directory)
    if len(frames) == hm_frames: # just to be sure
        for frame in frames:
            frame = resize_frame('Train/' + directory + '/' + frame)
            new_frame.append(rgb2gray(frame))
            if len(new_frame) == 15: # partition each training on two trainings.
                new_frames.append(new_frame) # append each partition to training data
                training_targets.append(targets_name.index(targets[int(directory)]))
                counter_training +=1
                new_frame = []
                gc.collect()


gc.collect()

with open('new-frames.pkl', 'wb') as file:
    pickle.dump(new_frames, file)
release_list(new_frames)

with open('training-targets.pkl', 'wb') as file:
    pickle.dump(training_targets, file)

  1%|▊                                                                                      | 95/10808 [00:19<36:57,  4.83it/s]


KeyboardInterrupt: 

In [None]:
# we do the same for the validation data
counter_validation = 0
cv_targets = []
new_frames_cv = []

for directory in tqdm(dirs_cv):
    new_frame = []
    # Frames in each folder
    frames = get_unify_frames('Validation/' + directory)
    if len(frames)==hm_frames:
        for frame in frames:
            frame = resize_frame('Validation/' + directory + '/' + frame)
            new_frame.append(rgb2gray(frame))
            if len(new_frame) == 15:
                new_frames_cv.append(new_frame)
                cv_targets.append(targets_name.index(targets_validation[int(directory)]))
                counter_validation +=1
                new_frame = []
print(counter_validation)

gc.collect()

with open('cv-new-frames.pkl', 'wb') as file:
    pickle.dump(new_frames_cv, file)
release_list(new_frames_cv)

with open('cv-targets.pkl', 'wb') as file:
    pickle.dump(cv_targets, file)

In [None]:
gc.collect()

In [None]:
counter_training = 4000*2
print(counter_training)
counter_validation = 496*2
print(counter_validation)

In [None]:
# training
# with open('../input/20bnjester/training-targets.pkl', 'rb') as file:
#     training_targets = pickle.load(file)

# with open('../input/20bnjester/new-frames.pkl', 'rb') as file:
#     new_frames = pickle.load(file)

In [None]:
# validation
# with open('../input/20bnjester/cv-targets.pkl', 'rb') as file:
#     cv_targets = pickle.load(file)

# with open('../input/20bnjester/cv-new-frames.pkl', 'rb') as file:
#     new_frames_cv = pickle.load(file)

In [None]:
# convert training data to np float32
training_data = np.array(new_frames[0:counter_training], dtype=np.float32)
training_data.shape

In [None]:
# convert validation data to np float32
cv_data = np.array(new_frames_cv[0:counter_validation], dtype=np.float32)
cv_data.shape

In [None]:
# To check training length
print("Training new frames:", len(training_data))

# To check validation length
print("Validation new frames:", len(cv_data))

In [None]:
gc.collect()

## Normalization

In [None]:
from sklearn.preprocessing import StandardScaler
def normalization(data):
    print('old mean', data.mean())

    scaler = StandardScaler()

    scaled_images  = scaler.fit_transform(data.reshape(-1, 15*64*64))
    print('new mean', scaled_images.mean())
    
    scaled_images  = scaled_images.reshape(-1, 15, 64, 64, 1)    
    print(scaled_images.shape)
    
    return scaled_images

In [None]:
# Normalisation: training
scaled_images = normalization(training_data)

In [None]:
# Normalisation: validation
scaled_images_cv = normalization(cv_data)

## Creating and training the model

In [None]:
gc.collect()

In [None]:
import tensorflow as tf

In [None]:
from keras import Model
from keras.layers import Conv3D, MaxPool3D, ConvLSTM2D, Flatten, Dense

In [None]:
class Conv3DModel(Model):
    def __init__(self):
        super(Conv3DModel, self).__init__()
#         with tpu_strategy.scope():
        # Convolutions
        self.conv1 = Conv3D(32, (3, 3, 3), activation='relu', name="conv1", data_format='channels_last')
        self.pool1 = MaxPool3D(pool_size=(2, 2, 2), data_format='channels_last')
        self.conv2 = Conv3D(64, (3, 3, 3), activation='relu', name="conv1", data_format='channels_last')
        self.pool2 = MaxPool3D(pool_size=(2, 2,2), data_format='channels_last')

        # LSTM & Flatten
        self.convLSTM = ConvLSTM2D(40, (3, 3))
        self.flatten = Flatten(name="flatten")

        # Dense layers
        self.d1 = Dense(128, activation='relu', name="d1")
        self.out = Dense(8, activation='softmax', name="output")

    def call(self, x):
#         with tpu_strategy.scope():
        x = self.conv1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.convLSTM(x)
        #x = self.pool2(x)
        #x = self.conv3(x)
        #x = self.pool3(x)
        x = self.flatten(x)
        x = self.d1(x)
        return self.out(x)

In [None]:
# with tpu_strategy.scope():
model = Conv3DModel()

In [None]:
# use tensorflow dataset
train_dataset = tf.data.Dataset.from_tensor_slices((scaled_images, training_targets))
cv_dataset = tf.data.Dataset.from_tensor_slices((scaled_images_cv, cv_targets))

In [None]:
model(scaled_images[0:2])

In [None]:
model.summary()

## Metrics

In [None]:
gc.collect()

In [None]:
from keras.losses import SparseCategoricalCrossentropy
from keras.optimizers import Adam
from keras.metrics import Mean, SparseCategoricalAccuracy

In [None]:
loss_fn = SparseCategoricalCrossentropy()
optimizer = Adam()

In [None]:
# Loss
train_loss = Mean(name='train_loss')
valid_loss = Mean(name='valid_loss')
# Accuracy
train_accuracy = SparseCategoricalAccuracy(name='train_accuracy')
valid_accuracy = SparseCategoricalAccuracy(name='valid_accuracy')

In [None]:
@tf.function
def train_step(image, targets):
    with tf.GradientTape() as tape:
        # Make a prediction on all the batch
        predictions = model(image)
        # Get the error/loss on these predictions
        loss = loss_fn(targets, predictions)
    # Compute the gradient which respect to the loss
    grads = tape.gradient(loss, model.trainable_variables)
    # Change the weights of the model
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    # The metrics are accumulate over time. You don't need to average it yourself.
    train_loss(loss)
    train_accuracy(targets, predictions)

In [None]:
@tf.function
def valid_step(image, targets):
    predictions = model(image)
    t_loss = loss_fn(targets, predictions)
    # Set the metrics for the test
    valid_loss(t_loss)
    valid_accuracy(targets, predictions)

In [None]:
ckpt = tf.train.Checkpoint(step=tf.Variable(1), optimizer=optimizer, model=model)
manager = tf.train.CheckpointManager(ckpt, 'training_checkpoints/tf_ckpts', max_to_keep=10)
ckpt.restore(manager.latest_checkpoint)

In [None]:
epoch = 10
batch_size = 32
b = 0
training_acc = []
validation_acc = []
for epoch in range(epoch):
    # Training set
    for images_batch, targets_batch in train_dataset.batch(batch_size):
        train_step(images_batch, targets_batch)
        template = '\r Batch {}/{}, Loss: {}, Accuracy: {}'
        print(template.format(
            b, len(training_targets), train_loss.result(), 
            train_accuracy.result()*100
        ), end="")
        b += batch_size
    # Validation set
    for images_batch, targets_batch in cv_dataset.batch(batch_size):
        valid_step(images_batch, targets_batch)

    template = '\nEpoch {}, Valid Loss: {}, Valid Accuracy: {}'
    print(template.format(
        epoch+1,
        valid_loss.result(), 
        valid_accuracy.result()*100)
    )
    training_acc.append(float(train_accuracy.result()*100))
    validation_acc.append(float(valid_accuracy.result()*100))
    ckpt.step.assign_add(1)
    save_path = manager.save()
    print("Saved checkpoint for step {}: {}".format(int(ckpt.step), save_path))
    valid_loss.reset_states()
    valid_accuracy.reset_states()
    train_accuracy.reset_states()
    train_loss.reset_states()

In [None]:
print(manager.checkpoints)

In [None]:
# save the model for use in the application
model.save_weights('weights/path_to_my_weights', save_format='tf')

In [None]:
# plote Accuracy / epoch
plt.plot([1,2,3,4,5,6,7,8,9,10],training_acc, '-' )
plt.plot([1,2,3,4,5,6,7,8,9,10],validation_acc, '-' )

plt.ylabel('Accuracy')
plt.xlabel('Epochs')
plt.show()

In [None]:
from tensorflow.keras.utils import plot_model