In [23]:
# Dataset roots to analyse: each entry pairs a numeric id with a directory path.
INPUT = [
    {"id": 1, "path": "/media/tes_unreal/1d0e94b2-c097-4f81-bb44-10ecc673aeb0/Upload/Mono"},
    {"id": 2, "path": "/media/tes_unreal/94b9ac5d-05c4-4bd8-b9be-769f3822fb94/Upload"},
]

# Root of the complete dataset; in the visible notebook it is only referenced
# from commented-out code.
INPUT_ALL = "/media/kiglis_local/My Passport/AnoVox/Dataset"

In [24]:

import os, sys

from Tools.util import *
from Tools.visualizer import sem_path_to_array
import Definitions
import plotly.express as px 
from concurrent.futures import ProcessPoolExecutor
import concurrent.futures
from tqdm import tqdm   
import pandas as pd
import json
import numpy as np
import plotly.graph_objects as go

# Shape used to allocate the accumulated heatmaps; per-scenario heatmaps are
# added in place onto arrays of this shape, so they have to match it.
# Presumably (height, width) of the semantic images — TODO confirm.
IMG_SHAPE = (512, 768)
# Substrings used to pick the frontal camera / LiDAR directory out of a
# scenario's sensor directory names.
FRONTAL_CAMERA = "SEMANTIC-CAM(0, 0, 1.8)"
FRONTAL_LIDAR = "SEMANTIC-LIDAR(0, 0, 1.8)"

In [25]:
# Back-of-the-envelope element count: 512*768 (IMG_SHAPE) times 200 times 1500.
# Presumably frames per scenario and number of scenarios — TODO confirm.
512*768*200*1500

117964800000

In [26]:
# Util functions
def load_sematnic_img(path):
    """Load the semantic images of a scenario directory.

    The image directory is whatever ``find_directory("SEMANTIC-CAM*", path)``
    returns; it is read with ``sem_path_to_array``, whose second result element
    (the image data) is returned. The first element is discarded.
    """
    camera_dir = find_directory("SEMANTIC-CAM*", path)
    _discarded, images = sem_path_to_array(camera_dir)
    return images

def load_sematnic_img_from_path(path):
    """Read the semantic images stored in ``path`` via ``sem_path_to_array``.

    Only the second element of the helper's two-element result is returned.
    """
    loaded = sem_path_to_array(path)
    _discarded, images = loaded
    return images

def get_img_shape(scenario_path):
    """Return the first two dimensions of one semantic image of a scenario.

    The image directory is located with ``find_directory("SEMANTIC-CAM*", ...)``
    and its entries are tried in ``os.listdir`` order.

    Args:
        scenario_path: Directory of a single scenario.

    Returns:
        ``img.shape[:2]`` of the first file that OpenCV can decode, or ``None``
        when the directory contains no readable image file.
    """
    sem_imgs_path = find_directory("SEMANTIC-CAM*", scenario_path)
    for file_name in os.listdir(sem_imgs_path):
        file_path = os.path.join(sem_imgs_path, file_name)
        if not os.path.isfile(file_path):
            continue
        img = cv2.imread(file_path, cv2.IMREAD_UNCHANGED)
        # cv2.imread reports an unreadable or non-image file by returning None
        # instead of raising; skip it rather than crash on `img.shape`.
        if img is None:
            continue
        return img.shape[:2]
    return None

def clean_json_from_types(json_path, types):
    """Rewrite a scenario config in place, keeping only scenarios of the given types.

    Args:
        json_path: Path of the JSON config; it is read and then overwritten.
        types: Container of anomaly types to keep. A scenario is kept when its
            ``anomaly_config.anomalytype`` is ``in types``.
    """
    with open(json_path, "r") as config_file:
        config = json.load(config_file)

    definition = config["scenario_definition"]
    kept = []
    for entry in definition["scenarios"]:
        if entry["anomaly_config"]["anomalytype"] in types:
            kept.append(entry)
    definition["scenarios"] = kept

    with open(json_path, "w") as config_file:
        json.dump(config, config_file, indent=4)

def clean_json_from_non_existent(json_path, scenario_path):
    """Rewrite a scenario config in place, dropping scenarios that have no directory.

    Args:
        json_path: Path of the JSON config; it is read and then overwritten.
        scenario_path: Directory expected to contain one ``Scenario_<id>``
            sub-directory per scenario.
    """
    with open(json_path, "r") as config_file:
        config = json.load(config_file)

    definition = config["scenario_definition"]
    surviving = []
    for entry in definition["scenarios"]:
        scenario_dir = os.path.join(scenario_path, "Scenario_" + entry["id"])
        if os.path.isdir(scenario_dir):
            surviving.append(entry)
    definition["scenarios"] = surviving

    with open(json_path, "w") as config_file:
        json.dump(config, config_file, indent=4)

