# Determining Vertical and Horizontal Parameters for Automated Foot Slip Identification

Copyright 2024 F. Hoffmann-La Roche AG

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

## Importing required libraries

In [None]:
%load_ext autoreload
%autoreload 2

from pathlib import Path
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from utils import auxiliaryFunctions as af
from utils import beam_walk as bw
from sklearn.metrics import cohen_kappa_score

## Retrieving DeepLabCut Tracking Data and Human-Annotated Foot Slip Labels

In [None]:
# Set the folder path where the video files are located
folder_path = 'path/to/threshold_dataset'  # Replace with your actual folder path
string_to_exclude = 'labels'

# Initialize Path object and file extensions
p = Path(folder_path)

# Collect all video file names without the extension
files = [file.stem for file in p.glob('*.csv') if string_to_exclude not in file.stem]
labeled_files = [file.stem for file in p.glob('*labels.csv')]

# Initialize dataframes to store DeepLabCut predictions and target data
total_dlc_df = pd.DataFrame()
total_target_df = pd.DataFrame()

# Initialize list and counters for foot slip analysis
dist_list = []
fs_n = 0  # Total number of foot slips
f_videos = 0  # Number of videos with foot slips
double_fs = 0  # Number of double foot slips

# Process each video file
for i, file in enumerate(files):
    
    if file.replace('_filtered', '_labels') in labeled_files:
        print(f'Analyzing file: {file}')

        # Extract DeepLabCut predictions and target data using custom functions
        dlc_df, target_df = af.DLCandTarget(folder_path, file)

        # Calculate the coordinates of the top edge of the beam
        m, c = af.calculateBeamCoord(dlc_df)

        # Normalize DeepLabCut dataframe by removing the beam y coordinates
        norm_dlc_df = af.removeBeamCoord(dlc_df, m, c)

        # Append target data and remove low likelihood points
        norm_dlc_df['target'] = target_df
        correct_norm_dlc_df = af.removeLowLikelihood(norm_dlc_df)

        # Separate the target column into a different dataframe
        correct_dlc_df = correct_norm_dlc_df.iloc[:, :-1]
        correct_target_df = correct_norm_dlc_df.iloc[:, -1]

        # Analyze foot slips
        single_fs_n, single_fs_idx = af.analyzeFootSlips(correct_target_df)

        # Update total number of foot slips
        fs_n += single_fs_n

        # Check for videos with more than one foot slip
        if single_fs_n > 1:
            f_videos = f_videos + 1
            double_fs = single_fs_n
            dist_fs = np.diff(single_fs_idx)

            # Append distances between foot slips to the list
            dist_list.extend(dist_fs)


        # Concatenate dataframes from different videos
        if i == 0:
            total_dlc_df = correct_dlc_df
            total_target_df = correct_target_df
            
            
        else:
            total_dlc_df = pd.concat([total_dlc_df, correct_dlc_df], axis=0, ignore_index=True)
            total_target_df = pd.concat([total_target_df, correct_target_df], axis=0, ignore_index=True)
        
        

            
# After processing all videos, you can now work with total_dlc_df and total_target_df

## Investigating the distribution of time intervals between distinct foot slips

In [None]:
dist_df = pd.DataFrame(dist_list, columns=['distance'])
fig = plt.figure(figsize=(9,9))
sns.set(font_scale=10)
sns.set(style='white')
counts, bins = np.histogram(dist_list, bins = 100)

b = sns.histplot(data=dist_list, bins=100, kde=True, element="step", color='#A0B9C6', line_kws={'linewidth': 2})
b.lines[0].set_color('#1E212B')

sns.set(context='poster')
b.set_xlabel("Distance in frames",fontsize=20)
b.set_ylabel("Counts",fontsize=20)
b.tick_params(labelsize=15)
print('Selected horizontal threshold:', np.quantile(dist_list, 0.05))


# Exploration for the Optimal Vertical Threshold

In [None]:
# Select the 'y' coordinate of the 'hindPaw' from the dataframe
y_total_dlc_df = total_dlc_df[[('hindPaw', 'y')]]

# Define a list of thresholds from 0 to 29
thresholds = [x for x in range(30)]

# Initialize lists to store TP, FN, and FP for each threshold
TP_list = []
FN_list = []
FP_list = []

