In [None]:
# Import libraries
import os
import numpy as np
import pandas as pd
from collections import defaultdict
import seaborn as sns
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.inspection import permutation_importance
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from itertools import combinations
import ast
from matplotlib import gridspec
import scipy.stats as stats
from statsmodels.stats.multitest import multipletests

In [None]:
# Helper functions
def load_and_prepare(file, predictors_columns):
    data_path = os.path.join(base_path, file)
    df = pd.read_excel(data_path)
    df['Walking Speed'] = df['Walking Speed'].map(walking_speed_mapping).astype(int)
    df['Accuracy'] = df['Accuracy'].map(accuracy_mapping).astype(int)
    df['Balance'] = df['Balance'].map(balance_mapping).astype(int)
    predictors = df[predictors_columns]
    return predictors, df

# Normalizes outputs to 1
def normalize_to_one(df, columns):
    normalized_df = df.copy()

    for column in columns:
        normalized_df[column] = normalized_df[column].astype(float)

    for index, row in df.iterrows():
        # Calculate the sum of columns for the current row
        row_sum = row[columns].sum()

        # Normalize the specified columns
        for column in columns:
            normalized_df.at[index, column] = row[column] / row_sum

    return normalized_df

# Reads in only the 27 trials from survey info
def read_and_adjust_sheet(filename, sheet_name):

    total_rows = 35
    rows_to_skip = [1, 2]
    nrows = total_rows - len(rows_to_skip) - 6

    # Read file, skip irrelevant trials, set first row as header
    df = pd.read_excel(filename, sheet_name=sheet_name, skiprows=rows_to_skip, nrows=nrows)

    # Reindex the DataFrame from 1 (correct trial numbers)
    df.index = range(1, len(df) + 1)

    return df

# Function to figure out which conditions to hold out for cross val
def match_two_parts(condition_x, condition_y):
    s_x, a_x, b_x = int(condition_x[1]), int(condition_x[3]), int(condition_x[5])
    s_y, a_y, b_y = int(condition_y[1]), int(condition_y[3]), int(condition_y[5])
    return ((s_x == s_y) + (a_x == a_y) + (b_x == b_y)) >= 2

def agg_arrays(arrays):
  return np.concatenate([arr for arr in arrays])

# Normalize predictor string column
def normalize_predictors(predictor_str):
    try:
        return set(ast.literal_eval(predictor_str))
    except:
        return set()
    
def draw_sig_bracket(ax, x1, x2, y, stars, tick=0.03, lw=1.6, pad=0.01, fs=13):
    ax.plot([x1, x1, x2, x2], [y, y + tick, y + tick, y],
            color="black", lw=lw, clip_on=False)
    ax.text((x1 + x2) / 2, y + pad, stars,
            ha="center", va="bottom", fontsize=fs, color="black")

In [None]:
# Setup
base_path = "/Users/jordanfeldman/Desktop/Research/Subject data"
filepath_survey = 'Subjective_Responses.xlsx'
filepath_survey = os.path.join(base_path, "Subjective_Responses.xlsx")
predictors_columns = ['Mean Width Straights (mm)', 'Straights Width Variability (mm)',
    'Mean Length Straights (mm)', 'Straights Length Variability (mm)', 'Average Speed (m/s)','Head Angle (deg)']
targets_columns = ['Balance', 'Foot Placement', 'Walking Speed', 'Metabolics']

# Define mappings
walking_speed_mapping = {'Slow': 0, 'Medium': 1, 'Fast': 2}
accuracy_mapping = {'Low': 0, 'Medium': 1, 'High': 2}
balance_mapping = {'Low': 0, 'Medium': 1, 'High': 2}

feature_summary = []

# Models (only ridge regression, but set up for easy addition)
model_dict = {
    "Ridge Regression": lambda: Ridge(alpha=1.0, random_state=0)
}

bmh_files = [
    'data_BMH01.xlsx', 'data_BMH02.xlsx', 'data_BMH21.xlsx','data_BMH06.xlsx',
    'data_BMH07.xlsx', 'data_BMH08.xlsx', 'data_BMH09.xlsx',
    'data_BMH10.xlsx', 'data_BMH13.xlsx', 'data_BMH19.xlsx',
    'data_BMH20.xlsx', 'data_BMH17.xlsx'
]

