In [101]:
!pip install mediapipe



In [102]:
!pip install --upgrade numpy
!pip install --upgrade pandas
!pip install --upgrade tensorflow
!pip install --upgrade mediapipe opencv-python

Collecting numpy
  Using cached numpy-2.2.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
Using cached numpy-2.2.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.4 MB)
Installing collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.26.4
    Uninstalling numpy-1.26.4:
      Successfully uninstalled numpy-1.26.4
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
mediapipe 0.10.21 requires numpy<2, but you have numpy 2.2.5 which is incompatible.
tensorflow 2.19.0 requires numpy<2.2.0,>=1.26.0, but you have numpy 2.2.5 which is incompatible.
google-colab 1.0.0 requires pandas==2.2.2, but you have pandas 2.2.3 which is incompatible.
tf-keras 2.18.0 requires tensorflow<2.19,>=2.18, but you have tensorflow 2.19.0 which is incompatible.
numba 0.60.0 requires numpy<2.1,>=1.22, bu

In [103]:
import cv2
import mediapipe as mp
import numpy as np
import time
from collections import deque

In [104]:
# Define the indices for relevant pose landmarks based on MediaPipe's model.
# These are standard indices for the 33 landmarks provided by MediaPipe Pose.
LEFT_HIP = 23
RIGHT_HIP = 24
LEFT_ANKLE = 27
RIGHT_ANKLE = 28
LEFT_HEEL = 29
RIGHT_HEEL = 30
LEFT_FOOT_INDEX = 31
RIGHT_FOOT_INDEX = 32

In [105]:
# Class to store gait event information
class GaitEvent:
    def __init__(self, event_type, foot, timestamp, position):
        self.event_type = event_type # 'Heel Strike' or 'Toe Off'
        self.foot = foot # 'Left' or 'Right'
        self.timestamp = timestamp # in seconds
        self.position = position # Landmark position (x, y, z)

In [106]:
# Class to store gait cycle information
class GaitCycle:
    def __init__(self, foot, heel_strike_event, toe_off_event=None):
        self.foot = foot
        self.heel_strike = heel_strike_event
        self.toe_off = toe_off_event
        self.cycle_time = None
        self.stride_length = None
        self.stance_time = None
        self.swing_time = None
        self.double_support_times = [] # List of durations

    def complete_cycle(self, next_heel_strike_time):
        """Calculates cycle time and stride length when the next heel strike occurs."""
        if self.heel_strike and next_heel_strike_time is not None:
            self.cycle_time = next_heel_strike_time - self.heel_strike.timestamp

    def complete_stance_swing(self, toe_off_event):
        """Calculates stance and swing time once toe off is detected."""
        self.toe_off = toe_off_event
        if self.heel_strike and self.toe_off:
            self.stance_time = self.toe_off.timestamp - self.heel_strike.timestamp
            # Swing time is calculated when the next heel strike occurs (part of the next cycle's calculation)
            # self.swing_time = next_heel_strike_time - self.toe_off.timestamp
            pass

In [107]:
# Function to calculate Euclidean distance between two 3D points
def calculate_distance(p1, p2):
    """Calculates the Euclidean distance between two points (x, y, z)."""
    return np.sqrt((p1.x - p2.x)**2 + (p1.y - p2.y)**2 + (p1.z - p2.z)**2)

# Function to detect gait events (Heel Strike and Toe Off)
# This is a simplified example. Robust event detection requires careful algorithm design.
def detect_gait_events(key_point_history, frame_rate):
    """
    Detects gait events (Heel Strike and Toe Off) based on key point movement.

    Args:
        key_point_history: A deque of (timestamp, landmarks) tuples.
        frame_rate: The frame rate of the video.

    Returns:
        A list of detected GaitEvent objects in the current frame.
    """
    detected_events = []
    if len(key_point_history) < 2:
        return detected_events # Need at least two frames to detect movement

    current_time, current_landmarks = key_point_history[-1]
    previous_time, previous_landmarks = key_point_history[-2]

    heel_strike_threshold_y = 0.005
    toe_off_threshold_y = 0.005

    # Check Left Foot
    if len(current_landmarks) > max(LEFT_HEEL, LEFT_HIP) and len(previous_landmarks) > max(LEFT_HEEL, LEFT_HIP):
        current_left_heel_y = current_landmarks[LEFT_HEEL].y
        previous_left_heel_y = previous_landmarks[LEFT_HEEL].y
        left_hip_y = current_landmarks[LEFT_HIP].y

        # Simple Heel Strike Detection: Heel moves down significantly and is below hip
        if current_left_heel_y > previous_left_heel_y + heel_strike_threshold_y and current_left_heel_y > left_hip_y:
             # Add more robust checks here (e.g., velocity, relative position to ground)
             detected_events.append(GaitEvent('Heel Strike', 'Left', current_time / 1000000.0, current_landmarks[LEFT_HEEL]))

        # Simple Toe Off Detection: Foot index moves up significantly and is above hip
        if len(current_landmarks) > max(LEFT_FOOT_INDEX, LEFT_HIP) and len(previous_landmarks) > max(LEFT_FOOT_INDEX, LEFT_HIP):
            current_left_foot_index_y = current_landmarks[LEFT_FOOT_INDEX].y
            previous_left_foot_index_y = previous_landmarks[LEFT_FOOT_INDEX].y
            left_hip_y = current_landmarks[LEFT_HIP].y

            if current_left_foot_index_y < previous_left_foot_index_y - toe_off_threshold_y and current_left_foot_index_y < left_hip_y:
                 # Add more robust checks here
                 detected_events.append(GaitEvent('Toe Off', 'Left', current_time / 1000000.0, current_landmarks[LEFT_FOOT_INDEX]))


    # Check Right Foot (similar logic as Left Foot)
    if len(current_landmarks) > max(RIGHT_HEEL, RIGHT_HIP) and len(previous_landmarks) > max(RIGHT_HEEL, RIGHT_HIP):
        current_right_heel_y = current_landmarks[RIGHT_HEEL].y
        previous_right_heel_y = previous_landmarks[RIGHT_HEEL].y
        right_hip_y = current_landmarks[RIGHT_HIP].y

        # Simple Heel Strike Detection
        if current_right_heel_y > previous_right_heel_y + heel_strike_threshold_y and current_right_heel_y > right_hip_y:
             detected_events.append(GaitEvent('Heel Strike', 'Right', current_time / 1000000.0, current_landmarks[RIGHT_HEEL]))

        # Simple Toe Off Detection
        if len(current_landmarks) > max(RIGHT_FOOT_INDEX, RIGHT_HIP) and len(previous_landmarks) > max(RIGHT_FOOT_INDEX, RIGHT_HIP):
            current_right_foot_index_y = current_landmarks[RIGHT_FOOT_INDEX].y
            previous_right_foot_index_y = previous_landmarks[RIGHT_FOOT_INDEX].y
            right_hip_y = current_landmarks[RIGHT_HIP].y

            if current_right_foot_index_y < previous_right_foot_index_y - toe_off_threshold_y and current_right_foot_index_y < right_hip_y:
                 detected_events.append(GaitEvent('Toe Off', 'Right', current_time / 1000000.0, current_landmarks[RIGHT_FOOT_INDEX]))

    return detected_events

