<hr style="border:30px solid Firebrick "> </hr>
<hr style="border:2px solid Firebrick "> </hr>

# Agentic Workflow Automation for Northwestern Memorial Hospital
**Author:** Atef Bader, PhD

**Last Edit:** 12/17/2024



## Goals

- Automate Call/Inquiry processing using Langgraph/Langchain with OpenAI
- Use OpenAI to route and answer user's questions directed to different departments represented by different agents for Northwestern Memorial Hospital

<hr style="border:2px solid Firebrick "> </hr>


<img src="attachment:6925f10a-1fae-4385-a348-d427e8a93cf0.png" align="center" width="500"/>


<hr style="border:5px solid orange "> </hr>


In [None]:
'''%%capture --no-stderr
%pip install uv
%uv pip install chromadb==0.4.22
%uv pip install tiktoken==0.9.0
%uv pip install langchain==0.3.20
%uv pip install langchain-community==0.3.10
%uv pip install langchain-openai==0.3.1
%uv pip install langchainhub
%uv pip install langchain-text-splitters==0.3.6
%uv pip install langgraph==0.3.1
%uv pip install openai==1.65.3
%uv pip install PyMuPDF==1.25.3
%uv pip install pypdf==5.3.1
%uv pip install pillow==11.1.0
%uv pip install beautifulsoup4==4.13.3
%uv pip install  mermaid_cli
%uv pip install grandalf'''
    
  

In [None]:
from IPython.display import Image as IPImage
from IPython.display import Image, display

from typing import TypedDict, Optional, List, Dict, Any, Annotated, Tuple, Optional, Literal
from typing_extensions import TypedDict
import operator

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
from langgraph.graph import StateGraph, START, END

from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, BaseMessage
from langchain_openai import ChatOpenAI

from langgraph.graph.message import add_messages

### NEW
import shutil
from langchain.agents import create_agent
from datetime import date, datetime, timedelta
import pandas as pd
from __future__ import annotations
from langchain_core.tools import tool
import os
from pathlib import Path
from dotenv import load_dotenv, find_dotenv
import csv
import re
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
import json
from langchain_core.documents import Document

load_dotenv(find_dotenv())

LANGCHAIN_API_KEY = os.getenv("LANGCHAIN_API_KEY")
LANGCHAIN_PROJECT = os.getenv("LANGCHAIN_PROJECT")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
LANGCHAIN_TRACING_V2 = os.getenv("LANGCHAIN_TRACING_V2") == "true"

INPUT_DIR = Path.cwd() / "Input"
OUTPUT_DIR = Path.cwd() / "Output"

KB_DIR = Path.cwd() / "kb"
CARDIOLOGY_SCHEDULE_CSV = KB_DIR / "cardiology_appointment_slots.csv"
CARDIOLOGY_KB_PATH = "./kb/cardiology_kb.jsonl"

REBUILD_CHROMA = False   # <-- set to False to reuse persisted DB
#CHROMA_DIR = "./chroma_kb"
CHROMA_DIR = "./chroma_kb_rebuild" if REBUILD_CHROMA else "./chroma_kb"


print("LANGCHAIN_PROJECT:", LANGCHAIN_PROJECT)
print("LANGCHAIN_TRACING_V2:", LANGCHAIN_TRACING_V2)

In [None]:
'''import os, getpass

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")

_set_env("OPENAI_API_KEY")'''

# Declare state dictionary structure

In [None]:
# Requirement 1: Define the structure of agent state for the LangGraph
class InquiryState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    inquiry: str
    next_node: str
    response: Optional[str]

    # routing
    intent: Optional[str]
    active_agent: Optional[str]  # which agent currently owns the thread
    last_router_reason: Optional[str]

    # retrieval payload
    retrieved: Optional[List[Dict[str, Any]]]  # JSON-serializable so easy to log/debug. [{id, text, score, meta}, ...]
    retrieval_confidence: Optional[float]


ROUTABLE_AGENTS = {
    "ER",
    "Radiology",
    "PrimaryCare",
    "Cardiology",
    "Pediatrics",
    "BillingInsurance",
}

INTENT_MAP = {
    "greeting": "Greeting",
    "generalinquiry": "GeneralInquiry",
    "er": "ER",
    "radiology": "Radiology",
    "primarycare": "PrimaryCare",
    "cardiology": "Cardiology",
    "pediatrics": "Pediatrics",
    "billinginsurance": "BillingInsurance",
}

