In [None]:
import json
from typing import Tuple, List, Dict, Any, Optional, Union
import ijson
import hashlib

In [None]:
def findClosestPoint(
    point: Tuple[float, float, float], pointsStr: str
) -> Tuple[int, int]:
    # point is a tuple of (x, y, z)
    # pointsStr is a string of a list of points seperated by @
    stringPoints = pointsStr.split("@")
    point = [float(value) for value in point]
    # cast each strring from 0.1,0.2,0.3 to [0.1, 0.2, 0.3]
    points = [list(map(float, point.split(","))) for point in stringPoints]
    # find closest match of point in points
    closest = min(points, key=lambda p: sum((a - b) ** 2 for a, b in zip(p, point)))
    # return index of closest point and total length of points
    return (points.index(closest), len(points))

In [None]:
def distanceToLastRoadPoint(
        point: Tuple[float, float, float], pointsStr: str
) -> float:
    stringPoints = pointsStr.split("@")
    point = [float(value) for value in point]
    points = [list(map(float, point.split(","))) for point in stringPoints]
    lastPoint = points[-1]
    # find index in last point closest to 0.56
    # round to 2 decomal places
    if round(lastPoint[1], 1) == 0.5:
        lastPoint[1], lastPoint[2] = lastPoint[2], lastPoint[1]

    return sum((a - b) ** 2 for a, b in zip(lastPoint, point))

In [None]:
road1 = []
road2 = []
road3 = []
road4 = []
road5 = []
road6 = []
road7 = []
road8 = []
road9 = []