In [108]:
# Function to calculate gait metrics based on detected events and history
def calculate_gait_metrics(key_point_history, detected_events, gait_cycles_left, gait_cycles_right, pixel_to_meter_ratio):
    metrics = {}
    current_time = key_point_history[-1][0] / 1000000.0
    current_landmarks = key_point_history[-1][1]

    # Update ongoing gait cycles and detect new ones
    for event in detected_events:
        if event.event_type == 'Heel Strike':
            if event.foot == 'Left':
                # Complete the previous left cycle if one was ongoing
                if gait_cycles_left and gait_cycles_left[-1].toe_off:
                     # Calculate swing time for the previous cycle
                     gait_cycles_left[-1].swing_time = event.timestamp - gait_cycles_left[-1].toe_off.timestamp
                     # For simplicity, we'll calculate cycle time here and leave stride length as a placeholder.
                     if len(gait_cycles_left) > 1:
                         gait_cycles_left[-2].complete_cycle(event.timestamp)

                # Start a new left gait cycle
                gait_cycles_left.append(GaitCycle('Left', event))

            elif event.foot == 'Right':
                 # Complete the previous right cycle if one was ongoing
                 if gait_cycles_right and gait_cycles_right[-1].toe_off:
                      gait_cycles_right[-1].swing_time = event.timestamp - gait_cycles_right[-1].toe_off.timestamp
                      if len(gait_cycles_right) > 1:
                          gait_cycles_right[-2].complete_cycle(event.timestamp)

                 # Start a new right gait cycle
                 gait_cycles_right.append(GaitCycle('Right', event))

        elif event.event_type == 'Toe Off':
            if event.foot == 'Left':
                # Complete the stance phase of the current left cycle
                if gait_cycles_left and gait_cycles_left[-1].heel_strike and gait_cycles_left[-1].toe_off is None:
                    gait_cycles_left[-1].complete_stance_swing(event)
            elif event.foot == 'Right':
                 # Complete the stance phase of the current right cycle
                 if gait_cycles_right and gait_cycles_right[-1].heel_strike and gait_cycles_right[-1].toe_off is None:
                     gait_cycles_right[-1].complete_stance_swing(event)

    # Calculate metrics from completed cycles (you might want to average over several cycles)
    # Example: Calculate average cycle time from the last few completed cycles
    num_cycles_to_average = 3 # This line and the following were incorrectly indented
    left_cycle_times = [c.cycle_time for c in gait_cycles_left[-num_cycles_to_average:] if c.cycle_time is not None]
    right_cycle_times = [c.cycle_time for c in gait_cycles_right[-num_cycles_to_average:] if c.cycle_time is not None]

    if left_cycle_times:
        metrics['average_left_cycle_time'] = np.mean(left_cycle_times)
    if right_cycle_times:
        metrics['average_right_cycle_time'] = np.mean(right_cycle_times)

    # ... (rest of the function code remains the same)

    return metrics

In [109]:
# --- Main Pipeline Execution ---

def run_gait_analysis_pipeline(video_path):
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return

    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    if frame_rate == 0:
        print("Warning: Could not get frame rate from video. Assuming 30 FPS.")
        frame_rate = 30.0

    # Placeholder for pixel to meter ratio calibration
    # You will need to determine this based on your video setup (e.g., using a known distance in the frame)
    pixel_to_meter_ratio = 0.001 # Example: 1 pixel = 0.001 meters (adjust this!)

    # Initialize MediaPipe Pose model
    # 'static_image_mode=False' for video processing, 'model_complexity' can be 0, 1, or 2
    with mp_pose.Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:

        key_point_history = deque(maxlen=int(frame_rate * 5)) # Store history for 5 seconds
        gait_cycles_left = [] # To store completed left gait cycles
        gait_cycles_right = [] # To store completed right gait cycles
        ongoing_cycle_left = None # To track the current incomplete left cycle
        ongoing_cycle_right = None # To track the current incomplete right cycle


        frame_count = 0
        start_time = time.time()

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            # Convert the BGR image to RGB.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Process the image and find pose landmarks.
            results = pose.process(image)

            # Convert the RGB image back to BGR for visualization.
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            current_timestamp_us = int(cap.get(cv2.CAP_PROP_POS_MSEC) * 1000) # Timestamp in microseconds

            if results.pose_landmarks:
                landmarks = results.pose_landmarks.landmark
                # Store the full set of landmarks in history
                key_point_history.append((current_timestamp_us, landmarks))

                # Extract relevant gait key points (hip, ankle, heel, foot index)
                gait_landmarks = []
                if len(landmarks) > max(RIGHT_FOOT_INDEX, LEFT_FOOT_INDEX): # Ensure enough landmarks are detected
                    gait_landmarks.append(landmarks[LEFT_HIP])
                    gait_landmarks.append(landmarks[RIGHT_HIP])
                    gait_landmarks.append(landmarks[LEFT_ANKLE])
                    gait_landmarks.append(landmarks[RIGHT_ANKLE])
                    gait_landmarks.append(landmarks[LEFT_HEEL])
                    gait_landmarks.append(landmarks[RIGHT_HEEL])
                    gait_landmarks.append(landmarks[LEFT_FOOT_INDEX])
                    gait_landmarks.append(landmarks[RIGHT_FOOT_INDEX])

                # Detect gait events in the current frame
                detected_events = detect_gait_events(key_point_history, frame_rate)

                # Calculate gait metrics
                # Pass ongoing cycles to the calculation function to update them
                gait_metrics = calculate_gait_metrics(key_point_history, detected_events, gait_cycles_left, gait_cycles_right, pixel_to_meter_ratio)

                # --- Output or Display Metrics ---
                # You can print the metrics, display them on the frame, or save them to a file.
                print(f"Frame: {frame_count}, Timestamp: {current_timestamp_us / 1000000.0:.2f}s, Metrics: {gait_metrics}")

                # Optional: Draw landmarks on the frame
                mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                          mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
                                          mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2))

            # Display the frame
            cv2.imshow('MediaPipe Pose and Gait Analysis', image)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"\nProcessing finished.")
        print(f"Processed {frame_count} frames in {elapsed_time:.2f} seconds.")
        if elapsed_time > 0:
            print(f"Average FPS: {frame_count / elapsed_time:.2f}")


    cap.release()
    cv2.destroyAllWindows()

