In [4]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import find_peaks
from scipy import signal
import statistics as st
from scipy import interpolate
import sys
import importlib.util
import os
df = pd.read_csv("csv/data/23/2.csv")

## BY default integration

In [5]:
def integrate(delta_time_cycle, integral_cycle,cycle,freq):
    # Reset the indices of the Series
    delta_time_cycle = delta_time_cycle.reset_index(drop=True)
    integral_cycle = integral_cycle.reset_index(drop=True)

    # Ensure all data is numeric
    delta_time_cycle = pd.to_numeric(delta_time_cycle, errors='coerce')
    integral_cycle = pd.to_numeric(integral_cycle, errors='coerce')

    # Initialize velocity array for this cycle
    velocity_arr = np.zeros(len(delta_time_cycle))
    distance_arr = np.zeros(len(delta_time_cycle))

    # Apply integrate function to calculate velocity
    for index in range(len(delta_time_cycle)):
        if cycle==0:
            if index == 0:
                velocity_arr[index] = 0  # Initial velocity is zero
                distance_arr[index] = 0
        
        if pd.isna(delta_time_cycle[index]) or pd.isna(integral_cycle[index]):
            velocity_arr[index] = velocity_arr[index - 1]  # Skip calculation if data is NaN
            distance_arr[index] = distance_arr[index - 1]
        else:
            velocity_arr[index] = velocity_arr[index - 1] + integral_cycle[index] * (1/freq)
            distance_arr[index] = distance_arr[index - 1] +(1/freq)* velocity_arr[index]


    return velocity_arr, distance_arr

def cycleIntegralFeatures(delta_time_cycle,integral_cycle, velocity_cycle_rmean, velocity_cycle_median,cycle, freq):
    # distance 

    v_arr, distance_of_cycle = integrate(delta_time_cycle, integral_cycle,cycle,freq) 
    v_rmean = np.mean(np.abs(v_arr))
    velocity_cycle_rmean.append(v_rmean)
    velocity_cycle_median.append(np.median(v_arr))
    return  velocity_cycle_rmean, velocity_cycle_median

In [6]:
# velocityX_Mean, velocityX_Median = [], []
# velocityY_Mean, velocityY_Median = [], []
# velocityZ_Mean, velocityZ_Median = [], []

# for i in range(len(idxHS) - 1):  # Loop through each pair of consecutive heel strikes
#     start_idx = idxHS[i]
#     end_idx = idxHS[i + 1]
#     # Extract indices between two consecutive heel strikes
#     indices_between = range(start_idx, end_idx)
#     delta_time_cycle = df1["deltaTime"].iloc[indices_between]
#     integral_cycle_x = df1[ 'accX_C'].iloc[indices_between]
#     integral_cycle_Y = df1[ 'accY_C'].iloc[indices_between]
#     integral_cycle_Z = df1[ 'accZ_C'].iloc[indices_between]
#     print("cycle:",i)
    
#     velocityX_Mean,velocityX_Median = cycleIntegralFeatures(delta_time_cycle, integral_cycle_x , velocityX_Mean, velocityX_Median,i, 100)
#     velocityY_Mean,velocityY_Median = cycleIntegralFeatures( delta_time_cycle, integral_cycle_Y, velocityY_Mean, velocityY_Median)
#     velocityZ_Mean,velocityZ_Median = cycleIntegralFeatures( delta_time_cycle, integral_cycle_Z, velocityZ_Mean, velocityZ_Median)

## Velocity Zero velocity update all cycles

In [7]:
# The function to detect walking samples
def walkingsamples(indexToeOff, indexHeelStrike, sample_frequency, error):
    idxWalking = np.asarray([], dtype=int)
    if error == False:
        # indexToeOff = data['foot']['Gait Events']['Terminal Contact']
        # indexHeelStrike = data['foot']['Gait Events']['Initial Contact']
        # sample_frequency = data['Sample Frequency (Hz)']
        
        idxAllTO = np.sort(np.unique(indexToeOff, axis=None))
        idxAllHS = np.sort(np.unique(indexHeelStrike, axis=None))
        timerange = 5 * sample_frequency
        interval = np.diff(idxAllTO)
        locidx = (np.zeros((idxAllHS[-1],1), dtype = 'int64')).flatten()
        for i in range(len(interval)):
            if interval[i] < timerange:
                startTO = idxAllTO[i]
                stopTO = idxAllTO[i+1]
                locidx[startTO:stopTO] = 1
                idxlastTOinWalking = np.where(locidx == 1)[-1][-1]
                finalHSinWalking = np.where(idxAllHS > idxlastTOinWalking)[0][0]
                idxfinalHSinWalking = idxAllHS[finalHSinWalking]
                locidx[idxlastTOinWalking:idxfinalHSinWalking] = 1
        idxWalking = (np.asarray(np.where(locidx == 1))).flatten()
    else:
        idxWalking = np.array([])
    return idxWalking

