<a href="https://colab.research.google.com/github/atick-faisal/Hand-Gesture-Recognition/blob/main/Deep-Learning-Analysis/Dynamic_Hand_Gestures_DL_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import time
import joblib
import shutil
import tarfile
import requests

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.signal import butter, lfilter

In [2]:

DATASET_ID      = '1p0CSRb9gax0sKqdyzOYVt-BXvZ4GtrBv'

# -------------BASE DIR (MODIFY THIS TO YOUR NEED) ------------ #
BASE_DIR        = '../'
# BASE_DIR     = '/content/drive/MyDrive/Research/Hand Gesture/GitHub/'

DATA_DIR        = 'Sensor-Data/'
CHANNELS_DIR    = 'Channels/'
FEATURES_DIR    = 'Features/'
FIGURE_DIR      = 'Figures/'
LOG_DIR         = 'Logs/'

USERS           = ['001', '002', '003', '004', '005', '006', '007', '008', '009',
                   '010', '011', '012', '013', '014', '015', '016', '017', '018',
                   '019', '020', '021', '022', '023', '024', '025']
GESTURES        = ['j', 'z', 'bad', 'deaf', 'fine', 'good', 'goodbye', 'hello', 'hungry',
                   'me', 'no', 'please', 'sorry', 'thankyou', 'yes', 'you']

WINDOW_LEN      = 150

# ------------- FOR THE GREATER GOOD :) ------------- #
DATASET_LEN     = 1120
TRAIN_LEN       = 960
TEST_LEN        = 160

TEST_USER       = '001'
EPOCHS          = 5

CHANNELS_GROUP  = 'DYNAMIC_ACC_ONLY_'
CUT_OFF         = 3.0
ORDER           = 4
FS              = 100

CONFIG          = CHANNELS_GROUP + 'CUT_OFF_' + str(CUT_OFF) + '_ORDER_' + str(ORDER) + '\n'

In [3]:

#--------------------- Download util for Google Drive ------------------- #

def download_file_from_google_drive(id, destination):
    URL = "https://docs.google.com/uc?export=download"

    session = requests.Session()

    response = session.get(URL, params = { 'id' : id }, stream = True)
    token = get_confirm_token(response)

    if token:
        params = { 'id' : id, 'confirm' : token }
        response = session.get(URL, params = params, stream = True)
        
    save_response_content(response, destination)    

def get_confirm_token(response):
    for key, value in response.cookies.items():
        if key.startswith('download_warning'):
            return value
        
    return None

def save_response_content(response, destination):
    CHUNK_SIZE = 32768

    with open(destination, "wb") as f:
        for chunk in response.iter_content(CHUNK_SIZE):
            if chunk:
                f.write(chunk)

def download_data(fid, destination):
    print('cleaning already existing files ... ', end='')
    try:
        shutil.rmtree(destination)
        print('√')
    except:
        print('✕')
        
    print('creating data directory ... ', end='')
    os.mkdir(destination)
    print('√')
    
    print('downloading dataset from the repository ... ', end='')
    filename = os.path.join(destination, 'dataset.tar.xz')
    try:
        download_file_from_google_drive(fid, filename)
        print('√')
    except:
        print('✕')
        
    print('extracting the dataset ... ', end='')
    try:
        tar = tarfile.open(filename)
        tar.extractall(destination)
        tar.close()
        print('√')
    except:
        print('✕')

In [4]:

# ------- Comment This if already downloaded -------- #

# destination = os.path.join(BASE_DIR, DATA_DIR)
# download_data(DATASET_ID, destination)

In [5]:
class LowPassFilter(object): 
    def butter_lowpass(cutoff, fs, order):
        nyq = 0.5 * fs
        normal_cutoff = cutoff / nyq
        b, a = butter(order, normal_cutoff, btype='low', analog=False)
        return b, a

    def apply(data, cutoff=CUT_OFF, fs=FS, order=ORDER):
        b, a = LowPassFilter.butter_lowpass(cutoff, fs, order=order)
        y = lfilter(b, a, data)
        return y

In [6]:
def clean_dir(path):
    print('cleaning already existing files ... ', end='')
    try:
        shutil.rmtree(path)
        print('√')
    except:
        print('✕')
    
    print('creating ' + path + ' directory ... ', end='')
    os.mkdir(path)
    print('√')