TransitionLabel = Literal["NEW_TOPIC", "CONTINUATION"]


def _normalize_intent(intent: Optional[str]) -> Optional[str]:
    if not intent:
        return None
    key = re.sub(r"[^a-z]", "", intent.lower())
    return INTENT_MAP.get(key, intent.strip())


def _latest_user_inquiry(state: InquiryState) -> str:
    msgs = state.get("messages") or []
    for msg in reversed(msgs):
        if isinstance(msg, HumanMessage):
            if isinstance(msg.content, str):
                return msg.content
            if isinstance(msg.content, list):
                text_parts = [part.get("text", "") for part in msg.content if isinstance(part, dict) and part.get("type") == "text"]
                return " ".join(text_parts).strip()
    return state.get("inquiry", "")


def _latest_prior_exchange(state: InquiryState) -> tuple[str, str]:
    msgs = state.get("messages") or []
    latest_user = _latest_user_inquiry(state)

    prior_ai = ""
    seen_latest_user = False
    for msg in reversed(msgs):
        if isinstance(msg, HumanMessage) and not seen_latest_user:
            seen_latest_user = True
            continue
        if seen_latest_user and isinstance(msg, AIMessage):
            if isinstance(msg.content, str):
                prior_ai = msg.content
            else:
                prior_ai = str(msg.content)
            break

    return latest_user, prior_ai


def _classify_transition(state: InquiryState, llm: ChatOpenAI) -> TransitionLabel:
    latest_user, prior_ai = _latest_prior_exchange(state)
    active_agent = state.get("active_agent") or "None"

    system = SystemMessage(
        content=(
            "You classify whether the latest user message is a continuation of the prior exchange "
            "or a clearly new topic or department request. "
            "Return exactly one token: NEW_TOPIC or CONTINUATION."
        )
    )

    human = HumanMessage(content=[{
        "type": "text",
        "text": (
            f"Active agent: {active_agent}\n"
            f"Prior assistant message: {prior_ai or '(none)'}\n"
            f"Latest user message: {latest_user}\n\n"
            "Return NEW_TOPIC or CONTINUATION only."
        ),
    }])

    raw = llm.invoke([system, human]).content.strip().upper()
    if "CONTINUATION" in raw:
        return "CONTINUATION"
    if "NEW_TOPIC" in raw:
        return "NEW_TOPIC"

    # Safe default: allow reclassification/handoff over sticky routing.
    return "NEW_TOPIC"

def _messages_to_transcript(state, max_turns: int = 6) -> str:
    """Turn the last N human/AI messages into a text transcript."""
    msgs = state.get("messages") or []

    # Keep only the last ~2*max_turns messages (human+ai pairs)
    msgs = msgs[-(2 * max_turns):]

    lines = []
    for m in msgs:
        role = "User" if isinstance(m, HumanMessage) else "Assistant"
        # Handle both str and list-style message content
        if isinstance(m.content, str):
            text = m.content
        else:
            text = str(m.content)
        lines.append(f"{role}: {text}")
    return "\n".join(lines).strip()

def _extract_agent_text(result: dict) -> str:
    """Handle both AgentExecutor-style {'output': ...} and LangGraph-style {'messages': [...]}."""
    if not isinstance(result, dict):
        return str(result)

    # 1) AgentExecutor style
    out = result.get("output")
    if isinstance(out, str) and out.strip():
        return out.strip()

    # 2) Messages style
    msgs = result.get("messages") or result.get("message") or []
    if isinstance(msgs, list) and msgs:
        # Walk backwards to find the last non-empty assistant message
        for m in reversed(msgs):
            if isinstance(m, AIMessage) and isinstance(m.content, str) and m.content.strip():
                return m.content.strip()
            # Some frameworks store messages as dicts
            if isinstance(m, dict):
                c = m.get("content")
                role = m.get("role") or m.get("type")
                if role in ("assistant", "ai") and isinstance(c, str) and c.strip():
                    return c.strip()

    # 3) Fallback: stringify
    return ""


# Creating or loading knowledge base stores

In [None]:
def load_kb_jsonl(path: str, agent_name: str) -> list[Document]:
    docs: list[Document] = []
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        row = json.loads(line)
        tags = row.get("tags", [])
        doc_text = f"Q: {row['question']}\nA: {row['answer']}\nTags: {', '.join(tags)}"
        docs.append(
            Document(
                page_content=doc_text,
                metadata={
                    "id": row["id"],
                    "agent": agent_name,
                    "tags": row.get("tags", []),
                    "question": row["question"],
                },
            )
        )
    return docs

