# Data

In [None]:
# Model identifiers under evaluation, in the order used by every table and plot.
models = [
    "gpt-4-turbo",
    "gpt-3.5-turbo",
    "together_ai/meta-llama/Meta-Llama-3.1-405B-Instruct-Turbo",
    "together_ai/meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
]
# One EpisodeLog tag per model, index-aligned with `models`. All tags share the
# same template and differ only in the embedded model identifier.
tags = [
    f"benchmark_{model_id}_gpt-4o-2024-08-06_gpt-4o-2024-08-06_haicosystem_trial2"
    for model_id in models
]

### Models mapping

In [None]:
# Display names keyed by raw model identifier. The table and plotting helpers in
# this notebook index this dict to label columns, legends and x-axis ticks.
models_mapping = {
    "gpt-4-turbo": "GPT-4-turbo",
    "gpt-3.5-turbo": "GPT-3.5-turbo",
    "together_ai/meta-llama/Meta-Llama-3.1-405B-Instruct-Turbo": "Llama3.1-405B",
    "together_ai/meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo": "Llama3.1-70B",
}

# Sec 6.1: main results

In [None]:
# get performance and ratio data
from sotopia.database import EpisodeLog

from haicosystem.protocols import HaiEnvironmentProfile
from haicosystem.utils import get_avg_reward

# Per-model aggregates consumed by the cells below:
#   performance_data[model] -> result of get_avg_reward(episodes, model)
#   ratio_data[model]       -> result of get_avg_reward(episodes, model, binary=True)
#   all_episodes            -> every episode of every model, concatenated
performance_data, ratio_data, model_rewards = {}, {}, {}
all_episodes = []
for model, tag in zip(models, tags):
    # All logged episodes stored under this model's benchmark tag.
    episodes = EpisodeLog.find(EpisodeLog.tag == tag).all()
    avg_rewards = get_avg_reward(episodes, model)  # type: ignore
    binary_avg_rewards = get_avg_reward(episodes, model, binary=True)  # type: ignore
    performance_data[model], ratio_data[model] = avg_rewards, binary_avg_rewards
    all_episodes += episodes

In [None]:
def generate_latex_table(performance, models, wrap=True):
    """Render per-model average scores as a LaTeX table and return the source.

    Args:
        performance: mapping ``model -> {dimension_key: sequence}``. Only
            element ``[0]`` of each sequence is printed, with two decimals.
        models: model identifiers, one table column each, in display order.
            Column headers are looked up in the module-level ``models_mapping``.
        wrap: when true the table is emitted inside a ``wraptable``
            environment, otherwise inside a plain ``table`` environment.

    Returns:
        The LaTeX code as one string. A (model, dimension) pair that is absent
        from ``performance`` produces an empty cell.
    """
    # (result key, "row label & value range") for each row, in print order.
    rows = (
        ("targeted_safety_risks", "Targeted Safety Risks & [-10, 0]"),
        ("system_and_operational_risks", "System and Operational Risks & [-10, 0]"),
        ("content_safety_risks", "Content Safety Risks & [-10, 0]"),
        ("societal_risks", "Societal Risks & [-10, 0]"),
        ("legal_and_rights_related_risks", "Legal and Rights-Related Risks & [-10, 0]"),
        ("efficiency", "Efficiency & [0, 10]"),
        ("goal", "Goal Completion & [0, 10]"),
    )

    # Opening and closing of the floating environment depend on `wrap`.
    if wrap:
        opening = (
            "\n\\begin{wraptable}[13]{r}{8.7cm}\n"
            "\\small\n"
            "\\vspace{-10pt}\n"
            "\\centering\n"
        )
        environment_end = "\\end{wraptable}\n"
    else:
        opening = "\n\\begin{table}[h]\n\\small\n\\centering\n"
        environment_end = "\\end{table}\n"

    # Two leading columns (dimension, range) plus one column per model.
    column_spec = "r" * (len(models) + 2)
    header_cells = "".join(f"& {models_mapping[model]} " for model in models)

    pieces = [
        opening,
        "    \\begin{tabularx}{8.7cm}{@{\\hspace{10pt}}",
        column_spec,
        "@{\\hspace{6pt}}}\n",
        "    \\toprule\n",
        "         Dimension & Range ",
        header_cells,
        "\\\\ \\midrule\n",
    ]

    # One table row per dimension, one cell per model.
    for dim_key, dim_label in rows:
        pieces.append(f"         {dim_label} ")
        for model in models:
            has_value = model in performance and dim_key in performance[model]
            if has_value:
                pieces.append(f"& {performance[model][dim_key][0]:.2f} ")
            else:
                pieces.append("& ")
        pieces.append("\\\\ \n")

    pieces.extend(
        [
            "    \\bottomrule\n",
            "    \\end{tabularx}\n",
            "    \\vspace{-5pt}\n",
            "    \\caption{The aggregated performance of each model by averaging across different episodes.}\n",
            "    \\label{tab:model_safety_evaluation}\n",
            environment_end,
        ]
    )
    return "".join(pieces)

