# Bundle pose estimation by reflected rays
Find rigid pose (R, t) of the mirror bundle and assign observed reflected rays to mirrors.


Algorithm outline:
1. Load mirrors (local centers/normals) and observed rays (two points per ray).
2. Estimate ray normals: `n_est = normalize(d_in - d_out)` from observed directions.
3. Iterate: for current R, t build cost (angle + distance of mirror center to ray), find best mirror?ray assignment (rays < mirrors).
4. Update R with Kabsch on normals; update t to place centers onto closest points of assigned rays.
5. Stop when assignment/cost stabilizes.


In [44]:
import json
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple
import itertools as it
import numpy as np
import plotly.graph_objects as go


In [45]:
BUNDLE_CONFIG = Path('bundle_config.txt')
RAYS_CONFIG = Path('rays_config.txt')
# Incoming ray: along +Z, hits the center of each mirror (laser from small Z to large Z)
D_IN = np.array([0.0, 0.0, 1.0])


In [46]:
@dataclass
class Mirror:
    name: str
    cut_deg: float
    tilt_deg: float
    height: float
    x: float
    y: float

@dataclass
class BundleConfig:
    fiber_diameter: float = 1.0
    pitch: float = 1.4
    label_offset: float = 0.6
    mirrors: List[Mirror] = None

def _parse_float(value: str, field: str, raw: str) -> float:
    try:
        return float(value)
    except ValueError as exc:
        raise ValueError(f'Failed to read {field} in line: {raw}') from exc

def load_bundle_config(path: Path) -> BundleConfig:
    mirrors: List[Mirror] = []
    globals_cfg = {}
    for raw in path.read_text(encoding='utf-8').splitlines():
        clean = raw.split('#', 1)[0].strip()
        if not clean:
            continue
        if clean.startswith('mirror'):
            parts = [p.strip() for p in clean.split(',') if p.strip()]
            params = {}
            for part in parts:
                if '=' in part:
                    k, v = part.split('=', 1)
                    params[k.strip()] = v.strip()
            name = params.get('mirror') or params.get('name')
            cut = _parse_float(params.get('cut', ''), 'cut', raw)
            tilt = _parse_float(params.get('tilt', '0'), 'tilt', raw)
            height = _parse_float(params.get('height', '0'), 'height', raw)
            x = _parse_float(params.get('x', '0'), 'x', raw)
            y = _parse_float(params.get('y', '0'), 'y', raw)
            mirrors.append(Mirror(name=name, cut_deg=cut, tilt_deg=tilt, height=height, x=x, y=y))
        else:
            if '=' not in clean:
                continue
            k, v = clean.split('=', 1)
            globals_cfg[k.strip()] = float(v.strip())
    return BundleConfig(
        fiber_diameter=float(globals_cfg.get('fiber_diameter', 1.0)),
        pitch=float(globals_cfg.get('pitch', globals_cfg.get('pitch_mm', 1.4))),
        label_offset=float(globals_cfg.get('label_offset', 0.6)),
        mirrors=mirrors,
    )

def load_rays_config(path: Path):
    rays = []
    for raw in path.read_text(encoding='utf-8').splitlines():
        clean = raw.split('#', 1)[0].strip()
        if not clean:
            continue
        parts = [p.strip() for p in clean.split(',') if p.strip()]
        p0 = p1 = None
        for p in parts:
            if p.startswith('p0='):
                vals = p.split('=', 1)[1].strip().split()
                p0 = np.array([float(v) for v in vals], dtype=float)
            if p.startswith('p1='):
                vals = p.split('=', 1)[1].strip().split()
                p1 = np.array([float(v) for v in vals], dtype=float)
        if p0 is None or p1 is None:
            raise ValueError(f'Missing p0/p1 in line: {raw}')
        rays.append((p0, p1))
    return rays

def mirror_geometry(m: Mirror):
    cut_rad = np.deg2rad(m.cut_deg)
    tilt_rad = np.deg2rad(m.tilt_deg)
    n_local = np.array([
        np.sin(cut_rad) * np.cos(tilt_rad),
        np.sin(cut_rad) * np.sin(tilt_rad),
        np.cos(cut_rad),
    ], dtype=float)
    c_local = np.array([m.x, m.y, m.height], dtype=float)
    # Flip normal outward (away from bundle center) if its XY projection points inward
    if np.linalg.norm(c_local[:2]) > 1e-6:
        n_xy = n_local[:2]
        c_xy = c_local[:2]
        if np.dot(n_xy, c_xy) < 0:
            n_local = -n_local
    # Ensure the reflective side faces incoming beam (dot(D_IN, n) < 0)
    if np.dot(D_IN, n_local) > 0:
        n_local = -n_local
    return n_local / np.linalg.norm(n_local), c_local


