In [None]:
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from scipy.spatial.distance import cdist
import glob 
import os 
import re 

## Reading in All Data

In [None]:
# Get all csv files in the folder
folder_path = 'Aligned Data All Participants'

# Initialize separate dictionaries
ax_dct = {}
la_dct = {}
fl_dct = {} 

# Get all CSV files
all_files = glob.glob(os.path.join(folder_path, "*.csv"))

# iterating across all files bucketing them 
for i in range(1, 31): 

    ax_imu = pd.DataFrame()
    ax_mcp = pd.DataFrame()
    la_imu = pd.DataFrame()
    la_mcp = pd.DataFrame()
    fl_imu = pd.DataFrame()
    fl_mcp = pd.DataFrame()

    for file in all_files:
        
        if re.search(rf'\\{i}(?!\d)', file):  
            if 'Axial' in file and 'IMU' in file: 
                ax_imu = pd.read_csv(file)
            elif 'Axial' in file and 'MoCap' in file:
                ax_mcp = pd.read_csv(file)
            elif 'Lateral' in file and 'IMU' in file:
                la_imu = pd.read_csv(file)
            elif 'Lateral' in file and 'MoCap' in file:
                la_mcp = pd.read_csv(file)
            elif 'Flexion' in file and 'IMU' in file:
                fl_imu = pd.read_csv(file)
            elif 'Flexion' in file and 'MoCap' in file:
                fl_mcp = pd.read_csv(file)
            else: 
                print('Warning: {file} isnt classified')
    
    if not ax_imu.empty and not ax_mcp.empty:
        ax_dct[i] = [ax_imu, ax_mcp]

    if not la_imu.empty and not la_mcp.empty:
        la_dct[i] = [la_imu, la_mcp]

    if not fl_imu.empty and not fl_mcp.empty:
        fl_dct[i] = [fl_imu, fl_mcp]

print(f"Loaded {len(ax_dct)} AxialRotation participants, each with MCP and IMU measurements.")
print(f"Loaded {len(la_dct)} Flexion participants, each with MCP and IMU measurements.")
print(f"Loaded {len(fl_dct)} LateralBending participants, each with MCP and IMU measurements.")

Loaded 26 AxialRotation participants, each with MCP and IMU measurements.
Loaded 27 Flexion participants, each with MCP and IMU measurements.
Loaded 28 LateralBending participants, each with MCP and IMU measurements.


## Functions 

In [None]:
# calculate distance
def dtw_distance(ts1, ts2, distance_metric = 'euclidean'):
    """Compute and return DTW (dynamic time warping) distance between two time series

    Dynamic Time Warping (DTW) is a method for measuring the similarity between two time series, 
    even if they are misaligned or vary in speed (such as the differences in IMU and MoCap). DTW 
    works by finding the best way to align the two sequences. Instead of comparing each point 
    strictly one-to-one, DTW allows each point in one time series to be matched with one or more 
    oints in the other. This flexible "many-to-many" matching ensures that similar patterns are 
    aligned, even if they occur at different time steps. Once the optimal alignment is found, the 
    overall difference between the two time series is calculated using the Euclidean distance 
    between the matched points. However, this value is practically meaningless out of context, so 
    permutation tests must happen to place value onto this distance. 

    For more information: https://en.wikipedia.org/wiki/Dynamic_time_warping
    
    Args:
        ts1 (np.array): A time series that's been mapped to a time grid. 
        ts2 (np.array): Another time series that's been mapped to the same time grid as ts1. 
        distance_metric (string): A metric to define difference in time series, such as 'euclidean', 'mahalnobis' etc. 

    Returns: 
        cost[n, m] (float): A value representing the distance between ts1 and ts2 
    """
    n, m = len(ts1), len(ts2)
    D = cdist(ts1.reshape(-1, 1), ts2.reshape(-1, 1), metric=distance_metric)

    cost = np.zeros((n + 1, m + 1))
    cost[0, 1:] = np.inf
    cost[1:, 0] = np.inf

    for i in range(n):
        for j in range(m):
            cost[i+1, j+1] = D[i, j] + min(cost[i, j+1], cost[i+1, j], cost[i, j])

    return cost[n, m]

