In [29]:
import os
import os.path as osp
import numpy as np
import numpy.linalg as npla
import matplotlib.pyplot as plt
%matplotlib inline
%matplotlib widget
plt.rcParams.update({
    "text.usetex": True,
    "font.family": "serif",
    "font.serif": ["Times"],
    'font.size': 14,
})

np.set_printoptions(6, suppress=True)

from pylgmath import se3op, Transformation
from pysteam.problem import OptimizationProblem, StaticNoiseModel, L2LossFunc, WeightedLeastSquareCostTerm
from pysteam.solver import GaussNewtonSolver
from pysteam.evaluable.se3 import SE3StateVar, compose_rinv, compose, tran2vec

In [30]:
def rotation_error(T):
    """Calculates a single rotation value corresponding to the upper-left 3x3 rotation matrix.
        Uses axis-angle representation to get a single number for rotation
    Args:
        T (np.ndarray): 4x4 transformation matrix T = [C, r; 0 0 0 1]
    Returns:
        float: rotation
    """
    d = 0.5 * (np.trace(T[0:3, 0:3]) - 1)
    return np.arccos(max(min(d, 1.0), -1.0))


def translation_error(T, dim=3):
    """Calculates a euclidean distance corresponding to the translation vector within a 4x4 transform.
    Args:
        T (np.ndarray): 4x4 transformation matrix T = [C, r; 0 0 0 1]
        dim (int): If dim=2 we only use x,y, otherwise we use all dims.
    Returns:
        float: translation distance
    """
    if dim == 2:
        return np.sqrt(T[0, 3]**2 + T[1, 3]**2)
    return np.sqrt(T[0, 3]**2 + T[1, 3]**2 + T[2, 3]**2)

def get_inverse_tf(T):
    """Returns the inverse of a given 4x4 homogeneous transform.
    Args:
        T (np.ndarray): 4x4 transformation matrix
    Returns:
        np.ndarray: inv(T)
    """
    T2 = T.copy()
    T2[:3, :3] = T2[:3, :3].transpose()
    T2[:3, 3:] = -1 * T2[:3, :3] @ T2[:3, 3:]
    return T2

def trajectory_distances(poses):
    """Calculates path length along the trajectory.
    Args:
        poses (List[np.ndarray]): list of 4x4 poses (T_i_k, 'i' is a fixed reference frame)
    Returns:
        List[float]: distance along the trajectory, increasing as a function of time / list index
    """
    dist = [0]
    for i in range(1, len(poses)):
        P1 = poses[i - 1]
        P2 = poses[i]
        dx = P1[0, 3] - P2[0, 3]
        dy = P1[1, 3] - P2[1, 3]
        dz = P1[2, 3] - P2[2, 3]
        dist.append(dist[i-1] + np.sqrt(dx**2 + dy**2 + dz**2))
    return dist

def last_frame_from_segment_length(dist, first_frame, length):
    """Retrieves the index of the last frame for our current analysis.
        last_frame should be 'dist' meters away from first_frame in terms of distance traveled along the trajectory.
    Args:
        dist (List[float]): distance along the trajectory, increasing as a function of time / list index
        first_frame (int): index of the starting frame for this sequence
        length (float): length of the current segment being evaluated
    Returns:
        last_frame (int): index of the last frame in this segment
    """
    for i in range(first_frame, len(dist)):
        if dist[i] > dist[first_frame] + length:
            return i
    return -1

def get_stats(err, lengths):
    """Computes the average translation and rotation within a sequence (across subsequences of diff lengths).
    Args:
        err (List[Tuple]): each entry in list is [first_frame, r_err, t_err, length, speed]
        lengths (List[int]): list of lengths that odometry is evaluated at
    Returns:
        average translation (%) and rotation (deg/m) errors
    """
    t_err = 0
    r_err = 0
    len2id = {x: i for i, x in enumerate(lengths)}
    t_err_len = [0.0]*len(len2id)
    r_err_len = [0.0]*len(len2id)
    len_count = [0]*len(len2id)
    for e in err:
        t_err += e[2]
        r_err += e[1]
        j = len2id[e[3]]
        t_err_len[j] += e[2]
        r_err_len[j] += e[1]
        len_count[j] += 1
    t_err /= float(len(err))
    r_err /= float(len(err))
    return t_err * 100, r_err * 180 / np.pi, [a/float(b) * 100 for a, b in zip(t_err_len, len_count)], \
        [a/float(b) * 180 / np.pi for a, b in zip(r_err_len, len_count)]

