In [1]:
import open3d as o3d
import numpy as np
from tqdm import tqdm
import pandas as pd

def estimate_point_density(pcd, radius_mm=1.0, min_neighbors=30, verbose=False):
    kdtree = o3d.geometry.KDTreeFlann(pcd)
    counts = []
    total_checked = 0
    for i in range(0, len(pcd.points), max(1, len(pcd.points)//1000)):  # sample ~1000 points
        [_, idx, _] = kdtree.search_radius_vector_3d(pcd.points[i], radius_mm)
        neighbor_count = len(idx) - 1  # exclude the point itself
        if neighbor_count >= min_neighbors:
            counts.append(neighbor_count)
        total_checked += 1

    if counts:
        avg_density = np.mean(counts)
        if verbose:
            print(f"[INFO] Average density (excluding sparse points): {avg_density:.2f} points per {radius_mm}mm sphere")
            print(f"[INFO] Used {len(counts)} / {total_checked} sampled points (≥ {min_neighbors} neighbors)")
    else:
        avg_density = 0
        print(f"[WARN] No sampled points had ≥ {min_neighbors} neighbors.")

    return avg_density

def raycast_topdown(mesh, x_rotation, y_rotation, z_rotation, spacing=0.1):

    Rx = mesh.get_rotation_matrix_from_axis_angle([np.deg2rad(x_rotation), 0, 0])
    Ry = mesh.get_rotation_matrix_from_axis_angle([0, np.deg2rad(y_rotation), 0])
    Rz = mesh.get_rotation_matrix_from_axis_angle([0, 0, np.deg2rad(z_rotation)])
    mesh.rotate(Rx, center=mesh.get_center())
    mesh.rotate(Ry, center=mesh.get_center())
    mesh.rotate(Rz, center=mesh.get_center())

    scene = o3d.t.geometry.RaycastingScene()
    t_mesh = o3d.t.geometry.TriangleMesh.from_legacy(mesh)
    _ = scene.add_triangles(t_mesh)
    aabb = mesh.get_axis_aligned_bounding_box()
    min_x, min_y, _ = aabb.min_bound
    max_x, max_y, _ = aabb.max_bound
    x_vals = np.linspace(min_x, max_x, int((max_x - min_x)/spacing))
    y_vals = np.linspace(min_y, max_y, int((max_y - min_y)/spacing))
    xx, yy = np.meshgrid(x_vals, y_vals)
    origin_z = aabb.max_bound[2] + 10
    origins = np.stack([xx.ravel(), yy.ravel(), np.full(xx.size, origin_z)], axis=1)
    directions = np.tile([0, 0, -1], (len(origins), 1))
    rays = o3d.core.Tensor(np.hstack((origins, directions)), dtype=o3d.core.Dtype.Float32)
    hits = scene.cast_rays(rays)
    mask = hits['t_hit'].isfinite()
    hit_points = origins[mask.numpy()] + hits['t_hit'][mask].numpy().reshape(-1, 1) * directions[mask.numpy()]
    return o3d.geometry.PointCloud(o3d.utility.Vector3dVector(hit_points))

def preprocess_for_fpfh(pcd, voxel_size):
    """Downsample and estimate FPFH features for a point cloud."""
    pcd_down = pcd.voxel_down_sample(voxel_size)
    pcd_down.estimate_normals(
        search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 2, max_nn=30)
    )
    fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * 5, max_nn=100)
    )
    return pcd_down, fpfh