In [None]:
# keeping gaps in time consistent 
def adjust_for_gaps(series, gap=0.2): 
    """ 'Standardizes' the gaps between MoCap and IMU measurements 

    DTW and other built-in functions utilized for this analysis (interp1d, arange) can give 
    misleading results if the two time series that are compared have inconsistent gaps. This 
    functions takes in a dictionary, where the keys are participants and the values are a list 
    of 2 dataframes, 1 for each measurement, and ensures that both measurements within each 
    participant have roughly equal gaps. 

    Args:
        series (dct): A dictionary formatted in the way specified above
        gap (float): The size of the gaps that should be standardized (or larger) in seconds. Default set to 0.2 seconds. 
    
    Returns: 
        adj_series (dct): The same dictionary as the input but with the new changes 
    """
    adj_series = {} 

    for key, pair in series.items(): 
        # removing all na values 
        imu = pair[0].dropna().reset_index(drop=True)
        mcp = pair[1].dropna().reset_index(drop=True)

        # finding gaps 
        def find_gaps(df):
            time_diffs = df.iloc[:, 0].diff()
            gap_indices = time_diffs[time_diffs > gap].index 
            gaps = []
            for idx in gap_indices: 
                gap_start = df.iloc[idx-1, 0]
                gap_end = df.iloc[idx, 0]
                gaps.append((gap_start, gap_end))
            return gaps

        all_gaps = list(set(find_gaps(imu) + find_gaps(mcp)))

        imu_sync = imu.copy()
        mcp_sync = mcp.copy()

        # if gap exists in imu, remove interval in mocap as well and vice versa 
        for gap_start, gap_end, in all_gaps:
            imu_sync = imu_sync[(imu_sync.iloc[:, 0] < gap_start) | (imu_sync.iloc[:, 0] > gap_end)]
            mcp_sync = mcp_sync[(mcp_sync.iloc[:, 0] < gap_start) | (mcp_sync.iloc[:, 0] > gap_end)]

        # reinit new dict 
        adj_series[key] = [imu_sync, mcp_sync]
    
    return adj_series

In [None]:
# obtain DTW distances and p-values 
def get_distances_and_p_values(series, perms=500, granularity=0.1, seed=566):
    """Basically the full process + permutation tests.

    The permutation test has the following hypotheses:

    H0: The observed DTW similarity between IMU and MoCap is no better than what would be expected under random phase shifts of IMU (Any observed alignment could be due to chance phase alignment between two signals with similar internal structure.)
    Ha: The observed DTW similarity between IMU and MoCap is significantly better than what would be expected under random phase shifts of IMU (a meaningful, temporally-aligned relationship exists!)

    1. Obtain DTW distance between IMU and MoCap (as mentioned in the custom function: dtw_distance())
    2. Run permutation test! 
        a. Create permutations that maintain the same temporal structure as IMU. This is done by 'rotating' the time series by a certain amount, wrapping around to the beginning (hence, phase shifts).  
        b. Compute DTW distance for each permutation against MoCap
        c. Determine how permutated distances are smaller than observed (this is our p-value!)
    3. Repeat steps 1 and 2 for every participant 

    Rather than interpreting p-values in a binary fashion (reject null vs. not reject null), we should understand what the p-value represents
    in context. For instance, a p-value of 0.038 means that 3.8% of permuted, phase aligned time series resulted in a lower distance than what
    was observed for this particular participant. From there, we recommend 'aggregating' the results by describing the proportion of participants 
    that resulted in a p-value that is smaller than a desired significance level (so going back to binary fashion, but also explaining the results 
    at a granular level as well!) (e.g. 25/26 participants resulted in statistically similar measurements between MoCap and IMU for Flexion).  

    Args:
        series (dct): A dictionary formatted in the way specified above. 
        perms (int): Number of permutations generated for each permutation test. 
        granularity (float): Granulity of matching time series, default set to the tenth of a second (don't make it anything larger or lose information). Decreasing this value drastically increases simulation time. 
        seed (int): Setting seed for reproducibility
    
    Returns: 
        dtw (array): An array distances for each participants' measurements
        p_values (array): p-values to determine the significance of each distance 
    """
    np.random.seed(seed)

    dtw = []
    p_values = []

    for key, pair in series.items(): 
        imu = pair[0]
        mcp = pair[1]

        # start and end of 'consistent' time series 
        start = round(min(imu.iloc[:, 0].min(), imu.iloc[:, 0].min()), 2)
        end = round(max(imu.iloc[:, 0].max(), imu.iloc[:, 0].max()), 2)

        time_grid = np.arange(start, end+granularity, granularity) # time grid to keep everything consistent 

        # applying time grid to time series 
        interp_func_imu = interp1d(imu.iloc[:, 0], imu.iloc[:, 1], kind='linear', fill_value='extrapolate')
        interp_func_mcp = interp1d(mcp.iloc[:, 0], mcp.iloc[:, 1], kind='linear', fill_value='extrapolate')

        imu_interp = interp_func_imu(time_grid).ravel()
        mcp_interp = interp_func_mcp(time_grid).ravel()

        observed = dtw_distance(imu_interp, mcp_interp) # utilizing function from earlier 

        dtw.append(observed)

        # permutation test 
        if not np.isnan(observed): 
            perm_dists = []
            for _ in range(perms):
                permuted = np.roll(imu_interp, np.random.randint(1, len(imu_interp))) # rotating elements to generate permutations with the same temporal structure  
                dist = dtw_distance(permuted, mcp_interp) 
                perm_dists.append(dist)

            # determining if observed distance is significantly SMALLER than if done by randomly mixing phases  
            p_value = max(np.mean(np.array(perm_dists) <= observed), 2e-16)

            # Printed output 
            print(f"Key: {key}, Observed DTW: {observed}, p-value: {p_value}, Mean Sim Distance: {np.mean(perm_dists)}, SD Sim Distance: {np.std(perm_dists)}")
            
            dtw.append(observed)
            p_values.append(p_value)
            
        else: 
            # error handling 
            print(f"Omitted key: {key} due to NaN")
    
    return dtw, p_values 

