In [1]:
import os
import pandas as pd
import numpy as np
from scipy.signal import butter, filtfilt
from functools import partial

ModuleNotFoundError: No module named 'pandas'

In [2]:
def joints_filter(data, cutoff=6, fs=100, order=2):
    """
    Low-pass filter every column of `data` except the first one, in place.

    A Butterworth filter is designed with `scipy.signal.butter` and applied
    with `scipy.signal.filtfilt`.

    data: DataFrame whose first column is skipped (presumably the time
        column - confirm with the caller); all other columns are filtered.
    cutoff: cut-off frequency, in the same unit as `fs` (default 6).
    fs: sampling frequency (default 100, Hz).
    order: order of the Butterworth filter (default 2).

    Returns the very same DataFrame object that was passed in; its columns
    are overwritten with the filtered signals.
    Raises ValueError when `cutoff` is not strictly between 0 and fs/2.
    """
    nyquist = fs / 2
    if not 0 < cutoff < nyquist:
        raise ValueError(
            f"cutoff must lie strictly between 0 and fs/2 ({nyquist}), got {cutoff}"
        )
    # butter expects the cut-off normalised by the Nyquist frequency
    b, a = butter(N=order, Wn=cutoff / nyquist, btype='lowpass')
    for col in data.columns[1:]:
        data[col] = filtfilt(b, a, data[col])
    return data

def load_IK(ik_file):
    """
    Read an inverse-kinematics table from a tab-separated file.

    ik_file: file name including its extension (anything `pd.read_csv`
        accepts).

    Row 8 (zero-based, as counted by `pd.read_csv`) supplies the column
    names, and only the columns at positions 0, 10, 11, 17 and 18 are kept.
    """
    wanted_columns = [0, 10, 11, 17, 18]
    return pd.read_csv(ik_file, sep='\t', header=8, usecols=wanted_columns)

def load_ID(id_file):
    """
    Read an inverse-dynamics table from a tab-separated .sto file.

    id_file: path of the .sto file.

    Row 6 (zero-based, as counted by `pd.read_csv`) supplies the column
    names, and only the columns at positions 0, 16, 17, 18 and 19 are read.
    The result is then passed through `ID_col_arranger`, which reorders
    the columns.
    """
    wanted_columns = [0, 16, 17, 18, 19]
    raw_table = pd.read_csv(id_file, sep='\t', header=6, usecols=wanted_columns)
    return ID_col_arranger(raw_table)

def load_time_intervals(events_file):
    """
    Read the events table and round its 'time' column to 2 decimals.

    events_file: CSV file with a 'time' column.

    According to the original notes, the file is derived from the motion
    file and has three columns: time, get_left_data and get_right_data.
    get_left_data is a boolean column: find when the subject is on the
    force plate, then the first TO before it and the first HS after it;
    that range (with some safety factor) is True.
    NOTE(review): get_dataset reads the flags as 'L_side_select' and
    'R_side_select' - confirm the actual column names.

    Returns the DataFrame read from the file with 'time' rounded.
    """
    events = pd.read_csv(events_file)
    # Vectorised rounding, same call style as reset_time uses
    events['time'] = np.around(events['time'], decimals=2)
    return events

def merge_joints(IK,ID,events):
    """
    Combine kinematics, kinetics and the events table on their 'time' column.

    IK, ID, events: DataFrames that each have a 'time' column.

    Both merges are inner joins, and each is followed by an assertion that
    the row count did not change. The merged table is finally passed
    through `reset_time`.
    """
    # Kinematics + kinetics; the row count must match both inputs
    kinematics_and_kinetics = pd.merge(IK, ID, how='inner', on='time')
    n_rows = len(kinematics_and_kinetics)
    assert n_rows == len(ID) == len(IK)
    # NOTE: low-pass filtering of the OpenSim data (joints_filter) is
    # currently disabled at this point.
    # Attach the columns that tell when to make measurements
    with_events = pd.merge(kinematics_and_kinetics, events, how='inner', on='time')
    assert len(with_events) == n_rows
    # Reset time to zero to match EMG
    return reset_time(with_events)