# Define a threshold for the number of frames
h_th = 32

# Loop over each threshold
for th in thresholds:
    # Create a binary prediction where 1 indicates a foot slip (y-coordinate > threshold)
    pred = y_total_dlc_df['hindPaw']>th
    pred = pred*1

    for i in range(pred.shape[0]-h_th):
        if (pred['y'].iloc[i] == 1 and pred['y'].iloc[i+1] == 0):
            if sum(pred['y'].iloc[i+1:i+10] > 0):
                pred.loc[pred.index[i:i + h_th], 'y'] = 1

    # Initialize counters for TP, FN, and FP
    TP = 0
    FN = 0
    FP = 0
    count = 0

    # Loop over each frame, excluding the first frame
    for i in range(1, pred.shape[0]):
        # If a foot slip is predicted, increment the count by the ground truth value
        if (pred['y'].iloc[i] == 1):
            count = count + total_target_df[i]

        # If a non-slip is predicted
        if (pred['y'].iloc[i] == 0):
            # If the previous frame was a foot slip
            if pred['y'].iloc[i-1]:
                # If the count is greater than 0, increment TP and reset count
                if count > 0:
                    TP = TP + 1
                # If the count is 0, increment FP
                else:
                    FP = FP +1
                count = 0

    # Append TP and FP for the current threshold to their respective lists
    TP_list.append(TP)
    FP_list.append(FP)

    # Initialize a counter for FN
    count2 = 0

    # Loop over each frame in the ground truth, excluding the first frame
    for i in range(1, len(total_target_df)):
        # If a foot slip is in the ground truth, increment the count by the prediction value
        if total_target_df[i] == 1:
            count2 = count2 + pred['y'].iloc[i]

        # If a non-slip is in the ground truth
        if total_target_df[i] == 0:
            # If the previous frame was a foot slip
            if total_target_df[i-1] == 1:
                # If the count is 0, increment FN
                if count2 == 0:
                    FN = FN + 1
            count2 = 0

    # Append FN for the current threshold to its list
    FN_list.append(FN)

# Calculate recall, precision, and F1 score for each threshold
Recall = np.array(TP_list)/(np.array(TP_list) + np.array(FN_list))
Precision = np.array(TP_list)/(np.array(TP_list) + np.array(FP_list))
f1_score = 2*(Precision*Recall)/(Precision+Recall)

# Calculate the sum of recall and precision for each threshold
R_plus_P = Recall + Precision
# Find the maximum value of the sum
max_value = max(R_plus_P)

# Find the threshold(s) that give the maximum sum
Best_threshold = [thresholds[index] for index, item in enumerate(R_plus_P) if item == max_value]

# Print the best threshold and its recall and precision
print('Selected vertical threshold:', Best_threshold)
print('Recall:', Recall[Best_threshold])
print('Precision:', Precision[Best_threshold])

In [None]:
fig = plt.figure(figsize = (9,9))
sns.set(style='white')
sns.despine()
sns.scatterplot(x=Recall, y=Precision, palette='Blues', hue=thresholds, s=140)
plt.xlabel('Recall', fontsize=20)
plt.ylabel('Precision', fontsize=20)
plt.tick_params(labelsize=15)
plt.legend(title='Threshold (px)', loc='lower left', fontsize=20, title_fontsize=20)

# Validation of the selected parameters in new videos

In [None]:
# Selected threshold
h_th = 32 # Horizontal threshold, in frames
y_th = 18 # Vertical threshold, in pixels

In [None]:
p = folder_path = 'path/to/validation_dataset'
avi_files = [os.path.splitext(f)[0] for f in os.listdir(p) if f.endswith('.avi')]
csv_files = [os.path.splitext(f)[0] for f in os.listdir(p) if f.endswith('.csv')]
txt_files = [os.path.splitext(f)[0] for f in os.listdir(p) if f.endswith('.txt')]


total_target = np.array([])
total_dlc_df = pd.DataFrame(columns = ['hindPaw_y'])

