In [2]:
import sys
import os
import time
from easydict import EasyDict as edict

import cv2
import torch
import numpy as np
import plotly.graph_objects as go
import matplotlib.pyplot as plt

sys.path.append('../')

import config.astyx_config as cnf
from models.model_utils import create_model
from evaluate import evaluate_mAP

sys.path.append('./')

from data_process_astyx.astyx_dataloader import create_val_dataloader


In [None]:
# Base directory used below to build `dataset_dir` and `pretrained_path`.
# TODO: replace "." with your own path before running the notebook. The original
# placeholder (`root = #### your Path`) was not valid Python, so this cell could
# not be executed at all.
root = "."

# Baseline settings shared by every evaluation run. Per-run copies with specific
# disturbance types and seeds are derived from this object by `create_config`.
configs = edict({"arch": "darknet",
                 "cfgfile": "./config/cfg/complex_yolov4.cfg",
                 "use_giou_loss": False,
                 "device": "cpu",
                 "dataset_dir": f"{root}/path/to/data",
                 "num_samples": 100,
                 "dist_prop_lidar": 1,
                 "dist_prop_radar": 1,
                 "disturb_types_training_lidar" : [None],
                 "disturb_levels_training_lidar": [0],
                 "disturb_types_training_radar" : [None],
                 "disturb_levels_training_radar": [0],
                 "set_seed": 1000,
                 "distributed": False,
                 "batch_size": 1,
                 "pin_memory": True,
                 "num_workers": 1,
                 "img_size": 608,
                 "conf_thresh": 0.5,
                 "nms_thresh": 0.5,
                 "iou_thresh": 0.5,
                 "pretrained_path": f"{root}/path/to/model"
                })

In [None]:
def create_config(configs, lidar_disturb, radar_disturb, seed):
    """
    Derives a per-run configuration from the base configuration.

    The base configuration is copied with `configs.copy()` and wrapped in a new
    edict; the top-level assignments below are made on that copy only.

    Parameters:
    - configs: edict
        The base configuration to copy. Must provide `batch_size`.
    - lidar_disturb: str or None
        Disturbance type for the LiDAR sensor. Any falsy value is stored as ['None'].
    - radar_disturb: str or None
        Disturbance type for the RADAR sensor. Any falsy value is stored as ['None'].
    - seed: int
        Value stored in `set_seed` of the new configuration.

    Returns:
    - edict
        The derived configuration, with `disturb_types_training_lidar`,
        `disturb_types_training_radar`, `set_seed` and `subdivisions` set.
    """
    # "No disturbance" is encoded as the *string* 'None' inside a one-element
    # list; evaluate_disturbance compares against exactly that value.
    lidar_types = ['None'] if not lidar_disturb else [lidar_disturb]
    radar_types = ['None'] if not radar_disturb else [radar_disturb]

    derived = edict(configs.copy())
    derived.disturb_types_training_lidar = lidar_types
    derived.disturb_types_training_radar = radar_types
    derived.set_seed = seed
    # subdivisions is 64 divided by the batch size, truncated to an int.
    derived.subdivisions = int(64 / derived.batch_size)
    return derived

def evaluate_disturbance(config, level, model):
    """
    Evaluates the model once at a single disturbance level.

    For each sensor whose disturbance type list is not ['None'], the sensor's
    level list in `config` is overwritten with `[level]` (the configuration is
    modified in place). A dataloader is then created from the configuration via
    `create_val_dataloader` and passed to `evaluate_mAP`.

    Parameters:
    - config: edict
        The configuration holding the disturbance types; updated in place.
    - level: float
        The disturbance level to apply to every disturbed sensor.
    - model: Model
        The model being evaluated.

    Returns:
    - tuple (precision[0], recall[0], AP[0], f1[0])
        The first entry of each metric returned by `evaluate_mAP`
        (presumably the first class — confirm against `evaluate_mAP`).
    """
    for sensor in ("lidar", "radar"):
        sensor_types = getattr(config, f"disturb_types_training_{sensor}")
        # A sensor without a disturbance keeps whatever level it already had.
        if sensor_types != ['None']:
            setattr(config, f"disturb_levels_training_{sensor}", [level])

    loader = create_val_dataloader(config)
    # The fifth result is not needed here; the fourth argument is passed as None,
    # exactly as before.
    precision, recall, AP, f1, _ = evaluate_mAP(loader, model, config, None)
    return tuple(metric[0] for metric in (precision, recall, AP, f1))

