In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
from pathlib import Path

In [None]:
# Short aliases for the MediaPipe pose solution and its landmark enum, used by the cells below.
mp_pose = mp.solutions.pose
PoseLandmark = mp_pose.PoseLandmark

# Show NumPy floats with four decimals and without scientific notation.
np.set_printoptions(suppress=True, precision=4)

In [None]:
def compute_angle(a, b, c):
    """Return the angle at vertex ``b`` formed by the points ``a``-``b``-``c``, in degrees.

    Parameters
    ----------
    a, b, c : array-like
        Point coordinates of equal length (e.g. ``[x, y]`` pairs).

    Returns
    -------
    float
        Angle between the rays ``b -> a`` and ``b -> c``, in ``[0, 180]``.
        If either ray has zero length the angle is undefined; ``90.0`` is
        returned, which is what the earlier epsilon-padded formula produced
        for that case.
    """
    a, b, c = np.asarray(a), np.asarray(b), np.asarray(c)
    ba = a - b
    bc = c - b

    # Divide by the true norm product. Padding the denominator with a fixed
    # epsilon shrinks |cos| for every input, and because arccos is very steep
    # near +/-1 that turns a straight joint into ~179 degrees when the
    # coordinates are small.
    denom = np.linalg.norm(ba) * np.linalg.norm(bc)
    if denom == 0:
        return 90.0

    cos_angle = np.dot(ba, bc) / denom
    # Clip guards against round-off pushing the cosine just outside [-1, 1].
    return float(np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0))))


In [None]:
def detect_heel_strike(prev_y, current_y, threshold=0.005):
    """Tell whether the heel's y value barely changed between two samples.

    Parameters
    ----------
    prev_y : float or None
        Previous y value; ``None`` means there is no earlier sample.
    current_y : float
        Current y value.
    threshold : float, optional
        Exclusive bound on the absolute change that still counts as "still".

    Returns
    -------
    bool
        ``True`` when ``|current_y - prev_y| < threshold``; ``False`` otherwise
        or when ``prev_y`` is ``None``.
    """
    if prev_y is not None:
        # Same test as -threshold < delta < threshold.
        return abs(current_y - prev_y) < threshold
    return False

In [None]:
def compute_stride_length(prev_pos, curr_pos):
    """Euclidean distance between two positions.

    Parameters
    ----------
    prev_pos : array-like or None
        Starting position; ``None`` means no starting position is known.
    curr_pos : array-like
        Ending position, same length as ``prev_pos``.

    Returns
    -------
    float
        ``0.0`` when ``prev_pos`` is ``None``, otherwise the norm of
        ``curr_pos - prev_pos``.
    """
    if prev_pos is not None:
        displacement = np.array(curr_pos) - np.array(prev_pos)
        return float(np.linalg.norm(displacement))
    return 0.0

In [None]:
def extract_frame_landmarks(landmark_list):
    """Flatten one frame of pose landmarks into a dict and add four joint angles.

    Parameters
    ----------
    landmark_list : sequence
        Indexable collection of landmark objects exposing ``x``, ``y``, ``z``
        and ``visibility``; position ``i`` is read for the ``i``-th member of
        ``PoseLandmark``.

    Returns
    -------
    dict
        ``<NAME>_x``, ``<NAME>_y``, ``<NAME>_z`` and ``<NAME>_visibility``
        floats for every ``PoseLandmark`` member, followed by
        ``left_knee_angle``, ``right_knee_angle``, ``left_hip_angle`` and
        ``right_hip_angle`` (degrees, computed from the x/y values only).
    """
    d = {}
    for idx, lm_enum in enumerate(PoseLandmark):
        name = lm_enum.name
        lm = landmark_list[idx]
        d[f"{name}_x"] = float(lm.x)
        d[f"{name}_y"] = float(lm.y)
        d[f"{name}_z"] = float(lm.z)
        d[f"{name}_visibility"] = float(lm.visibility)

    def xy(name):
        # 2-D point for a landmark that has already been stored in ``d``.
        return [d[f"{name}_x"], d[f"{name}_y"]]

    # The angles read hip/knee/ankle/shoulder entries, so they can only be
    # computed once the loop above has stored every landmark. Doing this inside
    # the loop raises KeyError on the first iteration.
    joint_angles = (
        ('left_knee_angle', 'LEFT_HIP', 'LEFT_KNEE', 'LEFT_ANKLE'),
        ('right_knee_angle', 'RIGHT_HIP', 'RIGHT_KNEE', 'RIGHT_ANKLE'),
        ('left_hip_angle', 'LEFT_SHOULDER', 'LEFT_HIP', 'LEFT_KNEE'),
        ('right_hip_angle', 'RIGHT_SHOULDER', 'RIGHT_HIP', 'RIGHT_KNEE'),
    )
    for key, first, vertex, last in joint_angles:
        d[key] = compute_angle(xy(first), xy(vertex), xy(last))
    return d