def npy_path_to_array(npy_path):
    """Load every ``.npy`` frame of a sensor directory, ordered by frame id.

    File names come from ``get_files(npy_path)`` and are joined onto
    ``npy_path``. The frame id is the integer between the last ``_`` and the
    first following ``.`` of the file name.

    Args:
        npy_path: Directory containing the per-frame ``.npy`` files.

    Returns:
        List of arrays exactly as returned by ``np.load``, sorted by ascending
        frame id.

    Raises:
        ValueError: If a file name does not end in ``_<int>.<ext>``.
    """
    frames = []
    for file_name in get_files(npy_path):
        points = np.load(os.path.join(npy_path, file_name))
        frame_id = int(file_name.split('_')[-1].split('.')[0])
        # Keep the loaded array as-is. The former row-by-row copy through a
        # Python list turned an empty (0, k) frame into shape (0,), which then
        # broke column indexing such as `frame[:, 3]` downstream.
        frames.append((frame_id, points))

    frames.sort(key=lambda item: item[0])
    return [points for _frame_id, points in frames]

def parse_scenarios(output_dir):
    """Parse every ``scenario_config*`` file found for an output directory.

    The configuration directory is whatever
    ``find_directory("Scenario_Configuration*", output_dir)`` returns. Each
    matching file is handed to ``parse_town`` and the results are concatenated
    in ``os.listdir`` order.
    """
    config_dir = find_directory("Scenario_Configuration*", output_dir)
    parsed = []
    for entry in os.listdir(config_dir):
        if not entry.startswith("scenario_config"):
            continue
        parsed += parse_town(os.path.join(config_dir, entry), output_dir)
    return parsed

def parse_scenarios_with_dir(scenario_dir):
    """Parse every ``scenario_config*`` file inside a configuration directory.

    The parent of ``scenario_dir`` (``os.path.dirname``) is passed to
    ``parse_town`` as the directory in which the scenario folders are looked up.
    """
    config_names = os.listdir(scenario_dir)
    output_dir = os.path.dirname(scenario_dir)
    return [
        scenario
        for config_name in config_names
        if config_name.startswith("scenario_config")
        for scenario in parse_town(os.path.join(scenario_dir, config_name), output_dir)
    ]
def parse_town(town_json_path, output_dir):
    """Build ``Scenario`` objects for every scenario listed in one town config.

    Args:
        town_json_path: JSON file with a ``scenario_definition`` object holding
            ``map`` and ``scenarios``.
        output_dir: Directory in which ``Scenario_<id>`` folders are looked up.

    Returns:
        The scenarios whose ``exists`` flag is true, in file order.
    """
    with open(town_json_path, "r") as config_file:
        definition = json.load(config_file)["scenario_definition"]

    town_name = definition["map"]
    found = []
    for entry in definition["scenarios"]:
        scenario_dir = os.path.join(output_dir, "Scenario_" + entry["id"])
        candidate = Scenario(
            town_name,
            entry["id"],
            entry["anomaly_config"]["anomalytype"],
            entry["weather_preset"],
            scenario_dir,
            entry["anomaly_config"]["anomaly_bp_name"],
        )
        if candidate.exists:
            found.append(candidate)
    return found

def find_all_sensors(sensor_type, scenario_path):
    """Collect the names of all directories below ``scenario_path`` with a given prefix.

    The tree is walked recursively. Only the bare directory name is returned,
    not its path, so a nested match loses its parent directories.
    """
    return [
        dir_name
        for _root, dir_names, _files in os.walk(scenario_path)
        for dir_name in dir_names
        if dir_name.startswith(sensor_type)
    ]

class Scenario:
    """Metadata of one recorded scenario plus the sensor directories found on disk.

    Attributes:
        exists: Whether ``scenario_path`` is an existing directory.
        town, id, type, anomaly_name, weather_preset, path: The constructor
            arguments, stored unchanged.
        sem_sensors: Names of directories below ``path`` starting with
            ``SEMANTIC-CAM`` (empty when the scenario does not exist).
        lidar_sensors: Same for the ``SEMANTIC-LIDAR`` prefix.
    """

    def __init__(self, town, scenario_id, scenario_type, weather_preset, scenario_path, anomaly_name):
        # Metadata is always stored, so an object for a missing scenario is
        # still fully initialised instead of raising AttributeError on access.
        self.town = town
        self.id = scenario_id
        self.type = scenario_type
        self.anomaly_name = anomaly_name
        self.weather_preset = weather_preset
        self.path = scenario_path

        self.exists = os.path.isdir(scenario_path)
        if self.exists:
            self.sem_sensors = find_all_sensors("SEMANTIC-CAM", self.path)
            self.lidar_sensors = find_all_sensors("SEMANTIC-LIDAR", self.path)
        else:
            print(f"Scenario {scenario_id} does not exist. Skipping. {scenario_path}")
            self.sem_sensors = []
            self.lidar_sensors = []


