In [None]:
%matplotlib widget
%load_ext autoreload
import numpy as np
import os 
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.figure, matplotlib.axes
import os
import sys
import numpy as np
import pickle as pkl
import yaml
import tqdm
import collections.abc

# Make the sibling source trees importable from this notebook.
# os.path.abspath("") resolves to the current working directory, so the
# relative locations below are taken from wherever the kernel was started.
thisfiledir = os.path.abspath("")
deepracingmodelsdir = os.path.abspath(os.path.join(thisfiledir, ".."))
deepracingdir = os.path.abspath(os.path.join(thisfiledir, "..", "..", "deepracing_py"))
if (deepracingmodelsdir not in sys.path) or (deepracingdir not in sys.path):
    # Both directories go to the front (highest import priority). Any copy
    # that was already present is dropped first so that re-running the cell,
    # or having only one of the two present, never leaves duplicate entries.
    sys.path = [deepracingmodelsdir, deepracingdir] + [
        p for p in sys.path if p not in (deepracingmodelsdir, deepracingdir)
    ]

# Prefer $HOME (the original behaviour); fall back to the platform's notion of
# the home directory instead of raising KeyError when the variable is unset.
homedir = os.environ.get("HOME") or os.path.expanduser("~")

# The MTR checkout is looked up under the user's home directory.
mtrdir = os.path.join(homedir, "deepracingws", "MTR")
print(mtrdir)
if mtrdir not in sys.path:
    sys.path.insert(0, mtrdir)
print(sys.path)
class PredictionResults(collections.abc.Mapping[str,np.ndarray]):
    def __init__(self, resultsdict : dict[str,np.ndarray], data_dir : str, modelname : str) -> None:
        self.resultsdict = resultsdict
        self.data_dir = data_dir
        self.modelname = modelname
    def __iter__(self):
        return iter(self.resultsdict)
    def __len__(self):
        return len(self.resultsdict)
    def __getitem__(self, key):
        return self.resultsdict[key]
    def __setitem__(self, key, value):
        self.resultsdict[key] = value
    def numsamples(self):
        return self.resultsdict["predictions"].shape[0]
    def keys(self):
        return self.resultsdict.keys()
    @staticmethod
    def from_data_file(data_file : str, modelname : str, sort_idx : np.ndarray | None = None) -> 'PredictionResults':
        data_dir = os.path.dirname(data_file)
        with open(data_file, "rb") as f:
            results_file = np.load(f)
            if sort_idx is None:
                results_dict = {k: v.copy() for (k,v) in results_file.items()}
            else:
                results_dict = {k: v[sort_idx].copy() for (k,v) in results_file.items()}
        return PredictionResults(results_dict, data_dir, modelname)
    def compute_fde(self):
        self.resultsdict["fde"] = np.linalg.norm(self.resultsdict["predictions"][:,-1,[0,1]] - self.resultsdict["ground_truth"][:,-1,[0,1]], ord=2.0, axis=1)
class color:
    """ANSI escape sequences for styling text printed to a terminal.

    Wrap a string as ``color.BOLD + text + color.END`` to style it; ``END``
    resets all attributes.
    """

    # Reset / text attributes
    END = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Standard-intensity foreground
    DARKCYAN = '\033[36m'

    # Bright foreground colours
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'

In [None]:
import deepracing_models.data_loading.utils.file_utils as file_utils
from deepracing_models.data_loading import SubsetFlag
import torch.utils.data as torchdata

# Load the test subset of the dataset and the saved results of three models.
# Every results archive is expected at <results root>/<experiment name>/data.npz.
datadir = "/p/DeepRacing/unpacked_datasets/local_fitting/v1/deepracing_standard"
# NOTE(review): `datasets` is not used again in the visible part of this
# notebook — confirm whether a later cell needs it before removing.
datasets = file_utils.load_datasets_from_files(datadir,   flag=SubsetFlag.TEST)

# BezierMixNet results; FDE is computed here from predictions/ground_truth.
bezier_experiment = "widespread_beans_6059"
bezier_results_dir = os.path.join("/p/DeepRacing/mixnet_bezier_results", bezier_experiment)
bezier_results = PredictionResults.from_data_file(os.path.join(bezier_results_dir, "data.npz"), "BezierMixNet")
bezier_results.compute_fde()

# Composite ("BARTé") results.
composite_experiment = "sunny_coyote_3579"
composite_results_dir = os.path.join("/p/DeepRacing/bamf_results", composite_experiment)
composite_results = PredictionResults.from_data_file(os.path.join(composite_results_dir, "data.npz"), "BARTé")
composite_results.compute_fde()

