In [1]:
import pandas as pd
import numpy as np
import pprint

In [2]:
# base variables
csv_files = ["ALP", "BS", "DR", "EC", "HD", "JF", "JR", "SS"]

BASE_DIR = "~/Research/wheelchair/data/raw/Max"
OUTPUT_DIR = "~/Research/wheelchair/data/processed/"

# variables for glove types
materials = ["HYB", "PLA"]

### Helper functions and config

In [3]:
def mean_positive(x):
    positives = x[x > 0]
    if len(x) == 0:
        return np.nan
    return positives.mean()

def mean_negative(x):
    negatives = x[x < 0]
    if len(x) == 0:
        return np.nan
    return negatives.mean()

def nan_min(x):
    negatives = x[x < 0]
    if len(x) == 0:
        return np.nan
    return negatives.min()

def nan_max(x):
    positives = x[x >0]
    if len(x) == 0:
        return np.nan
    return positives.max()  


# Building the parameter dictionary
# side is "L" or "R" for left or right sides
def build_parameter_dictionary(side):
    xy_force_params = {
        "tangential_force":"N" , 
        "radial_force": "N", 
        "axle_force": "N",
        "moment_z": "Nm" 
    }
    
    # build up a dictionary containing the parameters we want to calculate
    agg_dict = {}
    for col, unit in xy_force_params.items():
        col_name = f"{col}_{side}" 
        og_column = f"{col_name}[{unit}]"
        agg_dict[f"{col_name}_pos[{unit}]"] = (og_column, mean_positive)
        agg_dict[f"{col_name}_neg[{unit}]"] = (og_column, mean_negative)
        agg_dict[f"{col_name}_pos_peak[{unit}]"] = (og_column, nan_max)
        agg_dict[f"{col_name}_neg_peak[{unit}]"] = (og_column, nan_min)

    # add the total force as well
    col = "total_force"
    unit = "N"
    col_name = f"{col}_{side}" 
    og_column = f"{col_name}[{unit}]"
    agg_dict[f"{col_name}[{unit}]"] = (og_column, mean_positive)
    agg_dict[f"{col_name}_peak[{unit}]"] = (og_column, nan_max)

    return agg_dict


# not specific to R or L side
def get_power_parameters():
    agg_dict = {}
    col = "power_z"
    unit = "W"

    og_column = f"{col}[{unit}]"
    agg_dict[f"{col}_pos[{unit}]"] = (og_column, mean_positive)
    agg_dict[f"{col}_neg[{unit}]"] = (og_column, mean_negative)
    agg_dict[f"{col}_pos_peak[{unit}]"] = (og_column, nan_max)
    agg_dict[f"{col}_neg_peak[{unit}]"] = (og_column, nan_min)
    return agg_dict

In [4]:
# load and clean the data for a given side
def load_cycle_data(path, side):
    raw_df = pd.read_csv(path)

    # only use named columns and when the hand cycle is touching
    df = raw_df.drop(columns=[c for c in raw_df.columns if "Unnamed" in c])
    df = df[df[f'theta_cop_{side}[deg]'].notna()]

    return df

def load_cycle_data_any_side(path):
    raw_df = pd.read_csv(path)

    # only use named columns and when the hand cycle is touching
    df = raw_df.drop(columns=[c for c in raw_df.columns if "Unnamed" in c])
    df = df[df[f'theta_cop_L[deg]'].notna() | df[f'theta_cop_R[deg]'].notna() ]
    return df

# Add total_force calculation
def compute_force_columns(df, side):
    # also add the total force
    df[f'total_force_{side}[N]'] = np.sqrt(
        df[f"tangential_force_{side}[N]"]**2 +
        df[f"radial_force_{side}[N]"]**2 +
        df[f"axle_force_{side}[N]"]**2
    )
    return df


def aggregate_per_cycle(df, side):
    agg_dict = build_parameter_dictionary(side)
    return df.groupby("cycle[count]").agg(**agg_dict)