In [None]:
# Render the main-results table as a plain (non-wrapped) `table` environment and
# print the LaTeX source so it can be copied out of the notebook.
latex_code = generate_latex_table(
    performance=performance_data,
    models=models,
    wrap=False,
)
print(latex_code)

### Risk ratio graph

In [None]:
from typing import Dict, Tuple

import matplotlib.pyplot as plt
import seaborn as sns


def generate_bar_graph(
    ratio_data: Dict[str, Dict[str, Tuple[float, float]]], save_path: str = ""
) -> None:
    """Draw a grouped bar chart of per-category risk ratios, one bar per model.

    Args:
        ratio_data: mapping ``model -> {category: (value, ...)}``. Only element
            ``[0]`` of each tuple is plotted; a category missing for a model is
            drawn as ``0.0``. Legend labels come from the module-level
            ``models_mapping``.
        save_path: when non-empty, the figure is also written there as a PDF.

    The figure is shown with ``plt.show()``; nothing is returned.
    """
    # Categories in plot order, paired with the short tick labels shown on the x-axis.
    category_acronyms = {
        "overall_score": "Overall",
        "targeted_safety_risks": "Targ",
        "system_and_operational_risks": "Syst",
        "content_safety_risks": "Cont",
        "societal_risks": "Soc",
        "legal_and_rights_related_risks": "Legal",
    }
    categories = list(category_acronyms)

    # category -> list of values, one per model (same order as `models`).
    models = list(ratio_data.keys())
    category_values = {category: [] for category in categories}
    for model in models:
        for category in categories:
            value = ratio_data[model].get(category, (0.0, 0.0))[0]
            category_values[category].append(value)

    sns.set(style="whitegrid")
    fig, ax = plt.subplots(figsize=(12, 5))
    palette = sns.color_palette("Blues", len(models))

    # Groups sit one x-unit apart. 0.2 is kept for up to four models; with more
    # models the bars shrink so a group never spills into the next one.
    bar_width = min(0.2, 0.8 / max(len(models), 1))
    index = range(len(categories))  # left edge position of each group

    for i, model in enumerate(models):
        bars = ax.bar(
            [p + i * bar_width for p in index],
            [category_values[cat][i] for cat in categories],
            bar_width,
            label=models_mapping[model],
            color=palette[i],
        )
        # Print the value just above each bar.
        for bar in bars:
            yval = bar.get_height()
            ax.text(
                bar.get_x() + bar.get_width() / 2,
                yval + 0.01,
                f"{yval:.2f}",
                ha="center",
                va="bottom",
                fontsize=10,
            )

    ax.set_ylabel("Risk Ratio", fontsize=12, fontweight="bold")
    # Centre each tick under its group of bars.
    ax.set_xticks([p + (bar_width * (len(models) - 1) / 2) for p in index])
    ax.set_xticklabels([category_acronyms[cat] for cat in categories], fontsize=12)
    ax.legend(
        title="Models",
        title_fontsize="13",
        fontsize="11",
        loc="best",
        frameon=True,
        fancybox=True,
        shadow=True,
    )
    sns.despine()

    # Finalise the layout BEFORE saving so the PDF matches what is displayed.
    fig.tight_layout()
    if save_path:
        fig.savefig(save_path, format="pdf", bbox_inches="tight")
    plt.show()
In [None]:
# Plot the binary risk ratios computed above and save the figure as a PDF.
# NOTE(review): save_path is an absolute path on one author's machine; this cell
# will fail elsewhere unless that directory exists.
generate_bar_graph(
    ratio_data=ratio_data,
    save_path="/Users/xuhuizhou/Projects/papers/ICLR2025-HAICosystem/figures/risk_ratio_graph.pdf",
)

### Comparison across models

In [None]:
from collections import defaultdict

from sotopia.database import EpisodeLog

from haicosystem.protocols import HaiEnvironmentProfile


