In [1]:
import os
import pandas as pd
from scipy.io import loadmat
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import sklearn
import typing as tp
import h5py
import re
from scipy.io import loadmat
import csv
import xlrd
import warnings

In [31]:
kimore_h5_path: str = os.path.join('..', 'data', 'SemesterProject', 'KiMoRe', 'KiMoRe.h5')

with h5py.File(kimore_h5_path, 'r') as f:
    print(f.keys())

<KeysViewHDF5 ['B_ID1', 'B_ID2', 'B_ID3', 'B_ID4', 'B_ID5', 'B_ID6', 'B_ID7', 'B_ID8', 'E_ID1', 'E_ID10', 'E_ID11', 'E_ID12', 'E_ID13', 'E_ID14', 'E_ID15', 'E_ID16', 'E_ID17', 'E_ID2', 'E_ID3', 'E_ID4', 'E_ID5', 'E_ID6', 'E_ID7', 'E_ID8', 'E_ID9', 'NE_ID1', 'NE_ID10', 'NE_ID11', 'NE_ID12', 'NE_ID13', 'NE_ID14', 'NE_ID15', 'NE_ID16', 'NE_ID17', 'NE_ID18', 'NE_ID19', 'NE_ID2', 'NE_ID20', 'NE_ID21', 'NE_ID22', 'NE_ID23', 'NE_ID24', 'NE_ID25', 'NE_ID26', 'NE_ID27', 'NE_ID3', 'NE_ID4', 'NE_ID5', 'NE_ID6', 'NE_ID7', 'NE_ID8', 'NE_ID9', 'P_ID1', 'P_ID10', 'P_ID11', 'P_ID12', 'P_ID13', 'P_ID14', 'P_ID15', 'P_ID16', 'P_ID2', 'P_ID3', 'P_ID4', 'P_ID5', 'P_ID6', 'P_ID7', 'P_ID8', 'P_ID9', 'S_ID1', 'S_ID10', 'S_ID2', 'S_ID3', 'S_ID4', 'S_ID5', 'S_ID6', 'S_ID7', 'S_ID8', 'S_ID9']>


In [15]:
kinect_joints = ["spinebase", "spinemid", "neck", "head", "shoulderleft", "elbowleft", "wristleft", "handleft", "shoulderright", "elbowright", "wristright", "handright", "hipleft", "kneeleft", "ankleleft", "footleft", "hipright", "kneeright", "ankleright", "footright", "spineshoulder", "handtipleft", "thumbleft", "handtipright", "thumbright"]

# Taken from: https://github.com/mattfast/kimore-model/blob/master/src/load_kimore.py

