In [None]:
import glob
import json
import os
import pickle
import random
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from joblib import dump, load
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler, MinMaxScaler

## Configuration

In [None]:
# Total number of experiments expected under the data directory
N_EXPERIMENTS = 63
# Directory where the derived data is stored
DERIVED_DATA_DIR = '../../../data'

# One path per experiment, two directory levels below the data root.
# glob returns matches in arbitrary (filesystem-dependent) order, so the list is
# sorted to keep the experiment indices reproducible between runs and machines.
experiments_dirs_path = sorted(glob.glob(DERIVED_DATA_DIR + '/*/*'))
assert len(experiments_dirs_path) == N_EXPERIMENTS, (
    'Expected {} experiments under {}, found {}'.format(
        N_EXPERIMENTS, DERIVED_DATA_DIR, len(experiments_dirs_path)
    )
)

# Path where the results are stored
RESULTS_PATH = '../../../results'



In [None]:
# Get paths from experiments without skin (SkinConfig is 0 or missing)
experiments_dirs_path_filter_wos = []

exp_params = {}
for exp_path in experiments_dirs_path:
    with open(exp_path + '/parameters.json') as f:
        exp_params[exp_path] = json.load(f)

    skin_config = exp_params[exp_path]['SkinConfig']
    # The missing-value check must run BEFORE int(): int('NaN') raises ValueError,
    # and so does int(float('nan')), which is what json.load yields for a bare NaN.
    skin_config_missing = skin_config == 'NaN' or (
        isinstance(skin_config, float) and np.isnan(skin_config)
    )
    if skin_config_missing or int(skin_config) == 0:
        experiments_dirs_path_filter_wos.append(exp_path)

print('Final number of experiments:', len(experiments_dirs_path_filter_wos))

In [None]:
# Get paths from experiments whose SkinConfig is one of the selected values
selected_skin_configs = [3]
experiments_dirs_path_filter = []

exp_params = {}
for exp_path in experiments_dirs_path:
    with open(exp_path + '/parameters.json') as f:
        exp_params[exp_path] = json.load(f)

    skin_config = exp_params[exp_path]['SkinConfig']
    # Experiments with a missing SkinConfig ('NaN' string or float nan) can never
    # match the filter; skip them explicitly because int() raises ValueError on both.
    skin_config_missing = skin_config == 'NaN' or (
        isinstance(skin_config, float) and np.isnan(skin_config)
    )
    if not skin_config_missing and int(skin_config) in selected_skin_configs:
        experiments_dirs_path_filter.append(exp_path)

print('Final number of experiments:', len(experiments_dirs_path_filter))

## Skin-like material evaluation

In [None]:
# Evaluate skin-like materials experiments over baseline models


def load_experiment_data(exp_path):
    """Load one experiment and return its cleaned features and targets side by side.

    Reads `H3_processed.csv` (features) and `force_cells_processed.csv` (targets)
    from `exp_path`, removes the rows whose targets duplicate an earlier row, drops
    the first remaining row and discards the rows with any null feature.

    Args:
        exp_path: Path of the experiment directory.

    Returns:
        DataFrame with the feature columns followed by the target columns.
    """
    # Load targets
    targets_df = pd.read_csv(exp_path + '/force_cells_processed.csv')
    # Load features
    features_df = pd.read_csv(exp_path + '/H3_processed.csv')

    # Rows whose targets repeat an earlier row are removed from both frames
    duplicated_mask = targets_df.duplicated(keep='first')
    targets_df = targets_df.loc[~duplicated_mask]
    features_df = features_df.loc[~duplicated_mask]
    # Count the True entries of the mask, i.e. the rows that are actually dropped
    print('Droping {} duplicated data points'.format(int(duplicated_mask.sum())))

    # Drop first row to remove noise in the start of the data recording
    targets_df = targets_df.iloc[1:]
    features_df = features_df.iloc[1:]

    # Drop the rows with null features
    valid_mask = features_df.notna().all(axis=1)
    features_df = features_df.loc[valid_mask]
    targets_df = targets_df.loc[valid_mask]
    print('Droping {} data points by null features'.format(int((~valid_mask).sum())))

    assert len(features_df) == len(targets_df)
    return pd.concat([features_df, targets_df], axis=1)