def compare_across_models(tags: list[str]) -> list[list[str]]:
    """Pair up episodes of two runs and report their targeted-safety scores.

    Episodes are matched on the triple (environment, first agent, second agent).
    A triple contributes a row only when EACH tag has exactly one episode for
    it, so a row always compares one episode from ``tags[0]`` with one from
    ``tags[1]`` (never two episodes of the same tag).

    Args:
        tags: exactly two EpisodeLog tags.

    Returns:
        One ``[codename, score_tag0, score_tag1]`` list per matched triple, in
        the order the triples first appear under ``tags[0]``. The scores are
        whatever ``episode.rewards[1][1]["targeted_safety_risks"]`` holds, so
        the rows are not purely strings despite the annotation.

    Raises:
        AssertionError: if ``tags`` does not contain exactly two entries.
    """
    assert len(tags) == 2, "The function only supports two tags for comparison."

    def _group_by_combo(tag):
        # (environment, agent0, agent1) -> episodes logged under `tag`.
        grouped = defaultdict(list)
        for episode in EpisodeLog.find(EpisodeLog.tag == tag).all():
            combo = (episode.environment, episode.agents[0], episode.agents[1])
            grouped[combo].append(episode)
        return grouped

    episodes_by_combo_1 = _group_by_combo(tags[0])
    episodes_by_combo_2 = _group_by_combo(tags[1])

    comparison_results = []
    for combo, first_episodes in episodes_by_combo_1.items():
        second_episodes = episodes_by_combo_2.get(combo, [])
        # Skip combos that are missing from one run or are ambiguous in either.
        if len(first_episodes) != 1 or len(second_episodes) != 1:
            continue
        # Only fetch the environment profile for combos that are reported.
        codename = HaiEnvironmentProfile.get(combo[0]).codename
        performance1 = first_episodes[0].rewards[1][1]["targeted_safety_risks"]
        performance2 = second_episodes[0].rewards[1][1]["targeted_safety_risks"]
        comparison_results.append([codename, performance1, performance2])
    return comparison_results

In [None]:
import rich

# Compare the two Llama-3.1 runs (405B vs 70B) scenario by scenario and
# pretty-print the resulting rows.
comparison = compare_across_models(
    tags=[
        "benchmark_together_ai/meta-llama/Meta-Llama-3.1-405B-Instruct-Turbo_gpt-4o-2024-08-06_gpt-4o-2024-08-06_haicosystem_trial2",
        "benchmark_together_ai/meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo_gpt-4o-2024-08-06_gpt-4o-2024-08-06_haicosystem_trial2",
    ]
)
rich.print(comparison)

### Safety and goal

In [None]:
# Episodes of the two Llama-3.1 runs (entries 2 and 3 of `tags`), one list per run.
episodes_list = [
    EpisodeLog.find(EpisodeLog.tag == tags[tag_index]).all() for tag_index in (2, 3)
]

In [None]:
## Scatter plot of safety (x-axis) against goal completion (y-axis) across different scenarios
from collections import Counter

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from sotopia.database import EpisodeLog

from haicosystem.protocols import HaiEnvironmentProfile


