In [1]:
# Pushup analysis with Pose Estimation + Temporal Signal Analysis
# - Counts pushups by form (standard, wide, narrow/diamond, knee)
# - Detects whether user is in pushup position vs standing/sitting/unknown
# - Scores each rep (percent rating) with feedback (lockout / depth / alignment)
# - Produces an overlay video, a per-rep CSV, and a JSON summary
#
# Input video (uploaded by user): /mnt/data/Pushups.mp4
# Outputs:
#   /mnt/data/pushup_analysis.mp4         (overlay video)
#   /mnt/data/pushup_analysis.csv         (per-rep details)
#   /mnt/data/pushup_summary.json         (session summary)
#   Displayed DataFrame: per-form counts and scores
#
# Notes:
# - Requires mediapipe and opencv-python available in this environment.

import cv2
import json
import math
import os
import sys
import traceback
import numpy as np
import pandas as pd
from collections import deque, defaultdict

try:
    import mediapipe as mp
except Exception as e:
    raise RuntimeError("mediapipe package is required for pose estimation but could not be imported.")

In [2]:
# ---- Pushup position & one-arm detection knobs ----
SHOW_WINDOW = True
FORCE_ROTATION = 90
# Presence-based arm visibility thresholds
ARM_VIS_THRESH = 0.60      # a joint is considered visible if visibility >= this
ONE_ARM_MIN_FRAC = 0.60    # fraction of rep frames with exactly 1 arm present

# Lift/hidden-hand detection
LIFT_GAP_FRAC = 0.10      # y-gap (as fraction of body scale) for "lifted" hand (higher = smaller y)
LIFT_MIN_FRAC = 0.60      # fraction of rep frames satisfying one-arm condition
VIS_THRESH    = 0.50      # if a wrist visibility is below this, treat as "hidden"
LIFTED_LOCK   = 165       # lifted arm elbow ~ locked (deg)
DRIVER_ELBOW_DELTA = 15   # min bend range (deg) on the grounded arm across the rep

from collections import Counter
form_counts = Counter()   # will track {"one-arm": x, "two-arm": y}

In [3]:
# --------------- Utility Functions ---------------

def angle_3pt(a, b, c):
    """
    Returns the angle ABC (at point b) in degrees, given points a, b, c as (x, y)
    """
    try:
        ab = np.array([a[0] - b[0], a[1] - b[1]])
        cb = np.array([c[0] - b[0], c[1] - b[1]])
        # Normalize to avoid weird values
        ab_norm = ab / (np.linalg.norm(ab) + 1e-9)
        cb_norm = cb / (np.linalg.norm(cb) + 1e-9)
        cosang = np.dot(ab_norm, cb_norm)
        cosang = np.clip(cosang, -1.0, 1.0)
        ang = math.degrees(math.acos(cosang))
        return ang
    except Exception:
        return np.nan


def dist(a, b):
    return math.hypot(a[0] - b[0], a[1] - b[1])


def robust_percentile(x, q):
    x = np.array([v for v in x if not np.isnan(v)])
    if x.size == 0:
        return np.nan
    return float(np.percentile(x, q))


def smooth(series, window=5):
    """Simple moving average smoothing"""
    if len(series) < 2 or window <= 1:
        return series[:]
    kernel = np.ones(window) / window
    out = np.convolve(series, kernel, mode='same')
    # End-point fix
    out[0] = np.mean(series[:window])
    out[-1] = np.mean(series[-window:])
    return out.tolist()


# --------------- Form & State Classification ---------------

def classify_form(frame_feat):
    # Only used as a fallback label; final form set at rep completion.
    # Default to 'two-arm' unless proven one-arm later.
    return 'two-arm'

def classify_body_state(frame_feat):
    torso_angle  = frame_feat.get("torso_angle", np.nan)
    shoulder_y   = frame_feat.get("shoulder_y_mean", np.nan)
    wrist_y_mean = frame_feat.get("wrist_y_mean", np.nan)
    if np.isnan(torso_angle) or np.isnan(shoulder_y) or np.isnan(wrist_y_mean):
        return "unknown"
    # pushup-ish when torso nearer horizontal and wrists below shoulders
    if torso_angle <= 35 and wrist_y_mean > shoulder_y:
        return "pushup"
    return "unknown"

# --------------- Scoring Functions ---------------

