In [1]:
import cv2, numpy as np, glob, os, time, matplotlib.pyplot as plt
import pyrealsense2 as rs
from scipy.spatial.transform import Rotation as R
import rby1_sdk as rb
import json
import math
import time

In [3]:
SAVE_DIR = "calib_data"
IMG_DIR = os.path.join(SAVE_DIR,"images"); os.makedirs(IMG_DIR, exist_ok=True)
os.makedirs(SAVE_DIR, exist_ok=True)
K_FILE = "K_1280x720.npy"
D_FILE = "D_1280x720.npy"
PRESET_FILE = "rs_locked_preset.json"
ADDRESS = "192.168.30.1:50051"
MODEL = "a"
POWER = ".*"
BASE_INDEX, LINK_TORSO_5_INDEX, EE_RIGHT_INDEX, EE_LEFT_INDEX = 0, 1, 2, 3

In [4]:
def get_joint_positions() -> list:
    robot = rb.create_robot(ADDRESS, MODEL)
    robot.connect()
    if not robot.is_connected():
        print("Robot is not connected")
        exit(1)
    if not robot.is_power_on(POWER):
        rv = robot.power_on(POWER)
        if not rv:
            print("Failed to power on")
            exit(1)
    robot.reset_fault_control_manager()
    robot.enable_control_manager()

    robot_state = robot.get_state()
    body = [round(x, 3) for x in robot_state.position[2:8]]
    right_arm = [round(x, 3) for x in robot_state.position[8:15]]
    left_arm = [round(x, 3) for x in robot_state.position[15:22]]
    head = [round(x,3) for x in robot_state.position[22:]]
    result = [
        body,
        right_arm,
        left_arm,
        head
    ]
    return result

def get_flange_homogeneous() -> np.array:
    robot = rb.create_robot(ADDRESS, MODEL)
    model = robot.model()
    robot.connect()
    
    dyn_model = robot.get_dynamics()
    dyn_state = dyn_model.make_state(["base", "link_torso_5", "ee_right", "ee_left"], model.robot_joint_names)
    dyn_state.set_q(robot.get_state().position)
    dyn_model.compute_forward_kinematics(dyn_state)
    T_base2left = dyn_model.compute_transformation(dyn_state, BASE_INDEX, EE_LEFT_INDEX)
    
    print(T_base2left)

    return T_base2left

def matrix_to_quat_xyzw(Rm):
    return R.from_matrix(Rm).as_quat().tolist()

In [5]:
def load_KD(k_path, d_path):
    try:
        K = np.load(k_path)
    except Exception:
        base = os.path.splitext(k_path)[0]
        try:
            K = np.load(base + ".npy")
        except Exception:
            raise RuntimeError(f"Cannot load K from {k_path}")
    try:
        D = np.load(d_path)
    except Exception:
        base = os.path.splitext(d_path)[0]
        try:
            D = np.load(base + "_D.npy")
        except Exception:
            raise RuntimeError(f"Cannot load D from {d_path}")
    return K, D

try:
    K_mat, D_vec = load_KD(K_FILE, D_FILE)
except Exception as e:
    print("Warning: cannot load K/D now:", e)
    K_mat, D_vec = None, None

IMG_DIR = "calib_images"
SAVE_DIR = "calib_records"
cols, rows, spacing = 4, 5, 0.02
DISPLAY_SCALE = 0.50
UNDISTORT_FOR_DETECT = False
K_FILE = "K_1280x720.npy"
D_FILE = "K_1280x720.npy"
PRESET_FILE = "rs_locked_preset.json"
SAVE_DEBUG_IMAGES = True
PATCH_DIR = "point_patches"
DEBUG_DIR = "debug_images"


os.makedirs(IMG_DIR, exist_ok=True)
os.makedirs(SAVE_DIR, exist_ok=True)
os.makedirs(PATCH_DIR, exist_ok=True)
os.makedirs(DEBUG_DIR, exist_ok=True)


K_mat, D_vec = load_KD(K_FILE, D_FILE)


In [6]:
robot = rb.create_robot(ADDRESS, MODEL)
robot.connect()
calibration_pos = [[0.0, 0.582, -1.122, 0.592, 0.0, 0.0], [-0.477, -0.185, -0.128, -2.261, -0.182, 1.702, 0.0], [-0.477, 0.185, 0.128, -2.261, 0.182, 1.702, -0.0]]

def move_head(pos):
    robot = rb.create_robot(ADDRESS, MODEL)
    robot.connect()
    head_0, head_1 = pos
    rc_builder = rb.RobotCommandBuilder().set_command(
        rb.ComponentBasedCommandBuilder().set_head_command(
            rb.HeadCommandBuilder()
            .set_command(rb.JointPositionCommandBuilder()
            .set_command_header(rb.CommandHeaderBuilder().set_control_hold_time(0.5))
            .set_minimum_time(1)
            .set_position([head_0, head_1])
            )
        )
    )
    rc = robot.send_command(rc_builder).get()
    return rc.finish_code == rb.RobotCommandFeedback.FinishCode.Ok

def move_to(pos = calibration_pos, minimum_time = 3):
    torso, right_arm, left_arm = pos
    rc_builder = rb.RobotCommandBuilder().set_command(
        rb.ComponentBasedCommandBuilder().set_body_command(
            rb.BodyComponentBasedCommandBuilder()
            # Set Torso Command with Joint Position Command
            .set_torso_command(
                rb.JointPositionCommandBuilder()
                .set_command_header(rb.CommandHeaderBuilder().set_control_hold_time(0.5))  # For staturation
                .set_minimum_time(minimum_time)
                .set_position(torso)
            )
            # Set Right Command with Joint Position Command
            .set_right_arm_command(
                rb.JointPositionCommandBuilder()
                .set_command_header(rb.CommandHeaderBuilder().set_control_hold_time(0.5))
                .set_minimum_time(minimum_time)
                .set_position(right_arm)
            )
            # Set Left Command with Joint Position Command
            .set_left_arm_command(
                rb.JointPositionCommandBuilder()
                .set_command_header(rb.CommandHeaderBuilder().set_control_hold_time(0.5))
                .set_minimum_time(minimum_time)
                .set_position(left_arm)
            )
        )
    )
    rc = robot.send_command(rc_builder).get()
    return rc.finish_code == rb.RobotCommandFeedback.FinishCode.Ok
    
