In [13]:
from typing import Tuple, List

import numpy as np
import open3d as o3d
import os
import copy
import numpy as np
import matplotlib.pyplot as plt
import exercises.tools.utils as utils
from matplotlib import animation, rc
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
from IPython.display import display, Math, Latex, Markdown, HTML

%matplotlib inline

In [7]:
map_cloud = o3d.io.read_point_cloud("app/dataset/map.pcd") #projects/dataset/map.pcd

# Print the number of points in the map cloud
print(f"Loaded {len(map_cloud.points)} data points from map.pcd")

Loaded 143949 data points from map.pcd


In [10]:
frames_dir = "app/dataset/frames"
frame_files = os.listdir(frames_dir)
frames = [o3d.io.read_point_cloud(os.path.join(frames_dir, f)) for f in frame_files]

In [11]:
print(frames[0].get_center())

[-0.29560711 -0.48462959 -0.79555747]


In [14]:
def draw_registration_result(source, target, transformation):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target_temp],
                                      zoom=0.4459,
                                      front=[0.9288, -0.2951, -0.2242],
                                      lookat=[1.6784, 2.0612, 1.4451],
                                      up=[-0.3402, -0.9189, -0.1996])

In [15]:
#visualize registration
threshold = 0.02
trans_init = np.asarray([[0.862, 0.011, -0.507, 0.5],
                         [-0.139, 0.967, -0.215, 0.7],
                         [0.487, 0.255, 0.835, -1.4], [0.0, 0.0, 0.0, 1.0]])
draw_registration_result(map_cloud, frames[0], trans_init)

In [16]:
print("Initial alignment")
evaluation = o3d.pipelines.registration.evaluate_registration(
    map_cloud, frames[0], threshold, trans_init)
print(evaluation)

Initial alignment
RegistrationResult with fitness=6.946905e-06, inlier_rmse=2.677878e-03, and correspondence_set size of 1
Access transformation to get result.


In [26]:
#lib wrapper for cpp registration libary from: https://github.com/hummat/registration
import os
import ctypes
import csv

import numpy as np


def load_library(path: str = os.getcwd(), name: str = "libregistration_pcl110") -> None:
    global REGLIB
    try:
        REGLIB = np.ctypeslib.load_library(libname=name, loader_path=path)
        print(REGLIB)
    except OSError:
        print("Compiled C++ library was not found in the current directory. Please use `load_library` to load it from "
              "a custom directory, then ignore this message.")


load_library()


def load_data(path: str, delimiter: str = ' ') -> np.ndarray:
    """Loads point cloud data of type `CSV`, `PLY` and `PCD`.

    The file should contain one point per line where each number is separated by the `delimiter` character.
    Any none numeric lines will be skipped.

    Args:
        path (str): The path to the file.
        delimiter (char): Separation of numbers in each line of the file.

    Returns:
        A ndarray of shape NxD where `N` are the number of points in the point cloud and `D` their dimension.
    """
    data = list()
    with open(path, newline='\n') as file:
        reader = csv.reader(file, delimiter=delimiter, quoting=csv.QUOTE_NONNUMERIC)
        lines = 0
        skips = 0
        while True:
            try:
                row = next(reader)
                row = [x for x in row if not isinstance(x, str)]
                if len(row) in [3, 6, 9]:
                    data.append(row[:3])
                else:
                    skips += 1
            except ValueError:
                skips += 1
                pass
            except StopIteration:
                print(f"Found {lines} lines. Skipped {skips}. Loaded {lines - skips} points.")
                break
            lines += 1
    return np.array(data)


def set_argtypes(algorithm, source, target):
    """Tells the underlying C++ code which data types and dimensions to expect.

    Args:
        algorithm (str): The registration algorithm to use. One of `icp` or `ndt`.
        source (ndarray): The source point cloud.
        target (ndarray): The target point cloud.
    """
    REGLIB.icp.restype = ctypes.c_double
    REGLIB.ndt.restype = ctypes.c_double
    argtypes = [np.ctypeslib.ndpointer(dtype=np.float64, ndim=source.ndim, shape=source.shape,
                                       flags='C_CONTIGUOUS'), ctypes.c_size_t,
                np.ctypeslib.ndpointer(dtype=np.float64, ndim=target.ndim, shape=target.shape,
                                       flags='C_CONTIGUOUS'), ctypes.c_size_t,
                np.ctypeslib.ndpointer(dtype=np.float64, ndim=2, shape=(4, 4), flags='C_CONTIGUOUS'),
                ctypes.c_int, ctypes.c_double, ctypes.c_double, ctypes.c_double, ctypes.c_double, ctypes.c_bool]
    if algorithm == 'icp':
        REGLIB.icp.argtypes = argtypes
    elif algorithm == 'ndt':
        argtypes.extend([ctypes.c_float, ctypes.c_double, ctypes.c_float])
        REGLIB.ndt.argtypes = argtypes


