In [None]:
from decorators import measure_time
import open3d as o3d
import numpy as np
from scipy.spatial import KDTree
import teaserpp_python


# ######################################################################################
# Carregamento e pré-processamento dos dados

def load_point_cloud(file_path: str) -> o3d.geometry.PointCloud:
    """
        Carrega um arquivo de ponto 3D em um objeto PointCloud.
    """
    # Carrega nuvem de pontos a partir do arquivo especificado
    cloud = o3d.io.read_point_cloud(file_path)
    return cloud


def preprocess_point_cloud(cloud: o3d.geometry.PointCloud,
                           voxel_size: int | float,
                           verbose=False) -> o3d.geometry.PointCloud:
    """
        Recebe uma nuvem de pontos e aplica o pre-processamento:
        - Redimensiona a nuvem de pontos para o tamanho especificado;
        - Calcula as normais;
        Referência: https://www.open3d.org/docs/release/tutorial/pipelines/global_registration.html#Extract-geometric-feature
    """
    radius_normal = voxel_size * 2
    if verbose:
        print(
            f"Redimensionando a nuvem de pontos para o tamanho {voxel_size:0.03f}.")
    cloud_downsampled = cloud.voxel_down_sample(voxel_size)
    if verbose:
        print(
            f"Calculando as normais com busca de raio {radius_normal:0.03f}.")
    cloud_downsampled.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))
    return cloud_downsampled


def compute_feature_descriptors(cloud: o3d.geometry.PointCloud,
                                voxel_size: int | float,
                                verbose=False) -> o3d.pipelines.registration.Feature:
    """
        Recebe uma nuvem de pontos e aplica o descritor FPFH da biblioteca Open3D.
        Referência: https://www.open3d.org/docs/release/tutorial/pipelines/global_registration.html#Extract-geometric-feature
    """
    radius_feature = voxel_size * 5
    if verbose:
        print(
            f"Processando o descritor FPFH do `Open3D` com o raio de busca de {radius_feature:0.03f}")
    feature = o3d.pipelines.registration.compute_fpfh_feature(
        cloud,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature,
                                             max_nn=100))
    return feature


# ######################################################################################
# Processamento dos dados com a biblioteca Open3D
#     - Global registration usando RANSAC
#     - Fast Global Registration
#     - ICP com o método de ponto a ponto
#     - ICP com o método de ponto a plano

@measure_time
def global_registration(source_cloud: o3d.geometry.PointCloud,
                        target_cloud: o3d.geometry.PointCloud,
                        source_features: o3d.pipelines.registration.Feature,
                        target_features: o3d.pipelines.registration.Feature,
                        voxel_size: int | float,
                        verbose=False) -> o3d.pipelines.registration.RegistrationResult:
    """
        Recebe uma par de nuvem de pontos, bem como seus descritores e realiza *global registration*.
        Utiliza o método RANSAC da biblioteca Open3D.
        Referência: https://www.open3d.org/docs/release/tutorial/pipelines/global_registration.html#RANSAC
    """
    distance_threshold = voxel_size * 1.5
    max_iteration = 100000
    confidence = 0.999
    if verbose:
        print("Realiza o alinhamento grosseiro usando o método RANSAC.")
        print(f"O raio de busca é {distance_threshold:0.03f}.")
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_cloud, target_cloud, source_features, target_features, True,
        distance_threshold,
        o3d.pipelines.registration.TransformationEstimationPointToPoint(False),
        3, [
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnEdgeLength(
                0.9),
            o3d.pipelines.registration.CorrespondenceCheckerBasedOnDistance(
                distance_threshold)
        ],
        o3d.pipelines.registration.RANSACConvergenceCriteria(
            max_iteration, confidence))
    # Executa até atingir `max_iteration` ou `confidence`.
    return result


@measure_time
def fast_global_registration(source_cloud: o3d.geometry.PointCloud,
                             target_cloud: o3d.geometry.PointCloud,
                             source_features: o3d.pipelines.registration.Feature,
                             target_features: o3d.pipelines.registration.Feature,
                             voxel_size: int | float,
                             verbose=False) -> o3d.pipelines.registration.RegistrationResult:
    """
         Recebe uma par de nuvem de pontos, bem como seus descritores e realiza *fast global registration*.
         Referência: https://www.open3d.org/docs/release/tutorial/pipelines/global_registration.html#id2
    """
    distance_threshold = voxel_size * 0.5
    if verbose:
        print("Realiza o *fast global registration*.")
        print(f"O raio de busca é {distance_threshold:0.03f}.")
    result = o3d.pipelines.registration.registration_fgr_based_on_feature_matching(
        source_cloud, target_cloud, source_features, target_features,
        o3d.pipelines.registration.FastGlobalRegistrationOption(
            maximum_correspondence_distance=distance_threshold))
    return result


