# Pose Form Checker – **RPi 4B + MediaMTX (No Errors)**

*Laptop: MediaMTX + FFmpeg push → RPi: MediaPipe + live overlay.*

---

## Start Laptop
1. Run `start_mediamtx.bat` → leave open
2. Run `push_webcam.bat` → leave open

## RPi
Run cells below.

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pickle
import time
import os
import glob
from IPython import display as ipydisplay

mp_pose = mp.solutions.pose

RTSP_URL = "rtsp://10.227.207.170:8554/webcam"

# Suppress all H.264 warnings
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = (
    "rtsp_transport;tcp|"
    "fflags;nobuffer+discardcorrupt|"
    "flags;low_delay|"
    "max_delay;0|"
    "probesize;32|"
    "analyzeduration;0|"
    "err_detect;ignore_err"
)

## Extract Reference (Run Once)

In [None]:
def extract_all_references_from_folder(folder='exercises', target_fps=10.0):
    if not os.path.exists(folder):
        os.makedirs(folder)
        print(f"Created {folder}/ – add your .mp4 files")
        return

    video_files = [f for f in glob.glob(os.path.join(folder, "*.*")) if f.lower().endswith(('.mp4', '.avi', '.mov'))]
    if not video_files:
        print("No videos in exercises/")
        return

    pose = mp_pose.Pose(static_image_mode=False, model_complexity=1)
    for video_path in video_files:
        name = os.path.splitext(os.path.basename(video_path))[0]
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS) or 30
        interval = max(1, int(fps / target_fps))
        refs, cnt = [], 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret: break
            if cnt % interval == 0:
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                res = pose.process(rgb)
                if res.pose_landmarks:
                    refs.append([(l.x, l.y, l.z, l.visibility) for l in res.pose_landmarks.landmark])
            cnt += 1
        cap.release()
        with open(f"{name}.pkl", 'wb') as f:
            pickle.dump(refs, f)
        print(f"{name}.pkl → {len(refs)} frames")
    pose.close()
    print("Done!")

extract_all_references_from_folder()

W0000 00:00:1762501091.738311    2776 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1762501091.846852    2776 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


squat.pkl → 51 frames
push-up.pkl → 74 frames
romanian_deadlift.pkl → 27 frames
plank.pkl → 39 frames
deadlift.pkl → 34 frames
Done!


## Live Form Checker

In [None]:
def perform_exercise(exercise_name: str):
    pkl_path = f"{exercise_name}.pkl"
    if not os.path.exists(pkl_path):
        print(f"No reference: {pkl_path}")
        return
    with open(pkl_path, 'rb') as f:
        ref_seq = pickle.load(f)

    pose = mp_pose.Pose(static_image_mode=False, model_complexity=1)

    # GStreamer: hardware decode, no warnings
    gst = (
        f"rtspsrc location={RTSP_URL} latency=100 protocols=tcp ! "
        "rtph264depay ! h264parse ! v4l2h264dec ! "
        "videoconvert ! video/x-raw,format=BGR,width=640,height=480,framerate=15/1 ! "
        "appsink drop=true max-buffers=1 sync=false"
    )
    cap = cv2.VideoCapture(gst, cv2.CAP_GSTREAMER)

    if not cap.isOpened():
        print("GStreamer failed → FFmpeg")
        cap = cv2.VideoCapture(RTSP_URL, cv2.CAP_FFMPEG)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    if not cap.isOpened():
        print("Stream failed")
        return

    print("Connected! Starting overlay...")
    display_handle = ipydisplay.display(ipydisplay.Image(data=b''), display_id=True)

    ref_idx = 0
    last_time = time.time()
    frame_delay = 1.0 / 10
    angle_thr = 20
    frame_cnt = 0

    def angle(a, b, c):
        a, b, c = np.array(a), np.array(b), np.array(c)
        ab, bc = a-b, c-b
        cos = np.dot(ab, bc) / (np.linalg.norm(ab) * np.linalg.norm(bc) + 1e-6)
        return np.degrees(np.arccos(np.clip(cos, -1, 1)))

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret: continue
            if frame_cnt % 3 != 0:
                frame_cnt += 1
                continue
            frame_cnt += 1

            h, w, _ = frame.shape
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(rgb)

            canvas = np.zeros((h, w, 3), np.uint8)

            # Blue: Reference
            ref = ref_seq[ref_idx]
            for a, b in mp_pose.POSE_CONNECTIONS:
                if ref[a][3] > 0.1 and ref[b][3] > 0.1:
                    cv2.line(canvas, (int(ref[a][0]*w), int(ref[a][1]*h)), (int(ref[b][0]*w), int(ref[b][1]*h)), (255, 0, 0), 2)

            # Green/Red: User
            if results.pose_landmarks:
                user = [(l.x, l.y, l.z, l.visibility) for l in results.pose_landmarks.landmark]
                for a, b in mp_pose.POSE_CONNECTIONS:
                    if all(x[3] > 0.1 for x in [ref[a], ref[b], user[a], user[b]]):
                        parent = next((c[0] for c in mp_pose.POSE_CONNECTIONS if c[1] == b), None)
                        if parent and ref[parent][3] > 0.1 and user[parent][3] > 0.1:
                            ra = angle((ref[a][0], ref[a][1]), (ref[b][0], ref[b][1]), (ref[parent][0], ref[parent][1]))
                            ua = angle((user[a][0], user[a][1]), (user[b][0], user[b][1]), (user[parent][0], user[parent][1]))
                            color = (0, 255, 0) if abs(ra - ua) < angle_thr else (0, 0, 255)
                            cv2.line(canvas, (int(user[a][0]*w), int(user[a][1]*h)), (int(user[b][0]*w), int(user[b][1]*h)), color, 2)

            _, buf = cv2.imencode('.jpeg', canvas, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
            display_handle.update(ipydisplay.Image(data=buf.tobytes()))

            if time.time() - last_time >= frame_delay:
                last_time = time.time()
                ref_idx = (ref_idx + 1) % len(ref_seq)

            time.sleep(0.01)

    except KeyboardInterrupt:
        print("\nStopped")
    finally:
        cap.release()
        pose.close()
        ipydisplay.clear_output()
        print("Done")

## Start Workout

In [None]:
perform_exercise('romanian_deadlift')

Done