import os
import json
from typing import List, Dict, Any, Union

def load_joints_from_json_dir(dir_path: str,
                              json_glob: str = "he_record_*.json") -> List[Union[List[float], Dict[str, Any]]]:

    files = sorted([f for f in os.listdir(dir_path) if f.endswith(".json") and json_glob.replace("*", "") in f or f.startswith(json_glob.split("*")[0])])
    if not files:
        files = sorted([f for f in os.listdir(dir_path) if f.endswith(".json")])

    results = []
    for fn in files:
        full = os.path.join(dir_path, fn)
        try:
            with open(full, "r") as fh:
                rec = json.load(fh)
        except Exception as e:
            print(f"failed to load {full}: {e}")
            continue

        # bd = rec.get("board_detection")
        # if not bd:
        #     continue
        # if not bd.get("found", False):
        #     continue

        joints = rec.get("joints")
        if joints is None:
            print(f"{fn} has no 'joints' field; skipping")
            continue
        out_val = joints

        results.append(out_val)

    print(f"[INFO] scanned {len(files)} json files in '{dir_path}'. found {len(results)} frames with board_detection.found==True")
    return results



카메라(컬러+정렬된 depth) 프레임을 저장하고, 체커(원형)로 2D 코너 검출 → 각 코너에 대해 depth(median)로 3D 포인트 복원 → solvePnP(또는 RANSAC)로 target→camera pose 추정 → 로봇에서 gripper→base(H_base_grip) 읽어서 (gripper↔base, target↔cam) 페어를 모아 hand-eye 보정 → depth 기반으로 포인트 비교/검증하는 플로우.

In [7]:
import numpy as np
K = np.load("K_1280x720.npy")
print("K:\n", K)
D = np.load("D_1280x720.npy") if os.path.exists("D_1280x720.npy") else None
print("D:", D)
assert K.shape == (3,3) and K[0,0]>0 and K[1,1]>0

K:
 [[309.60391334   0.         680.48736427]
 [  0.         841.62754512 393.05409321]
 [  0.           0.           1.        ]]
D: [[ 6.19763036e+01 -9.70926778e+02 -6.50846595e-01  1.21165149e+00
   4.99324976e+03]]


In [37]:
def make_objp_for_pattern(pattern_type):
    if pattern_type == "asymmetric":
        objp = np.zeros((cols*rows,3), np.float64)
        idxp = 0
        for j in range(rows):
            for i in range(cols):
                x = (2 * i + (j % 2)) * spacing
                y = j * spacing
                objp[idxp,0] = x; objp[idxp,1] = y; idxp += 1
        return objp
    else:
        objp = np.zeros((cols*rows,3), np.float64)
        grid = np.mgrid[0:cols, 0:rows].T.reshape(-1,2)
        objp[:,:2] = grid * spacing
        return objp
objp = make_objp_for_pattern("asymmetric")
print("objp[0:8]:\n", objp[:8])
print("objp count:", len(objp))

objp[0:8]:
 [[0.   0.   0.  ]
 [0.08 0.   0.  ]
 [0.16 0.   0.  ]
 [0.24 0.   0.  ]
 [0.32 0.   0.  ]
 [0.04 0.04 0.  ]
 [0.12 0.04 0.  ]
 [0.2  0.04 0.  ]]
objp count: 20


In [21]:
import os, glob, json, csv
import cv2
import numpy as np

JSON_DIR = "calib_records"
IMG_DIR = "calib_images"
OUT_DIR = "debug_images"
CSV_OUT = os.path.join(OUT_DIR, "points_depths_summary.csv")
os.makedirs(OUT_DIR, exist_ok=True)

use_json_mapping = True

def depth_to_color(depth, dmin, dmax):
    if depth is None or (isinstance(depth, float) and np.isnan(depth)):
        return (128,128,128)
    if dmax <= dmin:
        t = 0.5
    else:
        t = float((depth - dmin) / float(dmax - dmin))
        t = max(0.0, min(1.0, t))
    G = int(round(255 * (1.0 - t)))
    R = int(round(255 * t))
    return (0, G, R)

def draw_and_save(img_path, imgpts, depths, out_path, csv_rows=None):
    img = cv2.imread(img_path)
    if img is None:
        print("img open fail:", img_path)
        return False
    font = cv2.FONT_HERSHEY_SIMPLEX
    if depths is None:
        depths = [float('nan')] * len(imgpts)
    depths_arr = np.array([d if d is not None else np.nan for d in depths], dtype=np.float64)
    finite = depths_arr[np.isfinite(depths_arr)]
    if finite.size > 0:
        dmin = float(np.percentile(finite, 5))
        dmax = float(np.percentile(finite, 95))
        if dmax <= dmin:
            dmin = float(finite.min()); dmax = float(finite.max())
    else:
        dmin = 0.0; dmax = 1.0
    for i, (u, v) in enumerate(imgpts):
        ui = int(round(u)); vi = int(round(v))
        depth_val = float(depths_arr[i]) if i < len(depths_arr) else float('nan')
        color = depth_to_color(depth_val, dmin, dmax)
        cv2.circle(img, (ui, vi), radius=6, color=color, thickness=2)
        cv2.putText(img, str(i), (ui + 8, vi + 4), font, 0.5, (0,0,0), thickness=2, lineType=cv2.LINE_AA)
        cv2.putText(img, str(i), (ui + 8, vi + 4), font, 0.5, (255,255,255), thickness=1, lineType=cv2.LINE_AA)
        depth_text = f"{depth_val:.3f} m" if np.isfinite(depth_val) else "NaN"
        tx, ty = ui + 8, vi + 22
        cv2.putText(img, depth_text, (tx, ty), font, 0.45, (0,0,0), thickness=3, lineType=cv2.LINE_AA)
        cv2.putText(img, depth_text, (tx, ty), font, 0.45, (255,255,255), thickness=1, lineType=cv2.LINE_AA)
        if i < len(imgpts) - 1:
            p1 = (ui, vi)
            p2 = tuple(np.round(imgpts[i+1]).astype(int))
            cv2.line(img, p1, p2, (200,200,200), 1)
        if csv_rows is not None:
            csv_rows.append({
                "image": os.path.basename(img_path),
                "idx": i,
                "u": float(u),
                "v": float(v),
                "depth_m": (float(depth_val) if np.isfinite(depth_val) else None)
            })
    cv2.imwrite(out_path, img)
    return True