unique_conditions = [f's{x}a{y}b{z}' for x in range(3) for y in range(3) for z in range(3)]
model_types = ['subject_agnostic', 'subject_specific']

# Loop over model types and models
all_results = {mt: {m: [] for m in model_dict} for mt in model_types}
subject_specific_importances = {m: defaultdict(lambda: defaultdict(list)) for m in model_dict}
subject_specific_coeffs = {m: defaultdict(lambda: defaultdict(list)) for m in model_dict}


In [None]:
# Run models
for model_type in model_types:
    print(f"\n Processing {model_type} \n")
    scaler = StandardScaler()  # Will be re-fit for each split

    if model_type == 'subject_agnostic':
        print(model_type)
        for test_file in bmh_files:
            for test_condition in unique_conditions:
                combined_data = pd.DataFrame()
                combined_targets = pd.DataFrame()
                for bmh_file in bmh_files:
                    data_path = os.path.join(base_path, bmh_file)
                    df_data = pd.read_excel(data_path)
                    df_data['Walking Speed'] = df_data['Walking Speed'].map(walking_speed_mapping).astype(int)
                    df_data['Accuracy'] = df_data['Accuracy'].map(accuracy_mapping).astype(int)
                    df_data['Balance'] = df_data['Balance'].map(balance_mapping).astype(int)
                    df_data['Condition'] = df_data.apply(lambda row: f"s{row['Walking Speed']:.0f}a{row['Accuracy']:.0f}b{row['Balance']:.0f}", axis=1)
                    predictors = df_data[predictors_columns]

                    sheet_name = bmh_file.split('_')[1].replace('.xlsx', '')
                    df_survey = read_and_adjust_sheet(filepath_survey, sheet_name)
                    df_survey = normalize_to_one(df_survey, targets_columns)
                    targets = df_survey[targets_columns]
                    is_not_matching = ~df_data['Condition'].apply(lambda cond: match_two_parts(cond, test_condition))
                    predictors = predictors[is_not_matching.values]
                    targets = targets[is_not_matching.values]

                    combined_data = pd.concat([combined_data, predictors], ignore_index=True)
                    combined_targets = pd.concat([combined_targets, targets], ignore_index=True)

                combined_data_scaled = pd.DataFrame(scaler.fit_transform(combined_data), columns=combined_data.columns)

                # Test data
                data_path_test = os.path.join(base_path, test_file)
                df_data_test = pd.read_excel(data_path_test)
                df_data_test['Walking Speed'] = df_data_test['Walking Speed'].map(walking_speed_mapping).astype(int)
                df_data_test['Accuracy'] = df_data_test['Accuracy'].map(accuracy_mapping).astype(int)
                df_data_test['Balance'] = df_data_test['Balance'].map(balance_mapping).astype(int)
                df_data_test['Condition'] = df_data_test.apply(lambda row: f"s{row['Walking Speed']:.0f}a{row['Accuracy']:.0f}b{row['Balance']:.0f}", axis=1)
                X_test = df_data_test[predictors_columns]
                X_test = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)

                sheet_name = test_file.split('_')[1].replace('.xlsx', '')
                df_survey_test = read_and_adjust_sheet(filepath_survey, sheet_name)
                df_survey = normalize_to_one(df_survey, targets_columns)
                y_test = df_survey_test[targets_columns]

                # Filter for matching conditions in test data
                df_data_test = df_data_test.reset_index(drop=True)
                is_matching_condition = df_data_test['Condition'] == test_condition
                y_test = y_test.reset_index(drop=True)
                X_test = X_test.reset_index(drop=True)

                X_test = X_test[is_matching_condition]
                y_test = y_test[is_matching_condition]

                mae_fold = []

                for col_idx, column in enumerate(targets_columns):
                    for model_name, model_fn in model_dict.items():
                        model = model_fn() # new model instance
                        model.fit(combined_data_scaled, combined_targets[column])
                        predictions = model.predict(X_test)
                        for i, value in enumerate(predictions):
                            if value < 0:
                                predictions[i] = 0
                        if col_idx == 0:
                            og_predictions_df = pd.DataFrame(predictions, columns=[column])
                        else:
                            og_predictions_df[column] = predictions

                        mae = mean_absolute_error(y_test[column].values, predictions)
                        all_results[model_type][model_name].append({
                            'test_file': test_file, 'test_condition': test_condition,
                            'target_variable': column,
                            'mae': mae,
                            'predicted_values': predictions.tolist(),
                            'actual_values': y_test[column].values.tolist(),
                            'predictors_used': ', '.join(predictors_columns)
                        })

    elif model_type == 'subject_specific':
        for bmh_file in bmh_files:
            predictors, df_data = load_and_prepare(bmh_file, predictors_columns)
            df_data['Condition'] = df_data.apply(lambda row: f"s{row['Walking Speed']}a{row['Accuracy']}b{row['Balance']}", axis=1)
            sheet_name = bmh_file.split('_')[1].replace('.xlsx', '')  # Remove .xlsx extension
            df_survey = read_and_adjust_sheet(filepath_survey, sheet_name)
            df_survey = normalize_to_one(df_survey, targets_columns)
            targets = df_survey[targets_columns]
            subject_maes = []
            for test_condition in df_data['Condition'].unique():
                targets_drop = targets.reset_index(drop=True)
                is_not_matching = ~(df_data['Condition'] == test_condition)
                X_train = predictors[df_data['Condition'] != test_condition]
                X_test = predictors[df_data['Condition'] == test_condition]
                y_train = targets_drop[df_data['Condition'] != test_condition]
                y_test = targets_drop[df_data['Condition'] == test_condition]
                scaler = StandardScaler()
                X_train_scaled = scaler.fit_transform(X_train)
                X_test_scaled = scaler.transform(X_test)
                for target_col in targets_columns:
                    for model_name, model_fn in model_dict.items():
                        model = model_fn() # new model instance
                        model.fit(X_train_scaled, y_train[target_col])
                        predictions = model.predict(X_test_scaled)
                        predictions = np.clip(predictions, 0, None)
                        mae = mean_absolute_error(y_test[target_col], predictions)
                        all_results['subject_specific'][model_name].append({
                            'subject': bmh_file.split('_')[1],
                            'test_condition': test_condition,
                            'target_variable': target_col,
                            'mae': mae,
                            'predicted_values': predictions.tolist(),
                            'actual_values': y_test[target_col].tolist(),
                            'predictors_used': ', '.join(predictors_columns)
                        })
                        subj = bmh_file.split('_')[1]
                        # Only subject specific: importances & coeffs
                        if X_train_scaled.shape[0]>=2 and len(np.unique(y_train[target_col]))>1:
                            result = permutation_importance(model, X_train_scaled, y_train[target_col], n_repeats=30, random_state=0)
                            subject_specific_importances[model_name][subj][target_col].append(result.importances_mean)
                            feature_summary.append({
                                'subject': subj,
                                'target': target_col,
                                'model': model_name,
                                'predictors': predictors_columns,
                                'feature_names': X_train.columns.tolist(),
                                'feature_importances': result.importances_mean.tolist(),
                                'coefficients': model.coef_.tolist() if hasattr(model, 'coef_') else None
                                
                            })
                        subject_specific_coeffs[model_name][subj][target_col].append(model.coef_)
