# 5. Live Camera Detection (ONNX) - Plank

**Objective:**
1. Load the converted ONNX model pipeline.
2. Initialize MediaPipe Pose for landmark detection.
3. Capture video from a webcam or video file.
4. For each frame, perform pose estimation, extract features, and use the ONNX model for inference.
5. Implement feedback logic with smoothing and debouncing.
6. Display the classification, confidence, and feedback on the video feed.

**Note:** To stop the feed, press 'q' while the OpenCV window is active.

In [6]:
import cv2
import mediapipe as mp
import numpy as np
import onnxruntime as ort
from collections import deque
import time
import os
import sys

module_path = os.path.abspath(os.path.join('../../..'))
if module_path not in sys.path:
    sys.path.append(module_path)

from utils.geometry_utils import GeometryUtils

## 5.1 Configuration

In [7]:
# >>>>> YOU CAN CHANGE THIS VALUE TO 'LR', 'KNN', 'DT', 'RF', or 'XGB' if you saved it earlier <<<<<
MODEL_KEY = 'RF'
ONNX_MODEL_PATH = f"../models/onnx/{MODEL_KEY}_model.onnx"
VIDEO_SOURCE = 0

# --- Detection & Logic Configuration ---
VISIBILITY_THRESHOLD = 0.5
SMOOTHING_WINDOW_SIZE = 7
STATUS_HISTORY_LENGTH = 10
STATUS_COOLDOWN_SEC = 0.5
CONFIDENCE_THRESHOLD_CORRECT = 0.65
CONFIDENCE_THRESHOLD_WRONG = 0.50

# --- UI & Color Configuration ---
COLORS = {
    'landmark_main': (245, 117, 66),
    'landmark_secondary': (245, 66, 230),
    'status_correct': (0, 255, 0), # Green
    'status_high_hips': (0, 0, 255), # Red
    'status_low_hips': (0, 165, 255), # Orange
    'status_analyzing': (255, 255, 0), # Cyan/Yellow
    'text_primary': (255, 255, 255), # White
    'text_feedback': (0, 255, 255)
}
LABEL_MAP = {0: "CORRECT", 1: "HIGH HIPS", 2: "LOW HIPS"}
COLOR_MAP = {0: COLORS['status_correct'], 1: COLORS['status_high_hips'], 2: COLORS['status_low_hips']}

## 5.2 Helper Functions and Classes

In [8]:
# Feature extraction logic (must be identical to training)
LANDMARK_NAMES = [ 'nose', 'left_eye_inner', 'left_eye', 'left_eye_outer', 'right_eye_inner', 'right_eye', 'right_eye_outer', 'left_ear', 'right_ear', 'mouth_left', 'mouth_right', 'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow', 'left_wrist', 'right_wrist', 'left_pinky', 'right_pinky', 'left_index', 'right_index', 'left_thumb', 'right_thumb', 'left_hip', 'right_hip', 'left_knee', 'right_knee', 'left_ankle', 'right_ankle', 'left_heel', 'right_heel', 'left_foot_index', 'right_foot_index' ]
geo_utils = GeometryUtils()

def extract_features_from_landmarks(landmarks, visibility_threshold):
    lm_coords = {}
    for i, name in enumerate(LANDMARK_NAMES):
        lm = landmarks.landmark[i]
        if lm.visibility > visibility_threshold:
            lm_coords[name] = [lm.x, lm.y]
        else:
            lm_coords[name] = [np.nan, np.nan]
    
    def get_coord(name): return lm_coords.get(name, [np.nan, np.nan])

    features = []
    # This order MUST match the training features exactly
    features.append(geo_utils.calculate_angle(get_coord('left_shoulder'), get_coord('left_elbow'), get_coord('left_wrist')))
    features.append(geo_utils.calculate_angle(get_coord('right_shoulder'), get_coord('right_elbow'), get_coord('right_wrist')))
    features.append(geo_utils.calculate_angle(get_coord('left_elbow'), get_coord('left_shoulder'), get_coord('left_hip')))
    features.append(geo_utils.calculate_angle(get_coord('right_elbow'), get_coord('right_shoulder'), get_coord('right_hip')))
    features.append(geo_utils.calculate_angle(get_coord('left_shoulder'), get_coord('left_hip'), get_coord('left_knee')))
    features.append(geo_utils.calculate_angle(get_coord('right_shoulder'), get_coord('right_hip'), get_coord('right_knee')))
    features.append(geo_utils.calculate_angle(get_coord('left_hip'), get_coord('left_knee'), get_coord('left_ankle')))
    features.append(geo_utils.calculate_angle(get_coord('right_hip'), get_coord('right_knee'), get_coord('right_ankle')))
    features.append(geo_utils.calculate_angle(get_coord('left_shoulder'), get_coord('left_hip'), get_coord('left_ankle')))
    features.append(geo_utils.calculate_angle(get_coord('right_shoulder'), get_coord('right_hip'), get_coord('right_ankle')))
    features.append(geo_utils.distance_point_to_line(get_coord('left_hip'), get_coord('left_shoulder'), get_coord('left_knee')))
    features.append(geo_utils.distance_point_to_line(get_coord('right_hip'), get_coord('right_shoulder'), get_coord('right_knee')))
    features.append(abs(get_coord('left_shoulder')[1] - get_coord('left_hip')[1]))
    features.append(abs(get_coord('right_shoulder')[1] - get_coord('right_hip')[1]))
    features.append(abs(get_coord('left_hip')[1] - get_coord('left_ankle')[1]))
    features.append(abs(get_coord('right_hip')[1] - get_coord('right_ankle')[1]))
    features.append(geo_utils.calculate_distance(get_coord('left_shoulder'), get_coord('left_hip')))
    features.append(geo_utils.calculate_distance(get_coord('right_shoulder'), get_coord('right_hip')))
    features.append(geo_utils.calculate_distance(get_coord('left_hip'), get_coord('left_ankle')))
    features.append(geo_utils.calculate_distance(get_coord('right_hip'), get_coord('right_ankle')))
    return np.array([features], dtype=np.float32)

