In [66]:
import numpy as np
import open3d as o3d
import os
import glob
import random

from open3d.geometry import PointCloud
from open3d.pipelines.registration import Feature, RegistrationResult

from typing import List

# FPFH + RANSAC + PlaneICP Pipeline
The merged result turns good only after great effort in finetuning the hyperparameters and looking for the best merging order.<br>
It is because of the unreliability of features captured by FPFH, which frequently lead to misalignment especially when there are highly symmetric features or insufficient overlapping points. <br>

In [67]:
folder_path = "bunny/data" 
ply_files = sorted(glob.glob(os.path.join(folder_path, "*.ply")))

In [68]:
# Eliminate randomness for reproducibility
random.seed(42)              
np.random.seed(42)            
o3d.utility.random.seed(42)  

Hyperparameters

In [69]:
voxel_size = 0.005

# Coefficient for various voxel sizes
e_downsample = 0.5
e_normal = 2 # The range of neighbored points to be considered when estimating normal vectors using PCA
e_fpfh = 4 
e_gr = 2 # Correspondance range factor
e_ICP = 2

n_sample = 4 # Minimal samples required for RANSAC.
# Coefficients for RANSAC filter constraints.
ctr_len = 0.9 
ctr_e_gr = 2
ctr_ang = 40

normal_nn = 30 # Maximum neighbors considered by PCA in normal estimation.
fpfh_nn = 100

In [70]:
def pcd_preprocess(pcd: PointCloud) -> PointCloud:
    """
    Preprocess point cloud by downsampling and estimate normals for all the points.

    Arguments:
        pcd (PointCloud):
            The pointcloud to process.

    Returns:
        pcd_down (PointCloud):
            Processed pointcloud.
    """
    # Downsample
    pcd_down = pcd.voxel_down_sample(voxel_size * e_downsample)
    # Normal vectors estimation.
    # Use KDTree to efficiently find knn.
    pcd_down.estimate_normals(o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * e_normal, max_nn=normal_nn))
    return pcd_down

In [71]:
def fpfh_descriptor(pcd: PointCloud) -> Feature:
    """
    Generate FPFH feature descriptors for the pointcloud.

    Arguments:
        pcd (PointCloud):
            Point cloud to extract features.
    
    Returns:
        fpfh_features (Features):
            Extracted FPFH feature descriptors.
    """
    # Use KDTree to efficiently find neighbors for aggregating information measured by Darboux frame in the histogram.
    fpfh_features = o3d.pipelines.registration.compute_fpfh_feature(
        pcd, o3d.geometry.KDTreeSearchParamHybrid(radius=voxel_size * e_fpfh, max_nn=fpfh_nn))
    return fpfh_features

In [72]:
def global_registration(
        pcd_source: PointCloud, fpfh_source: Feature, pcd_target: PointCloud, fpfh_target: Feature) -> RegistrationResult:
    """
    Use global registration with RANSAC to get initial estimation of R and t to register the source cloud to the target.

    Arguments:
        pcd_source (PointCloud):
            Source that going to be aligned.
        fpfh_source (Feature):
            FPFH feauture descriptors of the source PointCloud.
        pcd_target (PointCloud):
            Target to which the point cloud will be registered.
        fpfh_target (Feature):
            FPFH features of the target point cloud.

    Returns: 
        result (RegistrationResult):
            The result of registration, containing important information like R and t.
    """

    # Criteria of RANSAC for fitting Rt.
    criteria = o3d.pipelines.registration.RANSACConvergenceCriteria(
        max_iteration=100000,  # Max amount of trials allowed
        confidence=0.999    # Confidence that the found model is correct. 
    )
    
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        pcd_source, pcd_target,
        fpfh_source, fpfh_target,
        False, # Mutual Correspondence: whether the found conrrespondences should have symmetric distances.
        max_correspondence_distance=voxel_size * e_gr, # Point pairs whose internal distances are lager than this will be excluded for correspondences.
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(False), # Use point to point distances, scale is enabled if partials have various scaling sizes.
        ransac_n=n_sample, # #Samples of RANSAC initial choices.
        checkers=[
            # o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(ctr_len), # Edges within the target and the source shoudl have similar edge lengths.
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(voxel_size * ctr_e_gr), # Point to point distances should be small.
            # o3d.pipelines.registration.CorrespondenceCheckerBasedOnNormal(ctr_ang) # Normals should have similar directions
        ],
        criteria=criteria
    )
    return result

