In [None]:
import re
import openai
import json
import numpy as np
import time
from datasets import load_dataset
from tqdm.notebook import tqdm
from sklearn.metrics import confusion_matrix
import random
# Seed Python's `random` module so any random sampling done later in the notebook
# (e.g. the optional downsampling shuffle) is repeatable across runs.
random.seed(42)

# Read the OpenAI API key from ../api_key.txt and strip surrounding whitespace
# (such as a trailing newline) before handing it to the client library.
with open("../api_key.txt", "r") as f:
    openai.api_key = f.read().strip()

In [None]:
# Evaluation configuration: dataset subset, model, and prompting mode.
# Swap the commented alternatives in to evaluate another model or few-shot prompting.
test_subset_name = "attreval_gensearch"
model = "gpt-4"
# model = "gpt-3.5-turbo"

# mode = "few_shot"
mode = "zero_shot"

# Materialize the test split as a list: later cells attach model responses to
# each row and JSON-dump the whole list.
test_data = list(load_dataset("osunlp/AttrScore", test_subset_name)['test'])

# Map task name -> [prompt_template, input_template, label_map].
# The context manager closes the prompts file instead of leaking the handle.
with open("../task_prompts.json") as prompts_file:
    TASK_PROMPTS = {
        p['task_name']: [p['prompt_template'], p['input_template'], p['label_map']]
        for p in json.load(prompts_file)
    }

In [None]:
def read_few_shot_demo(prompt_type):
    """Return the few-shot demonstration text for the given prompt type.

    Args:
        prompt_type: One of "attribution-no-definition",
            "attribution-with-definition", "fact-checking", "nli",
            or "summarization".

    Returns:
        The full contents of the matching file under ../few-shot-demo/ as a string.

    Raises:
        KeyError: If prompt_type has no demo file registered.
        OSError: If the demo file cannot be opened.
    """
    demo_file_name = {"attribution-no-definition": "demo_attr.txt",
                      "attribution-with-definition": "demo_attr.txt",
                      "fact-checking": "demo_fact-checking.txt",
                      "nli": "demo_NLI.txt",
                      "summarization": "demo_sum.txt"}
    # The `with` block closes the file on exit, so no explicit close() is needed.
    # An explicit encoding keeps the demo text identical across platforms.
    with open(f"../few-shot-demo/{demo_file_name[prompt_type]}", encoding="utf-8") as rf:
        return rf.read()

In [None]:
def format_prompt(example, prompt_type = "attribution-with-definition", input_has_query = True, mode="zero_shot"):
    """Build the full prompt string sent to the model for one example.

    Args:
        example: Mapping with 'answer' and 'reference' entries, plus 'query'
            when input_has_query is True.
        prompt_type: Key into TASK_PROMPTS selecting the task instruction and
            the input template.
        input_has_query: When True the claim is "<query> <answer>"; otherwise
            the claim is the answer alone.
        mode: "few_shot" inserts the demonstrations returned by
            read_few_shot_demo between the instruction and the input; any
            other value produces the zero-shot layout.

    Returns:
        The assembled prompt, ending in "### Response:".
    """
    def _blank_if_missing(value):
        # Falsy values and the placeholder strings "nan" / "" all collapse to "".
        if value and value not in ["nan",""]:
            return value
        return ""

    task_prompt, input_template, _ = TASK_PROMPTS[prompt_type]

    if input_has_query:
        query = _blank_if_missing(example['query'])
        claim = query + " " + _blank_if_missing(example['answer'])
    else:
        claim = _blank_if_missing(example['answer'])
    filled_input = input_template.format(claim, example['reference'])

    if mode=="few_shot":
        demo_str = read_few_shot_demo(prompt_type)
        return "\n{}\n{}\n\n### Input: \n{}\n### Response:".format(task_prompt,demo_str,filled_input)
    return "\n{}\n\n### Input: \n{}\n### Response:".format(task_prompt,filled_input)

In [None]:
def get_attr_from_chatgpt(prompt, model="gpt-3.5-turbo", max_retries=None, retry_delay=5):
    """Send a single-turn prompt to an OpenAI chat model and return its reply.

    The prompt is sent as the only user message (no system message), with
    temperature 0, top_p 0.9, and at most 512 completion tokens.

    Args:
        prompt: Text of the user message.
        model: Chat model name passed to the API.
        max_retries: Maximum number of retries after a failed attempt.
            None (the default) retries indefinitely.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        The content of the first choice, stripped of surrounding whitespace.

    Raises:
        Exception: The last error from the API call or response parsing, once
            max_retries is exhausted. KeyboardInterrupt and SystemExit are
            never swallowed, so the retry loop can be interrupted.
    """
    messages = [
        {"role": "user", "content": prompt},
    ]
    failures = 0
    while True:
        try:
            response = openai.ChatCompletion.create(
                model=model,
                messages=messages,
                temperature=0,
                top_p=0.9,
                max_tokens=512,
                n=1
            )
            return response['choices'][0]['message']['content'].strip()
        except Exception as exc:
            # Catch Exception rather than everything: a bare `except:` would also
            # trap Ctrl-C / kernel interrupts and make the loop unstoppable.
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            print("error: {!r}; retrying in {}s".format(exc, retry_delay))
            time.sleep(retry_delay)

