In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
import matplotlib.pyplot as plt
from sotopia.database import EpisodeLog, AnnotationForEpisode
from pydantic import ValidationError
from sotopia.generation_utils.generate import LLM_Name
from typing import get_args
import numpy as np
import numpy.typing as npt
import pandas as pd
from collections import defaultdict
from rich.console import Console
from sotopia.envs.evaluators import EvaluationBySocialDimensions

# Figures in the arXiv paper


## Figure 1

Correlation between human scores and model scores.


In [None]:
SOCIAL_DIMENSIONS: list[str] = list(EvaluationBySocialDimensions.__fields__.keys())
# Constant added to each dimension's score before comparison; every offset is
# currently zero, so the scores are used exactly as stored.
social_dim_offest = {
    "relationship": 0,
    "knowledge": 0,
    "secret": 0,
    "financial_and_material_benefits": 0,
    "social_rules": 0,
    "goal": 0,
    "believability": 0,
}


def _rewards_as_dimension_pairs(rewards) -> npt.NDArray[np.float64]:
    """Return one ``(first, second)`` score row per social dimension.

    ``rewards[0][1]`` and ``rewards[1][1]`` are indexed by dimension name; the
    matching entry of ``social_dim_offest`` is added to each score. Rows follow
    the order of ``SOCIAL_DIMENSIONS``.
    """
    return np.array(
        [
            (
                float(rewards[0][1][dim] + social_dim_offest[dim]),
                float(rewards[1][1][dim] + social_dim_offest[dim]),
            )
            for dim in SOCIAL_DIMENSIONS
        ]
    )


data_points: list[tuple[float, float]] = []
annotation_pks = AnnotationForEpisode.all_pks()
episode2human_annotations: dict[str, list[npt.NDArray[np.float32]]] = dict()

# Step 1: gather every human annotation, grouped by the episode it refers to.
for annotation_pk in annotation_pks:
    annotation = AnnotationForEpisode.get(annotation_pk)
    human_scores = _rewards_as_dimension_pairs(annotation.rewards)
    episode2human_annotations.setdefault(annotation.episode, []).append(human_scores)

# Step 2: pair the mean human score with the model score of the same episode.
for episode_pk, human_annotations in episode2human_annotations.items():
    mean_human_scores = np.mean(human_annotations, axis=0)
    model_rewards = EpisodeLog.get(episode_pk).rewards
    # Episodes without rewards, or whose rewards are bare floats, cannot be
    # indexed by dimension and are left out.
    if not len(model_rewards) or isinstance(model_rewards[0], float):
        continue
    model_scores = _rewards_as_dimension_pairs(model_rewards)
    for human_row, model_row in zip(mean_human_scores, model_scores):
        # Yields (human, model) for the first entry, then for the second.
        data_points.extend(zip(human_row, model_row))

data_points_np = np.array(data_points)
human_axis = data_points_np[:, 0]
model_axis = data_points_np[:, 1]

# Step 3: degree-1 polynomial fit, used as the regression line.
reg = np.polyfit(human_axis, model_axis, 1)
slope, intercept = reg

# Step 4: scatter plot. Identical points are merged so that the marker size
# reflects how many times each (human, model) combination occurs.
fig, ax = plt.subplots(figsize=(5, 5))
data_points_unique, data_points_count = np.unique(
    data_points_np, axis=0, return_counts=True
)
colors = np.abs(data_points_unique[:, 0] - data_points_unique[:, 1])
ax.scatter(
    data_points_unique[:, 0],
    data_points_unique[:, 1],
    s=np.sqrt(data_points_count) * 10,
    alpha=0.5,
    c=-colors,  # negated absolute human/model gap drives the colormap
    cmap="coolwarm",
)
ax.plot(human_axis, slope * human_axis + intercept, color="#F86A6E")
ax.set(
    xlabel="human reward",
    ylabel="model reward",
    title="human vs model reward",
)
plt.savefig("human_vs_model_reward.pdf", format="pdf", bbox_inches="tight")
plt.show()

## Figure 3

Model performance with respect to social dimensions


We first retrieve all episodes from the database whose tag has the format `<model_1>_<model_2>_v0.0.1_clean`.


In [None]:
def _load_valid_episode_log(pk: str) -> "EpisodeLog | None":
    """Fetch the episode stored under ``pk`` and return it if it qualifies.

    An episode qualifies when its tag splits on ``"_"`` into exactly
    ``<model_1>``, ``<model_2>`` and the version string ``"v0.0.1_clean"``, and
    both model names are members of ``LLM_Name``.

    Returns:
        The fetched ``EpisodeLog``, or ``None`` when the record fails
        validation, has no usable tag, or does not match the expected format.
    """
    try:
        episode = EpisodeLog.get(pk=pk)
    except ValidationError:
        return None
    try:
        model_1, model_2, version = episode.tag.split("_", maxsplit=2)
    except (ValueError, AttributeError):
        # ValueError: tag has less than 3 parts
        # AttributeError: tag is None
        return None
    known_models = get_args(LLM_Name)
    if (
        model_1 in known_models
        and model_2 in known_models
        and version == "v0.0.1_clean"
    ):
        return episode
    return None


