In [None]:
# --- Cell 1: Imports, config, GUI probe ---

import sys, time, datetime
from pathlib import Path

import numpy as np
import cv2
import mediapipe as mp
import depthai as dai

# Preview / record settings (full-resolution defaults; may be overridden below).
PREVIEW_W = 960
PREVIEW_H = 540
FPS = 30
WINDOW_NAME = "OAK-D Pose (cam0 | cam1)"
FONT = cv2.FONT_HERSHEY_SIMPLEX

# Fast/low-res mode: ask the device for a small output (highest FPS); if that
# is not possible the pipeline falls back to full-res and the host downscales.
FAST_LOWRES = True   # set to False to go back to full-res previews
LOWRES_W = 320       # tiny frame, very fast for MediaPipe
LOWRES_H = 180
TARGET_FPS = 60      # frame rate requested from the camera node when supported

# In fast mode the preview/record size follows the low-res size so that no
# extra host-side resize is needed.
if FAST_LOWRES:
    PREVIEW_W = LOWRES_W
    PREVIEW_H = LOWRES_H


def can_open_cv_window(force_headless: bool = False) -> bool:
    """Report whether OpenCV can create and show a HighGUI window.

    Args:
        force_headless: When True, skip the probe and report no GUI.

    Returns:
        True if a 1x1 probe window could be created, shown and destroyed
        without an exception; False otherwise (or when forced headless).
    """
    if not force_headless:
        probe_name = "_probe_"
        try:
            cv2.namedWindow(probe_name, cv2.WINDOW_NORMAL)
            blank_pixel = np.zeros((1, 1, 3), dtype="uint8")
            cv2.imshow(probe_name, blank_pixel)
            cv2.waitKey(1)
            cv2.destroyWindow(probe_name)
        except Exception:
            # Any failure along the way is treated as "no usable GUI".
            return False
        else:
            return True
    return False

# Probe the GUI once at import time; later cells branch on HAS_GUI.
HAS_GUI = can_open_cv_window(force_headless=False)
print(f"DepthAI: {dai.__version__} | OpenCV GUI available: {HAS_GUI}")


In [None]:
# --- Cell 2: Device discovery helpers ---

def list_devices():
    """Enumerate connected OAK devices.

    Returns:
        Whatever ``dai.Device.getAllAvailableDevices()`` reports (a list of
        DeviceInfo objects).
    """
    available = dai.Device.getAllAvailableDevices()
    return available

def pick_two_devices_or_die():
    """Return the first two devices reported by ``list_devices()``.

    Raises:
        RuntimeError: If fewer than two devices are available.
    """
    found = list_devices()
    count = len(found)
    if count >= 2:
        return found[0], found[1]
    raise RuntimeError(f"Need 2 OAK devices, found {count}.")

def pick_rgb_socket(device: dai.Device):
    """Pick the colour-camera socket on a device.

    The socket is matched under both its legacy name (``RGB``) and its
    current name (``CAM_A``), so the preference also works when a build
    reports the socket as ``CAM_A`` and lacks the ``RGB`` alias.
    If neither is reported, the first connected socket is returned.

    Args:
        device: An opened device exposing ``getConnectedCameras()``.

    Returns:
        The chosen camera board socket.

    Raises:
        RuntimeError: If the device reports no connected cameras.
    """
    sockets = list(device.getConnectedCameras())
    if not sockets:
        raise RuntimeError("No cameras connected/reported on this device.")

    rgb_names = ("RGB", "CAM_A")
    # Resolve whichever of the names this DepthAI build actually defines.
    rgb_members = [
        member
        for member in (getattr(dai.CameraBoardSocket, name, None) for name in rgb_names)
        if member is not None
    ]

    # Prefer the RGB socket if present.
    for s in sockets:
        if str(s).endswith(rgb_names) or s in rgb_members:
            return s
    return sockets[0]


