Project Phase 1: Stepwise API Exploration

Step 1: Import Libraries


In [1]:
!pip install -q requests pandas streamlit pyngrok faiss-cpu sentence-transformers numpy

import requests
import pandas as pd
import json
import hashlib
from datetime import datetime
import faiss
from sentence_transformers import SentenceTransformer
import numpy as np

In [2]:
# Mount Google Drive at /content/drive; the data paths used by the scripts
# written in later cells all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Secure key input: read the Gemini API key without echoing it and publish it
# through an environment variable (run_bot.py reads GEMINI_API_KEY from there).
import os
import getpass

# 1. Capture the key. Input is invisible: paste the key and press Enter.
#    Surrounding whitespace that tags along with a pasted key is stripped so it
#    cannot cause a false "invalid" warning or end up inside the stored key.
key_input = getpass.getpass("🔑 Enter Gemini API Key (Invisible Input): ").strip()

# 2. Set it as an environment variable for the session *before* reporting
#    success, so the message below is true when it is printed.
os.environ["GEMINI_API_KEY"] = key_input

# 3. Sanity-check the shape of the key (advisory only; the key is stored either way).
if not key_input.startswith("AIza"):
    print("⚠️ Warning: Key might be invalid (usually starts with 'AIza').")
else:
    print("✅ API Key captured securely in Environment Variable.")

🔑 Enter Gemini API Key (Invisible Input): ··········
✅ API Key captured securely in Environment Variable.


In [4]:
%%writefile build_embeddings.py
import pandas as pd
import numpy as np
import faiss
import json
from sentence_transformers import SentenceTransformer

# Build script: reads the diabetes trial CSV, embeds one "title + summary"
# chunk per usable trial, and writes three artefacts next to the CSV:
# the embeddings (.npy), the chunk metadata (.json) and a FAISS L2 index.

# === REAL PATH (from readlink) ===
# No trailing slash: every path below adds its own "/" separator.
BASE = "/content/drive/MyDrive/LLM_Based_GenAI_Sem1/data"

# Summaries shorter than this many characters are skipped as unusable.
MIN_SUMMARY_CHARS = 20


def clean_text(value) -> str:
    """Return `value` as a stripped string; missing cells (None/NaN) become ''.

    Plain `str(NaN)` yields the literal "nan", which would otherwise be
    embedded and stored as the trial title.
    """
    if pd.isna(value):
        return ""
    return str(value).strip()


# ---------------------------------------------
# Load Data
# ---------------------------------------------
df = pd.read_csv(f"{BASE}/clinical_trials_diabetes_full.csv")

# Normalise the status label, then drop trials that are not usable.
# NOTE(review): a label such as "Unknown status" title-cases to
# "Unknown Status" and would NOT match "Unknown" — confirm against the CSV.
df["status"] = df["status"].astype(str).str.strip().str.title()
bad_status = ["Terminated", "Withdrawn", "Suspended", "No Longer Available", "Unknown"]
df_clean = df[~df["status"].isin(bad_status)].copy()

# ---------------------------------------------
# Chunking
# ---------------------------------------------
chunks = []      # texts handed to the embedding model
chunk_map = []   # metadata; position i describes embedding/index row i

for idx, row in df_clean.iterrows():
    title = clean_text(row.get("brief_title", ""))
    summary = clean_text(row.get("brief_summary", ""))

    if len(summary) < MIN_SUMMARY_CHARS:
        continue

    text = f"Title: {title}\nSummary: {summary}"
    chunks.append(text)

    chunk_map.append({
        "nct_id": row["nct_id"],
        "title": title,
        "text": text,
        "status": row["status"]
    })

print(f"Created {len(chunks)} chunks.")

# Stop before writing anything: an empty corpus would otherwise overwrite the
# saved artefacts with empty files and then crash on `embeddings.shape[1]`.
if not chunks:
    raise SystemExit("No usable trial summaries found; nothing to embed.")

# ---------------------------------------------
# Embeddings
# ---------------------------------------------
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = embed_model.encode(chunks, batch_size=64, show_progress_bar=True)

np.save(f"{BASE}/clinical_trials_diabetes_full_embeddings.npy", embeddings)
print("Saved clinical_trials_diabetes_full_embeddings.npy")

# ---------------------------------------------
# Save chunk map
# ---------------------------------------------
with open(f"{BASE}/clinical_trials_diabetes_full_chunk_map.json", "w") as f:
    json.dump(chunk_map, f)

print("Saved clinical_trials_diabetes_full_chunk_map.json")

# ---------------------------------------------
# Build & Save FAISS
# ---------------------------------------------
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(np.array(embeddings).astype("float32"))
faiss.write_index(index, f"{BASE}/clinical_trials_diabetes_full_faiss.index")

print("Saved clinical_trials_diabetes_full_faiss.index")
print("✅ Embedding build COMPLETE.")


Overwriting build_embeddings.py


In [5]:
!python build_embeddings.py

2025-11-24 22:48:52.241430: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1764024532.268865    2767 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1764024532.274883    2767 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1764024532.289538    2767 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1764024532.289564    2767 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1764024532.289568    2767 computation_placer.cc:177] computation placer alr

In [6]:
%%writefile utils.py
import json
import hashlib
from datetime import datetime

import faiss
from sentence_transformers import SentenceTransformer

# --- Confidence score from distance ---

def calculate_confidence_score(distance: float, normalization_factor: float = 1.0) -> float:
    """Map a distance to a confidence value: factor / (factor + distance).

    A distance of 0 gives 1.0 and the value shrinks as the distance grows, so
    for non-negative distances and a positive factor the result lies in (0, 1].
    """
    scale = normalization_factor
    denominator = scale + float(distance)
    return scale / denominator


# --- Load pre-built index + chunk map ---

def load_data_and_index(chunk_map_path: str, faiss_path: str):
    """Load the saved chunk metadata, the FAISS index and the query encoder.

    Args:
        chunk_map_path: path of the JSON file holding the chunk metadata.
        faiss_path: path of the serialized FAISS index.

    Returns:
        A tuple ``(embed_model, index, chunk_map)``.
    """
    print("‚è≥ Loading pre-built RAG index...")

    with open(chunk_map_path, "r") as handle:
        chunk_records = json.load(handle)

    vector_index = faiss.read_index(faiss_path)

    # Same model name the build script used to create the stored vectors.
    encoder = SentenceTransformer("all-MiniLM-L6-v2")

    print(f"‚úÖ RAG Index Ready: {vector_index.ntotal} vectors loaded.")
    return encoder, vector_index, chunk_records


# --- Provenance logging ---

def log_provenance_step(agent_name: str, input_data, output_data, detail=None):
    """
    Build the provenance record for one agent step.

    The record carries the current local timestamp (ISO format), the agent
    name, the step's input and output, optional extra detail (any falsy value
    is stored as an empty dict) and the fixed model version string.
    """
    extra = detail if detail else {}
    return {
        "timestamp": datetime.now().isoformat(),
        "agent": agent_name,
        "input": input_data,
        "output": output_data,
        "detail": extra,
        "model_version": "gemini-2.0-flash",
    }


# --- Reproducibility hash ---

def generate_reproducibility_hash(conversation_history, corpus_version: str = "v1.0"):
    """
    Derive a deterministic session fingerprint from the conversation.

    The corpus version and every turn's "query" value (empty string when the
    key is absent) are joined with "|" and hashed; the MD5 hex digest of the
    UTF-8 bytes is returned.
    """
    joined_queries = "|".join(turn.get("query", "") for turn in conversation_history)
    digest = hashlib.md5()
    digest.update(f"{corpus_version}|{joined_queries}".encode("utf-8"))
    return digest.hexdigest()


Overwriting utils.py


In [7]:
%%writefile run_bot.py
import json
import re
import os
import sys
from typing import List, Dict, Any

import numpy as np
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold

# --- Updated Import: Robust Cross-Encoder Initialization ---
# CrossEncoder stays None when the import fails; the reranker setup further
# down checks this name and skips reranking in that case.
# NOTE(review): utils.py imports sentence_transformers unconditionally, so the
# `from utils import ...` below would still fail if the package were missing.
CrossEncoder = None
try:
    from sentence_transformers import CrossEncoder
    print("‚úÖ sentence_transformers imported successfully.")
except ImportError:
    # Package not installed.
    print("‚ö†Ô∏è sentence_transformers not found. Reranking will be disabled.")
except Exception as e:
    # Any other failure raised while importing the package.
    print(f"‚ö†Ô∏è Error importing CrossEncoder: {e}. Reranking disabled.")

from utils import (
    load_data_and_index,
    log_provenance_step,
    generate_reproducibility_hash,
    calculate_confidence_score,
)

# ============================================================
# CONFIG / PATHS
# ============================================================

# ‚ö†Ô∏è OLD CONFIG (COMMENTED OUT)
# ‚ö†Ô∏è FOR GITHUB: keep this as "***" and DO NOT commit your real key.
# API_KEY = "******"  # <-- replace in Colab with your real key before running
# if API_KEY == "***":
#     print("‚ö†Ô∏è WARNING: You must set API_KEY in run_bot.py before running.")
# genai.configure(api_key=API_KEY)
# gemini_model = genai.GenerativeModel("models/gemini-2.0-flash")

# --- NEW CONFIG (SECURE & 2.0 MODEL) ---
# The key is read from the environment instead of being written in this file.
API_KEY = os.environ.get("GEMINI_API_KEY")

# Without a key nothing below can work, so stop the script with exit code 1.
if not API_KEY:
    print("‚ùå ERROR: API Key not found. Please run the 'Secure Input' cell first.")
    sys.exit(1)

genai.configure(api_key=API_KEY)

# Gemini 2.0 Flash model handle shared by the agents defined below.
gemini_model = genai.GenerativeModel("models/gemini-2.0-flash")

# Artefacts written by build_embeddings.py (chunk metadata + FAISS index).
CHUNK_PATH = "/content/drive/MyDrive/LLM_Based_GenAI_Sem1/data/clinical_trials_diabetes_full_chunk_map.json"
FAISS_PATH = "/content/drive/MyDrive/LLM_Based_GenAI_Sem1/data/clinical_trials_diabetes_full_faiss.index"

