In [1]:
import numpy as np
import os
import pandas as pd

from collections import Counter
from config import MAIN_DIR
from math import comb
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, cohen_kappa_score
from typing import Sequence, Dict, Any

from statsmodels.stats import inter_rater as irr
from statsmodels.stats.contingency_tables import mcnemar

In [2]:
# Input workbooks, resolved against the project root from config.MAIN_DIR.
human_result_path = os.path.join(MAIN_DIR, "artifacts", "human_responses.xlsx")
ai_result_path = os.path.join(MAIN_DIR, "artifacts", "ai_results_summary.xlsx")

# Locations resolved against the notebook's working directory instead.
# NOTE(review): DATA_DIR is not referenced by any other cell visible here.
DATA_DIR = os.path.join("..", "data", "answers")
# Every CSV/XLSX produced by this notebook is written into this folder.
save_folder = os.path.join("..", "artifacts")

In [33]:
def majority_vote(
    data: Sequence
) -> Any:
    """Return the most frequent element of ``data``.

    When several elements share the highest count, the one that occurs
    first in ``data`` is returned. An empty ``data`` raises ``ValueError``.
    """
    tally = Counter(data)
    # Counter iterates in first-seen order and max() keeps the first maximum.
    winner = max(tally.keys(), key=lambda item: tally[item])
    return winner

def find_median(
    sequence: Sequence
) -> int:
    """Return the position in ``sequence`` of its median element.

    The positions are ordered by ascending value and the one at rank
    ``len(sequence) // 2`` is returned, i.e. the upper of the two middle
    elements when the length is even.
    """
    ascending_positions = np.argsort(sequence)
    middle_rank = len(sequence) // 2
    return ascending_positions[middle_rank]

def rename_column(
    column: pd.Series,
    rename_dict: Dict = {
        "USUALLY APPROPRIATE": "UA/MBA",
        "MAY BE APPROPRIATE": "UA/MBA",
        "UA": "UA/MBA",
        "MBA": "UA/MBA",
        "USUALLY NOT APPROPRIATE": "UNA",
        "INSUFFICIENT INFORMATION": "ICI",
    }
) -> pd.Series:
    """Map the labels in ``column`` through ``rename_dict``.

    With the default mapping, the long and short forms of "usually
    appropriate" and "may be appropriate" collapse into ``"UA/MBA"``, and
    the long forms of the other two categories become ``"UNA"`` and
    ``"ICI"``. Values that are not keys of the mapping are left unchanged.

    Args:
        column: Series of labels to rename.
        rename_dict: Mapping from existing value to replacement value.

    Returns:
        A new Series; ``column`` itself is not modified.
    """
    # The default dict is created once and shared by every call: do not mutate it.
    renamed = column.replace(to_replace=rename_dict)
    return renamed

def evaluate_results(
    labels: Sequence, preds: Sequence
) -> Dict:
    """Compute accuracy plus macro/weighted precision, recall and F1.

    Args:
        labels: Reference labels.
        preds: Predicted labels, in the same order as ``labels``.

    Returns:
        Dict with the keys ``accuracy``, ``macro_precision``,
        ``weighted_precision``, ``macro_recall``, ``weighted_recall``,
        ``macro_f1`` and ``weighted_f1``, in that order.
    """
    scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )

    results = {"accuracy": accuracy_score(labels, preds)}
    for metric_name, scorer in scorers:
        for averaging in ("macro", "weighted"):
            # zero_division=np.nan is forwarded unchanged to each scikit-learn scorer.
            results[f"{averaging}_{metric_name}"] = scorer(
                labels, preds, average=averaging, zero_division=np.nan
            )
    return results
    
def preprocess(result_df):
    """Normalise column names and label spellings of ``result_df`` in place.

    * Strips surrounding whitespace from every column name.
    * Replaces the four long-form labels in ``"Approp Score"`` with their
      abbreviations (UA, UNA, MBA, ICI).
    * Strips surrounding whitespace from the values of ``"ANSKEY1"``.

    The frame passed in is modified and the same object is returned.
    """
    abbreviations = (
        ("USUALLY APPROPRIATE", "UA"),
        ("USUALLY NOT APPROPRIATE", "UNA"),
        ("MAY BE APPROPRIATE", "MBA"),
        ("INSUFFICIENT INFORMATION", "ICI"),
    )

    result_df.columns = result_df.columns.str.strip()
    for long_form, short_form in abbreviations:
        result_df["Approp Score"] = result_df["Approp Score"].replace(long_form, short_form)
    result_df["ANSKEY1"] = result_df["ANSKEY1"].str.strip()
    return result_df
    
