<a href="https://colab.research.google.com/github/Ayomidejoe/NaijaMedQA_SLL_Eval/blob/main/NaijaMedQASLLeval.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install dependencies for Gemini and Hugging Face quantization
!pip install -q google-generativeai nest_asyncio
!pip install -q transformers accelerate bitsandbytes
!pip install -q sentence-transformers evaluate rouge_score
!pip install -q evaluate # Ensure evaluate is installed
!pip install -U bitsandbytes

In [None]:
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import gc
import asyncio
import nest_asyncio
import google.generativeai as genai
from google.api_core import exceptions
from sentence_transformers import SentenceTransformer, util
import evaluate
import numpy as np
import json
import time
import os
import google.auth
import gspread
import gspread_dataframe
from google.auth import default as get_default_credentials

In [None]:
import google.auth
from google.colab import auth

# 1. Authenticate the Colab user so the notebook can act on their behalf.
auth.authenticate_user()

# 2. Get the default credentials made available by the authentication step.
creds, _ = google.auth.default()

# 3. Authorize gspread with those credentials.
gs = gspread.authorize(creds)

# 4. Open the worksheet that holds the question / doctor-answer pairs.
spreadsheet_url = "https://docs.google.com/spreadsheets/d/13AsHGT68HfClFbPXLJNjrXSTtq7UggnqkXKgV9YC78w"
wks = gs.open_by_url(spreadsheet_url).worksheet('Sheet1')

# 5. Read the whole worksheet into a DataFrame.
df_gsheet = gspread_dataframe.get_as_dataframe(wks)

# 6. Keep only the two columns used downstream and drop rows where either one
#    is missing BEFORE taking the first five, so that blank spreadsheet rows can
#    never become test cases (a missing question or reference answer would
#    break generation and the similarity metrics later on).
required_columns = ['Question', 'Doctor_answer']
df_test_cases = (
    df_gsheet[required_columns]
    .dropna(subset=required_columns)
    .head(5)
    .reset_index(drop=True)
)

# Fail early with a clear message instead of running the models on nothing.
if df_test_cases.empty:
    raise ValueError(
        "No complete rows found in the sheet: every row is missing "
        "'Question' or 'Doctor_answer'."
    )

print("Successfully loaded data:")
print(df_test_cases)

In [None]:
# Apply Async Patch (if not already applied)
nest_asyncio.apply()

# Setup Google Gemini (The Judge)
# The API key is never written into the notebook. It is resolved in this order:
#   1. an existing GOOGLE_API_KEY environment variable (left untouched),
#   2. a Colab secret named GOOGLE_API_KEY (Secrets panel in the sidebar).
if not os.environ.get("GOOGLE_API_KEY"):
    try:
        from google.colab import userdata  # only importable inside Colab
        os.environ["GOOGLE_API_KEY"] = userdata.get("GOOGLE_API_KEY")
    except Exception as err:
        raise RuntimeError(
            "GOOGLE_API_KEY is not set. Add it as a Colab secret or export it "
            "as an environment variable before running this cell."
        ) from err
genai.configure(api_key=os.environ["GOOGLE_API_KEY"], transport="rest")

# Hugging Face model ids of the small language models under evaluation.
model_ids = [
    "Qwen/Qwen2.5-1.5B-Instruct",
    "microsoft/phi-3.5-mini-instruct",
    "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
    "stabilityai/stablelm-zephyr-3b"

]

# 4-bit NF4 quantization with float16 compute, shared by every model load.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4"
)

# Helper function to clear GPU memory
def cleanup():
    """Drop the global ``model`` / ``tokenizer`` and release cached GPU memory.

    Removes the two names from this module's global namespace when they are
    present, forces a garbage-collection pass and, when CUDA is available,
    empties the CUDA cache and collects CUDA IPC memory. Calling it when
    nothing is loaded is safe. Prints a confirmation line and returns None.
    """
    namespace = globals()
    for global_name in ("model", "tokenizer"):
        # pop() with a default is a no-op when the name was never defined.
        namespace.pop(global_name, None)

    gc.collect()

    cuda_present = torch.cuda.is_available()
    if cuda_present:
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()

    print("GPU Memory Cleared.")

