In [62]:
import json
import os

import numpy as np
import scipy.stats as stats

Provide the path to the evaluation results for each direction in the following variables:
- `eval_results_path`
- `reverse_eval_results_path`

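The labels "A" and "B" in the merged output follow the assignment used in `eval_results_path` (the forward direction); the labels read from `reverse_eval_results_path` are swapped ("A" ↔ "B") before the two directions are compared.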

In [63]:
# Locations of the forward and reverse evaluation result files (JSON).
# Both must be filled in before this cell is run.
eval_results_path = ""
reverse_eval_results_path = ""

# Forward direction: every label is used exactly as reported.
forward_map = {label: label for label in ("A", "B", "tie")}

# Reverse direction: "A" and "B" trade places, "tie" is left alone.
reverse_map = {"B": "A", "A": "B", "tie": "tie"}

with open(eval_results_path) as forward_file:
    eval_results = json.load(forward_file)

with open(reverse_eval_results_path) as reverse_file:
    reverse_eval_results = json.load(reverse_file)


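Results are printed below the cell. For each query, aspect and criterion, a winner is kept only when the forward and reverse evaluations agree; any disagreement is recorded as a tie. The merged bidirectional results are saved in the same directory as `eval_results_path`, with `pairwise` in the original file name replaced by `robust_merged_pairwise` (so the file name must contain `pairwise`).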

In [None]:
# Criteria judged for every aspect; each verdict is one of "A", "B" or "tie".
criteria = ["contrast", "relevancy", "diversity", "usefulness"]
baseline = {
    "A": 0,
    "B": 0,
    "tie": 0,
}

# zip() stops at the shorter input, so a length mismatch would silently drop
# evaluations instead of being noticed. Fail loudly here.
assert len(eval_results) == len(reverse_eval_results), (
    f"forward results have {len(eval_results)} entries but "
    f"reverse results have {len(reverse_eval_results)}"
)

# One counts dict per forward/reverse pair: criterion -> {"A", "B", "tie"} tallies,
# plus the pair's "query".
results = []

overall_results = tuple(zip(eval_results, reverse_eval_results))

# query -> aspect -> criterion -> merged verdict ("A", "B" or "tie")
merged_results = {}

for forward, backward in overall_results:
    asp_results = forward["aspect_eval"]
    reverse_asp_results = backward["aspect_eval"]
    counts = {criterion: baseline.copy() for criterion in criteria}
    counts["query"] = forward["query"]
    # The two files must list the same queries in the same order.
    assert counts["query"] == backward["query"]

    merged_for_query = merged_results.setdefault(counts["query"], {})

    for asp_result in asp_results:
        if asp_result not in reverse_asp_results:
            # Aspect was only judged in the forward direction: report it and skip.
            print(asp_result)
            continue
        merged_for_aspect = merged_for_query.setdefault(asp_result, {})

        for criterion in criteria:
            # Lower-case multi-character labels (e.g. "Tie" -> "tie") while leaving
            # the single-letter "A"/"B" labels untouched. The normalised label is
            # written back into the loaded results, as before.
            for verdicts in (asp_results, reverse_asp_results):
                label = verdicts[asp_result][criterion]
                if len(label) > 1:
                    verdicts[asp_result][criterion] = label.lower()

            winner = forward_map[asp_results[asp_result][criterion]]
            reverse_winner = reverse_map[reverse_asp_results[asp_result][criterion]]

            # Keep a verdict only when both directions agree; otherwise call it a tie.
            merged_winner = winner if winner == reverse_winner else "tie"
            counts[criterion][merged_winner] += 1
            merged_for_aspect[criterion] = merged_winner

    results.append(counts)

curr = "pairwise"
# Rename only the file-name component: replacing across the whole path would
# also rewrite directory names and move the output out of the input's directory.
merged_dir, merged_name = os.path.split(eval_results_path)
assert curr in merged_name, (
    f"expected {curr!r} in the file name of {eval_results_path!r}"
)
merged_path = os.path.join(merged_dir, merged_name.replace(curr, "robust_merged_pairwise"))
with open(merged_path, "w") as f:
    json.dump(merged_results, f)

# Aggregate the A / B / tie counts of every query for each criterion.
aggregated_results = {criterion: baseline.copy() for criterion in criteria}
for result in results:
    for criterion in criteria:
        for tier in ["A", "B", "tie"]:
            aggregated_results[criterion][tier] += result[criterion][tier]


# One-sample t-test per criterion against 0.5 (no preference), scoring each
# merged verdict from A's side: win = 1, loss = 0, tie = 0.5.
for criterion in criteria:
    tracker = []
    A = aggregated_results[criterion]["A"]
    B = aggregated_results[criterion]["B"]
    tie = aggregated_results[criterion]["tie"]
    tracker.extend([1] * A)
    tracker.extend([0] * B)
    tracker.extend([0.5] * tie)
    print(f"Criterion: {criterion}")
    if not tracker:
        # Nothing was judged for this criterion; the win rate would divide by zero.
        print("no merged judgements available")
        continue
    t, p = stats.ttest_1samp(tracker, 0.5)
    A_win_rate = sum(tracker) / (A + B + tie)
    print(f"t: {t}, p: {p}, A win rate: {A_win_rate*100}%")

def mean_confidence_interval(data, confidence=0.95):
    """Return the sample mean together with its two-sided Student-t interval.

    Args:
        data: Sequence of numeric observations.
        confidence: Confidence level of the interval (0.95 by default).

    Returns:
        A tuple ``(mean, lower, upper)``. The bounds are the mean minus and plus
        ``sem * t.ppf((1 + confidence) / 2, n - 1)``, where ``sem`` is the
        standard error of the mean and ``n`` the number of observations.
    """
    samples = np.array(data) * 1.0  # promote integer input to floating point
    degrees_of_freedom = len(samples) - 1
    centre = np.mean(samples)
    std_err = stats.sem(samples)
    half_width = std_err * stats.t.ppf((1 + confidence) / 2., degrees_of_freedom)
    return centre, centre - half_width, centre + half_width

print("\nResults for each criterions, 95% CI in brackets")
for criterion in criteria:
    criterion_totals = aggregated_results[criterion]
    A, B, tie = (criterion_totals[label] for label in ("A", "B", "tie"))

    # Score every merged verdict from A's side: win = 1, loss = 0, tie = 0.5.
    tracker = [1] * A + [0] * B + [0.5] * tie

    # Mean score with its 95% confidence interval.
    mean, ci_lower, ci_upper = mean_confidence_interval(tracker)
    print(f"{criterion}:\n{mean:.2f} [{ci_lower:.2f}, {ci_upper:.2f}]")

