In [1]:
import open3d as o3d
import numpy as np
import copy
import time

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [65]:
# VISUALIZE READY CLOUDS
source = o3d.io.read_point_cloud('<PATH TO TEST CLOUD>')
target = o3d.io.read_point_cloud('<PATH TO REFERENCE CLOUD>')
draw_registration_result(source, target, np.identity(4))



In [66]:
voxel_size = 0.05
source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
draw_registration_result(source_down, target_down, np.identity(4))

:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.


In [3]:
def draw_registration_result(source, target, transformation):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target_temp],
                                      zoom=1,
                                      front=[0.6452, -0.3036, -0.7011],
                                      lookat=[1.9892, 2.0208, 1.8945],
                                      up=[-0.2779, -0.9482, 0.1556])

In [4]:
def preprocess_point_cloud(pcd, voxel_size):
    print(":: Downsample with a voxel size %.3f." % voxel_size)
    pcd_down = pcd.voxel_down_sample(voxel_size)

    radius_normal = voxel_size * 2
    print(":: Estimate normal with search radius %.3f." % radius_normal)
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))

    radius_feature = voxel_size * 5
    print(":: Compute FPFH feature with search radius %.3f." % radius_feature)
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    return pcd_down, pcd_fpfh

In [5]:
def prepare_dataset(source_path, target_path, voxel_size):
    print(":: Load two point clouds and disturb initial pose.")

    #demo_icp_pcds = o3d.data.DemoICPPointClouds()
    #source = o3d.io.read_point_cloud(demo_icp_pcds.paths[0])
    #target = o3d.io.read_point_cloud(demo_icp_pcds.paths[1])
    #trans_init = np.asarray([[0.0, 0.0, 1.0, 0.0], [1.0, 0.0, 0.0, 0.0],
    #                         [0.0, 1.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]])
    #source.transform(trans_init)
    source = o3d.io.read_point_cloud(source_path)
    target = o3d.io.read_point_cloud(target_path)
    draw_registration_result(source, target, np.identity(4))

    source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
    return source, target, source_down, target_down, source_fpfh, target_fpfh

In [64]:
source = o3d.io.read_point_cloud('<PATH TO TEST CLOUD>')
target = o3d.io.read_point_cloud('<PATH TO REFERENCE CLOUD>')
source = source.translate([0, 0, 0], relative=False)
source.scale(4, center=source.get_center())
#source.scale(2.0, center=source.get_center())
#o3d.visualization.draw_geometries([source, target])
#R = source.get_rotation_matrix_from_xyz((np.radians(0), np.radians(0), np.radians(0)))
#source.rotate(R, center=source.get_center())
draw_registration_result(source, target, np.identity(4))



In [38]:
voxel_size = 0.05
source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
draw_registration_result(source_down, target_down, np.identity(4))

:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.


In [12]:
def execute_global_registration(source_down, target_down, source_fpfh,
                                target_fpfh, voxel_size):
    distance_threshold = voxel_size * 1.5
    print(":: RANSAC registration on downsampled point clouds.")
    print("   Since the downsampling voxel size is %.3f," % voxel_size)
    print("   we use a liberal distance threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3, [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                distance_threshold)
        ], o3d.pipelines.registration.RANSACConvergenceCriteria(1000000, 0.999))
    return result

In [83]:
source_sample = source.uniform_down_sample(34)
target_sample = target.uniform_down_sample(89)
#draw_registration_result(source_sample, target_sample, np.identity(4))

In [84]:
print(f"ORIGINAL\n\ttarget: {len(target.points)}, source: {len(source.points)}, ratio(target / source): {len(target.points) / len(source.points)}")
print(f"SAMPLE\n\ttarget: {len(target_sample.points)}, source: {len(source_sample.points)}, ratio(target / source): {len(target_sample.points) / len(source_sample.points)}")

ORIGINAL
	target: 909606, source: 349096, ratio(target / source): 2.6056041891055757
SAMPLE
	target: 10221, source: 10268, ratio(target / source): 0.9954226723802103


In [55]:
# NEW
voxel_size = 0.05  # means 5cm for this dataset
source_sample_down, source_sample_fpfh = preprocess_point_cloud(source_sample, voxel_size)
target_sample_down, target_sample_fpfh = preprocess_point_cloud(target_sample, voxel_size)

:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.


In [273]:
# NEW
result_ransac = execute_global_registration(source_sample_down, target_sample_down,
                                            source_sample_fpfh, target_sample_fpfh,
                                            voxel_size)
print(result_ransac)
draw_registration_result(source_sample_down, target_sample_down, result_ransac.transformation)

:: RANSAC registration on downsampled point clouds.
   Since the downsampling voxel size is 0.050,
   we use a liberal distance threshold 0.075.
RegistrationResult with fitness=2.173095e-01, inlier_rmse=5.619675e-02, and correspondence_set size of 231
Access transformation to get result.