In [110]:
from google.colab import files
uploaded = files.upload()
video_path = "output_video_test_high_.mp4"
run_gait_analysis_pipeline(video_path)

Saving output_video_test_high_.mp4 to output_video_test_high_ (2).mp4
Error: Could not open video file output_video_test_high_.mp4


In [140]:
video_path_ = "output_video_test_high_ (2).mp4"

In [142]:
def run_gait_analysis_pipeline(video_path_):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open video: {video_path_}")
        return

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = 0

    left_ankle_y, right_ankle_y = [], []
    left_ankle_x, right_ankle_x = [], []
    left_hip_y, right_hip_y = [], []
    timestamps = []

    pose_detected_frames = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            pose_detected_frames += 1
            landmarks = results.pose_landmarks.landmark

            l_ankle = landmarks[mp_pose.PoseLandmark.LEFT_ANKLE]
            r_ankle = landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE]
            l_hip = landmarks[mp_pose.PoseLandmark.LEFT_HIP]
            r_hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP]

            left_ankle_y.append(l_ankle.y)
            right_ankle_y.append(r_ankle.y)
            left_ankle_x.append(l_ankle.x)
            right_ankle_x.append(r_ankle.x)
            left_hip_y.append(l_hip.y)
            right_hip_y.append(r_hip.y)
            timestamps.append(frame_count / fps)

        frame_count += 1

    cap.release()

    print(f"Total frames processed: {frame_count}")
    print(f"Frames with pose detected: {pose_detected_frames}")

    compute_gait_metrics(
        left_ankle_y, right_ankle_y, left_ankle_x, right_ankle_x, timestamps, fps
    )


In [143]:
from scipy.signal import find_peaks, savgol_filter

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
mp_drawing = mp.solutions.drawing_utils

def run_gait_analysis_pipeline(video_path_):
    cap = cv2.VideoCapture(video_path_)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = 0

    left_ankle_y, right_ankle_y = [], []
    left_ankle_x, right_ankle_x = [], []
    left_hip_y, right_hip_y = [], []
    timestamps = []

    pose_detected_frames = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            pose_detected_frames += 1
            landmarks = results.pose_landmarks.landmark

            l_ankle = landmarks[mp_pose.PoseLandmark.LEFT_ANKLE]
            r_ankle = landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE]
            l_hip = landmarks[mp_pose.PoseLandmark.LEFT_HIP]
            r_hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP]

            left_ankle_y.append(l_ankle.y)
            right_ankle_y.append(r_ankle.y)
            left_ankle_x.append(l_ankle.x)
            right_ankle_x.append(r_ankle.x)
            left_hip_y.append(l_hip.y)
            right_hip_y.append(r_hip.y)
            timestamps.append(frame_count / fps)

        frame_count += 1

    cap.release()

    print(f"Total frames processed: {frame_count}")
    print(f"Frames with pose detected: {pose_detected_frames}")

    compute_gait_metrics(
        left_ankle_y, right_ankle_y, left_ankle_x, right_ankle_x, timestamps, fps
    )
    cap = cv2.VideoCapture(video_path_)
    if not cap.isOpened():
        print(f" Failed to open video: {video_path_}")
        return
    else:
        print(f"Successfully opened video: {video_path_}")
    video_path_ = "/content/output_video_test_high_ (2).mp4"
    result = run_gait_analysis_pipeline(video_path)
    print(result)