def nonactivesamples(length, indexWalkingPeriods, ):
    # length = len(gyro.values)
    # indexWalkingPeriods = data['foot']['Gait Phases']['Walking samples']
    locidx = np.ones((length), dtype='int64')
    for i in range(0,len(indexWalkingPeriods)):
        locidx[indexWalkingPeriods[i]] = 0
    idxNonWalking = (np.argwhere(locidx ==1)).flatten()
    return idxNonWalking

In [9]:
def StationaryData(EFacc, indexMidStance, indexNonActive):
    ntolerance = 10
    stationary = np.zeros((len(EFacc)), dtype='int64')
    for i in range(len(indexMidStance)):
            stationary[indexMidStance[i]] = 1
    for i in range(len(indexNonActive)):
        stationary[indexNonActive[i]] = 1
    b = np.array(np.diff(stationary), dtype=int)
    c = np.where(b != 0)[0]
    for i in range(1, len(c)):
        if (c[i]-c[i-1] < ntolerance) & (stationary[c[i-1]] > 0):
            stationary[c[i-1]+1:c[i]] = 1
    c = np.where(b == 0)[0]
    for i in range(1, len(c)):
        if (c[i]-c[i-1] < ntolerance) & (stationary[c[i-1]] < 0):
            stationary[c[i-1]+1:c[i]] = 1
    stationaryStart = (np.asarray(np.argwhere(np.append(0, np.diff(stationary)) == 1))).flatten()
    stationaryEnd = (np.asarray(np.argwhere(np.append(0, np.diff(stationary)) == -1))).flatten()
    if stationary[0] == 1:
        stationaryStart = np.insert(stationaryStart, 0, 0)
    if stationaryStart[-1] > stationaryEnd[-1]:
        stationaryStart = stationaryStart[0:-1]
    if (len(stationaryStart) > len(stationaryEnd)) & (stationaryStart[-1] > stationaryEnd[-1]):
        stationaryEnd = np.append(stationaryEnd, len(EFacc))
   return stationary, stationaryStart, stationaryEnd



def StationaryData(EFacc, indexMidStance, indexNonActive):
    ntolerance = 10
    stationary = np.zeros((len(EFacc)), dtype='int64')
    
    for i in indexMidStance:
        stationary[i] = 1
    for i in indexNonActive:
        stationary[i] = 1
        
    b = np.array(np.diff(stationary), dtype=int)
    c = np.where(b != 0)[0]
    
    for i in range(1, len(c)):
        if (c[i] - c[i - 1] < ntolerance) & (stationary[c[i - 1]] > 0):
            stationary[c[i - 1] + 1:c[i]] = 1
            
    c = np.where(b == 0)[0]
    
    for i in range(1, len(c)):
        if (c[i] - c[i - 1] < ntolerance) & (stationary[c[i - 1]] < 0):
            stationary[c[i - 1] + 1:c[i]] = 1
            
    stationaryStart = (np.asarray(np.argwhere(np.append(0, np.diff(stationary)) == 1))).flatten()
    stationaryEnd = (np.asarray(np.argwhere(np.append(0, np.diff(stationary)) == -1))).flatten()
    
    if stationary[0] == 1:
        stationaryStart = np.insert(stationaryStart, 0, 0)
        
    if stationaryStart[-1] > stationaryEnd[-1]:
        stationaryStart = stationaryStart[0:-1]
        
    if (len(stationaryStart) > len(stationaryEnd)) & (stationaryStart[-1] > stationaryEnd[-1]):
        stationaryEnd = np.append(stationaryEnd, len(EFacc))
        
    return stationary, stationaryStart, stationaryEnd


IndentationError: unindent does not match any outer indentation level (<tokenize>, line 25)

