# Abstraction and Texturing #

In [2]:
import os
import sys
import json
from pathlib import Path
import open3d as o3d
from PIL import Image, ImageDraw, ImageFilter
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
from pipeline.vignette_data import ProcessedVignette
from typing import Dict, Any, Optional, List
from scipy.spatial import ConvexHull
from scipy.interpolate import griddata
from scipy.ndimage import map_coordinates

# get the root for importing test data
os.environ["SAM_BACKEND"] = "coreml"
project_root = Path.cwd()
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

# Configuration
VIGNETTE_NAME = "capture2"
VIGNETTE_PATH = project_root / "test_data" / VIGNETTE_NAME
print(f"Project Root: {project_root}")
print(f"Testing Vignette: {VIGNETTE_PATH}")

# Paths for the inputs and outputs
rgb_path = VIGNETTE_PATH / "rgb.png"
metadata_path = VIGNETTE_PATH / "metadata.json"
results_path = VIGNETTE_PATH / "results"

Project Root: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer
Testing Vignette: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2


## Texturing ##

In [4]:
def project_points_to_2d(points_3d, intrinsics):
    """Projects 3D points (in original camera space) to 2D image coordinates."""
    projected = intrinsics @ points_3d.T
    d = projected[2, :] + 1e-6
    u = projected[0, :] / d
    v = projected[1, :] / d
    return np.vstack((u, v)).T

def plane_equation_to_point_normal(equation, fallback_point):
    """Derives a point and normal from a plane equation [a,b,c,d]."""
    normal = equation[:3]
    normal_norm = np.linalg.norm(normal)
    if normal_norm == 0: return fallback_point, np.array([0,0,1])
    normal = normal / normal_norm
    v = fallback_point - np.array([0, 0, 0])
    dist = np.dot(v, normal) + equation[3]
    point_on_plane = fallback_point - dist * normal
    return point_on_plane, normal


def find_minimum_bounding_rectangle(points_2d):
    """Finds the minimum area bounding rectangle for a set of 2D points."""
    if len(points_2d) < 3: return None
    hull = ConvexHull(points_2d)
    hull_points = points_2d[hull.vertices]
    min_area = float('inf')
    best_box = None
    for i in range(len(hull_points)):
        p1, p2 = hull_points[i], hull_points[(i + 1) % len(hull_points)]
        edge_vector = p2 - p1
        edge_angle = np.arctan2(edge_vector[1], edge_vector[0])
        c, s = np.cos(-edge_angle), np.sin(-edge_angle)
        rotation = np.array([[c, -s], [s, c]])
        rotated_points = hull_points @ rotation.T
        min_x, max_x = np.min(rotated_points[:, 0]), np.max(rotated_points[:, 0])
        min_y, max_y = np.min(rotated_points[:, 1]), np.max(rotated_points[:, 1])
        area = (max_x - min_x) * (max_y - min_y)
        if area < min_area:
            min_area = area
            corners_in_rotated_frame = np.array([
                [min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y]
            ])
            best_box = corners_in_rotated_frame @ rotation
    return best_box

def get_plane_bounding_corners(inlier_points_centered, plane_equation) -> Optional[np.ndarray]:
    """Calculates the 4 corners of the plane's tightest bounding rectangle."""
    print("\n[Task] Calculating the 4 corners of the plane's bounding rectangle...")
    obb_center_centered = np.array(plane_equation.get('obb_center'))
    point_on_plane, normal = plane_equation_to_point_normal(plane_equation['equation'], fallback_point=obb_center_centered)
    axis_z = normal
    arbitrary_vec = np.array([0,0,1])
    if np.allclose(np.abs(np.dot(axis_z, arbitrary_vec)), 1.0): arbitrary_vec = np.array([0,1,0])
    axis_x = np.cross(axis_z, arbitrary_vec); axis_x /= np.linalg.norm(axis_x)
    axis_y = np.cross(axis_z, axis_x)
    vectors_from_origin = inlier_points_centered - point_on_plane
    points_2d = np.vstack([np.dot(vectors_from_origin, axis_x), np.dot(vectors_from_origin, axis_y)]).T
    corners_2d = find_minimum_bounding_rectangle(points_2d)
    if corners_2d is None:
        print("   - ERROR: Could not compute 2D bounding box.")
        return None
    corners_3d_centered = point_on_plane + corners_2d[:, 0, np.newaxis] * axis_x + corners_2d[:, 1, np.newaxis] * axis_y
    print("   - Successfully calculated 4 corners in 3D (centered space).")
    return corners_3d_centered

def create_texture_from_corner_distortion(rgb_image_path, corners_3d_world, corners_2d_pixels, output_path):
    """Creates a perspective-corrected texture based on corner distortion."""
    print("\n[Task] Creating perspective-corrected texture from corner distortion...")
    try:
        source_image = Image.open(rgb_image_path)
        w, h = source_image.size
    except Exception as e:
        print(f"   - ERROR: Could not load image. Error: {e}")
        return None, None
    edge_3d_1, edge_3d_2 = np.linalg.norm(corners_3d_world[1] - corners_3d_world[0]), np.linalg.norm(corners_3d_world[3] - corners_3d_world[0])
    edge_2d_1_vec, edge_2d_2_vec = corners_2d_pixels[1] - corners_2d_pixels[0], corners_2d_pixels[3] - corners_2d_pixels[0]
    edge_2d_1, edge_2d_2 = np.linalg.norm(edge_2d_1_vec), np.linalg.norm(edge_2d_2_vec)
    if abs(edge_2d_1_vec[0]) > abs(edge_2d_1_vec[1]):
        h_edge_3d, v_edge_3d, h_edge_2d, v_edge_2d = edge_3d_1, edge_3d_2, edge_2d_1, edge_2d_2
    else:
        h_edge_3d, v_edge_3d, h_edge_2d, v_edge_2d = edge_3d_2, edge_3d_1, edge_2d_2, edge_2d_1
    if h_edge_3d == 0 or v_edge_3d == 0: return None, None
    density_h, density_v = h_edge_2d / h_edge_3d, v_edge_2d / v_edge_3d
    if density_h == 0 or density_v == 0: return None, None
    target_density = max(density_h, density_v)
    scale_w, scale_h = target_density / density_h, target_density / density_v
    new_w, new_h = int(w * scale_w), int(h * scale_h)
    if new_w == 0 or new_h == 0: return None, None
    print(f"   - Final upscaled image size: {new_w}x{new_h}")
    upscaled_image = source_image.resize((new_w, new_h), Image.Resampling.LANCZOS)
    upscaled_image.save(output_path)
    print(f"   - Saved upscaled texture to: {output_path}")
    return (new_w, new_h), (scale_w, scale_h)

def project_inliers_onto_plane(inlier_points_centered, plane_equation) -> np.ndarray:
    """Projects the noisy inlier points onto the ideal mathematical plane."""
    print("\n[Task] Projecting inlier points onto the ideal plane...")
    eq = np.array(plane_equation['equation'])
    normal, d = eq[:3], eq[3]
    distances = inlier_points_centered @ normal + d
    projected_points_centered = inlier_points_centered - distances[:, np.newaxis] * normal
    print(f"   - Successfully projected {len(projected_points_centered)} points.")
    return projected_points_centered

# Need some refinements here
def create_distorted_texture(source_image_path, source_points, dest_points, output_path):
    """Warps an image based on a sparse vector field of moving points."""
    print("\n[Task] Creating distorted texture using vector field warp...")
    try:
        source_image = np.array(Image.open(source_image_path))
        h, w, _ = source_image.shape
    except Exception as e:
        print(f"   - ERROR: Could not load source upscaled image. Error: {e}")
        return
    vectors = dest_points - source_points
    grid_y, grid_x = np.mgrid[0:h, 0:w]
    displacement_field = griddata(dest_points, vectors, (grid_x, grid_y), method='cubic', fill_value=0)
    map_y = grid_y - displacement_field[:,:,1]
    map_x = grid_x - displacement_field[:,:,0]
    warped_image_r = map_coordinates(source_image[:,:,0], [map_y, map_x], order=1, mode='constant', cval=0)
    warped_image_g = map_coordinates(source_image[:,:,1], [map_y, map_x], order=1, mode='constant', cval=0)
    warped_image_b = map_coordinates(source_image[:,:,2], [map_y, map_x], order=1, mode='constant', cval=0)
    distorted_image_np = np.stack([warped_image_r, warped_image_g, warped_image_b], axis=-1).astype(np.uint8)
    distorted_image = Image.fromarray(distorted_image_np)
    distorted_image.save(output_path)
    print(f"   - Saved distorted texture to: {output_path}")

# Revise this for a smoother mask, is the math the same resolution?
def create_mask_on_texture(texture_path, scaled_points_2d, point_radius, output_mask_path):
    """Creates and saves a mask by drawing circles on a blank canvas."""
    print("\n[Task] Creating mask for the upscaled texture...")
    try:
        texture = Image.open(texture_path)
        mask = Image.new('L', texture.size, 0)
        draw = ImageDraw.Draw(mask)
        for p in scaled_points_2d:
            bbox = (p[0] - point_radius, p[1] - point_radius, p[0] + point_radius, p[1] + point_radius)
            draw.ellipse(bbox, fill=255)
        mask.save(output_mask_path)
        print(f"   - Saved mask to: {output_mask_path}")
    except Exception as e:
        print(f"   - ERROR: Could not create mask. Error: {e}")

