
# Push-up Form Checker — **All-in-One Notebook** (Prep + Train + Pose Feedback + Classifier Integration)

This notebook combines:
1. **Dataset Prep**: Reads Kaggle push-up dataset lists (`correct.txt`, `incorrect.txt`), splits by video, extracts frames to YOLO classification folders.
2. **Classifier Training**: Trains an Ultralytics YOLO **classification** model (`correct` vs `incorrect`).
3. **Evaluation**: Prints a classification report and saves a confusion matrix.
4. **Rep Counter (YOLO-Pose)**: Uses YOLO **pose** to detect keypoints and count reps, with *minor feedback* (e.g., depth, hip sag, elbow flare).
5. **Classifier Integration**: When a rep completes, grabs a few frames and runs the trained classifier to tag the rep as `correct/incorrect`.

> You can run sections independently. The rep counter works with **webcam** or **video file**.


In [5]:

# === 0. Installs (uncomment if needed) =======================================
%pip install ultralytics opencv-python numpy pandas scikit-learn matplotlib pyyaml tqdm


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:

# === 1. Imports & Config =====================================================
from ultralytics import YOLO
import cv2, os, yaml, math, json, time, random
import numpy as np
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from collections import deque, defaultdict
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

# ---------------------- USER CONFIG ------------------------------------------
# Set your Kaggle dataset root (where videos + labels/* live):
DATASET_DIR = "C:/Data/hboict/Sem7-AIFS/Personal_Project"   # <-- CHANGE THIS

# Relative paths to the two label files (relative to DATASET_DIR):
CORRECT_FILE = "labels/correct.txt"
INCORRECT_FILE = "labels/incorrect.txt"

# Where extracted frames and splits will live:
OUTPUT_FRAMES_DIR = "data/frames"

# Split ratios (by video), fixed seed for reproducibility:
SPLIT = {"train": 0.7, "val": 0.15, "test": 0.15}
SEED = 42

# Frame extraction:
FPS = 2            # frames per second
IMGSZ = 224        # classifier input size

# Classifier training defaults:
CLASSIFIER_MODEL = "yolov8n-cls.pt"  # can try yolov8s-cls.pt later
EPOCHS = 40
BATCH = 64
LR0 = 0.01
WEIGHT_DECAY = 0.0005

# Where to save YOLO runs:
RUNS_DIR = "runs"
RUN_NAME = "pushup-cls4"
BEST_WEIGHTS = f"{RUNS_DIR}/{RUN_NAME}/weights/best.pt"


# Pose model for rep counter:
POSE_WEIGHTS = "yolov8n-pose.pt"     # small and fast; try s/m for accuracy
CONF_THRES = 0.5
IOU_THRES = 0.5

# Rep counting & feedback:
FRAMES_PER_REP_FOR_CLASSIFY = 7
VOTE_STRATEGY = "majority"  # or "avg_confidence"
USE_WEBCAM = True
VIDEO_SOURCE = 0            # webcam index or "/path/to/video.mp4"
FRAME_SKIP = 1              # process every nth frame
SHOW_WINDOW = True
DRAW_OVERLAY = True
SAVE_OUTPUT = False
OUTPUT_VIDEO = "pushup_session_with_labels.mp4"
SESSION_DIR = "sessions"

os.makedirs(SESSION_DIR, exist_ok=True)
os.makedirs(OUTPUT_FRAMES_DIR, exist_ok=True)



## 2. Dataset Discovery + Split (by video)
Reads the two label files (correct/incorrect), builds a dataframe, and assigns each **video** to train/val/test.


In [3]:

def read_list(p: Path):
    with open(p, "r") as f:
        return [ln.strip() for ln in f if ln.strip()]

def discover_videos(dataset_dir: Path, correct_file: str, incorrect_file: str):
    correct_videos = read_list(dataset_dir / correct_file)
    incorrect_videos = read_list(dataset_dir / incorrect_file)
    rows = []
    for v in correct_videos:
        rows.append({"path": str(dataset_dir / v), "label": "correct", "video": Path(v).stem})
    for v in incorrect_videos:
        rows.append({"path": str(dataset_dir / v), "label": "incorrect", "video": Path(v).stem})
    df = pd.DataFrame(rows)
    missing = df[~df["path"].apply(lambda p: Path(p).exists())]
    if not missing.empty:
        print("WARNING: Some listed videos are missing on disk:\n", missing)
    return df