## 5.3 Initialize Models and Video Capture

In [9]:
# MediaPipe Pose
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5, model_complexity=1)

# ONNX Runtime
ort_session = None
input_name = None
try:
    ort_session = ort.InferenceSession(ONNX_MODEL_PATH)
    input_name = ort_session.get_inputs()[0].name
    print(f"ONNX model '{ONNX_MODEL_PATH}' loaded successfully.")
except Exception as e:
    print(f"Error loading ONNX model: {e}")

# OpenCV Video Capture
cap = cv2.VideoCapture(VIDEO_SOURCE)
if not cap.isOpened():
    print(f"Error: Could not open video source '{VIDEO_SOURCE}'.")

ONNX model '../models/onnx/RF_model.onnx' loaded successfully.


## 5.4 Main Detection Loop

In [10]:
def run_plank_detector():
    if not cap.isOpened() or ort_session is None:
        print("Setup failed. Exiting.")
        return

    # State variables
    proba_history = deque(maxlen=SMOOTHING_WINDOW_SIZE)
    status_log = deque(maxlen=STATUS_HISTORY_LENGTH)
    last_status = "ANALYZING"
    last_status_change_time = time.time()
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("End of video stream.")
            break
        
        # Process frame
        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)
        image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
        
        current_status = "ANALYZING"
        status_color = COLORS['status_analyzing']
        confidence_str = ""

        if results.pose_landmarks:
            mp_drawing.draw_landmarks(image_bgr, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=COLORS['landmark_main'], thickness=2, circle_radius=2),
                                      mp_drawing.DrawingSpec(color=COLORS['landmark_secondary'], thickness=2, circle_radius=2))
            
            # Feature Extraction
            features = extract_features_from_landmarks(results.pose_landmarks, VISIBILITY_THRESHOLD)

            if np.isnan(features).all():
                current_status = "POOR VISIBILITY"
            else:
                # Inference with ONNX
                ort_inputs = {input_name: features}
                pred_label, pred_proba_list_of_dicts = ort_session.run(None, ort_inputs)

                # FIX: Convert the ONNX probability output (list of dicts) to a simple numpy array
                prob_dict = pred_proba_list_of_dicts[0]
                prob_array = np.array([prob_dict[k] for k in sorted(prob_dict.keys())])
                proba_history.append(prob_array)

                # Smoothing and Debouncing
                if len(proba_history) == SMOOTHING_WINDOW_SIZE:
                    smoothed_proba = np.mean(proba_history, axis=0)
                    final_prediction = np.argmax(smoothed_proba)
                    confidence = smoothed_proba[final_prediction]
                    confidence_str = f"{confidence:.2f}"
                    
                    status_log.append(final_prediction)
                    
                    # Debounce: Check for consistent status
                    if len(status_log) == STATUS_HISTORY_LENGTH and len(set(status_log)) == 1:
                        confirmed_status = status_log[0]
                        if confirmed_status != last_status and (time.time() - last_status_change_time > STATUS_COOLDOWN_SEC):
                            last_status = confirmed_status
                            last_status_change_time = time.time()
                    
                    # Determine display status based on confidence
                    if (last_status == 0 and confidence >= CONFIDENCE_THRESHOLD_CORRECT) or \
                       (last_status != 0 and confidence >= CONFIDENCE_THRESHOLD_WRONG):
                        current_status = LABEL_MAP.get(last_status, "UNKNOWN")
                        status_color = COLOR_MAP.get(last_status, COLORS['status_analyzing'])
                    else:
                        current_status = "ADJUSTING"
                        
        else:
            current_status = "NO POSE DETECTED"
            proba_history.clear()
            status_log.clear()
            last_status = "ANALYZING"

        # UI Display
        cv2.rectangle(image_bgr, (0,0), (image_bgr.shape[1], 80), (20, 20, 20), -1)
        cv2.putText(image_bgr, 'STATUS', (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLORS['text_primary'], 2)
        cv2.putText(image_bgr, current_status, (150, 35), cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 2, cv2.LINE_AA)
        cv2.putText(image_bgr, 'CONFIDENCE', (20, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.7, COLORS['text_primary'], 2)
        cv2.putText(image_bgr, confidence_str, (180, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.7, status_color, 2, cv2.LINE_AA)

        cv2.imshow('Plank Form Detector (ONNX)', image_bgr)
        
        if cv2.waitKey(5) & 0xFF == ord('q'):
            break
            
    cap.release()
    cv2.destroyAllWindows()
    # This is important for Jupyter to release the window properly
    for i in range(5): cv2.waitKey(1)
    pose.close()
    print("Detection loop finished.")

# Run the main function
run_plank_detector()

Detection loop finished.
