In [84]:
# Data Visualization

import numpy as np
import open3d as o3d

frame = []

# Load the numpy file
tf_0 = np.load(r'PCD_Data/view00_tf.npy')
tf_1 = np.load(r'PCD_Data/view01_tf.npy')
tf_2 = np.load(r'PCD_Data/view02_tf.npy')
tf_3 = np.load(r'PCD_Data/view03_tf.npy')
tf_4 = np.load(r'PCD_Data/view04_tf.npy')
tf_5 = np.load(r'PCD_Data/view05_tf.npy')
tf_6 = np.load(r'PCD_Data/view06_tf.npy')
tf_7 = np.load(r'PCD_Data/view07_tf.npy')
tf_8 = np.load(r'PCD_Data/view08_tf.npy')
tf_obj = np.load(r'PCD_Data/initial_obj_pose.npy')

# Load pcd files
pcd_0 = o3d.io.read_point_cloud(r"PCD_Data/view00.pcd")
pcd_1 = o3d.io.read_point_cloud(r"PCD_Data/view01.pcd")
pcd_2 = o3d.io.read_point_cloud(r"PCD_Data/view02.pcd")
pcd_3 = o3d.io.read_point_cloud(r"PCD_Data/view03.pcd")
pcd_4 = o3d.io.read_point_cloud(r"PCD_Data/view04.pcd")

world_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0, origin=[0, 0, 0])
frame.append(world_frame)


tf_0_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=50.0)
tf_0_frame.transform(tf_0)
tf_1_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_1_frame.transform(tf_1)
tf_2_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_2_frame.transform(tf_2)
tf_3_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_3_frame.transform(tf_3)
tf_4_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_4_frame.transform(tf_4)
tf_5_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_5_frame.transform(tf_5)
tf_6_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_6_frame.transform(tf_6)
tf_7_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_7_frame.transform(tf_7)
tf_8_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0)
tf_8_frame.transform(tf_8)
frame.append(tf_0_frame)
frame.append(tf_1_frame)
frame.append(tf_2_frame)
frame.append(tf_3_frame)
frame.append(tf_4_frame)
frame.append(tf_5_frame)
frame.append(tf_6_frame)
frame.append(tf_7_frame)
frame.append(tf_8_frame)

object_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=100.0, origin=tf_obj[:3])
frame.append(object_frame)


o3d.visualization.draw_geometries(frame + [pcd_2])
# o3d.visualization.draw_geometries(frame + [pcd_0, pcd_1, pcd_2, pcd_3, pcd_4])



In [85]:
import plotly.graph_objects as go
import numpy as np

# --- Parameters ---
SCAN_HEIGHT = 500.0
VIEWPOINTS = 8
DESIRED_ANGLE_DEG = 75.0
obj_base_pos = np.array([0, 0, 0])
scan_radius = SCAN_HEIGHT * np.tan(np.radians(90.0 - DESIRED_ANGLE_DEG))

# --- Generate Path ---
angles = np.linspace(0, 2*np.pi, VIEWPOINTS, endpoint=False)
tx = obj_base_pos[0] + scan_radius * np.cos(angles)
ty = obj_base_pos[1] + scan_radius * np.sin(angles)
tz = np.full_like(tx, obj_base_pos[2] + SCAN_HEIGHT)

# --- Plotly Visualization ---
fig = go.Figure()

# 1. Plot Viewpoints
fig.add_trace(go.Scatter3d(x=tx, y=ty, z=tz, mode='markers+lines',
                         marker=dict(size=8, color='blue'),
                         line=dict(color='blue', dash='dash'),
                         name='Camera Path'))

# 2. Plot Object Center
fig.add_trace(go.Scatter3d(x=[obj_base_pos[0]], y=[obj_base_pos[1]], z=[obj_base_pos[2]],
                         marker=dict(size=10, color='red'),
                         name='Object Center'))

