In [None]:
import os,sys,re,math,copy, random, time,json
import argparse
import numpy as np
import pandas as pd
from datetime import datetime

from openai import OpenAI

from bert_score import score


In [None]:
# LLM configuration.
# `models` lists the candidate OpenAI chat models; the last entry is the one used below.
models = ["gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-2024-11-20", "gpt-4o-mini-2024-07-18"]
openai_params = {"model": models[-1], "temperature": 0, "max_tokens": 300, "top_p": 0.5}
# SECURITY: a secret key must never be stored in the notebook. It is read from the
# OPENAI_API_KEY environment variable instead and is None when that variable is unset.
# NOTE(review): the key that used to be hardcoded on this line has been exposed and
# should be revoked.
api_key = os.environ.get("OPENAI_API_KEY")


In [None]:
# util functions for generation and evaluation
def load_json(file_path):
    """Open `file_path` as UTF-8 text and return the object parsed from its JSON content."""
    with open(file_path, mode='r', encoding='utf-8') as handle:
        return json.load(handle)

def format_msg_role(role, prompt):
    """Build a chat message dict whose 'role' is `role` and whose 'content' is `prompt`."""
    message = dict(role=role, content=prompt)
    return message
def write_json(file_path, data):
    """Serialize `data` as JSON with a 4-space indent into `file_path` (UTF-8, overwritten)."""
    with open(file_path, mode='w', encoding='utf-8') as handle:
        json.dump(data, handle, indent=4)

def get_time(fmt="%m%d_%H%M"):
    """Return the current date and time (from `datetime.now()`) as a formatted string.

    Args:
        fmt: `strftime` format string. The default '%m%d_%H%M' produces 'mmdd_HHMM'
            (month, day, underscore, hour, minute); it contains no year component.

    Returns:
        The formatted timestamp string.
    """
    return datetime.now().strftime(fmt)


def calculate_bert_scores(reference, hypotheses, model_type="bert-base-uncased"):
    """Compare every hypothesis with one shared reference and return the precision scores.

    Args:
        reference: reference text each hypothesis is scored against.
        hypotheses: list of candidate texts.
        model_type: model name forwarded to `score`.

    Returns:
        The precision result of `score` converted to a plain list via `.tolist()`.
        An empty `hypotheses` returns `[]` without invoking the scorer.
    """
    # Nothing to score: skip the scorer instead of handing it empty batches.
    if len(hypotheses) == 0:
        return []

    # The scorer takes parallel lists, so the single reference is repeated per hypothesis.
    references = [reference] * len(hypotheses)

    # Only precision is used; the recall and F1 results are discarded.
    precision, _, _ = score(hypotheses, references, model_type=model_type, verbose=False)
    return precision.tolist()

def get_bertP(res_list):
    """Add a BERTScore precision and a blended score to every record of `res_list`.

    For each record the reference answer is read from 'A' and the generated answer
    from 'HA'. Two keys are written back onto the record in place:
      - 'bertP':   precision from `calculate_bert_scores`, rounded to 4 decimals
      - 'hj_bert': 0.5 * bertP + 0.5 * float(record['human_judge'])

    Returns:
        The same list object that was passed in, with its records updated.
    """
    for record in res_list:
        reference_answer = record['A']
        generated_answer = record['HA']
        # A single hypothesis is scored, so the last element is the only score.
        precision = round(calculate_bert_scores(reference_answer, [generated_answer])[-1], 4)
        record['bertP'] = precision
        record['hj_bert'] = 0.5*precision + 0.5*float(record['human_judge'])
    return res_list

def basic_openai_chat_completion(api_key, openai_params, conv_prompt):
    """Send one chat-completion request and return the raw response plus its text.

    Args:
        api_key: API key used to construct the `OpenAI` client.
        openai_params: keyword arguments for `client.chat.completions.create`
            (model, temperature, ...). This dict is not modified.
        conv_prompt: list of chat message dicts sent as the `messages` argument.

    Returns:
        A tuple `(response, answer_msg)`: the response object and the message
        content of its first choice.
    """
    client = OpenAI(api_key=api_key)
    # Build a per-call copy. Updating the caller's dict in place would leave a stale
    # "messages" entry in the shared configuration after every call.
    request_kwargs = {**openai_params, "messages": conv_prompt}
    response = client.chat.completions.create(**request_kwargs)
    answer_msg = response.choices[0].message.content
    return response, answer_msg