# Save all model results in one CSV per model type
for model_type in all_results:
    for model_name in all_results[model_type]:
        df = pd.DataFrame(all_results[model_type][model_name])
        outfile = f"{model_type}_results_{model_name.replace(' ','_').lower()}_rerun.csv"
        df.to_csv(outfile, index=False)
        print(f"Saved master results CSV: {outfile}")

# Save the coefficients and importances summary
feature_df = pd.DataFrame(feature_summary)
feature_df.to_csv("subject_specific_feature_summary.csv", index=False)
print("Saved feature importance & coefficients CSV: subject_specific_feature_summary.csv")

print("Done")

In [None]:
# Subject agnostic errors
cv_path = '/Users/jordanfeldman/Desktop/Research/subject_agnostic_results_ridge_regression.csv'

threshold = 0.11       
use_thresh = 'y'        # 'y' to use threshold, 'n' to ignore

# Load & clean
cv_df = pd.read_csv(cv_path)
cv_df['predicted_values'] = cv_df['predicted_values'].apply(lambda x: float(str(x).strip('[]')))
cv_df['actual_values']    = cv_df['actual_values'].apply(lambda x: float(str(x).strip('[]')))

pair_indices = list(combinations(range(4), 2))

# Function to get sign match with/without threshold
def pair_match(pred_diff, act_diff, k, use_thresh_flag):
    if use_thresh_flag == 'y':
        if (abs(act_diff) < k and abs(pred_diff) < k) or (np.sign(pred_diff) == np.sign(act_diff)):
            return 1
        else:
            return 0
    else:
        if act_diff == 0 and pred_diff == 0:
            return 1
        elif act_diff == 0 and pred_diff != 0:
            return 0
        else:
            return int(np.sign(pred_diff) == np.sign(act_diff))