def run_global_icp_alignment(scan_pcd, cad_pcd, voxel_size=1.0, verbose=False):
    """Run global RANSAC + ICP refinement."""
    scan_down, scan_fpfh = preprocess_for_fpfh(scan_pcd, voxel_size)
    cad_down, cad_fpfh = preprocess_for_fpfh(cad_pcd, voxel_size)

    # RANSAC-based global registration
    result_ransac = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        cad_down, scan_down, cad_fpfh, scan_fpfh, mutual_filter=True,
        max_correspondence_distance=voxel_size * 1.5,
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        ransac_n=4,
        checkers=[
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(voxel_size * 1.5)
        ],
        criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(4000000, 500)
    )

    eval_ransac = o3d.pipelines.registration.evaluate_registration(
        cad_down, scan_down, max_correspondence_distance=voxel_size * 1.5,
        transformation=result_ransac.transformation
    )
    print(f"[INFO] RANSAC Alignment RMSE: {eval_ransac.inlier_rmse:.4f} | Fitness: {eval_ransac.fitness:.4f}")

    if verbose:
        print("[INFO] Global RANSAC transformation:")
        print(result_ransac.transformation)

    scan_pcd.estimate_normals()
    cad_pcd.estimate_normals()

    result_icp = o3d.pipelines.registration.registration_icp(
        cad_pcd, scan_pcd, 2.0, result_ransac.transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane()
    )

    # Evaluate ICP result
    eval_icp = o3d.pipelines.registration.evaluate_registration(
        cad_pcd, scan_pcd, max_correspondence_distance=2.0,
        transformation=result_icp.transformation
    )
    print(f"[INFO] ICP Alignment RMSE: {eval_icp.inlier_rmse:.4f} | Fitness: {eval_icp.fitness:.4f}")

    if verbose:
        print("[INFO] ICP refinement transformation:")
        print(result_icp.transformation)
        print(f"[INFO] ICP Fitness: {result_icp.fitness:.4f}, RMSE: {result_icp.inlier_rmse:.4f}")

    return result_icp.transformation

def run_fpfh_ransac_only(scan_pcd, cad_pcd, voxel_size=1.0, verbose=False):
    """Run global FPFH-based RANSAC registration only (no ICP refinement)."""
    scan_down, scan_fpfh = preprocess_for_fpfh(scan_pcd, voxel_size)
    cad_down, cad_fpfh = preprocess_for_fpfh(cad_pcd, voxel_size)

    result_ransac = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        cad_down, scan_down, cad_fpfh, scan_fpfh, mutual_filter=True,
        max_correspondence_distance=voxel_size * 1.5,
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        ransac_n=4,
        checkers=[
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(voxel_size * 1.5)
        ],
        criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(4000000, 500)
    )

    if verbose:
        print("[INFO] Global RANSAC transformation (FPFH only):")
        print(result_ransac.transformation)
        print(f"[INFO] Fitness: {result_ransac.fitness:.4f}, RMSE: {result_ransac.inlier_rmse:.4f}")

    return result_ransac.transformation, result_ransac

def run_centered_icp_sweep(source, target, thresholds, method='point_to_point', verbose=False):
    """
    Run ICP alignment between source and target using center-to-center translation as initial guess.
    Evaluate ICP over a list of thresholds and return the best transformation and evaluation table.
    """
    trans_init = np.eye(4)

    if method == 'point_to_point':
        estimation = o3d.pipelines.registration.TransformationEstimationPointToPoint()
    elif method == 'point_to_plane':
        source.estimate_normals()
        target.estimate_normals()
        estimation = o3d.pipelines.registration.TransformationEstimationPointToPlane()
    else:
        raise ValueError("Unsupported method. Use 'point_to_point' or 'point_to_plane'.")

    results = []
    for threshold in tqdm(thresholds, desc="Evaluating ICP thresholds"):
        reg_result = o3d.pipelines.registration.registration_icp(
            source, target, threshold, trans_init, estimation)
        
        evaluation = o3d.pipelines.registration.evaluate_registration(
            source, target, threshold, reg_result.transformation)

        if verbose:
            print(f"Threshold: {threshold:.3f}")
            print(f"ICP Fitness: {reg_result.fitness:.4f}, RMSE: {reg_result.inlier_rmse:.4f}")
            print(f"Eval Fitness: {evaluation.fitness:.4f}, Eval RMSE: {evaluation.inlier_rmse:.4f}")
            print(f"Transformation:\n{reg_result.transformation}\n{'-' * 40}")

        results.append({
            "Threshold": threshold,
            "RMSE": evaluation.inlier_rmse,
            "Fitness": evaluation.fitness,
            "Transformation": reg_result.transformation
        })

    df = pd.DataFrame(results)
    best_idx = df["RMSE"].idxmin()
    best_result = df.loc[best_idx]

    return best_result, df

