In [1]:
!pip uninstall -y mediapipe
!pip install mediapipe==0.10.8

[0mCollecting mediapipe==0.10.8
  Downloading mediapipe-0.10.8-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.6 kB)
Collecting protobuf<4,>=3.11 (from mediapipe==0.10.8)
  Downloading protobuf-3.20.3-py2.py3-none-any.whl.metadata (720 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe==0.10.8)
  Downloading sounddevice-0.5.2-py3-none-any.whl.metadata (1.6 kB)
Downloading mediapipe-0.10.8-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (34.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.5/34.5 MB[0m [31m67.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading protobuf-3.20.3-py2.py3-none-any.whl (162 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m162.1/162.1 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.2-py3-none-any.whl (32 kB)
Installing collected packages: protobuf, sounddevice, mediapipe
  Attempting uninstall: protobuf
    Found existing installation: protobuf 5.29.5
  

In [1]:
#NEW ONE
import select

# Patch select to avoid WinError 10038
_ORIGINAL_SELECT = select.select
def _safe_select(*args, **kwargs):
    try:
        return _ORIGINAL_SELECT(*args, **kwargs)
    except OSError as e:
        if e.winerror == 10038:
            return [], [], []
        raise
select.select = _safe_select

import cv2
import math
import numpy as np
import os
import joblib
from sklearn.ensemble import RandomForestRegressor
import mediapipe as mp
from scipy.signal import find_peaks

mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    static_image_mode=False,
    model_complexity=2,
    min_detection_confidence=0.7,
    min_tracking_confidence=0.7
)

POINTS = {
    "shoulder": 12,
    "hip": 24,
    "knee": 26,
    "ankle": 28,
    "toe": 32,
    "left_ankle": 27,
    "left_toe": 31
}

UNIFORM_COLOR = (0, 255, 255)

def extract_keypoints(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Unable to open video file.")
        return np.array([]), [], {}, None, 0

    fps = cap.get(cv2.CAP_PROP_FPS)
    keypoints, frames = [], []
    trajectories = {name: [] for name in POINTS}
    pixel_to_meter = None
    frame_width, frame_height = 0, 0

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            if frame_width == 0:
                frame_height, frame_width = frame.shape[:2]

            frames.append(frame.copy())
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(rgb)

            if results.pose_landmarks:
                lm = results.pose_landmarks.landmark
                kp = np.array([[p.x, p.y] for p in lm])
                keypoints.append(kp)

                if pixel_to_meter is None:
                    hip = lm[mp_pose.PoseLandmark.RIGHT_HIP.value]
                    ankle = lm[mp_pose.PoseLandmark.RIGHT_ANKLE.value]
                    leg_length_px = abs(hip.y - ankle.y) * frame_height

                    if leg_length_px > 10:
                        pixel_to_meter = (0.45 * 1.7) / leg_length_px
                        print(f"Calibrated: 1px = {pixel_to_meter:.6f}m")
                    else:
                        pixel_to_meter = 0.001
                        print("Using default calibration")

                for name, idx in POINTS.items():
                    landmark = lm[idx]
                    if landmark.visibility > 0.5:
                        x = int(landmark.x * frame_width)
                        y = int(landmark.y * frame_height)
                        trajectories[name].append((x, y))
                    else:
                        if trajectories[name]:
                            trajectories[name].append(trajectories[name][-1])
                        else:
                            trajectories[name].append((0, 0))
            else:
                for name in POINTS:
                    if trajectories[name]:
                        trajectories[name].append(trajectories[name][-1])
                    else:
                        trajectories[name].append((0, 0))
    finally:
        cap.release()

    if pixel_to_meter is None:
        print("Warning: Using default calibration")
        pixel_to_meter = 0.001

    return np.array(keypoints), frames, trajectories, pixel_to_meter, fps

def angle(a, b, c):
    ab, bc = b - a, c - b
    cos_angle = np.dot(ab, bc) / (np.linalg.norm(ab) * np.linalg.norm(bc))
    return np.degrees(np.arccos(np.clip(cos_angle, -1.0, 1.0)))

def detect_steps_by_minima(right_ankle_traj, left_ankle_traj, min_prominence=10, min_distance=10):
    arr1 = np.array(right_ankle_traj)
    arr2 = np.array(left_ankle_traj)
    mask = ((arr1 != (0, 0)).all(axis=1)) & ((arr2 != (0, 0)).all(axis=1))
    dists = np.full(len(arr1), np.nan)
    dists[mask] = np.linalg.norm(arr1[mask] - arr2[mask], axis=1)
    peaks, _ = find_peaks(-dists, prominence=min_prominence, distance=min_distance)
    return peaks.tolist(), dists

def calculate_gait_parameters(keypoints, trajectories, pixel_to_meter, fps):
    if keypoints.size == 0 or len(trajectories["ankle"]) < 2:
        return np.zeros(6), [], [], []

    right_ankle_traj = trajectories["ankle"]
    left_ankle_traj = trajectories["left_ankle"]

    step_events, dists = detect_steps_by_minima(right_ankle_traj, left_ankle_traj)

    stride_lengths = []
    for i in range(1, len(step_events)):
        idx1, idx2 = step_events[i-1], step_events[i]
        pt1 = np.array(right_ankle_traj[idx1])
        pt2 = np.array(right_ankle_traj[idx2])
        if (pt1 != 0).all() and (pt2 != 0).all():
            stride_lengths.append(np.linalg.norm(pt2 - pt1) * pixel_to_meter)
    stride_length = np.mean(stride_lengths) if stride_lengths else 0

    # Cadence: steps per minute, using time between first and last detected step
    if len(step_events) > 1:
        total_time_sec = (step_events[-1] - step_events[0]) / fps
        cadence = (len(step_events) - 1) / total_time_sec * 60 if total_time_sec > 0 else 0
    else:
        cadence = 0

    speed_kmph = stride_length * cadence / 120 * 3.6 if cadence > 0 else 0

    knee_angles, hip_angles, ankle_angles = [], [], []
    for i in range(len(keypoints)):
        hip = keypoints[i, POINTS["hip"]]
        knee = keypoints[i, POINTS["knee"]]
        ankle = keypoints[i, POINTS["ankle"]]
        toe = keypoints[i, POINTS["toe"]]
        shoulder = keypoints[i, POINTS["shoulder"]]
        knee_angles.append(angle(hip, knee, ankle))
        hip_angles.append(angle(shoulder, hip, knee))
        ankle_angles.append(angle(knee, ankle, toe))

    return np.array([
        stride_length,
        cadence,
        np.mean(knee_angles),
        np.mean(hip_angles),
        np.mean(ankle_angles),
        speed_kmph
    ]), [knee_angles, hip_angles, ankle_angles], step_events

def overlay_text_on_video(frames, gait_params, suggestions, joint_angles,
                          output_path, trajectories, step_events):
    if not frames:
        print("Error: No frames to process")
        return

    height, width = frames[0].shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, 30, (width, height))
    font = cv2.FONT_HERSHEY_SIMPLEX
    step_events_set = set(step_events)

    for i, frame in enumerate(frames):
        disp = frame.copy()

        # Draw trajectories for all joints except left_ankle
        for name, points in trajectories.items():
            if name in ["left_ankle", "left_toe"]:
                continue
            for j in range(1, min(i+1, len(points))):
                if points[j-1] != (0,0) and points[j] != (0,0):
                    alpha = max(0.3, j/len(points))
                    col = tuple(int(c*alpha) for c in UNIFORM_COLOR)
                    cv2.line(disp, points[j-1], points[j], col, 2)

        # Draw only points at joints (no skeleton lines)
        for name, points in trajectories.items():
            if i < len(points) and points[i] != (0,0):
                x, y = points[i]
                cv2.circle(disp, (x, y), 6, UNIFORM_COLOR, -1)
                cv2.circle(disp, (x, y), 8, (255, 255, 255), 2)

        # Mark step events (ankle collisions)
        if i in step_events_set:
            right_ankle_pos = trajectories["ankle"][i]
            left_ankle_pos = trajectories["left_ankle"][i]
            if right_ankle_pos != (0,0):
                cv2.circle(disp, right_ankle_pos, 12, (0, 0, 255), 3)
            if left_ankle_pos != (0,0):
                cv2.circle(disp, left_ankle_pos, 12, (255, 0, 0), 3)

        y_offset = 30
        if i < len(joint_angles[0]):
            knee_angle = joint_angles[0][i]
            hip_angle = joint_angles[1][i]
            ankle_angle = joint_angles[2][i]
        else:
            knee_angle = joint_angles[0][-1] if joint_angles[0] else 0
            hip_angle = joint_angles[1][-1] if joint_angles[1] else 0
            ankle_angle = joint_angles[2][-1] if joint_angles[2] else 0

        params_text = [
            f"Stride Length: {gait_params[0]:.4f} m",
            f"Cadence: {gait_params[1]:.2f} steps/min",
            f"Hip Angle: {hip_angle:.1f} deg",
            f"Knee Angle: {knee_angle:.1f} deg",
            f"Ankle Angle: {ankle_angle:.1f} deg",
            f"Speed: {gait_params[5]:.2f} km/h",
            f"Steps: {len(step_events)}"
        ]

        for text in params_text:
            cv2.putText(disp, text, (20, y_offset), font, 0.6, (0, 0, 0), 3)
            cv2.putText(disp, text, (20, y_offset), font, 0.6, (0, 255, 0), 2)
            y_offset += 25

        for idx, suggestion in enumerate(suggestions):
            cv2.putText(disp, suggestion, (20, height - 40 - idx * 25),
                        font, 0.6, (0, 0, 0), 3)
            cv2.putText(disp, suggestion, (20, height - 40 - idx * 25),
                        font, 0.6, (0, 255, 255), 2)

        out.write(disp)

    out.release()

def generate_suggestions(gait_params):
    suggestions = []
    OPTIMAL_VALUES = {
        "stride_length": (0.6, 1.0),
        "cadence": (70, 120),
        "knee_angle": (10, 70),
        "hip_angle": (10, 40),
        "ankle_angle": (10, 30)
    }
    param_names = ["Stride Length", "Cadence", "Knee Angle", "Hip Angle", "Ankle Angle"]

    for i, (param, (low, high)) in enumerate(OPTIMAL_VALUES.items()):
        value = gait_params[i]
        if value < low:
            suggestions.append(f"Increase {param_names[i]} to improve gait.")
        elif value > high:
            suggestions.append(f"Decrease {param_names[i]} to improve posture.")

    if not suggestions:
        suggestions.append("Gait parameters are within optimal range!")

    return suggestions

def process_video(video_path, output_path="output_video.mp4"):
    try:
        MODEL_PATH = "gait_model.pkl"
        DATA_PATH = "training_data.pkl"

        if os.path.exists(MODEL_PATH):
            model = joblib.load(MODEL_PATH)
        else:
            model = RandomForestRegressor(warm_start=True)

        if os.path.exists(DATA_PATH):
            X_train, y_train = joblib.load(DATA_PATH)
        else:
            X_train, y_train = [], []

        keypoints, frames, trajectories, pixel_to_meter, fps = extract_keypoints(video_path)

        if keypoints.size == 0 or not frames:
            raise Exception("No keypoints or frames extracted from video")

        gait_params, joint_angles, step_events = calculate_gait_parameters(
            keypoints, trajectories, pixel_to_meter, fps
        )

        suggestions = generate_suggestions(gait_params)

        print(f"Detected {len(step_events)} steps")
        print(f"Stride Length: {gait_params[0]:.4f}m")
        print(f"Cadence: {gait_params[1]:.2f} steps/min")
        print(f"Speed: {gait_params[5]:.2f} km/h")

        overlay_text_on_video(
            frames,
            gait_params,
            suggestions,
            joint_angles,
            output_path,
            trajectories,
            step_events
        )

        if joint_angles[0]:
            knee_mean = np.mean(joint_angles[0])
            if not np.isnan(knee_mean):
                X_train.append(gait_params.tolist())
                y_train.append(knee_mean)
                model.fit(np.array(X_train), np.array(y_train))
                joblib.dump(model, MODEL_PATH)
                joblib.dump((X_train, y_train), DATA_PATH)
                print("Model updated and saved")

        return gait_params
    except Exception as e:
        print(f"Error processing video: {e}")
        return None

Downloading model to /usr/local/lib/python3.11/dist-packages/mediapipe/modules/pose_landmark/pose_landmark_heavy.tflite


In [4]:
video_path = "walk-2.mp4"
process_video(video_path)

Calibrated: 1px = 0.004288m
Detected 9 steps
Stride Length: 0.5681m
Cadence: 101.52 steps/min
Speed: 1.73 km/h
Model updated and saved


  warn(


array([  0.56810583, 101.52375815,  10.90124422,   7.35888282,
        52.63148944,   1.73028716])

In [5]:
import cv2
import mediapipe as mp
import numpy as np
from google.colab import files
import os

input_video_path = 'output_video.mp4'
output_video_path = 'blurred_faces_output.mp4'

if not os.path.isfile(input_video_path):
    raise FileNotFoundError(f"Input video not found at {input_video_path}")

mp_face_mesh = mp.solutions.face_mesh
mp_face_detection = mp.solutions.face_detection

face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=2,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)
face_detection = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.5)

cap = cv2.VideoCapture(input_video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    mask = np.zeros((height, width), dtype=np.uint8)
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results_mesh = face_mesh.process(rgb)

    face_found = False
    if results_mesh.multi_face_landmarks:
        for face_landmarks in results_mesh.multi_face_landmarks:
            points = []
            for lm in face_landmarks.landmark:
                x = int(lm.x * width)
                y = int(lm.y * height)
                points.append([x, y])
            points = np.array(points, dtype=np.int32)
            points = points[(points[:,0] >= 0) & (points[:,0] < width) & (points[:,1] >= 0) & (points[:,1] < height)]
            if len(points) > 10:
                hull = cv2.convexHull(points)
                cv2.fillConvexPoly(mask, hull, 255)
                face_found = True

    # Fallback: use face detection if mesh failed
    if not face_found:
        results_det = face_detection.process(rgb)
        if results_det.detections:
            for det in results_det.detections:
                bbox = det.location_data.relative_bounding_box
                x1 = int(bbox.xmin * width)
                y1 = int(bbox.ymin * height)
                x2 = int((bbox.xmin + bbox.width) * width)
                y2 = int((bbox.ymin + bbox.height) * height)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(width - 1, x2), min(height - 1, y2)
                mask[y1:y2, x1:x2] = 255

    blurred = cv2.GaussianBlur(frame, (99, 99), 30)
    mask_3d = cv2.merge([mask, mask, mask])
    frame_blurred = np.where(mask_3d == 255, blurred, frame)
    out.write(frame_blurred.astype(np.uint8))

cap.release()
out.release()
face_mesh.close()
face_detection.close()

print("Processing complete!")
files.download(output_video_path)


Processing complete!


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>