def score_depth(min_elbow_deg):
    """100 at <=60°, 50 at 100°, 0 at >=120° (linear segments)"""
    if np.isnan(min_elbow_deg):
        return 0.0
    if min_elbow_deg <= 60:
        return 100.0
    if min_elbow_deg >= 120:
        return 0.0
    # Linear decline from 60→120
    return float(np.interp(min_elbow_deg, [60, 100, 120], [100, 50, 0]))


def score_lockout(max_elbow_deg):
    """100 at >=175°, 50 at 165°, 0 at <=150° (linear segments)"""
    if np.isnan(max_elbow_deg):
        return 0.0
    if max_elbow_deg >= 175:
        return 100.0
    if max_elbow_deg <= 150:
        return 0.0
    return float(np.interp(max_elbow_deg, [150, 165, 175], [0, 50, 100]))


def score_alignment(hip_angle_series):
    """
    Score based on hip angle (shoulder-hip-ankle) over a rep.
    Ideal ~180°; 100 at 175-185, 50 at ±20, 0 at ±40
    """
    if not hip_angle_series:
        return 0.0
    dev = [abs(v - 180) for v in hip_angle_series if not np.isnan(v)]
    if not dev:
        return 0.0
    mean_dev = float(np.mean(dev))
    if mean_dev <= 5:
        return 100.0
    if mean_dev >= 40:
        return 0.0
    # Linear decline: 5→20 → 100→50, 20→40 → 50→0
    if mean_dev <= 20:
        return float(np.interp(mean_dev, [5, 20], [100, 50]))
    else:
        return float(np.interp(mean_dev, [20, 40], [50, 0]))


def rep_feedback(depth_score, lock_score, align_score):
    todo = []
    if depth_score < 80:
        todo.append("go deeper")
    if lock_score < 80:
        todo.append("lock your arms more at the top")
    if align_score < 70:
        todo.append("keep hips aligned (avoid sag/pike)")
    if not todo:
        return "great rep!"
    # Join feedback succinctly
    if len(todo) == 1:
        return todo[0]
    if len(todo) == 2:
        return f"{todo[0]} and {todo[1]}"
    return f"{todo[0]}, {todo[1]}, and {todo[2]}"


# --------------- Pose Processing ---------------

mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
mp_styles = mp.solutions.drawing_styles

L = mp_pose.PoseLandmark

# Indices
LEFT = {
    "shoulder": L.LEFT_SHOULDER.value,
    "elbow": L.LEFT_ELBOW.value,
    "wrist": L.LEFT_WRIST.value,
    "hip": L.LEFT_HIP.value,
    "knee": L.LEFT_KNEE.value,
    "ankle": L.LEFT_ANKLE.value,
}
RIGHT = {
    "shoulder": L.RIGHT_SHOULDER.value,
    "elbow": L.RIGHT_ELBOW.value,
    "wrist": L.RIGHT_WRIST.value,
    "hip": L.RIGHT_HIP.value,
    "knee": L.RIGHT_KNEE.value,
    "ankle": L.RIGHT_ANKLE.value,
}


def get_xy(landmarks, idx, shape):
    h, w = shape[:2]
    lm = landmarks[idx]
    return (lm.x * w, lm.y * h)