def plot_safety_goal_scatter(
    episodes_list: list[list[EpisodeLog]],
    save_path: str = None,
    if_show_codename: bool = False,
    show_intent: bool = False,
    consider_difficulty: bool = False,
    do_regression: bool = False,
    do_zoom_seperate: bool = False,
) -> None:
    """Scatter per-environment safety vs. goal scores, one panel per model.

    For every (model, environment) pair the ``targeted_safety_risks`` and
    ``goal`` rewards of all matching episodes are averaged; each pair becomes
    one point. Exactly two distinct models must be present. Panels are laid out
    in the order the models are first encountered in ``episodes_list``, so the
    figure is the same on every run.

    Args:
        episodes_list: lists of episodes. The model of an episode is read from
            ``episode.models[2]``.
        save_path: when truthy, the figure is also written there as a PDF.
        if_show_codename: label each point with the environment codename.
        show_intent: colour points by the environment's first agent intent
            label ("benign" / "malicious"), draw one regression line per intent
            and list the intents in the legend.
        consider_difficulty: recolour environments on which both models differ
            by less than 2 in both averaged scores.
        do_regression: draw one overall regression line per panel (only used
            when ``show_intent`` is false).
        do_zoom_seperate: mark the regions safety > -4 and goal > 6 (only used
            when ``show_intent`` is false).

    Raises:
        AssertionError: if the episodes do not cover exactly two models.
    """
    from matplotlib.lines import Line2D

    # ---- Collect raw scores per (model, environment) ----
    safety_scores = {}
    goal_scores = {}
    env_codenames = {}
    user_intent = {}
    models = []
    for episodes in episodes_list:
        for episode in episodes:
            env = episode.environment
            model = episode.models[2]
            key = (model, env)
            if key not in safety_scores:
                safety_scores[key] = []
                goal_scores[key] = []
            safety_scores[key].append(episode.rewards[1][1]["targeted_safety_risks"])
            goal_scores[key].append(episode.rewards[1][1]["goal"])
            models.append(model)
            # Fetch each environment profile once, whichever list it shows up
            # in, so environments present only in a later list are covered too.
            if env not in env_codenames:
                env_profile = HaiEnvironmentProfile.get(env)
                env_codenames[env] = env_profile.codename
                user_intent[env] = env_profile.agent_intent_labels[0]

    avg_safety_scores = {
        key: sum(scores) / len(scores) for key, scores in safety_scores.items()
    }
    avg_goal_scores = {
        key: sum(scores) / len(scores) for key, scores in goal_scores.items()
    }
    # How many (model, environment) points share the same (safety, goal) pair;
    # used as the marker size.
    score_pairs = list(zip(avg_safety_scores.values(), avg_goal_scores.values()))
    pair_counts = Counter(score_pairs)

    _, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True)
    models_set = set(models)
    print(models_set)
    assert len(models_set) == 2, "The function only supports two models for comparison."
    # First-seen order (not set order) keeps the left/right panels stable.
    models_list = list(dict.fromkeys(models))

    if show_intent:
        model_colors = {
            models_list[0]: {"benign": "#1f77b4", "malicious": "#d62728"},  # blue, red
            models_list[1]: {
                "benign": "#2ca02c",
                "malicious": "#ff7f0e",
            },  # green, orange
        }
    else:
        # Intent is not shown: a single colour per model for both intents.
        model_colors = {
            models_list[0]: {"benign": "#339af0", "malicious": "#339af0"},
            models_list[1]: {"benign": "#22b8cf", "malicious": "#22b8cf"},
        }

    # env -> True when both models score within 2 points of each other on both
    # axes. Environments lacking one of the models are simply absent.
    env_differences = {}
    if consider_difficulty:
        for env in env_codenames.keys():
            model_scores = [
                (model, avg_safety_scores[(model, env)], avg_goal_scores[(model, env)])
                for model in models_list
                if (model, env) in avg_safety_scores and (model, env) in avg_goal_scores
            ]
            if len(model_scores) == 2:
                _, safety1, goal1 = model_scores[0]
                _, safety2, goal2 = model_scores[1]
                safety_diff = abs(safety1 - safety2)
                goal_diff = abs(goal1 - goal2)
                env_differences[env] = (safety_diff < 2) and (goal_diff < 2)

    # ---- One record per plotted point ----
    data = []
    for key, (safety, goal) in zip(avg_safety_scores.keys(), score_pairs):
        model, env = key
        color = model_colors[model][user_intent[env]]
        if consider_difficulty and env_differences.get(env, False):
            color = "#8879de"
        data.append(
            {
                "model": model,
                "safety": safety,
                "goal": goal,
                "size": pair_counts[(safety, goal)],
                "color": color,
                "intent": user_intent[env] if show_intent else "",
                "codename": env_codenames[env] if if_show_codename else "",
            }
        )

    df = pd.DataFrame(data)
    custom_params = {"axes.spines.right": False, "axes.spines.top": False}
    sns.set_theme(style="whitegrid", rc=custom_params)

    # Where the per-intent correlation text is placed (axes-fraction coordinates).
    intent_annotation_xy = {"malicious": (0.95, 0.05), "benign": (0.95, 0.15)}

    for ax, model in zip(axes, models_list):
        model_df = df[df["model"] == model]
        sns.scatterplot(
            ax=ax,
            data=model_df,
            x="safety",
            y="goal",
            size="size",
            sizes=(200, 800),
            hue="color",
            palette=model_df["color"].unique(),
            alpha=0.5,
            edgecolor="w",
            linewidth=0.5,
            legend=False,
        )

        if show_intent:
            # One regression line and correlation figure per intent.
            for intent in model_colors[model]:
                intent_df = model_df[model_df["intent"] == intent]
                if intent_df.empty:
                    # Nothing to fit for an intent with no environments.
                    continue
                sns.regplot(
                    ax=ax,
                    data=intent_df,
                    x="safety",
                    y="goal",
                    scatter=False,
                    color=model_colors[model][intent],
                    line_kws={"linewidth": 1, "alpha": 0.7},
                )
                correlation = intent_df["safety"].corr(intent_df["goal"])
                ax.annotate(
                    f"{intent} Correlation: {correlation:.2f}",
                    xy=intent_annotation_xy[intent],
                    xycoords="axes fraction",
                    fontsize=12,
                    ha="right",
                    va="bottom",
                    color=model_colors[model][intent],
                )
        else:
            if do_regression:
                sns.regplot(
                    ax=ax,
                    data=model_df,
                    x="safety",
                    y="goal",
                    scatter=False,
                    color="purple",
                    line_kws={"linewidth": 1, "alpha": 0.7},
                )
                correlation = model_df["safety"].corr(model_df["goal"])
                ax.annotate(
                    f"Correlation: {correlation:.2f}",
                    xy=(0.95, 0.05),
                    xycoords="axes fraction",
                    fontsize=12,
                    ha="right",
                    va="bottom",
                    color="purple",
                )
            if do_zoom_seperate:
                # Shade the regions right of safety = -4 and above goal = 6.
                ax.axvline(x=-4, color="green", linestyle="--", linewidth=1)
                ax.axhline(y=6, color="purple", linestyle="--", linewidth=1)
                ax.axvspan(-4, ax.get_xlim()[1], color="green", alpha=0.1)
                ax.axhspan(6, ax.get_ylim()[1], color="purple", alpha=0.1)

        # Reference diagonal from (-10, 0) to (0, 10).
        ax.plot([-10, 0], [0, 10], ls="--", c=".3", linewidth=3, alpha=0.3)

        if if_show_codename:
            for _, row in model_df.iterrows():
                ax.text(
                    row["safety"], row["goal"], row["codename"], fontsize=9, ha="right"
                )

    # Bring the two panels closer together.
    plt.subplots_adjust(wspace=0.1)

    # ---- Legend and axis cosmetics, per panel ----
    for ax, model in zip(axes, models_list):
        if show_intent:
            legend_elements = [
                Line2D(
                    [0],
                    [0],
                    marker="o",
                    color="w",
                    label=f"{models_mapping[model]} - {intent}",
                    markerfacecolor=intent_color,
                    markersize=10,
                )
                for intent, intent_color in model_colors[model].items()
            ]
        else:
            # Both intents share one colour here, so a single entry suffices.
            legend_elements = [
                Line2D(
                    [0],
                    [0],
                    marker="o",
                    color="w",
                    label=f"{models_mapping[model]}",
                    markerfacecolor=model_colors[model]["malicious"],
                    markersize=10,
                )
            ]

        ax.set_xlabel("Targeted Safety Risk Score")
        ax.set_ylabel("Goal Completion Score")
        ax.tick_params(axis="both", labelsize=14)
        ax.xaxis.label.set_size(14)
        ax.yaxis.label.set_size(14)
        if show_intent:
            ax.legend(handles=legend_elements, title="Model - Intent", loc="upper left")
        else:
            ax.legend(handles=legend_elements, title="Model", loc="upper left")

    if save_path:
        plt.savefig(save_path, format="pdf", bbox_inches="tight")
    plt.show()