In [27]:
# Data processing functions
def find_anomaly_pixels(scenario, anomalies):
    """Compute anomaly heatmap, per-class pixel counts and frame counts for a scenario.

    The images come from ``load_sematnic_img(scenario.path)``. They are indexed
    as ``[:, :, :, 0]``, i.e. treated as a 4-D stack whose first axis is the
    frame and whose channel 0 holds the label id.

    Args:
        scenario: Object with a ``path`` attribute.
        anomalies: Iterable of label ids that count as anomalies.

    Returns:
        Tuple ``(heatmap, class_counts, n_anomaly_frames, n_clear_frames)``:
        ``heatmap`` is a float64 array holding, per pixel, the number of frames
        in which that pixel carries an anomaly id; ``class_counts`` maps every
        label name in ``Definitions.LABELS`` to its total pixel count;
        ``n_anomaly_frames`` counts frames with at least one anomaly pixel and
        ``n_clear_frames`` the remaining frames.
    """
    sem_imgs = load_sematnic_img(scenario.path)
    imgs = sem_imgs[:, :, :, 0]

    # One boolean mask for all anomaly ids. This fixes the former `=+` typo,
    # which overwrote pixel counts instead of accumulating them, and it counts
    # a frame showing several anomaly ids only once.
    anomaly_mask = np.isin(imgs, list(anomalies))
    result_heatmpap = anomaly_mask.sum(axis=0).astype(np.float64)

    n_anomaly_frames = int(np.count_nonzero(anomaly_mask.any(axis=(1, 2))))
    n_clear_frames = imgs.shape[0] - n_anomaly_frames

    class_counts = dict()
    for label in Definitions.LABELS:
        class_counts[label.name] = int(np.count_nonzero(imgs == label.id))
    return result_heatmpap, class_counts, n_anomaly_frames, n_clear_frames

def compute_heatmap(scenario, anomaly_ids):
    """Compute per-pixel anomaly counts for the frontal camera and the other cameras.

    Each sensor directory is read with ``load_sematnic_img_from_path`` and
    indexed as ``[:, :, :, 0]``, i.e. treated as a 4-D stack whose first axis
    is the frame and whose channel 0 holds the label id.

    Args:
        scenario: Object with ``path`` and ``sem_sensors`` attributes.
        anomaly_ids: Iterable of label ids that count as anomalies.

    Returns:
        Tuple ``(frontal_heatmap, all_heatmap)`` of float64 arrays. With a
        single sensor both are the same array. With several sensors
        ``all_heatmap`` is the sum over every sensor except the frontal one.

    Raises:
        IndexError: If no sensor name contains ``FRONTAL_CAMERA``.
        ValueError: If a non-frontal sensor's image size differs from the
            frontal one, so the heatmaps cannot be added.
    """
    sensors = scenario.sem_sensors
    frontal_sensor_name = [sensor for sensor in sensors if FRONTAL_CAMERA in sensor][0]
    anomaly_ids = list(anomaly_ids)

    def _sensor_heatmap(sensor_name):
        # Per pixel: number of frames in which the pixel carries an anomaly id.
        sem_imgs = load_sematnic_img_from_path(os.path.join(scenario.path, sensor_name))
        return np.isin(sem_imgs[:, :, :, 0], anomaly_ids).sum(axis=0).astype(np.float64)

    frontal_heatmpap = _sensor_heatmap(frontal_sensor_name)

    if len(sensors) > 1:
        # NOTE(review): the frontal camera is excluded here, although the
        # single-sensor branch returns the frontal heatmap as "all" — confirm
        # which of the two is intended.
        all_heatmap = np.zeros_like(frontal_heatmpap)
        for sensor in sensors:
            if sensor != frontal_sensor_name:
                # `+=` accumulates; the former `=+` overwrote earlier sensors.
                all_heatmap += _sensor_heatmap(sensor)
    else:
        all_heatmap = frontal_heatmpap

    return frontal_heatmpap, all_heatmap

def compute_class_annotations(scenario):
    """Count, per label, the pixels of the frontal camera carrying that label id.

    Only the sensor whose name contains ``FRONTAL_CAMERA`` is read, and only
    channel 0 of its image stack is inspected.

    Returns:
        Dict mapping every ``label.name`` in ``Definitions.LABELS`` to the
        number of matching pixels over all frames.
    """
    frontal_matches = [name for name in scenario.sem_sensors if FRONTAL_CAMERA in name]
    frontal_dir = os.path.join(scenario.path, frontal_matches[0])
    label_ids = load_sematnic_img_from_path(frontal_dir)[:, :, :, 0]
    return {
        label.name: int(np.count_nonzero(label_ids == label.id))
        for label in Definitions.LABELS
    }