def extract_frame_features(landmarks, shape):
    """Compute per-frame geometric features used for classification and scoring"""
    h, w = shape[:2]
    pts = {}
    for side, dct in (("L", LEFT), ("R", RIGHT)):
        pts[side] = {}
        for k, idx in dct.items():
            lm = landmarks[idx]
            pts[side][k] = (lm.x * w, lm.y * h)

    # Distances
    wrist_dist = abs(pts["L"]["wrist"][0] - pts["R"]["wrist"][0])
    shoulder_dist = abs(pts["L"]["shoulder"][0] - pts["R"]["shoulder"][0])

    # Knee angle (hip-knee-ankle) both sides avg
    knee_L = angle_3pt(pts["L"]["hip"], pts["L"]["knee"], pts["L"]["ankle"])
    knee_R = angle_3pt(pts["R"]["hip"], pts["R"]["knee"], pts["R"]["ankle"])
    knee_angle = np.nanmean([knee_L, knee_R])

    # Torso angle relative to horizontal (0=horizontal, 90=vertical)
    # Use vector hip→shoulder (avg of both sides)
    shoulder_mid = ((pts["L"]["shoulder"][0] + pts["R"]["shoulder"][0]) / 2.0,
                    (pts["L"]["shoulder"][1] + pts["R"]["shoulder"][1]) / 2.0)
    hip_mid = ((pts["L"]["hip"][0] + pts["R"]["hip"][0]) / 2.0,
               (pts["L"]["hip"][1] + pts["R"]["hip"][1]) / 2.0)

    v = np.array([shoulder_mid[0] - hip_mid[0], shoulder_mid[1] - hip_mid[1]])
    v_norm = v / (np.linalg.norm(v) + 1e-9)
    # Angle to horizontal axis
    torso_angle = abs(math.degrees(math.atan2(v_norm[1], v_norm[0])))
    # Map to [0,90]
    torso_angle = torso_angle if torso_angle <= 90 else 180 - torso_angle

    # Elbow angle (shoulder-elbow-wrist) both sides avg
    elbow_L = angle_3pt(pts["L"]["shoulder"], pts["L"]["elbow"], pts["L"]["wrist"])
    elbow_R = angle_3pt(pts["R"]["shoulder"], pts["R"]["elbow"], pts["R"]["wrist"])
    elbow_angle = np.nanmean([elbow_L, elbow_R])

    # Hip straightness: angle shoulder-hip-ankle both sides
    hip_L = angle_3pt(pts["L"]["shoulder"], pts["L"]["hip"], pts["L"]["ankle"])
    hip_R = angle_3pt(pts["R"]["shoulder"], pts["R"]["hip"], pts["R"]["ankle"])
    hip_angle = np.nanmean([hip_L, hip_R])

    wrist_y_mean = (pts["L"]["wrist"][1] + pts["R"]["wrist"][1]) / 2.0
    shoulder_y_mean = (pts["L"]["shoulder"][1] + pts["R"]["shoulder"][1]) / 2.0
    hip_y_mean = (pts["L"]["hip"][1] + pts["R"]["hip"][1]) / 2.0

    # y’s and body scale (you already have wrist_Ly/wrist_Ry/body_scale etc.)
    wrist_Ly = pts["L"]["wrist"][1]
    wrist_Ry = pts["R"]["wrist"][1]
    ankle_y_mean = (pts["L"]["ankle"][1] + pts["R"]["ankle"][1]) / 2.0
    shoulder_y_mean = (pts["L"]["shoulder"][1] + pts["R"]["shoulder"][1]) / 2.0
    body_scale = abs(shoulder_y_mean - ankle_y_mean) + 1e-6

    # >>> NEW: visibility of wrists from landmarks <<<
    wL_vis = landmarks[LEFT["wrist"]].visibility if hasattr(landmarks[0], "visibility") else 1.0
    wR_vis = landmarks[RIGHT["wrist"]].visibility if hasattr(landmarks[0], "visibility") else 1.0
    
    # >>> NEW: per-joint visibility (if available)
    def vis(idx):
        lm = landmarks[idx]
        return getattr(lm, "visibility", 1.0)

    vis_L = {
        "shoulder": vis(LEFT["shoulder"]),
        "elbow":    vis(LEFT["elbow"]),
        "wrist":    vis(LEFT["wrist"]),
    }
    vis_R = {
        "shoulder": vis(RIGHT["shoulder"]),
        "elbow":    vis(RIGHT["elbow"]),
        "wrist":    vis(RIGHT["wrist"]),
    }

    return {
        # --- existing fields you already return ---
        "wrist_dist": wrist_dist,
        "shoulder_dist": shoulder_dist,
        "torso_angle": torso_angle,
        "elbow_angle": elbow_angle,
        "elbow_L": elbow_L,
        "elbow_R": elbow_R,
        "hip_angle": hip_angle,
        "wrist_y_mean": (wrist_Ly + wrist_Ry) / 2.0,
        "shoulder_y_mean": shoulder_y_mean,
        "hip_y_mean": hip_y_mean,
        "shoulder_mid": shoulder_mid,
        "hip_mid": hip_mid,
        "wrist_Ly": wrist_Ly,
        "wrist_Ry": wrist_Ry,
        "body_scale": body_scale,

        # >>> add the visibilities <<<
        "vis_L_shoulder": vis_L["shoulder"],
        "vis_L_elbow":    vis_L["elbow"],
        "vis_L_wrist":    vis_L["wrist"],
        "vis_R_shoulder": vis_R["shoulder"],
        "vis_R_elbow":    vis_R["elbow"],
        "vis_R_wrist":    vis_R["wrist"],
    }

