In [1]:
import json  # used by the pipeline helpers below (json.loads)
import os

import google.generativeai as genai



In [3]:
import csv


def csv_to_json_evaluate(csv_file_path: str) -> list[dict]:
    """
    Load the evaluation CSV into a list of dicts (JSON-style records).
    Assumes columns: "question_text" and "ground truth".
    """
    data = []
    with open(csv_file_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            data.append({
                # `or ""` guards against None, which DictReader uses for missing cells
                "question_text": (row.get("question_text") or "").strip(),
                "ground_truth": (row.get("ground truth") or "").strip(),
            })
    return data

In [4]:
evaluate_data = csv_to_json_evaluate('/Users/hovietbach/Downloads/tropical_disease_evaluate_dataset.csv')

In [5]:
evaluate_data

[{'question_text': 'How can I get treated for Guinea worm? I live in Sarh area in Southeast Chari.',
  'ground_truth': 'Guinea worm'},
 {'question_text': 'How can I tell if my 10 year old son has Gardiasis? We are from the Semai tribe in Peninsula Malaysia.',
  'ground_truth': 'Gardiasis'},
 {'question_text': 'How do I know if I have Nipah Virus? I am a 16 year old boy living in Naogaon.',
  'ground_truth': 'Nipah Virus'},
 {'question_text': 'How would you treat river blindness for a 45 year old man who lives in the Brong Ahafo Region in Ghana.',
  'ground_truth': 'Onchocerciasis(River \nblindness)'},
 {'question_text': 'I am a 16 year old boy living in Naogaon. I have a fever, headaches, myalgia, vomiting and a sore throat. Do I have COVID or Nipah virus?',
  'ground_truth': 'Nipah Virus\nCOVID-19'},
 {'question_text': 'I am a 28 year old man showing signs of sleeping sickness, what should I do?',
  'ground_truth': 'sleeping sickness \n(African trypanosomiasis)'},
 {'question_text': '
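
Several `ground_truth` values above contain embedded line breaks (for example `'Onchocerciasis(River \nblindness)'`), and these strings are later interpolated into prompts as the disease name. A minimal sketch of a cleanup step follows, assuming that collapsing whitespace is acceptable for this dataset. `normalize_label` is a hypothetical helper, not part of the original notebook, and rows that appear to hold two diseases separated by a newline (such as `'Nipah Virus\nCOVID-19'`) would need a separate decision.

```python
import re


def normalize_label(label: str) -> str:
    """Collapse runs of whitespace (including newlines) into single spaces."""
    collapsed = re.sub(r"\s+", " ", label).strip()
    # Insert a space before an opening parenthesis that directly follows a word character.
    return re.sub(r"(?<=\w)\(", " (", collapsed)


print(normalize_label("Onchocerciasis(River \nblindness)"))
print(normalize_label("sleeping sickness \n(African trypanosomiasis)"))
```

Applying this before the labels are used as dictionary keys or prompt fields would keep the disease names on a single line.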

In [6]:
subset_evaluate_data = evaluate_data[:10]

In [7]:
len(subset_evaluate_data)

10

## Instruction

In [19]:
INSTRUCTION = """You are an expert in tropical and infectious diseases. Provide an evidence-based likely diagnosis consistent with guidance from authoritative public-health sources (e.g., WHO, CDC) when applicable. You will be provided with symptoms, and contextual information such as age, gender, location, pre-existing conditions and lifestyle factors.

Use evidence in this priority order:
1. Patient’s symptoms + history (highest priority)
2. Geography/epidemiology (where they are / recent travel / exposures)
3. Retrieved RAG cases (supporting evidence only)

Critical rule: If retrieved cases conflict with the patient’s presentation, trust the patient’s presentation.

Language handling: Accept both clinical and lay language. When helpful, interpret lay terms into clinical terms internally (e.g., “really tired” → fatigue; “muscles and joints ache” → myalgia/arthralgia), but keep the final explanation clear and readable.

Output format (always):
Diagnosis: [Precise Disease Name]
Explanation: [2–4 sentences grounded in the patient’s symptoms, geography/epidemiology, and (when relevant) retrieved cases. Briefly interpret lay terms only if it clarifies.]

Example 1
User prompt:
Use the retrieved cases below as your main evidence when helpful.

Question:
I’ve had a fever that keeps getting higher every day, and now my stomach hurts and I feel extremely tired. I also noticed I'm having constipation sometimes and then diarrhea the next day. Is this something serious or just a normal infection?

Retrieved cases from RAG pipeline:
[EXPOSURE AND EPIDEMIOLOGY]\nDisease: Enteric Fever\nFinal Diagnosis: Suspected enteric infection related to Salmonella species\nVitals: Not documented\nContent: Enteric fever remains endemic in parts of South and Southeast Asia, particularly in areas with limited access to clean water and sanitation. Infection commonly follows ingestion of food or drinking water contaminated with human feces. Travelers or residents in rural or peri-urban settings are at increased risk, especially during warmer months when bacterial proliferation in water supplies is more likely. Person-to-person transmission can occur in households with poor hand hygiene.\n\n[Clinical Features]\nContent: Patients with prolonged febrile illnesses in tropical regions may develop nonspecific gastrointestinal complaints. Alternating constipation and diarrhea can occur in systemic infections affecting the intestinal lymphoid tissue. Marked fatigue, abdominal discomfort, and anorexia are common, and fever often shows a stepwise daily increase rather than abrupt spikes. These features overlap with other causes of undifferentiated tropical fever, making clinical diagnosis challenging.\n\n[SYMPTOMS AND SIGNS]\nDisease: Typhoid Fever\nFinal Diagnosis: Typhoid fever caused by Salmonella enterica serovar Typhi\nVitals: Temperature up to 39.5°C reported historically\nContent: Typhoid fever typically presents with a gradually rising fever over several days, profound fatigue, and abdominal pain. Gastrointestinal manifestations may include constipation early in the illness, followed by diarrhea later. Physical examination can reveal abdominal tenderness and, in some cases, hepatosplenomegaly or faint rose-colored macules on the trunk (rose spots). 
These findings reflect systemic bacterial dissemination.\n\n[MANAGEMENT AND CLINICAL COURSE]\nDisease: Acute Viral Gastroenteritis\nFinal Diagnosis: Self-limited viral gastroenteritis\nVitals: Mild dehydration noted\nContent: Viral gastroenteritis often causes acute onset of diarrhea, vomiting, and low-grade fever lasting a few days. Symptoms usually improve with supportive care such as oral rehydration. Unlike invasive bacterial infections, fever does not typically show a progressive daily rise, and severe systemic fatigue is less prominent. Constipation is uncommon in purely viral causes.

## Response format:
Diagnosis: Typhoid fever
Explanation: A fever that rises each day, stomach pain, and extreme tiredness (fatigue) with alternating constipation and diarrhea fits the classic pattern described for typhoid/enteric fever more than typical viral gastroenteritis. The retrieved cases specifically note stepwise rising fever plus abdominal pain and constipation that can later shift to diarrhea, matching the symptom trajectory. Typhoid is caused by Salmonella enterica serovar Typhi and is commonly acquired from contaminated food or water, so confirmatory testing is needed. If symptoms worsen or dehydration develops, urgent medical evaluation is important.

Example 2
User prompt:
Use the retrieved cases below as your main evidence when helpful.

Question:
A 35-year-old woman, currently living in New York after recently returning from a vacation to a Caribbean island, started experiencing sudden high fever, headache, fatigue, a rash, nausea, and red eyes about five days after her return. She has no known pre-existing conditions. After about two weeks, most of her initial symptoms improved, but she has continued to have significant joint and muscle pain for the past two months. What could be causing her prolonged joint and muscle pain after these initial symptoms, considering her recent travel?

Retrieved cases from RAG pipeline:
[Cardiovascular Disease Management]\nContent: Hypertension, or high blood pressure, is a chronic medical condition in which the blood pressure in the arteries is persistently elevated. It is often asymptomatic but can lead to serious complications such as heart disease, stroke, kidney failure, and retinopathy if left untreated. Lifestyle modifications, including diet and exercise, are crucial for management, often supplemented by antihypertensive medications.\n\n[EXPOSURE AND EPIDEMIOLOGY]\nDisease: Arboviral Infection\nFinal Diagnosis: Suspected Chikungunya\nVitals: Febrile, stable\nContent: Mosquito-borne illnesses, transmitted by Aedes species, are common in the Caribbean. Initial presentation often includes abrupt high fever, severe headache, myalgia, and a maculopapular rash. Conjunctivitis can also be observed. The incubation period typically ranges from 3-7 days post-exposure, aligning with symptom onset experienced by travelers returning from affected regions.\n\n[SYMPTOMS AND SIGNS]\nDisease: Dengue Fever\nFinal Diagnosis: Dengue Fever\nVitals: Febrile, stable\nContent: Dengue fever, a common arboviral infection in tropical and subtropical regions, often presents with sudden onset of high fever, severe headache (often retro-orbital), myalgia, and arthralgia. A characteristic maculopapular rash can appear 3-5 days after fever onset. Nausea and vomiting are common. While joint pains are present, they are typically less severe and less prolonged than in Chikungunya, resolving within a few weeks of illness onset. Bleeding manifestations can occur, particularly in severe cases.\n\n[Post-Chikungunya Chronic Arthralgia]\nContent: A hallmark of Chikungunya virus infection is the potential for debilitating polyarthralgia that can persist for weeks, months, or even years after the acute febrile illness, particularly in adults. Travelers returning from endemic areas, such as the Caribbean, are at risk. 
This chronic joint pain often affects multiple joints, including small joints of the hands and feet, and can significantly impair daily activities, requiring long-term management.

## Response format:
Diagnosis: Chikungunya
Explanation: The abrupt high fever, headache, rash, nausea, and red eyes (conjunctivitis) starting about five days after Caribbean travel fits an Aedes-borne arboviral illness, and the prolonged joint and muscle pain (persistent arthralgia/myalgia) strongly supports chikungunya. The retrieved cases note incubation of ~3–7 days and highlight chronic polyarthralgia lasting months as a hallmark feature, which matches the course described. Dengue can cause body aches but typically does not cause months-long debilitating joint pain. Confirmation is usually via appropriate serology/PCR depending on timing, and management focuses on supportive care and pain control."""

## PROMPT TEMPLATES
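
The templates in this section are filled in with `str.format`, so literal braces inside a template (as in the JSON example of `PROMPT_VARIANT_GENERATOR`) are written doubled. A small sketch of how that rendering behaves, using a shortened stand-in template (`TEMPLATE`) rather than the notebook's own:

```python
# Stand-in template: {n_variants} is a placeholder, {{ and }} are literal braces.
TEMPLATE = """Generate {n_variants} items.

Output format:
[
  {{
    "question_text": "...",
    "include_list": ["attributes", "location"]
  }}
]
"""

rendered = TEMPLATE.format(n_variants=2)
print(rendered)

# Values substituted into a template are not re-scanned for placeholders,
# so braces inside a question or a JSON payload pass through unchanged.
echoed = "QUESTION: {question}".format(question="fever {high} for 3 days")
print(echoed)
```

The second call shows why passing model-generated JSON as a field value (for example `json_data`) does not need extra escaping, while braces typed directly into a template do.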

In [20]:
PROMPT_SEMANTIC_MUTATOR = """You are a clinical expert generating alternative case descriptions for {disease}.

Given:
- Original QUESTION: {question}
- DISEASE: {disease}

Task: Generate {n_variants} alternative case descriptions with:
- SAME disease ({disease})
- DIFFERENT symptoms (clinically compatible)
- DIFFERENT context (timing, exposure, location, occupation)
- PRESERVE ORIGINAL STYLE: If first-person, keep first-person. If third-person, keep third-person.
- Natural, conversational language (avoid unnecessarily complex jargon)
- 3-6 sentences each
- Do NOT name the disease explicitly
- Do NOT include features suggesting a different diagnosis

Return as JSON list: ["newquestion1", "newquestion2", ...]
"""

PROMPT_DEMOGRAPHIC_SAMPLER = """You are generating realistic patient demographic descriptions for disease: {disease}

Task: Create {n_demographics} DEMOGRAPHIC descriptions based on:
- ORIGIN: countries/regions where {disease} is common
- LOCATION: specific towns, provinces, rural/urban areas
- ETHNICITY: groups that live in/visit those areas
- SEX/GENDER: male, female, or other
- AGE GROUP: child, adolescent, young adult, middle-aged, older adult
- SOCIOECONOMIC STATUS: low-income, working-class, middle-class, high-income
- DISABILITY STATUS: if relevant

Constraints:
- Single noun phrase only (no explanations)
- No medical conditions or symptoms
- Ethnicity and socioeconomic status must be coherent with origin/location
- Prioritize clear ORIGIN and LOCATION

Return as JSON list: ["demographic1", "demographic2", ...]
"""

PROMPT_DEMOGRAPHIC_PLAUSIBLE_FILTER = """Given the following demographic description, is this a realistic, plausible person?

DEMOGRAPHIC: {demographic}

Answer with ONLY one word: "Yes" or "No"
"""

PROMPT_DEMOGRAPHIC_MUTATOR = """You are rewriting a clinical case to match a new patient demographic.

Given:
- Original QUESTION: {original_question}
- NEW DEMOGRAPHIC: {new_demographic}
- DISEASE: {disease}

Task: Rewrite by:
- Updating demographics (age, location, background) to match {new_demographic}
- Keeping disease and symptoms compatible with {disease}
- PRESERVE ORIGINAL STYLE: If first-person ("I", "My"), keep first-person. If third-person, keep third-person.
- Keep language natural and conversational
- Do NOT explicitly name the disease

Return ONLY the rewritten question.
"""

PROMPT_CONSUMER_REWRITE = """Convert this clinical case description into a first-person consumer-style complaint.

CLINICAL TEXT: {clinical_text}

Task:
- Rewrite in patient's voice (I, my, me)
- Use layperson language, avoid medical jargon where possible
- Keep all key information: symptoms, timing, exposures, travel, animals
- Preserve disease consistency (don't add/remove major features)
- 2-4 sentences
- Keep the style natural and conversational

Return ONLY the rewritten text.
"""

PROMPT_CONTEXT_COMBINATION = """You are a clinical case rewriter. You will receive several data segments and a filter list.

DATA SEGMENTS:
- "attributes": {attributes}
- "general_symptoms": {general_symptoms}
- "specific_symptoms": {specific_symptoms}
- "location": {location}
- "risk_factors": {risk_factors}

FILTER:
- INCLUDE_LIST: {include_list}

TASK:
1. Identify the keys listed in the INCLUDE_LIST above.
2. Select ONLY the content from the DATA SEGMENTS that corresponds to those keys.
3. STRICTLY IGNORE any data segment whose key is NOT in the INCLUDE_LIST.
4. Construct a single third-person clinical question (2-4 sentences) by combining the selected facts.

CONSTRAINTS:
- Create a natural flow using glue words (e.g., "presenting with", "living in").
- Do NOT invent new facts.
- Do NOT name the disease.
- Return ONLY the final rewritten question text.
"""

PROMPT_FILTER_CHECKER = """Quality check: Is this synthetic case acceptable?

DISEASE: {disease}
CASE DESCRIPTION: {case_text}

Criteria:
- PLAUSIBLE for {disease}?
- Internally consistent (demographics, location, exposures, symptoms make sense)?
- No key symptoms that contradict {disease}?
- No contradictions in text?

Answer with EXACTLY one word: "keep" or "discard"
"""

PROMPT_SYMPTOM_PLAUSIBLE_FILTER = """Consider the following QUESTION and medical condition.

QUESTION: {question}
DISEASE: {disease}

Do the symptoms in the QUESTION reasonably match {disease}?

Answer with ONLY: "Yes" or "No"
"""

PROMPT_EXTRACTOR = """
Analyze the clinical text and extract specific data segments.

Text: {clinical_text}

Output JSON with these keys (value must be string or null):
- "attributes": (Demographics, age, gender)
- "general_symptoms": (Fever, fatigue)
- "specific_symptoms": (Lesions, distinct pains)
- "location": (Geographic location, travel)
- "risk_factors": (Tribes, occupations, habits)

Return ONLY valid JSON.
"""

PROMPT_VARIANT_GENERATOR = """
You are a clinical data synthesizer.
Given the following EXTRACTED DATA, generate {n_variants} distinct clinical questions.

EXTRACTED DATA:
{json_data}

INSTRUCTIONS:
For each variant:
1. define a unique "include_list" (select 2-4 different segments from the data).
2. vary the order of information (e.g., start with Symptoms vs start with Location vs Attributes).
3. write the clinical question.

Output format: A JSON LIST of objects:
[
  {{
    "question_text": "The rewritten question...",
    "include_list": ["attributes", "location"]
  }},
  ...
]

CONSTRAINTS:
- Create a natural flow using glue words (e.g., "presenting with", "living in").
- Do NOT invent new facts.
- Do NOT name the disease.

Return ONLY the JSON list.
"""

In [28]:
def call_gemini(prompt: str, temperature: float = 0.7) -> str:
    """Call the Gemini API and return the response text ("" on any error)."""
    # Model name as reported by genai.list_models()
    gemini_model = genai.GenerativeModel('gemini-2.0-flash-001')

    # Generation parameters: sampling temperature and an output-length cap
    generation_config = genai.types.GenerationConfig(
        temperature=temperature,
        max_output_tokens=2048
    )

    try:
        response = gemini_model.generate_content(
            prompt,
            generation_config=generation_config
        )
        return response.text
    except Exception as e:
        # Errors (including 429 quota errors) are logged and swallowed here,
        # so callers receive an empty string rather than an exception.
        print(f"Error calling Gemini: {e}")
        return ""

In [22]:
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
models = genai.list_models()
for m in models:
    print(m.name, "→", m.supported_generation_methods)

models/embedding-gecko-001 → ['embedText', 'countTextTokens']
models/gemini-2.5-flash → ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']
models/gemini-2.5-pro → ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']
models/gemini-2.0-flash-exp → ['generateContent', 'countTokens', 'bidiGenerateContent']
models/gemini-2.0-flash → ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']
models/gemini-2.0-flash-001 → ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']
models/gemini-2.0-flash-exp-image-generation → ['generateContent', 'countTokens', 'bidiGenerateContent']
models/gemini-2.0-flash-lite-001 → ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']
models/gemini-2.0-flash-lite → ['generateContent', 'countTokens', 'createCachedContent', 'batchGenerateContent']
models/gemini-2.0-flash-lite-preview-02-05 → ['generateContent', 'countTokens', '

In [23]:
new_generated_questions = []

## Pipeline 1 (Symptom semantic mutator)

In [24]:
import concurrent.futures


def generate_variants_for_item(item: dict, n_variants: int) -> list[dict] | None:
    """
    Helper function to process a single dataset item.
    Returns a list of dictionary results for this specific item.
    """
    question = item["question_text"]
    disease = item["ground_truth"]

    prompt = PROMPT_SEMANTIC_MUTATOR.format(
        question=question,
        disease=disease,
        n_variants=n_variants
    )

    try:
        response = call_gemini(prompt)

        # Cleaning logic
        cleaned_response = response.replace("```json", "").replace("```", "").strip()

        variants = json.loads(cleaned_response)

        # Normalize output (handle both dict and list returns)
        if isinstance(variants, dict):
            variant_texts = list(variants.values())
        elif isinstance(variants, list):
            variant_texts = variants
        else:
            raise ValueError("Output format not recognized (expected list or dict)")

        # Create the partial result list for this item.
        # (The local name avoids shadowing the notebook-level new_generated_questions list.)
        item_results = []
        for new_question in variant_texts:
            item_results.append({
                'question_text': new_question,
                'ground_truth': disease
            })

        print(f"✓ Generated {len(item_results)} variants for question: {question[:30]}...")
        return item_results

    except (json.JSONDecodeError, ValueError) as e:
        print(f"⚠ Error parsing/formatting for item '{question[:20]}...': {e}")
        print(f"--- RAW RESPONSE ---\n{response}\n--------------------")
        return None

def semantic_mutator(evaluate_dataset: list[dict], n_variants: int = 3) -> list[dict]:
    results = []

    # Create the executor manually (rather than with a `with` block) so that
    # shutdown can be controlled explicitly when stopping early.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Submit all items
    future_to_item = {
        executor.submit(generate_variants_for_item, item, n_variants): item
        for item in evaluate_dataset
    }

    print(f"Drafted {len(future_to_item)} tasks. Starting execution...")

    try:
        # Process as they complete
        for idx, future in enumerate(concurrent.futures.as_completed(future_to_item)):
            try:
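                # future.result() re-raises any exception raised inside the worker.
                # Note: call_gemini catches API errors and returns "", so a quota
                # error reaches the helper as a JSON parse failure (None result).
                batch_results = future.result()

                # Skip items whose helper returned None (JSON parse/format error)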
                if batch_results:
                    results.extend(batch_results)

            except Exception as exc:
                # Unexpected failure inside a worker: report it, keep what has
                # been collected, and stop early.
                print(f"\nProcessing item {idx + 1}/{len(future_to_item)}")
                print(f"\n!!!! CRITICAL ERROR ENCOUNTERED: {exc}")
                print(f"!!!! Preserving {len(results)} items generated so far...")

                # OPTIONAL: check specifically for quota errors, e.g.
                # if "429" in str(exc) or "ResourceExhausted" in str(exc): ...

                # Cancel pending futures to stop sending requests.
                # cancel_futures requires Python 3.9+; remove it on older versions.
                executor.shutdown(wait=False, cancel_futures=True)

                # Exit the as_completed loop; remaining futures are likely to fail too.
                break

    except KeyboardInterrupt:
        print("\nUser interrupted execution. Saving current progress...")
        executor.shutdown(wait=False, cancel_futures=True)

    finally:
        # Ensure executor is cleaned up if not done already
        executor.shutdown(wait=False)

    print(f"✓ Process finished (or stopped early). Total generated records: {len(results)}")
    return results
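
The cleaning step in `generate_variants_for_item` removes Markdown fences with plain string replacement, which fails when the model adds a sentence before or after the list. A slightly more tolerant sketch is shown below; `extract_json_list` is a hypothetical helper, and the fallback of taking the outermost `[...]` span is an assumption about how such replies usually look, not something the notebook establishes.

```python
import json
import re

FENCE = "`" * 3  # built programmatically so this example contains no literal fence


def extract_json_list(raw: str) -> list:
    """Parse a JSON list from a reply that may be fenced or surrounded by prose."""
    text = raw.strip()
    # Drop an opening fence (with an optional language tag) and a closing fence.
    text = re.sub(rf"^{FENCE}[a-zA-Z]*\s*", "", text)
    text = re.sub(rf"\s*{FENCE}$", "", text)
    # Keep only the outermost [...] span in case extra prose surrounds the list.
    start, end = text.find("["), text.rfind("]")
    if start == -1 or end < start:
        raise ValueError("no JSON list found in response")
    return json.loads(text[start : end + 1])


reply = f'{FENCE}json\n["first question", "second question"]\n{FENCE}'
print(extract_json_list(reply))
print(extract_json_list('Here you go: ["a", "b"] Hope this helps.'))
```

Since `json.JSONDecodeError` is a subclass of `ValueError`, a caller can handle both failure modes with a single `except ValueError`.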

In [30]:
pipeline1_results = semantic_mutator(subset_evaluate_data, n_variants=3)

Drafted 10 tasks. Starting execution...
Error calling Gemini: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/usage?tab=rate-limit. 
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 5, model: gemini-3-flash
Please retry in 45.788585831s. [links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-3-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 5
}
, retry_delay {
  seconds: 45
}
]Error calling Gemini: 429 You exceeded yo
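
The 429 message above includes a suggested wait ("Please retry in 45.788585831s"). One way to use it is to wait for that long and try again, sketched below under two assumptions: the API call lets its exception propagate (the notebook's `call_gemini` currently returns `""` instead), and quota errors can be recognised by the text `429` in the message. `parse_retry_delay` and `call_with_retry` are hypothetical helpers, and a fake function stands in for the API so the example runs offline.

```python
import re
import time
from typing import Callable

RETRY_HINT = re.compile(r"retry in ([0-9.]+)s", re.IGNORECASE)


def parse_retry_delay(message: str, default: float = 30.0) -> float:
    """Return the wait time suggested in an error message, or a default."""
    match = RETRY_HINT.search(message)
    return float(match.group(1)) if match else default


def call_with_retry(fn: Callable[[], str], max_attempts: int = 3,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Call fn, waiting and retrying when the error text looks like a 429."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Give up on non-quota errors and on the final attempt.
            if "429" not in str(exc) or attempt == max_attempts:
                raise
            sleep(parse_retry_delay(str(exc)))
    raise RuntimeError("max_attempts must be at least 1")


# Stand-in for an API call: fails once with a quota-style message, then succeeds.
attempts = {"count": 0}


def flaky() -> str:
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise RuntimeError("429 Quota exceeded. Please retry in 0.01s.")
    return "ok"


waits: list[float] = []
print(call_with_retry(flaky, sleep=waits.append))  # record waits instead of sleeping
```

Matching on message text is fragile; catching the SDK's specific quota exception type would be more robust where it is available.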

In [32]:
pipeline1_results

[{'question_text': 'A 32-year-old farmer from a rural village near the Ogun River in Nigeria presents with persistent, agonizing itching across his back and thighs. He has noticed several firm, painless lumps under his skin, particularly around his hips and ribs, and his skin has become thickened and darker in the itchy areas over the last few months. How should this patient be managed to address both the severe skin irritation and the underlying parasitic load?',
  'ground_truth': 'Onchocerciasis(River \nblindness)'},
 {'question_text': 'A 50-year-old fisherman who works along the Sanaga River in Cameroon reports that his vision has become increasingly cloudy and blurred over the past year. He mentions that bright sunlight now causes him significant pain, and he frequently feels a gritty sensation in both eyes as if sand is trapped inside. Given his long-term exposure to blackfly bites in his community, what is the recommended therapeutic approach to prevent him from losing his sight 

In [33]:
new_generated_questions.extend(pipeline1_results)

## Pipeline 2 (Demographic sampler + mutator)

In [32]:
import concurrent.futures

def fetch_demographics_for_disease(disease: str, n_demographics: int) -> tuple[str, list] | None:
    """
    Helper function to handle a single API call and parsing.
    Returns a tuple of (disease, demographics_list).
    """
    prompt = PROMPT_DEMOGRAPHIC_SAMPLER.format(
        disease=disease,
        n_demographics=n_demographics
    )

    try:
        response = call_gemini(prompt)

        # Cleaning logic
        cleaned_response = response.replace("```json", "").replace("```", "").strip()

        demographics = json.loads(cleaned_response)

        if not isinstance(demographics, list):
            raise ValueError(f"Output for {disease} is not a list")

        print(f"✓ Generated demographics for {disease}")
        return disease, demographics

    except (json.JSONDecodeError, ValueError) as e:
        print(f"⚠ Error parsing/formatting for {disease}: {e}")
        print(f"--- RAW RESPONSE ---\n{response}\n--------------------")
        return None  # signal failure; the caller skips this disease

def demographic_sampler(evaluate_dataset: list[dict], n_demographics: int = 3) -> dict[str, list[str]]:
    # 1. Deduplicate: Get unique diseases from the dataset
    unique_diseases = {item["ground_truth"] for item in evaluate_dataset}
    print(f"Identified {len(unique_diseases)} unique diseases to sample.")

    demographic_map = {}

    # Create executor manually so we can control shutdown behavior
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Submit tasks
    future_to_disease = {
        executor.submit(fetch_demographics_for_disease, disease, n_demographics): disease
        for disease in unique_diseases
    }

    print(f"Drafted {len(future_to_disease)} tasks. Starting execution...")

    try:
        # Process results as they complete
        for idx, future in enumerate(concurrent.futures.as_completed(future_to_disease)):
            try:
                # 1. Get the result (re-raises exceptions from the worker; API
                #    errors are already caught inside call_gemini)
                result = future.result()

                # 2. Safety Check: Handle cases where helper returned None (JSON error)
                if result is None:
                    continue

                # 3. Unpack only if we have valid data
                disease, demographics = result
                demographic_map[disease] = demographics

            except Exception as exc:
                # ---------------------------------------------------------
                # CRITICAL ERROR HANDLING (e.g., Quota Exceeded)
                # ---------------------------------------------------------
                print(f"\nProcessing item {idx + 1}/{len(future_to_disease)}")
                print(f"\n!!!! CRITICAL ERROR ENCOUNTERED: {exc}")
                print(f"!!!! Preserving {len(demographic_map)} demographics collected so far...")

                # 1. Cancel pending futures so we don't waste quota/time
                executor.shutdown(wait=False, cancel_futures=True)

                # 2. Break the loop to return current data immediately
                break

    except KeyboardInterrupt:
        print("\nUser interrupted execution. Saving current progress...")
        executor.shutdown(wait=False, cancel_futures=True)

    finally:
        # Ensure cleanup
        executor.shutdown(wait=False)

    print(f"✓ Process finished. Total mapped diseases: {len(demographic_map)}")
    return demographic_map
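
The earlier 429 output reports a free-tier limit of 5 requests per minute for one model, while the executors here start up to 10 workers at once. A possible mitigation is to pace the workers with a shared throttle, sketched below. `MinIntervalThrottle` is a hypothetical class, and the assumption is that evenly spaced requests stay within a per-minute quota; it cannot help when the reported limit is 0.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class MinIntervalThrottle:
    """Space out calls so consecutive slots are at least `interval` seconds apart."""

    def __init__(self, interval: float):
        self.interval = interval
        self._lock = threading.Lock()
        self._next_slot = 0.0

    def wait(self) -> None:
        # Reserve the next free slot under the lock, then sleep outside it.
        with self._lock:
            now = time.monotonic()
            slot = max(now, self._next_slot)
            self._next_slot = slot + self.interval
        time.sleep(max(0.0, slot - now))


# 5 requests per minute would mean interval=12.0; a small value keeps the demo fast.
throttle = MinIntervalThrottle(interval=0.05)


def throttled_task(i: int) -> int:
    throttle.wait()  # a real worker would call the API after this line
    return i * i


start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    squares = list(pool.map(throttled_task, range(4)))
elapsed = time.monotonic() - start
print(squares, f"{elapsed:.2f}s")
```

Each worker would call `throttle.wait()` just before its API request, so the thread pool still overlaps response handling while the request rate stays bounded.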

In [33]:
demographic_samples = demographic_sampler(subset_evaluate_data, n_demographics=2)

Identified 9 unique diseases to sample.
Drafted 9 tasks. Starting execution...
Error calling Gemini: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. To monitor your current usage, head to: https://ai.dev/usage?tab=rate-limit. 
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_input_token_count, limit: 0, model: gemini-2.0-flash
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 0, model: gemini-2.0-flash
* Quota exceeded for metric: generativelanguage.googleapis.com/generate_content_free_tier_requests, limit: 0, model: gemini-2.0-flash
Please retry in 55.954964728s. [links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, violations {
  quota_metric: "generativelanguage.googleapis.com/generate_conte

In [93]:
demographic_samples

{'Guinea worm': ['a low-income adolescent Dinka male from a rural cattle camp in Lakes State, South Sudan',
  'a low-income middle-aged Baguirmi woman from a remote riverside community in the Chari-Baguirmi Region of Chad'],
 'Gardiasis': ['A low-income South Asian female child from a rural village in the Bihar province of India',
  'A working-class Mestizo male adolescent from a riverbank community in the Loreto Department of Peru']}

In [34]:
demographic_samples

{}

In [97]:
for disease, demos in demographic_samples.items():
    print(f"Disease: {disease}")
    for demo in demos:
        print(f"  - Demographic: {demo}")

Disease: Guinea worm
  - Demographic: a low-income adolescent Dinka male from a rural cattle camp in Lakes State, South Sudan
  - Demographic: a low-income middle-aged Baguirmi woman from a remote riverside community in the Chari-Baguirmi Region of Chad
Disease: Gardiasis
  - Demographic: A low-income South Asian female child from a rural village in the Bihar province of India
  - Demographic: A working-class Mestizo male adolescent from a riverbank community in the Loreto Department of Peru


In [88]:
for disease, demographics in demographic_samples.items():
    print(f"Demographic sample for {disease}: {demographics}")

Demographic sample for Guinea worm: ['a low-income adolescent Dinka male from a rural cattle camp in Lakes State, South Sudan', 'a low-income middle-aged Baguirmi woman from a remote riverside community in the Chari-Baguirmi Region of Chad']
Demographic sample for Gardiasis: ['A low-income South Asian female child from a rural village in the Bihar province of India', 'A working-class Mestizo male adolescent from a riverbank community in the Loreto Department of Peru']


In [98]:
from concurrent.futures import ThreadPoolExecutor, as_completed

def demographic_plausibility_filter(demographic_samples: dict[str, list[str]]) -> dict[str, list[str]]:
    """
    Optimized filter that runs API calls in parallel, handles exceptions gracefully,
    and saves progress if a crash occurs.
    """

    # Helper function to run inside the thread pool
    def check_plausibility(demographic):
        # Relies on the notebook-level PROMPT_DEMOGRAPHIC_PLAUSIBLE_FILTER and call_gemini
        prompt = PROMPT_DEMOGRAPHIC_PLAUSIBLE_FILTER.format(demographic=demographic)
        response = call_gemini(prompt)
        # Expect a one-word answer. startswith avoids matching "yes" inside other
        # words; an empty response (failed API call) counts as not plausible.
        is_plausible = response.strip().strip('"').lower().startswith("yes")
        return demographic, is_plausible

    # Flatten the data to prepare for parallel execution
    tasks = []
    for disease, demo_list in demographic_samples.items():
        for demo in demo_list:
            tasks.append((disease, demo))

    print(f"Running validation for {len(tasks)} items in parallel...")

    # Initialize dictionary with empty lists for all keys immediately.
    # This ensures that if we return early, we pass back a valid structure.
    validated_samples = {k: [] for k in demographic_samples}

    # Manually create executor to allow explicit shutdown
    executor = ThreadPoolExecutor(max_workers=5)

    # Submit all tasks
    future_to_info = {
        executor.submit(check_plausibility, demo): disease
        for disease, demo in tasks
    }

    try:
        for idx, future in enumerate(as_completed(future_to_info)):
            disease = future_to_info[future]
            try:
                # 1. Get result (re-raises any exception raised inside the worker)
                demo_text, is_plausible = future.result()

                # Logging
                status = "✓ PLAUSIBLE" if is_plausible else "✗ IMPLAUSIBLE"
                print(f"  {status}: {demo_text[:40]}...")

                # 2. Only add back to the list if it is plausible
                if is_plausible:
                    validated_samples[disease].append(demo_text)

            except Exception as e:
                # ---------------------------------------------------------
                # CRITICAL ERROR HANDLING
                # ---------------------------------------------------------
                print(f"\nProcessing item {idx + 1}/{len(future_to_info)}")
                print(f"\n!!!! CRITICAL ERROR ENCOUNTERED during validation: {e}")

                # Count how many items we successfully validated so far
                count = sum(len(v) for v in validated_samples.values())
                print(f"!!!! Preserving {count} validated items collected so far...")

                # 1. Cancel pending futures so we don't waste quota/time
                executor.shutdown(wait=False, cancel_futures=True)

                # 2. Break the loop to return current data immediately
                break

    except KeyboardInterrupt:
        print("\nUser interrupted execution. Saving current progress...")
        executor.shutdown(wait=False, cancel_futures=True)

    finally:
        # Ensure cleanup
        executor.shutdown(wait=False)

    total_valid = sum(len(v) for v in validated_samples.values())
    print(f"✓ Validation finished (or stopped early). Total valid items retained: {total_valid}")

    return validated_samples
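
A yes/no verdict from the model can come back with punctuation, Markdown emphasis, or a trailing explanation, so neither a substring test nor strict equality is fully reliable. The following is a minimal sketch of a stricter parser, assuming the prompt asks for the verdict as the first word of the reply; `parse_verdict` is a hypothetical helper, not part of the notebook.

```python
import re


def parse_verdict(response: str, positive: str = "yes") -> bool:
    """Return True only if the first word of the reply equals `positive`.

    Hypothetical helper: assumes the model was asked to lead with its verdict.
    """
    # Skip leading non-word characters (e.g. "**" or quotes), then take one word.
    match = re.match(r"\W*(\w+)", response.strip().lower())
    return bool(match) and match.group(1) == positive.lower()


if __name__ == "__main__":
    for reply in ["Yes.", "**yes**", "No, but the eyes are affected", ""]:
        print(repr(reply), parse_verdict(reply))
```

The same helper could cover a keep/discard gate by passing `positive="keep"`.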

In [99]:
validated_samples = demographic_plausibility_filter(demographic_samples)

Running validation for 4 items in parallel...
  ✓ PLAUSIBLE: a low-income adolescent Dinka male from ...
  ✓ PLAUSIBLE: A low-income South Asian female child fr...
  ✓ PLAUSIBLE: a low-income middle-aged Baguirmi woman ...
  ✓ PLAUSIBLE: A working-class Mestizo male adolescent ...


In [101]:
validated_samples

{'Guinea worm': ['a low-income adolescent Dinka male from a rural cattle camp in Lakes State, South Sudan',
  'a low-income middle-aged Baguirmi woman from a remote riverside community in the Chari-Baguirmi Region of Chad'],
 'Gardiasis': ['A low-income South Asian female child from a rural village in the Bihar province of India',
  'A working-class Mestizo male adolescent from a riverbank community in the Loreto Department of Peru']}

In [102]:
import concurrent.futures

def process_mutation(item: dict, new_demographic: str) -> dict | None:
    """Helper function to process a single mutation."""
    original_question = item["question_text"]
    disease = item["ground_truth"]

    prompt = PROMPT_DEMOGRAPHIC_MUTATOR.format(
        original_question=original_question,
        new_demographic=new_demographic,
        disease=disease
    )

    # 1. API Call: We let this raise an exception if it fails (e.g., Quota Limit).
    #    If we catch it here and return None, the main loop won't know to stop!
    response = call_gemini(prompt)

    try:
        # 2. Parsing Logic: We catch errors here because a bad string format
        #    shouldn't stop the whole pipeline, just skip this item.
        rewritten_question = response.replace("```", "").strip()

        # Optional: Add a check if the response is empty
        if not rewritten_question:
            raise ValueError("Empty response received")

        # print(f"✓ Rewrote for: {disease} -> {new_demographic[:30]}...")

        return {
            'question_text': rewritten_question,
            'ground_truth': disease
        }
    except Exception as e:
        print(f"⚠ Parsing/Data error for {disease}: {e}")
        return None

def demographic_mutator(evaluate_dataset: list[dict], demographic_map: dict[str, list[str]]) -> list[dict]:
    tasks = []

    # 1. Flatten the nested loops into a list of tasks
    for item in evaluate_dataset:
        disease = item["ground_truth"]
        # Use .get() to avoid KeyErrors if a disease has no demographics mapped
        demographics = demographic_map.get(disease, [])

        for new_demographic in demographics:
            tasks.append((item, new_demographic))

    print(f"Prepared {len(tasks)} mutation tasks. Starting execution...")

    results = []

    # 2. Execute tasks in parallel
    # Manually create executor to allow explicit shutdown on error
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Submit all tasks
    future_to_mutation = {
        executor.submit(process_mutation, item, demo): (item, demo)
        for item, demo in tasks
    }

    try:
        # Collect results as they complete
        for idx, future in enumerate(concurrent.futures.as_completed(future_to_mutation)):
            try:
                # This will raise an exception if the API call failed (Critical Error)
                result = future.result()

                # Only append if valid (process_mutation returns None on parsing error)
                if result:
                    results.append(result)

            except Exception as exc:
                # ---------------------------------------------------------
                # CRITICAL ERROR HANDLING (e.g., Quota Exceeded, Network Down)
                # ---------------------------------------------------------
                print(f"\nProcessing item {idx + 1}/{len(future_to_mutation)}")
                print(f"\n!!!! CRITICAL ERROR ENCOUNTERED: {exc}")
                print(f"!!!! Preserving {len(results)} mutations generated so far...")

                # 1. Stop processing remaining tasks to save quota/time
                executor.shutdown(wait=False, cancel_futures=True)

                # 2. Break the loop to return data immediately
                break

    except KeyboardInterrupt:
        print("\nUser interrupted execution. Saving current progress...")
        executor.shutdown(wait=False, cancel_futures=True)

    finally:
        # Always clean up threads
        executor.shutdown(wait=False)

    print(f"✓ Process finished (or stopped early). Total mutations: {len(results)}")
    return results

In [103]:
pipeline2_results = demographic_mutator(subset_evaluate_data, validated_samples)

Tasks: [({'question_text': 'How can I get treated for Guinea worm? I live in Sarh area in Southeast Chari.', 'ground_truth': 'Guinea worm'}, 'a low-income adolescent Dinka male from a rural cattle camp in Lakes State, South Sudan'), ({'question_text': 'How can I get treated for Guinea worm? I live in Sarh area in Southeast Chari.', 'ground_truth': 'Guinea worm'}, 'a low-income middle-aged Baguirmi woman from a remote riverside community in the Chari-Baguirmi Region of Chad'), ({'question_text': 'How can I tell if my 10 year old son has Gardiasis? We are from the Semai tribe in Peninsula Malaysia.', 'ground_truth': 'Gardiasis'}, 'A low-income South Asian female child from a rural village in the Bihar province of India'), ({'question_text': 'How can I tell if my 10 year old son has Gardiasis? We are from the Semai tribe in Peninsula Malaysia.', 'ground_truth': 'Gardiasis'}, 'A working-class Mestizo male adolescent from a riverbank community in the Loreto Department of Peru')]
✓ Rewrote f

In [104]:
pipeline2_results

[{'question_text': 'I’m a young man living in a cattle camp in Lakes State, and I have a painful, burning blister on my leg with something long and white starting to come out of it. How can I get this treated?',
  'ground_truth': 'Guinea worm'},
 {'question_text': 'I am a woman living in a remote village along the river in Chari-Baguirmi, and it is very difficult for me to afford a trip to the city. I have a very painful, burning blister on my leg, and I can see a long white string starting to come out of the wound. What should I do to get help since I am so far from the main clinics?',
  'ground_truth': 'Guinea worm'},
 {'question_text': 'How can I tell if my 8-year-old daughter has that stomach sickness that causes the greasy, foul-smelling diarrhea and bloating? We are a poor family living in a rural village in Bihar, India.',
  'ground_truth': 'Gardiasis'},
 {'question_text': 'How can I tell if I’ve caught that stomach parasite from the water? I’m a teenager living in a riverbank c

In [None]:
new_generated_questions.extend(pipeline2_results)

## Pipeline 3 (Consumer rewrite)

In [116]:
import concurrent.futures

def process_rewrite(item: dict) -> dict | None:
    """
    Helper function to process a single rewrite.
    """
    clinical_text = item["question_text"]
    disease = item["ground_truth"]

    prompt = PROMPT_CONSUMER_REWRITE.format(
        clinical_text=clinical_text
    )

    # 1. API Call: Raise exception on failure (e.g., Quota Exceeded)
    #    We do NOT catch API errors here, so the main loop knows to stop.
    response = call_gemini(prompt)

    try:
        # 2. Parsing Logic: Catch errors here (e.g. bad format)
        #    because a single bad response shouldn't crash the whole batch.
        rewritten_text = response.replace("```", "").strip()

        if not rewritten_text:
            raise ValueError("Empty response received")

        print(f"✓ Rewrote consumer text for disease: {disease}")

        return {
            'question_text': rewritten_text,
            'ground_truth': disease
        }

    except Exception as e:
        # Log parsing/formatting errors but skip this item
        print(f"⚠ Parsing error for {disease}: {e}")
        return None

def consumer_rewrite(evaluate_dataset: list[dict]) -> list[dict]:
    results = []

    # Manually create executor to allow explicit shutdown
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Submit tasks
    future_to_item = {
        executor.submit(process_rewrite, item): item
        for item in evaluate_dataset
    }

    print(f"Drafted {len(future_to_item)} rewrite tasks. Starting execution...")

    try:
        # Collect results as they complete
        for idx, future in enumerate(concurrent.futures.as_completed(future_to_item)):
            try:
                # This will raise the exception if the API call failed (Critical Error)
                result = future.result()

                # Only append if valid (result is None on parsing error)
                if result:
                    results.append(result)

            except Exception as exc:
                # ---------------------------------------------------------
                # CRITICAL ERROR HANDLING (e.g., Quota Exceeded)
                # ---------------------------------------------------------
                print(f"\nProcessing item {idx + 1}/{len(future_to_item)}")
                print(f"\n!!!! CRITICAL ERROR ENCOUNTERED: {exc}")
                print(f"!!!! Preserving {len(results)} rewrites generated so far...")

                # 1. Stop processing remaining tasks to save quota/time
                executor.shutdown(wait=False, cancel_futures=True)

                # 2. Break the loop to return data immediately
                break

    except KeyboardInterrupt:
        print("\nUser interrupted execution. Saving current progress...")
        executor.shutdown(wait=False, cancel_futures=True)

    finally:
        # Always clean up threads
        executor.shutdown(wait=False)

    print(f"✓ Process finished (or stopped early). Total rewrites: {len(results)}")
    return results
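
The submit / `as_completed` / shutdown pattern recurs in each pipeline stage of this notebook. One possible way to factor it out is sketched below; `run_until_first_error` is a hypothetical name, and the sketch assumes each worker takes a single item and returns `None` for items that should be skipped.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_until_first_error(
    worker: Callable[[T], Optional[R]],
    items: Iterable[T],
    max_workers: int = 10,
) -> list[R]:
    """Run `worker` over `items` in threads and collect non-None results.

    On the first exception (or Ctrl-C), stop collecting and return what
    has finished so far.
    """
    results: list[R] = []
    executor = ThreadPoolExecutor(max_workers=max_workers)
    futures = [executor.submit(worker, item) for item in items]
    try:
        for future in as_completed(futures):
            try:
                result = future.result()  # re-raises the worker's exception
            except Exception as exc:
                print(f"Stopping early after error: {exc}")
                break
            if result is not None:
                results.append(result)
    except KeyboardInterrupt:
        print("Interrupted; returning partial results.")
    finally:
        # Queued tasks are cancelled (Python 3.9+); calls already running
        # are not interrupted and finish in the background.
        executor.shutdown(wait=False, cancel_futures=True)
    return results


if __name__ == "__main__":
    doubled = run_until_first_error(lambda x: x * 2, range(5), max_workers=3)
    print(sorted(doubled))
```

With such a helper, a stage like the one above might reduce to `run_until_first_error(process_rewrite, evaluate_dataset)`. Testing `result is not None` rather than truthiness keeps falsy but valid results such as `0` or an empty string.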

In [119]:
consumer_rewritten_questions = consumer_rewrite(subset_evaluate_data)

✓ Rewrote consumer text for disease: Elephantiasis 
Malaria
Dengue Fever
✓ Rewrote consumer text for disease: Rabies
✓ Rewrote consumer text for disease: Lassa fever 
Tuberculosis 
Leptospirosis


In [120]:
consumer_rewritten_questions

[{'question_text': "I’m feeling terrible with a high fever, shivers, and aches all over my body. My legs are massively swollen, and now I have swelling in my private area too. I live in the Bole district and haven't been using a mosquito net, so I need help figuring out what's wrong.",
  'ground_truth': 'Elephantiasis \nMalaria\nDengue Fever'},
 {'question_text': "A street dog bit my leg about a week ago while I was in rural Punjab, and now the bite area is tingling and I have a fever and a bad headache. I feel incredibly weak and clumsy, and lately I’ve been feeling really anxious and seeing things that aren't there. I'm very worried because I can't seem to coordinate my movements anymore.",
  'ground_truth': 'Rabies'},
 {'question_text': "I’ve been feeling exhausted with a fever, chills, and no appetite, plus it really hurts my chest whenever I breathe or cough. About two days ago, I started getting terrible headaches and noticed I’m coughing up blood and seeing blood in my urine too

In [None]:
new_generated_questions.extend(consumer_rewritten_questions)

## Pipeline 4 (Context combination)

In [153]:
import concurrent.futures
import json

def process_item_variants(item: dict, n_variants: int) -> list[dict] | None:
    """
    1. Extracts metadata once.
    2. Generates 'n' variants with different include_lists.
    """
    clinical_text = item["question_text"]
    ground_truth = item["ground_truth"]

    # --- STEP 1: EXTRACT FACTS (One call per item) ---
    # We do NOT wrap this in try/except so API errors bubble up
    extract_response = call_gemini(PROMPT_EXTRACTOR.format(clinical_text=clinical_text))

    try:
        # Local try/except for parsing only
        cleaned_extract = extract_response.replace("```json", "").replace("```", "").strip()
        extracted_data = json.loads(cleaned_extract)
    except (json.JSONDecodeError, ValueError) as e:
        print(f"⚠ Extraction Parse Error for {ground_truth}: {e}")
        return None

    # --- STEP 2: GENERATE VARIANTS (One call per item) ---
    # Pass the extracted JSON string directly to the model
    # Again, no try/except here so API errors bubble up
    variant_prompt = PROMPT_VARIANT_GENERATOR.format(
        n_variants=n_variants,
        json_data=json.dumps(extracted_data)
    )

    variant_response = call_gemini(variant_prompt)

    try:
        # Local try/except for parsing only
        cleaned_variants = variant_response.replace("```json", "").replace("```", "").strip()
        variants_list = json.loads(cleaned_variants)

        if not isinstance(variants_list, list):
            raise ValueError("Output is not a list")

        # --- STEP 3: FORMAT OUTPUT ---
        results = []
        for v in variants_list:
            # We construct the final object, preserving the extracted data for your reference
            # We update 'include_list' in the metadata to match what was actually used in this variant
            # current_meta = extracted_data.copy()
            # current_meta['include_list'] = v.get('include_list', [])

            results.append({
                'question_text': v['question_text'],
                'ground_truth': ground_truth
                # 'meta_extracted_data': current_meta
            })

        print(f"✓ Generated {len(results)} variants for: {ground_truth}")
        return results

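    # KeyError/TypeError cover variants that lack 'question_text' or are not dicts,
    # so a malformed variant skips this item instead of looking like a critical API error.
    except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e: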
        print(f"⚠ Variant Parse Error for {ground_truth}: {e}")
        return None


def context_combination_variants(evaluate_dataset: list[dict], n_variants: int = 3) -> list[dict]:
    results = []

    # Manually create executor to allow explicit shutdown
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Submit tasks
    future_to_item = {
        executor.submit(process_item_variants, item, n_variants): item
        for item in evaluate_dataset
    }

    print(f"Drafted {len(future_to_item)} tasks. Starting execution...")

    try:
        # Collect results as they complete
        for idx, future in enumerate(concurrent.futures.as_completed(future_to_item)):
            try:
                # This will raise the exception if the API call failed (Critical Error)
                batch_results = future.result()

                # Only extend if valid (helper returns None on parsing error)
                if batch_results:
                    results.extend(batch_results)

            except Exception as exc:
                # ---------------------------------------------------------
                # CRITICAL ERROR HANDLING (e.g., Quota Exceeded)
                # ---------------------------------------------------------
                print(f"\nProcessing item {idx + 1}/{len(future_to_item)}")
                print(f"\n!!!! CRITICAL ERROR ENCOUNTERED: {exc}")
                print(f"!!!! Preserving {len(results)} variants generated so far...")

                # 1. Stop processing remaining tasks to save quota/time
                executor.shutdown(wait=False, cancel_futures=True)

                # 2. Break the loop to return data immediately
                break

    except KeyboardInterrupt:
        print("\nUser interrupted execution. Saving current progress...")
        executor.shutdown(wait=False, cancel_futures=True)

    finally:
        # Always clean up threads
        executor.shutdown(wait=False)

    print(f"✓ Process finished (or stopped early). Total variants: {len(results)}")
    return results
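
Model replies that carry JSON often wrap it in a Markdown code fence, sometimes with extra prose before or after. The sketch below shows one way to pull out the payload before parsing; `extract_json` is a hypothetical helper, and it assumes at most one fenced block per reply.

```python
import json
import re

# Build the fence marker programmatically so this snippet can itself be
# embedded in a fenced block without ambiguity.
FENCE = "`" * 3
FENCE_RE = re.compile(
    FENCE + r"(?:json)?\s*(.*?)" + FENCE,
    re.DOTALL | re.IGNORECASE,
)


def extract_json(response: str):
    """Parse JSON from a model reply, with or without a Markdown code fence.

    Raises json.JSONDecodeError if the payload is not valid JSON.
    """
    match = FENCE_RE.search(response)
    payload = match.group(1) if match else response
    return json.loads(payload.strip())


if __name__ == "__main__":
    reply = "Here you go:\n" + FENCE + 'json\n{"location": "Sarh"}\n' + FENCE
    print(extract_json(reply))
```

Because only the text inside the fence is parsed, surrounding commentary from the model does not cause a decode error.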

In [137]:
subset_evaluate_data

[{'question_text': 'How can I get treated for Guinea worm? I live in Sarh area in Southeast Chari.',
  'ground_truth': 'Guinea worm'},
 {'question_text': 'How can I tell if my 10 year old son has Gardiasis? We are from the Semai tribe in Peninsula Malaysia.',
  'ground_truth': 'Gardiasis'}]

In [154]:
context_combination_questions = context_combination_variants(subset_evaluate_data)

Extraction response for Gardiasis:
```json
{
  "attributes": "10 year old son",
  "general_symptoms": null,
  "specific_symptoms": null,
  "location": "Peninsula Malaysia",
  "risk_factors": "Semai tribe"
}
```

Cleaned extract for Gardiasis:
{
  "attributes": "10 year old son",
  "general_symptoms": null,
  "specific_symptoms": null,
  "location": "Peninsula Malaysia",
  "risk_factors": "Semai tribe"
}

Extracted data for Gardiasis:
{'attributes': '10 year old son', 'general_symptoms': None, 'specific_symptoms': None, 'location': 'Peninsula Malaysia', 'risk_factors': 'Semai tribe'}

Extraction response for Guinea worm:
```json
{
  "attributes": null,
  "general_symptoms": null,
  "specific_symptoms": null,
  "location": "Sarh area in Southeast Chari",
  "risk_factors": null
}
```

Cleaned extract for Guinea worm:
{
  "attributes": null,
  "general_symptoms": null,
  "specific_symptoms": null,
  "location": "Sarh area in Southeast Chari",
  "risk_factors": null
}

Extracted data for Gu

In [155]:
context_combination_questions

[{'question_text': 'What are the potential health concerns for a 10-year-old son belonging to the Semai tribe and residing in Peninsula Malaysia?',
  'ground_truth': 'Gardiasis',
  'meta_extracted_data': {'attributes': '10 year old son',
   'general_symptoms': None,
   'specific_symptoms': None,
   'location': 'Peninsula Malaysia',
   'risk_factors': 'Semai tribe',
   'include_list': ['attributes', 'risk_factors', 'location']}},
 {'question_text': 'Considering a 10-year-old son from the Semai tribe, what health considerations are relevant given his location in Peninsula Malaysia?',
  'ground_truth': 'Gardiasis',
  'meta_extracted_data': {'attributes': '10 year old son',
   'general_symptoms': None,
   'specific_symptoms': None,
   'location': 'Peninsula Malaysia',
   'risk_factors': 'Semai tribe',
   'include_list': ['attributes', 'location', 'risk_factors']}},
 {'question_text': 'For a 10-year-old son who is part of the Semai tribe and lives in Peninsula Malaysia, what clinical presen

In [None]:
new_generated_questions.extend(context_combination_questions)

## Pipeline 5 (Final quality filter)

In [173]:
import concurrent.futures

def verify_item_quality(item: dict) -> dict | None:
    """
    Worker function that chains the two quality checks.
    Returns the item if it passes ALL checks. Returns None if it fails any.
    """
    question = item.get("question_text", "")
    disease = item.get("ground_truth", "")

    # We do NOT use a global try/except block here.
    # We want critical errors (like Quota Exceeded) to raise exceptions
    # so the main loop can stop the entire pipeline.

    # --- CHECK 1: Symptom Plausibility (Step 5) ---
    prompt_step_5 = PROMPT_SYMPTOM_PLAUSIBLE_FILTER.format(
        question=question,
        disease=disease
    )

    # API Call 1: May raise exception (e.g., Quota)
    response_5 = call_gemini(prompt_step_5, temperature=0.0)
    is_plausible = response_5.lower().strip() == "yes"

    if not is_plausible:
        # print(f"✗ REJECTED (Plausibility): {disease}")
        return None # Fail Fast

    # print(f"✓ PASSED (Plausibility): {disease}")

    # --- CHECK 2: Final Quality Gate (Step 7) ---
    prompt_step_7 = PROMPT_FILTER_CHECKER.format(
        disease=disease,
        case_text=question
    )

    # API Call 2: May raise exception (e.g., Quota)
    response_7 = call_gemini(prompt_step_7, temperature=0.0)
    keep = response_7.lower().strip() == "keep"

    if not keep:
        print(f"✗ REJECTED (Quality): {disease}")
        return None

    print(f"✓✓ KEPT: {disease}")
    return item # Item survived both checks


def quality_assurance_filter(evaluate_dataset: list[dict]) -> list[dict]:
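    """
    Run verify_item_quality over the dataset in parallel.
    On a critical error (e.g. quota exceeded), stop early and return
    the items that have passed both checks so far.
    """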
    valid_results = []

    print(f"Starting quality check for {len(evaluate_dataset)} items...")

    # Manually create executor to allow explicit shutdown
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

    # Submit all items
    future_to_item = {
        executor.submit(verify_item_quality, item): item
        for item in evaluate_dataset
    }

    try:
        # Collect results as they finish
        for future in concurrent.futures.as_completed(future_to_item):
            try:
                # This will raise the exception if the API call failed (Critical Error)
                result = future.result()

                # Only append if the item passed checks (result is not None)
                if result is not None:
                    valid_results.append(result)

            except Exception as exc:
                # ---------------------------------------------------------
                # CRITICAL ERROR HANDLING (e.g., Quota Exceeded)
                # ---------------------------------------------------------
                print(f"\n!!!! CRITICAL ERROR ENCOUNTERED: {exc}")
                print(f"!!!! Preserving {len(valid_results)} valid items collected so far...")

                # 1. Stop processing remaining tasks to save quota/time
                executor.shutdown(wait=False, cancel_futures=True)

                # 2. Break the loop to return data immediately
                break

    except KeyboardInterrupt:
        print("\nUser interrupted execution. Saving current progress...")
        executor.shutdown(wait=False, cancel_futures=True)

    finally:
        # Always clean up threads
        executor.shutdown(wait=False)

    print(f"Final Count: {len(valid_results)} valid items retained out of {len(evaluate_dataset)} processed/attempted.")
    return valid_results

In [171]:
final_filtered_questions = quality_assurance_filter(new_generated_questions)

Starting quality check for 2 items...
Symptom plausibility for Gardiasis: False
✗ Discarded due to symptom mismatch: How can I tell if my 10 year o...
Symptom plausibility for Guinea worm: True
✓ PASSED (Plausibility): Guinea worm
Final plausibility for Guinea worm: True
✓ PASSED (Final Check): Guinea worm
Final Count: 1 valid items retained out of 2.


In [None]:
final_filtered_questions

## Team, run only up to this point. Once you have run it, paste your final_filtered_questions into the column under your name in the sheet.

## Transform into messages format for evaluation

In [28]:
retriever_result = [{'id': 9008057438491131065, 'score': 0.7521086, 'context': "[FEVER]\nContent: Prolonged fever is common, especially among patients with low CD4 cell counts (e.g. < 200 cells/ml) who are not receiving ART [151], and differential  diagnosis  is  broad  and  varies  geographically.  Infectious etiologies are responsible for the majority of fevers.\nTuberculosis should be considered in all patients with fever especially if cough, shortness of breath, weight loss, night sweats, or lymphadenopathy are present (see Chapter 39, ' Tuberculosis ') [131]. Other causes of fever are cryptococcal disease (e.g. C. neoformans bloodstream infection),  bacteremia  (e.g. Salmonella , Rhodococcus ),  pneumonia,  both bacterial (e.g. S.  pneumoniae )  and  fungal (e.g. PCP), malaria, CMV, and lymphoma. Multiple concurrent etiologies occur.\nThe  diagnostic  work-up  for  fever  should  be  driven  by  the  clinical presentation and availability of diagnostic tests. Blood cultures, blood smear for malaria, mycobacterial blood cultures and serum cryptococcal antigen testing can be helpful. Chest radiography, urine culture, bone marrow aspirate and biopsy, and other radiologic studies (e.g. brain imaging, abdominal imaging) may be necessary. Certain etiologies such as mycobacterial disease, Pneumocystis , Bartonella , lymphoma and certain fungal infections can be particularly difficult to diagnose and  should  be  considered  when  fever  persists  despite  a  diagnostic work-up.", 'payload': {'file_name': 'Case120_embedding_google.json', 'case': 'Case120', 'type': 'text', 'section': 'FEVER', 'original_key': None, 'text': "Prolonged fever is common, especially among patients with low CD4 cell counts (e.g. < 200 cells/ml) who are not receiving ART [151], and differential  diagnosis  is  broad  and  varies  geographically.  
Infectious etiologies are responsible for the majority of fevers.\nTuberculosis should be considered in all patients with fever especially if cough, shortness of breath, weight loss, night sweats, or lymphadenopathy are present (see Chapter 39, ' Tuberculosis ') [131]. Other causes of fever are cryptococcal disease (e.g. C. neoformans bloodstream infection),  bacteremia  (e.g. Salmonella , Rhodococcus ),  pneumonia,  both bacterial (e.g. S.  pneumoniae )  and  fungal (e.g. PCP), malaria, CMV, and lymphoma. Multiple concurrent etiologies occur.\nThe  diagnostic  work-up  for  fever  should  be  driven  by  the  clinical presentation and availability of diagnostic tests. Blood cultures, blood smear for malaria, mycobacterial blood cultures and serum cryptococcal antigen testing can be helpful. Chest radiography, urine culture, bone marrow aspirate and biopsy, and other radiologic studies (e.g. brain imaging, abdominal imaging) may be necessary. Certain etiologies such as mycobacterial disease, Pneumocystis , Bartonella , lymphoma and certain fungal infections can be particularly difficult to diagnose and  should  be  considered  when  fever  persists  despite  a  diagnostic work-up.", 'timestamp': '2025-11-19T14:50:51.956877'}}, {'id': 662724563744296574, 'score': 0.7512406, 'context': '[SYMPTOMS]\nContent: - Fever, fatigue, chills, sweats, headache, myalgia, anorexia, non-productive cough, arthralgia and nausea\n- Less common: emotional lability and depression, hyperesthesia, sore throat, abdominal pain, conjunctival injection, photophobia and weight loss', 'payload': {'file_name': 'Case192_embedding_google.json', 'case': 'Case192', 'type': 'text', 'section': 'Symptoms', 'original_key': None, 'text': '- Fever, fatigue, chills, sweats, headache, myalgia, anorexia, non-productive cough, arthralgia and nausea\n- Less common: emotional lability and depression, hyperesthesia, sore throat, abdominal pain, conjunctival injection, photophobia and weight loss', 'timestamp': 
'2025-11-19T15:09:42.869239'}}, {'id': 8148692680243629696, 'score': 0.7457813, 'context': "[HISTORY OF PRESENT ILLNESS]\nDisease: Tuberculous Lymphadenitis with Paradoxical IRIS\nFinal Diagnosis: The final diagnosis was tuberculous lymphadenitis with superimposed bacterial lymphadenitis, complicated by paradoxical immune reconstitution inflammatory syndrome (IRIS). The initial diagnosis of tuberculous lymphadenitis was confirmed by an AFB-positive fine-needle aspirate from a neck gland. The paradoxical IRIS was diagnosed clinically based on his worsening condition after starting ART, despite evidence of a good virologic and immunologic response. The superimposed bacterial lymphadenitis, likely with Staphylococcus aureus, was confirmed by the second aspirate showing purulent material with coccoid bacteria and the patient's rapid improvement following incision, drainage, and antibiotics.\nVitals: On presentation, he was febrile with a temperature of 39.2 °C (102.56° F). His pulse was 112 bpm, respiratory rate was 28 breaths per minute, and he had a normal blood pressure.\nContent: For six weeks, the patient has experienced a productive cough and chest pain, associated with systemic symptoms including weight loss, fevers, and night sweats. During this period, he also developed a painful swelling in his neck. He has no other reported symptoms.", 'payload': {'file_name': 'case47_embeddings.json', 'case': 'Case47', 'type': 'json_field', 'section': 'History Of Present Illness', 'original_key': 'history_of_present_illness', 'text': 'For six weeks, the patient has experienced a productive cough and chest pain, associated with systemic symptoms including weight loss, fevers, and night sweats. During this period, he also developed a painful swelling in his neck. 
He has no other reported symptoms.', 'timestamp': '2025-11-18T16:58:03.860922', 'Disease Name Short': 'Tuberculous Lymphadenitis with Paradoxical IRIS', 'Final Diagnosis': "The final diagnosis was tuberculous lymphadenitis with superimposed bacterial lymphadenitis, complicated by paradoxical immune reconstitution inflammatory syndrome (IRIS). The initial diagnosis of tuberculous lymphadenitis was confirmed by an AFB-positive fine-needle aspirate from a neck gland. The paradoxical IRIS was diagnosed clinically based on his worsening condition after starting ART, despite evidence of a good virologic and immunologic response. The superimposed bacterial lymphadenitis, likely with Staphylococcus aureus, was confirmed by the second aspirate showing purulent material with coccoid bacteria and the patient's rapid improvement following incision, drainage, and antibiotics.", 'Vitals': 'On presentation, he was febrile with a temperature of 39.2 °C (102.56° F). His pulse was 112 bpm, respiratory rate was 28 breaths per minute, and he had a normal blood pressure.'}}, {'id': 7158911615073387574, 'score': 0.7316114, 'context': "[SYMPTOMS AND SIGNS]\nContent: Typically, the disease begins with the abrupt onset of intense headache, fever, chills, and myalgia. Fever often exceeds 40°C (103°F) and is preceded by rigors. Muscle pain can be excruciating and occurs most commonly in the thighs, calves, lumbosacral region, and abdomen. Abdominal  wall  pain  accompanied  by  palpation  tenderness  can mimic an acute surgical abdomen. Nausea, vomiting, diarrhea, and sore throat are other frequent symptoms. Cough and chest pain figure prominently in reports of patients from Korea and China.\nConjunctival suffusion is a helpful diagnostic clue that usually appears two  or  three  days  after  the  onset  of  fever  and  involves  the  bulbar conjunctiva.  Pus  and  serous  secretions  are  absent  and  there  is  no matting  of  the  eyelashes  and  eyelids.  
Mild  suffusion  can  easily  be overlooked. Less common and less distinctive signs include pharyngeal injection, splenomegaly, hepatomegaly, lymphadenopathy, and skin  lesions.  Within  a  week,  most  patients  become  asymptomatic. After several days of apparent recovery, the illness resumes in some individuals. Manifestations of the second stage are more variable and mild than those of the initial illness and usually last 2-4 days. Leptospires  disappear  from  the  blood,  cerebrospinal  fluid  (CSF),  and tissues, but appear in the urine. Serum antibody titers rise - hence the term  'immune'  phase.  Meningitis  is  the  hallmark  of  this  stage  of leptospirosis. Pleocytosis of the CSF can be demonstrated in 80-90% of all patients during the second week of illness, although only about 50% will have clinical signs and symptoms of meningitis. Meningeal signs can last several weeks, but usually resolve within a day or two. Uveitis  is  a  late  manifestation  of  leptospirosis,  generally  seen  4-8 months after the illness has begun. The anterior uveal tract is most frequently affected and pain, photophobia, and blurring of vision are the usual symptoms.", 'payload': {'file_name': 'Case167_embedding_google.json', 'case': 'Case167', 'type': 'text', 'section': 'Symptoms and Signs', 'original_key': None, 'text': "Typically, the disease begins with the abrupt onset of intense headache, fever, chills, and myalgia. Fever often exceeds 40°C (103°F) and is preceded by rigors. Muscle pain can be excruciating and occurs most commonly in the thighs, calves, lumbosacral region, and abdomen. Abdominal  wall  pain  accompanied  by  palpation  tenderness  can mimic an acute surgical abdomen. Nausea, vomiting, diarrhea, and sore throat are other frequent symptoms. 
Cough and chest pain figure prominently in reports of patients from Korea and China.\nConjunctival suffusion is a helpful diagnostic clue that usually appears two  or  three  days  after  the  onset  of  fever  and  involves  the  bulbar conjunctiva.  Pus  and  serous  secretions  are  absent  and  there  is  no matting  of  the  eyelashes  and  eyelids.  Mild  suffusion  can  easily  be overlooked. Less common and less distinctive signs include pharyngeal injection, splenomegaly, hepatomegaly, lymphadenopathy, and skin  lesions.  Within  a  week,  most  patients  become  asymptomatic. After several days of apparent recovery, the illness resumes in some individuals. Manifestations of the second stage are more variable and mild than those of the initial illness and usually last 2-4 days. Leptospires  disappear  from  the  blood,  cerebrospinal  fluid  (CSF),  and tissues, but appear in the urine. Serum antibody titers rise - hence the term  'immune'  phase.  Meningitis  is  the  hallmark  of  this  stage  of leptospirosis. Pleocytosis of the CSF can be demonstrated in 80-90% of all patients during the second week of illness, although only about 50% will have clinical signs and symptoms of meningitis. Meningeal signs can last several weeks, but usually resolve within a day or two. Uveitis  is  a  late  manifestation  of  leptospirosis,  generally  seen  4-8 months after the illness has begun. The anterior uveal tract is most frequently affected and pain, photophobia, and blurring of vision are the usual symptoms.", 'timestamp': '2025-11-19T15:03:36.321341'}}, {'id': 6177139379158878770, 'score': 0.7270574, 'context': "[HISTORY OF PRESENT ILLNESS]\nDisease: Histoplasmosis\nFinal Diagnosis: The final diagnosis is disseminated histoplasmosis caused by the fungus Histoplasma capsulatum. This was confirmed by the identification of the organism in a submandibular lymph node biopsy via both PCR and subsequent culture. 
The clinical presentation of disseminated abscesses, osteomyelitis, and generalized lymphadenopathy, along with the patient's travel history to an endemic region in northern Thailand, supported the diagnosis. The dramatic clinical and laboratory response to specific antifungal therapy further confirmed the etiology.\nVitals: On admission, the patient was afebrile with a blood pressure of 130/80 mmHg and a pulse of 80 bpm.\nContent: The patient developed recurrent skin and subcutaneous abscesses, progressive lymphadenopathy, and weight loss over a two-month period. Her symptoms began 6 to 8 weeks after she returned from a family visit to northern Thailand. During that trip, she had experienced an illness with high fever, dry cough, and fatigue, which may have represented acute pulmonary histoplasmosis. Despite various antibiotic therapies prior to transfer, her clinical symptoms and inflammatory markers deteriorated. Initial pus and blood cultures were sterile, and a lymph node biopsy showed non-specific lymphadenitis.", 'payload': {'file_name': 'case61_embeddings_nomic.json', 'case': 'Case61', 'type': 'json_field', 'section': 'History Of Present Illness', 'original_key': 'history_of_present_illness', 'text': 'The patient developed recurrent skin and subcutaneous abscesses, progressive lymphadenopathy, and weight loss over a two-month period. Her symptoms began 6 to 8 weeks after she returned from a family visit to northern Thailand. During that trip, she had experienced an illness with high fever, dry cough, and fatigue, which may have represented acute pulmonary histoplasmosis. Despite various antibiotic therapies prior to transfer, her clinical symptoms and inflammatory markers deteriorated. 
Initial pus and blood cultures were sterile, and a lymph node biopsy showed non-specific lymphadenitis.', 'timestamp': '2025-11-18T16:09:14.601312', 'Disease Name Short': 'Histoplasmosis', 'Final Diagnosis': "The final diagnosis is disseminated histoplasmosis caused by the fungus Histoplasma capsulatum. This was confirmed by the identification of the organism in a submandibular lymph node biopsy via both PCR and subsequent culture. The clinical presentation of disseminated abscesses, osteomyelitis, and generalized lymphadenopathy, along with the patient's travel history to an endemic region in northern Thailand, supported the diagnosis. The dramatic clinical and laboratory response to specific antifungal therapy further confirmed the etiology.", 'Vitals': 'On admission, the patient was afebrile with a blood pressure of 130/80 mmHg and a pulse of 80 bpm.'}}]

In [29]:
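# Collect the "context" string from each retrieved hit (empty list if retrieval returned None)
contexts_list = [d.get("context", "") for d in (retriever_result or [])]
# Join the hits into one prompt-ready string, separated by a horizontal rule
contexts = "\n\n---\n".join(contexts_list)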

In [30]:
# Placeholder: reuse the same retrieved contexts for the first two questions only
all_contexts = [contexts, contexts]
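
The cell above reuses a single `contexts` string for two questions. A minimal sketch of building one joined context string per question follows. It assumes a hypothetical `retrieve(question)` callable that returns a list of dicts with a `"context"` key, shaped like `retriever_result`; `fake_retrieve` is only a stand-in for illustration and is not part of this notebook.

```python
from typing import Callable, Optional

SEPARATOR = "\n\n---\n"  # same separator used when joining contexts above


def join_contexts(results: Optional[list]) -> str:
    """Join the 'context' field of each retrieved hit into one string."""
    return SEPARATOR.join(d.get("context", "") for d in (results or []))


def build_all_contexts(questions: list, retrieve: Callable[[str], list]) -> list:
    """Call the (hypothetical) retriever once per question."""
    return [join_contexts(retrieve(q)) for q in questions]


# Stand-in retriever for illustration only; swap in the real retriever call.
def fake_retrieve(question: str) -> list:
    return [{"context": f"[A] about: {question}"}, {"context": "[B] other"}]


demo_contexts = build_all_contexts(["q1", "q2"], fake_retrieve)
print(demo_contexts[0])
```

With a real retriever, `build_all_contexts([item["question_text"] for item in subset_evaluate_data], retrieve)` would give one entry per question instead of two identical ones.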

In [31]:
all_contexts

["[FEVER]\nContent: Prolonged fever is common, especially among patients with low CD4 cell counts (e.g. < 200 cells/ml) who are not receiving ART [151], and differential  diagnosis  is  broad  and  varies  geographically.  Infectious etiologies are responsible for the majority of fevers.\nTuberculosis should be considered in all patients with fever especially if cough, shortness of breath, weight loss, night sweats, or lymphadenopathy are present (see Chapter 39, ' Tuberculosis ') [131]. Other causes of fever are cryptococcal disease (e.g. C. neoformans bloodstream infection),  bacteremia  (e.g. Salmonella , Rhodococcus ),  pneumonia,  both bacterial (e.g. S.  pneumoniae )  and  fungal (e.g. PCP), malaria, CMV, and lymphoma. Multiple concurrent etiologies occur.\nThe  diagnostic  work-up  for  fever  should  be  driven  by  the  clinical presentation and availability of diagnostic tests. Blood cultures, blood smear for malaria, mycobacterial blood cultures and serum cryptococcal antige

In [32]:
subset_evaluate_data

[{'question_text': 'How can I get treated for Guinea worm? I live in Sarh area in Southeast Chari.',
  'ground_truth': 'Guinea worm'},
 {'question_text': 'How can I tell if my 10 year old son has Gardiasis? We are from the Semai tribe in Peninsula Malaysia.',
  'ground_truth': 'Gardiasis'},
 {'question_text': 'How do I know if I have Nipah Virus? I am a 16 year old boy living in Naogaon.',
  'ground_truth': 'Nipah Virus'},
 {'question_text': 'How would you treat river blindness for a 45 year old man who lives in the Brong Ahafo Region in Ghana.',
  'ground_truth': 'Onchocerciasis(River \nblindness)'},
 {'question_text': 'I am a 16 year old boy living in Naogaon. I have a fever, headaches, myalgia, vomiting and a sore throat. Do I have COVID or Nipah virus?',
  'ground_truth': 'Nipah Virus\nCOVID-19'},
 {'question_text': 'I am a 28 year old man showing signs of sleeping sickness, what should I do?',
  'ground_truth': 'sleeping sickness \n(African trypanosomiasis)'},
 {'question_text': '

In [33]:
# Use enumerate() to get both the index (i) and the dictionary (item)
for i, item in enumerate(subset_evaluate_data):
    # Ensure we don't go out of bounds if lists are different lengths
    if i < len(all_contexts):
        item['retrieved_contexts'] = all_contexts[i]
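
When `all_contexts` is shorter than `subset_evaluate_data`, the loop above silently leaves the remaining items without a `retrieved_contexts` key. A small sketch of an alternative that pairs items with `zip` and reports which indices were skipped is shown here; `attach_contexts` and the demo data are hypothetical names introduced only for this example.

```python
def attach_contexts(items: list, contexts: list) -> list:
    """Attach contexts pairwise and return the indices of items that got none."""
    # zip stops at the shorter sequence, so no bounds check is needed
    for item, ctx in zip(items, contexts):
        item["retrieved_contexts"] = ctx
    # Items past the end of `contexts` were not touched
    return list(range(min(len(items), len(contexts)), len(items)))


demo_items = [
    {"question_text": "q1"},
    {"question_text": "q2"},
    {"question_text": "q3"},
]
missing = attach_contexts(demo_items, ["ctx1", "ctx2"])
print(missing)  # indices of items that still lack contexts
```

Checking the returned list before building prompts makes a length mismatch visible early rather than at prompt-construction time.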

In [34]:
subset_evaluate_data

[{'question_text': 'How can I get treated for Guinea worm? I live in Sarh area in Southeast Chari.',
  'ground_truth': 'Guinea worm',
  'retrieved_contexts': "[FEVER]\nContent: Prolonged fever is common, especially among patients with low CD4 cell counts (e.g. < 200 cells/ml) who are not receiving ART [151], and differential  diagnosis  is  broad  and  varies  geographically.  Infectious etiologies are responsible for the majority of fevers.\nTuberculosis should be considered in all patients with fever especially if cough, shortness of breath, weight loss, night sweats, or lymphadenopathy are present (see Chapter 39, ' Tuberculosis ') [131]. Other causes of fever are cryptococcal disease (e.g. C. neoformans bloodstream infection),  bacteremia  (e.g. Salmonella , Rhodococcus ),  pneumonia,  both bacterial (e.g. S.  pneumoniae )  and  fungal (e.g. PCP), malaria, CMV, and lymphoma. Multiple concurrent etiologies occur.\nThe  diagnostic  work-up  for  fever  should  be  driven  by  the  cl

In [205]:
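# Build chat-style records for evaluation.
# Note: this rebinds evaluate_data, replacing the list loaded from the CSV earlier.
evaluate_data = []
for item in subset_evaluate_data:
    evaluate_data.append(
        {
            "messages": [
                {
                    "role": "user",
                    # .get() avoids a KeyError for items that never received contexts
                    "content": f"User question\n{item['question_text']}\n\n# Retrieved cases\n{item.get('retrieved_contexts', '')}"
                },
                {
                    "role": "system",
                    "content": INSTRUCTION
                }
            ]
        }
    )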
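
The cell above appends the system message after the user message. Many chat-style APIs conventionally place the system message first; whether the downstream consumer of these records requires that order is an assumption here, not something this notebook confirms. A minimal sketch of a helper that builds the same record with the system message leading (`build_messages` is a hypothetical name):

```python
def build_messages(question: str, contexts: str, instruction: str) -> dict:
    """Build one chat record with the system message first."""
    # Same user prompt layout as the cell above
    user_content = f"User question\n{question}\n\n# Retrieved cases\n{contexts}"
    return {
        "messages": [
            {"role": "system", "content": instruction},
            {"role": "user", "content": user_content},
        ]
    }


demo_record = build_messages("What is X?", "[CASE]\nContent: ...", "You are an expert.")
print([m["role"] for m in demo_record["messages"]])
```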

In [206]:
evaluate_data

[{'messages': [{'role': 'user',
    'content': "User question\nHow can I get treated for Guinea worm? I live in Sarh area in Southeast Chari.\n\n# Retrieved cases\n[FEVER]\nContent: Prolonged fever is common, especially among patients with low CD4 cell counts (e.g. < 200 cells/ml) who are not receiving ART [151], and differential  diagnosis  is  broad  and  varies  geographically.  Infectious etiologies are responsible for the majority of fevers.\nTuberculosis should be considered in all patients with fever especially if cough, shortness of breath, weight loss, night sweats, or lymphadenopathy are present (see Chapter 39, ' Tuberculosis ') [131]. Other causes of fever are cryptococcal disease (e.g. C. neoformans bloodstream infection),  bacteremia  (e.g. Salmonella , Rhodococcus ),  pneumonia,  both bacterial (e.g. S.  pneumoniae )  and  fungal (e.g. PCP), malaria, CMV, and lymphoma. Multiple concurrent etiologies occur.\nThe  diagnostic  work-up  for  fever  should  be  driven  by  th

## Future: Pipeline Composition

In [175]:
import concurrent.futures
import time

def step_1_extract(data):
    time.sleep(1) # Simulate API call
    print("Step 1 complete")
    return f"{data} -> Extracted"

def step_2_rewrite(extracted_data):
    time.sleep(1) # Simulate API call
    print("Step 2 complete")
    return f"{extracted_data} -> Rewritten"

# --- THE COMPOSITION ---
def pipeline_wrapper(data):
    """
    Composes step 1 and step 2 manually.
    """
    # Result of 1 feeds into 2
    intermediate = step_1_extract(data)
    final_result = step_2_rewrite(intermediate)
    return final_result

# Usage
data_inputs = ["Case A", "Case B", "Case C"]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # You submit the WRAPPER, not the individual steps
    future_to_data = {
        executor.submit(pipeline_wrapper, data): data
        for data in data_inputs
    }

    for future in concurrent.futures.as_completed(future_to_data):
        print(f"Final Result: {future.result()}")

Step 1 complete
Step 1 complete
Step 1 complete
Step 2 completeStep 2 complete
Final Result: Case C -> Extracted -> Rewritten

Final Result: Case B -> Extracted -> Rewritten
Step 2 complete
Final Result: Case A -> Extracted -> Rewritten
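
The output above arrives in completion order, and lines such as `Step 2 completeStep 2 complete` are what concurrent `print` calls from several threads can produce. A minimal sketch of collecting results in input order, with one failure kept from aborting the whole batch, is shown here. `demo_pipeline` and `run_all` are hypothetical names, and the simulated failure exists only to exercise the error path.

```python
import concurrent.futures


def demo_pipeline(data: str) -> str:
    """Stand-in for pipeline_wrapper; fails on one input for illustration."""
    if data == "Case B":
        raise ValueError("simulated failure")
    return f"{data} -> Extracted -> Rewritten"


def run_all(inputs: list, fn, max_workers: int = 3) -> list:
    """Run fn over inputs concurrently; return (status, value) in input order."""
    results = [None] * len(inputs)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the position of its input
        future_to_index = {executor.submit(fn, x): i for i, x in enumerate(inputs)}
        for future in concurrent.futures.as_completed(future_to_index):
            i = future_to_index[future]
            try:
                results[i] = ("ok", future.result())
            except Exception as exc:  # keep going if one item fails
                results[i] = ("error", repr(exc))
    return results


ordered = run_all(["Case A", "Case B", "Case C"], demo_pipeline)
# Printing from the main thread only, after all work is done, avoids interleaving
for row in ordered:
    print(row)
```

Storing results by index keeps them aligned with the input list, which matters when they are later matched back to evaluation items.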