# post-aggregation averages
def get_averages_from_l_r(df):
    # with positives and negatives
    side_cols = {
        "tangential_force":"N" , 
        "radial_force": "N", 
        "axle_force": "N",
        "moment_z": "Nm"
    }

    for col, unit in side_cols.items():
        L_pos = f"{col}_L_pos[{unit}]"
        L_neg = f"{col}_L_neg[{unit}]"

        R_pos = f"{col}_R_pos[{unit}]"
        R_neg = f"{col}_R_neg[{unit}]"

        L_pos_peak = f"{col}_L_pos_peak[{unit}]"
        L_neg_peak = f"{col}_L_neg_peak[{unit}]"     

        R_pos_peak = f"{col}_R_pos_peak[{unit}]"
        R_neg_peak = f"{col}_R_neg_peak[{unit}]"

        # averages
        df[f"{col}_avg_pos[{unit}]"] = df[[L_pos, R_pos]].mean(axis=1, skipna=True)
        df[f"{col}_avg_neg[{unit}]"] = df[[L_neg, R_neg]].mean(axis=1, skipna=True)


        df[f"{col}_avg_pos_peak[{unit}]"] = df[[L_pos_peak, R_pos_peak]].mean(axis=1, skipna=True)
        df[f"{col}_avg_neg_peak[{unit}]"] = df[[L_neg_peak, R_neg_peak]].mean(axis=1, skipna=True)


    # add on the total force, that is not split into positive and negatives
    df['total_force_avg[N]'] = df[['total_force_R[N]', 'total_force_L[N]']].mean(axis=1, skipna=True)
    df['total_force_avg_peak[N]'] = df[['total_force_R_peak[N]', 'total_force_L_peak[N]']].mean(axis=1, skipna=True)

    return df

def compute_per_cycle_power(path):
    df = load_cycle_data_any_side(path)

    # manually calculate the power
    df['power_z[W]'] = df['gyro_z_R[rad/s]']*df['moment_z_R[Nm]']
    agg_dict = get_power_parameters()
    return df.groupby("cycle[count]").agg(**agg_dict)
    

In [5]:
def run_all(material, initials):
    input_file = f"{BASE_DIR}/{material}/{initials}25{material}.csv"
    output_file = f"{OUTPUT_DIR}/{material}/{initials}25{material}_per_cycle.csv"

    print(f"Processing file for {initials} with gloves {material}")    

    # left side
    df_left = load_cycle_data(input_file, "L")
    df_left = compute_force_columns(df_left, "L")
    agg_df_left = aggregate_per_cycle(df_left, "L")
    
    # # right side
    df_right = load_cycle_data(input_file, "R")
    df_right = compute_force_columns(df_right, "R")
    agg_df_right = aggregate_per_cycle(df_right, "R")
    
    # power calculation
    power_df = compute_per_cycle_power(input_file)
    
    # put it all together into one aggregate datafarme
    full_df = pd.concat([agg_df_left, agg_df_right, power_df], axis=1)

    # calculate the averages
    result_df = get_averages_from_l_r(full_df)

    # print to CSV files
    result_df.to_csv(output_file)
    print(f"Saved → {output_file}")

    return result_df

In [6]:
for material in materials:
    for initials in csv_files:
        run_all(material, initials)

Processing file for ALP with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/ALP25HYB_per_cycle.csv
Processing file for BS with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/BS25HYB_per_cycle.csv
Processing file for DR with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/DR25HYB_per_cycle.csv
Processing file for EC with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/EC25HYB_per_cycle.csv
Processing file for HD with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/HD25HYB_per_cycle.csv
Processing file for JF with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/JF25HYB_per_cycle.csv
Processing file for JR with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/JR25HYB_per_cycle.csv
Processing file for SS with gloves HYB
Saved → ~/Research/wheelchair/data/processed//HYB/SS25HYB_per_cycle.csv
Processing file for ALP with gloves PLA
Saved → ~/Research/wheelchair/data/processed//PLA/ALP25PLA_per_cycle.c