def rep_is_one_arm_presence_based(rep_feats,
                                  vis_thresh=ARM_VIS_THRESH,
                                  min_frac=ONE_ARM_MIN_FRAC):
    """
    'One-arm' if, for >= min_frac of the rep frames, exactly one arm is 'present':
    - An arm is 'present' if its shoulder/elbow/wrist visibilities are mostly above vis_thresh.
    """
    if not rep_feats:
        return False

    frames_with_one = 0
    total = 0

    for f in rep_feats:
        if not f:
            continue
        # side presence: require at least 2 of the 3 joints visible for robustness
        L_present = sum([
            f.get("vis_L_shoulder", 0) >= vis_thresh,
            f.get("vis_L_elbow", 0)    >= vis_thresh,
            f.get("vis_L_wrist", 0)    >= vis_thresh,
        ]) >= 2

        R_present = sum([
            f.get("vis_R_shoulder", 0) >= vis_thresh,
            f.get("vis_R_elbow", 0)    >= vis_thresh,
            f.get("vis_R_wrist", 0)    >= vis_thresh,
        ]) >= 2

        if not (L_present or R_present):
            # neither arm confidently present -> ignore this frame for decision
            continue

        total += 1
        if (L_present and not R_present) or (R_present and not L_present):
            frames_with_one += 1

    if total == 0:
        return False

    return (frames_with_one / total) >= min_frac

        
def rep_is_one_arm(rep_feats,
                   lift_gap_frac=LIFT_GAP_FRAC,
                   min_frac=LIFT_MIN_FRAC,
                   vis_thresh=VIS_THRESH,
                   lifted_lock=LIFTED_LOCK,
                   driver_delta=DRIVER_ELBOW_DELTA):
    """
    A rep is one-arm if, for most frames:
      - One wrist is 'hidden' (low visibility) OR clearly 'lifted' (higher by gap),
      - The opposite (grounded) arm shows clear elbow bend/extend across the rep,
      - The lifted arm stays near-locked (doesn't bend much).
    """
    if not rep_feats:
        return False

    # body scale (robust)
    scales = [f.get("body_scale") for f in rep_feats if f and not np.isnan(f.get("body_scale", np.nan))]
    if not scales:
        return False
    scale = float(np.median(scales))
    if scale <= 0:
        return False
    gap = lift_gap_frac * scale

    # Per-frame classification; collect elbow angles by role (driver vs lifted)
    driver_elbows = []
    lifted_elbows = []
    one_arm_frames = 0
    total_frames = 0

    for f in rep_feats:
        if not f: 
            continue
        Ly, Ry = f.get("wrist_Ly", np.nan), f.get("wrist_Ry", np.nan)
        vL, vR = f.get("wrist_Lvis", 1.0), f.get("wrist_Rvis", 1.0)
        eL, eR = f.get("elbow_L", np.nan), f.get("elbow_R", np.nan)
        if np.isnan(Ly) or np.isnan(Ry) or np.isnan(eL) or np.isnan(eR):
            continue
        total_frames += 1

        # Determine 'grounded' hand = lower (larger y) if both visible enough; otherwise use the visible one
        left_hidden  = vL < vis_thresh
        right_hidden = vR < vis_thresh

        if left_hidden and not right_hidden:
            grounded = 'R'; lifted = 'L'
        elif right_hidden and not left_hidden:
            grounded = 'L'; lifted = 'R'
        else:
            # Neither strongly hidden: use y-gap to detect lifted
            left_lifted  = (Ly + gap) < Ry   # left higher (smaller y)
            right_lifted = (Ry + gap) < Ly   # right higher
            if left_lifted and not right_lifted:
                grounded, lifted = 'R', 'L'
            elif right_lifted and not left_lifted:
                grounded, lifted = 'L', 'R'
            else:
                grounded = None
                lifted = None

        if grounded and lifted:
            one_arm_frames += 1
            if grounded == 'L':
                driver_elbows.append(eL)
                lifted_elbows.append(eR)
            else:
                driver_elbows.append(eR)
                lifted_elbows.append(eL)

    if total_frames == 0:
        return False

    # Enough of the rep looked one-arm?
    if (one_arm_frames / total_frames) < min_frac:
        return False

    # Driver shows bend/extend, lifted stays near locked
    if not driver_elbows or not lifted_elbows:
        return False

    driver_range = float(np.nanmax(driver_elbows) - np.nanmin(driver_elbows))
    lifted_min   = float(np.nanmin(lifted_elbows))

    return (driver_range >= driver_delta) and (lifted_min >= (lifted_lock - 5))