In [26]:
# NEW
# optimize RANSAC global registration based on chamfer
from tqdm.auto import tqdm
def optimize_registration(n_iterations, voxel_size, specific_ransac=None):
    if specific_ransac is None:
        best_ransac = None
        best_chamfer = None
    else:
        best_ransac = specific_ransac
        best_chamfer = calculate_chamfer(source, target, best_ransac.transformation)
        source_sample_down, source_sample_fpfh = preprocess_point_cloud(source_sample, voxel_size)
        target_sample_down, target_sample_fpfh = preprocess_point_cloud(target_sample, voxel_size)
    for i in tqdm(range(n_iterations)):
        if specific_ransac is None:
            source_sample_down, source_sample_fpfh = preprocess_point_cloud(source_sample, voxel_size)
            target_sample_down, target_sample_fpfh = preprocess_point_cloud(target_sample, voxel_size)
        result_ransac = execute_global_registration(source_sample_down, target_sample_down,
                                                source_sample_fpfh, target_sample_fpfh,
                                                voxel_size)
        chamfer = calculate_chamfer(source, target, result_ransac.transformation)
        if best_chamfer is None or chamfer < best_chamfer:
            best_ransac = result_ransac
            best_chamfer = chamfer
        print(f"[ITERATION {i}] best chamfer: {best_chamfer}")
    return best_ransac, best_chamfer

In [56]:
result_ransac, chamfer = optimize_registration(10, voxel_size, None)
print(chamfer)

  0%|          | 0/10 [00:00<?, ?it/s]

:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: RANSAC registration on downsampled point clouds.
   Since the downsampling voxel size is 0.050,
   we use a liberal distance threshold 0.075.
[ITERATION 0] best chamfer: 0.2025934945725843
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: RANSAC registration on downsampled point clouds.
   Since the downsampling voxel size is 0.050,
   we use a liberal distance threshold 0.075.
[ITERATION 1] best chamfer: 0.2025934945725843
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.1

In [57]:
# NEW
chamfer_original = calculate_chamfer(source, target, np.identity(4))
chamfer_aligned = calculate_chamfer(source, target, result_ransac.transformation)
print("ORIGINAL:", chamfer_original)
print("ALIGNED:", chamfer_aligned)

ORIGINAL: 0.18760007280641866
ALIGNED: 0.19124614651496474


In [58]:
draw_registration_result(source, target, result_ransac.transformation)



In [59]:
voxel_size = 0.05  # means 5cm for this dataset
source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
draw_registration_result(source_down, target_down, result_ransac.transformation)

:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.
:: Downsample with a voxel size 0.050.
:: Estimate normal with search radius 0.100.
:: Compute FPFH feature with search radius 0.250.


In [60]:
if chamfer_aligned < chamfer_original:
    print("ALIGNED IS BETTER")
    best_transformation = result_ransac.transformation
else:
    print("ORIGINAL IS BETTER")
    best_transformation = np.identity(4)

ORIGINAL IS BETTER


In [61]:
source_temp = copy.deepcopy(source)
source_temp.transform(best_transformation)
o3d.io.write_point_cloud('<PATH TO READY CLOUD>', source_temp)

True

In [27]:
# calculate chamfer distance
def calculate_chamfer(pcloud_1, pcloud_2, transformation):
    pcloud_1_temp = copy.deepcopy(pcloud_1)
    pcloud_2_temp = copy.deepcopy(pcloud_2)
    pcloud_1_temp.transform(transformation)
    dist_1 = pcloud_1_temp.compute_point_cloud_distance(pcloud_2_temp)
    dist_2 = pcloud_2_temp.compute_point_cloud_distance(pcloud_1_temp)
    return np.mean(dist_1) + np.mean(dist_2)

In [52]:
def refine_registration(source, target, source_fpfh, target_fpfh, voxel_size):
    distance_threshold = voxel_size * 0.4
    print(":: Point-to-plane ICP registration is applied on original point")
    print("   clouds to refine the alignment. This time we use a strict")
    print("   distance threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_icp(
        source, target, distance_threshold, result_ransac.transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane())
    return result

In [30]:
source.estimate_normals()
target.estimate_normals()

In [31]:
result_icp = refine_registration(source, target, source_fpfh, target_fpfh,
                                 voxel_size)
print(result_icp)
draw_registration_result(source, target, result_icp.transformation)

:: Point-to-plane ICP registration is applied on original point
   clouds to refine the alignment. This time we use a strict
   distance threshold 0.020.
RegistrationResult with fitness=1.998898e-01, inlier_rmse=1.107952e-02, and correspondence_set size of 239118
Access transformation to get result.


In [32]:
def execute_fast_global_registration(source_down, target_down, source_fpfh,
                                     target_fpfh, voxel_size):
    distance_threshold = voxel_size * 0.5
    print(":: Apply fast global registration with distance threshold %.3f" \
            % distance_threshold)
    result = o3d.pipelines.registration.registration_fgr_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh,
        o3d.pipelines.registration.FastGlobalRegistrationOption(
            maximum_correspondence_distance=distance_threshold))
    return result