# 3. Draw "Look-at" Vectors
for i in range(VIEWPOINTS):
    fig.add_trace(go.Scatter3d(x=[tx[i], obj_base_pos[0]], 
                               y=[ty[i], obj_base_pos[1]], 
                               z=[tz[i], obj_base_pos[2]],
                               mode='lines', line=dict(color='green', width=2),
                               showlegend=False))

fig.update_layout(scene=dict(aspectmode='data',
                  xaxis_title='X (mm)', yaxis_title='Y (mm)', zaxis_title='Z (mm)'),
                  title=f"Interactive Scan Path ({DESIRED_ANGLE_DEG}° Angle)")
fig.show()

In [78]:
# Helper Functions

import open3d as o3d
import numpy as np
import math
import copy
import os


def get_rotation_matrix_z(deg):
    """Creates a 3x3 rotation matrix for the Z-axis."""
    rad = math.radians(deg)
    c, s = math.cos(rad), math.sin(rad)
    return np.array([[c, -s, 0], 
                     [s,  c, 0], 
                     [0,  0, 1]])


def merge_multiview_scan(data_dir, initial_pos, initial_angle, box_size):
    """Merges multiple view PCDs and removes points below the detected plane."""
    pcd_files = sorted([f for f in os.listdir(data_dir) if f.endswith('.pcd') and "view0" in f])
    merged_pcd = o3d.geometry.PointCloud()
    
    obb = o3d.geometry.OrientedBoundingBox(
        center=np.array(initial_pos), 
        R=get_rotation_matrix_z(-initial_angle), 
        extent=np.array(box_size)
    )

    for file_name in pcd_files:
        pcd = o3d.io.read_point_cloud(os.path.join(data_dir, file_name))
   
        # 1. Detect the plane
        plane_model, inliers = pcd.segment_plane(distance_threshold=3.0, ransac_n=3, num_iterations=2000)
        [a, b, c, d] = plane_model

        # 2. Extract all points as a numpy array
        pts = np.asarray(pcd.points)

        # 3. Calculate distance to plane for every point: ax + by + cz + d
        # Points on the plane result in 0. Points above are positive, below are negative.
        distances = a * pts[:, 0] + b * pts[:, 1] + c * pts[:, 2] + d
        
        # 4. Create an index of points that are ABOVE the plane
        # A small offset (e.g., 0.5mm) to avoid keeping table noise
        above_plane_indices = np.where(distances > 0.5)[0]
        pcd = pcd.select_by_index(above_plane_indices)

        # 5. Crop to the OBB and merge
        merged_pcd += pcd.crop(obb)

    return merged_pcd


def preprocess_point_cloud(pcd, num_points):
    """Downsamples, estimates normals, and computes FPFH features."""
    pcd_down = pcd.farthest_point_down_sample(num_points)
    avg_dist = np.mean(pcd_down.compute_nearest_neighbor_distance())
    
    # Estimate Normals
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=avg_dist * 2, max_nn=30))
    
    # Orientation fix: Ensure normals point 'up' (positive Z)
    normals = np.asarray(pcd_down.normals)
    for i in range(len(normals)):
        if normals[i][2] < 0:
            normals[i] *= -1

    # Compute FPFH (Feature descriptors for global matching)
    fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down, o3d.geometry.KDTreeSearchParamHybrid(radius=avg_dist * 5, max_nn=100))
    
    return pcd_down, fpfh


def run_global_registration(source, target, source_fpfh, target_fpfh, voxel_size):
    """
    Performs RANSAC-based global registration to find a rough alignment.
    """
    distance_threshold = voxel_size * 1.5
    
    # RANSAC based on feature matching
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source, target, source_fpfh, target_fpfh, 
        mutual_filter=True,
        max_correspondence_distance=distance_threshold,
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        ransac_n=3, 
        checkers=[
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(distance_threshold)
        ], 
        criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(4000000, 500)
    )
    return result

