In [12]:
# Updated Squat.py
# Adds a suite of evaluation metrics (tracking rate, jitter, angle deviation, ROM, stability,
# rep detection metrics, DTW similarity, and optional MPJPE/PCK/OKS when GT is provided).
#
# Saves per-frame CSV and a summary to /mnt/data/squat_metrics.csv
#
# NOTE: If you want Rep Precision/Recall or DTW similarity, drop:
#   - gt_reps.json  (format: [{"start": start_frame, "end": end_frame}, ...])
#   - expert.npy    (1D or 2D numpy array of expert trajectory; e.g., angles per frame)
#
# Based on original file: see source. :contentReference[oaicite:1]{index=1}

import cv2
import mediapipe as mp
import numpy as np
import os
import sys
import csv
import json
from collections import deque, defaultdict
from math import sqrt

In [13]:
from angle_utils import calculate_angle, vertical_angle

In [14]:
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
cap = cv2.VideoCapture(0)

# --- Output file ---
OUT_CSV = "squat_metrics.csv"

# --- Metric settings ---
REQUIRED_KEYPOINTS_IDX = [
    mp_pose.PoseLandmark.LEFT_SHOULDER.value,
    mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
    mp_pose.PoseLandmark.LEFT_HIP.value,
    mp_pose.PoseLandmark.RIGHT_HIP.value,
    mp_pose.PoseLandmark.LEFT_KNEE.value,
    mp_pose.PoseLandmark.RIGHT_KNEE.value,
    mp_pose.PoseLandmark.LEFT_ANKLE.value,
    mp_pose.PoseLandmark.RIGHT_ANKLE.value
]

MIN_CONFIDENCE = 0.3  # landmark.visibility threshold to count as detected
EXPECTED_SQUAT_ROM_DEGREES = (30, 140)  # expected min,max knee angles for a "full squat"

In [15]:
# --- helper metric functions ---

def euclidean(a, b):
    a = np.asarray(a); b = np.asarray(b)
    return np.linalg.norm(a - b)

def detection_flag_for_landmarks(landmarks, width, height):
    """
    returns True if enough required keypoints are visible above MIN_CONFIDENCE.
    Also returns per-keypoint vis list and pixel coords list
    """
    vis = []
    pts = []
    count = 0
    for idx in REQUIRED_KEYPOINTS_IDX:
        lm = landmarks[idx]
        v = getattr(lm, "visibility", 1.0)  # mediapipe gives visibility
        vis.append(v)
        pts.append((lm.x * width, lm.y * height, v))
        if v >= MIN_CONFIDENCE:
            count += 1
    # success if >= 75% required keypoints visible
    return (count >= int(0.75 * len(REQUIRED_KEYPOINTS_IDX))), vis, pts

# Jitter (temporal stability) helpers
def frame_speed(prev_pts, cur_pts):
    """mean speed (pixels) across required keypoints"""
    prev = np.asarray(prev_pts)
    cur = np.asarray(cur_pts)
    # prev/cur shape: (K,3) last column is visibility
    d = np.linalg.norm((cur[:, :2] - prev[:, :2]), axis=1)
    return np.nanmean(d)

def mean_abs_acceleration(speeds):
    """speeds: list of per-frame (scalar) mean speeds -> return mean absolute acceleration"""
    if len(speeds) < 3:
        return 0.0
    v = np.asarray(speeds)
    acc = np.abs(np.diff(v, n=1))
    return float(np.mean(acc))

# Angle helpers
def wrapped_angle_diff(a, b):
    d = abs(a - b) % 360
    return min(d, 360 - d)

# Rep detection evaluation (interval IoU)
def iou_intervals(a, b):
    s = max(a[0], b[0]); e = min(a[1], b[1])
    inter = max(0, e - s + 1)
    uni = (a[1] - a[0] + 1) + (b[1] - b[0] + 1) - inter
    return inter / uni if uni > 0 else 0.0

