In [None]:
import os
import argparse
import numpy as np
import pickle as pkl
import matplotlib.pyplot as plt
from jupyter_server.transutils import base_dir
from sklearn.metrics import roc_auc_score, roc_curve, balanced_accuracy_score, accuracy_score, precision_score, recall_score, f1_score , precision_recall_curve

import utils
from cos_baselines import embedding_model_thresholds

# Per-dataset cut-off that is looked up by dataset name and forwarded to
# calc_ans_heuristic. 'world' has no cut-off because world_answer_heuristic
# does not take a threshold argument.
heuristic_thresholds = {
    'movies': 0.8,
    'books': 1,
    'world': None,
}

def movies_answer_heuristic(predicted_answer, gt_answer, threshold=0.8):
    """Judge a predicted movie answer against the reference cast.

    Entities are pulled out of ``predicted_answer`` with
    ``utils.spacy_extract_entities`` and compared with
    ``gt_answer['movie_cast']`` through
    ``utils.calculate_intersection_and_union``.

    Args:
        predicted_answer: Answer handed to ``utils.spacy_extract_entities``.
        gt_answer: Mapping that provides the reference cast under ``'movie_cast'``.
        threshold: The ratio ``len(intersection) / len(extracted entities)``
            must be strictly greater than this value.

    Returns:
        ``True`` when no entity was extracted at all; otherwise whether the
        ratio exceeds ``threshold``.
    """
    extracted = utils.spacy_extract_entities(predicted_answer)
    overlap, _ = utils.calculate_intersection_and_union(gt_answer['movie_cast'], extracted)

    # With nothing extracted the ratio is undefined; such answers count as correct.
    if len(extracted) == 0:
        return True

    matched_share = len(overlap) / len(extracted)
    return matched_share > threshold


def books_answer_heuristic(predicted_answer, gt_answer, threshold=3):
    """Judge a predicted book answer by counting matched reference values.

    Args:
        predicted_answer: Answer passed to ``utils.check_entity_in_sentence``.
        gt_answer: Mapping whose values are each checked against the answer.
        threshold: Minimum number of matched values required.

    Returns:
        ``True`` when at least ``threshold`` values of ``gt_answer`` are
        reported as present by ``utils.check_entity_in_sentence``.
    """
    hits = 0
    for reference_value in gt_answer.values():
        if utils.check_entity_in_sentence(reference_value, predicted_answer):
            hits += 1

    return hits >= threshold

def world_answer_heuristic(predicted_answer, gt_answer):
    """Judge a predicted answer by looking for any single reference entry.

    Args:
        predicted_answer: Answer passed to ``utils.check_entity_in_sentence``.
        gt_answer: Mapping whose values are iterables of reference entries.

    Returns:
        ``True`` as soon as one entry of any value is reported as present by
        ``utils.check_entity_in_sentence``; ``False`` when none is.
    """
    return any(
        utils.check_entity_in_sentence(candidate, predicted_answer)
        for candidates in gt_answer.values()
        for candidate in candidates
    )



def calc_ans_heuristic(predicted_answer, gt_answer, dataset_name, heuristic_threshold):
    """Apply the dataset-specific answer heuristic to every (prediction, reference) pair.

    Args:
        predicted_answer: Iterable of predicted answers.
        gt_answer: Iterable of reference answers, paired with the predictions by position.
        dataset_name: ``'books'``, ``'movies'`` or ``'world'``.
        heuristic_threshold: Forwarded to the books and movies heuristics;
            not used for ``'world'``.

    Returns:
        A list with one heuristic result per pair, or an empty list when
        ``dataset_name`` is none of the three known names.
    """
    if dataset_name == 'books':
        return [
            books_answer_heuristic(prediction, reference, heuristic_threshold)
            for prediction, reference in zip(predicted_answer, gt_answer)
        ]

    if dataset_name == 'movies':
        return [
            movies_answer_heuristic(prediction, reference, heuristic_threshold)
            for prediction, reference in zip(predicted_answer, gt_answer)
        ]

    if dataset_name == 'world':
        return [
            world_answer_heuristic(prediction, reference)
            for prediction, reference in zip(predicted_answer, gt_answer)
        ]

    # Unknown dataset names yield no labels at all.
    return []


