In [2]:
%load_ext autoreload
%autoreload 2
import h5py
import numpy as np
import os
import csv
import pandas as pd
from scipy.stats import skew, kurtosis
from sklearn.preprocessing import StandardScaler
import pywt
from sklearn.preprocessing import normalize
from scipy.special import entr
from scipy import signal
from skimage.restoration import denoise_wavelet
from utils.helper import *
import sklearn_relief as relief

ModuleNotFoundError: No module named 'utils.helper'

In [None]:
def standardize_position_name(pos):
    """
    Normalise a sensor position label to the short form used in this notebook.

    The label is lower-cased and split on whitespace. A single-word label
    (e.g. ``'Chest'``) is returned lower-cased with surrounding whitespace
    removed. For a multi-word label, the second word must contain ``'right'``
    or ``'left'``; it is replaced by ``'r'`` / ``'l'`` and all words are
    joined with underscores (e.g. ``'Wrist Right'`` -> ``'wrist_r'``).

    Args:
        pos: position label as a string.

    Returns:
        The normalised label.

    Raises:
        ValueError: if a multi-word label's second word contains neither
            ``'right'`` nor ``'left'``.
    """
    words = pos.lower().split()
    if len(words) <= 1:
        # Rebuild from the tokens instead of returning the raw string so that
        # padded single-word labels such as 'chest ' come back trimmed.
        return ''.join(words)

    side = words[1]
    if 'right' in side:
        words[1] = 'r'
    elif 'left' in side:
        words[1] = 'l'
    else:
        raise ValueError(f"Unknown name format of position {' '.join(words)}")
    return '_'.join(words)

In [None]:
def get_position_id(data, pos):
    """
    Look up the sensor index that corresponds to a body position.

    Every entry of ``data['jumpExp']['sensors']['header']`` is dereferenced
    through ``data``; the character codes stored under its ``'position'``
    key are decoded to a string and normalised with
    ``standardize_position_name``. The index of the entry whose normalised
    name equals ``pos`` is returned.

    Args:
        data: mapping-like container holding the recording (the notebook
            passes an open ``h5py.File``).
        pos: normalised position name, e.g. ``'wrist_r'``.

    Returns:
        Integer index of the sensor placed at ``pos``.

    Raises:
        ValueError: if no header entry maps to ``pos``.
    """
    headers = data['jumpExp']['sensors']['header']
    position_ids = {}
    # Iterate over however many sensors the file declares instead of assuming
    # a fixed count of five.
    for i in range(len(headers)):
        ref = headers[i][0]
        # 'position' holds one character code per row; rebuild the label.
        loc = ''.join([chr(c[0]) for c in data[ref]['position']])
        position_ids[standardize_position_name(loc)] = i
    if pos not in position_ids:
        raise ValueError(f'{pos} not available for current patient, try one of {list(position_ids)}')
    return position_ids[pos]

In [3]:
def get_measurement_from_sensor(data, measurement, placement_id):
    """
    Fetch one measurement array for a given sensor placement.

    Args:
        data: mapping-like container holding the recording; the sensor table
            is read from ``data['jumpExp']['sensors']``.
        measurement: key of the measurement to read (e.g. ``'acc'``).
        placement_id: index of the sensor within the measurement table.

    Returns:
        ``numpy.ndarray`` built from the referenced entry; transposed when
        ``measurement`` is ``'press'`` or ``'temp'``.

    Raises:
        ValueError: if ``measurement`` is not a key of the sensor table.
    """
    sensors = data['jumpExp']['sensors']
    if measurement not in sensors:
        available = list(sensors.keys())
        raise ValueError(f'{measurement} not available for current patient, try one of {available}')

    # The table stores a reference; the actual samples live under data[ref].
    ref = sensors[measurement][placement_id][0]
    values = np.array(data[ref])
    return values.T if measurement in ('press', 'temp') else values

In [4]:
# Sensor placements to export, in the short form produced by
# standardize_position_name.
positions = [
    'wrist_r',
    'wrist_l',
    'ankle_r',
    'ankle_l',
    'chest',
]

# Measurement keys read from each sensor.
measurements = [
    'acc',
    'gyro',
    'press',
]

# Root directory of the dataset.
data_root = '/datasets/GaitDetection/'

In [44]:
# Convert every .mat recording under <data_root>/data into one CSV per file
# under <data_root>/csv_data, with one column per position/measurement axis.
mat_dir = os.path.join(data_root, 'data')
csv_dir = os.path.join(data_root, 'csv_data')
os.makedirs(csv_dir, exist_ok=True)  # to_csv does not create missing directories

mat_files = [x for x in os.listdir(mat_dir) if x.endswith('.mat')]
for file in mat_files:
    data_fn = os.path.join(mat_dir, file)
    csv_data = {}
    # Open read-only and release the handle as soon as the arrays are copied
    # out, so a long file list does not accumulate open HDF5 files.
    with h5py.File(data_fn, 'r') as matlab_data:
        for pos in positions:
            p_id = get_position_id(matlab_data, pos)
            for meas in measurements:
                curr_data = get_measurement_from_sensor(matlab_data, meas, p_id)
                n_rows = curr_data.shape[0]
                if n_rows == 3:
                    # Three rows are written as separate x/y/z columns.
                    for axis_name, axis_values in zip('xyz', curr_data):
                        csv_data[f'{pos}__{meas}_{axis_name}'] = axis_values.squeeze()
                elif n_rows == 1:
                    csv_data[f'{pos}__{meas}'] = curr_data.squeeze()
                else:
                    raise ValueError(
                        f'Unexpected shape {curr_data.shape} for {meas} at {pos} in {file}; '
                        'expected 1 or 3 rows'
                    )
    df = pd.DataFrame.from_dict(csv_data)
    file_loc = os.path.join(csv_dir, f'{os.path.splitext(file)[0]}.csv')
    df.to_csv(file_loc)

# Generate an example CSV

In [6]:
# Build a synthetic CSV with the same column layout as the converted
# recordings: x/y/z columns for 3-axis measurements, a single column for 'press'.
n_samples = 5000
# Dedicated, seeded generator: the example file is identical on every run and
# the global NumPy random state is left untouched.
rng = np.random.RandomState(0)

csv_data = {}
for pos in positions:
    for meas in measurements:
        if meas == 'press':
            curr_data = rng.randn(1, n_samples)
            csv_data[f'{pos}__{meas}'] = curr_data.squeeze()
        else:
            curr_data = rng.randn(3, n_samples)
            for axis_name, axis_values in zip('xyz', curr_data):
                csv_data[f'{pos}__{meas}_{axis_name}'] = axis_values.squeeze()

df = pd.DataFrame.from_dict(csv_data)
os.makedirs('example', exist_ok=True)  # to_csv does not create missing directories
df.to_csv('example/data.csv')