# This creates the collection and saves it to disk (persist_directory). Next runs can just load it.
# if changing knowledge base content, delete the ./chroma_kb folder and rebuild to avoid accidentally keeping stale embeddings.

def build_or_load_chroma_collection(
    collection_name: str,
    persist_directory: str,
    documents: list[Document] | None = None,
    rebuild: bool = False,
):
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

    store = Chroma(
        collection_name=collection_name,
        persist_directory=persist_directory,
        embedding_function=embeddings,
    )

    # Only add documents if:
    # - documents were provided
    # - and we are rebuilding OR collection is empty
    if documents:
        current_count = store._collection.count()

        if current_count == 0:
            ids = [d.metadata["id"] for d in documents]
            store.add_documents(documents, ids=ids)

    return store

In [None]:
cardio_docs = load_kb_jsonl(CARDIOLOGY_KB_PATH, agent_name="Cardiology")
cardiology_store = build_or_load_chroma_collection(
    collection_name="kb_cardiology",
    persist_directory=CHROMA_DIR,
    documents=cardio_docs,
    rebuild=REBUILD_CHROMA,   # toggle
)

In [None]:
'''embeddings_fn = OpenAIEmbeddings(model="text-embedding-3-small")

cardiology_store = Chroma(
    collection_name="kb_cardiology",
    persist_directory="./chroma_kb",
    embedding_function=embeddings_fn,
)'''

In [None]:
# Keyword overlap scorer

def _tokenize(s: str) -> set[str]:
    s = s.lower()
    s = re.sub(r"[^a-z0-9\s]", " ", s)
    return set(t for t in s.split() if len(t) > 2)

def keyword_overlap_score(query: str, text: str) -> float:
    q = _tokenize(query)
    if not q:
        return 0.0
    d = _tokenize(text)
    return len(q & d) / len(q)

### Hybrid retrieval function
grabs top k_vec from Chroma  
rescoring with a weighted mix of:  
- vector rank-based score (simple, stable across distance metrics)
- keyword overlap score  

outputs top k_final + a confidence value

In [None]:
def hybrid_retrieve(
    query: str,
    store: Chroma,
    k_vec: int = 8,
    k_final: int = 3,
    alpha: float = 0.75,   # weight on vector ranking
) -> Tuple[List[Dict[str, Any]], float]:
    """
    Returns: (retrieved_items, confidence)
    retrieved_items: [{id, text, score, meta}, ...]
    confidence: float in [0, 1] (rough heuristic)
    """
    docs = store.similarity_search(query, k=k_vec)

    # Convert to serializable items
    items = []
    for i, d in enumerate(docs):
        text = d.page_content
        meta = d.metadata or {}
        items.append({
            "id": meta.get("id", f"doc_{i}"),
            "text": text,
            "meta": meta,
            "vec_rank": i,  # 0 best
        })

    if not items:
        return [], 0.0

    # Vector rank score: best doc ~1.0, worst ~0.0
    denom = max(1, (len(items) - 1))
    for it in items:
        vec_score = 1.0 - (it["vec_rank"] / denom)
        kw_score = keyword_overlap_score(query, it["text"])
        it["score"] = alpha * vec_score + (1 - alpha) * kw_score

    items.sort(key=lambda x: x["score"], reverse=True)
    top = items[:k_final]

    # Confidence heuristic: best score + gap to 2nd
    best = top[0]["score"]
    second = top[1]["score"] if len(top) > 1 else 0.0
    confidence = max(0.0, min(1.0, best * 0.85 + (best - second) * 0.15))

    return top, confidence

### Cardiology retrieval

In [None]:
'''CARDIOLOGY_CONF_THRESHOLD = 0.62  # tune later

# retrieval
def retrieve_cardiology(state: InquiryState) -> InquiryState:
    inquiry = _latest_user_inquiry(state)
    retrieved, conf = hybrid_retrieve(inquiry, cardiology_store, k_vec=8, k_final=3)

    print("Retrieved docs:", len(retrieved))
    if retrieved:
        print("Top doc id:", retrieved[0]["id"])
        print("Top doc preview:", retrieved[0]["text"][:120])

    retrieved = "".join(
        [f"[{r['id']}]\n{r['text']}" for r in retrieved]
    )

    return {
        "inquiry": inquiry,
        "active_agent": "Cardiology",
        "retrieved": retrieved,
        "retrieval_confidence": conf,
    }'''