def load_kimore_data() -> pd.DataFrame:
	data_frames: tp.List[pd.DataFrame] = []

	base_path: str = os.path.join('..', 'data', 'SemesterProject', 'KiMoRe')

	for root, dirs, files in os.walk(base_path):
		if 'Raw' in dirs:
			# root should be equal base_path/PatientGroup/Patient_type/Patient_id/Es#
			exercise_num: int = int(root[-1])

			# ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID4/Es5...

			start_of_patient_id: int = root[:-4].rfind('/') + 1
			patient_id: str = root[start_of_patient_id:-4]

			base_dict: tp.Dict[str, tp.Union[int, str, float, np.ndarray]] = {'exercise_num': exercise_num, 'patient_id': patient_id}

			current_positions: tp.List[tp.Dict[str, tp.Union[int, str, float, np.ndarray]]] = []
			current_orientations: tp.List[tp.Dict[str, tp.Union[int, str, float, np.ndarray]]] = []
			current_timestamps: tp.List[tp.Dict[str, tp.Union[int, str, float, np.ndarray]]]  = []

			print(f'Working on {root}...')

			raw_file_path: str = os.path.join(root, 'Raw')

			chosen_format: tp.Optional[str] = None

			try:
				for raw_filename in os.listdir(raw_file_path):
					if not raw_filename.endswith('.csv'):
						# Could be an AVI Depth file so skip it.
						continue
					full_raw_path: str = os.path.join(raw_file_path, raw_filename)

					is_joint_orientation: bool = raw_filename.startswith('JointOrientation')
					is_joint_position: bool = raw_filename.startswith('JointPosition')
					is_timestamps: bool = raw_filename.startswith('TimeStamp')

					if is_joint_position:
						start_of_format: int = len('JointPosition')
					elif is_joint_orientation:
						start_of_format: int = len('JointOrientation')
					elif is_timestamps:
						start_of_format: int = len('TimeStamp')
					else:
						# Skip if none of the flags are set.
						continue

					current_format: str = raw_filename[start_of_format:][:13 + 1]

					if chosen_format is None:
						chosen_format = current_format
					elif chosen_format != current_format:
						# If they don't have the same data collection time,
						# it's useless and should be ignored.
						continue

					if is_joint_position:
						col_prefix: str = 'pos'

						base_columns: tp.List[str] = [f'{col_prefix}_{joint}' for joint in kinect_joints]
						raw_columns: tp.List[tp.List[str]] = [[f'{base}_x', f'{base}_y', f'{base}_z'] for base in
															  base_columns]
						columns: tp.List[str] = [col for rc in raw_columns for col in rc]
					elif is_joint_orientation:
						col_prefix: str = 'ori'

						base_columns: tp.List[str] = [f'{col_prefix}_{joint}' for joint in kinect_joints]
						raw_columns: tp.List[tp.List[str]] = [[f'{base}_x', f'{base}_y', f'{base}_z', f'{base}_w'] for base
															  in base_columns]
						columns: tp.List[str] = [col for rc in raw_columns for col in rc]

					elif is_timestamps:
						columns: tp.List[str] = ['time_ms']

					raw_df: pd.DataFrame = pd.read_csv(full_raw_path, names=columns, index_col=False)

					for i, row in raw_df.iterrows():
						row_dict: tp.Dict[str, float] = row.to_dict()

						new_dict: tp.Dict[str, tp.Union[int, float, str, bool]] = base_dict.copy() if is_timestamps else dict()

						for col_name, col_value in row_dict.items():
							new_dict[col_name] = col_value

						new_dict['row_index'] = i

						if is_timestamps:
							current_timestamps.append(new_dict)
						elif is_joint_position:
							current_positions.append(new_dict)
						elif is_joint_orientation:
							current_orientations.append(new_dict)

				# If there wasn't valid data, skip it.
				if len(current_positions) == 0 or len(current_orientations) == 0 or len(current_timestamps) == 0:
					continue

				positions_df: pd.DataFrame = pd.DataFrame.from_records(current_positions, index='row_index')
				orientations_df: pd.DataFrame = pd.DataFrame.from_records(current_orientations, index='row_index')
				timestamps_df: pd.DataFrame = pd.DataFrame.from_records(current_timestamps, index='row_index')

				timestamps_df['time_ms'] = timestamps_df['time_ms'] - timestamps_df['time_ms'].min()

				current_dataframe: pd.DataFrame = positions_df.join([orientations_df, timestamps_df])

				del orientations_df
				del timestamps_df
				del positions_df

				label_file_path: str = os.path.join(root, 'Label')

				label_files: tp.List[str] = [fn for fn in os.listdir(label_file_path) if fn.endswith('.xlsx')]

				clinical_assessment_filename: str = [fn for fn in label_files if fn.startswith('ClinicalAssessment_')][0]
				supplementary_info_filename: str = [fn for fn in label_files if fn.startswith('SuppInfo_')][0]

				clinical_assessment_df: pd.DataFrame = pd.read_excel(
					os.path.join(label_file_path, clinical_assessment_filename))
				supplementary_info_df: pd.DataFrame = pd.read_excel(
					os.path.join(label_file_path, supplementary_info_filename))

				actually_needed_clinical_columns: tp.List[str] = [col for col in clinical_assessment_df.columns if
																  not col.startswith('clinical ') or col.endswith(
																	  f'Ex#{exercise_num}')]
				new_clinical_columns: tp.Dict[str, str] = {col: col.lower().replace(' ', '_')[:11] for col in
														   actually_needed_clinical_columns}
				clinical_assessment_df = clinical_assessment_df[actually_needed_clinical_columns].rename(
					new_clinical_columns, axis=1)

				new_supp_info_columns: tp.Dict[str, str] = {col: col.lower().replace(' ', '_') for col in
															supplementary_info_df.columns}
				supplementary_info_df: pd.DataFrame = supplementary_info_df.rename(new_supp_info_columns, axis=1)

				for clinical_col in clinical_assessment_df.columns:
					current_dataframe[clinical_col] = clinical_assessment_df[clinical_col].values.item()

				for supp_info_col in supplementary_info_df.columns:
					current_dataframe[supp_info_col] = supplementary_info_df[supp_info_col].values.item()

				del supplementary_info_df
				del clinical_assessment_df

				data_frames.append(current_dataframe)
			except BaseException:
				continue

	output: pd.DataFrame = pd.concat(data_frames, ignore_index=True)

	output['time_ms'] = output['time_ms'] / 10_000
	output['frame_num'] = np.round(30 / 1000 * output['time_ms'])

	output['is_healthy_experienced'] = output['group'].str.lower().isin(['e', 'cg_e'])
	output['is_healthy_inexperienced'] = output['group'].str.lower() == 'ne'
	output['is_low_back_problems'] = output['group'].str.lower() == 'b'
	output['is_parkinsons'] = output['group'].str.lower() == 'p'
	output['is_stroke'] = output['group'].str.lower() == 's'

	new_cols: tp.List[str] = ['is_healthy_experienced', 'is_healthy_inexperienced', 'is_low_back_problems', 'is_parkinsons', 'is_stroke']

	output[new_cols] = output[new_cols].astype(np.float32)

	return output