def crop_scan_near_mesh(scan_pcd, cad_mesh, max_distance=2.5):

    t_mesh = o3d.t.geometry.TriangleMesh.from_legacy(cad_mesh)
    scene = o3d.t.geometry.RaycastingScene()
    _ = scene.add_triangles(t_mesh)

    scan_pts = np.asarray(scan_pcd.points)
    pcd_tensor = o3d.core.Tensor(scan_pts, dtype=o3d.core.Dtype.Float32)

    signed_dists = scene.compute_signed_distance(pcd_tensor)
    #abs_dists = np.abs(signed_dists.numpy())
    sd = signed_dists.numpy()
    mask = (sd < 0) | ((sd >= 0) & (sd < max_distance))

    cropped = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(scan_pts[mask]))
    return cropped

def remove_outliers(pcd, nb_neighbors=20, std_ratio=2.0, radius=1.5, min_points=10):
    pcd_clean, ind_stat = pcd.remove_statistical_outlier(nb_neighbors=nb_neighbors, std_ratio=std_ratio)
    pcd_clean, ind_rad = pcd_clean.remove_radius_outlier(nb_points=min_points, radius=radius)
    
    return pcd_clean

def remove_outliers2(pcd, nb_neighbors=20, std_ratio=2.0, radius=1.5, min_points=10):
    # Step 1: Statistical outlier removal
    _, ind_stat = pcd.remove_statistical_outlier(nb_neighbors=nb_neighbors, std_ratio=std_ratio)
    
    # Apply SOR inliers to get cleaned PCD
    pcd_stat = pcd.select_by_index(ind_stat)

    # Step 2: Radius outlier removal
    _, ind_rad = pcd_stat.remove_radius_outlier(nb_points=min_points, radius=radius)

    # Get final cleaned PCD
    pcd_clean = pcd_stat.select_by_index(ind_rad)

    # Determine final inlier indices relative to original point cloud
    ind_final = np.asarray(ind_stat)[ind_rad]
    ind_outliers = np.setdiff1d(np.arange(len(pcd.points)), ind_final)

    return pcd_clean, ind_final, ind_outliers

def dbscan_cleanup(pcd, eps=1.5, min_points=15):
    labels = np.array(pcd.cluster_dbscan(eps=eps, min_points=min_points))
    largest_cluster = labels == np.bincount(labels[labels >= 0]).argmax()
    final = pcd.select_by_index(np.where(largest_cluster)[0])
    return final