def compute_frame_count(scenario, anomaly_ids):
    """Count frames with and without a visible anomaly over all semantic cameras.

    A frame index counts as an anomaly frame when at least one sensor shows at
    least one pixel carrying one of ``anomaly_ids`` in that frame. Each frame
    is counted once, however many ids or sensors match.

    Args:
        scenario: Object with ``path`` and ``sem_sensors`` attributes.
        anomaly_ids: Iterable of label ids that count as anomalies.

    Returns:
        Tuple ``(n_anomaly_frames, n_clear_frames)``.

    Raises:
        IndexError: If no sensor name contains ``FRONTAL_CAMERA`` (same
            exception type as the original list lookup).
    """
    sensors = scenario.sem_sensors
    if not any(FRONTAL_CAMERA in sensor for sensor in sensors):
        raise IndexError(f"No sensor matching {FRONTAL_CAMERA!r} found in {scenario.path}")
    anomaly_ids = list(anomaly_ids)

    anomaly_frames = set()
    n_frames = 0
    for sensor in sensors:
        sem_imgs = load_sematnic_img_from_path(os.path.join(scenario.path, sensor))
        imgs = sem_imgs[:, :, :, 0]
        n_frames = max(n_frames, imgs.shape[0])
        frame_has_anomaly = np.isin(imgs, anomaly_ids).any(axis=(1, 2))
        anomaly_frames.update(np.flatnonzero(frame_has_anomaly).tolist())

    # One code path for one or many sensors. The former single-sensor branch
    # added the frame count once per anomaly id and accumulated a cumulative
    # difference into the clear count.
    n_anomaly_frames = len(anomaly_frames)
    n_clear_frames = n_frames - n_anomaly_frames
    return n_anomaly_frames, n_clear_frames

def compute_lidar_anomaly_frames(scenario, anomaly_ids):
    """Count LiDAR frames with and without an anomaly point over all LiDAR sensors.

    A frame is an anomaly frame when column index 3 of any sensor's point array
    for that frame contains one of ``anomaly_ids``. Column 3 is presumably the
    semantic tag — TODO confirm. The number of frames is taken from the first
    sensor.

    Returns:
        Tuple ``(n_anomaly_frames, n_clear_frames)``.

    Raises:
        IndexError: If no LiDAR sensor name contains ``FRONTAL_LIDAR``.
    """
    lidar_sensors = scenario.lidar_sensors
    # The result is unused; the lookup is kept because it rejects scenarios
    # without a frontal LiDAR, exactly as before.
    [name for name in lidar_sensors if FRONTAL_LIDAR in name][0]

    clouds_per_sensor = [
        npy_path_to_array(os.path.join(scenario.path, name)) for name in lidar_sensors
    ]

    n_anomaly_frames = 0
    n_clear_frames = 0
    for frame_idx in range(len(clouds_per_sensor[0])):
        frame_has_anomaly = any(
            anomaly_id in clouds[frame_idx][:, 3]
            for clouds in clouds_per_sensor
            for anomaly_id in anomaly_ids
        )
        if frame_has_anomaly:
            n_anomaly_frames += 1
        else:
            n_clear_frames += 1

    return n_anomaly_frames, n_clear_frames

In [28]:
# Run concurrent functions


def run_concurrent_heatmap(scenarios, anomalies):
    """Sum the heatmaps of all scenarios, computing each scenario in a worker process.

    Per-scenario results from ``compute_heatmap`` are added in place onto
    float64 accumulators of shape ``IMG_SHAPE``, in completion order.

    Returns:
        Tuple ``(frontal_heatmap_sum, all_heatmap_sum)``.
    """
    frontal_total = np.zeros(IMG_SHAPE, dtype=np.float64)
    all_total = np.zeros(IMG_SHAPE, dtype=np.float64)

    with concurrent.futures.ProcessPoolExecutor() as pool:
        pending = {pool.submit(compute_heatmap, scenario, anomalies) for scenario in scenarios}
        with tqdm(total=len(scenarios)) as bar:
            for finished in concurrent.futures.as_completed(pending):
                frontal_part, all_part = finished.result()
                frontal_total += frontal_part
                all_total += all_part
                bar.update()
    return frontal_total, all_total

def run_concurrent_class_annotations(scenarios):
    """Sum the per-class pixel counts of all scenarios using worker processes.

    Returns:
        Dict mapping class name to the pixel count accumulated over every
        ``compute_class_annotations`` result.
    """
    totals = dict()

    with concurrent.futures.ProcessPoolExecutor() as pool:
        pending = {pool.submit(compute_class_annotations, scenario) for scenario in scenarios}
        with tqdm(total=len(scenarios)) as bar:
            for finished in concurrent.futures.as_completed(pending):
                for class_name, pixel_count in finished.result().items():
                    if class_name not in totals:
                        totals[class_name] = pixel_count
                    else:
                        totals[class_name] += pixel_count
                bar.update()
    return totals

def run_concurrent_frame_count(scenarios, anomalies):
    """Sum the camera-based frame counts of all scenarios using worker processes.

    Returns:
        Tuple ``(total_clear_frames, total_anomaly_frames)``. This order is the
        reverse of ``compute_frame_count``'s ``(anomaly, clear)`` result.
    """
    clear_sum = 0
    anomaly_sum = 0

    with concurrent.futures.ProcessPoolExecutor() as pool:
        pending = {pool.submit(compute_frame_count, scenario, anomalies) for scenario in scenarios}
        with tqdm(total=len(scenarios)) as bar:
            for finished in concurrent.futures.as_completed(pending):
                anomaly_part, clear_part = finished.result()
                clear_sum += clear_part
                anomaly_sum += anomaly_part
                bar.update()
    return clear_sum, anomaly_sum