if use_json_mapping:
    json_files = sorted(glob.glob(os.path.join(JSON_DIR, "he_record_*.json")))
    if not json_files:
        print("no json in", JSON_DIR)

    all_csv_rows = []
    for jf in json_files:
        try:
            rec = json.load(open(jf, "r"))
        except Exception as e:
            print("json read fail:", jf); continue
        bd = rec.get("board_detection", {})
        imgpts = bd.get("imgpts2d")
        depths = bd.get("depths_m", None)
        if not imgpts:
            print("no imgpts - skip", jf); continue
        imgpts = np.array(imgpts, dtype=np.float64)
        imgname = rec.get("image")
        if imgname:
            img_path = os.path.join(IMG_DIR, imgname)
        else:
            base = os.path.basename(jf).replace("he_record_", "he_img_").replace(".json", ".png")
            img_path = os.path.join(IMG_DIR, base)
        if not os.path.exists(img_path):
            print("img not found:", img_path); continue
        out_name = os.path.splitext(os.path.basename(img_path))[0] + "_idx_depth.png"
        out_path = os.path.join(OUT_DIR, out_name)
        csv_rows_frame = []
        ok = draw_and_save(img_path, imgpts, depths, out_path, csv_rows=csv_rows_frame)
        if ok:
            print("saved:", out_path)
            all_csv_rows.extend(csv_rows_frame)
        else:
            print("fail:", img_path)
    if all_csv_rows:
        keys = ["image", "idx", "u", "v", "depth_m"]
        with open(CSV_OUT, "w", newline='') as cf:
            writer = csv.DictWriter(cf, fieldnames=keys)
            writer.writeheader()
            for r in all_csv_rows:
                writer.writerow(r)
        print("csv written:", CSV_OUT)
    else:
        print("no rows")
else:
    img_files = sorted(glob.glob(os.path.join(IMG_DIR, "*.*")))
    all_csv_rows = []
    for img_path in img_files:
        base = os.path.basename(img_path)
        json_guess = base.replace("he_img_", "he_record_").rsplit(".", 1)[0] + ".json"
        jf = os.path.join(JSON_DIR, json_guess)
        if not os.path.exists(jf):
            print("skip no json for", img_path); continue
        rec = json.load(open(jf, "r"))
        bd = rec.get("board_detection", {})
        imgpts = bd.get("imgpts2d")
        depths = bd.get("depths_m", None)
        if not imgpts:
            print("no imgpts - skip", jf); continue
        imgpts = np.array(imgpts, dtype=np.float64)
        out_name = os.path.splitext(base)[0] + "_idx_depth.png"
        out_path = os.path.join(OUT_DIR, out_name)
        csv_rows_frame = []
        ok = draw_and_save(img_path, imgpts, depths, out_path, csv_rows=csv_rows_frame)
        if ok:
            print("saved:", out_path)
            all_csv_rows.extend(csv_rows_frame)
        else:
            print("fail:", img_path)
    if all_csv_rows:
        keys = ["image", "idx", "u", "v", "depth_m"]
        with open(CSV_OUT, "w", newline='') as cf:
            writer = csv.DictWriter(cf, fieldnames=keys)
            writer.writeheader()
            for r in all_csv_rows:
                writer.writerow(r)
        print("csv written:", CSV_OUT)
    else:
        print("no rows")


saved: debug_images/he_img_0000_idx_depth.png
saved: debug_images/he_img_0001_idx_depth.png
saved: debug_images/he_img_0002_idx_depth.png
saved: debug_images/he_img_0003_idx_depth.png
saved: debug_images/he_img_0004_idx_depth.png
saved: debug_images/he_img_0005_idx_depth.png
saved: debug_images/he_img_0006_idx_depth.png
saved: debug_images/he_img_0007_idx_depth.png
saved: debug_images/he_img_0008_idx_depth.png
saved: debug_images/he_img_0009_idx_depth.png
saved: debug_images/he_img_0010_idx_depth.png
saved: debug_images/he_img_0011_idx_depth.png
saved: debug_images/he_img_0012_idx_depth.png
saved: debug_images/he_img_0013_idx_depth.png
saved: debug_images/he_img_0014_idx_depth.png
saved: debug_images/he_img_0015_idx_depth.png
saved: debug_images/he_img_0016_idx_depth.png
saved: debug_images/he_img_0017_idx_depth.png
saved: debug_images/he_img_0018_idx_depth.png
saved: debug_images/he_img_0019_idx_depth.png
saved: debug_images/he_img_0020_idx_depth.png
saved: debug_images/he_img_0021_id

In [None]:
HEAD_CAL_POSE = [math.radians(0), math.radians(-12)]
move_head(HEAD_CAL_POSE)

True

In [None]:
import os, glob, cv2, json
import numpy as np

IMG_DIR = "calib_images"
OUT_DIR = "debug_detect"
os.makedirs(OUT_DIR, exist_ok=True)

# board guesses (adjust if your board is different)
cols, rows = 5, 4   # pattern size (cols, rows) used in your pipeline
use_asymmetric = True

# blob detector variants to try
def make_detector(variant=0):
    p = cv2.SimpleBlobDetector_Params()
    if variant == 0:
        p.minThreshold=5; p.maxThreshold=220; p.thresholdStep=5
        p.filterByArea=True; p.minArea=10; p.maxArea=50000
        p.filterByCircularity=True; p.minCircularity=0.4
        p.filterByInertia=True; p.minInertiaRatio=0.1
        p.filterByConvexity=True; p.minConvexity=0.3
        p.filterByColor=True; p.blobColor=255
    elif variant == 1:
        p.minThreshold=1; p.maxThreshold=240; p.thresholdStep=10
        p.filterByArea=True; p.minArea=5; p.maxArea=100000
        p.filterByCircularity=False
        p.filterByInertia=False
        p.filterByConvexity=False
        p.filterByColor=True; p.blobColor=255
    else:
        p.minThreshold=10; p.maxThreshold=200; p.thresholdStep=10
        p.filterByArea=True; p.minArea=30; p.maxArea=50000
        p.filterByCircularity=True; p.minCircularity=0.6
        p.filterByInertia=True; p.minInertiaRatio=0.2
        p.filterByConvexity=True; p.minConvexity=0.6
        p.filterByColor=True; p.blobColor=255
    return cv2.SimpleBlobDetector_create(p)