In [47]:
def kabsch(A: np.ndarray, B: np.ndarray) -> np.ndarray:
    # A, B: 3 x N
    H = A @ B.T
    U, S, Vt = np.linalg.svd(H)
    R = Vt.T @ U.T
    if np.linalg.det(R) < 0:
        Vt[-1, :] *= -1
        R = Vt.T @ U.T
    return R

def distance_point_to_line(p: np.ndarray, a: np.ndarray, d: np.ndarray) -> float:
    diff = p - a
    proj = np.dot(diff, d) * d
    return float(np.linalg.norm(diff - proj))

def solve_pose(bundle: BundleConfig, rays, *, w_ang=1.0, w_pos=1.0, w_div=10.0):
    """Brute-force assignment (rays <= mirrors), closed-form R via Kabsch, t via least squares to lines."""
    normals_local = []
    centers_local = []
    for m in bundle.mirrors:
        n_loc, c_loc = mirror_geometry(m)
        normals_local.append(n_loc)
        centers_local.append(c_loc)
    normals_local = np.stack(normals_local)
    centers_local = np.stack(centers_local)

    d_obs = []
    n_est = []
    ray_anchors = []
    for p0, p1 in rays:
        d = p1 - p0
        d = d / np.linalg.norm(d)
        d_obs.append(d)
        n_est.append((D_IN - d) / np.linalg.norm(D_IN - d))
        ray_anchors.append(p0)
    d_obs = np.stack(d_obs)
    n_est = np.stack(n_est)
    ray_anchors = np.stack(ray_anchors)

    m = len(bundle.mirrors)
    k = len(rays)
    best = None

    mirrors_idx = list(range(m))
    rays_idx = list(range(k))

    for mirrors_subset in it.combinations(mirrors_idx, k):
        A = normals_local[list(mirrors_subset)]
        C = centers_local[list(mirrors_subset)]
        for perm in it.permutations(rays_idx):
            B = n_est[list(perm)]
            anchors = ray_anchors[list(perm)]
            dirs = d_obs[list(perm)]

            R = kabsch(A.T, B.T)

            P = np.eye(3)[None, ...] - dirs[:, :, None] * dirs[:, None, :]
            Rc = (R @ C.T).T
            lhs = P.sum(axis=0)
            rhs = (P @ (anchors - Rc)[:, :, None]).sum(axis=0)[:, 0]
            try:
                t = np.linalg.solve(lhs, rhs)
            except np.linalg.LinAlgError:
                continue

            normals_w = (R @ normals_local.T).T
            centers_w = (R @ centers_local.T).T + t

            total = 0.0
            for mi, rj in zip(mirrors_subset, perm):
                n = normals_w[mi]
                d_pred = D_IN - 2 * np.dot(n, D_IN) * n
                d_pred = d_pred / np.linalg.norm(d_pred)
                if np.dot(d_pred, n) < 0:
                    d_pred = -d_pred
                ang = 1.0 - np.dot(d_pred, d_obs[rj])
                dist = distance_point_to_line(centers_w[mi], ray_anchors[rj], d_obs[rj])
                rel = ray_anchors[rj] - centers_w[mi]
                along = np.dot(rel, d_obs[rj])
                div_pen = 0.0 if along > 0 else -along
                total += w_ang * ang + w_pos * dist + w_div * div_pen

            if best is None or total < best[0]:
                best = (total, list(zip(mirrors_subset, perm)), R, t)

    if best is None:
        raise RuntimeError('Assignment not found')
    return best


In [48]:
def rotation_matrix_to_euler(R: np.ndarray):
    sy = np.sqrt(R[0,0]**2 + R[1,0]**2)
    singular = sy < 1e-8
    if not singular:
        x = np.arctan2(R[2,1], R[2,2])
        y = np.arctan2(-R[2,0], sy)
        z = np.arctan2(R[1,0], R[0,0])
    else:
        x = np.arctan2(-R[1,2], R[1,1])
        y = np.arctan2(-R[2,0], sy)
        z = 0.0
    return np.rad2deg(np.array([x, y, z]))  # roll, pitch, yaw (XYZ intrinsic)