# Load embedding model, FAISS index, and chunk metadata
# (runs at import time of this module).
embed_model, faiss_index, chunk_map = load_data_and_index(CHUNK_PATH, FAISS_PATH)

# --- NEW: Reranker Initialization ---
# `reranker` is a module-level global read by RetrievalAgent.retrieve; it
# stays None when CrossEncoder could not be imported or the model fails to
# load, in which case retrieval uses the FAISS ranking only.
reranker = None
if CrossEncoder:
    try:
        print("‚è≥ Loading Reranker Model (Cross-Encoder)...")
        # High precision reranker
        reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
        print("‚úÖ Reranker Loaded.")
    except Exception as e:
        # Best effort: any failure while constructing the model is reported and ignored.
        print(f"‚ö†Ô∏è Reranker model download failed (using pure FAISS): {e}")


# ============================================================
# AGENT 1 — Symptom Parser
# ============================================================

# --- OLD PARSER (COMMENTED OUT) ---
# # class SymptomParser:
# #     def __init__(self, model):
# #         self.model = model

# #     def parse(self, text: str):
# #         """
# #         Returns:
# #           parsed: dict with symptoms, duration, context, intent
# #           log: provenance entry
# #         """
# #         prompt = (
# #             "You are a medical NLP parser.\n"
# #             "Extract structured info and detect whether this is a greeting or a symptom query.\n\n"
# #             f'Input: "{text}"\n\n'
# #             "Return ONLY valid JSON with this format:\n"
# #             "{\n"
# #             '  "symptoms": ["list", "of", "symptoms"],\n'
# #             '  "duration": "text or null",\n'
# #             '  "context": "extra free-text context",\n'
# #             '  "intent": "greeting" or "symptom_query" or "other"\n'
# #             "}\n"
# #         )

# #         try:
# #             res = self.model.generate_content(prompt)
# #             raw = (res.text or "").strip()
# #             match = re.search(r"\{.*\}", raw, re.DOTALL)
# #             if match:
# #                 parsed = json.loads(match.group(0))
# #             else:
# #                 parsed = json.loads(raw)
# #         except Exception:
# #             # Fallback
# #             parsed = {
# #                 "symptoms": [text],
# #                 "duration": None,
# #                 "context": "",
# #                 "intent": "symptom_query",
# #             }

# #         log = log_provenance_step("SymptomParser", text, parsed)
# #         return parsed, log



# # class SymptomParser:
# #     def __init__(self, model):
# #         self.model = model

# #     def parse(self, text: str):
# #         """
# #         Returns:
# #           parsed: dict with symptoms, duration, context, intent, relevance_to_diabetes
# #           log: provenance entry
# #         """
# #         prompt = (
# #             "You are a medical NLP parser for a diabetes clinical trial chatbot.\n"
# #             "Extract structured info and classify the query type.\n\n"
# #             f'Input: "{text}"\n\n'
# #             "Return ONLY valid JSON with this format:\n"
# #             "{\n"
# #             '  "symptoms": ["list", "of", "symptoms"],\n'
# #             '  "duration": "text or null",\n'
# #             '  "context": "extra free-text context",\n'
# #             '  "intent": "greeting" or "symptom_query" or "general_question" or "off_topic",\n'
# #             '  "is_diabetes_related": true or false,\n'
# #             '  "query_type": "knowledge_seeking" or "symptom_matching" or "greeting"\n'
# #             "}\n\n"
# #             "Intent rules:\n"
# #             "- 'greeting': hi, hello, hey, etc.\n"
# #             "- 'general_question': asking about diabetes info (symptoms, treatment, etc.)\n"
# #             "- 'symptom_query': describing personal symptoms\n"
# #             "- 'off_topic': not related to diabetes at all\n\n"
# #             "is_diabetes_related:\n"
# #             "- true if query mentions diabetes, blood sugar, insulin, HbA1c, or diabetes complications\n"
# #             "- false if symptoms/conditions are unrelated (e.g., headache, stomach upset alone)\n"
# #         )

# #         try:
# #             res = self.model.generate_content(prompt)
# #             raw = (res.text or "").strip()
# #             match = re.search(r"\{.*\}", raw, re.DOTALL)
# #             if match:
# #                 parsed = json.loads(match.group(0))
# #             else:
# #                 parsed = json.loads(raw)
# #         except Exception:
# #             # Fallback
# #             parsed = {
# #                 "symptoms": [text],
# #                 "duration": None,
# #                 "context": "",
# #                 "intent": "symptom_query",
# #                 "is_diabetes_related": True,
# #                 "query_type": "symptom_matching",
# #             }

# #         log = log_provenance_step("SymptomParser", text, parsed)
# #         return parsed, log




# class SymptomParser:
#     def __init__(self, model):
#         self.model = model

#     def parse(self, text: str):
#         """
#         Returns:
#           parsed: dict with symptoms, duration, context, intent, relevance_to_diabetes
#           log: provenance entry
#         """
#         prompt = (
#             "You are a medical NLP parser for a diabetes clinical trial chatbot.\n"
#             "Extract structured info and classify the query type.\n\n"
#             f'Input: "{text}"\n\n'
#             "Return ONLY valid JSON with this format:\n"
#             "{\n"
#             '  "symptoms": ["list", "of", "symptoms"],\n'
#             '  "duration": "text or null",\n'
#             '  "context": "extra free-text context",\n'
#             '  "intent": "greeting" or "symptom_query" or "general_question" or "off_topic",\n'
#             '  "is_diabetes_related": true or false,\n'
#             '  "query_type": "knowledge_seeking" or "symptom_matching" or "greeting",\n'
#             '  "user_question": "the actual question being asked in plain English"\n'
#             "}\n\n"
#             "Classification rules:\n"
#             "- intent='greeting' ‚Üí query_type='greeting' (hi, hello, hey)\n"
#             "- intent='general_question' ‚Üí query_type='knowledge_seeking' (asking ABOUT diabetes, not describing symptoms)\n"
#             "  Examples: 'What are symptoms of diabetes?', 'How is diabetes treated?', 'What is HbA1c?'\n"
#             "- intent='symptom_query' ‚Üí query_type='symptom_matching' (user describing THEIR symptoms)\n"
#             "  Examples: 'I have high blood sugar', 'I feel tired and thirsty'\n"
#             "- intent='off_topic' ‚Üí not diabetes-related at all\n\n"
#             "is_diabetes_related:\n"
#             "- true if about diabetes, blood sugar, insulin, HbA1c, or diabetes complications\n"
#             "- false if unrelated (headache alone, cold, flu, etc.)\n"
#         )

#         try:
#             res = self.model.generate_content(prompt)
#             raw = (res.text or "").strip()
#             match = re.search(r"\{.*\}", raw, re.DOTALL)
#             if match:
#                 parsed = json.loads(match.group(0))
#             else:
#                 parsed = json.loads(raw)

#             # Validation: ensure query_type matches intent
#             intent = parsed.get("intent", "symptom_query")
#             if intent == "general_question":
#                 parsed["query_type"] = "knowledge_seeking"
#             elif intent == "greeting":
#                 parsed["query_type"] = "greeting"
#             elif intent == "symptom_query":
#                 parsed["query_type"] = "symptom_matching"

#         except Exception:
#             # Fallback
#             parsed = {
#                 "symptoms": [text],
#                 "duration": None,
#                 "context": "",
#                 "intent": "symptom_query",
#                 "is_diabetes_related": True,
#                 "query_type": "symptom_matching",
#                 "user_question": text,
#             }

#         log = log_provenance_step("SymptomParser", text, parsed)
#         return parsed, log

# --- NEW PARSER (UPDATED) ---
class SymptomParser:
    """LLM-backed parser that turns a free-text user message into structured fields.

    `model` must expose `generate_content(prompt)` returning an object with a
    `.text` attribute (the Gemini model handle is used in this file).
    """

    # Canonical query_type for each intent. "off_topic" (and any unknown
    # intent) has no entry, so the model's own query_type is left untouched.
    _QUERY_TYPE_BY_INTENT = {
        "general_question": "knowledge_seeking",
        "greeting": "greeting",
        "symptom_query": "symptom_matching",
    }

    def __init__(self, model):
        self.model = model

    def parse(self, text: str):
        """
        Parse one user message.

        Returns:
          parsed: dict with symptoms, duration, context, intent,
                  is_diabetes_related, query_type, user_question
          log: provenance entry; when the model call or JSON decoding failed,
               its detail holds {"fallback": True, "error": "<reason>"}
        """
        prompt = (
            "You are a medical NLP parser for a diabetes clinical trial chatbot.\n"
            "Extract structured info and classify the query type.\n\n"
            f'Input: "{text}"\n\n'
            "Return ONLY valid JSON with this format:\n"
            "{\n"
            '  "symptoms": ["list", "of", "symptoms"],\n'
            '  "duration": "text or null",\n'
            '  "context": "extra free-text context",\n'
            '  "intent": "greeting" or "symptom_query" or "general_question" or "off_topic",\n'
            '  "is_diabetes_related": true or false,\n'
            '  "query_type": "knowledge_seeking" or "symptom_matching" or "greeting",\n'
            '  "user_question": "the actual question being asked in plain English"\n'
            "}\n\n"
            "Classification rules:\n"
            "- intent='greeting' → query_type='greeting' (hi, hello, hey)\n"
            "- intent='general_question' → query_type='knowledge_seeking' (asking ABOUT diabetes, not describing symptoms)\n"
            "  Examples: 'What are symptoms of diabetes?', 'How is diabetes treated?', 'What is HbA1c?'\n"
            "- intent='symptom_query' → query_type='symptom_matching' (user describing THEIR symptoms)\n"
            "  Examples: 'I have high blood sugar', 'I feel tired and thirsty'\n"
            "- intent='off_topic' → not diabetes-related at all\n\n"
            "is_diabetes_related:\n"
            "- true if about diabetes, blood sugar, insulin, HbA1c, or diabetes complications\n"
            "- false if unrelated (headache alone, cold, flu, etc.)\n"
        )

        failure = None
        try:
            res = self.model.generate_content(prompt)
            raw = (res.text or "").strip()
            # The model may wrap the JSON in prose or code fences; grab the
            # outermost {...} span when there is one.
            match = re.search(r"\{.*\}", raw, re.DOTALL)
            parsed = json.loads(match.group(0) if match else raw)
            if not isinstance(parsed, dict):
                raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")

            # Validation: ensure query_type matches intent
            intent = parsed.get("intent", "symptom_query")
            if isinstance(intent, str) and intent in self._QUERY_TYPE_BY_INTENT:
                parsed["query_type"] = self._QUERY_TYPE_BY_INTENT[intent]

        except Exception as exc:
            # Deliberate best effort: treat the raw text as a symptom query so
            # the pipeline keeps going, but remember why for the provenance log.
            failure = f"{type(exc).__name__}: {exc}"
            parsed = {
                "symptoms": [text],
                "duration": None,
                "context": "",
                "intent": "symptom_query",
                "is_diabetes_related": True,
                "query_type": "symptom_matching",
                "user_question": text,
            }

        detail = {"fallback": True, "error": failure} if failure else None
        log = log_provenance_step("SymptomParser", text, parsed, detail)
        return parsed, log