def run_evaluations(config, model, levels, metric="AP"):
    """
    Iterates through different disturbance levels and records one metric per level.

    `evaluate_disturbance` returns a 4-tuple (precision, recall, AP, f1). The
    previous implementation assigned that whole tuple to a single element of a
    1-D float array, which raises "setting an array element with a sequence".
    Exactly one metric is now selected per level; the default is AP, matching
    the colour-bar label used when the results are plotted.

    Parameters:
    - config: edict
        The configuration containing the current disturbance conditions.
        It is passed on to `evaluate_disturbance`, which updates it in place.
    - model: Model
        The model being evaluated.
    - levels: list of floats
        The different levels of disturbance to be tested.
    - metric: str, optional
        Which metric to record: "precision", "recall", "AP" (default) or "f1".

    Returns:
    - eval_values: np.ndarray
        1-D array of length `len(levels)` holding the selected metric per level.

    Raises:
    - ValueError
        If `metric` is not one of the supported names.
    """
    # Order must match the tuple returned by evaluate_disturbance.
    metric_names = ("precision", "recall", "AP", "f1")
    if metric not in metric_names:
        raise ValueError(f"Unknown metric {metric!r}; expected one of {metric_names}")
    metric_index = metric_names.index(metric)

    eval_values = np.zeros(len(levels))
    for level_id, level in enumerate(levels):
        eval_values[level_id] = evaluate_disturbance(config, level, model)[metric_index]
    return eval_values

def plot_single(ax, data, title, disturbance_types, levels):
    """
    Draws one heat-map panel of evaluation results with a horizontal colour bar.

    Only `data[:, 1:]` is drawn, i.e. the first column of `data` is skipped.
    X tick `i` is placed at position `i` and labelled `levels[i]`, so `levels`
    must contain the labels of the *drawn* columns (column `i + 1` of `data`).

    Parameters:
    - ax: matplotlib.axes.Axes
        The axis on which the panel will be drawn.
    - data: np.ndarray
        2-D results, indexed as `data[:, 1:]` (rows are labelled with
        `disturbance_types`).
    - title: str
        The title of the panel.
    - disturbance_types: list of str
        Labels for the y-axis ticks, one per row.
    - levels: list of int
        Labels for the x-axis ticks, one per drawn column.

    Returns:
    - None
    """
    row_positions = range(len(disturbance_types))
    column_positions = range(len(levels))

    # Ticks are fixed before the image is drawn, as in the original call order.
    ax.set_yticks(row_positions, disturbance_types)
    ax.set_xticks(column_positions, levels)

    # Colour scale is pinned to [0, 1].
    heatmap = ax.imshow(data[:, 1:], vmin=0, vmax=1)
    colorbar = plt.colorbar(heatmap, ax=ax, orientation='horizontal', location='bottom')
    colorbar.set_label('AP')

    ax.set_title(title)
    ax.set_xlabel("Level")
    ax.set_ylabel("Type of distortion")

def plot_results(eval_values_lidar, eval_values_radar, eval_values_both, disturbance_types, levels=[1, 2, 3, 4]):
    """
    Builds the 2x2 overview figure with one heat-map panel per disturbed sensor setup.

    The LiDAR panel is drawn top-left, the RADAR panel top-right and the
    both-sensors panel bottom-right; the bottom-left axis is switched off.
    Each panel is drawn by `plot_single`.

    Parameters:
    - eval_values_lidar: np.ndarray
        The evaluation results for disturbances on the LiDAR sensor.
    - eval_values_radar: np.ndarray
        The evaluation results for disturbances on the RADAR sensor.
    - eval_values_both: np.ndarray
        The evaluation results for disturbances on both sensors.
    - disturbance_types: list of str
        The y-axis tick labels, forwarded to every panel.
    - levels: list of int, optional
        The x-axis tick labels, forwarded to every panel. The default list is
        only read here, never modified.

    Returns:
    - fig: matplotlib.figure.Figure
        The created Matplotlib figure.
    - ax: numpy.ndarray of matplotlib.axes.Axes
        The 2x2 grid of axes in the figure.
    """
    fig, ax = plt.subplots(2, 2, figsize=(20, 20), frameon=False)

    # (grid position, values, panel title), drawn in this order
    panels = (
        ((0, 0), eval_values_lidar, "Evaluated with interference on the LiDAR sensor"),
        ((0, 1), eval_values_radar, "Evaluated with interference on the RADAR sensor"),
        ((1, 1), eval_values_both, "Evaluated with interference on both sensors"),
    )
    for position, values, heading in panels:
        plot_single(ax[position], values, heading, disturbance_types, levels)

    # The fourth grid cell stays empty.
    ax[1, 0].axis("off")
    return fig, ax