def run_concurrent_lidar_frame_count(scenarios, anomalies):
    """Sum the LiDAR-based frame counts of all scenarios using worker processes.

    Returns:
        Tuple ``(total_clear_frames, total_anomaly_frames)``. This order is the
        reverse of ``compute_lidar_anomaly_frames``'s ``(anomaly, clear)`` result.
    """
    clear_sum = 0
    anomaly_sum = 0

    with concurrent.futures.ProcessPoolExecutor() as pool:
        pending = {
            pool.submit(compute_lidar_anomaly_frames, scenario, anomalies)
            for scenario in scenarios
        }
        with tqdm(total=len(scenarios)) as bar:
            for finished in concurrent.futures.as_completed(pending):
                anomaly_part, clear_part = finished.result()
                clear_sum += clear_part
                anomaly_sum += anomaly_part
                bar.update()
    return clear_sum, anomaly_sum

In [ ]:
# Label names (as found in Definitions.LABELS) that mark anomalies in static
# and in dynamic scenarios respectively.
static_types = ["home", "nature", "special", "falling", "airplane", "animal"]
dynamic_types = ["anomaly"]

# Resolve the label names to their numeric label ids.
static_anomalie_ids = np.array([ label.id for label in Definitions.LABELS if label.name in static_types])
dynamic_anomalie_ids = np.array([ label.id for label in Definitions.LABELS if label.name in dynamic_types])

In [ ]:
# Recursively traverse all directories in INPUT_ALL and parse all scenarios
def get_all_config_directories(path):
    """Return the full paths of all ``Scenario_Configuration*`` directories below ``path``.

    The tree is walked recursively with ``os.walk``; matches are returned in
    walk order.
    """
    return [
        os.path.join(parent, dir_name)
        for parent, dir_names, _files in os.walk(path)
        for dir_name in dir_names
        if dir_name.startswith("Scenario_Configuration")
    ]
    

In [ ]:
scenarios_1 = []

# First input root: parse every "Scenario_Configuration*" directory below it
# and flatten the per-directory scenario lists.
input_1_dirs = get_all_config_directories(INPUT[0]["path"])
print(input_1_dirs)
scenarios_1 = [
    scenario
    for config_dir in input_1_dirs
    for scenario in parse_scenarios_with_dir(config_dir)
]

print(len(scenarios_1))

scnearios_2 = []

# Second input root, same procedure. The variable keeps its existing spelling
# because the cell that merges both lists refers to it by this name.
input_2_dirs = get_all_config_directories(INPUT[1]["path"])
scnearios_2 = [
    scenario
    for config_dir in input_2_dirs
    for scenario in parse_scenarios_with_dir(config_dir)
]

print(len(scnearios_2))



In [ ]:
# scenarios_all = []
# recursive search for all scenario configurations in INPUT_ALL
# scenarios_config_paths = get_all_config_directories(INPUT_ALL)
# scenarios_all = [parse_scenarios_with_dir(d) for d in scenarios_config_paths]
# scenarios_all = [item for sublist in scenarios_all for item in sublist]

# print(len(scenarios_all))

In [ ]:
# Merge the scenarios of both input roots into the list used by all later cells.
scenarios = scenarios_1 + scnearios_2 

print(len(scenarios))

In [ ]:
# [print(scenario) for scenario in scenarios if len(scenario.sem_sensors) > 1] 
# print(len(scnearios))

In [ ]:
# Report directory entries that sit next to the parsed scenarios but whose id
# (the part after the last "_") is not among the parsed scenario ids.
known_ids = {s.id for s in scenarios}

# Many scenarios share one parent directory: list each parent only once (in
# first-seen order) so that every unlisted entry is reported a single time.
parent_dirs = dict.fromkeys(os.path.dirname(s.path) for s in scenarios)

for scenario_before_path in parent_dirs:
    for tmp_scenario_path in os.listdir(scenario_before_path):
        tmp_scenario = tmp_scenario_path.split("_")[-1]
        # Entries whose last name part contains "Files" are skipped on purpose.
        if tmp_scenario not in known_ids and "Files" not in tmp_scenario:
            print(f"Scenario {tmp_scenario} is not included in the scenario configuration {scenario_before_path}/{tmp_scenario_path}")

In [ ]:
# Partition the scenarios by anomaly type: "STATIC", "NORMALITY", and
# everything else (treated as dynamic).
scenarios_static = [s for s in scenarios if s.type == "STATIC"]
scenarios_dynamic = [s for s in scenarios if s.type not in ("STATIC", "NORMALITY")]
scenario_normality = [s for s in scenarios if s.type == "NORMALITY"]

print(
    f"Static scenarios: {len(scenarios_static)}, "
    f"Dynamic scenarios: {len(scenarios_dynamic)}, "
    f"Normality scenarios: {len(scenario_normality)}"
)

In [ ]:
# Sanity check of the scenario paths: print each path and read the shape of
# one of its semantic images. Each entry is (path, shape); previously the first
# element was the None returned by print().
img_shapes = []
for scenario in scenarios:
    print(scenario.path)
    img_shapes.append((scenario.path, get_img_shape(scenario.path)))