# ============================================================
# AGENT 2 — ProfileAgent
# ============================================================

# --- OLD PROFILE AGENT (COMMENTED OUT) ---
# class ProfileAgent:
#     def __init__(self, initial_profile: Dict[str, Any] = None):
#         if initial_profile is None:
#             initial_profile = {
#                 "user_id": "Patient",
#                 "conditions": ["diabetes"],
#                 "history": [],
#             }
#         self.profile = initial_profile

#     def update_profile(self, turn_data: Dict[str, Any]):
#         self.profile.setdefault("history", []).append(turn_data)
#         snapshot = {
#             "user_id": self.profile.get("user_id", "Patient"),
#             "conditions": self.profile.get("conditions", []),
#             "num_turns": len(self.profile["history"]),
#         }
#         log = log_provenance_step("ProfileAgent", turn_data, {"profile_snapshot": snapshot})
#         return log

# --- NEW PROFILE AGENT (STATEFUL) ---
class ProfileAgent:
    """Holds the session's patient profile: turn history plus remembered symptoms."""

    def __init__(self, initial_profile: Dict[str, Any] = None):
        # A fresh default dict is built per instance, so agents never share state.
        if initial_profile is None:
            initial_profile = {
                "user_id": "Patient",
                "conditions": ["diabetes"], # Default context
                "extracted_conditions": [], # Dynamic memory
                "history": [],
            }
        self.profile = initial_profile

    def update_profile(self, turn_data: Dict[str, Any]):
        """
        Append the turn to the history and remember any new symptoms it carries.

        Symptoms are read from turn_data["parsed"]["symptoms"], lower-cased and
        stored in first-seen order without duplicates. Entries that are not
        strings, or are 3 characters or shorter, are ignored as noise.

        Returns the provenance log entry containing a profile snapshot.
        """
        self.profile.setdefault("history", []).append(turn_data)
        self.profile.setdefault("extracted_conditions", [])

        # Heuristic: Add new symptoms found in this turn to the persistent profile
        parsed = turn_data.get("parsed")
        if not isinstance(parsed, dict):
            parsed = {}
        new_symptoms = parsed.get("symptoms") or []
        if isinstance(new_symptoms, str):
            # A bare string would otherwise be iterated character by character.
            new_symptoms = [new_symptoms]

        if new_symptoms:
            # Build a NEW list (earlier log snapshots keep referencing the old
            # one) and keep insertion order: converting a set back to a list
            # made the stored/logged order vary from run to run.
            ordered = list(dict.fromkeys(self.profile["extracted_conditions"]))
            seen = set(ordered)
            for sym in new_symptoms:
                if not isinstance(sym, str) or len(sym) <= 3: # Avoid noise
                    continue
                normalized = sym.lower()
                if normalized not in seen:
                    seen.add(normalized)
                    ordered.append(normalized)
            self.profile["extracted_conditions"] = ordered

        snapshot = {
            "user_id": self.profile.get("user_id", "Patient"),
            "known_conditions": self.profile.get("extracted_conditions", []),
            "num_turns": len(self.profile["history"]),
        }
        log = log_provenance_step("ProfileAgent", turn_data, {"profile_snapshot": snapshot})
        return log


# ============================================================
# AGENT 3 — RetrievalAgent
# ============================================================

# # --- OLD RETRIEVAL AGENT (COMMENTED OUT) ---
# class RetrievalAgent:
#     def __init__(self, embed_model, faiss_index, chunk_map, profile_agent: ProfileAgent = None):
#         self.embed_model = embed_model
#         self.index = faiss_index
#         self.chunk_map = chunk_map
#         self.profile_agent = profile_agent

#     def retrieve(self, parsed: Dict[str, Any], top_k: int = 5):
#         symptoms = parsed.get("symptoms") or []
#         context = parsed.get("context") or ""
#         query = (" ".join(symptoms) + " " + context).strip()

#         if not query:
#             retrieval = {"query": "", "trials": [], "avg_confidence": 0.0}
#             log = log_provenance_step("RetrievalAgent", parsed, retrieval, {"reason": "empty_query"})
#             return retrieval, log

#         q_emb = self.embed_model.encode([query])
#         distances, indices = self.index.search(q_emb.astype("float32"), top_k)

#         trials = []
#         confs = []

#         for rank, idx in enumerate(indices[0]):
#             item = self.chunk_map[idx]
#             dist = float(distances[0][rank])
#             conf = calculate_confidence_score(dist)
#             confs.append(conf)

#             trials.append({
#                 "nct_id": item["nct_id"],
#                 "title": item["title"],
#                 "text": item["text"],
#                 "status": item["status"],
#                 "distance": dist,
#                 "confidence": conf,
#                 "rank": rank + 1,
#             })

#         avg_conf = float(np.mean(confs)) if confs else 0.0

#         retrieval = {
#             "query": query,
#             "trials": trials,
#             "avg_confidence": avg_conf,
#         }

#         detail = {
#             "top_k": top_k,
#             "avg_confidence": avg_conf,
#             "num_trials": len(trials),
#         }

#         log = log_provenance_step("RetrievalAgent", parsed, retrieval, detail)
#         return retrieval, log

# --- NEW RETRIEVAL AGENT (RERANKING) ---
class RetrievalAgent:
    """Two-stage trial retrieval: FAISS search, then optional cross-encoder reranking.

    The FAISS stage over-fetches 3 x top_k candidates. When the module-level
    `reranker` is loaded, the candidates are re-scored against the query and
    the best top_k kept; otherwise the FAISS order is used as is.
    """

    _TITLE_PREFIX = "Title: "

    def __init__(self, embed_model, faiss_index, chunk_map, profile_agent: ProfileAgent = None):
        self.embed_model = embed_model
        self.index = faiss_index
        self.chunk_map = chunk_map
        self.profile_agent = profile_agent

    @classmethod
    def _title_of(cls, item) -> str:
        """Return the chunk's stored title, else the first text line minus its prefix."""
        stored = item.get("title")
        if stored:
            return stored
        # Only strip the leading label; a blanket replace() would also remove
        # "Title: " occurring inside the title itself.
        first_line = item["text"].split("\n")[0]
        if first_line.startswith(cls._TITLE_PREFIX):
            return first_line[len(cls._TITLE_PREFIX):]
        return first_line

    @staticmethod
    def _build_query(parsed: Dict[str, Any]) -> str:
        """Prefer the parser's user_question; otherwise join symptoms and context."""
        symptoms = parsed.get("symptoms") or []
        if isinstance(symptoms, str):
            # A bare string would otherwise be joined character by character.
            symptoms = [symptoms]
        context = parsed.get("context") or ""
        # user_question usually captures the intent best
        return parsed.get("user_question") or (
            " ".join(str(s) for s in symptoms if s) + " " + context
        ).strip()

    def retrieve(self, parsed: Dict[str, Any], top_k: int = 5):
        """
        Retrieve up to `top_k` trials for the parsed query.

        Returns:
          retrieval: {"query", "trials", "avg_confidence"}; each trial carries
                     nct_id, title, text, status, confidence, rank, method
          log: provenance entry for this step
        """
        # Fetch 3x candidates for reranking
        fetch_k = top_k * 3

        query = self._build_query(parsed)

        if not query:
            retrieval = {"query": "", "trials": [], "avg_confidence": 0.0}
            log = log_provenance_step("RetrievalAgent", parsed, retrieval, {"reason": "empty_query"})
            return retrieval, log

        # 1. FAISS Retrieval (Fast/Dense)
        q_emb = self.embed_model.encode([query])
        distances, indices = self.index.search(q_emb.astype("float32"), fetch_k)

        candidates = []
        for dist, idx in zip(distances[0], indices[0]):
            if idx == -1:  # slots left unfilled by the search carry index -1
                continue
            item = self.chunk_map[idx]
            candidates.append({
                "nct_id": item["nct_id"],
                "title": self._title_of(item),
                "text": item["text"],
                "status": item["status"],
                "faiss_dist": float(dist),
            })

        # 2. Reranking (Cross-Encoder), when a reranker is loaded
        if reranker and candidates:
            # Score (Query, Doc) pairs
            scores = reranker.predict([[query, cand["text"]] for cand in candidates])
            for cand, score in zip(candidates, scores):
                cand["rerank_score"] = float(score)
            # Sort by rerank score (descending)
            candidates.sort(key=lambda cand: cand["rerank_score"], reverse=True)
            method = "reranked"
        else:
            # Fallback if reranker is not loaded: keep the FAISS order
            method = "faiss_only"

        final_trials = []
        for rank, cand in enumerate(candidates[:top_k], start=1):
            if method == "reranked":
                # Sigmoid normalization of the rerank score for confidence
                conf = float(1 / (1 + np.exp(-cand["rerank_score"])))
            else:
                conf = calculate_confidence_score(cand["faiss_dist"])
            final_trials.append({
                "nct_id": cand["nct_id"],
                "title": cand["title"],
                "text": cand["text"],
                "status": cand["status"],
                "confidence": conf,
                "rank": rank,
                "method": method,
            })

        confs = [trial["confidence"] for trial in final_trials]
        avg_conf = float(np.mean(confs)) if confs else 0.0

        retrieval = {
            "query": query,
            "trials": final_trials,
            "avg_confidence": avg_conf,
        }

        detail = {
            "top_k": top_k,
            "avg_confidence": avg_conf,
            "num_trials": len(final_trials),
            "method": "reranked" if reranker else "faiss_only"
        }

        log = log_provenance_step("RetrievalAgent", parsed, retrieval, detail)
        return retrieval, log