def loadJsons(filenames):
    results: Dict[str, Dict[int, Any]] = {}
    roads: Dict[str, Any] = {}
    road_success_dict = {}

    df_dict = {}
    for filename in filenames:
        # iterate over each object in the json file
        with open(filename, "r") as f:
            objects = ijson.items(f, "item")
            for benchmarking_obj in objects:
                # access the scenario
                scenario = benchmarking_obj["scenario"]
                # acess the waypoints within scenario
                waypoints = scenario["waypoints"]
                # get a hash of the waypoints
                waypointsHash = hashlib.sha256(str(waypoints).encode()).hexdigest()
                # if the hash equial asjdn write the object in a list
                # if waypointsHash == "44160852c628732f811ea9907657f37de4d02894da958e5d858d669ff2578995":
                #    road1.append(benchmarking_obj)
                # elif waypointsHash == "be859fbf2b9946c8d4834d026e413df51dc9f70bd637bcd0dc45ec7e8e3926af":
                #    road2.append(benchmarking_obj)
                # elif waypointsHash == "0a246a82022e55cbd10c2d46db0e90f41111798ccb7179b3c9ef1f9d7e549882":
                #    road3.append(benchmarking_obj)
                # elif waypointsHash == "493f3da92ee367c09bf8647aab2fb671462701ef0397c3749dd93b18585e1aa1":
                #    road4.append(benchmarking_obj)
                # elif waypointsHash == "382e50f80e16e79861d84d5d102dae4b793c449941b48d4963560d13e4b0f053":
                #    road5.append(benchmarking_obj)
                # elif waypointsHash == "90a8e61085e4e9918cb2c35f1d751b76a3a4b9ba5ef677deaa441487827fd026":
                #    road8.append(benchmarking_obj)

                roads[waypointsHash] = waypoints

                # get the perturbation_function
                perturbation_function = scenario["perturbation_function"]
                # get the perturbation_scale
                perturbation_scale = scenario["perturbation_scale"]
                # get the boolean value for isSuccess
                isSuccess = benchmarking_obj["isSuccess"]

                # get the average value of the float list in xte
                xte = benchmarking_obj["xte"]
                xteAvg = sum(xte) / len(xte)
                xteAvg = float(xteAvg)
                max_xte = max(xte)
                max_xte = float(max_xte)

                # get the last value in the pos list
                pos = benchmarking_obj["pos"]
                posLast = pos[-1]
                # swap the y and z values
                posLast[1], posLast[2] = posLast[2], posLast[1]

                # find the closest point to the last value in pos
                closestPoint, totalPoints = findClosestPoint(posLast, waypoints)

                quickness = 1 - (closestPoint / totalPoints)

                # get distance to last road point
                distance = distanceToLastRoadPoint(posLast, waypoints)

                isSuccess = max_xte < 2.0 and distance <= 6.0

                # check if there is a key for the perturbation_function
                if perturbation_function not in results:
                    results[perturbation_function] = {}
                # check if there is a key for the perturbation_scale
                if perturbation_scale not in results[perturbation_function]:
                    results[perturbation_function][perturbation_scale] = {
                        "success": [],
                        "xte": [],
                        "quickness": [],
                        "roadHash": [],
                    }
                # increment the success or failure
                results[perturbation_function][perturbation_scale]["success"].append(
                    isSuccess
                )

                # append the xte and quickness values
                results[perturbation_function][perturbation_scale]["xte"].append(xteAvg)
                results[perturbation_function][perturbation_scale]["quickness"].append(
                    quickness
                )
                results[perturbation_function][perturbation_scale]["roadHash"].append(
                    waypointsHash
                )

                # other index
                index = f"Scale_{perturbation_scale}_Success"
                index2 = f"Scale_{perturbation_scale}_XTE"
                index3 = f"Scale_{perturbation_scale}_Quickness"
                index4 = f"Scale_{perturbation_scale}_RoadHash"
                index5 = f"Scale_{perturbation_scale}_MaxXTE"
                index6 = f"Scale_{perturbation_scale}_Distance"

                if perturbation_function not in df_dict:
                    df_dict[perturbation_function] = {}
                if index not in df_dict[perturbation_function]:
                    df_dict[perturbation_function][index] = [isSuccess]
                else:
                    df_dict[perturbation_function][index].append(isSuccess)
                if index2 not in df_dict[perturbation_function]:
                    df_dict[perturbation_function][index2] = [xteAvg]
                else:
                    df_dict[perturbation_function][index2].append(xteAvg)
                if index3 not in df_dict[perturbation_function]:
                    df_dict[perturbation_function][index3] = [quickness]
                else:
                    df_dict[perturbation_function][index3].append(quickness)
                if index4 not in df_dict[perturbation_function]:
                    df_dict[perturbation_function][index4] = [waypointsHash]
                else:
                    df_dict[perturbation_function][index4].append(waypointsHash)
                if index5 not in df_dict[perturbation_function]:
                    df_dict[perturbation_function][index5] = [max_xte]
                else:
                    df_dict[perturbation_function][index5].append(max_xte)
                if index6 not in df_dict[perturbation_function]:
                    df_dict[perturbation_function][index6] = [distance]
                else:
                    df_dict[perturbation_function][index6].append(distance)

                # check if there is a key for roadHash in roads
                if waypointsHash not in roads:
                    roads[waypointsHash] = waypoints
                if waypointsHash not in road_success_dict:
                    road_success_dict[
                        waypointsHash
                    ] = f"{perturbation_function}_{perturbation_scale}-"
                if isSuccess:
                    road_success_dict[
                        waypointsHash
                    ] += f"{perturbation_function}_{perturbation_scale}-"

    return results, roads, road_success_dict, df_dict


results, roads, road_success_dict, df_dict = loadJsons(["./data.json"])

if False:
    # write all items from road1 in a json file
    with open("donkey_benchmark_normal_perturbations_road1_dave.json", "w") as f:
        json.dump(road1, f, indent=4)
    # do tihs for all roads
    with open("donkey_benchmark_normal_perturbations_road2_dave.json", "w") as f:
        json.dump(road2, f, indent=4)
    with open("donkey_benchmark_normal_perturbations_road3_dave.json", "w") as f:
        json.dump(road3, f, indent=4)
    with open("donkey_benchmark_normal_perturbations_road4_dave.json", "w") as f:
        json.dump(road4, f, indent=4)
    with open("donkey_benchmark_normal_perturbations_road5_dave.json", "w") as f:
        json.dump(road5, f, indent=4)
    with open("donkey_benchmark_normal_perturbations_road8_dave.json", "w") as f:
        json.dump(road8, f, indent=4)

    # create a zip from all road jsons
    # zip all the files
    import zipfile

    with zipfile.ZipFile(
        "donkey_benchmark_normal_perturbations_dave.zip", "w", zipfile.ZIP_DEFLATED
    ) as zipf:
        zipf.write("donkey_benchmark_normal_perturbations_road1_dave.json")
        zipf.write("donkey_benchmark_normal_perturbations_road2_dave.json")
        zipf.write("donkey_benchmark_normal_perturbations_road3_dave.json")
        zipf.write("donkey_benchmark_normal_perturbations_road4_dave.json")
        zipf.write("donkey_benchmark_normal_perturbations_road5_dave.json")
        zipf.write("donkey_benchmark_normal_perturbations_road8_dave.json")