def apply_mask_to_texture(distorted_path: str, mask_path: str, output_path: str, feather: float = 0.0):
    """
    Keep only the masked pixels from `distorted_path`, make all others transparent,
    and save a new PNG with alpha at `output_path`.
    """
    img = Image.open(distorted_path).convert("RGBA")
    mask = Image.open(mask_path).convert("L")

    if mask.size != img.size:
        mask = mask.resize(img.size, resample=Image.BILINEAR)

    if feather > 0:
        mask = mask.filter(ImageFilter.GaussianBlur(radius=feather))

    img_np = np.array(img).astype(np.float32)
    mask_np = np.array(mask).astype(np.float32) / 255.0

    # Premultiply RGB by mask
    img_np[..., 0:3] *= mask_np[..., None]

    # Set alpha = mask
    img_np[..., 3] = mask_np * 255.0

    out = Image.fromarray(img_np.astype(np.uint8), mode="RGBA")
    out.save(output_path)
    print(f"   - Saved masked texture to: {output_path}")

## Rectify plane ##

In [12]:
import numpy as np
from PIL import Image
from scipy.ndimage import map_coordinates

# ---------- small helpers ----------

def _homography_from_4pts(src_xy, dst_uv):
    """
    DLT homography from 4 points.
    src_xy: (4,2) -> rectangle or plane coords
    dst_uv: (4,2) -> image pixel coords
    Returns H s.t. [u v 1]^T ~ H [x y 1]^T
    """
    A = []
    for (x, y), (u, v) in zip(src_xy, dst_uv):
        A.append([x, y, 1, 0, 0, 0, -u*x, -u*y, -u])
        A.append([0, 0, 0, x, y, 1, -v*x, -v*y, -v])
    A = np.asarray(A, dtype=np.float64)
    _, _, Vt = np.linalg.svd(A)
    h = Vt[-1] / Vt[-1, -1]
    return h.reshape(3, 3)

def _warp_rect_from_quad(image_np, quad_uv, W, H):
    """
    Sample 'image_np' over a W x H rect by mapping each rect pixel
    to the image quad via homography. Returns uint8 RGBA.
    quad_uv: (4,2) pixel coords in order [C0, C1, C2, C3]
    """
    rect_pts = np.array([[0, 0], [W-1, 0], [W-1, H-1], [0, H-1]], dtype=np.float64)
    H_rect_to_img = _homography_from_4pts(rect_pts, quad_uv.astype(np.float64))

    xs = np.linspace(0, W-1, W)
    ys = np.linspace(0, H-1, H)
    X, Y = np.meshgrid(xs, ys)
    ones = np.ones_like(X)
    XY1 = np.stack([X, Y, ones], axis=-1)             # (H, W, 3)
    UVW = XY1 @ H_rect_to_img.T
    U = UVW[..., 0] / (UVW[..., 2] + 1e-9)
    V = UVW[..., 1] / (UVW[..., 2] + 1e-9)

    out = np.zeros((H, W, image_np.shape[2]), dtype=np.float32)
    for c in range(image_np.shape[2]):
        out[..., c] = map_coordinates(image_np[..., c], [V, U],
                                      order=1, mode='constant', cval=0.0)
    return np.clip(out, 0, 255).astype(np.uint8)

# ---------- main 2-step function ----------

def rectify_plane_texture_two_step(
    masked_texture_path: str,
    corners_pixels: np.ndarray,    # (4,2) pixel coords in the *same image space* as masked_texture
    corners_3d_world: np.ndarray,  # (4,3) 3D corners in camera/world coords
    output_path: str
):
    """
    Step 1: crop quad -> rectangle using current UVs (pixel corners).
    Step 2: remove foreshortening by rescaling to plane's metric aspect (Lx/Ly).
    Output keeps highest reasonable detail (no downsampling along either axis).
    Corner order must be [C0, C1, C2, C3] going around the quad
    such that C0->C1 and C0->C3 are the two edges that meet at C0.
    """
    # Load masked RGBA (already premultiplied in your pipeline)
    img = Image.open(masked_texture_path).convert("RGBA")
    img_np = np.array(img).astype(np.float32)

    # --- STEP 1: quad crop (choose rect size from *projected* edge lengths) ---
    c0, c1, c2, c3 = [np.asarray(p, dtype=np.float64) for p in corners_pixels]
    W1 = int(np.ceil(np.linalg.norm(c1 - c0)))
    H1 = int(np.ceil(np.linalg.norm(c3 - c0)))
    W1 = max(W1, 1); H1 = max(H1, 1)

    step1_np = _warp_rect_from_quad(img_np, np.stack([c0, c1, c2, c3], axis=0), W1, H1)

    # --- STEP 2: un-foreshorten to true plane aspect using 3D edge lengths ---
    C0, C1, C2, C3 = [np.asarray(P, dtype=np.float64) for P in corners_3d_world]
    Lx = float(np.linalg.norm(C1 - C0))   # metric width
    Ly = float(np.linalg.norm(C3 - C0))   # metric height
    if Lx <= 1e-12 or Ly <= 1e-12:
        raise ValueError("Degenerate plane edges (zero length).")

    # Preserve detail: pick pixels-per-meter so we don't downsample either axis
    s = max(W1 / Lx, H1 / Ly)
    W_out = max(1, int(np.ceil(Lx * s)))
    H_out = max(1, int(np.ceil(Ly * s)))

    # Resample step1 to (W_out, H_out) with high-quality filter
    step1_img = Image.fromarray(step1_np, mode="RGBA")
    step2_img = step1_img.resize((W_out, H_out), Image.Resampling.LANCZOS)
    step2_img.save(output_path)
    print(f"✔ Rectified plane texture saved: {output_path}  (size: {W_out}x{H_out})")

In [None]:
def create_textured_planes(
    vignette: "ProcessedVignette",
    rgb_image_path: str,
    output_dir: str,
    point_radius: int = 10,
    target_plane_id: int = -1 # -1 for all planes
) -> None:
    """
    Main method that takes in a processed vignette, take its texture and plane abstraction.
    Create a set of textured planes.
    """
    # Clear previous textured planes
    vignette.clear_abstractions('textured_planes', auto_save=False)

    # Load necessary data
    planes = vignette.get_abstractions('planes')
    if not planes:
        print("   - No planes found. Aborting.")
        return
    capture_meta = vignette.metadata.get('capture_metadata', {})
    center_offset = np.array(capture_meta.get('center_offset', [0, 0, 0]))
    intrinsics = np.array(capture_meta.get('camera_intrinsics', {}).get('columns', np.identity(3))).T

    try:
        source_image = Image.open(rgb_image_path).convert("RGBA")
        w, h = source_image.size
    except Exception as e:
        print(f"   - ERROR: Could not read source image at {rgb_image_path}. Error: {e}")
        return
    
    # Prepare asset folder
    assets_path = Path(output_dir) / "assets" / "planes"
    assets_path.mkdir(parents=True, exist_ok=True)

    textured_planes_list: List[Dict[str, Any]] = []

    # For each plane
    for plane in planes:
        plane_id = plane.get('plane_id')
        if target_plane_id != -1 and target_plane_id != plane_id:
            print(f"\n--- Skipping plane {plane_id} ---")
            continue
        print(f"\n--- Processing plane {plane_id} ---")

        # get plane bounding corners 
        inlier_points_centered = vignette.points[plane['point_indices']] # Points on the plane
        corners_3d_centered = get_plane_bounding_corners(inlier_points_centered, plane)
        if corners_3d_centered is None: 
            print("   - Plane corneres not found. Aborting.")
            return
        corners_3d_world = corners_3d_centered + center_offset
        corners_2d_pixels = project_points_to_2d(corners_3d_world, intrinsics)

        # Get a stretched texture to ensure pixel density
        output_texture_path = assets_path / f"plane_{plane_id}_perspective_corrected.png"
        new_dims, scale_factors = create_texture_from_corner_distortion(rgb_image_path, corners_3d_world, corners_2d_pixels, str(output_texture_path))
        if not new_dims:
            print("   - Error distorting plane. Aborting.")
            return
        new_w, new_h = new_dims
       
        # Project all points onto the plane
        projected_points_centered = project_inliers_onto_plane(inlier_points_centered, plane)

        # Distort texture with point projection vectors
        inlier_points_world = inlier_points_centered + center_offset
        inlier_pixel_coords = project_points_to_2d(inlier_points_world, intrinsics)
        scaled_inlier_coords = inlier_pixel_coords * np.array(scale_factors)

        projected_points_world = projected_points_centered + center_offset
        projected_pixel_coords = project_points_to_2d(projected_points_world, intrinsics)
        scaled_projected_coords = projected_pixel_coords * np.array(scale_factors)

        output_distorted_path = assets_path / f"plane_{plane_id}_distorted.png"
        create_distorted_texture(str(output_texture_path), scaled_inlier_coords, scaled_projected_coords, str(output_distorted_path))

        # Create a mask for this plane
        output_mask_path = assets_path / f"plane_{plane_id}_mask.png"
        create_mask_on_texture(str(output_distorted_path), scaled_projected_coords, point_radius=point_radius, output_mask_path=str(output_mask_path))

        # Apply mask to distorted_texture
        output_masked_texture_path = assets_path / f"plane_{plane_id}_distorted_masked.png"
        apply_mask_to_texture(
            distorted_path=str(output_distorted_path),
            mask_path=str(output_mask_path),
            output_path=str(output_masked_texture_path),
            feather=0.5 * point_radius,   # optional: soften edges a bit
        )

        # Get a cropped texture without perspective foreshortening
        output_rectified_path = assets_path / f"plane_{plane_id}_rectified.png"
        scaled_corners_pixels = corners_2d_pixels * np.array(scale_factors)
        rectify_plane_texture_two_step(
            masked_texture_path=str(output_masked_texture_path),
            corners_pixels=scaled_corners_pixels,   # (4,2) in the SAME image space as masked texture
            corners_3d_world=corners_3d_world,      # (4,3)
            output_path=str(output_rectified_path)
        )

        # Save results in the vignette
        corner_uvs = corners_2d_pixels / np.array(source_image.size)
        textured_planes_list.append({
            "plane_id": plane_id,
            "corners_3d_centered": corners_3d_centered.tolist(),
            "corner_uvs": corner_uvs.tolist(),
            "mesh_faces": [[0, 1, 3], [1, 2, 3]], # Quad faces
            "scale_factors": {
                "w": scale_factors[0],
                "h": scale_factors[1]
            },
            "asset_paths": {
                "perspective_corrected": str(output_texture_path.relative_to(output_texture_path.parent.parent.parent)),
                "distorted": str(output_distorted_path.relative_to(output_distorted_path.parent.parent.parent)),
                "mask": str(output_mask_path.relative_to(output_mask_path.parent.parent.parent)),
                "distorted_masked": str(output_masked_texture_path.relative_to(output_masked_texture_path.parent.parent.parent)),
                "rectified": str(output_rectified_path.relative_to(output_rectified_path.parent.parent.parent))
            }
        })
        print(f"   - Successfully saved analysis results for plane {plane_id}.")

    vignette.metadata['textured_abstractions'] = {'planes': textured_planes_list}
    vignette.save()
    print("\n--- Finished Plane-Driven Textured Plane Creation ---")
    