# ============================================================
# AGENT 4 — DiagnosisAdvisor
# ============================================================

# --- OLD DIAGNOSIS ADVISOR (COMMENTED OUT) ---

# # class DiagnosisAdvisor:
# #     def __init__(self, model):
# #         self.model = model

# #     def advise(self, parsed: Dict[str, Any], retrieved: Dict[str, Any]):
# #         trials = retrieved.get("trials", [])
# #         avg_conf = retrieved.get("avg_confidence", 0.0)

# #         # If retrieval is very low confidence, veto early
# #         draft = {
# #             "recommendation": "",
# #             "avg_confidence": avg_conf,
# #         }

# #         if not trials or avg_conf < 0.15:
# #             draft["recommendation"] = (
# #                 "EVIDENCE IS INSUFFICIENT TO ANSWER THIS QUESTION DIRECTLY based on the "
# #                 "retrieved clinical trials. Please consult your healthcare provider."
# #             )
# #             draft["confidence_veto"] = True
# #             log = log_provenance_step(
# #                 "DiagnosisAdvisor",
# #                 {"parsed": parsed, "retrieval_meta": {"avg_confidence": avg_conf, "num_trials": len(trials)}},
# #                 draft,
# #                 {"veto": True},
# #             )
# #             return draft, log

# #         evidence_parts = []
# #         for t in trials:
# #             evidence_parts.append(
# #                 f"Trial {t['nct_id']} (rank {t['rank']}, confidence {t['confidence']:.2f}):\n{t['text']}\n"
# #             )
# #         evidence = "\n".join(evidence_parts)

# #         prompt = (
# #             "You are an evidence-based medical assistant summarizing clinical trials.\n"
# #             "You MUST answer based ONLY on the evidence below.\n"
# #             "If the evidence does not clearly answer the question, explicitly say:\n"
# #             '"EVIDENCE IS INSUFFICIENT TO ANSWER THIS QUESTION DIRECTLY."\n\n'
# #             "Rules:\n"
# #             "- Do NOT diagnose.\n"
# #             "- Do NOT tell the user to start/stop/change any medication.\n"
# #             "- Summarize what the trials studied (population, interventions, outcomes).\n"
# #             "- End with: 'Please discuss these findings with your healthcare provider before making any changes.'\n\n"
# #             "PATIENT QUERY (parsed JSON):\n"
# #             f"{json.dumps(parsed, indent=2)}\n\n"
# #             "RETRIEVED CLINICAL TRIAL EVIDENCE:\n"
# #             f"{evidence}\n"
# #         )

# #         try:
# #             res = self.model.generate_content(prompt)
# #             text = (res.text or "").strip()
# #             if not text:
# #                 text = "EVIDENCE IS INSUFFICIENT TO ANSWER THIS QUESTION DIRECTLY."
# #             draft["recommendation"] = text
# #             draft["confidence_veto"] = False
# #         except Exception:
# #             draft["recommendation"] = "Unable to generate advice at this time."
# #             draft["confidence_veto"] = True

# #         log = log_provenance_step(
# #             "DiagnosisAdvisor",
# #             {"parsed": parsed, "retrieval_meta": {"avg_confidence": avg_conf, "num_trials": len(trials)}},
# #             draft,
# #         )
# #         return draft, log



# class DiagnosisAdvisor:
#     def __init__(self, model):
#         self.model = model

#     # def _handle_general_question(self, parsed: Dict[str, Any], retrieved: Dict[str, Any]):
#     #     """Handle general knowledge questions about diabetes"""
#     #     trials = retrieved.get("trials", [])
#     #     user_query = " ".join(parsed.get("symptoms", []))

#     #     evidence_parts = []
#     #     for t in trials[:3]:  # Use top 3 for context
#     #         evidence_parts.append(
#     #             f"Trial {t['nct_id']}: {t['text'][:300]}...\n"
#     #         )
#     #     evidence = "\n".join(evidence_parts) if evidence_parts else "No specific trials found."

#     #     prompt = (
#     #         "You are a diabetes health educator. Answer the user's question clearly and accurately.\n"
#     #         "Use your medical knowledge AND the clinical trial evidence provided as validation.\n\n"
#     #         f"USER QUESTION: {user_query}\n\n"
#     #         "SUPPORTING EVIDENCE FROM CLINICAL TRIALS:\n"
#     #         f"{evidence}\n\n"
#     #         "Instructions:\n"
#     #         "- Answer the question directly and clearly\n"
#     #         "- If trials provide relevant context, mention them\n"
#     #         "- Keep answer concise (3-4 sentences)\n"
#     #         "- End with: 'For personalized advice, please consult your healthcare provider.'\n"
#     #     )

#     #     try:
#     #         res = self.model.generate_content(prompt)
#     #         text = (res.text or "").strip()
#     #         if not text:
#     #             text = "I don't have enough information to answer this question accurately."
#     #         return text
#     #     except Exception:
#     #         return "Unable to generate an answer at this time."


#     def _handle_general_question(self, parsed: Dict[str, Any], retrieved: Dict[str, Any]):
#         """Handle general knowledge questions about diabetes"""
#         trials = retrieved.get("trials", [])
#         user_question = parsed.get("user_question") or " ".join(parsed.get("symptoms", []))

#         # Build evidence context (top 3 trials)
#         evidence_parts = []
#         for t in trials[:3]:
#             evidence_parts.append(
#                 f"Trial {t['nct_id']}: {t['text'][:400]}"
#             )
#         evidence = "\n\n".join(evidence_parts) if evidence_parts else "No specific trials available."

#         prompt = (
#             "You are a diabetes health educator. Answer the user's question clearly using your medical knowledge.\n"
#             "The clinical trial evidence below provides real-world context - mention it if relevant.\n\n"
#             f"USER'S QUESTION: {user_question}\n\n"
#             "CLINICAL TRIAL CONTEXT (for reference):\n"
#             f"{evidence}\n\n"
#             "Instructions:\n"
#             "- Answer the question directly in 3-5 sentences\n"
#             "- Be specific and educational\n"
#             "- If trials mention relevant findings, cite them briefly\n"
#             "- End with: 'For personalized advice, please consult your healthcare provider.'\n\n"
#             "Example for 'What are symptoms of diabetes?':\n"
#             "Common symptoms of diabetes include increased thirst, frequent urination, unexplained weight loss, "
#             "fatigue, blurred vision, and slow-healing wounds. Type 1 diabetes symptoms often appear suddenly, "
#             "while type 2 symptoms develop gradually. Some clinical trials (like NCT...) study complications "
#             "such as neuropathy and retinopathy. For personalized advice, please consult your healthcare provider.\n"
#         )

#         try:
#             res = self.model.generate_content(prompt)
#             text = (res.text or "").strip()
#             if not text or len(text) < 50:
#                 text = "I don't have enough information to answer this question accurately. Please consult your healthcare provider."
#             return text
#         except Exception as e:
#             return f"Unable to generate an answer at this time. Please try rephrasing your question."



#     # def _handle_symptom_query(self, parsed: Dict[str, Any], retrieved: Dict[str, Any]):
#     #     """Handle symptom-based queries with evidence"""
#     #     trials = retrieved.get("trials", [])
#     #     avg_conf = retrieved.get("avg_confidence", 0.0)
#     #     symptoms = parsed.get("symptoms", [])
#     #     user_input = parsed.get("user_question") or ", ".join(symptoms)

#     #     evidence_parts = []
#     #     for t in trials:
#     #         evidence_parts.append(
#     #             f"Trial {t['nct_id']} (confidence {t['confidence']:.2f}):\n{t['text']}"
#     #         )
#     #     evidence = "\n\n".join(evidence_parts)

#     #     prompt = (
#     #         "You are an evidence-based diabetes health assistant.\n"
#     #         "The user has described symptoms. Provide a helpful response based ONLY on the clinical trial evidence.\n\n"
#     #         f"USER'S SYMPTOMS/CONCERN: {user_input}\n\n"
#     #         "RETRIEVED CLINICAL TRIAL EVIDENCE:\n"
#     #         f"{evidence}\n\n"
#     #         "Instructions:\n"
#     #         "- Start with 1-2 sentences acknowledging their symptoms\n"
#     #         "- Summarize what the trials found about these symptoms/conditions\n"
#     #         "- List 2-3 specific trials with their focus\n"
#     #         "- Do NOT diagnose or recommend medication changes\n"
#     #         "- End with: 'Please discuss these findings with your healthcare provider before making any changes.'\n\n"
#     #         "Example format:\n"
#     #         "Based on your symptoms of high blood sugar and fatigue, several diabetes trials have investigated these concerns. "
#     #         "Research shows that fatigue is commonly studied alongside glycemic control and quality of life measures. "
#     #         "Here are relevant trials:\n"
#     #         "‚Ä¢ NCT... examines fatigue in type 2 diabetes patients\n"
#     #         "‚Ä¢ NCT... studies the relationship between blood sugar levels and energy\n"
#     #         "Please discuss these findings with your healthcare provider before making any changes.\n"
#     #     )