# MixNet results.
mixnet_experiment = "agricultural_flue_8932"
mixnet_results_dir = os.path.join("/p/DeepRacing/mixnet_results", mixnet_experiment)
mixnet_results = PredictionResults.from_data_file(os.path.join(mixnet_results_dir, "data.npz"), "MixNet")
# MixNet's ground truth is replaced with a copy of the composite model's
# before computing FDE.
# NOTE(review): presumably both archives hold the same samples in the same
# order — confirm, otherwise this FDE compares mismatched samples.
mixnet_results["ground_truth"] = composite_results["ground_truth"].copy()
mixnet_results.compute_fde()
# Locate the MTR results and build (or reuse) the permutation that reorders
# MTR scenarios by (track name, car index, dataset index).
mtr_experiment = "formal_pedestal_9890"
mtr_results_dir =  os.path.join("/p/DeepRacing/mtr_results", mtr_experiment)
mtr_data_dir = "/p/DeepRacing/unpacked_datasets/local_fitting/v1/mtr_format/1second"
mtr_scenarios_dir = os.path.join(mtr_data_dir, "processed_scenarios_test")
mtr_sortfile = os.path.join(mtr_results_dir, "test_plots", "idx_sort.npz")
# The permutation is cached on disk; it is only recomputed when the cache
# file is missing.
if not os.path.isfile(mtr_sortfile):
    # NOTE(review): pickle.load can execute arbitrary code — only use on
    # trusted files.
    with open(os.path.join(mtr_data_dir, "processed_scenarios_test_infos.pkl"), "rb") as f:
        mtr_infos = pkl.load(f)
    # NOTE(review): mtr_keys is not used in the visible part of this notebook.
    mtr_keys = mtr_infos[0].keys()
    entries = []
    for (idx, info) in tqdm.tqdm(enumerate(mtr_infos), total=len(mtr_infos)):
        scenario_id = info["scenario_id"]
        # Each scenario has a sidecar "<scenario_id>.metadata.yaml" file.
        with open(os.path.join(mtr_scenarios_dir, scenario_id+".metadata.yaml"), "r") as f:
            scenario_metadata = yaml.safe_load(f)
        deepracing_dir = os.path.dirname(scenario_metadata["deepracing_file"])
        dset_index = scenario_metadata["index"]
        # Car index: integer after the last "_" in the source file's directory name.
        car_index = int(os.path.basename(deepracing_dir).split("_")[-1])
        # Track name: text before the first "_" in the parent directory's name.
        dated_trackname : str = os.path.basename(os.path.dirname(deepracing_dir))
        trackname = dated_trackname.split("_")[0]
        entries.append((idx, scenario_id, trackname, car_index, dset_index))
    # Sort by (trackname, car_index, dset_index); entry[0] keeps the original position.
    entries_sorted = sorted(entries, key=lambda entry : (entry[2], entry[3], entry[4]))
    scenario_ids_sorted = np.asarray([e[1] for e in entries_sorted], dtype=object)
    idx_sort = np.asarray([e[0] for e in entries_sorted], dtype=np.int64)
    with open(mtr_sortfile, "wb") as f:
        np.savez(f, idx_sort=idx_sort, scenario_ids=scenario_ids_sorted)

# Always read the permutation back from the cache file.
# allow_pickle=True is needed because scenario_ids is saved as an object array.
with open(mtr_sortfile, "rb") as f:
    npfile = np.load(f, allow_pickle=True)
    sort_idx_mtr = npfile["idx_sort"].copy()
    scenario_ids_sorted = npfile["scenario_ids"].copy()

# Load the MTR results with every array reordered by sort_idx_mtr.
mtr_results = PredictionResults.from_data_file(os.path.join(mtr_results_dir, "test_plots", "data.npz"), "MTR", sort_idx=sort_idx_mtr)
# Keep the full set of candidate curves under "predictions_all", then reduce
# "predictions" to one curve per sample: the one chosen by "best_curve_idx".
mtr_results["predictions_all"] = mtr_results["predictions"].copy()
mtr_results["predictions"] = np.zeros_like(mtr_results["predictions_all"][:,0])
# NOTE(review): if best_curve_idx is a 1-D integer array this loop could be a
# single fancy-indexing expression — confirm its shape/dtype first.
for idx in range(mtr_results["predictions_all"].shape[0]):
    mtr_results["predictions"][idx] = mtr_results["predictions_all"][idx,mtr_results["best_curve_idx"][idx]]
mtr_results.compute_fde()
# for k in ["history", "ground_truth"]

# Summary: sample count (first axis of "history") and available keys per model.
for model in [bezier_results, mixnet_results, composite_results, mtr_results]:
    print("%s has %d points" % (model.modelname, model["history"].shape[0]))
    print("%s has keys: %s" % (model.modelname, str(list(model.keys()))))



