In [3]:
%load_ext autoreload
%autoreload 2

from pathlib import Path
import torch
import yaml
import sys
from torchmetrics import MetricCollection
from omegaconf import OmegaConf
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
from pytorch_lightning import seed_everything
from pytorch_lightning.utilities import move_data_to_device
sys.path.append('fbsource/fbcode/surreal/')
from maploc.data import KittiDataModule
from maploc.module import GenericModule
from maploc.osm.viz import Colormap, plot_nodes
from maploc.test_kitti import evaluate_single_image
from maploc.utils.viz_localization import likelihood_overlay, plot_dense_rotations
from maploc.osm.viz import plot_nodes
from maploc.utils.viz_localization import plot_pose
from maploc.utils.viz_2d import plot_images, plot_keypoints, features_to_RGB, save_plot, add_text
from maploc.models.hough_voting import expectation_xyr
# Shared colormap instance; the visualization callbacks defined later in this
# notebook use it to render the map rasters.
colormap = Colormap()

# This notebook only runs inference and plotting, so autograd is switched off globally.
torch.set_grad_enabled(False)

# The callbacks open many figures per run; 0 disables matplotlib's
# "too many open figures" warning.
plt.rcParams['figure.max_open_warning'] = 0

from pprint import pprint
from maploc import eval_kitti
# List the experiments registered in the eval file; the cells below look up
# entries of this mapping by key (e.g. "BEV_MLY") and unpack them as (exper, cfg).
pprint(eval_kitti.experiments)

# Run the full evaluation

Single image

In [None]:
# Option 1: manually specify the experiment and a config override.
# NOTE: each `exper = ...` line below overwrites the previous one, and the
# `eval_kitti.experiments[...]` lookup further down overwrites both `exper` and
# `cfg` again. Comment out whatever you do not want to take effect.
cfg = {'model': {"num_rotations": 256, "apply_map_prior": False}}
exper = "bev1-osm2-mly13-n100_res101-vgg19_depth-bins33-nonorm-1to9_bs9-resize512_d8-nrot64-normvalid-prior-rep"  # best MLY
exper = "bev1-osm2-kitti-5m_res101-vgg19_depth-bins33-nonorm-1to9_bs9-f256_d8-nrot64-normvalid-prior-rep_ft-lr5"  # best MLY+KITTI

exper = "bev1-osm2-mly13-n100_res101-vgg19_depth-bins33-nonorm-1to9_bs9-resize512_d8-nrot64-normvalid-prior-rep_ckpt"  # new model to try

# Option 2: pull a default experiment and its config override from the eval file.
exper, cfg = eval_kitti.experiments["BEV_MLY"]

# cfg = {**cfg, "data": {"local_dir": "data/kitti"}}  # comment out if you are on ondemand
# The "data" entry below replaces any "data" entry already present in `cfg`.
cfg = {**cfg, "data": {"local_dir": "data/kitti", "max_num_val": 1000, "selection_subset_val": "random"}}
# Single-image (non-sequential) evaluation on the "val" split.
results = eval_kitti.run("val", exper, cfg, sequential=False)

Sequential

In [None]:
# Sequential evaluation on the "test" split. Reuses `exper` and `cfg` from the
# single-image cell above (including its "data" overrides), so run that cell first.
results = eval_kitti.run("test", exper, cfg, sequential=True)

# Visualize a few examples

