<a href="https://colab.research.google.com/github/JohnFGC/Stanford_AA274A/blob/main/HW4_P1_ICP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ICP For Point-Cloud Registration

In this problem you will be implementing the ICP (Iterative Closest Point) algorithm to register two point clouds. Registration refers to aligning to be coherent. In robotics, this is needed when a lidar scan from different parts of a room for instance need to be joined together to create a full map of the environment.You can see some examples in [this paper](http://redwood-data.org/indoor_lidar_rgbd/paper.pdf) if you're interested.

Let's get started by first installing some required packages.

# Setup

In [1]:
!pip install open3d

Collecting open3d
  Downloading open3d-0.19.0-cp312-cp312-manylinux_2_31_x86_64.whl.metadata (4.3 kB)
Collecting dash>=2.6.0 (from open3d)
  Downloading dash-3.2.0-py3-none-any.whl.metadata (10 kB)
Collecting configargparse (from open3d)
  Downloading configargparse-1.7.1-py3-none-any.whl.metadata (24 kB)
Collecting ipywidgets>=8.0.4 (from open3d)
  Downloading ipywidgets-8.1.7-py3-none-any.whl.metadata (2.4 kB)
Collecting addict (from open3d)
  Downloading addict-2.4.0-py3-none-any.whl.metadata (1.0 kB)
Collecting pyquaternion (from open3d)
  Downloading pyquaternion-0.9.9-py3-none-any.whl.metadata (1.4 kB)
Collecting retrying (from dash>=2.6.0->open3d)
  Downloading retrying-1.4.2-py3-none-any.whl.metadata (5.5 kB)
Collecting comm>=0.1.3 (from ipywidgets>=8.0.4->open3d)
  Downloading comm-0.2.3-py3-none-any.whl.metadata (3.7 kB)
Collecting widgetsnbextension~=4.0.14 (from ipywidgets>=8.0.4->open3d)
  Downloading widgetsnbextension-4.0.14-py3-none-any.whl.metadata (1.6 kB)
Collecting 

In [2]:
import open3d as o3d
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display

## Visualization Code for Point Clouds

In [3]:
def plot_point_clouds(pcd_list, azim=-60, elev=30):
    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111, projection='3d')
    color_fns = [plt.cm.viridis, plt.cm.inferno]
    for i, pcd in enumerate(pcd_list):
      points = np.asarray(pcd.points)
      colors = color_fns[i](points[:, 2] / points[:, 2].max())
      ax.scatter(points[:, 0], points[:, 1], points[:, 2], s=10, c=colors)
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
    ax.view_init(azim=azim, elev=elev)

    plt.show()

# Data Setup