def auc_plot(gt, pred, title, file_name, save_path='./'):
    """Draw a ROC curve with its AUC and save the figure to disk.

    Scores above 1 are replaced by 1.0. Labels and scores are then both
    flipped (``1 - value``) before they are handed to ``roc_curve`` and
    ``roc_auc_score``.

    Args:
        gt: Label per sample.
        pred: Score per sample.
        title: Title placed on the plot.
        file_name: Name of the image file to write.
        save_path: Directory the file is written into.
    """
    clipped = [score if score <= 1 else 1.0 for score in pred]

    flipped_scores = 1 - np.array(clipped)
    flipped_labels = 1 - np.array(gt)

    model_fpr, model_tpr, _ = roc_curve(flipped_labels, flipped_scores)

    # Reference curve obtained by scoring every sample with the constant 0.
    constant_scores = [0] * len(flipped_labels)
    ref_fpr, ref_tpr, _ = roc_curve(flipped_labels, constant_scores)

    # The dashed reference line is drawn first, the model curve second.
    plt.plot(ref_fpr, ref_tpr, linestyle='--')
    plt.plot(model_fpr, model_tpr, marker='.', label='Model')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend()

    # Small text box carrying the AUC value.
    auc_text = f'AUC: {roc_auc_score(flipped_labels, flipped_scores):.3f}\n'
    box_style = dict(boxstyle='round', facecolor='grey', alpha=0.5)
    plt.text(0.1, 0.02, auc_text, fontsize=8, bbox=box_style)

    plt.title(title)

    plt.savefig(os.path.join(save_path, file_name))
    # Clear the current axes once the image has been written.
    plt.cla()


def calc_auc_and_bal_acc(gt, pred, threshold=None):
    """Score a similarity-based hallucination detector against heuristic labels.

    ``gt`` is true where an answer is NOT hallucinated. ``pred`` is the cosine
    similarity between the generated question and the original question; a
    high similarity indicates the answer is not hallucinated.

    Args:
        gt: Per-sample labels, truthy meaning "not hallucinated".
        pred: Per-sample similarity scores. Values above 1 are replaced by 1.0.
        threshold: Similarity above which (strictly) a sample is predicted as
            not hallucinated. Defaults to ``embedding_model_thresholds["sbert"]``.

    Returns:
        Tuple ``(auc, bal_acc, gt_hallucinated, pred_hallucinated)``:
        ``auc`` is ``roc_auc_score(gt, pred)`` on the clipped scores,
        ``bal_acc`` is ``balanced_accuracy_score`` on the flipped labels, and
        the two arrays are ``1 - gt`` and ``1 - (pred > threshold)``, i.e.
        1 marks "hallucinated".
    """
    if threshold is None:
        threshold = embedding_model_thresholds["sbert"]

    pred = np.array([x if x <= 1 else 1.0 for x in pred])
    gt = np.array(gt)

    pred_not_hallucinated = pred > threshold

    # Flip both arrays once so that 1 marks the hallucinated class.
    gt_hallucinated = 1 - gt
    pred_hallucinated = 1 - pred_not_hallucinated

    bal_acc = balanced_accuracy_score(gt_hallucinated, pred_hallucinated)
    auc = roc_auc_score(gt, pred)

    return auc, bal_acc, gt_hallucinated, pred_hallucinated