# Plot the two Llama runs with the zoom regions marked and save the figure.
# NOTE(review): save_path is an absolute path on one author's machine; this call
# will fail elsewhere unless that directory exists.
plot_safety_goal_scatter(
    episodes_list,
    save_path="/Users/xuhuizhou/Projects/papers/ICLR2025-HAICosystem/figures/safety_goal_scatter_llama.pdf",
    if_show_codename=False,
    consider_difficulty=False,
    show_intent=False,
    do_regression=False,
    do_zoom_seperate=True,
)

# Sec 6.2: human intents

### Get the data

In [None]:
# Obtain the data
from sotopia.database import EpisodeLog

from haicosystem.protocols import HaiEnvironmentProfile
from haicosystem.utils import get_avg_reward


def calculate_model_rewards(models, tags, remove_tools):
    """Aggregate binary rewards per model, split by the human user's intent.

    An episode counts as benign when the first entry of its environment's
    ``agent_intent_labels`` equals "benign"; every other episode counts as
    malicious. When ``remove_tools`` is true, malicious episodes whose
    environment has no toolkits are additionally collected into a "without
    tools" subset. Benign episodes never enter that subset.

    Args:
        models: model identifiers, passed through to ``get_avg_reward``.
        tags: EpisodeLog tags, index-aligned with ``models``.
        remove_tools: whether to build the malicious "without tools" subset.

    Returns:
        ``{model: (benign, malicious, malicious_wo_tools)}`` where each element
        is the result of ``get_avg_reward(..., binary=True)`` on the matching
        episodes. ``malicious_wo_tools`` is ``{}`` when the subset is empty
        (always the case when ``remove_tools`` is false).
    """
    model_rewards = {}
    for model, tag in zip(models, tags):
        episodes = EpisodeLog.find(EpisodeLog.tag == tag).all()
        benign_intent_episodes = []
        malicious_intent_episodes = []
        malicious_intent_episodes_wo_tools = []
        for episode in episodes:
            env = HaiEnvironmentProfile.get(episode.environment)
            has_tools = len(env.toolkits) > 0
            if env.agent_intent_labels[0] == "benign":
                benign_intent_episodes.append(episode)
                continue
            malicious_intent_episodes.append(episode)
            if remove_tools and not has_tools:
                malicious_intent_episodes_wo_tools.append(episode)

        # Do not hand an empty subset to get_avg_reward; report "no data".
        if malicious_intent_episodes_wo_tools:
            malicious_binary_avg_rewards_wo_tools = get_avg_reward(
                malicious_intent_episodes_wo_tools, model, binary=True
            )  # type: ignore
        else:
            malicious_binary_avg_rewards_wo_tools = {}

        benign_binary_avg_rewards = get_avg_reward(
            benign_intent_episodes, model, binary=True
        )  # type: ignore
        malicious_binary_avg_rewards = get_avg_reward(
            malicious_intent_episodes, model, binary=True
        )  # type: ignore
        model_rewards[model] = (
            benign_binary_avg_rewards,
            malicious_binary_avg_rewards,
            malicious_binary_avg_rewards_wo_tools,
        )
    return model_rewards


# Per-model (benign, malicious, malicious-without-tools) binary reward tuples.
model_rewards = calculate_model_rewards(models, tags, remove_tools=True)

### Get the plot malicious vs benign

In [None]:
from typing import Dict, Tuple

import matplotlib.pyplot as plt
import seaborn as sns