In [2]:
processed_vignette_path = results_path / "processed_vignette.npz"
from pipeline.vignette_data import ProcessedVignette
processed_vignette = ProcessedVignette.load(processed_vignette_path)
create_textured_planes(processed_vignette, rgb_path, VIGNETTE_PATH)

Set/updated per-point attribute: 'confidence'.
Set/updated per-point attribute: 'plane_id'.
Set/updated per-point attribute: 'cylinder_id'.
Set/updated per-point attribute: 'sphere_id'.
Set/updated per-point attribute: 'cuboid_id'.
Set/updated per-point attribute: 'component_id'.
Set/updated per-point attribute: 'best_fit_id'.
Loaded processed vignette from: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/results/processed_vignette.npz


NameError: name 'create_textured_planes' is not defined

In [20]:
from open3d.visualization import rendering

# Visualize them
def visualize_planes(vignette: "ProcessedVignette", show_points: bool = True, target_plane_id: int = -1):
    textured_planes = vignette.metadata.get('textured_abstractions', {}).get('planes', [])
    if not textured_planes:
        print("  - No textured plane found. Aborting.")
        return
    
    geometries_to_draw = []

    # Points
    if show_points:
        pcd = processed_vignette.to_open3d()
        colors_np = np.asarray(pcd.colors)
        
        if target_plane_id != -1:
            # Color points for the plane
            abs_plane = next((p for p in processed_vignette.get_abstractions('planes') if p.get('plane_id') == target_plane_id), None)
            if abs_plane and abs_plane.get('point_indices'):
                idx = np.asarray(abs_plane['point_indices'], dtype=int)
                idx = idx[(idx >= 0) & (idx < colors_np.shape[0])]
                colors_np[idx] = [1.0, 0.0, 0.0] # Red
        
        # colors_np = np.zeros((colors_np.shape[0], 3), dtype=float)

        pcd.colors = o3d.utility.Vector3dVector(colors_np)
        pcd_mat = rendering.MaterialRecord()
        pcd_mat.shader = "defaultUnlit"

        geometries_to_draw.append({"name": "point_cloud", "geometry": pcd, "material": pcd_mat})
        print("--- Visualized Points ---")

    # Planes
    for plane in textured_planes:
        plane_id = plane.get('plane_id')
        if target_plane_id != -1 and target_plane_id != plane_id:
            print(f"--- Skipping plane {plane_id} ---")
            continue

        print(f"--- Preparing plane {plane_id} ---")
        # Get geometry
        vertices_np = np.asarray(plane['corners_3d_centered'])
        #faces_ij_k = plane['mesh_faces']

        front_faces = [list(map(int, f)) for f in plane['mesh_faces']]  # [[i,j,k], ...]
        # back_faces  = [[tri[0], tri[2], tri[1]] for tri in front_faces]        # reverse winding for back
        all_faces   = front_faces # + back_faces
        # plane_mesh, all_faces = build_double_sided_mesh(vertices_np, faces_ij_k)
        plane_mesh = o3d.geometry.TriangleMesh(
            o3d.utility.Vector3dVector(vertices_np),
            o3d.utility.Vector3iVector(all_faces),
        )

        # Add UVs
        # Use 0-1 instead
        # per_vertex_uv = np.asarray(plane['corner_uvs'], dtype=float)
        per_vertex_uv = np.array([
            [0.0, 0.0],  # corner 0
            [1.0, 0.0],  # corner 1
            [1.0, 1.0],  # corner 2
            [0.0, 1.0],  # corner 3
        ], dtype=float)
        per_vertex_uv[:, 1] = 1.0 - per_vertex_uv[:, 1] # flip v

        tri_uv_list = []
        for (i, j, k) in all_faces:
            tri_uv_list.extend([per_vertex_uv[i], per_vertex_uv[j], per_vertex_uv[k]])
        tri_uvs = np.asarray(tri_uv_list, dtype=float)  # shape: (num_triangles*3, 2)

        plane_mesh.triangle_uvs = o3d.utility.Vector2dVector(tri_uvs)
        plane_mesh.compute_vertex_normals()

        # Transparent material (default)
        tex_path = (VIGNETTE_PATH / plane['asset_paths']['rectified']).resolve()
        # tex_path = (VIGNETTE_PATH / 'rgb.png').resolve()
        mat = rendering.MaterialRecord()
        mat.shader = "defaultLitTransparency"
        mat.albedo_img = o3d.io.read_image(str(tex_path))

        geometries_to_draw.append({
            "name": f"plane_{plane_id}",
            "geometry": plane_mesh,
            "material": mat
        })

        print(f"  - Plane {plane_id} Ready")

    o3d.visualization.draw(
        geometries_to_draw,
        show_skybox = False,
        bg_color=(1, 1, 1, 1),
    )

In [23]:
visualize_planes(processed_vignette, target_plane_id=-1)

Generating Open3D point cloud with 'rgb' colors...
--- Visualized Points ---
--- Preparing plane 1 ---
  - Plane 1 Ready
--- Preparing plane 2 ---
  - Plane 2 Ready
--- Preparing plane 3 ---
  - Plane 3 Ready
--- Preparing plane 4 ---
  - Plane 4 Ready
--- Preparing plane 5 ---
  - Plane 5 Ready
--- Preparing plane 6 ---
  - Plane 6 Ready
--- Preparing plane 7 ---
  - Plane 7 Ready


### Debug UVs ###

In [15]:
# --- UV Debug Saver (no rendering; all planes) --------------------------------
# Assumes these exist in your notebook:
#   VIGNETTE_PATH: pathlib.Path to your vignette folder
#   processed_vignette: your loaded ProcessedVignette instance

import numpy as np
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont

# =========================
# Helpers for UV debugging
# =========================
def _draw_cross(draw: ImageDraw.ImageDraw, xy, size=8, width=2):
    x, y = xy
    draw.line((x - size, y, x + size, y), width=width)
    draw.line((x, y - size, x, y + size), width=width)

def _uv_to_px_top_left(u, v, W, H):
    # (0,0) at top-left; v increases downward (image-space)
    return (u * (W - 1), v * (H - 1))

def _uv_to_px_bottom_left(u, v, W, H):
    # (0,0) at bottom-left; v increases upward (GL-style)
    return (u * (W - 1), (1.0 - v) * (H - 1))