def rotate_frame(frame, deg):
    if deg == 90:
        return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
    if deg == -90:
        return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
    return frame

# --------------- Rep Segmentation ---------------

class RepCounter:
    """
    Counts reps using elbow angle signal.
    - Enter bottom when elbow angle < down_th
    - Count rep when returning to top with elbow angle > up_th
    """
    def __init__(self, up_th=165, down_th=95, min_frames_per_half=5):
        self.up_th = up_th
        self.down_th = down_th
        self.state = "top"   # "top", "down"
        self.frames_in_state = 0
        self.min_frames_per_half = min_frames_per_half
        self.current_rep_frames = []  # list of (frame_idx)
        self.completed_reps = []      # list of list of frame indices (per rep)

    def update(self, frame_idx, elbow_angle):
        if np.isnan(elbow_angle):
            self.frames_in_state = 0
            self.current_rep_frames.clear()
            self.state = "top"
            return None

        # State machine
        if self.state == "top":
            if elbow_angle < self.down_th:
                # Start going down
                self.state = "down"
                self.frames_in_state = 1
                self.current_rep_frames = [frame_idx]
            else:
                self.frames_in_state += 1
        elif self.state == "down":
            self.current_rep_frames.append(frame_idx)
            if elbow_angle > self.up_th and self.frames_in_state >= self.min_frames_per_half:
                # Completed: down then up
                rep_frames = self.current_rep_frames[:]
                self.completed_reps.append(rep_frames)
                # Reset
                self.state = "top"
                self.frames_in_state = 0
                self.current_rep_frames = []
                return rep_frames
            else:
                self.frames_in_state += 1
        return None

In [4]:
# --------------- Main Processing ---------------

video_path = "Pushups.mp4"
if not os.path.exists(video_path):
    raise FileNotFoundError(f"Input video not found at {video_path}")

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError("Could not open the video. The file might be corrupted or in an unsupported format.")

# --- Rotation-aware writer setup ---
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
src_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
src_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Output dimensions depend on rotation
if FORCE_ROTATION in (90, -90):
    out_w, out_h = src_h, src_w   # swap if rotated 90°
else:
    out_w, out_h = src_w, src_h

# Choose codec and create writer
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out_path_mp4 = "pushup_analysis.mp4"
writer = cv2.VideoWriter(out_path_mp4, fourcc, fps, (out_w, out_h))

# Fallback if MP4 not available on your system
if not writer.isOpened():
    fourcc = cv2.VideoWriter_fourcc(*"XVID")
    out_path_mp4 = "pushup_analysis.avi"
    writer = cv2.VideoWriter(out_path_mp4, fourcc, fps, (out_w, out_h))


pose = mp_pose.Pose(model_complexity=1, enable_segmentation=False, smooth_landmarks=True)

# Storage
frame_feats = []
elbow_angles = []
hip_angles = []
body_states = []
forms = []

# more forgiving thresholds + slightly shorter half-cycle
rep_counter = RepCounter(up_th=160, down_th=110, min_frames_per_half=3)


# Rolling mode estimates to avoid jitter
form_window = deque(maxlen=7)
state_window = deque(maxlen=7)

# For overlay
rep_num = 0
last_rep_score = None
last_rep_feedback = ""

# Per-rep details for CSV
rep_rows = []