In [None]:
def viz_fn(i, model, batch, pred, results):
    """Print the metrics and plot the localization prediction for one example.

    Passed as ``callback`` to ``eval_kitti.run`` in single-image mode.

    Args:
        i: Index of the example; only used in the printout.
        model: Unused here; kept so the signature stays compatible with the caller.
        batch: Input batch. The first element of "image", "map", "xy" and
            "roll_pitch_yaw" is visualized.
        pred: Model predictions. Reads "log_probs", "map" and "xy_max", plus
            "yaw_max", "features_bev", "valid_bev" and "bev" when present.
        results: Mapping from metric name to tensor. It is left unmodified.
    """
    # Hide the expectation error and the recall entries from the printout.
    # Filter into a new dict instead of popping: popping mutated the caller's
    # results and raised KeyError when "xy_expectation_error" was absent.
    printed = {
        name: value
        for name, value in results.items()
        if name != "xy_expectation_error" and "recall" not in name
    }
    print(i, {k: np.round(v.cpu().numpy().astype(float), 2).tolist() for k, v in printed.items()})
    pred = move_data_to_device(pred, "cpu")
    batch = move_data_to_device(batch, "cpu")

    # presumably channels-first -> channels-last for plotting; confirm against the data module
    image = batch["image"][0].permute(1, 2, 0)
    rasters = batch["map"][0]
    uv_gt = batch["xy"][0]
    yaw_gt = batch["roll_pitch_yaw"][0, -1]

    lp_uvt = lp_uv = pred["log_probs"][0]
    # A 3-D volume carries a trailing rotation axis; reduce it for the 2-D plots.
    has_rotation = lp_uvt.ndim == 3
    if has_rotation:
        lp_uv = lp_uvt.max(-1).values
    prob = lp_uv.exp()
    feats_map = pred["map"]["map_features"][0][0]
    feats_map_rgb, = features_to_RGB(feats_map.numpy())

    text1 = f'error: {results["xy_max_error"]:.1f}m/{results.get("yaw_max_error", -1):.1f}°'
    text1 += f'  lateral/longit: {results["directional_error"][0]:.1f}m/{results["directional_error"][1]:.1f}m'
    map_viz = colormap.apply(rasters)
    plot_images([image, map_viz, lp_uv, likelihood_overlay(prob.numpy(), map_viz), feats_map_rgb],
                titles=[text1, 'map', 'loglikelihood', 'likelihood', 'map features'], dpi=75, cmaps='jet')
    colormap.add_colorbar()
    plot_nodes(1, rasters[2])
    # Ground-truth pose in green, predicted pose in black, both on the map panel.
    plot_pose([1], uv_gt, yaw_gt, s=1/35, c="forestgreen", w=0.015)
    plot_pose([1], pred["xy_max"][0], pred.get("yaw_max", [None])[0], s=1/35, c="k", w=0.015)
    # The dense rotation plot needs the rotation axis; skip it for 2-D outputs.
    if has_rotation:
        plot_dense_rotations(2, lp_uvt.exp(), s=1/15)

    # The remaining plots need BEV outputs, which not every prediction has.
    if "features_bev" not in pred:
        return

    feats_q = pred["features_bev"][0]
    mask_bev = pred["valid_bev"][0]
    if "log_prior" in pred["map"]:
        prior = pred["map"]["log_prior"][0][0].sigmoid()
    else:
        prior = None
    # Fall back to the feature norm as the weight when no confidence is predicted.
    norm_q = conf_q = torch.norm(feats_q, dim=0)
    if "bev" in pred and "confidence" in pred["bev"]:
        conf_q = pred["bev"]["confidence"][0]
    # NaN-out the invalid cells so they are left blank in the plots.
    conf_q = conf_q.masked_fill(~mask_bev, np.nan)
    norm_q = norm_q.masked_fill(~mask_bev, np.nan)

    feats_q_rgb, = features_to_RGB(feats_q.numpy(), masks=[mask_bev])
    # Append the validity mask as an extra last channel (presumably alpha).
    feats_q_rgb = np.concatenate([feats_q_rgb, mask_bev[..., None]], -1)
    plot_images([norm_q, conf_q, feats_q_rgb] + ([] if prior is None else [prior]),
                titles=["BEV feature norm", "BEV weight", "BEV features"]+([] if prior is None else ['map prior']), dpi=50, cmaps='jet')

# Qualitative check: evaluate the "BEV_MLY" experiment on the "test" split in
# single-image mode with num=5, passing every example to `viz_fn`.
exper, cfg = eval_kitti.experiments["BEV_MLY"]
cfg = {
    **cfg,
    "data": {"local_dir": "data/kitti"},
}
results = eval_kitti.run(
    "test",
    exper,
    cfg,
    sequential=False,
    num=5,
    callback=viz_fn,
    progress=False,
)

# Visualize sequential predictions

In [None]:
from maploc.data.sequential import unpack_batches
from maploc.models.hough_voting import log_softmax_spatial