In [None]:
@tool
def retrieve_cardiology(query: str, k_vec: int = 8, k_final: int = 3) -> Dict[str, Any]:
    """
    Retrieve cardiology knowledge-base snippets relevant to the user query.

    Returns snippets formatted as:
      [snippet_id]
      snippet text

    The assistant should cite snippets as [snippet_id].
    """
    retrieved, conf = hybrid_retrieve(query, cardiology_store, k_vec=k_vec, k_final=k_final)

    snippets: List[Dict[str, str]] = []
    for r in (retrieved or [])[:k_final]:
        snippets.append(
            {"id": str(r.get("id", "")), "text": str(r.get("text", ""))}
        )

    retrieved_text = "\n\n".join([f"[{s['id']}]\n{s['text']}" for s in snippets])

    return {
        "query": query,
        "retrieval_confidence": conf,
        "num_snippets": len(snippets),
        "snippets": snippets,
        "retrieved_text": retrieved_text,
    }

## Cardiology get schedule implementation

In [None]:
def _parse_iso_date(s: str) -> Optional[date]:
    try:
        return datetime.strptime(s, "%Y-%m-%d").date()
    except Exception:
        return None

def _next_week_range(today: date) -> Tuple[date, date]:
    """
    'Next week' = next Monday (relative to today's week) through the following Monday (exclusive).
    """
    this_monday = today - timedelta(days=today.weekday())  # Mon of current week
    next_monday = this_monday + timedelta(days=7)
    following_monday = next_monday + timedelta(days=7)
    return next_monday, following_monday

def _extract_week_hint(question: str) -> str:
    """
    Minimal intent parser.
    Returns: "next_week" (default) or "this_week" if question says 'this week'.
    """
    q = (question or "").lower()
    if "this week" in q:
        return "this_week"
    if "next week" in q:
        return "next_week"
    # default for appointment scheduling questions like "any availability next week?"
    return "next_week"

def _this_week_range(today: date) -> Tuple[date, date]:
    this_monday = today - timedelta(days=today.weekday())
    next_monday = this_monday + timedelta(days=7)
    return this_monday, next_monday

def get_cardiology_schedule(
    question: str,
    csv_path: str,
    today: Optional[str] = None,
    department: str = "Cardiology",
    only_available: bool = True,
    max_results: int = 12,
) -> Dict[str, Any]:
    """
    Look up appointment slot availability in a CSV schedule and return matching slots.

    Use this tool when a user asks about appointment availability (e.g.:
    "Do you have any available appointments with a cardiologist next week?").

    Args:
      question: The user question.
      csv_path: Path to the schedule CSV.
      today: Optional override for today's date in YYYY-MM-DD (helps testing).
      department: Department to filter on (default Cardiology).
      only_available: If True, returns AVAILABLE slots only.
      max_results: Max slots to return (keeps tool output small).

    Returns:
      A dictionary containing the interpreted date range, counts, and a list of slots.
    """
    if not csv_path:
        raise ValueError("csv_path is required.")

    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Schedule CSV not found at: {csv_path}")

    # Determine "today"
    if today:
        today_d = _parse_iso_date(today)
        if not today_d:
            raise ValueError("today must be in YYYY-MM-DD format")
    else:
        today_d = date.today()

    # Determine time window
    window = _extract_week_hint(question)
    if window == "this_week":
        start_d, end_d = _this_week_range(today_d)
    else:
        start_d, end_d = _next_week_range(today_d)

    df = pd.read_csv(csv_path)

    # Validate columns
    required = {"date", "department", "slot_start_time_local", "slot_status"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"CSV missing required columns: {sorted(missing)}")

    # Normalize and filter
    df["date_parsed"] = pd.to_datetime(df["date"], errors="coerce").dt.date
    df = df[df["date_parsed"].notna()]
    df = df[(df["date_parsed"] >= start_d) & (df["date_parsed"] < end_d)]

    if department:
        df = df[df["department"].astype(str).str.lower() == department.lower()]

    if only_available:
        df = df[df["slot_status"].astype(str).str.upper() == "AVAILABLE"]

    # Sort by date/time
    df["slot_time_norm"] = df["slot_start_time_local"].astype(str)
    df = df.sort_values(["date_parsed", "slot_time_norm"], ascending=True)

    # Convert to compact records
    slots: List[Dict[str, Any]] = []
    for _, r in df.head(max_results).iterrows():
        slots.append(
            {
                "date": str(r.get("date", "")),
                "day_of_week": str(r.get("day_of_week", "")),
                "start_time": str(r.get("slot_start_time_local", "")),
                "duration_minutes": int(r.get("slot_duration_minutes", 0) or 0),
                "appointment_type": str(r.get("appointment_type", "")),
                "provider_name": str(r.get("provider_name", "")),
                "location": str(r.get("location", "")),
                "status": str(r.get("slot_status", "")),
            }
        )

    return {
        "tool": "get_schedule",
        "question": question,
        "interpreted": {
            "today": today_d.isoformat(),
            "time_window": window,
            "range_start": start_d.isoformat(),
            "range_end_exclusive": end_d.isoformat(),
            "department": department,
            "only_available": only_available,
        },
        "results": {
            "total_matching_slots": int(df.shape[0]),
            "returned_slots": len(slots),
            "slots": slots,
        },
    }