#     #     try:
#     #         res = self.model.generate_content(prompt)
#     #         text = (res.text or "").strip()
#     #         if not text or len(text) < 50:
#     #             text = "The retrieved trials may not directly address your specific symptoms. Please consult your healthcare provider."
#     #         return text
#     #     except Exception:
#     #         return "Unable to generate advice at this time. Please consult your healthcare provider."


#     def _handle_symptom_query(self, parsed: Dict[str, Any], retrieved: Dict[str, Any]):
#         """Handle symptom-based queries with evidence"""
#         trials = retrieved.get("trials", [])
#         avg_conf = retrieved.get("avg_confidence", 0.0)
#         symptoms = parsed.get("symptoms", [])
#         user_input = parsed.get("user_question", ", ".join(symptoms))

#         evidence_parts = []
#         for t in trials[:5]:  # Top 5 trials
#             evidence_parts.append(
#                 f"‚Ä¢ {t['nct_id']}: {t['text'][:350]}"
#             )
#         evidence = "\n\n".join(evidence_parts)

#         prompt = (
#             "You are an evidence-based diabetes health assistant.\n"
#             "The user has described diabetes-related symptoms or concerns. Provide a helpful, empathetic response.\n\n"
#             f"USER'S SYMPTOMS/CONCERN: {user_input}\n\n"
#             "RETRIEVED CLINICAL TRIAL EVIDENCE:\n"
#             f"{evidence}\n\n"
#             "Instructions:\n"
#             "1. Start with 1-2 sentences acknowledging their concern (brief medical context)\n"
#             "2. Say 'Several clinical trials have investigated this' or similar transition\n"
#             "3. List 3-4 specific trials with brief descriptions:\n"
#             "   ‚Ä¢ NCT... examines [brief focus]\n"
#             "   ‚Ä¢ NCT... investigates [brief focus]\n"
#             "4. Do NOT diagnose or recommend medication changes\n"
#             "5. End with: 'Please discuss these findings with your healthcare provider before making any changes.'\n\n"
#             "Keep response concise (5-7 sentences total).\n"
#         )

#         try:
#             res = self.model.generate_content(prompt)
#             text = (res.text or "").strip()
#             if not text or len(text) < 50:
#                 text = "The retrieved trials may not directly address your specific symptoms. Please consult your healthcare provider."
#             return text
#         except Exception:
#             return "Unable to generate advice at this time. Please consult your healthcare provider."



#     def advise(self, parsed: Dict[str, Any], retrieved: Dict[str, Any]):
#         trials = retrieved.get("trials", [])
#         avg_conf = retrieved.get("avg_confidence", 0.0)
#         query_type = parsed.get("query_type", "symptom_matching")
#         is_diabetes_related = parsed.get("is_diabetes_related", True)

#         draft = {
#             "recommendation": "",
#             "avg_confidence": avg_conf,
#             "query_type": query_type,
#         }

#         # Handle off-topic queries
#         if not is_diabetes_related:
#             draft["recommendation"] = (
#                 "I'm specialized in diabetes-related clinical trials. Your query appears to be "
#                 "about symptoms or conditions not directly related to diabetes. "
#                 "If you have diabetes-related questions or symptoms (like high blood sugar, "
#                 "insulin management, complications, etc.), I'd be happy to help! "
#                 "Otherwise, please consult your healthcare provider for your current symptoms."
#             )
#             draft["confidence_veto"] = True
#             log = log_provenance_step(
#                 "DiagnosisAdvisor",
#                 {"parsed": parsed, "retrieval_meta": {"avg_confidence": avg_conf}},
#                 draft,
#                 {"veto": True, "reason": "off_topic"},
#             )
#             return draft, log

#         # Handle low confidence
#         if not trials or avg_conf < 0.15:
#             draft["recommendation"] = (
#                 "EVIDENCE IS INSUFFICIENT TO ANSWER THIS QUESTION DIRECTLY based on the "
#                 "retrieved clinical trials. Please consult your healthcare provider."
#             )
#             draft["confidence_veto"] = True
#             log = log_provenance_step(
#                 "DiagnosisAdvisor",
#                 {"parsed": parsed, "retrieval_meta": {"avg_confidence": avg_conf, "num_trials": len(trials)}},
#                 draft,
#                 {"veto": True, "reason": "low_confidence"},
#             )
#             return draft, log

#         # Route to appropriate handler
#         if query_type == "knowledge_seeking":
#             draft["recommendation"] = self._handle_general_question(parsed, retrieved)
#         else:
#             draft["recommendation"] = self._handle_symptom_query(parsed, retrieved)

#         draft["confidence_veto"] = False

#         log = log_provenance_step(
#             "DiagnosisAdvisor",
#             {"parsed": parsed, "retrieval_meta": {"avg_confidence": avg_conf, "num_trials": len(trials)}},
#             draft,
#         )
#         return draft, log

# --- NEW DIAGNOSIS ADVISOR (CONTEXT AWARE) ---
class DiagnosisAdvisor:
    """Draft a diabetes-focused reply from a parsed query and retrieved trials.

    ``advise`` is the entry point. It short-circuits off-topic and
    low-evidence requests with canned messages; otherwise it builds a prompt
    for ``self.model`` through one of the two private handlers.
    """

    # Below this average retrieval confidence the evidence is treated as insufficient.
    MIN_AVG_CONFIDENCE = 0.15
    # Model replies shorter than this many characters are replaced by a fallback message.
    MIN_REPLY_CHARS = 50

    def __init__(self, model):
        """Store the generative model; it is called as ``model.generate_content(prompt)``."""
        self.model = model

    def _generate(self, prompt: str, short_reply: str, error_reply: str) -> str:
        """Send ``prompt`` to the model and return its stripped text.

        Args:
            prompt: Full prompt text passed to ``self.model.generate_content``.
            short_reply: Returned when the model text is empty or shorter than
                ``MIN_REPLY_CHARS``.
            error_reply: Returned when the model call (or reading ``.text``)
                raises any ``Exception``.
        """
        try:
            res = self.model.generate_content(prompt)
            text = (res.text or "").strip()
        except Exception:
            # Deliberate best-effort: the user gets a polite fallback rather than a traceback.
            return error_reply
        if len(text) < self.MIN_REPLY_CHARS:
            return short_reply
        return text

    def _handle_general_question(self, parsed: Dict[str, Any], retrieved: Dict[str, Any]):
        """Handle general knowledge questions about diabetes.

        Uses ``parsed["user_question"]`` when it is non-empty, otherwise the
        space-joined ``parsed["symptoms"]``. The first three retrieved trials
        are included in the prompt as context.

        Returns:
            The model's answer, or a fixed fallback string.
        """
        trials = retrieved.get("trials", [])
        user_question = parsed.get("user_question") or " ".join(parsed.get("symptoms") or [])

        # Build evidence context (top 3 trials, 400 characters each)
        evidence_parts = [f"Trial {t['nct_id']}: {t['text'][:400]}" for t in trials[:3]]
        evidence = "\n\n".join(evidence_parts) if evidence_parts else "No specific trials available."

        prompt = (
            "You are a diabetes health educator. Answer the user's question clearly using your medical knowledge.\n"
            "The clinical trial evidence below provides real-world context - mention it if relevant.\n\n"
            f"USER'S QUESTION: {user_question}\n\n"
            "CLINICAL TRIAL CONTEXT (for reference):\n"
            f"{evidence}\n\n"
            "Instructions:\n"
            "- Answer the question directly in 3-5 sentences\n"
            "- Be specific and educational\n"
            "- If trials mention relevant findings, cite them briefly\n"
            "- End with: 'For personalized advice, please consult your healthcare provider.'\n"
        )

        return self._generate(
            prompt,
            "I don't have enough information to answer this question accurately. Please consult your healthcare provider.",
            "Unable to generate an answer at this time. Please try rephrasing your question.",
        )

    def _handle_symptom_query(self, parsed: Dict[str, Any], retrieved: Dict[str, Any], profile: Dict[str, Any] = None):
        """Handle symptom-based queries with evidence AND profile context.

        Args:
            parsed: Parsed query; ``user_question`` is used when non-empty,
                otherwise the comma-joined ``symptoms`` list.
            retrieved: Retrieval result; the first five ``trials`` are quoted
                (350 characters each) in the prompt.
            profile: Optional patient profile. Its ``extracted_conditions``
                list, when present and non-empty, is added to the prompt;
                ``None`` or a profile without conditions yields the
                "New patient" context line.

        Returns:
            The model's answer, or a fixed fallback string.
        """
        trials = retrieved.get("trials", [])
        symptoms = parsed.get("symptoms") or []
        # `or` (not a .get default) so an explicit None/"" user_question also
        # falls back to the symptoms instead of leaking "None" into the prompt.
        user_input = parsed.get("user_question") or ", ".join(symptoms)

        # Inject Profile Context (tolerates a missing profile)
        conditions = (profile or {}).get("extracted_conditions") or []
        known_conditions = ", ".join(str(c) for c in conditions)
        if known_conditions:
            patient_context = f"Patient Profile: Known conditions include {known_conditions}."
        else:
            patient_context = "Patient Profile: New patient."

        # Top 5 trials
        evidence = "\n\n".join(f"‚Ä¢ {t['nct_id']}: {t['text'][:350]}" for t in trials[:5])

        prompt = (
            "You are an evidence-based diabetes health assistant.\n"
            "The user has described diabetes-related symptoms or concerns. Provide a helpful, empathetic response.\n\n"
            f"{patient_context}\n"
            f"USER'S SYMPTOMS/CONCERN: {user_input}\n\n"
            "RETRIEVED CLINICAL TRIAL EVIDENCE:\n"
            f"{evidence}\n\n"
            "Instructions:\n"
            "1. Start with 1-2 sentences acknowledging their concern (brief medical context)\n"
            "2. Say 'Several clinical trials have investigated this' or similar transition\n"
            "3. List 3-4 specific trials with brief descriptions:\n"
            "   ‚Ä¢ NCT... examines [brief focus]\n"
            "   ‚Ä¢ NCT... investigates [brief focus]\n"
            "4. Do NOT diagnose or recommend medication changes\n"
            "5. End with: 'Please discuss these findings with your healthcare provider before making any changes.'\n\n"
            "Keep response concise (5-7 sentences total).\n"
        )

        return self._generate(
            prompt,
            "The retrieved trials may not directly address your specific symptoms. Please consult your healthcare provider.",
            "Unable to generate advice at this time. Please consult your healthcare provider.",
        )

    def advise(self, parsed: Dict[str, Any], retrieved: Dict[str, Any], profile: Dict[str, Any] = None):
        """Produce a draft recommendation and its provenance log entry.

        Args:
            parsed: Parsed query. Reads ``query_type`` (default
                ``"symptom_matching"``) and ``is_diabetes_related``
                (default ``True``).
            retrieved: Retrieval result. Reads ``trials`` and
                ``avg_confidence`` (default ``0.0``).
            profile: Optional patient profile forwarded to the symptom
                handler. Defaults to ``None`` so two-argument callers keep
                working.

        Returns:
            ``(draft, log)``. ``draft`` is a dict with ``recommendation``,
            ``avg_confidence``, ``query_type`` and ``confidence_veto``;
            ``log`` is the value returned by ``log_provenance_step``.
        """
        trials = retrieved.get("trials", [])
        avg_conf = retrieved.get("avg_confidence", 0.0)
        query_type = parsed.get("query_type", "symptom_matching")
        is_diabetes_related = parsed.get("is_diabetes_related", True)

        draft = {
            "recommendation": "",
            "avg_confidence": avg_conf,
            "query_type": query_type,
        }

        # Handle off-topic queries
        if not is_diabetes_related:
            draft["recommendation"] = (
                "I'm specialized in diabetes-related clinical trials. Your query appears to be "
                "about symptoms or conditions not directly related to diabetes. "
                "If you have diabetes-related questions or symptoms (like high blood sugar, "
                "insulin management, complications, etc.), I'd be happy to help! "
                "Otherwise, please consult your healthcare provider for your current symptoms."
            )
            draft["confidence_veto"] = True
            log = log_provenance_step("DiagnosisAdvisor", parsed, draft, {"veto": True, "reason": "off_topic"})
            return draft, log

        # Handle low confidence
        if not trials or avg_conf < self.MIN_AVG_CONFIDENCE:
            draft["recommendation"] = (
                "EVIDENCE IS INSUFFICIENT TO ANSWER THIS QUESTION DIRECTLY based on the "
                "retrieved clinical trials. Please consult your healthcare provider."
            )
            draft["confidence_veto"] = True
            log = log_provenance_step("DiagnosisAdvisor", parsed, draft, {"veto": True, "reason": "low_confidence"})
            return draft, log

        # Route to appropriate handler
        if query_type == "knowledge_seeking":
            draft["recommendation"] = self._handle_general_question(parsed, retrieved)
        else:
            draft["recommendation"] = self._handle_symptom_query(parsed, retrieved, profile)

        draft["confidence_veto"] = False

        log = log_provenance_step("DiagnosisAdvisor", parsed, draft)
        return draft, log


