In [None]:
from ast import literal_eval
from collections import Counter

import numpy as np
import pandas as pd
from q2d.checker import analyze_errors_by_severity
from q2d.common.utils import get_data_id
from scipy import stats
from sklearn.metrics import cohen_kappa_score

In [None]:
# Load the annotation export and derive a per-row identifier, `sample_id_fixed`,
# from the row's code, query and version.
dataset = pd.read_csv("./datasets/annotations.csv")


def _make_sample_id(row):
    """Build the `sample_id_fixed` value for one annotation row.

    The id is `<id of code>[SEP]<id of query>[SEP]<version>`, where the first
    two parts come from the project helper `get_data_id`.
    """
    code_id = get_data_id(row["code"])
    query_id = get_data_id(row["query"])
    return code_id + "[SEP]" + query_id + "[SEP]" + row["version"]


dataset["sample_id_fixed"] = dataset.apply(_make_sample_id, axis=1)
dataset

In [None]:
# Experiment display name -> the task ids whose annotations belong to it.
# Every experiment is listed with two task ids.
experiment_to_ids = {
    "GPT-4o": ["6851ee157b9b577dc44f1d6d", "6851ee257b9b577dc44f1d9e"],
    "Claude 0-shot": ["680ca2aace63f152b3965528", "680ca293ce63f152b39654f7"],
    "Qwen2.5-Coder-14B 0-shot": ["680ca307ce63f152b396558a", "680ca321ce63f152b39655bb"],
    "Qwen2.5-Coder-14B SFT Claude Synth": ["680ca197ce63f152b3965402", "680ca17dce63f152b39653d1"],
    "Qwen2.5-Coder-14B SFT Fixed Claude Synth": ["680ca278ce63f152b39654c6", "680ca262ce63f152b3965495"],
}

# Values of the `annotator` column that the analysis distinguishes.
# NOTE: this is a set, so its iteration order is not guaranteed.
annotators = {"first", "second", "agreed"}

# Annotator whose labels feed every table below and who is the reference side
# of the agreement (kappa) comparison. Per the original note: set this to
# "first" to get the kappa_ci_asymptotic value.
current_annotator = "agreed"

# Fail fast on a typo: an unknown annotator would otherwise just produce empty
# selections in the cells that filter on it.
assert current_annotator in annotators, f"unknown annotator: {current_annotator!r}"

In [None]:
def kappa_ci_asymptotic(rater1, rater2, alpha=0.05):
    """Unweighted Cohen's kappa with a large-sample (asymptotic) confidence interval.

    The point estimate and its variance are both derived from one contingency
    table built over the union of the labels used by the two raters. The
    variance is the delta-method (Fleiss-Cohen-Everitt) large-sample variance,
    written in its expanded three-term form.

    Args:
        rater1: 1-D sequence of labels from the first rater.
        rater2: 1-D sequence of labels from the second rater, aligned
            item-by-item with ``rater1``.
        alpha: Two-sided significance level; the interval has nominal
            coverage ``1 - alpha``.

    Returns:
        Tuple ``(kappa, lower, upper)``. The bounds are clipped to ``[-1, 1]``.
        When chance agreement is (numerically) 1, e.g. both raters used a
        single identical label throughout, no interval can be formed and the
        kappa value (NaN when undefined) is returned for all three entries.

    Raises:
        ValueError: If the inputs are empty or differ in shape.
    """
    # Imported locally so the function does not depend on the module-level name
    # `stats`, which is easy to rebind by accident in a notebook (`stats = ...`).
    from scipy.stats import norm

    rater1, rater2 = np.asarray(rater1), np.asarray(rater2)
    if rater1.shape != rater2.shape:
        raise ValueError(f"rater1 and rater2 must have the same shape, got {rater1.shape} and {rater2.shape}")
    n = len(rater1)
    if n == 0:
        raise ValueError("cannot compute kappa for empty ratings")

    # Encode both raters against one shared, sorted category list.
    cats, codes = np.unique(np.concatenate([rater1, rater2]), return_inverse=True)
    codes = codes.ravel()
    k = len(cats)
    contingency = np.zeros((k, k))
    # Rows index rater1's label, columns index rater2's label.
    np.add.at(contingency, (codes[:n], codes[n:]), 1)

    p = contingency / n
    p_i = p.sum(axis=1)  # row marginals p_{i+} (rater1)
    p_j = p.sum(axis=0)  # column marginals p_{+j} (rater2)
    p_o = np.trace(p)  # observed agreement
    p_e = np.sum(p_i * p_j)  # agreement expected by chance

    chance_gap = 1 - p_e
    kappa = float((p_o - p_e) / chance_gap) if chance_gap > 0 else float("nan")

    if np.isclose(p_e, 1):
        return kappa, kappa, kappa

    # Weight of cell (i, j) in the last variance term is d p_e / d p_ij,
    # i.e. p_{+i} + p_{j+}: the column marginal of i plus the row marginal of j.
    # (Using p_{i+} + p_{+j} instead transposes the off-diagonal weights.)
    cell_weight = p_j[:, None] + p_i[None, :]

    var_kappa = (
        p_o * (1 - p_o) / chance_gap**2
        + 2 * (1 - p_o) * (2 * p_o * p_e - np.sum(np.diag(p) * (p_i + p_j))) / chance_gap**3
        + (1 - p_o) ** 2 * (np.sum(p * cell_weight**2) - 4 * p_e**2) / chance_gap**4
    ) / n

    z = norm.ppf(1 - alpha / 2)
    # Guard against a tiny negative variance caused by floating-point round-off.
    se = np.sqrt(max(0.0, var_kappa))

    return kappa, float(max(-1.0, kappa - z * se)), float(min(1.0, kappa + z * se))