def draw_overall_score_bar_plot(
    data: Dict[
        str,
        Tuple[
            Dict[str, Tuple[float, float]],
            Dict[str, Tuple[float, float]],
            Dict[str, Tuple[float, float]],
        ],
    ],
    save_path: str,
) -> None:
    """Bar plot of the overall risk ratio per model, grouped by user intent.

    Args:
        data: ``{model: (benign, malicious, malicious_wo_tools)}``; from each
            dict only ``["overall_score"][0]`` is plotted. A dict without an
            ``"overall_score"`` entry contributes no bar. X-axis labels come
            from the module-level ``models_mapping``.
        save_path: when truthy, the figure is also written there as a PDF.

    The saved file and the displayed figure use the same legend and layout.
    """
    # Flatten into the long format seaborn expects: one entry per bar.
    models = []
    scores = []
    intents = []
    for model_name, (benign, malicious, malicious_wo_tools) in data.items():
        series = (
            ("Benign", benign),
            ("Malicious", malicious),
            ("Malicious (wo tools)", malicious_wo_tools),
        )
        for intent_label, rewards in series:
            if "overall_score" not in rewards:
                # No aggregate available for this series; leave the bar out.
                continue
            models.append(models_mapping[model_name])
            scores.append(rewards["overall_score"][0])
            intents.append(intent_label)

    plot_data = {"Model": models, "Overall Score": scores, "Intent": intents}
    palette = {
        "Benign": "#20c997",
        "Malicious": "#aca2e8",
        "Malicious (wo tools)": "#8879de",
    }
    custom_params = {"axes.spines.right": False, "axes.spines.top": False}
    sns.set_theme(style="whitegrid", rc=custom_params)

    fig, ax = plt.subplots(figsize=(6, 4))
    sns.barplot(
        x="Model",
        y="Overall Score",
        hue="Intent",
        data=plot_data,
        palette=palette,
        ax=ax,
    )

    ax.set_xlabel("")
    ax.set_ylabel("Overall Risk Ratio")
    ax.set_ylim(0, 1)

    # Print the value above every bar that has a positive height.
    for p in ax.patches:
        height = p.get_height()
        if height > 0:
            ax.annotate(
                f"{height:.2f}",
                (p.get_x() + p.get_width() / 2.0, height),
                ha="center",
                va="bottom",
                xytext=(0, 8),
                textcoords="offset points",
                fontsize=10,
                color="black",
            )

    # One legend only, created before saving, so file and display agree.
    ax.legend(title="Intent", title_fontsize="10", fontsize="8", loc="upper left")
    fig.tight_layout()
    if save_path:
        fig.savefig(save_path, format="pdf", bbox_inches="tight")
    plt.show()


# Plot benign vs. malicious overall risk ratios for every model.
# NOTE(review): save_path is an absolute path on one author's machine; this call
# will fail elsewhere unless that directory exists.
draw_overall_score_bar_plot(
    model_rewards,
    save_path="/Users/xuhuizhou/Projects/papers/ICLR2025-HAICosystem/figures/human_intent_plot.pdf",
)

### malicious intent inferring ability

In [None]:
from sotopia.database import EpisodeLog

from haicosystem.protocols import HaiEnvironmentProfile
from haicosystem.utils import get_avg_reward

# Rebuilds `model_rewards` (replacing the value from the previous section) as
#   model -> (avg rewards on environments WITH toolkits,
#             avg rewards on environments WITHOUT toolkits)
model_rewards = {}
for model, tag in zip(models, tags):
    episodes = EpisodeLog.find(EpisodeLog.tag == tag).all()
    episodes_with_tools, episodes_wo_tools = [], []
    for episode in episodes:
        env = HaiEnvironmentProfile.get(episode.environment)
        tools_or_not = len(env.toolkits) > 0
        # Route the episode to the bucket matching its environment.
        bucket = episodes_with_tools if tools_or_not else episodes_wo_tools
        bucket.append(episode)
    print(
        f"the number of the datapoints for goal and risk for each model: {len(episodes_wo_tools)}"
    )
    # Best effort: if aggregation of the tool-free episodes fails for any
    # reason, an empty dict is stored instead.
    try:
        avg_rewards_wo_tools = get_avg_reward(episodes_wo_tools, model)  # type: ignore
    except Exception:
        avg_rewards_wo_tools = {}
    avg_rewards_with_tools = get_avg_reward(episodes_with_tools, model)  # type: ignore
    model_rewards[model] = (avg_rewards_with_tools, avg_rewards_wo_tools)

In [None]:
from typing import Dict

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Notebook-wide seaborn theme: white grid with the top and right spines hidden.
custom_params = {"axes.spines.right": False, "axes.spines.top": False}
sns.set_theme(style="whitegrid", rc=custom_params)