def icp(source,
        target,
        nr_iterations=25,
        distance_threshold=1.0,
        epsilon=0.01,
        inlier_threshold=0.05,
        downsample=0,
        visualize=False):
    """The `Iterative Closest Point` (ICP) algorithm.

    Args:
        source (ndarray): The point cloud that we want to align to the target.
        target (ndarray): The point cloud that the source is aligned to.
        nr_iterations (int): The maximum number of iterations the internal optimization should run for.
        distance_threshold (float): The maximum distance threshold between two correspondent points in
                                    source -> target. If the distance is larger than this threshold, the points will
                                    be ignored in the alignment process.
        epsilon (float): The transformation epsilon (maximum allowable difference between two consecutive
                 transformations) in order for an optimization to be considered as having converged to the final
                 solution.
        inlier_threshold (float): The inlier distance threshold for the internal RANSAC outlier rejection loop.
                          The method considers a point to be an inlier, if the distance between the target data
                          index and the transformed source index is smaller than the given inlier distance
                          threshold.
        downsample (float): Assembles a local 3D grid over a given PointCloud and downsamples + filters the data.
        visualize (bool): Can be used to visualize and control the progress of the algorithm.

    Returns:
        A ndarray with the final transformation matrix between source and target.
    """

    transformation = np.identity(4)
    set_argtypes('icp', source, target)
    score = REGLIB.icp(source, len(source), target, len(target), transformation,
                       nr_iterations, distance_threshold, epsilon, inlier_threshold, downsample, visualize)
    print(f"ICP converged. Fitness score: {score:.2f}") if score > 0 else print("ICP did not converge!")
    return transformation


def ndt(source,
        target,
        nr_iterations=25,
        distance_threshold=1.0,
        epsilon=0.01,
        inlier_threshold=0.05,
        downsample=0,
        visualize=False,
        resolution=1.0,
        step_size=0.1,
        voxelize=0):
    """The `Normal Distributions Transform` (NDT) algorithm.

    Args:
        source (ndarray): The point cloud that we want to align to the target.
        target (ndarray): The point cloud that the source is aligned to.
        nr_iterations (int): The maximum number of iterations the internal optimization should run for.
        distance_threshold (float): The maximum distance threshold between two correspondent points in
                                    source -> target. If the distance is larger than this threshold, the points will
                                    be ignored in the alignment process.
        epsilon (float): The transformation epsilon (maximum allowable difference between two consecutive
                 transformations) in order for an optimization to be considered as having converged to the final
                 solution.
        inlier_threshold (float): The inlier distance threshold for the internal RANSAC outlier rejection loop.
                          The method considers a point to be an inlier, if the distance between the target data
                          index and the transformed source index is smaller than the given inlier distance
                          threshold.
        downsample (float): Assembles a local 3D grid over a given PointCloud and downsamples + filters the data.
        visualize (bool): Can be used to visualize and control the progress of the algorithm.
        resolution (float): The resolution of the voxel grid. Try increasing this in case of core dumps.
        step_size (float): The Newton line search maximum step length.
        voxelize (bool): If set to `True`, the source cloud is converted into a voxel model before alignment.

    Returns:
        A ndarray with the final transformation matrix between source and target.
    """

    transformation = np.identity(4)
    set_argtypes('ndt', source, target)
    score = REGLIB.ndt(source, len(source), target, len(target), transformation,
                  nr_iterations, distance_threshold, epsilon, inlier_threshold, downsample, visualize,
                  resolution, step_size, voxelize)
    print(f"NDT converged. Fitness score: {score:.2f}") if score > 0 else print("NDT did not converge!")
    return transformation

Compiled C++ library was not found in the current directory. Please use `load_library` to load it from a custom directory, then ignore this message.


In [30]:
import os
import time



    # Only needed if you want to use manually compiled library code
    # reglib.load_library(os.path.join(os.curdir, "cmake-build-debug"))

    # Load you data)
source_points = load_data("app/dataset/frames/frame_1.pcd")
target_points = load_data("app/dataset/map.pcd")

    # Run the registration algorithm
start = time.time()
load_library("app/libregistration_pcl110.so")
trans = ndt(source=source_points, target=target_points, nr_iterations=1, epsilon=0.01,
                       inlier_threshold=0.05, distance_threshold=5.0, downsample=0, visualize=True)
                       #resolution=12.0, step_size=0.5, voxelize=0)
print("Runtime:", time.time() - start)
print(trans)

Found 22369 lines. Skipped 11. Loaded 22358 points.
Found 143960 lines. Skipped 11. Loaded 143949 points.
<CDLL '/app/libregistration_pcl110.so', handle 1035ffa0 at 0x7f08e371f9a0>
NDT converged. Fitness score: 0.04
Runtime: 129.18612122535706
[[ 9.99998927e-01 -1.14443805e-03  9.44722851e-04  8.39681830e-04]
 [ 1.14385341e-03  9.99999166e-01  6.19078171e-04  1.50578637e-02]
 [-9.45430540e-04 -6.17996848e-04  9.99999404e-01 -9.88431573e-02]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
