In [2]:
import os
import glob

import pandas as pd
import numpy as np
from sklearn.cluster import KMeans

import modules.gait_metrics as gm
import modules.pose_estimation as pe


load_dir = os.path.join('data', 'kinect', 'best pos')
save_dir = os.path.join('data', 'results')

save_name = 'kinect_gait_metrics.csv'

# All files with .pkl extension. Sorted because glob returns paths in
# arbitrary order and the loop below keeps the results of the last match.
file_paths = sorted(glob.glob(os.path.join(load_dir, '*.pkl')))
save_path = os.path.join(save_dir, save_name)

# Previously saved metrics; the first CSV column is used as the index
df_metrics = pd.read_csv(save_path, index_col=0)


kinect_id = '2014-12-16_P005_Post_004'

matching_files = [f for f in file_paths if kinect_id in f]

# Fail here rather than with a NameError in a later cell: without a match
# the loop body never runs and df_head_feet / k_means are never defined.
if not matching_files:
    raise FileNotFoundError(
        "No .pkl file in '{}' contains '{}'".format(load_dir, kinect_id))

for file_path in matching_files:

    # NOTE(review): unpickling executes arbitrary code; only load trusted files
    df_head_feet = pd.read_pickle(file_path)

    # Convert all position vectors to float type
    # so they can be easily input to linear algebra functions
    df_head_feet = df_head_feet.applymap(pd.to_numeric)

    # %% Peak detection

    # Cluster frames with k means to locate the 4 walking passes.
    # KMeans expects a 2D array, hence the reshape to a single column.
    frames = df_head_feet.index.values.reshape(-1, 1)
    k_means = KMeans(n_clusters=4, random_state=0).fit(frames)

    # Per-frame vector from right foot to left foot, and its length
    foot_diff = df_head_feet.L_FOOT - df_head_feet.R_FOOT
    foot_dist = foot_diff.apply(np.linalg.norm)



In [11]:
# Frame rate used to convert frame counts into time.
# NOTE(review): presumably frames per second of the Kinect recording — confirm.
frame_rate = 30

In [3]:
import modules.general as gen

# Cluster label assigned to each frame by the k-means fit
frame_labels = k_means.labels_

# Split the head/feet data using those labels
# (presumably one DataFrame per walking pass — confirm in gm.split_by_pass)
pass_dfs = gm.split_by_pass(df_head_feet, frame_labels)

In [4]:
# Enforce consistent sides for the feet on all walking passes.
# Also calculate the general direction of motion for each pass.
# Each pe.consistent_sides call yields a pair; zip(*...) regroups the pairs
# into one tuple of pass frames and one tuple of directions.
# NOTE(review): pass_dfs is rebound here, so re-running this cell feeds the
# already-processed frames back into pe.consistent_sides.
pass_dfs, pass_directions = zip(*[pe.consistent_sides(df) for df in pass_dfs])

In [5]:
# Estimate frames where foot contacts floor.
# One result per walking pass; each pass is paired with its direction.
pass_dfs_contact = [gm.foot_contacts(a, b) for a, b in zip(pass_dfs, pass_directions)]

In [9]:
# Work with the contact frames of the first walking pass only (index 0)
df_contact_frames = pass_dfs_contact[0]

In [10]:
# Foot positions at the contact frames
# NOTE(review): positions are looked up in df_head_feet rather than in the
# side-corrected pass frame (pass_dfs[0]) — confirm this is intended.
df_contact_pos = gen.lookup_values(df_contact_frames, df_head_feet)

In [12]:
# Stride length: norm of the difference between each contact position and
# the next one in the same column (periods=-1 subtracts the following row)
df_stride_length = df_contact_pos.diff(periods=-1).applymap(np.linalg.norm)

In [13]:
# Stride time: absolute frame difference between each contact and the next
# one in the same column, divided by the frame rate. Uses the frame_rate
# constant defined earlier in the notebook instead of a repeated literal.
df_stride_time = df_contact_frames.diff(periods=-1).abs() / frame_rate

In [14]:
# Stride velocity: element-wise length divided by time
df_stride_velocity = df_stride_length / df_stride_time

In [96]:
# Flatten the contact-frame table into one Series indexed by
# (row label, column label), ordered by frame number and cast to int
series_contact = df_contact_frames.stack().sort_values().astype(int)

In [148]:
# Move both index levels into ordinary columns, then name the three columns:
# the former row label, the former column label, and the frame value
df_contact = series_contact.reset_index()
df_contact.columns = ['contact_number', 'part', 'frame']

In [149]:
import modules.pandas_funcs as pf

# Add a 'position' column to the contact table.
# NOTE(review): presumably each value is df_head_feet at (frame, part) —
# confirm in pf.column_from_lookup.
# NOTE(review): df_contact is rebound to the helper's result, so re-running
# this cell passes the already-extended table back in.
df_contact = pf.column_from_lookup(df_contact, df_head_feet,
                                column='position', lookup_cols=('frame', 'part'))