@measure_time
def fine_alignment_point_to_point(source_cloud: o3d.geometry.PointCloud,
                                  target_cloud: o3d.geometry.PointCloud,
                                  initial_transform,
                                  voxel_size: int | float,
                                  verbose=False) -> o3d.pipelines.registration.RegistrationResult:
    """
        Recebe um par de nuvem de pontos, bem como uma transformação inicial para realizar um alinhamento fino.
        Utiliza o método Point to Point da biblioteca Open3D.
        Referência: https://www.open3d.org/docs/release/tutorial/pipelines/global_registration.html#Local-refinement
    """
    distance_threshold = voxel_size * 0.4
    if verbose:
        print("Realizando o alinhamento com ICP - Point to Point")
    result = o3d.pipelines.registration.registration_icp(
        source_cloud, target_cloud, distance_threshold,
        initial_transform.transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPoint())
    return result


@measure_time
def fine_alignment_point_to_plane(source_cloud: o3d.geometry.PointCloud,
                                  target_cloud: o3d.geometry.PointCloud,
                                  initial_transform,
                                  voxel_size: int | float,
                                  verbose=False) -> o3d.pipelines.registration.RegistrationResult:
    """
        Recebe um par de nuvem de pontos, bem como uma transformação inicial para realizar um alinhamento fino.
        Utiliza o método Point to Plane da biblioteca Open3D.
        Referência: https://www.open3d.org/docs/release/tutorial/pipelines/global_registration.html#Local-refinement
    """
    distance_threshold = voxel_size * 0.4
    if verbose:
        print("Realizando o alinhamento com ICP - Point to Plane com ")
    result = o3d.pipelines.registration.registration_icp(
        source_cloud, target_cloud, distance_threshold,
        initial_transform.transformation,
        o3d.pipelines.registration.TransformationEstimationPointToPlane())
    return result


# ######################################################################################
# Processamento dos dados com a biblioteca TEASER++
# E algumas funções auxiliares necessárias.

def pcd2xyz(pcd):
    """
        Função auxiliar para converter um objeto PointCloud em um array numpy.
        Usando em `establish_correspondences()`
        Retirado de: https://github.com/MIT-SPARK/TEASER-plusplus/blob/master/examples/teaser_python_fpfh_icp/helpers.py
    """
    return np.asarray(pcd.points).T


def find_knn_cpu(feat0, feat1, knn=1, return_distance=False):
    """
        Função auxiliar para executar `find_correspondences()`
        Retirado de: https://github.com/MIT-SPARK/TEASER-plusplus/blob/master/examples/teaser_python_fpfh_icp/helpers.py
    """
    # feat1tree = cKDTree(feat1) # Documentação recomenda usar KDTree em vez de cKDTree
    # https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.cKDTree.html
    feat1tree = KDTree(feat1)
    # dists, nn_inds = feat1tree.query(feat0, k=knn, n_jobs=-1) # n_jobs=-1 is not supported in cKDTree
    dists, nn_inds = feat1tree.query(feat0, k=knn, workers=-1)
    if return_distance:
        return nn_inds, dists
    else:
        return nn_inds


def find_correspondences(feats0, feats1, mutual_filter=True):
    """
        Função necessária para executar `establish_correspondences()`
        Retirado de: https://github.com/MIT-SPARK/TEASER-plusplus/blob/master/examples/teaser_python_fpfh_icp/helpers.py
    """
    nns01 = find_knn_cpu(feats0, feats1, knn=1, return_distance=False)
    corres01_idx0 = np.arange(len(nns01))
    corres01_idx1 = nns01

    if not mutual_filter:
        return corres01_idx0, corres01_idx1

    nns10 = find_knn_cpu(feats1, feats0, knn=1, return_distance=False)
    corres10_idx1 = np.arange(len(nns10))
    corres10_idx0 = nns10

    mutual_filter = (corres10_idx0[corres01_idx1] == corres01_idx0)
    corres_idx0 = corres01_idx0[mutual_filter]
    corres_idx1 = corres01_idx1[mutual_filter]
    return corres_idx0, corres_idx1


def establish_correspondences(source_cloud: o3d.geometry.PointCloud,
                              target_cloud: o3d.geometry.PointCloud,
                              source_features: o3d.pipelines.registration.Feature,
                              target_features: o3d.pipelines.registration.Feature,
                              verbose=False):
    """
        Estabelece correspondências entre os pontos de uma nuvem de pontos.
        Necessário para a função `robust_global_registration()`
        Referência: https://github.com/MIT-SPARK/TEASER-plusplus/tree/master/examples/teaser_python_fpfh_icp#4-establish-putative-correspondences
    """

    # Converte os descritores em arrays numpy
    source_features_xyz = np.array(source_features.data).T
    target_features_xyz = np.array(target_features.data).T

    #
    source_cloud_xyz = pcd2xyz(source_cloud)  # np array of size 3 by N
    target_cloud_xyz = pcd2xyz(target_cloud)  # np array of size 3 by M

    source_corrs, target_corrs = find_correspondences(source_features_xyz,
                                                      target_features_xyz,
                                                      mutual_filter=True)
    source_corrs = source_cloud_xyz[:,
                                    source_corrs]  # np array of size 3 by num_corrs
    target_corrs = target_cloud_xyz[:,
                                    target_corrs]  # np array of size 3 by num_corrs
    if verbose:
        num_corrs = source_corrs.shape[1]
        print(f'FPFH gerou {num_corrs} correspondências.')
    return source_corrs, target_corrs