def load_features(features_file):
    """
    Read the features table from a CSV file.

    features_file: .csv file that has a 'time' column.

    Returns a DataFrame indexed by 'time'.
    """
    return pd.read_csv(features_file, index_col='time')

def merge_IO(features, joints_data):
    """
    Inner-join the features with the joints data on 'time'.

    Rows of `joints_data` whose time has no match in `features` are dropped
    by the join; this is how the downsampling is done. An assertion checks
    that the merged table has as many rows as `features`.

    Returns the merged DataFrame indexed by 'time'.
    """
    merged = features.merge(joints_data, how='inner', on='time')
    assert len(merged) == len(features)
    return merged.set_index("time")

def ID_col_arranger(ID):
    """
    Return the first five columns of `ID` with the third and fourth swapped.

    ID: DataFrame with at least five columns.
    """
    names = ID.columns
    new_order = (0, 1, 3, 2, 4)
    return ID[[names[position] for position in new_order]]

def reset_time(data):
    """
    Shift the 'time' column so that its minimum becomes zero.

    data: DataFrame with a numeric 'time' column.

    The shifted times are rounded to 2 decimals. The column is overwritten
    in place and the same DataFrame object is returned.
    """
    start_time = data['time'].min()
    # Vectorised subtraction instead of a per-element lambda
    data['time'] = np.around(data['time'] - start_time, 2)
    return data

In [3]:
def get_dataset(subject=None):
    """
    Build and save the dataset files of one subject.

    subject: subject number as it appears in the file names (the prompt
        asks for the format XX). When None, it is requested interactively
        with `input`; surrounding whitespace of the typed value is removed.

    The path prefixes are read from the second column (rows 0-4) of
    "../settings/dataset_settings/S{subject}_dataset_settings.csv":
    IK, ID, events, features and output, in that order. They are
    concatenated with the file names as-is.

    For each of the four recordings (test, train_01, train_02, val) the
    IK, ID, events and features tables are loaded and merged. The left
    (right) knee and ankle moments are set to NaN wherever 'L_side_select'
    ('R_side_select') is False, the two select columns are dropped and the
    result is written to a CSV file.
    """
    if subject is None:
        # Strip so that an accidental space does not end up in every path
        subject = input("Please write subject number in a format XX: ").strip()

    files = [f'S{subject}_test', f'S{subject}_train_01', f'S{subject}_train_02', f'S{subject}_val']
    settings = pd.read_csv(f"../settings/dataset_settings/S{subject}_dataset_settings.csv", header=None)

    # Second column of the settings file holds the path prefixes
    ik_path = settings.iloc[0,1]
    id_path = settings.iloc[1,1]
    events_path = settings.iloc[2,1]
    features_path = settings.iloc[3,1]
    output_folder = settings.iloc[4,1]

    IK_files = [f"{ik_path}{name}_IK.mot" for name in files]
    ID_files = [f"{id_path}{name}/inverse_dynamics.sto" for name in files]
    events_files = [f"{events_path}{name}_events.csv" for name in files]
    Features_files = [f"{features_path}{name}_features.csv" for name in files]
    output_files = [f"{output_folder}{name}_dataset.csv" for name in files]

    for ik_file, id_file, events_file, features_file, output_name in zip(
            IK_files, ID_files, events_files, Features_files, output_files):

        IK = load_IK(ik_file)
        ID = load_ID(id_file)
        events = load_time_intervals(events_file)
        features = load_features(features_file)
        joints_data = merge_joints(IK, ID, events)
        Dataset = merge_IO(features, joints_data)

        # Blank the moments of a side wherever that side is not selected
        Dataset.loc[Dataset['L_side_select'].eq(False),
                    ['knee_angle_l_moment','ankle_angle_l_moment']] = np.nan
        Dataset.loc[Dataset['R_side_select'].eq(False),
                    ['knee_angle_r_moment','ankle_angle_r_moment']] = np.nan

        Dataset = Dataset.drop(columns=['L_side_select', 'R_side_select'])
        Dataset.to_csv(output_name)
    print("DONE!")

In [4]:
# Run the pipeline; with no argument the subject number is requested via input().
get_dataset()

Please write subject number in a format XX:  02


DONE!