In [150]:
# Display the contact table, now including the position column
df_contact

Unnamed: 0,contact_number,part,frame,position
0,0,R_FOOT,234,"[-79.2984, -59.659, 254.924]"
1,0,L_FOOT,246,"[-18.4634, -63.777, 250.636]"
2,1,R_FOOT,262,"[40.0823, -63.9547, 263.324]"


In [152]:
# Keep only the right-foot contacts.
# NOTE(review): df_foot is reassigned from df_contact.copy() further down
# before it is read, so this selection appears to be unused.
df_foot = df_contact[df_contact.part == 'R_FOOT']

In [153]:
# Select each foot's contacts explicitly by label, instead of unpacking
# groupby groups (which relies on alphabetical group order and on exactly
# two distinct 'part' values being present).
df_l = df_contact[df_contact.part == 'L_FOOT']
df_r = df_contact[df_contact.part == 'R_FOOT']

In [154]:
# Display the left-foot contacts
df_l

Unnamed: 0,contact_number,part,frame,position
1,0,L_FOOT,246,"[-18.4634, -63.777, 250.636]"


In [156]:
# Right-foot stride lengths: norm of the difference between each right-foot
# contact position and the next one (periods=-1 subtracts the following row).
# The result keeps df_r's index, and is stored as a one-column DataFrame.
stride_length_r = df_r.position.diff(periods=-1).apply(np.linalg.norm).to_frame()
stride_length_r.columns = ['stride_length']

In [157]:
# Attach stride lengths to the contact table by index. merge defaults to an
# inner join, so only rows whose index exists in stride_length_r are kept.
# The result is displayed, not stored.
df_contact.merge(stride_length_r, left_index=True, right_index=True)

Unnamed: 0,contact_number,part,frame,position,stride_length
0,0,R_FOOT,234,"[-79.2984, -59.659, 254.924]",119.752931
2,1,R_FOOT,262,"[40.0823, -63.9547, 263.324]",


In [168]:
# Work on a copy so df_contact itself is left untouched
df_foot = df_contact.copy()

# Normalise column names to lower case
df_foot.columns = df_foot.columns.str.lower()
# Side is the first character of the part label (e.g. 'R_FOOT' -> 'R')
df_foot['side'] = df_foot.part.str[0]

# The part label is no longer needed once the side has been extracted
df_foot = df_foot.drop('part', axis=1)

In [169]:
# Display the per-contact table with the new 'side' column
df_foot

Unnamed: 0,contact_number,frame,position,side
0,0,234,"[-79.2984, -59.659, 254.924]",R
1,0,246,"[-18.4634, -63.777, 250.636]",L
2,1,262,"[40.0823, -63.9547, 263.324]",R


In [183]:
# Unpack the contact rows into namedtuples called 'Foot', one per row.
# The unpacking requires df_foot to hold exactly three rows.
a, b, c = df_foot.itertuples(index=False, name='Foot')

In [216]:
from numpy.linalg import norm


def is_stride(foot_a0, foot_b0, foot_a1=None):
    """
    Check whether foot contacts form a valid stride.

    Two call forms are supported:

    ``is_stride(foot_0, foot_1)``
        Both contacts must be on the same side, and ``foot_1`` must have
        the contact number directly after ``foot_0``.

    ``is_stride(foot_a0, foot_b0, foot_a1)``
        The test above is applied to ``foot_a0`` and ``foot_a1``. In
        addition, ``foot_b0`` must share the contact number of ``foot_a0``
        and be on a different side.

    Parameters
    ----------
    foot_a0 : object with ``side`` and ``contact_number`` attributes
        Initial contact of the stride.
    foot_b0 : object with ``side`` and ``contact_number`` attributes
        Three-argument form: the contact of the other foot.
        Two-argument form: the final contact of the stride.
    foot_a1 : object with ``side`` and ``contact_number`` attributes, optional
        Final contact of the stride (three-argument form only).

    Returns
    -------
    bool
        True when every test passes (a numpy boolean from ``np.all``).
    """
    if foot_a1 is None:
        # Two-argument form: the second positional argument is the final
        # contact, and there is no other-foot contact to validate.
        foot_0, foot_1 = foot_a0, foot_b0

        same_side = foot_0.side == foot_1.side
        consecutive_contacts = foot_0.contact_number == foot_1.contact_number - 1

        return np.all([same_side, consecutive_contacts])

    a_a_same_side = foot_a0.side == foot_a1.side

    a_a_consecutive_contacts = foot_a0.contact_number == foot_a1.contact_number - 1

    a_b_same_contacts = foot_a0.contact_number == foot_b0.contact_number

    # Verify that foot A and B are on different sides.
    a_b_different_side = foot_a0.side != foot_b0.side

    tests = [a_a_same_side, a_a_consecutive_contacts,
             a_b_same_contacts, a_b_different_side]

    return np.all(tests)


