In [None]:
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import cv2 as cv
from tqdm import tqdm
import os
import json

from lac.utils.plotting import plot_path_3d, plot_3d_points, plot_poses
from lac.utils.frames import invert_transform_mat
from lac.util import (
    rmse,
    get_positions_from_poses,
    positions_rmse_from_poses,
    rotations_rmse_from_poses,
)

%load_ext autoreload
%autoreload 2

In [None]:
import symforce

try:
    symforce.set_epsilon_to_symbol()
except symforce.AlreadyUsedEpsilon:
    print("Already set symforce epsilon")
    pass
import symforce.symbolic as sf

from lac.localization.symforce_util import odometry_lander_relpose_fgo
from lac.localization.imu_recovery import (
    recover_rotation,
    recover_rotation_exact,
    recover_translation,
)
from lac.localization.fgo import FactorGraph

# Symforce testing


In [None]:
import copy

p = sf.Pose3()
sf.Pose3(R=p.R, t=p.t)

# Test FGO


In [None]:
# data_path = "../../output/imu_20hz/" + "data_log.json"
data_path = "../../results/runs/map1_seed4_spiral_4.5_2.0_run2/data_log.json"
json_data = json.load(open(f"{data_path}"))

initial_pose = np.array(json_data["initial_pose"])
lander_pose_rover = np.array(json_data["lander_pose_rover"])
lander_pose_world = initial_pose @ lander_pose_rover

poses = [initial_pose]
imu_data = []
for frame in json_data["frames"]:
    poses.append(np.array(frame["pose"]))
    imu_data.append(np.array(frame["imu"]))
imu_data = np.array(imu_data)

gt_translations = np.zeros((len(poses), 3))
for i in range(len(poses)):
    gt_translations[i] = poses[i][:3, 3]

dt = 0.05
times = np.arange(0, len(imu_data) * dt, dt)

In [None]:
from lac.localization.imu_recovery import ImuEstimator

imu_estimator = ImuEstimator(initial_pose)
imu_recovery_poses = [initial_pose]
imu_recovery_deltas = []
gt_pose_deltas = []

for i in tqdm(range(len(imu_data))):
    imu_estimator.update(imu_data[i], exact=False)
    imu_recovery_poses.append(imu_estimator.get_pose())
    imu_recovery_deltas.append(imu_estimator.get_pose_delta())
    gt_pose_deltas.append(poses[i + 1] @ invert_transform_mat(poses[i]))

In [None]:
imu_delta_integrated = []
imu_delta_integrated.append(initial_pose)
for i in range(len(imu_recovery_deltas)):
    imu_delta_integrated.append(imu_delta_integrated[-1] @ imu_recovery_deltas[i])

In [None]:
N = -1
fig = go.Figure()
fig = plot_poses(poses[:N], fig=fig, no_axes=True, color="black", name="Ground truth")
fig = plot_poses(imu_recovery_poses[:N], fig=fig, no_axes=True, color="blue", name="IMU recovery")
fig = plot_poses(
    imu_delta_integrated[:2000], fig=fig, no_axes=True, color="red", name="IMU recovery integrated"
)
fig.update_layout(height=700, width=1200, scene_aspectmode="data")
fig.show()

Simulate lander measurements


In [None]:
lander_position = lander_pose_world[:3, 3]

# Lander range and bearing measurements
lander_pose_measurements = []  # relative pose of lander in rover frame
lander_range_measurements = []
lander_los_measurements = []

for i in range(len(poses)):
    t_i = poses[i][:3, 3]
    delta_t = t_i - lander_position
    # TODO: add noise
    lander_range_measurements.append(np.linalg.norm(delta_t))
    lander_pose_measurements.append(invert_transform_mat(poses[i]) @ lander_pose_world)
    lander_los_measurements.append(-delta_t / np.linalg.norm(delta_t))

In [None]:
ODOM_R_SIGMA = 0.1  # for rotations
ODOM_T_SIGMA = 0.1  # for translations [m]
ODOM_SIGMA = np.ones(6)
ODOM_SIGMA[:3] *= ODOM_R_SIGMA
ODOM_SIGMA[3:] *= ODOM_T_SIGMA

LANDER_ANGLE_SIGMA = 0.0001  # [rad]

LANDER_RELPOSE_SIGMA = 0.01 * np.ones(6)

In [None]:
N_WINDOW = 40
N_SHIFT = 10
N = 10000


def sliding_window_fgo():
    init_poses = imu_recovery_poses[:N_WINDOW]
    fgo_poses = [None] * N
    k_max = (N - N_WINDOW) // N_SHIFT

    for k in (pbar := tqdm(range(k_max))):
        window = slice(N_SHIFT * k, N_SHIFT * k + N_WINDOW)
        odometry = imu_recovery_deltas[window][:-1]
        # odometry = gt_odometry[window][:-1]
        lander_measurements = lander_pose_measurements[window]

        opt_poses, result = odometry_lander_relpose_fgo(
            init_poses,
            lander_pose_world,
            odometry,
            lander_measurements,
            ODOM_SIGMA,
            LANDER_RELPOSE_SIGMA,
            debug=False,
        )
        fgo_poses[N_SHIFT * k : N_SHIFT * (k + 1)] = opt_poses[:N_SHIFT]

        init_poses[:-N_SHIFT] = opt_poses[N_SHIFT:]
        if k != k_max - 1:
            pose = opt_poses[-1]
            for i in range(N_SHIFT):
                init_poses[-N_SHIFT + i] = pose @ imu_recovery_deltas[window][-1]
                pose = init_poses[-N_SHIFT + i]

    return fgo_poses

In [None]:
fgo_poses = sliding_window_fgo()

In [None]:
# all poses not None in fgo_poses
fgo_poses = [pose for pose in fgo_poses if pose is not None]
N = len(fgo_poses)

fig = go.Figure()
fig = plot_poses(poses[:N], fig=fig, no_axes=True, color="black", name="Ground truth")
fig = plot_poses(imu_recovery_poses[:N], fig=fig, no_axes=True, color="blue", name="IMU recovery")
fig = plot_poses(fgo_poses[:N], fig=fig, no_axes=True, color="green", name="FGO")
fig.update_layout(height=700, width=1200, scene_aspectmode="data")
fig.show()

## FGO Class


In [None]:
factor_graph = FactorGraph(initial_pose, ODOM_SIGMA, LANDER_RELPOSE_SIGMA)

In [None]:
N = 1000
UPDATE_RATE = 10

for i in tqdm(range(1, N)):
    factor_graph.add_pose(i)
    factor_graph.add_odometry_factor(i, imu_recovery_deltas[i])
    if i % UPDATE_RATE == 0:
        fgo_poses, result = factor_graph.optimize(window=(i - UPDATE_RATE, i))
# factor_graph.update(0, imu_odometry[0])

In [None]:
type(sf.Pose3())

In [None]:
from lac.localization.symforce_util import copy_pose

In [None]:
result.optimized_values["pose_0"].

In [None]:
factor_graph.factors

In [None]:
type(result.optimized_values["pose_0"])

In [None]:
from lac.localization.fgo import get_poses_from_values

In [None]:
fig = go.Figure()
# fig = plot_poses(poses[:N], fig=fig, no_axes=True, color="black", name="Ground truth")
# fig = plot_poses(imu_recovery_poses[:N], fig=fig, no_axes=True, color="blue", name="IMU recovery")
fig = plot_poses(fgo_poses[:N], fig=fig, no_axes=True, color="green", name="FGO")
fig.update_layout(height=700, width=1200, scene_aspectmode="data")
fig.show()