In [ ]:
# Accumulate the anomaly heatmaps of all static scenarios (one worker process
# per scenario); the result is the pair (frontal camera, other cameras).
print("Computing the heatmap...")
static_frontal_heatmap, static_all_heatmap = run_concurrent_heatmap(scenarios_static, static_anomalie_ids)

In [ ]:
# Per-class pixel totals over the frontal camera of all static scenarios.
print("Computing class annotations...")
static_class_annotations = run_concurrent_class_annotations(scenarios_static)

In [ ]:

# run_concurrent_frame_count returns (clear, anomaly) — the reverse of the
# (anomaly, clear) order returned by compute_frame_count.
print("Computing anomaly frame count...")
static_clear_frames, static_anom_frames = run_concurrent_frame_count(scenarios_static, static_anomalie_ids)

In [ ]:
# LiDAR-based frame counts for the static scenarios; the result order is
# (clear, anomaly), as returned by run_concurrent_lidar_frame_count.
print(len(scenarios_static))
static_lidar_clear_frames, static_lidar_anom_frames = run_concurrent_lidar_frame_count(scenarios_static, static_anomalie_ids)
print(static_lidar_clear_frames, static_lidar_anom_frames)

In [ ]:
# Accumulate the anomaly heatmaps of all dynamic scenarios; the result is the
# pair (frontal camera, other cameras).
print("Computing the heatmap...")
dynamic_frontal_heatmap, dynamic_all_heatmap = run_concurrent_heatmap(scenarios_dynamic, dynamic_anomalie_ids)

In [ ]:
# Per-class pixel totals over the frontal camera of all dynamic scenarios.
print("Computing class annotations...")
dynamic_class_annotations = run_concurrent_class_annotations(scenarios_dynamic)

In [ ]:
# run_concurrent_frame_count returns (clear, anomaly) — the reverse of the
# (anomaly, clear) order returned by compute_frame_count.
print("Computing anomaly frame count...")
dynamic_clear_frames, dynamic_anom_frames = run_concurrent_frame_count(scenarios_dynamic, dynamic_anomalie_ids)

In [ ]:
# LiDAR-based frame counts for the dynamic scenarios; the result order is
# (clear, anomaly), as returned by run_concurrent_lidar_frame_count.
print(len(scenarios_dynamic))
dynamic_lidar_clear_frames, dynamic_lidar_anom_frames = run_concurrent_lidar_frame_count(scenarios_dynamic, dynamic_anomalie_ids)
print(dynamic_lidar_clear_frames, dynamic_lidar_anom_frames)

In [ ]:
# Add the static and dynamic per-class pixel counts key by key; a class missing
# on one side contributes 0 for that side.
combined_class_annotations = dict(static_class_annotations)
for class_name, pixel_count in dynamic_class_annotations.items():
    combined_class_annotations[class_name] = combined_class_annotations.get(class_name, 0) + pixel_count

In [ ]:
# Anomaly names of all static scenarios, in scenario order.
static_anomaly_names = [scenario.anomaly_name for scenario in scenarios_static]

import pickle
# Pickle the list and save it for later use.
with open("static_anomaly_names.pkl", "wb") as f:
    pickle.dump(static_anomaly_names, f)

In [ ]:
# Persist every computed result in one pickle so the plotting cells can be
# re-run without recomputing. The key "scearios" (sic) is read back verbatim by
# the loading cell, so it must not be corrected on one side only.
import pickle

data = dict(
    scearios=scenarios,
    static_frontal_heatmap=static_frontal_heatmap,
    static_all_heatmap=static_all_heatmap,
    static_class_annotations=static_class_annotations,
    static_clear_frames=static_clear_frames,
    static_anom_frames=static_anom_frames,
    static_lidar_clear_frames=static_lidar_clear_frames,
    static_lidar_anom_frames=static_lidar_anom_frames,
    dynamic_frontal_heatmap=dynamic_frontal_heatmap,
    dynamic_all_heatmap=dynamic_all_heatmap,
    dynamic_class_annotations=dynamic_class_annotations,
    dynamic_clear_frames=dynamic_clear_frames,
    dynamic_anom_frames=dynamic_anom_frames,
    dynamic_lidar_clear_frames=dynamic_lidar_clear_frames,
    dynamic_lidar_anom_frames=dynamic_lidar_anom_frames,
    combined_class_annotations=combined_class_annotations,
)

with open("all_data.pkl", "wb") as f:
    pickle.dump(data, f)


In [ ]:
# Set LOAD = True to restore the pickled results instead of recomputing them.
LOAD = False
import pickle