In [None]:
# --- Data Structure ---
class LLMTestCase:
    """One evaluation item: a question, its reference answer and the results.

    Attributes are filled in progressively: ``actual_output`` by the model
    under test, ``scores`` by the overlap/similarity metrics, and ``score`` /
    ``reasoning`` by the LLM judge.
    """

    def __init__(self, input_query: str, reference_answer: str):
        """Store the question and reference; all result fields start empty.

        Args:
            input_query: The question posed to the model.
            reference_answer: The gold-standard answer to compare against.
        """
        self.input = input_query
        self.reference = reference_answer  # The "Gold Standard"
        self.actual_output = None
        # Judge score. Initialised here so that reading it before the judge has
        # run yields None instead of raising AttributeError.
        self.score = None
        self.scores = {}  # Dictionary to store multiple metrics (fresh per instance)
        self.reasoning = None

# --- The "Evaluatee" (Lightweight Model Wrapper) ---
class MedicalLLMClient:
    """Thin wrapper that asks a causal chat model a medical question."""

    def __init__(self, model, tokenizer):
        """Keep references to an already-loaded model and its tokenizer."""
        self.model = model
        self.tokenizer = tokenizer

    def generate_answer(self, test_case: LLMTestCase):
        """Generate the model's answer to ``test_case.input``.

        The question is wrapped in a system + user chat, rendered with the
        tokenizer's chat template, and up to 150 new tokens are generated.

        Args:
            test_case: The test case whose ``input`` holds the question.

        Returns:
            The generated completion as a stripped string, without the prompt.
        """
        messages = [
            {"role": "system", "content": "You are a specialized medical assistant. Answer the user's question. Be highly concise and professional."},
            {"role": "user", "content": test_case.input}
        ]

        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)
        # Number of prompt tokens; generate() returns prompt + completion for
        # causal models, so everything past this index is the new answer.
        prompt_length = inputs["input_ids"].shape[-1]

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=150,
                # temperature is only honoured when sampling is enabled; set it
                # explicitly so all models decode the same way instead of each
                # falling back to its own generation defaults.
                do_sample=True,
                temperature=0.3  # Keep it factual
            )

        # Decode only the newly generated tokens. Splitting the full decoded
        # text on the word "assistant" is unreliable: the system prompt itself
        # contains that word, and an answer that mentions it would be cut off.
        generated_tokens = outputs[0][prompt_length:]
        return self.tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()

# --- The "Judge" (Gemini Wrapper) ---
class GeminiRelevancyMetric:
    """LLM-as-judge metric that asks a Gemini model to score an answer."""

    def __init__(self, model_name: str = 'gemini-2.5-flash'):
        """Create the judge.

        Args:
            model_name: Gemini model used as the judge.
        """
        self.judge_model = genai.GenerativeModel(
            model_name=model_name
        )

    def evaluate_with_retry(self, prompt, retries=3, delay=2):
        """Send ``prompt`` to the judge, retrying with exponential backoff.

        Args:
            prompt: Full evaluation prompt.
            retries: Total number of attempts (must be at least 1).
            delay: Seconds to wait after the first failure; doubled after
                every subsequent failure.

        Returns:
            The raw response text (requested as JSON).

        Raises:
            ValueError: If ``retries`` is smaller than 1.
            Exception: The error of the final attempt, re-raised unchanged.
        """
        if retries < 1:
            raise ValueError("retries must be at least 1")

        for attempt in range(retries):
            try:
                response = self.judge_model.generate_content(
                    prompt,
                    generation_config={"response_mime_type": "application/json"}
                )
                return response.text
            # Every failure is treated as retryable (best effort); listing the
            # specific API exception classes next to Exception was redundant.
            except Exception as e:
                if attempt == retries - 1:
                    raise  # bare raise keeps the original traceback
                print(f"   Connection glitch, retrying in {delay}s... ({str(e)[:50]})")
                time.sleep(delay)
                delay *= 2

    async def evaluate(self, test_case: LLMTestCase):
        """Score ``test_case.actual_output`` and store the verdict on the case.

        Sets ``test_case.score`` and ``test_case.reasoning``. Any failure
        (API error, malformed JSON, non-numeric score) is recorded as score 0
        with a reasoning string that starts with "Evaluation Failed:".

        Returns:
            The score stored on the test case.
        """
        evaluation_prompt = f"""
        You are an expert medical evaluator.
        rate the answer given and give score with the highest being 1.

        Question: {test_case.input}
        Actual Output: {test_case.actual_output}

        Return JSON: {{\"score\": float, \"reason\": string}}
        """

        try:
            # Inside a coroutine the running loop is the correct one to use;
            # the blocking API call is pushed to the default thread executor.
            loop = asyncio.get_running_loop()
            raw_text = await loop.run_in_executor(None, self.evaluate_with_retry, evaluation_prompt)

            # Strip Markdown code fences in case the model wraps its JSON.
            clean_json = raw_text.replace('```json', '').replace('```', '').strip()
            result = json.loads(clean_json)

            # Coerce to float so that a score returned as a string ("0.8")
            # does not break numeric aggregation downstream.
            test_case.score = float(result.get("score", 0))
            test_case.reasoning = str(result.get("reason") or "No reason provided")
        except Exception as e:
            test_case.score = 0
            test_case.reasoning = f"Evaluation Failed: {str(e)}"

        return test_case.score