cv_errors = []
k = float(threshold)

for i in range(0, len(cv_df), 4):
    block = cv_df.iloc[i:i+4].reset_index(drop=True)
    if len(block) < 4 or len(block['target_variable'].unique()) < 4: 
        continue

    block_matches = []
    for idx1, idx2 in pair_indices:
        pred_diff = block.loc[idx1, 'predicted_values'] - block.loc[idx2, 'predicted_values']
        act_diff  = block.loc[idx1, 'actual_values']    - block.loc[idx2, 'actual_values']
        block_matches.append(pair_match(pred_diff, act_diff, k, use_thresh))

    if block_matches:
        acc = np.mean(block_matches) * 100.0
        err = 100.0 - acc
        cv_errors.append(err)

print(f"Subject agnostic errors (n={len(cv_errors)}):")
print(cv_errors)


In [None]:
# Subject specific errors
ss_path = '/Users/jordanfeldman/Desktop/Research/subject_specific_results_ridge_regression.csv'

threshold = 0.11
use_thresh = 'y'   # 'y' to use threshold, 'n' to ignore
pair_indices = list(combinations(range(4), 2))

# Load & clean
df = pd.read_csv(ss_path)
df['predicted_values'] = df['predicted_values'].apply(lambda x: float(str(x).strip('[]')))
df['actual_values']    = df['actual_values'].apply(lambda x: float(str(x).strip('[]')))

new_df = df.copy()
# Function to get sign match with/without threshold
def pair_match(pred_diff, act_diff, k, use_thresh_flag):
    if use_thresh_flag == 'y':
        if (abs(act_diff) < k and abs(pred_diff) < k) or (np.sign(pred_diff) == np.sign(act_diff)):
            return 1
        else:
            return 0
    else:
        if act_diff == 0 and pred_diff == 0:
            return 1
        elif act_diff == 0 and pred_diff != 0:
            return 0
        else:
            return int(np.sign(pred_diff) == np.sign(act_diff))

ss_errors = []  
k = float(threshold)

# If your CSV has a 'subject' column, this groups by subject; otherwise, this still works as one group
grouped = new_df.groupby(['subject'], dropna=False) if 'subject' in new_df.columns else [('all', new_df)]

for subject, group in grouped:
    for i in range(0, len(group), 4):
        block = group.iloc[i:i+4].reset_index(drop=True)
        if len(block) < 4 or len(block['target_variable'].unique()) < 4:
            continue

        block_matches = []
        for idx1, idx2 in pair_indices:
            pred_diff = block.loc[idx1, 'predicted_values'] - block.loc[idx2, 'predicted_values']
            act_diff  = block.loc[idx1, 'actual_values']    - block.loc[idx2, 'actual_values']
            block_matches.append(pair_match(pred_diff, act_diff, k, use_thresh))

        if block_matches:
            acc = np.mean(block_matches) * 100.0
            err = 100.0 - acc
            ss_errors.append(err)

print(f"Subject-specific errors (n={len(ss_errors)}):")
print(ss_errors)


In [None]:
# Permutation importance plotting script

# Load the file
df = pd.read_csv("/Users/jordanfeldman/Desktop/Research/subject_specific_feature_summary.csv")
df_full = df.copy()

# Extract long-format data for plotting
long_data = []