# def load_kimore_data(path):
#
# 	data = []
# 	for (root, dirs, files) in os.walk(path):
#
# 		# if current directory contains "Raw", extract data
# 		if "Raw" in dirs:
#
# 			new_dict = {}
#
# 			# get exercise number
# 			new_dict["Exercise"] = int(root[-1])
#
# 			print("Working on " + root)
#
# 			# extract raw data
# 			raw_files = os.listdir(os.path.join(root, "Raw"))
# 			for file in raw_files:
# 				if not file.endswith('.csv'):
# 					continue
#
#
# 				file_path = os.path.join(os.path.join(root, "Raw"),file)
# 				csv_reader: pd.DataFrame = pd.read_csv(file_path, encoding_errors='ignore')
#
# 				if file.startswith("JointOrientation"):
#
# 					for joint in kinect_joints:
# 						new_dict[joint + "-o"] = []
#
# 					for row in csv_reader:
# 						for i in range(len(kinect_joints)):
# 							if len(row) > 0:
# 								new_dict[kinect_joints[i] + "-o"].append(row[(4*i):(4*i+4)])
#
# 					orientation_present = True
#
# 				elif file.startswith("JointPosition"):
#
# 					for joint in kinect_joints:
# 						new_dict[joint + "-p"] = []
#
# 					for row in csv_reader:
# 						for i in range(len(kinect_joints)):
# 							if len(row) > 0:
# 								new_dict[kinect_joints[i] + "-p"].append(row[(4*i):(4*i+3)])
#
# 				elif file.startswith("TimeStamp"):
#
# 					new_dict["Timestamps"] = []
# 					for row in csv_reader:
# 						if len(row) > 0:
# 							new_dict["Timestamps"].append(row[0])
#
# 			# verify that all data was collected
# 			if 'spinebase-o' not in new_dict:
# 				continue
#
# 			if 'spinebase-p' not in new_dict:
# 				continue
#
# 			if 'Timestamps' not in new_dict:
# 				continue
#
# 			# extract data labels
# 			label_files = os.listdir(os.path.join(root, "Label"))
# 			for file in label_files:
#
# 				file_path = os.path.join(os.path.join(root, "Label"),file)
#
# 				sheet: pd.DataFrame = pd.read_excel(file_path)
#
# 				titles = sheet.columns
# 				vals = sheet.iloc[0]
#
# 				if file.startswith("SuppInfo"):
# 					for t, v in zip(titles, vals):
# 						new_dict[t] = v
#
# 				elif file.startswith("ClinicalAssessment"):
# 					new_dict["cTS"] = vals[new_dict["Exercise"]]
# 					new_dict["cPO"] = vals[new_dict["Exercise"] + 5]
# 					new_dict["cCF"] = vals[new_dict["Exercise"] + 10]
#
# 			# append exercise to data
# 			data.append(new_dict)
#
# 	return data


