In [1]:
import cv2
import mediapipe as mp
import pandas as pd
import numpy as np
from scipy.interpolate import CubicSpline
from scipy.signal import butter, filtfilt
import pandas.api.types as ptypes
from scipy.signal import find_peaks, argrelextrema
import os

# Calculate stance times for left and right feet
fps = 30  # Replace with the actual FPS of your video


# Function Definitions

def calculate_step_time(heel_strikes, fps):
    step_times = []
    # We need at least two heel strikes to calculate step time
    if len(heel_strikes) > 1:
        for i in range(len(heel_strikes) - 1):
            # Step time: from n-th heel strike to (n+1)-th heel strike
            step_time = (heel_strikes[i+1] - heel_strikes[i]) / fps
            step_times.append(step_time)
    return np.mean(step_times) if step_times else None


def calculate_stance_time(heel_strikes, toe_offs, fps):
    stance_times = []
    for heel_strike, toe_off in zip(heel_strikes, toe_offs):
        # Assuming heel_strike comes before toe_off, which is typical in gait cycles.
        if toe_off > heel_strike:
            stance_time = (toe_off - heel_strike) / fps
            stance_times.append(stance_time)
    return np.mean(stance_times) if stance_times else None


# Function to calculate swing time based on heel strikes and toe-offs
def calculate_swing_time(heel_strikes, toe_offs, fps):
    swing_times = []
    # Ensure that there is at least one toe-off followed by another heel strike
    if len(toe_offs) > 0 and len(heel_strikes) > 1:
        for i in range(len(toe_offs)):
            if i+1 < len(heel_strikes):
                # Swing time: from n-th toe-off to (n+1)-th heel strike
                swing_time = (heel_strikes[i+1] - toe_offs[i]) / fps
                swing_times.append(swing_time)
    return np.mean(swing_times) if swing_times else None

# Define a function to find local maxima for heel strikes and minima for toe-offs
def find_heel_strike_and_toe_off(distances):
    # Find local maxima for heel strikes
    heel_strikes = find_peaks(distances)[0]
    # Find local minima for toe-offs
    toe_offs = argrelextrema(distances.values, np.less)[0]
    return heel_strikes, toe_offs

# Function to calculate double support time
def calculate_double_support_time(heel_strikes_one_leg, toe_offs_contralateral_leg, fps):
    double_support_times = []
    # Iterate through the smaller list of events to ensure matching pairs
    min_events = min(len(heel_strikes_one_leg), len(toe_offs_contralateral_leg))
    if min_events > 0:
        for i in range(min_events - 1):
            # Double support time: from n-th toe-off of one leg to (n+1)-th heel strike of the opposite leg
            if toe_offs_contralateral_leg[i] < heel_strikes_one_leg[i+1]:
                double_support_time = (heel_strikes_one_leg[i+1] - toe_offs_contralateral_leg[i]) / fps
                double_support_times.append(double_support_time)
    return np.mean(double_support_times) if double_support_times else None


def apply_butterworth_lowpass_filter(data, order=10, cutoff_frequency=0.1752):
    # Calculate the Nyquist frequency
    nyquist_frequency = 0.5
    # Calculate the cutoff frequency as a fraction of the Nyquist frequency
    normalized_cutoff = cutoff_frequency / nyquist_frequency
    # Get the filter coefficients
    b, a = butter(order, normalized_cutoff, btype='low', analog=False)
    # Apply the filter
    filtered_data = filtfilt(b, a, data)
    return filtered_data

def fill_gaps_with_interpolation(column):
    # Indices of available and missing data
    available_data_idx = np.where(~np.isnan(column))[0]
    missing_data_idx = np.where(np.isnan(column))[0]
    
    # If there are no missing values or only a single value, return the column as is
    if len(missing_data_idx) == 0 or len(available_data_idx) < 2:
        return column

# Function to calculate 3D distance between two points.
def calculate_3d_distance(x1, y1, z1, x2, y2, z2):
    return np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2)

# Initialize MediaPipe Pose.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, smooth_landmarks=True)

# Video input from file on PC instead of webcam.
video_path = "F:\\Download Folder\\modelwalking.mp4"  # Replace with your video file path.
cap = cv2.VideoCapture(video_path)

# Specify the landmarks you want to save.
landmarks_of_interest = {
    23: 'left_hip',
    24: 'right_hip',
    31: 'left_foot_index',
    32: 'right_foot_index'
}

# Prepare data storage for CSV.
columns = [landmarks_of_interest[i] + '_x' for i in landmarks_of_interest] + \
          [landmarks_of_interest[i] + '_y' for i in landmarks_of_interest] + \
          [landmarks_of_interest[i] + '_z' for i in landmarks_of_interest] + \
          [landmarks_of_interest[i] + '_visibility' for i in landmarks_of_interest]
landmark_data = []

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Convert the BGR image to RGB and process it with MediaPipe Pose.
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(frame_rgb)

    # Check if any landmarks are detected and extract them.
    if results.pose_landmarks:
        # Extract visibility of feet
        left_foot_visibility = results.pose_landmarks.landmark[31].visibility
        right_foot_visibility = results.pose_landmarks.landmark[32].visibility

        frame_landmarks = []
        for idx, landmark_name in landmarks_of_interest.items():
            if (landmark_name == 'left_foot_index' and left_foot_visibility < 0.2) or \
               (landmark_name == 'right_foot_index' and right_foot_visibility < 0.2):
                # If visibility is low, append NaN
                frame_landmarks.extend([np.nan, np.nan, np.nan, np.nan])
            else:
                # Otherwise, append the actual landmark data
                landmark = results.pose_landmarks.landmark[idx]
                frame_landmarks.extend([landmark.x, landmark.y, landmark.z, landmark.visibility])
                
        landmark_data.append(frame_landmarks)

