In [None]:
# %pip install -U langchain langgraph langchain_community langchain_openai langchain_core ollama pandas duckdb faiss-cpu sentence-transformers biopython pypdf pydantic lxml html2text beautifulsoup4 matplotlib -qqq

In [None]:
import os
import getpass
from dotenv import load_dotenv

# Merge any key/value pairs from a local .env file into the process environment.
load_dotenv()

# ENTREZ_EMAIL is read later by the PubMed downloader; both keys are expected up front.
if all(key in os.environ for key in ("LANGCHAIN_API_KEY", "ENTREZ_EMAIL")):
    print("Environment variables loaded successfully.")
else:
    print("Required environment variables not set. Please set them in your .env file or environment.")

# Project name under which this notebook's runs are grouped.
os.environ["LANGCHAIN_PROJECT"] = "AI_Clinical_Trials_Architect"

In [None]:
from langchain_community.chat_models import ChatOllama
from langchain_community.embeddings import OllamaEmbeddings

# One client per role in the workflow. The two JSON-mode clients (planner and
# director) are the ones whose replies are parsed as structured data.
llm_config = dict(
    planner=ChatOllama(model="llama3.1:8b", temperature=0.0, format='json'),
    drafter=ChatOllama(model="qwen2:7b", temperature=0.2),
    sql_coder=ChatOllama(model="qwen2:7b", temperature=0.0),
    director=ChatOllama(model="llama3:70b", temperature=0.0, format='json'),
    embedding_model=OllamaEmbeddings(model="nomic-embed-text"),
)

In [None]:
# Echo every configured client together with the model it points at.
print("LLM clients configured:")
for _label, _role in (
    ("Planner", "planner"),
    ("Drafter", "drafter"),
    ("SQL Coder", "sql_coder"),
    ("Director", "director"),
    ("Embedding Model", "embedding_model"),
):
    _client = llm_config[_role]
    print(f"{_label} ({_client.model}): {_client}")

In [None]:
# Central registry of the on-disk locations used by the data-loading cells below.
# (A module-level `global` statement is a no-op, so it has been dropped.)
data_paths = {
    "base": "./data",
    "pubmed": "./data/pubmed_articles",
    "fda": "./data/fda_guidelines",
    "ethics": "./data/ethical_guidelines",
    "mimic": "./data/mimic_db"
}

# Create any missing directory and report only the ones that had to be created.
for path in data_paths.values():
    if not os.path.exists(path):
        # exist_ok covers the case where the directory appears between the check and this call.
        os.makedirs(path, exist_ok=True)
        print(f"Created directory: {path}")

In [None]:
from Bio import Entrez, Medline

def download_pubmed_articles(query, max_articles=20):
    """Fetch PubMed abstracts matching ``query`` and save each one as a text file.

    Args:
        query: Search expression passed to ``Entrez.esearch``.
        max_articles: Upper bound on the number of PMIDs requested (``retmax``).

    Returns:
        The number of abstracts written under ``data_paths["pubmed"]``
        (0 when the search returns no IDs).
    """
    # NOTE(review): the fallback address is hard-coded in the notebook; prefer
    # setting ENTREZ_EMAIL so requests are attributed to the actual user.
    Entrez.email = os.environ.get("ENTREZ_EMAIL", "aashishjain3009@gmail.com")

    # Step 1: resolve the query to a list of PubMed IDs (PMIDs).
    search_handle = Entrez.esearch(db="pubmed", term=query, retmax=max_articles, sort="relevance")
    try:
        search_result = Entrez.read(search_handle)
    finally:
        search_handle.close()
    id_list = search_result["IdList"]

    # Nothing matched: skip the fetch instead of sending a request with no IDs.
    if not id_list:
        print("No PubMed articles matched the query.")
        return 0

    # Step 2: retrieve the MEDLINE-formatted records for those PMIDs.
    fetch_handle = Entrez.efetch(db="pubmed", id=id_list, rettype="medline", retmode="text")

    count = 0
    try:
        # Step 3: Medline.parse is consumed lazily, so the handle must stay open while iterating.
        for i, article in enumerate(Medline.parse(fetch_handle)):
            pmid = article.get("PMID", "")
            if not pmid:
                continue
            title = article.get("TI", "No Title")
            abstract = article.get("AB", "No Abstract")
            filepath = os.path.join(data_paths["pubmed"], f"{pmid}.txt")
            # Abstracts routinely contain non-ASCII symbols; write them as UTF-8 explicitly.
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(f"Title: {title}\n\nAbstract: {abstract}")
            print(f"[{i+1}/{len(id_list)}] Fetching PMID: {pmid}... Saved to {filepath}")
            count += 1
    finally:
        fetch_handle.close()
    return count

In [None]:
# Search expression for the trial's drug class, indication and comorbidity.
pubmed_query = "(SGLT2 inhibitor) AND (type 2 diabetes) AND (renal impairment)"
num_downloaded = download_pubmed_articles(query=pubmed_query)
print(f"PubMed download complete. {num_downloaded} articles saved.")

In [None]:
import requests, io
from pypdf import PdfReader

def download_and_extract_text_from_pdf(url, output_path):
    """Download a PDF, save it, and write its extracted text next to it as a ``.txt`` file.

    Args:
        url: Location of the PDF to download.
        output_path: Destination path for the PDF; the text file uses the same
            path with the extension replaced by ``.txt``.

    Returns:
        ``True`` when both files were written, ``False`` when the download failed.
    """
    print(f"Downloading FDA Guideline: {url}")
    try:
        # A timeout keeps an unresponsive server from hanging the notebook indefinitely.
        response = requests.get(url, timeout=60)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Error downloading file: {e}")
        return False

    with open(output_path, "wb") as f:
        f.write(response.content)
    print(f"Successfully downloaded and saved to {output_path}")

    # Parse from memory rather than re-reading the file that was just written.
    reader = PdfReader(io.BytesIO(response.content))
    # `or ""` guards against a page for which no text could be extracted.
    text = "".join(f"{page.extract_text() or ''}\n\n" for page in reader.pages)

    txt_output_path = os.path.splitext(output_path)[0] + ".txt"
    # Extracted PDF text may contain non-ASCII characters; write it as UTF-8 explicitly.
    with open(txt_output_path, "w", encoding="utf-8") as f:
        f.write(text)
    return True

In [None]:
# Source document and local target for the FDA guidance PDF.
fda_url = "https://www.fda.gov/media/71185/download"
fda_pdf_path = os.path.join(data_paths["fda"], "fda_diabetes_guidance.pdf")
download_and_extract_text_from_pdf(url=fda_url, output_path=fda_pdf_path)

