# Minimize L1 loss between closest points

Load data

In [1]:
import rosbag
import rospy

bag = rosbag.Bag("/home/harry7557558/fast_lio_ws/bag/utias/2024-02-21-13-06-52.bag")

messages = {}
for topic, msg, msg_t in bag.read_messages():
    if topic not in messages:
        messages[topic] = []
    messages[topic].append(msg)

livox_imu = messages['/livox/imu']
livox_lidar = messages['/livox/lidar']

bag.close()

print(len(livox_imu))
print(len(livox_lidar))


2158
108


Interpolate IMU data

In [2]:
import numpy as np
from scipy.interpolate import CubicSpline

angular_velocities = np.array([[msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z] for msg in livox_imu])
linear_accelerations = 9.81 * np.array([[msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z] for msg in livox_imu])

# Extract timestamps from IMU messages
timestamps_raw = np.array([msg.header.stamp.to_sec() for msg in livox_imu])
timestamps = timestamps_raw - timestamps_raw[0]

# Create cubic spline interpolations for angular velocity and linear acceleration
angular_interp = CubicSpline(timestamps, angular_velocities, axis=0)
linear_interp = CubicSpline(timestamps, linear_accelerations, axis=0)

Downsample point cloud

In [3]:
from plyfile import PlyData, PlyElement
import point_cloud_utils as pcu


def downsample_point_cloud(pc):

    # H = np.dot(pc.T, pc) / pc.shape[0]
    # U, S, Vt = np.linalg.svd(H)
    # print(S**0.5)

    bbox_size = np.amax(pc, 0) - np.amin(pc, 0)
    size = np.prod(bbox_size)**(1/3)

    sizeof_voxel = bbox_size / (8*size)

    return pcu.downsample_point_cloud_on_voxel_grid(sizeof_voxel, pc)


def write_point_cloud(points, filename):
    vertex = np.array([(x, y, z) for x, y, z in points], dtype=[('x', 'f4'), ('y', 'f4'), ('z', 'f4')])
    vertex_element = PlyElement.describe(vertex, 'vertex')
    PlyData([vertex_element], text=False).write(filename)


def get_cloud_time(lidar_msg):
    points = sorted(lidar_msg.points, key=lambda _: _.offset_time)
    time_start = lidar_msg.header.stamp.to_sec()
    # points = [p for p in points if timestamps_raw[0]<=time_start+1e-9*p.offset_time<=timestamps_raw[-1]]
    times = 1e-9 * np.array([p.offset_time for p in points])
    return time_start + times - timestamps_raw[0]


def get_cloud(lidar_msg, id=-1):
    points = sorted(lidar_msg.points, key=lambda _: _.offset_time)
    time_start = lidar_msg.header.stamp.to_sec()
    # points = [p for p in points if timestamps_raw[0]<=time_start+1e-9*p.offset_time<=timestamps_raw[-1]]
    points = np.array([(p.x, p.y, p.z) for p in points])
    # print(times.shape)
    # print(points.shape)
    points_downsampled = downsample_point_cloud(points)
    # print(points_downsampled.shape)
    if id >= 0:
        write_point_cloud(points, "{:04d}-raw.ply".format(id))
        write_point_cloud(points_downsampled, "{:04d}-downsampled.ply".format(id))
    return points_downsampled

pcl1 = get_cloud(livox_lidar[0], 0)
pcl1_times = get_cloud_time(livox_lidar[0])
pcl2 = get_cloud(livox_lidar[40], 40)
pcl2_times = get_cloud_time(livox_lidar[40])
pcl3 = get_cloud(livox_lidar[100], 100)
pcl3_times = get_cloud_time(livox_lidar[100])

# print(livox_imu[0], 0)
# print(livox_imu[0], 10)

Point cloud matching robust to outliers, minimize sum of L1 losses

In [4]:
from scipy.spatial import KDTree
import scipy.optimize

4x4 affine matrix version

In [5]:
def decode_T(T):
    T = T.reshape((4, 4))
    U, S, Vt = np.linalg.svd(T[:3, :3])
    R = np.dot(U, Vt)
    t = T.T[3, :3]
    return R, t

def decode_T_jacobian(T):
    epsilon = 1e-6
    R, t = decode_T(T)
    num_rows_R, num_cols_R = R.shape
    num_rows_t, = t.shape
    num_dims_T = len(T)
    jacobian_R = np.zeros((num_rows_R, num_cols_R, num_dims_T))
    jacobian_t = np.zeros((num_rows_t, num_dims_T))
    for i in range(num_dims_T):
        delta_T = np.zeros_like(T)
        delta_T[i] = epsilon
        R_plus, t_plus = decode_T(T + delta_T)
        R_minus, t_minus = decode_T(T - delta_T)
        jacobian_R[:, :, i] = (R_plus - R_minus) / (2 * epsilon)
        jacobian_t[:, i] = (t_plus - t_minus) / (2 * epsilon)
    return jacobian_R, jacobian_t


