<a href="https://colab.research.google.com/github/dipanjanS/mastering-intelligent-agents-langgraph-workshop-dhs2025/blob/main/Module-6-Deploying-Monitoring-and-Evaluating-Agentic-AI-Systems/M6LC1_Build_and_Deploy_a_Healthcare_Utilization_Review_AI_Agent_with_LangGraph_and_FastAPI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Build and Deploy a Healthcare Utilization Review Agent with LangGraph & FastAPI

This notebook walks you through building and running a **Healthcare Utilization Review** agent using **LangGraph** and exposing it via a **FastAPI** web service. You’ll deploy the API inside the notebook (Uvicorn in a background thread) and call it from the same notebook with `requests`.

![](https://i.imgur.com/ThwlmGc.png)

**What you’ll get**
- A LangGraph agent (System Prompt + LLM) that calls domain tools:
  - `fetch_patient_record`, `match_guideline`, `check_guideline_validity`, `recommend_care_plan`
- A FastAPI service with health & patient endpoints plus a review endpoint (JSON), and a streaming review endpoint
- In-memory demo data: `patient_records`, `medical_guidelines`, `care_recommendations`
- Client cells that hit the API with `requests.get(...)` and `requests.post(...)`

**Endpoints**
- `GET /health` – service status
- `GET /patients` – list patient IDs
- `GET /patient/{id}` – patient details
- `POST /review/invoke` – run a full review and return JSON
- `POST /review/stream` – stream review progress and tokens (SSE), if enabled

**Review JSON output (non-streaming)**
- `patient_id`
- `decision` (`APPROVED` or `NEEDS REVIEW`)
- `reasoning`
- `care_recommendation`
- `processing_time` (seconds)

**Architecture (at a glance)**
Client (`requests`) → **FastAPI** → **Agent (System Prompt + LLM)** → **Tools** → **In-memory Data** → API Response

> Tip: The API is started in a background thread so you can test endpoints from the same notebook. Set `OPENAI_API_KEY` before running.


## Install Dependencies

Install the required packages for the agent, tools, and API. Run this **once** (rerun if the runtime resets).


In [None]:
!pip install fastapi==0.116.1 uvicorn==0.35.0 langchain==0.3.27 langchain-community==0.3.27 langchain-openai==0.3.30 langgraph==0.6.5 --quiet

## Configure API Keys & Environment

Set your OpenAI API key in the environment so `ChatOpenAI` can read it. If `OPENAI_API_KEY` is not already set, the cell below prompts you for it.

In [None]:
import os
import getpass

# OpenAI API key (read by ChatOpenAI)
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key (https://platform.openai.com/account/api-keys):\n")


## Imports

In [None]:
import os
import json
import time
import asyncio
import threading
import requests
from typing import Annotated, List, Dict, Any, AsyncGenerator
from typing_extensions import TypedDict
from contextlib import asynccontextmanager

# FastAPI imports
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn

# LangGraph and LangChain imports
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage

## Sample Data (Guidelines, Care Plans, Patient Records)

This notebook uses **in-memory** Python lists to keep the demo self-contained.  
You can later swap these out for a database or API without changing the agent’s logic.

### What’s included
- **`medical_guidelines`** — evidence-style rules the agent can match against.
  - Fields: `procedure`, `diagnosis`, `required_symptoms` (list), `notes` (free text).
  - Example:
    ```python
    {
      "procedure": "CT Abdomen",
      "diagnosis": "Suspected Appendicitis",
      "required_symptoms": ["abdominal pain", "nausea", "RLQ tenderness"],
      "notes": "CT imaging justified if appendicitis is unclear."
    }
    ```
- **`care_recommendations`** — next-step suggestions keyed by diagnosis.
  - Fields: `diagnosis`, `next_step`.
  - Example:
    ```python
    {
      "diagnosis": "Suspected Appendicitis",
      "next_step": "Do CT to confirm and refer for surgery if positive."
    }
    ```
- **`patient_records`** — small synthetic chart notes for testing.
  - Fields: `patient_id`, `age`, `sex`, `symptoms` (list), `diagnosis`, `procedure`, `notes`.
  - Example:
    ```python
    {
      "patient_id": "P101",
      "age": 38,
      "sex": "Male",
      "symptoms": ["abdominal pain", "nausea"],
      "diagnosis": "Possible early appendicitis",
      "procedure": "CT Abdomen",
      "notes": "Mild abdominal pain and nausea but no localized tenderness or rebound noted."
    }
    ```

### How the agent uses these
- **Guideline matching**: `procedure` + `diagnosis` → pick the closest entry in `medical_guidelines`.
- **Validity check**: compare `patient_records[*].symptoms` vs. `required_symptoms` and read `notes`.
- **Care plan**: map `diagnosis` → `care_recommendations[*].next_step`.
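
The validity check above can be approximated without an LLM. The snippet below is a minimal sketch of a deterministic baseline, assuming exact (case-insensitive) string matching; the helper name `missing_symptoms` is hypothetical and not part of the notebook, which delegates this comparison to the LLM-backed `check_guideline_validity` tool.

```python
def missing_symptoms(patient_symptoms, required_symptoms):
    """Return the required symptoms that do not appear in the patient's list.

    Hypothetical helper: exact, case-insensitive matching only.
    """
    present = {s.strip().lower() for s in patient_symptoms}
    return [r for r in required_symptoms if r.strip().lower() not in present]


# Values copied from the sample data in this notebook (patient P101, CT Abdomen guideline).
p101_symptoms = ["abdominal pain", "nausea"]
appendicitis_required = ["abdominal pain", "nausea", "RLQ tenderness"]

print(missing_symptoms(p101_symptoms, appendicitis_required))
# ['RLQ tenderness']
```

Exact matching is brittle: for patient P103, `"recurrent headache"` would not match the required `"headache"`. Cases like that, where the wording differs but the meaning is close, are one reason to let an LLM read the symptoms and notes together.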

In [None]:
medical_guidelines = [
    {"procedure": "MRI Brain", "diagnosis": "Migraine", "required_symptoms": ["headache", "nausea"],
     "notes": "MRI not recommended unless neurological deficits or red flags present."},
    {"procedure": "CT Chest", "diagnosis": "Suspected Pulmonary Embolism", "required_symptoms": ["chest pain", "shortness of breath", "tachycardia"],
     "notes": "CTPA appropriate for high probability PE cases with positive D-dimer."},
    {"procedure": "MRI Lumbar Spine", "diagnosis": "Chronic Low Back Pain", "required_symptoms": ["back pain > 6 weeks", "neurological deficit"],
     "notes": "MRI only if pain persists despite conservative therapy and neuro signs are present."},
    {"procedure": "CT Chest", "diagnosis": "Community-Acquired Pneumonia", "required_symptoms": ["fever", "cough"],
     "notes": "CT Chest reserved for inconclusive X-rays or immunocompromised patients."},
    {"procedure": "CT Abdomen", "diagnosis": "Suspected Appendicitis", "required_symptoms": ["abdominal pain", "nausea", "RLQ tenderness"],
     "notes": "CT imaging justified if appendicitis is unclear."}
]

care_recommendations = [
    {"diagnosis": "Migraine", "next_step": "Start migraine treatment; imaging not necessary unless red flags appear."},
    {"diagnosis": "Suspected Pulmonary Embolism", "next_step": "Begin anticoagulation and confirm with CTPA."},
    {"diagnosis": "Chronic Low Back Pain", "next_step": "Refer to physiotherapy; MRI only if neuro symptoms persist."},
    {"diagnosis": "Community-Acquired Pneumonia", "next_step": "Start empirical antibiotics; reserve CT for poor responders."},
    {"diagnosis": "Suspected Appendicitis", "next_step": "Do CT to confirm and refer for surgery if positive."}
]

patient_records = [
    {"patient_id": "P101", "age": 38, "sex": "Male", "symptoms": ["abdominal pain", "nausea"],
     "diagnosis": "Possible early appendicitis", "procedure": "CT Abdomen",
     "notes": "Mild abdominal pain and nausea but no localized tenderness or rebound noted."},
    {"patient_id": "P102", "age": 65, "sex": "Female", "symptoms": ["chest pain", "shortness of breath", "tachycardia"],
     "diagnosis": "Clinical suspicion of PE", "procedure": "CT Chest",
     "notes": "Wells score high probability; D-dimer positive."},
    {"patient_id": "P103", "age": 30, "sex": "Female", "symptoms": ["recurrent headache"],
     "diagnosis": "Classic migraine presentation", "procedure": "MRI Brain",
     "notes": "No neuro signs or red flags. Typical migraine pattern."},
    {"patient_id": "P104", "age": 45, "sex": "Male", "symptoms": ["back pain > 6 weeks", "neurological deficit"],
     "diagnosis": "Chronic Low Back Pain", "procedure": "MRI Lumbar Spine",
     "notes": "Persistent low back pain with left leg numbness; unresponsive to physiotherapy."},
    {"patient_id": "P105", "age": 70, "sex": "Female", "symptoms": ["fever", "cough"],
     "diagnosis": "Community-Acquired Pneumonia", "procedure": "CT Chest",
     "notes": "Initial chest X-ray inconclusive; patient is immunocompromised with underlying COPD."}
]


## Tools for the Utilization Review Agent

These are **LangChain tools** (decorated with `@tool`) that the agent can call during a review.  
They encapsulate domain logic and return **small, structured dicts** the agent can reason over.

### Summary of tools

| Tool | Purpose | Inputs | Output keys |
|---|---|---|---|
| `fetch_patient_record` | Retrieve and summarize a patient chart from in-memory data | `patient_id: str` | `patient_summary` _(str)_, or `error` |
| `match_guideline` | Pick the closest clinical guideline for a (procedure, diagnosis) pair using the LLM | `procedure: str`, `diagnosis: str` | `matched_guideline` _(str)_ |
| `check_guideline_validity` | Validate whether patient symptoms/notes meet the guideline’s criteria | `symptoms: list[str]`, `required_symptoms: list[str]`, `notes: str` | `validity_result` _(str)_ |
| `recommend_care_plan` | Suggest next steps for the given diagnosis | `diagnosis: str` | `recommendation` _(str)_ |

> All LLM-backed tools use `ChatOpenAI` (temperature = 0, streaming enabled in code) and return **concise textual justifications** under a single key.

### Typical call order used by the agent
1. `fetch_patient_record(patient_id)` → summarize context  
2. `match_guideline(procedure, diagnosis)` → find best-fit rule  
3. `check_guideline_validity(symptoms, required_symptoms, notes)` → approve vs. needs review  
4. `recommend_care_plan(diagnosis)` → action steps / alternatives

### Example outputs (shape)
```json
// fetch_patient_record
{ "patient_summary": "Patient ID: P102\nAge: 65, Sex: Female\nReported Symptoms: chest pain, shortness of breath, tachycardia\nPreliminary Diagnosis: Clinical suspicion of PE\nRequested Procedure: CT Chest\nClinical Notes: Wells score high probability; D-dimer positive." }

// match_guideline
{ "matched_guideline": "CTPA is appropriate for high-probability PE with positive D-dimer. Required symptoms: chest pain, shortness of breath, tachycardia. Caveats: ensure renal function adequate for contrast." }

// check_guideline_validity
{ "validity_result": "Criteria met: symptoms align and notes indicate high probability (Wells) with positive D-dimer. Imaging is medically necessary." }

// recommend_care_plan
{ "recommendation": "Begin anticoagulation and confirm with CTPA; monitor hemodynamics; consider risk stratification." }
```


In [None]:
@tool
def fetch_patient_record(patient_id: str) -> dict:
    """
    Fetches and summarizes a patient record based on the given patient ID.
    Returns a short summary string.
    """
    for record in patient_records:
        if record["patient_id"] == patient_id:
            summary = (
                f"Patient ID: {record['patient_id']}\n"
                f"Age: {record['age']}, Sex: {record['sex']}\n"
                f"Reported Symptoms: {', '.join(record['symptoms'])}\n"
                f"Preliminary Diagnosis: {record['diagnosis']}\n"
                f"Requested Procedure: {record['procedure']}\n"
                f"Clinical Notes: {record['notes']}"
            )
            return {"patient_summary": summary}
    return {"error": "Patient record not found."}

@tool
def match_guideline(procedure: str, diagnosis: str) -> dict:
    """Match a given procedure and diagnosis to the most relevant clinical guideline."""
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)
    context = "\n".join([
        f"{i+1}. Procedure: {g['procedure']}, Diagnosis: {g['diagnosis']}, Required Symptoms: {g['required_symptoms']}, Notes: {g['notes']}"
        for i, g in enumerate(medical_guidelines)])

    prompt = f"""You are a clinical reviewer assessing whether a requested medical procedure aligns with existing evidence-based guidelines.

Instructions:
- Analyze the patient's procedure and diagnosis.
- Compare against the list of provided clinical guidelines.
- Select the guideline that best fits the case by reasoning on the common matches considering procedure and diagnosis.
- If none match, respond: "No appropriate guideline found for this case."
- If a match is found, summarize the matching guideline clearly including any required symptoms or caveats.

Patient Case:
- Procedure: {procedure}
- Diagnosis: {diagnosis}

Available Guidelines:
{context}
"""
    result = llm.invoke(prompt).content
    return {"matched_guideline": result}

@tool
def check_guideline_validity(symptoms: list, required_symptoms: list, notes: str) -> dict:
    """Determine whether the patient's symptoms and notes satisfy the guideline criteria."""
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)
    prompt = f"""You are validating a medical procedure request based on documented symptoms and clinical context.

Instructions:
- Assess whether the patient's symptoms and notes fulfill the required guideline criteria.
- Consider nuances or indirect references (e.g. "long flight" implies immobility).
- Provide a reasoned judgment if the procedure is medically necessary.
- If it does not qualify, explain exactly which criteria are unmet.

Input:
- Patient Symptoms: {symptoms}
- Required Symptoms from Guideline: {required_symptoms}
- Clinical Notes: {notes}
"""
    result = llm.invoke(prompt).content
    return {"validity_result": result}

@tool
def recommend_care_plan(diagnosis: str) -> dict:
    """Recommend a follow-up care plan based on a given diagnosis."""
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)
    options = "\n".join([
        f"{i+1}. Diagnosis: {c['diagnosis']}, Recommendation: {c['next_step']}"
        for i, c in enumerate(care_recommendations)])

    prompt = f"""You are a clinical support assistant suggesting appropriate next steps for a given medical diagnosis.

Instructions:
- Analyze the given diagnosis.
- Choose the closest match from the list of known recommendations.
- Explain why the match is appropriate.
- If no suitable recommendation is found, return: "No care recommendation found for this diagnosis."

Diagnosis Provided:
{diagnosis}

Available Recommendations:
{options}
"""
    result = llm.invoke(prompt).content
    return {"recommendation": result}


## LangGraph Agent Setup

This section wires up a **tool-using ReAct agent** with LangGraph. The agent reads a system prompt, plans calls to our tools, and produces a strict final summary (Decision, Reasoning, Care).

### Components
- **System Prompt** — defines the reviewer role and enforces the final **bullet-format** output.
- **State** — a `TypedDict` with a `messages` array aggregated by `add_messages`.
- **LLM (ChatOpenAI)** — initialized with `streaming=True` and **bound** to tools so it can call them.
- **Graph** — `StateGraph` with:
  - `agent` node: decides what to do next (call tools or finish).
  - `tools` node: executes tool calls (`ToolNode`).
  - Edges: `START → agent → (tools?) → agent → END` via `tools_condition`.


![](https://i.imgur.com/s9hSJ6l.png)
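
To make the `agent → tools → agent` cycle concrete, here is a plain-Python imitation of the control flow. It is a sketch only, not how LangGraph is implemented: `route`, `run_loop`, `fake_agent`, `fake_tool`, and `max_turns` are hypothetical names, and messages are simple dicts rather than LangChain message objects.

```python
def route(last_message):
    """Imitate the routing rule: go to 'tools' if tool calls were requested, else finish."""
    return "tools" if last_message.get("tool_calls") else "END"


def run_loop(agent_step, run_tool, user_prompt, max_turns=10):
    """Alternate between the agent step and tool execution until the agent stops calling tools."""
    messages = [{"role": "user", "content": user_prompt}]
    for _ in range(max_turns):
        reply = agent_step(messages)              # the 'agent' node
        messages.append(reply)
        if route(reply) == "END":
            return messages
        for call in reply["tool_calls"]:          # the 'tools' node
            messages.append({"role": "tool", "name": call["name"], "content": run_tool(call)})
    raise RuntimeError("max_turns reached without a final answer")


# Scripted stand-ins so the sketch runs without an LLM or API key.
def fake_agent(messages):
    if not any(m["role"] == "tool" for m in messages):
        call = {"name": "fetch_patient_record", "args": {"patient_id": "P101"}}
        return {"role": "assistant", "content": "", "tool_calls": [call]}
    return {"role": "assistant", "content": "- Final Decision: NEEDS REVIEW", "tool_calls": []}


def fake_tool(call):
    return f"result of {call['name']}"


history = run_loop(fake_agent, fake_tool, "Review patient P101 for procedure justification.")
print([m["role"] for m in history])
# ['user', 'assistant', 'tool', 'assistant']
```

In the real graph, `tools_condition` plays the part of `route`, `ToolNode` executes the requested tools, and the appended messages are merged into the state by `add_messages`.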

In [None]:
AGENT_SYS_PROMPT = """
You are a senior medical review assistant responsible for evaluating healthcare procedure requests.

You must call relevant tools to do the following:
1. Retrieve the full patient record using the patient ID.
2. Match the requested procedure and diagnosis to clinical guidelines.
3. Validate the match by comparing the patient's symptoms and notes to the guideline's requirements.
4. Recommend the appropriate next steps based on the diagnosis.
5. Output a final summary based on the guidelines given below.

Analyze all the results from the tool calls before making the final decision

Your final response should ONLY include the following bullets in the exact format specified:

- Final Decision: [APPROVED/NEEDS REVIEW]
- Decision Reasoning: [What criteria matched or did not match]
- Care recommendation or alternative steps: [care plan steps to take or alternative steps if it needs review]

Do NOT add any other extra content in the final response
"""

class State(TypedDict):
    messages: Annotated[list, add_messages]

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)
tools = [fetch_patient_record, match_guideline, check_guideline_validity, recommend_care_plan]
llm_with_tools = llm.bind_tools(tools)

def tool_calling_llm(state: State) -> State:
    """Handle reasoning and planning using the LLM (tool-call capable)."""
    current_state = state["messages"]
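    # Send the instructions with the "system" role; a bare string in a message list
    # would be treated as a human message.
    state_with_instructions = [("system", AGENT_SYS_PROMPT)] + current_state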
    response = [llm_with_tools.invoke(state_with_instructions)]
    return {"messages": response}

def create_utilization_review_agent():
    builder = StateGraph(State)
    builder.add_node("agent", tool_calling_llm)
    builder.add_node("tools", ToolNode(tools=tools))
    builder.add_edge(START, "agent")
    builder.add_conditional_edges("agent", tools_condition, ["tools", END])
    builder.add_edge("tools", "agent")
    return builder.compile()

utilization_review_agent = create_utilization_review_agent()

In [None]:
utilization_review_agent

## Test the Agent Directly

Use the compiled LangGraph agent **without** the API to run a few utilization reviews.

### Direct Invocation

In [None]:
prompt = "Review patient P101 for procedure justification."
result = utilization_review_agent.invoke({"messages": [("user", prompt)]},
                                         {"recursion_limit": 25})

In [None]:
print(result["messages"][-1].content)

### Live Streaming

In [None]:
prompt = "Review patient P101 for procedure justification."
for event in utilization_review_agent.stream({"messages": [("user", prompt)]},
                                             {"recursion_limit": 25},
                                             stream_mode='values'):
    event['messages'][-1].pretty_print()

## Wrap AI Agent in a FastAPI Web Service (API)

This cell initializes the **FastAPI** application and its lifecycle:

- **Lifespan manager** (`@asynccontextmanager`) runs once on startup and once on shutdown, printing clear logs so you know when the API is ready and when it stops. This replaces older `@app.on_event("startup"/"shutdown")` patterns.
- **App metadata** (`title`, `description`, `version`) populates the interactive docs (`/docs`) and helps with observability.
- **CORS middleware** is enabled with permissive settings (`*`) so browser-based clients on other origins can call the API. The notebook's `requests` client is not subject to CORS, so this setting matters only if you later add a web front end.

> This cell only **creates and configures** the app; it does **not** start the server.

Routes are added in later cells, and the server is launched via Uvicorn in a background thread so you can test endpoints from the same notebook.
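
The lifespan pattern relies on a standard-library mechanism: code before `yield` runs on entry and code after `yield` runs on exit. The sketch below shows that ordering with `contextlib.asynccontextmanager` alone, without FastAPI; the `events` list and `main` function are illustrative names only.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def lifespan(app):
    events.append("startup")     # runs once, before requests are served
    yield
    events.append("shutdown")    # runs once, when the server stops


async def main():
    # A server enters the context on startup and leaves it on shutdown.
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(main())
print(events)
# ['startup', 'serving', 'shutdown']
```

This runs as a standalone script. Inside a notebook, where an event loop is already running, use `await main()` in place of `asyncio.run(main())`.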


In [None]:
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Healthcare Utilization Review AI Agent API Starting...")
    print("AI Agent API initialized and ready")
    yield
    # Shutdown
    print("Healthcare Utilization Review AI Agent API Shutting down...")

app = FastAPI(
    title="Healthcare Utilization Review AI Agent API",
    description="AI-powered utilization review system",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


## Define Request/Response Schema and API Endpoints

This cell declares the **Pydantic models** used for validation/typing and wires up the **HTTP routes** that your client calls.

### Schemas
- **`ReviewRequest`** — body for non-streaming review  
  - `patient_id: str`
- **`StreamingReviewRequest`** — body for streaming review  
  - `patient_id: str` · `include_tool_calls: bool = True`
- **`ReviewResponse`** — final structured review output (non-streaming)  
  - `patient_id, decision, reasoning, care_recommendation, processing_time: float`
- **`HealthResponse`** — health check payload  
  - `status, timestamp, version`

### Endpoints
- **GET `/health`** → returns `HealthResponse` (service heartbeat).
- **GET `/patients`** → `{ patients: [ids], count }` from in-memory data.
- **GET `/patient/{patient_id}`** → full record for a specific patient; `404` if not found.
- **POST `/review/invoke`** → runs the agent to completion and returns a single `ReviewResponse`.  
  - Internally: builds a prompt → `agent.invoke(...)` → parses the strict 3-bullet output into `decision`, `reasoning`, `care_recommendation`, and computes `processing_time`.
- **POST `/review/stream`** → streams the live review as **SSE** frames (one line per event).  
  - Emits JSON lines prefixed with `data: ` and ends with `data: [DONE]`.
  - Event types you’ll see:
    - `{"status": "starting", ...}` — initial notice.
    - `{"type": "tool_start", "tool": "<name>", "message": "..."}` — a tool began.
    - `{"type": "tool_end", "tool": "<name>", "message": "...", "summary": "<truncated output>"}` — tool finished.
    - `{"type": "token", "content": "<partial text>"}` — incremental LLM text.
    - `{"type": "complete", "message": "Review completed", "full_response": "<full text>"}`
    - Final sentinel: `[DONE]`
  - Your client reads with `response.iter_lines(decode_unicode=True)`, processes each `data: ...` JSON, and **stops on `[DONE]`**.
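
The 3-bullet parsing step used by `/review/invoke` can be written as a standalone function. This is a sketch of one possible approach, assuming the bullet labels from the system prompt; `parse_review`, `FIELDS`, and the sample text are hypothetical. Unlike a plain `startswith` check, it tolerates leading whitespace and label case differences.

```python
import re

# Response field -> label the system prompt asks the model to emit.
FIELDS = {
    "decision": "Final Decision",
    "reasoning": "Decision Reasoning",
    "care_recommendation": "Care recommendation or alternative steps",
}


def parse_review(text):
    """Extract the three bullet values; fields that are not found stay empty strings."""
    parsed = {key: "" for key in FIELDS}
    for line in text.strip().splitlines():
        for key, label in FIELDS.items():
            pattern = rf"^\s*[-*]?\s*{re.escape(label)}\s*:\s*(.*)$"
            match = re.match(pattern, line, flags=re.IGNORECASE)
            if match:
                parsed[key] = match.group(1).strip()
    return parsed


# Hypothetical model output, for illustration only.
sample = (
    "- Final Decision: NEEDS REVIEW\n"
    "- Decision Reasoning: RLQ tenderness is not documented.\n"
    "  - Care recommendation or alternative steps: Re-examine and document findings."
)
print(parse_review(sample)["decision"])
# NEEDS REVIEW
```

If the model wraps a value across several lines, only the first line is captured; asking the model for structured output would be a sturdier option for production use.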

> Notes  
> • The streaming response uses Server-Sent Events (SSE) semantics; for browser `EventSource` you’d typically set `media_type="text/event-stream"` (this demo uses a plain-text stream that works well with `requests`).  
> • The non-streaming endpoint is ideal for simple automations; the streaming endpoint is better for live progress UIs/logs.
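
The client-side handling of `data: ...` lines can be isolated from the network call, which makes it easy to test. The following is a minimal sketch under the frame format described above; `parse_sse_line`, `collect_tokens`, `DONE`, and the sample frames are hypothetical names and data.

```python
import json

DONE = object()  # sentinel returned for the final "[DONE]" frame


def parse_sse_line(line):
    """Return None for non-data lines, DONE for the sentinel, else the decoded JSON payload."""
    if not line or not line.startswith("data: "):
        return None
    payload = line[len("data: "):]
    if payload == "[DONE]":
        return DONE
    return json.loads(payload)


def collect_tokens(lines):
    """Join the 'token' events into the final text, stopping at the sentinel."""
    text = []
    for line in lines:
        event = parse_sse_line(line)
        if event is DONE:
            break
        if isinstance(event, dict) and event.get("type") == "token":
            text.append(event["content"])
    return "".join(text)


# Hand-written frames that imitate what the streaming endpoint emits.
frames = [
    'data: {"status": "starting", "message": "Starting review for patient P104"}',
    "",
    'data: {"type": "token", "content": "- Final "}',
    'data: {"type": "token", "content": "Decision: APPROVED"}',
    "data: [DONE]",
]
print(collect_tokens(frames))
# - Final Decision: APPROVED
```

With a live server, the same functions can consume `response.iter_lines(decode_unicode=True)` directly in place of the `frames` list.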


In [None]:
# API Request/Response Schemas
class ReviewRequest(BaseModel):
    patient_id: str

class StreamingReviewRequest(BaseModel):
    patient_id: str
    include_tool_calls: bool = True

class ReviewResponse(BaseModel):
    patient_id: str
    decision: str
    reasoning: str
    care_recommendation: str
    processing_time: float

class HealthResponse(BaseModel):
    status: str
    timestamp: str
    version: str

# API endpoint to check API health
@app.get("/health", response_model=HealthResponse)
async def health_check():
    return HealthResponse(
        status="✅ API is Healthy",
        timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
        version="1.0.0",
    )

# API endpoint to get list of all patients
@app.get("/patients")
async def list_patients():
    return {
        "patients": [record["patient_id"] for record in patient_records],
        "count": len(patient_records),
    }

# API endpoint to get details of a specific patient
@app.get("/patient/{patient_id}")
async def get_patient_details(patient_id: str):
    for record in patient_records:
        if record["patient_id"] == patient_id:
            return record
    raise HTTPException(status_code=404, detail=f"Patient {patient_id} not found")

# API endpoint to run utilization review on a patient - returns response after all processing is complete
@app.post("/review/invoke", response_model=ReviewResponse)
async def review_patient(request: ReviewRequest):
    try:
        start_time = time.time()
        patient_exists = any(r["patient_id"] == request.patient_id for r in patient_records)
        if not patient_exists:
            raise HTTPException(status_code=404, detail=f"Patient {request.patient_id} not found")
        # call agent in invoke mode and get final response after agent completes processing everything
        prompt = f"Review patient {request.patient_id} for procedure justification."
        result = utilization_review_agent.invoke({"messages": [("user", prompt)]}, {"recursion_limit": 150})
        final_response = result["messages"][-1].content

        # Just for final formatting as per data schema
        # you can also just return the above content as response
        decision = reasoning = care_recommendation = ""
        for line in final_response.strip().split("\n"):
            if line.startswith("- Final Decision:"):
                decision = line.replace("- Final Decision:", "").strip()
            elif line.startswith("- Decision Reasoning:"):
                reasoning = line.replace("- Decision Reasoning:", "").strip()
            elif line.startswith("- Care recommendation or alternative steps:"):
                care_recommendation = line.replace("- Care recommendation or alternative steps:", "").strip()

        processing_time = time.time() - start_time
        return ReviewResponse(
            patient_id=request.patient_id,
            decision=decision,
            reasoning=reasoning,
            care_recommendation=care_recommendation,
            processing_time=round(processing_time, 2),
        )
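    except HTTPException:
        # Re-raise HTTP errors (such as the 404 above) instead of masking them as 500s
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing review: {str(e)}")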

# API endpoint to run utilization review on a patient - live streams intermediate processing and final results
@app.post("/review/stream")
async def review_stream_sse(request: StreamingReviewRequest):
    """Stream the medical review process using Server-Sent Events (SSE)."""
    # SSE is a communication mechanism where the server sends live updates to the client.
    # The client initiates the connection, and the server streams data to the client over the same HTTP connection.

    async def generate_sse() -> AsyncGenerator[str, None]:
        try:
            # Validate patient exists
            patient_exists = any(r["patient_id"] == request.patient_id for r in patient_records)
            if not patient_exists:
                yield f"data: {json.dumps({'error': f'Patient {request.patient_id} not found'})}\n\n"
                return

            # Send initial status
            yield f"data: {json.dumps({'status': 'starting', 'message': f'Starting review for patient {request.patient_id}'})}\n\n"

            prompt = f"Review patient {request.patient_id} for procedure justification."

            current_tool = None
            collected_content = ""

            # calls agent in async streaming mode to get live tokens as they are generated
            # generates an async iterator of events in the agent which can be streamed live
            async for event in utilization_review_agent.astream_events(
                {"messages": [("user", prompt)]},
                version="v1",
                config={"recursion_limit": 25}
            ):
                event_type = event["event"]
                event_data = event.get("data", {})
                # Stream and Show that tool call event has started
                if event_type == "on_tool_start":
                    tool_name = event.get("name", "unknown_tool")
                    current_tool = tool_name
                    if request.include_tool_calls:
                        yield f"data: {json.dumps({'type': 'tool_start', 'tool': tool_name, 'message': f'Executing {tool_name}...'})}\n\n"
                # Stream and Show that tool call event has ended
                elif event_type == "on_tool_end":
                    if request.include_tool_calls and current_tool:
                        # Summarize the tool output by truncating its string form to 200 characters
                        # (an LLM summarizer could be used here instead)
                        output_text = str(event_data.get("output", {}))
                        summary = output_text[:200] + "..." if len(output_text) > 200 else output_text

                        yield f"data: {json.dumps({'type': 'tool_end', 'tool': current_tool, 'message': f'Completed {current_tool}', 'summary': summary})}\n\n"
                    current_tool = None
                # Stream and show content tokens from Agent (LLMs or Tool Calls)
                elif event_type == "on_chat_model_stream":
                    chunk = event_data.get("chunk")
                    if chunk and hasattr(chunk, 'content') and chunk.content:
                        content = chunk.content
                        collected_content += content
                        yield f"data: {json.dumps({'type': 'token', 'content': content})}\n\n"

            # Send final completion status
            yield f"data: {json.dumps({'type': 'complete', 'message': 'Review completed', 'full_response': collected_content})}\n\n"
            yield "data: [DONE]\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'error': f'Streaming error: {str(e)}'})}\n\n"

    return StreamingResponse(
        generate_sse(),
        media_type="text/plain",  # for browser EventSource clients, use media_type="text/event-stream"
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "text/plain",
        }
    )


## Deploy API on the Server

This cell launches the FastAPI app **inside the notebook** by starting Uvicorn in a **background daemon thread**:

- **`run_server_in_background(port=...)`**  
  - Spawns `uvicorn.run(app, host="127.0.0.1", port=port)` in a separate thread so you can keep using the notebook.  
  - `time.sleep(5)` gives Uvicorn a moment to bind the port before you start sending requests.  
  - Returns the thread handle (`server_thread`) for reference.

- **Local-only bind**  
  - The server binds to `127.0.0.1` (loopback) which is ideal for notebook testing.  
  - Change to `0.0.0.0` only if you intentionally want external access (plus add auth and tighten CORS).

- **Stopping the server**  
  - In most notebook environments, daemon threads are not easily stopped mid-session; **restart the kernel** to fully stop the server, or avoid starting multiple instances on the same port.

- **Production tip**  
  - Package the app and run with a proper process manager, e.g.  
    - `uvicorn app:app --host 0.0.0.0 --port 8000`  
    - or `gunicorn -k uvicorn.workers.UvicornWorker app:app`  
  - Add authentication, structured logging, and restrict CORS to trusted origins.
  - Alternatively, package the agent code, the API code, and this startup code in a single Python file (for example `server.py`) and run it as a background process:
  ```bash
  nohup python server.py &
  ```
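
A fixed `time.sleep(5)` is simple but either waits too long or not long enough. One alternative is to poll the port until it accepts connections. The helper below is a sketch using only the standard library; `wait_for_port` and its parameters are hypothetical and not part of the notebook.

```python
import socket
import time


def wait_for_port(host, port, timeout=10.0, interval=0.2):
    """Return True once a TCP connection to (host, port) succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)  # not listening yet; retry shortly
    return False


# Demo against a throwaway local listener so the sketch runs without a server.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))   # port 0 lets the OS pick a free port
listener.listen(1)
print(wait_for_port("127.0.0.1", listener.getsockname()[1], timeout=2.0))
# True
listener.close()
```

In this notebook, calling `wait_for_port("127.0.0.1", 8010)` after `thread.start()` would serve the same purpose as the sleep. An open port only shows that the socket is bound, so a follow-up `GET /health` is still a reasonable readiness check.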

In [None]:
def run_server_in_background(port: int = 8000):
    """Run the FastAPI server in a background thread (useful for Colab)."""
    # In practice, package the agent code, API code, and this startup code in a Python file
    # and run it as a process, e.g. `nohup python server.py &`.
    def run_server():
        uvicorn.run(app, host="127.0.0.1", port=port, log_level="info")
    thread = threading.Thread(target=run_server, daemon=True)
    thread.start()
    time.sleep(5)  # wait for startup
    return thread

In [None]:
server_thread = run_server_in_background(port=8010)
print("Server running at http://127.0.0.1:8010")

## Test Server API on your Client

Use Python’s `requests` from the **same notebook** to call and use the API you just deployed.

In [None]:
API_PORT=8010
API_URL = f"http://127.0.0.1:{API_PORT}"

### Check API Health

In [None]:
try:
    r = requests.get(f"{API_URL}/health", timeout=30)
    print("Health:", r.status_code, r.json())
except Exception as e:
    print("Health check failed:", e)

### Get all patients in the DB

In [None]:
try:
    r = requests.get(f"{API_URL}/patients", timeout=30)
    print("Patients:", r.json())
except Exception as e:
    print("List patients failed:", e)

### Get specific patient record details

In [None]:
try:
    r = requests.get(f"{API_URL}/patient/P101", timeout=10)
    print("Patient P101:")
    if r.status_code == 200:
        d = r.json()
        print("  ->", d)
    else:
        print("  -> request failed:", r.status_code, r.text)
except Exception as e:
    print("Patient details failed:", e)

### Run Utilization Review on patients - Invoke Mode

In [None]:
from IPython.display import JSON, display

for pid in ["P101", "P102"]:
    print('-'*25)
    try:
        r = requests.post(f"{API_URL}/review/invoke", json={"patient_id": pid}, timeout=60)
        print(f"Review {pid}:")
        if r.status_code == 200:
            display(JSON(r.json()))
        else:
            print(r.text)
    except Exception as e:
        print(f"Review {pid} failed:", e)
    print('-'*25)

### Run Utilization Review on patients - Live Streaming Mode

#### Get streaming response formatting utility functions

In [None]:
!gdown 1dSyjcjlFoZpYEqv4P9Oi0-kU2gIoolMB

#### 1. Simple Formatting of Streaming Results

In [None]:
from agent_utils import format_streaming_results_simple

patient_id = "P104"
print("=" * 80)
print(f"🏥 MEDICAL REVIEW AGENT - STREAMING ANALYSIS")
print("=" * 80)
print(f"📋 Patient ID: {patient_id}")
print(f"🕐 Started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)

try:
    response = requests.post(
        f"{API_URL}/review/stream",
        json={"patient_id": patient_id, "include_tool_calls": True},
        stream=True,
        timeout=120
    )

    if response.status_code == 200:
        print("✅ Connection established - Starting review...\n")
        for line in response.iter_lines(decode_unicode=True):
            if line and line.startswith('data: '):
                # get the streaming data
                data_part = line[6:] # removes data: prefix
                # check if agent has finished processing
                if data_part == "[DONE]":
                    print("\n" + "=" * 80)
                    print("✅ REVIEW COMPLETED SUCCESSFULLY")
                    print("=" * 80)
                    break
                # stream and show agent tokens live
                format_streaming_results_simple(data_part)

    else:
        print(f"❌ FAILED TO CONNECT")
        print(f"Status Code: {response.status_code}")
        print(f"Response: {response.text}")
        print("=" * 80)

except Exception as e:
    print(f"\n❌ STREAMING ERROR: {e}")
    print("=" * 80)

#### 2. Detailed Formatting of Streaming Results

In [None]:
from agent_utils import format_streaming_results_detailed

patient_id = "P104"
print("=" * 80)
print(f"🏥 MEDICAL REVIEW AGENT - STREAMING ANALYSIS")
print("=" * 80)
print(f"📋 Patient ID: {patient_id}")
print(f"🕐 Started at: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 80)

try:
    response = requests.post(
        f"{API_URL}/review/stream",
        json={"patient_id": patient_id, "include_tool_calls": True},
        stream=True,
        timeout=120
    )

    if response.status_code == 200:
        print("✅ Connection established - Starting review...\n")
        state = {"tool_counter": 0, "current_section": None}
        for line in response.iter_lines(decode_unicode=True):
            if line and line.startswith('data: '):
                # get the streaming data
                data_part = line[6:] # removes data: prefix
                # check if agent has finished processing
                if data_part == "[DONE]":
                    print("\n" + "=" * 80)
                    print("✅ REVIEW COMPLETED SUCCESSFULLY")
                    print("=" * 80)
                    break
                # stream and show agent tokens live
                format_streaming_results_detailed(data_part, state)

    else:
        print(f"❌ FAILED TO CONNECT")
        print(f"Status Code: {response.status_code}")
        print(f"Response: {response.text}")
        print("=" * 80)

except Exception as e:
    print(f"\n❌ STREAMING ERROR: {e}")
    print("=" * 80)