detectors = [make_detector(i) for i in range(3)]
img_files = sorted(glob.glob(os.path.join(IMG_DIR, "*.*")))

# try charuco if aruco present
try:
    aruco = cv2.aruco
    has_aruco = True
except Exception:
    has_aruco = False

for fp in img_files:
    img = cv2.imread(fp)
    if img is None:
        print("fail: open", fp); continue
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    success = False
    info = []

    # preprocessing variants
    variants = [
        ("orig", gray),
        ("blur", cv2.GaussianBlur(gray, (5,5), 0)),
        ("equal", cv2.equalizeHist(gray)),
        ("thresh", cv2.adaptiveThreshold(gray,255,cv2.ADAPTIVE_THRESH_GAUSSIAN_C,cv2.THRESH_BINARY,11,2))
    ]

    for vname, g in variants:
        if success: break
        if use_asymmetric:
            for det_i, det in enumerate(detectors):
                try:
                    flag = cv2.CALIB_CB_ASYMMETRIC_GRID
                    ret, centers = cv2.findCirclesGrid(g, (cols, rows), flags=flag, blobDetector=det)
                    if ret:
                        out = img.copy()
                        cv2.drawChessboardCorners(out, (cols, rows), centers, ret)
                        outp = os.path.join(OUT_DIR, os.path.basename(fp).replace(".",f"_asym_{vname}{det_i}."))
                        outp = outp + "png"
                        cv2.imwrite(outp, out)
                        print("ok asym", os.path.basename(fp), vname, "det", det_i)
                        success = True
                        break
                except Exception:
                    pass
            if success: break


    if not success:
        print("warn: no pattern", os.path.basename(fp))


warn: no pattern he_img_0000.png


In [None]:
import os, glob, json
import numpy as np
import cv2


JSON_DIR = "calib_records"
K_FILE_FALLBACK = "K_1280x720.npy"
D_FILE_FALLBACK = "D_1280x720.npy"
IMG_W = 1280
IMG_H = 720
COLS, ROWS, SPACING = 5, 4, 0.04
MAX_D_MAG_WARN = 1e3
RMSE_GOOD = 5.0

def load_KD_from_files(sample_json=None):
    K = None; D = None
    if sample_json:
        kinf = sample_json.get("camera_intrinsics")
        if kinf and os.path.exists(kinf):
            try:
                K = np.load(kinf)
                print("ok: loaded K from json field")
            except Exception:
                print("warn: failed load K from json field")
    if K is None and os.path.exists(K_FILE_FALLBACK):
        K = np.load(K_FILE_FALLBACK); print("ok: loaded", K_FILE_FALLBACK)
    if os.path.exists(D_FILE_FALLBACK):
        D = np.load(D_FILE_FALLBACK); print("ok: loaded", D_FILE_FALLBACK)
    if K is None:
        raise RuntimeError("err: K not found")
    if D is None:
        D = np.zeros((5,), dtype=float); print("info: using zero D")
    return K, D

def basic_checks(K, D, img_w=None, img_h=None):
    ok = True
    if K.shape != (3,3):
        print("err: K shape", K.shape); ok = False
    fx, fy = K[0,0], K[1,1]
    cx, cy = K[0,2], K[1,2]
    skew = K[0,1]
    if fx <= 0 or fy <= 0:
        print("err: fx/fy nonpositive", fx, fy); ok = False
    else:
        print("ok: fx,fy", round(fx,2), round(fy,2))
    if abs(skew) > 1e-3 * fx:
        print("warn: skew not ~0:", skew)
    else:
        print("ok: skew small")
    if img_w and img_h:
        if not (0 <= cx <= img_w):
            print("warn: cx out of bounds", cx)
        else:
            print("ok: cx", round(cx,2))
        if not (0 <= cy <= img_h):
            print("warn: cy out of bounds", cy)
        else:
            print("ok: cy", round(cy,2))
    print("D len", D.shape, "values:", np.round(D,4).tolist())
    if np.any(np.abs(D) > MAX_D_MAG_WARN):
        print("warn: large distortion coefficients -> likely wrong")
    return ok

def make_objp(pattern_type="asymmetric", cols=COLS, rows=ROWS, spacing=SPACING):
    if pattern_type == "asymmetric":
        objp = np.zeros((cols*rows,3), np.float64)
        idx=0
        for j in range(rows):
            for i in range(cols):
                x = (2*i + (j%2))*spacing
                y = j*spacing
                objp[idx,0] = x; objp[idx,1] = y; idx+=1
        return objp
    else:
        objp = np.zeros((cols*rows,3), np.float64)
        grid = np.mgrid[0:cols, 0:rows].T.reshape(-1,2)
        objp[:,:2] = grid * spacing
        return objp

def compute_reproj_rmse(objp, imgpts, rvec, tvec, K, D):
    proj, _ = cv2.projectPoints(objp, rvec, tvec, K, D)
    proj = proj.reshape(-1,2)
    img = np.asarray(imgpts, dtype=np.float64).reshape(-1,2)
    errs = np.linalg.norm(img - proj, axis=1)
    return float(np.sqrt(np.mean(errs**2))), errs

