# Preprocessing


In [12]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, savgol_filter, find_peaks
from scipy.spatial import distance
import tkinter
from tkinter import filedialog


## Helper Functions
### Initial Preprocessing

Extract direction of walk:

In [2]:
def extract_walk_direction(filename):
    # extract direction of the walk 
    if "back" in filename:
        direction = "back"
    elif "front" in filename:
        direction = "front"
    else:
        raise ValueError("File path must contain either 'back' or 'front' to determine the direction of the walk")
    
    return direction

Remove unneeded features:

In [3]:
def filter_columns(df):
	keypoints = ["RAnkle", "LAnkle", "RKnee", "LKnee", "RHip", "LHip"]

	columns = [f"{kp}_x" for kp in keypoints] + [f"{kp}_y" for kp in keypoints]

	return df.loc[:, df.columns.isin(columns)]

Smooth using a moving average or using a Savitzky-Golay filter 


In [4]:
def smoothing(data, filter):
    window = 5

    if filter == "average": 
        smooth = data.rolling(window=window).mean()
        smooth = smooth.fillna(data).tolist()
    elif filter == "savgol":
        order = 2
        smooth = savgol_filter(data, window, order, deriv=1, delta=df.time[1]-df.time[0])

    return smooth

Butterworth filter


In [5]:
def butterworth_filter(data):
    """
    Use Butterworth filtering to reduce high-frequency noise
    """

    fs = 30  # Sampling frequency in Hz
    cutoff = 4  # Cutoff frequency in Hz
    order = 2  
    btype = "lowpass" 

    b, a = butter(order, cutoff / (0.5 * fs), btype=btype)
    buttered = filtfilt(b, a, data)

    return buttered

Initial Preprocessing - full process

In [6]:
def initial_preprocessing(df, filter="average"):
    # remove unused columns
    new_df = filter_columns(df)
    
    # smooth the coordinates
    new_df = df.apply(lambda column: smoothing(column, filter=filter))

    # use butterworth fukter
    new_df = df.apply(lambda column: butterworth_filter(column))

    return new_df

### Gait Variable Preprocessing

Detect the moments of heel strike and toe off for a single leg:

In [7]:
def detect_steps(data, direction=''):
    """
    data: time series of the y-pixel of the ankle
    direction: front or back

    returns:
        heel_strikes: the indexes of when the heel first touches the ground 
    """

    dist = 20 # minimum distance between two detected steps taken on a single leg

    if not direction:
        direction = input("Choose walk direction: 'front' or 'back'?")

    # find peaks in the derivative of the y-coord of the ankle
    deriv  = np.gradient(data)
    if direction == 'front':
        peaks, _ = find_peaks(deriv, distance=dist)
    elif direction == 'back':
        peaks, _ = find_peaks(-deriv, distance=dist)

    # detect heel-strikes and toe-offs
    heel_strikes = []
    toe_offs = []
    for peak in peaks:
        peak_height = deriv[peak]
        heel_threshold = 0.35 * abs(peak_height) # heel strike occurs after each peak of the derivative, once the 35% of peak height is crossed
        toe_threshold = 0.3 * abs(peak_height) # toe-off occurs before each peak of the derivative, once the 30% of peak height is crossed

        # heel strikes
        for i in range(peak + 1, len(deriv)):
            if abs(deriv[i]) <= heel_threshold:
                heel_strikes.append(i)
                break
            
        # toe-offs
        for i in range(peak - 1, -1, -1):
            if abs(deriv[i]) <= toe_threshold:
                toe_offs.append(i)
                break

    return heel_strikes, toe_offs

Generating binary time series - 0 for when foot is in the air, 1 for when foot is on the floor: 

In [8]:
def foot_on_floor(Heels, Toes, total_frames):
    """
    Create a series of 0s and 1s indicating if the foot is (1) or isn't (0) on the floor.
    
    Heels (list): List of indexes where the heel-strike occurs.
    Toes (list): List of indexes where the toe-off occurs.
    total_frames (int): Total number of frames in the video.

    """

    foot_on_floor = [0] * total_frames

    # handle case where video starts mid-step (no initial heel strike)
    if Toes[0] < Heels[0]:
        for i in range(0, Toes[0] + 1):  # foot on floor from first frame until first toe-off
            foot_on_floor[i] = 1

    # handle case where video ends mid-step (no final toe-off)
    if Heels[-1] > Toes[-1]:
        for i in range(Heels[-1], total_frames):  # foot stays on floor until last frame
            foot_on_floor[i] = 1
    
    # handle all other regular cases
    for heel, toe in zip(Heels, Toes):
        for i in range(heel, toe+1):  # +1 to include the toe-off frame
            foot_on_floor[i] = 1
    
    return foot_on_floor


#### Finding the average step width
This is calculated by finding the euclidean distance between the ankle joints for each frame, then dividing it by the euclidean distance of the two hip keypoints 

In [9]:
def get_step_width_distances(row):
    """
    Return the distances between left and right ankle, and left and right hips per frame
    """

    ankles = distance.euclidean(row[["RAnkle_x", "RAnkle_y"]], row[["LAnkle_x", "LAnkle_y"]])
    hips = distance.euclidean(row[["LHip_x", "LHip_y"]], row[["RHip_x", "RHip_y"]])

    return pd.Series([ankles, hips])

def get_step_width(df):
    """
    Returns the distance between the two ankles, normalised by the width of the hip
    """
    
    df[['ankle_distances', 'hip_distances']] = df.apply(get_step_width_distances, axis=1)
    return df['ankle_distances'] / df['hip_distances']


#### Finding margin of stability (MOS)

The following systematic review gives a good overview of how this is calculated: https://doi.org/10.1186%2Fs12891-021-04466-4

