# Test plane-based registration
Test different plane-based registration methods

In [None]:
%matplotlib widget
import numpy as np
import os
import time
import plotly.graph_objects as go
from copy import deepcopy

import planeslam.io as io
from planeslam.general import plot_3D_setup, color_legend
from planeslam.scan import pc_to_scan
from planeslam.general import NED_to_ENU, trajectory_plot_trace
from planeslam.geometry.util import quat_to_rot_mat

os.environ['KMP_DUPLICATE_LIB_OK']='True'

%load_ext autoreload
%autoreload 2

In [None]:
np.set_printoptions(suppress=True)

### Load AirSim data

In [None]:
# Read in point cloud data
binpath = os.path.join(os.getcwd(), '..', '..', 'data', 'airsim', 'blocks_60_samples_loop_closure', 'lidar', 'Drone0')
PC_data = io.read_lidar_bin(binpath)

# Read in ground-truth poses (in drone local frame)
posepath = os.path.join(os.getcwd(), '..', '..', 'data', 'airsim', 'blocks_60_samples_loop_closure', 'poses', 'Drone0')
drone_positions, drone_orientations = io.read_poses(posepath)

In [None]:
# Convert to ENU
num_scans = len(PC_data)

for i in range(num_scans):
    PC_data[i] = NED_to_ENU(PC_data[i])

drone_positions = NED_to_ENU(drone_positions)
drone_orientations = NED_to_ENU(drone_orientations)

drone_rotations = np.zeros((3,3,num_scans))
for i in range(num_scans):
    drone_rotations[:,:,i] = quat_to_rot_mat(drone_orientations[i])

In [None]:
# Plot ground-truth trajectory
gt_traj_trace = go.Scatter3d(x=drone_positions[:,0], y=drone_positions[:,1], z=drone_positions[:,2])
fig = go.Figure(data=gt_traj_trace)
fig.update_layout(width=1000, height=600, scene=dict(aspectmode='data'))
fig.show()

In [None]:
data = trajectory_plot_trace(drone_rotations, drone_positions)
fig = go.Figure(data=data)
fig.update_layout(width=1000, height=600, scene=dict(aspectmode='data'))
fig.show()

In [None]:
# Extract scans
num_scans = len(PC_data)
scans = num_scans * [None]
for i in range(num_scans):
    scans[i] = pc_to_scan(PC_data[i])

### Decoupled SVD

In [None]:
# Initialize transformed scans
scans_transformed = num_scans * [None]
for i in range(num_scans):
    scans_transformed[i] = deepcopy(scans[i])

In [None]:
from planeslam.registration import decoupled_register

R_abs = quat_to_rot_mat(drone_orientations[0])
t_abs = drone_positions[0,:].copy()
traj_est = np.zeros((num_scans, 3))
traj_est[0] = t_abs
avg_runtime = 0

for i in range(1, len(scans)):
    start_time = time.time()
    R_hat, t_hat = decoupled_register(scans[i-1], scans[i])
    # (R,t) maps scan 0 to scan 1, (R,-t) maps pose 0 to pose 1
    t_abs -= (R_abs @ t_hat).flatten()
    R_abs = R_hat @ R_abs
    avg_runtime += time.time() - start_time
    traj_est[i] = t_abs
    scans_transformed[i].transform(R_abs, t_abs)

avg_runtime /= len(scans)-1
print("average registration time: ", avg_runtime)

In [None]:
# Plot trajectories
est_traj_trace = go.Scatter3d(x=traj_est[:,0], y=traj_est[:,1], z=traj_est[:,2])
fig = go.Figure(data=[gt_traj_trace, est_traj_trace])
fig.update_layout(width=1500, height=900, scene=dict(aspectmode='data'))
fig.show()

In [None]:
# RMSE error
np.mean(np.linalg.norm(traj_est - drone_positions, axis=1))

Decoupled Gauss Newton

In [None]:
# Initialize transformed scans
scans_transformed = num_scans * [None]
for i in range(num_scans):
    scans_transformed[i] = deepcopy(scans[i])

In [None]:
from planeslam.registration import decoupled_GN_register

R_abs = quat_to_rot_mat(drone_orientations[0])
t_abs = drone_positions[0,:].copy()
traj_est = np.zeros((num_scans, 3))
traj_est[0] = t_abs
traj_Rs = np.zeros((3, 3, num_scans))
avg_runtime = 0

for i in range(1, len(scans)):
    start_time = time.time()
    R_hat, t_hat = decoupled_GN_register(scans[i-1], scans[i])
    # (R_hat,t_hat) maps scan 0 to scan 1, (R_hat,-t_hat) maps pose 0 to pose 1
    t_abs -= (R_abs @ t_hat).flatten()
    R_abs = R_hat @ R_abs
    avg_runtime += time.time() - start_time
    traj_est[i] = t_abs
    traj_Rs[:,:,i] = R_abs.T
    scans_transformed[i].transform(R_abs.T, t_abs)