In [None]:
# Smoke test: build the prompt for a single test example (index 91), query the
# model once, then show the prompt followed by the raw response.
prompt = format_prompt(test_data[91], prompt_type='attribution-with-definition', input_has_query=True, mode=mode)
response = get_attr_from_chatgpt(prompt, model=model)
print(prompt, response, sep="\n")

In [None]:
# The results file name encodes the model, dataset subset, and prompting mode,
# so runs with different settings write to different files.
output_file = "./{}_{}_{}_result.json".format(model,test_subset_name,mode)
print(output_file)

In [None]:

#downsample to save cost

# random.shuffle(test_data)
# test_data = test_data[:500]

for task_name in ["attribution-with-definition"]:
    # Key under which this model's raw response is stored on every example.
    res_key = '{}.eval.{}'.format(model,task_name)

    for example in tqdm(test_data):
        prompt = format_prompt(example,prompt_type=task_name,mode=mode)
        example[res_key] = get_attr_from_chatgpt(prompt,model)

# Write through a context manager so the results file is flushed and closed
# before the evaluation cell reads it back.
with open(output_file, 'w') as result_file:
    json.dump(test_data, result_file)

In [None]:
def extract_pred_label(prediction, prompt_type = "attribution-with-definition"):
    """Map a free-text model response to a canonical label.

    The first occurrence (case-insensitive) of any key of the task's label map
    in the response decides the label.

    Args:
        prediction: Raw response text from the model.
        prompt_type: Key into TASK_PROMPTS; the last element of its entry is
            the label map (response keyword -> canonical label).

    Returns:
        The canonical label for the first keyword found, or the string "None"
        when no keyword is present or prediction is not a string.
    """
    label_map = TASK_PROMPTS[prompt_type][-1]
    if not isinstance(prediction, str):
        return "None"

    # Look keywords up by lowercase form so keys in any casing (e.g. "SUPPORTS",
    # "Not Enough Info") resolve; `.capitalize()` only worked for "Xxxx" keys.
    labels_by_lowercase = {key.lower(): value for key, value in label_map.items()}

    # Escape the keys so they are matched literally, and search only once.
    label_regex = "|".join(re.escape(key) for key in label_map)
    match = re.search(label_regex, prediction, re.IGNORECASE)
    if match is None:
        return "None"
    return labels_by_lowercase.get(match.group().lower(), "None")


In [None]:
def evaluate_confusion_matrix(confusion_matrix):
    """Compute per-class and aggregate metrics from a square confusion matrix.

    Rows are treated as true classes and columns as predicted classes
    (entry [i, j] counts samples of class i predicted as class j).

    Any ratio whose denominator is zero (a class that was never predicted,
    never occurred, or an empty matrix) is reported as 0.0 instead of NaN, so
    one empty class no longer turns the macro average into NaN.

    Args:
        confusion_matrix: Square 2-D array-like of counts.

    Returns:
        Tuple (precision, recall, f1, micro_f1, macro_f1): the first three are
        float arrays with one entry per class; micro_f1 is the sum of the
        diagonal divided by the sum of all entries; macro_f1 is the unweighted
        mean of the per-class F1 scores.

    Raises:
        ValueError: If the input is not a square 2-D matrix.
    """
    matrix = np.asarray(confusion_matrix, dtype=float)
    if matrix.ndim != 2 or matrix.shape[0] != matrix.shape[1]:
        raise ValueError("confusion_matrix must be a square 2-D array")

    def _ratio(numerator, denominator):
        # Element-wise division that leaves 0.0 wherever the denominator is 0.
        result = np.zeros_like(numerator, dtype=float)
        np.divide(numerator, denominator, out=result, where=denominator != 0)
        return result

    true_positives = np.diag(matrix)
    predicted_totals = matrix.sum(axis=0)  # TP + FP per class (column sums)
    actual_totals = matrix.sum(axis=1)     # TP + FN per class (row sums)

    precision = _ratio(true_positives, predicted_totals)
    recall = _ratio(true_positives, actual_totals)
    f1 = _ratio(2 * precision * recall, precision + recall)

    # Summed over classes, TP + FP equals the total count in the matrix.
    total = matrix.sum()
    micro_f1 = true_positives.sum() / total if total else 0.0
    macro_f1 = np.mean(f1) if f1.size else 0.0

    return precision, recall, f1, micro_f1, macro_f1

In [None]:
# Load the saved results once, closing the file afterwards, instead of opening
# it twice without ever closing it.
with open(output_file) as result_file:
    results = json.load(result_file)

for task_name in ["attribution-with-definition"]:
    res_key = '{}.eval.{}'.format(model,task_name)
    pred_labels = [extract_pred_label(example[res_key], prompt_type=task_name) for example in results]
    true_labels = [example['label'] for example in results]
    # Only the three listed labels index the matrix; responses with no
    # recognizable label come back as "None" and are left out of it, so
    # report how many there were.
    conf_matrix = confusion_matrix(true_labels, pred_labels, labels=["Attributable", "Contradictory", "Extrapolatory"])
    num_unparsed = pred_labels.count("None")

    precision, recall, f1, micro_f1, macro_f1 = evaluate_confusion_matrix(conf_matrix)

    print(task_name)
    print(conf_matrix)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1:", f1)
    print("micro_f1:", micro_f1)
    print("macro_f1:", macro_f1)
    print("unparsed predictions (not in confusion matrix):", num_unparsed)