def calc_auc_and_acc(base_dir, dataset_name, k_range, avg_max='avg'):
    """Print AUC and balanced accuracy for every question model on one dataset.

    For each answer model (``"gpt"``, ``"llama_7b"``) the file
    ``<base_dir>/<dataset_name>/<ans_model>_combined.pkl`` is loaded. The
    answers are labelled with ``calc_ans_heuristic``, and the scores of the
    question models ``gpt``, ``l7``, ``l13`` and their concatenation
    ("ensemble") are evaluated with ``calc_auc_and_bal_acc``. Every report is
    printed twice: once with the per-record maximum and once with the
    per-record average of the scores.

    Args:
        base_dir: Directory that contains one sub-directory per dataset.
        dataset_name: Key into ``heuristic_thresholds`` and name of the sub-directory.
        k_range: Number of leading entries kept per record and question model.
        avg_max: Accepted for backward compatibility; not used, both
            aggregations are always reported.

    Returns:
        None. All results are printed.
    """
    heuristic_threshold = heuristic_thresholds[dataset_name]
    dataset_dir = os.path.join(base_dir, dataset_name)
    print("Dataset: ", dataset_name, "##############################################")

    # exp_type = 'predicted_questions_const'
    exp_type = 'predicted_questions_var'

    # (key inside the pickled records, label used in the printed report)
    question_models = (('gpt', 'gpt'), ('l7', 'llama7'), ('l13', 'llama13'))

    for ans_model in ("gpt", "llama_7b"):
        print("Answer Model: ", ans_model, "####################")

        file_path = os.path.join(dataset_dir, f'{ans_model}_combined.pkl')
        # NOTE: pickle.load can execute arbitrary code; only point this at trusted files.
        with open(file_path, 'rb') as handle:
            results = pkl.load(handle)
        # The file is closed here; everything below works on the in-memory records.

        gt_answers = [res['answer_args'] for res in results]
        pred_ans = [res['predicted_answer'] for res in results]

        gt = calc_ans_heuristic(pred_ans, gt_answers, dataset_name, heuristic_threshold)

        print("Experiment Type: ", exp_type)

        # Each record stores a list of dicts under exp_type; merge them into one dict per record.
        merged = [{key: value for m_res in res[exp_type] for key, value in m_res.items()} for res in results]

        # Per question model and record: element 2 of each of the first k_range
        # entries (used as the score downstream).
        scores_by_model = {
            key: [[item[2] for item in record[key][:k_range]] for record in merged]
            for key, _ in question_models
        }
        ensemble_scores = [
            gpt + l7 + l13
            for gpt, l7, l13 in zip(scores_by_model['gpt'], scores_by_model['l7'], scores_by_model['l13'])
        ]

        for f, f_name in zip([np.max, np.average], ['max', 'avg']):
            print(f_name + '\n')

            # Reduce every record's score list to a single number before any report is printed.
            reduced = [(label, [f(x) for x in scores_by_model[key]]) for key, label in question_models]
            reduced.append(('ensemble', [f(x) for x in ensemble_scores]))

            print(f'k={k_range}, heuristic_threshold={heuristic_threshold}')
            print(f'Hallucination rate: {1 - (sum(gt)/len(gt)):.3f}')
            for label, scores in reduced:
                res = calc_auc_and_bal_acc(gt, scores)
                print(f'{label}:\n  AUC: {res[0]:.3f}, Balanced Acc: {res[1]:.3f}')
            print('\n\n')

In [None]:
# Root folder that holds one sub-directory per dataset (e.g. "books"), each
# containing the "<answer model>_combined.pkl" files read below.
# Set the ANLP_A3_BASE_DIR environment variable to run the notebook on another
# machine; the original local path is kept as the fallback.
BASE_DIR = os.environ.get(
    "ANLP_A3_BASE_DIR",
    "/Users/jiviteshjain/Documents/CMU/Coursework/Sem-1/ANLP/Assignment-3",
)

In [None]:
# Interactive version of calc_auc_and_acc for a single answer model ("gpt") on
# the "books" dataset. Everything assigned here is a notebook-level global;
# later cells read `results`, `gt_hallucinated` and `pred_hallucinated`.
dataset_name = "books"
# NOTE: this rebinds `base_dir`, which the first cell imported from
# jupyter_server.transutils.
base_dir = BASE_DIR
k_range = 5


heuristic_threshold = heuristic_thresholds[dataset_name]
dataset_dir = os.path.join(base_dir, dataset_name)
print("Dataset: ", dataset_name, "##############################################")

ans_model = "gpt"
print("Answer Model: ", ans_model, "####################")

