## Batch Evaluation

In [1]:
import numpy as np
from scipy.spatial.transform import Rotation as R

from evo.core.trajectory import PoseTrajectory3D
from evo.tools import file_interface
import os

def read_ref_to_tum(path_ref, path_stamps):
    """Load a reference trajectory stored as one flattened 4x4 pose per line.

    Parameters
    ----------
    path_ref : str
        Text file in which every non-blank line holds 16 whitespace-separated
        numbers; they are reshaped row-major into a 4x4 homogeneous transform.
    path_stamps : str
        Text file readable by ``np.loadtxt`` that holds the timestamps.

    Returns
    -------
    PoseTrajectory3D
        Trajectory built from the translation column of each pose, the
        rotation block converted to a scalar-first quaternion, and the
        timestamps.

    Raises
    ------
    ValueError
        If a non-blank line is not made of exactly 16 numeric values.
    """
    stamps = np.loadtxt(path_stamps, dtype=np.float64)

    with open(path_ref, "r") as f:
        # Blank lines (e.g. a stray trailing newline) carry no pose and would
        # otherwise break the 4x4 reshape below.
        pose_lines = [line for line in f if line.strip()]

    xyz = np.zeros((len(pose_lines), 3), dtype=np.float64)
    quat_wxyz = np.zeros((len(pose_lines), 4), dtype=np.float64)

    for i, line in enumerate(pose_lines):
        pose = np.array(list(map(float, line.split()))).reshape(4, 4)

        xyz[i, :] = pose[0:3, 3]

        # SciPy returns quaternions scalar-last (x, y, z, w) while evo's
        # PoseTrajectory3D expects scalar-first (w, x, y, z); rotate the
        # components by one slot so w ends up in front.
        quat_xyzw = R.from_matrix(pose[0:3, 0:3]).as_quat()
        quat_wxyz[i, :] = np.roll(quat_xyzw, 1)

    return PoseTrajectory3D(xyz, quat_wxyz, stamps)

# load ref
def load_traj_ref_dict(dir, path_stamps):
    """Load every reference trajectory file found directly inside ``dir``.

    Parameters
    ----------
    dir : str
        Directory holding one reference pose file per sequence.
    path_stamps : str
        Timestamp file passed unchanged to ``read_ref_to_tum`` for every
        sequence.

    Returns
    -------
    dict
        Maps the file name without its extension to the value returned by
        ``read_ref_to_tum`` for that file.
    """
    traj_ref_dict = {}
    # sorted() keeps the loading order independent of the file system.
    for filename in sorted(os.listdir(dir)):
        abs_path = os.path.join(dir, filename)
        # Sub-directories cannot be parsed as pose files; skip them.
        if not os.path.isfile(abs_path):
            continue
        # splitext drops the extension whatever its length.
        seq_name = os.path.splitext(filename)[0]
        traj_ref_dict[seq_name] = read_ref_to_tum(abs_path, path_stamps)
    return traj_ref_dict


ModuleNotFoundError: No module named 'evo'

In [2]:
# Batch error computation

from evo.core import sync
from evo.core import metrics