def extract_channels():
    channels_dir = os.path.join(BASE_DIR, CHANNELS_DIR)
    clean_dir(channels_dir)
        
    for user in USERS:
        print('Processing data for user ' + user, end=' ')
        
        X = []
        y = []
        first_time = True
        
        for gesture in GESTURES:
              
            user_dir = os.path.join(BASE_DIR, DATA_DIR, user)
            gesture_dir = os.path.join(user_dir, gesture + '.csv')

            dataset = pd.read_csv(gesture_dir)
            
            flx1 = dataset['flex_1'].to_numpy().reshape(-1, WINDOW_LEN)
            flx2 = dataset['flex_2'].to_numpy().reshape(-1, WINDOW_LEN)
            flx3 = dataset['flex_3'].to_numpy().reshape(-1, WINDOW_LEN)
            flx4 = dataset['flex_4'].to_numpy().reshape(-1, WINDOW_LEN)
            flx5 = dataset['flex_5'].to_numpy().reshape(-1, WINDOW_LEN)
            
            accx = dataset['ACCx'].to_numpy()
            accy = dataset['ACCy'].to_numpy()
            accz = dataset['ACCz'].to_numpy()
            
            accx = LowPassFilter.apply(accx).reshape(-1, WINDOW_LEN)
            accy = LowPassFilter.apply(accy).reshape(-1, WINDOW_LEN)
            accz = LowPassFilter.apply(accz).reshape(-1, WINDOW_LEN)
            
            gyrx = dataset['GYRx'].to_numpy()
            gyry = dataset['GYRy'].to_numpy()
            gyrz = dataset['GYRz'].to_numpy()
            
            gyrx = LowPassFilter.apply(gyrx).reshape(-1, WINDOW_LEN)
            gyry = LowPassFilter.apply(gyry).reshape(-1, WINDOW_LEN)
            gyrz = LowPassFilter.apply(gyrz).reshape(-1, WINDOW_LEN)
            
            accm = np.sqrt(accx ** 2 + accy ** 2 + accz ** 2)
            gyrm = np.sqrt(gyrx ** 2 + gyry ** 2 + gyrz ** 2)
            
            g_idx = GESTURES.index(gesture)
            labels = np.ones((accx.shape[0], 1)) * g_idx
            
            channels = np.stack([
                accx, accy, accz
            ], axis=-1)
            
            if first_time == True:
                X = channels
                y = labels
                first_time = False
            else:
                X = np.append(X, channels, axis=0)
                y = np.append(y, labels, axis=0)
            
        
        x_path = os.path.join(BASE_DIR, CHANNELS_DIR, CHANNELS_GROUP + user + '_X.joblib')
        y_path = os.path.join(BASE_DIR, CHANNELS_DIR, CHANNELS_GROUP + user + '_y.joblib')
        joblib.dump(X, x_path)
        joblib.dump(y, y_path)
        
        print('√')
        

In [7]:
extract_channels()

cleaning already existing files ... ✕
creating ../Channels/ directory ... √
Processing data for user 001 √
Processing data for user 002 √
Processing data for user 003 √
Processing data for user 004 √
Processing data for user 005 √
Processing data for user 006 √
Processing data for user 007 √
Processing data for user 008 √
Processing data for user 009 √
Processing data for user 010 √
Processing data for user 011 √
Processing data for user 012 √
Processing data for user 013 √
Processing data for user 014 √
Processing data for user 015 √
Processing data for user 016 √
Processing data for user 017 √
Processing data for user 018 √
Processing data for user 019 √
Processing data for user 020 √
Processing data for user 021 √
Processing data for user 022 √
Processing data for user 023 √
Processing data for user 024 √
Processing data for user 025 √


In [15]:

ACC = []
logs = ''

for test_user in USERS:
    print('Processing results for user ' + test_user, end='... ')
    
    X_train = []
    X_test = []
    y_train = []
    y_test = []
    
    first_time_train = True
    first_time_test = True

    for user in USERS:
        x_path = os.path.join(BASE_DIR, CHANNELS_DIR, CHANNELS_GROUP + user + '_X.joblib')
        y_path = os.path.join(BASE_DIR, CHANNELS_DIR, CHANNELS_GROUP + user + '_y.joblib')
        X = joblib.load(x_path)
        y = joblib.load(y_path)

        if user == test_user:
            if first_time_train == True:
                first_time_train = False
                X_test = X
                y_test = y
                
            else:
                X_test = np.append(X_test, X, axis=0)
                y_test = np.append(y_test, y, axis=0)
                
        else:
            if first_time_test == True:
                first_time_test = False
                X_train = X
                y_train = y
                
            else:
                X_train = np.append(X_train, X, axis=0)
                y_train = np.append(y_train, y, axis=0)


#     X_train, y_train = shuffle(X_train, y_train)

    print(X_train.shape, y_train.shape)

#     avg_acc = (acc / EPOCHS) * 100
#     print(f'%.2f %%' %(avg_acc))
    
#     logs = logs + 'Average accuracy for user ' + str(test_user) + '... ' + str(avg_acc) + '\n'

#     ACC.append(avg_acc)
    
# AVG_ACC = np.mean(ACC)
# STD = np.std(ACC)
# print('------------------------------------')
# print(f'Average accuracy %.2f +/- %.2f' %(AVG_ACC, STD))

Processing results for user 001... (3840, 150, 3) (3840, 1)
Processing results for user 002... (3840, 150, 3) (3840, 1)
Processing results for user 003... (3840, 150, 3) (3840, 1)
Processing results for user 004... (3840, 150, 3) (3840, 1)
Processing results for user 005... (3840, 150, 3) (3840, 1)
Processing results for user 006... (3840, 150, 3) (3840, 1)
Processing results for user 007... (3840, 150, 3) (3840, 1)
Processing results for user 008... (3840, 150, 3) (3840, 1)
Processing results for user 009... (3840, 150, 3) (3840, 1)
Processing results for user 010... (3840, 150, 3) (3840, 1)
Processing results for user 011... (3840, 150, 3) (3840, 1)
Processing results for user 012... (3840, 150, 3) (3840, 1)
Processing results for user 013... (3840, 150, 3) (3840, 1)
Processing results for user 014... (3840, 150, 3) (3840, 1)
Processing results for user 015... (3840, 150, 3) (3840, 1)
Processing results for user 016... (3840, 150, 3) (3840, 1)
Processing results for user 017... (3840