In [1]:
from huggingface_hub import login

login(token="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")

In [2]:
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import torch

base_model = "meta-llama/Llama-3.1-8B-Instruct"

In [3]:
tokenizer = AutoTokenizer.from_pretrained(base_model)
model = AutoModelForCausalLM.from_pretrained(
    base_model,
    torch_dtype=torch.float16,
    device_map="auto"
)


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [4]:
import pandas as pd

test_df = pd.read_csv("./Final_Prompt-Style_Test_Set_V2.csv")
print("Columns in test dataset:", test_df.columns.tolist())

Columns in test dataset: ['prompt', 'diagnosis']


In [5]:
test_df["doctor_prompt"] = test_df["prompt"].apply(
    lambda x: f"Patient: {x}\nDoctor: Based on the above, what is the most likely diagnosis and what advice do you recommend?\n Answer start with Diagnosis: XXX."
)

In [6]:
def generate_output(prompt, max_new_tokens=200):
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            eos_token_id=tokenizer.eos_token_id
        )
    output_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    return output_text

In [7]:
def extract_answer(output_text):
    if "Doctor:" in output_text:
        answer = output_text.split("Doctor:")[-1].strip()
    elif "Output:" in output_text:
        answer = output_text.split("Output:")[-1].strip()
    else:
        answer = output_text.strip()
    return answer

output_text = "Instruction: ...\nDoctor: Diagnosis: Hypertension\nAdvice: Please consult a physician."
print(extract_answer(output_text))

Diagnosis: Hypertension
Advice: Please consult a physician.


In [8]:
test_df["generated_output"] = test_df["prompt"].apply(generate_output)

Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for open-end generation.
Setting `pad_token_id` to `eos_token_id`:128009 for

In [9]:
test_df["extracted_diagnosis"] = test_df["generated_output"].apply(extract_answer)

In [10]:
test_df["extracted_diagnosis"] = test_df["extracted_diagnosis"].str.lower()
test_df["diagnosis_lower"] = test_df["diagnosis"].str.lower()

In [11]:
# !pip install sacrebleu

In [12]:
test_df.to_csv("./base_predictions_V2.csv", index=False, encoding="utf-8")


In [13]:
# !pip install rouge_score

In [25]:
import pandas as pd
import numpy as np
from rouge_score import rouge_scorer
from nltk.translate.meteor_score import meteor_score
from fuzzywuzzy import fuzz
import re

def compute_rouge(refs, preds):
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
    rouge1, rougel = [], []
    for ref, pred in zip(refs, preds):
        scores = scorer.score(ref, pred)
        rouge1.append(scores['rouge1'].fmeasure)
        rougel.append(scores['rougeL'].fmeasure)
    return np.mean(rouge1), np.mean(rougel)

def compute_avg_meteor(refs, preds, verbose=True):
    scores = []
    for i, (ref, pred) in enumerate(zip(refs, preds)):
        score = meteor_score([ref.split()], pred.split())
        scores.append(score)        
        # if verbose:
        #     print(f"[{i}]")
        #     print(f"GT   : {ref}")
        #     print(f"PRED : {pred}")
        #     print(f"METEOR : {score:.4f}")
        #     print("-" * 40)
    return np.mean(scores)

def compute_fuzzy_accuracy(refs, preds, threshold=80):
    matches = [fuzz.partial_ratio(ref.lower(), pred.lower()) >= threshold for ref, pred in zip(refs, preds)]
    return sum(matches) / len(matches)


df = pd.read_csv("./base_predictions_V2.csv")
# gt_list = df["ground_truth"].astype(str).tolist()
# df["extracted_diagnosis"] = df["extracted_diagnosis"].apply(
#     lambda x: np.nan if isinstance(x, str) and x.count('\n') >= 60 else x
# )

# df["extracted_diagnosis"] = df["extracted_diagnosis"].apply(
#     lambda x: np.nan if isinstance(x, str) and len(x.split()) > 5 else x
# )

gt_list = df["diagnosis_lower"].astype(str).str.strip().str.lower().tolist()


def clean_prediction(pred):
    match = re.search(r"Diagnosis:\s*(.+)", pred, re.IGNORECASE)
    if match:
        diagnosis_text = match.group(1).strip()
        diagnosis_first_line = re.split(r"\s*\n", diagnosis_text)[0].strip()
        return diagnosis_first_line.lower()
    else:
        return pred.strip().lower()

pred_list = df["extracted_diagnosis"].astype(str).apply(clean_prediction).tolist()

def extract_diagnosis(text):
    match = re.search(r'diagnosis is\s+(?:a\s+)?(.*?)(?:[.,]|$)', text, re.IGNORECASE)
    if match:
        return match.group(1).strip()
    return ""

pred_list = [extract_diagnosis(text) or "" for text in pred_list]

r1, rl = compute_rouge(gt_list, pred_list)
meteor = compute_avg_meteor(gt_list, pred_list)
fuzzy_acc = compute_fuzzy_accuracy(gt_list, pred_list)

print(f"ROUGE-1 F1: {r1:.2f}")
print(f"ROUGE-L F1: {rl:.2f}")
print(f"METEOR Score: {meteor:.2f}")
print(f"Fuzzy Matching Accuracy: {fuzzy_acc * 100:.2f}%")


ROUGE-1 F1: 0.19
ROUGE-L F1: 0.19
METEOR Score: 0.12
Fuzzy Matching Accuracy: 29.88%


In [26]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def compute_avg_bleu(refs, preds, verbose=True):
    smoothie = SmoothingFunction().method4
    bleu_scores = []

    for i, (ref, pred) in enumerate(zip(refs, preds)):
        ref_tokens = ref.split()
        pred_tokens = pred.split()
        bleu = sentence_bleu([ref_tokens], pred_tokens, weights=(1, 0, 0, 0), smoothing_function=smoothie)
        bleu_scores.append(bleu)
        
        # if verbose:
        #     print(f"[{i}]")
        #     print(f"GT   : {ref}")
        #     print(f"PRED : {pred}")
        #     print(f"BLEU : {bleu:.4f}")
        #     print("-" * 40)

    return np.mean(bleu_scores)

bleu = compute_avg_bleu(gt_list, pred_list)
print(f"BLEU Score: {bleu:.2f}")


BLEU Score: 0.17