def run_global_registration_adaptive(source_down, target_down, source_fpfh, target_fpfh):
    """
    Performs iterative RANSAC-based global registration to find the best rough alignment.
    Matches the logic of testing multiple thresholds to find the highest fitness and lowest RMSE.
    """
    max_attempts = 5
    best_fitness = -0.1
    best_inlier_rmse = 100.0
    best_result = None
    best_threshold = None 

    # Thresholds to test, from coarse (10.0) to fine (3.0)
    thresholds = [10.0, 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0]
    # thresholds = [7]

    for attempt in range(max_attempts):
        for thr in thresholds:            
            result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
                source_down, target_down, source_fpfh, target_fpfh, 
                mutual_filter=True, # Improved matching
                max_correspondence_distance=thr,
                estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
                ransac_n=3, 
                checkers=[
                    o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.85),
                    o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(thr)
                ], 
                criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(10000, 0.99)
            )

            # Update best result if fitness is good and RMSE is lower
            if result.fitness > 0.85 and result.inlier_rmse < best_inlier_rmse:
                best_fitness = result.fitness
                best_inlier_rmse = result.inlier_rmse
                best_result = result
                best_threshold = thr
            
            # Early exit if we find a very high-quality match
            if best_fitness > 0.95 and best_inlier_rmse < 2.35: 
                print(f"Excellent Global Fit Found at Threshold {best_threshold}")
                return best_result, best_threshold
            
    if best_result is None:
        print("Warning: RANSAC could not find a fit above 0.85 fitness.")
        # Fallback to the last result generated if nothing met the 0.85 criteria
        return result, thr

    print(f"RANSAC Finished. Best Threshold: {best_threshold} | Fitness: {best_fitness:.4f}")
    return best_result, best_threshold


# One time local refinement using ICP
def run_local_refinement(source, target, initial_transformation, voxel_size):
    """
    Performs ICP registration to refine the alignment found by RANSAC.
    """
    # We use a smaller threshold for ICP to ensure high precision
    distance_threshold = voxel_size * 0.4
    
    result = o3d.pipelines.registration.registration_icp(
        source, target, distance_threshold, initial_transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(),
        o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=2000)
    )
    return result


# Progressive Local Refinement using ICP
def run_local_refinement_adaptive(source, target, initial_trans, best_ransac_thr):
    """
    Refines alignment using an iterative ICP loop. 
    Starts with a coarse threshold and progressively tightens for precision.
    """
    # Define a range of multipliers to tighten the search radius
    # multipliers = [1.0, 0.8, 0.5, 0.25]
    multipliers = [1.0, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1]
    # multipliers = [0.6]
    thresholds = [best_ransac_thr * m for m in multipliers]
    
    best_result = None
    best_inlier_rmse = float('inf') # Start with the highest possible error
    
    print(f"{'Threshold':<12} | {'Fitness':<12} | {'RMSE':<12}")
    print("-" * 45)

    for thr in thresholds:
        # Execute Point-to-Point ICP
        reg_icp = o3d.pipelines.registration.registration_icp(
            source, target, thr, initial_trans,
            o3d.pipelines.registration.TransformationEstimationPointToPoint(),
            o3d.pipelines.registration.ICPConvergenceCriteria(max_iteration=1000)
        )

        # Logging the current iteration results
        print(f"{thr:<12.2f} | {reg_icp.fitness:<12.4f} | {reg_icp.inlier_rmse:<12.4f}")

        # Selection Logic: Prioritize lowest RMSE (highest precision) 
        # as long as the fitness is acceptable (e.g., > 85%)
        if reg_icp.fitness > 0.85 and reg_icp.inlier_rmse < best_inlier_rmse:
            best_inlier_rmse = reg_icp.inlier_rmse
            best_result = reg_icp

    # Fallback: if no run met the 85% fitness, return the last result
    return best_result if best_result is not None else reg_icp

In [82]:
# Main Processing Pipeline

# --- 1. Configuration ---
DATA_DIR = "PCD_Data"
SOURCE_PATH = "workpiece/z_bracket_10000.pcd" #  CAD model