file_path = os.path.join(dataset_dir, f'{ans_model}_combined.pkl')
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open(file_path, 'rb') as handle:
    results = pkl.load(handle)

    gt_answers = [res['answer_args'] for res in results]
    pred_ans = [res['predicted_answer'] for res in results]


    # True in gt means the answer is correct (matches ground truth ans) and not hallucinated.
    gt = calc_ans_heuristic(pred_ans, gt_answers, dataset_name, heuristic_threshold)


    # exp_type = 'predicted_questions_const'
    exp_type = 'predicted_questions_var'

    print("Experiment Type: ", exp_type)

    # Each record stores a list of dicts under exp_type; merge them into one dict per record.
    predicted_questions_cosine = [{key: value for m_res in res[exp_type] for key, value in m_res.items()} for res in results]

    # For each question model: keep the first k_range entries per record, then
    # keep only element 2 of every entry (the value scored below).
    pred_questions_cosine_gpt = [res['gpt'][:k_range] for res in predicted_questions_cosine]
    pred_questions_cosine_gpt = [[item[2] for item in inner_list] for inner_list in pred_questions_cosine_gpt]

    pred_questions_cosine_l7 = [res['l7'][:k_range] for res in predicted_questions_cosine]
    pred_questions_cosine_l7 = [[item[2] for item in inner_list] for inner_list in pred_questions_cosine_l7]

    pred_questions_cosine_l13 = [res['l13'][:k_range] for res in predicted_questions_cosine]
    pred_questions_cosine_l13 = [[item[2] for item in inner_list] for inner_list in pred_questions_cosine_l13]

    # The ensemble is the per-record concatenation of the three models' score lists.
    pred_questions_cosine_ensemble = [res1 + res2 + res3 for res1, res2, res3 in zip(pred_questions_cosine_gpt, pred_questions_cosine_l7, pred_questions_cosine_l13)]

    # Report once with the per-record maximum and once with the per-record average.
    for f, f_name in zip([np.max, np.average], ['max', 'avg']):
        print(f_name + '\n')
        pred_questions_cosine_gpt_ = [f(x) for x in pred_questions_cosine_gpt]
        pred_questions_cosine_l7_ = [f(x) for x in pred_questions_cosine_l7]
        pred_questions_cosine_l13_ = [f(x) for x in pred_questions_cosine_l13]
        pred_questions_cosine_ensemble_ = [f(x) for x in pred_questions_cosine_ensemble]

        # if pred is high, it means the gen question matched the original question,
        # so answer was not hallucinated.

        print(f'k={k_range}, heuristic_threshold={heuristic_threshold}')
        print(f'Hallucination rate: {1 - (sum(gt)/len(gt)):.3f}')
        gpt_res = calc_auc_and_bal_acc(gt, pred_questions_cosine_gpt_)
        print(f'gpt:\n  AUC: {gpt_res[0]:.3f}, Balanced Acc: {gpt_res[1]:.3f}')
        l7_res = calc_auc_and_bal_acc(gt, pred_questions_cosine_l7_)
        print(f'llama7:\n  AUC: {l7_res[0]:.3f}, Balanced Acc: {l7_res[1]:.3f}')
        l13_res = calc_auc_and_bal_acc(gt, pred_questions_cosine_l13_)
        print(f'llama13:\n  AUC: {l13_res[0]:.3f}, Balanced Acc: {l13_res[1]:.3f}')
        ensemble_res = calc_auc_and_bal_acc(gt, pred_questions_cosine_ensemble_)
        print(f'ensemble:\n  AUC: {ensemble_res[0]:.3f}, Balanced Acc: {ensemble_res[1]:.3f}')
        print('\n\n')

        # Elements 2 and 3 of the result are the flipped arrays (1 = hallucinated)
        # for the gpt question model. They are reassigned on every pass, so after
        # the loop they hold the values of the last aggregation ('avg').
        # NOTE(review): confirm that 'avg' (not 'max') is the intended input for
        # the confusion matrix built in the next cell.
        gt_hallucinated = gpt_res[2]
        pred_hallucinated = gpt_res[3]

In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# labels=[1, 0] puts the hallucinated class (1) in the first row and column.
conf_matrix = confusion_matrix(y_true=gt_hallucinated, y_pred=pred_hallucinated, labels=[1, 0])

fig, ax = plt.subplots(figsize=(6, 6))
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt="d",
    cmap="Blues",
    cbar=False,
    xticklabels=["Predicted Positive", "Predicted Negative"],
    yticklabels=["Actual Positive", "Actual Negative"],
    ax=ax,
)
ax.set_xlabel("Prediction")
ax.set_ylabel("Ground Truth")
ax.set_title("Books Confusion Matrix")

# Write the image first, then render it inline.
fig.savefig("books_confusion_matrix.png", bbox_inches='tight', dpi=300)
plt.show()


In [None]:
# conf_matrix was built with labels=[1, 0], so it reads [[TP, FN], [FP, TN]]
# with "positive" meaning hallucinated.
(tp, fn), (fp, tn) = conf_matrix

print(tp, fn)
print(fp, tn)
print(conf_matrix)

In [None]:
# Totals per actual class and per predicted class.
actual_negative = fp + tn
actual_positive = fn + tp
predicted_positive = tp + fp
predicted_negative = tn + fn

# False positive / false negative rate: errors relative to the actual class size.
fpr = fp / actual_negative
fnr = fn / actual_positive

# Positive / negative predictive value: hits relative to the predicted class size.
ppv = tp / predicted_positive
npv = tn / predicted_negative

In [None]:
# Print the four rates, one per line, in the order PPV, NPV, FPR, FNR.
for metric_name, metric_value in (("PPV", ppv), ("NPV", npv), ("FPR", fpr), ("FNR", fnr)):
    print(f"{metric_name}: ", metric_value)

In [None]:
# ppv and npv: higher is better
# fpr and fnr: lower is better

# ppv and fpr are bad
# ppv is low -> much of what it classifies as hallucination is not actually hallucination
# fpr is high -> not hallucination classified as hallucination

# too many false positives