def minl1_affine(pc1, pc2, T_guess=np.identity(4)):
    T = np.array(T_guess)

    kdtree_pc1 = KDTree(pc1)

    def fun(T):
        R, t = decode_T(T)
        dRdt, dtdT = decode_T_jacobian(T)
        pc2_transformed = np.dot(pc2, R.T) + t
        g_pc2_transformed = np.dot(pc2, dRdt) + dtdT
        distances, indices = kdtree_pc1.query(pc2_transformed)
        g_distances = np.einsum('nij,ni->nj', g_pc2_transformed, pc2_transformed-pc1[indices]) / \
                (distances.reshape((len(distances), 1)) + 1e-8)
        cost = np.mean(distances)
        g_cost = np.mean(g_distances, axis=0)
        # print(cost)
        return cost, g_cost
    
    # print(scipy.optimize.check_grad(lambda _: fun(_)[0], lambda _: fun(_)[1], T.reshape((-1,))))

    res = scipy.optimize.minimize(fun, T.reshape((-1,)), jac=True)
    print('success', res.success, 'nit', res.nit, 'nfev', res.nfev)
    R, t = decode_T(res.x)
    T[:3, :3] = R
    T[:3, 3] = t
    pc2_transformed = np.concatenate([pc2, np.ones((len(pc2), 1))], axis=1)
    pc2_transformed = np.dot(T, pc2_transformed.T).T[:, :3]
    return pc2_transformed, T


from time import perf_counter
t0 = perf_counter()
pc2_transformed, T = minl1_affine(pcl1, pcl2)
t1 = perf_counter()
print(t1-t0, "secs")

write_point_cloud(pc2_transformed, "minl1_affine.ply")

success True nit 29 nfev 39
0.2572471029998269 secs


so3 exponential map + translation

In [6]:
def decode_so3t(T):
    phi = T[:3]
    t = T[3:6]
    theta = np.linalg.norm(phi)
    R = np.eye(3)
    if theta != 0.0:
        n = phi / theta
        nnT = np.outer(n, n)
        n_star = np.array([[0.0, -n[2], n[1]], [n[2], 0.0, -n[0]], [-n[1], n[0], 0.0]])
        R = np.cos(theta) * R + \
            (1.0-np.cos(theta)) * nnT + \
            np.sin(theta) * n_star
    assert np.linalg.norm(R@R.T-np.eye(3)) < 1e-12
    return R, t


def minl1_so3t(pc1, pc2, T_guess=np.zeros(6)):
    T = np.array(T_guess)

    kdtree_pc1 = KDTree(pc1)

    def fun(T):
        R, t = decode_so3t(T)
        Rp = np.dot(pc2, R.T)
        pc2_transformed = Rp + t
        g_pc2_transformed = np.zeros((*Rp.shape, len(T)), dtype=Rp.dtype)
        g_pc2_transformed[:, 0, 1] = Rp[:, 2]
        g_pc2_transformed[:, 0, 2] = -Rp[:, 1]
        g_pc2_transformed[:, 1, 0] = -Rp[:, 2]
        g_pc2_transformed[:, 1, 2] = Rp[:, 0]
        g_pc2_transformed[:, 2, 0] = Rp[:, 1]
        g_pc2_transformed[:, 2, 1] = -Rp[:, 0]
        g_pc2_transformed[:, 0, 3] = 1.0
        g_pc2_transformed[:, 1, 4] = 1.0
        g_pc2_transformed[:, 2, 5] = 1.0
        distances, indices = kdtree_pc1.query(pc2_transformed)
        g_distances = np.einsum('nij,ni->nj', g_pc2_transformed, pc2_transformed-pc1[indices]) / \
                (distances.reshape((len(distances), 1)) + 1e-8)
        cost = np.mean(distances)
        g_cost = np.mean(g_distances, axis=0)
        # print(cost)
        return cost, g_cost

    # print(scipy.optimize.check_grad(lambda _: fun(_)[0], lambda _: fun(_)[1], T))
    # assert False

    res = scipy.optimize.minimize(fun, T, jac=True)
    print('success', res.success, 'nit', res.nit, 'nfev', res.nfev)
    R, t = decode_so3t(res.x)
    pc2_transformed = np.dot(pc2, R.T) + t
    return pc2_transformed, res.x


from time import perf_counter
t0 = perf_counter()
pc2_transformed, T = minl1_so3t(pcl1, pcl2)
t1 = perf_counter()
print(t1-t0, "secs")

write_point_cloud(pc2_transformed, "minl1_so3t.ply")

success True nit 24 nfev 35
0.13971671300168964 secs


se3 exponential map, naive gradient