In [16]:
with warnings.catch_warnings():
	warnings.simplefilter('ignore')
	df: pd.DataFrame = load_kimore_data()
df

Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID4/Es1...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID4/Es2...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID4/Es4...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID4/Es3...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID4/Es5...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID7/Es1...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID7/Es2...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID7/Es4...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID7/Es3...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID7/Es5...
Working on ../data/SemesterProject/KiMoRe/PainPostureDisorderGroup/BackPain/B_ID5/Es1...
Working on ../data/Se

Unnamed: 0,pos_spinebase_x,pos_spinebase_y,pos_spinebase_z,pos_spinemid_x,pos_spinemid_y,pos_spinemid_z,pos_neck_x,pos_neck_y,pos_neck_z,pos_head_x,...,clinical_cf,group,age_,gender,frame_num,is_healthy_experienced,is_healthy_inexperienced,is_low_back_problems,is_parkinsons,is_stroke
0,0.148025,-1.146570,2.60229,2.0,0.147799,-0.870216,2.64721,2.0,0.146912,-0.595615,...,25.555907,B,52,F,0.0,0.0,0.0,1.0,0.0,0.0
1,0.148034,-1.146580,2.60229,2.0,0.147791,-0.870219,2.64721,2.0,0.146843,-0.595615,...,25.555907,B,52,F,1.0,0.0,0.0,1.0,0.0,0.0
2,0.147904,-1.146670,2.60234,2.0,0.147691,-0.870267,2.64724,2.0,0.146768,-0.595635,...,25.555907,B,52,F,2.0,0.0,0.0,1.0,0.0,0.0
3,0.147692,-1.146840,2.60271,2.0,0.147566,-0.870364,2.64729,2.0,0.146635,-0.595729,...,25.555907,B,52,F,3.0,0.0,0.0,1.0,0.0,0.0
4,0.146500,-1.147980,2.60381,2.0,0.147056,-0.870540,2.64741,2.0,0.146204,-0.595432,...,25.555907,B,52,F,4.0,0.0,0.0,1.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
279084,-0.333194,-0.215199,2.58387,2.0,-0.345806,0.088729,2.48869,2.0,-0.355289,0.384876,...,28.333333,E,22,M,550.0,1.0,0.0,0.0,0.0,0.0
279085,-0.329756,-0.215215,2.58713,2.0,-0.342719,0.090435,2.49272,2.0,-0.352767,0.388246,...,28.333333,E,22,M,551.0,1.0,0.0,0.0,0.0,0.0
279086,-0.324673,-0.215416,2.59172,2.0,-0.334720,0.093020,2.50138,2.0,-0.341581,0.394412,...,28.333333,E,22,M,552.0,1.0,0.0,0.0,0.0,0.0
279087,-0.320218,-0.215014,2.59377,2.0,-0.330699,0.094413,2.50459,2.0,-0.338477,0.396178,...,28.333333,E,22,M,553.0,1.0,0.0,0.0,0.0,0.0


In [10]:
' '.join(df['group'].drop_duplicates().tolist())

'B P S NE E CG_E'

In [7]:
df['frame_num'] = np.round(30 / 1000 * df['time_ms'])

In [17]:
df.to_pickle(os.path.join('..', 'data', 'SemesterProject', 'KiMoRe', 'processed_data.tar.gz'), protocol=-1)