# Release the video file.
cap.release()


# Convert the landmark data into a pandas DataFrame
df = pd.DataFrame(landmark_data, columns=columns)

# Interpolate missing values and apply Butterworth filter for each column
for col in df.columns:
    if ptypes.is_numeric_dtype(df[col]):
        df[col] = fill_gaps_with_interpolation(df[col].values)
        df[col] = apply_butterworth_lowpass_filter(df[col].values)

# Calculate 3D distances after interpolation and filtering
df['right_hip_to_left_foot_distance'] = calculate_3d_distance(
    df['right_hip_x'], df['right_hip_y'], df['right_hip_z'],
    df['left_foot_index_x'], df['left_foot_index_y'], df['left_foot_index_z']
)

df['right_hip_to_right_foot_distance'] = calculate_3d_distance(
    df['right_hip_x'], df['right_hip_y'], df['right_hip_z'],
    df['right_foot_index_x'], df['right_foot_index_y'], df['right_foot_index_z']
)

df['left_hip_to_left_foot_distance'] = calculate_3d_distance(
    df['left_hip_x'], df['left_hip_y'], df['left_hip_z'],
    df['left_foot_index_x'], df['left_foot_index_y'], df['left_foot_index_z']
)

df['left_hip_to_right_foot_distance'] = calculate_3d_distance(
    df['left_hip_x'], df['left_hip_y'], df['left_hip_z'],
    df['right_foot_index_x'], df['right_foot_index_y'], df['right_foot_index_z']
)


# Calculate heel strike and toe-off events for the left and right feet
df['left_heel_strike_events'] = np.nan
df['right_heel_strike_events'] = np.nan
df['left_toe_off_events'] = np.nan
df['right_toe_off_events'] = np.nan

left_heel_strikes, left_toe_offs = find_heel_strike_and_toe_off(df['left_hip_to_left_foot_distance'])
right_heel_strikes, right_toe_offs = find_heel_strike_and_toe_off(df['right_hip_to_right_foot_distance'])

df.loc[left_heel_strikes, 'left_heel_strike_events'] = df.loc[left_heel_strikes, 'left_hip_to_left_foot_distance']
df.loc[left_toe_offs, 'left_toe_off_events'] = df.loc[left_toe_offs, 'left_hip_to_left_foot_distance']
df.loc[right_heel_strikes, 'right_heel_strike_events'] = df.loc[right_heel_strikes, 'right_hip_to_right_foot_distance']
df.loc[right_toe_offs, 'right_toe_off_events'] = df.loc[right_toe_offs, 'right_hip_to_right_foot_distance']


# Save the DataFrame to a CSV file.
csv_path = 'selected_pose_landmarks_filtered_with_distances.csv'  # Modify as needed.
df.to_csv(csv_path, index=False)

print(f"Landmarks with filled, filtered data and distances saved to {csv_path}")

left_stance_time = calculate_stance_time(left_heel_strikes, left_toe_offs, fps)
right_stance_time = calculate_stance_time(right_heel_strikes, right_toe_offs, fps)

print(f"Average Left Stance Time: {left_stance_time} seconds")
print(f"Average Right Stance Time: {right_stance_time} seconds")


left_swing_time = calculate_swing_time(left_heel_strikes, left_toe_offs, fps)
right_swing_time = calculate_swing_time(right_heel_strikes, right_toe_offs, fps)

print(f"Average Left Swing Time: {left_swing_time} seconds")
print(f"Average Right Swing Time: {right_swing_time} seconds")


left_step_time = calculate_step_time(left_heel_strikes, fps)
right_step_time = calculate_step_time(right_heel_strikes, fps)

print(f"Average Left Step Time: {left_step_time} seconds")
print(f"Average Right Step Time: {right_step_time} seconds")



double_support_time_left_then_right = calculate_double_support_time(left_heel_strikes, right_toe_offs, fps)
double_support_time_right_then_left = calculate_double_support_time(right_heel_strikes, left_toe_offs, fps)

# Calculate the average of both double support times
average_double_support_time = np.nanmean([double_support_time_left_then_right, double_support_time_right_then_left])

print(f"Average Double Support Time: {average_double_support_time} seconds")

# Before deletion, print the file path to confirm
print(f"Preparing to delete: {csv_path}")

# Delete the CSV file
if os.path.exists(csv_path):
    os.remove(csv_path)
    print(f"Deleted {csv_path}")
else:
    print(f"The file {csv_path} does not exist.")



results_df = pd.DataFrame()
results_df['Average Left Stance Time'] = [left_stance_time]
results_df['Average Right Stance Time'] = [right_stance_time]
results_df['Average Left Swing Time'] = [left_swing_time]
results_df['Average Right Swing Time'] = [right_swing_time]
results_df['Average Left Step Time'] = [left_step_time]
results_df['Average Right Step Time'] = [right_step_time]
results_df['Average Double Support Time'] = [average_double_support_time]

# Save the DataFrame to a CSV file
results_df.to_csv("yy.csv", index=False)

print("Results successfully saved to yy.csv")






Landmarks with filled, filtered data and distances saved to selected_pose_landmarks_filtered_with_distances.csv
Average Left Stance Time: 0.19509202453987734 seconds
Average Right Stance Time: 0.22824074074074074 seconds
Average Left Swing Time: 0.1605316973415133 seconds
Average Right Swing Time: 0.17972027972027968 seconds
Average Left Step Time: 0.3556237218813906 seconds
Average Right Step Time: 0.4076923076923077 seconds
Average Double Support Time: 2.16 seconds
Preparing to delete: selected_pose_landmarks_filtered_with_distances.csv
Deleted selected_pose_landmarks_filtered_with_distances.csv
Results successfully saved to yy.csv