def sensitivity_analysis(configs, model, disturbances, levels):
    """
    Performs a sensitivity analysis by evaluating the model on various disturbances and levels.

    For every disturbance type three configurations are derived from `configs`
    (LiDAR only, RADAR only, both sensors) and each one is evaluated at every
    level. All derived configurations share one randomly drawn seed, which
    replaces `set_seed` of the base configuration for this analysis.

    Parameters:
    - configs: edict
        The original configuration from which copies will be made for the analysis.
    - model: Model
        The model being evaluated.
    - disturbances: list of str
        The types of disturbances to be investigated (one result row each).
    - levels: list of floats
        The different levels of disturbance to be tested (one result column
        each). The first level is evaluated but not drawn, see below.

    Returns:
    - fig: matplotlib.figure.Figure
        The created Matplotlib figure with the results.
    - ax: numpy.ndarray of matplotlib.axes.Axes
        The axes of the subplots in the figure.
    - eval_values_lidar: np.ndarray
        The evaluation results for LiDAR disturbances, shape (len(disturbances), len(levels)).
    - eval_values_radar: np.ndarray
        The evaluation results for RADAR disturbances, same shape.
    - eval_values_both: np.ndarray
        The evaluation results for disturbances on both sensors, same shape.
    """
    levels = list(levels)
    seed = np.random.randint(10000)

    result_shape = (len(disturbances), len(levels))
    eval_values_lidar = np.zeros(result_shape)
    eval_values_radar = np.zeros(result_shape)
    eval_values_both = np.zeros(result_shape)

    for disturb_id, disturb in enumerate(disturbances):
        lidar_config = create_config(configs, disturb, None, seed)
        radar_config = create_config(configs, None, disturb, seed)
        both_config = create_config(configs, disturb, disturb, seed)

        eval_values_lidar[disturb_id] = run_evaluations(lidar_config, model, levels)
        eval_values_radar[disturb_id] = run_evaluations(radar_config, model, levels)
        eval_values_both[disturb_id] = run_evaluations(both_config, model, levels)

    # plot_single draws data[:, 1:], i.e. it skips the first level's column, and
    # labels the remaining columns with the list it is given. Passing the full
    # `levels` list shifted every label by one column, so only the labels of the
    # columns that are actually drawn are forwarded.
    plotted_levels = levels[1:]
    fig, ax = plot_results(eval_values_lidar, eval_values_radar, eval_values_both, disturbances, plotted_levels)

    return fig, ax, eval_values_lidar, eval_values_radar, eval_values_both


# Low Level Fusion

In [None]:
# Model-variant flags on the shared configuration: only low-level fusion is enabled.
# NOTE(review): presumably read by `create_model` / the dataloader — confirm there.
configs.low_fusion, configs.high_fusion = True, False
configs.lidar = configs.radar = configs.VR = False

# Disturbance types to investigate (one heat-map row each).
disturbances = [
    'random_noise',
    'random_loss',
    'random_shift',
    'blobs',
    'intensity',
]
# Disturbance levels 0, 0.25, 0.5, 0.75 and 1, each wrapped in its own
# one-element list.
levels = [[step / 4] for step in range(5)]

In [None]:
# Build the network described by `configs` and load the trained weights onto the
# configured device.
model = create_model(configs)
# NOTE(review): torch.load unpickles the checkpoint file — only load weights from
# a source you trust.
model.load_state_dict(
    torch.load(configs.pretrained_path, map_location=torch.device(configs.device))
)
model = model.to(device=configs.device)
model = model.eval()

# sensitivity_analysis takes four required arguments; the previous call omitted
# `disturbances` and `levels` and therefore raised a TypeError. The results are
# bound to names so that the raw arrays are not dumped as cell output and stay
# available to later cells.
fig, ax, eval_values_lidar, eval_values_radar, eval_values_both = sensitivity_analysis(
    configs, model, disturbances, levels
)