In [None]:
# Root folder of the test data. Set the MEMORAID_TEST_DATA_ROOT environment variable to
# run on another machine; when it is unset the original location is used.
TEST_DATA_ROOT = os.environ.get("MEMORAID_TEST_DATA_ROOT", "/home/yuliang/ece1786/proj/Memoraid/Test_Data")
# database information (both are embedded into the system prompt)
pers_info_json = load_json(os.path.join(TEST_DATA_ROOT, "Personal_info.json"))
serv_info_json = load_json(os.path.join(TEST_DATA_ROOT, "Service_info.json"))

# load test dataset, one file per difficulty level
qa_easy = load_json(os.path.join(TEST_DATA_ROOT, "QA_easy.json"))
qa_mid = load_json(os.path.join(TEST_DATA_ROOT, "QA_mid.json"))
qa_hard = load_json(os.path.join(TEST_DATA_ROOT, "QA_hard.json"))


In [None]:
# System prompt for the baseline model: the full personal and service records are
# embedded as JSON, followed by the instruction to answer the user's question.
# NOTE(review): two typos in the prompt text were corrected ("You task", "Pseronal");
# results produced with the old wording are not strictly comparable.
sys_prompt = f"""
You are a helpful assistant. Your task is to effectively communicate with an Alzheimer's patient (user), answering user's questions with given context.

The following are provided context regarding the user:
Personal information regarding the user:
{json.dumps(pers_info_json)}
Service information provided by the care provider:
{json.dumps(serv_info_json)}

Answer the following question:
"""

sys_msg = format_msg_role("system", sys_prompt)

In [None]:
# Baseline evaluation: ask the model every test question, score each answer against
# its reference, then save one result file per dataset.
# Results go into a folder named after the current timestamp.
BASELINE_EVAL_PATH = f"/home/yuliang/ece1786/proj/eval_results/baseline/{get_time()}"
os.makedirs(BASELINE_EVAL_PATH, exist_ok=True)

test_data_list = {"qa_easy":qa_easy, "qa_mid":qa_mid, "qa_hard":qa_hard}

for curr_test_dataset_name, curr_test_dataset in test_data_list.items():
    for prob_obj in curr_test_dataset:
        # The 'QD' field is sent as the user turn after the shared system message.
        conversation = [sys_msg, format_msg_role("user", prob_obj['QD'])]
        agent_res_msg = basic_openai_chat_completion(api_key, openai_params, conversation)[1]
        prob_obj['HA'] = agent_res_msg
        # Precision of the generated answer against the reference answer 'A'.
        precision = calculate_bert_scores(prob_obj['A'], [agent_res_msg])[-1]
        prob_obj['bertP'] = round(precision, 4)
    # Report the dataset mean, then persist the records with 'HA' and 'bertP' added.
    average_bertP = np.mean([entry['bertP'] for entry in curr_test_dataset])
    print(f"finished: {curr_test_dataset_name} -> writing io | average_bertP: {average_bertP:.4f}")
    output_file = os.path.join(BASELINE_EVAL_PATH, f"{curr_test_dataset_name}_baseline.json")
    write_json(output_file, curr_test_dataset)
        

# write_json(qa_easy)

finished: qa_easy -> writing io
finished: qa_mid -> writing io
finished: qa_hard -> writing io


In [None]:
# Print the mean bertP of every dataset again from the in-memory results
# (despite the message text, nothing is written to disk here).
for curr_test_dataset_name, curr_test_dataset in test_data_list.items():
    bertP_values = [prob_obj['bertP'] for prob_obj in curr_test_dataset]
    print(f"finished: {curr_test_dataset_name} -> writing io | average_bertP: {np.mean(bertP_values):.4f}")


## eval (individual result analysis only)

In [None]:
# The following code is for individual testing: we take each generated answer set and
# calculate the bertP score for each answer, then combine it with that answer's
# human_judge rating into an equally weighted score (hj_bert).
# Folder holding the rated baseline results (read and written by the scoring loop below).
BASELINE_RATED_EVAL_PATH = "/home/yuliang/ece1786/proj/res/baseline_rated/"
# Folder holding the rated workflow results (read and written by the scoring loop below).
WKFL_RATED_EVAL_PATH = "/home/yuliang/ece1786/proj/res/wkfl_rated/"