For our experiments in this homework, we will be using the famous [Stanford Bunny.](https://graphics.stanford.edu/data/3Dscanrep/)

In [4]:
# Downloading Point Cloud Data
!gdown 1DPT5Zy2lDtbRNmMnug1zrrXqQT-5ZSV1
!gdown 1sXF6TmEpPf0EZyzn3bz_h_ppJHfUiwcS

Downloading...
From: https://drive.google.com/uc?id=1DPT5Zy2lDtbRNmMnug1zrrXqQT-5ZSV1
To: /content/bun_zipper.ply
100% 3.03M/3.03M [00:00<00:00, 132MB/s]
Downloading...
From: https://drive.google.com/uc?id=1sXF6TmEpPf0EZyzn3bz_h_ppJHfUiwcS
To: /content/bun045.ply
100% 1.97M/1.97M [00:00<00:00, 72.9MB/s]


In [5]:
full_bunny = o3d.io.read_point_cloud("bun_zipper.ply")

In [6]:
# `pcd` refers to "point cloud data"
# Our goal throughout this notebook will be to match the point cloud data
# containing a partial view of the bunny
# with the full 3D mesh model of the same bunny
pcd = o3d.io.read_point_cloud("bun045.ply")

In [7]:
# pcd is an open3d data structure with different attributes
# You can access the 3D points representing the lidar data like this:
pcd_np = np.asarray(pcd.points)
print(pcd_np.shape)

(40097, 3)


Let's visualize the full bunny mesh model. Feel free to use the slider bars to view the bunny from different angles.

In [8]:
widgets.interact(
    plot_point_clouds,
    pcd_list=widgets.fixed([full_bunny]),
    azim=widgets.IntSlider(-90, min=-180, max=180, step=5, description="Azimuth"),
    elev=widgets.IntSlider(90, min=0, max=90, step=5, description="Elevation")
)

interactive(children=(IntSlider(value=-90, description='Azimuth', max=180, min=-180, step=5), IntSlider(value=…

Let us now visualize the partial point cloud data. Again, feel free to use the sliders. You should be able to see that this scan is not a complete scan of the bunny. We will be referring to this partial point cloud data as the "source", and the full scan of the bunny as the "target".

The goal therefore will be to match points in the source to the target. Mathematically, we can say that our overarching goal is to find a matrix T, such that

$P_{source} T = P_{target}$

where $P_i$ represents a matrix of the point cloud data

In [9]:
widgets.interact(
    plot_point_clouds,
    pcd_list=widgets.fixed([pcd]),
    azim=widgets.IntSlider(-90, min=-180, max=180, step=5, description="Azimuth"),
    elev=widgets.IntSlider(90, min=0, max=90, step=5, description="Elevation")
)

interactive(children=(IntSlider(value=-90, description='Azimuth', max=180, min=-180, step=5), IntSlider(value=…

# Global Registration

Before we can use the ICP algorithm, we need to generate an initial transformation that can represents our best guess for registering the two point clouds. We will do this using the RANSAC algorithm.

**You do not have to write any code for this part of the problem. You are highly encouraged to follow along to gain a better understanding of the process.**

First, let's visualize both the source and target point clouds together so that we can see whether they overlap or not.

In [10]:
def plot_point_clouds(pcd_list, azim=-60, elev=30):
    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111, projection='3d')
    color_fns = [plt.cm.viridis, plt.cm.inferno]
    for i, pcd in enumerate(pcd_list):
      points = np.asarray(pcd.points)
      colors = color_fns[i](points[:, 2] / points[:, 2].max())
      ax.scatter(points[:, 0], points[:, 1], points[:, 2], s=10, c=colors)
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
    ax.view_init(azim=azim, elev=elev)

    plt.show()

# Interactive visualization in Jupyter Notebook using ipywidgets
widgets.interact(
    plot_point_clouds,
    pcd_list=widgets.fixed([full_bunny, pcd]),
    azim=widgets.IntSlider(-90, min=-180, max=180, step=5, description="Azimuth"),
    elev=widgets.IntSlider(90, min=0, max=90, step=5, description="Elevation")
)

interactive(children=(IntSlider(value=-90, description='Azimuth', max=180, min=-180, step=5), IntSlider(value=…

We can see that the two point clouds are clearly misaligned right now. So first, we will extract some features of each point cloud. These features, called the "FPFH" features of the point clouds are a vector of 33 values for each point in the point cloud that represents some unique features of that point. Therefore, if you have N points in your point cloud, your FPFH feature matrix for that point cloud will be of shape (N, 33).

Since N can be really large for raw point cloud data, we will downsample it a bit so that it is easier to experiment with.

In [11]:
# Extract features
def preprocess_point_cloud(pcd, voxel_size):
    print(":: Downsample with a voxel size %.3f." % voxel_size)
    pcd_down = pcd.voxel_down_sample(voxel_size)

    radius_normal = voxel_size * 2
    print(":: Estimate normal with search radius %.3f." % radius_normal)
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))

    radius_feature = voxel_size * 5
    print(":: Compute FPFH feature with search radius %.3f." % radius_feature)
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    return pcd_down, pcd_fpfh

In [12]:
def prepare_dataset(source, target, voxel_size):
    source_down, source_fpfh = preprocess_point_cloud(source, voxel_size)
    target_down, target_fpfh = preprocess_point_cloud(target, voxel_size)
    return source, target, source_down, target_down, source_fpfh, target_fpfh

In [13]:
source, target, source_down, target_down, source_fpfh, target_fpfh = prepare_dataset(pcd, full_bunny, 0.01)

:: Downsample with a voxel size 0.010.
:: Estimate normal with search radius 0.020.
:: Compute FPFH feature with search radius 0.050.
:: Downsample with a voxel size 0.010.
:: Estimate normal with search radius 0.020.
:: Compute FPFH feature with search radius 0.050.


This code block will run the RANSAC algorithm using the open3d library. You can see that the function returns a transformation matrix, which we can apply to our source point cloud data.

In [14]:
def execute_global_registration(source_down, target_down, source_fpfh,
                                target_fpfh, voxel_size):
    distance_threshold = voxel_size * 1.5
    print(":: RANSAC registration on downsampled point clouds.")
    print("   Since the downsampling voxel size is %.3f," % voxel_size)
    print("   we use a liberal distance threshold %.3f." % distance_threshold)
    result = o3d.pipelines.registration.registration_ransac_based_on_feature_matching(
        source_down, target_down, source_fpfh, target_fpfh, False, distance_threshold)
    return result.transformation

In [15]:
T = execute_global_registration(source_down, target_down, source_fpfh, target_fpfh, voxel_size=0.01)

:: RANSAC registration on downsampled point clouds.
   Since the downsampling voxel size is 0.010,
   we use a liberal distance threshold 0.015.


In [16]:
print("T shape:", T.shape)
T

T shape: (4, 4)


array([[ 0.85860813,  0.00212961,  0.51262808, -0.05450511],
       [-0.05257434,  0.9950843 ,  0.08392364, -0.00383452],
       [-0.50992942, -0.0990086 ,  0.85449943, -0.00540713],
       [ 0.        ,  0.        ,  0.        ,  1.        ]])

# After RANSAC

Let's now apply this transformation matrix to our point cloud data to see how well the point clouds are now registered.

In [17]:
from copy import deepcopy

lidar_transformed = deepcopy(pcd).transform(T)

In [18]:
widgets.interact(
    plot_point_clouds,
    pcd_list=widgets.fixed([full_bunny, lidar_transformed]),
    azim=widgets.IntSlider(-90, min=-180, max=180, step=5, description="Azimuth"),
    elev=widgets.IntSlider(90, min=0, max=90, step=5, description="Elevation")
)

interactive(children=(IntSlider(value=-90, description='Azimuth', max=180, min=-180, step=5), IntSlider(value=…

As you can see, they are much better aligned than before but need fine-tuning.We will now use ICP to try to improve this alignment, a process also called "Local Refinement"

# Basic ICP

In this section, we will implement a basic version of the ICP algorithm. The ICP algorithm has the following steps:



1.   For every point in the source point cloud, find its nearest neighbor in the target point cloud
2.   Find a matrix T that maps points in the source point cloud to the target point cloud while minimizing the euclidean distance between a point and its nearest neighbor (this will be our **error** for each point)
3. Apply this transformation, T, to the source point cloud data
4. Compute the average error for this transformation across all points in the source point cloud data
5. Repeat steps 1 - 4 for N iterations, or break if:

$abs(e_t - e_{t-1}) < \tau$

where $e_i$ is the average error in the i-th iteration and tau is the tolerance set as a hyperparameter.



In [19]:
from sklearn.neighbors import NearestNeighbors
import cv2
from tqdm import tqdm, trange

In [20]:


def best_fit_transform(A, B):
    '''
    Calculates the least-squares best-fit transform that maps corresponding points A to B in m spatial dimensions
    Input:
      A: Naxm numpy array of corresponding points
      B: Nbxm numpy array of corresponding points
    Returns:
      T: (m+1)x(m+1) homogeneous transformation matrix that maps A on to B
      R: mxm rotation matrix
      t: mx1 translation vector
    '''
    # get number of dimensions
    m = A.shape[1]

    # translate points to their centroids
    centroid_A = np.mean(A, axis=0)
    centroid_B = np.mean(B, axis=0)
    AA = A - centroid_A
    BB = B - centroid_B

    # rotation matrix
    H = np.dot(AA.T, BB)
    U, S, Vt = np.linalg.svd(H)
    R = np.dot(Vt.T, U.T)

    # special reflection case
    if np.linalg.det(R) < 0:
       Vt[m-1,:] *= -1
       R = np.dot(Vt.T, U.T)

    # translation
    t = centroid_B.T - np.dot(R,centroid_A.T)

    # homogeneous transformation
    T = np.identity(m+1)
    T[:m, :m] = R
    T[:m, m] = t

    return T, R, t


def nearest_neighbor(src, dst, radius=0.01):
    '''
    Find the nearest (Euclidean) neighbor in dst for each point in src
    Input:
        src: Nxm array of points
        dst: Nxm array of points
    Output:
        distances: Euclidean distances of the nearest neighbor
        indices: dst indices of the nearest neighbor
    '''
    ######################################################################
    ######################### YOUR CODE HERE #############################
    # Use the NearestNeighbors class sklearn.neighbors to compute the nearest
    # neighbor of each point in the source dataset to the target dataset
    # Note that by using this class and its methods correctly, you should be
    # able to get the distance between a point and its nearest neighbor,
    # as well as the indices of the nearest neighbor in the target point cloud
    nbrs = NearestNeighbors(n_neighbors=1, algorithm='ball_tree').fit(dst)
    distances, indices = nbrs.kneighbors(src)
    ######################################################################
    return distances.ravel(), indices.ravel()


def icp(A, B, init_pose=None, max_iterations=20, tolerance=0.001, knn_radius=0.01):
    '''
    The Iterative Closest Point method: finds best-fit transform that maps points A on to points B
    Input:
        A: Nxm numpy array of source mD points
        B: Nxm numpy array of destination mD point
        init_pose: (m+1)x(m+1) homogeneous transformation
        max_iterations: exit algorithm after max_iterations
        tolerance: convergence criteria
    Output:
        T: final homogeneous transformation that maps A on to B
        distances: Euclidean distances (errors) of the nearest neighbor
        i: number of iterations to converge
    '''
    # get number of dimensions
    m = A.shape[1]

    # make points homogeneous, copy them to maintain the originals
    src = np.ones((m+1,A.shape[0]))
    dst = np.ones((m+1,B.shape[0]))
    src[:m,:] = np.copy(A.T)
    dst[:m,:] = np.copy(B.T)

    # apply the initial pose estimation
    if init_pose is not None:
        src = np.dot(init_pose, src)

    prev_error = 0
    ######################################################################
    ######################### YOUR CODE HERE #############################
    # Write the loop for the ICP algorithm here
    # Follow the steps outlined in the prompt above.
    # Hints:
    # - Use the functions `nearest_neighbor()` and `best_fit_transform()`
    # - src and dst matrices are defined above with shape (m+1,N), while the above
    #   two function expect matrices of shape (N,m)
    for i in trange(max_iterations):
      nn_dists, nn_inds = nearest_neighbor(src[:m,:].T, dst[:m,:].T)
      T,_,_ = best_fit_transform(src[:m,:].T, dst[:m,nn_inds].T)
      transformed_src = np.dot(T, src)
      error = np.mean(nn_dists)
      if np.abs(prev_error - error) < tolerance:
        break
      src = transformed_src
      prev_error = error
    ######################################################################
    # calculate final transformation
    T,_,_ = best_fit_transform(A, src[:m,:].T)

    return T

In [21]:
source_np = np.asarray(source_down.points)
target_np = np.asarray(target_down.points)

T_icp = icp(source_np, target_np, T, max_iterations=20, tolerance=1e-5, knn_radius=1e-2)

 45%|████▌     | 9/20 [00:00<00:00, 159.01it/s]


In [22]:
pcd_down_ICP = deepcopy(source_down).transform(T_icp) # transformed downsampled pcd
pcd_ICP = deepcopy(source).transform(T_icp) # transformed complete size pcd

If your implementation of ICP is correct, you should see that the point clouds roughly align. The source point cloud can seem invisible, which means that it can be aligned to be inside the target point cloud, which is acceptable at this stage. This happens because we implemented a very basic version of ICP that is not robust to noise, outliers, etc.

In [23]:
widgets.interact(
    plot_point_clouds,
    pcd_list=widgets.fixed([full_bunny, pcd_ICP]),
    azim=widgets.IntSlider(-90, min=-180, max=180, step=5, description="Azimuth"),
    elev=widgets.IntSlider(90, min=0, max=90, step=5, description="Elevation")
)

interactive(children=(IntSlider(value=-90, description='Azimuth', max=180, min=-180, step=5), IntSlider(value=…

This visualization should be slightly easier to view and you should see some points from the source point cloud along the edges of the bunny

In [24]:
widgets.interact(
    plot_point_clouds,
    pcd_list=widgets.fixed([full_bunny, pcd_down_ICP]),
    azim=widgets.IntSlider(-90, min=-180, max=180, step=5, description="Azimuth"),
    elev=widgets.IntSlider(90, min=0, max=90, step=5, description="Elevation")
)

interactive(children=(IntSlider(value=-90, description='Azimuth', max=180, min=-180, step=5), IntSlider(value=…

# Robust ICP Using open3D

We will now use the open3D library's functions to use a robust version of the ICP algorithm. You do not have to write the ICP algorithm itself, but you will only be using library functions from open3D.

Follow [this tutorial](http://www.open3d.org/docs/release/tutorial/pipelines/icp_registration.html#Point-to-point-ICP) to see which functions to call and how to use them. You should be calling the ICP registration method, use the point-to-point transformation estimation method, and include a convergence critera to set a max number of iterations. Everything should be available in this same tutorial, you shouldn't have to google or find anything else.

In [25]:
threshold = 0.01
max_iterations=3000

In [26]:
print("Apply point-to-point ICP")
######################################################################
######################### YOUR CODE HERE #############################
trans_init = np.identity(4)
reg_p2p = o3d.pipelines.registration.registration_icp(
    source, target, threshold, trans_init,
    o3d.pipelines.registration.TransformationEstimationPointToPoint())
######################################################################
print(reg_p2p)
print("Transformation is:")
print(reg_p2p.transformation)

Apply point-to-point ICP
RegistrationResult with fitness=1.000000e+00, inlier_rmse=1.508278e-03, and correspondence_set size of 40097
Access transformation to get result.
Transformation is:
[[ 0.7981984  -0.02791668  0.60174743 -0.04948853]
 [ 0.00424178  0.9991613   0.04072719 -0.00194528]
 [-0.60237971 -0.02995589  0.79764737 -0.00838935]
 [ 0.          0.          0.          1.        ]]


In [27]:
pcd_robust_ICP = deepcopy(pcd).transform(reg_p2p.transformation)

In [29]:
widgets.interact(
    plot_point_clouds,
    pcd_list=widgets.fixed([full_bunny, pcd_robust_ICP]),
    azim=widgets.IntSlider(-90, min=-180, max=180, step=5, description="Azimuth"),
    elev=widgets.IntSlider(90, min=0, max=90, step=5, description="Elevation")
)

interactive(children=(IntSlider(value=-90, description='Azimuth', max=180, min=-180, step=5), IntSlider(value=…

This visualization should now look very well aligned. If done correctly, you should see the source and target point clouds mixing with each other a bit.