In [None]:
import json

# iterate over a dict
for key, value in results.items():
    # iterate over a dict
    for key2, value2 in value.items():
        # iterate over a dict
        for key3, value3 in value2.items():
            # print the key and value
            print(key, key2, key3, value3)


def delete_entries(json_file, field, values):
    # Read the JSON file
    with open(json_file, "r") as f:
        data = json.load(f)

    # Delete the first and third entry from the list
    data = data[1:3] + data[4:]

    # Write the updated JSON data back to the file
    with open(json_file, "w") as f:
        json.dump(data, f)


def findEntries(json_file, values):
    # Read the JSON file
    with open(json_file, "r") as f:
        data = json.load(f)

    # iterate over the data
    counter = 0
    for entry in data:
        counter += 1
        # check if the entry has the field
        if "scenario" in entry and entry["scenario"]["waypoints"] in values:
            print("counter has value in field")

    # Write the updated JSON data back to the file
    with open(json_file, "w") as f:
        json.dump(data, f)

def findEntries(json_file, values):
    # Read the JSON file
    with open(json_file, "r") as f:
        data = json.load(f)

    # iterate over the data
    counter = 0
    for entry in data:
        counter += 1
        # check if the entry has the field
        if "scenario" in entry and entry["scenario"]["waypoints"] in values:
            print("counter has value in field")

    # Write the updated JSON data back to the file
    with open(json_file, "w") as f:
        json.dump(data, f)

In [None]:
from pathlib import Path

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.colors import LinearSegmentedColormap

from plottable import ColumnDefinition, Table
from plottable.cmap import normed_cmap
from plottable.formatters import decimal_to_percent
from plottable.plots import circled_image # image

In [None]:
df = pd.DataFrame.from_dict(df_dict, orient="index").round(2)

# copy dataframe as backup
df_copy = df.copy()

scale_0_columns = [col for col in df.columns if "Scale_0" in col and not "RoadHash" in col]
scale_1_columns = [col for col in df.columns if "Scale_1" in col and not "RoadHash" in col]
scale_2_columns = [col for col in df.columns if "Scale_2" in col and not "RoadHash" in col]
scale_3_columns = [col for col in df.columns if "Scale_3" in col and not "RoadHash" in col]
scale_4_columns = [col for col in df.columns if "Scale_4" in col and not "RoadHash" in col]


df.drop(
    [
        "Scale_0_RoadHash",
        "Scale_1_RoadHash",
        "Scale_2_RoadHash",
        "Scale_3_RoadHash",
        "Scale_4_RoadHash",
    ],
    axis=1,
    inplace=True,
)

In [None]:
df["Scale_0_XTE"] = df["Scale_0_XTE"].apply(lambda x: sum(x) / len(x))
# format to 2 decimal places
df["Scale_0_XTE"] = df["Scale_0_XTE"].map("{:.2f}".format)
df["Scale_1_XTE"] = df["Scale_1_XTE"].apply(lambda x: sum(x) / len(x))
df["Scale_1_XTE"] = df["Scale_1_XTE"].map("{:.2f}".format)
df["Scale_2_XTE"] = df["Scale_2_XTE"].apply(lambda x: sum(x) / len(x))
df["Scale_2_XTE"] = df["Scale_2_XTE"].map("{:.2f}".format)
df["Scale_3_XTE"] = df["Scale_3_XTE"].apply(lambda x: sum(x) / len(x))
df["Scale_3_XTE"] = df["Scale_3_XTE"].map("{:.2f}".format)
df["Scale_4_XTE"] = df["Scale_4_XTE"].apply(lambda x: sum(x) / len(x))
df["Scale_4_XTE"] = df["Scale_4_XTE"].map("{:.2f}".format)