In [None]:
@tool
def get_cardiology_schedule_tool(question: str, today: str | None = None) -> dict:
    """
    Get AVAILABLE cardiology appointment slots for next week from the hospital schedule CSV.

    Args:
      question: The userâ€™s question about scheduling/availability.
      today: Optional override in YYYY-MM-DD (useful for testing).
    Returns:
      Dict with interpreted date range + matching slots.
    """
    # Assumes you already defined `get_schedule(question, csv_path, today, ...)`
    return get_cardiology_schedule(
        question=question,
        csv_path=CARDIOLOGY_SCHEDULE_CSV,
        today=today,
        department="Cardiology",
        only_available=True,
        max_results=12,
    )

### small test

In [None]:
print(get_cardiology_schedule_tool.invoke({
    "question": "Do you have any available appointments with a cardiologist next week?",
    "today": "2026-02-27",
}))

In [None]:


llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

cardiology_agent = create_agent(model = llm, 
                                tools = [retrieve_cardiology, get_cardiology_schedule_tool], 
                                system_prompt=("You are the Cardiology assistant for a hospital call center.\n"
                                                "Answer the user's inquiry using ONLY the information that you get from your tools.\n"
                                                "If the answer is not explicitly supported by the context from those tools, say you don't have enough information.\n"
                                                "When you use information from retrieve_cardiology, cite it like [snippet_id].\n"
                                                "If the user asks about scheduling/availability, call get_cardiology_schedule_tool.\n"
                                                "When answering scheduling questions, summarize the earliest few available slots and ask what day/time the caller prefers.\n"
                                                ),
                                )

def cardiology_node(state):

    msgs = [m for m in (state.get("messages") or []) if isinstance(m, (HumanMessage, AIMessage))]
    msgs = msgs[-12:]  # keep recent context only

    result = cardiology_agent.invoke({"messages": msgs})

    resp = _extract_agent_text(result)

    if not resp:
        resp = "(Debug: agent returned no assistant text.)"

    final_response = "Cardiology:: " + resp

    return {
        "active_agent": "Cardiology",
        "response": final_response,
        "next_node": END,
        "messages": [AIMessage(content=final_response)],
    }