def _is_valid_episode_log_pk(pk: str) -> bool:
    """Return ``True`` when the episode stored under ``pk`` qualifies.

    Thin wrapper around ``_load_valid_episode_log`` kept for callers that only
    need the yes/no answer.
    """
    return _load_valid_episode_log(pk) is not None


# Each primary key is fetched exactly once: the loader hands back the episode
# it already retrieved instead of requiring a second ``EpisodeLog.get`` call.
episodes: list[EpisodeLog] = [
    episode
    for episode in map(_load_valid_episode_log, EpisodeLog.all_pks())
    if episode is not None
]

In [None]:
# Number of episodes that passed the tag filter.
len(episodes)

Then, for each model pair, stack the rewards of all its episodes into a single array (they are averaged in the aggregation step below):


In [None]:
# Maps a (model_1, model_2) pair to the reward array collected for that pair.
model_pair2performance: dict[
    tuple[LLM_Name, LLM_Name], npt.NDArray[np.float32]
] = {}


def _episode_rewards_to_np_array(
    episode_rewards: list[tuple[float, dict[str, float]]],
) -> npt.NDArray[np.float32]:
    """Turn an episode's rewards into an array with one row per entry.

    For every entry only the mapping at index 1 is used; its values are taken
    in the mapping's own iteration order. The element at index 0 is ignored.
    """
    rows = []
    for entry in episode_rewards:
        per_dimension = entry[1]
        rows.append(list(per_dimension.values()))
    return np.array(rows)


# Collect the reward matrix of every usable episode per (model_1, model_2)
# pair, then stack each group once. Stacking once avoids re-copying the growing
# array on every episode, which repeated ``np.vstack`` calls would do.
rewards_by_model_pair: dict[
    tuple[LLM_Name, LLM_Name], list[npt.NDArray[np.float32]]
] = defaultdict(list)
skipped_episode_count = 0

for episode in episodes:
    _, model_1, model_2 = episode.models
    # Only episodes whose rewards are all tuples can be converted; anything
    # else is skipped explicitly (and counted) instead of relying on
    # ``assert``, which is stripped when Python runs with ``-O``.
    if not all(isinstance(i, tuple) for i in episode.rewards):
        skipped_episode_count += 1
        continue
    rewards_by_model_pair[(model_1, model_2)].append(
        _episode_rewards_to_np_array(episode.rewards)
    )

for model_pair, reward_matrices in rewards_by_model_pair.items():
    # Resulting layout: (episode, entry, value), i.e. a new leading axis.
    model_pair2performance[model_pair] = np.stack(reward_matrices, axis=0)

if skipped_episode_count:
    print(f"Skipped {skipped_episode_count} episode(s) with non-tuple rewards.")

Aggregation over episodes:


In [None]:
# Average each pair's stacked rewards over the leading (episode) axis.
for model_pair in model_pair2performance:
    stacked_rewards = model_pair2performance[model_pair]
    assert isinstance(stacked_rewards, np.ndarray)
    # Only 3-D arrays still carry the episode axis. After the mean the array
    # is 2-D, so re-running this cell leaves the aggregated values untouched
    # instead of averaging a second time over what is then the first axis.
    if stacked_rewards.ndim == 3:
        model_pair2performance[model_pair] = np.mean(stacked_rewards, axis=0)

Print results for tables


In [None]:
console = Console()

# Row/column order of the printed tables.
MODELS: list[LLM_Name] = [
    "gpt-4",
    "gpt-3.5-turbo",
    "togethercomputer/llama-2-70b-chat",
    "togethercomputer/mpt-30b-chat",
]

# Labels for the value columns of the aggregated reward arrays.
# NOTE(review): assumes the array columns follow this order — confirm against
# the reward dictionaries stored with the episodes.
SOCIAL_DIMENSIONS: list[str] = list(EvaluationBySocialDimensions.__fields__.keys()) + [
    "overall"
]

# One tab-separated table per dimension. Iterating over the labels replaces the
# previous hard-coded column count.
for i, social_dimension in enumerate(SOCIAL_DIMENSIONS):
    # scores[(a, b)]: score of model ``a`` when paired with model ``b``.
    # Pairs that never occur keep the defaultdict value 0.0.
    scores: dict[tuple[LLM_Name, LLM_Name], float] = defaultdict(float)
    for model_pair, performance in model_pair2performance.items():
        score_in_first_position = performance[0][i]
        mirrored_performance = model_pair2performance.get(model_pair[::-1])
        if mirrored_performance is None:
            # No data for the mirrored pairing: use the single available
            # position rather than failing with a KeyError.
            scores[model_pair] = score_in_first_position
        else:
            # Average the model's score over both positions.
            scores[model_pair] = (
                score_in_first_position + mirrored_performance[1][i]
            ) / 2

    print(f"SOCIAL_DIMENSION: {social_dimension}")
    print("\t", end="")
    for model in MODELS:
        print(f"{model}\t", end="")
    print()
    for model_1 in MODELS:
        print(f"{model_1}\t", end="")
        for model_2 in MODELS:
            print(f"{scores[(model_1, model_2)]:.2f}\t", end="")
        print()