def reprojection_test_on_jsons(K, D, json_dir=JSON_DIR, pattern_type="asymmetric"):
    json_files = sorted(glob.glob(os.path.join(json_dir, "he_record_*.json")))
    if not json_files:
        print("info: no JSONs for reprojection test")
        return
    summary = []
    for jf in json_files:
        try:
            rec = json.load(open(jf,"r"))
        except Exception:
            continue
        bd = rec.get("board_detection",{})
        if not bd.get("found", False):
            continue
        imgpts = bd.get("imgpts2d", [])
        depths = bd.get("depths_m", [])
        imgpts = np.array(imgpts, dtype=np.float64)
        depths_np = np.array([d if d is not None else np.nan for d in depths], dtype=np.float64)
        mask = (~np.isnan(depths_np)) & (depths_np>0) & (depths_np<5.0)
        if mask.sum() < 4:
            mask = np.ones(len(depths_np), dtype=bool)
        objp = make_objp(pattern_type)
        m = min(len(objp), len(imgpts))
        objp = objp[:m]; imgpts = imgpts[:m]; mask = mask[:m]
        objp_sub = objp[mask]; imgpts_sub = imgpts[mask]
        try:
            ret = cv2.solvePnPRansac(objp_sub, imgpts_sub, K, D, iterationsCount=1500, reprojectionError=8.0, confidence=0.99)
            ok = False
            if isinstance(ret, tuple) and len(ret)==4:
                ok, rvec, tvec, inliers = ret
            elif isinstance(ret, tuple) and len(ret)==3:
                rvec,tvec,inliers = ret; ok = True
            else:
                ok = False
            if not ok or rvec is None or tvec is None:
                ok2, rvec2, tvec2 = cv2.solvePnP(objp_sub, imgpts_sub, K, D, flags=cv2.SOLVEPNP_ITERATIVE)
                if ok2:
                    rvec, tvec = rvec2, tvec2
                else:
                    raise RuntimeError("pnp fail")
            rmse, errs = compute_reproj_rmse(objp_sub, imgpts_sub, rvec, tvec, K, D)
            rmse0, _ = compute_reproj_rmse(objp_sub, imgpts_sub, rvec, tvec, K, np.zeros_like(D))
            summary.append((os.path.basename(jf), float(rmse), float(rmse0)))
            print(f"{os.path.basename(jf)} rmse={rmse:.3f} rmse_noD={rmse0:.3f}")
        except Exception as e:
            print("warn: pnp fail for", os.path.basename(jf))
            continue
    if summary:
        rmses = np.array([s[1] for s in summary])
        mean_rmse = float(rmses.mean())
        print("summary mean rmse:", round(mean_rmse,3))
        bad = [(n,r0,r1) for n,r0,r1 in summary if r0 > RMSE_GOOD and r1 < r0*0.5]
        if bad:
            print("warn: frames where zero-D better (possible bad D):")
            for b in bad[:5]:
                print(" -", b[0], "rmse", round(b[1],2), "rmse_noD", round(b[2],2))
    else:
        print("info: no reproj results")

def synthetic_projection_test(K, D, img_w=IMG_W, img_h=IMG_H):
    xs = np.linspace(-0.2, 0.2, 5)
    ys = np.linspace(-0.15, 0.15, 4)
    pts = []
    for y in ys:
        for x in xs:
            pts.append([x, y, 1.0])
    obj = np.array(pts, dtype=np.float64)
    rvec = np.zeros((3,1)); tvec = np.array([[0.0],[0.0],[0.0]])
    proj, _ = cv2.projectPoints(obj, rvec, tvec, K, D)
    proj = proj.reshape(-1,2)
    inside = np.logical_and.reduce((proj[:,0]>=0, proj[:,0]<img_w, proj[:,1]>=0, proj[:,1]<img_h))
    pct_inside = float(inside.sum())/len(obj)*100.0
    print("synthetic: %d/%d points inside image (%.1f%%)" % (inside.sum(), len(obj), pct_inside))
    if pct_inside < 50.0:
        print("warn: many projections outside image -> K or extrinsic offsets suspect")
    else:
        print("ok: synthetic projection looks reasonable")

if __name__ == "__main__":
    sample = None
    jfiles = sorted(glob.glob(os.path.join(JSON_DIR, "he_record_*.json")))
    if jfiles:
        try:
            sample = json.load(open(jfiles[0],"r"))
        except Exception:
            sample = None
    K, D = load_KD_from_files(sample)
    print("checking K/D ..")
    basic_checks(K, D, img_w=IMG_W, img_h=IMG_H)
    print("synthetic test ..")
    synthetic_projection_test(K, D)
    print("reprojection test on JSONs ..")
    reprojection_test_on_jsons(K, D)
    print("done")


ok: loaded K from json field
ok: loaded D_1280x720.npy
checking K/D ..
ok: fx,fy 309.6 841.63
ok: skew small
ok: cx 680.49
ok: cy 393.05
D len (1, 5) values: [[61.9763, -970.9268, -0.6508, 1.2117, 4993.2498]]
warn: large distortion coefficients -> likely wrong
synthetic test ..
synthetic: 19/20 points inside image (95.0%)
ok: synthetic projection looks reasonable
reprojection test on JSONs ..
he_record_0000.json rmse=8986606.039 rmse_noD=326.999
he_record_0001.json rmse=138768792.970 rmse_noD=562.455
he_record_0002.json rmse=35911128424.579 rmse_noD=491.698
he_record_0003.json rmse=317245281.049 rmse_noD=409.634
he_record_0004.json rmse=37508428252.863 rmse_noD=697.768
he_record_0005.json rmse=78.139 rmse_noD=83.028
he_record_0006.json rmse=77.367 rmse_noD=82.213
he_record_0007.json rmse=274.305 rmse_noD=152.232
he_record_0008.json rmse=27508319447.232 rmse_noD=1019.701
he_record_0009.json rmse=77.117 rmse_noD=85.286
he_record_0010.json rmse=3154669819.498 rmse_noD=504.111
he_record_00

In [15]:

params = cv2.SimpleBlobDetector_Params()
params.minThreshold=10; params.maxThreshold=220; params.thresholdStep=10
params.filterByArea = True; params.minArea = 30; params.maxArea = 50000
params.filterByCircularity = True; params.minCircularity = 0.6
params.filterByInertia = True; params.minInertiaRatio = 0.2
params.filterByConvexity = True; params.minConvexity = 0.6
params.filterByColor = True; params.blobColor = 255
detector = cv2.SimpleBlobDetector_create(params)

