# Evaluation Notebook for RTLS Results

**Introduction:**
This notebook evaluates multi-camera tracking results produced by the Real-Time Location System (RTLS) in its operational mode. It provides a comprehensive analysis of RTLS accuracy using a variety of Multiple Object Tracking (MOT) metrics, including Higher Order Tracking Accuracy (HOTA), detection accuracy (DetA), association accuracy (AssA), localization accuracy (LocA), IDF1, and MOTA, among others. Together, these metrics offer an in-depth perspective on the overall effectiveness of the RTLS.

RTLS technology is capable of generating 3D location data for all objects within a scene, projecting their positions onto the ground plane. This notebook serves as a tool for assessing the precision of these 3D locations by comparing them to ground-truth data, employing the Euclidean distance as the measure of accuracy. To utilize this notebook effectively, access to ground-truth 3D location data is required.


**Requirements:**


+ Ground truth file with 3D locations in MOT format
+ RTLS log file produced from the `mdx-rtls` Kafka topic
+ Valid config file (`app_rtls_config.json`) and calibration JSON file (`calibration.json`)


**Environment Setup:** 

+ ``conda create -n mtmc_analytics python=3.10``
+ ``conda activate mtmc_analytics``
+ ``conda install jupyter notebook``
+ ``pip3 install -r requirements.txt``

# Import modules

In [None]:
# **Copyright (c) 2009-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**

import os
import json
from typing import List, Dict, Set, Tuple, Any

import mdx.mtmc.utils.trackeval as trackeval
from mdx.mtmc.config import AppConfig
from mdx.mtmc.core.calibration import Calibrator
from mdx.mtmc.utils.io_utils import validate_file_path, make_seq_ini_file, make_dir, load_json_from_file

# Define variables

In [None]:
# Path to the ground truth file with 3D locations; empty by default and must be set before evaluation
ground_truth_path: str = ""
# Path to the RTLS log file (messages from the `mdx-rtls` Kafka topic); empty by default and must be set
rtls_log_path: str = ""
# App config JSON that is loaded into `AppConfig` in the evaluation cell
app_config_path: str = "resources/app_rtls_config.json"
# Calibration JSON from which the sensor FPS is read in the evaluation cell
calibration_path: str = "resources/calibration_retail_synthetic.json"
# Sequence length passed to `make_seq_ini_file` (presumably the number of frames -- TODO confirm)
seq_length: int = 11000

# Define helper methods

In [None]:
def prepare_ground_truth_file(input_file_path: str, output_file_path: str, frame_ids: Set[int], ground_truth_frame_id_offset: int) -> None:
    """
    Converts the ground truth file into the MOT file format for evaluation

    Each non-empty input line is split on single spaces. Field 1 is read as the object ID,
    field 2 as the frame ID, and fields 7 and 8 as the x and y location. The offset is added
    to the frame ID, and only entries whose resulting frame ID is in ``frame_ids`` are kept.
    If the same (frame ID, object ID) pair occurs more than once, the first location is used.
    The output is written sorted by frame ID and then by object ID, and only after the input
    has been read completely, so a failure while parsing does not leave a partial output file.

    :param str input_file_path: input file path
    :param str output_file_path: output file path
    :param Set[int] frame_ids: frame IDs
    :param int ground_truth_frame_id_offset: Offset of frame IDs in ground truth
    :return: None
    ::

        prepare_ground_truth_file(input_file_path, output_file_path, frame_ids, ground_truth_frame_id_offset)
    """
    # Intermediate mapping: frame ID -> object ID -> (x, y)
    map_frame_id_to_objects: Dict[int, Dict[int, Tuple[float, float]]] = dict()

    with open(input_file_path) as f:
        for line in f:
            line = line.rstrip()

            # Skip blank lines (e.g., a trailing newline at the end of the file)
            if not line:
                continue
            line_split = line.split(" ")

            # Extract frame ID from the line
            frame_id = int(line_split[2]) + ground_truth_frame_id_offset

            # Ignore the line if the frame ID is not present in the given frame IDs
            if frame_id not in frame_ids:
                continue

            # Extract 3D location and object ID
            location = (float(line_split[7]), float(line_split[8]))
            object_id = int(line_split[1])

            # Keep the first location seen for each (frame ID, object ID) pair
            map_frame_id_to_objects.setdefault(frame_id, dict()).setdefault(object_id, location)

    # Write the ground truth to output file
    with open(output_file_path, "w") as output_file:
        for frame_id in sorted(map_frame_id_to_objects):
            objects = map_frame_id_to_objects[frame_id]
            for object_id in sorted(objects):
                location_x, location_y = objects[object_id]
                output_file.write(f"{frame_id} {object_id} -1 -1 -1 -1 1 {location_x} {location_y} -1\n")