In [None]:
# Show which (model_1, model_2) pairs have an entry in the performance table.
model_pair2performance.keys()

In [None]:
console = Console()

# Order of the models in the nested score dictionaries built below.
MODELS: list[LLM_Name] = [
    "gpt-3.5-turbo",
    "gpt-4",
    "togethercomputer/llama-2-70b-chat",
    "togethercomputer/mpt-30b-chat",
]

# Labels for the value columns of the aggregated reward arrays.
# NOTE(review): assumes the array columns follow this order — confirm against
# the reward dictionaries stored with the episodes.
SOCIAL_DIMENSIONS: list[str] = list(EvaluationBySocialDimensions.__fields__.keys()) + [
    "overall"
]

# dimension -> model_1 -> model_2 -> score of model_1 when paired with model_2.
table_dict_by_social_dimension: dict[str, dict[str, dict[str, float]]] = {}
for i, social_dimension in enumerate(SOCIAL_DIMENSIONS):
    # Pairs that never occur keep the defaultdict value 0.0.
    scores: dict[tuple[LLM_Name, LLM_Name], float] = defaultdict(float)
    for model_pair, performance in model_pair2performance.items():
        score_in_first_position = performance[0][i]
        mirrored_performance = model_pair2performance.get(model_pair[::-1])
        if mirrored_performance is None:
            # No data for the mirrored pairing: use the single available
            # position rather than failing with a KeyError.
            scores[model_pair] = score_in_first_position
        else:
            # Average the model's score over both positions.
            scores[model_pair] = (
                score_in_first_position + mirrored_performance[1][i]
            ) / 2

    print(f"SOCIAL_DIMENSION: {social_dimension}")
    print("\t", end="")
    for model in MODELS:
        print(f"{model}\t", end="")
    # End the header line so successive dimensions do not run together.
    print()

    table_dict_by_social_dimension[social_dimension] = {
        model_1: {model_2: scores[(model_1, model_2)] for model_2 in MODELS}
        for model_1 in MODELS
    }

# One DataFrame per dimension; the outer dictionary keys (model_1) become the
# columns and the inner keys (model_2) become the index.
dict_df_by_social_dimension = {
    social_dimension: pd.DataFrame(table_dict_by_social_dimension[social_dimension])
    for social_dimension in SOCIAL_DIMENSIONS
}

In [None]:
import seaborn as sns

# Quick look at the "goal" dimension for three of the models.
goal_short_names = {
    "gpt-3.5-turbo": "gpt-3.5",
    "togethercomputer/llama-2-70b-chat": "llama-2",
}
goal_model_order = ["gpt-4", "gpt-3.5", "llama-2"]

dimension = dict_df_by_social_dimension["goal"]
# Shorten the labels on both axes, then keep only the listed models, in order.
dimension_copy = (
    dimension.copy()
    .rename(columns=goal_short_names, index=goal_short_names)
    .reindex(goal_model_order)[goal_model_order]
)

sns.heatmap(
    dimension_copy, cmap=sns.color_palette("crest", as_cmap=True), annot=True, fmt=".2f"
)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np

mpl.rcParams["figure.dpi"] = 300

# One panel per entry of SOCIAL_DIMENSIONS.
dimension_names = list(SOCIAL_DIMENSIONS)

# Panel layout: 2 rows x 4 columns.
grid_rows = 2
grid_cols = 4
fig, axes = plt.subplots(grid_rows, grid_cols, figsize=(15, 5))
plt.subplots_adjust(wspace=0.5, hspace=0.5)

# Display label for each model; the insertion order is also the axis order.
heatmap_labels = {
    "gpt-4": "GPT-4",
    "gpt-3.5-turbo": "GPT-3.5",
    "togethercomputer/llama-2-70b-chat": "Llama-2",
    "togethercomputer/mpt-30b-chat": "MPT",
}
heatmap_order = list(heatmap_labels.values())

for idx, dimension_name in enumerate(dimension_names):
    # Fill the grid row by row.
    row, col = divmod(idx, grid_cols)
    ax = axes[row, col]

    dimension = dict_df_by_social_dimension[dimension_name]
    dimension_copy = (
        dimension.copy()
        .rename(columns=heatmap_labels, index=heatmap_labels)
        .reindex(heatmap_order)[heatmap_order]
    )

    sns.heatmap(dimension_copy, cmap="crest", annot=True, fmt=".2f", ax=ax)
    ax.set_title(f"{dimension_name}")

# Final layout pass, then write the figure to disk and display it.
plt.tight_layout()
plt.savefig("heatmap.pdf", format="pdf", bbox_inches="tight")
plt.show()