In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import pandas as pd
import numpy as np
import re

In [None]:
def extra_raw_and_answer_and_confidence(raw_response, generated_tokens, transition_scores):
    """Parse a Yes/No verdict, its stated confidence, and the verdict token's probability.

    Args:
        raw_response: Decoded model output. It must contain ``Correctness: Yes`` or
            ``Correctness: No`` and ``Confidence: <integer>`` (optionally followed by ``%``).
        generated_tokens: Iterable of generated token ids; each one is decoded with the
            module-level ``tokenizer``.
        transition_scores: Per-token scores aligned with ``generated_tokens``. ``exp`` is
            applied to the selected score; the callers in this notebook pass the output of
            ``compute_transition_scores(..., normalize_logits=True)``.

    Returns:
        ``(raw_response, decision, external_confidence, internal_confidence)`` where
        ``decision`` is 1 for "Yes" and 0 for "No", ``external_confidence`` is the stated
        percentage divided by 100, and ``internal_confidence`` is ``exp(score)`` of the first
        generated token whose decoded text contains "Yes" or "No".

    Raises:
        ValueError: If the verdict or confidence cannot be parsed, the verdict is not
            "Yes"/"No", the confidence is outside [0, 1], or no generated token carries the
            verdict. Explicit raises are used instead of ``assert`` so the checks still run
            under ``python -O``.
    """
    # Regular expression patterns to extract the decision and percentage
    decision_match = re.search(r'Correctness:\s*(\w+)', raw_response)
    confidence_match = re.search(r'Confidence:\s*(\d+)%?', raw_response)
    if decision_match is None or confidence_match is None:
        raise ValueError("Cannot extract decision or external confidence")

    decision_text = decision_match.group(1)
    if decision_text not in ('Yes', 'No'):
        raise ValueError("The decision must be 'Yes' or 'No'")

    confidence_text = confidence_match.group(1)
    try:
        external_confidence = int(confidence_text) / 100.0
    except ValueError as exc:
        raise ValueError(f"The external confidence {confidence_text} is not a number") from exc
    if not 0 <= external_confidence <= 1:
        raise ValueError("External confidence must be between 0 and 1")
    decision = 1 if decision_text == 'Yes' else 0

    # NOTE(review): this is a substring test, so a token such as "Now" or "Note" that is
    # generated before the verdict would also match - confirm this is acceptable.
    internal_confidence = None
    for tok, score in zip(generated_tokens, transition_scores):
        token_text = tokenizer.decode(tok)
        if "Yes" in token_text or "No" in token_text:
            # .float() is a no-op for float32 scores and keeps .numpy() working if the
            # scores arrive in bfloat16 (NumPy has no bfloat16 dtype).
            internal_confidence = np.exp(score.float().cpu().numpy())
            break
    if internal_confidence is None:
        # Previously this path surfaced as an UnboundLocalError at the return statement.
        raise ValueError("Cannot extract internal confidence: no generated token contains 'Yes' or 'No'")

    return raw_response, decision, external_confidence, internal_confidence