In [None]:
# idx_samp = 15
# best_curve_idx = mtr_results["best_curve_idx"]
# mtr_predictions = np.stack([mtr_results["predictions"][i, best_curve_idx[i]] for i in range(best_curve_idx.shape[0])], axis=0)
# comp_samp = composite_results["predictions"][idx_samp,1:]
# gt_samp = composite_results["ground_truth"][idx_samp,1:]
# mtr_samp = mtr_predictions[idx_samp]
import shutil

def plot_error_histograms(results : PredictionResults, bins=200):
    """Save a histogram and a notched box plot for each error metric of a model.

    The metrics read from ``results`` are ``"ade"``, ``"lateral_error"`` and
    ``"longitudinal_error"``. Each plot is written as both PNG and PDF into
    ``<results.data_dir>/plots`` (created if missing), with file names of the
    form ``<modelname>_<key>_histogram.*`` and ``<modelname>_<key>_boxplot.*``.

    Args:
        results: Results container; must provide ``data_dir``, ``modelname``
            and the three metric arrays.
        bins: Passed through to the histogram call.

    Returns:
        The created figures, in a fixed order: for each metric in the order
        listed above, the histogram figure followed by the box-plot figure.
        The figures are left open so the caller can display or close them.
    """
    figures = []
    savedir = os.path.join(results.data_dir, "plots")
    os.makedirs(savedir, exist_ok=True)
    modelname = results.modelname
    # A tuple rather than a set: set iteration order is not stable across
    # runs, which made the order of the returned figures unpredictable.
    metrics = (
        ("ade", "MinADE"),
        ("lateral_error", "Lateral Error"),
        ("longitudinal_error", "Longitudinal Error"),
    )
    for (key, title) in metrics:
        errors = results[key]

        fig : matplotlib.figure.Figure = plt.figure()
        # Draw on this figure's own axes instead of relying on pyplot's
        # implicit "current figure".
        ax = fig.gca()
        ax.hist(errors, bins=bins)
        ax.set_title(title + ": " + modelname)
        figures.append(fig)
        fig.savefig(os.path.join(savedir, "%s_%s_histogram.png" % (modelname, key)), backend="agg")
        fig.savefig(os.path.join(savedir, "%s_%s_histogram.pdf" % (modelname, key)), backend="pdf")

        figbox : matplotlib.figure.Figure = plt.figure()
        axbox = figbox.gca()
        axbox.set_title(title + ": " + modelname)
        axbox.boxplot(errors, notch=True)
        figbox.savefig(os.path.join(savedir, "%s_%s_boxplot.png" % (modelname, key)), backend="agg")
        figbox.savefig(os.path.join(savedir, "%s_%s_boxplot.pdf" % (modelname, key)), backend="pdf")
        figures.append(figbox)
    return figures
def plot_outliers(results_list : "list[PredictionResults]", plotdir : str, metric_key="ade", N=1, worst=True, history_source_idx = 0):
    """Plot the N best or worst samples of a reference model against all models.

    Samples are ranked by ``results_list[0][metric_key]``. For each of the
    first ``N`` ranked samples, one figure is saved (PDF and PNG) showing the
    history and ground truth taken from ``results_list[history_source_idx]``
    plus the prediction of every model in ``results_list``.

    Output goes to ``<plotdir>/<bottom|top>_<N>_<modelname>_<metric_key>``.
    That subdirectory is DELETED and recreated if it already exists.

    Args:
        results_list: Result containers; the first one is the ranking reference.
        plotdir: Parent directory for the output subdirectory.
        metric_key: Key of the per-sample metric used for ranking.
        N: Number of samples to plot. If it exceeds the number of available
            samples, all samples are plotted.
        worst: If True, plot the largest metric values ("bottom_..."),
            otherwise the smallest ("top_...").
        history_source_idx: Index into ``results_list`` of the container that
            supplies ``"history"`` and ``"ground_truth"``.

    Returns:
        The full array of sample indices in ranked order (not only the first
        ``N``): descending metric when ``worst`` is True, ascending otherwise.

    Raises:
        ValueError: If ``plotdir`` is not a string or is an existing file.
    """
    if not isinstance(plotdir, str):
        raise ValueError("plotdir must be a string")
    if os.path.isfile(plotdir):
        raise ValueError("plotdir must be a directory")
    ref_results = results_list[0]
    idx_sort = np.argsort(ref_results[metric_key])
    if worst:
        idx_sort = np.flipud(idx_sort)
        subdir_name="bottom_%d_%s_%s" % (N, ref_results.modelname, metric_key)
    else:
        subdir_name="top_%d_%s_%s" % (N, ref_results.modelname, metric_key)
    plotdirfull = os.path.join(plotdir, subdir_name)
    # Start from an empty directory so stale plots from earlier runs do not linger.
    if os.path.isdir(plotdirfull):
        shutil.rmtree(plotdirfull)
    os.makedirs(plotdirfull)
    history_source = results_list[history_source_idx]
    # Never index past the available samples when N is larger than the dataset.
    num_plots = min(N, idx_sort.shape[0])
    for plot_idx in range(num_plots):
        sample_idx = idx_sort[plot_idx]
        history = history_source["history"][sample_idx]
        ground_truth = history_source["ground_truth"][sample_idx]
        fig : matplotlib.figure.Figure = plt.figure()
        ax = fig.gca()
        ax.plot(history[:,0], history[:,1], label="History")
        ax.scatter(ground_truth[:,0], ground_truth[:,1], label="Ground Truth")
        for results in results_list:
            predictions = results["predictions"][sample_idx]
            ax.plot(predictions[:,0], predictions[:,1], label=results.modelname)
        ax.legend()
        fig.savefig(os.path.join(plotdirfull, "sample_%d.pdf" % (plot_idx+1,)))
        fig.savefig(os.path.join(plotdirfull, "sample_%d.png" % (plot_idx+1,)))
        # Close each figure once saved so open figures do not accumulate.
        plt.close(fig=fig)
    return idx_sort
    