def compute_gait_metrics(lay, ray, lax, rax, timestamps, fps):
    if not lay or not ray or not lax or not rax or not timestamps:
        print("Insufficient data for gait metrics.")
        return

    print(f"Data points available: {len(lay)} frames")

    signal = -np.array(lay)
    min_peak_distance = max(1, int(fps * 0.3))

    peaks, _ = find_peaks(signal, distance=min_peak_distance)
    print(f"Detected {len(peaks)} gait peaks")

    if len(peaks) < 2:
        print("Not enough peaks for cycle computation.")
        return

    cycle_times = np.diff([timestamps[p] for p in peaks])
    avg_cycle_time = np.mean(cycle_times) if len(cycle_times) > 1 else None

    step_lengths = [abs(lax[i] - rax[i]) for i in peaks if i < len(lax) and i < len(rax)]
    avg_step_length = np.mean(step_lengths) if step_lengths else None

    stride_lengths = [
        abs(lax[peaks[i]] - lax[peaks[i - 1]])
        for i in range(1, len(peaks))
        if peaks[i] < len(lax) and peaks[i - 1] < len(lax)
    ]
    avg_stride_length = np.mean(stride_lengths) if stride_lengths else None

    gait_speed = (
        avg_stride_length / avg_cycle_time
        if avg_stride_length is not None and avg_cycle_time
        else None
    )

    stance_l = estimate_stance_times(lay, timestamps)
    stance_r = estimate_stance_times(ray, timestamps)

    avg_stance_l = np.mean([end - start for start, end in stance_l]) if stance_l else None
    avg_stance_r = np.mean([end - start for start, end in stance_r]) if stance_r else None

    double_supports = 0
    for ls in stance_l:
        for rs in stance_r:
            overlap = max(0, min(ls[1], rs[1]) - max(ls[0], rs[0]))
            if overlap > 0:
                double_supports += overlap
    avg_double_support = double_supports / min(len(stance_l), len(stance_r)) if stance_l and stance_r else None

    print("\n=== Gait Metrics ===")
    print(f"Average Cycle Time:       {avg_cycle_time:.2f} s" if avg_cycle_time is not None else "Average Cycle Time:       N/A")
    print(f"Average Step Length:      {avg_step_length:.3f} units" if avg_step_length is not None else "Average Step Length:      N/A")
    print(f"Average Stride Length:    {avg_stride_length:.3f} units" if avg_stride_length is not None else "Average Stride Length:    N/A")
    print(f"Gait Speed:               {gait_speed:.3f} units/s" if gait_speed is not None else "Gait Speed:               N/A")
    print(f"Average Stance Time (L):  {avg_stance_l:.2f} s" if avg_stance_l is not None else "Average Stance Time (L):  N/A")
    print(f"Average Stance Time (R):  {avg_stance_r:.2f} s" if avg_stance_r is not None else "Average Stance Time (R):  N/A")
    print(f"Average Double Support:   {avg_double_support:.2f} s" if avg_double_support is not None else "Average Double Support:   N/A")

def estimate_stance_times(y_coords, timestamps, vel_thresh=0.015, min_stance_dur=0.05):
    y = np.array(y_coords)
    if len(y) < 9:
        return []

    # Apply Savitzky-Golay smoothing to remove noise
    y_smooth = savgol_filter(y, window_length=9, polyorder=2)
    timestamps = np.array(timestamps)

    dt = np.diff(timestamps)
    dy = np.diff(y_smooth)
    velocity = np.abs(dy / dt)

    stance_periods = []
    in_stance = False
    start_time = None

    for i, v in enumerate(velocity):
        if v < vel_thresh:
            if not in_stance:
                in_stance = True
                start_time = timestamps[i]
        else:
            if in_stance:
                end_time = timestamps[i]
                duration = end_time - start_time
                if duration >= min_stance_dur:
                    stance_periods.append((start_time, end_time))
                in_stance = False

    return stance_periods

run_gait_analysis_pipeline(video_path_)

Total frames processed: 527
Frames with pose detected: 300
Data points available: 300 frames
Detected 11 gait peaks

=== Gait Metrics ===
Average Cycle Time:       0.45 s
Average Step Length:      0.033 units
Average Stride Length:    0.048 units
Gait Speed:               0.106 units/s
Average Stance Time (L):  0.40 s
Average Stance Time (R):  0.38 s
Average Double Support:   0.33 s
Successfully opened video: output_video_test_high_ (2).mp4
Total frames processed: 0
Frames with pose detected: 0
Insufficient data for gait metrics.
 Failed to open video: /content/output_video_test_high_(2).mp4
None


In [144]:
import cv2
import mediapipe as mp
import numpy as np

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

# Define connections for stick figure
POSE_CONNECTIONS = [
    (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.LEFT_KNEE),
    (mp_pose.PoseLandmark.LEFT_KNEE, mp_pose.PoseLandmark.LEFT_ANKLE),
    (mp_pose.PoseLandmark.RIGHT_HIP, mp_pose.PoseLandmark.RIGHT_KNEE),
    (mp_pose.PoseLandmark.RIGHT_KNEE, mp_pose.PoseLandmark.RIGHT_ANKLE),
    (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.RIGHT_HIP),
    (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.RIGHT_SHOULDER),
    (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_HIP),
    (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_HIP),
    (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_ELBOW),
    (mp_pose.PoseLandmark.LEFT_ELBOW, mp_pose.PoseLandmark.LEFT_WRIST),
    (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_ELBOW),
    (mp_pose.PoseLandmark.RIGHT_ELBOW, mp_pose.PoseLandmark.RIGHT_WRIST),
    (mp_pose.PoseLandmark.NOSE, mp_pose.PoseLandmark.LEFT_EYE),
    (mp_pose.PoseLandmark.NOSE, mp_pose.PoseLandmark.RIGHT_EYE),
]