def is_step(foot_a0, foot_b0, foot_a1):
    """
    Check whether three foot contacts form a valid step.

    Parameters
    ----------
    foot_a0 : object with ``side`` and ``contact_number`` attributes
        Initial contact of foot A.
    foot_b0 : object with ``side`` and ``contact_number`` attributes
        Contact of foot B.
    foot_a1 : object with ``side`` and ``contact_number`` attributes
        Final contact of foot A.

    Returns
    -------
    bool
        True when the contacts pass ``is_stride`` and feet A and B are on
        different sides.
    """
    # Verify that the A feet compose a stride. All three contacts are
    # passed because is_stride is declared as (foot_a0, foot_b0, foot_a1);
    # calling it with only two arguments would raise a TypeError.
    a_is_stride = is_stride(foot_a0, foot_b0, foot_a1)

    # Verify that foot A and B are on different sides.
    a_b_different = foot_a0.side != foot_b0.side

    return a_is_stride and a_b_different
    

def stride_length(foot_0, foot_1):
    """
    Return the distance between the positions of two foot contacts.

    The contacts must satisfy ``is_stride``; otherwise the assertion fails.
    """
    # NOTE(review): is_stride is called with two feet here — confirm that
    # its signature accepts the two-argument form.
    assert is_stride(foot_0, foot_1)

    displacement = foot_0.position - foot_1.position

    return norm(displacement)
    

def stride_time(foot_0, foot_1, frame_rate=30):
    """
    Return the time elapsed between two foot contacts.

    The frame difference ``foot_1.frame - foot_0.frame`` is divided by
    ``frame_rate``. The contacts must satisfy ``is_stride``; otherwise the
    assertion fails.
    """
    # NOTE(review): is_stride is called with two feet here — confirm that
    # its signature accepts the two-argument form.
    assert is_stride(foot_0, foot_1)

    elapsed_frames = foot_1.frame - foot_0.frame

    return elapsed_frames / frame_rate


def stride_velocity(foot_0, foot_1, frame_rate=30):
    """
    Return stride length divided by stride time for two foot contacts.

    ``frame_rate`` is forwarded to ``stride_time``.
    """
    length = stride_length(foot_0, foot_1)
    duration = stride_time(foot_0, foot_1, frame_rate=frame_rate)

    return length / duration


def step_length(foot_a0, foot_b0, foot_a1):
    """
    Return the step length for three foot contacts.

    The position of foot B is projected onto the line through the two
    positions of foot A, and the distance from that projection to the final
    position of foot A is returned.

    NOTE(review): the original body called ``lin.proj_point_line`` with
    ``self.stance``; neither ``lin`` nor ``self`` exists in this function,
    and nothing was returned. This version assumes the stance foot is
    ``foot_b0`` and that the step is measured to ``foot_a1`` — confirm.

    Parameters
    ----------
    foot_a0, foot_b0, foot_a1 : objects with a ``position`` attribute
        Contacts that must satisfy ``is_step``; otherwise the assertion fails.

    Returns
    -------
    float
        Distance between the projected position of foot B and ``foot_a1``.
    """
    assert is_step(foot_a0, foot_b0, foot_a1)

    line_start = np.asarray(foot_a0.position, dtype=float)
    line_end = np.asarray(foot_a1.position, dtype=float)
    stance = np.asarray(foot_b0.position, dtype=float)

    direction = line_end - line_start
    length_sq = np.dot(direction, direction)

    if length_sq == 0:
        # Degenerate line (both A contacts coincide): project onto the point
        stance_proj = line_start
    else:
        # Scalar position of the projection along the line from start to end
        t = np.dot(stance - line_start, direction) / length_sq
        stance_proj = line_start + t * direction

    return norm(stance_proj - line_end)

In [215]:
# Stride velocity between contacts a and c (both on the right side).
# NOTE(review): this cell's execution count is lower than that of the cell
# defining the functions, so the saved output may come from an earlier
# definition — re-run after Restart & Run All.
stride_velocity(a, c, frame_rate=30)

128.30671217888644

In [217]:
# Check that contacts a, b and c together form a valid step
is_step(a, b, c)

True

In [218]:
# Inspect contact a: right foot, contact number 0
a

Foot(contact_number=0, frame=234, position=array([-79.2984, -59.659 , 254.924 ]), side='R')

In [219]:
# Inspect contact b: left foot, contact number 0
b

Foot(contact_number=0, frame=246, position=array([-18.4634, -63.777 , 250.636 ]), side='L')

In [220]:
# Inspect contact c: right foot, contact number 1
c

Foot(contact_number=1, frame=262, position=array([ 40.0823, -63.9547, 263.324 ]), side='R')