In [None]:
from texttable import Texttable
import latextable
def boldstring(input_string : str):
    """Return ``input_string`` as-is.

    Bold styling is currently switched off; the disabled variant that wrapped
    the text in ANSI bold codes is kept below for reference.
    """
    # Disabled: color.BOLD + input_string + color.END
    unstyled = input_string
    return unstyled
def create_table(results : list[PredictionResults]) -> Texttable:
    """Build a table with the mean of each error metric for every model.

    The first column is the model name; the remaining columns are the metric
    titles in alphabetical order. Each cell holds ``str(np.mean(...))`` of the
    corresponding metric array. One row is added per entry of ``results``, in
    the order given.
    """
    metric_key_by_title = {
        "ADE" : "ade",
        "Lateral\nError" : "lateral_error",
        "Longitudinal\nError" : "longitudinal_error",
        "FDE" : "fde"
    }
    metric_titles = sorted(metric_key_by_title)
    headings = ["Model", *metric_titles]
    num_columns = len(headings)

    # max_width=0 as in the Texttable constructor call below; all cells are
    # centred horizontally ("c") and vertically ("m").
    table = Texttable(max_width=0)
    table.set_cols_align(["c"] * num_columns)
    table.set_cols_valign(["m"] * num_columns)
    table.header(list(map(boldstring, headings)))

    for model_results in results:
        row = [model_results.modelname]
        for metric_title in metric_titles:
            metric_values = model_results[metric_key_by_title[metric_title]]
            row.append(str(np.mean(metric_values)))
        table.add_row(row)
    return table
# Print the comparison table as plain text and as LaTeX.
results_textable = create_table([composite_results, mtr_results, mixnet_results, bezier_results])
results_textable.set_deco(Texttable.BORDER | Texttable.HLINES | Texttable.HEADER | Texttable.VLINES)
print(results_textable.draw())
latex_table = latextable.draw_latex(results_textable, caption="Dem' rezults.", label="table:results")
# Add a float placement specifier to the generated table environment.
print(latex_table.replace(r"\begin{table}", r"\begin{table}[!htbp]"))


# Outlier plots. The first list element is the model whose ADE ranks the
# samples; history_source_idx always points at composite_results here.
# 25 worst samples by MTR's ADE:
plot_outliers([mtr_results, composite_results, mixnet_results, bezier_results], "/p/DeepRacing/result_plots/plots", history_source_idx=1, N=25)
# 25 worst samples by the composite model's ADE:
plot_outliers([composite_results, mtr_results, mixnet_results, bezier_results], "/p/DeepRacing/result_plots/plots", history_source_idx=0, N=25)
# 25 best samples by the composite model's ADE. idx_good receives the FULL
# ascending ranking returned by plot_outliers, not just the 25 plotted indices.
idx_good = plot_outliers([composite_results, mtr_results, mixnet_results, bezier_results], "/p/DeepRacing/result_plots/plots", history_source_idx=0, N=25, worst=False)
# Reorder both MTR and composite arrays by descending MTR ADE.
# NOTE(review): assumes both result sets are in the same sample order and
# every array shares the same first-axis length — confirm.
error_sort = np.flip(np.argsort(mtr_results["ade"]), axis=0)
mtr_results_sorted = {k : mtr_results[k][error_sort] for k in mtr_results.keys()}
composite_results_sorted = {k : composite_results[k][error_sort] for k in composite_results.keys()}



In [None]:
# Composite-model arrays reordered by idx_good.
# NOTE(review): idx_good is the complete ascending-ADE ranking returned by
# plot_outliers, so these hold every sample (best first), not only the 25
# that were plotted — slice with [:25] if only those are wanted.
control_points_good = composite_results["curves"][idx_good]
predictions_good = composite_results["predictions"][idx_good]
history_good = composite_results["history"][idx_good]