def draw_stick_figure(image, landmarks, connections, visibility_th=0.5):
    h, w, _ = image.shape
    for connection in connections:
        start_idx, end_idx = connection
        start = landmarks[start_idx.value]
        end = landmarks[end_idx.value]
        if start.visibility > visibility_th and end.visibility > visibility_th:
            x1, y1 = int(start.x * w), int(start.y * h)
            x2, y2 = int(end.x * w), int(end.y * h)
            cv2.line(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
    return image

def visualize_gait(video_path, output_path='gait_output.mp4'):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Failed to open video: {video_path}")
        return

    fps = cap.get(cv2.CAP_PROP_FPS)
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    frame_count = 0
    pose_detected_frames = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            pose_detected_frames += 1
            frame = draw_stick_figure(frame, results.pose_landmarks.landmark, POSE_CONNECTIONS)

        out.write(frame)
        frame_count += 1

    cap.release()
    out.release()
    print(f"Total frames processed: {frame_count}")
    print(f"Frames with pose detected: {pose_detected_frames}")
    print(f"Output video saved to: {output_path}")


In [148]:
import cv2
import mediapipe as mp
import numpy as np
import time
from collections import deque
from scipy.signal import find_peaks, savgol_filter
import os
from google.colab import files # Import the files module for downloading

# --- Initialization ---
# Initialize MediaPipe Pose model
# static_image_mode=False for video processing
# model_complexity can be 0, 1, or 2 (higher is more accurate but slower)
# min_detection_confidence and min_tracking_confidence are thresholds for detection and tracking
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
mp_drawing = mp.solutions.drawing_utils

# Define the indices for relevant pose landmarks based on MediaPipe's model.
# These are standard indices for the 33 landmarks provided by MediaPipe Pose.
LEFT_HIP = 23
RIGHT_HIP = 24
LEFT_ANKLE = 27
RIGHT_ANKLE = 28
LEFT_HEEL = 29
RIGHT_HEEL = 30
LEFT_FOOT_INDEX = 31
RIGHT_FOOT_INDEX = 32

# Define connections for stick figure visualization
POSE_CONNECTIONS = [
    (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.LEFT_KNEE),
    (mp_pose.PoseLandmark.LEFT_KNEE, mp_pose.PoseLandmark.LEFT_ANKLE),
    (mp_pose.PoseLandmark.RIGHT_HIP, mp_pose.PoseLandmark.RIGHT_KNEE),
    (mp_pose.PoseLandmark.RIGHT_KNEE, mp_pose.PoseLandmark.RIGHT_ANKLE),
    (mp_pose.PoseLandmark.LEFT_HIP, mp_pose.PoseLandmark.RIGHT_HIP),
    (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.RIGHT_SHOULDER),
    (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_HIP),
    (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_HIP),
    (mp_pose.PoseLandmark.LEFT_SHOULDER, mp_pose.PoseLandmark.LEFT_ELBOW),
    (mp_pose.PoseLandmark.LEFT_ELBOW, mp_pose.PoseLandmark.LEFT_WRIST),
    (mp_pose.PoseLandmark.RIGHT_SHOULDER, mp_pose.PoseLandmark.RIGHT_ELBOW),
    (mp_pose.PoseLandmark.RIGHT_ELBOW, mp_pose.PoseLandmark.RIGHT_WRIST),
    (mp_pose.PoseLandmark.NOSE, mp_pose.PoseLandmark.LEFT_EYE),
    (mp_pose.PoseLandmark.NOSE, mp_pose.PoseLandmark.RIGHT_EYE),
]

# --- Data Structures for Gait Analysis ---
# Class to store gait event information
class GaitEvent:
    def __init__(self, event_type, foot, timestamp, position):
        self.event_type = event_type # 'Heel Strike' or 'Toe Off'
        self.foot = foot # 'Left' or 'Right'
        self.timestamp = timestamp # in seconds
        self.position = position # Landmark position (x, y, z)

# Class to store gait cycle information
class GaitCycle:
    def __init__(self, foot, heel_strike_event, toe_off_event=None):
        self.foot = foot
        self.heel_strike = heel_strike_event
        self.toe_off = toe_off_event
        self.cycle_time = None
        self.stride_length = None # Placeholder, requires calibration
        self.stance_time = None
        self.swing_time = None
        self.double_support_times = [] # List of durations

    def complete_cycle(self, next_heel_strike_time):
        """Calculates cycle time when the next heel strike occurs."""
        if self.heel_strike and next_heel_strike_time is not None:
            self.cycle_time = next_heel_strike_time - self.heel_strike.timestamp

    def complete_stance_swing(self, toe_off_event):
        """Calculates stance time once toe off is detected."""
        self.toe_off = toe_off_event
        if self.heel_strike and self.toe_off:
            self.stance_time = self.toe_off.timestamp - self.heel_strike.timestamp
            # Swing time is calculated when the next heel strike occurs (part of the next cycle's calculation)
            pass

# --- Helper Functions ---
def calculate_distance(p1, p2):
    """Calculates the Euclidean distance between two points (x, y, z)."""
    # Ensure points have x, y, z attributes
    if not hasattr(p1, 'x') or not hasattr(p1, 'y') or not hasattr(p1, 'z') or \
       not hasattr(p2, 'x') or not hasattr(p2, 'y') or not hasattr(p2, 'z'):
        print("Warning: Points do not have valid coordinates for distance calculation.")
        return 0.0
    return np.sqrt((p1.x - p2.x)**2 + (p1.y - p2.y)**2 + (p1.z - p2.z)**2)

def detect_gait_events(key_point_history, frame_rate):
    """
    Detects gait events (Heel Strike and Toe Off) based on key point movement.
    This is a simplified example. Robust event detection requires careful algorithm design.

    Args:
        key_point_history: A deque of (timestamp, landmarks) tuples.
        frame_rate: The frame rate of the video.

    Returns:
        A list of detected GaitEvent objects in the current frame.
    """
    detected_events = []
    if len(key_point_history) < 2:
        return detected_events # Need at least two frames to detect movement

    current_time, current_landmarks = key_point_history[-1]
    previous_time, previous_landmarks = key_point_history[-2]

    # Thresholds for detecting significant vertical movement
    # These thresholds may need tuning based on video resolution and subject's movement
    heel_strike_threshold_y = 0.005
    toe_off_threshold_y = 0.005

    # Ensure landmarks are detected for relevant points
    if current_landmarks is None or previous_landmarks is None:
        return detected_events

    # Check Left Foot
    if len(current_landmarks) > max(LEFT_HEEL, LEFT_HIP) and len(previous_landmarks) > max(LEFT_HEEL, LEFT_HIP):
        current_left_heel_y = current_landmarks[LEFT_HEEL].y
        previous_left_heel_y = previous_landmarks[LEFT_HEEL].y
        left_hip_y = current_landmarks[LEFT_HIP].y

        # Simple Heel Strike Detection: Heel moves down significantly and is below hip (relative y-coordinate)
        # This is a basic heuristic and can be improved.
        if current_left_heel_y > previous_left_heel_y + heel_strike_threshold_y and current_left_heel_y > left_hip_y:
             # Add more robust checks here (e.g., velocity, relative position to ground)
             detected_events.append(GaitEvent('Heel Strike', 'Left', current_time / 1000000.0, current_landmarks[LEFT_HEEL]))

        # Simple Toe Off Detection: Foot index moves up significantly and is above hip (relative y-coordinate)
        # This is a basic heuristic and can be improved.
        if len(current_landmarks) > max(LEFT_FOOT_INDEX, LEFT_HIP) and len(previous_landmarks) > max(LEFT_FOOT_INDEX, LEFT_HIP):
            current_left_foot_index_y = current_landmarks[LEFT_FOOT_INDEX].y
            previous_left_foot_index_y = previous_landmarks[LEFT_FOOT_INDEX].y
            left_hip_y = current_landmarks[LEFT_HIP].y

            if current_left_foot_index_y < previous_left_foot_index_y - toe_off_threshold_y and current_left_foot_index_y < left_hip_y:
                 # Add more robust checks here
                 detected_events.append(GaitEvent('Toe Off', 'Left', current_time / 1000000.0, current_landmarks[LEFT_FOOT_INDEX]))

    # Check Right Foot (similar logic as Left Foot)
    if len(current_landmarks) > max(RIGHT_HEEL, RIGHT_HIP) and len(previous_landmarks) > max(RIGHT_HEEL, RIGHT_HIP):
        current_right_heel_y = current_landmarks[RIGHT_HEEL].y
        previous_right_heel_y = previous_landmarks[RIGHT_HEEL].y
        right_hip_y = current_landmarks[RIGHT_HIP].y

        # Simple Heel Strike Detection
        if current_right_heel_y > previous_right_heel_y + heel_strike_threshold_y and current_right_heel_y > right_hip_y:
             detected_events.append(GaitEvent('Heel Strike', 'Right', current_time / 1000000.0, current_landmarks[RIGHT_HEEL]))

        # Simple Toe Off Detection
        if len(current_landmarks) > max(RIGHT_FOOT_INDEX, RIGHT_HIP) and len(previous_landmarks) > max(RIGHT_FOOT_INDEX, RIGHT_HIP):
            current_right_foot_index_y = current_landmarks[RIGHT_FOOT_INDEX].y
            previous_right_foot_index_y = previous_landmarks[RIGHT_FOOT_INDEX].y
            right_hip_y = current_landmarks[RIGHT_HIP].y

            if current_right_foot_index_y < previous_right_foot_index_y - toe_off_threshold_y and current_right_foot_index_y < right_hip_y:
                 detected_events.append(GaitEvent('Toe Off', 'Right', current_time / 1000000.0, current_landmarks[RIGHT_FOOT_INDEX]))

    return detected_events

def calculate_gait_metrics(key_point_history, detected_events, gait_cycles_left, gait_cycles_right, pixel_to_meter_ratio):
    """
    Calculates gait metrics based on detected events and landmark history.

    Args:
        key_point_history: A deque of (timestamp, landmarks) tuples.
        detected_events: A list of GaitEvent objects detected in the current frame.
        gait_cycles_left: List of completed left gait cycles.
        gait_cycles_right: List of completed right gait cycles.
        pixel_to_meter_ratio: Conversion factor from pixels to meters (requires calibration).

    Returns:
        A dictionary containing calculated gait metrics for the current frame/state.
    """
    metrics = {}
    current_time = key_point_history[-1][0] / 1000000.0 if key_point_history else 0
    current_landmarks = key_point_history[-1][1] if key_point_history else None

    if current_landmarks is None:
        return metrics # Return empty metrics if no landmarks are detected

    # Update ongoing gait cycles and detect new ones
    for event in detected_events:
        if event.event_type == 'Heel Strike':
            if event.foot == 'Left':
                # Complete the previous left cycle if one was ongoing and had a toe off
                if gait_cycles_left and gait_cycles_left[-1].toe_off:
                     gait_cycles_left[-1].swing_time = event.timestamp - gait_cycles_left[-1].toe_off.timestamp
                     # Complete the cycle time for the second to last cycle (Heel Strike to next Heel Strike)
                     if len(gait_cycles_left) > 1:
                         gait_cycles_left[-2].complete_cycle(event.timestamp)

                # Start a new left gait cycle
                gait_cycles_left.append(GaitCycle('Left', event))

            elif event.foot == 'Right':
                 # Complete the previous right cycle if one was ongoing and had a toe off
                 if gait_cycles_right and gait_cycles_right[-1].toe_off:
                      gait_cycles_right[-1].swing_time = event.timestamp - gait_cycles_right[-1].toe_off.timestamp
                      # Complete the cycle time for the second to last cycle
                      if len(gait_cycles_right) > 1:
                          gait_cycles_right[-2].complete_cycle(event.timestamp)

                 # Start a new right gait cycle
                 gait_cycles_right.append(GaitCycle('Right', event))

        elif event.event_type == 'Toe Off':
            if event.foot == 'Left':
                # Complete the stance phase of the current left cycle if it's ongoing
                if gait_cycles_left and gait_cycles_left[-1].heel_strike and gait_cycles_left[-1].toe_off is None:
                    gait_cycles_left[-1].complete_stance_swing(event)
            elif event.foot == 'Right':
                 # Complete the stance phase of the current right cycle if it's ongoing
                 if gait_cycles_right and gait_cycles_right[-1].heel_strike and gait_cycles_right[-1].toe_off is None:
                     gait_cycles_right[-1].complete_stance_swing(event)

    # Calculate metrics from completed cycles (you might want to average over several cycles)
    # Example: Calculate average cycle time from the last few completed cycles
    num_cycles_to_average = 3
    left_cycle_times = [c.cycle_time for c in gait_cycles_left[-num_cycles_to_average:] if c.cycle_time is not None]
    right_cycle_times = [c.cycle_time for c in gait_cycles_right[-num_cycles_to_average:] if c.cycle_time is not None]

    if left_cycle_times:
        metrics['average_left_cycle_time'] = np.mean(left_cycle_times)
    if right_cycle_times:
        metrics['average_right_cycle_time'] = np.mean(right_cycle_times)

    # Calculate average stance and swing times from completed cycles
    left_stance_times = [c.stance_time for c in gait_cycles_left[-num_cycles_to_average:] if c.stance_time is not None]
    right_stance_times = [c.stance_time for c in gait_cycles_right[-num_cycles_to_average:] if c.stance_time is not None]
    left_swing_times = [c.swing_time for c in gait_cycles_left[-num_cycles_to_average:] if c.swing_time is not None]
    right_swing_times = [c.swing_time for c in gait_cycles_right[-num_cycles_to_average:] if c.swing_time is not None]


    if left_stance_times:
        metrics['average_left_stance_time'] = np.mean(left_stance_times)
    if right_stance_times:
        metrics['average_right_stance_time'] = np.mean(right_stance_times)
    if left_swing_times:
        metrics['average_left_swing_time'] = np.mean(left_swing_times)
    if right_swing_times:
        metrics['average_right_swing_time'] = np.mean(right_swing_times)

    # Calculate step and stride length estimates using the pixel_to_meter_ratio
    # IMPORTANT: The pixel_to_meter_ratio needs to be accurately calibrated for meaningful results.
    # See the comments in the Execution section for how to approach calibration.
    if detected_events:
         for event in detected_events:
             if event.event_type == 'Heel Strike':
                 if event.foot == 'Left' and current_landmarks is not None and len(current_landmarks) > RIGHT_ANKLE and len(current_landmarks) > LEFT_HEEL:
                     # Estimate left step length (distance between left heel and right ankle at left heel strike)
                     metrics['current_left_step_length_m'] = calculate_distance(current_landmarks[LEFT_HEEL], current_landmarks[RIGHT_ANKLE]) * pixel_to_meter_ratio
                 elif event.foot == 'Right' and current_landmarks is not None and len(current_landmarks) > LEFT_ANKLE and len(current_landmarks) > RIGHT_HEEL:
                     # Estimate right step length (distance between right heel and left ankle at right heel strike)
                     metrics['current_right_step_length_m'] = calculate_distance(current_landmarks[RIGHT_HEEL], current_landmarks[LEFT_ANKLE]) * pixel_to_meter_ratio

    # You can add more metric calculations here (e.g., cadence, double support time)

    return metrics

# --- Visualization Function ---
def draw_stick_figure(image, landmarks, connections, visibility_th=0.5):
    """
    Draws a stick figure on the image based on provided landmarks and connections.

    Args:
        image: The input image frame (NumPy array).
        landmarks: A list of pose landmarks from MediaPipe.
        connections: A list of tuples defining connections between landmarks.
        visibility_th: Minimum visibility score for a landmark to be drawn.

    Returns:
        The image frame with the stick figure drawn.
    """
    h, w, _ = image.shape
    # Ensure landmarks are not None and have the expected structure
    if landmarks is None:
        return image
    for connection in connections:
        start_idx, end_idx = connection
        # Use .value to get the integer index from the enum
        start = landmarks[start_idx.value]
        end = landmarks[end_idx.value]
        if start.visibility > visibility_th and end.visibility > visibility_th:
            x1, y1 = int(start.x * w), int(start.y * h)
            x2, y2 = int(end.x * w), int(end.y * h)
            cv2.line(image, (x1, y1), (x2, y2), (0, 255, 0), 2) # Draw in green
    return image

def visualize_gait(video_path_, output_path='gait_output.mp4'):
    """
    Processes a video to draw a stick figure visualization of the pose.

    Args:
        video_path_: Path to the input video file.
        output_path: Path to save the output video with visualization.
    """
    print(f"Attempting to open video for visualization: {video_path_}") # Added print statement for debugging
    cap = cv2.VideoCapture(video_path_)

    # --- Added Error Handling for Video Capture ---
    if not cap.isOpened():
        print(f"ERROR: Failed to open video file for visualization: {video_path_}. Please check the file path and ensure the video is not corrupted or in an unsupported format.") # More specific error message
        return False # Indicate failure

    print(f"Successfully opened video for visualization: {video_path_}") # Added success message

    fps = cap.get(cv2.CAP_PROP_FPS)
    width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Codec for mp4
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # --- Added Error Handling for VideoWriter ---
    if not out.isOpened():
        print(f"ERROR: Failed to initialize VideoWriter for output path: {output_path}. Ensure the codec ('mp4v') is supported and you have write permissions.")
        cap.release() # Release the video capture object
        return False # Indicate failure

    frame_count = 0
    pose_detected_frames = 0

    # Use a separate pose instance for visualization if needed, or reuse the main one
    # with mp_pose.Pose(static_image_mode=False, model_complexity=1, enable_segmentation=False, min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose_viz:

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb) # Reuse the main pose instance

        if results.pose_landmarks:
            pose_detected_frames += 1
            # Draw landmarks using mp_drawing.draw_landmarks (MediaPipe's default visualization)
            mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2), # Orange dots
                                      mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)) # Pink connections
            # Alternatively, use your custom draw_stick_figure function if preferred
            # frame = draw_stick_figure(frame, results.pose_landmarks.landmark, POSE_CONNECTIONS)


        out.write(frame) # Write the frame with visualization to the output video
        frame_count += 1

    cap.release()
    out.release()
    print(f"Visualization finished.")
    print(f"Total frames processed for visualization: {frame_count}")
    print(f"Frames with pose detected for visualization: {pose_detected_frames}")
    print(f"Output visualization video saved to: {output_path}")

    return True # Indicate success