for _, row in df_full.iterrows():
    try:
        feature_names = ast.literal_eval(row['feature_names'])
        importances = ast.literal_eval(row['feature_importances'])
        target = row['target']  # One target per row
    except:
        continue

    for name, score in zip(feature_names, importances):
        long_data.append({
            'Predictor': name.strip(),
            'Importance': score,
            'Target': target
        })

# Create plot DataFrame
plot_df = pd.DataFrame(long_data)

pretty_map = {
    'Mean Width Straights (mm)': 'Step width',
    'Straights Width Variability (mm)': 'Step width\nvariability',
    'Mean Length Straights (mm)': 'Step length',
    'Straights Length Variability (mm)': 'Step length\nvariability',
    'Average Speed (m/s)': 'Walking\nspeed',
    'Head Angle (deg)': 'Head\nangle',
}
display_order = [
    'Step width',
    'Step width\nvariability',
    'Step length',
    'Step length\nvariability',
    'Walking\nspeed',
    'Head\nangle'
]

plot_df = plot_df.copy()
plot_df['Predictor'] = plot_df['Predictor'].map(pretty_map).fillna(plot_df['Predictor'])
plot_df['Predictor'] = pd.Categorical(plot_df['Predictor'], categories=display_order, ordered=True)

hue_order = ['Walking Speed', 'Balance', 'Foot Placement', 'Metabolics']
green_palette = {
    'Walking Speed':  '#E6F4D7',  # very light green
    'Balance':        '#A4B999',  # light green
    'Foot Placement': '#6FAA5F',  # dark green
    'Metabolics':     '#B6D64A',  # yellow-green
}

plt.figure(figsize=(10, 9))

ax = sns.barplot(
    data=plot_df,
    x='Predictor',
    y='Importance',
    hue='Target',
    hue_order=hue_order,
    errorbar=None,              
    palette=[green_palette[h] for h in hue_order],
    edgecolor='black',             
    linewidth=1.2
)

ax.set_xlabel('Features for subject-specific model', fontsize=20)
ax.set_ylabel('Feature relevance', fontsize=20)
ax.tick_params(axis='x', labelsize=16)
ax.tick_params(axis='y', labelsize=16)
ax.set_ylim(0, 0.6) 
sns.despine(top=True, right=True)

leg = ax.legend(title='Movement goals', frameon=False, labels =['Walking speed', 'Balance', 'Foot placement', 'Energy expenditure'])
plt.setp(leg.get_title(), fontsize=16)
for txt in leg.get_texts():
    txt.set_fontsize(16)

plt.tight_layout()
plt.savefig('permutation_importance_redo.svg', dpi=300, bbox_inches='tight')
plt.show()



In [None]:
# Ranking error box plots with proportions of error levels
cv_df_plot = pd.DataFrame({'Error (%)': cv_errors, 'Condition': 'Cross Validation'})
ss_df_plot = pd.DataFrame({'Error (%)': ss_errors, 'Condition': 'Subject Specific'})
plot_df = pd.concat([cv_df_plot, ss_df_plot], ignore_index=True)
plot_df = plot_df.copy()
plot_df["Condition"] = plot_df["Condition"].replace({
    "Subject agnostic": "New subject, new condition",
    "Subject specific": "Subject-specific, new condition"
})
order = ["New subject, new condition", "Subject-specific, new condition"]
plot_df["Condition"] = plot_df["Condition"].replace({
    "Cross Validation": "New subject, new condition",
    "Subject Specific": "Subject-specific, new condition"
})

# box colors
palette = {
    "New subject, new condition": "lightgrey",
    "Subject-specific, new condition": "#32572C"
}

# Stats
shapiro_cv = stats.shapiro(cv_errors)
shapiro_ss = stats.shapiro(ss_errors)
print(f"Shapiro-Wilk CV errors: W={shapiro_cv.statistic}, p-value={shapiro_cv.pvalue}")
print(f"Shapiro-Wilk SS errors: W={shapiro_ss.statistic}, p-value={shapiro_ss.pvalue}")

wilcoxon_test = stats.wilcoxon(cv_errors, ss_errors)
print(f"Wilcoxon p-value: {wilcoxon_test[1]}")

p_adj = float(np.asarray(wilcoxon_test[1]))
stars = "**" if p_adj < 0.005 else "*" if p_adj < 0.05 else None