# --- Advanced Evaluator ---
class AdvancedEvaluator:
    """Reference-based metrics: SBERT cosine similarity and ROUGE overlap."""

    def __init__(self):
        """Load the sentence-embedding model and the ROUGE metric."""
        self.sbert_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.rouge = evaluate.load('rouge')

    def calculate_metrics(self, test_case: LLMTestCase):
        """Compare ``actual_output`` with ``reference`` and store the scores.

        Writes a dict to ``test_case.scores`` with the SBERT cosine similarity
        and the ROUGE-1 / ROUGE-2 / ROUGE-L values, each rounded to 4 places.

        The ``evaluate`` ROUGE metric reports F-measures, so the keys
        ``ROUGE1_F`` / ``ROUGE2_F`` / ``ROUGEL_F`` are the accurate names.
        The older keys ``Recall`` (= ROUGE-1), ``Precision`` (= ROUGE-2) and
        ``F1_Score`` (= ROUGE-L) are kept with the same values only so that
        existing readers of this dict keep working; they are NOT true
        recall / precision figures.
        """
        # A missing text is scored as an empty string rather than crashing.
        ref = "" if test_case.reference is None else test_case.reference
        hyp = "" if test_case.actual_output is None else test_case.actual_output

        emb1 = self.sbert_model.encode(ref, convert_to_tensor=True)
        emb2 = self.sbert_model.encode(hyp, convert_to_tensor=True)
        sbert_score = util.cos_sim(emb1, emb2).item()

        # use_aggregator=False returns one value per prediction; there is
        # exactly one prediction here, hence the [0] below.
        rouge_results = self.rouge.compute(
            predictions=[hyp],
            references=[ref],
            use_aggregator=False
        )
        rouge1_f = round(rouge_results['rouge1'][0], 4)
        rouge2_f = round(rouge_results['rouge2'][0], 4)
        rougeL_f = round(rouge_results['rougeL'][0], 4)

        test_case.scores = {
            "SBERT_Similarity": round(sbert_score, 4),
            # Accurately named ROUGE F-measures.
            "ROUGE1_F": rouge1_f,
            "ROUGE2_F": rouge2_f,
            "ROUGEL_F": rougeL_f,
            # Legacy keys (misnomers, see docstring), kept for compatibility.
            "F1_Score": rougeL_f,
            "Recall": rouge1_f,
            "Precision": rouge2_f
        }

# 1. Define Medical Test Cases from Google Sheet
def create_medical_test_cases(df):
    """Build one ``LLMTestCase`` per row of ``df``.

    Args:
        df: DataFrame with a 'Question' column (model input) and a
            'Doctor_answer' column (reference answer).

    Returns:
        A list of fresh ``LLMTestCase`` objects in row order.
    """
    return [
        LLMTestCase(
            input_query=record['Question'],
            reference_answer=record['Doctor_answer']
        )
        for _, record in df.iterrows()
    ]