# --- Main Pipeline Execution ---
def run_gait_analysis_pipeline(video_path_):
    """
    Runs the MediaPipe pose estimation and gait analysis pipeline on a video.

    Args:
        video_path_: Path to the input video file.
    """
    print(f"Attempting to open video for analysis: {video_path_}") # Added print statement for debugging
    cap = cv2.VideoCapture(video_path_)

    # --- Added Error Handling for Video Capture ---
    if not cap.isOpened():
        print(f"ERROR: Could not open video file for analysis: {video_path_}. Please check the file path and ensure the video is not corrupted or in an unsupported format.") # More specific error message
        return False # Indicate failure

    print(f"Successfully opened video for analysis: {video_path_}") # Added success message


    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    if frame_rate == 0:
        print("Warning: Could not get frame rate from video. Assuming 30 FPS.")
        frame_rate = 30.0

    # --- Pixel to Meter Ratio Calibration ---
    # To get gait metrics in meters or centimeters, you need to calibrate the pixel_to_meter_ratio.
    # This involves measuring a known real-world distance in the video frame and dividing it by the corresponding pixel distance.
    # Replace the placeholder value below with your calculated ratio.
    # Example: If a 1-meter ruler in the video is 500 pixels long, the ratio is 1 meter / 500 pixels = 0.002 meters/pixel.
    # If you want centimeters, the ratio would be 100 cm / 500 pixels = 0.2 cm/pixel.
    pixel_to_meter_ratio = 0.001 # Placeholder: Replace with your calibrated value!

    # Initialize data structures for gait analysis
    key_point_history = deque(maxlen=int(frame_rate * 5)) # Store history for 5 seconds for event detection
    gait_cycles_left = [] # To store completed left gait cycles
    gait_cycles_right = [] # To store completed right gait cycles

    frame_count = 0
    start_time = time.time()

    print(f"Starting gait analysis for video: {video_path_}")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break # End of video

        frame_count += 1
        # Convert the BGR image to RGB for MediaPipe processing
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Process the image and find pose landmarks.
        results = pose.process(image)

        # Get the current timestamp in microseconds
        current_timestamp_us = int(cap.get(cv2.CAP_PROP_POS_MSEC) * 1000)

        if results.pose_landmarks:
            landmarks = results.pose_landmarks.landmark
            # Store the full set of landmarks and timestamp in history
            key_point_history.append((current_timestamp_us, landmarks))

            # Detect gait events in the current frame using the history
            detected_events = detect_gait_events(key_point_history, frame_rate)

            # Calculate gait metrics based on the current state and detected events
            # The calculated step/stride lengths will be in meters if pixel_to_meter_ratio is calibrated correctly in meters/pixel.
            gait_metrics = calculate_gait_metrics(key_point_history, detected_events, gait_cycles_left, gait_cycles_right, pixel_to_meter_ratio)

            # --- Output or Display Metrics ---
            # Print the calculated metrics for the current frame/state
            if gait_metrics:
                print(f"Frame: {frame_count}, Timestamp: {current_timestamp_us / 1000000.0:.2f}s, Metrics: {gait_metrics}")

        # In a Colab environment, cv2.imshow is not typically used for displaying video frames directly.
        # The visualization is saved to a file.
        # cv2.imshow('MediaPipe Pose and Gait Analysis', image)

        # The waitKey and breaking the loop with 'q' is for local execution with GUI windows.
        # if cv2.waitKey(1) & 0xFF == ord('q'):
        #     break

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"\nGait analysis finished.")
    print(f"Processed {frame_count} frames in {elapsed_time:.2f} seconds.")
    if elapsed_time > 0:
        print(f"Average FPS: {frame_count / elapsed_time:.2f}")

    cap.release()
    # cv2.destroyAllWindows() # Not needed in Colab

    return True # Indicate success