In [None]:
def functiongaitspeed(EFacc_df, indexMidStance, indexNonActive, time, sample_frequency):
    EFacc = EFacc_df.to_numpy()
    # Detect stationary periods
    magACC = np.sqrt((EFacc[0])**2 + (EFacc[1])**2 + (EFacc[2])**2)
    stationary, stationaryStart, stationaryEnd = StationaryData(EFacc, indexMidStance, indexNonActive)
    # print(stationaryStart)  
    # print(stationaryEnd)
    velocity = np.zeros((np.shape(EFacc)))
    # print(velocity)
    for t in range(1, len(velocity)):
        velocity[t,:] = velocity[t-1,:] + (EFacc[t,:] * 1/sample_frequency)
        if stationary[t] == 1:
            velocity[t,:] = [0, 0, 0] # force zero velocity when foot stationary
        # Compute integral drift during non-stationary periods
    # print(velocity[1])
    velDrift = np.zeros((np.shape(velocity)))
    # print(velDrift)
    for i in range(len(stationaryEnd)):
        driftRate = velocity[stationaryEnd[i]-1, :] / (stationaryEnd[i] - stationaryStart[i])
        enum = (np.array(range(1, (stationaryEnd[i] - stationaryStart[i])))).transpose()
        drift = (np.array([enum * driftRate[0], enum * driftRate[1], enum * driftRate[2]])).transpose()
        velDrift[stationaryStart[i]:stationaryEnd[i]-1, :] = drift
    # print(velDrift)
    # Remove integral drift
    velocity = velocity - velDrift
    # Compute magnitude velocity
    magnitudeVelocity = np.sqrt(velocity[:,0]**2 + velocity[:,1]**2 + velocity[:,2]**2)
    # print(magnitudeVelocity)
    tff = np.argwhere(stationary == 1).flatten()
    velocity2 = np.zeros((np.shape(EFacc)))
    for t in range(1, len(velocity2)):
        velocity2[t,:] = velocity2[t-1,:] + (EFacc[t,:] * 1/sample_frequency)
            
    x_observed = np.unique(tff)
    y_observed = velocity2[x_observed]
    x = np.arange(len(velocity2))
    
    y0 = interpolate.PchipInterpolator(x_observed, y_observed[:,0])
    y1 = interpolate.PchipInterpolator(x_observed, y_observed[:,1])
    y2 = interpolate.PchipInterpolator(x_observed, y_observed[:,2])
    
    velocity2_corr = np.zeros(shape=velocity2.shape)
    velocity2_corr[x,0] = velocity2[x,0] - y0(x)
    velocity2_corr[x,1] = velocity2[x,1] - y1(x)
    velocity2_corr[x,2] = velocity2[x,2] - y2(x)
    
    for t in range(1, len(velocity2)):
        if stationary[t] == 1:
            velocity2_corr[t,:] = [0, 0, 0] # force zero velocity when foot is stationary
    return magnitudeVelocity

In [None]:
# sample_freq = 100
# errors = False
# selected_columns = ['accX_C', 'accY_C', 'accZ_C']
# EFacc_df = df1[selected_columns]


# idxWalking = walkingsamples(idxTO, idxHS, sample_freq, errors)
# print("walking samples:",idxWalking)

# idxNonWalking = nonactivesamples(len(df1['gyrX_T'].values), idxWalking)
# print("non active samples:", idxNonWalking)

# len(idxWalking) + len(idxNonWalking), df1.shape

## For each cycle

In [None]:
def StationaryData(EFacc, indexMidStance, indexNonActive):
    ntolerance = 10
   
    stationary = np.zeros((len(EFacc)), dtype='int64')
#     print("e:", (EFacc))
#     print("st:", len(stationary))

    for i in range(0,len(indexMidStance)):
        stationary[indexMidStance[i]] = 1
    for i in range(len(indexNonActive)):
        stationary[indexNonActive[i]] = 1
    if (len(indexMidStance) != 0):
        for i in indexMidStance:
            stationary[i] = 1
    else:
        stationary[i] = 0 
            
    if (len(indexNonActive) != 0):   
        for i in indexNonActive:
            stationary[i] = 1
    else:
        stationary[i] = 0
        
    b = np.array(np.diff(stationary), dtype=int)
    c = np.where(b != 0)[0]
    
    for i in range(1, len(c)):
        if (c[i] - c[i - 1] < ntolerance) & (stationary[c[i - 1]] > 0):
            stationary[c[i - 1] + 1:c[i]] = 1
            
    c = np.where(b == 0)[0]
    
    for i in range(1, len(c)):
        if (c[i] - c[i - 1] < ntolerance) & (stationary[c[i - 1]] < 0):
            stationary[c[i - 1] + 1:c[i]] = 1
            
    stationaryStart = (np.asarray(np.argwhere(np.append(0, np.diff(stationary)) == 1))).flatten()
    stationaryEnd = (np.asarray(np.argwhere(np.append(0, np.diff(stationary)) == -1))).flatten()
    print( stationary)
    if stationary[0] == 1:
        stationaryStart = np.insert(stationaryStart, 0, 0)
    if( len(stationaryStart)>0):   
        if stationaryStart[-1] > stationaryEnd[-1]:
            stationaryStart = stationaryStart[0:-1]
        
        if (len(stationaryStart) > len(stationaryEnd)) & (stationaryStart[-1] > stationaryEnd[-1]):
            stationaryEnd = np.append(stationaryEnd, len(EFacc))
        
    return stationary,stationaryStart,stationaryEnd