pipeline = rs.pipeline()
cfg = rs.config()
cfg.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
cfg.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 30)
profile = pipeline.start(cfg)
time.sleep(0.1)
align_to = rs.stream.color
align = rs.align(align_to)

dev = profile.get_device()
depth_sensor = dev.first_depth_sensor()
depth_scale = depth_sensor.get_depth_scale()
print("[INFO] depth scale (m/unit):", depth_scale)

def get_depth_m_at_pixel(depth_image, u, v, depth_scale, init_patch=3, max_patch=21):
    h, w = depth_image.shape
    u0 = int(round(u)); v0 = int(round(v))
    for r in range(init_patch, max_patch+1, 2):
        x1 = max(0, u0 - r); x2 = min(w-1, u0 + r)
        y1 = max(0, v0 - r); y2 = min(h-1, v0 + r)
        patch = depth_image[y1:y2+1, x1:x2+1].astype(np.float32)
        valid = patch > 0
        if valid.sum() > 0:
            med = np.median(patch[valid]) * depth_scale
            return float(med), (x1,x2,y1,y2)
    return None, None

def pixel_to_cam_point(u, v, z, K):
    fx = float(K[0,0]); fy = float(K[1,1]); cx = float(K[0,2]); cy = float(K[1,2])
    x = (u - cx) * z / fx
    y = (v - cy) * z / fy
    return np.array([x, y, z], dtype=float)

def campoint_to_grip_base(p_cam, H_base_grip=None, cam2grip_R=None, cam2grip_t=None):
    out = {}
    if cam2grip_R is not None and cam2grip_t is not None:
        p_grip = cam2grip_R.dot(p_cam) + cam2grip_t
        out["p_grip"] = p_grip.tolist()
        if H_base_grip is not None:
            Rbg = H_base_grip[:3,:3]; tbg = H_base_grip[:3,3]
            p_base = Rbg.dot(p_grip) + tbg
            out["p_base"] = p_base.tolist()
    return out

def make_objp_for_pattern(pattern_type):
    if pattern_type == "asymmetric":
        objp = np.zeros((cols*rows,3), np.float64)
        idxp = 0
        for j in range(rows):
            for i in range(cols):
                x = (2 * i + (j % 2)) * spacing
                y = j * spacing
                objp[idxp,0] = x; objp[idxp,1] = y; idxp += 1
        return objp
    else:
        objp = np.zeros((cols*rows,3), np.float64)
        grid = np.mgrid[0:cols, 0:rows].T.reshape(-1,2)
        objp[:,:2] = grid * spacing
        return objp

idx = len([n for n in os.listdir(IMG_DIR) if n.endswith(".png")])
print("Press 's' to save frame+pose, 'q' to quit.")
result = load_joints_from_json_dir(dir_path="read_positions_1343")
try:
    for row in result:
        move_to(row, 2)
        time.sleep(5)
        frames = pipeline.wait_for_frames()
        aligned = align.process(frames)
        color_frame = aligned.get_color_frame()
        depth_frame = aligned.get_depth_frame()
        if not color_frame or not depth_frame:
            continue
        color = np.asanyarray(color_frame.get_data())
        depth = np.asanyarray(depth_frame.get_data())

       
        t_wall = time.time()
        t_mon = time.monotonic_ns()
        img_name = f"he_img_{idx:04d}.png"
        img_path = os.path.join(IMG_DIR, img_name)
        cv2.imwrite(img_path, color)

        joints = get_joint_positions()
        H = get_flange_homogeneous()
        H_list = H.tolist()
        t_vec = H[:3,3].tolist()
        quat = matrix_to_quat_xyzw(H[:3,:3])

        gray = cv2.cvtColor(color, cv2.COLOR_BGR2GRAY)
        gray_for_detect = gray
        if UNDISTORT_FOR_DETECT and (K_mat is not None and D_vec is not None):
            newK, _ = cv2.getOptimalNewCameraMatrix(K_mat, D_vec, (gray.shape[1], gray.shape[0]), 0)
            gray_for_detect = cv2.undistort(color, K_mat, D_vec, None, newK)
            gray_for_detect = cv2.cvtColor(gray_for_detect, cv2.COLOR_BGR2GRAY)

        board_info = {"found": False, "pattern_type": None, "imgpts2d": [], "depths_m": [], "points_cam": [], "rvec": None, "tvec": None, "inliers": None}
        centers = None

        try:
            ret_asym, centers_asym = cv2.findCirclesGrid(gray_for_detect, (cols, rows),
                                                            flags=cv2.CALIB_CB_ASYMMETRIC_GRID,
                                                            blobDetector=detector)
        except Exception:
            ret_asym = False; centers_asym = None
        if ret_asym:
            centers = centers_asym.reshape(-1,2).astype(np.float32)
            pattern_type = "asymmetric"
        else:
            try:
                ret_sym, centers_sym = cv2.findCirclesGrid(gray_for_detect, (cols, rows),
                                                            flags=cv2.CALIB_CB_SYMMETRIC_GRID,
                                                            blobDetector=detector)
            except Exception:
                ret_sym = False; centers_sym = None
            if ret_sym:
                centers = centers_sym.reshape(-1,2).astype(np.float32)
                pattern_type = "symmetric"

        if centers is not None:
            board_info["found"] = True
            board_info["pattern_type"] = pattern_type
            board_info["imgpts2d"] = centers.tolist()

            depths_list = []
            campts_list = []
            for i,(cx,cy) in enumerate(centers):
                z_m, bbox = get_depth_m_at_pixel(depth, cx, cy, depth_scale, init_patch=3, max_patch=21)
                if z_m is None:
                    depths_list.append(None)
                    campts_list.append(None)
                else:
                    depths_list.append(z_m)
                    if K_mat is not None:
                        p_cam = pixel_to_cam_point(float(cx), float(cy), z_m, K_mat)
                        campts_list.append(p_cam.tolist())
                    else:
                        campts_list.append([None,None,z_m])
                if SAVE_DEBUG_IMAGES and bbox is not None:
                    x1,x2,y1,y2 = bbox
                    patch = gray[y1:y2+1, x1:x2+1]
                    # cv2.imwrite(os.path.join(PATCH_DIR, f"patch_{idx:04d}_pt{i:02d}.png"), patch)
            board_info["depths_m"] = depths_list
            board_info["points_cam"] = campts_list

            if K_mat is not None:
                objp = make_objp_for_pattern(pattern_type)

                try:
                    ok, rvec_r, tvec_r, inliers = cv2.solvePnPRansac(objp.astype(np.float64), centers.astype(np.float64), K_mat, D_vec,
                                                                        iterationsCount=200, reprojectionError=8.0, confidence=0.99)
                except Exception:
                    ok = False; rvec_r = None; tvec_r = None; inliers = None
                if ok:
                    board_info["rvec"] = rvec_r.reshape(3).tolist()
                    board_info["tvec"] = tvec_r.reshape(3).tolist()
                    board_info["inliers"] = int(len(inliers)) if inliers is not None else None
                else:
                    try:
                        ok2, rvec2, tvec2 = cv2.solvePnP(objp.astype(np.float64), centers.astype(np.float64), K_mat, D_vec, flags=cv2.SOLVEPNP_ITERATIVE)
                        if ok2:
                            board_info["rvec"] = rvec2.reshape(3).tolist()
                            board_info["tvec"] = tvec2.reshape(3).tolist()
                    except Exception:
                        pass
        rec = {
            "image": img_name,
            "timestamp": t_wall,
            "monotonic_ns": t_mon,
            "camera_intrinsics": K_FILE,
            "preset": PRESET_FILE,
            "gripper_pose_base": {"H": H_list, "translation_m": t_vec, "rotation_quat_xyzw": quat},
            "joints": joints,
            "board_detection": board_info,
            "notes": ""
        }
        json_name = f"he_record_{idx:04d}.json"
        with open(os.path.join(SAVE_DIR, json_name), 'w') as f:
            json.dump(rec, f, indent=2)

        print("Saved", img_name, json_name, "found:", board_info["found"], "pattern:", board_info.get("pattern_type"))
        idx += 1


