In [None]:
import os
import numpy as np
import matplotlib.colors as mcolors
import matplotlib.pyplot as plt

# Font sizes (in points) shared by the matplotlib rc settings below
SMALL_SIZE, MEDIUM_SIZE, BIGGER_SIZE = 12, 14, 16

# -- text: serif everywhere, small default size, larger titles and axis labels
plt.rc("font", size=SMALL_SIZE, family="serif")
plt.rc("axes", titlesize=BIGGER_SIZE, labelsize=BIGGER_SIZE)
plt.rc("xtick", labelsize=SMALL_SIZE)
plt.rc("ytick", labelsize=SMALL_SIZE)
plt.rc("legend", fontsize=SMALL_SIZE)
plt.rc("figure", titlesize=BIGGER_SIZE)

# -- lines and grid: thick curves over a dashed, half-transparent black grid
plt.rc("lines", linewidth=4)
plt.rc("grid", linestyle="--", color="black", alpha=0.5)

# Agent palette, keyed by the agent's position in the plotting order
colors_agents = dict(enumerate(["#f9f06b", "#cdaB8f", "#dc8add", "#99c1f1"]))

# Track palette: the names of matplotlib's Tableau colors
colors_tracks = list(mcolors.TABLEAU_COLORS)

# Root directory for every figure written by this notebook (created eagerly)
fig_out_dir = os.path.join("figures", "case_study_trusts")
os.makedirs(fig_out_dir, exist_ok=True)

In [None]:
def get_quad_trust_axes():
    """Build a 2x2 grid of axes pre-formatted for trust plots.

    Row 0 is titled for agents and row 1 for tracks. In each row, column 0 is
    set up for a horizontal bar chart of mean trust (x-range [0, 1], vertical
    grid lines only) and column 1 for trust PDF curves (x-range [0, 1],
    y-range [0, 10], full grid).

    Returns:
        The 2x2 collection of axes produced by ``plt.subplots``. The figure
        object itself is not returned.
    """
    _, axs = plt.subplots(2, 2, figsize=(10, 6))
    for row, entity in enumerate(("Agent", "Track")):
        ax_mean, ax_pdf = axs[row, 0], axs[row, 1]

        # left column: mean trust per identifier, drawn later as bars
        ax_mean.set_title(f"{entity} Trust Mean")
        ax_mean.set_xlim([0, 1])
        ax_mean.set_xlabel("Mean Trust Value")
        ax_mean.set_ylabel("Identifier")
        ax_mean.xaxis.grid()

        # right column: trust probability density curves
        ax_pdf.set_title(f"{entity} Trust Distributions")
        ax_pdf.set_xlim([0, 1])
        ax_pdf.set_ylim([0, 10])
        ax_pdf.set_xlabel("Trust Value")
        ax_pdf.set_ylabel("PDF")
        ax_pdf.grid()
    return axs


def plot_trust(axs, trust_agents, trust_tracks):
    """Draw mean-trust bars and trust PDFs for agents (row 0) and tracks (row 1).

    Args:
        axs: 2x2 grid of axes indexed as ``axs[row, col]``. Column 0 receives
            one horizontal bar per entry and column 1 one PDF curve per entry.
        trust_agents: mapping of agent identifier -> trust object. Each value
            must expose a ``mean`` attribute and a ``pdf(x)`` method.
        trust_tracks: mapping of track identifier -> trust object with the
            same interface.
    """
    x_trust = np.linspace(0, 1, 1000)
    bar_height = 0.6

    # Both palettes are cycled by position, so having more entries than colors
    # wraps around instead of raising KeyError on the agent palette.
    agent_palette = list(colors_agents.values())

    for i, trusts in enumerate([trust_agents, trust_tracks]):
        is_agent_row = i == 0
        palette = agent_palette if is_agent_row else colors_tracks
        yticks = []
        ytick_labels = []
        for j, (j_ID, trust) in enumerate(trusts.items()):
            color = palette[j % len(palette)]
            # NOTE(review): agents are labelled by their identifier but tracks
            # by their position in the mapping -- confirm this is intentional.
            label = f"Agent {j_ID}" if is_agent_row else f"Track {j}"
            yticks.append(j)
            ytick_labels.append(label)

            # -- update bars: one bar per entry at y = position, width = mean
            axs[i, 0].barh(
                j, trust.mean, height=bar_height, left=0.0, color=color, label=label
            )

            # -- update distributions: PDF evaluated on a fixed grid over [0, 1]
            axs[i, 1].plot(
                x_trust,
                trust.pdf(x_trust),
                color=color,
                label=label,
            )

        # set the ticks for the identifiers
        axs[i, 0].set_yticks(yticks)
        axs[i, 0].set_yticklabels(ytick_labels)