# do the same for quickness
df["Scale_0_Quickness"] = df["Scale_0_Quickness"].apply(lambda x: sum(x) / len(x))
# format to 2 decimal places
df["Scale_0_Quickness"] = df["Scale_0_Quickness"].map("{:.2f}".format)
df["Scale_1_Quickness"] = df["Scale_1_Quickness"].apply(lambda x: sum(x) / len(x))
df["Scale_1_Quickness"] = df["Scale_1_Quickness"].map("{:.2f}".format)
df["Scale_2_Quickness"] = df["Scale_2_Quickness"].apply(lambda x: sum(x) / len(x))
df["Scale_2_Quickness"] = df["Scale_2_Quickness"].map("{:.2f}".format)
df["Scale_3_Quickness"] = df["Scale_3_Quickness"].apply(lambda x: sum(x) / len(x))
df["Scale_3_Quickness"] = df["Scale_3_Quickness"].map("{:.2f}".format)
df["Scale_4_Quickness"] = df["Scale_4_Quickness"].apply(lambda x: sum(x) / len(x))
df["Scale_4_Quickness"] = df["Scale_4_Quickness"].map("{:.2f}".format)


# within each success column, count the amount of True values and divide by the total amount of values
df["Scale_0_Success"] = df["Scale_0_Success"].apply(lambda x: sum(x)/len(x))
df["Scale_1_Success"] = df["Scale_1_Success"].apply(lambda x: sum(x)/len(x))
df["Scale_2_Success"] = df["Scale_2_Success"].apply(lambda x: sum(x)/len(x))
df["Scale_3_Success"] = df["Scale_3_Success"].apply(lambda x: sum(x)/len(x))
df["Scale_4_Success"] = df["Scale_4_Success"].apply(lambda x: sum(x)/len(x))
# format to % with no decimal places

df = df.round(2)
df.head()

In [None]:
cmap = LinearSegmentedColormap.from_list(
    name="bugw", colors=["#ff1100", "#d95d55", "#c9ecb4", "#93d3ab", "#1a8a23"], N=256
)

In [None]:
col_defs = (
    [
        ColumnDefinition(
            name=col,
            title=col.replace("Scale_0_", ""),
            cmap=cmap,
            group="Intensity 0",
        )
        for col in scale_0_columns

    ]
    + [
        ColumnDefinition(
            name=col,
            title=col.replace("Scale_1_", ""),
            cmap=cmap,
            group="Intensity 1",
        )
                for col in scale_1_columns

    ]
    + [
        ColumnDefinition(
            name=col,
            title=col.replace("Scale_2_", ""),
            cmap=cmap,
            group="Intensity 2",
        )
                for col in scale_2_columns

    ]
    + [
        ColumnDefinition(
            name=col,
            title=col.replace("Scale_3_", ""),
            cmap=cmap,
            group="Intensity 3",
        )
                for col in scale_3_columns

    ]
    + [
        ColumnDefinition(
            name=col,
            title=col.replace("Scale_4_", ""),
            cmap=cmap,
            group="Intensity 4",
        )
                for col in scale_4_columns

    ]
)

In [None]:
fig, ax = plt.subplots(figsize=(25, 22))

table = Table(
    df,
    column_definitions=col_defs,
    row_dividers=True,
    footer_divider=True,
    ax=ax,
    textprops={"fontsize": 14},
    row_divider_kw={"linewidth": 1, "linestyle": (0, (1, 5))},
    col_label_divider_kw={"linewidth": 1, "linestyle": "-"},
    column_border_kw={"linewidth": 1, "linestyle": "-"},
)

##### Plot Heatmaps

In [None]:
import seaborn as sns

def plot_heatmap(df, kpi, cmap='coolwarm', fmt='.2f'):
    df = df.apply(pd.to_numeric, errors='coerce')
    df = df.fillna(0)  # Fill NaN values with 0

    plt.figure(figsize=(14, 10))
    sns.heatmap(df.T, annot=True, cmap=cmap, fmt=fmt)
    plt.title(f'Heatmap for {kpi}')
    plt.xlabel('Perturbation')
    plt.ylabel('Intensity Scale')
    # change font size of whole heatmap to 10
    plt.rcParams['font.size'] = 10
    plt.show()