def rep_detection_metrics(pred_intervals, gt_intervals, iou_threshold=0.5):
    gt_matched = [False] * len(gt_intervals)
    tp = 0
    for p in pred_intervals:
        best_iou = 0; best_idx = -1
        for i, g in enumerate(gt_intervals):
            iou = iou_intervals(p, g)
            if iou > best_iou:
                best_iou = iou; best_idx = i
        if best_iou >= iou_threshold and not gt_matched[best_idx]:
            tp += 1; gt_matched[best_idx] = True
    fp = len(pred_intervals) - tp
    fn = len(gt_intervals) - tp
    prec = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    rec = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1 = (2 * prec * rec / (prec + rec)) if (prec + rec) > 0 else 0.0
    return {"precision": prec, "recall": rec, "f1": f1, "tp": tp, "fp": fp, "fn": fn}

# Simple DTW (supports multivariate if sequences are arrays of shape (T, D))
def dtw_distance(seq_a, seq_b):
    a = np.asarray(seq_a)
    b = np.asarray(seq_b)
    if a.ndim == 1:
        a = a[:, None]
    if b.ndim == 1:
        b = b[:, None]
    na, nb = len(a), len(b)
    D = np.full((na + 1, nb + 1), np.inf)
    D[0, 0] = 0.0
    for i in range(1, na + 1):
        for j in range(1, nb + 1):
            cost = np.linalg.norm(a[i - 1] - b[j - 1])
            D[i, j] = cost + min(D[i - 1, j], D[i, j - 1], D[i - 1, j - 1])
    return float(D[na, nb])

def dtw_similarity(seq_a, seq_b):
    # convert distance to similarity [0,1] using a soft mapping
    dist = dtw_distance(seq_a, seq_b)
    return 1.0 / (1.0 + dist)  # larger distance -> smaller similarity

# MPJPE / PCK / OKS helpers (require GT; included for completeness)
def mpjpe(pred_kpts, gt_kpts, mask=None):
    pred = np.asarray(pred_kpts); gt = np.asarray(gt_kpts)
    d = np.linalg.norm(pred - gt, axis=-1)
    if mask is None:
        return float(np.nanmean(d))
    mask = np.asarray(mask)
    return float(np.nanmean(d[mask]))

def pck(pred_kpts, gt_kpts, thresh, scale=None, mask=None):
    pred = np.asarray(pred_kpts); gt = np.asarray(gt_kpts)
    d = np.linalg.norm(pred - gt, axis=-1)
    if scale is not None:
        thr = thresh * float(scale)
    else:
        thr = thresh
    if mask is None:
        correct = (d <= thr)
    else:
        correct = (d <= thr) & np.asarray(mask)
    return float(np.sum(correct) / np.sum(np.ones_like(d) if mask is None else mask))

def oks(pred_kpts, gt_kpts, sigmas=None, area=1.0, vis_mask=None):
    pred = np.asarray(pred_kpts); gt = np.asarray(gt_kpts)
    K = pred.shape[0]
    if sigmas is None:
        sigmas = np.ones(K) * 0.08
    dx2 = np.sum((pred - gt) ** 2, axis=-1)
    var = (sigmas * 2) ** 2
    oks_per_k = np.exp(-dx2 / (2 * (var) * (area + np.finfo(float).eps)))
    if vis_mask is not None:
        oks_per_k = oks_per_k[np.asarray(vis_mask)]
    return float(np.mean(oks_per_k))

In [16]:
# ---- Data structures for runtime collection ----
frame_idx = 0
rep_count = 0
squat_state = "up"
pred_rep_intervals = []  # list of (start_frame, end_frame)
current_rep_start = None

# Data collected per frame
collected = {
    "frame": [],
    "detection": [],
    "avg_knee_angle": [],
    "avg_spine_angle": [],
    "l_knee_angle": [],
    "r_knee_angle": [],
    "l_spine_angle": [],
    "r_spine_angle": [],
    "mean_speed": []
}