def prepare_prediction_file(input_file_path: str, output_file_path: str, ground_truth_delay_sec: float, fps: float) -> List[int]:
    """
    Converts the prediction file into the MOT file format for evaluation

    Each non-empty input line is parsed as a JSON message (a leading ``b'`` and the trailing
    character are stripped first when present). The message frame ID is shifted back by half of
    ``ground_truth_delay_sec`` multiplied by ``fps``; messages whose corrected frame ID is not
    positive are dropped. For every object with at least one location, the first location is
    used. If the same (frame ID, object ID) pair occurs more than once, the first one is kept.
    The output is written sorted by frame ID and then by object ID.

    :param str input_file_path: input file path
    :param str output_file_path: output file path
    :param float ground_truth_delay_sec: delay of ground truth in second
    :param float fps: frame rate (FPS)
    :return: list of frame IDs (corrected), in the order they were first encountered
    :rtype: List[int]
    ::

        frame_ids = prepare_prediction_file(input_file_path, output_file_path, ground_truth_delay_sec, fps)
    """
    # Intermediate mapping: corrected frame ID -> object ID -> (x, y)
    map_frame_id_to_objects: Dict[int, Dict[int, Tuple[Any, Any]]] = dict()

    # Frame ID shift is loop-invariant: half of the delay, expressed in frames
    frame_id_shift = (ground_truth_delay_sec / 2) * fps

    # Read the input file that contains messages from the Kafka topic of "mdx-rtls"
    with open(input_file_path) as f:
        for line in f:
            line = line.rstrip()

            # Skip blank lines (e.g., a trailing newline at the end of the log)
            if not line:
                continue

            # Remove the "b" symbol (byte) to read the line correctly
            if line.startswith("b'"):
                line = line[2:-1]

            # Load the line
            message = json.loads(line)

            # Rectify the frame ID based on the delay of ground truth
            raw_frame_id = int(message["frameId"])
            corrected_frame_id = int(raw_frame_id - frame_id_shift)
            if corrected_frame_id <= 0:
                continue

            # Read the 3D locations of objects
            for object_info in message["locationsOfObjects"]:
                if isinstance(object_info, dict):
                    # Already decoded by json.loads above; re-serializing via str() would break
                    # on booleans, nulls or apostrophes inside the message
                    object_json = object_info
                else:
                    # Entry is a string representation of the object; normalize quotes and decode
                    object_json = json.loads(str(object_info).replace("'", '"'))

                # Ignore log if location is not present
                locations = object_json["locations"]
                if len(locations) == 0:
                    continue

                # Extract 3D location (first entry) and the object ID
                location = (locations[0][0], locations[0][1])
                object_id = int(object_json["id"])

                # Keep the first location seen for each (frame ID, object ID) pair
                map_frame_id_to_objects.setdefault(corrected_frame_id, dict()).setdefault(object_id, location)

    # Write the prediction to output file
    with open(output_file_path, "w") as output_file:
        for frame_id in sorted(map_frame_id_to_objects):
            objects = map_frame_id_to_objects[frame_id]
            for object_id in sorted(objects):
                location_x, location_y = objects[object_id]
                output_file.write(f"{frame_id} {object_id} -1 -1 -1 -1 1 {location_x} {location_y} -1\n")

    return list(map_frame_id_to_objects)


def make_seq_maps_file(seq_maps_dir_path: str, sensor_ids: List[str], benchmark: str, split_to_eval: str) -> None:
    """
    Makes a sequence-maps file used by TrackEval library

    The file is named ``<benchmark>-<split_to_eval>.txt`` inside ``seq_maps_dir_path`` and
    contains a ``name`` header line followed by one sensor ID per line.

    :param str seq_maps_dir_path: output directory path
    :param List[str] sensor_ids: sensor IDs
    :param str benchmark: name of the benchmark
    :param str split_to_eval: name of the split for evaluation
    :return: None
    ::

        make_seq_maps_file(seq_maps_dir_path, sensor_ids, benchmark, split_to_eval)
    """
    make_dir(seq_maps_dir_path)
    seq_maps_file_name = benchmark + "-" + split_to_eval + ".txt"
    seq_maps_file_path = os.path.join(seq_maps_dir_path, seq_maps_file_name)

    # Context manager guarantees the file is closed even if a write fails
    with open(seq_maps_file_path, "w") as f:
        f.write("name\n")
        f.writelines(sensor_id + "\n" for sensor_id in sensor_ids)