In [10]:
def find_MOS(df, RHeels, LHeels, direction):
    # estimated center of mass (eCOM) - the middle point between the two hip keypoints
    if direction == "back":
        df["eCOM_x"] = ((df['RHip_x'] - df['LHip_x']) / 2) + df["LHip_x"]
    elif direction == "front":
        df["eCOM_x"] = ((df['LHip_x'] - df['RHip_x']) / 2) + df["RHip_x"]

    # Find the hip width in each frame in order to normalise the leg lengths 
    df["Hip_width"] = abs(df['LHip_x'] - df['RHip_x'])

    # Find leg length in each frame where a heel strike occurs
    # Right leg
    df_RHeels = df.loc[RHeels]
    df_RHeels["LegLen"] = df_RHeels.apply(lambda row: np.sqrt((row['RHip_y'] - row['RAnkle_y'])**2 + (row['RHip_x'] - row['RAnkle_x'])**2) / row['Hip_width'], axis=1)

    # Left leg
    df_LHeels = df.loc[LHeels]
    df_LHeels["LegLen"] = df_LHeels.apply(lambda row: np.sqrt((row['LHip_y'] - row['LAnkle_y'])**2 + (row['LHip_x'] - row['LAnkle_x'])**2) / row['Hip_width'], axis=1)

    # Average leg length 
    meanLegLen = np.mean(df_RHeels.LegLen.tolist() + df_LHeels.LegLen.tolist())

    # Finding the extrapolated center of mass (XCOM)
    omega = np.sqrt(9.81/meanLegLen); # the natural frequency of the pendulum-like leg
    vCOM = np.gradient(df.eCOM_x)

    XCOM = df.eCOM_x + (vCOM/omega)


    # Calculating the estimated margin of stability (eMOS) in the mediolateral direction

    # first, finding the x-axis pixel difference from the XCOM to the right and left ankle, normalised by the hip width
    if direction == "front":
        RMOS0 =  (-XCOM + df.RAnkle_x)/df.Hip_width; 
        LMOS0 =  (-XCOM + df.LAnkle_x)/df.Hip_width;
    elif direction == "back":
        RMOS0 =  (XCOM - df.RAnkle_x)/df.Hip_width; 
        LMOS0 =  (XCOM - df.LAnkle_x)/df.Hip_width;

    # as explained in the paper https://www.ncbi.nlm.nih.gov/pmc/articles/PMC8240253/#CR7, the code above is the calcualtion equivalent to the formula
    # MOS = -XCOM + BOS
    # Where the BOS is the base of support. When walking, the base of support switches from the left to the right and vice versa on each step.
    # Hence, the RMOS may only exist from the moment the right heel strikes, until the following left heel strike, and vice versa:

    df["MOS"] = np.nan

    # NOTE: all left leg MOS values have been multiplied by -1 in order for the MOS to signify the same type of instability, regardless of what leg is being stood on. 
    # This is set up such that a large +ve value indicates instability in the lateral direction, and a large -ve value indicates instability in the medial direction.

    if min(RHeels) < min(LHeels):
        for j in range(0, min(RHeels)):
            df['MOS'].iloc[j] = LMOS0[j] * -1
    else:
        for j in range(0, min(LHeels)):
            df['MOS'].iloc[j] = RMOS0[j]


    # Loop through each full right step for RMOS
    for i, HeelLoc in enumerate(RHeels):
        # find at which video frame the next left heel strike is occuring, unless the video ends before the next strike
        try:
            endLoc = LHeels[i] if LHeels[i] > HeelLoc else LHeels[i+1]
        except IndexError:
            endLoc = df.index[-1]

        # add the values between the heel strike and the following opposing leg heel strike to the MOS
        for j in range(HeelLoc, endLoc + 1):
            df['MOS'].iloc[j] = RMOS0[j]


    # Loop through each full right step for RMOS
    for i, HeelLoc in enumerate(LHeels):
        # find at which video frame the next right heel strike is occuring, unless the video ends before the next strike
        try:
            endLoc = RHeels[i] if RHeels[i] > HeelLoc else RHeels[i+1]
        except IndexError:
            endLoc = df.index[-1]
        
        # add the values between the heel strike and the following opposing leg heel strike to the MOS
        for j in range(HeelLoc, endLoc + 1):
            df['MOS'].iloc[j] = LMOS0[j] * -1
            
    
    if df["MOS"].isna().any():
        raise ValueError(f"MOS contains NaNs.")
    
    return df.MOS


In [11]:
def gait_variable_preprocessing(df, direction):
    # find heel strikes and toe offs of each leg
    RHeels, RToes = detect_steps(df.RAnkle, direction=direction)
    LHeels, LToes = detect_steps(df.LAnkle, direction=direction)

    # generate binary time series to descripe when each foot is placed on the floor
    df["RAnkle_onfloor"] = foot_on_floor(RHeels, RToes, len(df))
    df["LAnkle_onfloor"] = foot_on_floor(LHeels, LToes, len(df))

    # calculate step width per frame
    df["step_width"] = get_step_width(df)

    # calculate the margin of stability (MOS)
    df["MOS"] = find_MOS(df, RHeels, LHeels, direction=direction)

    return df

## Normalisation

In [None]:
def normalise_keypoints(df):
    keypoints = ["RAnkle", "LAnkle", "RKnee", "LKnee", "RHip", "LHip"]

    # look for the max and min value of the x and y axes in order to normalise to that scale

    

## Batch Preprocessing

Run this section to preprocess an entire directory of CSV files at a time.

Load input file directory:

In [None]:
tkinter.Tk().withdraw() # prevents an empty tkinter window from appearing
dir_path = filedialog.askdirectory()

In [None]:
# all CSV files in the parent directory
csv_files = [f for f in os.listdir(dir_path) if f.endswith('.csv')]