In [7]:
def decode_se3(T):
    rho = T[0:3]
    phi = T[3:6]
    theta = max(np.linalg.norm(phi), 1e-12)
    n = phi / theta
    nnT = np.outer(n, n)
    n_star = np.array([[0.0, -n[2], n[1]], [n[2], 0.0, -n[0]], [-n[1], n[0], 0.0]])
    C = np.cos(theta) * np.eye(3) + \
        (1.0-np.cos(theta)) * nnT + \
        np.sin(theta) * n_star
    J = np.sin(theta)/theta * np.eye(3) + \
        (1.0-np.sin(theta)/theta) * nnT + \
        (1.0-np.cos(theta))/theta * n_star
    assert np.linalg.norm(C@C.T-np.eye(3)) < 1e-12
    # gamma = 2.0*(1.0-np.cos(theta))/theta**2
    # assert np.linalg.norm(J@J.T-(gamma*np.eye(3)+(1.0-gamma)*nnT)) < 1e-10
    dJdtheta = (theta*np.cos(theta)-np.sin(theta))/theta**2 * (np.eye(3)-nnT) + \
        (theta*np.sin(theta)+np.cos(theta)-1)/theta**2 * n_star
    dnnTdn = np.array([
        [[2*n[0], 0, 0], [n[1], n[0], 0], [n[2], 0, n[0]]],
        [[n[1], n[0], 0], [0, 2*n[1], 0], [0, n[2], n[1]]],
        [[n[2], 0, n[0]], [0, n[2], n[1]], [0, 0, 2*n[2]]]
    ])
    dnsdn = np.array([
        [[0, 0, 0], [0, 0, -1], [0, 1, 0]],
        [[0, 0, 1], [0, 0, 0], [-1, 0, 0]],
        [[0, -1, 0], [1, 0, 0], [0, 0, 0]]
    ], dtype=n.dtype)
    dJdn = (1.0-np.sin(theta)/theta) * dnnTdn + \
        (1.0-np.cos(theta))/theta * dnsdn
    dJdphi = np.tensordot(dJdtheta, n, axes=0) + \
        np.matmul(dJdn, (np.eye(3)-nnT)/theta)
    dvdphi = (dJdphi*rho.reshape(1,len(rho),1)).sum(axis=1)
    return C, J, J@rho, dvdphi


def minl1_se3_naive(pc1, pc2, T_guess=np.zeros(6)):
    T = np.array(T_guess)

    kdtree_pc1 = KDTree(pc1)

    def fun(T):
        C, J, r, dvdphi = decode_se3(T)
        Cv = np.dot(pc2, C.T)
        pc2_transformed = Cv + r
        g_pc2_transformed = np.zeros((*pc2.shape, len(T)), dtype=pc2.dtype)
        for i in range(3):
            g_pc2_transformed[:,0,3+i] = Cv[:,2]*J[1][i]-Cv[:,1]*J[2][i]
            g_pc2_transformed[:,1,3+i] = Cv[:,0]*J[2][i]-Cv[:,2]*J[0][i]
            g_pc2_transformed[:,2,3+i] = Cv[:,1]*J[0][i]-Cv[:,0]*J[1][i]
        g_pc2_transformed[:,:,3:] += dvdphi
        g_pc2_transformed[:,:,:3] = J
        distances, indices = kdtree_pc1.query(pc2_transformed)
        g_distances = np.einsum('nij,ni->nj', g_pc2_transformed, pc2_transformed-pc1[indices]) / \
                (distances.reshape((len(distances), 1)) + 1e-8)
        cost = np.mean(distances)
        g_cost = np.mean(g_distances, axis=0)
        # print(cost)
        return cost, g_cost

    # print(scipy.optimize.check_grad(lambda _: fun(_)[0], lambda _: fun(_)[1], T))
    # assert False

    res = scipy.optimize.minimize(fun, T, jac=True)
    print('success', res.success, 'nit', res.nit, 'nfev', res.nfev)
    R, _1, t, _2 = decode_se3(res.x)
    pc2_transformed = np.dot(pc2, R.T) + t
    return pc2_transformed, res.x


from time import perf_counter
t0 = perf_counter()
# T = np.array([-1, 2, -3, 0.2, 0.1, 0.3])
T = np.array([0, 0, 0, 0, 0, 0])
pc2_transformed, T = minl1_se3_naive(pcl1, pcl2, T)
t1 = perf_counter()
print(t1-t0, "secs")

write_point_cloud(pc2_transformed, "minl1_se3_naive.ply")

success True nit 25 nfev 39
0.16239492000022437 secs


try it on a bunch of point clouds

In [8]:
pcls = [
    get_cloud(livox_lidar[0], -1)
]
all_pcl = pcls[-1]
T = np.zeros(6)
minl1 = minl1_so3t
for i in range(1, 80, 10):
    pcl = get_cloud(livox_lidar[i])
    # pcl_transformed, T = minl1(pcls[-1], pcl, T)
    pcl_transformed, T = minl1(all_pcl, pcl, T)
    pcls.append(pcl_transformed)
    all_pcl = np.concatenate((all_pcl, pcl_transformed))
print(all_pcl.shape)
write_point_cloud(all_pcl, "minl1.ply")

success True nit 19 nfev 23
success True nit 16 nfev 26
success True nit 37 nfev 47
success True nit 28 nfev 38
success True nit 43 nfev 55
success True nit 42 nfev 55
success True nit 49 nfev 68
success True nit 57 nfev 77
(52150, 3)