def evaluate_human_results(result_df, classification_dict, expected_num_cases=70):
    """Score a sheet that uses the ``"Approp Score"`` / ``"ANSKEY1"`` layout.

    The frame is first normalised with ``preprocess`` (which modifies it in
    place). Both columns are then mapped through ``classification_dict`` and
    scored with ``evaluate_results``, so the metrics always match the ones
    computed elsewhere in this notebook.

    Args:
        result_df: DataFrame holding the responses in ``"Approp Score"`` and
            the answer key in ``"ANSKEY1"``.
        classification_dict: Mapping applied to both columns before scoring,
            e.g. to merge categories.
        expected_num_cases: Required number of rows. Defaults to 70; pass
            ``None`` to skip the check.

    Returns:
        The metrics dict produced by ``evaluate_results``.

    Raises:
        AssertionError: If the row count differs from ``expected_num_cases``.
    """
    result_df = preprocess(result_df)
    if expected_num_cases is not None:
        assert len(result_df) == expected_num_cases, (
            f"Number of testcases must be {expected_num_cases}."
        )

    preds = result_df["Approp Score"].replace(classification_dict)
    labels = result_df["ANSKEY1"].replace(classification_dict)

    # Delegate instead of repeating the seven metric computations here.
    return evaluate_results(labels, preds)

def _make_df_square(table):
    """
    Reindex a pandas DataFrame so that it becomes square, meaning that
    the row and column indices contain the same values, in the same
    order.  The row and column index are extended to achieve this.
    """

    if not isinstance(table, pd.DataFrame):
        return table

    # If the table is not square, make it square
    if not table.index.equals(table.columns):
        ix = list(set(table.index) | set(table.columns))
        ix.sort()
        table = table.reindex(index=ix, columns=ix, fill_value=0)

    # Ensures that the rows and columns are in the same order.
    table = table.reindex(table.columns)

    return table

def calculate_midp_mcnemar(confusion_matrix):
    """Return the mid-p value of McNemar's test for a 2x2 paired table.

    The mid-p value is the exact (binomial) McNemar p-value minus the point
    probability of the observed smaller discordant count under
    Binomial(n1 + n2, 0.5).

    Args:
        confusion_matrix: 2x2 table (nested list, array or DataFrame). Cells
            ``[0, 1]`` and ``[1, 0]`` are taken as the discordant counts.

    Returns:
        The mid-p value. When there are no discordant pairs at all, 1.0 is
        returned: the two raters never disagree, so there is no evidence of
        a difference.
    """
    confusion_matrix = _make_df_square(confusion_matrix)
    confusion_matrix = np.asarray(confusion_matrix, dtype=int)
    n1, n2 = confusion_matrix[0, 1], confusion_matrix[1, 0]
    total_sum = int(n1 + n2)

    # Without this guard the formula gives 1 - C(0, 0) * 0.5**0 = 0, i.e. it
    # would report a maximally significant difference for identical raters.
    if total_sum == 0:
        return 1.0

    statistic = int(min(n1, n2))
    mcnemar_pvalue = mcnemar(confusion_matrix, exact=True).pvalue
    # NOTE(review): statsmodels caps the exact p-value at 1, so for a tie
    # (n1 == n2 > 0) this returns 1 - pmf; confirm that this is the intended
    # mid-p convention for ties.
    point_probability = comb(total_sum, statistic) * (0.5 ** total_sum)
    midp_mcnemar_pvalue = mcnemar_pvalue - point_probability
    return midp_mcnemar_pvalue

# Analysis

In [39]:
# Accumulators filled by the processing cells below; re-running this cell
# resets both of them.
summaries = list()        # one metrics dict per respondent / AI run
matching_dict = dict()    # respondent name -> per-case "answer matches ground truth" flags

## Process Human Responses

In [35]:
# Sheet names in the human-responses workbook, one sheet per respondent.
human_labels = [
    "Human_rad1(CLP)",
    "Human_rad2(KGY)",
    "Human_trainee1(CHY)",
    "Human_trainee2(SUD)",
    "Human_ortho(NYH)",
]

