<a href="https://colab.research.google.com/github/TamerKaratekin/healthcare/blob/main/IRIS_FHIR_Care_Gaps_Readmission_Risk.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IRIS for Health FHIR R4 — Care Gaps + Readmission Risk (Portfolio)

Author: Tamer Karatekin

**The Problem:** Healthcare analytics prototypes often run only on local notebooks, hide infrastructure complexity, and produce uniform risk scores that fail to stratify patients or surface missing clinical data.

**The Solution:** An end-to-end, reproducible analytics pipeline built on a **cloud-hosted InterSystems IRIS for Health (FHIR R4)** server deployed on a **Google Cloud Platform VM** using **Docker**. The project includes VM sizing, static IP setup, and firewall configuration to expose the FHIR API for external access. Synthetic Synthea data is ingested, initial risk models are validated and refined to produce meaningful patient stratification, missing clinical signals (e.g., a diabetic patient with no HbA1c on record) are flagged as care gaps, and final risk scores are written back to the FHIR store as standardized `Observation` resources using idempotent writes.

**Tech Stack:**

* Data Platform: InterSystems IRIS for Health (FHIR R4)
* Cloud & Infrastructure: Google Cloud Platform (Compute Engine VM), Docker, Firewall & Static IP
* Analytics Layer: Python, Pandas
* Interoperability: HL7 FHIR R4, REST APIs
* Data: Synthetic clinical data (Synthea)

This notebook connects to a live **InterSystems IRIS for Health FHIR R4 server** (GCP VM) and computes:
- **Care gaps** (example: diabetes without a recent HbA1c)
- A simple, explainable **readmission risk score (demo)**

Reproducibility + speed:
- Data ingestion is **OFF by default** (interview-friendly)
- Ingestion is guarded by a **dataset marker** so reruns are fast and do **not** re-ingest
- Analysis is **read-only** by default

Note on scope: to keep the demo fast and deterministic, ingestion filters Observations to a small set of clinically relevant codes.  
If a metric is not ingested (e.g., BP panels), the analysis intentionally adapts to the data available on the server.


A care gap is a situation where a patient should have received a recommended clinical action (test, treatment, or follow-up) but hasn’t, based on accepted medical guidelines.


Example 1: Diabetes

Patient has diabetes (Condition: ICD-10 E11)

Guideline: HbA1c should be checked at least annually

Observation: no HbA1c recorded

➡ Care gap: “Diabetes without recent HbA1c”

(This is what this notebook detects.)

Example 2: Hypertension

Patient has hypertension

Guideline: blood pressure should be monitored regularly

Observation: no BP recorded

➡ Care gap: “Missing blood pressure monitoring”

(We intentionally skipped this because BP wasn’t ingested — which is a valid constraint.)
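
The "at least annually" part of Example 1 can be sketched as a date comparison. This is a minimal illustration, not the notebook's own logic: the function name `hba1c_is_recent` and the 365-day window are assumptions introduced here, and the care-gap check later in the notebook only tests whether any HbA1c value exists at all.

```python
from datetime import datetime, timedelta, timezone

def hba1c_is_recent(latest_dt, now=None, max_age_days=365):
    """Return True if the latest HbA1c timestamp falls inside the lookback window.

    latest_dt: datetime of the most recent HbA1c, or None if there is none.
    max_age_days: hypothetical window; 365 mirrors the "annually" guideline.
    """
    if latest_dt is None:
        return False
    if latest_dt.tzinfo is None:
        # Assumption: treat naive timestamps as UTC so the subtraction is valid.
        latest_dt = latest_dt.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return now - latest_dt <= timedelta(days=max_age_days)

reference = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(hba1c_is_recent(datetime(2024, 6, 1, tzinfo=timezone.utc), reference))
print(hba1c_is_recent(datetime(2023, 6, 1, tzinfo=timezone.utc), reference))
```

A diabetic patient for whom this returns `False` would count as "Diabetes without recent HbA1c" under the guideline above.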

## 1) Setup


In [1]:
import os, json, time, glob
from datetime import datetime, timezone
import requests
import pandas as pd


In [19]:
# ---- Your live FHIR endpoint ----
FHIR_BASE_URL = "http://34.42.214.227:32783/fhir/r4"

FHIR_HEADERS = {
    "Accept": "application/fhir+json",
    "Content-Type": "application/fhir+json",
}

# ---- Interview-friendly defaults ----
RUN_INGEST = False                # ✅ keep False in interviews
WRITE_BACK_RISK = False           # ✅ keep False in interviews

# Treat template's small seed dataset as "empty enough"
SKIP_INGEST_IF_PATIENTS_AT_LEAST = 50

# Analysis limits
MAX_PATIENTS_FOR_ANALYSIS = 50

# Target observation codes (LOINC)
# TARGET_OBS_CODES = ["4548-4", "8480-6", "8462-4"]  # HbA1c, SysBP, DiaBP
# Blood pressure is present in the original Synthea dataset as a panel
# Observation, but ingestion is filtered for performance, so the analysis
# adapts to the ingested schema: analytics must align with the data that
# is actually available, not with assumptions about it.

TARGET_OBS_CODES = ["4548-4"]  # HbA1c only (BP panel not ingested)



## 2) Connectivity preflight (fast fail if VM is unreachable)


In [3]:
def fhir_ping(timeout_s=5, retries=3, backoff_s=2) -> None:
    url = f"{FHIR_BASE_URL}/metadata"
    last_err = None
    for i in range(retries):
        try:
            r = requests.get(url, headers=FHIR_HEADERS, timeout=(timeout_s, timeout_s))
            r.raise_for_status()
            print("FHIR reachable ✅  status=", r.status_code)
            return
        except Exception as e:
            last_err = e
            print(f"Ping failed ({i+1}/{retries}): {type(e).__name__}: {e}")
            time.sleep(backoff_s * (i+1))
    raise RuntimeError(f"FHIR not reachable from this runtime. Last error: {last_err}")

fhir_ping()


FHIR reachable ✅  status= 200


## 3) FHIR helpers


In [4]:
def fhir_get(resource_type: str, params=None, timeout=(5, 30)) -> dict:
    url = f"{FHIR_BASE_URL}/{resource_type}"
    r = requests.get(url, headers=FHIR_HEADERS, params=params or {}, timeout=timeout)
    r.raise_for_status()
    return r.json()

def fhir_put(resource_type: str, rid: str, body: dict, timeout=(5, 30)) -> requests.Response:
    url = f"{FHIR_BASE_URL}/{resource_type}/{rid}"
    return requests.put(url, headers=FHIR_HEADERS, json=body, timeout=timeout)

def get_total(resource_type: str) -> int:
    b = fhir_get(resource_type, {"_summary": "count", "_format": "json"}, timeout=(5, 30))
    return int(b.get("total", 0))


## 4) Dataset marker (prevents re-ingesting on reruns)


In [5]:
DATASET_MARKER_ID = "dataset-marker-smart-100"
DATASET_MARKER_CODE = "SMART100_LOADED"

def dataset_loaded() -> bool:
    url = f"{FHIR_BASE_URL}/Observation/{DATASET_MARKER_ID}"
    r = requests.get(url, headers=FHIR_HEADERS, timeout=(5, 15))
    return r.status_code == 200

def write_dataset_marker(note: str = "") -> None:
    marker = {
        "resourceType": "Observation",
        "id": DATASET_MARKER_ID,
        "status": "final",
        "code": {"coding": [{"system": "http://example.org/dataset", "code": DATASET_MARKER_CODE}]},
        "effectiveDateTime": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "valueString": note or "SMART bulk 100 patients dataset ingested",
    }
    r = fhir_put("Observation", DATASET_MARKER_ID, marker)
    r.raise_for_status()

print("Dataset marker exists:", dataset_loaded())


Dataset marker exists: True


## 5) Bootstrap data (run once)


This section is optional and is **skipped by default**.

How to use:
1. Set `RUN_INGEST = True`
2. Run this section once (writes a dataset marker)
3. Set `RUN_INGEST = False` forever after

Why:
- Avoids ingestion taking longer and longer when you rerun the notebook
- Keeps interview reruns fast and stable
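
The guard described above can be sketched as a pure function, which makes each combination of flag, patient count, and marker easy to reason about. `should_run_ingest` is a hypothetical name used only for illustration; the default threshold mirrors `SKIP_INGEST_IF_PATIENTS_AT_LEAST` from the setup cell.

```python
def should_run_ingest(run_flag: bool, patient_total: int, marker_exists: bool,
                      skip_if_patients_at_least: int = 50) -> bool:
    """Ingest only if explicitly enabled, the server looks empty, and no marker exists."""
    return (
        run_flag
        and patient_total < skip_if_patients_at_least
        and not marker_exists
    )

# A few illustrative combinations (flag, patient count, marker present)
for case in [(True, 4, False), (True, 126, False), (True, 4, True), (False, 0, False)]:
    print(case, "->", should_run_ingest(*case))
```

Only the first combination triggers ingestion; any one of the three conditions failing is enough to skip it.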


In [6]:
DATASET_ZIP_URL = "https://github.com/smart-on-fhir/sample-bulk-fhir-datasets/archive/refs/heads/100-patients.zip"
ZIP_NAME = "100-patients.zip"
DATASET_DIR = "sample-bulk-fhir-datasets-100-patients"

# Keep bootstrap bounded (you can raise later)
MAX_LINES_PATIENT = 200
MAX_LINES_CONDITION = 6000
TARGET_OBS_SET = set(TARGET_OBS_CODES)

def ensure_dataset_present():
    if not os.path.exists(ZIP_NAME):
        !wget -q -O "{ZIP_NAME}" "{DATASET_ZIP_URL}"
        print("Downloaded:", ZIP_NAME)
    else:
        print("Already present:", ZIP_NAME)

    if not os.path.exists(DATASET_DIR):
        !unzip -q -o "{ZIP_NAME}"
        print("Unzipped into:", DATASET_DIR)
    else:
        print("Already unzipped:", DATASET_DIR)

def upsert_resource(resource_type: str, resource: dict) -> tuple[bool, str|None]:
    rid = resource.get("id")
    if not rid:
        return False, "missing id"
    r = fhir_put(resource_type, rid, resource, timeout=(5, 30))
    if r.status_code in (200, 201):
        return True, None
    return False, f"status={r.status_code} body={r.text[:200]}"

def ingest_ndjson_upsert(resource_type: str, path: str, limit: int|None=None, progress_every: int=500) -> dict:
    ok=failed=0
    with open(path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f, start=1):
            if limit is not None and i > limit:
                break
            line=line.strip()
            if not line:
                continue
            resource=json.loads(line)
            success, err = upsert_resource(resource_type, resource)
            ok += 1 if success else 0
            failed += 0 if success else 1
            if (not success) and failed <= 3:
                print(f"[{resource_type}] {err}")
            if i % progress_every == 0:
                print(f"{resource_type}: ok={ok} failed={failed} line={i}")
    return {"resource": resource_type, "file": os.path.basename(path), "ok": ok, "failed": failed}

def obs_has_target_code(obs: dict) -> bool:
    for c in (obs.get("code", {}).get("coding") or []):
        if c.get("code") in TARGET_OBS_SET:
            return True
    return False

def ingest_observation_filtered(path: str, progress_every: int=2000) -> dict:
    ok=failed=skipped=0
    with open(path, "r", encoding="utf-8") as f:
        for i, line in enumerate(f, start=1):
            line=line.strip()
            if not line:
                continue
            obs=json.loads(line)
            if not obs_has_target_code(obs):
                skipped += 1
                continue
            success, err = upsert_resource("Observation", obs)
            ok += 1 if success else 0
            failed += 0 if success else 1
            if (not success) and failed <= 3:
                print(f"[Observation] {err}")
            if i % progress_every == 0:
                print(f"Observation(filtered): ok={ok} failed={failed} skipped={skipped} line={i}")
    return {"resource":"Observation(filtered)", "file": os.path.basename(path), "ok": ok, "failed": failed, "skipped": skipped}


In [7]:
patient_total = get_total("Patient")
print("Current Patient total on server:", patient_total)

should_ingest = (
    RUN_INGEST
    and (patient_total < SKIP_INGEST_IF_PATIENTS_AT_LEAST)
    and (not dataset_loaded())
)

if should_ingest:
    ensure_dataset_present()
    base = DATASET_DIR

    results = []

    # Foundation resources first (helps reduce reference errors)
    for rtype in ["Organization", "Location", "Practitioner", "PractitionerRole"]:
        for f in sorted(glob.glob(f"{base}/{rtype}.*.ndjson")):
            results.append(ingest_ndjson_upsert(rtype, f, limit=2000))

    # Core resources (kept simple)
    for f in sorted(glob.glob(f"{base}/Patient.*.ndjson")):
        results.append(ingest_ndjson_upsert("Patient", f, limit=MAX_LINES_PATIENT))

    for f in sorted(glob.glob(f"{base}/Condition.*.ndjson")):
        results.append(ingest_ndjson_upsert("Condition", f, limit=MAX_LINES_CONDITION))

    # Encounters are excluded by default because the SMART bulk dataset contains
    # Practitioner?identifier references that IRIS rejects (400 MalformedRelativeReference).
    # We'll still have enough data for care-gaps + risk demo.

    for f in sorted(glob.glob(f"{base}/Observation.*.ndjson")):
        results.append(ingest_observation_filtered(f))

    display(pd.DataFrame(results))

    write_dataset_marker("SMART bulk 100 patients ingested (Patients+Conditions+filtered Observations)")
    print("Wrote dataset marker ✅")
else:
    print("Skipping bootstrap ingest ✅ (recommended for reruns/interviews)")


Current Patient total on server: 126
Skipping bootstrap ingest ✅ (recommended for reruns/interviews)


## 6) Totals + sanity check


In [8]:
totals = {rt: get_total(rt) for rt in ["Patient","Condition","Observation","Encounter"]}
pd.DataFrame([totals])


| | Patient | Condition | Observation | Encounter |
|---|---|---|---|---|
| 0 | 126 | 4597 | 1560 | 190 |


## 7) Fetch cohort for analysis (read-only)


Here, "fetch" means querying the FHIR server for data that is already there. The initial ingestion (from the ZIP file) populates the server; `fetch_patients`, `fetch_conditions`, and `fetch_target_observations` then read that data back for analysis. They never re-ingest raw data from the original source, and the ingestion step itself is guarded by a dataset marker so that it runs only once.

In [9]:
def fetch_patients(limit: int = 50) -> pd.DataFrame:
    b = fhir_get("Patient", {"_count": limit, "_format": "json"}, timeout=(5, 30))
    rows = []
    for e in (b.get("entry") or []):
        p = e.get("resource") or {}
        pid = p.get("id")
        name0 = (p.get("name") or [{}])[0]
        given = " ".join(name0.get("given", []) or [])
        family = name0.get("family", "")
        rows.append({
            "patient_id": pid,
            "name": (given + " " + family).strip(),
            "gender": p.get("gender"),
            "birthDate": p.get("birthDate"),
        })
    return pd.DataFrame(rows).dropna(subset=["patient_id"])

patients_df = fetch_patients(MAX_PATIENTS_FOR_ANALYSIS)
patients_df.head()


| | patient_id | name | gender | birthDate |
|---|---|---|---|---|
| 0 | 1 | Carroll471 O'Hara248 | male | 1954-06-13 |
| 1 | 2 | Frankie174 Jast432 | male | 1975-08-12 |
| 2 | 334 | Gabriele201 Rohan584 | female | 2009-05-04 |
| 3 | 576 | Kallie862 Frami345 | female | 1945-12-19 |
| 4 | 871 | Lean294 Davis923 | female | 1945-12-19 |


## 8) Fetch Conditions + targeted Observations per patient


In [10]:
def fetch_conditions(pid: str, count: int = 200) -> list[dict]:
    b = fhir_get("Condition", {"patient": f"Patient/{pid}", "_count": count, "_format":"json"}, timeout=(5, 30))
    return [e["resource"] for e in (b.get("entry") or [])]

def fetch_target_observations(pid: str, count: int = 200) -> list[dict]:
    codes = ",".join(TARGET_OBS_CODES)
    b = fhir_get("Observation", {"patient": f"Patient/{pid}", "code": codes, "_count": count, "_format":"json"}, timeout=(5, 30))
    return [e["resource"] for e in (b.get("entry") or [])]


The two helpers, `fetch_conditions` and `fetch_target_observations`, work as follows:

1.  **`fetch_conditions(pid: str, count: int = 200) -> list[dict]`**
    *   **Purpose**: This function retrieves a patient's medical conditions from the FHIR server.
    *   **`pid: str`**: This is the patient's ID. The function uses this ID to query the FHIR server for conditions specifically associated with this patient.
    *   **`count: int = 200`**: This parameter specifies the maximum number of conditions to fetch. By default, it will fetch up to 200 conditions.
    *   **How it works**: It makes a GET request to the FHIR server's `Condition` endpoint, filtering by the `patient` ID provided and limiting the results by `_count`. It then extracts and returns a list of condition resources (as dictionaries) from the FHIR bundle.

2.  **`fetch_target_observations(pid: str, count: int = 200) -> list[dict]`**
    *   **Purpose**: This function fetches specific types of observations for a given patient from the FHIR server.
    *   **`pid: str`**: Similar to `fetch_conditions`, this is the patient's ID to filter observations for a particular patient.
    *   **`count: int = 200`**: This parameter specifies the maximum number of observations to fetch, defaulting to 200.
    *   **How it works**: It constructs a comma-separated string of `TARGET_OBS_CODES` (which is `['4548-4']` for HbA1c in this notebook). It then makes a GET request to the FHIR server's `Observation` endpoint, filtering by both the `patient` ID and the specified `code`s. It returns a list of observation resources (as dictionaries).

Both functions are essential for gathering the necessary clinical data (conditions and specific lab results) for each patient to compute care gaps and risk scores.
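
Both helpers read only the first page of the search result. If a patient has more matching resources than `_count`, or the server caps the page size, the rest would be missed; the `condition_count` values of exactly 100 in the results further down might be a sign of such a cap, although that is not verified here. FHIR search Bundles advertise further pages through a `link` entry with `relation` set to `next`. The sketch below follows those links. The name `iter_bundle_resources`, the `fetch_url` callable, and the `max_pages` limit are assumptions made for illustration.

```python
from typing import Callable, Iterator

def iter_bundle_resources(first_bundle: dict,
                          fetch_url: Callable[[str], dict],
                          max_pages: int = 20) -> Iterator[dict]:
    """Yield resources from a search Bundle, following 'next' links page by page."""
    bundle = first_bundle
    for _ in range(max_pages):  # hard stop so a misbehaving server cannot loop forever
        for entry in bundle.get("entry") or []:
            resource = entry.get("resource")
            if resource is not None:
                yield resource
        next_url = next(
            (link.get("url") for link in (bundle.get("link") or [])
             if link.get("relation") == "next"),
            None,
        )
        if not next_url:
            return
        bundle = fetch_url(next_url)

# Offline demo with two fake pages (no server needed)
fake_pages = {"page2": {"resourceType": "Bundle", "entry": [{"resource": {"id": "c3"}}]}}
first_page = {
    "resourceType": "Bundle",
    "entry": [{"resource": {"id": "c1"}}, {"resource": {"id": "c2"}}],
    "link": [{"relation": "next", "url": "page2"}],
}
ids = [r["id"] for r in iter_bundle_resources(first_page, fake_pages.__getitem__)]
print(ids)
```

Against the live server, `fetch_url` could be a small wrapper around `requests.get(url, headers=FHIR_HEADERS, timeout=(5, 30)).json()`, with the first Bundle coming from `fhir_get`.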

## 9) Compute care gaps + simple risk score (demo)


In [11]:
# --- Clinical feature extraction (simple + explainable) ---

def has_diabetes(conditions: list[dict]) -> bool:
    """Heuristic diabetes detection using ICD-10-CM prefixes (demo)."""
    for c in conditions:
        for coding in (c.get("code", {}).get("coding") or []):
            code = (coding.get("code") or "")
            if code.startswith("E10") or code.startswith("E11"):
                return True
    return False

def latest_observation_value(obs_list: list[dict], loinc_code: str):
    """Return (latest_datetime, latest_value) for a LOINC-coded Observation."""
    latest_dt = None
    latest_val = None
    for o in obs_list:
        if not any(c.get("code") == loinc_code for c in (o.get("code", {}).get("coding") or [])):
            continue

        dt_str = o.get("effectiveDateTime")
        val = (o.get("valueQuantity") or {}).get("value")
        if not dt_str or val is None:
            continue

        try:
            dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
        except Exception:
            continue

        if latest_dt is None or dt > latest_dt:
            latest_dt = dt
            try:
                latest_val = float(val)
            except Exception:
                latest_val = None

    return latest_dt, latest_val

def age_from_birthdate(birthdate_str: str | None):
    """Age in years from FHIR Patient.birthDate (YYYY-MM-DD)."""
    if not birthdate_str:
        return None
    try:
        # birthDate is usually YYYY-MM-DD
        dob = datetime.fromisoformat(birthdate_str + "T00:00:00").date()
    except Exception:
        return None

    today = datetime.now().date()
    return today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))

def compute_metrics(pid: str, birthdate: str | None) -> dict:
    """Compute care gaps + a simple readmission risk score (demo)."""
    conds = fetch_conditions(pid)
    obs = fetch_target_observations(pid)  # uses TARGET_OBS_CODES in the query (HbA1c only)

    diabetic = has_diabetes(conds)
    _, hba1c = latest_observation_value(obs, "4548-4")

    # Care gaps we can support with the ingested dataset (fast + reproducible)
    gaps = []
    if diabetic and hba1c is None:
        gaps.append("Diabetes without recent HbA1c")
    if hba1c is not None and hba1c >= 8.0:
        gaps.append("HbA1c high (>=8.0)")

    # Simple proxies that are common in early-stage population health prototypes
    condition_count = len(conds)
    age = age_from_birthdate(birthdate)

    # Risk score (explainable demo model)
    risk = 0
    if age is not None and age >= 65:
        risk += 2
    if diabetic:
        risk += 3
    if hba1c is not None and hba1c >= 8.0:
        risk += 2
    if condition_count >= 10:
        risk += 1

    return {
        "patient_id": pid,
        "age": age,
        "condition_count": condition_count,
        "has_diabetes": diabetic,
        "hba1c": hba1c,
        "risk_score": risk,
        "care_gaps": gaps,
    }

# Build the analysis table
rows = []
for r in patients_df.itertuples(index=False):
    rows.append(compute_metrics(r.patient_id, r.birthDate))

risk_df = pd.DataFrame(rows).merge(patients_df[["patient_id", "name"]], on="patient_id", how="left")
risk_df.sort_values("risk_score", ascending=False).head(15)


| | patient_id | age | condition_count | has_diabetes | hba1c | risk_score | care_gaps | name |
|---|---|---|---|---|---|---|---|---|
| 7 | 01707a0c-9619-ccba-695a-b270744d76c2 | 78 | 77 | False | 6.34 | 3 | [] | Silvana620 Coralee911 Reynolds644 |
| 8 | 01871b4c-ee11-02de-8305-54d35ae16259 | 73 | 100 | False | 3.16 | 3 | [] | Todd315 Trent525 Schroeder447 |
| 14 | 15a4f9fc-8059-26af-9586-723d1b06ba05 | 73 | 54 | False | 6.13 | 3 | [] | Casey401 Wally311 Jacobs452 |
| 15 | 15f708b2-2c47-b525-9118-ca04d5cf78fa | 80 | 50 | False | | 3 | [] | Robbi844 Zona368 Collins926 |
| 13 | 129c6ac7-8d06-89de-ad63-0204a93e76c3 | 98 | 49 | False | 6.12 | 3 | [] | Sumiko254 Larue605 Medhurst46 |
| 10 | 09e4bdf5-f133-1637-1493-2e489bff1d7b | 76 | 64 | False | 5.76 | 3 | [] | Johnetta529 Paul232 Johns824 |
| 44 | 525b6c4d-e6c2-bde9-5ad5-697e5b246755 | 89 | 69 | False | 5.51 | 3 | [] | Jenelle653 Mitsue965 Kuhlman484 |
| 48 | 577451a9-4afd-ed9b-7da6-7722b8ebda03 | 69 | 66 | False | 3.12 | 3 | [] | Forrest301 Milo271 Hilpert278 |
| 38 | 4a326793-814f-5274-8c12-22b85873b2e6 | 73 | 71 | False | 3.18 | 3 | [] | Wiley422 Rodney21 Jaskolski867 |
| 45 | 53b879ef-a222-ed0a-fd91-f14c32ce7c8e | 80 | 45 | False | 6.06 | 3 | [] | Elvera717 Shaunda110 Keebler762 |


Why is the risk score 3 for every patient shown? If every score is the same, what is the benefit of the analysis?

In the initial version of the risk model, every patient in the ranking above received the same score (3) because the only rules that fired are ones that fire for nearly everyone in this cohort: +2 for age 65 or older and +1 for 10 or more conditions. Diabetes status and HbA1c contributed nothing, since no listed patient is flagged as diabetic or has an HbA1c of 8.0 or higher. This result is not a modeling error but an important analytic finding: it demonstrates how coarse single-cutoff features can collapse score variability in small or homogeneous populations. Identifying this behavior is a key step in iterative model design and motivates refinement through age and condition-count stratification or additional features to restore discrimination, without requiring new data sources.
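
A collapse like this can be detected programmatically instead of by eye. The sketch below is one possible check, with the hypothetical helper name `score_spread`; the demo series simply repeats the ten scores displayed in the table above.

```python
import pandas as pd

def score_spread(scores: pd.Series) -> dict:
    """Summarize how concentrated a score column is."""
    counts = scores.value_counts()  # sorted by frequency, most common first
    return {
        "distinct_scores": int(scores.nunique()),
        "top_score": float(counts.index[0]),
        "top_share": float(counts.iloc[0] / len(scores)),
    }

demo_scores = pd.Series([3, 3, 3, 3, 3, 3, 3, 3, 3, 3])  # the ten scores shown above
summary = score_spread(demo_scores)
print(summary)

# A single distinct value, or one value covering nearly all rows, signals collapse.
if summary["distinct_scores"] == 1:
    print("Scores are uniform: the model is not stratifying this cohort.")
```

On the real table this would be called as `score_spread(risk_df["risk_score"])`.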

So far, this notebook has demonstrated:

- End-to-end FHIR analytics (server → notebook → metrics)
- Care gap detection logic that is data-driven
- An explainable risk model whose behavior we can reason about
- The ability to diagnose model collapse (uniform scores)
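
A related diagnostic: every row in the table above has `has_diabetes = False`. One possible explanation worth ruling out, stated as an assumption and not verified against this server, is the code system: Synthea typically codes conditions with SNOMED CT, so a check on ICD-10 prefixes alone might never match. The sketch below accepts either system. `has_diabetes_any_system` is a hypothetical name, and the two SNOMED CT concepts listed (type 2 and type 1 diabetes mellitus) are a deliberately small illustrative set.

```python
ICD10_PREFIXES = ("E10", "E11")
SNOMED_SYSTEM = "http://snomed.info/sct"
SNOMED_DIABETES = {"44054006", "46635009"}  # type 2, type 1 diabetes mellitus

def has_diabetes_any_system(conditions: list) -> bool:
    """Heuristic diabetes detection across ICD-10 prefixes and SNOMED CT codes (demo)."""
    for cond in conditions:
        for coding in (cond.get("code", {}).get("coding") or []):
            code = coding.get("code") or ""
            system = coding.get("system") or ""
            if system == SNOMED_SYSTEM and code in SNOMED_DIABETES:
                return True
            if code.startswith(ICD10_PREFIXES):  # same prefix rule as the notebook
                return True
    return False

snomed_condition = {"code": {"coding": [{"system": SNOMED_SYSTEM, "code": "44054006"}]}}
icd_condition = {"code": {"coding": [{"system": "http://hl7.org/fhir/sid/icd-10-cm", "code": "E11.9"}]}}
print(has_diabetes_any_system([snomed_condition]), has_diabetes_any_system([icd_condition]))
```

Printing the distinct `system` values found in a few fetched Conditions would confirm or rule out this explanation before changing the notebook's logic.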

In [13]:
# --- Clinical feature extraction (simple + explainable) ---
# Stratifying age and condition count instead of using a single cutoff,
# because the initial model produced uniform risk scores (3).

def has_diabetes(conditions: list[dict]) -> bool:
    """Heuristic diabetes detection using ICD-10-CM prefixes (demo)."""
    for c in conditions:
        for coding in (c.get("code", {}).get("coding") or []):
            code = (coding.get("code") or "")
            if code.startswith("E10") or code.startswith("E11"):
                return True
    return False

def latest_observation_value(obs_list: list[dict], loinc_code: str):
    """Return (latest_datetime, latest_value) for a LOINC-coded Observation."""
    latest_dt = None
    latest_val = None
    for o in obs_list:
        if not any(c.get("code") == loinc_code for c in (o.get("code", {}).get("coding") or [])):
            continue

        dt_str = o.get("effectiveDateTime")
        val = (o.get("valueQuantity") or {}).get("value")
        if not dt_str or val is None:
            continue

        try:
            dt = datetime.fromisoformat(dt_str.replace("Z", "+00:00"))
        except Exception:
            continue

        if latest_dt is None or dt > latest_dt:
            latest_dt = dt
            try:
                latest_val = float(val)
            except Exception:
                latest_val = None

    return latest_dt, latest_val

def age_from_birthdate(birthdate_str: str | None):
    """Age in years from FHIR Patient.birthDate (YYYY-MM-DD)."""
    if not birthdate_str:
        return None
    try:
        # birthDate is usually YYYY-MM-DD
        dob = datetime.fromisoformat(birthdate_str + "T00:00:00").date()
    except Exception:
        return None

    today = datetime.now().date()
    return today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))

def compute_metrics(pid: str, birthdate: str | None) -> dict:
    """Compute care gaps + a simple readmission risk score (demo)."""
    conds = fetch_conditions(pid)
    obs = fetch_target_observations(pid)  # uses TARGET_OBS_CODES in the query (HbA1c only)

    diabetic = has_diabetes(conds)
    _, hba1c = latest_observation_value(obs, "4548-4")

    # Care gaps we can support with the ingested dataset (fast + reproducible)
    gaps = []
    if diabetic and hba1c is None:
        gaps.append("Diabetes without recent HbA1c")
    if hba1c is not None and hba1c >= 8.0:
        gaps.append("HbA1c high (>=8.0)")

    # Simple proxies that are common in early-stage population health prototypes
    condition_count = len(conds)
    age = age_from_birthdate(birthdate)

    # Risk score (explainable demo model)
    # Stratifying age and condition count instead of using a single cutoff.
    risk = 0
    # Guard once: an unknown age adds nothing and is never compared with an int.
    if age is not None:
        if age >= 85:
            risk += 4
        elif age >= 75:
            risk += 3
        elif age >= 65:
            risk += 2
    if diabetic:
        risk += 3
    if hba1c is not None and hba1c >= 8.0:
        risk += 2
    if condition_count >= 15:
        risk += 2
    elif condition_count >= 8:
        risk += 1


    return {
        "patient_id": pid,
        "age": age,
        "condition_count": condition_count,
        "has_diabetes": diabetic,
        "hba1c": hba1c,
        "risk_score": risk,
        "care_gaps": gaps,
    }

# Build the analysis table
rows = []
for r in patients_df.itertuples(index=False):
    rows.append(compute_metrics(r.patient_id, r.birthDate))

risk_df = pd.DataFrame(rows).merge(patients_df[["patient_id", "name"]], on="patient_id", how="left")
risk_df.sort_values("risk_score", ascending=False).head(15)


| | patient_id | age | condition_count | has_diabetes | hba1c | risk_score | care_gaps | name |
|---|---|---|---|---|---|---|---|---|
| 13 | 129c6ac7-8d06-89de-ad63-0204a93e76c3 | 98 | 49 | False | 6.12 | 6 | [] | Sumiko254 Larue605 Medhurst46 |
| 24 | 239f5e4c-f482-ddae-c126-3179c0ff5985 | 109 | 100 | False | | 6 | [] | Alvin56 Adrian111 Hickle134 |
| 37 | 4953d3b5-f3f0-2aaf-3dc0-3c581ed15647 | 89 | 30 | False | 5.59 | 6 | [] | Roxann426 Kulas532 |
| 44 | 525b6c4d-e6c2-bde9-5ad5-697e5b246755 | 89 | 69 | False | 5.51 | 6 | [] | Jenelle653 Mitsue965 Kuhlman484 |
| 28 | 297a0b2a-0f16-f1c9-d80b-018a08da34e3 | 89 | 38 | False | | 6 | [] | Marylou497 Zetta950 Jacobs452 |
| 35 | 42618df6-53ac-d2d7-6281-1ab4094bc26c | 82 | 100 | False | | 5 | [] | Augustine565 Ondricka197 |
| 10 | 09e4bdf5-f133-1637-1493-2e489bff1d7b | 76 | 64 | False | 5.76 | 5 | [] | Johnetta529 Paul232 Johns824 |
| 15 | 15f708b2-2c47-b525-9118-ca04d5cf78fa | 80 | 50 | False | | 5 | [] | Robbi844 Zona368 Collins926 |
| 45 | 53b879ef-a222-ed0a-fd91-f14c32ce7c8e | 80 | 45 | False | 6.06 | 5 | [] | Elvera717 Shaunda110 Keebler762 |
| 7 | 01707a0c-9619-ccba-695a-b270744d76c2 | 78 | 77 | False | 6.34 | 5 | [] | Silvana620 Coralee911 Reynolds644 |


The risk scores now spread across 6, 5, and 4. After refining the risk logic to stratify age bands and incorporate comorbidity burden, the model produces a meaningful distribution of risk scores rather than a single uniform value. Higher scores now correspond to very elderly patients and those with extensive condition histories, while younger or less complex patients score lower. Even when explicit care gaps are absent, this ranking enables prioritization for proactive outreach, reflecting how population health risk models are used in practice.
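
The arithmetic behind these scores can be reproduced with a pure function that takes the four inputs directly, with no server calls. This is a sketch that mirrors the thresholds in the cell above; the names `age_points`, `condition_points`, and `stratified_risk` are introduced here for illustration. The first two calls use rows from the table above, and the third uses a 73-year-old with 54 conditions from the earlier table.

```python
def age_points(age):
    """Age band contribution: 85+ -> 4, 75-84 -> 3, 65-74 -> 2, otherwise 0."""
    if age is None:
        return 0
    if age >= 85:
        return 4
    if age >= 75:
        return 3
    if age >= 65:
        return 2
    return 0

def condition_points(condition_count):
    """Comorbidity contribution: 15+ conditions -> 2, 8-14 -> 1, otherwise 0."""
    if condition_count >= 15:
        return 2
    if condition_count >= 8:
        return 1
    return 0

def stratified_risk(age, diabetic, hba1c, condition_count):
    """Sum of the four rule contributions used by the refined demo model."""
    score = age_points(age) + condition_points(condition_count)
    if diabetic:
        score += 3
    if hba1c is not None and hba1c >= 8.0:
        score += 2
    return score

print(stratified_risk(98, False, 6.12, 49))  # age band 4 + conditions 2
print(stratified_risk(76, False, 5.76, 64))  # age band 3 + conditions 2
print(stratified_risk(73, False, 6.13, 54))  # age band 2 + conditions 2
```

Because nearly every listed patient has 15 or more conditions, the spread between 6, 5, and 4 comes almost entirely from the age bands.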

## 10) Optional: write back risk score as FHIR Observation (idempotent)


In [16]:
def make_risk_observation(patient_id: str, score: float) -> dict:
    rid = f"readmit-risk-{patient_id}"  # stable id => safe to rerun
    return {
        "resourceType": "Observation",
        "id": rid,
        "status": "final",
        "code": {"coding": [{"system":"http://example.org/codes/risk","code":"READMIT-RISK","display":"Readmission risk score (demo)"}]},
        "subject": {"reference": f"Patient/{patient_id}"},
        "effectiveDateTime": datetime.now(timezone.utc).isoformat().replace("+00:00","Z"),
        "valueQuantity": {"value": float(score), "unit": "score"},
    }

if WRITE_BACK_RISK:
    ok=failed=0
    for row in risk_df.itertuples(index=False):
        body = make_risk_observation(row.patient_id, row.risk_score)
        r = fhir_put("Observation", body["id"], body, timeout=(5, 30))
        if r.status_code in (200, 201):
            ok += 1
        else:
            failed += 1
            if failed <= 3:
                print("Write-back failed:", r.status_code, r.text[:200])
    print({"ok": ok, "failed": failed})
else:
    print("Write-back disabled ✅")


{'ok': 50, 'failed': 0}


When `WRITE_BACK_RISK` is `False`, the cell skips the write-back. This is intentional for:

- interview runs (read-only)
- faster execution
- avoiding unnecessary writes (the stable ids already prevent duplicate Observations)

When `WRITE_BACK_RISK` is `True`, verify that the records were added:

http://34.42.214.227:32783/fhir/r4/Observation?code=READMIT-RISK&_format=json

What you should see:

- A Bundle with ~50 Observation resources
- Each Observation has:
  - `code` = `READMIT-RISK`
  - `subject.reference` = `Patient/<id>`
  - `valueQuantity.value` = the patient's `risk_score`
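
The same checks can be applied to each returned Observation in code. The sketch below is a hypothetical validator (`check_risk_observation` is not part of the notebook) that compares one resource against the fields listed above, plus the stable id pattern used by `make_risk_observation`.

```python
def check_risk_observation(obs: dict, patient_id: str, expected_score: float) -> list:
    """Return a list of problems found; an empty list means the resource looks right."""
    problems = []
    codes = [c.get("code") for c in (obs.get("code", {}).get("coding") or [])]
    if "READMIT-RISK" not in codes:
        problems.append("missing READMIT-RISK code")
    if (obs.get("subject") or {}).get("reference") != f"Patient/{patient_id}":
        problems.append("subject does not reference the expected patient")
    value = (obs.get("valueQuantity") or {}).get("value")
    if value is None or float(value) != float(expected_score):
        problems.append("valueQuantity.value does not match the expected score")
    if obs.get("id") != f"readmit-risk-{patient_id}":
        problems.append("id does not follow the stable readmit-risk-<patient_id> pattern")
    return problems

# Offline example shaped like the write-back payload (the patient id is made up)
sample = {
    "resourceType": "Observation",
    "id": "readmit-risk-abc",
    "code": {"coding": [{"system": "http://example.org/codes/risk", "code": "READMIT-RISK"}]},
    "subject": {"reference": "Patient/abc"},
    "valueQuantity": {"value": 6.0, "unit": "score"},
}
print(check_risk_observation(sample, "abc", 6))
print(check_risk_observation(sample, "abc", 5))
```

Because the id is derived from the patient id, rerunning the write-back updates the same resource, which is what keeps the count at one Observation per patient.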


In [18]:
# Verify write-back count programmatically
verify = fhir_get(
    "Observation",
    {"code": "READMIT-RISK", "_summary": "count", "_format": "json"}
)
print("READMIT-RISK Observation count:", verify.get("total"))


READMIT-RISK Observation count: 50


In this notebook, I built an end-to-end, reproducible healthcare analytics pipeline on top of a real FHIR server using synthetic Synthea data. The goal was not just to ingest data, but to demonstrate how raw clinical data becomes actionable insight through stratification and care-gap analysis.

First, I ingested a controlled synthetic dataset into an InterSystems IRIS FHIR server in an idempotent way, so the notebook can be safely rerun without data duplication or performance degradation. This reflects how production healthcare pipelines must behave in regulated environments.

Next, I extracted patient-level clinical signals — age, condition burden, and key lab values like HbA1c.

During this process, I also surfaced missing clinical signals. Blood pressure observations were not ingested, so the analysis adapts to the data available on the server instead of assuming BP is present, and the notebook flags a diabetic patient with no HbA1c on record as a care gap. In real clinical systems, missing data often indicates incomplete monitoring or workflow breakdowns, and recognizing these gaps is essential for population health management.

After the first pass of analysis, I noticed an important issue: every patient ended up with the same risk score. That immediately told me the model was not stratifying the population in a meaningful way. Instead of accepting the output, I treated this as a signal that the inputs and weighting logic were too coarse to differentiate patients.

I then refined the analysis to explicitly introduce risk stratification by incorporating age bands, overall condition burden, and available lab signals such as HbA1c. Once these dimensions were added, the population naturally separated into higher- and lower-risk tiers, which is the core purpose of stratification in healthcare analytics — identifying who needs attention first.

Finally, I wrote the resulting risk scores back into the FHIR server as standardized Observation resources. This closes the loop by turning analytics output into interoperable clinical data that can be reused by dashboards, care management tools, or decision-support systems.

Overall, this project demonstrates how I approach healthcare analytics and product design: validate outputs critically, refine models when results are not meaningful, stratify populations using clinically relevant signals, explicitly handle missing data, and integrate insights back into production-grade healthcare systems in a reproducible way.

## 11) Quick browser URLs


- CapabilityStatement: `http://34.42.214.227:32783/fhir/r4/metadata`  
- Patient count: `http://34.42.214.227:32783/fhir/r4/Patient?_summary=count&_format=json`  
- HbA1c Observations: `http://34.42.214.227:32783/fhir/r4/Observation?code=4548-4&_format=json`  
- Risk write-back (if enabled): `http://34.42.214.227:32783/fhir/r4/Observation?code=READMIT-RISK&_format=json`