# ============================================================
# AGENT 5 — ActiveSafetyFilter
# ============================================================

# --- OLD SAFETY FILTER (COMMENTED OUT) ---

# class ActiveSafetyFilter:
#     def __init__(self, model):
#         self.model = model
#         self.safety_cfg = {
#             HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
#         }

#     def verify(self, advice_text: str, trials: List[Dict[str, Any]]):
#         evidence_text = "\n".join(t["text"] for t in trials)

#         audit_prompt = (
#             "You are a Medical Safety Officer reviewing AI-generated advice.\n\n"
#             "ADVICE:\n"
#             f"{advice_text}\n\n"
#             "EVIDENCE FROM CLINICAL TRIALS:\n"
#             f"{evidence_text[:4000]}\n\n"
#             "Check for safety issues:\n"
#             "- If the advice suggests stopping or changing medication without a doctor ‚Üí UNSAFE.\n"
#             "- If it gives a diagnosis ‚Üí UNSAFE.\n"
#             "- If it makes claims not supported by the evidence ‚Üí UNSAFE.\n\n"
#             'If the advice is acceptable, respond with exactly: SAFE\n'
#             'If it is not acceptable, respond starting with: CORRECTED: \n'
#         )

#         try:
#             res = self.model.generate_content(audit_prompt, safety_settings=self.safety_cfg)
#             txt = (res.text or "").strip()
#             if txt.startswith("SAFE"):
#                 final_text = advice_text
#                 status = "Pass"
#             else:
#                 final_text = f"‚ö†Ô∏è SAFETY REVISION:\n{txt}"
#                 status = "Revised"
#         except Exception:
#             final_text = "‚ö†Ô∏è Safety filter triggered. Please consult a doctor."
#             status = "Revised (API)"

#         log = log_provenance_step(
#             "ActiveSafetyFilter",
#             {"advice": advice_text},
#             {"final_text": final_text, "status": status},
#         )
#         return final_text, status, log

# --- NEW SAFETY FILTER (UNCHANGED BUT RE-DECLARED) ---
class ActiveSafetyFilter:
    """Second-pass audit that asks the model to vet drafted advice.

    ``verify`` sends the advice plus the retrieved trial text to the model
    and interprets the reply as either an approval (``SAFE``) or a
    replacement text.
    """

    # Only this many characters of the concatenated trial text are sent to the auditor.
    MAX_EVIDENCE_CHARS = 4000

    def __init__(self, model):
        """Store the model and the safety settings passed on every audit call."""
        self.model = model
        self.safety_cfg = {
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
        }

    @staticmethod
    def _is_safe_verdict(reply: str) -> bool:
        """Return True only when the first word of ``reply`` is exactly ``SAFE``.

        A plain prefix check would also approve replies such as
        "SAFETY ISSUE: ...", so the first whitespace-delimited token is
        compared after trimming surrounding punctuation/markdown markers.
        """
        tokens = reply.split(None, 1)
        if not tokens:
            return False
        return tokens[0].strip("*`\"'.,:;!") == "SAFE"

    def verify(self, advice_text: str, trials: List[Dict[str, Any]]):
        """Audit ``advice_text`` against the text of ``trials``.

        Args:
            advice_text: Draft advice to review.
            trials: Retrieved trials; each item's ``"text"`` is joined with
                newlines and truncated to ``MAX_EVIDENCE_CHARS``.

        Returns:
            ``(final_text, status, log)`` where ``status`` is:
            - ``"Pass"``: the auditor approved; ``final_text`` is the
              original advice.
            - ``"Revised"``: the auditor replied with anything else;
              ``final_text`` is that reply under a "SAFETY REVISION" banner.
            - ``"Revised (API)"``: the call raised or returned no text;
              ``final_text`` is a generic consult-a-doctor message.
            ``log`` is the value returned by ``log_provenance_step``.
        """
        evidence_text = "\n".join(t["text"] for t in trials)

        audit_prompt = (
            "You are a Medical Safety Officer reviewing AI-generated advice.\n\n"
            "ADVICE:\n"
            f"{advice_text}\n\n"
            "EVIDENCE FROM CLINICAL TRIALS:\n"
            f"{evidence_text[:self.MAX_EVIDENCE_CHARS]}\n\n"
            "Check for safety issues:\n"
            "- If the advice suggests stopping or changing medication without a doctor ‚Üí UNSAFE.\n"
            "- If it gives a diagnosis ‚Üí UNSAFE.\n"
            "- If it makes claims not supported by the evidence ‚Üí UNSAFE.\n\n"
            'If the advice is acceptable, respond with exactly: SAFE\n'
            'If it is not acceptable, respond starting with: CORRECTED: <safer version>\n'
        )

        try:
            res = self.model.generate_content(audit_prompt, safety_settings=self.safety_cfg)
            verdict = (res.text or "").strip()
        except Exception:
            # Best-effort: an unavailable auditor must not surface unaudited advice.
            verdict = ""

        if not verdict:
            # No usable audit result (error or empty reply): never show an
            # empty "revision" and never pass the draft through unaudited.
            final_text = "‚ö†Ô∏è Safety filter triggered. Please consult a doctor."
            status = "Revised (API)"
        elif self._is_safe_verdict(verdict):
            final_text = advice_text
            status = "Pass"
        else:
            final_text = f"‚ö†Ô∏è SAFETY REVISION:\n{verdict}"
            status = "Revised"

        log = log_provenance_step(
            "ActiveSafetyFilter",
            {"advice": advice_text},
            {"final_text": final_text, "status": status},
        )
        return final_text, status, log


# ============================================================
# HEALTHCARE BOT (Orchestrator)
# ============================================================

# --- OLD BOT (COMMENTED OUT) ---
# ============================================================
# HEALTHCARE BOT (Orchestrator)
# ============================================================

# class HealthcareBot:
#     def __init__(self, gemini_model, embed_model, faiss_index, chunk_map, initial_profile=None):
#         self.parser = SymptomParser(gemini_model)
#         self.profile_agent = ProfileAgent(initial_profile)
#         self.retriever = RetrievalAgent(embed_model, faiss_index, chunk_map, self.profile_agent)
#         self.advisor = DiagnosisAdvisor(gemini_model)
#         self.safety = ActiveSafetyFilter(gemini_model)

#         self.history: List[Dict[str, Any]] = []
#         self.provenance_chain: List[Dict[str, Any]] = []

#     def _handle_simple_greeting(self, user_input: str):
#         user_id = self.profile_agent.profile.get("user_id", "there")
#         msg = (
#             f"Hello {user_id}! I'm your clinical trial health assistant. "
#             "Tell me your symptoms or a question about diabetes-related trials, "
#             "and I‚Äôll summarize relevant evidence. I cannot diagnose or give direct medical orders."
#         )

#         log = log_provenance_step(
#             "GreetingAgent",
#             user_input,
#             msg,
#             {"type": "greeting"},
#         )
#         self.provenance_chain.append(log)

#         session_hash = generate_reproducibility_hash(self.history + [{"query": user_input}])
#         self.history.append({"query": user_input, "response_hash": session_hash})