In [None]:
# Hand-written summary used as the only document in the ethics knowledge store.
ethics_content = """
Title: Summary of the Belmont Report Principles for Clinical Research
1. Respect for Persons: This principle requires that individuals be treated as autonomous agents and that persons with diminished autonomy are entitled to protection. This translates to robust informed consent process. Inclusion/exclusion criteria must not unduly target or coerce vulnerable populations, such as economically disadvantaged individuals, prisoners, or those with severe cognitive impairments, unless the research is directly intended to benefit that population.
2. Beneficence: This principle involves two complementary rules: (1) do not harm and (2) maximize possible benefits and minimize possible harms. The criteria must be designed to select a population that is most likely to benefit and least likely to be harmed by the intervention. The risks to subjects must be reasonable in relation to anticipated benefits.
3. Justice: This principle concerns the fairness of distribution of the burdens and benefits of research. The selection of research subjects must be equitable. Criteria should not be designed to exclude certain groups without a sound scientific or safety-related justification. For example, excluding participants based on race, gender or socioeconomic status is unjust unless there is a clear rationale to the drug's mechanism or risk profile.
"""

ethics_path = os.path.join(data_paths["ethics"], "belmont_summary.txt")

# Persist the summary so the vector-store builder can pick it up as a .txt file.
with open(ethics_path, "w") as f:
    f.write(ethics_content)

print(f"Created ethics guideline file: {ethics_path}")

In [None]:
import duckdb, pandas as pd, os

def load_real_mimic_data():
    """Build a persistent DuckDB database file from local MIMIC-III CSV exports.

    Three tables are created: ``patients``, ``diagnoses_icd`` and a filtered
    ``labevents`` that keeps only item IDs 50912 and 50852 with numeric values.
    Any existing database file at the target path is deleted and rebuilt.

    Returns:
        Path to the created database file, or ``None`` when a required CSV is missing.

    Raises:
        Exception: Any DuckDB error raised while loading; the connection is
            closed before the error propagates.
    """
    print("Attempting to load real MIMIC-III data from local CSVs...")
    db_path = os.path.join(data_paths["mimic"], "mimic3_real.db")
    csv_dir = os.path.join(data_paths["mimic"], "mimiciii_csvs")

    required_files = {
        "patients": os.path.join(csv_dir, "PATIENTS.csv"),
        "diagnoses": os.path.join(csv_dir, "DIAGNOSES_ICD.csv"),
        "labevents": os.path.join(csv_dir, "LABEVENTS.csv"),
    }

    missing_files = [path for path in required_files.values() if not os.path.exists(path)]
    if missing_files:
        print("ERROR: The following MIMIC-III files were not found:")
        for f in missing_files: print(f"- {f}")
        print("\nPlease download them as instructed and place them in the correct directory.")
        return None

    print("Required files found. Proceeding with database creation.")
    # Always rebuild from scratch so a stale file from an earlier run is never reused.
    if os.path.exists(db_path):
        os.remove(db_path)

    # The paths are spliced into SQL string literals, so embedded single quotes must be doubled.
    sql_paths = {name: path.replace("'", "''") for name, path in required_files.items()}

    con = duckdb.connect(db_path)
    try:
        print(f"Loading {required_files['patients']} into DuckDB...")
        con.execute(f"CREATE TABLE patients AS SELECT SUBJECT_ID, GENDER, DOB, DOD FROM read_csv_auto('{sql_paths['patients']}')")

        print(f"Loading {required_files['diagnoses']} into DuckDB...")
        con.execute(f"CREATE TABLE diagnoses_icd AS SELECT SUBJECT_ID, ICD9_CODE FROM read_csv_auto('{sql_paths['diagnoses']}')")

        print(f"Loading and processing {required_files['labevents']} (this may take several minutes)...")
        # 1. Stage the lab events with every column read as text (all_varchar=True) so mixed
        #    data types cannot break parsing. Only the two item IDs of interest are kept
        #    (50912 for creatinine, 50852 for HbA1c), and a regex keeps rows whose VALUENUM
        #    is a plain non-negative decimal number.
        sql = f"""CREATE TABLE labevents_staging AS
                    SELECT SUBJECT_ID, ITEMID, VALUENUM
                    FROM read_csv_auto('{sql_paths['labevents']}', all_varchar=True)
                    WHERE ITEMID IN ('50912','50852') AND VALUENUM IS NOT NULL AND VALUENUM ~ '^[0-9]+(\\.[0-9]+)?$'
                """
        con.execute(sql)

        # 2. Build the final table by casting the staged text columns to numeric types.
        con.execute("CREATE TABLE labevents AS SELECT SUBJECT_ID, CAST(ITEMID AS INTEGER) AS ITEMID, CAST(VALUENUM AS DOUBLE) AS VALUENUM FROM labevents_staging")
        # 3. Drop the staging table to save space.
        con.execute("DROP TABLE labevents_staging")
    finally:
        # Release the file handle even when one of the statements above fails.
        con.close()
    return db_path

# Execute the function to build the database
try:
    db_path = load_real_mimic_data()
except Exception as e:
    # Keep the notebook running; downstream cells treat a None path as "no database".
    print(f"Error while building MIMIC DB: {e}")
    db_path = None

# If the database was created successfully, connect to it and inspect the schema and some sample data.
if db_path:
    print(f"\nReal MIMIC-III database created at: {db_path}")
    print("\nTesting database connection and schema...")
    con = duckdb.connect(db_path)
    try:
        print(f"Tables in DB: {con.execute('SHOW TABLES').df()['name'].tolist()}")
        print("\nSample of 'patients' table:")
        print(con.execute("SELECT * FROM patients limit 5").df())
        print("\nSample of 'diagnoses_icd table:")
        print(con.execute("SELECT * FROM diagnoses_icd LIMIT 5").df())
    finally:
        # Close the connection even if one of the inspection queries fails.
        con.close()
else:
    print("MIMIC DB not created.")


In [None]:
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document

def create_vector_store(folder_path: str, embedding_model, store_name: str):
    """Index every ``.txt`` file under ``folder_path`` into an in-memory FAISS store.

    Args:
        folder_path: Directory searched recursively for ``*.txt`` files.
        embedding_model: Embedding client passed to ``FAISS.from_documents``.
        store_name: Human-readable label used only in progress messages.

    Returns:
        The FAISS vector store, or ``None`` when the folder contains no documents.
    """
    print(f"--- Creating {store_name} Vector Store ---")
    loader = DirectoryLoader(
        folder_path,
        glob="**/*.txt",
        loader_cls=TextLoader,
        # Read as UTF-8 rather than the locale default, and fall back to encoding
        # detection for files that are not valid UTF-8.
        loader_kwargs={"encoding": "utf-8", "autodetect_encoding": True},
        show_progress=True,
    )
    documents = loader.load()

    if not documents:
        print(f"No documents found in {folder_path}, skipping vector store creation.")
        return None

    # 1000-character chunks with a 100-character overlap between neighbours.
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    texts = text_splitter.split_documents(documents)

    print(f"Loaded {len(documents)} documents, split into {len(texts)} chunks.")
    print("Generating embeddings and indexing into FAISS... (This may take a moment)")
    db = FAISS.from_documents(texts, embedding_model)
    print(f"{store_name} Vector Store created successfully.")
    return db