for i,fi in enumerate(avi_files):
    dlc_df = pd.read_csv(os.path.join(p, fi + '.csv'), 
                         header=[1,2],
                        dtype =np.float64, 
                        index_col=0)
    m,c = af.calculateBeamCoord(dlc_df)
    norm_dlc_df = af.removeBeamCoord(dlc_df, m, c)
    
    
    if 'Result_' + fi in txt_files:
        
        
        file_name = 'Result_' + fi
        file_d = os.path.join(p, file_name + '.txt')
        with open(file_d) as f:
            r_lines = f.readlines()
        f.close()

        temp_scoring = np.zeros((norm_dlc_df.shape[0]))
        # Interesting stuff is happening starting from line 2
        for i in range(2,len(r_lines)):
            r_content = r_lines[i].split()
            temp_scoring[int(r_content[-3]):int(r_content[-2])] = 1

    else:
        
        file_name = fi + '_labels.csv'
        file_d = pd.read_csv(os.path.join(p, file_name), header=[0])
        
        if 'footslips' in file_d.columns:
            temp_scoring = file_d['footslips']
        else:
            temp_scoring = file_d['footslip']
        temp_scoring= np.array(temp_scoring)
    
    results_df = pd.DataFrame({'scoring':temp_scoring, 'hindPaw_y':norm_dlc_df[('hindPaw', 'y')], 'hindPaw_likelihood':norm_dlc_df[('hindPaw', 'likelihood')]})
    results_df = results_df[results_df.hindPaw_likelihood > 0.95]
    
    total_target = np.concatenate((total_target, np.array(results_df.scoring)))
    total_dlc_df = pd.concat([total_dlc_df, results_df[['hindPaw_y']]], axis=0, ignore_index=True)

    
TP_list = []
FN_list = []
FP_list = []


pred = total_dlc_df> y_th
pred = pred*1

# (3) I need to incorporate the temporal threshold
for i in range(pred.shape[0]-h_th):
    if (pred['hindPaw_y'].iloc[i] == 1 and pred['hindPaw_y'].iloc[i+1] == 0):
         if sum(pred['hindPaw_y'].iloc[i+1:i+h_th] > 0):
                for j in range(h_th):
                    pred['hindPaw_y'].iloc[i+j] = 1


                    
TP = 0
FN = 0
FP = 0
count = 0

# (4) I count the number of true positives

for i in range(1, pred.shape[0]):
    if (pred['hindPaw_y'].iloc[i] == 1):
        count = count + total_target[i]

    if (pred['hindPaw_y'].iloc[i] == 0):
        if pred['hindPaw_y'].iloc[i-1]:
            if count > 0:
                TP = TP + 1
            else:
                FP = FP +1
            count = 0
            
TP_list.append(TP)
FP_list.append(FP)

count2 = 0
for i in range(1, len(total_target)):
    if total_target[i] == 1:
        count2 = count2 + pred['hindPaw_y'].iloc[i]

    if total_target[i] == 0:
        if total_target[i-1] == 1:
            if count2 == 0:
                FN = FN + 1
        count2 = 0
FN_list.append(FN)
                    
        
Recall = np.array(TP_list)/(np.array(TP_list) + np.array(FN_list))
Precision = np.array(TP_list)/(np.array(TP_list) + np.array(FP_list))
f1_score = 2*(Precision*Recall)/(Precision+Recall)

print('Recall:', Recall)
print('Precision:', Precision)

# Agreement between scorers

In [None]:
# Define the path to the dataset
path = '/path/to/agreement_dataset'

# Initialize lists to store the number of foot slips annotated by each scorer for each video
bfs = [] # Scorer 4
rfs = [] # Scorer 2
mfs = [] # Scorer 3
ffs = [] # Scorer 1

# Get the names of all .mp4 files in the dataset directory
p = Path(path)
video_files = [i.stem for i in p.glob('**/*.mp4')]