@measure_time
def robust_global_registration(source_corrs,
                               target_corrs,
                               voxel_size: int | float,
                               verbose=False):
    """
    Recebe uma par de nuvem de pontos, bem como seus descritores e realiza *global registration*.
    Utiliza a biblioteca TEASER++.
    Referência: https://github.com/MIT-SPARK/TEASER-plusplus/tree/master/examples/teaser_python_fpfh_icp
    """
    noise_bound = voxel_size
    if verbose:
        print("Configurando os parâmetros do TEASER++.")
    solver_params = teaserpp_python.RobustRegistrationSolver.Params()
    solver_params.cbar2 = 1.0
    solver_params.noise_bound = noise_bound
    solver_params.estimate_scaling = False
    solver_params.inlier_selection_mode = teaserpp_python.RobustRegistrationSolver.INLIER_SELECTION_MODE.PMC_EXACT
    solver_params.rotation_tim_graph = teaserpp_python.RobustRegistrationSolver.INLIER_GRAPH_FORMULATION.CHAIN
    solver_params.rotation_estimation_algorithm = teaserpp_python.RobustRegistrationSolver.ROTATION_ESTIMATION_ALGORITHM.GNC_TLS
    solver_params.rotation_gnc_factor = 1.4
    solver_params.rotation_max_iterations = 10000
    solver_params.rotation_cost_threshold = 1e-16
    if verbose:
        print("Realiza o alinhamento grosseiro usando o método TEASER++.")
    solver = teaserpp_python.RobustRegistrationSolver(solver_params)
    solver.solve(source_corrs, target_corrs)
    return solver.getSolution()


def Rt2T(R, t):
    """
        Função auxiliar para unir a solução rotacionar e solução linear em uma única matriz
        Retirado de: https://github.com/MIT-SPARK/TEASER-plusplus/blob/master/examples/teaser_python_fpfh_icp/helpers.py
    """
    T = np.identity(4)
    T[:3, :3] = R
    T[:3, 3] = t
    return T

In [None]:
VISUALIZATION = False
VERBOSE = False
NUM_OF_EXEC = 1
DATASETS = (o3d.data.DemoICPPointClouds(),)

voxel_sizes = (0.05,)
execution_times = {}
results = {}

In [None]:
demo_clouds = o3d.data.DemoICPPointClouds()

source_cloud = o3d.io.read_point_cloud(demo_clouds.paths[0])
target_cloud = o3d.io.read_point_cloud(demo_clouds.paths[1])


In [None]:
voxel_size = 0.05
source_down = preprocess_point_cloud(source_cloud, voxel_size, VERBOSE)
target_down = preprocess_point_cloud(target_cloud, voxel_size, VERBOSE)

In [None]:
source_features = compute_feature_descriptors(source_down, voxel_size, VERBOSE)
target_features = compute_feature_descriptors(target_down, voxel_size, VERBOSE)

In [None]:
result_gr = global_registration(source_down,
                                        target_down,
                                        source_features,
                                        target_features,
                                        voxel_size,
                                        VERBOSE)

In [None]:
result_fgr = fast_global_registration(source_down,
                                              target_down,
                                              source_features,
                                              target_features,
                                              voxel_size,
                                              VERBOSE)

In [None]:
print(f"RANSAC: {result_gr}")
print(f"RANSAC: {result_gr.fitness}")
print(f"RANSAC: {result_gr.inlier_rmse}")
print(f"FGR: {result_fgr}")
print(f"FGR: {result_fgr.fitness}")
print(f"FGR: {result_fgr.inlier_rmse}")

In [None]:
# result_gr_icp_point = fine_alignment_point_to_point(source_cloud,
#                                                             target_cloud,
#                                                             result_gr,
#                                                             voxel_size,
#                                                             VERBOSE)

In [None]:
# result_gr_icp_plane = fine_alignment_point_to_plane(source_cloud,
#                                                             target_cloud,
#                                                             result_gr,
#                                                             voxel_size,
#                                                             VERBOSE)

In [None]:
# result_fgr_icp_point = fine_alignment_point_to_point(source_cloud,
#                                                              target_cloud,
#                                                              result_fgr,
#                                                              voxel_size,
#                                                              VERBOSE)

In [None]:
# result_fgr_icp_plane = fine_alignment_point_to_plane(source_cloud,
#                                                              target_cloud,
#                                                              result_fgr,
#                                                              voxel_size,
#                                                              VERBOSE)

# TEASER++

In [None]:
source_corrs, target_corrs = establish_correspondences(source_cloud,
                                                       target_cloud,
                                                       source_features,
                                                       target_features,
                                                       VERBOSE)

In [None]:
result_teaser = robust_global_registration(source_corrs,
                                           target_corrs,
                                           voxel_size,
                                           VERBOSE)