avg_runtime /= len(scans)-1
print("average registration time: ", avg_runtime)

In [None]:
traj_est

In [None]:
est_traj = trajectory_plot_trace(traj_Rs, traj_est)
gt_traj = trajectory_plot_trace(drone_rotations, drone_positions, color="blue")
fig = go.Figure(data=(est_traj + gt_traj))
fig.update_layout(width=1500, height=900, scene=dict(aspectmode='data'))
fig.show()

In [None]:
drone_positions - traj_est

In [None]:
# Plot trajectories
est_traj_trace = go.Scatter3d(x=traj_est[:,0], y=traj_est[:,1], z=traj_est[:,2])
fig = go.Figure(data=[gt_traj_trace, est_traj_trace])
fig.update_layout(width=1500, height=900, scene=dict(aspectmode='data'))
fig.show()

In [None]:
# RMSE error
np.mean(np.linalg.norm(traj_est - drone_positions, axis=1))

In [None]:
R_hat, t_hat = decoupled_GN_register(scans[11], scans[12])

In [None]:
R_hat

In [None]:
t_hat

Merging 

In [None]:
# Plot scans
fig = go.Figure()

for i, scan in enumerate(scans_transformed):
    for t in scan.plot_trace():
        fig.add_trace(t)

fig.update_layout(width=1000, height=600, scene=dict(aspectmode='data'))
fig.show()

In [None]:
merged = scans_transformed[0]

for s in scans_transformed[1:]:
    merged = merged.merge(s, dist_thresh=7.5)
    merged.reduce_inside(p2p_dist_thresh=5)
    merged.remove_small_planes()
    merged.fuse_edges()

In [None]:
# Plot merge
fig = go.Figure(data=merged.plot_trace())
fig.update_layout(width=1500, height=900, scene=dict(aspectmode='data'))
fig.show()

In [None]:
from plotly.subplots import make_subplots
from planeslam.registration import get_correspondences

# Plot 2 scans
source = scans_transformed[40]
target = merged

fig = make_subplots(rows=1, cols=2, specs=[[{'type': 'surface'}, {'type': 'surface'}]])

for t in source.plot_trace(show_normals=True):
    fig.add_trace(t, row=1, col=1)

for t in target.plot_trace(show_normals=True):
    fig.add_trace(t, row=1, col=2)

fig.update_layout(width=1600, height=600, scene=dict(aspectmode='data'), scene2=dict(aspectmode='data'))
fig.show()

correspondences = get_correspondences(source, target)
print(correspondences)

In [None]:
from planeslam.registration import decoupled_GN_register

R_hat, t_hat = decoupled_GN_register(source, target)

print(R_hat)
print(t_hat)

Test with velodyne

In [None]:
pcpath = os.path.join(os.getcwd(),'..', '..', 'data', 'velodyne', '8_5_2022', 'flightroom', 'test')

In [None]:
PCs = []
for i in range(200, 400, 5):  # ignore first 200 and last 200 frames
    filename = pcpath+'\pc_'+str(i)+'.npy'
    PC = np.load(filename)
    PCs.append(PC)

In [None]:
# Extract scans
num_scans = len(PCs)
scans = num_scans * [None]
scans_transformed = num_scans * [None]
for i in range(num_scans):
    scans[i] = pc_to_scan(PCs[i], ds_rate=5)

In [None]:
from planeslam.registration import decoupled_GN_register

R_abs = quat_to_rot_mat(drone_orientations[0])
t_abs = drone_positions[0,:].copy()
traj_est = np.zeros((num_scans, 3))
traj_est[0] = t_abs
avg_runtime = 0

for i in range(1, len(scans)):
    start_time = time.time()
    R_hat, t_hat = decoupled_GN_register(scans[i-1], scans[i])
    # (R_hat,t_hat) maps scan 0 to scan 1, (R_hat,-t_hat) maps pose 0 to pose 1
    t_abs -= (R_abs @ t_hat).flatten()
    R_abs = R_hat @ R_abs
    avg_runtime += time.time() - start_time
    traj_est[i] = t_abs
    scans_transformed[i].transform(R_abs, t_abs)

avg_runtime /= len(scans)-1
print("average registration time: ", avg_runtime)

In [None]:
# Plot trajectories
est_traj_trace = go.Scatter3d(x=traj_est[:,0], y=traj_est[:,1], z=traj_est[:,2])
fig = go.Figure(data=[est_traj_trace])
fig.update_layout(width=1500, height=900, scene=dict(aspectmode='data'))
fig.show()