def build_dataset(exp_paths, features, targets, crop_by_index, window_size,
                  random_sample, random_sample_pct):
    """Build the per-experiment feature and target arrays for a list of experiments.

    Args:
        exp_paths: Experiment directories to load.
        features: Names of the feature columns to keep.
        targets: Names of the target columns to keep.
        crop_by_index: False to keep every row, a (start idx, end idx) tuple to
            crop each experiment to that positional range, or True to draw the
            range at random from the first experiment processed.
        window_size: Length of the randomly drawn range.
        random_sample: Whether to subsample each experiment.
        random_sample_pct: Fraction of rows kept when subsampling.

    Returns:
        Tuple (features_by_exp, targets_by_exp, crop_by_index): two dicts keyed by
        the experiment position in `exp_paths`, and the crop setting actually used
        (the resolved tuple when True was given) so that later calls can reuse it.
    """
    features_by_exp = {}
    targets_by_exp = {}
    for i, exp_path in enumerate(exp_paths):
        print('{} - Experiment {} from {}'.format(i, exp_path.split('/')[-1], exp_path.split('/')[-2]))

        data_df = load_experiment_data(exp_path)

        # Crop the data by the indicated indexes
        if crop_by_index:
            if isinstance(crop_by_index, bool):
                # The random window is resolved once and then reused for every
                # following experiment
                start_idx = random.randint(100, len(data_df) - window_size - 100)
                crop_by_index = (start_idx, start_idx + window_size)

            data_df = data_df.iloc[crop_by_index[0]:crop_by_index[1]]

        if random_sample:
            data_df = data_df.sample(frac=random_sample_pct, random_state=0)

        # Store the final arrays
        targets_by_exp[i] = data_df[targets].values
        features_by_exp[i] = data_df[features].values

    return features_by_exp, targets_by_exp, crop_by_index


RESULTS = {}

FORCE_CELLS_PER_JOINT = {
    'Hip': [5, 6],
    'Knee': [3, 4, 7, 8],
    'Ankle': [1, 2]
}

DATA_ID = '0011_09082021'

# Per-model date tag that appears in the stored model directory and file names
HS_DATE = {'RF':'11082021',  'XGB': '10082021', 'SVM': '24082021', 'KNN': '24082021'}