In [None]:
def operator_router(state):

    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    inquiry = _latest_user_inquiry(state)
    active_agent = _normalize_intent(state.get("active_agent"))

    query = f"""Classify the user's intents based on the following input: '{inquiry}'.
            List of possible intent values: Greeting, GeneralInquiry, ER, Radiology, PrimaryCare, Cardiology, Pediatrics, BillingInsurance
            Return only the intent value of the inquiry identified with no extra text or characters"""

    human_message = HumanMessage(
        content=[
            {"type": "text", "text": query},
        ],
    )

    system_message = SystemMessage(content="You are a helpful assistant tasked with classifying the intent of user's inquiry")

    response = llm.invoke([system_message] + [human_message])
    classified_intent = _normalize_intent(response.content.strip())

    transition: Optional[TransitionLabel] = None
    if active_agent in ROUTABLE_AGENTS:
        transition = _classify_transition(state, llm)

    # Department routing wins, even during continuation.
    if classified_intent in ROUTABLE_AGENTS:
        if active_agent in ROUTABLE_AGENTS and classified_intent != active_agent:
            reason = "continuation_but_department_handoff" if transition == "CONTINUATION" else "new_topic_department_handoff"
        elif active_agent in ROUTABLE_AGENTS and classified_intent == active_agent and transition == "CONTINUATION":
            reason = "continuation_to_active_agent"
        else:
            reason = "fresh_intent_classification"

        return {
            "inquiry": inquiry,
            "intent": classified_intent,
            "active_agent": classified_intent,
            "last_router_reason": reason,
            "next_node": classified_intent,
            "response": None,
        }

    # If user is clearly continuing with an active department, do not let
    # Greeting/GeneralInquiry steal the turn from the owning agent.
    if (
        active_agent in ROUTABLE_AGENTS
        and transition == "CONTINUATION"
        and classified_intent in {"Greeting", "GeneralInquiry"}
    ):
        return {
            "inquiry": inquiry,
            "intent": active_agent,
            "active_agent": active_agent,
            "last_router_reason": f"continuation_overrode_{classified_intent.lower()}",
            "next_node": active_agent,
            "response": None,
        }

    if classified_intent == "Greeting":
        greeting = "Hello there, This is Northwestern Memorial Hospital, How can I assist you today?"
        return {
            "inquiry": inquiry,
            "intent": "Greeting",
            "active_agent": None,
            "last_router_reason": "fresh_intent_classification",
            "next_node": END,
            "response": greeting,
            "messages": [AIMessage(content=greeting)],
        }

    if classified_intent == "GeneralInquiry":
        general_response = "For general information about nearby parking, hotels and restaurants, please visit https://www.nm.org/ and navigate to Patients & Visitors link "
        return {
            "inquiry": inquiry,
            "intent": "GeneralInquiry",
            "active_agent": None,
            "last_router_reason": "fresh_intent_classification",
            "next_node": END,
            "response": general_response,
            "messages": [AIMessage(content=general_response)],
        }

    # If intent is unmapped but transition says continuation, keep owner.
    if active_agent in ROUTABLE_AGENTS and transition == "CONTINUATION":
        return {
            "inquiry": inquiry,
            "intent": active_agent,
            "active_agent": active_agent,
            "last_router_reason": "continuation_with_unmapped_intent",
            "next_node": active_agent,
            "response": None,
        }

    fallback_response = "I could not determine the right department. Could you share a bit more detail about your request?"
    return {
        **state,
        "inquiry": inquiry,
        "intent": None,
        "active_agent": None,
        "last_router_reason": "unmapped_intent",
        "next_node": END,
        "response": fallback_response,
        "messages": [AIMessage(content=fallback_response)],
    }


In [None]:
def er_agent(state):

    knowledge_base = """

    "inquiry": "Should I go to the ER or urgent care?",
    "response": "Go to the ER for chest pain, stroke symptoms, severe injuries, heavy bleeding, or difficulty breathing. Urgent care is appropriate for minor injuries or mild illnesses.",

    "inquiry": "What should I bring to the ER?",
    "response": "Bring a photo ID, insurance card, list of medications, allergies, and any relevant medical history if available.",

    "inquiry": "How long is the wait time?",
    "response": "Patients are treated based on medical urgency. Critical cases are seen first, so wait times vary.",

    "inquiry": "Will I be admitted to the hospital?",
    "response": "Admission depends on your diagnosis and condition. The ER physician will determine if inpatient care is required.",

    "inquiry": "Can someone stay with me in the ER?",
    "response": "Visitor policies depend on hospital guidelines and patient condition. Check with staff upon arrival."

    """

    print("ER KNOWLEDGE-BASE IS EMPTY")
    final_response = "ER: YOU NEED TO ADD-YOUR-KNOWLEDGE-BASE"
    return {
        "inquiry": _latest_user_inquiry(state),
        "active_agent": "ER",
        "next_node": END,
        "response": final_response,
        "messages": [AIMessage(content=final_response)],
    }