def create_retrievers(embedding_model):
    """Build one retriever per unstructured corpus and bundle them with the MIMIC DB path.

    Returns a dict with the keys ``pubmed_retriever``, ``fda_retriever``,
    ``ethics_retriever`` (each ``None`` when its store could not be built) and
    ``mimic_db_path`` (the module-level ``db_path``).
    """
    corpora = (("pubmed", "PubMed"), ("fda", "FDA"), ("ethics", "Ethics"))

    # Build all stores first, in a fixed order, before deriving any retriever.
    stores = {}
    for key, label in corpora:
        stores[key] = create_vector_store(data_paths[key], embedding_model, label)

    bundle = {}
    for key, _ in corpora:
        store = stores[key]
        bundle[f"{key}_retriever"] = store.as_retriever(search_kwargs={"k": 3}) if store else None
    bundle["mimic_db_path"] = db_path
    return bundle

In [None]:
# Build every retriever once; the agents look them up in this dict by name.
knowledge_stores = create_retrievers(embedding_model=llm_config["embedding_model"])

print("\nKnowledge stores and retrievers created successfully.")
print("\n".join(f"{name}: {store}" for name, store in knowledge_stores.items()))

In [None]:
from pydantic import BaseModel, Field
from typing import Literal

class GuildSOP(BaseModel):
    """Standard Operating Procedures for the Trial Design Guild. This object acts as a dynamic configuration for the entire RAG workflow."""
    # Prompt text prepended to the trial concept by the planner agent.
    planner_prompt: str = Field(description="The system prompt for the Planner Agent.")
    # Overrides the retriever's `k` when the Medical Researcher runs.
    researcher_retriever_k: int = Field(description="Number of documents for the Medical Researcher to retrieve.", default=3)
    # Prompt text prepended to the specialists' findings by the synthesizer.
    synthesizer_prompt: str = Field(description="The system prompt for the Criteria Synthesizer Agent.")
    # Restricted to the model tags listed in the Literal.
    synthesizer_model: Literal["qwen2:7b", "llama3.1:8b"] = Field(description="The LLM to use for the Synthesizer.", default="qwen2:7b")
    # Feature switches checked by the cohort analyst and the dispatcher respectively.
    use_sql_analyst: bool = Field(description="Whether to use the Patient Cohort Analyst agent.", default=True)
    use_ethics_specialist: bool = Field(description="Whether to use the Ethics Specialist agent.", default=True)

In [None]:
import json

# Baseline configuration (v1.0). The planner prompt names all four specialists the
# dispatcher can route to: without "Medical Researcher" in the list, no PubMed
# retrieval task is ever planned and the rigor evaluator receives no context.
baseline_sop = GuildSOP(
    planner_prompt="""You are a master planner for clinical trial design. Your task is to receive a high-level trial concept and break it down into a structured plan with specific sub-tasks for a team of specialists: a Regulatory Specialist, a Medical Researcher, an Ethics Specialist, and a Patient Cohort Analyst. Output a JSON object with a single key 'plan' containing a list of tasks. Each task must have 'agent', 'task_description' keys.""",
    synthesizer_prompt="""You are an expert medical writer. Your task is to synthesize the structured findings from all specialist teams into a formal 'Inclusion and Exclusion Criteria' document. Be concise, precise, and adhere strictly to the information provided. Structure your output into two sections: 'Inclusion Criteria' and 'Exclusion Criteria'.""",
    researcher_retriever_k=3,
    synthesizer_model="qwen2:7b",
    use_sql_analyst=True,
    use_ethics_specialist=True
)

print("Baseline GuildSOP (v1.0):")
print(json.dumps(baseline_sop.model_dump(), indent=4))

In [None]:
from typing import List, Dict, Any, Optional
from pydantic import BaseModel
from typing_extensions import TypedDict

class AgentOutput(BaseModel):
    """Result record produced by a single specialist agent."""
    # Display name of the agent that produced the record; evaluators look records up by it.
    agent_name: str
    # Whatever the agent returned; no type constraint is applied.
    findings: Any


class GuildState(TypedDict):
    """Shared workflow state handed from node to node in the Trial Design Guild graph."""
    # The user's trial concept, as passed to the graph.
    initial_request: str
    # Planner result; holds a 'plan' key with the list of tasks.
    plan: Optional[Dict[str, Any]]
    # One record per specialist task that was executed.
    agent_outputs: List[AgentOutput]
    # Text produced by the synthesizer node.
    final_criteria: Optional[str]
    # Configuration governing the current run.
    sop: GuildSOP

In [None]:
import json
from typing import List

def _normalize_plan(raw_text: str) -> dict:
    """Convert the planner's raw reply into ``{"plan": [{"agent", "task_description"}, ...]}``."""
    try:
        parsed = json.loads(raw_text)
    except (json.JSONDecodeError, TypeError):
        parsed = None

    # Accept either {"plan": [...]} or a bare list of tasks.
    candidate = parsed.get("plan") if isinstance(parsed, dict) else parsed
    tasks = []
    if isinstance(candidate, list):
        for item in candidate:
            if isinstance(item, dict) and item.get("task_description"):
                tasks.append({
                    "agent": str(item.get("agent") or "Generalist"),
                    "task_description": str(item["task_description"]),
                })
            elif isinstance(item, str) and item.strip():
                tasks.append({"agent": "Generalist", "task_description": item.strip()})
    if tasks:
        return {"plan": tasks}

    # Fallback for non-JSON replies: treat each non-empty line as one step of a numbered list.
    steps = [
        line.lstrip("0123456789. ").strip()
        for line in raw_text.split("\n")
        if line.strip()
    ]
    return {"plan": [{"agent": "Generalist", "task_description": step} for step in steps]}


