In [1]:
import open3d as o3d
import numpy as np
import copy

from hacman_real_env.pcd_env import PCDObsEnv
env = PCDObsEnv(camera_alignment=False) # set to False to start from scratch

colors = [np.random.rand(3) for i in range(100)]

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Started camera 0
Started camera 2
Started camera 3


## Single Camera PCD Preprocessing

Processing consists of the following steps:
- random downsample to 1/4 of the points
- voxel downsample
- remove outliers
- estimate normals
- compute FPFH features

In [2]:
def display_inlier_outlier(cloud, ind):
    # Compute normals to help visualize
    # cloud.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30))

    inlier_cloud = cloud.select_by_index(ind)
    inlier_cloud.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30))
    outlier_cloud = cloud.select_by_index(ind, invert=True)

    print("Showing outliers (red) and inliers (gray): ")
    outlier_cloud.paint_uniform_color([1, 0, 0])
    inlier_cloud.paint_uniform_color([0.8, 0.8, 0.8])
    o3d.visualization.draw_geometries([inlier_cloud, outlier_cloud])

def preprocess_point_cloud(pcd, voxel_size):
    print(":: Random downsample to 1/5 size.")
    pcd_size = len(pcd.points)
    pcd_down_mask = np.random.choice(pcd_size, int(pcd_size / 5))
    pcd_down = pcd.select_by_index(pcd_down_mask)

    print(":: Downsample with a voxel size %.3f." % voxel_size)
    pcd_down = pcd_down.voxel_down_sample(voxel_size)

    radius = 0.02
    print(":: Remove radius outlier with radius %.3f." % radius)
    nb_points = int(((radius / voxel_size) ** 2) * 0.8) 
    cl, ind = pcd_down.remove_radius_outlier(nb_points=nb_points, radius=radius)
    # cl, ind = pcd_down.remove_statistical_outlier(nb_neighbors=20,
    #                                                 std_ratio=1)
    display_inlier_outlier(pcd_down, ind)
    pcd_down = cl

    radius_normal = voxel_size * 4
    print(":: Estimate normal with search radius %.3f." % radius_normal)
    pcd_down.estimate_normals(
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal, max_nn=30))

    radius_feature = voxel_size * 5
    print(":: Compute FPFH feature with search radius %.3f." % radius_feature)
    pcd_fpfh = o3d.pipelines.registration.compute_fpfh_feature(
        pcd_down,
        o3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature, max_nn=100))
    return pcd_down, pcd_fpfh

voxel_size = 0.005
# Obtain the point cloud
pcd = env.get_single_raw_pcd(0)
pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pcd))
print(f"PCD shape: {len(pcd.points)}")
pcd_down, pcd_fpfh = preprocess_point_cloud(pcd, voxel_size)
o3d.visualization.draw_geometries([pcd_down])

PCD shape: 340717
:: Random downsample to 1/5 size.
:: Downsample with a voxel size 0.005.
:: Remove radius outlier with radius 0.020.
Showing outliers (red) and inliers (gray): 
:: Estimate normal with search radius 0.020.
:: Compute FPFH feature with search radius 0.025.


In [3]:
# To test if the pcd looks good after preprocessing
pcd = o3d.geometry.PointCloud()
for cam_id in env.camera_indices:
    processed_pcd = env.get_single_pcd(cam_id)
    processed_pcd.paint_uniform_color(colors[cam_id])
    pcd += processed_pcd
o3d.visualization.draw_geometries([pcd])

## Aligning Multiple Cameras

In [5]:
def draw_registration_result(source, target, transformation):
    source_temp = copy.deepcopy(source)
    target_temp = copy.deepcopy(target)
    source_temp.paint_uniform_color([1, 0.706, 0])
    target_temp.paint_uniform_color([0, 0.651, 0.929])
    source_temp.transform(transformation)
    o3d.visualization.draw_geometries([source_temp, target_temp])

def compute_align_to_target(target_pcd, other_pcds, threshold=0.01, visualize=False):
    """
    Compute alignments from other_pcds to base_pcd
    
    Input:
        target_pcd: Open3D point cloud
        other_pcds: dict of Open3D point clouds {cam_id: pcd}
    Return:
        dict of transforms {cam_id: transforms}
    """
    transforms = {}
    for cam_id, source in other_pcds.items():        
        print(f":: Aligning camera {cam_id} with target pcd")
        print(":: Apply point-to-point ICP")
        trans_init = np.identity(4)
        # reg_p2p = o3d.pipelines.registration.registration_icp(
        #     source, target, threshold, trans_init,
        #     o3d.pipelines.registration.TransformationEstimationPointToPoint())
        reg_p2p = o3d.pipelines.registration.registration_icp(
            source, target_pcd, threshold, trans_init,
            o3d.pipelines.registration.TransformationEstimationPointToPlane())
        print(reg_p2p)
        print("Transformation is:")
        print(reg_p2p.transformation)
        if visualize:
            draw_registration_result(source, target_pcd, reg_p2p.transformation)

        transforms[cam_id] = reg_p2p.transformation.copy()
    
    # For base camer, set to identity
    transforms[base_cam_id] = np.identity(4)
    
    return transforms

