In [1]:
import os
from datetime import datetime

import cv2
import numpy as np
import pyrealsense2 as rs


def main():
    # ==========================
    # 기본 설정
    # ==========================
    WIDTH, HEIGHT, FPS = 1280, 720, 30
    SAVE_EVERY_N_FRAME = 1   # 1 = 모든 프레임 저장 (라벨링용)
                             # 2 = 2프레임에 1번, 3 = 3프레임에 1번

    BASE_DIR = "recordings"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    video_dir = os.path.join(BASE_DIR, timestamp, "video")
    image_dir = os.path.join(BASE_DIR, timestamp, "images")

    os.makedirs(video_dir, exist_ok=True)
    os.makedirs(image_dir, exist_ok=True)

    video_path = os.path.join(video_dir, "color.mp4")

    # ==========================
    # RealSense 설정
    # ==========================
    pipeline = rs.pipeline()
    config = rs.config()
    config.enable_stream(rs.stream.color, WIDTH, HEIGHT, rs.format.bgr8, FPS)

    profile = pipeline.start(config)

    # ==========================
    # VideoWriter 설정
    # ==========================
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(video_path, fourcc, FPS, (WIDTH, HEIGHT))

    if not writer.isOpened():
        pipeline.stop()
        raise RuntimeError("❌ VideoWriter 열기 실패 (codec/경로 확인)")

    print("===================================")
    print("[INFO] Recording started")
    print(f"[INFO] Resolution : {WIDTH} x {HEIGHT}")
    print(f"[INFO] FPS        : {FPS}")
    print(f"[INFO] Video path : {video_path}")
    print(f"[INFO] Image path : {image_dir}")
    print("[INFO] Press Ctrl+C to stop")
    print("===================================")

    frame_idx = 0
    saved_img_idx = 0

    try:
        while True:
            frames = pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                continue

            color = np.asanyarray(color_frame.get_data())

            # --------------------------
            # 1) 영상 저장
            # --------------------------
            writer.write(color)

            # --------------------------
            # 2) 이미지 저장
            # --------------------------
            if frame_idx % SAVE_EVERY_N_FRAME == 0:
                img_name = f"frame_{saved_img_idx:06d}.jpg"
                img_path = os.path.join(image_dir, img_name)
                cv2.imwrite(img_path, color)
                saved_img_idx += 1

            frame_idx += 1

            # 상태 출력 (너무 시끄럽지 않게)
            if frame_idx % 30 == 0:
                print(f"[REC] frames={frame_idx} | images={saved_img_idx}", end="\r")

    except KeyboardInterrupt:
        print("\n[INFO] Recording stopped by user.")

    finally:
        writer.release()
        pipeline.stop()
        print("===================================")
        print("[INFO] Saved results")
        print(f"[INFO] Total frames : {frame_idx}")
        print(f"[INFO] Saved images : {saved_img_idx}")
        print("===================================")


if __name__ == "__main__":
    main()


[INFO] Recording started
[INFO] Resolution : 1280 x 720
[INFO] FPS        : 30
[INFO] Video path : recordings/20251218_131935/video/color.mp4
[INFO] Image path : recordings/20251218_131935/images
[INFO] Press Ctrl+C to stop
[REC] frames=270 | images=270
[INFO] Recording stopped by user.
[INFO] Saved results
[INFO] Total frames : 289
[INFO] Saved images : 289


In [None]:
import os
import json
import shutil
import re

# =========================
# 설정
# =========================
TARGET_DIR = "/home/dw/ws_job_msislab/amr_project/data/image_data/for_yolo_obj/20251218"
PREFIXES = [0, 1000000, 2000000]  # 원본 + 2복사 = 총 3배

pattern = re.compile(r"(frame_)(\d+)\.jpg")

files = os.listdir(TARGET_DIR)

count = 0

for jpg in sorted(files):
    if not jpg.endswith(".jpg"):
        continue

    m = pattern.match(jpg)
    if not m:
        continue

    base, num = m.group(1), int(m.group(2))
    json_name = jpg.replace(".jpg", ".json")
    json_path = os.path.join(TARGET_DIR, json_name)

    if not os.path.exists(json_path):
        print(f"[SKIP] JSON 없음: {json_name}")
        continue

    for offset in PREFIXES:
        new_num = num + offset
        new_jpg = f"{base}{new_num:06d}.jpg"
        new_json = f"{base}{new_num:06d}.json"

        new_jpg_path = os.path.join(TARGET_DIR, new_jpg)
        new_json_path = os.path.join(TARGET_DIR, new_json)

        # 1) 이미지 복사
        if not os.path.exists(new_jpg_path):
            shutil.copy(
                os.path.join(TARGET_DIR, jpg),
                new_jpg_path
            )

        # 2) JSON 복사 + imagePath 수정
        if not os.path.exists(new_json_path):
            with open(json_path, "r") as f:
                data = json.load(f)

            data["imagePath"] = new_jpg  # ⭐ 핵심

            with open(new_json_path, "w") as f:
                json.dump(data, f, indent=2)

        count += 1

print(f"✅ 완료: 총 {count}개 파일 세트 처리됨")


✅ 완료: 총 27개 파일 세트 처리됨


### 1. yolov11-obb 박스 확인하기


### 일단 잡히긴 하는데 계속 박스크기가 달라지는 문제가 발생