def setup_evaluation_configs(results_dir_path: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Builds the dataset and evaluator configurations for the TrackEval library

    Both configurations start from the TrackEval defaults. The ``evaluation`` sub-folder of
    ``results_dir_path`` is created, and the ground truth and tracker folders are pointed at
    its ``gt`` and ``scores`` sub-folders respectively.

    :param str results_dir_path: path to the folder that stores the results
    :return: dataset configuration and evaluation configuration
    :rtype: Tuple[Dict[str,Any],Dict[str,Any]]
    ::

        dataset_config, eval_config = setup_evaluation_configs(results_dir_path)
    """
    # Evaluator settings on top of the library defaults
    eval_config = trackeval.eval.Evaluator.get_default_eval_config()
    eval_config.update({"PRINT_CONFIG": False, "USE_PARALLEL": True})

    # Dataset settings on top of the library defaults
    dataset_config = trackeval.datasets.MotChallenge3DLocation.get_default_dataset_config()

    # All evaluation inputs live under <results_dir_path>/evaluation
    evaluation_dir_path = os.path.join(results_dir_path, "evaluation")
    make_dir(evaluation_dir_path)

    dataset_config.update({
        "DO_PREPROC": False,
        "SPLIT_TO_EVAL": "all",
        "GT_FOLDER": os.path.join(evaluation_dir_path, "gt"),
        "TRACKERS_FOLDER": os.path.join(evaluation_dir_path, "scores"),
        "PRINT_CONFIG": False,
    })

    return dataset_config, eval_config


def prepare_evaluation_folder(dataset_config: Dict[str, Any]) -> Tuple[str, str]:
    """
    Creates the folder layout used for evaluation and returns the target file paths

    A single sequence named ``RTLS`` is registered in the sequence-maps file. The ground truth
    folder for that sequence, its sequence ini file (using the notebook-level ``seq_length``),
    and the prediction folder are created. The prediction and ground truth files themselves
    are not written here.

    :param Dict[str,Any] dataset_config: dataset configuration
    :return: prediction file path and ground truth file path
    :rtype: Tuple[str,str]
    ::

        pred_file_path, gt_file_path = prepare_evaluation_folder(dataset_config)
    """
    sequence_name = "RTLS"
    gt_folder = dataset_config["GT_FOLDER"]
    benchmark = dataset_config["BENCHMARK"]
    split_to_eval = dataset_config["SPLIT_TO_EVAL"]

    # Sequence maps file listing the single RTLS sequence
    make_seq_maps_file(os.path.join(gt_folder, "seqmaps"), [sequence_name], benchmark, split_to_eval)

    # Ground truth folders: <GT_FOLDER>/<benchmark>-<split>/RTLS/gt
    mot_version = benchmark + "-" + split_to_eval
    gt_rtls_dir_path = os.path.join(gt_folder, mot_version, sequence_name)
    make_dir(gt_rtls_dir_path)
    gt_output_dir_path = os.path.join(gt_rtls_dir_path, "gt")
    make_dir(gt_output_dir_path)

    # Sequence ini file required by the TrackEval library
    make_seq_ini_file(gt_rtls_dir_path, camera=sequence_name, seq_length=seq_length)

    # Prediction folder: <TRACKERS_FOLDER>/<benchmark>-<split>/data/data
    pred_dir_path = os.path.join(dataset_config["TRACKERS_FOLDER"], mot_version, "data", "data")
    make_dir(pred_dir_path)

    return os.path.join(pred_dir_path, "RTLS.txt"), os.path.join(gt_output_dir_path, "gt.txt")

# Start evaluation

In [None]:
# Load app config
app_config = AppConfig(**load_json_from_file(app_config_path))

# Prepare evaluation folders
dataset_config, eval_config = setup_evaluation_configs(app_config.io.outputDirPath)
pred_file_path, gt_file_path = prepare_evaluation_folder(dataset_config)

# Get the MTMC plus config parameters
ground_truth_frame_id_offset = app_config.io.groundTruthFrameIdOffset
mtmc_plus_location_window_sec = app_config.streaming.mtmcPlusLocationWindowSec
mtmc_plus_smoothing_window_sec = app_config.streaming.mtmcPlusSmoothingWindowSec

# Get FPS for sensor; every "fps" attribute found in the calibration must have the same value
calibrator = Calibrator(calibration_path)
sensors = calibrator.load_calibration_file(calibration_path)
fps = None
for sensor in sensors:
    for attribute in sensor.attributes:
        if attribute["name"] != "fps":
            continue
        sensor_fps = float(attribute["value"])
        if fps is None:
            fps = sensor_fps
        elif fps != sensor_fps:
            # Raise instead of print + exit(1): `exit` is an interactive-shell helper, and an
            # exception aborts the cell with the message, like the metric check further below
            raise ValueError(f"ERROR: Unmatched FPS for sensors: {fps} != {sensor_fps}.")
if fps is None:
    raise ValueError("ERROR: FPS not available in calibration.")

print(f"Found mtmcPlusLocationWindowSec as: {mtmc_plus_location_window_sec} seconds")
print(f"Found mtmcPlusSmoothingWindowSec as: {mtmc_plus_smoothing_window_sec} seconds")
print(f"Found sensor FPS as: {fps}")

# Generate prediction file; the sum of both windows is passed as the ground truth delay
frame_ids = prepare_prediction_file(rtls_log_path, pred_file_path, (mtmc_plus_location_window_sec + mtmc_plus_smoothing_window_sec), fps)

# Generate ground truth file, restricted to the frame IDs present in the prediction
prepare_ground_truth_file(ground_truth_path, gt_file_path, set(frame_ids), ground_truth_frame_id_offset)

# Define the metrics to calculate
metrics_config = {"METRICS": ["HOTA", "CLEAR", "Identity"]}
metrics_config["PRINT_CONFIG"] = False
config = {**eval_config, **dataset_config, **metrics_config}  # Merge configs
eval_config = {k: v for k, v in config.items() if k in eval_config.keys()}
dataset_config = {k: v for k, v in config.items() if k in dataset_config.keys()}
metrics_config = {k: v for k, v in config.items() if k in metrics_config.keys()}

# Run the Evaluator
evaluator = trackeval.eval.Evaluator(eval_config)
dataset_list = [trackeval.datasets.MotChallenge3DLocation(dataset_config)]
# Holds instantiated metric objects (not metric names)
metrics_list: List[Any] = list()
for metric in [trackeval.metrics.HOTA, trackeval.metrics.CLEAR, trackeval.metrics.Identity]:
    if metric.get_name() in metrics_config["METRICS"]:
        metrics_list.append(metric(metrics_config))
if len(metrics_list) == 0:
    raise Exception("No metric selected for evaluation.")
evaluator.evaluate(dataset_list, metrics_list)

# Plot evaluation graphs

In [None]:
# Plots are written to <outputDirPath>/evaluation/plots
plots_dir_path = os.path.join(app_config.io.outputDirPath, "evaluation", "plots")
make_dir(plots_dir_path)
trackers_dir_path = dataset_config["TRACKERS_FOLDER"]
# Classes to plot; only "pedestrian" is used here
classes: List[str] = ["pedestrian"]
# Same "<benchmark>-<split>" folder name that is used when preparing the evaluation folder
mot_version = dataset_config["BENCHMARK"] + "-" + dataset_config["SPLIT_TO_EVAL"]
data_dir_path = os.path.join(trackers_dir_path, mot_version)
# Every entry of the data directory is treated as a tracker name
# NOTE(review): presumably `validate_file_path` checks that the path exists and returns it -- confirm
tracker_names = os.listdir(validate_file_path(data_dir_path))
for cls in classes:
    trackeval.plotting.plot_compare_trackers(data_dir_path, tracker_names, cls, plots_dir_path)


# Delete output folder

In [None]:
# Quote the interpolated path (and end option parsing with --) so that spaces, glob characters
# or a leading dash in outputDirPath cannot change what is removed
!rm -r -- "{app_config.io.outputDirPath}"