In [None]:
def explode_nodes(df: pd.DataFrame) -> pd.DataFrame:
    """Expand the serialized ``nodes`` column into one row per (label, node id).

    Each ``nodes`` cell is a string holding a dict literal that maps a label to
    a list of node ids. The result has one row for every node id, carrying the
    label it was listed under. Labels with an empty list, and rows whose dict
    is empty, contribute no rows.

    The input frame is left untouched (no helper column is written into it).

    Args:
        df: Frame with the columns ``sample_id_fixed``, ``nodes``, ``repo``,
            ``version`` and ``task_id``.

    Returns:
        Frame with the columns ``sample_id_fixed``, ``node_id``, ``label``,
        ``repo``, ``version`` and ``task_id``. The index repeats the index
        label of the source row.
    """
    # Parse each cell into a list of (label, node_ids) pairs.
    label_pairs = df["nodes"].apply(lambda raw: list(literal_eval(raw).items()))

    # `assign` returns a new frame, so the caller's frame is never mutated.
    exploded = df.assign(temp=label_pairs).explode("temp")
    # Exploding an empty list yields NaN; such rows carry no labels at all.
    exploded = exploded[exploded["temp"].notna()]

    exploded = exploded.assign(
        label=exploded["temp"].map(lambda pair: pair[0]),
        node_id=exploded["temp"].map(lambda pair: pair[1]),
    )

    # Drop labels that have no nodes, then fan out to one row per node id.
    exploded = exploded[exploded["node_id"].map(len) > 0]
    exploded = exploded.explode("node_id")

    return exploded[["sample_id_fixed", "node_id", "label", "repo", "version", "task_id"]]


# Inter-annotator agreement: compare the node labels of `current_annotator`
# with those of every other annotator on the nodes both of them labelled.
all_tasks = {task for tasks in experiment_to_ids.values() for task in tasks}


def _finalized_node_labels(annotator):
    """Return one row per labelled node for `annotator`'s finalized annotations.

    Only rows whose `task_id` belongs to one of the configured experiments are
    considered.
    """
    selected = (
        (dataset["annotator"] == annotator)
        & (dataset["status"] == "Finalized")
        & dataset["task_id"].isin(all_tasks)
    )
    return explode_nodes(dataset[selected][["sample_id_fixed", "nodes", "repo", "version", "task_id"]])


current_annotator_labels = _finalized_node_labels(current_annotator)