# Loop over each video
for n, video in enumerate(video_files):
    # Read the deeplabcut csv file for the current video
    dlc_path = os.path.join(p, 'Result_' +video + '.csv')
    dlc_df = pd.read_csv(dlc_path, header=[1,2], dtype =np.float64, index_col=0)

    # Read the foot slip annotations for Scorer 1
    f_scoring = pd.read_csv(os.path.join(p, 'Result_' + video + '_labels.csv'), header=[0], dtype =np.float64, index_col=0)['footslips']
    f_scoring = np.array(f_scoring)

    # Calculate the total number of foot slips annotated by Scorer 1
    fFootslips = sum(f_scoring[i] == 1 and f_scoring[i-1] == 0 for i in range(1, len(f_scoring)))
    ffs.append(fFootslips)

    # Read the foot slip annotations for Scorer 2
    with open(os.path.join(p, 'Result_' +  video + '_R.txt')) as f:
        r_lines = f.readlines()
    r_scoring = np.zeros((dlc_df.shape[0]))
    for i in range(2,len(r_lines)):
        r_content = r_lines[i].split()
        r_scoring[int(r_content[-3]):int(r_content[-2])] = 1

    # Calculate the total number of foot slips annotated by Scorer 2
    rFootslips = sum(r_scoring[i] == 1 and r_scoring[i-1] == 0 for i in range(1, len(r_scoring)))
    rfs.append(rFootslips)

    # Read the foot slip annotations for Scorer 3
    with open(os.path.join(p, 'Result_' + video + '_M.txt')) as f:
        m_lines = f.readlines()
    m_scoring = np.zeros((dlc_df.shape[0]))
    for i in range(2,len(m_lines)):
        m_content = m_lines[i].split()
        m_scoring[int(m_content[-3]):int(m_content[-2])] = 1

    # Calculate the total number of foot slips annotated by Scorer 3
    mFootslips = sum(m_scoring[i] == 1 and m_scoring[i-1] == 0 for i in range(1, len(m_scoring)))
    mfs.append(mFootslips)

    # Read the foot slip annotations for Scorer 4
    with open(os.path.join(p, 'Result_' + video + '_B.txt')) as f:
        b_lines = f.readlines()
    b_scoring = np.zeros((dlc_df.shape[0]))
    for i in range(2,len(b_lines)):
        b_content = b_lines[i].split()
        b_scoring[int(b_content[-3]):int(b_content[-2])] = 1

    # Calculate the total number of foot slips annotated by Scorer 4
    bFootslips = sum(b_scoring[i] == 1 and b_scoring[i-1] == 0 for i in range(1, len(b_scoring)))
    bfs.append(bFootslips)

    # Concatenate the foot slip annotations from all scorers into a single dataframe
    if n == 0:
        scoring_df = pd.DataFrame({'S1':f_scoring, 'S2':r_scoring, 'S3':m_scoring, 'S4':b_scoring})
    else:
        prov_df = pd.DataFrame({'S1':f_scoring, 'S2':r_scoring, 'S3':m_scoring, 'S4':b_scoring})
        scoring_df = pd.concat([scoring_df, prov_df], axis=0, ignore_index=True)

# Create a dataframe with the total number of foot slips annotated by each scorer for each video
foot_df = pd.DataFrame({'video':video_files, 'S1': ffs, 'S2':rfs, 'S3':mfs, 'S4':bfs})
foot_df = foot_df.set_index('video', drop=True)

# Transpose the dataframe
new_df_t = foot_df.T

In [None]:
tot_pred = pd.DataFrame()
tfs = []
trial_list = []
extension2 = '.csv'
for video in video_files:
    print(video)
    video_path = os.path.join(p, video + '.mp4')
    dlc_path = os.path.join(p, 'Result_' + video + extension2)
    
    dlc_df = pd.read_csv(
        dlc_path, 
        header=[1,2], #setting the first 2 rows as column names
        dtype =np.float64, #defining the type of the objects inside the dataframe
        index_col=0
    )
    
   
    trial = af.Trial(video_path, dlc_df, 120)
    
    trial_list.append(trial)

    tfs.append(trial.footslips)

new_df_t.loc['Threshold'] = tfs

transposed_df = new_df_t.T
k = np.zeros((5,5))


r=0
for c in range(transposed_df.shape[1]):
    j=0
    for t in range(transposed_df.shape[1]):
        kappa = cohen_kappa_score(transposed_df.iloc[:,c], transposed_df.iloc[:,t])
        k[j][r] = kappa
        j=j+1
    r=r+1



In [None]:
cohen = pd.DataFrame(
    k,
    columns = new_df_t.index,
    index = new_df_t.index
)

cohen.to_csv('agreementCohenK.csv')
sns.set(font_scale=2)
fig = plt.figure(figsize=(10,10))
sns.heatmap(cohen,
            square=True, linewidths=.5, cmap='crest', annot=True)