## Axial Rotation 

In [None]:
# took me around 16 min to run 

adj_ax = adjust_for_gaps(ax_dct, gap=0.2) # syncing gaps in ax
ax_dtw, ax_p = get_distances_and_p_values(adj_ax, perms=500, granularity=0.1, seed=566) # obtaining distances and using perm tests for p-values 

Key: 1, Observed DTW: 2287.5116059455454, p-value: 0.048, Mean Sim Distance: 6957.521388990037, SD Sim Distance: 2842.9371384476212
Key: 2, Observed DTW: 774.0564917161372, p-value: 0.012, Mean Sim Distance: 4672.788100978274, SD Sim Distance: 2535.5740940889145
Key: 3, Observed DTW: 478.25501718143704, p-value: 2e-16, Mean Sim Distance: 5482.6265196691265, SD Sim Distance: 2958.082309377639
Key: 4, Observed DTW: 1973.4635210828185, p-value: 0.07, Mean Sim Distance: 5811.490408828528, SD Sim Distance: 2253.3597174184124
Key: 7, Observed DTW: 1500.8782826207446, p-value: 0.056, Mean Sim Distance: 5697.995664011592, SD Sim Distance: 2782.87423486587
Key: 9, Observed DTW: 1915.5376223435806, p-value: 0.038, Mean Sim Distance: 7672.258831787424, SD Sim Distance: 3620.3839393423314
Key: 10, Observed DTW: 1333.8765863648457, p-value: 0.06, Mean Sim Distance: 6308.980746299572, SD Sim Distance: 3272.60190807031
Key: 11, Observed DTW: 3623.481753471097, p-value: 0.04, Mean Sim Distance: 6397.6

## Lateral Bending 

In [None]:
# took me around 17 min to run 

adj_la = adjust_for_gaps(la_dct, gap=0.2) # syncing gaps in la 
la_dtw, la_p = get_distances_and_p_values(adj_la, perms=500, granularity=0.1, seed=566) # obtaining distances and using perm tests for p-values 

Key: 1, Observed DTW: 304.5404381527296, p-value: 0.02, Mean Sim Distance: 3821.9859322788147, SD Sim Distance: 2300.768473476973
Key: 2, Observed DTW: 249.3379876307666, p-value: 0.016, Mean Sim Distance: 2110.69095162572, SD Sim Distance: 1105.6638659315681
Key: 3, Observed DTW: 449.3437140148661, p-value: 0.004, Mean Sim Distance: 2654.82325671069, SD Sim Distance: 1330.9960626239335
Key: 4, Observed DTW: 224.99359051317927, p-value: 2e-16, Mean Sim Distance: 3161.538196220086, SD Sim Distance: 1783.252761133557
Key: 5, Observed DTW: 1275.303428700717, p-value: 0.016, Mean Sim Distance: 4399.880095330044, SD Sim Distance: 1914.9782765870875
Key: 7, Observed DTW: 877.8681429127348, p-value: 0.022, Mean Sim Distance: 3230.468900851127, SD Sim Distance: 1418.7152815147226
Key: 9, Observed DTW: 806.5520451717422, p-value: 0.022, Mean Sim Distance: 3000.546109529315, SD Sim Distance: 1465.3289432489041
Key: 10, Observed DTW: 258.9673494317371, p-value: 0.012, Mean Sim Distance: 3017.9600

## Flexion 

In [None]:
# took me around 30 min to run 

adj_fl = adjust_for_gaps(fl_dct, gap=0.2) # syncing gaps in fl 
fl_dtw, fl_p = get_distances_and_p_values(adj_fl, perms=500, granularity=0.1, seed=566) # obtaining distances and using perm tests for p-values 

Key: 1, Observed DTW: 1529.0343052695846, p-value: 0.376, Mean Sim Distance: 2495.9643791757608, SD Sim Distance: 1223.2665795490593
Key: 2, Observed DTW: 438.3791026474238, p-value: 0.002, Mean Sim Distance: 2622.3632217915215, SD Sim Distance: 1113.2918302629103
Key: 3, Observed DTW: 725.80813382195, p-value: 0.028, Mean Sim Distance: 2633.23886740987, SD Sim Distance: 1186.2200571297087
Key: 4, Observed DTW: 959.6747984914522, p-value: 0.03, Mean Sim Distance: 2653.6871875325564, SD Sim Distance: 1179.4680905206078
Key: 5, Observed DTW: 1457.844198154674, p-value: 0.018, Mean Sim Distance: 5359.530684717366, SD Sim Distance: 2171.8621476870535
Key: 7, Observed DTW: 1053.4649075897994, p-value: 0.076, Mean Sim Distance: 4323.827995499059, SD Sim Distance: 1996.0960957106918
Key: 9, Observed DTW: 519.0417277651823, p-value: 2e-16, Mean Sim Distance: 3789.65347024388, SD Sim Distance: 1803.7615589148581
Key: 10, Observed DTW: 451.37512401693857, p-value: 0.028, Mean Sim Distance: 2967.