In [None]:

def functiongaitspeed( EFacc, indexMidStance, indexNonActive, lenght_acc_points, sample_frequency):
#  time = EFacc_df["time"]
#     EFacc_df = EFacc_df.drop(columns=["time"])
    EFacc = EFacc_df.to_numpy()
    
    # Detect stationary periods
    magACC = np.sqrt(EFacc[0]**2 + EFacc[1]**2 + EFacc[2]**2)
    stationary, stationaryStart, stationaryEnd = StationaryData(EFacc, indexMidStance, indexNonActive)
    
    velocity = np.zeros((np.shape(EFacc)))
    
    for t in range(start_idx, end_idx):
        velocity[t, :] = velocity[t-1, :] + (EFacc[t, :] * 1/sample_frequency)
        if stationary[t] == 1:
            velocity[t, :] = [0, 0, 0]  # force zero velocity when foot stationary
    
    velDrift = np.zeros((np.shape(velocity)))
    
    for i in range(np.size(stationaryEnd)):
        if stationaryStart[i] >= start_idx and stationaryEnd[i] <= end_idx:
            driftRate = velocity[stationaryEnd[i]-1, :] / (stationaryEnd[i] - stationaryStart[i])
            enum = (np.arange(1, (stationaryEnd[i] - stationaryStart[i]))).transpose()
            drift = (np.array([enum * driftRate[0], enum * driftRate[1], enum * driftRate[2]])).transpose()
            velDrift[stationaryStart[i]:stationaryEnd[i]-1, :] = drift
    
    # Remove integral drift
    velocity = velocity - velDrift
    
    # Compute magnitude velocity
    magnitudeVelocity = np.sqrt(velocity[:, 0]**2 + velocity[:, 1]**2 + velocity[:, 2]**2)
    
    return magnitudeVelocity[start_idx:end_idx],velocity[start_idx:end_idx]

In [None]:
def calculate_ef_acceleration(angular_velocity, acc_bf):
    # Ensure angular_velocity and acc_bf are numpy arrays
    angular_velocity = np.array(angular_velocity)
    acc_bf = np.array(acc_bf)
    
    # Check if the input data is 2D (multiple samples)
    if angular_velocity.ndim == 2 and acc_bf.ndim == 2:
        n_samples = angular_velocity.shape[0]
        acc_ef = np.zeros_like(acc_bf)
        
        for i in range(n_samples):
            omega = angular_velocity[i]
            acc = acc_bf[i]
            
            # Calculate the skew-symmetric matrix [omega]_x
            omega_skew = np.array([
                [0, -omega[2], omega[1]],
                [omega[2], 0, -omega[0]],
                [-omega[1], omega[0], 0]
            ])
            
            # Calculate acc_ef = -[omega]_x * (omega x acc_bf)
            acc_ef[i] = -np.dot(omega_skew, np.cross(omega, acc))
            
            # Subtract gravity from the Z-axis component
            acc_ef[i, 2] -= 9.81
        
        return acc_ef

In [None]:
# selected_columns = ['accX_C', 'accY_C', 'accZ_C']
# EFacc_selected = df1[selected_columns]
# angles=df1[['gyrX_C','gyrY_C','gyrZ_C']]
# R=calculate_ef_acceleration(angles, EFacc_selected)


In [None]:
# for i in range(len(idxHS) - 1):  # Loop through each pair of consecutive heel strikes
#     start_idx = idxHS[i]
#     end_idx = idxHS[i + 1]
#     print("cycle:",i)
#     # Extract indices between two consecutive heel strikes
#     indices_between = range(start_idx, end_idx)
#     selected_columns = ['accX_C', 'accY_C', 'accZ_C']
#     EFacc_selected = df1[selected_columns]
#     idxOnsMSw_selected = [idx for idx in idxOnsMSw if start_idx <= idx < end_idx]
 
#     idxNonWalking_selected= [idx for idx in idxNonWalking if start_idx <= idx < end_idx]
#     velocity_r,velocity = functiongaitspeed(  R, idxOnsMSw_selected, idxNonWalking_selected, len(df1['accX_C']), sample_freq)
#     print("velocity", velocity_r)
#     v_rmean = np.mean(velocity_r)
#     print("mean",  v_rmean)