def calc_sequence_errors(poses_gt, poses_pred, dim=3):
    """Calculate the translation and rotation error for each subsequence across several different lengths.
    Args:
        T_gt (List[np.ndarray]): each entry in list is 4x4 transformation matrix, ground truth transforms
        T_pred (List[np.ndarray]): each entry in list is 4x4 transformation matrix, predicted transforms
        step_size (int): step size applied for computing distances travelled
    Returns:
        err (List[Tuple]): each entry in list is [first_frame, r_err, t_err, length, speed]
        lengths (List[int]): list of lengths that odometry is evaluated at
    """
    step_size = 10
    lengths = [100, 200, 300, 400, 500, 600, 700, 800]
    err = []
    # Pre-compute distances from ground truth as reference
    dist = trajectory_distances(poses_gt)

    for first_frame in range(0, len(poses_gt), step_size):
        for length in lengths:
            last_frame = last_frame_from_segment_length(dist, first_frame, length)
            if last_frame == -1:
                continue
            # Compute rotational and translation errors
            pose_delta_gt = get_inverse_tf(poses_gt[last_frame]) @ poses_gt[first_frame]
            pose_delta_res = get_inverse_tf(poses_pred[last_frame]) @ poses_pred[first_frame]
            pose_error = pose_delta_gt @ get_inverse_tf(pose_delta_res)
            if dim == 2:
                pose_error_vec = se3op.tran2vec(pose_error)  # T_gt_pred
                pose_error_vec[2:5] = 0.0  # z and roll, pitch to 0
                pose_error = se3op.vec2tran(pose_error_vec)
            r_err = rotation_error(pose_error)
            t_err = translation_error(pose_error)
            err.append([first_frame, r_err/float(length), t_err/float(length), length])
    return err, lengths

In [31]:
def rpy2rot(y, p, r):
    roll = lambda r: np.array([[1, 0, 0], [0, np.cos(r), np.sin(r)], [0, -np.sin(r), np.cos(r)]], dtype=np.float64)
    pitch = lambda p: np.array([[np.cos(p), 0, -np.sin(p)], [0, 1, 0], [np.sin(p), 0, np.cos(p)]], dtype=np.float64)
    yaw = lambda y: np.array([[np.cos(y), np.sin(y), 0], [-np.sin(y), np.cos(y), 0], [0, 0, 1]], dtype=np.float64)
    return roll(r) @ pitch(p) @ yaw(y)


def convert_line_to_pose(line):
    # returns T_iv
    line = line.replace('\n', ',').split(',')
    line = [float(i) for i in line[:-1]]
    # x, y, z -> 1, 2, 3
    # roll, pitch, yaw -> 7, 8, 9
    T = np.eye(4, dtype=np.float64)
    T[0, 3] = line[1]  # x
    T[1, 3] = line[2]  # y
    T[2, 3] = line[3]  # z
    T[:3, :3] = rpy2rot(line[9], line[8], line[7])

    return T


def load_pred_poses(filename):
    data = np.loadtxt(filename)
    T_s0_sk_list = []
    for v in data:
        T_s0_sk = np.eye(4)
        T_s0_sk[:3, :] = v.reshape(3, 4)
        T_s0_sk_list.append(T_s0_sk)
    return np.array(T_s0_sk_list)


def load_gt_poses(filename):
    with open(filename, 'r') as f:
        lines = f.readlines()
    T_ms_list = []
    for line in lines[1:]:
        pose = convert_line_to_pose(line)
        T_ms_list.append(pose)

    T_s0_m = npla.inv(T_ms_list[0])
    T_s0_sk_list = [T_s0_m @ T_ms for T_ms in T_ms_list]
    return np.array(T_s0_sk_list)


# sequence = 'boreas-2022-08-05-12-59'
sequence = 'boreas-2022-08-05-13-30'

gt_path = '/home/yuchen/ASRL/data/boreas/sequences/'
gt_poses = load_gt_poses(osp.join(gt_path, sequence, 'applanix/aeva_poses.csv'))

pred_path = '/home/yuchen/ASRL/temp/doppler_odometry/boreas/aeva/steam_0804_2_rv'
# pred_path = '/home/yuchen/ASRL/temp/doppler_odometry/boreas/aeva/elastic_reduced'
# pred_path = '/home/yuchen/ASRL/temp/doppler_odometry/boreas/aeva/steam_0804_2_rv_reduced'
pred_poses = load_pred_poses(osp.join(pred_path, sequence+'_poses.txt'))

In [32]:
# 2d
err, path_lengths = calc_sequence_errors(gt_poses, pred_poses, 2)
t_err_2d, r_err_2d, _, _ = get_stats(err, path_lengths)