In [40]:
# Score every human respondent against the ground truth and record, per case,
# whether the (category-merged) answer matches it.
for human_label in human_labels:
    human_df = pd.read_excel(human_result_path, sheet_name=human_label)

    # Ground truth and answers go through the same default mapping of
    # rename_column, so the two columns cannot drift apart.
    human_df["human_gt"] = rename_column(human_df["human_gt"])
    human_df["answer"] = rename_column(human_df["answer"])

    summary = {"respondent": human_label}
    evaluations = evaluate_results(human_df["human_gt"], human_df["answer"])
    summary.update(evaluations)
    summaries.append(summary)

    matching_dict[human_label] = human_df["human_gt"] == human_df["answer"]

## Process AI Responses

In [42]:
df_dict = {}
summary_results = {}

ai_modes = [
    "NoRAG", "NoRAG Reorder",
    "BaseRAG", "BaseRAG Reorder",
    "CombinedRAG", "CombinedRAG Reorder"
    # "BaseRAG-COT"  (disabled)
]

# Number of repeated runs stored per AI mode (columns answer_1 .. answer_N).
N_RUNS = 5
ANSWER_COLUMNS = [f"answer_{run + 1}" for run in range(N_RUNS)]
# Keys returned by evaluate_results, in the order they are reported.
METRIC_NAMES = [
    "accuracy",
    "macro_precision",
    "weighted_precision",
    "macro_recall",
    "weighted_recall",
    "macro_f1",
    "weighted_f1",
]

for ai_mode in ai_modes:
    # Register both containers immediately; they are filled in place below.
    summary_results[ai_mode] = mode_metrics = {name: [] for name in METRIC_NAMES}
    df_dict[ai_mode] = mode_df = pd.read_excel(
        ai_result_path, sheet_name=ai_mode, index_col="No"
    )
    mode_df["human_gt"] = rename_column(mode_df["human_gt"])

    # 1) Metrics of each individual run.
    for answer_column in ANSWER_COLUMNS:
        mode_df[answer_column] = rename_column(mode_df[answer_column])
        result_metrics = evaluate_results(mode_df["human_gt"], mode_df[answer_column])
        for name in METRIC_NAMES:
            mode_metrics[name].append(result_metrics[name])

    # 2) The run with the median accuracy: reuse its answers and its metrics.
    median_idx = find_median(mode_metrics["accuracy"])
    mode_df["median_run"] = mode_df[ANSWER_COLUMNS[median_idx]]
    for name in METRIC_NAMES:
        mode_metrics[name].append(mode_metrics[name][median_idx])

    # 3) Per-case majority vote over all runs.
    mode_df["majority_vote"] = mode_df[ANSWER_COLUMNS].apply(majority_vote, axis=1)
    majority_metrics = evaluate_results(mode_df["human_gt"], mode_df["majority_vote"])
    for name in METRIC_NAMES:
        mode_metrics[name].append(majority_metrics[name])

    # Row labels, in the same order as the values appended above.
    mode_metrics["labels"] = (
        [f"Run{run + 1}" for run in range(N_RUNS)] + ["median_run", "Majority Vote"]
    )

    matching_dict[f"{ai_mode}_median"] = mode_df["median_run"] == mode_df["human_gt"]
    matching_dict[f"{ai_mode}_majorityvote"] = mode_df["majority_vote"] == mode_df["human_gt"]

In [43]:
# Flatten the per-mode metric lists into one summary row per (mode, label).
metric_keys = [
    "accuracy",
    "macro_precision",
    "weighted_precision",
    "macro_recall",
    "weighted_recall",
    "macro_f1",
    "weighted_f1",
]

for ai_mode, metrics in summary_results.items():
    metric_columns = (metrics[key] for key in metric_keys)
    # zip stops at the shortest list, so each label is paired with its own values.
    for label, *values in zip(metrics["labels"], *metric_columns):
        row = {"respondent": f"{ai_mode}_{label}"}
        row.update(zip(metric_keys, values))
        summaries.append(row)

## Generate summary metrics

In [45]:
# One row per respondent / AI run, written without the positional index.
summary_df = pd.DataFrame(data=summaries)

summary_df.to_csv(
    path_or_buf=os.path.join(save_folder, "result_summaries_3classes_updated.csv"),
    index=False,
)

# McNemar Tests

In [46]:
# Square matrix of pairwise p-values, one row and one column per respondent.
# Cells start as None; the diagonal is never filled.
respondents = [name for name in matching_dict]
mcnemar_matrix = [[None for _ in respondents] for _ in respondents]