In [None]:
import cv2
import numpy as np
import pyrealsense2 as rs
from ultralytics import YOLO
import time

MODEL_PATH = "/home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v1/weights/best.pt"

def draw_obb_polygon(img, poly, color=(0, 255, 0), thickness=2):
    """
    poly: (4,2) float/np array (xyxyxyxy -> reshape(4,2))
    """
    pts = poly.reshape(-1, 1, 2).astype(np.int32)
    cv2.polylines(img, [pts], isClosed=True, color=color, thickness=thickness)

def main():
    # -------------------------
    # 1) YOLO OBB 모델 로드
    # -------------------------
    model = YOLO(MODEL_PATH)
    print("[INFO] Model loaded.")
    print("[INFO] Classes:", model.names)

    # -------------------------
    # 2) RealSense 컬러 스트림
    # -------------------------
    pipeline = rs.pipeline()
    config = rs.config()

    width, height, fps = 1280, 720, 30
    config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)

    pipeline.start(config)
    print("[INFO] RealSense started.")
    print("[INFO] Press 'q' to quit.")

    prev_t = time.time()

    try:
        while True:
            frames = pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                continue

            frame = np.asanyarray(color_frame.get_data())

            # -------------------------
            # 3) YOLO 추론 (OBB)
            # -------------------------
            results = model.predict(frame, imgsz=640, conf=0.25, verbose=False)
            r = results[0]

            # FPS 표시
            now = time.time()
            fps_show = 1.0 / max(1e-6, (now - prev_t))
            prev_t = now
            cv2.putText(frame, f"FPS: {fps_show:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

            # -------------------------
            # 4) OBB 결과 그리기
            # -------------------------
            if getattr(r, "obb", None) is not None and r.obb is not None:
                obb = r.obb

                # xyxyxyxy: (N, 8) -> 각 박스 4점 (x1,y1,...,x4,y4)
                if obb.xyxyxyxy is not None and len(obb.xyxyxyxy) > 0:
                    polys = obb.xyxyxyxy.cpu().numpy()   # (N, 8)
                    confs = obb.conf.cpu().numpy()       # (N,)
                    clss  = obb.cls.cpu().numpy().astype(int)  # (N,)

                    for poly8, cf, ci in zip(polys, confs, clss):
                        poly = poly8.reshape(4, 2)
                        draw_obb_polygon(frame, poly)

                        # 라벨: 박스 첫 점 근처에 표시
                        x0, y0 = int(poly[0, 0]), int(poly[0, 1])
                        name = model.names.get(ci, str(ci))
                        cv2.putText(frame, f"{name} {cf:.2f}", (x0, y0 - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                else:
                    cv2.putText(frame, "No OBB detected", (10, 60),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
            else:
                cv2.putText(frame, "Result has no OBB output", (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

            cv2.imshow("D405 + YOLO OBB Test", frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break

    finally:
        pipeline.stop()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


[INFO] Model loaded.
[INFO] Classes: {0: 'box'}
[INFO] RealSense started.
[INFO] Press 'q' to quit.


### 2. conf도 올리고 IOU도 올리자

In [None]:
import cv2
import numpy as np
import pyrealsense2 as rs
from ultralytics import YOLO
import time

MODEL_PATH = "/home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt"

CONF_THRES = 0.75   # ✅ 0.85 이상만
IOU_THRES  = 0.75   # ✅ NMS IoU 올림(0.7~0.85 사이 추천)
IMGSZ      = 960    # ✅ 640보다 안정적(필요하면 1280)

def draw_obb_polygon(img, poly4x2, thickness=2):
    pts = poly4x2.reshape(-1, 1, 2).astype(np.int32)
    cv2.polylines(img, [pts], isClosed=True, color=(0, 255, 0), thickness=thickness)

def main():
    # -------------------------
    # 1) YOLO OBB 모델 로드
    # -------------------------
    model = YOLO(MODEL_PATH)
    print("[INFO] Model loaded:", MODEL_PATH)
    print("[INFO] Classes:", model.names)
    print(f"[INFO] conf>={CONF_THRES}, iou={IOU_THRES}, imgsz={IMGSZ}")

    # -------------------------
    # 2) RealSense Color 스트림
    # -------------------------
    pipeline = rs.pipeline()
    config = rs.config()

    width, height, fps = 1280, 720, 30
    config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)
    pipeline.start(config)

    cv2.namedWindow("D405 + YOLO11-OBB (conf>=0.85)", cv2.WINDOW_NORMAL)
    cv2.resizeWindow("D405 + YOLO11-OBB (conf>=0.85)", width, height)

    prev_t = time.time()

    try:
        while True:
            frames = pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            if not color_frame:
                continue

            frame = np.asanyarray(color_frame.get_data())

            # -------------------------
            # 3) YOLO 추론 (OBB)
            # -------------------------
            results = model.predict(
                frame,
                imgsz=IMGSZ,
                conf=CONF_THRES,   # ✅ 여기서 1차 필터링
                iou=IOU_THRES,     # ✅ NMS IoU
                verbose=False
            )
            r = results[0]

            # FPS 표시
            now = time.time()
            fps_show = 1.0 / max(1e-6, (now - prev_t))
            prev_t = now
            cv2.putText(frame, f"FPS: {fps_show:.1f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

            det_count = 0

            # -------------------------
            # 4) OBB 결과 그리기 (conf>=0.85만)
            # -------------------------
            if getattr(r, "obb", None) is not None and r.obb is not None:
                obb = r.obb

                if obb.xyxyxyxy is not None and len(obb.xyxyxyxy) > 0:
                    polys = obb.xyxyxyxy.cpu().numpy()             # (N, 8)
                    confs = obb.conf.cpu().numpy().astype(float)   # (N,)
                    clss  = obb.cls.cpu().numpy().astype(int)      # (N,)

                    # predict(conf=...)로 이미 걸러지지만, 혹시 몰라 한번 더 안전 필터
                    keep = confs >= CONF_THRES

                    for poly8, cf, ci in zip(polys[keep], confs[keep], clss[keep]):
                        poly = poly8.reshape(4, 2)
                        draw_obb_polygon(frame, poly)

                        x0, y0 = int(poly[0, 0]), int(poly[0, 1])
                        name = model.names.get(ci, str(ci))
                        cv2.putText(frame, f"{name} {cf:.2f}", (x0, y0 - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                        det_count += 1

            cv2.putText(frame, f"Detections: {det_count}", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8,
                        (0, 255, 0) if det_count > 0 else (0, 0, 255), 2)

            cv2.imshow("D405 + YOLO11-OBB (conf>=0.85)", frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break

    finally:
        pipeline.stop()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


[INFO] Model loaded: /home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt
[INFO] Classes: {0: 'box'}
[INFO] conf>=0.75, iou=0.75, imgsz=960


### 3. 카메라와 박스의 거리를 측정 (depth)

In [1]:
import cv2
import numpy as np
import pyrealsense2 as rs
from ultralytics import YOLO
import time

MODEL_PATH = "/home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt"

CONF_THRES = 0.85
IOU_THRES  = 0.75
IMGSZ      = 640

DEPTH_WIN = 7  # depth=0이면 주변 median으로 보정

def draw_obb_polygon(img, poly4x2, thickness=2):
    pts = poly4x2.reshape(-1, 1, 2).astype(np.int32)
    cv2.polylines(img, [pts], isClosed=True, color=(0, 255, 0), thickness=thickness)

def clamp(v, lo, hi):
    return max(lo, min(hi, v))

def get_depth_median_around(depth_frame, cx, cy, win=7):
    half = win // 2
    ds = []
    for dy in range(-half, half + 1):
        for dx in range(-half, half + 1):
            x = cx + dx
            y = cy + dy
            try:
                d = depth_frame.get_distance(x, y)  # meters
            except:
                d = 0.0
            if d > 0.0:
                ds.append(d)
    if not ds:
        return 0.0
    return float(np.median(ds))

def obb_angle_deg_upright0_rightplus(poly4x2: np.ndarray) -> float:
    """
    목표:
      - 박스가 화면에서 '세워져 있으면' 0도
      - 화면에서 오른쪽으로 기울면 +, 왼쪽으로 기울면 -
      - 범위: -90 ~ +90 근처로 자연스럽게 나옴

    방법:
      - OBB 4점 PCA로 장축 방향 벡터 v=(vx,vy) 구함
      - 벡터 방향을 항상 아래(+y) 향하도록 통일 (vy>=0 되게)
      - angle_raw = atan2(vx, vy) : 세로축(아래방향) 기준 기울기
      - 마지막에 화면 기준 부호가 반대면 -angle 처리
    """
    p = poly4x2.astype(np.float32)

    # 중심화
    c = p.mean(axis=0, keepdims=True)
    q = p - c

    # PCA (2x2)
    cov = np.cov(q.T)
    eigvals, eigvecs = np.linalg.eig(cov)

    # 가장 큰 고유값의 고유벡터 = 장축
    v = eigvecs[:, np.argmax(eigvals)].astype(np.float32)
    vx, vy = float(v[0]), float(v[1])

    # 방향 통일: 아래(+y)로 향하게 (vy >= 0)
    if vy < 0:
        vx, vy = -vx, -vy

    # 세워짐(아래방향) 기준 각도: 0이 upright
    angle = float(np.degrees(np.arctan2(vx, vy)))  # -90~+90 근처

    # ✅ 너 화면 기준에서 부호가 반대라 했으니 뒤집기
    angle = -angle

    return angle

def main():
    model = YOLO(MODEL_PATH)
    print("[INFO] Model loaded:", MODEL_PATH)
    print("[INFO] Classes:", model.names)
    print(f"[INFO] conf>={CONF_THRES}, iou={IOU_THRES}, imgsz={IMGSZ}")

    pipeline = rs.pipeline()
    config = rs.config()

    width, height, fps = 640, 480, 30
    config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)
    config.enable_stream(rs.stream.depth, width, height, rs.format.z16, fps)

    pipeline.start(config)
    align = rs.align(rs.stream.color)

    win_name = "D405 640x480 + YOLO11-OBB + XYZ + Angle(upright=0)"
    cv2.namedWindow(win_name, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(win_name, width, height)

    prev_t = time.time()

    try:
        while True:
            frames = pipeline.wait_for_frames()
            frames = align.process(frames)

            color_frame = frames.get_color_frame()
            depth_frame = frames.get_depth_frame()
            if not color_frame or not depth_frame:
                continue

            frame = np.asanyarray(color_frame.get_data())
            intr = color_frame.profile.as_video_stream_profile().get_intrinsics()

            # YOLO OBB
            results = model.predict(
                frame,
                imgsz=IMGSZ,
                conf=CONF_THRES,
                iou=IOU_THRES,
                verbose=False
            )
            r = results[0]

            # FPS
            now = time.time()
            fps_show = 1.0 / max(1e-6, (now - prev_t))
            prev_t = now
            cv2.putText(frame, f"FPS: {fps_show:.1f}", (10, 25),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

            det_count = 0
            best = None  # (conf, cls, poly4x2)

            # Draw + pick best
            if getattr(r, "obb", None) is not None and r.obb is not None:
                obb = r.obb
                if obb.xyxyxyxy is not None and len(obb.xyxyxyxy) > 0:
                    polys = obb.xyxyxyxy.cpu().numpy()
                    confs = obb.conf.cpu().numpy().astype(float)
                    clss  = obb.cls.cpu().numpy().astype(int)

                    keep = confs >= CONF_THRES
                    for poly8, cf, ci in zip(polys[keep], confs[keep], clss[keep]):
                        poly = poly8.reshape(4, 2)
                        draw_obb_polygon(frame, poly)

                        x0, y0 = int(poly[0, 0]), int(poly[0, 1])
                        name = model.names.get(ci, str(ci))
                        cv2.putText(frame, f"{name} {cf:.2f}", (x0, y0 - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 2)

                        det_count += 1
                        if best is None or cf > best[0]:
                            best = (cf, ci, poly)

            cv2.putText(frame, f"Detections: {det_count}", (10, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                        (0, 255, 0) if det_count > 0 else (0, 0, 255), 2)

            # Best 1개만 숫자 표시
            if best is not None:
                cf, ci, poly = best
                name = model.names.get(ci, str(ci))

                # 중심 픽셀
                cx = int(np.mean(poly[:, 0]))
                cy = int(np.mean(poly[:, 1]))
                cx = clamp(cx, 0, width - 1)
                cy = clamp(cy, 0, height - 1)
                cv2.circle(frame, (cx, cy), 4, (0, 255, 255), -1)

                # ✅ 각도: upright=0 / 화면 기준 right + / left -
                angle_deg = obb_angle_deg_upright0_rightplus(poly)

                # depth
                depth_m = depth_frame.get_distance(cx, cy)
                if depth_m <= 0.0:
                    depth_m = get_depth_median_around(depth_frame, cx, cy, win=DEPTH_WIN)

                if depth_m > 0.0:
                    X, Y, Z = rs.rs2_deproject_pixel_to_point(intr, [cx, cy], depth_m)
                    dist = float(np.sqrt(X*X + Y*Y + Z*Z))

                    cv2.putText(frame, f"[BEST] {name} conf={cf:.2f}", (10, 80),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2)
                    cv2.putText(frame, f"center(px)=({cx},{cy})  angle={angle_deg:+.2f} deg (upright=0, R+, L-)",
                                (10, 105), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
                    cv2.putText(frame, f"cam XYZ=({X:+.3f},{Y:+.3f},{Z:+.3f}) m",
                                (10, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
                    cv2.putText(frame, f"Z={Z:.3f} m   dist={dist:.3f} m",
                                (10, 155), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
                else:
                    cv2.putText(frame, f"[BEST] {name} conf={cf:.2f} (Depth invalid=0)",
                                (10, 95), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 2)

            cv2.imshow(win_name, frame)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break

    finally:
        pipeline.stop()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()



[INFO] Model loaded: /home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt
[INFO] Classes: {0: 'box'}
[INFO] conf>=0.85, iou=0.75, imgsz=640


### 4. 10프레임으로 평균 계산하기 (한번 값 얻고 종료)

In [2]:
import numpy as np
import pyrealsense2 as rs
from ultralytics import YOLO
import time
from collections import deque

MODEL_PATH = "/home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt"

CONF_THRES = 0.85
IOU_THRES  = 0.75
IMGSZ      = 640

DEPTH_WIN = 7
AVG_N = 10
TIMEOUT_SEC = 20.0  # ✅ 20초 안에 10프레임 못 모으면 종료

def clamp(v, lo, hi):
    return max(lo, min(hi, v))

def get_depth_median_around(depth_frame, cx, cy, win=7):
    half = win // 2
    ds = []
    for dy in range(-half, half + 1):
        for dx in range(-half, half + 1):
            x = cx + dx
            y = cy + dy
            try:
                d = depth_frame.get_distance(x, y)
            except:
                d = 0.0
            if d > 0.0:
                ds.append(d)
    if not ds:
        return 0.0
    return float(np.median(ds))

def obb_angle_deg_upright0_rightplus(poly4x2: np.ndarray) -> float:
    """
    upright=0, right tilt +, left tilt -
    (PCA 장축 기반, 화면 기준 부호 보정 포함)
    """
    p = poly4x2.astype(np.float32)
    c = p.mean(axis=0, keepdims=True)
    q = p - c

    cov = np.cov(q.T)
    eigvals, eigvecs = np.linalg.eig(cov)
    v = eigvecs[:, np.argmax(eigvals)].astype(np.float32)

    vx, vy = float(v[0]), float(v[1])
    if vy < 0:
        vx, vy = -vx, -vy

    angle = float(np.degrees(np.arctan2(vx, vy)))  # -90~+90 근처
    angle = -angle  # ✅ 화면 기준 부호 반대면 뒤집기
    return angle

def main():
    model = YOLO(MODEL_PATH)
    print("[INFO] Model loaded:", MODEL_PATH)
    print(f"[INFO] conf>={CONF_THRES}, iou={IOU_THRES}, imgsz={IMGSZ}")
    print(f"[INFO] Collecting {AVG_N} valid frames then exit... (timeout {TIMEOUT_SEC}s)\n")

    pipeline = rs.pipeline()
    config = rs.config()

    width, height, fps = 640, 480, 30
    config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)
    config.enable_stream(rs.stream.depth, width, height, rs.format.z16, fps)

    pipeline.start(config)
    align = rs.align(rs.stream.color)

    buf = deque(maxlen=AVG_N)  # (X,Y,Z,dist,angle)

    t0 = time.time()

    try:
        while True:
            if time.time() - t0 > TIMEOUT_SEC:
                print("[FAIL] Timeout: not enough valid frames.")
                break

            frames = pipeline.wait_for_frames()
            frames = align.process(frames)

            color_frame = frames.get_color_frame()
            depth_frame = frames.get_depth_frame()
            if not color_frame or not depth_frame:
                continue

            frame = np.asanyarray(color_frame.get_data())
            intr = color_frame.profile.as_video_stream_profile().get_intrinsics()

            # YOLO OBB 추론
            results = model.predict(
                frame,
                imgsz=IMGSZ,
                conf=CONF_THRES,
                iou=IOU_THRES,
                verbose=False
            )
            r = results[0]

            # best 1개 선택
            best = None  # (conf, cls, poly4x2)
            if getattr(r, "obb", None) is not None and r.obb is not None:
                obb = r.obb
                if obb.xyxyxyxy is not None and len(obb.xyxyxyxy) > 0:
                    polys = obb.xyxyxyxy.cpu().numpy()
                    confs = obb.conf.cpu().numpy().astype(float)
                    clss  = obb.cls.cpu().numpy().astype(int)

                    keep = confs >= CONF_THRES
                    for poly8, cf, ci in zip(polys[keep], confs[keep], clss[keep]):
                        if best is None or cf > best[0]:
                            best = (cf, ci, poly8.reshape(4, 2))

            if best is None:
                continue

            cf, ci, poly = best

            # center px
            cx = int(np.mean(poly[:, 0]))
            cy = int(np.mean(poly[:, 1]))
            cx = clamp(cx, 0, width - 1)
            cy = clamp(cy, 0, height - 1)

            # depth
            depth_m = depth_frame.get_distance(cx, cy)
            if depth_m <= 0.0:
                depth_m = get_depth_median_around(depth_frame, cx, cy, win=DEPTH_WIN)
            if depth_m <= 0.0:
                continue

            # deproject
            X, Y, Z = rs.rs2_deproject_pixel_to_point(intr, [cx, cy], depth_m)
            dist = float(np.sqrt(X*X + Y*Y + Z*Z))
            angle = obb_angle_deg_upright0_rightplus(poly)

            buf.append((float(X), float(Y), float(Z), float(dist), float(angle)))

            print(f"[{len(buf)}/{AVG_N}] conf={cf:.2f}  center=({cx},{cy})  XYZ=({X:+.3f},{Y:+.3f},{Z:+.3f})  angle={angle:+.2f}")

            if len(buf) == AVG_N:
                arr = np.array(buf, dtype=np.float32)
                mx, my, mz, md, ma = arr.mean(axis=0)

                print("\n========== RESULT (AVERAGE) ==========")
                print(f"count         : {AVG_N}")
                print(f"cam XYZ avg   : ({mx:+.4f}, {my:+.4f}, {mz:+.4f}) m")
                print(f"dist_avg      : {md:.4f} m")
                print(f"angle_avg(deg): {ma:+.2f}   (upright=0, right +, left -)")
                print("======================================")
                break

    finally:
        pipeline.stop()

if __name__ == "__main__":
    main()


[INFO] Model loaded: /home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt
[INFO] conf>=0.85, iou=0.75, imgsz=640
[INFO] Collecting 10 valid frames then exit... (timeout 20.0s)

[1/10] conf=0.95  center=(288,161)  XYZ=(-0.038,-0.099,+0.482)  angle=+0.88
[2/10] conf=0.94  center=(288,163)  XYZ=(-0.040,-0.100,+0.499)  angle=+0.27
[3/10] conf=0.94  center=(288,161)  XYZ=(-0.038,-0.097,+0.475)  angle=+0.74
[4/10] conf=0.94  center=(288,161)  XYZ=(-0.036,-0.092,+0.451)  angle=+0.29
[5/10] conf=0.94  center=(288,160)  XYZ=(-0.038,-0.099,+0.479)  angle=+0.77
[6/10] conf=0.94  center=(288,163)  XYZ=(-0.037,-0.093,+0.465)  angle=+0.47
[7/10] conf=0.94  center=(288,163)  XYZ=(-0.039,-0.098,+0.490)  angle=+0.41
[8/10] conf=0.94  center=(288,163)  XYZ=(-0.038,-0.096,+0.480)  angle=+0.31
[9/10] conf=0.94  center=(288,163)  XYZ=(-0.038,-0.094,+0.471)  angle=+0.30
[10/10] conf=0.94  center=(288,162)  XYZ=(-0.038,-0.095,+0.471)  angle=+0.32

count         : 10
cam XYZ avg   : 

In [4]:
import numpy as np
import pyrealsense2 as rs
from ultralytics import YOLO
import time
from collections import deque

MODEL_PATH = "/home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt"

CONF_THRES = 0.85
IOU_THRES  = 0.75
IMGSZ      = 640

DEPTH_WIN = 7
AVG_N = 10
TIMEOUT_SEC = 20.0  # 20초 안에 10프레임 못 모으면 종료

# ✅ 너가 말한 오프셋만 적용 (그리퍼 = 카메라 기준)
#   y는 -0.07, z는 -0.18
OFF_X = 0.0
OFF_Y = -0.07
OFF_Z = -0.18

def clamp(v, lo, hi):
    return max(lo, min(hi, v))

def get_depth_median_around(depth_frame, cx, cy, win=7):
    half = win // 2
    ds = []
    for dy in range(-half, half + 1):
        for dx in range(-half, half + 1):
            x = cx + dx
            y = cy + dy
            try:
                d = depth_frame.get_distance(x, y)
            except:
                d = 0.0
            if d > 0.0:
                ds.append(d)
    if not ds:
        return 0.0
    return float(np.median(ds))

def obb_angle_deg_upright0_rightplus(poly4x2: np.ndarray) -> float:
    """
    upright=0, right tilt +, left tilt -
    (PCA 장축 기반, 화면 기준 부호 보정 포함)
    """
    p = poly4x2.astype(np.float32)
    c = p.mean(axis=0, keepdims=True)
    q = p - c

    cov = np.cov(q.T)
    eigvals, eigvecs = np.linalg.eig(cov)
    v = eigvecs[:, np.argmax(eigvals)].astype(np.float32)

    vx, vy = float(v[0]), float(v[1])
    if vy < 0:
        vx, vy = -vx, -vy

    angle = float(np.degrees(np.arctan2(vx, vy)))  # -90~+90 근처
    angle = -angle  # 화면 기준 부호 보정
    return angle

def main():
    model = YOLO(MODEL_PATH)
    print("[INFO] Model loaded:", MODEL_PATH)
    print(f"[INFO] conf>={CONF_THRES}, iou={IOU_THRES}, imgsz={IMGSZ}")
    print(f"[INFO] Collecting {AVG_N} valid frames then exit... (timeout {TIMEOUT_SEC}s)")
    print(f"[INFO] Offset applied after average: X{OFF_X:+.3f}, Y{OFF_Y:+.3f}, Z{OFF_Z:+.3f} (m)\n")

    pipeline = rs.pipeline()
    config = rs.config()

    width, height, fps = 640, 480, 30
    config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)
    config.enable_stream(rs.stream.depth, width, height, rs.format.z16, fps)

    pipeline.start(config)
    align = rs.align(rs.stream.color)

    buf = deque(maxlen=AVG_N)  # (X,Y,Z,dist,angle)
    t0 = time.time()

    try:
        while True:
            if time.time() - t0 > TIMEOUT_SEC:
                print("[FAIL] Timeout: not enough valid frames.")
                break

            frames = pipeline.wait_for_frames()
            frames = align.process(frames)

            color_frame = frames.get_color_frame()
            depth_frame = frames.get_depth_frame()
            if not color_frame or not depth_frame:
                continue

            frame = np.asanyarray(color_frame.get_data())
            intr = color_frame.profile.as_video_stream_profile().get_intrinsics()

            # YOLO OBB 추론
            results = model.predict(
                frame,
                imgsz=IMGSZ,
                conf=CONF_THRES,
                iou=IOU_THRES,
                verbose=False
            )
            r = results[0]

            # best 1개 선택
            best = None  # (conf, cls, poly4x2)
            if getattr(r, "obb", None) is not None and r.obb is not None:
                obb = r.obb
                if obb.xyxyxyxy is not None and len(obb.xyxyxyxy) > 0:
                    polys = obb.xyxyxyxy.cpu().numpy()
                    confs = obb.conf.cpu().numpy().astype(float)
                    clss  = obb.cls.cpu().numpy().astype(int)

                    keep = confs >= CONF_THRES
                    for poly8, cf, ci in zip(polys[keep], confs[keep], clss[keep]):
                        if best is None or cf > best[0]:
                            best = (cf, ci, poly8.reshape(4, 2))

            if best is None:
                continue

            cf, ci, poly = best

            # center px
            cx = int(np.mean(poly[:, 0]))
            cy = int(np.mean(poly[:, 1]))
            cx = clamp(cx, 0, width - 1)
            cy = clamp(cy, 0, height - 1)

            # depth
            depth_m = depth_frame.get_distance(cx, cy)
            if depth_m <= 0.0:
                depth_m = get_depth_median_around(depth_frame, cx, cy, win=DEPTH_WIN)
            if depth_m <= 0.0:
                continue

            # deproject
            X, Y, Z = rs.rs2_deproject_pixel_to_point(intr, [cx, cy], depth_m)
            dist = float(np.sqrt(X*X + Y*Y + Z*Z))
            angle = obb_angle_deg_upright0_rightplus(poly)

            buf.append((float(X), float(Y), float(Z), float(dist), float(angle)))
            print(f"[{len(buf)}/{AVG_N}] conf={cf:.2f} center=({cx},{cy}) XYZ=({X:+.3f},{Y:+.3f},{Z:+.3f}) angle={angle:+.2f}")

            if len(buf) == AVG_N:
                arr = np.array(buf, dtype=np.float32)
                mx, my, mz, md, ma = arr.mean(axis=0)

                print("\n========== RESULT (AVERAGE) ==========")
                print(f"count         : {AVG_N}")
                print(f"cam XYZ avg   : ({mx:+.4f}, {my:+.4f}, {mz:+.4f}) m")
                print(f"dist_avg      : {md:.4f} m")
                print(f"angle_avg(deg): {ma:+.2f}   (upright=0, right +, left -)")
                print("======================================")

                # ✅ 오프셋 적용 결과(그리퍼 기준)
                gx = mx + OFF_X
                gy = my + OFF_Y
                gz = mz + OFF_Z
                gdist = float(np.sqrt(gx*gx + gy*gy + gz*gz))

                print("\n========== RESULT (OFFSET) ==========")
                print(f"offset applied: X{OFF_X:+.3f}  Y{OFF_Y:+.3f}  Z{OFF_Z:+.3f}  (m)")
                print(f"grip XYZ      : ({gx:+.4f}, {gy:+.4f}, {gz:+.4f}) m")
                print(f"dist_offset   : {gdist:.4f} m")
                print(f"angle_avg(deg): {ma:+.2f}   (same angle)")
                print("=====================================")
                break

    finally:
        pipeline.stop()

if __name__ == "__main__":
    main()


[INFO] Model loaded: /home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt
[INFO] conf>=0.85, iou=0.75, imgsz=640
[INFO] Collecting 10 valid frames then exit... (timeout 20.0s)
[INFO] Offset applied after average: X+0.000, Y-0.070, Z-0.180 (m)

[1/10] conf=0.94 center=(291,165) XYZ=(-0.037,-0.099,+0.507) angle=+1.42
[2/10] conf=0.94 center=(291,166) XYZ=(-0.036,-0.097,+0.505) angle=+1.55
[3/10] conf=0.94 center=(291,164) XYZ=(-0.035,-0.095,+0.480) angle=+1.41
[4/10] conf=0.94 center=(291,164) XYZ=(-0.036,-0.098,+0.496) angle=+1.48
[5/10] conf=0.94 center=(291,166) XYZ=(-0.034,-0.092,+0.479) angle=+1.63
[6/10] conf=0.94 center=(291,165) XYZ=(-0.035,-0.094,+0.485) angle=+1.46
[7/10] conf=0.94 center=(291,165) XYZ=(-0.035,-0.093,+0.480) angle=+1.54
[8/10] conf=0.94 center=(291,165) XYZ=(-0.034,-0.091,+0.467) angle=+1.53
[9/10] conf=0.94 center=(291,166) XYZ=(-0.036,-0.095,+0.493) angle=+1.61
[10/10] conf=0.94 center=(291,164) XYZ=(-0.034,-0.094,+0.477) angle=+1.55

In [19]:
import numpy as np
import pyrealsense2 as rs
from ultralytics import YOLO
import time
from collections import deque

MODEL_PATH = "/home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt"

CONF_THRES = 0.85
IOU_THRES  = 0.75
IMGSZ      = 640

DEPTH_WIN = 7
AVG_N = 10
TIMEOUT_SEC = 20.0

# ✅ 오프셋 (m)
OFF_X = 0.0
OFF_Y = -0.07
OFF_Z = -0.18

def clamp(v, lo, hi):
    return max(lo, min(hi, v))

def get_depth_median_around(depth_frame, cx, cy, win=7):
    half = win // 2
    ds = []
    for dy in range(-half, half + 1):
        for dx in range(-half, half + 1):
            x = cx + dx
            y = cy + dy
            try:
                d = depth_frame.get_distance(x, y)
            except:
                d = 0.0
            if d > 0.0:
                ds.append(d)
    if not ds:
        return 0.0
    return float(np.median(ds))

def obb_angle_deg_upright0_rightplus(poly4x2: np.ndarray) -> float:
    p = poly4x2.astype(np.float32)
    c = p.mean(axis=0, keepdims=True)
    q = p - c

    cov = np.cov(q.T)
    eigvals, eigvecs = np.linalg.eig(cov)
    v = eigvecs[:, np.argmax(eigvals)].astype(np.float32)

    vx, vy = float(v[0]), float(v[1])
    if vy < 0:
        vx, vy = -vx, -vy

    angle = float(np.degrees(np.arctan2(vx, vy)))
    angle = -angle
    return angle

def main():
    model = YOLO(MODEL_PATH)
    print("[INFO] Model loaded:", MODEL_PATH)
    print(f"[INFO] conf>={CONF_THRES}, iou={IOU_THRES}, imgsz={IMGSZ}")
    print(f"[INFO] Collecting {AVG_N} valid frames then exit... (timeout {TIMEOUT_SEC}s)")
    print(f"[INFO] Offset applied after average (cm): X{OFF_X*100:+.1f}, Y{OFF_Y*100:+.1f}, Z{OFF_Z*100:+.1f}\n")

    pipeline = rs.pipeline()
    config = rs.config()

    width, height, fps = 640, 480, 30
    config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, fps)
    config.enable_stream(rs.stream.depth, width, height, rs.format.z16, fps)

    pipeline.start(config)
    align = rs.align(rs.stream.color)

    buf = deque(maxlen=AVG_N)  # (X,Y,Z,dist,angle)
    t0 = time.time()

    try:
        while True:
            if time.time() - t0 > TIMEOUT_SEC:
                print("[FAIL] Timeout: not enough valid frames.")
                break

            frames = pipeline.wait_for_frames()
            frames = align.process(frames)

            color_frame = frames.get_color_frame()
            depth_frame = frames.get_depth_frame()
            if not color_frame or not depth_frame:
                continue

            frame = np.asanyarray(color_frame.get_data())
            intr = color_frame.profile.as_video_stream_profile().get_intrinsics()

            results = model.predict(
                frame,
                imgsz=IMGSZ,
                conf=CONF_THRES,
                iou=IOU_THRES,
                verbose=False
            )
            r = results[0]

            best = None  # (conf, cls, poly4x2)
            if getattr(r, "obb", None) is not None and r.obb is not None:
                obb = r.obb
                if obb.xyxyxyxy is not None and len(obb.xyxyxyxy) > 0:
                    polys = obb.xyxyxyxy.cpu().numpy()
                    confs = obb.conf.cpu().numpy().astype(float)
                    clss  = obb.cls.cpu().numpy().astype(int)

                    keep = confs >= CONF_THRES
                    for poly8, cf, ci in zip(polys[keep], confs[keep], clss[keep]):
                        if best is None or cf > best[0]:
                            best = (cf, ci, poly8.reshape(4, 2))

            if best is None:
                continue

            cf, ci, poly = best

            cx = int(np.mean(poly[:, 0]))
            cy = int(np.mean(poly[:, 1]))
            cx = clamp(cx, 0, width - 1)
            cy = clamp(cy, 0, height - 1)

            depth_m = depth_frame.get_distance(cx, cy)
            if depth_m <= 0.0:
                depth_m = get_depth_median_around(depth_frame, cx, cy, win=DEPTH_WIN)
            if depth_m <= 0.0:
                continue

            X, Y, Z = rs.rs2_deproject_pixel_to_point(intr, [cx, cy], depth_m)
            dist = float(np.sqrt(X*X + Y*Y + Z*Z))
            angle = obb_angle_deg_upright0_rightplus(poly)

            buf.append((float(X), float(Y), float(Z), float(dist), float(angle)))

            print(f"[{len(buf)}/{AVG_N}] conf={cf:.2f} center=({cx},{cy}) "
                  f"XYZ=({X*100:+.1f},{Y*100:+.1f},{Z*100:+.1f})cm  dist={dist*100:.1f}cm  angle={angle:+.2f}deg")

            if len(buf) == AVG_N:
                arr = np.array(buf, dtype=np.float32)
                mx, my, mz, md, ma = arr.mean(axis=0)

                print("\n========== RESULT (AVERAGE) ==========")
                print(f"count          : {AVG_N}")
                print(f"cam XYZ avg    : ({mx*100:+.2f}, {my*100:+.2f}, {mz*100:+.2f}) cm")
                print(f"dist_avg       : {md*100:.2f} cm")
                print(f"angle_avg(deg) : {ma:+.2f}   (upright=0, right +, left -)")
                print("======================================")

                gx = mx + OFF_X
                gy = my + OFF_Y
                gz = mz + OFF_Z
                gdist = float(np.sqrt(gx*gx + gy*gy + gz*gz))

                print("\n========== RESULT (OFFSET) ==========")
                print(f"offset applied : (X{OFF_X*100:+.1f}, Y{OFF_Y*100:+.1f}, Z{OFF_Z*100:+.1f}) cm")
                print(f"grip XYZ       : ({gx*100:+.2f}, {gy*100:+.2f}, {gz*100:+.2f}) cm")
                print(f"dist_offset    : {gdist*100:.2f} cm")
                print(f"angle_avg(deg) : {ma:+.2f}   (same angle)")
                print("=====================================")
                break

    finally:
        pipeline.stop()

if __name__ == "__main__":
    main()



[INFO] Model loaded: /home/dw/ws_job_msislab/amr_project/src/runs/obb/smoke_test_v2/weights/best.pt
[INFO] conf>=0.85, iou=0.75, imgsz=640
[INFO] Collecting 10 valid frames then exit... (timeout 20.0s)
[INFO] Offset applied after average (cm): X+0.0, Y-7.0, Z-18.0

[1/10] conf=0.96 center=(348,153) XYZ=(+3.5,-10.6,+47.0)cm  dist=48.4cm  angle=+18.40deg
[2/10] conf=0.96 center=(349,153) XYZ=(+3.5,-10.5,+46.6)cm  dist=47.9cm  angle=+18.80deg
[3/10] conf=0.96 center=(349,153) XYZ=(+3.3,-9.7,+42.9)cm  dist=44.1cm  angle=+18.75deg
[4/10] conf=0.96 center=(349,153) XYZ=(+3.4,-10.2,+45.1)cm  dist=46.4cm  angle=+18.65deg
[5/10] conf=0.96 center=(348,153) XYZ=(+3.3,-10.3,+45.5)cm  dist=46.8cm  angle=+18.60deg
[6/10] conf=0.96 center=(349,153) XYZ=(+3.4,-9.9,+44.1)cm  dist=45.3cm  angle=+18.59deg
[7/10] conf=0.96 center=(349,153) XYZ=(+3.5,-10.4,+46.1)cm  dist=47.4cm  angle=+19.27deg
[8/10] conf=0.96 center=(351,152) XYZ=(+3.8,-10.7,+47.0)cm  dist=48.4cm  angle=+19.91deg
[9/10] conf=0.96 center=