In [None]:
# --- Generate and Evaluate ---
async def run_full_pipeline(model, tokenizer, test_cases, delay_seconds=6):
    """Generate, score and judge every test case for one model.

    For each case the model's answer is generated, the SBERT / overlap metrics
    are computed against the reference, and the Gemini judge is asked for a
    score. The cases are mutated in place with those results.

    Args:
        model: Loaded causal language model under evaluation.
        tokenizer: Tokenizer that belongs to ``model``.
        test_cases: List of ``LLMTestCase`` objects to process.
        delay_seconds: Pause between judge calls to stay under the API quota.
            Defaults to 6; a value of 0 or less disables the pause.

    Returns:
        A list with one dict per test case holding the question, the model's
        response, the judge score and reason, and the SBERT / F1 / Recall
        entries read from ``case.scores``.
    """
    total = len(test_cases)
    print(f"--- Processing {total} Medical Queries ---")

    med_client = MedicalLLMClient(model, tokenizer)
    gemini_judge = GeminiRelevancyMetric()
    adv_evaluator = AdvancedEvaluator()

    for i, case in enumerate(test_cases):
        print(f"\n[{i+1}/{total}] Question: {case.input}")
        case.actual_output = med_client.generate_answer(case)
        print(f"   Response: {case.actual_output[:100]}...")

        print("   Calculating SBERT & Overlap scores...")
        adv_evaluator.calculate_metrics(case)

        print("   Requesting Gemini Judge Evaluation...")
        await gemini_judge.evaluate(case)
        # str() guards the slice in case the judge returned a non-string reason.
        print(f"   Score: {case.score} | Reason: {str(case.reasoning)[:60]}...")

        # No need to wait after the final case.
        if delay_seconds > 0 and i < total - 1:
            print(f"   Sleeping {delay_seconds}s to avoid API Quota (429) errors...")
            await asyncio.sleep(delay_seconds)

    print("\nAll evaluations complete for this model.")

    # The model's response is included so each judge score can be audited
    # against the text it was given.
    return [
        {
            "Question": c.input,
            "Response": c.actual_output,
            "Judge Score": c.score,
            "SBERT": c.scores.get("SBERT_Similarity"),
            "F1": c.scores.get("F1_Score"),
            "Recall": c.scores.get("Recall"),
            "Judge Reason": c.reasoning
        }
        for c in test_cases
    ]


# --- The Comparison Loop ---
all_model_results_list = []

for m_id in model_ids:
    print(f"\nNOW LOADING: {m_id}")

    # Use float16 or 4-bit quantization to save RAM
    tokenizer = AutoTokenizer.from_pretrained(m_id)
    model = AutoModelForCausalLM.from_pretrained(
        m_id,
        quantization_config=bnb_config,
        device_map="auto",
        low_cpu_mem_usage=True  # Important to prevent RAM spikes
    )

    dataset = create_medical_test_cases(df_test_cases)
    model_evaluation_results = await run_full_pipeline(model, tokenizer, dataset)

    for res in model_evaluation_results:
        res["Model_ID"] = m_id
        all_model_results_list.append(res)

    # --- AGGRESSIVE CLEANUP ---
    del model
    del tokenizer
    import gc
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

# 4. Display Results in a Table
pd.set_option('display.max_colwidth', 150)
df_display = pd.DataFrame(all_model_results_list)
display(df_display)

# Make the combined results available for plotting
global results_df
results_df = df_display # This will be used by the plotting cell

In [None]:
# Average every metric per model, then rank models by mean judge score
# (best first).
df_model_avg_scores = (
    results_df
    .groupby('Model_ID')[['Judge Score', 'SBERT', 'F1', 'Recall']]
    .mean()
    .reset_index()
    .sort_values(by='Judge Score', ascending=False)
)
print("Average Scores and Model Ranks:")
display(df_model_avg_scores)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# 1. Bar chart for Average Judge Score (one bar per model)
fig_judge, ax_judge = plt.subplots(figsize=(10, 6))
sns.barplot(
    data=df_model_avg_scores,
    x='Model_ID',
    y='Judge Score',
    hue='Model_ID',
    palette='viridis',
    legend=False,
    ax=ax_judge,
)
ax_judge.set_title('Average Judge Score per Model')
ax_judge.set_xlabel('Model ID')
ax_judge.set_ylabel('Average Judge Score')
# Long model ids overlap unless the tick labels are rotated.
plt.setp(ax_judge.get_xticklabels(), rotation=45, ha='right')
fig_judge.tight_layout()
plt.show()

# 2. Grouped bar chart for SBERT, F1, and Recall
# Reshape to long form (one row per model/metric pair) so that seaborn can
# group the bars by metric.
df_melted_scores = df_model_avg_scores.melt(
    id_vars=['Model_ID'],
    value_vars=['SBERT', 'F1', 'Recall'],
    var_name='Metric',
    value_name='Score',
)

fig_metrics, ax_metrics = plt.subplots(figsize=(12, 7))
sns.barplot(
    data=df_melted_scores,
    x='Model_ID',
    y='Score',
    hue='Metric',
    palette='tab10',
    ax=ax_metrics,
)
ax_metrics.set_title('Comparison of Average SBERT, F1, and Recall Scores per Model')
ax_metrics.set_xlabel('Model ID')
ax_metrics.set_ylabel('Average Score')
plt.setp(ax_metrics.get_xticklabels(), rotation=45, ha='right')
ax_metrics.legend(title='Metric')
fig_metrics.tight_layout()
plt.show()