finally:
    pipeline.stop()
    cv2.destroyAllWindows()


[INFO] depth scale (m/unit): 0.0010000000474974513
Press 's' to save frame+pose, 'q' to quit.
[INFO] scanned 31 json files in 'read_positions_1343'. found 31 frames with board_detection.found==True
[[-0.04985321  0.31682823 -0.94717186  0.42136549]
 [-0.72428412  0.64152165  0.25271027  0.09237487]
 [ 0.687697    0.69861996  0.19749175  1.2390986 ]
 [ 0.          0.          0.          1.        ]]
Saved he_img_0000.png he_record_0000.json found: True pattern: asymmetric
[[-0.11646527  0.3491276  -0.92980953  0.42564791]
 [-0.76475939  0.56580121  0.30824026  0.05311458]
 [ 0.63370254  0.74697986  0.20110242  1.24770045]
 [ 0.          0.          0.          1.        ]]
Saved he_img_0001.png he_record_0001.json found: True pattern: asymmetric
[[ 0.09508738  0.41750451 -0.90368599  0.43919397]
 [-0.74846524  0.62850505  0.21161564  0.09254945]
 [ 0.6563217   0.65625558  0.37225051  1.17878259]
 [ 0.          0.          0.          1.        ]]
Saved he_img_0002.png he_record_0002.js

In [None]:

params = cv2.SimpleBlobDetector_Params()
params.minThreshold=10; params.maxThreshold=220; params.thresholdStep=10
params.filterByArea = True; params.minArea = 30; params.maxArea = 50000
params.filterByCircularity = True; params.minCircularity = 0.6
params.filterByInertia = True; params.minInertiaRatio = 0.2
params.filterByConvexity = True; params.minConvexity = 0.6
params.filterByColor = True; params.blobColor = 255
detector = cv2.SimpleBlobDetector_create(params)

pipeline = rs.pipeline()
cfg = rs.config()
cfg.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)
cfg.enable_stream(rs.stream.depth, 1280, 720, rs.format.z16, 30)
profile = pipeline.start(cfg)
time.sleep(0.1)
align_to = rs.stream.color
align = rs.align(align_to)

dev = profile.get_device()
depth_sensor = dev.first_depth_sensor()
depth_scale = depth_sensor.get_depth_scale()
print("[INFO] depth scale (m/unit):", depth_scale)

def get_depth_m_at_pixel(depth_image, u, v, depth_scale, init_patch=3, max_patch=21):
    h, w = depth_image.shape
    u0 = int(round(u)); v0 = int(round(v))
    for r in range(init_patch, max_patch+1, 2):
        x1 = max(0, u0 - r); x2 = min(w-1, u0 + r)
        y1 = max(0, v0 - r); y2 = min(h-1, v0 + r)
        patch = depth_image[y1:y2+1, x1:x2+1].astype(np.float32)
        valid = patch > 0
        if valid.sum() > 0:
            med = np.median(patch[valid]) * depth_scale
            return float(med), (x1,x2,y1,y2)
    return None, None

def pixel_to_cam_point(u, v, z, K):
    fx = float(K[0,0]); fy = float(K[1,1]); cx = float(K[0,2]); cy = float(K[1,2])
    x = (u - cx) * z / fx
    y = (v - cy) * z / fy
    return np.array([x, y, z], dtype=float)

def campoint_to_grip_base(p_cam, H_base_grip=None, cam2grip_R=None, cam2grip_t=None):
    out = {}
    if cam2grip_R is not None and cam2grip_t is not None:
        p_grip = cam2grip_R.dot(p_cam) + cam2grip_t
        out["p_grip"] = p_grip.tolist()
        if H_base_grip is not None:
            Rbg = H_base_grip[:3,:3]; tbg = H_base_grip[:3,3]
            p_base = Rbg.dot(p_grip) + tbg
            out["p_base"] = p_base.tolist()
    return out

def make_objp_for_pattern(pattern_type):
    if pattern_type == "asymmetric":
        objp = np.zeros((cols*rows,3), np.float64)
        idxp = 0
        for j in range(rows):
            for i in range(cols):
                x = (2 * i + (j % 2)) * spacing
                y = j * spacing
                objp[idxp,0] = x; objp[idxp,1] = y; idxp += 1
        return objp
    else:
        objp = np.zeros((cols*rows,3), np.float64)
        grid = np.mgrid[0:cols, 0:rows].T.reshape(-1,2)
        objp[:,:2] = grid * spacing
        return objp

