# **GymBro** - Computer Vision Based Exercise Form Analysis System
***- Developed by Chanitha Abeygunawardena***


GymBro is a computer vision based exercise form analysis and feedback system for strength training. The system currently focuses on bicep curls, where it uses pose estimation to track joint positions, evaluate movement patterns, and classify repetitions as correct or incorrect form. The system is also optimised for Google Colab to simplify setup and execution, while remaining fully compatible with local Python environments.

It detects common issues such as elbow drift, incomplete range of motion, and poor movement control, while also identifying properly executed repetitions. The architecture is exercise-agnostic, allowing the same pipeline to be extended to other exercises by defining new movement rules or integrating learned classifiers.
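
As a rough illustration of what "defining new movement rules" could look like, the sketch below stores each rule as a metric name, a threshold, and a direction, then checks a dictionary of clip-level metrics against the rules. The `Rule` class, the `evaluate_rules` function, and the squat metric names and thresholds are hypothetical and are not part of GymBro.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rule:
    """One hypothetical movement rule: flag `issue` when `metric` crosses `threshold`."""
    issue: str
    metric: str
    threshold: float
    bigger_is_bad: bool = True


def evaluate_rules(metrics: dict, rules: list) -> list:
    """Return the issue keys whose rule fires for the given clip-level metrics."""
    fired = []
    for rule in rules:
        value = metrics[rule.metric]
        if rule.bigger_is_bad and value > rule.threshold:
            fired.append(rule.issue)
        elif not rule.bigger_is_bad and value < rule.threshold:
            fired.append(rule.issue)
    return fired


# Illustrative rules for a different exercise (names and values are made up).
SQUAT_RULES = [
    Rule("knees_cave", "knee_x_gap_min", 0.10, bigger_is_bad=False),
    Rule("forward_lean", "torso_angle_mean_deg", 35.0),
]

example_metrics = {"knee_x_gap_min": 0.07, "torso_angle_mean_deg": 28.0}
print(evaluate_rules(example_metrics, SQUAT_RULES))
```

Keeping rules as data means a new exercise needs a new rule list and metric function, while the evaluation loop stays unchanged.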

**Highlights**
- Environment-aware execution (Google Colab–optimised with local fallback)
- Reproducible pipeline using deterministic seeding and stratified data splits
- Primary subject tracking to ignore background bystanders
- Automated per-clip quality assurance and sanity checks
- Label-free, rule-driven exercise form classification with feedback generation
- Idempotent output handling for safe and stable re-runs
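
The idempotent re-run behaviour can be pictured with a minimal sketch: a clip is processed only when its expected output file is missing, so running the pipeline twice does no extra work. The `needs_processing` helper and the clip name are hypothetical; only the `_keypoints.csv` suffix is taken from the notebook.

```python
import tempfile
from pathlib import Path


def needs_processing(video: Path, out_dir: Path) -> bool:
    """True when the expected keypoints CSV for `video` is missing from `out_dir`."""
    return not (out_dir / f"{video.stem}_keypoints.csv").exists()


with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp)
    clip = Path("curl_01.mov")  # hypothetical clip name
    first_run = needs_processing(clip, out_dir)
    # Simulate the first run writing its output.
    (out_dir / "curl_01_keypoints.csv").write_text("frame,keypoint_id,x,y\n")
    second_run = needs_processing(clip, out_dir)

print(first_run, second_run)
```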

**Tech Stack**
- Python, OpenCV, MediaPipe Pose, YOLOv8, XGBoost, scikit-learn, NumPy, Pandas, joblib, Google Colab, Jupyter Notebook



In [1]:
# %% Environment detection

import os
import sys
import subprocess

# Detect Colab
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

DRIVE_MOUNTED = False

if IN_COLAB:
    from google.colab import drive

    # Mount Drive only if not already mounted
    if not os.path.exists("/content/drive/MyDrive"):
        print("[info] Mounting Google Drive...")
        drive.mount("/content/drive")
    else:
        print("[ok] Google Drive already mounted.")

    DRIVE_MOUNTED = os.path.exists("/content/drive/MyDrive")

    if not DRIVE_MOUNTED:
        raise RuntimeError("Google Drive mount failed. Stop and remount manually.")

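    # Install runtime dependencies (quiet, non-fatal)
    try:
        proc = subprocess.run(
            [sys.executable, "-m", "pip", "install", "-q",
             "ultralytics", "opencv-python", "xgboost"],
            check=False
        )
        # Report success only when pip actually exited cleanly
        if proc.returncode == 0:
            print("[ok] Dependencies ready.")
        else:
            print(f"[warn] pip install exited with code {proc.returncode}")
    except Exception as e:
        print("[warn] pip install failed:", e)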
else:
    print("[info] Running locally. Skipping Drive mount and installs.")



[info] Mounting Google Drive...
Mounted at /content/drive
[ok] Dependencies ready.


In [2]:
# %% Imports and seeds
import os, sys, math, json, time, traceback, random
from pathlib import Path
from glob import glob
import numpy as np
import pandas as pd
import cv2

from ultralytics import YOLO
import joblib
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
from xgboost import XGBClassifier

# Reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Torch/Device
try:
    import torch
    DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
except Exception:
    DEVICE = "cpu"

print("Device:", DEVICE)

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Device: cuda


In [3]:
# %% Paths and directories
if 'IN_COLAB' not in globals():
    IN_COLAB = False

if IN_COLAB:
    BASE = Path("/content/drive/MyDrive/GymBro")
else:
    BASE = Path.cwd() / "GymBro"

DIRS = {
    "good_form_videos": BASE / "exercise_videos/good_form",
    "bad_form_videos": BASE / "exercise_videos/bad_form",
    "good_form_keypoints": BASE / "exercise_keypoints/good_form",
    "bad_form_keypoints": BASE / "exercise_keypoints/bad_form",
    "good_form_labeled": BASE / "exercise_labeled_videos/good_form",
    "bad_form_labeled": BASE / "exercise_labeled_videos/bad_form",
    "test_videos": BASE / "test_videos",
    "artifacts": BASE / "artifacts",
}
for p in DIRS.values():
    p.mkdir(parents=True, exist_ok=True)

MODEL_PATH = BASE / "form_classifier_model.pkl"
print("BASE:", BASE)

BASE: /content/drive/MyDrive/GymBro


In [4]:
# %% Config
CONF_TH = 0.6          # YOLO confidence threshold
MIN_AREA_RATIO = 0.08  # choose main person: min area of bbox vs frame
FRAME_SKIP = 1         # 1 = use every frame
FPS_FALLBACK = 25.0

# Rule thresholds
THR = {
    "TRUNK_SWING_STD_DEG": 7.5,   # torso angle std > 7.5 deg
    "CONST_LEAN_MEAN_DEG": 8.0,   # mean lean > 8 deg
    "ELBOWS_FORWARD_X": 0.04,     # elbow x forward (normalized) relative to shoulder
    "ELBOWS_FLARE_X": 0.05,       # elbow lateral distance from torso
    "SHOULDER_SHRUG_Y": 0.03,     # shoulder y range (normalized) rising with wrist
    "WRIST_ANGLE_DEG": 20.0,      # nominal 20 deg; detectors use value/100 (0.20) as a normalized x-spread proxy
    "HALF_TOP_RATIO": 0.2,        # half_top threshold = wy_min + 0.2 * wrist-y range
    "HALF_BOTTOM_RATIO": 0.8,     # half_bottom threshold = wy_min + 0.8 * wrist-y range
    "HIP_Y_STD": 0.02,            # leg drive hip y std > 0.02
    "TORSO_TWIST_X_STD": 0.03,    # L/R shoulder x diff std
    "NECK_FWD_X": 0.03,           # head x forward relative to shoulders
    "ASYM_DELTA": 0.08,           # L vs R wrist y difference range
    "OUT_OF_SYNC_FRAMES": 4,      # phase lag frames
    "SPEED_BOUNCE_P95": 0.10,     # 95th percentile of |vel(wrist y)|
}

# Rich messages per issue
ISSUE_INFO = {
    "trunk_swing": {
        "why": "Using momentum when the weight is too heavy or the core is loose.",
        "risks": "Lower back strain and less tension on biceps.",
        "fix": "Use lighter weight, brace abs and glutes, keep ribs stacked over hips."
    },
    "elbows_forward": {
        "why": "Trying to shorten the lever by drifting elbows in front.",
        "risks": "Front delts take over, less biceps stimulus.",
        "fix": "Pin elbows by the ribs. Think hinge at the elbow only."
    },
    "elbows_flare": {
        "why": "Letting upper arms drift out to the sides.",
        "risks": "Shoulder internal rotation stress and poor biceps line of pull.",
        "fix": "Squeeze a towel in your armpits or stand closer to a wall."
    },
    "shoulder_shrug": {
        "why": "Compensating with traps as the weight rises.",
        "risks": "Neck tightness and upper trap overuse.",
        "fix": "Keep shoulders down and back. Think pockets heavy."
    },
    "wrist_extended": {
        "why": "Letting the wrist bend back as you curl.",
        "risks": "Wrist irritation and weaker force transfer.",
        "fix": "Keep wrists neutral, knuckles toward ceiling, crush the handle lightly."
    },
    "wrist_flexed": {
        "why": "Curling the wrist at the top to finish the rep.",
        "risks": "Forearm takes over, biceps tension drops.",
        "fix": "Keep the forearm and wrist in one line. Lead with elbows, not hands."
    },
    "half_top": {
        "why": "Avoiding the bottom range to make reps easier.",
        "risks": "Missed lengthened tension and smaller strength gains.",
        "fix": "Lower to near full elbow extension with control."
    },
    "half_bottom": {
        "why": "Stopping short of the top range.",
        "risks": "Less peak contraction and mind-muscle link.",
        "fix": "Lift to at least chest height and squeeze 0.5–1 s."
    },
    "leg_drive": {
        "why": "Knee dip to start the weight moving.",
        "risks": "Turns a curl into a cheat move, stresses knees and back.",
        "fix": "Soft knees but still. Brace and start with elbow flexion only."
    },
    "torso_twist": {
        "why": "Rotating to help a weaker arm or mask fatigue.",
        "risks": "Spine torque and uneven loading.",
        "fix": "Square hips and shoulders. Curl both sides evenly."
    },
    "neck_forward": {
        "why": "Chasing the weight with the head during effort.",
        "risks": "Neck strain and poor scapular position.",
        "fix": "Chin tucked. Keep back of head tall."
    },
    "asymmetric": {
        "why": "One arm stronger or drifting more than the other.",
        "risks": "Imbalances and joint stress on the dominant side.",
        "fix": "Lighten the load and match the weaker side’s range."
    },
    "out_of_sync": {
        "why": "Arms moving at different tempos.",
        "risks": "Uneven fatigue and sloppy technique.",
        "fix": "Use a metronome count. Up 2, down 2 together."
    },
    "speed_bounce": {
        "why": "Dropping fast and rebounding at the bottom.",
        "risks": "Elbow irritation and less time under tension.",
        "fix": "Control the bottom, slight pause, then curl."
    },
    "constant_lean": {
        "why": "Leaning back to shorten the lever or open the shoulder angle.",
        "risks": "Low back stress and delts taking over.",
        "fix": "Stand tall, ribs over hips, squeeze glutes lightly."
    },
}
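
To show how an entry in `ISSUE_INFO` might be turned into user-facing feedback, here is a small sketch. The `format_feedback` function and its output layout are assumptions for illustration; the notebook's own feedback formatting may differ.

```python
def format_feedback(issue_key: str, issue_info: dict) -> str:
    """Build a short, readable message from one ISSUE_INFO-style entry."""
    info = issue_info.get(issue_key)
    if info is None:
        return f"{issue_key}: no guidance available."
    title = issue_key.replace("_", " ").capitalize()
    return (f"{title}\n"
            f"  Why:   {info['why']}\n"
            f"  Risks: {info['risks']}\n"
            f"  Fix:   {info['fix']}")


# One entry copied from ISSUE_INFO so the example runs on its own.
sample_info = {
    "speed_bounce": {
        "why": "Dropping fast and rebounding at the bottom.",
        "risks": "Elbow irritation and less time under tension.",
        "fix": "Control the bottom, slight pause, then curl.",
    }
}

print(format_feedback("speed_bounce", sample_info))
print(format_feedback("unknown_issue", sample_info))
```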


In [5]:
# %% Helpers
VALID_EXTS = {".mp4", ".mov", ".mkv", ".avi"}  # compared case-insensitively

def list_videos(folder: Path):
    """Return the video files in `folder`, sorted by name."""
    return sorted(p for p in folder.iterdir() if p.is_file() and p.suffix.lower() in VALID_EXTS)

def safe_keypoints_csv_for(video_path: Path, target_dir: Path) -> Path:
    return target_dir / f"{video_path.stem}_keypoints.csv"

def open_video_writer(out_path: Path, frame_w: int, frame_h: int, fps: float):
    if not fps or not np.isfinite(fps) or fps <= 0:
        fps = FPS_FALLBACK
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(str(out_path), fourcc, float(fps), (int(frame_w), int(frame_h)))
    return writer

def normalize_xy_df(df: pd.DataFrame):
    df = df.copy()
    if {"frame_width","frame_height"}.issubset(df.columns):
        w = df["frame_width"].replace(0, np.nan).fillna(df["frame_width"].replace(0, np.nan).median())
        h = df["frame_height"].replace(0, np.nan).fillna(df["frame_height"].replace(0, np.nan).median())
        df["x"] = df["x"] / w
        df["y"] = df["y"] / h
    return df

def extract_features_from_csv(csv_path: Path):
    if not csv_path.exists(): return None
    df = pd.read_csv(csv_path)
    if df.empty or not {"frame","keypoint_id","x","y"}.issubset(df.columns):
        return None
    df = normalize_xy_df(df)
    g = df.groupby(["frame","keypoint_id"])[["x","y"]].mean().unstack().sort_index(axis=1)
    feats_mean = g.mean(numeric_only=True).to_numpy()
    feats_std  = g.std(numeric_only=True).fillna(0).to_numpy()
    return np.r_[feats_mean, feats_std]

def pick_main_person(res, fw, fh, min_area_ratio=MIN_AREA_RATIO, conf_th=CONF_TH):
    if res.boxes is None or len(res.boxes) == 0:
        return None
    boxes = res.boxes.xyxy
    confs = res.boxes.conf
    boxes = boxes.detach().cpu().numpy()
    confs = confs.detach().cpu().numpy() if confs is not None else np.ones(len(boxes), dtype=np.float32)

    areas = (boxes[:,2]-boxes[:,0]) * (boxes[:,3]-boxes[:,1])
    cx = (boxes[:,0] + boxes[:,2]) / 2.0
    cy = (boxes[:,1] + boxes[:,3]) / 2.0

    cxf, cyf = fw/2.0, fh/2.0
    center_dist = np.sqrt((cx - cxf)**2 + (cy - cyf)**2)

    min_area = float(min_area_ratio) * float(fw) * float(fh)
    keep = [i for i,(a,c) in enumerate(zip(areas, confs)) if a >= min_area and c >= conf_th]
    if not keep:
        keep = [int(np.argmax(areas))]
    scores = [(areas[i] / (1.0 + center_dist[i])) for i in keep]
    return keep[int(np.argmax(scores))]
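
The selection rule in `pick_main_person` favours large boxes near the frame centre: each candidate is scored as `area / (1 + distance_to_centre)`. The sketch below restates that rule in plain Python on two made-up boxes so the numbers can be checked by hand. `main_person_index` is a hypothetical stand-in, not the notebook function.

```python
import math


def main_person_index(boxes, confs, fw, fh, min_area_ratio=0.08, conf_th=0.6):
    """Pure-Python restatement of the selection rule, for illustration only."""
    if not boxes:
        return None
    areas = [(x2 - x1) * (y2 - y1) for x1, y1, x2, y2 in boxes]
    dists = [math.hypot((x1 + x2) / 2 - fw / 2, (y1 + y2) / 2 - fh / 2)
             for x1, y1, x2, y2 in boxes]
    min_area = min_area_ratio * fw * fh
    keep = [i for i, (a, c) in enumerate(zip(areas, confs))
            if a >= min_area and c >= conf_th]
    if not keep:  # nothing passes the filters: fall back to the largest box
        keep = [max(range(len(boxes)), key=lambda i: areas[i])]
    return max(keep, key=lambda i: areas[i] / (1.0 + dists[i]))


# 1280x720 frame: a centred lifter and a smaller bystander near the right edge.
boxes = [(440, 60, 840, 700), (1000, 200, 1200, 700)]
confs = [0.91, 0.88]
print(main_person_index(boxes, confs, 1280, 720))
```

Here the lifter's box has area 256000 and sits 20 px from the centre (score about 12190), while the bystander's box has area 100000 and sits about 469 px away (score about 213), so index 0 is chosen.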

In [6]:
# %% Geometry helpers for detectors (COCO 17-keypoint layout)
KP = {
    "nose":0, "le":1, "re":2, "lear":3, "rear":4,
    "lsh":5, "rsh":6, "leb":7, "reb":8, "lwr":9, "rwr":10,
    "lhip":11, "rhip":12, "lknee":13, "rknee":14, "lank":15, "rank":16
}

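def angle(a, b, c):
    """Angle at vertex b, in radians, between the rays b->a and b->c."""
    ab = a - b
    cb = c - b
    denom = (np.linalg.norm(ab) * np.linalg.norm(cb)) + 1e-8  # guard against zero-length vectors
    cosv = np.clip(np.dot(ab, cb) / denom, -1.0, 1.0)
    return np.arccos(cosv)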

def torso_angle_deg(L_sh, R_sh, L_hip, R_hip):
    sh = (L_sh + R_sh) / 2.0
    hip = (L_hip + R_hip) / 2.0
    v = sh - hip
    return np.degrees(np.arctan2(abs(v[0]), abs(v[1]) + 1e-8))

def vel(series):
    if len(series) < 2: return np.zeros_like(series)
    d = np.diff(series)
    return np.r_[d, d[-1]]

def phase_lag_frames(x, y, max_lag=30):
    lags = range(-max_lag, max_lag+1)
    scores = []
    x = np.asarray(x); y = np.asarray(y)
    x = (x - np.mean(x)) / (np.std(x) + 1e-8)
    y = (y - np.mean(y)) / (np.std(y) + 1e-8)
    for L in lags:
        if L < 0: s = np.dot(x[:L], y[-L:])
        elif L > 0: s = np.dot(x[L:], y[:-L])
        else: s = np.dot(x, y)
        scores.append(s)
    return lags[int(np.argmax(scores))]

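def segment_reps(wrist_y, min_gap=8):
    """Return frame indices of local minima of wrist_y, at least min_gap frames apart.

    Image y grows downward, so each minimum is the highest wrist position (top of a curl).
    """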
    v = vel(wrist_y)
    troughs = np.where((np.r_[0, v[:-1]] < 0) & (v >= 0))[0]
    keep = []
    last = -1e9
    for t in troughs:
        if t - last >= min_gap:
            keep.append(int(t)); last = t
    return keep

def load_keypoints_series(csv_path: Path):
    df = pd.read_csv(csv_path)
    if df.empty: return None
    df = normalize_xy_df(df)
    g = df.pivot_table(index="frame", columns="keypoint_id", values=["x","y"], aggfunc="mean")
    g = g.sort_index().ffill().bfill().fillna(0)
    def pt(kpid):
        return np.stack([g["x"][kpid].to_numpy(), g["y"][kpid].to_numpy()], axis=1)
    series = {
        "L_sh": pt(KP["lsh"]), "R_sh": pt(KP["rsh"]),
        "L_el": pt(KP["leb"]), "R_el": pt(KP["reb"]),
        "L_wr": pt(KP["lwr"]), "R_wr": pt(KP["rwr"]),
        "L_hip": pt(KP["lhip"]), "R_hip": pt(KP["rhip"]),
        "nose": pt(KP["nose"]),
    }
    series["wrist_y_L"] = series["L_wr"][:,1]; series["wrist_y_R"] = series["R_wr"][:,1]
    series["shoulder_y_mean"] = (series["L_sh"][:,1] + series["R_sh"][:,1]) / 2.0
    series["hip_y_mean"] = (series["L_hip"][:,1] + series["R_hip"][:,1]) / 2.0
    series["shoulder_x_diff"] = (series["L_sh"][:,0] - series["R_sh"][:,0])
    series["head_rel_x"] = series["nose"][:,0] - (series["L_sh"][:,0] + series["R_sh"][:,0]) / 2.0
    return series

def smooth(arr, win=7):
    win = max(3, int(win) | 1)  # force odd >=3
    k = np.ones(win, dtype=np.float32) / float(win)
    return np.convolve(arr, k, mode="same")


def compute_metrics(series):
    """
    Compute clip-level metrics reused by issue detectors and positives.
    Includes robust neck-forward features (smoothed, baseline-referenced).
    Expects normalized series from load_keypoints_series(...).
    Uses global THR for HALF_TOP_RATIO / HALF_BOTTOM_RATIO.
    """
    # Reuse the module-level moving-average helper instead of redefining it here.
    _smooth = smooth

    # torso angle across frames (deg from vertical)
    ta = np.array([
        torso_angle_deg(series["L_sh"][i], series["R_sh"][i],
                        series["L_hip"][i], series["R_hip"][i])
        for i in range(len(series["L_sh"]))
    ])

    # wrist signals (y), velocities, ROM thresholds
    wyL, wyR = series["wrist_y_L"], series["wrist_y_R"]
    wy = (wyL + wyR) / 2.0
    vy = np.abs(vel(wy))
    wy_min, wy_max = float(np.min(wy)), float(np.max(wy))
    wy_range = wy_max - wy_min + 1e-8
    bottom_thresh = wy_min + THR["HALF_TOP_RATIO"] * wy_range
    top_thresh    = wy_min + THR["HALF_BOTTOM_RATIO"] * wy_range

    # elbow/shoulder lateral + forward metrics (x)
    el_fw = np.mean(
        (series["L_el"][:, 0] + series["R_el"][:, 0]) / 2.0
        - (series["L_sh"][:, 0] + series["R_sh"][:, 0]) / 2.0
    )
    el_flare = 0.5 * (
        np.mean(np.abs(series["L_el"][:, 0] - series["L_sh"][:, 0])) +
        np.mean(np.abs(series["R_el"][:, 0] - series["R_sh"][:, 0]))
    )

    # smoothed head-forward metric (baseline from first 10%)
    head_rel_x_raw = series["head_rel_x"]
    head_rel_x_sm  = _smooth(head_rel_x_raw, win=7)
    n = len(head_rel_x_sm)
    base_n = max(5, int(0.1 * n))
    baseline = float(np.median(head_rel_x_sm[:base_n])) if base_n > 0 else float(np.median(head_rel_x_sm))
    head_forward = np.maximum(0.0, head_rel_x_sm - baseline)  # forward-only above baseline

    # correlation of head motion with wrist rise
    corr_head_wy = float(np.corrcoef(head_rel_x_sm, wy)[0, 1]) if n > 1 else 0.0

    # other aggregates
    shoulder_y = series["shoulder_y_mean"]
    corr_shy_wy = float(np.corrcoef(shoulder_y, wy)[0, 1]) if len(shoulder_y) > 1 else 0.0

    m = {
        # torso control
        "ta_std": float(np.std(ta)),
        "ta_mean": float(np.mean(ta)),

        # shoulder/wrist coupling
        "shoulder_y_range": float(np.max(shoulder_y) - np.min(shoulder_y)),
        "corr_shy_wy": corr_shy_wy,

        # wrist bend proxies (COCO hand limitation)
        "wr_bend_L": float(np.std(series["L_wr"][:, 0] - series["L_el"][:, 0])),
        "wr_bend_R": float(np.std(series["R_wr"][:, 0] - series["R_el"][:, 0])),

        # ROM info
        "wy_min": wy_min,
        "wy_max": wy_max,
        "wy_range": wy_range,
        "bottom_thresh": float(bottom_thresh),
        "top_thresh": float(top_thresh),

        # hip stability (leg drive)
        "hip_y_std": float(np.std(series["hip_y_mean"])),

        # torso twist
        "shoulder_x_diff_std": float(np.std(series["shoulder_x_diff"])),

        # asymmetry and timing
        "asym_max": float(np.max(np.abs(wyL - wyR))),
        "lag_frames": int(abs(phase_lag_frames(wyL, wyR, max_lag=30))),

        # speed/bounce
        "vy_p95": float(np.percentile(vy, 95)),

        # elbow forward/flare
        "el_fw": float(el_fw),
        "el_flare": float(el_flare),

        # improved neck-forward metrics
        "head_forward_mean": float(np.mean(head_forward)),
        "head_forward_p90":  float(np.percentile(head_forward, 90)),
        "corr_head_wy":      corr_head_wy,
    }
    return m

def detect_issues_with_scores(metrics, clip_name: str = ""):
    """
    Return [{'key','score','reason'}, ...] for issues that fire.
    Uses thresholds in THR and metrics from compute_metrics(...).
    clip_name is optional and helps make neck_forward stricter on front45 shots.
    """
    M, T = metrics, THR
    out = []

    def add(key, val, thr, bigger_is_bad=True, reason=""):
        if bigger_is_bad:
            fired = val > thr
            gap = (val - thr) / (abs(thr) + 1e-8)
        else:
            fired = val < thr
            gap = (thr - val) / (abs(thr) + 1e-8)
        if fired:
            conf = max(0.0, min(1.0, 0.5 + 0.5 * gap))  # simple 0..1 confidence
            out.append({"key": key, "score": float(conf), "reason": reason or f"{key}: {val:.4f} vs {thr:.4f}"})

    # Core posture and ROM faults
    add("trunk_swing", M["ta_std"], T["TRUNK_SWING_STD_DEG"], True, "Torso angle varies a lot across reps.")
    add("constant_lean", M["ta_mean"], T["CONST_LEAN_MEAN_DEG"], True, "Back is leaned behind vertical most of the set.")
    add("elbows_forward", M["el_fw"], T["ELBOWS_FORWARD_X"], True, "Elbows drift forward relative to shoulder center.")
    add("elbows_flare", M["el_flare"], T["ELBOWS_FLARE_X"], True, "Elbows sit laterally away from torso.")

    # Shoulder shrug needs both coupling and range
    if M.get("corr_shy_wy", 0.0) > 0.5 and M.get("shoulder_y_range", 0.0) > T["SHOULDER_SHRUG_Y"]:
        out.append({"key": "shoulder_shrug", "score": 0.6, "reason": "Shoulders rise with the curl and move a fair amount."})

    # Wrist bend proxy (COCO has no hand angle, so use x spread vs elbow)
    if max(M["wr_bend_L"], M["wr_bend_R"]) > (T["WRIST_ANGLE_DEG"] / 100.0):
        out.append({"key": "wrist_extended", "score": 0.5, "reason": "Wrist position deviates from forearm line often."})
        out.append({"key": "wrist_flexed", "score": 0.5, "reason": "Wrist position deviates from forearm line often."})

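    # ROM faults
    # NOTE: bottom_thresh and top_thresh are derived from this clip's own wy_min/wy_max,
    # so these two comparisons cannot fire for a clip with any visible wrist movement.
    add("half_top", M["wy_min"], M["bottom_thresh"], True, "Does not reach near the bottom range.")
    add("half_bottom", M["wy_max"], M["top_thresh"], False, "Does not reach near the top range.")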

    # Leg drive and twist
    add("leg_drive", M["hip_y_std"], T["HIP_Y_STD"], True, "Hips bounce up and down across reps.")
    add("torso_twist", M["shoulder_x_diff_std"], T["TORSO_TWIST_X_STD"], True, "Shoulders rotate left-right across reps.")

    # Neck forward — stricter, baseline referenced, angle aware
    neck_thr = T["NECK_FWD_X"]
    if "front45" in str(clip_name).lower():
        neck_thr *= 1.3  # parallax bump for 45° front

    cond_amt = (M["head_forward_mean"] > neck_thr) or (M["head_forward_p90"] > neck_thr * 1.15)
    cond_corr = (M["corr_head_wy"] > 0.25)  # head moves with the curl
    cond_posture = (M["ta_mean"] < T["CONST_LEAN_MEAN_DEG"] * 0.8)  # not already leaned back a lot

    if cond_amt and cond_corr and cond_posture:
        gap_mean = (M["head_forward_mean"] - neck_thr) / (abs(neck_thr) + 1e-8)
        gap_p90  = (M["head_forward_p90"]  - neck_thr * 1.15) / (abs(neck_thr * 1.15) + 1e-8)
        gap = max(gap_mean, gap_p90)
        conf = max(0.0, min(1.0, 0.5 + 0.5 * gap))
        out.append({
            "key": "neck_forward",
            "score": float(conf),
            "reason": f"head_forward_mean={M['head_forward_mean']:.3f}, p90={M['head_forward_p90']:.3f}, "
                      f"corr_head_wy={M['corr_head_wy']:.2f}, thr={neck_thr:.3f}"
        })

    # Asymmetry, timing, and bounce
    add("asymmetric", M["asym_max"], T["ASYM_DELTA"], True, "Left vs right wrist heights diverge.")
    add("out_of_sync", M["lag_frames"], T["OUT_OF_SYNC_FRAMES"], True, "Arms are out of phase in timing.")
    add("speed_bounce", M["vy_p95"], T["SPEED_BOUNCE_P95"], True, "Wrist speed spikes at the bottom.")

    # Deduplicate by best score
    best = {}
    for d in out:
        k = d["key"]
        if k not in best or d["score"] > best[k]["score"]:
            best[k] = d
    return list(best.values())

def detect_positives(metrics):
    """Simple ‘good’ cues to praise when thresholds are comfortably met."""
    M, T = metrics, THR
    positives = []

    if M["ta_std"] < (T["TRUNK_SWING_STD_DEG"] * 0.6):
        positives.append("Stable torso — minimal swing.")
    if M["ta_mean"] < (T["CONST_LEAN_MEAN_DEG"] * 0.6):
        positives.append("Upright posture — no lean back.")
    if M["el_flare"] < (T["ELBOWS_FLARE_X"] * 0.6):
        positives.append("Elbows stay close to torso.")
    if M["el_fw"] < (T["ELBOWS_FORWARD_X"] * 0.6):
        positives.append("Elbows don’t drift forward.")
    if M["wy_min"] <= M["bottom_thresh"] and M["wy_max"] >= M["top_thresh"]:
        positives.append("Full range — reaches both bottom and top.")
    if M["hip_y_std"] < (T["HIP_Y_STD"] * 0.5):
        positives.append("No leg drive — hips stable.")
    if M["shoulder_x_diff_std"] < (T["TORSO_TWIST_X_STD"] * 0.6):
        positives.append("Square torso — no twist.")
    if M["asym_max"] < (T["ASYM_DELTA"] * 0.6):
        positives.append("Left and right arms stay even.")
    if M["lag_frames"] <= max(1, T["OUT_OF_SYNC_FRAMES"]//2):
        positives.append("Good timing — arms move together.")
    if M["vy_p95"] < (T["SPEED_BOUNCE_P95"] * 0.7):
        positives.append("Controlled tempo — no bounce.")

    return positives[:6]  # keep it concise
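
The `add` helper inside `detect_issues_with_scores` converts how far a metric overshoots its threshold into a 0..1 confidence: a barely fired rule scores about 0.5, and the score saturates at 1.0 once the metric is roughly twice the threshold. The sketch below isolates that mapping. `rule_confidence` is a hypothetical name used only for this example.

```python
def rule_confidence(val, thr, bigger_is_bad=True):
    """Return None if the rule does not fire, else the clamped 0..1 confidence."""
    fired = val > thr if bigger_is_bad else val < thr
    if not fired:
        return None
    # Relative overshoot past the threshold, signed so that "worse" is positive.
    gap = (val - thr) / (abs(thr) + 1e-8)
    if not bigger_is_bad:
        gap = -gap
    return max(0.0, min(1.0, 0.5 + 0.5 * gap))


# Torso-angle std values checked against TRUNK_SWING_STD_DEG = 7.5
for ta_std in (6.0, 9.0, 15.0, 30.0):
    print(ta_std, rule_confidence(ta_std, 7.5))
```

With a threshold of 7.5, a value of 6.0 does not fire, 9.0 gives about 0.6, 15.0 gives about 1.0, and 30.0 is clamped to 1.0.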

In [7]:
# %% Rule-based detectors
def detect_issues(series):
    issues = []

    ta = np.array([torso_angle_deg(series["L_sh"][i], series["R_sh"][i], series["L_hip"][i], series["R_hip"][i])
                   for i in range(len(series["L_sh"]))])
    ta_std = np.std(ta); ta_mean = np.mean(ta)

    wyL, wyR = series["wrist_y_L"], series["wrist_y_R"]
    wy = (wyL + wyR) / 2.0
    vy = np.abs(vel(wy))
    vy_p95 = np.percentile(vy, 95)

    wy_min, wy_max = float(np.min(wy)), float(np.max(wy))
    wy_range = wy_max - wy_min + 1e-8
    bottom_thresh = wy_min + THR["HALF_TOP_RATIO"] * wy_range
    top_thresh = wy_min + THR["HALF_BOTTOM_RATIO"] * wy_range

    shoulder_y = series["shoulder_y_mean"]
    hip_y = series["hip_y_mean"]

    lag = abs(phase_lag_frames(wyL, wyR, max_lag=30))
    asym = np.max(np.abs((wyL - wyR)))

    if ta_std > THR["TRUNK_SWING_STD_DEG"]:
        issues.append("trunk_swing")
    if ta_mean > THR["CONST_LEAN_MEAN_DEG"] and ta_std < THR["TRUNK_SWING_STD_DEG"]/2.0:
        issues.append("constant_lean")

    el_fw = np.mean((series["L_el"][:,0] + series["R_el"][:,0]) / 2.0 - (series["L_sh"][:,0] + series["R_sh"][:,0]) / 2.0)
    if el_fw > THR["ELBOWS_FORWARD_X"]:
        issues.append("elbows_forward")
    el_flare = np.mean(np.abs(series["L_el"][:,0] - series["L_sh"][:,0])) + np.mean(np.abs(series["R_el"][:,0] - series["R_sh"][:,0]))
    if el_flare/2.0 > THR["ELBOWS_FLARE_X"]:
        issues.append("elbows_flare")

    corr = np.corrcoef(shoulder_y, wy)[0,1] if len(shoulder_y) > 1 else 0.0
    if corr > 0.5 and (np.max(shoulder_y) - np.min(shoulder_y)) > THR["SHOULDER_SHRUG_Y"]:
        issues.append("shoulder_shrug")

    wr_bend_L = np.std(series["L_wr"][:,0] - series["L_el"][:,0])
    wr_bend_R = np.std(series["R_wr"][:,0] - series["R_el"][:,0])
    if wr_bend_L > THR["WRIST_ANGLE_DEG"]/100.0 or wr_bend_R > THR["WRIST_ANGLE_DEG"]/100.0:
        issues += ["wrist_extended","wrist_flexed"]

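    # NOTE: both thresholds come from this clip's own min/max wrist height,
    # so neither check fires whenever the wrist moves at all.
    if np.min(wy) > bottom_thresh:
        issues.append("half_top")
    if np.max(wy) < top_thresh:
        issues.append("half_bottom")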

    if np.std(hip_y) > THR["HIP_Y_STD"]:
        issues.append("leg_drive")

    if np.std(series["shoulder_x_diff"]) > THR["TORSO_TWIST_X_STD"]:
        issues.append("torso_twist")

    if np.mean(series["head_rel_x"]) > THR["NECK_FWD_X"]:
        issues.append("neck_forward")

    if asym > THR["ASYM_DELTA"]:
        issues.append("asymmetric")
    if lag > THR["OUT_OF_SYNC_FRAMES"]:
        issues.append("out_of_sync")

    if vy_p95 > THR["SPEED_BOUNCE_P95"]:
        issues.append("speed_bounce")

    return sorted(set(issues))
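
One property of the range-of-motion checks in `detect_issues` is easier to see with numbers. Both thresholds are computed from the clip's own minimum and maximum wrist height, and are then compared against those same extremes. The sketch below restates the arithmetic in plain Python on two made-up wrist traces. `rom_flags` is a hypothetical helper, not part of the notebook.

```python
def rom_flags(wy, half_top_ratio=0.2, half_bottom_ratio=0.8):
    """Restate the half_top / half_bottom comparisons on a list of wrist-y values."""
    wy_min, wy_max = min(wy), max(wy)
    wy_range = wy_max - wy_min + 1e-8
    bottom_thresh = wy_min + half_top_ratio * wy_range
    top_thresh = wy_min + half_bottom_ratio * wy_range
    return {"half_top": wy_min > bottom_thresh, "half_bottom": wy_max < top_thresh}


full_rep = [0.80, 0.60, 0.40, 0.30, 0.40, 0.60, 0.80]     # large excursion
partial_rep = [0.62, 0.58, 0.55, 0.53, 0.55, 0.58, 0.62]  # small excursion

print(rom_flags(full_rep))
print(rom_flags(partial_rep))
```

Both traces give `False` for both flags, even though the second covers far less range. Detecting a partial rep would likely need a reference that does not depend on the clip itself, for example wrist height relative to the shoulder and hip; that design choice is left open here.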

In [8]:
# %% Load pose model
pose_model = YOLO("yolov8n-pose.pt")
pose_model.to(DEVICE)
print("Loaded pose model on:", DEVICE)

Downloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8n-pose.pt to 'yolov8n-pose.pt': 100% ━━━━━━━━━━━━ 6.5MB 103.0MB/s 0.1s
Loaded pose model on: cuda


In [9]:
# %% Training data processing (extract keypoints + labeled renders)
def process_folder(src_folder: Path, out_label_folder: Path, out_csv_folder: Path, skip_existing=True):
    videos = list_videos(src_folder)
    print(f"[info] Found {len(videos)} videos in {src_folder}")
    for vp in videos:
        try:
            out_csv = safe_keypoints_csv_for(vp, out_csv_folder)
            out_video = out_label_folder / f"output_{vp.name}"
            if skip_existing and out_csv.exists() and out_video.exists():
                print(f"[skip] {vp.name}")
                continue

            cap = cv2.VideoCapture(str(vp))
            if not cap.isOpened():
                print(f"[warn] Cannot open {vp}")
                continue

            fw = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            fh = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)

            writer = open_video_writer(out_video, fw, fh, fps)

            frame_num = 0
            rows = []

            while True:
                ok, frame = cap.read()
                if not ok: break
                if frame_num % FRAME_SKIP != 0:
                    frame_num += 1
                    continue

                results = pose_model(frame, verbose=False, conf=CONF_TH, device=0 if DEVICE=="cuda" else None)
                res = results[0]
                writer.write(res.plot())

                idx = pick_main_person(res, fw, fh)
                kps = res.keypoints
                if idx is not None and kps is not None and len(kps.xy) > idx:
                    xs = kps.xy[idx][:,0]; ys = kps.xy[idx][:,1]
                    confs = kps.conf[idx] if hasattr(kps, "conf") and kps.conf is not None else None
                    for kid, (x, y) in enumerate(zip(xs, ys)):
                        row = {'frame': frame_num, 'keypoint_id': kid, 'x': float(x), 'y': float(y),
                               'frame_width': fw, 'frame_height': fh}
                        if confs is not None and len(confs) > kid:
                            row['confidence'] = float(confs[kid])
                        rows.append(row)

                frame_num += 1

            cap.release()
            writer.release()

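            # Fixed column list so a clip with no detections still gets a header row
            cols = ['frame', 'keypoint_id', 'x', 'y', 'frame_width', 'frame_height', 'confidence']
            pd.DataFrame(rows, columns=cols).to_csv(out_csv, index=False)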
            print(f"[ok] {vp.name}: wrote {out_csv.name} and {out_video.name}")
        except Exception as e:
            print(f"[error] {vp.name}: {e}")
            traceback.print_exc()

# Run for both classes
process_folder(DIRS['good_form_videos'], DIRS['good_form_labeled'], DIRS['good_form_keypoints'])
process_folder(DIRS['bad_form_videos'],  DIRS['bad_form_labeled'],  DIRS['bad_form_keypoints'])

[info] Found 32 videos in /content/drive/MyDrive/GymBro/exercise_videos/good_form
[skip] good_bottom-pause_front45_01.mov
[skip] good_bottom-pause_front45_02.mov
[skip] good_bottom-pause_side_01.mov
[skip] good_bottom-pause_side_02.mov
[skip] good_constant-tension_front45_01.mov
[skip] good_constant-tension_front45_02.mov
[skip] good_constant-tension_side_01.mov
[skip] good_constant-tension_side_02.mov
[skip] good_even-tempo_front45_01.mov
[skip] good_even-tempo_front45_02.mov
[skip] good_even-tempo_side_01.mov
[skip] good_even-tempo_side_02.mov
[skip] good_light-load_front45_01.mov
[skip] good_light-load_front45_02.mov
[skip] good_light-load_side_01.mov
[skip] good_light-load_side_02.mov
[skip] good_mod-load_front45_01.mov
[skip] good_mod-load_front45_02.mov
[skip] good_mod-load_side_01.mov
[skip] good_mod-load_side_02.mov
[skip] good_slow-ecc_front45_01.mov
[skip] good_slow-ecc_front45_02.mov
[skip] good_slow-ecc_side_01.mov
[skip] good_slow-ecc_side_02.mov
[skip] good_strict_front45

In [10]:
# %% QA checks
def qa_scan(key_csv_dir: Path):
    issues = []
    for csv in sorted(key_csv_dir.glob("*.csv")):
        df = pd.read_csv(csv)
        if df.empty:
            issues.append((csv.name, "empty")); continue
        frames = df["frame"].nunique()
        if frames < 30:
            issues.append((csv.name, f"short_clip_{frames}f"))
        if df["keypoint_id"].nunique() < 12:
            issues.append((csv.name, "few_keypoints"))
    return issues

qa_problems = qa_scan(DIRS["good_form_keypoints"]) + qa_scan(DIRS["bad_form_keypoints"])
print("QA problems:", qa_problems if qa_problems else "none")

QA problems: none


In [11]:
# %% Train classifier
X, y = [], []
for label, folder in enumerate([DIRS['good_form_keypoints'], DIRS['bad_form_keypoints']]):
    for csv in sorted(folder.glob("*.csv")):
        feats = extract_features_from_csv(csv)
        if feats is not None:
            X.append(feats); y.append(label)

X = np.array(X, dtype=np.float32)
y = np.array(y, dtype=np.int64)
print(f"[info] samples={len(X)}, bad={int((y==1).sum())}, good={int((y==0).sum())}")

if len(np.unique(y)) < 2:
    raise RuntimeError("Both classes required to train. Add clips to good_form and bad_form.")

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=SEED, stratify=y)

clf = XGBClassifier(
    random_state=SEED,
    n_estimators=250,
    learning_rate=0.1,
    max_depth=4,
    subsample=0.9,
    colsample_bytree=0.9,
    n_jobs=2,
    verbosity=0,
)
clf.fit(X_train, y_train)

y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred, target_names=["Good", "Bad"]))
print("Confusion Matrix:\n", confusion_matrix(y_test, y_pred))

MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
joblib.dump(clf, MODEL_PATH)
print(f"[ok] saved model -> {MODEL_PATH}")

[info] samples=93, bad=61, good=32
              precision    recall  f1-score   support

        Good       1.00      1.00      1.00         7
         Bad       1.00      1.00      1.00        12

    accuracy                           1.00        19
   macro avg       1.00      1.00      1.00        19
weighted avg       1.00      1.00      1.00        19

Confusion Matrix:
 [[ 7  0]
 [ 0 12]]
[ok] saved model -> /content/drive/MyDrive/GymBro/form_classifier_model.pkl
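
All 19 held-out clips are classified correctly, but a test set this small leaves the true accuracy fairly uncertain. As a rough illustration that is not part of the original notebook, a Wilson score interval can be computed for 19 correct out of 19. `wilson_interval` is a hypothetical helper, and the 95% level (z = 1.96) is an assumption.

```python
import math


def wilson_interval(correct, total, z=1.96):
    """Wilson score interval for a binomial proportion (z=1.96 is about 95%)."""
    if total <= 0:
        raise ValueError("total must be positive")
    p = correct / total
    denom = 1 + z**2 / total
    centre = (p + z**2 / (2 * total)) / denom
    half = z * math.sqrt(p * (1 - p) / total + z**2 / (4 * total**2)) / denom
    # Clamp to [0, 1] to absorb floating-point rounding at the boundaries
    return max(0.0, centre - half), min(1.0, centre + half)


low, high = wilson_interval(19, 19)
print(f"accuracy 19/19, 95% interval: [{low:.2f}, {high:.2f}]")
# accuracy 19/19, 95% interval: [0.83, 1.00]
```

Under these assumptions the interval runs from roughly 0.83 to 1.00, so a larger test set or cross-validation would be needed to confirm the perfect score.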


In [12]:
# %% Inference on one test video + detailed issue breakdown
import sys

TEST_NAME = input("Enter File Name: ").strip()  # e.g., test_clip.mov/mp4 in GymBro/test_videos
test_video = DIRS['test_videos'] / TEST_NAME

# Existence check: stop the cell early instead of failing later in OpenCV
if not test_video.exists() or not test_video.is_file():
    print(f"[error] File not found in test_videos: {TEST_NAME}")
    sys.exit(0)  # raises SystemExit; the notebook reports it and halts this cell

cap = cv2.VideoCapture(str(test_video))
if not cap.isOpened():
    print(f"[error] Cannot open {test_video.name}")
    sys.exit(0)

fw = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
fh = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS) or FPS_FALLBACK
out_video = DIRS['test_videos'] / f"output_{test_video.name}"
writer = open_video_writer(out_video, fw, fh, fps)

rows = []
frame_num = 0
while True:
    ok, frame = cap.read()
    if not ok:
        break
    results = pose_model(frame, verbose=False, conf=CONF_TH, device=0 if DEVICE == "cuda" else None)
    res = results[0]
    writer.write(res.plot())

    idx = pick_main_person(res, fw, fh)
    kps = res.keypoints
    if idx is not None and kps is not None and len(kps.xy) > idx:
        xs = kps.xy[idx][:, 0]
        ys = kps.xy[idx][:, 1]
        confs = kps.conf[idx] if hasattr(kps, "conf") and kps.conf is not None else None
        # Use kx/ky so the loop does not overwrite the global label array `y` from the training cell
        for kid, (kx, ky) in enumerate(zip(xs, ys)):
            row = {
                "frame": frame_num,
                "keypoint_id": kid,
                "x": float(kx),
                "y": float(ky),
                "frame_width": fw,
                "frame_height": fh,
            }
            if confs is not None and len(confs) > kid:
                row["confidence"] = float(confs[kid])
            rows.append(row)
    frame_num += 1

cap.release()
writer.release()

out_csv = DIRS['test_videos'] / f"{test_video.stem}_keypoints.csv"
pd.DataFrame(rows).to_csv(out_csv, index=False)
print(f"[ok] wrote {out_csv.name} and {out_video.name}")

# --- Classifier verdict (Good/Bad) ---
clf = joblib.load(MODEL_PATH)
feats = extract_features_from_csv(out_csv)
if feats is None:
    print("[error] No keypoints captured; try better lighting or move closer.")
    sys.exit(0)
pred = clf.predict(np.array([feats], dtype=np.float32))[0]
# Same label convention as in training: 0 = Good, 1 = Bad
verdict_cls = "Good" if pred == 0 else "Bad"

# Rule-based breakdown
series = load_keypoints_series(out_csv)
if series is None:
    print("[error] No keypoints for analysis. Check lighting and framing.")
    sys.exit(0)

metrics = compute_metrics(series)
issues = detect_issues_with_scores(metrics)
positives = detect_positives(metrics)

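# Rank issues by rule confidence; a score of 0.7 or higher counts as "strong"
issues_sorted = sorted(issues, key=lambda d: d["score"], reverse=True)
strong_issues = sum(1 for d in issues_sorted if d["score"] >= 0.7)

# The rule layer can overrule a "Good" classifier verdict when at least two
# strong issues are present; a "Bad" classifier verdict is never upgraded.
verdict_final = verdict_cls
if verdict_cls == "Good" and strong_issues >= 2:
    verdict_final = "Bad"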

print("\nForm Diagnosis")
print(f"{verdict_final} form  —  classifier={verdict_cls}, strong_issues={strong_issues}")

print("\nPositive Points:")
if positives:
    for s in positives:
        print(" •", s)
else:
    print(" • Keep working toward stable torso, full range, and even arms.")

print("\nNegative Points:")
if issues_sorted:
    for d in issues_sorted[:6]:
        key = d["key"]
        info = ISSUE_INFO.get(key, {"why": "", "risks": "", "fix": ""})
        print(f" • {key.replace('_', ' ')}  (confidence {d['score']:.2f})")
        if info["why"]:
            print("    Why This Happened:   ", info["why"])
        if info["risks"]:
            print("    Possible Side-Effects: ", info["risks"])
        if info["fix"]:
            print("    How to Fix:   ", info["fix"])
else:
    print(" • No clear faults flagged by the rule layer.")

report = {
    "clip": str(test_video.name),
    "classifier_verdict": verdict_cls.lower(),
    "final_verdict": verdict_final.lower(),
    "positives": positives,
    "issues": [
        {
            "key": d["key"],
            "confidence": d["score"],
            "why": ISSUE_INFO.get(d["key"], {}).get("why", ""),
            "risks": ISSUE_INFO.get(d["key"], {}).get("risks", ""),
            "fix": ISSUE_INFO.get(d["key"], {}).get("fix", ""),
            "reason_metric": d["reason"],
        } for d in issues_sorted
    ],
    "metrics_sample": metrics,
}
rep_path = DIRS["artifacts"] / f"{test_video.stem}_report.json"
with open(rep_path, "w") as f:
    json.dump(report, f, indent=2)
print(f"\n[ok] saved detailed report -> {rep_path}")


Enter File Name: Test.mov
[ok] wrote Test_keypoints.csv and output_Test.mov

Form Diagnosis
Good form  —  classifier=Good, strong_issues=0

Positive Points:
 • Stable torso — minimal swing.
 • Upright posture — no lean back.
 • Elbows stay close to torso.
 • Elbows don’t drift forward.
 • Full range — reaches both bottom and top.
 • No leg drive — hips stable.

Negative Points:
 • No clear faults flagged by the rule layer.

[ok] saved detailed report -> /content/drive/MyDrive/GymBro/artifacts/Test_report.json
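
The final verdict combines the classifier with the rule layer: a "Good" prediction is downgraded to "Bad" when two or more issues score at least 0.7. A minimal sketch of that rule as a standalone function follows, using the thresholds from the cell above. `combine_verdict` is a hypothetical name, and the issue keys in the example are invented for illustration rather than taken from `ISSUE_INFO`.

```python
def combine_verdict(classifier_verdict, issues, strong_score=0.7, min_strong=2):
    """Downgrade a 'Good' classifier verdict when enough strong issues exist.

    Returns the final verdict and the number of strong issues found.
    """
    strong = sum(1 for d in issues if d["score"] >= strong_score)
    if classifier_verdict == "Good" and strong >= min_strong:
        return "Bad", strong
    return classifier_verdict, strong


# Hypothetical issue records; only the "score" field matters for the verdict
issues = [
    {"key": "example_issue_a", "score": 0.85},
    {"key": "example_issue_b", "score": 0.72},
    {"key": "example_issue_c", "score": 0.40},
]

print(combine_verdict("Good", issues))      # ('Bad', 2)
print(combine_verdict("Good", issues[1:]))  # ('Good', 1)
print(combine_verdict("Bad", []))           # ('Bad', 0)
```

Keeping the rule in one small function makes the two thresholds easy to tune and to unit-test separately from the video pipeline.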