In [47]:
# Compare respondents case by case, by position. The flags in matching_dict
# are pandas Series, and `&` between two Series aligns them on index labels:
# the human frames are read with the default index while the AI frames are
# indexed by "No", so label alignment could pair different cases and pad the
# result with extra rows. Plain boolean arrays avoid that.
correct_flags = {
    respondent: np.asarray(matching_dict[respondent], dtype=bool)
    for respondent in respondents
}
if len({len(flags) for flags in correct_flags.values()}) > 1:
    raise ValueError(
        "All respondents must have answered the same number of cases "
        "for a paired McNemar test."
    )

for row_idx, row_name in enumerate(respondents):
    row_correct = correct_flags[row_name]
    for col_idx, col_name in enumerate(respondents):
        if row_idx == col_idx:
            continue  # the diagonal stays None
        col_correct = correct_flags[col_name]
        # 2x2 table of (row respondent correct?) x (column respondent correct?).
        confusion_matrix = [
            [(row_correct & col_correct).sum(), (row_correct & ~col_correct).sum()],
            [(~row_correct & col_correct).sum(), (~row_correct & ~col_correct).sum()],
        ]
        midp_mcnemar_pvalue = calculate_midp_mcnemar(confusion_matrix)
        mcnemar_matrix[row_idx][col_idx] = midp_mcnemar_pvalue

In [48]:
# Label both axes with the respondent names and keep them in the CSV.
mcnemar_df = pd.DataFrame(
    data=mcnemar_matrix,
    index=respondents,
    columns=respondents,
)
mcnemar_df.to_csv(os.path.join(save_folder, "mcnemar_3classes.csv"))

# Answer Consistency

## Cohen's Kappa

In [49]:
# Base AI settings; each one also has a "<setting> Reorder" counterpart sheet.
settings = ["NoRAG", "BaseRAG", "CombinedRAG"]
# Columns compared between a setting and its counterpart: the five
# individual runs followed by the two derived columns.
run_labels = [f"answer_{run}" for run in range(1, 6)] + ["median_run", "majority_vote"]

In [50]:
# One sheet per base setting: Cohen's kappa between every answer column of the
# setting (rows) and every answer column of its "Reorder" counterpart (columns).
with pd.ExcelWriter(os.path.join(save_folder, "pairwise_cohen_kappa.xlsx")) as writer:

    for setting in settings:
        reorder_setting = f"{setting} Reorder"

        row_respondents = [f"{setting}_{label}" for label in run_labels]
        column_respondents = [f"{reorder_setting}_{label}" for label in run_labels]
        row_data = [df_dict[setting][label] for label in run_labels]
        col_data = [df_dict[reorder_setting][label] for label in run_labels]

        cohen_kappa_matrix = [
            [cohen_kappa_score(row_response, col_response) for col_response in col_data]
            for row_response in row_data
        ]

        cohen_kappa_df = pd.DataFrame(
            cohen_kappa_matrix,
            index=row_respondents,
            columns=column_respondents,
        )

        # NOTE(review): `engine` is given here although the writer object
        # already exists; it presumably has no effect - confirm and move it
        # to pd.ExcelWriter if xlsxwriter is really required.
        cohen_kappa_df.to_excel(writer, sheet_name=setting, engine='xlsxwriter')
        

## Fleiss' Kappa

In [51]:
# Groups of AI modes whose runs are pooled for Fleiss' kappa: for every base
# setting, the setting alone, its "Reorder" variant alone, and both together.
ai_settings = [
    group
    for base in ("NoRAG", "BaseRAG", "CombinedRAG")
    for group in ([base], [f"{base} Reorder"], [base, f"{base} Reorder"])
]

In [52]:
# Fleiss' kappa per group of AI modes; every answer_* column of every mode in
# the group is treated as one rater.
fleiss_kappa_dict = {}

for ai_df_names in ai_settings:
    answer_columns = [f"answer_{idx+1}" for idx in range(5)]
    ratings = pd.concat(
        [df_dict[ai_df_name][answer_columns] for ai_df_name in ai_df_names],
        axis=1,
    )
    # aggregate_raters returns the per-case category counts first.
    category_counts, category_values = irr.aggregate_raters(ratings)
    ai_mode = " + ".join(ai_df_names)
    fleiss_kappa_dict[ai_mode] = irr.fleiss_kappa(category_counts, method='fleiss')

In [53]:
# Two-column table (mode group, kappa), written without the positional index.
fleiss_kappa_df = pd.DataFrame(
    {
        "ai_mode": [*fleiss_kappa_dict.keys()],
        "fleiss_kappa_score": [*fleiss_kappa_dict.values()],
    }
)

fleiss_kappa_df.to_csv(os.path.join(save_folder, "fleiss_kappa.csv"), index=False)