In [None]:
# NOTE(review): this re-defines get_bertP from the utility cell near the top of the
# notebook with the same logic; consider keeping only one definition.
def get_bertP(res_list):
    """Score each record's generated answer and store the results on the record.

    Reads the reference answer from 'A', the generated answer from 'HA' and the rating
    from 'human_judge'. Writes 'bertP' (precision from `calculate_bert_scores`, rounded
    to 4 decimals) and 'hj_bert' (equal-weight mix of 'bertP' and the rating as a float).
    The records are updated in place and the same list is returned.
    """
    for entry in res_list:
        ref_text, hyp_text = entry['A'], entry['HA']
        scores = calculate_bert_scores(ref_text, [hyp_text])
        # Only one hypothesis was scored, so the last score is that hypothesis' score.
        bert_precision = round(scores[-1], 4)
        entry['bertP'] = bert_precision
        entry['hj_bert'] = 0.5*bert_precision + 0.5*float(entry['human_judge'])
    return res_list
def get_scores(data_list):
    """Average the 'human_judge' and 'bertP' fields over a list of result records.

    Args:
        data_list: list of dicts that each carry 'human_judge' and 'bertP'. Values are
            converted with `float()`, matching how `get_bertP` reads 'human_judge'.

    Returns:
        A dict with 'average_human_judge' and 'average_bertP'. Both are NaN when
        `data_list` is empty, instead of raising ZeroDivisionError.
    """
    count = len(data_list)
    if count == 0:
        # No records: there is no meaningful average to report.
        nan = float("nan")
        return {"average_human_judge": nan, "average_bertP": nan}

    total_human_judge = sum(float(d['human_judge']) for d in data_list)
    total_bertP = sum(float(d['bertP']) for d in data_list)

    return {
        "average_human_judge": total_human_judge / count,
        "average_bertP": total_bertP / count
    }

In [None]:
# Iterate over each difficulty level: score the rated workflow and baseline result
# files, save the scored records, and print the mean bertP, human_judge and hj_bert.
qa_levels = ['easy', 'mid', 'hard']

for level in qa_levels:
    wkfl_list = load_json(os.path.join(WKFL_RATED_EVAL_PATH, f"4OQA_{level}_updated.json"))
    baseline_list = load_json(os.path.join(BASELINE_RATED_EVAL_PATH, f"qa_{level}_baseline.json"))
    # get_bertP updates the records in place and returns the same list
    wkfl_list_ = get_bertP(wkfl_list)
    baseline_list_ = get_bertP(baseline_list)
    # FIX: the two outputs used to be swapped (workflow records were saved under the
    # baseline file name and vice versa). Each set is now saved next to its own input.
    write_json(os.path.join(BASELINE_RATED_EVAL_PATH, f"bert_qa_{level}_baseline.json"), baseline_list_)
    write_json(os.path.join(WKFL_RATED_EVAL_PATH, f"bert_4OQA_{level}_updated.json"), wkfl_list_)
    # display the scores
    print('-'*20)
    print(f"finished: {level}")
    for label, scored in (("baseline", baseline_list_), ("wkfl", wkfl_list_)):
        bertP_mean = np.mean([obj['bertP'] for obj in scored])
        human_judge_mean = np.mean([obj['human_judge'] for obj in scored])
        hj_bert_mean = np.mean([obj['hj_bert'] for obj in scored])
        print(f"{label}| {bertP_mean:.4f},{human_judge_mean:.4f},{hj_bert_mean:.4f}")


--------------------
finished: easy
baseline| 0.5873,1.0000,0.7936
wkfl| 0.4502,0.9143,0.6823
--------------------
finished: mid
baseline| 0.6184,1.0000,0.8092
wkfl| 0.4544,0.9714,0.7129
--------------------
finished: hard
baseline| 0.5405,1.0000,0.7702
wkfl| 0.4389,0.9429,0.6909


In [None]:
# Rough size estimate of a loaded JSON object.
def count_tokens(json_file):
    """Return how many whitespace-separated pieces the JSON dump of `json_file` contains.

    The object is serialized with `json.dumps` and split on whitespace, so the result
    is a word-like count of the serialized text, not a model tokenizer count.
    """
    return len(json.dumps(json_file).split())

# Reload both context files, then display the count for the personal-information file.
personal_info_path = os.path.join(TEST_DATA_ROOT, "Personal_info.json")
service_info_path = os.path.join(TEST_DATA_ROOT, "Service_info.json")
pers_info_json = load_json(personal_info_path)
serv_info_json = load_json(service_info_path)

count_tokens(pers_info_json)