def align_pcds(pcds, transforms):
    """
    Align point clouds using transforms
    
    Input:
        pcds: dict of Open3D point clouds {cam_id: pcd}
        transforms: dict of transforms {cam_id: transforms}.
    Return:
        Open3D point cloud
    """
    transformed_pcds = o3d.geometry.PointCloud()
    for cam_id in env.camera_indices:
        transformed_pcd = pcds[cam_id].transform(transforms[cam_id])
        transformed_pcd.paint_uniform_color(colors[cam_id])
        transformed_pcds += transformed_pcd
    
    return transformed_pcds


base_cam_id = 0     # Set all other cameras to align with camera 0

# Obtain individual point clouds
pcds = {}
for cam_id in env.camera_indices:
    pcds[cam_id] = env.get_single_pcd(cam_id)

# Compute alignments
transforms = compute_align_to_target(
    pcds[base_cam_id], pcds, threshold=0.01, visualize=False) # Set to True to visualize pairwise alignment

# Transform all point clouds
camera_aligned_pcds = align_pcds(pcds, transforms)
o3d.visualization.draw_geometries([camera_aligned_pcds])

:: Aligning camera 0 with target pcd
:: Apply point-to-point ICP
RegistrationResult with fitness=1.000000e+00, inlier_rmse=0.000000e+00, and correspondence_set size of 25187
Access transformation to get result.
Transformation is:
[[1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [0. 0. 0. 1.]]
:: Aligning camera 2 with target pcd
:: Apply point-to-point ICP
RegistrationResult with fitness=5.510258e-01, inlier_rmse=4.091608e-03, and correspondence_set size of 16652
Access transformation to get result.
Transformation is:
[[ 0.99959372  0.02685219  0.00955831 -0.00686438]
 [-0.02671207  0.99953804 -0.01449731  0.04562095]
 [-0.00994318  0.0142361   0.99984922  0.01635733]
 [ 0.          0.          0.          1.        ]]
:: Aligning camera 3 with target pcd
:: Apply point-to-point ICP
RegistrationResult with fitness=8.176254e-01, inlier_rmse=4.230187e-03, and correspondence_set size of 17767
Access transformation to get result.
Transformation is:
[[ 0.99936221 -0.03488579  0.00762536 -0.008

libusb: error [udev_hotplug_event] ignoring udev action change
libusb: error [udev_hotplug_event] ignoring udev action bind


In [7]:
"""Save the results"""
import os, json
from scipy.spatial.transform import Rotation

save_dir = os.path.join("calibration", "finetune_results")

# Save the transform matrices as npz
save_path = os.path.join(save_dir, "camera_alignments.npz")
save_content = {
    str(cam_id): transform for cam_id, transform in transforms.items()
} # Convert cam_id to string to save as npz
np.savez(save_path, **save_content)

# Also save the humann readable transforms as xyz, quat into json
save_content = {}
for cam_id, transform in transforms.items():
    quat = Rotation.from_matrix(transform[:3, :3]).as_quat()
    save_content[cam_id] = {
        "xyz": transform[:3, 3].tolist(),
        "quaternion": quat.tolist()
    }
save_path = os.path.join(save_dir, "camera_alignments.json")
with open(save_path, "w") as f:
    json.dump(save_content, f, indent=2)

## Aligning Base PCD to Ground Truth Plane (Not used)

In [42]:
gt_plane = np.meshgrid(np.arange(0.1, 0.85, 0.005), np.arange(-0.85, 0.25, 0.005))
gt_plane = np.vstack([gt_plane[0].flatten(), gt_plane[1].flatten(), np.zeros_like(gt_plane[0].flatten())]).T
gt_plane_pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(gt_plane))
gt_plane_pcd.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30))

# Obtain individual point clouds
pcds = {}
for cam_id in env.camera_indices:
    pcds[cam_id] = env.get_single_pcd(cam_id)
# Transform all point clouds (using previous alignment)
camera_aligned_pcds = align_pcds(pcds, transforms)

# Visualize point clouds
obs_pcd = copy.deepcopy(camera_aligned_pcds)
draw_registration_result(obs_pcd, gt_plane_pcd, np.identity(4))

# Compute alignments
pcds = {"combined": obs_pcd}
gt_transforms = compute_align_to_target(
    gt_plane_pcd, pcds, threshold=0.01, visualize=False) # Set to True to visualize pairwise alignment

# We only need the z transform and the rotation
base_transform = gt_transforms["combined"]
base_transform[0:2, 3] = 0

# Apply base transform to all other transforms
transformed_pcd = obs_pcd.transform(base_transform)
draw_registration_result(obs_pcd, gt_plane_pcd, base_transform)
# transformed_pcd = obs_pcd.transform(base_transform)
# transformed_pcd.paint_uniform_color([1, 0, 0])
# obs_pcd.paint_uniform_color([0, 0, 1])
# o3d.visualization.draw_geometries([
#     # transformed_pcd,
#     obs_pcd,
#     gt_plane_pcd
#     ])

:: Aligning camera combined with target pcd
:: Apply point-to-point ICP
RegistrationResult with fitness=7.767550e-01, inlier_rmse=3.228349e-03, and correspondence_set size of 60924
Access transformation to get result.
Transformation is:
[[ 9.99995509e-01  9.12183142e-05 -2.99546888e-03  6.22445100e-05]
 [-6.25554319e-06  9.99598015e-01  2.83515316e-02 -1.45830893e-04]
 [ 2.99685093e-03 -2.83513855e-02  9.99593526e-01 -2.15817886e-02]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