def save_uv_debug_images(texture_path: Path, uv_corners, out_dir: Path, plane_id: int):
    """
    Draw the 4 UV corner points over the texture and save two PNGs:
      - *_UV-topLeft.png     (assumes v downward)
      - *_UV-bottomLeft.png  (assumes v upward)
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    img = Image.open(texture_path).convert("RGBA")
    W, H = img.size

    canvases = [
        ("topLeft", _uv_to_px_top_left),
        ("bottomLeft", _uv_to_px_top_left),
    ]

    stroke = 3
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None

    uv_corners = np.asarray(uv_corners, dtype=float).reshape(4, 2)

    for label, uv2px in canvases:
        canvas = img.copy()
        draw = ImageDraw.Draw(canvas)

        # Convert each UV to pixel coordinates
        px = [uv2px(float(u), float(v), W, H) for (u, v) in uv_corners]

        # Connect corners (0->1->2->3->0) and draw crosshairs + labels
        draw.line([*px, px[0]], width=stroke)
        for i, (x, y) in enumerate(px):
            _draw_cross(draw, (x, y), size=8, width=stroke)
            draw.text((x + 6, y + 6), f"{i} ({uv_corners[i,0]:.3f},{uv_corners[i,1]:.3f})", font=font)

        out_path = out_dir / f"{texture_path.stem}_plane{plane_id}_UV-{label}.png"
        canvas.save(out_path)
        print(f"[UV DEBUG] Saved: {out_path}")

# ==================================
# Gather planes and write UV overlays
# ==================================
textured_abstractions = processed_vignette.metadata.get('textured_abstractions', {}).get('planes', [])
uv_debug_dir = (VIGNETTE_PATH / "debug_uvs").resolve()

if not textured_abstractions:
    print("No textured planes found to debug.")
else:
    print(f"Found {len(textured_abstractions)} textured planes. Writing UV debug overlays...")
    saved_count = 0
    skipped_count = 0

    for plane_data in textured_abstractions:
        plane_id = plane_data.get('plane_id', 'unknown')
        # Expecting plane_data['mesh_uvs'] to be 4 corners in [0,1]
        uv_corners = plane_data.get('corner_uvs', None)
        asset_paths = plane_data.get('asset_paths', None)
        rel_tex = asset_paths.get('mask', None)

        if uv_corners is None or rel_tex is None:
            print(f"[SKIP] Plane {plane_id}: missing mesh_uvs or texture_path.")
            skipped_count += 1
            continue

        tex_path = (VIGNETTE_PATH / rel_tex).resolve()
        if not tex_path.exists():
            print(f"[SKIP] Plane {plane_id}: texture not found at {tex_path}")
            skipped_count += 1
            continue

        try:
            save_uv_debug_images(
                texture_path=tex_path,
                uv_corners=np.asarray(uv_corners, dtype=float),
                out_dir=uv_debug_dir,
                plane_id=plane_id
            )
            saved_count += 1
        except Exception as e:
            print(f"[ERROR] Plane {plane_id}: failed to save UV debug: {e}")
            skipped_count += 1

    print(f"\nDone. Planes processed: {len(textured_abstractions)} | Saved: {saved_count} | Skipped: {skipped_count}")
    print("Check your UV overlays in:", uv_debug_dir)
# ------------------------------------------------------------------------------

Found 7 textured planes. Writing UV debug overlays...
[UV DEBUG] Saved: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/debug_uvs/plane_1_mask_plane1_UV-topLeft.png
[UV DEBUG] Saved: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/debug_uvs/plane_1_mask_plane1_UV-bottomLeft.png
[UV DEBUG] Saved: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/debug_uvs/plane_2_mask_plane2_UV-topLeft.png
[UV DEBUG] Saved: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/debug_uvs/plane_2_mask_plane2_UV-bottomLeft.png
[UV DEBUG] Saved: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/debug_uvs/plane_3_mask_plane3_UV-topLeft.png
[UV DEBUG] Saved: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/debug_uvs/plane_3_mask_plane3_UV-bottomLeft.png
[UV DEBUG] Saved: /Users/yuzhenzhang/

In [2]:
processed_vignette_path = results_path / "processed_vignette.npz"
from pipeline.vignette_data import ProcessedVignette
processed_vignette = ProcessedVignette.load(processed_vignette_path)

Set/updated per-point attribute: 'confidence'.
Set/updated per-point attribute: 'plane_id'.
Set/updated per-point attribute: 'cylinder_id'.
Set/updated per-point attribute: 'sphere_id'.
Set/updated per-point attribute: 'cuboid_id'.
Set/updated per-point attribute: 'component_id'.
Set/updated per-point attribute: 'best_fit_id'.
Set/updated per-point attribute: 'curvature'.
Set/updated per-point attribute: 'edgeness'.
Set/updated per-point attribute: 'anisotropy'.
Set/updated per-point attribute: 'planarity'.
Set/updated per-point attribute: 'sphericity'.
Set/updated per-point attribute: 'flow_vectors'.
Loaded processed vignette from: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/results/processed_vignette.npz


In [7]:
# Test PCA
from pipeline import abstraction
# Identify Components
abstraction.identify_components(
    processed_vignette, 
    cluster_eps=0.03, # Neighborhood radius
    min_cluster_points=100
)

# PCA Structural Analysis
abstraction.analyze_structural_properties(processed_vignette, True)

Identifying components with eps=0.03...
Set/updated per-point attribute: 'component_id'.
   - Found 2 components and assigned 'component_id' attribute.
Analyzing structural properties with PCA...
Cleared 4 abstractions of type 'structural_properties'.
Added new abstraction of type 'structural_properties'.
   - Global properties: L=0.64, P=0.28, S=0.08
   - Analyzing 2 components...
     - Component #0: L=0.90, P=0.08, S=0.01
Added new abstraction of type 'structural_properties'.
     - Component #1: L=0.64, P=0.20, S=0.16
Added new abstraction of type 'structural_properties'.
Saved processed vignette to: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/results/processed_vignette.npz
Finished structural property analysis.


In [5]:
import open3d as o3d
import numpy as np
from typing import List, Dict, Any

# Assuming your ProcessedVignette class is available
# from vignette import ProcessedVignette 

def create_axis_visual(
    properties: Dict[str, Any], 
    scale_factor: float = 1.0
) -> o3d.geometry.LineSet:
    """
    Creates an Open3D LineSet to represent the PCA axes.

    Args:
        properties: A dictionary containing 'centroid', 'axes', and 'variances'.
        scale_factor: A multiplier to adjust the visual length of the axes.

    Returns:
        An o3d.geometry.LineSet object representing the three axes.
    """
    center = np.array(properties['centroid'])
    # axes is a list of 3 lists, each being a vector
    axes = np.array(properties['axes']) 
    # variances is a list of 3 floats
    variances = np.array(properties['variances'])

    # The length of each axis line will be proportional to the standard deviation
    # We use [:, np.newaxis] to properly broadcast the multiplication
    axis_lengths = np.sqrt(variances) * scale_factor
    endpoints = center + axes * axis_lengths[:, np.newaxis]

    # The points for our LineSet: the center, then the 3 endpoints
    points = np.vstack([center, endpoints])
    
    # The lines connect the center (index 0) to each endpoint (indices 1, 2, 3)
    lines = [[0, 1], [0, 2], [0, 3]]
    
    # Colors for the axes: Red (primary), Green (secondary), Blue (tertiary)
    # The order of pca.components_ is already from largest to smallest variance.
    colors = [[1, 0, 0], [0, 1, 0], [0, 0, 1]] 

    line_set = o3d.geometry.LineSet()
    line_set.points = o3d.utility.Vector3dVector(points)
    line_set.lines = o3d.utility.Vector2iVector(lines)
    line_set.colors = o3d.utility.Vector3dVector(colors)
    
    return line_set


def visualize_vignette_abstractions(
    vignette: 'ProcessedVignette',
    draw_global_props: bool = True,
    draw_component_props: bool = True,
    axis_scale_factor: float = 1.0
):
    """
    Creates an interactive Open3D visualization of a vignette and its abstractions.

    - Colors the point cloud by 'component_id'.
    - Draws the principal axes for the global shape and/or individual components.

    Args:
        vignette: The ProcessedVignette object to visualize.
        draw_global_props: If True, draws the PCA axes for the entire vignette.
        draw_component_props: If True, draws the PCA axes for each component.
        axis_scale_factor: Adjusts the visual length of the drawn axes.
    """
    # 1. Generate the point cloud colored by component ID
    try:
        pcd = vignette.to_open3d(color_mode='component_id')
    except AttributeError:
        print("Warning: 'component_id' not found. Displaying with RGB colors.")
        pcd = vignette.to_open3d(color_mode='rgb')

    geometries_to_draw = [pcd]
    
    # 2. Get the structural properties
    all_props = vignette.get_abstractions('structural_properties')
    if not all_props:
        print("No structural properties found to visualize.")
        o3d.visualization.draw_geometries([pcd])
        return

    # 3. Create geometries for the properties
    for props in all_props:
        if props['type'] == 'global' and draw_global_props:
            print("Visualizing global properties...")
            global_axes = create_axis_visual(props, scale_factor=axis_scale_factor)
            geometries_to_draw.append(global_axes)
            
        elif props['type'] == 'component' and draw_component_props:
            comp_id = props['component_id']
            print(f"Visualizing properties for component #{comp_id}...")
            component_axes = create_axis_visual(props, scale_factor=axis_scale_factor)
            geometries_to_draw.append(component_axes)
            
    # 4. Launch the visualizer
    print("\nLaunching Open3D visualizer...")
    print(" - Press 'q' or 'esc' to close the window.")
    o3d.visualization.draw_geometries(geometries_to_draw)

In [8]:
visualize_vignette_abstractions(
    processed_vignette,
    draw_global_props=True,    # Show axes for the whole object
    draw_component_props=True, # Show axes for each colored part
    axis_scale_factor=1
)

Generating Open3D point cloud with 'component_id' colors...
Visualizing global properties...
Visualizing properties for component #0...
Visualizing properties for component #1...

Launching Open3D visualizer...
 - Press 'q' or 'esc' to close the window.


## Symmetry ##

In [12]:
import numpy as np
from scipy.spatial import KDTree
from scipy.spatial.transform import Rotation
from typing import Dict, Any, Optional

# (Keep your _reflect_points helper function as is)
def _reflect_points(points, plane_normal, plane_point):
    normal = np.array(plane_normal) / np.linalg.norm(plane_normal)
    d = -np.dot(normal, np.array(plane_point))
    dists = np.dot(points, normal) + d
    return points - 2 * dists[:, np.newaxis] * normal


def _score_reflection_plane_robust(points, point_tree, plane_normal, plane_point, match_threshold):
    """Robustly scores a reflection plane using bidirectional consistency."""
    # Forward check
    reflected_points = _reflect_points(points, plane_normal, plane_point)
    distances, neighbor_indices = point_tree.query(reflected_points, k=1)
    
    potential_inlier_mask = distances < match_threshold
    potential_indices = np.where(potential_inlier_mask)[0]
    
    # --- [DEBUG] Print results of the initial forward check ---
    print(f"      [DEBUG] Potential inliers found (distance < {match_threshold}m): {len(potential_indices)} / {len(points)}")

    # Backward consistency check
    consistent_inlier_indices = []
    if len(potential_indices) > 0:
        neighbors_to_check = points[neighbor_indices[potential_indices]]
        reflected_neighbors = _reflect_points(neighbors_to_check, plane_normal, plane_point)
        _, reverse_neighbor_indices = point_tree.query(reflected_neighbors, k=1)
        
        is_consistent = (reverse_neighbor_indices == potential_indices)
        consistent_inlier_indices = potential_indices[is_consistent]
        # --- [DEBUG] Print results of the backward consistency check ---
        print(f"      [DEBUG] Consistent inliers found (passed backward check): {len(consistent_inlier_indices)} / {len(potential_indices)}")

    if len(consistent_inlier_indices) == 0:
        return {'score': float('inf'), 'inlier_ratio': 0, 'mean_error': float('inf')}
        
    inlier_ratio = len(consistent_inlier_indices) / len(points)
    mean_error = np.mean(distances[consistent_inlier_indices])
    score = mean_error / inlier_ratio if inlier_ratio > 0 else float('inf')
    
    return {'score': score, 'inlier_ratio': inlier_ratio, 'mean_error': mean_error}


def analyze_global_symmetry(
    vignette: 'ProcessedVignette', 
    match_threshold: float = 0.02, 
    min_inlier_ratio: float = 0.5,
    auto_save: bool = False
) -> None:
    print("Analyzing global symmetry (robust method)...")
    vignette.clear_abstractions('symmetries', auto_save=False)

    all_props = vignette.get_abstractions('structural_properties')
    if not all_props:
        print("   [DEBUG] EXIT: Structural properties not found. Run PCA first.")
        return
    global_props = next((p for p in all_props if p['type'] == 'global'), None)
    if not global_props:
        print("   [DEBUG] EXIT: Global structural properties not found.")
        return

    points = vignette.points
    component_labels = vignette.get_attribute('component_id')
    print(f"   [DEBUG] Initial point count: {len(points)}")
    if component_labels is not None:
        non_noise_mask = component_labels != -1
        points = points[non_noise_mask]
        print(f"   [DEBUG] Non-noise point count: {len(points)}")

    if len(points) < 100:
        print(f"   [DEBUG] EXIT: Not enough points for reliable analysis (found {len(points)}, need 100).")
        return

    point_tree = KDTree(points)
    centroid = np.array(global_props['centroid'])
    axes = np.array(global_props['axes'])

    best_reflection = {'score': float('inf')}
    axis_names = ['Primary', 'Secondary', 'Tertiary']

    print("   --- Testing Reflectional Symmetry ---")
    for i in range(3):
        plane_normal = axes[i]
        print(f"   -> Testing plane normal to '{axis_names[i]}' axis...")
        result = _score_reflection_plane_robust(points, point_tree, plane_normal, centroid, match_threshold)
        print(f"      [DEBUG] Result: Score={result['score']:.4f}, Inlier Ratio={result['inlier_ratio']:.2%}, Mean Error={result['mean_error']:.4f}m")
        
        if result['score'] < best_reflection['score']:
            best_reflection = result
            best_reflection['plane_normal'] = plane_normal.tolist()

    print("   --- Final Decision ---")
    best_inlier_ratio = best_reflection.get('inlier_ratio', 0)
    print(f"   [DEBUG] Best inlier ratio found: {best_inlier_ratio:.2%}. Required minimum: {min_inlier_ratio:.2%}")
    
    if best_inlier_ratio > min_inlier_ratio:
        print(f"   SUCCESS: Found reflectional symmetry that meets the criteria!")
        reflection_result = {
            'type': 'reflectional',
            'score': best_reflection['score'],
            'inlier_ratio': best_reflection['inlier_ratio'],
            'mean_inlier_error': best_reflection['mean_error'],
            'plane_point': centroid.tolist(),
            'plane_normal': best_reflection['plane_normal']
        }
        vignette.add_abstraction('symmetries', reflection_result, auto_save=False)
    else:
        print("   - No reflectional symmetry found that passed the minimum inlier ratio threshold.")

    # (Rotational symmetry part would go here)
    
    print("Finished symmetry analysis.")
    if auto_save and vignette.file_path:
        vignette.save()

In [14]:
analyze_global_symmetry(
    vignette=processed_vignette, 
    match_threshold=0.03, 
    min_inlier_ratio=0.5,
    auto_save=False
)

Analyzing global symmetry (robust method)...
   [DEBUG] Initial point count: 8971
   [DEBUG] Non-noise point count: 8971
   --- Testing Reflectional Symmetry ---
   -> Testing plane normal to 'Primary' axis...
      [DEBUG] Potential inliers found (distance < 0.03m): 3057 / 8971
      [DEBUG] Consistent inliers found (passed backward check): 267 / 3057
      [DEBUG] Result: Score=0.0489, Inlier Ratio=2.98%, Mean Error=0.0015m
   -> Testing plane normal to 'Secondary' axis...
      [DEBUG] Potential inliers found (distance < 0.03m): 8688 / 8971
      [DEBUG] Consistent inliers found (passed backward check): 816 / 8688
      [DEBUG] Result: Score=0.0173, Inlier Ratio=9.10%, Mean Error=0.0016m
   -> Testing plane normal to 'Tertiary' axis...
      [DEBUG] Potential inliers found (distance < 0.03m): 8647 / 8971
      [DEBUG] Consistent inliers found (passed backward check): 1801 / 8647
      [DEBUG] Result: Score=0.0071, Inlier Ratio=20.08%, Mean Error=0.0014m
   --- Final Decision ---
   

In [18]:
# Complex version
import numpy as np
from scipy.spatial import KDTree
from scipy.spatial.transform import Rotation
from typing import Dict, Any, Optional, List

# (Keep _reflect_points and _score_reflection_plane_robust as they are)
# ...

def find_candidate_normals_via_voting(
    points: np.ndarray, 
    num_samples: int = 500,
    num_bins: int = 50
) -> List[np.ndarray]:
    """
    Finds candidate symmetry plane normals using a pairwise voting scheme.

    Args:
        points: Centered point cloud (non-noise points).
        num_samples: The number of random points to sample for generating pairs.
        num_bins: The resolution of the voting space for spherical coordinates.

    Returns:
        A list of promising normal vectors.
    """
    print("      [DEBUG] Starting pairwise voting to find more candidate planes...")
    # A KDTree on the centered points to find partners
    tree = KDTree(points)
    
    # Accumulator for votes (azimuth, elevation)
    accumulator = np.zeros((num_bins, num_bins))

    # Sample a subset of points to generate pairs from
    sample_indices = np.random.choice(len(points), size=min(num_samples, len(points)), replace=False)
    
    for i in sample_indices:
        p_i = points[i]
        # Search for a partner in the opposite direction
        # We query for a few neighbors in the opposite direction to get a good candidate
        _, partner_indices = tree.query(-p_i, k=5)
        
        for j in partner_indices:
            if i == j: continue
            p_j = points[j]
            
            # The normal of the perpendicular bisector is simply the vector connecting the pair
            normal = p_j - p_i
            norm_mag = np.linalg.norm(normal)
            if norm_mag < 1e-6: continue
            normal /= norm_mag
            
            # Convert normal vector to spherical coordinates (azimuth, elevation) for voting
            # Azimuth (phi) -> [0, 2*pi], Elevation (theta) -> [0, pi]
            phi = np.arctan2(normal[1], normal[0]) + np.pi # range [0, 2pi]
            theta = np.arccos(normal[2]) # range [0, pi]
            
            # Map to bin indices
            phi_idx = int((phi / (2 * np.pi)) * (num_bins - 1))
            theta_idx = int((theta / np.pi) * (num_bins - 1))
            
            accumulator[theta_idx, phi_idx] += 1
            
    # Find the peak in the accumulator
    # For more robustness, one could use peak-finding algorithms, but max is a good start.
    # We will find the top 3 peaks for more candidates.
    flat_indices = np.argsort(accumulator.flatten())[-3:] # Get top 3
    
    candidate_normals = []
    print(f"      [DEBUG] Top accumulator votes: {[accumulator.flatten()[i] for i in flat_indices]}")
    for flat_idx in flat_indices:
        theta_idx, phi_idx = np.unravel_index(flat_idx, (num_bins, num_bins))
        
        # Convert bin index back to spherical coordinates
        phi = (phi_idx / (num_bins - 1)) * 2 * np.pi - np.pi
        theta = (theta_idx / (num_bins - 1)) * np.pi
        
        # Convert back to Cartesian normal vector
        nx = np.cos(phi) * np.sin(theta)
        ny = np.sin(phi) * np.sin(theta)
        nz = np.cos(theta)
        candidate_normals.append(np.array([nx, ny, nz]))
        
    return candidate_normals

def analyze_global_symmetry(
    vignette: 'ProcessedVignette', 
    match_threshold: float = 0.02, 
    min_inlier_ratio: float = 0.5,
    use_voting: bool = True,
    auto_save: bool = False
) -> None:
    """
    Analyzes global symmetry with a robust method for partial point clouds.
    Combines PCA and pairwise voting to find candidate planes.
    """
    print("Analyzing global symmetry (robust method)...")
    vignette.clear_abstractions('symmetries', auto_save=False)

    # --- REORGANIZED & FIXED: Data Preparation and Sanity Checks ---
    # 1. Check for and retrieve PCA results (structural properties).
    all_props = vignette.get_abstractions('structural_properties')
    if not all_props:
        print("   [DEBUG] EXIT: Structural properties not found. Run PCA first.")
        return
        
    global_props = next((p for p in all_props if p['type'] == 'global'), None)
    if not global_props:
        print("   [DEBUG] EXIT: Global structural properties not found.")
        return

    # 2. Prepare the point cloud, filtering out noise.
    points = vignette.points
    component_labels = vignette.get_attribute('component_id')
    print(f"   [DEBUG] Initial point count: {len(points)}")
    if component_labels is not None:
        non_noise_mask = component_labels != -1
        points = points[non_noise_mask]
        print(f"   [DEBUG] Non-noise point count: {len(points)}")

    if len(points) < 100:
        print(f"   [DEBUG] EXIT: Not enough points for reliable analysis (found {len(points)}, need 100).")
        return

    # 3. Now that all checks have passed, define key variables.
    # The centroid from PCA is our reference point for the plane's position.
    centroid = np.array(global_props['centroid'])
    # The point cloud centered around this specific centroid is used for voting.
    centered_points = points - centroid
    # The KDTree for scoring uses the original (non-centered) coordinates.
    point_tree = KDTree(points)
    
    # --- Candidate Generation ---
    candidate_normals = []
    # Tier 1: Add the high-confidence PCA axes.
    pca_axes = np.array(global_props['axes'])
    candidate_normals.extend(pca_axes)
    
    # Tier 2: Add candidates from pairwise voting.
    if use_voting:
        voted_normals = find_candidate_normals_via_voting(centered_points)
        candidate_normals.extend(voted_normals)
    
    # Remove near-duplicate normals before testing.
    # This avoids re-testing very similar planes.
    unique_normals_set = {tuple(row) for row in np.round(candidate_normals, 2)}
    unique_normals = [np.array(t) for t in unique_normals_set]
    print(f"   - Testing a total of {len(unique_normals)} unique candidate planes.")

    # --- Testing and Scoring ---
    best_reflection = {'score': float('inf')}
    print("   --- Testing Reflectional Symmetry ---")
    for i, normal in enumerate(unique_normals):
        print(f"   -> Testing candidate plane #{i+1}...")
        result = _score_reflection_plane_robust(points, point_tree, normal, centroid, match_threshold)
        
        # This check is now inside the loop for clarity.
        if result is not None and 'score' in result:
             print(f"      [DEBUG] Result: Score={result['score']:.4f}, Inlier Ratio={result['inlier_ratio']:.2%}, Mean Error={result['mean_error']:.4f}m")
             if result['score'] < best_reflection['score']:
                best_reflection = result
                best_reflection['plane_normal'] = normal.tolist()

    # --- Final Decision ---
    print("   --- Final Decision ---")
    best_inlier_ratio = best_reflection.get('inlier_ratio', 0)
    print(f"   [DEBUG] Best inlier ratio found: {best_inlier_ratio:.2%}. Required minimum: {min_inlier_ratio:.2%}")
    
    if best_inlier_ratio > min_inlier_ratio:
        print(f"   SUCCESS: Found reflectional symmetry that meets the criteria!")
        reflection_result = {
            'type': 'reflectional',
            'score': best_reflection['score'],
            'inlier_ratio': best_reflection['inlier_ratio'],
            'mean_inlier_error': best_reflection['mean_error'],
            'plane_point': centroid.tolist(),
            'plane_normal': best_reflection['plane_normal']
        }
        vignette.add_abstraction('symmetries', reflection_result, auto_save=False)
    else:
        print("   - No reflectional symmetry found that passed the minimum inlier ratio threshold.")
    
    print("Finished symmetry analysis.")
    if auto_save and vignette.file_path:
        vignette.save()


In [19]:
analyze_global_symmetry(
    vignette=processed_vignette, 
    match_threshold=0.03, 
    min_inlier_ratio=0.5,
    use_voting=True,
    auto_save=False
)

Analyzing global symmetry (robust method)...
   [DEBUG] Initial point count: 8971
   [DEBUG] Non-noise point count: 8971
      [DEBUG] Starting pairwise voting to find more candidate planes...
      [DEBUG] Top accumulator votes: [np.float64(41.0), np.float64(42.0), np.float64(52.0)]
   - Testing a total of 6 unique candidate planes.
   --- Testing Reflectional Symmetry ---
   -> Testing candidate plane #1...
      [DEBUG] Potential inliers found (distance < 0.03m): 3057 / 8971
      [DEBUG] Consistent inliers found (passed backward check): 269 / 3057
      [DEBUG] Result: Score=0.0482, Inlier Ratio=3.00%, Mean Error=0.0014m
   -> Testing candidate plane #2...
      [DEBUG] Potential inliers found (distance < 0.03m): 6887 / 8971
      [DEBUG] Consistent inliers found (passed backward check): 1243 / 6887
      [DEBUG] Result: Score=0.0144, Inlier Ratio=13.86%, Mean Error=0.0020m
   -> Testing candidate plane #3...
      [DEBUG] Potential inliers found (distance < 0.03m): 7295 / 8971
   

## Inter Primitive Relations ##

In [26]:
import numpy as np
import itertools
from typing import Dict, Any, Optional

# This assumes your ProcessedVignette class is defined elsewhere and has
# the methods get_abstractions and add_abstraction.

def analyze_primitive_relations(
    vignette: 'ProcessedVignette',
    angle_tolerance_deg: float = 5.0,
    distance_tolerance_m: float = 0.01,
    auto_save: bool = False
) -> None:
    """
    Analyzes and stores geometric relationships between primitives in a vignette.
    
    This version supports:
    - Plane-Plane: Parallelism, Perpendicularity, Co-planarity, Distance.
    - Cylinder-Cylinder: Parallelism, Perpendicularity.
    
    (Includes detailed debug logging).
    """
    print("Analyzing primitive relationships...")
    vignette.clear_abstractions('primitive_relations', auto_save=False)
    
    # --- Calculate tolerance values from degrees ---
    # cos(angle) for parallel check. Angle is close to 0 or 180.
    parallel_dot_threshold = np.cos(np.deg2rad(angle_tolerance_deg))
    # cos(angle) for perpendicular check. Angle is close to 90.
    perp_dot_threshold = np.cos(np.deg2rad(90 - angle_tolerance_deg))
    
    print(f"   [DEBUG] Angle tolerance: {angle_tolerance_deg}°")
    print(f"   [DEBUG] Parallel check: dot product must be > {parallel_dot_threshold:.4f}")
    print(f"   [DEBUG] Perpendicular check: dot product must be < {perp_dot_threshold:.4f}")

    # --- Retrieve all primitives at the start ---
    planes = vignette.get_abstractions('planes') or []
    cylinders = vignette.get_abstractions('cylinders') or []

    # --- 1. Analyze Plane-Plane Relationships ---
    if len(planes) < 2:
        print("\n   - Skipping plane-plane analysis (less than 2 planes found).")
    else:
        print(f"\n   --- Analyzing {len(planes)} Plane-Plane pairs ---")
        for plane1, plane2 in itertools.combinations(planes, 2):
            id1 = plane1['plane_id']
            id2 = plane2['plane_id']
            
            print(f"\n   -> Comparing plane_{id1} and plane_{id2}...")

            # Extract and normalize normal vectors
            n1 = np.array(plane1['equation'][:3]); n1 /= np.linalg.norm(n1)
            n2 = np.array(plane2['equation'][:3]); n2 /= np.linalg.norm(n2)
            dot_product = np.abs(np.dot(n1, n2))
            
            print(f"      [DEBUG] Normalized dot product: {dot_product:.4f}")

            # Check for Parallelism
            if dot_product > parallel_dot_threshold:
                print(f"      [DEBUG] Test PASS: {dot_product:.4f} > {parallel_dot_threshold:.4f} (Parallel)")
                
                # Find a point on plane 1 to calculate distance
                eq1 = plane1['equation']
                if abs(eq1[2]) > 1e-6: p1 = np.array([0, 0, -eq1[3]/eq1[2]])
                elif abs(eq1[1]) > 1e-6: p1 = np.array([0, -eq1[3]/eq1[1], 0])
                else: p1 = np.array([-eq1[3]/eq1[0], 0, 0])
                
                eq2 = plane2['equation']
                distance = np.abs(np.dot(p1, eq2[:3]) + eq2[3]) / np.linalg.norm(eq2[:3])
                print(f"      [DEBUG] Distance between planes: {distance:.4f}m")

                relation_type = "co-planar" if distance < distance_tolerance_m else "parallel"
                print(f"      -> SUCCESS: Found '{relation_type.upper()}' relationship.")
                
                relation = {
                    'type': relation_type,
                    'primitives': [f'plane_{id1}', f'plane_{id2}'],
                    'angle_diff_deg': np.rad2deg(np.arccos(dot_product)),
                    'distance_m': distance if relation_type == 'parallel' else 0.0
                }
                vignette.add_abstraction('primitive_relations', relation, auto_save=False)

            # Check for Perpendicularity
            elif dot_product < perp_dot_threshold:
                print(f"      [DEBUG] Test PASS: {dot_product:.4f} < {perp_dot_threshold:.4f} (Perpendicular)")
                print(f"      -> SUCCESS: Found 'PERPENDICULAR' relationship.")
                relation = {
                    'type': 'perpendicular',
                    'primitives': [f'plane_{id1}', f'plane_{id2}'],
                    'angle_diff_deg': 90.0 - np.rad2deg(np.arcsin(dot_product))
                }
                vignette.add_abstraction('primitive_relations', relation, auto_save=False)
            
            else:
                 print(f"      [DEBUG] Test FAIL: No significant relationship found.")

    # --- 2. Analyze Cylinder-Cylinder Relationships ---
    if len(cylinders) < 2:
        print("\n   - Skipping cylinder-cylinder analysis (less than 2 cylinders found).")
    else:
        print(f"\n   --- Analyzing {len(cylinders)} Cylinder-Cylinder pairs ---")
        for cyl1, cyl2 in itertools.combinations(cylinders, 2):
            id1 = cyl1['cylinder_id']
            id2 = cyl2['cylinder_id']
            print(f"\n   -> Comparing cylinder_{id1} and cylinder_{id2}...")
            
            a1 = np.array(cyl1['axis']); a1 /= np.linalg.norm(a1)
            a2 = np.array(cyl2['axis']); a2 /= np.linalg.norm(a2)
            dot_product = np.abs(np.dot(a1, a2))
            print(f"      [DEBUG] Normalized dot product: {dot_product:.4f}")
            
            if dot_product > parallel_dot_threshold:
                print(f"      -> SUCCESS: Found 'PARALLEL' relationship.")
                relation = {'type': 'parallel', 'primitives': [f'cylinder_{id1}', f'cylinder_{id2}']}
                vignette.add_abstraction('primitive_relations', relation, auto_save=False)
            elif dot_product < perp_dot_threshold:
                print(f"      -> SUCCESS: Found 'PERPENDICULAR' relationship.")
                relation = {'type': 'perpendicular', 'primitives': [f'cylinder_{id1}', f'cylinder_{id2}']}
                vignette.add_abstraction('primitive_relations', relation, auto_save=False)
            else:
                print(f"      [DEBUG] Test FAIL: No significant relationship found.")
                
    if auto_save and vignette.file_path:
        vignette.save()
    print("\nFinished analyzing primitive relationships.")

In [27]:
analyze_primitive_relations(
    vignette=processed_vignette,
    angle_tolerance_deg=8.0,
    distance_tolerance_m=0.002,
    auto_save=False
)

Analyzing primitive relationships...
Cleared 3 abstractions of type 'primitive_relations'.
   [DEBUG] Angle tolerance: 8.0°
   [DEBUG] Parallel check: dot product must be > 0.9903
   [DEBUG] Perpendicular check: dot product must be < 0.1392

   --- Analyzing 7 Plane-Plane pairs ---

   -> Comparing plane_1 and plane_2...
      [DEBUG] Normalized dot product: 0.4833
      [DEBUG] Test FAIL: No significant relationship found.

   -> Comparing plane_1 and plane_3...
      [DEBUG] Normalized dot product: 0.4043
      [DEBUG] Test FAIL: No significant relationship found.

   -> Comparing plane_1 and plane_4...
      [DEBUG] Normalized dot product: 0.4607
      [DEBUG] Test FAIL: No significant relationship found.

   -> Comparing plane_1 and plane_5...
      [DEBUG] Normalized dot product: 0.2775
      [DEBUG] Test FAIL: No significant relationship found.

   -> Comparing plane_1 and plane_6...
      [DEBUG] Normalized dot product: 0.1408
      [DEBUG] Test FAIL: No significant relationship

## Point-wise features ##

In [3]:
import open3d as o3d
import numpy as np
from typing import Dict, Any, Optional

def analyze_local_features(
    vignette: 'ProcessedVignette',
    search_radius: float = 0.05,
    max_neighbors: int = 30,
    auto_save: bool = False
) -> None:
    """
    Analyzes per-point local geometric features like normals, curvature, and edgeness.

    This version manually computes covariance for robustness across Open3D versions.

    Args:
        vignette: The vignette to analyze. It will be modified in-place.
        search_radius: The radius (in meters) to search for neighbors.
        max_neighbors: The maximum number of neighbors to consider in the radius search.
        auto_save: If True, saves the vignette after modification.
    """
    print("Analyzing local geometric features (normals, curvature, etc.)...")
    
    # 1. Convert to Open3D PointCloud and estimate normals (this part is fine)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(vignette.points)
    
    search_param = o3d.geometry.KDTreeSearchParamHybrid(radius=search_radius, max_nn=max_neighbors)
    print(f"   - Estimating normals with radius={search_radius}m and max_neighbors={max_neighbors}...")
    pcd.estimate_normals(search_param=search_param)
    pcd.orient_normals_consistent_tangent_plane(k=max_neighbors)
    
    vignette.normals = np.asarray(pcd.normals)
    print("   - Stored surface normals.")

    # --- CORRECTED SECTION: Manual Covariance, Curvature, and Edgeness Calculation ---
    print("   - Estimating curvature and edgeness...")
    
    # 2. Build a K-D Tree for efficient neighbor searches
    pcd_tree = o3d.geometry.KDTreeFlann(pcd)
    
    # Initialize arrays to hold the new per-point data
    curvatures = np.zeros(vignette.n_points)
    edgeness = np.zeros(vignette.n_points)
    
    # 3. Loop through each point to analyze its neighborhood
    points = np.asarray(pcd.points)
    for i in range(vignette.n_points):
        # Find neighbors of the i-th point
        [k, idx, _] = pcd_tree.search_radius_vector_3d(points[i], search_radius)
        
        # We need at least 4 points (the point itself + 3 others) to compute a meaningful covariance
        if k < 4:
            continue

        # Get the actual 3D coordinates of the neighboring points
        neighbors = points[idx, :]
        
        # Compute the 3x3 covariance matrix for the neighborhood
        # np.cov expects variables as rows, so we need to transpose the (k, 3) matrix to (3, k)
        cov_matrix = np.cov(neighbors.T)
        
        # Eigen-decomposition of the covariance matrix
        eigenvalues, _ = np.linalg.eigh(cov_matrix)
        
        # Sort eigenvalues: l1 >= l2 >= l3
        l1, l2, l3 = eigenvalues[2], eigenvalues[1], eigenvalues[0]
        
        sum_eigenvalues = l1 + l2 + l3
        if sum_eigenvalues < 1e-9:
            continue

        # Curvature calculation
        curvatures[i] = l3 / sum_eigenvalues
        
        # Edgeness/Linearity calculation
        if l1 > 1e-9:
            edgeness[i] = (l1 - l2) / l1

    # 4. Store the new attributes in the vignette
    vignette.set_attribute('curvature', curvatures, auto_save=False)
    vignette.set_attribute('edgeness', edgeness, auto_save=False)
    
    if auto_save and vignette.file_path:
        vignette.save()
        
    print("Finished analyzing local features.")

In [4]:
analyze_local_features(
    processed_vignette, 
    search_radius=0.03, 
    auto_save=True
)

NameError: name 'processed_vignette' is not defined

## Point-Wise Features From 3D Points ##

This should be a more recent version

In [5]:
processed_vignette_path = results_path / "processed_vignette.npz"
from pipeline.vignette_data import ProcessedVignette
processed_vignette = ProcessedVignette.load(processed_vignette_path)

Set/updated per-point attribute: 'confidence'.
Set/updated per-point attribute: 'plane_id'.
Set/updated per-point attribute: 'cylinder_id'.
Set/updated per-point attribute: 'sphere_id'.
Set/updated per-point attribute: 'cuboid_id'.
Set/updated per-point attribute: 'component_id'.
Set/updated per-point attribute: 'best_fit_id'.
Set/updated per-point attribute: 'curvature'.
Set/updated per-point attribute: 'edgeness'.
Set/updated per-point attribute: 'anisotropy'.
Set/updated per-point attribute: 'planarity'.
Set/updated per-point attribute: 'sphericity'.
Set/updated per-point attribute: 'flow_vectors'.
Loaded processed vignette from: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/results/processed_vignette.npz


In [6]:
def analyze_local_features(
    vignette: 'ProcessedVignette',
    search_radius: float = 0.05,
    max_neighbors: int = 30,
    auto_save: bool = False
) -> None:
    # --- (Same setup as before) ---
    print("Analyzing rich local geometric features...")
    pcd = o3d.geometry.PointCloud(); pcd.points = o3d.utility.Vector3dVector(vignette.points)
    search_param = o3d.geometry.KDTreeSearchParamHybrid(radius=search_radius, max_nn=max_neighbors)
    pcd.estimate_normals(search_param=search_param)
    pcd.orient_normals_consistent_tangent_plane(k=max_neighbors)
    vignette.normals = np.asarray(pcd.normals)
    
    print("   - Estimating per-point geometric features...")
    pcd_tree = o3d.geometry.KDTreeFlann(pcd)
    points = np.asarray(pcd.points)
    n_points = vignette.n_points
    
    # Initialize arrays
    curvatures, edgeness, anisotropy, planarity, sphericity = (np.zeros(n_points) for _ in range(5))
    flow_vectors = np.zeros((n_points, 3)) # NEW: Array to store the flow vectors
    
    for i in range(n_points):
        [k, idx, _] = pcd_tree.search_radius_vector_3d(points[i], search_radius)
        if k < 5: continue
        
        neighbors = points[idx, :]
        cov_matrix = np.cov(neighbors.T)
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        
        # Sort eigenvalues and corresponding eigenvectors
        sort_indices = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[sort_indices]
        eigenvectors = eigenvectors[:, sort_indices]
        
        l1, l2, l3 = eigenvalues
        v1, _, _ = eigenvectors.T

        # --- NEW: Store the primary eigenvector as the flow vector ---
        flow_vectors[i] = v1

        # (Rest of the calculations for scalar features are the same)
        # ...
        sum_eigenvalues = l1 + l2 + l3
        if sum_eigenvalues < 1e-9: continue
        curvatures[i] = l3 / sum_eigenvalues
        if l1 > 1e-9:
            edgeness[i] = (l1 - l2) / l1
            anisotropy[i] = (l1 - l3) / l1
            planarity[i] = (l2 - l3) / l1
            sphericity[i] = l3 / l1
            
    # Store all attributes
    vignette.set_attribute('curvature', curvatures, auto_save=False)
    vignette.set_attribute('edgeness', edgeness, auto_save=False)
    vignette.set_attribute('anisotropy', anisotropy, auto_save=False)
    vignette.set_attribute('planarity', planarity, auto_save=False)
    vignette.set_attribute('sphericity', sphericity, auto_save=False)
    vignette.set_attribute('flow_vectors', flow_vectors, auto_save=False) # NEW
    
    if auto_save and vignette.file_path: vignette.save()
    print("Finished analyzing local features.")

In [7]:
analyze_local_features(
    processed_vignette, 
    search_radius=0.03, 
    max_neighbors=30,
    auto_save=True
)

Analyzing rich local geometric features...
   - Estimating per-point geometric features...
Set/updated per-point attribute: 'curvature'.
Set/updated per-point attribute: 'edgeness'.
Set/updated per-point attribute: 'anisotropy'.
Set/updated per-point attribute: 'planarity'.
Set/updated per-point attribute: 'sphericity'.
Set/updated per-point attribute: 'flow_vectors'.
Saved processed vignette to: /Users/yuzhenzhang/Documents/Research/TestApp/SpatialVignetteServer/test_data/capture2/results/processed_vignette.npz
Finished analyzing local features.


In [8]:
import open3d as o3d

# Assume 'processed_vignette' is loaded and has had analyze_local_features() run on it.

# 1. Visualize Curvature
# Bright yellow areas = sharp corners, bends, and noisy points.
# Deep blue areas = flat surfaces.
print("Visualizing Curvature...")
o3d.visualization.draw_geometries([processed_vignette.to_open3d(color_mode='curvature')])

Visualizing Curvature...
Generating Open3D point cloud with 'curvature' colors...


In [9]:
# 2. Visualize Edgeness (Linearity)
# Bright yellow areas = sharp creases, like the edge of a box or a fold.
print("Visualizing Edgeness...")
o3d.visualization.draw_geometries([processed_vignette.to_open3d(color_mode='edgeness')])

Visualizing Edgeness...
Generating Open3D point cloud with 'edgeness' colors...


In [10]:
# 3. Visualize Anisotropy
# Bright yellow areas = surfaces with a strong directional "flow" or "grain", like a cylinder.
print("Visualizing Anisotropy...")
o3d.visualization.draw_geometries([processed_vignette.to_open3d(color_mode='anisotropy')])

Visualizing Anisotropy...
Generating Open3D point cloud with 'anisotropy' colors...


In [11]:
# 4. Visualize Planarity
# Bright yellow areas = flat planes. This is great for confirming the results of RANSAC.
print("Visualizing Planarity...")
o3d.visualization.draw_geometries([processed_vignette.to_open3d(color_mode='planarity')])

Visualizing Planarity...
Generating Open3D point cloud with 'planarity' colors...


In [6]:
# 5. Visualize Sphericity
# Bright yellow areas = complex, non-directional geometry or noisy point clusters.
print("Visualizing Sphericity...")
o3d.visualization.draw_geometries([processed_vignette.to_open3d(color_mode='sphericity')])

Visualizing Sphericity...
Generating Open3D point cloud with 'sphericity' colors...


In [12]:
def visualize_vector_attribute(
    vignette: 'ProcessedVignette',
    attribute_name: str,
    step: int = 20,
    scale: float = 0.05
):
    """
    Visualizes a per-point vector attribute (e.g., normals) as lines.

    Args:
        vignette: The processed vignette containing the attribute.
        attribute_name: The name of the vector attribute to draw (e.g., 'normals', 'flow_vectors').
        step: Draws a vector for every 'step' points to avoid clutter. 1 = all points.
        scale: The visual length of the lines drawn in the plot.
    """
    print(f"Visualizing vector attribute: '{attribute_name}'...")
    
    pcd = vignette.to_open3d() # Get the base point cloud
    vectors = vignette.get_attribute(attribute_name)
    
    if vectors is None:
        print(f"Error: Attribute '{attribute_name}' not found in vignette.")
        return
        
    # Subsample points to avoid a cluttered visualization
    points_subset = vignette.points[::step]
    vectors_subset = vectors[::step]
    
    # Create LineSet geometry
    lines = []
    points_for_lines = []
    for i in range(len(points_subset)):
        p = points_subset[i]
        vec = vectors_subset[i]
        
        # The line starts at the point and ends at point + scaled vector
        start_point = p
        end_point = p + vec * scale
        
        # Add the two points and the line connecting them
        points_for_lines.append(start_point)
        points_for_lines.append(end_point)
        lines.append([2*i, 2*i + 1])
        
    line_set = o3d.geometry.LineSet(
        points=o3d.utility.Vector3dVector(points_for_lines),
        lines=o3d.utility.Vector2iVector(lines),
    )
    # Optional: Color the lines
    line_set.paint_uniform_color([1.0, 0.0, 0.0]) # Red lines
    
    # Draw the original point cloud AND the lines on top
    o3d.visualization.draw_geometries([pcd, line_set])

# --- Example Usage ---

# 1. First, make sure you've run the updated analysis function
# analyze_local_features(processed_vignette, auto_save=True)



In [None]:
visualize_vector_attribute(processed_vignette, 'flow_vectors', step=1, scale=0.01)

Visualizing vector attribute: 'flow_vectors'...
Generating Open3D point cloud with 'rgb' colors...