def preProcessData(scan_pcd, cad_mesh, x_rotation, y_rotation, z_rotation, verbose=False):
    o3d.visualization.draw_geometries([scan_pcd])
    avg_density = estimate_point_density(scan_pcd, radius_mm=1.0, verbose=verbose)
    cad_pcd = raycast_topdown(cad_mesh, x_rotation=x_rotation, y_rotation=y_rotation, z_rotation=z_rotation, spacing=0.1)
    o3d.visualization.draw_geometries([cad_pcd])
    T_final = run_global_icp_alignment(scan_pcd, cad_pcd, voxel_size=1.0, verbose=verbose)
    #cad_pcd_center = np.mean(np.asarray(cad_pcd.points), axis=0)
    #scan_pcd_center = np.mean(np.asarray(scan_pcd.points), axis=0)

    # Compute the translation vector
    #translation = cad_pcd_center - scan_pcd_center

    # Translate the hull_pcd to align the centers
    #scan_pcd.points = o3d.utility.Vector3dVector(np.asarray(scan_pcd.points) + translation)
    #T_fpfh, result = run_fpfh_ransac_only(scan_pcd, cad_pcd, voxel_size=1.0, verbose=True)
    # thresholds = np.linspace(1, 150, 25)
    # best_result, table = run_centered_icp_sweep(scan_pcd, cad_pcd, thresholds, method='point_to_point', verbose=True)
    # 📊 Display table
    # print("\n=== ICP Sweep Results ===")
    # print(table[["Threshold", "RMSE", "Fitness"]].to_string(index=False))
    # print("\nBest result:")
    # print(f"Threshold: {best_result['Threshold']:.3f}")
    # print(f"RMSE: {best_result['RMSE']:.4f}")
    # print(f"Fitness: {best_result['Fitness']:.4f}")
    cad_pcd.paint_uniform_color([0,1,0])
    scan_pcd.paint_uniform_color([1,0,0])
    cad_pcd.transform(T_final)
    cad_mesh.transform(T_final)
    # cad_pcd.transform(T_final)
    # cad_mesh.transform(T_final)
    o3d.visualization.draw_geometries([scan_pcd, cad_pcd])
    cropped = crop_scan_near_mesh(scan_pcd, cad_mesh, max_distance=1.5)
    cropped.paint_uniform_color([0.2, 0.35, 0.8])  # Blue for cropped points
    cleaned = remove_outliers(cropped, nb_neighbors=20, std_ratio=4.0, radius=2.5, min_points=8)
    o3d.visualization.draw_geometries([cleaned])
    preprocessed_pcd = dbscan_cleanup(cleaned, eps=0.6, min_points=15)
    o3d.visualization.draw_geometries([preprocessed_pcd])
    nn_distance = np.mean([preprocessed_pcd.compute_nearest_neighbor_distance()])
    radius_normals = nn_distance*4
    preprocessed_pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normals, max_nn=16), fast_normal_computation=True)
    return preprocessed_pcd, avg_density, cad_pcd

def visualize_fpfh_alignment(scan_pcd, cad_mesh, x_rotation, y_rotation, z_rotation, voxel_size=1, vis_voxel_size=0.1, verbose=False):
    # Step 1: Create CAD point cloud via raycasting and downsample both for visualization
    cad_pcd_raw = raycast_topdown(cad_mesh, x_rotation, y_rotation, z_rotation, spacing=0.1)
    # scan_vis = scan_pcd.voxel_down_sample(vis_voxel_size)
    # cad_vis = cad_pcd_raw.voxel_down_sample(vis_voxel_size)

    # # Step 2: Move both to origin
    # scan_center = scan_vis.get_center()
    # cad_center = cad_vis.get_center()
    # scan_vis.translate(-scan_center)
    # cad_vis.translate(-cad_center)

    # # Step 3: Offset CAD for separation
    # scan_bbox = scan_vis.get_axis_aligned_bounding_box()
    # cad_bbox = cad_vis.get_axis_aligned_bounding_box()
    # x_offset = scan_bbox.get_extent()[0] + 0.5 * cad_bbox.get_extent()[0]
    # cad_vis_pre = cad_vis.translate([x_offset-20, 0, -40], relative=False)

    # Step 4: Color and show pre-alignment
    # scan_vis.paint_uniform_color([1, 0, 0])  # Red
    # cad_vis_pre.paint_uniform_color([0, 1, 0])  # Green
    # o3d.visualization.draw_geometries([scan_vis, cad_vis_pre], window_name="Before FPFH RANSAC Alignment")

    # Step 5: Perform real alignment using full-resolution data
    scan_down, scan_fpfh = preprocess_for_fpfh(scan_pcd, voxel_size)
    cad_down, cad_fpfh = preprocess_for_fpfh(cad_pcd_raw, voxel_size)

    result_ransac = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        cad_down, scan_down, cad_fpfh, scan_fpfh, mutual_filter=True,
        max_correspondence_distance=voxel_size * 1.5,
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        ransac_n=4,
        checkers=[
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(voxel_size * 1.5)
        ],
        criteria=o3d.pipelines.registration.RANSACConvergenceCriteria(4000000, 500)
    )

    # Step 6: Apply transformation to full-res CAD and downsample for visualization
    cad_aligned_vis = cad_pcd_raw.transform(result_ransac.transformation).voxel_down_sample(vis_voxel_size)
    scan_pcd.paint_uniform_color([1, 0, 0])
    cad_aligned_vis.paint_uniform_color([0, 1, 0])
    o3d.visualization.draw_geometries([scan_pcd, cad_aligned_vis], window_name="After FPFH RANSAC Alignment")

    return result_ransac.transformation

