In [None]:
from typing import Any, Iterator, Tuple

import numpy as np
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support

In [None]:
# Load every recorded decision; rows without a model get an empty string so that
# equality filters on "model" can still select them.
all_decisions_df = pd.read_csv("results/all_decisions_df.csv").fillna({"model": ""})

In [None]:
def iterate_experiments(
    df: pd.DataFrame,
) -> Iterator[Tuple[str, Any, str, Any, pd.DataFrame]]:
    """Yield the decision rows of every experiment contained in ``df``.

    An experiment is one combination of ``model``, ``dataset``, ``task_scope``
    and ``experiment_run`` that actually occurs in ``df``; combinations without
    rows are skipped.

    Args:
        df: Frame with at least the columns ``model``, ``dataset``,
            ``task_scope`` and ``experiment_run``.

    Yields:
        ``(model, dataset, task_scope, experiment_run, experiment_df)`` where
        ``experiment_df`` holds the rows of ``df`` belonging to that experiment.
        For the ``"n-gram"`` task scope the rows are NOT filtered by run: the
        whole model/dataset/scope subset is yielded for each run value.
    """
    for model in df["model"].unique():
        model_df = df.query("model == @model")
        for dataset in df["dataset"].unique():
            dataset_df = model_df.query("dataset == @dataset")
            if dataset_df.empty:
                # Nothing can be yielded below; skip the per-scope queries.
                continue
            for task_scope in df["task_scope"].unique():
                task_scope_df = dataset_df.query("task_scope == @task_scope")
                for experiment_run in task_scope_df["experiment_run"].unique():
                    if task_scope == "n-gram":
                        # NOTE(review): presumably n-gram rows have no run id
                        # (NaN), which an equality filter would never match —
                        # confirm against the data.
                        experiment_df = task_scope_df
                    else:
                        experiment_df = task_scope_df.query(
                            "experiment_run == @experiment_run"
                        )
                    yield model, dataset, task_scope, experiment_run, experiment_df

In [None]:
# Score the union of the "yes" decisions of every pair of experiments that ran
# on the same dataset against the benchmark.
scores = []
for model, dataset, task_scope, experiment_run, experiment_df in iterate_experiments(
    all_decisions_df
):
    for (
        other_model,
        _other_dataset,
        other_task_scope,
        other_experiment_run,
        other_experiment_df,
    ) in iterate_experiments(all_decisions_df.query("dataset == @dataset")):
        # skip inter model combinations
        if {model, other_model} == {"GPT-3.5", "GPT-4"}:
            continue

        # use a simple average when we have the same task scope twice:
        # different runs of one scope are never unioned with each other
        if (task_scope == other_task_scope) and (
            experiment_run != other_experiment_run
        ):
            continue

        _left = experiment_df[["source", "target", "decision", "benchmark"]]
        _right = other_experiment_df[["source", "target", "decision", "benchmark"]]
        _df = _left.merge(
            _right, on=["source", "target"], suffixes=("_left", "_right"), how="outer"
        )

        # With an outer merge, a pair that only the right experiment contains has
        # no left-hand benchmark value, so take the label from whichever side has it.
        benchmark = _df["benchmark_left"].where(
            _df["benchmark_left"].notna(), _df["benchmark_right"]
        )
        if benchmark.isna().any():
            raise ValueError(
                f"Missing benchmark label for dataset {dataset!r} "
                f"({task_scope!r} vs. {other_task_scope!r})"
            )
        # The merge may have upcast the column to object; the scorer is called
        # with pos_label=True, so hand it plain booleans.
        benchmark = benchmark.astype(bool)

        # A missing decision compares unequal to "yes", i.e. counts as "no".
        unioned_decision = (_df["decision_left"] == "yes") | (
            _df["decision_right"] == "yes"
        )
        p, r, f1, _support = precision_recall_fscore_support(
            benchmark,
            unioned_decision,
            average="binary",
            pos_label=True,
            zero_division=0.0,
        )

        scores.append(
            {
                "dataset": dataset,
                "from": task_scope,
                "from_run": experiment_run,
                "from_model": model,
                "to": other_task_scope,
                "to_run": other_experiment_run,
                "to_model": other_model,
                "precision": p,
                "recall": r,
                "f1-score": f1,
            }
        )

scores_df = pd.DataFrame(scores)
scores_df

In [None]:
# Order in which the task scopes appear as rows of the score table.
scope_order = ["n-gram", "1-to-1", "1-to-n", "n-to-1", "n-to-n"]

scores_table = (
    scores_df
    # First average all scores sharing dataset, source scope, target scope and
    # target model ...
    .groupby(["dataset", "from", "to", "to_model"])[["f1-score", "precision", "recall"]]
    .mean()
    .reset_index()
    # ... then average those per-dataset means across datasets.
    .groupby(["from", "to", "to_model"])[["f1-score", "precision", "recall"]]
    .mean()
    .reset_index()
    # Rows: source scope. Columns: (metric, target model, target scope).
    .pivot(
        index="from",
        columns=["to_model", "to"],
        values=["f1-score", "precision", "recall"],
    )
    .loc[scope_order]
)

In [None]:
import plotly.graph_objects as go


def should_visualize(from_scope: str, to_scope: str) -> bool:
    """Tell whether the cell (from_scope, to_scope) is drawn in the diagonal table.

    A cell is drawn when ``from_scope`` does not come after ``to_scope`` in
    ``scope_order``.
    """
    from_position = scope_order.index(from_scope)
    to_position = scope_order.index(to_scope)
    return not from_position > to_position


# Two-level x axis of the heatmap: (model, target scope). The first column is the
# n-gram column with the empty model name.
columns = [[""] + ["GPT-3.5"] * 4 + ["GPT-4"] * 3, scope_order + scope_order[2:]]

# Build colour values and cell labels in one pass so both apply the same rules
# for missing and hidden cells.
values = []
texts = []
for from_scope in scores_table.index:
    value_row = []
    text_row = []
    for model, to_scope in zip(*columns):
        f1 = scores_table.loc[from_scope, ("f1-score", model, to_scope)]

        if pd.isnull(f1):
            # No score for this combination: no colour and no label
            # (instead of a "nan (nan, nan)" label).
            value_row.append(pd.NA)
            text_row.append("")
            continue

        if not should_visualize(from_scope, to_scope):
            # Cell outside the drawn part of the table: neutral colour, no label.
            value_row.append(0.0)
            text_row.append("")
            continue

        # Colour encodes the F1 difference to the row's own scope; the n-gram
        # row is stored under the empty model name.
        reference_model = "" if from_scope == "n-gram" else model
        reference_value = scores_table.loc[
            from_scope, ("f1-score", reference_model, from_scope)
        ]
        value_row.append(f1 - reference_value)

        precision = scores_table.loc[from_scope, ("precision", model, to_scope)]
        recall = scores_table.loc[from_scope, ("recall", model, to_scope)]
        text_row.append(f"{f1:.3f} ({precision:.2f}, {recall:.2f})")
    values.append(value_row)
    texts.append(text_row)

fig = go.Figure(
    layout=dict(
        # scores_table holds means, so the title says "Mean" rather than "Median".
        title="Mean F1-scores compared to baseline (green: better, purple: worse)",
        height=600,
        width=1000,
        yaxis={"autorange": "reversed"}
    ),
    data=go.Heatmap(
        x=columns,
        y=scores_table.index,
        z=values,
        text=texts,
        texttemplate="%{text}",
        textfont={"size": 12},
        colorscale="PrGn",
        zmin=-0.5,
        zmax=0.5,
        showscale=False,
    ),
)
fig.show()