In [None]:
def extra_raw_and_answer_and_confidence_A_B(raw_response, generated_tokens, transition_scores):
    """Parse an A/B answer, its stated probability, and the answer token's probability.

    Args:
        raw_response: Decoded model output. It must contain ``Answer: A`` or ``Answer: B``
            and ``Probability: <integer>`` (optionally followed by ``%``).
        generated_tokens: Iterable of generated token ids; each one is decoded with the
            module-level ``tokenizer``.
        transition_scores: Per-token scores aligned with ``generated_tokens``. ``exp`` is
            applied to the selected score; the callers in this notebook pass the output of
            ``compute_transition_scores(..., normalize_logits=True)``.

    Returns:
        ``(raw_response, decision, external_confidence, internal_confidence)`` where
        ``decision`` is 1 for "A" and 0 for "B", ``external_confidence`` is the stated
        percentage divided by 100, and ``internal_confidence`` is ``exp(score)`` of the first
        generated token whose decoded text, stripped of whitespace, is exactly "A" or "B".

    Raises:
        ValueError: If the answer or probability cannot be parsed, the answer is not
            "A"/"B", the probability is outside [0, 1], or no generated token carries the
            answer. Explicit raises are used instead of ``assert`` so the checks still run
            under ``python -O``.
    """
    # Regular expression patterns to extract the decision and percentage
    decision_match = re.search(r'Answer:\s*(\w)', raw_response)
    confidence_match = re.search(r'Probability:\s*(\d+)%?', raw_response)
    if decision_match is None or confidence_match is None:
        raise ValueError("Cannot extract decision or external confidence")

    decision_text = decision_match.group(1)
    if decision_text not in ('A', 'B'):
        raise ValueError("The decision must be 'A' or 'B'")

    confidence_text = confidence_match.group(1)
    try:
        external_confidence = int(confidence_text) / 100.0
    except ValueError as exc:
        raise ValueError(f"The external confidence {confidence_text} is not a number") from exc
    if not 0 <= external_confidence <= 1:
        raise ValueError("External confidence must be between 0 and 1")
    decision = 1 if decision_text == 'A' else 0

    internal_confidence = None
    for tok, score in zip(generated_tokens, transition_scores):
        token_text = tokenizer.decode(tok).strip()
        if token_text == 'A' or token_text == 'B':
            # .float() is a no-op for float32 scores and keeps .numpy() working if the
            # scores arrive in bfloat16 (NumPy has no bfloat16 dtype).
            internal_confidence = np.exp(score.float().cpu().numpy())
            break
    if internal_confidence is None:
        # Previously this path surfaced as an UnboundLocalError at the return statement.
        raise ValueError("Cannot extract internal confidence: no generated token is 'A' or 'B'")

    return raw_response, decision, external_confidence, internal_confidence

In [None]:
def process(filename, model_name, temperature):
    """Ask the model to grade each proposed answer in a CSV file as correct or not.

    One prompt is built per row from the ``question`` and ``proposed_answer`` columns, sent
    through the module-level ``tokenizer`` and ``model``, and parsed with
    ``extra_raw_and_answer_and_confidence``.

    Args:
        filename: Path of a CSV file with ``question`` and ``proposed_answer`` columns.
        model_name: Not used inside this function; kept so existing callers keep working.
        temperature: Sampling temperature. Sampling is enabled only when it is greater
            than 0; otherwise generation runs with ``do_sample=False``.

    Returns:
        Four lists with one entry per row: raw responses, decisions, external confidences
        and internal confidences. When a response cannot be parsed, the three numeric
        entries are ``np.nan`` and the raw entry is the response with its first line
        dropped (or the whole response if it has no newline).
    """
    df = pd.read_csv(filename)

    # The leading spaces on each template line are part of the prompt sent to the model.
    # NOTE(review): "Stuent" is a typo, left untouched because editing it changes the
    # model input and therefore the results.
    input_text = '''You are an experienced doctor who has rich clinical and medical knowledge. Now, given the following question and the proposed answer from one of your students.
    Question: [QUESTION]
    Stuent Proposed Answer: [ANSWER]
    Please decide if the student's answer is correct or not and rate the confidence of your decision as a percentage ranging from 0% to 100% (inclusive), where 0% is the least confident and 100% is the most confident. Then provide your reasoning of your decision.
    Return your response as follows:
    Correctness: Yes/No
    Confidence: [PERCENTAGE]
    Explanation: [EXPLANATION]
    
    Your Response:'''

    # str() keeps this working when pandas infers a numeric dtype for a column
    # (str.replace rejects non-string replacement values).
    actual_input_text_list = [
        input_text.replace('[QUESTION]', str(question)).replace('[ANSWER]', str(answer))
        for question, answer in zip(df['question'].tolist(), df['proposed_answer'].tolist())
    ]

    # Only forward `temperature` when sampling: with do_sample=False it has no effect,
    # and a value of 0 is not a valid sampling temperature.
    do_sample = temperature > 0
    generation_kwargs = {
        'max_new_tokens': 64,
        'do_sample': do_sample,
        'return_dict_in_generate': True,
        'output_scores': True,
    }
    if do_sample:
        generation_kwargs['temperature'] = temperature

    raw_response_list = []
    decision_list = []
    external_confidence_list = []
    internal_confidence_list = []
    fails_count = 0
    for i, actual_input_text in enumerate(actual_input_text_list):
        print(f"Processing prompt {i + 1}/{len(actual_input_text_list)}")
        # Follow the model's own device instead of assuming "cuda"; the model is loaded
        # with device_map="auto".
        input_ids = tokenizer(actual_input_text, return_tensors="pt").to(model.device)
        outputs = model.generate(**input_ids, **generation_kwargs)

        transition_scores = model.compute_transition_scores(
            outputs.sequences, outputs.scores, normalize_logits=True
        )

        # Keep only the newly generated part of each sequence (drop the prompt tokens).
        input_length = input_ids.input_ids.shape[1]
        generated_tokens = outputs.sequences[:, input_length:]

        response = tokenizer.decode(generated_tokens[0], skip_special_tokens=True)
        try:
            raw_response, decision, external_confidence, internal_confidence = extra_raw_and_answer_and_confidence(response, generated_tokens[0], transition_scores[0])
        except Exception as e:
            # Best effort: record the failure and continue with the next prompt.
            print(e)
            fails_count += 1
            print(f"Cannot extract the decision or the confidence. Current fails: {fails_count}/{i + 1}")
            raw_response, decision, external_confidence, internal_confidence = response.split("\n", 1)[-1], np.nan, np.nan, np.nan
        raw_response_list.append(raw_response)
        decision_list.append(decision)
        external_confidence_list.append(external_confidence)
        internal_confidence_list.append(internal_confidence)
        print(f"The decision is {decision}")
        print(f"The external confidence is {external_confidence}")
        print(f"The internal confidence is {internal_confidence}")
    return raw_response_list, decision_list, external_confidence_list, internal_confidence_list