if LOAD:
    # NOTE(review): pickle.load can execute arbitrary code from the file; only
    # load pickles that were produced by this notebook.
    with open("all_data.pkl", "rb") as f:
        data = pickle.load(f)
    # "scearios" (sic) matches the key under which the scenarios were pickled.
    scenarios = data["scearios"]
    # NOTE(review): scenarios_static / scenarios_dynamic / scenario_normality
    # are not restored here; cells using them still need the filtering cell.
    static_all_heatmap = data["static_all_heatmap"]
    static_frontal_heatmap = data["static_frontal_heatmap"]
    static_class_annotations = data["static_class_annotations"]
    static_clear_frames = data["static_clear_frames"]
    static_anom_frames = data["static_anom_frames"]
    static_lidar_clear_frames = data["static_lidar_clear_frames"]
    static_lidar_anom_frames = data["static_lidar_anom_frames"]

    dynamic_all_heatmap = data["dynamic_all_heatmap"]
    dynamic_frontal_heatmap = data["dynamic_frontal_heatmap"]
    dynamic_class_annotations = data["dynamic_class_annotations"]
    dynamic_clear_frames = data["dynamic_clear_frames"]
    dynamic_anom_frames = data["dynamic_anom_frames"]
    dynamic_lidar_clear_frames = data["dynamic_lidar_clear_frames"]
    dynamic_lidar_anom_frames = data["dynamic_lidar_anom_frames"]

    combined_class_annotations = data["combined_class_annotations"]


In [ ]:
# NOTE(review): scenarios_dynamic is defined by the filtering cell only; the
# LOAD branch does not restore it, so this fails if that cell has not been run.
len(scenarios_dynamic)

In [ ]:
# Static-scenario heatmaps: frontal camera and the "all" heatmap.
static_frontal_heatmap_fig = px.imshow(static_frontal_heatmap, template="plotly_white")
static_all_heatmap_fig = px.imshow(static_all_heatmap, template="plotly_white")

# Only the "all" figure is stripped of legend, colour bar and tick labels.
static_all_heatmap_fig.update_layout(showlegend=False, coloraxis_showscale=False)
static_all_heatmap_fig.update_xaxes(showticklabels=False)
static_all_heatmap_fig.update_yaxes(showticklabels=False)

static_frontal_heatmap_fig.show()
static_all_heatmap_fig.show()

# The frontal figure is exported as PNG and the "all" figure as SVG.
static_frontal_heatmap_fig.write_image("Plots/static_frontal_heatmap.png")
static_all_heatmap_fig.write_image("Plots/static_all_heatmap.svg")


In [44]:
# Donut chart: share of static-scenario frames with / without a visible
# anomaly, based on the semantic-camera frame counts.
title = "Distribution of frames with and without anomalies in static scenarios"

fig = go.Figure(
    data=[
        go.Pie(
            labels=["Anomaly", "Clear"],
            values=[static_anom_frames, static_clear_frames],
            pull=[0.1, 0.0],
            hole=0.5,
            marker=dict(colors=["#CC6677", "#44AA99"]),
        )
    ]
)

# White, bold "label / percent" text for each slice.
fig.update_traces(
    textinfo='percent+label',
    texttemplate='<b>%{label}</br></br>%{percent}</b>',
    textfont_color="white",
    textfont_size=13,
)
fig.update_layout(title=title, showlegend=False, width=1000, height=600, template="plotly_white")

static_visibile_anomaly_fig = fig
static_visibile_anomaly_fig.write_image("Plots/static_visability_distr.svg")
static_visibile_anomaly_fig.show()

In [45]:
# Donut chart: share of static-scenario frames with / without an anomaly
# visible to the LiDAR sensors.
title = "Distribution of visible anomaly frames from lidar in static scenarios"

fig = go.Figure(
    data=[
        go.Pie(
            labels=["Anomaly", "Clear"],
            values=[static_lidar_anom_frames, static_lidar_clear_frames],
            pull=[0.1, 0.0],
            hole=0.5,
            marker=dict(colors=["#CC6677", "#44AA99"]),
        )
    ]
)

# White, bold "label / percent" text for each slice.
fig.update_traces(
    textinfo='percent+label',
    texttemplate='<b>%{label}</br></br>%{percent}</b>',
    textfont_color="white",
    textfont_size=13,
)
fig.update_layout(title=title, showlegend=False, width=1000, height=600, template="plotly_white")

static_lidar_visibile_anomaly_fig = fig
static_lidar_visibile_anomaly_fig.write_image("Plots/static_lidar_visability_distr.svg")
static_lidar_visibile_anomaly_fig.show()

In [None]:
# Dynamic-scenario heatmaps: frontal camera and the "all" heatmap.
# TODO: a logarithmic colour scale was planned here but is not implemented.

dynamic_frontal_heatmap_fig = px.imshow(dynamic_frontal_heatmap, template="plotly_white")
dynamic_all_heatmap_fig = px.imshow(dynamic_all_heatmap, template="plotly_white")

# Both figures are exported as SVG before being displayed.
dynamic_frontal_heatmap_fig.write_image("Plots/dynamic_frontal_heatmap.svg")
dynamic_all_heatmap_fig.write_image("Plots/dynamic_all_heatmap.svg")

dynamic_frontal_heatmap_fig.show()
dynamic_all_heatmap_fig.show()