for JOINT in ['Hip', 'Knee', 'Ankle']:
    RESULTS[JOINT] = {}

    CELLS = FORCE_CELLS_PER_JOINT[JOINT]

    H3_LEG = 'L' # L|R

    # Features: joint position of the selected leg plus the z force of each cell
    features = ['{}{}Pos'.format(H3_LEG, JOINT)] + ['F{}z'.format(cell) for cell in CELLS]
    # Targets: x and y force of each cell, interleaved as (F1x, F1y, F2x, F2y, ...)
    targets = ['F{}{}'.format(cell, ax) for cell in CELLS for ax in ['x', 'y']]

    print('Number of features: {}'.format(len(features)))
    print('Selected features: {}'.format(features))
    print('\n')
    print('Number of targets: {}'.format(len(targets)))
    print('Selected targets: {}'.format(targets))

    # Index to crop the data and use only this section of each experiment (start idx, end idx)
    # The indexes can be defined manually with crop_by_index=(start idx, end idx) or selected at random with crop_by_index=True
    window_size = 200
    crop_by_index = False #(1500, 1700) # True

    # Sample the experiment data to use only this fraction of the datapoints
    random_sample = True
    random_sample_pct = 0.05

    # Train set: experiments without skin. Test set: experiments with the selected skin.
    features_dict_train, targets_dict_train, crop_by_index = build_dataset(
        experiments_dirs_path_filter_wos, features, targets,
        crop_by_index, window_size, random_sample, random_sample_pct
    )
    features_dict_test, targets_dict_test, crop_by_index = build_dataset(
        experiments_dirs_path_filter, features, targets,
        crop_by_index, window_size, random_sample, random_sample_pct
    )

    X_train = np.concatenate(list(features_dict_train.values()), axis=0)
    Y_train = np.concatenate(list(targets_dict_train.values()), axis=0)

    X_test = np.concatenate(list(features_dict_test.values()), axis=0)
    Y_test = np.concatenate(list(targets_dict_test.values()), axis=0)

    # The scaler is fitted on the train features only and then applied to both sets
    s = MinMaxScaler().fit(X_train)

    X_train_norm = s.transform(X_train)
    X_test_norm = s.transform(X_test)

    for m in ['RF', 'XGB', 'SVM', 'KNN']:
        RESULTS[JOINT][m] = {}

        model_dir = os.path.join(RESULTS_PATH, DATA_ID, '{}_{}_{}'.format(JOINT, m, HS_DATE[m]))

        # joblib.load unpickles the file: only load models from a trusted location
        if m == 'XGB':
            # One stored model per target column
            results = defaultdict(list)
            for target in range(Y_train.shape[1]):
                model = load(os.path.join(model_dir, '{}_{}_best_model_{}_{}_{}.joblib'.format(JOINT, m, target, HS_DATE[m], DATA_ID)))

                # NOTE(review): `xgb` is not imported in the visible import cell; this
                # line needs `import xgboost as xgb` to run on a fresh kernel.
                # NOTE(review): the raw X_test is used here while the other models get
                # X_test_norm — confirm the XGB models were trained on unscaled features.
                dtest = xgb.DMatrix(data=X_test, label=Y_test[:, target])

                test_preds = model.predict(dtest)

                results['MAE'].append(mean_absolute_error(Y_test[:, target], test_preds))
                results['MSE'].append(mean_squared_error(Y_test[:, target], test_preds))
                results['R2'].append(r2_score(Y_test[:, target], test_preds))
        else:
            # A single stored model predicts every target column at once
            model = load(os.path.join(model_dir, '{}_{}_best_model_{}_{}.joblib'.format(JOINT, m, HS_DATE[m], DATA_ID)))

            test_preds = model.predict(X_test_norm)

            results = {
                'MAE': mean_absolute_error(Y_test, test_preds, multioutput='raw_values'),
                'MSE': mean_squared_error(Y_test, test_preds, multioutput='raw_values'),
                'R2': r2_score(Y_test, test_preds, multioutput='raw_values')
            }

        for f, force in enumerate(['Fx', 'Fy']):
            RESULTS[JOINT][m][force] = {}

            for loss in ['MAE', 'MSE', 'R2']:
                # Targets are interleaved (x, y) per cell, so a stride of 2 starting
                # at f selects the scores of one force axis across the cells
                scores = np.asarray(results[loss])[f::2]

                RESULTS[JOINT][m][force][loss] = {
                    'mean': np.mean(scores),
                    'std': np.std(scores)
                }
                
                       

In [None]:
# Display the collected metrics, nested as joint -> model -> force -> loss -> {'mean', 'std'}
RESULTS

In [None]:
# Print, for every joint and force axis, the R2 aggregated over the RF and KNN models
for JOINT in ['Hip', 'Knee', 'Ankle']:
    for FORCE in ['Fx', 'Fy']:
        scores_mean = []
        scores_std = []
        for model_name in ['RF', 'KNN']:
            r2_stats = RESULTS[JOINT][model_name][FORCE]['R2']
            scores_mean.append(r2_stats['mean'])
            scores_std.append(r2_stats['std'])

        # NOTE(review): the value after "±" is the std of the per-model stds,
        # not a spread of the means — confirm this is the intended statistic.
        aggregated_mean = np.mean(scores_mean)
        aggregated_spread = np.std(scores_std)
        print('{} {}: {:.4f} ± {:.4f}'.format(JOINT, FORCE, aggregated_mean, aggregated_spread))