In [None]:
# Echo the output directory so the rendered cell shows where figures are saved
fig_out_dir

### Make plots for each case (0, 1, and 2)

In [None]:
from avstack_rosbag import DatasetPostprocessor


def load_postprocs(case_idx):
    """Load the benign and attacked dataset post-processors for one case.

    Args:
        case_idx: case study index. It selects the attacked bag directly and
            is mapped to a baseline scene index for the benign bag (cases 1
            and 2 share scene 1). Only 0, 1 and 2 are mapped; any other value
            raises ``KeyError``.

    Returns:
        Tuple ``(postproc_benign, postproc_attack)``.
    """
    scene_for_case = {0: 0, 1: 1, 2: 1}
    scene_idx = scene_for_case[case_idx]

    # (bag_folder, bag_name) pairs, benign first and attacked second
    bag_specs = (
        (
            "/data/shared/CARLA/rosbags/baseline_intersection/",
            f"baseline_intersection_{scene_idx}",
        ),
        (
            "/data/shared/CARLA/rosbags/cte_case/",
            f"cte_case_{case_idx}_attacked",
        ),
    )

    loaded = []
    for folder, name in bag_specs:
        postproc = DatasetPostprocessor(bag_folder=folder, bag_name=name)
        print(f"Loaded postprocessor of length {len(postproc)}")
        loaded.append(postproc)

    postproc_benign, postproc_attack = loaded
    return postproc_benign, postproc_attack

In [None]:
from avtrust_bridge.bridge import TrustBridge


def plot_trust_distributions_over_frames(postproc_benign, postproc_attack, idx_frames, case_idx):
    """Render and save trust plots at selected frames for a benign/attacked pair.

    For each post-processor, the agent and track trust messages are read and
    converted with ``TrustBridge.trust_array_ros_to_avstack``. One quad figure
    is then drawn per requested frame and written as both PDF and PNG under
    ``<fig_out_dir>/case<case_idx>/``.

    Args:
        postproc_benign: post-processor for the benign run.
        postproc_attack: post-processor for the attacked run.
        idx_frames: iterable of integer positions into the list of trust-track
            message times; an out-of-range position raises ``IndexError``.
        case_idx: case study index, used only to name the output directory.
    """
    loop_tuples = [
        ("benign", postproc_benign),
        ("attacked", postproc_attack),
    ]

    case_dir = os.path.join(fig_out_dir, f"case{case_idx}")
    os.makedirs(case_dir, exist_ok=True)

    for label, postproc in loop_tuples:
        # load trust messages and convert each message array
        trust_agents_msgs = postproc._get_messages_by_time("/trust/trust_agents")
        trust_agents_data = {
            k: TrustBridge.trust_array_ros_to_avstack(v)
            for k, v in trust_agents_msgs["data"].items()
        }
        trust_tracks_msgs = postproc._get_messages_by_time("/trust/trust_tracks")
        trust_tracks_data = {
            k: TrustBridge.trust_array_ros_to_avstack(v)
            for k, v in trust_tracks_msgs["data"].items()
        }
        # NOTE(review): frame times come from the track topic and are reused to
        # index the agent topic; assumes both topics share keys -- confirm.
        times = list(trust_tracks_data.keys())

        # one figure per requested frame of the current (benign or attacked) run
        for idx in idx_frames:
            timestamp = times[idx]
            # show the time of plot
            print(f"Making plot at time {timestamp}")

            # make plot
            axs = get_quad_trust_axes()
            fig = axs[0, 0].figure
            try:
                plot_trust(axs, trust_agents_data[timestamp], trust_tracks_data[timestamp])
                plt.tight_layout()
                plt.savefig(os.path.join(case_dir, f"trust_plot_{label}_{idx:03d}.pdf"))
                plt.savefig(os.path.join(case_dir, f"trust_plot_{label}_{idx:03d}.png"))
                plt.show()
            finally:
                # Release the figure even if plotting fails; otherwise figures
                # pile up across frames and cases on backends whose show()
                # does not close them.
                plt.close(fig)

In [None]:
# Generate the benign/attacked trust figures for every case index that
# load_postprocs maps to a scene (0, 1 and 2).
for i_case in range(3):
    postproc_benign, postproc_attack = load_postprocs(i_case)
    # The list gives positions into each bag's list of trust-track message times
    plot_trust_distributions_over_frames(postproc_benign, postproc_attack, [0, 14, 26, 36], i_case)