In [None]:
# --- Cell 3: MediaPipe Pose wrapper ---

class PoseEstimator:
    """Wrapper around MediaPipe Pose that also draws the detected landmarks."""

    def __init__(self, model_complexity=1):
        """Create the pose solution in video (non-static) mode.

        Args:
            model_complexity: Passed through to ``mp.solutions.pose.Pose``.
        """
        pose_module = mp.solutions.pose
        pose_options = dict(
            static_image_mode=False,
            model_complexity=model_complexity,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        self.pose = pose_module.Pose(**pose_options)
        self.drawer = mp.solutions.drawing_utils
        self.styles = mp.solutions.drawing_styles
        self.connections = pose_module.POSE_CONNECTIONS

    def process_and_draw(self, frame_bgr):
        """Run pose estimation on a BGR frame and draw the result onto it.

        The frame is modified in place and the same object is returned.
        Nothing is drawn when no landmarks are detected.
        """
        rgb_view = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        landmarks = self.pose.process(rgb_view).pose_landmarks
        if not landmarks:
            return frame_bgr
        landmark_style = self.styles.get_default_pose_landmarks_style()
        self.drawer.draw_landmarks(
            frame_bgr,
            landmarks,
            self.connections,
            landmark_drawing_spec=landmark_style,
        )
        return frame_bgr

    def close(self):
        """Release the underlying MediaPipe pose object."""
        self.pose.close()


In [None]:
# --- Cell 4: Recording & playback utilities ---

class DualRecorder:
    """Writes two frame streams to a timestamped pair of MP4 files.

    Attributes:
        f0, f1: Output file paths (strings) for camera 0 and camera 1.
        w0, w1: The corresponding ``cv2.VideoWriter`` objects.
        active: True while both writers are open and accepting frames.
    """

    def __init__(self, out_dir, fps=FPS, size=(PREVIEW_W, PREVIEW_H)):
        """Open both writers inside ``out_dir`` (created if missing).

        Args:
            out_dir: Directory that receives ``cam0_<ts>.mp4`` / ``cam1_<ts>.mp4``.
            fps: Frame rate written into the container.
            size: ``(width, height)`` passed to the writers.

        If either writer cannot be opened (e.g. codec unavailable, unwritable
        path) the recorder reports the problem and stays inactive instead of
        silently discarding every frame.
        """
        out_dir = Path(out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self.f0 = str(out_dir / f"cam0_{ts}.mp4")
        self.f1 = str(out_dir / f"cam1_{ts}.mp4")
        self.w0 = cv2.VideoWriter(self.f0, fourcc, fps, size)
        self.w1 = cv2.VideoWriter(self.f1, fourcc, fps, size)

        # VideoWriter does not raise on failure; it must be queried explicitly.
        self.active = bool(self.w0.isOpened() and self.w1.isOpened())
        if self.active:
            print(f"[REC] → {self.f0}\n[REC] → {self.f1}")
        else:
            self._release_writers()
            print(
                f"[REC] could not open video writers for {self.f0} / {self.f1}; "
                "recording disabled"
            )

    def _release_writers(self):
        """Release both writers, attempting the second even if the first fails."""
        try:
            self.w0.release()
        finally:
            self.w1.release()

    def write(self, f0, f1):
        """Append one frame per camera; ``None`` frames are skipped.

        Does nothing once the recorder is inactive.
        """
        if not self.active:
            return
        if f0 is not None:
            self.w0.write(f0)
        if f1 is not None:
            self.w1.write(f1)

    def stop(self):
        """Finalize both files. Safe to call more than once."""
        if not self.active:
            return
        try:
            self._release_writers()
        finally:
            self.active = False
        print("[REC] stopped")

class DualPlayer:
    """Reads two recorded video files side by side, one frame each per call."""

    def __init__(self, f0, f1):
        """Open both files for playback.

        Args:
            f0: Path of the camera-0 recording.
            f1: Path of the camera-1 recording.

        Raises:
            RuntimeError: If either file cannot be opened. Both captures are
                released before raising so a half-opened pair is not leaked.
        """
        self.cap0, self.cap1 = cv2.VideoCapture(f0), cv2.VideoCapture(f1)
        if not self.cap0.isOpened() or not self.cap1.isOpened():
            # One capture may have opened successfully; release it (and the
            # failed one) instead of leaving the handle dangling.
            self.stop()
            raise RuntimeError("Playback files not found or unreadable.")
        print(f"[PLAY] {f0}\n[PLAY] {f1}")

    def read(self):
        """Read the next frame from each file.

        Returns:
            ``(frame0, frame1, ok)``. A frame is ``None`` when its stream has
            no more data; ``ok`` is False only when both streams are exhausted.
        """
        ok0, f0 = self.cap0.read()
        ok1, f1 = self.cap1.read()
        if not ok0 and not ok1:
            return None, None, False
        return (f0 if ok0 else None), (f1 if ok1 else None), True

    def stop(self):
        """Release both captures, attempting the second even if the first fails."""
        try:
            self.cap0.release()
        finally:
            self.cap1.release()

def latest_pair(base="recordings"):
    """Find the newest recording pair under ``base``.

    Sub-directories are visited from most to least recently modified; the
    first one containing both a ``cam0_*.mp4`` and a ``cam1_*.mp4`` wins, and
    within it the most recently modified file of each kind is chosen.

    Returns:
        ``(cam0_path, cam1_path)`` as strings, or ``(None, None)`` when
        ``base`` does not exist or no directory holds a complete pair.
    """
    root = Path(base)
    if not root.exists():
        return None, None

    def _mtime_or_zero(path):
        # A file may vanish between globbing and stat'ing; rank it lowest.
        return path.stat().st_mtime if path.exists() else 0

    session_dirs = [entry for entry in root.iterdir() if entry.is_dir()]
    session_dirs.sort(key=lambda entry: entry.stat().st_mtime, reverse=True)

    for session in session_dirs:
        newest_cam0 = max(session.glob("cam0_*.mp4"), default=None, key=_mtime_or_zero)
        newest_cam1 = max(session.glob("cam1_*.mp4"), default=None, key=_mtime_or_zero)
        if newest_cam0 and newest_cam1:
            return str(newest_cam0), str(newest_cam1)
    return None, None


In [None]:
# --- Cell 5: Build per-device pipelines using the new API style (fast/low-res aware) ---

def _request_preview_or_full(builder, width, height):
    """
    Try to get a low-res preview output from the on-device scaler.
    Fallback to full resolution if preview isn't available in this build.
    Returns (queue, using_preview: bool)
    """
    # Prefer preview (low-res)
    if FAST_LOWRES and hasattr(builder, "requestPreviewOutput"):
        try:
            # Some builds take (w,h), some use named args — try both
            try:
                qbuilder = builder.requestPreviewOutput(width, height)
            except TypeError:
                qbuilder = builder.requestPreviewOutput(width=width, height=height)
            q = qbuilder.createOutputQueue()
            return q, True
        except Exception:
            pass

    # Fallback: full resolution
    q = builder.requestFullResolutionOutput().createOutputQueue()
    return q, False


def make_output_queue_for_device(device: dai.Device):
    """Prepare a lazily-started camera pipeline for one device.

    Using the 'new' API:
      with dai.Pipeline(device) as pipeline:
          cam = pipeline.create(dai.node.Camera).build(socket)
          (preview or full) -> OutputQueue

    Nothing is started here; the pipeline is built and started when the
    returned wrapper is entered with ``with`` and stopped when it is exited.

    Args:
        device: An opened device to build the pipeline on.

    Returns:
        ``(pipeline_context_manager, socket_name)``. Once entered, the wrapper
        offers ``isRunning()`` and a blocking ``get()`` for the next frame
        message, plus the ``queue`` and ``using_preview`` attributes.
    """
    socket = pick_rgb_socket(device)
    socket_name = str(socket)

    class _PipeWrapper:
        def __enter__(self):
            self.cm = dai.Pipeline(device)
            self.pipeline = self.cm.__enter__()
            try:
                # Build camera node for this socket
                cam_builder = self.pipeline.create(dai.node.Camera).build(socket)

                # If the builder exposes FPS control in this API, request high FPS
                for attr in ("setFps", "setFrameRate"):
                    if hasattr(cam_builder, attr):
                        try:
                            getattr(cam_builder, attr)(TARGET_FPS)
                        except Exception as err:
                            # Best effort only: keep the default frame rate.
                            print(f"[CAM] {attr}({TARGET_FPS}) rejected: {err!r}")

                # Prefer preview output (low-res) if available
                self.queue, self.using_preview = _request_preview_or_full(
                    cam_builder, LOWRES_W, LOWRES_H
                )

                self.pipeline.start()
            except BaseException as err:
                # `with` never calls __exit__ when __enter__ raises, so the
                # already-entered pipeline context must be unwound here.
                self.cm.__exit__(type(err), err, err.__traceback__)
                raise
            return self

        def __exit__(self, exc_type, exc, tb):
            try:
                if hasattr(self.pipeline, "stop"):
                    self.pipeline.stop()
            except Exception:
                # Stopping is best effort; the context exit below still runs.
                pass
            return self.cm.__exit__(exc_type, exc, tb)

        def isRunning(self):
            return self.pipeline.isRunning()

        def get(self):
            # Blocking read of the next message from the output queue.
            return self.queue.get()

    return _PipeWrapper(), socket_name


In [None]:
# --- Cell 6: Main loop (dual device, new API) ---

def main(out_dir="recordings", auto_record=False):
    """Run the dual-camera pose preview with optional recording and playback.

    Frames from two devices are resized to the preview size, annotated with
    pose landmarks, shown side by side and (optionally) recorded.

    Hotkeys (only when an OpenCV GUI is available):
        q - quit
        r - start / stop recording into this session's directory
        p - play back the latest recorded pair / return to live view
        s - save a PNG snapshot of both processed frames

    Args:
        out_dir: Base directory; a timestamped session directory is created
            inside it.
        auto_record: Start recording immediately.

    Both devices are opened inside the ``try`` block and explicitly closed in
    ``finally``, so they are released even when setup fails half-way or the
    loop is interrupted. Without that, a retained traceback in the notebook
    can keep the devices busy for the next run.
    """
    dev0_info, dev1_info = pick_two_devices_or_die()

    devices = []      # opened dai.Device objects, closed in `finally`
    poses = []        # PoseEstimator objects, closed in `finally`
    recorder = None
    player = None
    live = True

    def proc(frame, pose):
        """Resize to the preview size and draw the pose; None passes through."""
        if frame is None:
            return None
        if (frame.shape[1], frame.shape[0]) != (PREVIEW_W, PREVIEW_H):
            frame = cv2.resize(frame, (PREVIEW_W, PREVIEW_H))
        return pose.process_and_draw(frame)

    try:
        # Open the two devices explicitly (tracked one by one so that a
        # failure on the second still closes the first).
        for info in (dev0_info, dev1_info):
            devices.append(dai.Device(info))
        dev0, dev1 = devices

        # Create per-device pipeline wrappers + queues
        pipe0, _ = make_output_queue_for_device(dev0)
        pipe1, _ = make_output_queue_for_device(dev1)

        # Pose estimators
        poses.append(PoseEstimator())
        poses.append(PoseEstimator())
        pose0, pose1 = poses

        # Session dir
        session_dir = Path(out_dir) / datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        session_dir.mkdir(parents=True, exist_ok=True)

        if HAS_GUI:
            cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
            cv2.resizeWindow(WINDOW_NAME, PREVIEW_W * 2, PREVIEW_H)

        if auto_record:
            recorder = DualRecorder(session_dir, fps=FPS, size=(PREVIEW_W, PREVIEW_H))

        t_prev = time.time()

        with pipe0 as p0, pipe1 as p1:
            while p0.isRunning() and p1.isRunning():
                if live:
                    # Read frames from both queues (blocking .get() keeps them fresh)
                    f0 = p0.get().getCvFrame()
                    f1 = p1.get().getCvFrame()
                else:
                    # Playback mode
                    f0, f1, ok = player.read()
                    if not ok:
                        print("[PLAY] done.")
                        player.stop(); player = None
                        live = True
                        continue

                # Resize to preview size and run pose
                f0p = proc(f0, pose0)
                f1p = proc(f1, pose1)

                if f0p is None and f1p is None:
                    if HAS_GUI: cv2.waitKey(1)
                    else: time.sleep(0.005)
                    continue
                # A missing side is shown (and recorded) as a black frame.
                if f0p is None: f0p = np.zeros((PREVIEW_H, PREVIEW_W, 3), np.uint8)
                if f1p is None: f1p = np.zeros((PREVIEW_H, PREVIEW_W, 3), np.uint8)

                combined = cv2.hconcat([f0p, f1p])

                # FPS overlay (drawn on the combined view only, not on the recordings)
                now = time.time()
                fps = 1.0 / max(1e-6, now - t_prev)
                t_prev = now
                label = ("LIVE" if live else "PLAY") + (" + REC" if (recorder and recorder.active) else "")
                cv2.putText(combined, f"{label}  FPS: {fps:.1f}", (10, 30), FONT, 0.9, (0, 255, 0), 2)

                # Show
                key = 255  # "no key pressed"; stays at this value when headless
                if HAS_GUI:
                    cv2.imshow(WINDOW_NAME, combined)
                    key = cv2.waitKey(1) & 0xFF

                # Record
                if recorder and recorder.active:
                    recorder.write(f0p, f1p)

                # Hotkeys
                if key == ord('q'):
                    break
                elif key == ord('r') and HAS_GUI:
                    if recorder and recorder.active:
                        recorder.stop(); recorder = None
                    else:
                        recorder = DualRecorder(session_dir, fps=FPS, size=(PREVIEW_W, PREVIEW_H))
                elif key == ord('p') and HAS_GUI:
                    if live:
                        f0path, f1path = latest_pair(out_dir)
                        if not f0path or not f1path:
                            print("[PLAY] No recordings found.")
                        else:
                            try:
                                player = DualPlayer(f0path, f1path)
                                live = False
                            except RuntimeError as err:
                                # An unreadable file should not end the live session.
                                print(f"[PLAY] {err}")
                    else:
                        if player: player.stop(); player = None
                        live = True
                elif key == ord('s') and HAS_GUI:
                    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                    p0path = session_dir / f"cam0_snap_{ts}.png"
                    p1path = session_dir / f"cam1_snap_{ts}.png"
                    cv2.imwrite(str(p0path), f0p)
                    cv2.imwrite(str(p1path), f1p)
                    print(f"[SNAP] {p0path}\n[SNAP] {p1path}")

    finally:
        try:
            if recorder and recorder.active:
                recorder.stop()
            if player:
                player.stop()
            for pose in poses:
                pose.close()
            if HAS_GUI:
                cv2.destroyAllWindows()
        finally:
            # Always hand the devices back, even if the cleanup above failed.
            for dev in devices:
                try:
                    dev.close()
                except Exception as err:
                    print(f"[CAM] device close failed: {err!r}")


In [None]:

# Start the dual-camera session without recording from the first frame.
# With an OpenCV GUI available, main() handles the hotkeys q (quit),
# r (record on/off), p (playback on/off) and s (snapshot).
main(out_dir="recordings", auto_record=False)