def compute_ape(dir_est, traj_ref_dict, max_diff = 0.01):
    """Compute the translational APE (RMSE) of each estimated trajectory.

    Parameters
    ----------
    dir_est : str
        Directory containing the estimated trajectories as TUM-format files.
    traj_ref_dict : dict
        Reference trajectories keyed by sequence name. The key is taken from
        the start of the estimate's file name: 5 characters for names
        starting with 'room', 7 for names starting with 'office'.
    max_diff : float, optional
        Passed to ``sync.associate_trajectories`` as the maximum timestamp
        difference when matching poses.

    Returns
    -------
    dict
        Maps the estimate's file name without extension to its APE RMSE.
        Sub-directories, empty files, files whose name matches no known
        sequence prefix and trajectories with fewer than 3 poses are skipped.

    Raises
    ------
    KeyError
        If a recognised sequence has no entry in ``traj_ref_dict``.
    """
    ape_dict = {}
    for filename in os.listdir(dir_est):
        abs_path = os.path.join(dir_est, filename)
        # Ignore sub-directories and empty result files.
        if not os.path.isfile(abs_path) or not os.path.getsize(abs_path):
            continue

        # Resolve the reference before parsing, so unrelated files are
        # skipped instead of crashing later with a None reference.
        if filename.startswith('room'):
            traj_ref = traj_ref_dict[filename[:5]]
        elif filename.startswith('office'):
            traj_ref = traj_ref_dict[filename[:7]]
        else:
            print('file name not recognized')
            continue

        traj_est = file_interface.read_tum_trajectory_file(abs_path)
        if traj_est.num_poses < 3:
            continue

        # Synchronize by timestamp, then align the estimate to the reference
        # with scale correction.
        traj_ref, traj_est = sync.associate_trajectories(traj_ref, traj_est, max_diff)
        traj_est.align(traj_ref, correct_scale=True, correct_only_scale=False)

        # APE over the translation part only.
        ape_metric = metrics.APE(metrics.PoseRelation.translation_part)
        ape_metric.process_data((traj_ref, traj_est))

        result_key = os.path.splitext(filename)[0]
        ape_dict[result_key] = ape_metric.get_statistic(metrics.StatisticsType.rmse)

    return ape_dict


def compute_rpe(dir_est, traj_ref_dict, max_diff = 0.01):
    """Compute the rotational RPE (RMSE, radians) of each estimated trajectory.

    Parameters
    ----------
    dir_est : str
        Directory containing the estimated trajectories as TUM-format files.
    traj_ref_dict : dict
        Reference trajectories keyed by sequence name. The key is taken from
        the start of the estimate's file name: 5 characters for names
        starting with 'room', 7 for names starting with 'office'.
    max_diff : float, optional
        Passed to ``sync.associate_trajectories`` as the maximum timestamp
        difference when matching poses.

    Returns
    -------
    dict
        Maps the estimate's file name without extension to its RPE RMSE.
        Sub-directories, empty files, files whose name matches no known
        sequence prefix and trajectories with fewer than 3 poses are skipped.

    Raises
    ------
    KeyError
        If a recognised sequence has no entry in ``traj_ref_dict``.
    """
    rpe_dict = {}
    for filename in os.listdir(dir_est):
        abs_path = os.path.join(dir_est, filename)
        # Ignore sub-directories and empty result files.
        if not os.path.isfile(abs_path) or not os.path.getsize(abs_path):
            continue

        # Resolve the reference before parsing, so unrelated files are
        # skipped instead of crashing later with a None reference.
        if filename.startswith('room'):
            traj_ref = traj_ref_dict[filename[:5]]
        elif filename.startswith('office'):
            traj_ref = traj_ref_dict[filename[:7]]
        else:
            print('file name not recognized')
            continue

        traj_est = file_interface.read_tum_trajectory_file(abs_path)
        if traj_est.num_poses < 3:
            continue

        # Synchronize by timestamp only; no alignment is applied here.
        traj_ref, traj_est = sync.associate_trajectories(traj_ref, traj_est, max_diff)

        # Rotation angle (radians) between poses that are `delta` apart;
        # all-pairs mode is left disabled.
        rpe_metric = metrics.RPE(
            pose_relation=metrics.PoseRelation.rotation_angle_rad,
            delta=1,
            all_pairs=False,
        )
        rpe_metric.process_data((traj_ref, traj_est))

        result_key = os.path.splitext(filename)[0]
        rpe_dict[result_key] = rpe_metric.get_statistic(metrics.StatisticsType.rmse)

    return rpe_dict


In [7]:
# Compute APE and RPE for each trajectory

# Reference pose files and the single timestamp file used for all of them.
# NOTE(review): absolute paths tied to one machine — confirm they exist or
# make them configurable before re-running elsewhere.
dir_ref = "/media/huanglab/HuangLab10TB/slam_team/dataset/Replica/ref/"
path_stamps = "/media/huanglab/HuangLab10TB/slam_team/dataset/Replica/timestamp_20hz.txt"
traj_ref_dict = load_traj_ref_dict(dir_ref, path_stamps)