# Sorted so the printed lines come out in the same order on every run
# (`annotators` is a set, whose iteration order is not guaranteed).
for another_annotator in sorted(name for name in annotators if name != current_annotator):
    another_annotator_labels = _finalized_node_labels(another_annotator)
    # Keep only nodes that both annotators labelled.
    merged_df = current_annotator_labels.merge(
        another_annotator_labels,
        how="inner",
        on=["sample_id_fixed", "task_id", "version", "repo", "node_id"],
        suffixes=("_cur", "_another"),
    )
    if merged_df.empty:
        # Kappa is undefined without any shared item; report it instead of failing.
        print(f"Cohen kappa {current_annotator} to {another_annotator}: no shared nodes to compare")
        continue
    kappa, kappa_min, kappa_max = kappa_ci_asymptotic(merged_df["label_cur"], merged_df["label_another"])
    print(f"Cohen kappa {current_annotator} to {another_annotator}: {kappa} ci: [{kappa_min}, {kappa_max}]")

In [None]:
# Per-sample upper bound of node counts: for every sample, the largest number
# of nodes per label seen in any of the selected annotations of that sample.
all_tasks = {task for tasks in experiment_to_ids.values() for task in tasks}
subset = dataset[
    (dataset["annotator"] == current_annotator)
    & (dataset["status"] == "Finalized")
    & dataset["task_id"].isin(all_tasks)
]

# One {label: number of nodes} dict per annotation row, in row order.
# Deliberately not named `stats`: that name is the `scipy.stats` import, and
# rebinding it breaks code that still needs the module.
node_counts = [
    {label: len(node_ids) for label, node_ids in literal_eval(raw_nodes).items()}
    for raw_nodes in subset["nodes"]
]
max_stats = pd.DataFrame(node_counts)
# Positional assignment: the rows of `node_counts` follow the rows of `subset`.
max_stats["sample_id_fixed"] = subset["sample_id_fixed"].to_numpy()
max_stats = max_stats.groupby("sample_id_fixed").max().reset_index()

# Expected number of distinct samples; presumably 24 samples in each of 2
# groups (TODO confirm what the two factors stand for).
assert len(max_stats) == 24 * 2, f"expected {24 * 2} unique samples, got {len(max_stats)}"

total_max_stats = max_stats[["Sufficiency", "Completeness", "Hallucinations", "Verbosity"]].sum()
total_max_stats

In [None]:
# Per-experiment node counts:
#   all_micro_stats[name] -> {label: total node count over all annotations}
#   all_macro_stats[name] -> one row per annotation with its per-label counts,
#                            joined with the per-sample maxima (`*_max` columns)
all_macro_stats = {}
all_micro_stats = {}
# Makes sure the four labels are present even when an experiment has none of them.
label_defaults = {"Sufficiency": 0, "Completeness": 0, "Verbosity": 0, "Hallucinations": 0}

for name, tasks in experiment_to_ids.items():
    subset = dataset[
        (dataset["annotator"] == current_annotator)
        & (dataset["status"] == "Finalized")
        & dataset["task_id"].isin(tasks)
    ]
    # One Counter {label: number of nodes} per annotation row, in row order.
    # Deliberately not named `stats`: that name is the `scipy.stats` import.
    node_counts = [
        Counter({label: len(node_ids) for label, node_ids in literal_eval(raw_nodes).items()})
        for raw_nodes in subset["nodes"]
    ]

    # Counter addition drops labels whose total is zero; the defaults restore them.
    micro_stats = dict(label_defaults | sum(node_counts, start=Counter()))

    macro_stats = pd.DataFrame([dict(counts) for counts in node_counts])
    # Positional assignment: the rows of `node_counts` follow the rows of `subset`.
    macro_stats["sample_id_fixed"] = subset["sample_id_fixed"].to_numpy()
    macro_stats = macro_stats.merge(max_stats, on="sample_id_fixed", suffixes=("", "_max"))

    all_macro_stats[name] = macro_stats
    all_micro_stats[name] = micro_stats

In [None]:
# Precision / recall / F1 per experiment, in two flavours:
#   soft: Sufficiency and Completeness nodes both count as true positives
#   hard: only Sufficiency nodes count as true positives
# False positives are Verbosity + Hallucinations nodes. False negatives are the
# gap to the reference counts (`total_max_stats` for micro, the per-sample
# `*_max` columns for macro).
# The hard F1 uses the hard false negatives, so that it is the harmonic mean of
# the hard precision and hard recall reported next to it.