In [73]:
def planeICP(source: PointCloud, target:PointCloud, gr_result: RegistrationResult) -> RegistrationResult:
    """
    Refine the registration result using point to plane ICP, which is usually more robust than point to point ICP.

    Arguments:
        source (PointCloud):
            Source
        target (PointCloud):
            Target
        gr_result (RegistrationResult):
            The result from global assignment

    Returns:
        result_icp (RegistrationResult):
            The refined result using planeICP.
    """
    result_icp = o3d.pipelines.registration.registration_icp(
        source, target,
        max_correspondence_distance=voxel_size * e_ICP,
        init=gr_result.transformation, 
        estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPlane()
    )

    return result_icp

In [74]:
def merge_point_clouds(pcd1: PointCloud, pcd2: PointCloud) -> PointCloud:
    """
    Merge 2 pointclouds together with their normal and colors, if they have.
    Then downsample the points.

    Arguments:
        pcd1,pcd2 (PointCloud):
            2 PointClouds to merge.
        
    Returns:
        merged_down (PointCloud):
            The merged point cloud.
    """
    merged = o3d.geometry.PointCloud()

    merged.points = o3d.utility.Vector3dVector(
        np.vstack([np.asarray(pcd1.points), np.asarray(pcd2.points)])
    )
    
    if pcd1.has_normals() and pcd2.has_normals():
        merged.normals = o3d.utility.Vector3dVector(
            np.vstack([np.asarray(pcd1.normals), np.asarray(pcd2.normals)])
        )

    if pcd1.has_colors() and pcd2.has_colors():
        merged.colors = o3d.utility.Vector3dVector(
            np.vstack([np.asarray(pcd1.colors), np.asarray(pcd2.colors)])
        )

    # merged_down = merged.voxel_down_sample(voxel_size * e_downsample)

    return merged

In [75]:
def incremental_registration(pcds: List[PointCloud], fpfhs: List[RegistrationResult]) -> PointCloud:
    """
    Register a list of point clouds incrementally, using FPFH + Global Registration + PlaneICP.

    Arguments:
        pcds (List[PointCloud]):
            The list of point clouds to merge.
        fpfhs (List[Feature]):
            The list of corresponding FPFH features

    Returns:
        target (PointCloud):
            The merged point cloud.
    """
    # Use the first item as the initial target.
    target = pcds[0]
    fpfh_target = fpfhs[0]
    
    for source_ind in range(1, len(pcds)):
        source = pcds[source_ind]
        fpfh_source = fpfhs[source_ind]
        
        result_gr = global_registration(source, fpfh_source, target, fpfh_target)
        result_ICP = planeICP(source, target, result_gr)

        source.transform(result_ICP.transformation)

        target = merge_point_clouds(target, source)  
        fpfh_target = fpfh_descriptor(target) # Recalculate the FPFH feature of the merged target.

    return target

In [76]:
pcds = [o3d.io.read_point_cloud(ply_file) for ply_file in ply_files]
pcds_preprocessed = [pcd_preprocess(pcd) for pcd in pcds]
fpfhs = [fpfh_descriptor(pcd_preprocessed) for pcd_preprocessed in pcds_preprocessed]

In [77]:
indices = [0, 1, 5, 9, 2, 3, 4, 6, 7, 8]
pcds_ordered = [pcds_preprocessed[i] for i in indices]
fpfhs_ordered = [fpfhs[i] for i in indices]
merged_pcd = incremental_registration(pcds_ordered, fpfhs_ordered)
o3d.visualization.draw_geometries([merged_pcd])

In [78]:
o3d.io.write_point_cloud("registered_bunny.ply", merged_pcd)

True