In [None]:
def process_A_B(filename, model_name, prompt_id, target_ratio, temperature):
    """Ask the model to pick option A or B for each question in a CSV file.

    One prompt is built per row from the ``question`` column and a pair of option columns,
    sent through the module-level ``tokenizer`` and ``model``, and parsed with
    ``extra_raw_and_answer_and_confidence_A_B``.

    Args:
        filename: Path of a CSV file with a ``question`` column and the option columns.
        model_name: Not used inside this function; kept so existing callers keep working.
        prompt_id: 2 selects the template that also asks for an explanation, 3 selects the
            shorter template without one.
        target_ratio: ``None`` reads ``choice_A_str`` / ``choice_B_str``; otherwise the
            columns ``choice_A_str (<ratio*100>%)`` / ``choice_B_str (<ratio*100>%)`` are
            read, with the percentage formatted without decimals.
        temperature: Sampling temperature. Sampling is enabled only when it is greater
            than 0; otherwise generation runs with ``do_sample=False``.

    Returns:
        Four lists with one entry per row: raw responses, decisions, external confidences
        and internal confidences. When a response cannot be parsed, the three numeric
        entries are ``np.nan`` and the raw entry is the response with its first line
        dropped (or the whole response if it has no newline).

    Raises:
        ValueError: If ``prompt_id`` is neither 2 nor 3.
    """
    df = pd.read_csv(filename)
    if target_ratio is None:
        choice_A_column, choice_B_column = 'choice_A_str', 'choice_B_str'
    else:
        choice_A_column = f'choice_A_str ({target_ratio*100:.0f}%)'
        choice_B_column = f'choice_B_str ({target_ratio*100:.0f}%)'

    # The leading spaces on each template line are part of the prompt sent to the model.
    if prompt_id == 2:
        input_text = '''You are an experienced doctor who has rich clinical and medical knowledge. Now, given the following question and the corresponding options:
        Question: [QUESTION]
        Options:  A.[Option A]; B.[Option B]
        Please choose the correct option and rate the probability of the option being correct as a percentage ranging from 0% to 100% (inclusive), where 0% is the least probability and 100% is the highest probability for the option to be correct. Then provide your reasoning of your decision.
        Return your response as follows:
        Answer: A/B
        Probability: [PERCENTAGE]
        Explanation: [EXPLANATION]
        
        Your Response:'''

    elif prompt_id == 3:
        input_text = '''You are an experienced doctor who has rich clinical and medical knowledge. Please choose the correct option and determine the probability of the selection for the following question:
        Question: [QUESTION]
        Options:  A.[Option A]; B.[Option B]
        The probability should be listed in percentage format ranging from 0% to 100%. 
        Please return your response as follows:
        Answer: A/B
        Probability: [PERCENTAGE]
        
        Your Response:'''

    else:
        raise ValueError("The prompt_id must be either 2 or 3")

    # str() keeps this working when pandas infers a numeric dtype for a column
    # (str.replace rejects non-string replacement values).
    actual_input_text_list = [
        input_text.replace('[QUESTION]', str(question)).replace('[Option A]', str(choice_A_str)).replace('[Option B]', str(choice_B_str))
        for question, choice_A_str, choice_B_str in zip(
            df['question'].tolist(), df[choice_A_column].tolist(), df[choice_B_column].tolist()
        )
    ]

    # Only forward `temperature` when sampling: with do_sample=False it has no effect,
    # and a value of 0 is not a valid sampling temperature.
    do_sample = temperature > 0
    generation_kwargs = {
        'max_new_tokens': 64,
        'do_sample': do_sample,
        'return_dict_in_generate': True,
        'output_scores': True,
    }
    if do_sample:
        generation_kwargs['temperature'] = temperature

    raw_response_list = []
    decision_list = []
    external_confidence_list = []
    internal_confidence_list = []
    fails_count = 0
    for i, actual_input_text in enumerate(actual_input_text_list):
        print(f"Processing prompt {i + 1}/{len(actual_input_text_list)}")
        # Follow the model's own device instead of assuming "cuda"; the model is loaded
        # with device_map="auto".
        input_ids = tokenizer(actual_input_text, return_tensors="pt").to(model.device)
        outputs = model.generate(**input_ids, **generation_kwargs)

        transition_scores = model.compute_transition_scores(
            outputs.sequences, outputs.scores, normalize_logits=True
        )

        # Keep only the newly generated part of each sequence (drop the prompt tokens).
        input_length = input_ids.input_ids.shape[1]
        generated_tokens = outputs.sequences[:, input_length:]

        response = tokenizer.decode(generated_tokens[0], skip_special_tokens=True)
        try:
            raw_response, decision, external_confidence, internal_confidence = extra_raw_and_answer_and_confidence_A_B(response, generated_tokens[0], transition_scores[0])
        except Exception as e:
            # Best effort: record the failure and continue with the next prompt.
            print(e)
            fails_count += 1
            print(f"Cannot extract the decision or the confidence. Current fails: {fails_count}/{i + 1}")
            raw_response, decision, external_confidence, internal_confidence = response.split("\n", 1)[-1], np.nan, np.nan, np.nan
        raw_response_list.append(raw_response)
        decision_list.append(decision)
        external_confidence_list.append(external_confidence)
        internal_confidence_list.append(internal_confidence)
        print(f"The decision is {decision}")
        print(f"The external confidence is {external_confidence}")
        print(f"The internal confidence is {internal_confidence}")
    return raw_response_list, decision_list, external_confidence_list, internal_confidence_list