# --- Execution ---
# Ensure you have uploaded the video file to your Colab environment
# and the path below correctly points to the uploaded file.
# Use the files.upload() function in a separate cell to upload the video.
# Example:
# from google.colab import files
# uploaded = files.upload()
# video_path_ = list(uploaded.keys())[0] # Get the name of the uploaded file

# Assuming the video is uploaded and named 'output_video_test_high_ (2).mp4' in the /content/ directory
video_path_ = "/content/output_video_test_high_ (2).mp4"

# Define the output path for the visualization video
output_video_path = 'gait_stick_figure_output.mp4'

# Check if the video file exists at the specified path
if not os.path.exists(video_path_):
    print(f"Error: Video file not found at {video_path_}. Please upload the video and ensure the path is correct.")
else:
    print(f"Video file found at {video_path_}. Proceeding with analysis.")
    # Run the gait analysis pipeline to calculate and print metrics
    # Remember to calibrate the pixel_to_meter_ratio in the run_gait_analysis_pipeline function
    analysis_successful = run_gait_analysis_pipeline(video_path_)

    # Run the visualization pipeline to generate the stick figure video
    # The output video will be saved as 'gait_stick_figure_output.mp4' in the /content/ directory
    if analysis_successful: # Only attempt visualization if analysis was successful
        visualization_successful = visualize_gait(video_path_, output_path=output_video_path)

        if visualization_successful:
            print("\nGait analysis and visualization complete.")
            print(f"Attempting to download the output video '{output_video_path}'...")
            # --- Added automatic download of the output video ---
            try:
                files.download(output_video_path)
                print(f"Download initiated for '{output_video_path}'. Check your browser's downloads.")
            except Exception as e:
                print(f"Error initiating download for '{output_video_path}': {e}")
                print(f"You can also try downloading the file manually from the file browser in Colab (look for '{output_video_path}' in the /content/ directory).")
        else:
             print("\nGait analysis complete, but visualization failed.")
    else:
        print("\nGait analysis failed.")