In [None]:
# Donut chart: share of dynamic-scenario frames with / without a visible
# anomaly, based on the semantic-camera frame counts.
title = "Distribution of frames with and without anomalies in dynamic scenarios"

fig = go.Figure(
    data=[
        go.Pie(
            labels=["Anomaly", "Clear"],
            values=[dynamic_anom_frames, dynamic_clear_frames],
            pull=[0.1, 0.0],
            hole=0.5,
            marker=dict(colors=["#CC6677", "#44AA99"]),
        )
    ]
)

# White, bold "label / percent" text for each slice.
fig.update_traces(
    textinfo='percent+label',
    texttemplate='<b>%{label}</br></br>%{percent}</b>',
    textfont_color="white",
    textfont_size=13,
)
fig.update_layout(title=title, showlegend=False, width=1000, height=600, template="plotly_white")

dynamic_visibile_anomaly_fig = fig
dynamic_visibile_anomaly_fig.write_image("Plots/dynamic_visability_distr.svg")
dynamic_visibile_anomaly_fig.show()

In [None]:
# Donut chart: share of dynamic-scenario frames with / without an anomaly
# visible to the LiDAR sensors.
title = "Distribution of visible anomaly frames from lidar in dynamic scenarios"

fig = go.Figure(
    data=[
        go.Pie(
            labels=["Anomaly", "Clear"],
            values=[dynamic_lidar_anom_frames, dynamic_lidar_clear_frames],
            pull=[0.1, 0.0],
            hole=0.5,
            marker=dict(colors=["#CC6677", "#44AA99"]),
        )
    ]
)

# White, bold "label / percent" text for each slice.
fig.update_traces(
    textinfo='percent+label',
    texttemplate='<b>%{label}</br></br>%{percent}</b>',
    textfont_color="white",
    textfont_size=13,
)
fig.update_layout(title=title, showlegend=False, width=1000, height=600, template="plotly_white")

dynamic_lidar_visibile_anomaly_fig = fig
dynamic_lidar_visibile_anomaly_fig.write_image("Plots/dynamic_lidar_visability_distr.svg")
dynamic_lidar_visibile_anomaly_fig.show()

In [None]:
# Requires the `kaleido` package (e.g. `%pip install kaleido`): plotly's `write_image` relies on it to export static images (PNG/SVG/PDF).

In [None]:
import kaleido

# Order the classes by pixel count (descending) and drop empty classes as well
# as the "unlabeled" class.
sorted_combined_annotations = {
    name: count
    for name, count in sorted(combined_class_annotations.items(), key=lambda item: item[1], reverse=True)
    if count != 0 and name != "unlabeled"
}

keys = list(sorted_combined_annotations)
print(len(keys))
values = list(sorted_combined_annotations.values())
all_anomaly_types = static_types + dynamic_types
print(all_anomaly_types)
print("train" in keys)

# Anomaly classes and regular classes get one colour each.
anomaly_names = [label for label in keys if label in all_anomaly_types]
clear_names = [label for label in keys if label not in all_anomaly_types]
print(keys)
color_discrete_map = dict.fromkeys(anomaly_names, "#CC6677")
color_discrete_map.update(dict.fromkeys(clear_names, "#44AA99"))

# One bar per class, log-scaled pixel counts.
fig = px.histogram(
    x=keys,
    y=values,
    color=keys,
    color_discrete_map=color_discrete_map,
    labels={"x": "Class", "y": "Number of pixels"},
)
fig.update_yaxes(type="log", showgrid=False)
fig.update_layout(
    showlegend=False,
    template="plotly_white",
    # replaces the automatic "sum of ..." axis title
    yaxis_title="Number of pixels",
    plot_bgcolor="white",
    # list every class explicitly so that no x tick label is dropped
    xaxis=dict(tickmode='array', tickvals=keys, ticktext=keys),
    width=1000,
    height=600,
)

fig.show()

# Save the plot as PNG.
fig.write_image("Plots/class_annotations.png")

In [46]:
# Calculate the distribution of towns and weather presets over all scenarios
# (static, dynamic and normality alike).
towns = np.array([s.town for s in scenarios])
weather_presets = np.array([s.weather_preset for s in scenarios])

# np.unique returns the distinct values in sorted order together with their counts.
unique_towns, town_counts = np.unique(towns, return_counts=True)
unique_weather_presets, weather_counts = np.unique(weather_presets, return_counts=True)

# One pie chart per distribution; these figures are only shown, not saved.
df_towns = pd.DataFrame({"Towns": unique_towns, "Count": town_counts})
px.pie(df_towns, names="Towns", values="Count").show()

df_weather = pd.DataFrame({"Weather": unique_weather_presets, "Count": weather_counts})
px.pie(df_weather, names="Weather", values="Count").show()


In [None]:
# for scenario in scenarios:
#     anomaly_pxl, class_counts = get_scenario_heatmap(scenario)
#     final_heatmap += anomaly_pxl
#     for key, value in class_counts.items():
#         if key in class_annotations:
#             class_annotations[key] += value
#         else:
#             class_annotations[key] = value