In [None]:
def initialize_video_processing(video_path):
    """Open a video and build the pose estimator used to analyse it.

    Parameters
    ----------
    video_path : str
        Path handed to ``cv2.VideoCapture``.

    Returns
    -------
    tuple
        ``(cap, fps, pose)``: the capture object, the frame rate to use for
        timing, and an ``mp_pose.Pose`` instance. The caller is responsible for
        calling ``cap.release()`` and ``pose.close()``.

    Notes
    -----
    Whether the capture actually opened is not checked here; callers test
    ``cap.isOpened()`` themselves.
    """
    cap = cv2.VideoCapture(video_path)

    # Fall back to 30 fps unless the reported rate is a positive, finite
    # number. The chained comparison is False for 0, negatives, NaN and inf,
    # any of which would corrupt the later division by fps.
    reported_fps = cap.get(cv2.CAP_PROP_FPS)
    fps = reported_fps if 0 < reported_fps < float('inf') else 30.0

    try:
        pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
    except Exception:
        # Do not leave the capture open if the estimator cannot be created.
        cap.release()
        raise
    return cap, fps, pose

# Runs right after the definition above; gives a visible cue in the cell output.
print('initialize_video_processing ready')


In [None]:
def process_video_for_gait(video_path, cycles_per_video=5):
    """Collect per-frame landmark rows for up to ``cycles_per_video`` gait cycles.

    A cycle is opened at one left-heel strike and closed at the next one.
    Frames from the opening strike up to (but not including) the closing strike
    are kept. The strike that closes a cycle does not open the next cycle.

    A strike is the first frame on which ``detect_heel_strike`` reports the left
    heel as still after a frame on which it did not. Using only that rising
    edge matters: a heel that stays still over several consecutive frames would
    otherwise open a cycle on one frame and close it on the very next.

    Parameters
    ----------
    video_path : str
        Path of the video to analyse.
    cycles_per_video : int, optional
        Stop reading once this many cycles have been completed.

    Returns
    -------
    list of dict
        Rows from ``extract_frame_landmarks`` for every frame inside a
        completed cycle, each extended with ``stride_length`` (distance between
        the left-heel x/y at the opening and closing strikes) and
        ``step_time_sec`` (frames between the two strikes divided by fps).
        Frames of a cycle that never completes are discarded.
    """
    cap, fps, pose = initialize_video_processing(video_path)
    frame_number = 0
    gait_count = 0
    recording_cycle = False
    current_cycle = []
    cycle_start_frame = 0
    cycle_start_heel_pos = None
    prev_left_heel_y = None
    prev_heel_still = False
    all_rows = []

    # try/finally so the capture and the estimator are released even when a
    # frame fails to process.
    try:
        while cap.isOpened() and gait_count < cycles_per_video:
            ret, frame = cap.read()
            if not ret:
                break
            frame_number += 1
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(frame_rgb)
            if not results.pose_landmarks:
                # No pose on this frame: skip it and keep the previous heel state.
                continue

            row = extract_frame_landmarks(results.pose_landmarks.landmark)
            left_heel_y = row.get('LEFT_HEEL_y')
            left_heel_pos = [row.get('LEFT_HEEL_x'), left_heel_y]

            # Rising edge of the "heel is still" signal marks a strike.
            heel_still = detect_heel_strike(prev_left_heel_y, left_heel_y)
            is_strike = heel_still and not prev_heel_still
            prev_heel_still = heel_still
            prev_left_heel_y = left_heel_y

            if is_strike:
                if not recording_cycle:
                    recording_cycle = True
                    current_cycle = []
                    cycle_start_frame = frame_number
                    cycle_start_heel_pos = left_heel_pos
                else:
                    recording_cycle = False
                    gait_count += 1
                    stride_length = compute_stride_length(cycle_start_heel_pos, left_heel_pos)
                    step_time_sec = (frame_number - cycle_start_frame) / fps

                    # Stamp every frame of the finished cycle with its summary values.
                    for r in current_cycle:
                        r['stride_length'] = stride_length
                        r['step_time_sec'] = step_time_sec
                    all_rows.extend(current_cycle)

            if recording_cycle:
                current_cycle.append(row)
    finally:
        cap.release()
        pose.close()
    return all_rows