# For temporal metrics
prev_pts = None
speed_history = []

# For ROM computation per rep
current_rep_knee_angles = []

# Try to load optional GT / expert files from cwd
gt_reps = None
expert_traj = None
if os.path.exists("gt_reps.json"):
    try:
        with open("gt_reps.json", "r") as f:
            gt_reps = [(int(x["start"]), int(x["end"])) for x in json.load(f)]
        print("Loaded GT rep intervals from gt_reps.json")
    except Exception as e:
        print("Failed to load gt_reps.json:", e)

if os.path.exists("expert.npy"):
    try:
        expert_traj = np.load("expert.npy", allow_pickle=True)
        print("Loaded expert trajectory from expert.npy")
    except Exception as e:
        print("Failed to load expert.npy:", e)

# --- Fullscreen window setup (kept from original) ---
cv2.namedWindow('Squat Form Correction + Rep Counter', cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty('Squat Form Correction + Rep Counter', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)

with mp_pose.Pose(min_detection_confidence=0.5,
                  min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        h, w = frame.shape[:2]

        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        detection_flag = False
        try:
            landmarks = results.pose_landmarks.landmark

            # LEFT SIDE (normalized coords from mediapipe)
            l_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                          landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
            l_hip = [landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x,
                     landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y]
            l_knee = [landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x,
                      landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y]
            l_ankle = [landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].x,
                       landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].y]

            # RIGHT SIDE
            r_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                          landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            r_hip = [landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y]
            r_knee = [landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                      landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y]
            r_ankle = [landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                       landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]

            # ANGLES (in degrees)
            l_knee_angle = calculate_angle(l_hip, l_knee, l_ankle)
            r_knee_angle = calculate_angle(r_hip, r_knee, r_ankle)
            l_spine_angle = vertical_angle(l_shoulder, l_hip)
            r_spine_angle = vertical_angle(r_shoulder, r_hip)

            avg_knee_angle = (l_knee_angle + r_knee_angle) / 2.0
            avg_spine_angle = (l_spine_angle + r_spine_angle) / 2.0

            # FEEDBACK (kept from your original)
            if avg_knee_angle > 140:
                squat_feedback = "Standing tall"
            elif avg_knee_angle > 80:
                squat_feedback = "Good squat"
            elif avg_knee_angle < 30:
                squat_feedback = "Too low"
            else:
                squat_feedback = ""

            if avg_spine_angle < 10:
                spine_feedback = "Upright posture"
            elif avg_spine_angle < 30:
                spine_feedback = "Correct posture"
            else:
                spine_feedback = "Too much forward lean"

            # --- REP LOGIC (kept) ---
            if squat_feedback == "Good squat" and spine_feedback == "Correct posture":
                if squat_state == "up":
                    squat_state = "down"
                    current_rep_knee_angles = []  # start collecting for ROM
                    current_rep_start = frame_idx

            if squat_feedback == "Standing tall" and spine_feedback == "Upright posture":
                if squat_state == "down":
                    rep_count += 1
                    squat_state = "up"
                    if current_rep_start is not None:
                        pred_rep_intervals.append((current_rep_start, frame_idx))
                        current_rep_start = None

            # --- detection flag and pixel coordinates ---
            detection_flag, vis_list, pts = detection_flag_for_landmarks(landmarks, w, h)

            # compute per-frame mean speed (jitter)
            if prev_pts is None:
                mean_spd = 0.0
            else:
                mean_spd = frame_speed(prev_pts, pts)
            prev_pts = pts
            speed_history.append(mean_spd)

            # collect angle for ROM and DTW
            if squat_state == "down":
                current_rep_knee_angles.append(avg_knee_angle)

            # store data
            collected["frame"].append(frame_idx)
            collected["detection"].append(1 if detection_flag else 0)
            collected["avg_knee_angle"].append(float(avg_knee_angle))
            collected["avg_spine_angle"].append(float(avg_spine_angle))
            collected["l_knee_angle"].append(float(l_knee_angle))
            collected["r_knee_angle"].append(float(r_knee_angle))
            collected["l_spine_angle"].append(float(l_spine_angle))
            collected["r_spine_angle"].append(float(r_spine_angle))
            collected["mean_speed"].append(float(mean_spd))

            # overlay (some running metrics)
            # Tracking success rate (running)
            track_rate = np.mean(collected["detection"]) if len(collected["detection"]) > 0 else 0.0
            jitter_now = np.std(speed_history[-30:]) if len(speed_history) >= 2 else 0.0
            angle_symmetry = abs(l_knee_angle - r_knee_angle)

            cv2.putText(image, f'Reps: {rep_count}', (30, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 255), 2)
            cv2.putText(image, f'Knee Angle: {int(avg_knee_angle)}', (30, 80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 0), 2)
            cv2.putText(image, f'Spine Lean: {int(avg_spine_angle)}', (30, 120),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
            cv2.putText(image, squat_feedback, (30, 160),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
            cv2.putText(image, spine_feedback, (30, 200),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 128, 255), 2)

            # small metric overlays
            cv2.putText(image, f'TrackRate: {track_rate*100:.0f}%', (30, h-80),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 200), 2)
            cv2.putText(image, f'Jitter: {jitter_now:.2f}', (30, h-50),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 200), 2)
            cv2.putText(image, f'AngleSym: {angle_symmetry:.1f}', (30, h-20),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (200, 200, 200), 2)

        except Exception as e:
            # If pose not detected, still log missing frame
            collected["frame"].append(frame_idx)
            collected["detection"].append(0)
            collected["avg_knee_angle"].append(np.nan)
            collected["avg_spine_angle"].append(np.nan)
            collected["l_knee_angle"].append(np.nan)
            collected["r_knee_angle"].append(np.nan)
            collected["l_spine_angle"].append(np.nan)
            collected["r_spine_angle"].append(np.nan)
            collected["mean_speed"].append(0.0)

        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        cv2.imshow('Squat Form Correction + Rep Counter', image)

        frame_idx += 1
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