def planner_agent(state: GuildState) -> GuildState:
    """Ask the planner LLM for a task plan and store it in the state under ``plan``.

    The SOP's planner prompt requests a JSON object with a ``plan`` list whose
    items carry ``agent`` and ``task_description``; the agent names are what the
    dispatcher node routes on, so they are preserved from the reply. Replies
    that are not valid JSON fall back to line-by-line parsing, with every step
    assigned to a "Generalist" agent.

    Args:
        state: Current workflow state; reads ``sop`` and ``initial_request``.

    Returns:
        A copy of ``state`` with ``plan`` set to ``{"plan": [task, ...]}``.
    """
    print("--- EXECUTING PLANNER AGENT ---")

    sop = state["sop"]
    llm = llm_config["planner"]

    # The extra instructions must agree with the JSON format requested by the SOP prompt.
    prompt = f"""
{sop.planner_prompt}

Trial Concept:
{state['initial_request']}

INSTRUCTIONS:
- Respond with the JSON object only.
- Keep each 'task_description' to one short sentence.
- Do not include explanations or commentary.
"""

    print(f"Planner Prompt:\n{prompt}")

    response = llm.invoke(prompt)

    raw_text = response.content.strip()
    print(f"Raw Planner Output:\n{raw_text}")

    structured_plan = _normalize_plan(raw_text)
    print(f"Parsed Plan:\n{json.dumps(structured_plan['plan'], indent=2)}")

    return {
        **state,
        "plan": structured_plan
    }

In [None]:
def retrieval_agent(task_description: str, state: GuildState, retriever_name: str, agent_name: str) -> AgentOutput:
    """Run a retrieval task against one of the knowledge stores.

    Args:
        task_description: Query text sent to the retriever.
        state: Current workflow state; only ``sop.researcher_retriever_k`` is read.
        retriever_name: Key into ``knowledge_stores`` (e.g. ``"fda_retriever"``).
        agent_name: Label recorded on the returned ``AgentOutput``.

    Returns:
        An ``AgentOutput`` whose findings are the retrieved chunks joined with
        separators, or an explanatory message when the store is unavailable.
    """
    print(f"--- EXECUTING {agent_name.upper()} ---")
    print(f"Task: {task_description}")

    # A store is None when its folder had no documents; report that instead of crashing.
    retriever = knowledge_stores.get(retriever_name)
    if retriever is None:
        message = f"No knowledge store available for '{retriever_name}'; retrieval skipped."
        print(message)
        return AgentOutput(agent_name=agent_name, findings=message)

    if agent_name == "Medical Researcher":
        # Only this agent's retrieval depth is SOP-configurable. The retriever is shared,
        # so the new k stays in effect until it is changed again.
        retriever.search_kwargs['k'] = state['sop'].researcher_retriever_k
        print(f"Using k={state['sop'].researcher_retriever_k} for retrieval.")

    retrieval_docs = retriever.invoke(task_description)

    findings = "\n\n---\n\n".join([f"Source: {doc.metadata.get('source', 'N/A')}\n\n{doc.page_content}" for doc in retrieval_docs])
    print(f"Retrieved {len(retrieval_docs)} documents")
    print(f"Sample finding:\n{findings[:500]}...")

    # Pydantic models accept keyword arguments only.
    return AgentOutput(agent_name=agent_name, findings=findings)

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def patient_cohort_analyst(task_description: str, state: GuildState) -> AgentOutput:
    """Estimate cohort size by generating a SQL query with the LLM and running it on the MIMIC DB.

    Args:
        task_description: Natural-language description of the cohort criteria.
        state: Current workflow state; only ``sop.use_sql_analyst`` is read.

    Returns:
        An ``AgentOutput`` named "Patient Cohort Analyst". On success the findings
        contain the generated SQL and the line
        "Estimated eligible patient count from the database: <n>."; otherwise they
        describe why no count was produced.
    """
    print("--- EXECUTING PATIENT COHORT ANALYST ---")
    agent_name = "Patient Cohort Analyst"

    if not state['sop'].use_sql_analyst:
        print("SQL Analyst skipped as per SOP.")
        return AgentOutput(agent_name=agent_name, findings="Analysis skipped as per SOP.")

    # The path is None when the MIMIC database could not be built.
    mimic_db_path = knowledge_stores.get('mimic_db_path')
    if not mimic_db_path:
        findings = "MIMIC database is not available. Defaulting to a count of 0."
        print(findings)
        return AgentOutput(agent_name=agent_name, findings=findings)

    schema_query = """
    SELECT table_name, column_name, data_type
    FROM information_schema.columns
    WHERE table_schema = 'main' ORDER BY table_name, column_name;
    """
    con = duckdb.connect(mimic_db_path)
    try:
        schema = con.execute(schema_query).df()
    finally:
        con.close()

    # The schema is passed as a template variable rather than interpolated with an
    # f-string, so any braces in it can never be misread as prompt placeholders.
    system_template = (
        "You are an expert SQL writer specializing in DuckDB. Your task is to write a single, valid SQL query "
        "to count unique patients based on a request. The database contains MIMIC-III patient data with the "
        "following schema:\n{schema}\n\n"
        "IMPORTANT: All column names in your query MUST be uppercase (e.g., SELECT SUBJECT_ID, ICD9_CODE...).\n\n"
        "Key Mappings:\n"
        "- T2DM (Type 2 Diabetes) corresponds to ICD9_CODE '25000'.\n"
        "- Moderate renal impairment can be estimated by a creatinine lab value (ITEMID 50912) where VALUENUM is between 1.5 and 3.0.\n"
        "- Uncontrolled T2D can be estimated by an HbA1c lab value (ITEMID 50852) where VALUENUM is greater than 8.0"
    )
    sql_generation_prompt = ChatPromptTemplate.from_messages([
        ("system", system_template),
        ("human", "Please write a SQL query to count the number of unique patients who meet the following criteria: {task}")
    ])

    sql_chain = sql_generation_prompt | llm_config['sql_coder'] | StrOutputParser()

    print(f"Generating SQL for task: {task_description}")
    sql_query = sql_chain.invoke({"task": task_description, "schema": schema.to_string()})
    # Strip Markdown code fences the model may wrap around the query.
    sql_query = sql_query.strip().replace("```sql", "").replace("```", "").strip()
    print(f"Generate SQL Query:\n{sql_query}")

    # NOTE(review): the query is model-generated and runs on a writable connection;
    # consider opening the database read-only once no other connection config conflicts.
    try:
        con = duckdb.connect(mimic_db_path)
        try:
            result = con.execute(sql_query).fetchone()
        finally:
            # Close the connection whether or not the generated query was valid.
            con.close()
        patient_count = result[0] if result else 0

        findings = f"Generated SQL Query:\n{sql_query}\n\nEstimated eligible patient count from the database: {patient_count}."
        print(f"Query executed successfully. Estimated patient count: {patient_count}")
    except Exception as e:
        findings = f"Error executing SQL query: {e}. Defaulting to a count of 0."
        print(f"Error during query execution: {e}")
    return AgentOutput(agent_name=agent_name, findings=findings)