fig = plt.figure(figsize=(12, 10), constrained_layout=False)
gs  = fig.add_gridspec(
    nrows=2, ncols=2,
    width_ratios=[1, 1],
    height_ratios=[0.22, 1.0],  
    wspace=0.35, hspace=0.05
)

ax_box   = fig.add_subplot(gs[:, 0])   
ax_leg   = fig.add_subplot(gs[0, 1])  
ax_stack = fig.add_subplot(gs[1, 1])  
ax_leg.axis("off")

# Box plot
sns.boxplot(
    data=plot_df,
    x="Condition",
    y="Error (%)",
    showcaps=True,
    ax=ax_box,
    legend=False,
    palette=palette,
    showfliers=False
)

for patch in ax_box.patches:
    patch.set_edgecolor("black")

for line in ax_box.lines:
    line.set_color("black")

ax_box.tick_params(axis='both', labelsize=16)

ax_box.set_xticklabels(
    ["Subject-agnostic", "Subject-specific"],
    fontsize=16
)
ax_box.set_xlabel("Model type", fontsize=20)

# add mean markers
means = plot_df.groupby("Condition")["Error (%)"].mean().reindex(order)
xpos = np.arange(len(order))
ax_box.scatter(xpos, means.values, color="black", marker="^", s=85, zorder=3)

# add significance
if stars:
    ymax = plot_df["Error (%)"].max()
    ymin = plot_df["Error (%)"].min()
    span = ymax - ymin
    y_br = ymax + 0.10 * span - 20
    draw_sig_bracket(ax_box, 0, 1, y_br, stars,
                     tick=0.035 * span, pad=0.05 * span)
    ax_box.set_ylim(top=y_br + 0.12 * span)

ax_box.set_ylabel("Ranking error (%)", fontsize=20)
ax_box.set_ylim(0, 100)
sns.despine(ax=ax_box)

# Stacked proportions 
tmp = plot_df.copy()
level_order = [0.0, 16.7, 33.3, 50.0, 66.7, 83.3]  
tmp["ValueRounded"] = tmp["Error (%)"].round(1)
levels_obs = [lv for lv in level_order if lv in tmp["ValueRounded"].unique()]


counts = (tmp.groupby(["Condition", "ValueRounded"])["Error (%)"]
            .count()
            .unstack(fill_value=0)
            .reindex(index=order, columns=levels_obs))
props = counts.div(counts.sum(axis=1), axis=0)


level_colors = {
    0.0:  "#4B2E83",  # deep purple
    16.7: "#325AA8",  # blue
    33.3: "#1E7F88",  # teal
    50.0: "#18A7AE",  # aqua
    66.7: "#27C5D8",  # light cyan
    83.3: "#BFE7E9",  # very light cyan
}

bottom = np.zeros(len(order))
for lvl in levels_obs:
    ax_stack.bar(
        order,
        props[lvl].values,
        bottom=bottom,
        color=level_colors[lvl],
        width=0.65,
        edgecolor="none",
        label=f"{lvl:.1f}%"
    )
    bottom += props[lvl].values

ax_stack.set_ylim(0, 1.0)
ax_stack.set_ylabel("Proportion of ranking error", fontsize=20)
ax_stack.set_xlabel("Model type", fontsize=20)
ax_stack.set_xticklabels(
    ["Subject-agnostic", "Subject-specific"],
    fontsize=16
)
ax_stack.tick_params(axis='both', labelsize=16)
sns.despine(ax=ax_stack)


handles, labels = ax_stack.get_legend_handles_labels()
label_to_handle = {lab: h for h, lab in zip(handles, labels)}
ordered_labels  = [f"{lv:.1f}%" for lv in level_order if f"{lv:.1f}%" in label_to_handle]
ordered_handles = [label_to_handle[lab] for lab in ordered_labels]
leg = ax_leg.legend(
    ordered_handles,
    ordered_labels,
    title="Ranking error (%)",
    ncol=3,             
    loc="center",
    frameon=False,
    fontsize=16,
    handlelength=1.0,
    columnspacing=1.2,
    labelspacing=0.6,
)
leg.get_title().set_fontsize(16)

plt.tight_layout()
plt.savefig('ranking_error_boxplots_fixed.svg', dpi=300)
plt.show()