# --- Post-run: compute metrics and save CSV ---
# Compute tracking success rate
detection_array = np.array(collected["detection"])
tracking_success_rate = float(np.mean(detection_array))

# Jitter score: use std of frame-to-frame mean speed + mean absolute acceleration
speeds = np.array(collected["mean_speed"])
jitter_score = float(np.std(speeds)) if speeds.size > 1 else 0.0
mean_abs_acc = mean_abs_acceleration(list(speeds))

# Angle deviation (left-right symmetry)
l_knee = np.array(collected["l_knee_angle"], dtype=np.float64)
r_knee = np.array(collected["r_knee_angle"], dtype=np.float64)
valid_lr = ~np.isnan(l_knee) & ~np.isnan(r_knee)
angle_deviation_lr = float(np.mean(np.abs(l_knee[valid_lr] - r_knee[valid_lr]))) if np.any(valid_lr) else np.nan

# ROM per rep
rom_scores = []
for interval in pred_rep_intervals:
    s, e = interval
    arr = np.array(collected["avg_knee_angle"][s:e+1], dtype=np.float64)
    arr = arr[~np.isnan(arr)]
    if arr.size == 0:
        continue
    actual_min = float(np.min(arr))
    actual_max = float(np.max(arr))
    actual_range = actual_max - actual_min
    expected_range = EXPECTED_SQUAT_ROM_DEGREES[1] - EXPECTED_SQUAT_ROM_DEGREES[0]
    rom_fraction = max(0.0, min(1.0, actual_range / expected_range))
    rom_scores.append(rom_fraction)
rom_score_mean = float(np.mean(rom_scores)) if len(rom_scores) > 0 else np.nan