In [3]:
data_types_ld: tp.List[tp.Dict[str, tp.Union[str, int, bool, np.ndarray]]] = []

meta_data_ld: tp.List[tp.Dict[str, tp.Union[str, int, bool]]] = []

name_re_obj: re.Pattern = re.compile(r'(?P<patient_type>B|NE|S|P|E)_ID(?P<patient_num>\d\d?)/(?P<data_type>exercises|metadata)(?:/Es(?P<exercise_id>\d)/joints/(?P<joint>\w+))?', re.IGNORECASE)

def parse_name(name: str) -> tp.Optional[tp.Dict[str, tp.Union[str, int]]]:
    match_result: tp.Optional[re.Match] = name_re_obj.match(name)

    if match_result is None:
        return None

    patient_type: str = match_result['patient_type'].lower()
    patient_number: int = int(match_result['patient_num'])

    match patient_type:
        case 'b':
            full_patient_type: str = 'lower back pain'
        case 'p':
            full_patient_type: str = 'parkinsons'
        case 's':
            full_patient_type: str = 'stroke'
        case 'e':
            full_patient_type: str = 'healthy experienced'
        case 'ne':
            full_patient_type: str = 'healthy inexperienced'
        case _:
            return None

    if match_result['data_type'] == 'metadata':
        last_slash: int = name.rfind('/')
        meta_data: str = name[last_slash + 1:]

        return {'is_meta': True, 'patient_type': full_patient_type, 'patient_num': patient_number, 'data': meta_data}

    if match_result['exercise_id'] is None:
        return None

    exercise_id: int = int(match_result['exercise_id'])

    if exercise_id > 5:
        return None

    joint_name: str = match_result['joint']

    exercise_names: tp.List[str] = ['arm_lift', 'side_tilt', 'trunk_rotation', 'hip_rotation', 'squat']

    exercise_name: str = exercise_names[exercise_id - 1]

    return {'patient_type': full_patient_type, 'patient_num': patient_number, 'exercise_name': exercise_name,  'joint_name': joint_name, 'exercise_id': exercise_id, 'is_meta': False}


def check_if_nan(name: str, item: tp.Union[h5py.Group, h5py.Dataset]) -> tp.Optional[bool]:
    if type(item) == h5py.Dataset:
        parsed_name_dict: tp.Optional[tp.Dict[str, tp.Union[int, str]]] = parse_name(name)

        if parsed_name_dict is None:
            return None

        if parsed_name_dict['is_meta']:
            base_dict: tp.Dict = parsed_name_dict
            base_dict['name'] = name

            data: tp.Any = item[()]

            base_dict['actual_data'] = data

            meta_data_ld.append(base_dict)

        return None


        base_dict: tp.Dict[str, tp.Union[str, int, bool, np.ndarray]] = parsed_name_dict.copy()

        data: np.ndarray = item[()].T
        is_nan: bool = np.any(np.isnan(data))

        base_dict['name'] = name
        base_dict['has_nans'] = is_nan

        # df[['time_ms', 'cameraX', 'cameraY', 'cameraZ', 'confidenceState', 'AbsQuat_X', 'AbsQuat_Y', 'AbsQuat_Z', 'AbsQuat_W']]

        data_types_ld.extend([dict(name=name, has_nans=is_nan, patient_type=base_dict['patient_type'], patient_num=base_dict['patient_num'], exercise_name=base_dict['exercise_name'], joint_name=base_dict['joint_name'], exercise_id=base_dict['exercise_id'], time_ms=d[0], camera_x=d[1], camera_y=d[2], camera_z=d[3], confidence_state=d[4], abs_quat_x=d[5], abs_quat_y=d[6], abs_quat_z=d[7], abs_quat_w=d[8])  for d in data])

        # print(f'There\'s a dataset called {name}. Has nans? {is_nan}')
        # data_types_ld.append(next_dict)
    return None

with h5py.File(kimore_h5_path, 'r') as f:
    f.visititems(check_if_nan)

data_types_df: pd.DataFrame = pd.DataFrame.from_records(data_types_ld)
data_types_df