In [None]:
def criteria_synthesizer(state: GuildState) -> GuildState:
    """Merge every specialist's findings into the final criteria text.

    Reads ``sop`` and ``agent_outputs`` from the state, and returns a copy of the
    state with ``final_criteria`` set to the drafting model's reply.
    """
    print("--- EXECUTING CRITERIA SYNTHESIZER ---")

    sop = state['sop']
    # The drafting model is chosen by the SOP, so a fresh client is built on each call.
    drafter_llm = ChatOllama(model=sop.synthesizer_model, temperature=0.2)

    sections = []
    for out in state['agent_outputs']:
        sections.append(f"**{out.agent_name} Findings:**\n{out.findings}")
    context = "\n\n---\n\n".join(sections)

    prompt = f"{sop.synthesizer_prompt}\n\n**Context from Specialist Teams:**\n{context}"
    print(f"Synthesizer is using model '{sop.synthesizer_model}'.")

    response = drafter_llm.invoke(prompt)
    print("Final criteria generated.")

    updated_state = dict(state)
    updated_state["final_criteria"] = response.content
    return updated_state

In [None]:
from langgraph.graph import StateGraph, END

def specialist_execution_node(state: GuildState) -> GuildState:
    """Dispatch each planned task to the matching specialist and collect the outputs.

    Routing is by substring match on the task's ``agent`` value, checked in the
    order Regulatory, Medical, Ethics (only when enabled in the SOP), Cohort.
    Tasks matching none of these are skipped.
    """

    def _run(agent_name, task_desc):
        # Guard-clause form of the routing chain; returns None for unroutable tasks.
        if "Regulatory" in agent_name:
            return retrieval_agent(task_desc, state, "fda_retriever", "Regulatory Specialist")
        if "Medical" in agent_name:
            return retrieval_agent(task_desc, state, "pubmed_retriever", "Medical Researcher")
        if "Ethics" in agent_name and state['sop'].use_ethics_specialist:
            return retrieval_agent(task_desc, state, "ethics_retriever", "Ethics Specialist")
        if "Cohort" in agent_name:
            return patient_cohort_analyst(task_desc, state)
        return None

    collected = []
    for task in state['plan']['plan']:
        result = _run(task['agent'], task['task_description'])
        if result is not None:
            collected.append(result)

    return {**state, "agent_outputs": collected}


In [None]:
# Linear three-stage pipeline: planner -> execution_specialist -> synthesizer -> END.
workflow = StateGraph(GuildState)

for _node_name, _node_fn in (
    ("planner", planner_agent),
    ("execution_specialist", specialist_execution_node),
    ("synthesizer", criteria_synthesizer),
):
    workflow.add_node(_node_name, _node_fn)

workflow.set_entry_point("planner")

for _source, _target in (
    ("planner", "execution_specialist"),
    ("execution_specialist", "synthesizer"),
    ("synthesizer", END),
):
    workflow.add_edge(_source, _target)

guild_graph = workflow.compile()
print("Graph compiled successfully.")

In [None]:
# Optional dependency used for graph visualisation; the notebook keeps running without it.
# NOTE(review): this cell only imports Image; no graph is rendered here.
try:
    from IPython.display import Image
except ImportError:
    # Name the import that actually failed (the old message blamed pygraphviz).
    print("Could not import IPython.display.Image. Install IPython (and pygraphviz for rendering) to visualize the graph.")

In [None]:
# Trial concept used for the baseline end-to-end run.
test_request = "Draft inclusion/exclusion criteria for a Phase II trial of 'Sotagliflozin', a novel SGLT2 inhibitor, for adults with uncontrolled Type 2 Diabetes (HbAlc > 8.0%) and moderate chronic kidney disease (CKD Stage 3)."

print("Running the full Guild graph with baseline SOP v1.0...")
# Only the request and the SOP are supplied; the nodes fill in the remaining state keys.
graph_input = dict(initial_request=test_request, sop=baseline_sop)
final_result = guild_graph.invoke(graph_input)

print("\nFinal Guild Output:", "---------------------", final_result['final_criteria'], sep="\n")

In [None]:
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate

# Shared result type for all five evaluators: the three LLM judges request it via
# with_structured_output(GradedScore), and the two programmatic evaluators build it directly.
class GradedScore(BaseModel):
    """A Pydantic model to structure the output of our LLM-as-a-Judge evaluators."""
    # The 0.0-1.0 range is stated in the description only; no validator enforces it.
    score: float = Field(description="A score from 0.0 to 1.0")
    # Free-text explanation accompanying the score.
    reasoning: str = Field(description="A brief justification for the score.")

# Evaluator 1: Scientific Rigor (LLM-as-a-Judge)
def scientific_rigor_evaluator(generated_criteria: str, pubmed_context: str) -> GradedScore:
    """Have the director model grade how well the criteria are supported by the literature context."""
    system_message = "You are an expert clinical scientist. Evaluate a set of clinical trial criteria based on the provided scientific literature. A score of 1.0 means the criteria are perfectly aligned with and justified by the literature. A score of 0.0 means they contradict or ignore the literature."
    human_message = "Evaluate the following criteria:\n\n**Generated Criteria:**\n{criteria}\n\n**Supporting Scientific Context:**\n{context}"

    # The judge is constrained to reply in the GradedScore shape.
    judge = llm_config['director'].with_structured_output(GradedScore)
    grading_chain = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("human", human_message),
    ]) | judge

    payload = {"criteria": generated_criteria, "context": pubmed_context}
    return grading_chain.invoke(payload)

# Evaluator 2: Regulatory Compliance (LLM-as-a-Judge)
def regulatory_comliance_evaluator(generated_criteria: str, fda_context: str) -> GradedScore:
    """Have the director model grade the criteria against the supplied FDA guideline excerpts."""
    system_message = "You are an expert regulatory affairs specialist. Evaluate if a set of clinical trial criteria adheres to the provided FDA guidelines. A score of 1.0 means full compliance."
    human_message = "Evaluate the following criteria:\n\n**Generated Criteria:**\n{criteria}\n\n**Applicable FDA Guidelines:**\n{context}"

    # The judge is constrained to reply in the GradedScore shape.
    judge = llm_config['director'].with_structured_output(GradedScore)
    grading_chain = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("human", human_message),
    ]) | judge

    payload = {"criteria": generated_criteria, "context": fda_context}
    return grading_chain.invoke(payload)