def create_kpi_df(kpi,df):

    df_copy = df.reset_index()
    scales = sorted(set(col.split('_')[1] for col in df_copy.columns if 'Scale' in col))
    perturbations = df_copy['index'].tolist()
    # Initialize an empty dataframe
    kpi_df = pd.DataFrame(index=scales, columns=perturbations)
    # Fill the dataframe with values
    for scale in scales:
        for perturbation in perturbations:
            # Construct the column name
            col_name = f'Scale_{scale}_{kpi}'
            if col_name in df_copy.columns:
                kpi_df.at[scale, perturbation] = df_copy.loc[df_copy['index'] == perturbation, col_name].values[0]
            else:
                # Assign 0 if the perturbation does not exist for the scale
                kpi_df.at[scale, perturbation] = 0

    # Convert index to integer for proper sorting
    kpi_df.index = kpi_df.index.astype(int)
    kpi_df.sort_index(inplace=True)

    return kpi_df

In [None]:
# sort the perturbations in the df based on the success rate over all scales
df_copy = df.copy()
df_copy["Avgerage_Success"] = df_copy.filter(regex="Success").mean(axis=1).tolist()
df_copy = df_copy.sort_values(
    by=[
        "Avgerage_Success",
        "Scale_0_Success",
        "Scale_1_Success",
        "Scale_2_Success",
        "Scale_3_Success",
        "Scale_4_Success",
    ],
    ascending=False,
)

for i in range(4):
    start = i * 11
    # take the first 10 entries
    print(f"from {start}")
    df_copy_i = df_copy[start : start + 11]

    # get the perturbations
    perturbations = df_copy_i.index.tolist()
    # create a dict of the scales and all success rates in this scale
    success = {
        "Intensity 0": df_copy_i["Scale_0_Success"].tolist(),
        "Intensity 1": df_copy_i["Scale_1_Success"].tolist(),
        "Intensity 2": df_copy_i["Scale_2_Success"].tolist(),
        "Intensity 3": df_copy_i["Scale_3_Success"].tolist(),
        "Intensity 4": df_copy_i["Scale_4_Success"].tolist(),
    }

    # get min and max of all success rates
    min_success = min(df_copy_i.filter(regex="Success").min(axis=1).tolist())
    max_success = max(df_copy_i.filter(regex="Success").max(axis=1).tolist())

    x = np.arange(len(perturbations))  # the label locations
    width = 0.15  # the width of the bars
    multiplier = 0

    fig, ax = plt.subplots(figsize=(25, 10))

    for attribute, success in success.items():
        offset = width * multiplier

        # Create a colormap
        cmap = LinearSegmentedColormap.from_list(
            name="bugw",
            colors=["#ff1100", "#d95d55", "#c9ecb4", "#93d3ab", "#1a8a23"],
            N=256,
        )
        # Normalize the success values to the range [0, 1]
        norm = plt.Normalize(0, 1)

        # Calculate the colors
        colors = cmap(norm(success))

        rects1 = ax.bar(
            x + offset,
            success,
            width,
            label=attribute,
            color=colors,
        )
        ax.bar_label(rects1, padding=3, fmt="%.2f %%", rotation=90)

        multiplier += 1

    # get the average value for each perturbation and filter for success
    average = df_copy_i.filter(regex="Success").mean(axis=1).tolist()

    for i in range(len(perturbations)):
        ax.plot(
            [i - width, i + 5 * width],
            [average[i], average[i]],
            color="r",
            linestyle="--",
        )

    # Add some text for labels, title and custom x-axis tick labels, etc.
    ax.set_ylabel("Success Rate")
    ax.set_title("Success Rate per Perturbation")
    ax.set_xticks(x + width, perturbations)
    ax.legend(
        loc="upper center",
        bbox_to_anchor=(0.5, -0.05),
        fancybox=True,
        shadow=True,
        ncol=5,
    )

    # set y range from min to max
    ax.set_ylim([min_success - 0.05, max_success + 0.05])

    plt.show()