Test Place Recognition and Hierarchical Localization on the ITLP-Campus dataset using `opr.pipelines`

In [None]:
import itertools
import shutil
from pathlib import Path

import faiss
import numpy as np
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

from opr.datasets.itlp import ITLPCampus
from opr.models.place_recognition import MinkLoc3D
from opr.pipelines.place_recognition import PlaceRecognitionPipeline


# Outdoor

## MinkLoc3D

### Prepare databases

In [None]:
TRACK_LIST = [
    "00_2023-10-25-night",
    "01_2023-11-09-twilight",
]

WEIGHTS_PATH = "/home/docker_opr/OpenPlaceRecognition/weights/place_recognition/best_soc_oriented.pth"


In [None]:
!pwd

In [None]:
from hydra.experimental import compose, initialize

initialize(config_path="../configs")

In [None]:
from hydra.utils import instantiate
dataloaders = {}

for track in TRACK_LIST:
    cfg = compose("train_soc.yaml", overrides=[f"dataset.dataset_root=/home/docker_opr/Datasets/MIPT_campus/indoor/{track}"])
    dataset = instantiate(cfg.dataset, subset="test", csv_file="track.csv")
    dataloaders[track] = DataLoader(
        dataset, batch_size=16, shuffle=False, num_workers=4, collate_fn=dataset.collate_fn
    )


In [None]:
from opr.models.place_recognition.soc import SOCMLPMixer
model = instantiate(cfg.model)
model.load_state_dict(torch.load(WEIGHTS_PATH)["model_state_dict"])
model = model.to("cuda")
model.eval()


In [None]:
for track in TRACK_LIST:
    descriptors = []
    with torch.no_grad():
        for batch in tqdm(dataloaders[track]):
            batch = {k: v.to("cuda") for k, v in batch.items()}
            final_descriptor = model(batch)["final_descriptor"]
            descriptors.append(final_descriptor.detach().cpu().numpy())
    descriptors = np.concatenate(descriptors, axis=0)

    index = faiss.IndexFlatL2(descriptors.shape[1])
    index.add(descriptors)
    Path(f"/home/docker_opr/Datasets/MIPT_campus/indoor/databases/{track}_or").mkdir(
        parents=True, exist_ok=True
    )
    faiss.write_index(
        index, f"/home/docker_opr/Datasets/MIPT_campus/indoor/databases/{track}_or/index.faiss"
    )

    shutil.copy(
        f"/home/docker_opr/Datasets/MIPT_campus/indoor/{track}/track.csv",
        f"/home/docker_opr/Datasets/MIPT_campus/indoor/databases/{track}_or/track.csv",
    )


In [None]:
shutil.copy(
        f"/home/docker_opr/Datasets/MIPT_campus/indoor/{track}/track.csv",
        f"/home/docker_opr/Datasets/MIPT_campus/indoor/databases/{track}_or/track.csv",
    )

# Test PlaceRecognitionPipeline

In [None]:
from typing import Tuple
import numpy as np
from scipy.spatial.transform import Rotation

def pose_to_matrix(pose):
    """From the 6D poses in the [tx ty tz qx qy qz qw] format to 4x4 pose matrices."""
    position = pose[:3]
    orientation_quat = pose[3:]
    rotation = Rotation.from_quat(orientation_quat)
    pose_matrix = np.eye(4)
    pose_matrix[:3,:3] = rotation.as_matrix()
    pose_matrix[:3,3] = position
    return pose_matrix

def compute_error(estimated_pose, gt_pose):
    """For the 6D poses in the [tx ty tz qx qy qz qw] format."""
    estimated_pose = pose_to_matrix(estimated_pose)
    gt_pose = pose_to_matrix(gt_pose)
    error_pose = np.linalg.inv(estimated_pose) @ gt_pose
    dist_error = np.sum(error_pose[:3, 3]**2) ** 0.5
    r = Rotation.from_matrix(error_pose[:3, :3])
    rotvec = r.as_rotvec()
    angle_error = (np.sum(rotvec**2)**0.5) * 180 / np.pi
    angle_error = abs(90 - abs(angle_error-90))
    return dist_error, angle_error

def compute_translation_error(gt_pose, pred_pose):
    """For the 4x4 pose matrices."""
    gt_trans = gt_pose[:3, 3]
    pred_trans = pred_pose[:3, 3]
    error = np.linalg.norm(gt_trans - pred_trans)
    return error

def compute_rotation_error(gt_pose, pred_pose):
    """For the 4x4 pose matrices."""
    gt_rot = Rotation.from_matrix(gt_pose[:3, :3])
    pred_rot = Rotation.from_matrix(pred_pose[:3, :3])
    error = Rotation.inv(gt_rot) * pred_rot
    error = error.as_euler('xyz', degrees=True)
    error = np.linalg.norm(error)
    return error

def compute_absolute_pose_error(gt_pose, pred_pose):
    """For the 4x4 pose matrices."""
    rotation_error = compute_rotation_error(gt_pose, pred_pose)
    translation_error = compute_translation_error(gt_pose, pred_pose)
    return rotation_error, translation_error

In [None]:
from geotransformer.utils.registration import compute_registration_error

In [None]:
from time import time

In [None]:
ij_permutations = list(itertools.permutations(range(len(TRACK_LIST)), 2))

median_dist_errors = []
median_angle_errors = []
mean_dist_errors = []
mean_angle_errors = []
pr_matches = []
PR_MATCH_THRESHOLD = 25.0
times = []

for i, j in tqdm(ij_permutations[:1], position=0):
    local_dist_errors = []
    local_angle_errors = []
    database = TRACK_LIST[i]
    query = TRACK_LIST[j]
    cfg = compose("train_soc.yaml", overrides=[f"dataset.dataset_root=/home/docker_opr/Datasets/MIPT_campus/indoor/{TRACK_LIST[j]}"])

    pipeline = PlaceRecognitionPipeline(
        database_dir=f"/home/docker_opr/Datasets/MIPT_campus/indoor/databases/{database}_or",
        model=instantiate(cfg.model),
        model_weights_path=WEIGHTS_PATH,
        device="cuda",
    )

    query_dataset = instantiate(cfg.dataset, subset="test", csv_file="track.csv")

    for sample in tqdm(query_dataset, position=1):
        start_time = time()
        out = pipeline.infer(sample)
        times.append(time() - start_time)
        
        dist_error, angle_error = compute_error(sample["pose"].numpy(), out["pose"])
        local_dist_errors.append(dist_error)
        local_angle_errors.append(angle_error)
        estimated_pose = pose_to_matrix(out["pose"])
        gt_pose = pose_to_matrix(sample["pose"].numpy())
        _, db_match_distance = compute_registration_error(gt_pose, estimated_pose)
        pr_matched = db_match_distance <= PR_MATCH_THRESHOLD
        pr_matches.append(pr_matched)


    median_dist_errors.append(np.median(local_dist_errors))
    median_angle_errors.append(np.median(local_angle_errors))
    mean_dist_errors.append(np.mean(local_dist_errors))
    mean_angle_errors.append(np.mean(local_angle_errors))


In [None]:
print(f"PlaceRecognition R@1 = {np.mean(pr_matches):0.3f}")

print(f"Mean Time = {(np.mean(times) * 1000):0.2f} ms")

In [None]:
median_dist_errors, mean_dist_errors


In [None]:
median_angle_errors, mean_angle_errors