# Evaluator 3: Ethical Soundness (LLM-as-a-Judge)
def ethical_soundness_evaluator(generated_criteria: str, ethics_context: str) -> GradedScore:
    """Have the director model grade the criteria against the supplied ethical principles."""
    # The judge persona here is an expert on clinical trial ethics.
    system_message = "You are an expert on clinical trial ethics. Evaluate if a set of criteria adheres to the ethical principles provided (summarizing the Belmont Report). A score of 1.0 means the criteria show strong respect for persons, beneficence, and justice."
    human_message = "Evaluate the following criteria:\n\n**Generated Criteria:**\n{criteria}\n\n**Ethical Principles:**\n{context}"

    # The judge is constrained to reply in the GradedScore shape.
    judge = llm_config['director'].with_structured_output(GradedScore)
    grading_chain = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("human", human_message),
    ]) | judge

    # The context is expected to be the Ethics Specialist's retrieved text.
    payload = {"criteria": generated_criteria, "context": ethics_context}
    return grading_chain.invoke(payload)

# Evaluator 4: Recruitment Feasibility (Programmatic)
def feasibility_evaluator(cohort_analyst_output: AgentOutput) -> GradedScore:
    """Score recruitment feasibility from the patient count reported by the cohort analyst.

    The count is read from the "... from the database: <n>." line of the
    analyst's findings and normalised against an ideal cohort of 150 patients.

    Args:
        cohort_analyst_output: The Patient Cohort Analyst's ``AgentOutput``.

    Returns:
        A ``GradedScore`` in [0.0, 1.0]; 0.0 with an explanatory reason when no
        count can be parsed.
    """
    import re

    # findings is typed Any, so coerce before searching.
    findings_text = str(cohort_analyst_output.findings)

    # Capture the number itself; the sentence-ending period is not part of it, and a
    # decimal value such as "12.5" must not collapse into "125".
    match = re.search(r"database:\s*(-?\d+(?:\.\d+)?)", findings_text)
    if match is None:
        return GradedScore(score=0.0, reasoning="Could not parse patient count from analyst output.")
    patient_count = int(float(match.group(1)))

    IDEAL_COUNT = 150.0
    # Clamp on both sides so an unexpected negative value cannot yield a negative score.
    score = max(0.0, min(1.0, patient_count / IDEAL_COUNT))
    reasoning = f"Estimated {patient_count} eligible patients. Score is normalized against an ideal target of {int(IDEAL_COUNT)}."
    return GradedScore(score=score, reasoning=reasoning)

# Evaluator 5: Operational Simplicity (Programmatic)
def simplicity_evaluator(generated_criteria: str) -> GradedScore:
    """Score operational simplicity by penalising expensive or complex screening tests.

    Each listed procedure mentioned in the criteria (case-insensitive substring
    match) costs 0.5, with the score floored at 0.0.

    Args:
        generated_criteria: The synthesized criteria text.

    Returns:
        A ``GradedScore`` with the penalised score and the number of procedures found.
    """
    EXPENSIVE_TESTS = ["mri", "genetic sequencing", "pet scan", "biopsy", "echocardiogram", "endoscopy"]
    # Lower-case once instead of once per test name.
    criteria_lower = generated_criteria.lower()
    test_count = sum(1 for test in EXPENSIVE_TESTS if test in criteria_lower)

    score = max(0.0, 1.0 - (test_count * 0.5))
    reasoning = f"Found {test_count} expensive/complex screening procedures mentioned."
    # Pydantic models accept keyword arguments only.
    return GradedScore(score=score, reasoning=reasoning)


class EvaluationResult(BaseModel):
    """Scorecard bundling the five graded dimensions of one guild run."""
    # LLM-judged dimensions.
    rigor: GradedScore
    compliance: GradedScore
    ethics: GradedScore
    # Programmatically computed dimensions.
    feasibility: GradedScore
    simplicity: GradedScore

In [None]:
def run_full_evaluation(guild_final_state: GuildState) -> EvaluationResult:
    """Run all five evaluators on a finished guild run and bundle their scores.

    Args:
        guild_final_state: Final workflow state; reads ``final_criteria`` and
            ``agent_outputs``.

    Returns:
        An ``EvaluationResult`` with one ``GradedScore`` per dimension. When the
        Patient Cohort Analyst produced no output, feasibility is scored 0.
    """
    print("--- RUNNING FULL EVALUATION GAUNTLET ---")

    final_criteria = guild_final_state['final_criteria']
    agent_outputs = guild_final_state['agent_outputs']

    def _first_output(agent_name):
        # First record produced by the named agent, or None if it never ran.
        return next((o for o in agent_outputs if o.agent_name == agent_name), None)

    def _findings_or_empty(agent_name):
        record = _first_output(agent_name)
        return record.findings if record is not None else ""

    pubmed_context = _findings_or_empty("Medical Researcher")
    fda_context = _findings_or_empty("Regulatory Specialist")
    ethics_context = _findings_or_empty("Ethics Specialist")
    # feasibility_evaluator reads `.findings` itself, so it needs the whole record
    # rather than the findings string.
    analyst_output = _first_output("Patient Cohort Analyst")

    print("Evaluating: Scientific Rigor...")
    rigor = scientific_rigor_evaluator(final_criteria, pubmed_context)
    print("Evaluating: Regulatory Compliance...")
    compliance = regulatory_comliance_evaluator(final_criteria, fda_context)
    print("Evaluating: Ethical Soundness...")
    ethics = ethical_soundness_evaluator(final_criteria, ethics_context)
    print("Evaluating: Recruitment Feasibility...")
    if analyst_output is not None:
        feasibility = feasibility_evaluator(analyst_output)
    else:
        feasibility = GradedScore(score=0.0, reasoning="Analyst did not run.")
    print("Evaluating: Operational Simplicity...")
    simplicity = simplicity_evaluator(final_criteria)

    print("--- EVALUATION GAUNTLET COMPLETE ---")
    return EvaluationResult(rigor=rigor, compliance=compliance, ethics=ethics, feasibility=feasibility, simplicity=simplicity)

In [None]:
# Score the baseline run on all five dimensions.
baseline_evaluation_result = run_full_evaluation(guild_final_state=final_result)

print("\nFull Evaluation Result for Baseline SOP:")
_baseline_report = baseline_evaluation_result.model_dump()
print(json.dumps(_baseline_report, indent=4))