In [49]:
AXIS_LOCAL = np.array([0.0, 0.0, 1.0])
COLORS = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']


def compute_world(bundle, R, t):
    normals = []
    centers = []
    for m in bundle.mirrors:
        n_loc, c_loc = mirror_geometry(m)
        normals.append(n_loc)
        centers.append(c_loc)
    normals = np.stack(normals)
    centers = np.stack(centers)
    normals_w = (R @ normals.T).T
    centers_w = (R @ centers.T).T + t
    return normals_w, centers_w


def ellipse_boundary(center, normal, axis_world, fiber_diameter, samples=64):
    minor_r = fiber_diameter * 0.5
    cos_theta = float(np.clip(np.dot(normal, axis_world), -1.0, 1.0))
    major_r = minor_r / max(abs(cos_theta), 1e-6)
    tangent = axis_world - cos_theta * normal
    if np.linalg.norm(tangent) < 1e-8:
        tangent = np.cross(normal, np.array([1.0, 0.0, 0.0]))
    if np.linalg.norm(tangent) < 1e-8:
        tangent = np.cross(normal, np.array([0.0, 1.0, 0.0]))
    e1 = tangent / np.linalg.norm(tangent)
    e2 = np.cross(normal, e1)
    theta = np.linspace(0.0, 2 * np.pi, samples)
    boundary = center + major_r * np.cos(theta)[:, None] * e1 + minor_r * np.sin(theta)[:, None] * e2
    return boundary


def add_slice(fig, center, size, plane, name):
    lin = np.linspace(-size, size, 6)
    if plane == 'xy':
        X, Y = np.meshgrid(lin + center[0], lin + center[1])
        Z = np.full_like(X, center[2])
    elif plane == 'xz':
        X, Z = np.meshgrid(lin + center[0], lin + center[2])
        Y = np.full_like(X, center[1])
    else:  # yz
        Y, Z = np.meshgrid(lin + center[1], lin + center[2])
        X = np.full_like(Y, center[0])
    fig.add_trace(go.Surface(x=X, y=Y, z=Z, showscale=False, opacity=0.08, colorscale=[[0,'lightgray'],[1,'lightgray']], name=name))


def plot_solution(bundle, rays, R, t, match, ray_len=30.0, slice_scale=4.0):
    normals_w, centers_w = compute_world(bundle, R, t)
    axis_world = R @ AXIS_LOCAL
    fig = go.Figure()

    # Compute region for slices around bundle
    center_mean = centers_w.mean(axis=0)
    extent = max(np.ptp(centers_w, axis=0).max() * 1.5, bundle.fiber_diameter * slice_scale, 5.0)
    add_slice(fig, center_mean, extent, 'xy', 'slice XY')
    add_slice(fig, center_mean, extent, 'xz', 'slice XZ')
    add_slice(fig, center_mean, extent, 'yz', 'slice YZ')

    # Observed rays (as supplied)
    for j, (p0, p1) in enumerate(rays):
        fig.add_trace(go.Scatter3d(
            x=[p0[0], p1[0]], y=[p0[1], p1[1]], z=[p0[2], p1[2]],
            mode='lines', line=dict(color='rgba(0,0,150,0.7)', width=5),
            name=f'obs ray {j}'
        ))

    # Predicted rays and mirror apertures
    for i, (c, n) in enumerate(zip(centers_w, normals_w)):
        color = COLORS[i % len(COLORS)]
        # incoming ray to center
        inc_start = c - D_IN * ray_len
        fig.add_trace(go.Scatter3d(x=[inc_start[0], c[0]], y=[inc_start[1], c[1]], z=[inc_start[2], c[2]],
                        mode='lines', line=dict(color='rgba(100,100,100,0.7)', width=3, dash='dash'),
                        name=f'incoming to M{i}' if i==0 else None, showlegend=(i==0)))
        # reflected
        d_pred = D_IN - 2 * np.dot(n, D_IN) * n
        d_pred = d_pred / np.linalg.norm(d_pred)
        end = c + d_pred * ray_len
        fig.add_trace(go.Scatter3d(
            x=[c[0], end[0]], y=[c[1], end[1]], z=[c[2], end[2]],
            mode='lines', line=dict(color=color, width=4), name=f'mirror {i} pred'
        ))
        boundary = ellipse_boundary(c, n, axis_world, bundle.fiber_diameter)
        fig.add_trace(go.Scatter3d(x=boundary[:, 0], y=boundary[:, 1], z=boundary[:, 2],
                        mode='lines', line=dict(color=color, width=2), showlegend=False))
        fig.add_trace(go.Scatter3d(x=[c[0]], y=[c[1]], z=[c[2]], mode='markers+text',
                        marker=dict(color=color, size=4), text=[f'M{i}'], textposition='top center', showlegend=False))

    fig.update_layout(height=900, width=900, title='Observed rays vs predicted reflected rays',
        scene=dict(aspectmode='cube', aspectratio=dict(x=1, y=1, z=1), xaxis_title='X', yaxis_title='Y', zaxis_title='Z'),
        margin=dict(l=0, r=0, t=40, b=0))
    fig.show()