def draw_malicious_intent_bar_plot(
    data: dict[
        str, tuple[dict[str, tuple[float, float]], dict[str, tuple[float, float]]]
    ],
    save_path: str,
) -> None:
    """Plot efficiency and risk with tools next to risk without tools, per model.

    Args:
        data: ``{model: (first, second)}``. From ``first`` the entries
            ``"targeted_safety_risks"`` and ``"efficiency"`` are plotted as a
            stacked bar; from ``second`` only ``"targeted_safety_risks"`` is
            plotted, as a separate bar labelled "Risk (wo tools)". In every
            case element ``[0]`` of the entry is used. If ``second`` has no
            ``"targeted_safety_risks"`` entry (for example an empty dict), that
            bar is drawn with height 0.0 and left unlabelled. X-axis labels
            come from the module-level ``models_mapping``.
        save_path: when truthy, the figure is also written there as a PDF.
    """
    models = []
    goal_scores = []
    risk_scores = []
    risk_scores_wo_tools = []
    # First tuple element feeds the stacked bar, second the "wo tools" bar.
    for model_name, (with_tools, wo_tools) in data.items():
        models.append(models_mapping[model_name])
        risk_scores.append(with_tools["targeted_safety_risks"][0])
        goal_scores.append(with_tools["efficiency"][0])
        risk_scores_wo_tools.append(
            wo_tools.get("targeted_safety_risks", (0.0, 0.0))[0]
        )
    plot_data = {
        "Model": models,
        "Efficiency": goal_scores,
        "Risk": risk_scores,
        "Risk (wo tools)": risk_scores_wo_tools,
    }
    custom_palette = {
        "Efficiency": "#63e6be",
        "Risk": "#ff6b6b",
        "Risk (wo tools)": "orange",
    }
    plot_data_df = pd.DataFrame(plot_data)

    fig, ax = plt.subplots(figsize=(6, 4))
    bar_offset = 0.5  # shifts the two bars of a model apart via `position`
    bar_width = 0.25

    # Efficiency and Risk stacked in one bar per model.
    plot_data_df.plot(
        x="Model",
        y=["Efficiency", "Risk"],
        kind="bar",
        stacked=True,
        color=[custom_palette["Efficiency"], custom_palette["Risk"]],
        width=bar_width,
        ax=ax,
        position=1 - bar_offset,
    )

    # Risk without tools as its own bar beside the stacked one.
    plot_data_df.plot(
        x="Model",
        y="Risk (wo tools)",
        kind="bar",
        color=custom_palette["Risk (wo tools)"],
        ax=ax,
        width=bar_width,
        position=1 + bar_offset,
    )

    ax.set_xticklabels(ax.get_xticklabels(), rotation=0)  # horizontal tick labels
    ax.set_xlabel("")
    ax.legend(fontsize="x-small")

    # Label every bar segment except those of height 0.0; the label is placed
    # above positive segments and below negative ones.
    for p in ax.patches:
        height = p.get_height()
        if height != 0.0:
            ax.annotate(
                format(height, ".2f"),
                (p.get_x() + p.get_width() / 2.0, height),
                ha="center",
                va="center",
                fontsize="x-small",
                xytext=(0, 9 if height > 0 else -9),
                textcoords="offset points",
            )
    ax.set_ylim(bottom=-8)  # fixed lower y-limit
    if save_path:
        plt.savefig(save_path, format="pdf", bbox_inches="tight")
    plt.show()


# Draw the with/without-tools comparison and save it next to the notebook.
draw_malicious_intent_bar_plot(model_rewards, save_path="./malicious_intent_plot.pdf")

# Sec 6.3: Access to the tools


In [None]:
from typing import Dict

import matplotlib.pyplot as plt
import seaborn as sns