def viz_fn(i, model, batches, preds, results, aligner):
    """Plot a subsampled view of one sequence and its sequential predictions.

    Passed as ``callback`` to ``eval_kitti.run`` in sequential mode.

    Args:
        i: Index of the sequence; unused here.
        model: Unused here; kept so the signature stays compatible with the caller.
        batches: Per-frame input batches of the sequence.
        preds: Per-frame predictions; reads "log_probs", "xyr_max", "xy_seq"
            and "yaw_seq".
        results: Per-frame metric dicts; reads "xy_max_error", "xy_seq_error",
            "yaw_max_error" and "yaw_seq_error".
        aligner: Object exposing ``belief`` and ``priors``, which are plotted.
    """
    # Show roughly 16 frames. Clamp the stride to 1: for sequences shorter than
    # 16 frames the integer division gives 0 and a zero slice step raises ValueError.
    skip = max(1, len(batches) // 16)
    batches = batches[::skip]
    results = results[::skip]
    preds = preds[::skip]

    batches = [move_data_to_device(b, "cpu") for b in batches]
    images, canvas, maps, yaws_gt, uv_gt, xy_gt = unpack_batches(batches)
    maps = list(map(colormap.apply, maps))
    # Belief reduced over its last axis, shown as-is and exponentiated.
    belief_max = aligner.belief.max(-1).values
    plot_images([belief_max.cpu(), belief_max.exp().cpu()], cmaps="jet", dpi=75)

    errors = torch.stack([r["xy_max_error"] for r in results])
    errors_seq = torch.stack([r["xy_seq_error"] for r in results])
    errors_yaw = torch.stack([r["yaw_max_error"] for r in results])
    errors_yaw_seq = torch.stack([r["yaw_seq_error"] for r in results])
    print(f"pos: {errors.mean().item():.2f}m --> {errors_seq.mean().item():.2f}m, yaw: {errors_yaw.mean().item():.1f}° --> {errors_yaw_seq.mean().item():.1f}°")
    # print(aligner.yaw_slam2geo.abs()%360)

    logprobs = torch.stack([p["log_probs"] for p in preds]).cpu()
    uvt_p = torch.stack([p["xyr_max"] for p in preds]).cpu()
    uv_seq = torch.stack([p["xy_seq"] for p in preds]).cpu()
    yaw_seq = torch.stack([p["yaw_seq"] for p in preds]).cpu()

    # Viz
    plot_images(images, dpi=50)

    plot_images([likelihood_overlay(p.max(-1).values.exp().numpy(), m) for m, p in zip(maps, logprobs)], cmaps="jet", dpi=50)
    # Per frame: ground truth in green, single-frame prediction in black,
    # sequential prediction in red.
    for idx, (uv, yaw) in enumerate(zip(uv_gt, yaws_gt)):
        plot_pose([idx], uv, yaw, s=1/35, c="g", w=0.015)
    for idx, uvt in enumerate(uvt_p):
        plot_pose([idx], uvt[:2], uvt[2], s=1/35, c="k", w=0.015)
    for idx, (uv, yaw) in enumerate(zip(uv_seq, yaw_seq)):
        plot_pose([idx], uv, yaw, s=1/35, c="r", w=0.015)

    plot_images([lp.max(-1).values for lp in logprobs], cmaps="jet", titles=[f"{e:.2f}" for e in errors], dpi=50)
    for idx, lp in enumerate(logprobs):
        plot_dense_rotations(idx, lp.exp(), s=1/15)

    if aligner.priors:
        # Cumulative sum of the subsampled priors, renormalized by log_softmax_spatial.
        priors = log_softmax_spatial(torch.stack(aligner.priors[::skip]).cumsum(0)).cpu()
        plot_images([p.max(-1).values for p in priors], cmaps="jet", titles=[f"{e:.2f}" for e in errors_seq])
        for idx, prior in enumerate(priors):
            plot_dense_rotations(idx, prior.exp(), s=1/15)

# Qualitative check in sequential mode: evaluate the "BEV_MLY" experiment on the
# "test" split with num=2, passing every sequence to `viz_fn`.
exper, cfg = eval_kitti.experiments["BEV_MLY"]
cfg = {
    **cfg,
    "data": {"local_dir": "data/kitti"},
    # see maploc.data.sequential.chunk_sequence_v2 for all parameters
    "chunking": {"max_length": 32},
}
results = eval_kitti.run(
    "test",
    exper,
    cfg,
    sequential=True,
    num=2,
    callback=viz_fn,
    progress=False,
)