def visualize_outlier_removal(scan_pcd, cad_mesh, x_rotation, y_rotation, z_rotation, voxel_size=1, vis_voxel_size=0.1, verbose=False):
    #o3d.visualization.draw_geometries([scan_pcd])
    #avg_density = estimate_point_density(scan_pcd, radius_mm=1.0, verbose=verbose)
    cad_pcd = raycast_topdown(cad_mesh, x_rotation=x_rotation, y_rotation=y_rotation, z_rotation=z_rotation, spacing=0.1)
    #o3d.visualization.draw_geometries([cad_pcd])
    T_final = run_global_icp_alignment(scan_pcd, cad_pcd, voxel_size=1.0, verbose=verbose)
    cad_pcd.paint_uniform_color([0,1,0])
    scan_pcd.paint_uniform_color([1,0,0])
    cad_pcd.transform(T_final)
    cad_mesh.transform(T_final)
    #o3d.visualization.draw_geometries([scan_pcd, cad_pcd])
    cropped = crop_scan_near_mesh(scan_pcd, cad_mesh, max_distance=1.5)
    cropped.paint_uniform_color([0.2, 0.35, 0.8])  # Blue for cropped points
    cleaned, inlier_indices, outlier_indices = remove_outliers2(cropped, nb_neighbors=20, std_ratio=4.0, radius=2.5, min_points=8)

    # Select subsets
    inlier_pcd = cropped.select_by_index(inlier_indices)
    outlier_pcd = cropped.select_by_index(outlier_indices)

    # Color for clarity
    inlier_pcd.paint_uniform_color([0.6, 0.6, 0.6])  # Gray
    outlier_pcd.paint_uniform_color([1.0, 0.0, 0.0])  # Red

    # Visualize together
    o3d.visualization.draw_geometries([inlier_pcd, outlier_pcd])

def visualize_dbscan_cleanup(scan_pcd, cad_mesh, x_rotation, y_rotation, z_rotation, voxel_size=1, vis_voxel_size=0.1, verbose=False):
    #o3d.visualization.draw_geometries([scan_pcd])
    #avg_density = estimate_point_density(scan_pcd, radius_mm=1.0, verbose=verbose)
    cad_pcd = raycast_topdown(cad_mesh, x_rotation=x_rotation, y_rotation=y_rotation, z_rotation=z_rotation, spacing=0.1)
    #o3d.visualization.draw_geometries([cad_pcd])
    T_final = run_global_icp_alignment(scan_pcd, cad_pcd, voxel_size=1.0, verbose=verbose)
    cad_pcd.paint_uniform_color([0,1,0])
    scan_pcd.paint_uniform_color([1,0,0])
    cad_pcd.transform(T_final)
    cad_mesh.transform(T_final)
    #o3d.visualization.draw_geometries([scan_pcd, cad_pcd])
    cropped = crop_scan_near_mesh(scan_pcd, cad_mesh, max_distance=1.5)
    cropped.paint_uniform_color([0.2, 0.35, 0.8])  # Blue for cropped points


Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
point_cloud_location = "/home/chris/Code/PointClouds/data/FLIPscans/gratetouched.ply"
pcd = o3d.io.read_point_cloud(point_cloud_location)

mesh = o3d.io.read_triangle_mesh("/home/chris/Code/PointClouds/data/FLIPscans/GrateAndCover/ventilationgrate.STL")
mesh.compute_vertex_normals()

TriangleMesh with 2856 points and 1708 triangles.

In [None]:
# Preprocess the point cloud
result = preProcessData(pcd, mesh, x_rotation=0, y_rotation=0, z_rotation=0)

[INFO] RANSAC Alignment RMSE: 0.0000 | Fitness: 0.0000
[INFO] ICP Alignment RMSE: 0.0000 | Fitness: 0.0000