#         return {
#             "recommendation": msg,
#             "cited_trials": [],
#             "safety_status": "Non-RAG",
#             "session_hash": session_hash,
#             "provenance_chain": self.provenance_chain,
#         }



#     def _handle_off_topic(self, user_input: str, parsed: Dict[str, Any]):
#         """Handle off-topic queries"""
#         msg = (
#             "I'm specialized in diabetes-related clinical trials and information. "
#             "Your query appears to be about symptoms or conditions not directly related to diabetes. "
#             "If you have diabetes-related questions (blood sugar, insulin, complications, medications, etc.), "
#             "I'd be happy to help! Otherwise, please consult your healthcare provider."
#         )

#         log = log_provenance_step("OffTopicHandler", user_input, msg, {"type": "off_topic"})
#         self.provenance_chain.append(log)

#         session_hash = generate_reproducibility_hash(self.history + [{"query": user_input}])

#         turn_data = {
#             "query": user_input,
#             "parsed": parsed,
#             "nct_ids": [],
#             "safety_status": "Off-topic",
#             "session_hash": session_hash,
#         }
#         profile_log = self.profile_agent.update_profile(turn_data)
#         self.provenance_chain.append(profile_log)
#         self.history.append({"query": user_input, "response_hash": session_hash})

#         return {
#             "recommendation": msg,
#             "cited_trials": [],
#             "safety_status": "Off-topic",
#             "session_hash": session_hash,
#             "provenance_chain": self.provenance_chain,
#         }



#     def _handle_knowledge_question(self, user_input: str, parsed: Dict[str, Any]):
#         """Handle general diabetes knowledge questions using LLM's built-in knowledge"""

#         user_question = parsed.get("user_question", user_input)

#         prompt = (
#             "You are a certified diabetes educator. Answer this question clearly and accurately "
#             "using evidence-based medical knowledge.\n\n"
#             f"QUESTION: {user_question}\n\n"
#             "Instructions:\n"
#             "- Provide a clear, educational answer (4-6 sentences)\n"
#             "- Use medical accuracy\n"
#             "- Be specific with examples when relevant\n"
#             "- Mention that clinical trials are available if they want personalized info\n"
#             "- End with: 'For personalized guidance based on your specific situation, "
#             "please ask about your symptoms or consult your healthcare provider.'\n"
#         )

#         try:
#             res = self.advisor.model.generate_content(prompt)
#             answer = (res.text or "").strip()

#             if not answer or len(answer) < 50:
#                 answer = (
#                     "I can help you find relevant clinical trials for your specific symptoms. "
#                     "For general diabetes information, please consult your healthcare provider "
#                     "or ask me about specific symptoms you're experiencing."
#                 )
#         except Exception:
#             answer = "Unable to answer at this time. Please try rephrasing your question."

#         # Log as knowledge-based response
#         log = log_provenance_step(
#             "KnowledgeAgent",
#             user_input,
#             answer,
#             {"type": "general_knowledge", "no_retrieval": True}
#         )
#         self.provenance_chain.append(log)

#         session_hash = generate_reproducibility_hash(self.history + [{"query": user_input}])

#         # Update profile
#         turn_data = {
#             "query": user_input,
#             "parsed": parsed,
#             "nct_ids": [],
#             "safety_status": "Knowledge-Based (No Retrieval)",
#             "session_hash": session_hash,
#         }
#         profile_log = self.profile_agent.update_profile(turn_data)
#         self.provenance_chain.append(profile_log)
#         self.history.append({"query": user_input, "response_hash": session_hash})

#         return {
#             "recommendation": answer,
#             "cited_trials": [],
#             "safety_status": "Knowledge-Based (No Retrieval)",
#             "session_hash": session_hash,
#             "provenance_chain": self.provenance_chain,
#         }




#     def process_query(self, user_input: str):
#         self.provenance_chain = []

#         # 1. Parse
#         parsed, parse_log = self.parser.parse(user_input)
#         self.provenance_chain.append(parse_log)

#         intent = (parsed.get("intent") or "symptom_query").lower()
#         is_diabetes_related = parsed.get("is_diabetes_related", True)
#         query_type = parsed.get("query_type", "symptom_matching")

#         # Handle greetings
#         if intent == "greeting":
#             return self._handle_simple_greeting(user_input)

#         # Handle off-topic queries (skip retrieval)
#         if intent == "off_topic" or not is_diabetes_related:
#             msg = (
#                 "I'm specialized in diabetes-related clinical trials. Your query appears to be "
#                 "about symptoms or conditions not directly related to diabetes. "
#                 "If you have diabetes-related questions or symptoms (like high blood sugar, "
#                 "insulin management, complications, etc.), I'd be happy to help!"
#             )

#             session_hash = generate_reproducibility_hash(self.history + [{"query": user_input}])
#             turn_data = {
#                 "query": user_input,
#                 "parsed": parsed,
#                 "nct_ids": [],
#                 "safety_status": "Off-topic (No Retrieval)",
#                 "session_hash": session_hash,
#             }
#             profile_log = self.profile_agent.update_profile(turn_data)
#             self.provenance_chain.append(profile_log)
#             self.history.append({"query": user_input, "response_hash": session_hash})

#             return {
#                 "recommendation": msg,
#                 "cited_trials": [],
#                 "safety_status": "Off-topic (No Retrieval)",
#                 "session_hash": session_hash,
#                 "provenance_chain": self.provenance_chain,
#             }

#         # NEW: Handle general knowledge questions (no retrieval needed)
#         if query_type == "knowledge_seeking":
#             user_question = parsed.get("user_question", user_input)

#             prompt = (
#                 "You are a certified diabetes educator. Answer this question clearly and accurately "
#                 "using evidence-based medical knowledge.\n\n"
#                 f"QUESTION: {user_question}\n\n"
#                 "Instructions:\n"
#                 "- Provide a clear, educational answer (4-6 sentences)\n"
#                 "- Use medical accuracy and be specific\n"
#                 "- Mention common examples when relevant\n"
#                 "- End with: 'For personalized guidance based on your specific situation, "
#                 "please describe your symptoms or consult your healthcare provider.'\n"
#             )

#             try:
#                 res = self.advisor.model.generate_content(prompt)
#                 answer = (res.text or "").strip()

#                 if not answer or len(answer) < 50:
#                     answer = (
#                         "I can help you find relevant clinical trials for your specific symptoms. "
#                         "For general diabetes information, please consult your healthcare provider "
#                         "or ask me about specific symptoms you're experiencing."
#                     )
#             except Exception:
#                 answer = "Unable to answer at this time. Please try rephrasing your question."

#             # Log as knowledge-based response
#             log = log_provenance_step(
#                 "KnowledgeAgent",
#                 user_input,
#                 answer,
#                 {"type": "general_knowledge", "no_retrieval": True}
#             )
#             self.provenance_chain.append(log)

#             session_hash = generate_reproducibility_hash(self.history + [{"query": user_input}])

#             turn_data = {
#                 "query": user_input,
#                 "parsed": parsed,
#                 "nct_ids": [],
#                 "safety_status": "Knowledge-Based (No Retrieval)",
#                 "session_hash": session_hash,
#             }
#             profile_log = self.profile_agent.update_profile(turn_data)
#             self.provenance_chain.append(profile_log)
#             self.history.append({"query": user_input, "response_hash": session_hash})

#             return {
#                 "recommendation": answer,
#                 "cited_trials": [],
#                 "safety_status": "Knowledge-Based (No Retrieval)",
#                 "session_hash": session_hash,
#                 "provenance_chain": self.provenance_chain,
#             }

#         # 2. Retrieve (for symptom queries)
#         retrieved, retrieve_log = self.retriever.retrieve(parsed)
#         self.provenance_chain.append(retrieve_log)

#         # 3. Advisor
#         draft_advice, advise_log = self.advisor.advise(parsed, retrieved)
#         self.provenance_chain.append(advise_log)

#         trials = retrieved.get("trials", [])
#         if draft_advice.get("confidence_veto", False) or not trials:
#             final_text = draft_advice["recommendation"]
#             safety_status = "Vetoed (Low Confidence)"
#             evidence_list = []
#         else:
#             # 4. Safety
#             final_text, safety_status, safety_log = self.safety.verify(
#                 draft_advice["recommendation"],
#                 trials,
#             )
#             self.provenance_chain.append(safety_log)
#             evidence_list = trials

#         nct_ids = [t["nct_id"] for t in evidence_list]

#         session_hash = generate_reproducibility_hash(self.history + [{"query": user_input}])

#         # 5. Update profile/history
#         turn_data = {
#             "query": user_input,
#             "parsed": parsed,
#             "nct_ids": nct_ids,
#             "safety_status": safety_status,
#             "session_hash": session_hash,
#         }
#         profile_log = self.profile_agent.update_profile(turn_data)
#         self.provenance_chain.append(profile_log)
#         self.history.append({"query": user_input, "response_hash": session_hash})

#         return {
#             "recommendation": final_text,
#             "cited_trials": nct_ids,
#             "safety_status": safety_status,
#             "session_hash": session_hash,
#             "provenance_chain": self.provenance_chain,
#         }



