In [39]:
import os
import shutil
import tarfile
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [40]:
DATASET_ID   = '1cAJdvAZDolurN3KCZcYz_YJSMV-aIzWT'

# -------------BASE DIR (MODIFY THIS TO YOUR NEED) ------------ #
BASE_DIR     = '../'

DATA_DIR     = 'Sensor-Data/'
SPG_IMG_DIR  = 'Spectrogram-Images/'
IMG_SIZE     = (4, 4) # INCHES

USERS        = ['001', '002', '003', '004', '005', '006', '007']
# ------------------------------- Only Dynalic Gestures ------------------------------ #
GESTURES     = ['j', 'z', 'bad', 'deaf', 'fine', 'good', 'goodbye', 'hello', 'hungry',
                'me', 'no', 'please', 'sorry', 'thankyou', 'yes', 'you']
AXES         = ['X', 'Y', 'Z']

In [41]:
#--------------------- Download util for Google Drive ------------------- #

def download_file_from_google_drive(id, destination):
    URL = "https://docs.google.com/uc?export=download"

    session = requests.Session()

    response = session.get(URL, params = { 'id' : id }, stream = True)
    token = get_confirm_token(response)

    if token:
        params = { 'id' : id, 'confirm' : token }
        response = session.get(URL, params = params, stream = True)
        
    save_response_content(response, destination)    

def get_confirm_token(response):
    for key, value in response.cookies.items():
        if key.startswith('download_warning'):
            return value
        
    return None

def save_response_content(response, destination):
    CHUNK_SIZE = 32768

    with open(destination, "wb") as f:
        for chunk in response.iter_content(CHUNK_SIZE):
            if chunk:
                f.write(chunk)

def download_data(fid, destination):
    print('cleaning already existing files ... ', end='')
    try:
        shutil.rmtree(destination)
        print('√')
    except:
        print('✕')
        
    print('creating data directory ... ', end='')
    os.mkdir(destination)
    print('√')
    
    print('downloading dataset from the repository ... ', end='')
    filename = os.path.join(destination, 'dataset.tar.xz')
    try:
        download_file_from_google_drive(fid, filename)
        print('√')
    except:
        print('✕')
        
    print('extracting the dataset ... ', end='')
    try:
        tar = tarfile.open(filename)
        tar.extractall(destination)
        tar.close()
        print('√')
    except:
        print('✕')   

In [42]:
# ------- Comment This if already downloaded -------- #

# destination = os.path.join(BASE_DIR, DATA_DIR)
# download_data(DATASET_ID, destination)

In [49]:
# ------------- Spatial Path Image Generation ----------- #

def clean_dir(path):
    print('cleaning already existing files ... ', end='')
    try:
        shutil.rmtree(path)
        print('√')
    except:
        print('✕')
    
    print('creating ' + path + ' directory ... ', end='')
    os.mkdir(path)
    print('√')

def write_spectrogram_image(x, path):
    fig = plt.figure(frameon=False, figsize=(4, 4))
    ax = fig.add_axes([0, 0, 1, 1])
    ax.axis('off')
    psd, _, _, spec_img = plt.specgram(
        x               = x,
        Fs              = 100,
        window          = np.hamming(32),
        NFFT            = 32,
        cmap            = 'jet',
        noverlap        = 16,
        mode            = 'psd',
        scale           = 'dB',
        detrend         = None,
        scale_by_freq   = True,
        interpolation   = 'hamming'      
    )
    plt.yscale('symlog')
    plt.savefig(path)
    plt.close()

def generate_spg_images():
    count = 0
    image_dir = os.path.join(BASE_DIR, SPG_IMG_DIR)
    clean_dir(image_dir)
        
    for axis in AXES:
        print('processing spectrogram images for ' + axis + ' axis ... ', end='')
        axis_dir = os.path.join(image_dir, axis)
        os.mkdir(axis_dir)
        
        for gesture in GESTURES:
            os.mkdir(os.path.join(axis_dir, gesture))
    
            for user in USERS:
                user_dir = os.path.join(BASE_DIR, DATA_DIR, user)
                gesture_dir = os.path.join(user_dir, gesture + '.csv')
                
                accx = pd.read_csv(gesture_dir)['ACCx_world'].to_numpy().reshape(-1, 150)
                accy = pd.read_csv(gesture_dir)['ACCy_world'].to_numpy().reshape(-1, 150)
                accz = pd.read_csv(gesture_dir)['ACCz_world'].to_numpy().reshape(-1, 150)

                for i in range(accx.shape[0]):
                    image_name = 'u' + user + '_g' + '{:0>2d}'.format(GESTURES.index(gesture)) + \
                                 '_s' + '{:0>7d}'.format(count) + '_a' + axis + '.jpg'
                    path = os.path.join(BASE_DIR, SPG_IMG_DIR, axis, gesture, image_name)
                    
                    if axis == 'X':
                        write_spectrogram_image(accx[i, :], path)
                    elif axis == 'Y':
                        write_spectrogram_image(accy[i, :], path)
                    else:
                        write_spectrogram_image(accz[i, :], path)

                    count = count + 1
            
        print('√')

In [50]:
generate_spg_images()

cleaning already existing files ... √
creating ../Spectrogram-Images/ directory ... √
processing spectrogram images for X axis ... √
processing spectrogram images for Y axis ... √
processing spectrogram images for Z axis ... √