In [50]:
bundle = load_bundle_config(BUNDLE_CONFIG)
rays = load_rays_config(RAYS_CONFIG)
total, match, R, t = solve_pose(bundle, rays, w_ang=1.0, w_pos=1.0, w_div=10.0)
euler = rotation_matrix_to_euler(R)
print(f'Total cost: {total:.4f}')
print('Rotation R:')
print(R)
print(f'Euler XYZ (deg): roll={euler[0]:.3f}, pitch={euler[1]:.3f}, yaw={euler[2]:.3f}')
print('Translation t:')
print(t)
print('Assignment (mirror idx -> ray idx):', match)
plot_solution(bundle, rays, R, t, match, ray_len=30.0)


Total cost: 26.6967
Rotation R:
[[ 0.8264262   0.56120075  0.04553527]
 [ 0.5625771  -0.82633112 -0.02615125]
 [ 0.02295111  0.04722918 -0.99862037]]
Euler XYZ (deg): roll=177.292, pitch=-1.315, yaw=34.244
Translation t:
[  452.25263119 -1169.16675957  -344.37923886]
Assignment (mirror idx -> ray idx): [(0, 1), (1, 2), (2, 3), (3, 5), (4, 4), (5, 0)]


In [52]:
# Synthetic self-test
def generate_rays_from_bundle(bundle, R_true, t_true, ray_len=30.0):
    normals = []
    centers = []
    for m in bundle.mirrors:
        n_loc, c_loc = mirror_geometry(m)
        normals.append(n_loc)
        centers.append(c_loc)
    normals = np.stack(normals)
    centers = np.stack(centers)
    normals_w = (R_true @ normals.T).T
    centers_w = (R_true @ centers.T).T + t_true
    rays = []
    for c, n in zip(centers_w, normals_w):
        d_out = D_IN - 2 * np.dot(n, D_IN) * n
        d_out = d_out / np.linalg.norm(d_out)
        if np.dot(d_out, n) < 0:
            d_out = -d_out
        p0 = c.copy()
        p1 = c + d_out * ray_len
        rays.append((p0, p1))
    return rays

theta = np.deg2rad(10.0)
R_true = np.array([[np.cos(theta), -np.sin(theta), 0], [np.sin(theta), np.cos(theta), 0], [0, 0, 1]])
t_true = np.array([5.0, -3.0, 10.0])
synthetic_rays = generate_rays_from_bundle(bundle, R_true, t_true, ray_len=30.0)
total_s, match_s, R_hat, t_hat = solve_pose(bundle, synthetic_rays, w_ang=1.0, w_pos=1.0, w_div=10.0)
rot_err = np.linalg.norm(R_true - R_hat)
trans_err = np.linalg.norm(t_true - t_hat)
print('Synthetic test:')
print(f'  Total cost: {total_s:.4f}')
print(f'  Rotation error Frobenius: {rot_err:.6e}')
print(f'  Translation error norm:   {trans_err:.6e}')
euler_hat = rotation_matrix_to_euler(R_hat)
print(f'  Euler XYZ (deg) est: roll={euler_hat[0]:.3f}, pitch={euler_hat[1]:.3f}, yaw={euler_hat[2]:.3f}')
print('  Assignment (mirror idx -> ray idx):', match_s)
plot_solution(bundle, synthetic_rays, R_hat, t_hat, match_s, ray_len=30.0)


Synthetic test:
  Total cost: 2.1186
  Rotation error Frobenius: 2.828427e+00
  Translation error norm:   2.702123e+00
  Euler XYZ (deg) est: roll=91.053, pitch=-0.000, yaw=-170.000
  Assignment (mirror idx -> ray idx): [(0, 3), (1, 1), (2, 2), (3, 0), (4, 5), (5, 4), (6, 6)]