In [None]:
# Placeholder: replace with the model identifier or path to load before running this cell.
model_name = "YOUR MODEL NAME"

# `tokenizer` and `model` are module-level globals: the extraction and processing
# functions in this notebook reference them directly rather than taking them as arguments.
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=torch.bfloat16
)

In [None]:
def run(model_name, filename_list, prompt_id_list, target_ratio_list, temperature_list):
    """Run every parameter combination and write the results back into each CSV file.

    For each (filename, prompt_id, target_ratio, temperature) combination the matching
    processing function is called, then the file is re-read, four result columns are
    added (or overwritten) and the file is saved in place.

    Column names have the form
    ``<field><prompt tag><ratio tag> (temp <temperature>) (<model_name>)`` where
    ``<field>`` is one of "Raw Response", "Decision", "External Confidence",
    "Internal Confidence"; the prompt tag is empty for prompt 1, " (AB)" for prompt 2 and
    " (AB2)" for prompt 3; the ratio tag is empty when ``target_ratio`` is ``None`` and
    " (<ratio*100>%)" otherwise.

    Note: the prompt-1 / no-ratio "Raw Response" column used to be written with a
    duplicated "(temp ...)" segment; it now follows the same pattern as every other column.

    Args:
        model_name: Label embedded in the result column names; also forwarded to the
            processing functions.
        filename_list: CSV paths to process and update in place.
        prompt_id_list: Prompt ids to run; 1 uses ``process``, 2 and 3 use ``process_A_B``.
        target_ratio_list: Ratios (or ``None``) forwarded to ``process_A_B`` and used in
            the column names.
        temperature_list: Temperatures forwarded to the processing functions.

    Raises:
        ValueError: If a prompt id other than 1, 2 or 3 is encountered.
    """
    prompt_tags = {1: '', 2: ' (AB)', 3: ' (AB2)'}
    result_fields = ('Raw Response', 'Decision', 'External Confidence', 'Internal Confidence')

    for filename in filename_list:
        for prompt_id in prompt_id_list:
            for target_ratio in target_ratio_list:
                for temperature in temperature_list:
                    print(f"Current parameters: model_name = {model_name}, filename = {filename}, prompt_id = {prompt_id}, target_ratio = {target_ratio}, temperature = {temperature}")
                    if prompt_id not in prompt_tags:
                        raise ValueError("The prompt_id must be 1 or 2 or 3")

                    if prompt_id == 1:
                        results = process(filename, model_name, temperature)
                    else:
                        results = process_A_B(filename, model_name, prompt_id, target_ratio, temperature)

                    ratio_tag = '' if target_ratio is None else f' ({target_ratio*100:.0f}%)'
                    column_suffix = f'{prompt_tags[prompt_id]}{ratio_tag} (temp {temperature}) ({model_name})'

                    # Re-read the file so columns written by earlier combinations are kept.
                    df = pd.read_csv(filename)
                    for field, values in zip(result_fields, results):
                        df[f'{field}{column_suffix}'] = values
                    df.to_csv(filename, index = False)
                    print(f"Processed file saved to {filename}")