idx = len([n for n in os.listdir(IMG_DIR) if n.endswith(".png")])
print("Press 's' to save frame+pose, 'q' to quit.")
try:
    while True:
        frames = pipeline.wait_for_frames()
        aligned = align.process(frames)
        color_frame = aligned.get_color_frame()
        depth_frame = aligned.get_depth_frame()
        if not color_frame or not depth_frame:
            continue
        color = np.asanyarray(color_frame.get_data())
        depth = np.asanyarray(depth_frame.get_data())

        disp = color.copy()
        if DISPLAY_SCALE != 1.0:
            disp = cv2.resize(disp, (int(disp.shape[1]*DISPLAY_SCALE), int(disp.shape[0]*DISPLAY_SCALE)))
        cv2.imshow("cap", disp)
        k = cv2.waitKey(1) & 0xFF
        if k == ord('s'):
            t_wall = time.time()
            t_mon = time.monotonic_ns()
            img_name = f"he_img_{idx:04d}.png"
            img_path = os.path.join(IMG_DIR, img_name)
            cv2.imwrite(img_path, color)

            joints = get_joint_positions()
            H = get_flange_homogeneous()
            H_list = H.tolist()
            t_vec = H[:3,3].tolist()
            quat = matrix_to_quat_xyzw(H[:3,:3])

            gray = cv2.cvtColor(color, cv2.COLOR_BGR2GRAY)
            gray_for_detect = gray
            if UNDISTORT_FOR_DETECT and (K_mat is not None and D_vec is not None):
                newK, _ = cv2.getOptimalNewCameraMatrix(K_mat, D_vec, (gray.shape[1], gray.shape[0]), 0)
                gray_for_detect = cv2.undistort(color, K_mat, D_vec, None, newK)
                gray_for_detect = cv2.cvtColor(gray_for_detect, cv2.COLOR_BGR2GRAY)

            board_info = {"found": False, "pattern_type": None, "imgpts2d": [], "depths_m": [], "points_cam": [], "rvec": None, "tvec": None, "inliers": None}
            centers = None

            try:
                ret_asym, centers_asym = cv2.findCirclesGrid(gray_for_detect, (cols, rows),
                                                             flags=cv2.CALIB_CB_ASYMMETRIC_GRID,
                                                             blobDetector=detector)
            except Exception:
                ret_asym = False; centers_asym = None
            if ret_asym:
                centers = centers_asym.reshape(-1,2).astype(np.float32)
                pattern_type = "asymmetric"
            else:
                try:
                    ret_sym, centers_sym = cv2.findCirclesGrid(gray_for_detect, (cols, rows),
                                                               flags=cv2.CALIB_CB_SYMMETRIC_GRID,
                                                               blobDetector=detector)
                except Exception:
                    ret_sym = False; centers_sym = None
                if ret_sym:
                    centers = centers_sym.reshape(-1,2).astype(np.float32)
                    pattern_type = "symmetric"

            if centers is not None:
                board_info["found"] = True
                board_info["pattern_type"] = pattern_type
                board_info["imgpts2d"] = centers.tolist()

                depths_list = []
                campts_list = []
                for i,(cx,cy) in enumerate(centers):
                    z_m, bbox = get_depth_m_at_pixel(depth, cx, cy, depth_scale, init_patch=3, max_patch=21)
                    if z_m is None:
                        depths_list.append(None)
                        campts_list.append(None)
                    else:
                        depths_list.append(z_m)
                        if K_mat is not None:
                            p_cam = pixel_to_cam_point(float(cx), float(cy), z_m, K_mat)
                            campts_list.append(p_cam.tolist())
                        else:
                            campts_list.append([None,None,z_m])
                    if SAVE_DEBUG_IMAGES and bbox is not None:
                        x1,x2,y1,y2 = bbox
                        patch = gray[y1:y2+1, x1:x2+1]
                        # cv2.imwrite(os.path.join(PATCH_DIR, f"patch_{idx:04d}_pt{i:02d}.png"), patch)
                board_info["depths_m"] = depths_list
                board_info["points_cam"] = campts_list

                if K_mat is not None:
                    objp = make_objp_for_pattern(pattern_type)

                    try:
                        ok, rvec_r, tvec_r, inliers = cv2.solvePnPRansac(objp.astype(np.float64), centers.astype(np.float64), K_mat, D_vec,
                                                                         iterationsCount=200, reprojectionError=8.0, confidence=0.99)
                    except Exception:
                        ok = False; rvec_r = None; tvec_r = None; inliers = None
                    if ok:
                        board_info["rvec"] = rvec_r.reshape(3).tolist()
                        board_info["tvec"] = tvec_r.reshape(3).tolist()
                        board_info["inliers"] = int(len(inliers)) if inliers is not None else None
                    else:
                        try:
                            ok2, rvec2, tvec2 = cv2.solvePnP(objp.astype(np.float64), centers.astype(np.float64), K_mat, D_vec, flags=cv2.SOLVEPNP_ITERATIVE)
                            if ok2:
                                board_info["rvec"] = rvec2.reshape(3).tolist()
                                board_info["tvec"] = tvec2.reshape(3).tolist()
                        except Exception:
                            pass
            rec = {
                "image": img_name,
                "timestamp": t_wall,
                "monotonic_ns": t_mon,
                "camera_intrinsics": K_FILE,
                "preset": PRESET_FILE,
                "gripper_pose_base": {"H": H_list, "translation_m": t_vec, "rotation_quat_xyzw": quat},
                "joints": joints,
                "board_detection": board_info,
                "notes": ""
            }
            json_name = f"he_record_{idx:04d}.json"
            with open(os.path.join(SAVE_DIR, json_name), 'w') as f:
                json.dump(rec, f, indent=2)

            print("Saved", img_name, json_name, "found:", board_info["found"], "pattern:", board_info.get("pattern_type"))
            idx += 1

        elif k == ord('q'):
            break

finally:
    pipeline.stop()
    cv2.destroyAllWindows()