tf_obj = np.load(r'PCD_Data/initial_obj_pose.npy')
YOLO_POS = tf_obj[:3]                                   # Initial guess from YOLO
YOLO_ANGLE = tf_obj[4]                                  # Initial angle from YOLO
CROP_BOX = [100, 100, 80]                               # Region of interest size


# --- 2. Data Preparation ---
print("Step 1: Merging and cleaning scans...")
source_cloud = o3d.io.read_point_cloud(SOURCE_PATH)
target_cloud = merge_multiview_scan(DATA_DIR, YOLO_POS, YOLO_ANGLE, CROP_BOX)
o3d.visualization.draw_geometries([target_cloud], window_name="Merged Target Cloud")

# Get downsampled versions and features for RANSAC
source_down, source_fpfh = preprocess_point_cloud(source_cloud, 10000)
target_down, target_fpfh = preprocess_point_cloud(target_cloud, 10000)
o3d.visualization.draw_geometries([target_down], window_name="Downsampled Target Cloud")


# --- 3. Global Alignment (RANSAC) ---
print("Step 2: Running RANSAC Global Registration...")
ransac_res, best_thr = run_global_registration_adaptive(source_down, target_down, source_fpfh, target_fpfh)
print(ransac_res)

# Visualize RANSAC result
source_temp = copy.deepcopy(source_cloud)
source_temp.transform(ransac_res.transformation)
source_temp.paint_uniform_color([1, 0, 0])
target_down.paint_uniform_color([0, 0.651, 0.929])
o3d.visualization.draw_geometries([source_temp, target_down], window_name="RANSAC Result")


Step 1: Merging and cleaning scans...
Step 2: Running RANSAC Global Registration...
RANSAC Finished. Best Threshold: 10.0 | Fitness: 0.9035
RegistrationResult with fitness=9.035000e-01, inlier_rmse=3.232130e+00, and correspondence_set size of 9035
Access transformation to get result.


In [83]:
# --- 4. Local Alignment (ICP) ---
print("Step 3: Running ICP Local Refinement...")
icp_res = run_local_refinement_adaptive(source_cloud, target_down, ransac_res.transformation, best_thr)
print(icp_res)

# --- 5. Extract Final Results ---
final_matrix = icp_res.transformation
initial_POS = final_matrix[:3, 3] 
initial_ORI = final_matrix[:3, :3] 
print("="*30)
print(f"Final Position: {initial_POS}")
print(f"Final Orientation Matrix:\n{initial_ORI}")
print(f"Fitness: {icp_res.fitness:.4f}")
print(f"RMSE: {icp_res.inlier_rmse:.4f}")


# --- 6. Final Visualization ---
source_final = copy.deepcopy(source_cloud).transform(final_matrix)
source_final.paint_uniform_color([1, 0, 0])         # Red: CAD Model
target_cloud.paint_uniform_color([0, 0.65, 0.93])   # Blue: Scanned Data
o3d.visualization.draw_geometries([source_final, target_cloud])

Step 3: Running ICP Local Refinement...
Threshold    | Fitness      | RMSE        
---------------------------------------------
10.00        | 1.0000       | 2.2150      
9.00         | 1.0000       | 2.2150      
8.00         | 1.0000       | 2.2150      
7.00         | 0.9997       | 2.2118      
6.00         | 0.9950       | 2.1738      
5.00         | 0.9692       | 2.0155      
4.00         | 0.9171       | 1.7837      
3.00         | 0.7607       | 1.4865      
2.00         | 0.6009       | 1.2458      
1.00         | 0.1644       | 0.7691      
RegistrationResult with fitness=9.171000e-01, inlier_rmse=1.783708e+00, and correspondence_set size of 9171
Access transformation to get result.
Final Position: [608.59589415  74.13075693  -1.38167208]
Final Orientation Matrix:
[[-1.70002780e-02 -9.99212505e-01 -3.58519206e-02]
 [ 9.99855358e-01 -1.70074392e-02 -1.05242650e-04]
 [-5.04589587e-04 -3.58485240e-02  9.99357108e-01]]
Fitness: 0.9171
RMSE: 1.7837