In [None]:
class SOPGenePool:
    """In-memory 'gene pool': an append-only list of SOPs with their evaluations and lineage."""

    def __init__(self):
        # Entries are dicts with the keys: version, sop, evaluation, parent.
        self.pool: List[Dict[str, Any]] = []
        self.version_counter = 0

    def add(self, sop: GuildSOP, eval_result: EvaluationResult, parent_version: Optional[int] = None):
        """Append ``sop`` with its evaluation under the next version number."""
        next_version = self.version_counter + 1
        self.version_counter = next_version
        self.pool.append(
            dict(version=next_version, sop=sop, evaluation=eval_result, parent=parent_version)
        )
        print(f"Added SOP v{next_version} to the gene pool.")

    def get_latest_entry(self) -> Optional[Dict[str, Any]]:
        """Return the most recently added entry, or ``None`` when the pool is empty."""
        if not self.pool:
            return None
        return self.pool[-1]


# Requested from the director model via with_structured_output(Diagnosis) in
# performance_diagnostician.
class Diagnosis(BaseModel):
    """A Pydantic model for the structured output of the Diagnostician agent."""
    # Must be one of the five EvaluationResult dimension names.
    primary_weakness: Literal['rigor', 'compliance', 'ethics', 'feasibility', 'simplicity']
    # Free-text explanation of the weakness.
    root_cause_analysis: str = Field(description="A detailed analysis of why the weakness occured, referencing specific scores.")
    # Free-text guidance on how the SOP should change.
    recommendation: str = Field(description="A high-level recommendation for how to modify the SOP to address the weakness.")


def performance_diagnostician(eval_result: EvaluationResult) -> Diagnosis:
    """Ask the director model to identify the weakest dimension of an evaluation.

    Args:
        eval_result: The five-dimension scorecard to analyse.

    Returns:
        A ``Diagnosis`` naming the primary weakness, with a root-cause analysis
        and a recommendation.
    """
    print("--- EVALUATING PERFORMANCE DIAGNOSTICIAN ---")
    diagnostician_llm = llm_config['director'].with_structured_output(Diagnosis)

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a world-class management consultant specializing in process optimization. Your task is to analyze a performance scorecard and identify the single biggest weakness. Then, provide a root cause analysis and a strategic recommendation."),
        ("human", "Please analyze the following performance evaluation report:\n\n{report}")
    ])

    chain = prompt | diagnostician_llm
    # model_dump_json() is the Pydantic v2 replacement for the deprecated .json(),
    # matching the other serialisation calls in this notebook.
    return chain.invoke({"report": eval_result.model_dump_json()})

In [None]:
# Requested from the director model via with_structured_output(EvolvedSOPs) in sop_architect.
class EvolvedSOPs(BaseModel):
    """A Pydantic container for a list of new, evolved GuildSOPs."""
    # Each item is validated as a complete GuildSOP.
    mutations: List[GuildSOP]

def sop_architect(diagnosis: Diagnosis, current_sop: GuildSOP) -> EvolvedSOPs:
    """Takes a diagnosis and the current SOP, and generates a list of new, mutated SOPs to test.

    Args:
        diagnosis: The diagnosed weakness; serialized to JSON for the prompt.
        current_sop: The SOP to mutate; serialized to JSON for the prompt.

    Returns:
        The EvolvedSOPs produced by the director LLM's structured-output chain.
    """
    import json  # local import: only needed to serialize the schema for the prompt

    print("--- EXECUTING SOP ARCHITECT ---")
    architect_llm = llm_config['director'].with_structured_output(EvolvedSOPs)

    # The schema is supplied as the template variable {sop_schema} instead of being spliced
    # into the template text with an f-string: the schema is full of literal curly braces,
    # which the prompt template would otherwise try to parse as placeholders.
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an AI process architect. Your job is to modify a process configuration (an SOP) to fix a diagnosed problem. The SOP is a JSON object with this schema: {sop_schema}. You must return a list of 2-3 new, valid SOP JSON objects under the 'mutations' key. Propose diverse and creative mutations. For example, you can change prompts, toggle agents, change retrieval parameters, or even change the model used for a task. Only modify fields relevant to the diagnosis."),
        ("human", "Here is the current SOP:\n{current_sop}\n\nhere is the performance diagnosis:\n{diagnosis}\n\nBased on the diagnosis, please provide 2-3 new, improved SOPs.")
    ])

    chain = prompt | architect_llm
    return chain.invoke({
        # Real JSON rather than a Python dict repr, so the model sees a valid JSON Schema.
        "sop_schema": json.dumps(GuildSOP.model_json_schema()),
        "current_sop": current_sop.model_dump_json(),
        "diagnosis": diagnosis.model_dump_json(),
    })

In [None]:
def run_evolution_cycle(gene_pool: SOPGenePool, trial_request: str):
    """Runs one full cycle of diagnosis, mutation, and re-evaluation.

    Args:
        gene_pool: Pool of previously evaluated SOPs. Its latest entry is used
            as the parent, and every evaluated candidate is appended to it.
        trial_request: Request text passed to the Guild graph as
            `initial_request` for every candidate.

    Raises:
        ValueError: If the gene pool has no entry to evolve from.
    """
    print("\n"+"="*25+" STARTING NEW EVOLUTION CYCLE "+"="*25)

    # Step 1: Select the current best SOP to improve upon. For simplicity, we'll just take the latest one added to the pool.
    current_best_entry = gene_pool.get_latest_entry()
    if current_best_entry is None:
        # Fail with a clear message instead of a TypeError from subscripting None.
        raise ValueError("Cannot run an evolution cycle on an empty gene pool; add a baseline SOP first.")
    parent_sop = current_best_entry['sop']
    parent_eval = current_best_entry['evaluation']
    parent_version = current_best_entry['version']
    print(f"Improving upon SOP v{parent_version}...")

    # Step 2: Diagnose the performance of the parent SOP
    diagnosis = performance_diagnostician(parent_eval)
    print(f"Diagnosis complete. Primary Weakness: '{diagnosis.primary_weakness}'. Recommendation: {diagnosis.recommendation}")

    # Step 3: Architect new SOP candidates based on the diagnosis.
    new_sop_candidates = sop_architect(diagnosis, parent_sop)
    candidate_count = len(new_sop_candidates.mutations)
    print(f"Generated {candidate_count} new SOP candidates.")

    # Step 4: Evaluate each new candidate by running the full Guild graph and the evaluation gauntlet.
    for position, candidate_sop in enumerate(new_sop_candidates.mutations, start=1):
        print(f"\n--- Testing SOP candidate {position}/{candidate_count} ---")

        guild_input = {"initial_request": trial_request, "sop": candidate_sop}
        final_state = guild_graph.invoke(guild_input)

        eval_result = run_full_evaluation(final_state)
        # Every candidate is recorded as a child of the same parent version.
        gene_pool.add(sop=candidate_sop, eval_result=eval_result, parent_version=parent_version)

    print("\n"+"="*25+" EVOLUTION CYCLE COMPLETE "+"="*25)