def split_by_video(df: pd.DataFrame, split_cfg: dict, seed: int = 42):
    random.seed(seed)
    videos = df["video"].unique().tolist()
    random.shuffle(videos)
    n = len(videos)
    n_train = int(split_cfg["train"] * n)
    n_val = int(split_cfg["val"] * n)
    train_videos = set(videos[:n_train])
    val_videos = set(videos[n_train:n_train+n_val])
    test_videos = set(videos[n_train+n_val:])

    def assign_split(v):
        if v in train_videos: return "train"
        if v in val_videos: return "val"
        return "test"

    df = df.copy()
    df["split"] = df["video"].apply(assign_split)
    return df

dataset_dir = Path(DATASET_DIR)
df = discover_videos(dataset_dir, CORRECT_FILE, INCORRECT_FILE)
df = split_by_video(df, split_cfg=SPLIT, seed=SEED)
df.head()


Unnamed: 0,path,label,video,split
0,C:\Data\hboict\Sem7-AIFS\Personal_Project\Corr...,correct,Copy of push up 1,val
1,C:\Data\hboict\Sem7-AIFS\Personal_Project\Corr...,correct,Copy of push up 1,val
2,C:\Data\hboict\Sem7-AIFS\Personal_Project\Corr...,correct,Copy of push up 100,train
3,C:\Data\hboict\Sem7-AIFS\Personal_Project\Corr...,correct,Copy of push up 100,train
4,C:\Data\hboict\Sem7-AIFS\Personal_Project\Corr...,correct,Copy of push up 101,train



## 3. Extract Frames into YOLO Classification Folders
Creates `data/frames/{train,val,test}/{correct,incorrect}` and samples frames at `FPS`.


In [4]:

def extract_frames(df: pd.DataFrame, output_dir: Path, fps: int = 2, imgsz: int = 224):
    for split in ["train", "val", "test"]:
        for cls in ["correct", "incorrect"]:
            (output_dir / split / cls).mkdir(parents=True, exist_ok=True)

    written = []
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Extracting frames"):
        video_path = row["path"]
        label = row["label"]
        split = row["split"]
        vid = cv2.VideoCapture(video_path)
        if not vid.isOpened():
            print(f"Could not open {video_path}")
            continue
        vfps = vid.get(cv2.CAP_PROP_FPS)
        if vfps <= 0: vfps = 30.0
        interval = max(int(round(vfps / fps)), 1)
        frame_idx = 0
        saved_idx = 0
        while True:
            ret, frame = vid.read()
            if not ret:
                break
            if frame_idx % interval == 0:
                h, w = frame.shape[:2]
                side = min(h, w)
                y0 = (h - side) // 2
                x0 = (w - side) // 2
                crop = frame[y0:y0+side, x0:x0+side]
                img = cv2.resize(crop, (imgsz, imgsz), interpolation=cv2.INTER_AREA)
                out_name = f"{Path(video_path).stem}_f{saved_idx:05d}.jpg"
                out_path = output_dir / split / label / out_name
                cv2.imwrite(str(out_path), img)
                written.append({"path": str(out_path), "label": label, "split": split})
                saved_idx += 1
            frame_idx += 1
        vid.release()
    return pd.DataFrame(written)

frames_index = extract_frames(df, output_dir=Path(OUTPUT_FRAMES_DIR), fps=FPS, imgsz=IMGSZ)
frames_index.to_csv(Path(OUTPUT_FRAMES_DIR)/"frames_index.csv", index=False)
frames_index.head()


Extracting frames: 100%|██████████| 200/200 [01:27<00:00,  2.28it/s]


Unnamed: 0,path,label,split
0,data\frames\val\correct\Copy of push up 1_f000...,correct,val
1,data\frames\val\correct\Copy of push up 1_f000...,correct,val
2,data\frames\val\correct\Copy of push up 1_f000...,correct,val
3,data\frames\val\correct\Copy of push up 1_f000...,correct,val
4,data\frames\val\correct\Copy of push up 1_f000...,correct,val



## 4. Train YOLO Classification
Train an image classifier on extracted frames. Results land in `runs/classify/pushup-cls/`.


In [5]:

cls_model = YOLO(CLASSIFIER_MODEL)
results = cls_model.train(
    data=str(OUTPUT_FRAMES_DIR),
    epochs=EPOCHS,
    imgsz=IMGSZ,
    batch=BATCH,
    lr0=LR0,
    weight_decay=WEIGHT_DECAY,
    project=RUNS_DIR,
    name=RUN_NAME,
    verbose=True
)
results


New https://pypi.org/project/ultralytics/8.3.223 available  Update with 'pip install -U ultralytics'
Ultralytics 8.3.203  Python-3.13.7 torch-2.8.0+cpu CPU (13th Gen Intel Core i7-13700H)
[34m[1mengine\trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=64, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, compile=False, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=data/frames, degrees=0.0, deterministic=True, device=cpu, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=40, erasing=0.4, exist_ok=False, fliplr=0.5, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, imgsz=224, int8=False, iou=0.7, keras=False, kobj=1.0, line_width=None, lr0=0.01, lrf=0.01, mask_ratio=4, max_det=300, mixup=0.0, mode=train, model=yolov8n-cls.pt, momentum=0.937, mosaic=1.0, multi_scale=False, name=pushup-cls4, nbs=64, nms=False, op

ultralytics.utils.metrics.ClassifyMetrics object with attributes:

confusion_matrix: <ultralytics.utils.metrics.ConfusionMatrix object at 0x000002A46817C590>
curves: []
curves_results: []
fitness: 0.8644067943096161
keys: ['metrics/accuracy_top1', 'metrics/accuracy_top5']
results_dict: {'metrics/accuracy_top1': 0.7288135886192322, 'metrics/accuracy_top5': 1.0, 'fitness': 0.8644067943096161}
save_dir: WindowsPath('C:/Data/hboict/Sem7-AIFS/Personal_Project/runs/pushup-cls4')
speed: {'preprocess': 0.0001960452106691855, 'inference': 4.783190960392729, 'loss': 1.1864497121107781e-05, 'postprocess': 4.971756384113414e-05}
task: 'classify'
top1: 0.7288135886192322
top5: 1.0

## 4.5. Test auto locate/ checks

In [6]:
from pathlib import Path
import glob, os

# ABSOLUTE paths (match your screenshot)
ROOT_FRAMES = Path(r"C:/Data/hboict/Sem7-AIFS/Personal_Project/data/frames").resolve()
TRAIN_DIR   = ROOT_FRAMES / "train"
VAL_DIR     = ROOT_FRAMES / "val"
TEST_DIR    = ROOT_FRAMES / "test"

print("Using:", ROOT_FRAMES)

# Nuke stale Ultralytics caches so it rescans
for f in glob.glob(str(ROOT_FRAMES / "*.cache")):
    try:
        os.remove(f); print("Removed cache:", f)
    except Exception as e:
        print("Could not remove", f, e)

def count_imgs(p: Path):
    return len(glob.glob(str(p / "*.jpg"))) + len(glob.glob(str(p / "*.png")))

for split_dir in [TRAIN_DIR, VAL_DIR, TEST_DIR]:
    for cls in ["correct","incorrect"]:
        d = split_dir / cls
        samples = sorted(glob.glob(str(d / "*.jpg")) + glob.glob(str(d / "*.png")))[:3]
        print(f"{d} | exists={d.exists()} | count={count_imgs(d)} | samples={samples}")


Using: C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames
Removed cache: C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\train.cache
Removed cache: C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\val.cache
C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\train\correct | exists=True | count=295 | samples=['C:\\Data\\hboict\\Sem7-AIFS\\Personal_Project\\data\\frames\\train\\correct\\Copy of push up 100_f00000.jpg', 'C:\\Data\\hboict\\Sem7-AIFS\\Personal_Project\\data\\frames\\train\\correct\\Copy of push up 100_f00001.jpg', 'C:\\Data\\hboict\\Sem7-AIFS\\Personal_Project\\data\\frames\\train\\correct\\Copy of push up 100_f00002.jpg']
C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\train\incorrect | exists=True | count=326 | samples=['C:\\Data\\hboict\\Sem7-AIFS\\Personal_Project\\data\\frames\\train\\incorrect\\10_f00000.jpg', 'C:\\Data\\hboict\\Sem7-AIFS\\Personal_Project\\data\\frames\\train\\incorrect\\10_f00001.jpg', 'C:\\Data\\hboict\\Sem7-AIFS\\Personal_Pro


## 5. Evaluate on Test Split + Save Confusion Matrix


In [26]:

best_weights = Path(BEST_WEIGHTS)
if not best_weights.exists():
    raise FileNotFoundError(f"Best weights not found at {best_weights}. Train first.")

from ultralytics import YOLO
eval_model = YOLO(str(best_weights))
metrics = eval_model.val(data=str(OUTPUT_FRAMES_DIR), imgsz=IMGSZ, split="test")
print(metrics)


# Build confusion matrix via direct predictions
class_names = ["correct", "incorrect"]
y_true, y_pred = [], []
test_dir = Path(OUTPUT_FRAMES_DIR) / "test"
img_paths = []
for cls_idx, cls in enumerate(class_names):
    for p in (test_dir / cls).glob("*.jpg"):
        img_paths.append((str(p), cls_idx))

bs = 64
for i in range(0, len(img_paths), bs):
    batch = img_paths[i:i+bs]
    imgs = [b[0] for b in batch]
    gts = [b[1] for b in batch]
    preds = eval_model(imgs, verbose=False)
    for gt, pred in zip(gts, preds):
        pred_cls = int(np.argmax(pred.probs.data))
        y_true.append(gt)
        y_pred.append(pred_cls)

cm = confusion_matrix(y_true, y_pred, labels=[0,1])
report = classification_report(y_true, y_pred, target_names=class_names, digits=4)
print(report)

def plot_confusion_matrix(cm, class_names, out_path: Path):
    fig = plt.figure(figsize=(5,4))
    plt.imshow(cm, interpolation='nearest')
    plt.title('Confusion matrix')
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)
    cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, f"{cm[i, j]}\n({cm_normalized[i, j]:.2f})",
                     horizontalalignment="center", verticalalignment="center")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    fig.savefig(Path(RUNS_DIR)/RUN_NAME/"confusion_matrix.png", dpi=200)
    plt.close(fig)

plot_confusion_matrix(cm, class_names, Path(RUNS_DIR) / RUN_NAME / "confusion_matrix.png")


Ultralytics 8.3.203  Python-3.13.7 torch-2.8.0+cpu CPU (13th Gen Intel Core i7-13700H)


YOLOv8n-cls summary (fused): 30 layers, 1,437,442 parameters, 0 gradients, 3.3 GFLOPs
[34m[1mtrain:[0m C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\train... found 621 images in 2 classes  
[34m[1mval:[0m C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\val... found 177 images in 2 classes  
[34m[1mtest:[0m C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\test... found 113 images in 2 classes  
[34m[1mtest: [0mFast image access  (ping: 0.00.0 ms, read: 100.029.6 MB/s, size: 14.3 KB)
[K[34m[1mtest: [0mScanning C:\Data\hboict\Sem7-AIFS\Personal_Project\data\frames\test... 113 images, 0 corrupt: 100% ━━━━━━━━━━━━ 113/113 87.3Kit/s 0.0s
[K               classes   top1_acc   top5_acc: 100% ━━━━━━━━━━━━ 8/8 9.7it/s 0.8s0.1s
                   all      0.867          1
Speed: 0.0ms preprocess, 4.6ms inference, 0.0ms loss, 0.0ms postprocess per image
Results saved to [1mC:\Data\hboict\Sem7-AIFS\Personal_Project\runs\classify\val3[0m
ultralytics.utils.metric


## 6. Rep Counter with **YOLO-Pose** + Minor Feedback
- Uses `yolov8n-pose.pt` to extract COCO keypoints.
- Simple heuristics compute:
  - **Depth** (elbow angle at bottom)
  - **Hip Sag** (shoulder–hip alignment)
  - **Elbow Flaring** (upper-arm angle relative to torso)
  - **Tempo** (rep duration)


In [10]:

# --- Geometry helpers --------------------------------------------------------
def angle(a, b, c):
    # angle at b: a-b-c
    a, b, c = np.array(a), np.array(b), np.array(c)
    ba = a - b
    bc = c - b
    cosang = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
    cosang = np.clip(cosang, -1.0, 1.0)
    return math.degrees(math.acos(cosang))

def line_point_distance(p, a, b):
    # distance from p to line through a-b
    p, a, b = np.array(p), np.array(a), np.array(b)
    if np.allclose(a,b):
        return np.linalg.norm(p-a)
    return np.linalg.norm(np.cross(b-a, a-p)) / (np.linalg.norm(b-a) + 1e-6)

# --- COCO keypoint indices (YOLOv8-pose) ------------------------------------
# 0 nose, 5 left_shoulder, 6 right_shoulder, 7 left_elbow, 8 right_elbow,
# 9 left_wrist, 10 right_wrist, 11 left_hip, 12 right_hip, 13 left_knee, 14 right_knee,
# 15 left_ankle, 16 right_ankle
KS = dict(nose=0, ls=5, rs=6, le=7, re=8, lw=9, rw=10, lh=11, rh=12, lk=13, rk=14, la=15, ra=16)

def get_xy(kps, i):
    # kps shape: (17,3) -> x,y,conf
    return (float(kps[i,0]), float(kps[i,1]))

def valid_triplet(kps, a,b,c, minconf=0.35):
    return kps[a,2]>minconf and kps[b,2]>minconf and kps[c,2]>minconf

# --- Form rules (heuristics) -------------------------------------------------
def assess_form(kps, phase="bottom"):
    issues = []
    # Depth: elbow angle at bottom should be small (e.g., < 80-90 deg)
    if valid_triplet(kps, KS['ls'], KS['le'], KS['lw']):
        left_elbow = angle(get_xy(kps, KS['ls']), get_xy(kps, KS['le']), get_xy(kps, KS['lw']))
    else:
        left_elbow = None
    if valid_triplet(kps, KS['rs'], KS['re'], KS['rw']):
        right_elbow = angle(get_xy(kps, KS['rs']), get_xy(kps, KS['re']), get_xy(kps, KS['rw']))
    else:
        right_elbow = None

    elbow_min = None
    if left_elbow is not None and right_elbow is not None:
        elbow_min = min(left_elbow, right_elbow)
    elif left_elbow is not None:
        elbow_min = left_elbow
    elif right_elbow is not None:
        elbow_min = right_elbow

    # if phase == "bottom" and elbow_min is not None and elbow_min > 100:
    #     issues.append("Depth too shallow (elbows not bent enough)")
        
    if phase == "bottom" and elbow_min is not None and elbow_min > 120:
        issues.append("Depth too shallow (aim for elbows ~90°)")

    # Hip sag: distance of hip from line shoulder-ankle should be small
    # Use right side if available else left
    side = "r" if kps[KS['rs'],2] > kps[KS['ls'],2] else "l"
    if side == "r":
        parts = ['rs','rh','ra']
    else:
        parts = ['ls','lh','la']
    if kps[KS[parts[0]],2]>0.35 and kps[KS[parts[1]],2]>0.35 and kps[KS[parts[2]],2]>0.35:
        shoulder = get_xy(kps, KS[parts[0]])
        hip = get_xy(kps, KS[parts[1]])
        ankle = get_xy(kps, KS[parts[2]])
        sag = line_point_distance(hip, shoulder, ankle)
        # Normalize by body length shoulder-ankle
        norm = np.linalg.norm(np.array(shoulder) - np.array(ankle)) + 1e-6
        if sag / norm > 0.12:
            issues.append("Hips sagging (keep core tight)")

    # Elbow flaring: angle between upper-arm and torso too large
    # torso vector: shoulder-hip; upper-arm: shoulder-elbow
    if kps[KS['rs'],2]>0.35 and kps[KS['rh'],2]>0.35 and kps[KS['re'],2]>0.35:
        rs, rh, re = get_xy(kps, KS['rs']), get_xy(kps, KS['rh']), get_xy(kps, KS['re'])
        torso_vec = np.array(rh) - np.array(rs)
        arm_vec = np.array(re) - np.array(rs)
        cosang = abs(np.dot(torso_vec, arm_vec)) / (np.linalg.norm(torso_vec)*np.linalg.norm(arm_vec)+1e-6)
        flare_angle = math.degrees(math.acos(np.clip(cosang, -1, 1)))
        if flare_angle > 65:  # heuristic threshold
            issues.append("Elbows flaring out (keep ~45° from torso)")

    return issues


In [11]:

# --- Pose-based rep counter --------------------------------------------------
class PoseRepCounter:
    def __init__(self, pose_model, frames_for_classify=7, vote_strategy="majority"):
        self.model = pose_model
        self.prev_y = deque(maxlen=5)
        self.state = "up"
        self.last_bottom_time = None
        self.reps = 0
        self.rep_events = []  # list of dicts
        self.frames_buffer = deque(maxlen=frames_for_classify)
        self.frames_for_classify = frames_for_classify
        self.vote_strategy = vote_strategy

    def update(self, frame):
        self.frames_buffer.append(frame.copy())
        res = self.model(frame, verbose=False)[0]
        phase = None
        kps_best = None

        # choose highest-conf person
        if res.keypoints is not None and len(res.keypoints) > 0:
            # Each item has .xy (N,17,2) and .conf (N,17)
            best_i = 0
            best_score = -1
            for i, kp in enumerate(res.keypoints):
                conf = float(kp.conf.mean())
                if conf > best_score:
                    best_score = conf
                    best_i = i
            kps = res.keypoints[best_i].data[0].cpu().numpy()  # (17,3)
            kps_best = kps

            # vertical proxy: average y of shoulders & hips
            y_points = []
            for idx in [KS['ls'], KS['rs'], KS['lh'], KS['rh']]:
                if kps[idx,2] > 0.35:
                    y_points.append(kps[idx,1])
            if len(y_points)>=2:
                y_mean = float(np.mean(y_points))
                self.prev_y.append(y_mean)

            if len(self.prev_y) == self.prev_y.maxlen:
                dy = self.prev_y[-1] - self.prev_y[0]
                # simple hysteresis on y to detect down/up
                if self.state == "up" and dy > 8:
                    self.state = "down"
                    phase = "down"
                elif self.state == "down" and dy < -8:
                    self.state = "up"
                    phase = "up"
                    # Up transition after down => count a rep
                    self.reps += 1
                    issues = assess_form(kps, phase="bottom")
                    event = {
                        "rep": self.reps,
                        "time": time.time(),
                        "issues": issues,
                        "kps_conf": float(np.mean(kps[:,2]))
                    }
                    self.rep_events.append(event)
                    return True, event, kps_best

        return False, None, kps_best


# 6.5 or something:
Check for accurate labeling of frames

In [12]:
from ultralytics import YOLO
from pathlib import Path
import random
import cv2

model = YOLO("runs/pushup-cls4/weights/best.pt")

# Get one random frame from each class
correct_sample = random.choice(list(Path("data/frames/test/correct").glob("*.jpg")))
incorrect_sample = random.choice(list(Path("data/frames/test/incorrect").glob("*.jpg")))

for img_path in [correct_sample, incorrect_sample]:
    result = model(img_path, verbose=False)[0]
    print(img_path.name, result.names, result.probs.top1, result.names[result.probs.top1])

Copy of push up 102_f00003.jpg {0: 'correct', 1: 'incorrect'} 1 incorrect
Copy of push up 107_f00001.jpg {0: 'correct', 1: 'incorrect'} 1 incorrect



## 7. Integrate Classifier into Pose Rep Counter
When a rep is detected, we classify the buffered frames as `correct/incorrect` using the trained classifier.


In [8]:

# Load pose + classifier
pose_model = YOLO(POSE_WEIGHTS)
classifier = YOLO(str(Path(BEST_WEIGHTS))) if Path(BEST_WEIGHTS).exists() else None

# Voting helper
def vote_frames(model, frames, strategy="majority"):
    preds = []
    for f in frames:
        r = model(f, verbose=False)[0]
        top1 = int(r.probs.top1); conf = float(r.probs.top1conf)
        preds.append((top1, conf))
    counts = defaultdict(int); confs = defaultdict(list)
    for cls_idx, c in preds:
        counts[cls_idx]+=1; confs[cls_idx].append(c)
    if strategy == "avg_confidence":
        best_cls = max(confs.keys(), key=lambda c: np.mean(confs[c]))
        best_conf = float(np.mean(confs[best_cls]))
    else:
        max_votes = max(counts.values())
        majority = [c for c,v in counts.items() if v==max_votes]
        if len(majority)==1:
            best_cls = majority[0]; best_conf = float(np.mean(confs[best_cls]))
        else:
            best_cls = max(majority, key=lambda c: np.mean(confs[c]))
            best_conf = float(np.mean(confs[best_cls]))
    return best_cls, best_conf

counter = PoseRepCounter(pose_model, frames_for_classify=FRAMES_PER_REP_FOR_CLASSIFY, vote_strategy=VOTE_STRATEGY)

# Live/video loop
if USE_WEBCAM:
    cap = cv2.VideoCapture(VIDEO_SOURCE)
else:
    cap = cv2.VideoCapture(VIDEO_SOURCE)

if SAVE_OUTPUT:
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, 30.0, (int(cap.get(3)), int(cap.get(4))))
else:
    out = None

rep_log = []
last_rep_info = None
frame_idx = 0
print("Starting loop. Press 'q' to quit.")

while True:
    ok, frame = cap.read()
    if not ok:
        break
    frame_idx += 1
    if frame_idx % FRAME_SKIP != 0:
        if SHOW_WINDOW and DRAW_OVERLAY:
            cv2.imshow("Pose Rep + Feedback + Classifier", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        continue

    rep_event, event, kps_best = counter.update(frame)

    if rep_event:
        label_txt, conf_txt = None, None
        if classifier is not None and len(counter.frames_buffer) >= FRAMES_PER_REP_FOR_CLASSIFY:
            frames_for_rep = list(counter.frames_buffer)[-FRAMES_PER_REP_FOR_CLASSIFY:]
            best_cls, best_conf = vote_frames(classifier, frames_for_rep, strategy=VOTE_STRATEGY)
            label_txt = classifier.names[best_cls]
            conf_txt = round(best_conf, 3)

        info = {
            "rep": event["rep"],
            "timestamp": time.time(),
            "issues": event["issues"],
            "kps_conf": event["kps_conf"],
            "label": label_txt,
            "label_conf": conf_txt
        }
        rep_log.append(info)
        last_rep_info = info
        print(f"[REP {info['rep']}] issues={info['issues']} label={label_txt} ({conf_txt})")

    # Overlay
    if DRAW_OVERLAY:
        vis = frame.copy()
        cv2.putText(vis, f"Reps: {counter.reps}", (20,40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,255), 2, cv2.LINE_AA)
        if last_rep_info:
            last = last_rep_info
            msg = f"Last: {last.get('label','?')} ({last.get('label_conf','-')})"
            cv2.putText(vis, msg, (20,80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2, cv2.LINE_AA)
            if len(last['issues'])>0:
                cv2.putText(vis, "Issues: " + "; ".join(last['issues'][:2]), (20,120),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2, cv2.LINE_AA)
        frame = vis

    if out is not None:
        out.write(frame)

    if SHOW_WINDOW:
        cv2.imshow("Pose Rep + Feedback + Classifier", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

cap.release()
if out is not None:
    out.release()
cv2.destroyAllWindows()

# Save session
os.makedirs(SESSION_DIR, exist_ok=True)
log_path = Path(SESSION_DIR)/f"session_{int(time.time())}.json"
with open(log_path, "w") as f:
    json.dump(rep_log, f, indent=2)
print("Saved session log to", str(log_path))


Starting loop. Press 'q' to quit.
[REP 1] issues=[] label=incorrect (0.999)
[REP 2] issues=[] label=incorrect (0.972)
[REP 3] issues=[] label=incorrect (0.8)


  return np.linalg.norm(np.cross(b-a, a-p)) / (np.linalg.norm(b-a) + 1e-6)


[REP 4] issues=['Depth too shallow (elbows not bent enough)', 'Hips sagging (keep core tight)', 'Elbows flaring out (keep ~45° from torso)'] label=incorrect (0.981)
[REP 5] issues=['Depth too shallow (elbows not bent enough)', 'Elbows flaring out (keep ~45° from torso)'] label=incorrect (0.999)
[REP 6] issues=['Depth too shallow (elbows not bent enough)', 'Elbows flaring out (keep ~45° from torso)'] label=incorrect (0.999)
[REP 7] issues=['Depth too shallow (elbows not bent enough)'] label=incorrect (0.997)
[REP 8] issues=['Depth too shallow (elbows not bent enough)'] label=incorrect (0.998)
[REP 9] issues=[] label=incorrect (0.996)
[REP 10] issues=['Depth too shallow (elbows not bent enough)', 'Elbows flaring out (keep ~45° from torso)'] label=incorrect (0.996)
[REP 11] issues=['Depth too shallow (elbows not bent enough)', 'Elbows flaring out (keep ~45° from torso)'] label=incorrect (0.997)
[REP 12] issues=['Depth too shallow (elbows not bent enough)'] label=incorrect (0.997)
[REP 13]