# 3d
err, path_lengths = calc_sequence_errors(gt_poses, pred_poses)
t_err, r_err, t_err_len, r_err_len = get_stats(err, path_lengths)

print(f"& {t_err_2d:.2f} & {r_err_2d:.4f} & {t_err:.2f} & {r_err:.4f} \\\\")

& 0.40 & 0.0011 & 3.03 & 0.0056 \\


In [33]:
def get_variables1(gt_poses, pred_poses):
  pred_T_s_sp1_var = []
  for i in range(len(pred_poses) - 1):
    tmp = get_inverse_tf(pred_poses[i]) @ pred_poses[i + 1]
    tmp = SE3StateVar(Transformation(T_ba=tmp), locked=True)
    pred_T_s_sp1_var.append(tmp)

  gt_T_s_sp1_var = []
  for i in range(len(gt_poses) - 1):
    tmp = get_inverse_tf(gt_poses[i]) @ gt_poses[i + 1]
    tmp = SE3StateVar(Transformation(T_ba=tmp), locked=True)
    gt_T_s_sp1_var.append(tmp)
  return gt_T_s_sp1_var, pred_T_s_sp1_var


def get_variables(gt_poses, pred_poses):
  step_size = 10
  # lengths = [100, 200, 300, 400, 500, 600, 700, 800]
  lengths = [100]
  # Pre-compute distances from ground truth as reference
  dist = trajectory_distances(gt_poses)

  pred_T_v_vpn, gt_T_v_vpn = [], []

  for first_frame in range(0, len(gt_poses), step_size):
    for length in lengths:
      last_frame = last_frame_from_segment_length(dist, first_frame, length)
      if last_frame == -1:
        continue

      tmp = get_inverse_tf(pred_poses[first_frame]) @ pred_poses[last_frame]
      tmp = SE3StateVar(Transformation(T_ba=tmp), locked=True)
      pred_T_v_vpn.append(tmp)

      tmp = get_inverse_tf(gt_poses[first_frame]) @ gt_poses[last_frame]
      tmp = SE3StateVar(Transformation(T_ba=tmp), locked=True)
      gt_T_v_vpn.append(tmp)

  return gt_T_v_vpn, pred_T_v_vpn

# measurements
gt_T_s_sp1_var, pred_T_s_sp1_var = get_variables(gt_poses, pred_poses)

T_gt_pred = SE3StateVar(Transformation(T_ba=np.eye(4)))

cost_terms = []

noise_model = StaticNoiseModel(np.eye(6))
loss_func = L2LossFunc()
for i in range(len(pred_T_s_sp1_var)):
  est_T_s_sp1 = compose_rinv(compose(T_gt_pred, pred_T_s_sp1_var[i]), T_gt_pred)
  error_func = tran2vec(compose_rinv(est_T_s_sp1, gt_T_s_sp1_var[i]))
  cost_terms.append(WeightedLeastSquareCostTerm(error_func, noise_model, loss_func))

opt_prob = OptimizationProblem()
opt_prob.add_state_var(T_gt_pred)
opt_prob.add_cost_term(*cost_terms)

gauss_newton = GaussNewtonSolver(opt_prob, verbose=True, max_iterations=100)
gauss_newton.optimize()

print(T_gt_pred.value)

Begin Optimization
------------------
Number of States:  1
Number of Cost Terms:  454
Initial Cost:  3514.189615182706
Iteration:    1  -  Cost:  3130.3763
Iteration:    2  -  Cost:  3128.3681
Iteration:    3  -  Cost:  3128.3560
Termination Cause:  CONVERGED RELATIVE CHANGE
[[ 0.999984 -0.002253 -0.005234 -0.370967]
 [ 0.001155  0.979577 -0.201065  0.528083]
 [ 0.00558   0.201055  0.979564  1.467649]
 [ 0.        0.        0.        1.      ]]


In [34]:
for i in range(len(pred_poses)):
  pred_poses[i] = T_gt_pred.value.matrix() @ pred_poses[i] @ npla.inv(T_gt_pred.value.matrix())

# 2d
err, path_lengths = calc_sequence_errors(gt_poses, pred_poses, 2)
t_err_2d, r_err_2d, _, _ = get_stats(err, path_lengths)

# 3d
err, path_lengths = calc_sequence_errors(gt_poses, pred_poses)
t_err, r_err, t_err_len, r_err_len = get_stats(err, path_lengths)

print(f"& {t_err_2d:.2f} & {r_err_2d:.4f} & {t_err:.2f} & {r_err:.4f} \\\\")

& 0.63 & 0.0014 & 4.36 & 0.0090 \\