frame_idx = 0

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame = rotate_frame(frame, FORCE_ROTATION)

        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        res = pose.process(image_rgb)

        if res.pose_landmarks:
            feats = extract_frame_features(res.pose_landmarks.landmark, frame.shape)
            frame_feats.append(feats)

            elbow_angles.append(feats["elbow_angle"])
            hip_angles.append(feats["hip_angle"])

            # State + form (smoothed voting)
            state = classify_body_state(feats)
            form = classify_form(feats)

            state_window.append(state)
            form_window.append(form)

            # Majority vote in the window
            state_mode = max(set(state_window), key=state_window.count)
            form_mode = max(set(form_window), key=form_window.count)

            body_states.append(state_mode)
            forms.append(form_mode)

            # Update rep counter for pushup state only
            # Count whenever elbow angle is valid (no strict body-state gating)
            if not np.isnan(feats["elbow_angle"]):
                rep_frames = rep_counter.update(frame_idx, feats["elbow_angle"])
                if rep_frames is not None and len(rep_frames) > 0:
                    rep_num += 1
                    # Compute rep metrics over these frames
                    rep_feats = [frame_feats[i] for i in rep_frames if i < len(frame_feats)]
                    rep_elbows = [f["elbow_angle"] for f in rep_feats]
                    rep_hips = [f["hip_angle"] for f in rep_feats]
                    rep_forms = [forms[i] for i in rep_frames if i < len(forms)]

                    min_elbow = float(np.nanmin(rep_elbows)) if rep_elbows else np.nan
                    max_elbow = float(np.nanmax(rep_elbows)) if rep_elbows else np.nan
                    depth = score_depth(min_elbow)
                    lockout = score_lockout(max_elbow)
                    align = score_alignment(rep_hips)
                    score = round(0.4 * depth + 0.4 * lockout + 0.2 * align, 1)
                    feedback = rep_feedback(depth, lockout, align)

                    # ----- Rep-level form logic: ONLY one-arm vs two-arm (presence-based) -----
                    if rep_is_one_arm_presence_based(rep_feats):
                        rep_form = "one-arm"
                    else:
                        rep_form = "two-arm"
                    
                    # increment exactly one bucket
                    form_counts[rep_form] += 1

                    t_start = rep_frames[0] / fps
                    t_end = rep_frames[-1] / fps

                    last_rep_score = score
                    last_rep_feedback = feedback

                    rep_rows.append({
                        "rep": rep_num,
                        "start_time_s": round(t_start, 2),
                        "end_time_s": round(t_end, 2),
                        "form": rep_form,
                        "min_elbow_deg": round(min_elbow, 1) if not np.isnan(min_elbow) else None,
                        "max_elbow_deg": round(max_elbow, 1) if not np.isnan(max_elbow) else None,
                        "depth_score": round(depth, 1),
                        "lockout_score": round(lockout, 1),
                        "alignment_score": round(align, 1),
                        "rep_score": score,
                        "feedback": feedback
                    })
        else:
            # No landmarks
            frame_feats.append(None)
            elbow_angles.append(np.nan)
            hip_angles.append(np.nan)
            body_states.append("unknown")
            forms.append("standard")

        # Draw overlay
        overlay = frame.copy()
        if res.pose_landmarks:
            mp_drawing.draw_landmarks(
                overlay,
                res.pose_landmarks,
                mp_pose.POSE_CONNECTIONS,
                landmark_drawing_spec=mp_styles.get_default_pose_landmarks_style())
        
        # ---------- HUD (dark multi-color, no TOTAL line) ----------
        onearm_c = int(form_counts.get("one-arm", 0))
        twoarm_c = int(form_counts.get("two-arm", 0))
        
        top_line  = f"FORM: {rep_form if 'rep_form' in locals() else (form_mode if forms else 'two-arm')}"
        rep_line  = f"REPS: {rep_num}"
        perc_line = f"LAST REP: {last_rep_score:.1f}%" if last_rep_score is not None else "LAST REP: --"
        tips_text = last_rep_feedback if last_rep_feedback else "TIPS: --"
        
        def wrap_line(s, max_chars=36):
            out, cur, count = [], [], 0
            for part in s.split():
                add = len(part) + (1 if cur else 0)
                if count + add > max_chars:
                    out.append(" ".join(cur)); cur=[part]; count=len(part)
                else:
                    cur.append(part); count += add
            if cur: out.append(" ".join(cur))
            return out or [s]
        
        tip_lines = wrap_line(f"TIPS: {tips_text}", max_chars=36)
        
        hud_lines = [
            (top_line,              (255, 0, 0)),  # cyan
            (rep_line,              (255, 0, 0)),  # cyan
            (perc_line,             (255, 0, 0)),  # cyan
            *[(line, (255, 0, 0)) for line in tip_lines],
            ("—",                   (120,120,120)),
            (f"ONE ARM: {onearm_c}",(128, 0, 128)),  # purple
            (f"TWO ARM: {twoarm_c}",(0, 128, 255)),  # orange-blue
        ]
        
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 1
        thickness  = 3
        line_gap   = 10
        
        sizes = [cv2.getTextSize(s, font, font_scale, thickness)[0] for s, c in hud_lines]
        max_w = max(w for (w, h) in sizes)
        line_h = max(h for (w, h) in sizes) + line_gap
        block_w, block_h = max_w, line_h * len(hud_lines) - line_gap
        
        H, W = overlay.shape[:2]
        cx, cy = W // 2, int(H * 0.85)
        pad_x, pad_y = 20, 15
        
        box_left   = max(0, cx - (block_w // 2) - pad_x)
        box_right  = min(W-1, cx + (block_w // 2) + pad_x)
        box_top    = max(0, cy - (block_h // 2) - pad_y)
        box_bottom = min(H-1, cy + (block_h // 2) + pad_y)
        
        # outline only (dark gray)
        cv2.rectangle(overlay, (box_left, box_top), (box_right, box_bottom), (60,60,60), 2)
        
        start_y = box_top + pad_y + line_h - line_gap
        for (s, color), (tw, th) in zip(hud_lines, sizes):
            x = cx - tw // 2
            y = start_y
            cv2.putText(overlay, s, (x, y), font, font_scale, color, thickness, cv2.LINE_AA)
            start_y += line_h
        # ---------- end HUD ----------

        # Ensure frame matches writer size
        if overlay.shape[1] != out_w or overlay.shape[0] != out_h:
            overlay = cv2.resize(overlay, (out_w, out_h), interpolation=cv2.INTER_AREA)
        
        writer.write(overlay)

        if SHOW_WINDOW:
            cv2.imshow("Pushup Analyzer — LIVE", overlay)
            if (cv2.waitKey(1) & 0xFF) == ord('q'):
                break

        frame_idx += 1
finally:
    cap.release()
    writer.release()
    pose.close()

if SHOW_WINDOW:
    try:
        cv2.destroyAllWindows()
    except Exception:
        pass

# Build per-form summary and save CSV/JSON
df = pd.DataFrame(rep_rows)

if not df.empty:
    # Per-form counts and avg score
    summary = (
        df.groupby("form")
        .agg(count=("rep", "count"),
             avg_score=("rep_score", "mean"),
             avg_depth=("depth_score", "mean"),
             avg_lockout=("lockout_score", "mean"),
             avg_alignment=("alignment_score", "mean"))
        .reset_index()
    )
    summary["avg_score"] = summary["avg_score"].round(1)
    summary["avg_depth"] = summary["avg_depth"].round(1)
    summary["avg_lockout"] = summary["avg_lockout"].round(1)
    summary["avg_alignment"] = summary["avg_alignment"].round(1)
else:
    summary = pd.DataFrame(columns=["form", "count", "avg_score", "avg_depth", "avg_lockout", "avg_alignment"])

# Save artifacts
csv_path = "pushup_analysis.csv"
json_path = "pushup_summary.json"

df.to_csv(csv_path, index=False)

# Compute time in states
state_series = pd.Series(body_states)
state_counts = state_series.value_counts().to_dict()
state_seconds = {k: round(v / (fps if fps else 30.0), 2) for k, v in state_counts.items()}

session_summary = {
    "video_path": video_path,
    "output_video": out_path_mp4,
    "total_frames": total_frames,
    "fps": fps,
    "total_reps": int(df["rep"].max()) if not df.empty else 0,
    "state_time_seconds": state_seconds,
    "per_form_counts": summary.set_index("form")["count"].to_dict() if not summary.empty else {},
    "average_scores_by_form": summary.set_index("form")["avg_score"].to_dict() if not summary.empty else {},
}

with open(json_path, "w") as f:
    json.dump(session_summary, f, indent=2)

# Print console outputs (visible in the notebook cell output) so the user gets quick links/paths
print("Analysis complete.")
print(f"Overlay video: {out_path_mp4}")
print(f"Per-rep CSV: {csv_path}")
print(f"Session summary JSON: {json_path}")

NameError: name 'total_frames' is not defined