# Stability: variance of spine angle during the session (lower = more stable)
spine_arr = np.array(collected["avg_spine_angle"], dtype=np.float64)
stability_score = float(np.nanvar(spine_arr)) if np.any(~np.isnan(spine_arr)) else np.nan

# Rep detection metrics vs GT (if GT provided)
rep_metrics_summary = None
if gt_reps is not None:
    rep_metrics_summary = rep_detection_metrics(pred_rep_intervals, gt_reps)

# DTW similarity vs expert
dtw_sim = None
if expert_traj is not None:
    # compare sequences of avg_knee_angle (drop nans)
    user_seq = np.array(collected["avg_knee_angle"], dtype=np.float64)
    mask = ~np.isnan(user_seq)
    user_seq = user_seq[mask]
    # expert_traj might be longer/shorter; compute similarity
    try:
        dtw_sim = dtw_similarity(user_seq, expert_traj)
    except Exception as e:
        dtw_sim = None

# Write per-frame CSV + summary row
fieldnames = [
    "frame", "detection", "avg_knee_angle", "avg_spine_angle",
    "l_knee_angle", "r_knee_angle", "l_spine_angle", "r_spine_angle",
    "mean_speed"
]
with open(OUT_CSV, "w", newline="") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    for i in range(len(collected["frame"])):
        row = {k: (collected[k][i] if i < len(collected[k]) else "") for k in fieldnames}
        writer.writerow(row)
    # summary rows appended after frames
    writer.writerow({})
    writer.writerow({"frame": "SUMMARY", "detection": f"TrackingRate={tracking_success_rate:.3f}"})
    writer.writerow({"frame": "SUMMARY", "avg_knee_angle": f"JitterStd={jitter_score:.4f}", "avg_spine_angle": f"MeanAbsAcc={mean_abs_acc:.4f}"})
    writer.writerow({"frame": "SUMMARY", "l_knee_angle": f"AngleDevLR={angle_deviation_lr:.3f}", "r_knee_angle": f"ROMMean={rom_score_mean:.3f}"})
    writer.writerow({"frame": "SUMMARY", "l_spine_angle": f"StabilityVar={stability_score:.4f}"})
    writer.writerow({"frame": "SUMMARY", "r_spine_angle": f"PredReps={len(pred_rep_intervals)}"})

    if rep_metrics_summary is not None:
        writer.writerow({"frame": "SUMMARY", "mean_speed": f"RepMetrics={rep_metrics_summary}"})
    if dtw_sim is not None:
        writer.writerow({"frame": "SUMMARY", "mean_speed": f"DTW_similarity={dtw_sim:.4f}"})

print("Saved per-frame CSV and summary to", OUT_CSV)
print("Metrics summary:")
print(f"Tracking Success Rate: {tracking_success_rate:.3f}")
print(f"Jitter (std of mean speeds): {jitter_score:.4f}")
print(f"Mean Absolute Acceleration: {mean_abs_acc:.4f}")
print(f"Angle Deviation (L vs R knees): {angle_deviation_lr:.3f} degrees")
print(f"ROM score (mean across predicted reps): {rom_score_mean if not np.isnan(rom_score_mean) else 'N/A'}")
print(f"Stability score (spine angle variance): {stability_score if not np.isnan(stability_score) else 'N/A'}")
print(f"Predicted rep intervals (count): {len(pred_rep_intervals)}")
if rep_metrics_summary:
    print("Rep detection vs GT:", rep_metrics_summary)
if dtw_sim is not None:
    print(f"DTW similarity vs expert: {dtw_sim:.4f}")


Saved per-frame CSV and summary to squat_metrics.csv
Metrics summary:
Tracking Success Rate: 0.813
Jitter (std of mean speeds): 12.8726
Mean Absolute Acceleration: 3.7997
Angle Deviation (L vs R knees): 2.975 degrees
ROM score (mean across predicted reps): 0.6485730968504136
Stability score (spine angle variance): 46.61852550120942
Predicted rep intervals (count): 6