In [None]:
# find false positive indices
# find false positive indices
# A false positive is a sample that is NOT hallucinated (gt_hallucinated == 0)
# but is predicted as hallucinated (pred_hallucinated == 1).
gt_is_false_indices = gt_hallucinated == 0
# Combine both conditions on the full-length arrays so the resulting indices
# are positions in the original arrays (and therefore in `results`), not
# positions inside the filtered subset.
fp_indices = np.where(gt_is_false_indices & (pred_hallucinated == 1))[0]
fp_indices

In [None]:
# Collect the raw result record for every index in fp_indices for manual inspection.
fp_examples = []
for idx in fp_indices:
    fp_examples.append(results[idx])

In [None]:
# Print the full report for k_range = 1 through 5 on the books dataset.
banner = "########################"
for k in range(5):
    print(banner)
    print("k = ", k+1)
    print(banner)
    calc_auc_and_acc(BASE_DIR, "books", k_range=k + 1, avg_max='avg')



In [None]:
def plot_calc(base_dir, dataset_name, k_range):
    """Return AUC and balanced accuracy of the 'gpt' question model (max aggregation).

    Loads ``<base_dir>/<dataset_name>/gpt_combined.pkl``, labels the answers
    with ``calc_ans_heuristic``, reduces the first ``k_range`` scores of the
    ``'gpt'`` question model to their per-record maximum and evaluates them
    with ``calc_auc_and_bal_acc``. Despite its name this function does not
    draw anything; it produces the numbers that are plotted afterwards.

    Args:
        base_dir: Directory that contains one sub-directory per dataset.
        dataset_name: Key into ``heuristic_thresholds`` and name of the sub-directory.
        k_range: Number of leading entries kept per record.

    Returns:
        Tuple ``(auc, balanced_accuracy)``.
    """
    heuristic_threshold = heuristic_thresholds[dataset_name]
    dataset_dir = os.path.join(base_dir, dataset_name)
    print("Dataset: ", dataset_name, "##############################################")

    ans_model = "gpt"
    print("Answer Model: ", ans_model, "####################")

    file_path = os.path.join(dataset_dir, f'{ans_model}_combined.pkl')
    # NOTE: pickle.load can execute arbitrary code; only point this at trusted files.
    with open(file_path, 'rb') as handle:
        results = pkl.load(handle)
    # The file is closed here; everything below works on the in-memory records.

    gt_answers = [res['answer_args'] for res in results]
    pred_ans = [res['predicted_answer'] for res in results]

    gt = calc_ans_heuristic(pred_ans, gt_answers, dataset_name, heuristic_threshold)

    # exp_type = 'predicted_questions_const'
    exp_type = 'predicted_questions_var'

    print("Experiment Type: ", exp_type)

    # Each record stores a list of dicts under exp_type; merge them into one dict per record.
    merged = [{key: value for m_res in res[exp_type] for key, value in m_res.items()} for res in results]

    # Only the 'gpt' question model is reported, so only its scores are extracted:
    # element 2 of each of the first k_range entries per record.
    gpt_scores = [[item[2] for item in record['gpt'][:k_range]] for record in merged]

    print('max' + '\n')
    gpt_max = [np.max(x) for x in gpt_scores]

    print(f'k={k_range}, heuristic_threshold={heuristic_threshold}')
    print(f'Hallucination rate: {1 - (sum(gt)/len(gt)):.3f}')
    gpt_res = calc_auc_and_bal_acc(gt, gpt_max)
    print(f'gpt:\n  AUC: {gpt_res[0]:.3f}, Balanced Acc: {gpt_res[1]:.3f}')
    return gpt_res[0], gpt_res[1]

# Evaluate k_range = 1 through 5 on the books dataset, then split the
# (auc, balanced accuracy) pairs into one list per metric.
k_scores = []
for k in range(5):
    k_scores.append(plot_calc(BASE_DIR, "books", k + 1))

aucs = [pair[0] for pair in k_scores]
baccs = [pair[1] for pair in k_scores]

In [None]:
fig, ax = plt.subplots(figsize=(8, 6))

# x positions 1 through 5, one per evaluated k.
x_vals = np.arange(1, 6)

# Draw AUC first and balanced accuracy second, each with its own marker.
for series, series_label, series_marker in ((aucs, "AUC", "o"), (baccs, "B-Accuracy", "s")):
    ax.plot(x_vals, series, label=series_label, marker=series_marker)

ax.set(xlabel="K Size", ylabel="Score", title="Books Dataset")
ax.legend()
ax.grid(True)

# Write the image first, then render it inline.
fig.savefig("books_auc_bacc.png", bbox_inches='tight', dpi=300)
plt.show()