In [4]:
pd.DataFrame.from_records(meta_data_ld)

Unnamed: 0,is_meta,patient_type,patient_num,data,name,actual_data
0,True,lower back pain,1,age,B_ID1/metadata/age,[[66.0]]
1,True,lower back pain,1,gender,B_ID1/metadata/gender,[[0.0]]
2,True,lower back pain,1,group,B_ID1/metadata/group,[[4.0]]
3,True,lower back pain,2,age,B_ID2/metadata/age,[[52.0]]
4,True,lower back pain,2,gender,B_ID2/metadata/gender,[[1.0]]
...,...,...,...,...,...,...
229,True,stroke,8,gender,S_ID8/metadata/gender,[[1.0]]
230,True,stroke,8,group,S_ID8/metadata/group,[[2.0]]
231,True,stroke,9,age,S_ID9/metadata/age,[[67.0]]
232,True,stroke,9,gender,S_ID9/metadata/gender,[[0.0]]


In [5]:
data_types_df.to_pickle(os.path.join('..', 'data', 'SemesterProject', 'KiMoRe', 'processed_data.tar.gz'), protocol=-1)

In [1]:
pd.read_csv(os.path.join('..', 'data', 'SemesterProject', 'KiMoRe', 'KiMoRe_columnHeaders.csv'))

NameError: name 'pd' is not defined

In [1]:
import numpy as np

In [None]:
df: pd.DataFrame = data_types_df.explode(column='data')
df[['time_ms', 'cameraX', 'cameraY', 'cameraZ', 'confidenceState', 'AbsQuat_X', 'AbsQuat_Y', 'AbsQuat_Z', 'AbsQuat_W']] = df[['data']].apply(lambda g: tuple(g.tolist()), result_type='expand', axis=1)
df.describe()

In [12]:
data_types_df[['rows', 'cols']].describe()

Unnamed: 0,rows,cols
count,10887.0,10887.0
mean,7.984844,650.07734
std,2.816849,364.864005
min,0.0,0.0
25%,9.0,468.0
50%,9.0,623.0
75%,9.0,855.0
max,9.0,2487.0


In [7]:
import numpy as np

a = np.concatenate([[np.ones(5)], [np.zeros(5)]])

[print(aa) for aa in a], a.shape

[1. 1. 1. 1. 1.]
[0. 0. 0. 0. 0.]


([None, None], (2, 5))

In [25]:
[n for n in data_types_df.sample(frac=1, replace=False)['name'].tolist() if 'Es6' in n]

['NE_ID1/exercises/Es6/joints/shoulderright',
 'NE_ID1/exercises/Es6/joints/spineshoulder',
 'NE_ID2/exercises/Es6/joints/hipleft',
 'NE_ID2/exercises/Es6/joints/spinebase',
 'NE_ID1/exercises/Es6/joints/handleft',
 'NE_ID2/exercises/Es6/joints/spineshoulder',
 'NE_ID1/exercises/Es6/joints/kneeright',
 'NE_ID1/exercises/Es6/joints/neck',
 'NE_ID1/exercises/Es6/joints/handtipleft',
 'NE_ID2/exercises/Es6/joints/thumbleft',
 'NE_ID2/exercises/Es6/joints/elbowleft',
 'NE_ID2/exercises/Es6/joints/handleft',
 'NE_ID2/exercises/Es6/joints/hipright',
 'NE_ID2/exercises/Es6/joints/shoulderright',
 'NE_ID2/exercises/Es6/joints/elbowright',
 'NE_ID2/exercises/Es6/joints/ankleright',
 'NE_ID1/exercises/Es6/joints/thumbright',
 'NE_ID2/exercises/Es6/joints/footright',
 'NE_ID1/exercises/Es6/joints/hipright',
 'NE_ID1/exercises/Es6/joints/ankleright',
 'NE_ID1/exercises/Es6/joints/hipleft',
 'NE_ID2/exercises/Es6/joints/kneeleft',
 'NE_ID1/exercises/Es6/joints/elbowleft',
 'NE_ID2/exercises/Es6/joi