In [None]:
def radiology_agent(state):

    radiology_knowledge_base = """

    "inquiry": "How do I prepare for a CT scan?",
    "response": "Follow any fasting instructions provided. Inform staff about allergies, especially to contrast dye, and disclose pregnancy if applicable.",

    "inquiry": "Is radiation from X-rays safe?",
    "response": "X-rays use low levels of radiation and are generally safe. Technicians take precautions to minimize exposure.",

    "inquiry": "Do I need contrast for my MRI?",
    "response": "Some MRIs require contrast to improve image clarity. Your provider will determine if it is necessary.",

    "inquiry": "How long does imaging take?",
    "response": "Most X-rays take 10?15 minutes, while CT or MRI scans may take 30?60 minutes depending on the study.",

    "inquiry": "How will I receive my results?",
    "response": "Results are reviewed by a radiologist and sent to your ordering provider, who will discuss findings with you."

    """

    print("Radiology KNOWLEDGE-BASE IS EMPTY")
    final_response = "Radiology: YOU NEED TO ADD-YOUR-KNOWLEDGE-BASE"
    return {
        "inquiry": _latest_user_inquiry(state),
        "active_agent": "Radiology",
        "next_node": END,
        "response": final_response,
        "messages": [AIMessage(content=final_response)],
    }


In [None]:
def primary_care_agent(state):

    knowledge_base = """

    "inquiry": "How often should I schedule a physical exam?",
    "response": "Adults should have a routine physical annually or as recommended based on age and health conditions.",

    "inquiry": "Can I get lab work done during my visit?",
    "response": "Yes, many routine labs can be performed in-office or ordered through an affiliated laboratory.",

    "inquiry": "Do you provide vaccinations?",
    "response": "Yes, we offer routine adult immunizations including flu, COVID-19, tetanus, and other recommended vaccines.",

    "inquiry": "How do I request a specialist referral?",
    "response": "Discuss your symptoms with your Primary Care provider, who can evaluate and issue a referral if needed.",

    "inquiry": "Can I discuss multiple concerns in one appointment?",
    "response": "Yes, but complex issues may require additional appointments to ensure adequate time for evaluation."

    """

    print("Primary Care KNOWLEDGE-BASE IS EMPTY")
    final_response = "Primary Care: YOU NEED TO ADD-YOUR-KNOWLEDGE-BASE"
    return {
        "inquiry": _latest_user_inquiry(state),
        "active_agent": "PrimaryCare",
        "next_node": END,
        "response": final_response,
        "messages": [AIMessage(content=final_response)],
    }


In [None]:
def pediatrics_agent(state):

    pediatrics_knowledge_base = """

    "inquiry": "Is my child?s cough or cold serious?",
    "response": "Most colds are viral and resolve within 7?10 days. Seek care if there is high fever, breathing difficulty, or worsening symptoms.",

    "inquiry": "What vaccines does my child need?",
    "response": "Vaccinations follow CDC-recommended schedules based on age. We can review your child?s immunization record during the visit.",

    "inquiry": "What is the correct medication dose for my child?",
    "response": "Medication dosing depends on weight and age. Always follow provider instructions and avoid adult medications unless directed.",

    "inquiry": "Are developmental milestones on track?",
    "response": "We assess growth and developmental milestones at well-child visits and address any concerns early.",

    "inquiry": "When should I take my child to the ER?",
    "response": "Go to the ER for difficulty breathing, seizures, severe dehydration, uncontrolled fever in infants, or serious injury."

    """

    print("Pediatrics KNOWLEDGE-BASE IS EMPTY")
    final_response = "Pediatrics: YOU NEED TO ADD-YOUR-KNOWLEDGE-BASE."
    return {
        "inquiry": _latest_user_inquiry(state),
        "active_agent": "Pediatrics",
        "next_node": END,
        "response": final_response,
        "messages": [AIMessage(content=final_response)],
    }


In [None]:
def billing_agent(state):

    knowledge_base = """

    "inquiry": "Is my visit covered by insurance?",
    "response": "Coverage depends on your specific plan. Contact your insurer or our billing department to verify benefits.",

    "inquiry": "What is a deductible and copay?",
    "response": "A copay is a fixed fee paid at the time of service. A deductible is the amount you pay before insurance begins covering costs.",

    "inquiry": "Why did I receive multiple bills?",
    "response": "You may receive separate bills for facility fees, provider services, or laboratory tests.",

    "inquiry": "How do I update my insurance information?",
    "response": "Provide updated insurance details through the patient portal or contact our billing office directly.",

    "inquiry": "What payment options are available?",
    "response": "We accept credit cards, checks, online payments, and offer payment plans for qualifying balances."

    """

    print("BillingInsurance KNOWLEDGE-BASE IS EMPTY")
    final_response = "BillingInsurance: YOU NEED TO ADD-YOUR-KNOWLEDGE-BASE"
    return {
        "inquiry": _latest_user_inquiry(state),
        "active_agent": "BillingInsurance",
        "next_node": END,
        "response": final_response,
        "messages": [AIMessage(content=final_response)],
    }