In [None]:
def _collect_video_files(video_paths, allowed_exts):
    """Expand a mix of file and directory paths into a flat list of video files."""
    video_items = []
    for p in video_paths:
        p = Path(p)
        if not p.exists():
            print(f'Warning: path does not exist, skipping: {p}')
            continue
        if p.is_dir():
            # Only directory listings are filtered by extension; sorted for a stable order.
            video_items.extend(
                child for child in sorted(p.iterdir())
                if child.suffix.lower() in allowed_exts and child.is_file()
            )
        elif p.is_file():
            # An explicitly named file is accepted whatever its extension.
            video_items.append(p)
    return video_items


def _save_rows_to_csv(rows, output_csv, output_dir):
    """Write ``rows`` to ``output_dir/output_csv`` (creating the directory) and return the DataFrame."""
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    df = pd.DataFrame(rows)
    out_path = out_dir / output_csv
    df.to_csv(out_path, index=False)
    print(f'Saved {len(df)} rows to {out_path}')
    return df


def extract_gait_features_to_csv(video_paths, output_csv='gait_features.csv', cycles_per_video=5, allowed_exts=None, output_dir='Datasets'):
    """Run gait extraction over videos and save all rows to one CSV.

    Parameters
    ----------
    video_paths : str, Path, iterable of those, or None
        Video files and/or directories. Directories are scanned (non-recursively)
        for files whose extension is in ``allowed_exts``. ``None`` is treated as
        an empty list.
    output_csv : str, optional
        File name of the CSV written inside ``output_dir``.
    cycles_per_video : int, optional
        Forwarded to ``process_video_for_gait``.
    allowed_exts : iterable of str, optional
        Extensions (with leading dot) accepted when scanning directories;
        compared case-insensitively. Defaults to ``.mp4``, ``.avi``, ``.mov``,
        ``.mkv``.
    output_dir : str or Path, optional
        Directory the CSV is written to; created if missing.

    Returns
    -------
    pandas.DataFrame
        One row per recorded frame with an added ``gait_pattern`` column holding
        the name of the video's parent folder. Empty when no videos were found.
        A video whose processing raises is reported and skipped.
    """
    # Lower-case the extensions because file suffixes are lower-cased before
    # the membership test; otherwise '.MP4' could never match.
    allowed = {str(ext).lower() for ext in (allowed_exts or ['.mp4', '.avi', '.mov', '.mkv'])}

    if video_paths is None:
        video_paths = []
    elif isinstance(video_paths, (str, Path)):
        video_paths = [video_paths]

    video_items = _collect_video_files(video_paths, allowed)
    if not video_items:
        print('No video files found in provided paths. Writing empty CSV.')
        return _save_rows_to_csv([], output_csv, output_dir)

    all_rows = []
    for video_path in video_items:
        folder_name = video_path.parent.name or ''
        try:
            rows = process_video_for_gait(str(video_path), cycles_per_video=cycles_per_video)
        except Exception as e:
            # Best effort: one bad video must not abort the whole batch.
            print(f'Error processing {video_path}: {e}')
            continue
        for r in rows:
            r['gait_pattern'] = folder_name
        all_rows.extend(rows)

    return _save_rows_to_csv(all_rows, output_csv, output_dir)

In [None]:

# Smoke test: with no input paths the extractor should still write a CSV and
# return an empty DataFrame.
df_test = extract_gait_features_to_csv([], output_csv='test_empty_restored.csv')
print('Rows:', len(df_test))

# List what is now present in the output directory.
print('Datasets files:', [entry.name for entry in Path('Datasets').glob('*')])