# Directory of estimated trajectories; each file is scored against the
# reference chosen from its file-name prefix.
dir_est = "/media/huanglab/HuangLab10TB/slam_team/sibo/results_replica/sensor_misalign/static/"
ape_dict = compute_ape(dir_est, traj_ref_dict)
rpe_dict = compute_rpe(dir_est, traj_ref_dict)


In [8]:
# Compute Average KPs and MPs for each trajectory

def store_kps(dir_keypoints):
    """Read the per-trajectory integer statistics stored in a directory.

    Parameters
    ----------
    dir_keypoints : str
        Directory with one text file per trajectory; each non-blank line of a
        file must hold a single integer.

    Returns
    -------
    dict
        Maps the file name with its last 10 characters removed to the list of
        integers read from that file, in file order.

    Raises
    ------
    ValueError
        If a non-blank line cannot be parsed as an integer.
    """
    kp_dict = {}
    for filename in os.listdir(dir_keypoints):
        abs_path = os.path.join(dir_keypoints, filename)
        # Sub-directories cannot be opened as text files; skip them.
        if not os.path.isfile(abs_path):
            continue

        with open(abs_path, 'r') as f:
            # Blank lines (e.g. a trailing newline) would make int() raise.
            # NOTE(review): the 10-character suffix is assumed to be a fixed
            # file-name ending — confirm against the files on disk.
            kp_dict[filename[:-10]] = [int(line) for line in f if line.strip()]

    return kp_dict

# Directory holding the per-trajectory keypoint statistics files.
# NOTE(review): absolute path tied to one machine — confirm before re-running.
dir_keypoints = "/media/huanglab/HuangLab10TB/slam_team/sibo/results_replica/sensor_misalign/static/keypoints/"
kp_dict = store_kps(dir_keypoints)


In [9]:
# Save results, with a list of ALL settings regardless of having results

def save_all_val_with_null(ape_dict, rpe_dict, kp_dict, out_path="sensor_misalign_static.txt"):
    """Write a CSV summary with one row per (sequence, level) setting.

    Every combination of level (5, 10, 20) and sequence (office0-office4,
    room0-room2) gets a row even when no result exists for it; a missing APE
    or RPE is written as the placeholder value 1. The mean APE per level,
    placeholders included, is printed.

    Parameters
    ----------
    ape_dict : dict
        APE values keyed by '<sequence>_k<level>'.
    rpe_dict : dict
        RPE values keyed by '<sequence>_k<level>'.
    kp_dict : dict
        Lists of extra column values keyed by '<sequence>_k<level>'. They are
        appended to a row only when the same key also has an APE result.
    out_path : str, optional
        Destination of the CSV file. Defaults to the originally hard-coded
        file name.
    """
    missing_value = 1  # placeholder written when a setting has no result
    sequences = ["office" + str(n) for n in range(5)] + ["room" + str(n) for n in range(3)]

    lines = []
    for to_add in [5, 10, 20]:
        ape_total = 0
        for seq in sequences:
            key = '_'.join([seq, 'k' + str(to_add)])

            ape_val = ape_dict.get(key, missing_value)
            # The RPE column must come from rpe_dict (it used to repeat APE).
            rpe_val = rpe_dict.get(key, missing_value)
            ape_total += ape_val

            line = [seq, to_add, ape_val, rpe_val]
            if key in kp_dict and key in ape_dict:
                line.extend(kp_dict[key])
            lines.append(line)

        print('k' + str(to_add), ape_total / len(sequences))

    with open(out_path, 'w') as f:
        f.write('Sequence,Level,APE,RPE,Ave KF KPs,Ave KF MPs\n')
        for line in lines:
            f.write(','.join(str(i) for i in line))
            f.write('\n')

# Write the summary table from the APE, RPE and keypoint dictionaries.
save_all_val_with_null(ape_dict, rpe_dict, kp_dict)