In [None]:
builder = StateGraph(InquiryState)

builder.add_node("Operator", operator_router)
builder.add_node("ER", er_agent)
builder.add_node("Radiology", radiology_agent)
builder.add_node("PrimaryCare", primary_care_agent)
builder.add_node("Cardiology", cardiology_node)
builder.add_node("Pediatrics", pediatrics_agent)
builder.add_node("BillingInsurance", billing_agent)

builder.set_entry_point("Operator")

builder.add_conditional_edges(
    "Operator",
    lambda x: x["next_node"],
    {
        "ER": "ER",
        "PrimaryCare": "PrimaryCare",
        "Pediatrics": "Pediatrics",
        "Radiology": "Radiology",
        "Cardiology": "Cardiology", # route Cardiology intent to the retrieve node
        "BillingInsurance": "BillingInsurance",
        END: END
    }
)

for node in ["ER", "Radiology", "PrimaryCare", "Cardiology", "Pediatrics", "BillingInsurance"]:
    builder.add_edge(node, END)

memory_checkpointer = MemorySaver()

graph = builder.compile(checkpointer=memory_checkpointer)

In [None]:
display(Image(graph.get_graph().draw_mermaid_png()))


In [None]:
# Sample inquiries
# My child has a fever
# I need help with my medical bill
# Can I visit my friend in the ER?
# Do I need to fast for my scan?
# I want to schedule my cardiology appointment
# I want to see my doctor for my annual exam

thread_id = input("Thread ID (use same ID to continue a conversation): ").strip() or "default-thread"
config = {"configurable": {"thread_id": thread_id}}

while True:
    user_input = input("User: ")
    if user_input.lower() in {"q", "quit"}:
        print("Goodbye!")
        break

    result = graph.invoke({"messages": [HumanMessage(content=user_input)]}, config=config)

    assistant_message = next(
        (m for m in reversed(result.get("messages", [])) if isinstance(m, AIMessage)),
        None,
    )
    response = assistant_message.content if assistant_message else result.get("response", "No Response Returned")
    active_agent = result.get("active_agent")
    last_router_reason = result.get("last_router_reason")
    print(
        f"\nUser: {user_input}\n"
        f"Assistant: {response}\n"
        f"state['active_agent']: {active_agent}\n"
        f"state['last_router_reason']: {last_router_reason}\n"
    )

<br><br><br>

<hr style="border:30px solid coral "> </hr>
<hr style="border:2px solid coral "> </hr>


# Requirements Specification:

<hr style="border:2px solid coral "> </hr>


### Implementation Requirements:

Provide runs that will demonstrate a fully functional application for every case listed below:
1. The knowledge base for every agent
    - Knowledge Base can be generated by any GenAI model (ChatGPT, Gemini, Claude, etc.)
    - Knowledge Base can be stored in any data structure, file, or vector database
2. Multiturn conversation with every agent (For example, A person called Cardialogy Department asking for cause of their pain then decided to schedule an appointment to see cardialogist)
3. Transactions like booking an appointment or making a payment can be stored in any data structure (DataFrame, Array, List, Dictionary, ...), or file (CSV, JSON, Plaintext)
4. Your Agents must be able to answer EVERY question/inquiry listed below:
    - **ER (Emergency Room)**
        - When should I visit the ER instead of urgent care?
        - How long will I wait to be seen in the ER?
    - **Radiology**
        - How should I prepare for my MRI or CT scan?
        - When and how will I receive my imaging results?
    - **Primary Care**
        - How do I schedule or cancel an appointment?
        - Can I get a same-day visit for urgent issues?
    - **Cardiology**
        - What are common signs that I need to see a cardiologist?
        - What should I expect during a heart stress test?
    - **Pediatrics**
        - What vaccines does my child need at each age?
        - What should I do if my child develops a high fever?
    - **Billing & Insurance**
        - What insurance plans do you accept?
        - How can I view, understand, or pay my bill?
5. My name is Ashley Smith and I want to know the amount I owe you so I can pay it now using my CC.
6. My name is Johnatan Walter , I have an appointment with my doctor scheduled for Tuesday next week at 1:00pm and I want to change it to Thursday morning next week, whaat time slots are available on Thursday?