def _precision_recall_f1(tp, fp, fn):
    """Return (precision, recall, F1) for scalar counts or per-sample Series."""
    return tp / (tp + fp), tp / (tp + fn), 2 * tp / (2 * tp + fp + fn)


def _macro_average(per_sample_scores):
    """Mean over samples, rounded to 3 digits; undefined (NaN) scores count as 1.0."""
    return round(np.mean(np.nan_to_num(per_sample_scores, nan=1.0)), 3)


results = []
for name in experiment_to_ids:
    micro_stats = all_micro_stats[name]
    macro_stats = all_macro_stats[name]

    # MICRO: pooled counts over all annotations of the experiment.
    micro_TP = micro_stats["Sufficiency"] + micro_stats["Completeness"]
    micro_TP_hard = micro_stats["Sufficiency"]
    micro_FP = micro_stats["Verbosity"] + micro_stats["Hallucinations"]
    micro_FN = (total_max_stats["Sufficiency"] + total_max_stats["Completeness"]) - micro_TP
    micro_FN_hard = total_max_stats["Sufficiency"] - micro_TP_hard

    micro_precision, micro_recall, micro_F1 = (
        round(score, 3) for score in _precision_recall_f1(micro_TP, micro_FP, micro_FN)
    )
    micro_hard_precision, micro_hard_recall, micro_hard_F1 = (
        round(score, 3) for score in _precision_recall_f1(micro_TP_hard, micro_FP, micro_FN_hard)
    )

    # MACRO: scores computed per annotation row, then averaged.
    macro_TP = macro_stats["Sufficiency"] + macro_stats["Completeness"]
    macro_TP_hard = macro_stats["Sufficiency"]
    macro_FP = macro_stats["Verbosity"] + macro_stats["Hallucinations"]
    macro_FN = (macro_stats["Sufficiency_max"] + macro_stats["Completeness_max"]) - macro_TP
    macro_FN_hard = macro_stats["Sufficiency_max"] - macro_TP_hard

    macro_precision, macro_recall, macro_F1 = (
        _macro_average(scores) for scores in _precision_recall_f1(macro_TP, macro_FP, macro_FN)
    )
    macro_hard_precision, macro_hard_recall, macro_hard_F1 = (
        _macro_average(scores) for scores in _precision_recall_f1(macro_TP_hard, macro_FP, macro_FN_hard)
    )

    results.append(
        {"Name": name}
        | dict(micro_stats)
        | {
            "Micro_Precision": micro_precision,
            "Micro_Hard_Precision": micro_hard_precision,
            "Micro_Recall": micro_recall,
            "Micro_Hard_Recall": micro_hard_recall,
            "Micro_F1": micro_F1,
            "Micro_Hard_F1": micro_hard_F1,
        }
        | {
            "Macro_Precision": macro_precision,
            "Macro_Hard_Precision": macro_hard_precision,
            "Macro_Recall": macro_recall,
            "Macro_Hard_Recall": macro_hard_recall,
            "Macro_F1": macro_F1,
            "Macro_Hard_F1": macro_hard_F1,
        }
    )

results_df = pd.DataFrame.from_records(results)

# Three markdown tables separated by a blank line: raw counts, micro, macro.
metric_suffixes = ["Precision", "Hard_Precision", "Recall", "Hard_Recall", "F1", "Hard_F1"]
table_columns = [
    ["Sufficiency", "Completeness", "Hallucinations", "Verbosity"],
    [f"Micro_{suffix}" for suffix in metric_suffixes],
    [f"Macro_{suffix}" for suffix in metric_suffixes],
]
for position, columns in enumerate(table_columns):
    if position:
        print()
    print(results_df[["Name", *columns]].round(3).to_markdown(tablefmt="github", index=False))

In [None]:
# Error counts by severity per experiment, without averaging (`average=None`).
# `analyze_errors_by_severity` is a project helper: it is handed the
# diagram/code pairs as a list of records and its result is merged into a row
# that must expose the "Low", "Medium" and "High" keys.
# NOTE(review): normalising these values by the number of annotated nodes was
# sketched here earlier but is not applied.
severity_columns = ["Name", "Low", "Medium", "High"]
total_errors, unique_errors = [], []