# --- NEW BOT (ORCHESTRATOR) ---
class HealthcareBot:
    """Orchestrator that routes one user message through the agent pipeline.

    Each message is parsed first. Greetings, off-topic queries and general
    knowledge questions are answered directly; everything else goes through
    retrieval, the advisor and (unless vetoed) the safety filter.

    Attributes:
        history: one ``{"query", "response_hash"}`` entry per completed turn,
            whichever route answered it.
        provenance_chain: provenance logs for the turn being processed; it is
            replaced by a fresh list at the start of every ``process_query``.
    """

    def __init__(self, gemini_model, embed_model, faiss_index, chunk_map, initial_profile=None):
        """Build the agents and start with empty history and provenance.

        Args:
            gemini_model: model handed to the parser, advisor and safety filter.
            embed_model: embedding model handed to the retrieval agent.
            faiss_index: vector index handed to the retrieval agent.
            chunk_map: chunk lookup handed to the retrieval agent.
            initial_profile: optional starting profile for ``ProfileAgent``.
        """
        self.parser = SymptomParser(gemini_model)
        self.profile_agent = ProfileAgent(initial_profile)
        self.retriever = RetrievalAgent(embed_model, faiss_index, chunk_map, self.profile_agent)
        self.advisor = DiagnosisAdvisor(gemini_model)
        self.safety = ActiveSafetyFilter(gemini_model)

        self.history: List[Dict[str, Any]] = []
        self.provenance_chain: List[Dict[str, Any]] = []

    # ------------------------------------------------------------------
    # Shared bookkeeping helpers
    # ------------------------------------------------------------------
    def _session_hash(self, user_input: str):
        """Hash the history so far plus the current query (history is not modified)."""
        return generate_reproducibility_hash(self.history + [{"query": user_input}])

    def _record_turn(self, user_input: str, session_hash) -> None:
        """Append the finished turn to the conversation history."""
        self.history.append({"query": user_input, "response_hash": session_hash})

    def _build_response(self, text, nct_ids, safety_status, session_hash):
        """Assemble the response dict returned by every route."""
        return {
            "recommendation": text,
            "cited_trials": nct_ids,
            "safety_status": safety_status,
            "session_hash": session_hash,
            "provenance_chain": self.provenance_chain,
        }

    def _finish_direct_turn(self, agent_name, user_input, text, metadata, safety_status):
        """Log, hash and record a turn that was answered without retrieval."""
        log = log_provenance_step(agent_name, user_input, text, metadata)
        self.provenance_chain.append(log)

        session_hash = self._session_hash(user_input)
        # Every route records its turn so later session hashes reflect the
        # whole conversation, not only the greeting and retrieval turns.
        self._record_turn(user_input, session_hash)
        return self._build_response(text, [], safety_status, session_hash)

    # ------------------------------------------------------------------
    # Direct (non-retrieval) routes
    # ------------------------------------------------------------------
    def _handle_simple_greeting(self, user_input: str):
        """Answer a greeting with a fixed introduction addressed to the profile's user_id."""
        user_id = self.profile_agent.profile.get("user_id", "there")
        msg = (
            f"Hello {user_id}! I'm your clinical trial health assistant. "
            "Tell me your symptoms or a question about diabetes-related trials, "
            "and I’ll summarize relevant evidence. I cannot diagnose or give direct medical orders."
        )
        return self._finish_direct_turn(
            "GreetingAgent", user_input, msg, {"type": "greeting"}, "Non-RAG"
        )

    def _handle_off_topic(self, user_input: str, parsed: Dict[str, Any]):
        """Decline a query that is not diabetes-related with a fixed message.

        ``parsed`` is accepted for signature parity with the other handlers and
        is not read here.
        """
        msg = (
            "I'm specialized in diabetes-related clinical trials. Your query appears to be "
            "about symptoms or conditions not directly related to diabetes. "
            "If you have diabetes-related questions, I'd be happy to help!"
        )
        return self._finish_direct_turn(
            "OffTopicHandler", user_input, msg, {"type": "off_topic"}, "Off-topic"
        )

    def _handle_knowledge_question(self, user_input: str, parsed: Dict[str, Any]):
        """Answer a general knowledge question straight from the advisor's model.

        Uses ``parsed["user_question"]`` when present, otherwise the raw input.
        If the model call fails, a fixed fallback answer is returned and the
        error is recorded in the provenance metadata.
        """
        user_question = parsed.get("user_question", user_input)
        prompt = (
            "You are a certified diabetes educator. Answer this question clearly and accurately.\n"
            f"QUESTION: {user_question}\n"
        )
        metadata = {"type": "general_knowledge"}
        try:
            res = self.advisor.model.generate_content(prompt)
            answer = (res.text or "").strip()
        except Exception as exc:
            # Catch Exception rather than everything, so KeyboardInterrupt and
            # SystemExit still propagate; keep the cause for later inspection.
            answer = "Unable to answer at this time."
            metadata["error"] = repr(exc)

        return self._finish_direct_turn(
            "KnowledgeAgent", user_input, answer, metadata, "Knowledge-Based"
        )

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------
    def process_query(self, user_input: str):
        """Process one user message and return the response dict.

        Returns:
            dict with keys ``recommendation``, ``cited_trials``,
            ``safety_status``, ``session_hash`` and ``provenance_chain``.
        """
        self.provenance_chain = []

        # 1. Parse
        parsed, parse_log = self.parser.parse(user_input)
        self.provenance_chain.append(parse_log)

        # A missing or empty intent is treated as a symptom query.
        intent = (parsed.get("intent") or "symptom_query").lower()
        is_diabetes_related = parsed.get("is_diabetes_related", True)
        query_type = parsed.get("query_type", "symptom_matching")

        if intent == "greeting":
            return self._handle_simple_greeting(user_input)
        if intent == "off_topic" or not is_diabetes_related:
            return self._handle_off_topic(user_input, parsed)
        if query_type == "knowledge_seeking":
            return self._handle_knowledge_question(user_input, parsed)

        # 2. Retrieve (now with Reranker)
        retrieved, retrieve_log = self.retriever.retrieve(parsed)
        self.provenance_chain.append(retrieve_log)

        # 3. Advisor (now with Profile Context)
        draft_advice, advise_log = self.advisor.advise(parsed, retrieved, self.profile_agent.profile)
        self.provenance_chain.append(advise_log)

        trials = retrieved.get("trials", [])
        if draft_advice.get("confidence_veto", False) or not trials:
            # No usable evidence: return the advisor's text and cite nothing.
            final_text = draft_advice["recommendation"]
            safety_status = "Vetoed (Low Confidence)"
            evidence_list = []
        else:
            # 4. Safety
            final_text, safety_status, safety_log = self.safety.verify(draft_advice["recommendation"], trials)
            self.provenance_chain.append(safety_log)
            evidence_list = trials

        nct_ids = [t["nct_id"] for t in evidence_list]
        session_hash = self._session_hash(user_input)

        # 5. Update profile/history
        turn_data = {
            "query": user_input,
            "parsed": parsed,
            "nct_ids": nct_ids,
            "safety_status": safety_status,
            "session_hash": session_hash,
        }
        profile_log = self.profile_agent.update_profile(turn_data)
        self.provenance_chain.append(profile_log)
        self._record_turn(user_input, session_hash)

        return self._build_response(final_text, nct_ids, safety_status, session_hash)

# ============================================================
# GLOBAL BOT INSTANCE + ENTRYPOINT
# ============================================================

# Starting profile given to the shared bot instance.
default_profile = dict(
    user_id="Patient",
    conditions=["diabetes"],
    extracted_conditions=[],
)

# Single module-level bot, built once when this module is imported.
_bot = HealthcareBot(
    gemini_model,
    embed_model,
    faiss_index,
    chunk_map,
    initial_profile=default_profile,
)

def run_bot(user_input: str) -> Dict[str, Any]:
    """Forward ``user_input`` to the shared module-level bot and return its response."""
    response = _bot.process_query(user_input)
    return response

Overwriting run_bot.py


UI frontend application: a simple web chat interface built with Streamlit.

https://docs.streamlit.io/develop/tutorials/chat-and-llm-apps/build-conversational-apps

In [8]:
%%writefile app.py
import os

import streamlit as st

# run_bot builds its bot instance at import time, so check for the key first
# and halt this script run instead of carrying on into the import and chat UI.
if "GEMINI_API_KEY" not in os.environ:
    st.error("⚠️ API Key missing! Please run the 'Secure Input' cell in the notebook first.")
    st.stop()

from run_bot import run_bot

st.title("Clinical Trial Health Advisor 🤖")
st.caption("AI for Healthcare - Clinical Trials RAG")

# Streamlit re-runs this script on every interaction; the transcript lives in
# session_state so it survives those re-runs.
if "messages" not in st.session_state:
    st.session_state.messages = []

# Replay the conversation so far.
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        st.markdown(msg["content"])

if user_input := st.chat_input("Describe your symptoms..."):
    st.session_state.messages.append({"role": "user", "content": user_input})
    with st.chat_message("user"):
        st.markdown(user_input)

    with st.spinner("Searching clinical trials..."):
        result = run_bot(user_input)
        reply = result["recommendation"]

    with st.chat_message("assistant"):
        st.markdown(reply)

    st.session_state.messages.append({"role": "assistant", "content": reply})

Overwriting app.py


In [9]:
# Download the latest cloudflared linux-amd64 release binary from GitHub (-q: quiet),
# rename it to `cloudflared`, and make it executable so it can be run as ./cloudflared.
!wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64
!mv cloudflared-linux-amd64 cloudflared
!chmod +x cloudflared

In [10]:
#AI LLM
# Start the Streamlit app as a background job; `&>/dev/null` discards its stdout and stderr.
!streamlit run app.py &>/dev/null&
# Open a Cloudflare tunnel to http://localhost:8501; this command stays in the foreground
# and prints the tunnel log.
# NOTE(review): assumes the Streamlit server above is listening on port 8501 — confirm.
!./cloudflared tunnel --url http://localhost:8501 --no-autoupdate

[90m2025-11-24T22:49:36Z[0m [32mINF[0m Thank you for trying Cloudflare Tunnel. Doing so, without a Cloudflare account, is a quick way to experiment and try it out. However, be aware that these account-less Tunnels have no uptime guarantee, are subject to the Cloudflare Online Services Terms of Use (https://www.cloudflare.com/website-terms/), and Cloudflare reserves the right to investigate your use of Tunnels for violations of such terms. If you intend to use Tunnels in production you should use a pre-created named tunnel by following: https://developers.cloudflare.com/cloudflare-one/connections/connect-apps
[90m2025-11-24T22:49:36Z[0m [32mINF[0m Requesting new quick Tunnel on trycloudflare.com...
[90m2025-11-24T22:49:39Z[0m [32mINF[0m +--------------------------------------------------------------------------------------------+
[90m2025-11-24T22:49:39Z[0m [32mINF[0m |  Your quick Tunnel has been created! Visit it at (it may take some time to be reachable):  |
[90m2025