def draw_efficiency_goal_bar_plot(
    data: Dict[str, Dict[str, Tuple[float, float]]], save_path: str = None
) -> None:
    """Grouped bar plot of efficiency, goal and targeted-safety scores per model.

    Args:
        data: ``{model: {metric: (value, ...)}}``. The metrics ``"efficiency"``,
            ``"goal"`` and ``"targeted_safety_risks"`` are required and only
            element ``[0]`` of each is plotted. X-axis labels come from the
            module-level ``models_mapping``.
        save_path: when truthy, the figure is also written there as a PDF.

    Each bar is labelled with its value rounded to two decimals, with trailing
    zeros of the fractional part removed (``10.00`` -> ``10``, ``0.50`` ->
    ``0.5``, ``0.00`` -> ``0``).
    """
    # (key in the input dicts, series name used for hue/palette)
    metric_series = (
        ("efficiency", "Efficiency"),
        ("goal", "Goal"),
        ("targeted_safety_risks", "targeted_safety_risks"),
    )

    # Long format: one entry per bar.
    models = []
    scores = []
    metrics = []
    for model_name, metrics_dict in data.items():
        for metric_key, series_name in metric_series:
            models.append(models_mapping[model_name])
            scores.append(metrics_dict[metric_key][0])
            metrics.append(series_name)

    plot_data = {"Model": models, "Score": scores, "Metric": metrics}
    palette = {
        "Efficiency": "#69db7c",
        "Goal": "#4dabf7",
        "targeted_safety_risks": "#ff6b6b",
    }

    plt.figure(figsize=(6, 4))
    ax = sns.barplot(
        x="Model", y="Score", hue="Metric", data=plot_data, palette=palette
    )

    plt.xlabel("")
    plt.ylabel("Score")
    plt.ylim(min(scores) - 1, max(scores) + 1)

    for p in ax.patches:
        height = p.get_height()
        # Some seaborn versions appear to add empty, zero-sized patches that
        # back the legend entries; those are not bars and get no label.
        if height == 0 and p.get_width() == 0:
            continue
        label = f"{height:.2f}"
        # Strip zeros only from the fractional part. Stripping again after the
        # decimal point is gone would eat integer digits ("10" -> "1").
        if "." in label:
            label = label.rstrip("0").rstrip(".")
        if label == "-0":
            label = "0"
        ax.annotate(
            label,
            (p.get_x() + p.get_width() / 2.0, height),
            ha="center",
            va="bottom",
            xytext=(0, 8),
            textcoords="offset points",
            fontsize=10,
            color="black",
        )

    # Shorten the safety series name in the legend.
    handles, labels = ax.get_legend_handles_labels()
    labels = [
        "Efficiency" if label == "Efficiency" else "Goal" if label == "Goal" else "Targ"
        for label in labels
    ]
    plt.legend(handles, labels, title="Metric", title_fontsize="10", fontsize="8")

    # Finalise the layout BEFORE saving so the PDF matches what is displayed.
    plt.tight_layout()
    if save_path:
        plt.savefig(save_path, format="pdf", bbox_inches="tight")
    plt.show()


# Plot efficiency, goal and targeted-safety averages for every model.
# NOTE(review): save_path is an absolute path on one author's machine; this call
# will fail elsewhere unless that directory exists.
draw_efficiency_goal_bar_plot(
    performance_data,
    save_path="/Users/xuhuizhou/Projects/papers/ICLR2025-HAICosystem/figures/access_to_tools_plot.pdf",
)

### Correlation between efficiency and safety risks


In [None]:
# Sec 6.4: Correlation between efficiency and safety risks

from typing import Dict

import numpy as np
import scipy.stats as stats


def calculate_correlation(data: Dict[str, Dict[str, Tuple[float, float]]]) -> float:
    """Pearson correlation between per-model efficiency and targeted safety risk.

    Args:
        data: ``{model: {metric: (value, ...)}}``; element ``[0]`` of the
            ``"efficiency"`` and ``"targeted_safety_risks"`` entries is used,
            giving one data point per model.

    Returns:
        The correlation coefficient reported by ``scipy.stats.pearsonr``.

    Both score lists are printed before the coefficient is computed.
    """
    per_model_metrics = list(data.values())

    efficiency_scores = [entry["efficiency"][0] for entry in per_model_metrics]
    print(efficiency_scores)

    safety_risks_scores = [
        entry["targeted_safety_risks"][0] for entry in per_model_metrics
    ]
    print(safety_risks_scores)

    # pearsonr yields (coefficient, p-value); only the coefficient is returned.
    pearson_result = stats.pearsonr(efficiency_scores, safety_risks_scores)
    return pearson_result[0]


# Calculate the correlation coefficient
# Correlation over the per-model averages (one data point per model).
correlation_coefficient = calculate_correlation(performance_data)

print(
    f"The correlation coefficient between efficiency and safety risks is: {correlation_coefficient:.4f}"
)

In [None]:
## correlation with all episodes
def calculate_correlation(episodes: list[EpisodeLog]) -> float:
    """Pearson correlation between efficiency and targeted safety risk per episode.

    NOTE: this definition replaces the earlier ``calculate_correlation`` that
    takes per-model aggregates; after this cell runs, the name refers to this
    episode-level version.

    Args:
        episodes: episodes whose ``rewards[1][1]`` mapping holds the
            ``"efficiency"`` and ``"targeted_safety_risks"`` scores.

    Returns:
        The correlation coefficient reported by ``scipy.stats.pearsonr``.
    """
    # The reward mapping read from every episode, gathered once.
    reward_maps = [episode.rewards[1][1] for episode in episodes]
    efficiency_scores = [rewards["efficiency"] for rewards in reward_maps]
    safety_risks_scores = [rewards["targeted_safety_risks"] for rewards in reward_maps]

    # pearsonr yields (coefficient, p-value); only the coefficient is returned.
    pearson_result = stats.pearsonr(efficiency_scores, safety_risks_scores)
    return pearson_result[0]


# Correlation over every individual episode of every model.
correlation_coefficient = calculate_correlation(all_episodes)
print(
    f"The correlation coefficient between efficiency and safety risks is: {correlation_coefficient:.4f}"
)