for name, tasks in experiment_to_ids.items():
    is_selected = (
        (dataset["annotator"] == current_annotator)
        & (dataset["status"] == "Finalized")
        & dataset["task_id"].isin(tasks)
    )
    subset = dataset[is_selected]

    # A fresh list of records is built for each call, as before.
    cur_total = analyze_errors_by_severity(
        subset[["diagram", "code"]].to_dict(orient="records"), mode="total", average=None
    )
    cur_unique = analyze_errors_by_severity(
        subset[["diagram", "code"]].to_dict(orient="records"), mode="unique", average=None
    )

    total_errors.append({"Name": name} | cur_total)
    unique_errors.append({"Name": name} | cur_unique)

for position, (heading, rows) in enumerate((("TOTAL:", total_errors), ("UNIQUE:", unique_errors))):
    if position:
        print()
    print(heading)
    print(pd.DataFrame.from_records(rows)[severity_columns].round(3).to_markdown(tablefmt="github", index=False))

In [None]:
# Error counts by severity per experiment with `average="macro"`.
# `analyze_errors_by_severity` is a project helper: it is handed the
# diagram/code pairs as a list of records and its result is merged into a row
# that must expose the "Low", "Medium" and "High" keys.
# NOTE(review): normalising these values by the number of annotated nodes was
# sketched here earlier but is not applied.
severity_columns = ["Name", "Low", "Medium", "High"]
total_errors, unique_errors = [], []

for name, tasks in experiment_to_ids.items():
    is_selected = (
        (dataset["annotator"] == current_annotator)
        & (dataset["status"] == "Finalized")
        & dataset["task_id"].isin(tasks)
    )
    subset = dataset[is_selected]

    # A fresh list of records is built for each call.
    cur_total = analyze_errors_by_severity(
        subset[["diagram", "code"]].to_dict(orient="records"), mode="total", average="macro"
    )
    cur_unique = analyze_errors_by_severity(
        subset[["diagram", "code"]].to_dict(orient="records"), mode="unique", average="macro"
    )

    total_errors.append({"Name": name} | cur_total)
    unique_errors.append({"Name": name} | cur_unique)

# The rows are a list of records, so build the tables with `from_records`
# (as the sibling error tables do) rather than `from_dict`, which is meant
# for dict input.
for position, (heading, rows) in enumerate((("TOTAL:", total_errors), ("UNIQUE:", unique_errors))):
    if position:
        print()
    print(heading)
    print(pd.DataFrame.from_records(rows)[severity_columns].round(3).to_markdown(tablefmt="github", index=False))

In [None]:
# Error counts by severity per experiment with `average="micro"`.
# `analyze_errors_by_severity` is a project helper: it is handed the
# diagram/code pairs as a list of records and its result is merged into a row
# that must expose the "Low", "Medium" and "High" keys.
# NOTE(review): normalising these values by the number of annotated nodes was
# sketched here earlier but is not applied.
severity_columns = ["Name", "Low", "Medium", "High"]
total_errors, unique_errors = [], []

for name, tasks in experiment_to_ids.items():
    is_selected = (
        (dataset["annotator"] == current_annotator)
        & (dataset["status"] == "Finalized")
        & dataset["task_id"].isin(tasks)
    )
    subset = dataset[is_selected]

    # A fresh list of records is built for each call, as before.
    cur_total = analyze_errors_by_severity(
        subset[["diagram", "code"]].to_dict(orient="records"), mode="total", average="micro"
    )
    cur_unique = analyze_errors_by_severity(
        subset[["diagram", "code"]].to_dict(orient="records"), mode="unique", average="micro"
    )

    total_errors.append({"Name": name} | cur_total)
    unique_errors.append({"Name": name} | cur_unique)

for position, (heading, rows) in enumerate((("TOTAL:", total_errors), ("UNIQUE:", unique_errors))):
    if position:
        print()
    print(heading)
    print(pd.DataFrame.from_records(rows)[severity_columns].round(3).to_markdown(tablefmt="github", index=False))