Video file found at /content/output_video_test_high_ (2).mp4. Proceeding with analysis.
Attempting to open video for analysis: /content/output_video_test_high_ (2).mp4
Successfully opened video for analysis: /content/output_video_test_high_ (2).mp4
Starting gait analysis for video: /content/output_video_test_high_ (2).mp4
Frame: 139, Timestamp: 2.30s, Metrics: {'current_left_step_length_m': 3.466096569658732e-05, 'current_right_step_length_m': 3.581988546268648e-05}
Frame: 143, Timestamp: 2.37s, Metrics: {'current_left_step_length_m': 2.292943345945438e-05, 'current_right_step_length_m': 2.219481022785552e-05}
Frame: 239, Timestamp: 3.97s, Metrics: {'average_right_stance_time': 1.599733, 'current_left_step_length_m': 0.00011429818836361981}
Frame: 240, Timestamp: 3.98s, Metrics: {'average_left_stance_time': 0.016664000000000012, 'average_right_stance_time': 1.599733}
Frame: 243, Timestamp: 4.03s, Metrics: {'average_left_cycle_time': 1.6663890000000001, 'average_right_cycle_time': 1.733

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Download initiated for 'gait_stick_figure_output.mp4'. Check your browser's downloads.


In [145]:
video_path = "/content/output_video_test_high_ (2).mp4"
visualize_gait(video_path_)

Total frames processed: 527
Frames with pose detected: 300
Output video saved to: gait_output.mp4