# Create an empty gene pool for this run.
gene_pool = SOPGenePool()
print("Initialized SOP Gene Pool.")
# Seed the pool with the baseline SOP and its evaluation (both defined outside this cell).
# No parent_version is passed, so the entry is stored with parent=None.
gene_pool.add(sop=baseline_sop, eval_result=baseline_evaluation_result)
# Run one evolution cycle for test_request; each evaluated candidate is appended to gene_pool.
run_evolution_cycle(gene_pool, test_request)

In [None]:
# Print one formatted line per gene-pool entry: its version, its lineage, and its five scores.
print("SOP Gene Pool Evaluation Summary:")
print("---------------------------------")
for entry in gene_pool.pool:
    v, p, evals = entry['version'], entry['parent'], entry['evaluation']
    # Pull the numeric score out of each graded dimension, in display order.
    r, c, e, f, s = (
        getattr(evals, dimension).score
        for dimension in ('rigor', 'compliance', 'ethics', 'feasibility', 'simplicity')
    )
    # Entries stored without a parent are labelled as the parent itself.
    if p is None:
        parent_str = "(Parent)"
    else:
        parent_str = f"(Child of v{p})"
    print(
        f"SOP v{v:<2} {parent_str:<14}: "
        f"Rigor={r:.2f}, Compliance={c:.2f}, Ethics={e:.2f}, "
        f"Feasibility={f:.2f}, Simplicity={s:.2f}"
    )

In [None]:
import numpy as np

def identify_pareto_front(gene_pool:SOPGenePool) -> List[Dict[str, Any]]:
    """Identifies the non-dominated solutions (the Pareto Front) in the gene pool.

    An entry is dominated when some other entry scores at least as high on
    every dimension and strictly higher on at least one. Entries with
    identical score vectors do not dominate each other, so all of them stay
    on the front.

    Args:
        gene_pool: Pool whose `pool` list holds entries with an 'evaluation'
            that exposes `.dict()`, each value of which carries a 'score'.

    Returns:
        The non-dominated entries, in their original pool order. Empty when
        the pool is empty.
    """
    pool_entries = gene_pool.pool

    # Build every score vector once, up front. A list (not a bare generator) is
    # required here: np.array(<generator>) yields a 0-d object array that cannot
    # be compared element-wise.
    score_vectors = [
        np.array([graded['score'] for graded in entry['evaluation'].dict().values()])
        for entry in pool_entries
    ]

    pareto_front = []
    for i, candidate in enumerate(pool_entries):
        cand_scores = score_vectors[i]
        is_dominated = any(
            np.all(other_scores >= cand_scores) and np.any(other_scores > cand_scores)
            for j, other_scores in enumerate(score_vectors)
            if j != i
        )
        if not is_dominated:
            pareto_front.append(candidate)
    return pareto_front

# Compute the Pareto front once; later cells reuse `pareto_sops`.
pareto_sops = identify_pareto_front(gene_pool)

print("SOPs on the Pareto Front:")
print("-------------------------")
for entry in pareto_sops:
    v = entry['version']
    evals = entry['evaluation']
    # Every score is read from `evals`; the simplicity score previously referenced the
    # builtin `eval`, which has no `simplicity` attribute.
    r, c, e, f, s = evals.rigor.score, evals.compliance.score, evals.ethics.score, evals.feasibility.score, evals.simplicity.score
    print(f"SOP v{v}: Rigor={r:.2f}, Compliance={c:.2f}, Ethics={e:.2f}, Feasibility={f:.2f}, Simplicity={s:.2f}")

In [None]:
import matplotlib.pyplot as plt, pandas as pd

def visualize_frontier(pareto_sops):
    """Plot the Pareto-front entries as a rigor/feasibility scatter next to a parallel-coordinates chart.

    Prints a notice and returns without plotting when `pareto_sops` is empty.
    """
    if not pareto_sops:
        print("No SOPs on the Pareto front to visualize.")
        return

    _, (scatter_ax, parallel_ax) = plt.subplots(1, 2, figsize=(18, 7))

    # --- Left panel: rigor against feasibility, one labelled marker per SOP ---
    version_tags = [f"v{member['version']}" for member in pareto_sops]
    rigor_values = [member['evaluation'].rigor.score for member in pareto_sops]
    feasibility_values = [member['evaluation'].feasibility.score for member in pareto_sops]

    scatter_ax.scatter(rigor_values, feasibility_values, s=200, alpha=0.7, c='blue')
    for tag, x_pos, y_pos in zip(version_tags, rigor_values, feasibility_values):
        scatter_ax.annotate(tag, (x_pos, y_pos), xytext=(10, -10), textcoords='offset points', fontsize=14)
    scatter_ax.set_title('Pareto Frontier: Rigor vs. Feasibility', fontsize=16)
    scatter_ax.set_xlabel('Scientific Rigor Score', fontsize=14)
    scatter_ax.set_ylabel('Recruitment Feasibility Score', fontsize=14)
    scatter_ax.grid(True, linestyle='--', alpha=0.6)
    # Pad the axis limits so markers at the extremes are not clipped.
    scatter_ax.set_xlim(min(rigor_values)-0.05, max(rigor_values)+0.05)
    scatter_ax.set_ylim(min(feasibility_values)-0.1, max(feasibility_values)+0.1)

    # --- Right panel: every score dimension at once, one line per SOP ---
    score_rows = [
        {
            **{
                dimension.capitalize(): graded['score']
                for dimension, graded in member['evaluation'].dict().items()
            },
            'SOP Version': f"v{member['version']}",
        }
        for member in pareto_sops
    ]
    score_frame = pd.DataFrame(score_rows)

    # 'SOP Version' is the class column, so each SOP gets its own colour and legend entry.
    pd.plotting.parallel_coordinates(score_frame, 'SOP Version', colormap=plt.get_cmap("viridis"), ax=parallel_ax, axvlines_kwargs={"linewidth": 1, "color": "grey"})
    parallel_ax.set_title('5D Performance Trade-offs on Pareto Front', fontsize=16)
    parallel_ax.grid(True, which='major', axis='y', linestyle='--', alpha=0.6)
    parallel_ax.set_ylabel('Normalized Score', fontsize=14)
    parallel_ax.legend(loc='lower center', bbox_to_anchor=(0.5, -0.15), ncol=len(version_tags))
    plt.tight_layout()
    plt.show()

# Render both plots for the Pareto-front entries held in `pareto_sops`.
visualize_frontier(pareto_sops)