In [None]:
def start_task(task):
    """Launch one of the three predefined experiment configurations via ``run``.

    ``task`` is compared against 1, 2 and 3; any other value does nothing. The
    module-level ``model_name`` is passed to ``run`` as the model label.
    """
    # Each preset: (task id, filename_list, prompt_id_list, target_ratio_list, temperature_list)
    presets = (
        # Task 1 (Main conclusion)
        # Full file list used for this task:
        # ["clinical_knowledge.csv", "college_medicine.csv", "medQA_en.csv", "medQA_zh.csv", "SDoH.csv"]
        (1, ["SDoH.csv"], [2], [None], [0.3, 0.7]),
        # Task 2 (Imbalanced dataset)
        (2, ["medQA_en.csv"], [2], [0.05, 0.1, 0.3, 0.5, 0.7, 0.9], [0]),
        # Task 3 (Sensitivity analysis)
        (3, ["medQA_en.csv"], [1, 3], [None], [0]),
    )
    for preset_id, files, prompt_ids, ratios, temperatures in presets:
        if task == preset_id:
            run(model_name, files, prompt_ids, ratios, temperatures)

## Customized Task

In [None]:
# Custom run: prompt 2 (A/B choice with explanation) on four additional CSV files,
# reading the unsuffixed choice_A_str / choice_B_str columns (target_ratio None) and
# generating without sampling (temperature 0). Results are written back into each file.
filename_list = ["anatomy.csv", "college_biology.csv", "medical_genetics.csv", "professional_medicine.csv"]
prompt_id_list = [2]
target_ratio_list = [None]
temperature_list = [0]
run(model_name, filename_list, prompt_id_list, target_ratio_list, temperature_list)