# CDC Adult Sepsis Event (ASE) Implementation

## Overview
This notebook implements the CDC Adult Sepsis Event (ASE) surveillance definition based on the CDC Sepsis Surveillance Toolkit (March 2018).

### CDC ASE Definition:
**Sepsis = Component A (Presumed Serious Infection) + Component B (Acute Organ Dysfunction)**

- **Component A**: Blood culture + ≥4 Qualifying Antimicrobial Days (QAD)
- **Component B**: ≥1 organ dysfunction criterion within ±2 calendar days of blood culture

## 1. Setup and Configuration

In [None]:
# Standard library imports
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
import warnings

# Data processing imports
import pandas as pd
import numpy as np
import duckdb

# clifpy imports for CLIF data loading
from clifpy.tables import (
    Adt,
    Hospitalization,
    Patient,
    Labs,
    MedicationAdminContinuous,
    MedicationAdminIntermittent,
    MicrobiologyCulture,
    RespiratorySupport,
    HospitalDiagnosis,
)

# Notebook-wide runtime configuration.
# All warnings are suppressed for the rest of the session; this keeps cell
# output short but also hides deprecation notices from the libraries in use.
warnings.filterwarnings('ignore')

# INFO-level logging with a timestamped format, plus a module-scoped logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# pandas display settings: no column limit, at most 100 rows, width unset.
for _display_option, _display_value in (
    ('display.max_columns', None),
    ('display.max_rows', 100),
    ('display.width', None),
):
    pd.set_option(_display_option, _display_value)

print("Setup complete!")

In [None]:
# Load configuration
# The path is relative to the notebook's working directory.
config_path = Path('../config/config.json')
# Explicit encoding so the file is read the same way on every platform.
with open(config_path, 'r', encoding='utf-8') as f:
    config = json.load(f)

# Extract configuration parameters.
# 'tables_path' is used as a fallback key when 'data_directory' is absent.
data_directory = config.get('data_directory', config.get('tables_path'))
if not data_directory:
    # Fail here with a clear message instead of passing None to every loader.
    raise ValueError(
        f"{config_path} must define 'data_directory' (or 'tables_path')"
    )
filetype = config.get('filetype', 'parquet')
timezone = config.get('timezone', 'UTC')

print("Configuration loaded:")
print(f"  Data directory: {data_directory}")
print(f"  File type: {filetype}")
print(f"  Timezone: {timezone}")

In [None]:
# In-memory DuckDB session shared by every SQL cell below.
con = duckdb.connect(database=':memory:', read_only=False)

# Session settings: align DuckDB's time zone with the configured one (so
# DATE(...) on timestamps uses the same calendar as the pandas-derived day
# columns) and fix the worker thread count.
for _session_setting in (
    f"SET TimeZone = '{timezone}'",
    "SET threads TO 10",
):
    con.execute(_session_setting)

print("DuckDB connection initialized")

## Cohort

In [None]:
# Load only the ADT columns needed to define the cohort.
adt_df = Adt.from_file(
    data_directory=data_directory,
    filetype=filetype,
    timezone=timezone,
    columns=["hospitalization_id", "in_dttm", "out_dttm", "location_category"],
).df

# Lower-cased copy of the location label, used for matching below.
adt_df['location_category_lower'] = adt_df['location_category'].str.lower()

# Keep rows whose location is ICU, Ward, or ED.
# The labels are matched as whole tokens (not bordered by another letter).
# A plain substring search for 'ed' also matches unrelated categories such as
# 'procedural' ("proc-ed-ural"), which would pull procedural-only stays into
# the cohort. The pattern avoids look-around so it works with any regex engine
# pandas may dispatch to.
COHORT_LOCATION_PATTERN = r'(?:^|[^a-z])(?:icu|ward|ed)(?:[^a-z]|$)'
cohort_df = adt_df[
    adt_df['location_category_lower'].str.contains(
        COHORT_LOCATION_PATTERN, regex=True, na=False
    )
].copy()

# Extract unique hospitalization IDs
hospitalization_ids = cohort_df['hospitalization_id'].unique().tolist()

print(f"Total ADT rows: {len(adt_df):,}")
print(f"Rows with ICU/Ward/ED: {len(cohort_df):,}")
print(f"Unique hospitalizations with ICU/Ward/ED: {len(hospitalization_ids):,}")
print("\nLocation category breakdown:")
print(cohort_df['location_category'].value_counts())


## 2. Data Loading with clifpy

In [None]:
# Hospitalization table, restricted to the cohort built from ADT.
# It is loaded before the patient table because it supplies the patient IDs.
print("Loading hospitalizations...")
_hosp_load_kwargs = dict(
    data_directory=data_directory,
    filetype=filetype,
    timezone=timezone,
    filters={"hospitalization_id": hospitalization_ids},
)
hosp_df = Hospitalization.from_file(**_hosp_load_kwargs).df

# Distinct patients behind the cohort hospitalizations.
patient_ids = hosp_df['patient_id'].unique().tolist()
print(f"Extracted {len(patient_ids)} unique patient IDs from hospitalizations.")

# Expose the table to DuckDB under the name the SQL cells reference.
con.register("hospitalizations", hosp_df)

print(f"Loaded {len(hosp_df)} hospitalizations")
print(f"Unique hospitalization IDs: {len(hospitalization_ids)}")

In [None]:
# Patient table, restricted to the patients behind the cohort hospitalizations.
print("Loading patients...")
_patient_load_kwargs = dict(
    data_directory=data_directory,
    filetype=filetype,
    timezone=timezone,
    filters={"patient_id": patient_ids},
)
patient_df = Patient.from_file(**_patient_load_kwargs).df

# Registered as "patient"; the censoring query joins it for death_dttm.
con.register("patient", patient_df)

## 3. Component A: Presumed Serious Infection

### CDC Definition (Pages 5-8):
1. Blood culture obtained (result not required)
2. ≥4 Qualifying Antimicrobial Days (QAD) starting within ±2 calendar days of blood culture
   - New antimicrobial (not given in prior 2 days)
   - At least one IV/IM antimicrobial required
   - 1-day gaps allowed between antimicrobial days

### 3.1 Blood Culture Identification

In [None]:
# Blood cultures for the cohort (fluid_category == "blood_buffy" only).
print("Loading microbiology cultures...")
_blood_culture_filters = {
    "hospitalization_id": hospitalization_ids,
    "fluid_category": ["blood_buffy"],
}
culture_df = MicrobiologyCulture.from_file(
    data_directory=data_directory,
    filetype=filetype,
    timezone=timezone,
    filters=_blood_culture_filters,
).df

print(f"Total cultures loaded: {len(culture_df)}")
_fluid_counts = culture_df['fluid_category'].value_counts().to_dict()
print(f"Fluid categories: {_fluid_counts}")

# Culture time is the collection timestamp; no order-time fallback is applied.
culture_df['culture_time'] = culture_df['collect_dttm']
culture_df['culture_day'] = pd.to_datetime(culture_df['culture_time']).dt.date

# One row per (hospitalization, culture_time): rows sharing a collection time
# are collapsed to the first value per column, then numbered chronologically
# so that every blood culture can be evaluated independently.
_episode_key = ['hospitalization_id', 'culture_time']
culture_df = (
    culture_df
    .sort_values(_episode_key)
    .groupby(_episode_key)
    .first()
    .reset_index()
)
culture_df['bc_id'] = culture_df.groupby('hospitalization_id').cumcount() + 1

# Summary: the highest bc_id per hospitalization equals its culture count.
print(f"\nBlood cultures per patient statistics:")
bc_per_patient = culture_df.groupby('hospitalization_id')['bc_id'].max()
_n_single = (bc_per_patient == 1).sum()
_n_multiple = (bc_per_patient > 1).sum()
print(f"  Patients with 1 blood culture: {_n_single}")
print(f"  Patients with 2+ blood cultures: {_n_multiple}")
print(f"  Max blood cultures per patient: {bc_per_patient.max()}")

# Register with DuckDB
con.register("blood_cultures", culture_df)

### 3.2 Load Antimicrobials

In [None]:
# Intermittent medication administrations limited to the qualifying
# antibiotic med group.
print("Loading intermittent medications (antibiotics)...")
_antibiotic_filters = {
    "hospitalization_id": hospitalization_ids,
    "med_group": ["CMS_sepsis_qualifying_antibiotics"],
}
med_int_df = MedicationAdminIntermittent.from_file(
    data_directory=data_directory,
    filetype=filetype,
    timezone=timezone,
    filters=_antibiotic_filters,
).df

print(f"Intermittent antibiotics loaded: {len(med_int_df)}")

# Administration timestamp and its calendar day (the QAD logic is day-based).
med_int_df['med_admin_time'] = med_int_df['admin_dttm']
med_int_df['med_admin_day'] = pd.to_datetime(med_int_df['med_admin_time']).dt.date

# Flag parenteral doses: 1 when the lower-cased route is one of the IV/IM
# labels, 0 otherwise (a missing route therefore counts as 0).
iv_im_routes = ['iv', 'im', 'intravenous', 'intramuscular']
_route_lower = med_int_df['med_route_category'].str.lower()
med_int_df['med_route_category_lower'] = _route_lower
med_int_df['is_iv_im'] = med_int_df['med_route_category_lower'].isin(iv_im_routes).astype(int)

# Registered as "antibiotics", the name used by the QAD query.
con.register("antibiotics", med_int_df)

_n_iv_im = med_int_df['is_iv_im'].sum()
_pct_iv_im = med_int_df['is_iv_im'].mean()*100
print(f"\nTotal antibiotics: {len(med_int_df)}")
print(f"Unique patients with antibiotics: {med_int_df['hospitalization_id'].nunique()}")
print(f"\nRoute distribution:")
print(med_int_df['med_route_category_lower'].value_counts().head(10))
print(f"\nIV/IM antibiotics: {_n_iv_im} ({_pct_iv_im:.1f}%)")

### 3.3 Identify New Antimicrobials and Calculate QAD

CDC Requirements:
- "New antimicrobial" = not given in prior 2 calendar days
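- The first QAD is the earliest new antimicrobial (any route) started within ±2 calendar days of the blood culture; at least one new IV/IM antimicrobial must also start within that same window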
- Need ≥4 consecutive days (allowing 1-day gaps)

In [None]:
# Qualifying Antimicrobial Days (QAD) per blood culture.
#
# Pipeline (all calendar-day based):
#   0  cultures               one row per (hospitalization_id, bc_id)
#   1  abx_day                distinct (drug, day, route flag) within the stay
#   2  abx_course_*           split each drug into courses; a dose starts a new
#                             course when the previous dose of the same drug
#                             was more than 2 days earlier
#   3  course_bounds/intervals course start/end and whether the start day had
#                             an IV/IM dose
#   4  culture_course_window  mark courses that START within ±2 days of culture
#   4b qad_anchor             first QAD = earliest in-window course start
#   5  eligible_courses       courses starting on/after the first QAD
#   6  covered_days           days spanned by eligible courses (gap days inside
#                             a course are counted), capped at anchor + 6 days
#   7  run_calc/initial_run   length of the unbroken run starting at the anchor
#
# Cultures without any in-window course start produce no row at all.
qad_query = """
WITH
/* 0) Cultures */
cultures AS (
  SELECT
    hospitalization_id,
    bc_id,
    culture_time,
    DATE(culture_day) AS culture_day
  FROM blood_cultures
  WHERE culture_time IS NOT NULL
),

/* 1) Antibiotics at day level (vancomycin exception) */
abx_day AS (
  SELECT DISTINCT
    a.hospitalization_id,
    DATE(a.med_admin_day) AS antibiotic_day,
    CASE
      WHEN LOWER(a.med_category) = 'vancomycin' AND a.is_iv_im = 1 THEN 'vancomycin_iv'
      WHEN LOWER(a.med_category) = 'vancomycin' AND a.is_iv_im = 0 THEN 'vancomycin_oral'
      ELSE a.med_category
    END AS med_category_tracked,
    a.is_iv_im
  FROM antibiotics a
  JOIN hospitalizations h
    ON a.hospitalization_id = h.hospitalization_id
  WHERE a.med_admin_day IS NOT NULL
    /* Compare calendar day with calendar day. Comparing the DATE column with
       the admission timestamp casts the date to midnight, which excluded every
       dose given on the admission day (unless admission was at 00:00). */
    AND DATE(a.med_admin_day) >= DATE(h.admission_dttm)
    /* A missing discharge time must not discard the stay's antibiotics. */
    AND (
      h.discharge_dttm IS NULL
      OR DATE(a.med_admin_day) <= DATE(h.discharge_dttm)
    )
),

/* 2) Mark new courses per drug (new if gap > 2 days) */
abx_course_marked AS (
  SELECT
    hospitalization_id,
    med_category_tracked,
    antibiotic_day,
    CASE
      WHEN LAG(antibiotic_day) OVER (
        PARTITION BY hospitalization_id, med_category_tracked
        ORDER BY antibiotic_day
      ) IS NULL THEN 1
      WHEN antibiotic_day - LAG(antibiotic_day) OVER (
        PARTITION BY hospitalization_id, med_category_tracked
        ORDER BY antibiotic_day
      ) > 2 THEN 1
      ELSE 0
    END AS new_course_flag,
    MAX(is_iv_im) OVER (
      PARTITION BY hospitalization_id, med_category_tracked, antibiotic_day
    ) AS any_iv_im_that_day
  FROM abx_day
),

/* 2b) Assign course_id */
abx_courses AS (
  SELECT
    hospitalization_id,
    med_category_tracked,
    SUM(new_course_flag) OVER (
      PARTITION BY hospitalization_id, med_category_tracked
      ORDER BY antibiotic_day
      ROWS UNBOUNDED PRECEDING
    ) AS course_id,
    antibiotic_day,
    any_iv_im_that_day
  FROM abx_course_marked
),

/* 3a) Course bounds */
course_bounds AS (
  SELECT
    hospitalization_id,
    med_category_tracked,
    course_id,
    MIN(antibiotic_day) AS course_start_day,
    MAX(antibiotic_day) AS course_end_day
  FROM abx_courses
  GROUP BY hospitalization_id, med_category_tracked, course_id
),

/* 3b) Whether course START DAY is IV/IM */
course_intervals AS (
  SELECT
    b.hospitalization_id,
    b.med_category_tracked,
    b.course_id,
    b.course_start_day,
    b.course_end_day,
    MAX(
      CASE
        WHEN a.antibiotic_day = b.course_start_day THEN a.any_iv_im_that_day
        ELSE 0
      END
    ) AS start_day_is_iv_im
  FROM course_bounds b
  JOIN abx_courses a
    ON a.hospitalization_id = b.hospitalization_id
   AND a.med_category_tracked = b.med_category_tracked
   AND a.course_id = b.course_id
  GROUP BY
    b.hospitalization_id, b.med_category_tracked, b.course_id,
    b.course_start_day, b.course_end_day
),

/* 4) Join cultures to courses; mark starts in the ±2 day window */
culture_course_window AS (
  SELECT
    c.hospitalization_id,
    c.bc_id,
    c.culture_time,
    c.culture_day,
    ci.med_category_tracked,
    ci.course_start_day,
    ci.course_end_day,
    ci.start_day_is_iv_im,
    CASE
      WHEN ci.course_start_day BETWEEN c.culture_day - 2 AND c.culture_day + 2 THEN 1
      ELSE 0
    END AS course_start_in_window
  FROM cultures c
  JOIN course_intervals ci
    ON c.hospitalization_id = ci.hospitalization_id
),

/* 4b) Anchor: earliest new antimicrobial start in window (any route),
       and require at least one new parenteral start in window */
qad_anchor AS (
  SELECT
    hospitalization_id,
    bc_id,
    culture_time,
    culture_day,
    MIN(CASE WHEN course_start_in_window = 1 THEN course_start_day END) AS qad_start_day,
    MAX(CASE
          WHEN course_start_in_window = 1 AND start_day_is_iv_im = 1 THEN 1
          ELSE 0
        END) AS has_new_parenteral_in_window
  FROM culture_course_window
  GROUP BY hospitalization_id, bc_id, culture_time, culture_day
  HAVING MIN(CASE WHEN course_start_in_window = 1 THEN course_start_day END) IS NOT NULL
),

/* 5) Eligible courses: only those starting on/after qad_start_day */
eligible_courses AS (
  SELECT DISTINCT
    a.hospitalization_id,
    a.bc_id,
    a.culture_time,
    a.culture_day,
    a.qad_start_day,
    a.has_new_parenteral_in_window,
    w.med_category_tracked,
    w.course_start_day,
    w.course_end_day
  FROM qad_anchor a
  JOIN culture_course_window w
    ON a.hospitalization_id = w.hospitalization_id
   AND a.bc_id = w.bc_id
  WHERE w.course_start_day >= a.qad_start_day
),

/* QC: meds started in window (anchors) */
qc_anchor_meds AS (
  SELECT
    hospitalization_id,
    bc_id,
    string_agg(DISTINCT med_category_tracked, ', ') AS anchor_meds_in_window,
    string_agg(
      DISTINCT CASE WHEN start_day_is_iv_im = 1 THEN med_category_tracked ELSE NULL END,
      ', '
    ) AS anchor_parenteral_meds_in_window
  FROM culture_course_window
  WHERE course_start_in_window = 1
  GROUP BY hospitalization_id, bc_id
),

/* QC: meds eligible to contribute after QAD starts */
qc_run_meds AS (
  SELECT
    hospitalization_id,
    bc_id,
    string_agg(DISTINCT med_category_tracked, ', ') AS run_meds
  FROM eligible_courses
  GROUP BY hospitalization_id, bc_id
),

/* 6) Expand covered days for eligible courses (counts single-gap q48h days) */
covered_days AS (
  SELECT DISTINCT
    hospitalization_id,
    bc_id,
    culture_time,
    culture_day,
    qad_start_day,
    has_new_parenteral_in_window,
    CAST(gs AS DATE) AS covered_day
  FROM eligible_courses
  CROSS JOIN generate_series(course_start_day, course_end_day, INTERVAL 1 DAY) AS t(gs)
  WHERE CAST(gs AS DATE) BETWEEN qad_start_day AND (qad_start_day + INTERVAL 6 DAY)
),

/* 7) Initial consecutive run starting at qad_start_day */
run_calc AS (
  SELECT
    hospitalization_id,
    bc_id,
    culture_time,
    culture_day,
    qad_start_day,
    has_new_parenteral_in_window,
    covered_day,
    ROW_NUMBER() OVER (
      PARTITION BY hospitalization_id, bc_id
      ORDER BY covered_day
    ) AS rn,
    (covered_day - qad_start_day) AS day_offset
  FROM covered_days
  WHERE covered_day >= qad_start_day
),

initial_run AS (
  SELECT
    hospitalization_id,
    bc_id,
    culture_time,
    culture_day,
    qad_start_day,
    has_new_parenteral_in_window,
    COUNT(*) AS qad_days,
    MIN(covered_day) AS qad_run_start,
    MAX(covered_day) AS qad_run_end
  FROM run_calc
  WHERE (day_offset - (rn - 1)) = 0
  GROUP BY hospitalization_id, bc_id, culture_time, culture_day, qad_start_day, has_new_parenteral_in_window
)

/* Final output (one row per culture) */
SELECT
  ir.hospitalization_id,
  ir.bc_id,
  ir.culture_time,
  ir.culture_day,
  ir.qad_start_day,
  ir.qad_days,
  ir.qad_run_start,
  ir.qad_run_end,
  ir.has_new_parenteral_in_window,

  CASE
    WHEN ir.has_new_parenteral_in_window = 1 AND ir.qad_days >= 4 THEN 1
    ELSE 0
  END AS meets_qad_criteria,

  am.anchor_meds_in_window,
  am.anchor_parenteral_meds_in_window,
  rm.run_meds

FROM initial_run ir
LEFT JOIN qc_anchor_meds am
  ON ir.hospitalization_id = am.hospitalization_id
 AND ir.bc_id = am.bc_id
LEFT JOIN qc_run_meds rm
  ON ir.hospitalization_id = rm.hospitalization_id
 AND ir.bc_id = rm.bc_id
ORDER BY ir.hospitalization_id, ir.bc_id
"""



# Execute QAD calculation
qad_results = con.execute(qad_query).fetchdf()

# NOTE: each row of qad_results is one blood culture (hospitalization_id,
# bc_id) that has at least one new antimicrobial start in its window, so the
# counts printed below are per culture.
print(f"Patients with blood culture and new IV/IM antibiotic: {len(qad_results)}")
print(f"Patients meeting QAD criteria (≥4 days): {qad_results['meets_qad_criteria'].sum()}")

# Store QAD results
con.register("qad_results", qad_results)

### 3.4 Apply QAD Censoring Rules

CDC allows QAD < 4 days if patient dies, transfers to acute care, or goes to hospice (Page 8)

In [None]:
# QAD censoring exception.
#
# Input:  qad_results (one row per blood culture with a QAD anchor), joined to
#         hospitalizations (discharge time/category) and patient (death time).
# Output: every qad_results row plus censoring columns and the final flag
#         meets_qad_with_censoring / final_qad_status.
#
# A culture passes when it meets the standard rule (meets_qad_criteria = 1) or
# when ALL of the following hold:
#   - at least one QAD and a new IV/IM antimicrobial in the window,
#   - discharge_category is expired / acute care hospital / hospice,
#   - the censor day (discharge day, else death day) is no later than
#     qad_start_day + 3 days,
#   - the initial QAD run reaches the censor day or the day before it.
#
# NOTE(review): the INNER JOIN drops QAD rows whose hospitalization_id is not
# in `hospitalizations`; the category match is an exact string comparison
# against the listed spellings only.
censoring_query = """
WITH qad_with_censor AS (
  SELECT
    q.*,

    -- From hospitalization
    h.discharge_dttm,
    h.discharge_category,

    -- From patient (death)
    p.death_dttm,

    -- Prefer hospitalization end; fall back to patient death only if discharge day is missing
    CASE
      WHEN h.discharge_dttm IS NOT NULL THEN h.discharge_dttm
      WHEN p.death_dttm IS NOT NULL THEN p.death_dttm
      ELSE NULL
    END AS censor_dttm,

    DATE(
      CASE
        WHEN h.discharge_dttm IS NOT NULL THEN h.discharge_dttm
        WHEN p.death_dttm IS NOT NULL THEN p.death_dttm
        ELSE NULL
      END
    ) AS censor_day,

    -- qualifying censoring categories (use exactly what your pipeline expects)
    CASE
      WHEN h.discharge_category IN (
        'expired', 'Expired',
        'acute_care_hospital', 'Acute Care Hospital',
        'hospice', 'Hospice'
      )
      THEN 1 ELSE 0
    END AS qualifies_for_censoring

  FROM qad_results q
  INNER JOIN hospitalizations h
    ON q.hospitalization_id = h.hospitalization_id
  LEFT JOIN patient p
    ON h.patient_id = p.patient_id
)

SELECT
  hospitalization_id,
  bc_id,
  culture_time,
  culture_day,

  qad_start_day,
  qad_days,
  qad_run_start,
  qad_run_end,

  discharge_dttm,
  discharge_category,
  death_dttm,
  censor_dttm,
  censor_day,
  qualifies_for_censoring,

  has_new_parenteral_in_window,
  meets_qad_criteria,

  anchor_meds_in_window,
  anchor_parenteral_meds_in_window,
  run_meds,

  CASE WHEN qad_run_end > censor_day THEN 1 ELSE 0 END AS run_extends_past_censor,

  CASE
    WHEN meets_qad_criteria = 1 THEN 1
    WHEN qad_days >= 1
      AND has_new_parenteral_in_window = 1
      AND qualifies_for_censoring = 1
      AND censor_dttm IS NOT NULL
      AND censor_day <= qad_start_day + INTERVAL 3 DAY
      AND qad_run_end >= censor_day - INTERVAL 1 DAY
    THEN 1
    ELSE 0
  END AS meets_qad_with_censoring,

  CASE
    WHEN meets_qad_criteria = 1
      THEN 'Meets QAD (standard)'
    WHEN qad_days >= 1
      AND has_new_parenteral_in_window = 1
      AND qualifies_for_censoring = 1
      AND censor_dttm IS NOT NULL
      AND censor_day <= qad_start_day + INTERVAL 3 DAY
      AND qad_run_end >= censor_day - INTERVAL 1 DAY
      THEN 'Meets QAD (censoring exception)'
    WHEN has_new_parenteral_in_window = 0
      THEN 'Fails QAD: no new IV/IM in window'
    ELSE 'Fails QAD: insufficient QAD days'
  END AS final_qad_status

FROM qad_with_censor
"""


# Materialise the result and expose it to later SQL cells as "final_qad".
final_qad = con.execute(censoring_query).fetchdf()
con.register("final_qad", final_qad)


## `final_qad` output columns (QAD + ASE censoring)
 
| Column | Type (typical) | Meaning | How it’s computed / where it comes from |
|---|---|---|---|
| `hospitalization_id` | string | Unique hospitalization (encounter) identifier. | From CLIF `hospitalizations`, propagated through joins. |
| `bc_id` | int | Blood culture index within a hospitalization (1, 2, 3, …). Each blood culture is evaluated independently. | Created as `bc_id` in `blood_cultures`. |
| `culture_time` | datetime | Timestamp for the blood culture event (specimen collection time; the order-time fallback is currently disabled, so cultures without a collection time are excluded). | Derived from `blood_cultures.culture_time`, which is set from `collect_dttm`. |
| `culture_day` | date | Calendar day of the blood culture event. Used for the ±2-day window logic. | Taken directly from `blood_cultures.culture_day` (already precomputed) or as `DATE(culture_day)` in SQL. |
| `qad_start_day` | date | The **first QAD day** for the culture: the earliest **NEW antimicrobial course start day** that is within the ±2 day window around `culture_day`. | `MIN(course_start_day)` among course starts in `[culture_day-2, culture_day+2]`. |
| `qad_days` | int | Length of the **initial consecutive QAD run** starting at `qad_start_day` (calendar day-based). | Count of consecutive `covered_day` values starting from `qad_start_day` until the first missing day. |
| `qad_run_start` | date | Start day of the consecutive QAD run (equal to `qad_start_day`). | `MIN(covered_day)` within the initial consecutive run. |
| `qad_run_end` | date | End day of the consecutive QAD run starting at `qad_start_day`. | `MAX(covered_day)` within the initial consecutive run. |
| `has_new_parenteral_in_window` | 0/1 | Indicates presence of at least one NEW parenteral (IV/IM) antimicrobial course started within the ±2 day window. Required for CDC ASE presumed infection. | `MAX(start_day_is_iv_im)` among course starts in window. |
| `meets_qad_criteria` | 0/1 | Standard CDC presumed infection rule: new IV/IM in window AND ≥4 QAD days. | `CASE WHEN has_new_parenteral_in_window=1 AND qad_days>=4 THEN 1`. |
| `anchor_meds_in_window` | string | QC list of antimicrobial names whose course start day falls within the ±2 day window (the “new antimicrobials” considered for anchoring). | `string_agg(DISTINCT med_category_tracked)` from window-starting courses. |
| `anchor_parenteral_meds_in_window` | string | QC list of window-starting antimicrobial names whose start day is IV/IM. | `string_agg(DISTINCT ...)` filtered to `start_day_is_iv_im=1`. |
| `run_meds` | string | QC list of antimicrobial names from courses that start on or after `qad_start_day` (eligible to contribute to covered days). | `string_agg(DISTINCT med_category_tracked)` from eligible courses. |
| `discharge_dttm` | datetime | Hospital discharge timestamp. | From CLIF `hospitalizations.discharge_dttm`. |
| `discharge_category` | string | Hospital discharge disposition category (used to determine censoring eligibility). | From CLIF `hospitalizations.discharge_category`. |
| `death_dttm` | datetime | Patient-level death timestamp (provided for QC, not used as the primary censor time in the final logic). | From CLIF `patient.death_dttm`. |
| `censor_dttm` | datetime | Timestamp used as the censoring event time for ASE censoring exception logic (discharge is preferred, falling back to death if discharge is unavailable). | `CASE WHEN discharge_dttm IS NOT NULL THEN discharge_dttm ELSE death_dttm END`. |
| `censor_day` | date | Calendar day of censoring event time used in censoring exception logic. | `DATE(censor_dttm)`. |
| `qualifies_for_censoring` | 0/1 | Indicates whether the discharge category is one of the ASE censoring-eligible outcomes (e.g., expired, acute care hospital transfer, hospice). | `CASE WHEN discharge_category IN (...) THEN 1`. |
| `run_extends_past_censor` | 0/1 | QC flag: indicates whether `qad_run_end` is after `censor_day` (should be 0 after the discharge-first logic). | `CASE WHEN qad_run_end > censor_day THEN 1`. |
| `meets_qad_with_censoring` | 0/1 | Final presumed infection flag allowing the censoring exception: qualifies if standard rule is met OR (eligible censoring, early censoring, and QAD run continues to day-of or day-prior). | Standard rule OR censoring exception rule. |
| `final_qad_status` | string | Human-readable reason for pass/fail (standard pass, censoring exception, or failure reason). | CASE expression summarizing logic outcome. |


## 4. Component B: Acute Organ Dysfunction

### CDC Definition (Page 5 and 9):
At least ONE organ dysfunction criterion within ±2 calendar days of blood culture:
1. **Renal**: Creatinine ≥2x baseline (exclude ESRD)
2. **Hepatic**: Bilirubin ≥2.0 mg/dL AND ≥2x baseline
3. **Coagulation**: Platelets <100 cells/μL AND ≥50% decline from baseline ≥100
4. **Metabolic**: Lactate ≥2.0 mmol/L
5. **Cardiovascular**: New vasopressor initiation
6. **Respiratory**: New mechanical ventilation initiation

### 4.1 Load Labs and Diagnoses

In [None]:
# Laboratory results needed for the organ-dysfunction criteria.
print("Loading laboratory data...")
_lab_filters = {
    "hospitalization_id": hospitalization_ids,
    "lab_category": ["creatinine", "bilirubin_total", "platelet_count", "lactate"],
}
_lab_columns = [
    "hospitalization_id",
    "lab_category",
    "lab_value",
    "lab_value_numeric",
    "lab_result_dttm",
    "lab_order_dttm",
]
labs_df = Labs.from_file(
    data_directory=data_directory,
    filetype=filetype,
    timezone=timezone,
    filters=_lab_filters,
    columns=_lab_columns,
).df

con.register("labs", labs_df)

# Diagnoses are loaded only to flag ESRD, which is excluded from the renal
# criterion.

# ESRD code list, written the same way the cleaned diagnosis codes are
# (lowercase, no dots or whitespace).
# NOTE(review): "i120" is listed twice; harmless for membership tests. The
# per-code descriptions are carried over as-is and should be confirmed against
# the ICD-10-CM code set.
ESRD_CODES = [
    "n186",    # End stage renal disease
    "z4931",   # Encounter for continuous renal replacement therapy (CRRT) for ESRD (CMS/HCC)
    "z4901",   # Encounter regarding vascular access for dialysis for end-stage renal disease (CMS/HCC)
    "i120",    # Hypertensive chronic kidney disease with stage 5 CKD or ESRD
    "i1311",   # Hypertensive heart and chronic kidney disease with heart failure and stage 5 CKD
    "i132",    # Hypertensive heart and chronic kidney disease with ESRD (alternate code)
    "i120",    # Hypertensive chronic kidney disease with stage 5 CKD or ESRD (alternate code)
    "i272",    # Pulmonary hypertension associated with ESRD on dialysis (CMS/HCC)
]

print("Loading diagnoses...")
_diagnosis_load_kwargs = dict(
    data_directory=data_directory,
    filetype=filetype,
    timezone=timezone,
    filters={"hospitalization_id": hospitalization_ids},
)
diagnosis_df = HospitalDiagnosis.from_file(**_diagnosis_load_kwargs).df

# Normalise codes the same way as ESRD_CODES: string, lowercase, trimmed,
# dots removed.
_codes_as_text = diagnosis_df['diagnosis_code'].astype(str)
_codes_lower_trimmed = _codes_as_text.str.lower().str.strip()
diagnosis_df['diagnosis_code_clean'] = _codes_lower_trimmed.str.replace('.', '', regex=False)

# One row per hospitalization that carries any ESRD code.
_is_esrd_code = diagnosis_df['diagnosis_code_clean'].isin(ESRD_CODES)
esrd_patients = diagnosis_df[_is_esrd_code][['hospitalization_id']].drop_duplicates()
esrd_patients['has_esrd'] = 1

print(f"Total diagnoses: {len(diagnosis_df)}")
print(f"ESRD patients (ESRD code match): {len(esrd_patients)}")

con.register("diagnoses", diagnosis_df)
con.register("esrd_patients", esrd_patients)

# Drop the notebook-level name; the frame stays registered with DuckDB.
del diagnosis_df


### 4.2 Lab dysfunction

In [None]:
# --- Constants for the lab-based organ dysfunction query ---

# Half-width, in calendar days, of the window around the blood culture day.
WINDOW_DAYS = 2

# Upper plausibility limits; lab values above these are excluded as outliers.
OUTLIERS = dict(
    creatinine_max=20,
    bilirubin_max=80,
    platelet_max=2000,
    lactate_max=30,
)

# Required bilirubin rise relative to baseline. The toolkit is not fully
# consistent here: the main criteria state a doubling (+100%, i.e. 2x) while
# the Appendix D template shows >=50% compared to baseline. The 2x reading is
# used.
BILI_MULTIPLIER = 2.0

In [None]:
# =============================================================================
#  bc_episodes must start from ALL blood cultures
# =============================================================================
# Builds the temp table `bc_episodes`: one row per blood culture that has a
# non-NULL culture_time, taken from `blood_cultures`.
# `final_qad` is LEFT JOINed on (hospitalization_id, bc_id), so cultures with no
# QAD row are kept and get meets_qad_with_censoring = 0 via COALESCE; their QC
# medication columns stay NULL.

con.execute("""
CREATE OR REPLACE TEMP TABLE bc_episodes AS
SELECT
  bc.hospitalization_id,
  bc.bc_id,
  bc.culture_time AS blood_culture_dttm,
  bc.culture_day  AS blood_culture_day,

  -- Component A: if no QAD row exists, presumed infection should be 0 (not dropped)
  COALESCE(q.meets_qad_with_censoring, 0) AS meets_qad_with_censoring,

  -- QC columns: will be NULL for episodes with no QAD anchor (fine)
  q.anchor_meds_in_window,
  q.anchor_parenteral_meds_in_window,
  q.run_meds

FROM blood_cultures bc
LEFT JOIN final_qad q
  ON bc.hospitalization_id = q.hospitalization_id
 AND bc.bc_id = q.bc_id

WHERE bc.culture_time IS NOT NULL
""")


In [None]:
# Lab-based organ dysfunction per blood culture episode.
#
# The query is an f-string: OUTLIERS[...], BILI_MULTIPLIER and WINDOW_DAYS are
# interpolated into the SQL text when this cell runs.
#
# Output: one row per bc_episodes row with
#   - baselines under two scenarios:
#       *_co  "community": lowest creatinine/bilirubin and highest platelet
#             count over all of the hospitalization's labs,
#       *_ho  "hospital": the same statistics restricted to labs within
#             ±WINDOW_DAYS calendar days of the blood culture day;
#       a platelet baseline is only usable when it is >= 100,
#   - has_esrd (from esrd_patients),
#   - the earliest in-window timestamp at which each criterion is met
#     (AKI, hyperbilirubinemia, thrombocytopenia under both baselines, and
#     lactate >= 2.0), NULL when never met.
#
# NOTE(review): the outlier filter applies upper bounds only. A creatinine or
# bilirubin value of 0 would become the MIN baseline and make the ">= N x
# baseline" comparison trivially true — confirm such values cannot occur.
lab_dysfunction_query = f"""
WITH
bc_hosp AS (
  SELECT * FROM bc_episodes
),
bc_hosp_ids AS (
  SELECT DISTINCT hospitalization_id FROM bc_hosp
),

-- Filter labs early (performance), normalize value + timestamp, and apply outlier caps
labs_filtered AS (
  SELECT
    l.hospitalization_id,
    l.lab_category,
    COALESCE(l.lab_value_numeric, TRY_CAST(l.lab_value AS DOUBLE)) AS value,
    COALESCE(l.lab_result_dttm, l.lab_order_dttm) AS lab_dttm
  FROM labs l
  WHERE l.hospitalization_id IN (SELECT hospitalization_id FROM bc_hosp_ids)
    AND l.lab_category IN ('creatinine','bilirubin_total','platelet_count','lactate')
    AND COALESCE(l.lab_value_numeric, TRY_CAST(l.lab_value AS DOUBLE)) IS NOT NULL
    AND COALESCE(l.lab_result_dttm, l.lab_order_dttm) IS NOT NULL
    AND (
      (l.lab_category = 'creatinine'      AND COALESCE(l.lab_value_numeric, TRY_CAST(l.lab_value AS DOUBLE)) <= {OUTLIERS['creatinine_max']})
      OR
      (l.lab_category = 'bilirubin_total' AND COALESCE(l.lab_value_numeric, TRY_CAST(l.lab_value AS DOUBLE)) <= {OUTLIERS['bilirubin_max']})
      OR
      (l.lab_category = 'platelet_count'  AND COALESCE(l.lab_value_numeric, TRY_CAST(l.lab_value AS DOUBLE)) <= {OUTLIERS['platelet_max']})
      OR
      (l.lab_category = 'lactate'         AND COALESCE(l.lab_value_numeric, TRY_CAST(l.lab_value AS DOUBLE)) <= {OUTLIERS['lactate_max']})
    )
),

-- Community baselines: whole hospitalization
baseline_community AS (
  SELECT
    hospitalization_id,
    MIN(CASE WHEN lab_category = 'creatinine'      THEN value END) AS cr_baseline_co,
    MIN(CASE WHEN lab_category = 'bilirubin_total' THEN value END) AS bili_baseline_co,
    MAX(CASE WHEN lab_category = 'platelet_count'  THEN value END) AS plt_baseline_raw_co,
    MAX(CASE WHEN lab_category = 'platelet_count' AND value >= 100 THEN 1 ELSE 0 END) AS plt_has_ge100_co
  FROM labs_filtered
  WHERE lab_category IN ('creatinine','bilirubin_total','platelet_count')
  GROUP BY hospitalization_id
),
baseline_community_final AS (
  SELECT
    hospitalization_id,
    cr_baseline_co,
    bili_baseline_co,
    CASE WHEN plt_has_ge100_co = 1 THEN plt_baseline_raw_co ELSE NULL END AS plt_baseline_co
  FROM baseline_community
),

-- Labs in the ±2 calendar-day window around blood culture day (per bc_id)
labs_window AS (
  SELECT
    lf.hospitalization_id,
    bc.bc_id,
    lf.lab_category,
    lf.value,
    lf.lab_dttm,
    bc.blood_culture_day
  FROM labs_filtered lf
  JOIN bc_hosp bc
    ON lf.hospitalization_id = bc.hospitalization_id
  WHERE DATE(lf.lab_dttm) BETWEEN bc.blood_culture_day - INTERVAL '{WINDOW_DAYS} days'
                             AND bc.blood_culture_day + INTERVAL '{WINDOW_DAYS} days'
),

-- Hospital baselines: within ±2 days of blood culture day (per bc_id)
baseline_hospital AS (
  SELECT
    hospitalization_id,
    bc_id,
    MIN(CASE WHEN lab_category = 'creatinine'      THEN value END) AS cr_baseline_ho,
    MIN(CASE WHEN lab_category = 'bilirubin_total' THEN value END) AS bili_baseline_ho,
    MAX(CASE WHEN lab_category = 'platelet_count' AND value >= 100 THEN value END) AS plt_baseline_ho
  FROM labs_window
  WHERE lab_category IN ('creatinine','bilirubin_total','platelet_count')
  GROUP BY hospitalization_id, bc_id
),

-- ESRD flags (your table: esrd_patients has hospitalization_id, has_esrd=1)
esrd_temp AS (
  SELECT hospitalization_id, 1 AS esrd
  FROM esrd_patients
),

labs_with_baselines AS (
  SELECT
    lw.*,
    bc.cr_baseline_co,
    bc.bili_baseline_co,
    bc.plt_baseline_co,
    bh.cr_baseline_ho,
    bh.bili_baseline_ho,
    bh.plt_baseline_ho,
    e.esrd
  FROM labs_window lw
  LEFT JOIN baseline_community_final bc
    ON lw.hospitalization_id = bc.hospitalization_id
  LEFT JOIN baseline_hospital bh
    ON lw.hospitalization_id = bh.hospitalization_id AND lw.bc_id = bh.bc_id
  LEFT JOIN esrd_temp e
    ON lw.hospitalization_id = e.hospitalization_id
),

-- AKI: creatinine >= 2x baseline, exclude ESRD
aki AS (
  SELECT
    hospitalization_id,
    bc_id,
    MIN(CASE WHEN esrd IS NULL AND cr_baseline_co IS NOT NULL AND value >= 2.0 * cr_baseline_co THEN lab_dttm END) AS aki_dttm_co,
    MIN(CASE WHEN esrd IS NULL AND cr_baseline_ho IS NOT NULL AND value >= 2.0 * cr_baseline_ho THEN lab_dttm END) AS aki_dttm_ho
  FROM labs_with_baselines
  WHERE lab_category = 'creatinine'
  GROUP BY hospitalization_id, bc_id
),

-- Hyperbilirubinemia: bili >=2.0 and relative increase vs baseline
hyperbili AS (
  SELECT
    hospitalization_id,
    bc_id,
    MIN(CASE WHEN bili_baseline_co IS NOT NULL AND value >= 2.0 AND value >= {BILI_MULTIPLIER} * bili_baseline_co THEN lab_dttm END) AS hyperbili_dttm_co,
    MIN(CASE WHEN bili_baseline_ho IS NOT NULL AND value >= 2.0 AND value >= {BILI_MULTIPLIER} * bili_baseline_ho THEN lab_dttm END) AS hyperbili_dttm_ho
  FROM labs_with_baselines
  WHERE lab_category = 'bilirubin_total'
  GROUP BY hospitalization_id, bc_id
),

-- Thrombocytopenia: value <100 and <= 0.5 * baseline, baseline must be usable (>=100 rule handled by baseline_* tables)
thrombo AS (
  SELECT
    hospitalization_id,
    bc_id,
    MIN(CASE WHEN plt_baseline_co IS NOT NULL AND value < 100.0 AND value <= 0.5 * plt_baseline_co THEN lab_dttm END) AS thrombo_dttm_co,
    MIN(CASE WHEN plt_baseline_ho IS NOT NULL AND value < 100.0 AND value <= 0.5 * plt_baseline_ho THEN lab_dttm END) AS thrombo_dttm_ho
  FROM labs_with_baselines
  WHERE lab_category = 'platelet_count'
  GROUP BY hospitalization_id, bc_id
),

-- Optional lactate >=2.0 (no baseline)
lactate AS (
  SELECT
    hospitalization_id,
    bc_id,
    MIN(CASE WHEN value >= 2.0 THEN lab_dttm END) AS lactate_dttm
  FROM labs_with_baselines
  WHERE lab_category = 'lactate'
  GROUP BY hospitalization_id, bc_id
)

SELECT
  bc.hospitalization_id,
  bc.bc_id,
  bc.blood_culture_dttm,
  bc.blood_culture_day,
  bc.meets_qad_with_censoring,

  -- Baselines (QC)
  bco.cr_baseline_co,
  bco.bili_baseline_co,
  bco.plt_baseline_co,
  bho.cr_baseline_ho,
  bho.bili_baseline_ho,
  bho.plt_baseline_ho,

  -- ESRD exclusion flag (QC)
  CASE WHEN e.esrd IS NULL THEN 0 ELSE 1 END AS has_esrd,

  -- Dysfunction times under BOTH baseline scenarios
  a.aki_dttm_co,
  a.aki_dttm_ho,
  hb.hyperbili_dttm_co,
  hb.hyperbili_dttm_ho,
  t.thrombo_dttm_co,
  t.thrombo_dttm_ho,

  -- Optional
  lac.lactate_dttm

FROM bc_hosp bc
LEFT JOIN baseline_community_final bco
  ON bc.hospitalization_id = bco.hospitalization_id
LEFT JOIN baseline_hospital bho
  ON bc.hospitalization_id = bho.hospitalization_id AND bc.bc_id = bho.bc_id
LEFT JOIN esrd_temp e
  ON bc.hospitalization_id = e.hospitalization_id
LEFT JOIN aki a
  ON bc.hospitalization_id = a.hospitalization_id AND bc.bc_id = a.bc_id
LEFT JOIN hyperbili hb
  ON bc.hospitalization_id = hb.hospitalization_id AND bc.bc_id = hb.bc_id
LEFT JOIN thrombo t
  ON bc.hospitalization_id = t.hospitalization_id AND bc.bc_id = t.bc_id
LEFT JOIN lactate lac
  ON bc.hospitalization_id = lac.hospitalization_id AND bc.bc_id = lac.bc_id
ORDER BY bc.hospitalization_id, bc.bc_id
"""

# Run the query and expose the result to later SQL cells as "lab_dysfunction".
lab_dysfunction_df = con.execute(lab_dysfunction_query).fetchdf()
con.register("lab_dysfunction", lab_dysfunction_df)

print("lab_dysfunction rows:", len(lab_dysfunction_df))


### 4.3 Load Clinical Interventions (Vasopressors and Mechanical Ventilation)

In [None]:
# Load the CLIF tables behind the non-lab organ dysfunction criteria
# (vasopressors, ADT locations, respiratory support) and register each one in
# DuckDB under the table name the SQL cells below query.
_clif_source = {
    "data_directory": data_directory,
    "filetype": filetype,
    "timezone": timezone,
}

# Continuous medication administrations, restricted to the vasoactive group.
med_cont_df = MedicationAdminContinuous.from_file(
    filters={
        "hospitalization_id": hospitalization_ids,
        "med_group": ["vasoactives"],
    },
    **_clif_source,
).df
con.register("med_continuous", med_cont_df)

# ADT location intervals; only the columns the vasopressor query needs.
adt_df = Adt.from_file(
    filters={"hospitalization_id": hospitalization_ids},
    columns=["hospitalization_id", "in_dttm", "out_dttm", "location_category"],
    **_clif_source,
).df
con.register("adt", adt_df)

# Respiratory support records (all columns) for the IMV query.
resp_df = RespiratorySupport.from_file(
    filters={"hospitalization_id": hospitalization_ids},
    **_clif_source,
).df
con.register("respiratory", resp_df)

In [None]:
# Make sure blood_cultures exists in DuckDB already.
# This view exposes the rows of blood_cultures that have a non-NULL
# culture_time, with that column renamed to blood_culture_dttm. The
# vasopressor and IMV queries below both read from it.

con.execute("""
CREATE OR REPLACE TEMP VIEW blood_cultures_temp AS
SELECT
  hospitalization_id,
  bc_id,
  culture_time AS blood_culture_dttm
FROM blood_cultures
WHERE culture_time IS NOT NULL
""")

### 4.4 Calculate Clinical Intervention Organ Dysfunction

In [None]:
# Vasopressor criterion: for each (hospitalization_id, bc_id), the earliest
# "new" vasopressor administration near the blood culture.
#   * vaso_with_prev: vasoactive rows with med_dose > 0, dropping rows whose
#     matching ADT interval is 'procedural' (rows with no matching ADT interval
#     are kept). LAG gives the calendar date of the previous administration of
#     the same med_category within the hospitalization.
#   * new_vaso: keeps a row when there is no previous administration or when
#     the previous one was more than one calendar day earlier.
#   * new_vaso_in_window: pairs new initiations with every culture of the
#     hospitalization and keeps those inside the window (BETWEEN is inclusive).
#   * final SELECT: earliest administration per culture and its med_category.
# NOTE(review): the window is +/- 48 hours around the culture *timestamp*; the
# notebook overview describes the window as +/- 2 calendar days — confirm
# which is intended.
# NOTE(review): if ADT intervals can overlap, the LEFT JOIN to adt would
# duplicate medication rows before LAG is applied — verify against the data.
vaso_query = """
WITH bc_hosp AS (
    SELECT * FROM blood_cultures_temp
),
-- First compute new initiation at the vasopressor level (before joining to BCs)
vaso_with_prev AS (
    SELECT
        m.hospitalization_id,
        m.admin_dttm,
        m.med_name,
        m.med_category,
        DATE(m.admin_dttm) AS admin_date,
        LAG(DATE(m.admin_dttm)) OVER (
            PARTITION BY m.hospitalization_id, m.med_category
            ORDER BY m.admin_dttm
        ) AS prev_admin_date
    FROM med_continuous m
    LEFT JOIN adt a
      ON m.hospitalization_id = a.hospitalization_id
     AND m.admin_dttm >= a.in_dttm
     AND m.admin_dttm <  a.out_dttm
    WHERE m.med_group = 'vasoactives'
      AND m.med_dose > 0
      AND (a.location_category IS NULL OR LOWER(a.location_category) != 'procedural')
),
-- Filter to new initiations only
new_vaso AS (
    SELECT *
    FROM vaso_with_prev
    WHERE prev_admin_date IS NULL OR DATEDIFF('day', prev_admin_date, admin_date) > 1
),
-- Now join to blood cultures and filter by window
new_vaso_in_window AS (
    SELECT
        v.hospitalization_id,
        bc.bc_id,
        v.admin_dttm,
        v.med_category,
        bc.blood_culture_dttm
    FROM new_vaso v
    JOIN bc_hosp bc
      ON v.hospitalization_id = bc.hospitalization_id
    WHERE v.admin_dttm BETWEEN
          bc.blood_culture_dttm - INTERVAL '2 days'
          AND bc.blood_culture_dttm + INTERVAL '2 days'
)
SELECT
    hospitalization_id,
    bc_id,
    MIN(admin_dttm) AS vasopressor_dttm,
    FIRST(med_category ORDER BY admin_dttm) AS vasopressor_name
FROM new_vaso_in_window
GROUP BY hospitalization_id, bc_id
"""

# Materialize the result and register it under the name used by the
# component_b_inputs cell.
vasopressor_df = con.execute(vaso_query).fetchdf()
con.register("vasopressor_df", vasopressor_df)


In [None]:
# IMV criterion: for each (hospitalization_id, bc_id), the earliest "new"
# invasive mechanical ventilation record near the blood culture.
#   * imv_with_prev: respiratory rows whose device_category is 'imv'
#     (case-insensitive), with the calendar date of the previous IMV row in the
#     same hospitalization.
#   * new_imv: keeps a row when there is no previous IMV row or when the
#     previous one was more than one calendar day earlier.
#   * new_imv_in_window: pairs new episodes with every culture of the
#     hospitalization and keeps those inside the window (BETWEEN is inclusive).
# NOTE(review): the window is +/- 48 hours around the culture *timestamp*; the
# notebook overview describes the window as +/- 2 calendar days — confirm
# which is intended.
imv_query = """
WITH bc_hosp AS (
    SELECT * FROM blood_cultures_temp
),
-- First compute new IMV episodes at the patient level (before joining to BCs)
imv_with_prev AS (
    SELECT
        r.hospitalization_id,
        r.recorded_dttm,
        DATE(r.recorded_dttm) AS imv_date,
        LAG(DATE(r.recorded_dttm)) OVER (
            PARTITION BY r.hospitalization_id
            ORDER BY r.recorded_dttm
        ) AS prev_imv_date
    FROM respiratory r
    WHERE LOWER(r.device_category) = 'imv'
),
-- Filter to new episodes only
new_imv AS (
    SELECT *
    FROM imv_with_prev
    WHERE prev_imv_date IS NULL OR DATEDIFF('day', prev_imv_date, imv_date) > 1
),
-- Now join to blood cultures and filter by window
new_imv_in_window AS (
    SELECT
        i.hospitalization_id,
        bc.bc_id,
        i.recorded_dttm,
        bc.blood_culture_dttm
    FROM new_imv i
    JOIN bc_hosp bc
      ON i.hospitalization_id = bc.hospitalization_id
    WHERE i.recorded_dttm BETWEEN
          bc.blood_culture_dttm - INTERVAL '2 days'
          AND bc.blood_culture_dttm + INTERVAL '2 days'
)
SELECT
    hospitalization_id,
    bc_id,
    MIN(recorded_dttm) AS imv_dttm
FROM new_imv_in_window
GROUP BY hospitalization_id, bc_id
"""

# Materialize the result and register it under the name used by the
# component_b_inputs cell.
imv_df = con.execute(imv_query).fetchdf()
con.register("imv_df", imv_df)

print("IMV rows:", len(imv_df))
imv_df.head()


### 4.5 Onset time and Onset Type - Community or Hospital

In [None]:
# Build the temp table component_b_inputs: one wide row per row of bc_episodes
# carrying everything the Component B / onset query needs.
#   * base: bc_episodes joined to hospitalizations (inner join) and to
#     final_qad (left join on hospitalization_id + bc_id). first_qad_dttm is
#     qad_start_day cast to a timestamp, and only when
#     has_new_parenteral_in_window = 1. type_for_baseline is 'community' when
#     the culture date is hospital day 1 or 2 (admission date = day 1), else
#     'hospital'.
#   * organ_nonlab: adds vasopressor and IMV times from vasopressor_df / imv_df.
#   * organ_labs: adds lab dysfunction times from lab_dysfunction, choosing the
#     *_co or *_ho column according to type_for_baseline; lactate_dttm and
#     has_esrd are passed through unchanged.
con.execute("""
CREATE OR REPLACE TEMP TABLE component_b_inputs AS
WITH base AS (
  SELECT
    bc.hospitalization_id,
    bc.bc_id,
    bc.blood_culture_dttm,
    bc.blood_culture_day,
    h.admission_dttm,

    -- Component A 
    bc.meets_qad_with_censoring AS presumed_infection,

    -- QAD columns
    q.qad_days AS total_qad,
    q.qad_run_start AS qad_start_date,
    q.qad_run_end AS qad_end_date,
    q.final_qad_status,

    -- QAD anchor day (treat as day-level; only count if there was a new parenteral start in window)
    CASE
      WHEN q.has_new_parenteral_in_window = 1 AND q.qad_start_day IS NOT NULL
        THEN CAST(q.qad_start_day AS TIMESTAMP)
      ELSE NULL
    END AS first_qad_dttm,

    -- Keep QC columns 
    bc.anchor_meds_in_window,
    bc.anchor_parenteral_meds_in_window,
    bc.run_meds,

    -- “Type for baseline selection” 
    CASE
      WHEN DATEDIFF('day', DATE(h.admission_dttm), DATE(bc.blood_culture_dttm)) + 1 <= 2
        THEN 'community'
      ELSE 'hospital'
    END AS type_for_baseline
  FROM bc_episodes bc
  JOIN hospitalizations h
    ON bc.hospitalization_id = h.hospitalization_id

  -- bring in the QAD-level columns from final_qad
  LEFT JOIN final_qad q
    ON bc.hospitalization_id = q.hospitalization_id
   AND bc.bc_id = q.bc_id
),

organ_nonlab AS (
  SELECT
    b.*,
    v.vasopressor_dttm,
    v.vasopressor_name,
    i.imv_dttm
  FROM base b
  LEFT JOIN vasopressor_df v
    ON b.hospitalization_id = v.hospitalization_id
   AND b.bc_id = v.bc_id
  LEFT JOIN imv_df i
    ON b.hospitalization_id = i.hospitalization_id
   AND b.bc_id = i.bc_id
),

organ_labs AS (
  SELECT
    o.*,

    -- pick the baseline scenario using type_for_baseline 
    CASE WHEN o.type_for_baseline = 'community' THEN ld.aki_dttm_co              ELSE ld.aki_dttm_ho              END AS aki_dttm,
    CASE WHEN o.type_for_baseline = 'community' THEN ld.hyperbili_dttm_co        ELSE ld.hyperbili_dttm_ho        END AS hyperbilirubinemia_dttm,
    CASE WHEN o.type_for_baseline = 'community' THEN ld.thrombo_dttm_co          ELSE ld.thrombo_dttm_ho          END AS thrombocytopenia_dttm,

    -- optional lactate (no baseline)
    ld.lactate_dttm,
    ld.has_esrd
  FROM organ_nonlab o
  LEFT JOIN lab_dysfunction ld
    ON o.hospitalization_id = ld.hospitalization_id
   AND o.bc_id = ld.bc_id
)

SELECT * FROM organ_labs
""")


In [None]:
# Component B and onset assignment, computed from component_b_inputs in four
# chained CTEs:
#   * x: adds day-truncated copies of each criterion timestamp (only
#     first_qad_day_ts is referenced later in this query).
#   * y: organ dysfunction flags and sepsis flags, each with and without
#     lactate, plus presumed_infection_onset_dttm (earliest of blood culture
#     and first QAD, only when presumed_infection = 1).
#   * z: ASE onset = earliest non-NULL criterion timestamp. NULLs are replaced
#     by a far-future sentinel so LEAST ignores them, and NULLIF turns an
#     all-sentinel result back into NULL. The onset columns are not gated on
#     the sepsis flags, so they are populated for non-sepsis rows as well.
#   * w: labels which criterion falls on the onset *day*. The comparison is
#     day-level, so when several criteria share the onset day the first
#     matching WHEN wins (blood_culture, first_qad, vasopressor, imv, aki,
#     hyperbilirubinemia, thrombocytopenia, lactate).
# The final SELECT derives `type` from the with-lactate onset only:
# 'community' when the onset date is hospital day 1 or 2 (admission date =
# day 1), otherwise 'hospital'.
component_b_query = """
WITH x AS (
  SELECT
    c.*,
    -- Keep day-truncated versions for tie-breaking logic if needed
    DATE_TRUNC('day', blood_culture_dttm) AS blood_culture_day_ts,
    CASE WHEN first_qad_dttm IS NOT NULL THEN DATE_TRUNC('day', first_qad_dttm) END AS first_qad_day_ts,
    CASE WHEN vasopressor_dttm IS NOT NULL THEN DATE_TRUNC('day', vasopressor_dttm) END AS vasopressor_day_ts,
    CASE WHEN imv_dttm IS NOT NULL THEN DATE_TRUNC('day', imv_dttm) END AS imv_day_ts,
    CASE WHEN aki_dttm IS NOT NULL THEN DATE_TRUNC('day', aki_dttm) END AS aki_day_ts,
    CASE WHEN hyperbilirubinemia_dttm IS NOT NULL THEN DATE_TRUNC('day', hyperbilirubinemia_dttm) END AS hyperbili_day_ts,
    CASE WHEN thrombocytopenia_dttm IS NOT NULL THEN DATE_TRUNC('day', thrombocytopenia_dttm) END AS thrombo_day_ts,
    CASE WHEN lactate_dttm IS NOT NULL THEN DATE_TRUNC('day', lactate_dttm) END AS lactate_day_ts
  FROM component_b_inputs c
),

y AS (
  SELECT
    x.*,

    -- organ dysfunction flags
    (
      vasopressor_dttm IS NOT NULL OR imv_dttm IS NOT NULL OR
      aki_dttm IS NOT NULL OR hyperbilirubinemia_dttm IS NOT NULL OR
      thrombocytopenia_dttm IS NOT NULL OR lactate_dttm IS NOT NULL
    ) AS has_organ_dysfunction_w_lactate,

    (
      vasopressor_dttm IS NOT NULL OR imv_dttm IS NOT NULL OR
      aki_dttm IS NOT NULL OR hyperbilirubinemia_dttm IS NOT NULL OR
      thrombocytopenia_dttm IS NOT NULL
    ) AS has_organ_dysfunction_wo_lactate,

    -- sepsis (ASE) flags
    CASE WHEN presumed_infection = 1 AND (
      vasopressor_dttm IS NOT NULL OR imv_dttm IS NOT NULL OR
      aki_dttm IS NOT NULL OR hyperbilirubinemia_dttm IS NOT NULL OR
      thrombocytopenia_dttm IS NOT NULL OR lactate_dttm IS NOT NULL
    ) THEN 1 ELSE 0 END AS sepsis,

    CASE WHEN presumed_infection = 1 AND (
      vasopressor_dttm IS NOT NULL OR imv_dttm IS NOT NULL OR
      aki_dttm IS NOT NULL OR hyperbilirubinemia_dttm IS NOT NULL OR
      thrombocytopenia_dttm IS NOT NULL
    ) THEN 1 ELSE 0 END AS sepsis_wo_lactate,

    -- presumed infection onset (earliest of blood culture + first QAD), only if presumed_infection==1
    CASE
      WHEN presumed_infection = 1 THEN
        NULLIF(
          LEAST(
            COALESCE(blood_culture_dttm, TIMESTAMP '9999-12-31'),  -- Use original with time
            COALESCE(first_qad_dttm,     TIMESTAMP '9999-12-31')   -- This will be midnight 
          ),
          TIMESTAMP '9999-12-31'
        )
      ELSE NULL
    END AS presumed_infection_onset_dttm
  FROM x
),

z AS (
  SELECT
    y.*,
    -- ASE onset (WITH lactate): USE ORIGINAL DATETIME COLUMNS
    NULLIF(
      LEAST(
        COALESCE(blood_culture_dttm, TIMESTAMP '9999-12-31'),     -- Original with time
        COALESCE(first_qad_dttm,     TIMESTAMP '9999-12-31'),     -- This will still be midnight (QAD is day-level)
        COALESCE(vasopressor_dttm,   TIMESTAMP '9999-12-31'),     -- Original with time
        COALESCE(imv_dttm,           TIMESTAMP '9999-12-31'),     -- Original with time
        COALESCE(aki_dttm,           TIMESTAMP '9999-12-31'),     -- Original with time
        COALESCE(hyperbilirubinemia_dttm, TIMESTAMP '9999-12-31'), -- Original with time
        COALESCE(thrombocytopenia_dttm,   TIMESTAMP '9999-12-31'), -- Original with time
        COALESCE(lactate_dttm,       TIMESTAMP '9999-12-31')      -- Original with time
      ),
      TIMESTAMP '9999-12-31'
    ) AS ase_onset_w_lactate_dttm,

    -- ASE onset (WITHOUT lactate): USE ORIGINAL DATETIME COLUMNS
    NULLIF(
      LEAST(
        COALESCE(blood_culture_dttm, TIMESTAMP '9999-12-31'),
        COALESCE(first_qad_dttm,     TIMESTAMP '9999-12-31'),
        COALESCE(vasopressor_dttm,   TIMESTAMP '9999-12-31'),
        COALESCE(imv_dttm,           TIMESTAMP '9999-12-31'),
        COALESCE(aki_dttm,           TIMESTAMP '9999-12-31'),
        COALESCE(hyperbilirubinemia_dttm, TIMESTAMP '9999-12-31'),
        COALESCE(thrombocytopenia_dttm,   TIMESTAMP '9999-12-31')
      ),
      TIMESTAMP '9999-12-31'
    ) AS ase_onset_wo_lactate_dttm
  FROM y
),

w AS (
  SELECT
    z.*,

    -- First criteria: USE TRUNCATED VERSIONS for day-level comparison
    CASE
      WHEN ase_onset_w_lactate_dttm IS NULL THEN NULL
      WHEN DATE_TRUNC('day', blood_culture_dttm) = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'blood_culture'
      WHEN first_qad_day_ts = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'first_qad'
      WHEN DATE_TRUNC('day', vasopressor_dttm) = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'vasopressor'
      WHEN DATE_TRUNC('day', imv_dttm) = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'imv'
      WHEN DATE_TRUNC('day', aki_dttm) = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'aki'
      WHEN DATE_TRUNC('day', hyperbilirubinemia_dttm) = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'hyperbilirubinemia'
      WHEN DATE_TRUNC('day', thrombocytopenia_dttm) = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'thrombocytopenia'
      WHEN DATE_TRUNC('day', lactate_dttm) = DATE_TRUNC('day', ase_onset_w_lactate_dttm) THEN 'lactate'
      ELSE NULL
    END AS ase_first_criteria_w_lactate,

    -- Repeat for without lactate version
    CASE
      WHEN ase_onset_wo_lactate_dttm IS NULL THEN NULL
      WHEN DATE_TRUNC('day', blood_culture_dttm) = DATE_TRUNC('day', ase_onset_wo_lactate_dttm) THEN 'blood_culture'
      WHEN first_qad_day_ts = DATE_TRUNC('day', ase_onset_wo_lactate_dttm) THEN 'first_qad'
      WHEN DATE_TRUNC('day', vasopressor_dttm) = DATE_TRUNC('day', ase_onset_wo_lactate_dttm) THEN 'vasopressor'
      WHEN DATE_TRUNC('day', imv_dttm) = DATE_TRUNC('day', ase_onset_wo_lactate_dttm) THEN 'imv'
      WHEN DATE_TRUNC('day', aki_dttm) = DATE_TRUNC('day', ase_onset_wo_lactate_dttm) THEN 'aki'
      WHEN DATE_TRUNC('day', hyperbilirubinemia_dttm) = DATE_TRUNC('day', ase_onset_wo_lactate_dttm) THEN 'hyperbilirubinemia'
      WHEN DATE_TRUNC('day', thrombocytopenia_dttm) = DATE_TRUNC('day', ase_onset_wo_lactate_dttm) THEN 'thrombocytopenia'
      ELSE NULL
    END AS ase_first_criteria_wo_lactate
  FROM z
)

SELECT
  hospitalization_id,
  bc_id,

  presumed_infection,
  sepsis,
  sepsis_wo_lactate,

  -- onset + type
  ase_onset_w_lactate_dttm,
  ase_first_criteria_w_lactate,
  ase_onset_wo_lactate_dttm,
  ase_first_criteria_wo_lactate,
  presumed_infection_onset_dttm,

  -- Community vs hospital onset type (based on onset day; admission day=day 1)
  CASE
    WHEN ase_onset_w_lactate_dttm IS NULL OR admission_dttm IS NULL THEN NULL
    WHEN DATEDIFF('day', DATE(admission_dttm), DATE(ase_onset_w_lactate_dttm)) + 1 <= 2 THEN 'community'
    ELSE 'hospital'
  END AS type,

  -- criteria timestamps (for QC)
  blood_culture_dttm,
  first_qad_dttm,
  vasopressor_dttm,
  vasopressor_name,
  imv_dttm,
  aki_dttm,
  hyperbilirubinemia_dttm,
  thrombocytopenia_dttm,
  lactate_dttm,

  -- flags
  has_organ_dysfunction_w_lactate,
  has_organ_dysfunction_wo_lactate,
  has_esrd,

  -- keep Component A QC columns
  anchor_meds_in_window,
  anchor_parenteral_meds_in_window,
  run_meds,
  final_qad_status,
  -- note: the baseline-selection helper used for lab picking
  type_for_baseline

FROM w
ORDER BY hospitalization_id, bc_id
"""
# Materialize the per-culture result and register it in DuckDB as "component_b".
component_b_df = con.execute(component_b_query).fetchdf()
con.register("component_b", component_b_df)


## 5. Compute ASE

In [None]:
# =============================================================================
# 5) Compute ASE (assemble Component A + Component B)
# =============================================================================

import numpy as np
import pandas as pd

# ---- 5a) QAD summary fields, renamed to the ASE.py-style output names ----
#   qad_days      -> total_qad
#   qad_run_start -> qad_start_date
#   qad_run_end   -> qad_end_date
qad_summary = con.execute("""
    SELECT
        hospitalization_id,
        bc_id,
        qad_days     AS total_qad,
        qad_run_start AS qad_start_date,
        qad_run_end   AS qad_end_date
    FROM final_qad
""").fetchdf()

# Left merge: every Component B row is kept even without a final_qad match.
ase_df = pd.merge(
    component_b_df,
    qad_summary,
    how="left",
    on=["hospitalization_id", "bc_id"],
)

# ---- 5b) no_sepsis_reason (ASE.py-style) ----
# sepsis == 1                 -> NA
# presumed_infection is 0/NA  -> "no_presumed_infection"
# otherwise                   -> "no_organ_dysfunction"
_not_sepsis = ase_df["sepsis"] != 1
_lacks_infection = (
    ase_df.loc[_not_sepsis, "presumed_infection"].fillna(0).astype(int) == 0
)
ase_df["no_sepsis_reason"] = pd.NA
ase_df.loc[_not_sepsis, "no_sepsis_reason"] = np.where(
    _lacks_infection,
    "no_presumed_infection",
    "no_organ_dysfunction",
)

# ---- 5c) Defensive re-parse of the datetime columns ----
# Values that cannot be parsed become NaT (errors="coerce").
_dt_cols = [
    "blood_culture_dttm",
    "first_qad_dttm",
    "presumed_infection_onset_dttm",
    "ase_onset_w_lactate_dttm",
    "ase_onset_wo_lactate_dttm",
    "vasopressor_dttm",
    "imv_dttm",
    "aki_dttm",
    "hyperbilirubinemia_dttm",
    "thrombocytopenia_dttm",
    "lactate_dttm",
]
for c in _dt_cols:
    if c not in ase_df.columns:
        continue
    ase_df[c] = pd.to_datetime(ase_df[c], errors="coerce")

# ---- 5d) Validation: a sepsis row must also be a presumed-infection row ----
invalid = ase_df[(ase_df["sepsis"] == 1) & (ase_df["presumed_infection"] == 0)]
if not invalid.empty:
    raise ValueError(f"Found {len(invalid)} invalid rows: sepsis=1 with presumed_infection=0")

print("ASE (pre-RIT) rows:", len(ase_df))
print("ASE (pre-RIT) sepsis cases:", int(ase_df["sepsis"].fillna(0).sum()))
print("ASE (pre-RIT) presumed infection:", int(ase_df["presumed_infection"].fillna(0).sum()))


## 6. Apply RIT (Repeat Infection Timeframe) Post-Processing

CDC toolkit, page 9: for hospital-onset events, apply a 14-day Repeat Infection Timeframe (RIT) so that the same episode is not counted more than once.

In [None]:
# =============================================================================
# 6) Apply RIT Post-Processing
#    Toolkit note: RIT is optional and applies to HOSPITAL-onset events.
# =============================================================================

# Length of the repeat infection timeframe in days; passed as `rit_days`.
RIT_DAYS = 14
# Master switch: when False the RIT step is skipped and episode_id stays empty.
APPLY_RIT = True
# Passed as `only_hospital_onset` to apply_rit_post_processing.
RIT_ONLY_HOSPITAL_ONSET = True  # aligns with toolkit guidance

def apply_rit_post_processing(df: pd.DataFrame, rit_days: int = 14, only_hospital_onset: bool = True) -> pd.DataFrame:
    """Drop repeat sepsis events inside the repeat infection timeframe (RIT).

    Sepsis rows (``sepsis == 1``) are walked per hospitalization in order of
    ``ase_onset_w_lactate_dttm`` (ties broken by ``bc_id``). An event subject
    to the RIT is dropped when its onset is not more than ``rit_days`` whole
    days after the onset of the last *kept* sepsis event of the same
    hospitalization. Non-sepsis rows are always returned unchanged.

    Args:
        df: One row per blood culture episode. Must contain the columns
            ``hospitalization_id``, ``bc_id``, ``sepsis``, ``type`` and
            ``ase_onset_w_lactate_dttm``. The input is not modified.
        rit_days: Length of the repeat infection timeframe in days.
        only_hospital_onset: If True (default), only events whose ``type`` is
            ``"hospital"`` can be dropped; every other sepsis event is kept
            but still restarts the timeframe. If False, the RIT is applied to
            every sepsis event regardless of ``type``.

    Returns:
        A new frame sorted by hospitalization, onset and ``bc_id`` with a
        nullable-integer ``episode_id`` column placed right after ``bc_id``.
        ``episode_id`` numbers the surviving sepsis rows 1..n within each
        hospitalization and is ``<NA>`` for non-sepsis rows. The layout is the
        same whether or not the input contains any sepsis rows.
    """
    onset_col = "ase_onset_w_lactate_dttm"
    sort_keys = ["hospitalization_id", onset_col, "bc_id"]

    # Rows with a missing `sepsis` value count as non-sepsis rather than being
    # silently dropped when the column uses a nullable dtype.
    is_sepsis = (df["sepsis"] == 1).fillna(False).astype(bool)
    non_sepsis = df[~is_sepsis].copy()

    # A fresh positional index makes the labels collected below unique even if
    # the caller's index contains duplicates.
    all_sepsis = df[is_sepsis].sort_values(sort_keys).reset_index(drop=True)

    keep_idx = []
    for _, group in all_sepsis.groupby("hospitalization_id", sort=False):
        last_onset = None
        for idx, onset, onset_type in zip(group.index, group[onset_col], group["type"]):
            if pd.isna(onset):
                # No onset time to compare against: keep the row and leave the
                # running timeframe untouched.
                keep_idx.append(idx)
                continue

            # isinstance guards against NA-like `type` values, for which a bare
            # equality test is not a usable boolean.
            is_hospital = isinstance(onset_type, str) and onset_type == "hospital"
            rit_exempt = only_hospital_onset and not is_hospital

            if rit_exempt or last_onset is None or (onset - last_onset).days > rit_days:
                keep_idx.append(idx)
                last_onset = onset
            # else: inside the RIT of the last kept event -> dropped

    sepsis_filtered = all_sepsis.loc[keep_idx]

    # Concatenate only non-empty pieces so an empty side cannot influence the
    # resulting dtypes; fall back to an empty frame with the input's columns.
    parts = [part for part in (sepsis_filtered, non_sepsis) if not part.empty]
    combined = pd.concat(parts, ignore_index=True) if parts else df.iloc[0:0].copy()
    combined = combined.sort_values(sort_keys).reset_index(drop=True)

    # Number the surviving sepsis rows within each hospitalization.
    combined["episode_id"] = pd.NA
    mask = (combined["sepsis"] == 1).fillna(False).astype(bool)
    if mask.any():
        combined.loc[mask, "episode_id"] = (
            combined.loc[mask].groupby("hospitalization_id", sort=False).cumcount() + 1
        )
    combined["episode_id"] = combined["episode_id"].astype("Int64")

    # Put episode_id immediately after bc_id
    cols = [name for name in combined.columns if name != "episode_id"]
    if "bc_id" in cols:
        cols.insert(cols.index("bc_id") + 1, "episode_id")
        combined = combined[cols]

    return combined

if not APPLY_RIT:
    # RIT disabled: keep every row and add an all-missing, nullable-integer
    # episode_id so the column exists either way.
    ase_df_post_rit = ase_df.copy()
    ase_df_post_rit["episode_id"] = pd.NA
    ase_df_post_rit = ase_df_post_rit.astype({"episode_id": "Int64"})
else:
    ase_df_post_rit = apply_rit_post_processing(
        ase_df,
        rit_days=RIT_DAYS,
        only_hospital_onset=RIT_ONLY_HOSPITAL_ONSET,
    )

# Checkpoint: row count and number of sepsis rows after the RIT step.
_post_rit_sepsis_cases = int(ase_df_post_rit["sepsis"].fillna(0).sum())
print("ASE (post-RIT) rows:", len(ase_df_post_rit))
print("ASE (post-RIT) sepsis cases:", _post_rit_sepsis_cases)


## 7. Final Output 

In [None]:
# =============================================================================
# 7) Final Output (match ASE.py column set + keep your QC columns)
# =============================================================================

# Output schema, in the order the columns should appear.
final_columns = [
    "hospitalization_id",
    "bc_id",
    "episode_id",
    "type",
    "presumed_infection",
    "sepsis",
    "sepsis_wo_lactate",
    "no_sepsis_reason",
    "blood_culture_dttm",
    "total_qad",
    "qad_start_date",
    "qad_end_date",
    "first_qad_dttm",
    "presumed_infection_onset_dttm",
    "ase_onset_w_lactate_dttm",
    "ase_first_criteria_w_lactate",
    "ase_onset_wo_lactate_dttm",
    "ase_first_criteria_wo_lactate",
    "vasopressor_dttm",
    "vasopressor_name",
    "imv_dttm",
    "aki_dttm",
    "hyperbilirubinemia_dttm",
    "thrombocytopenia_dttm",
    "lactate_dttm",
    "has_esrd"
]

# Optional QC strings: appended only when the post-RIT frame carries them.
qc_columns = [
    "anchor_meds_in_window",
    "anchor_parenteral_meds_in_window",
    "run_meds",
    "final_qad_status"
]
_available = set(ase_df_post_rit.columns)
final_columns_extended = final_columns + [name for name in qc_columns if name in _available]

# Split the requested columns into those present and those missing, so the
# subset below cannot fail on an absent column.
cols_present = []
missing = []
for _name in final_columns_extended:
    if _name in _available:
        cols_present.append(_name)
    else:
        missing.append(_name)
if missing:
    print("NOTE: missing expected columns (skipping):", missing)

ase_final = ase_df_post_rit.loc[:, cols_present].copy()

# Register final output back into DuckDB (optional but useful for downstream SQL)
con.register("ase_final", ase_final)

# --- Checkpoints ---
print("\n=== FINAL ASE COUNTS ===")
print("Rows:", len(ase_final))
for _label, _flag in (
    ("Presumed infection:", "presumed_infection"),
    ("ASE sepsis (w lactate):", "sepsis"),
    ("ASE sepsis (wo lactate):", "sepsis_wo_lactate"),
):
    print(_label, int(ase_final[_flag].fillna(0).sum()))

# Quick distribution by type (if present)
if "type" in ase_final.columns:
    _sepsis_types = ase_final.loc[ase_final["sepsis"] == 1, "type"]
    print("\nType distribution among sepsis==1:")
    print(_sepsis_types.value_counts(dropna=False))


In [None]:
# =============================================================================
# 8) Add hospitalizations without blood cultures to create full cohort output
# =============================================================================
# Every cohort hospitalization that has no row in ase_final gets one row with
# presumed_infection = sepsis = 0 and no_sepsis_reason = 'no_blood_culture'.
# Descriptive clinical timestamps (first vasopressor, first IMV, first
# threshold-crossing labs) and the ESRD flag are filled in for QC.

# Get hospitalization IDs that are in cohort but NOT in ase_final
hosps_with_bc = set(ase_final['hospitalization_id'].unique())
hosps_without_bc = [h for h in hospitalization_ids if h not in hosps_with_bc]

print(f"Hospitalizations with BC: {len(hosps_with_bc):,}")
print(f"Hospitalizations without BC: {len(hosps_without_bc):,}")

# Create temp table for non-BC hospitalizations
no_bc_hosp_df = pd.DataFrame({'hospitalization_id': pd.array(hosps_without_bc, dtype='str')})
con.register("no_bc_hosps", no_bc_hosp_df)

# Query clinical data for non-BC patients.
# The ESRD flag is aliased as has_esrd so it lands in the output schema's
# has_esrd column; under any other name it is discarded when the frame is
# aligned to ase_final.columns below, leaving has_esrd as NA for these rows.
no_bc_clinical = con.execute("""
    -- Vasopressor (first new initiation, excluding OR)
    WITH vaso AS (
        SELECT 
            m.hospitalization_id,
            MIN(m.admin_dttm) AS vasopressor_dttm,
            FIRST(m.med_category ORDER BY m.admin_dttm) AS vasopressor_name
        FROM med_continuous m
        LEFT JOIN adt a ON m.hospitalization_id = a.hospitalization_id
            AND m.admin_dttm >= a.in_dttm AND m.admin_dttm < a.out_dttm
        WHERE m.hospitalization_id IN (SELECT hospitalization_id FROM no_bc_hosps)
          AND LOWER(m.med_group) = 'vasoactives'
          AND m.med_dose > 0
          AND (a.location_category IS NULL OR LOWER(a.location_category) != 'procedural')
        GROUP BY m.hospitalization_id
    ),
    
    -- IMV (first episode)
    imv AS (
        SELECT 
            hospitalization_id,
            MIN(recorded_dttm) AS imv_dttm
        FROM respiratory
        WHERE hospitalization_id IN (SELECT hospitalization_id FROM no_bc_hosps)
          AND LOWER(device_category) = 'imv'
        GROUP BY hospitalization_id
    ),
    
    -- Labs
    labs_agg AS (
        SELECT
            hospitalization_id,
            MIN(CASE WHEN LOWER(lab_category) = 'lactate' 
                     AND COALESCE(lab_value_numeric, TRY_CAST(lab_value AS DOUBLE)) >= 2.0 
                THEN COALESCE(lab_result_dttm, lab_order_dttm) END) AS lactate_dttm,
            MIN(CASE WHEN LOWER(lab_category) = 'platelet_count' 
                     AND COALESCE(lab_value_numeric, TRY_CAST(lab_value AS DOUBLE)) < 100 
                THEN COALESCE(lab_result_dttm, lab_order_dttm) END) AS thrombocytopenia_dttm,
            MIN(CASE WHEN LOWER(lab_category) = 'bilirubin_total' 
                     AND COALESCE(lab_value_numeric, TRY_CAST(lab_value AS DOUBLE)) >= 2.0 
                THEN COALESCE(lab_result_dttm, lab_order_dttm) END) AS hyperbilirubinemia_dttm
        FROM labs
        WHERE hospitalization_id IN (SELECT hospitalization_id FROM no_bc_hosps)
          AND LOWER(lab_category) IN ('lactate', 'platelet_count', 'bilirubin_total')
        GROUP BY hospitalization_id
    ),
    
    -- ESRD
    esrd AS (
        SELECT DISTINCT hospitalization_id, 1 AS esrd
        FROM esrd_patients
        WHERE hospitalization_id IN (SELECT hospitalization_id FROM no_bc_hosps)
    )
    
    SELECT 
        h.hospitalization_id,
        v.vasopressor_dttm,
        v.vasopressor_name,
        i.imv_dttm,
        l.lactate_dttm,
        l.thrombocytopenia_dttm,
        l.hyperbilirubinemia_dttm,
        COALESCE(e.esrd, 0) AS has_esrd
    FROM no_bc_hosps h
    LEFT JOIN vaso v ON h.hospitalization_id = v.hospitalization_id
    LEFT JOIN imv i ON h.hospitalization_id = i.hospitalization_id
    LEFT JOIN labs_agg l ON h.hospitalization_id = l.hospitalization_id
    LEFT JOIN esrd e ON h.hospitalization_id = e.hospitalization_id
""").fetchdf()

# Create base dataframe for non-BC patients
no_bc_df = no_bc_clinical.copy()
no_bc_df['bc_id'] = pd.NA
no_bc_df['episode_id'] = pd.NA
no_bc_df['type'] = pd.NA
no_bc_df['presumed_infection'] = 0
no_bc_df['sepsis'] = 0
no_bc_df['sepsis_wo_lactate'] = 0
no_bc_df['no_sepsis_reason'] = 'no_blood_culture'

# Add remaining columns as NA
for col in ase_final.columns:
    if col not in no_bc_df.columns:
        no_bc_df[col] = pd.NA

# Ensure same column order (this also drops any column not in the schema)
no_bc_df = no_bc_df[ase_final.columns]

# Combine into single output
ase_final = pd.concat([ase_final, no_bc_df], ignore_index=True)
# Standardize NA values in no_sepsis_reason
ase_final['no_sepsis_reason'] = ase_final['no_sepsis_reason'].replace({np.nan: pd.NA})

# Update DuckDB registration
con.register("ase_final", ase_final)

# Summary counts, computed once; the percentage is guarded so an empty cohort
# reports 0.0% instead of raising ZeroDivisionError.
n_hosp_total = ase_final['hospitalization_id'].nunique()
n_hosp_sepsis = ase_final.loc[ase_final['sepsis'] == 1, 'hospitalization_id'].nunique()
pct_hosp_sepsis = n_hosp_sepsis / n_hosp_total * 100 if n_hosp_total else 0.0

print("\n=== FULL COHORT ASE OUTPUT ===")
print(f"Total rows: {len(ase_final):,}")
print(f"Unique hospitalizations: {n_hosp_total:,}")
print(f"\nHospitalizations with sepsis: {n_hosp_sepsis:,} ({pct_hosp_sepsis:.1f}%)")
print("\nno_sepsis_reason breakdown:")
print(ase_final['no_sepsis_reason'].value_counts(dropna=False))

# Clinical data summary for non-BC patients
print("\n=== Non-BC patients clinical data ===")
non_bc = ase_final[ase_final['no_sepsis_reason'] == 'no_blood_culture']
print(f"Had vasopressor: {non_bc['vasopressor_dttm'].notna().sum():,} ({non_bc['vasopressor_dttm'].notna().mean()*100:.1f}%)")
print(f"Had IMV: {non_bc['imv_dttm'].notna().sum():,} ({non_bc['imv_dttm'].notna().mean()*100:.1f}%)")
print(f"Had elevated lactate: {non_bc['lactate_dttm'].notna().sum():,} ({non_bc['lactate_dttm'].notna().mean()*100:.1f}%)")

## 9. Clean Up

In [None]:
# Close DuckDB connection.
# `con` was opened on an in-memory database, so the tables and views registered
# on it are not available to later cells through this connection.
con.close()
print("Database connection closed.")
print("\n=== CDC ASE IMPLEMENTATION COMPLETE ===")

# Test Production

In [None]:
import importlib
import sys
import traceback

# Make the notebook's directory importable, then reload so that edits to
# ase_production.py are picked up without restarting the kernel.
sys.path.append('.')  # Ensure current directory is in path
import ase_production
importlib.reload(ase_production)
from ase_production import compute_ase

# Test 1: Run production implementation with same hospitalizations
# Settings mirror the notebook run: RIT on, hospital-onset only, lactate included.
_production_kwargs = {
    "hospitalization_ids": hospitalization_ids,  # Already defined in notebook
    "config_path": '../config/config.json',
    "apply_rit": True,
    "rit_only_hospital_onset": True,
    "include_lactate": True,  # Match notebook setting
    "verbose": True,
}

print("\n1. Running production ASE calculation...")
try:
    ase_production_results = compute_ase(**_production_kwargs)
    print(f"\nProduction results shape: {ase_production_results.shape}")
    print(f"Production ASE events: {ase_production_results['sepsis'].sum()}")
except Exception as e:
    # Best-effort: report the failure and keep the notebook running. Note that
    # ase_production_results stays undefined in that case.
    print(f"Error running production code: {e}")
    traceback.print_exc()

In [None]:
# =============================================================================
# DETAILED COMPARISON: ase_production_results vs ase_final
# Focus on onset times and types
# =============================================================================

_NA_TOKEN = '_NA_'
_RULE = "=" * 70


def _comparable_str(series):
    """Render *series* as strings so that equivalent values compare equal.

    Every missing marker (None, NaN, NaT, pd.NA) becomes ``_NA_TOKEN`` and
    whole-number floats lose their trailing ``.0``. Plain ``astype(str)``
    would instead yield 'nan' / '<NA>' / 'None' and '1.0' vs '1' depending on
    the column dtype, which makes identical data look different.
    """
    def _render(value):
        if pd.isna(value):
            return _NA_TOKEN
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value)

    # Cast to object first so extension dtypes hand scalars to _render.
    return series.astype(object).map(_render)


def _banner(title, lead_blank=True):
    """Print a section title framed by two 70-character rules."""
    print(("\n" if lead_blank else "") + _RULE)
    print(title)
    print(_RULE)


# Work on copies so the key/normalisation columns never leak into the inputs.
ase_prod = ase_production_results.copy()
ase_dev = ase_final.copy()

# Build the row key (hospitalization_id + bc_id) identically on both sides.
# bc_id is normalised so that missing ids and int/float ids produce the same
# key text in both frames.
for _frame in (ase_prod, ase_dev):
    _frame['hospitalization_id'] = _frame['hospitalization_id'].astype(str)
    _frame['bc_id'] = _comparable_str(_frame['bc_id'])
    _frame['_key'] = _frame['hospitalization_id'] + '_' + _frame['bc_id']

_banner("BASIC SHAPE COMPARISON", lead_blank=False)
print(f"Production rows: {len(ase_prod):,}")
print(f"Notebook rows:   {len(ase_dev):,}")

# Row-level comparison
prod_keys = set(ase_prod['_key'])
dev_keys = set(ase_dev['_key'])
only_in_prod = prod_keys - dev_keys
only_in_dev = dev_keys - prod_keys
in_both = prod_keys & dev_keys

print(f"\nRows only in production: {len(only_in_prod):,}")
print(f"Rows only in notebook:   {len(only_in_dev):,}")
print(f"Rows in both:            {len(in_both):,}")

# Duplicate keys turn the inner merge below into a many-to-many join, which
# multiplies rows and distorts every mismatch count; surface that up front.
for _label, _frame in (("production", ase_prod), ("notebook", ase_dev)):
    _n_dup = int(_frame['_key'].duplicated().sum())
    if _n_dup:
        print(f"WARNING: {_n_dup:,} duplicate keys in {_label}; matched-row counts will be inflated")

_banner("KEY METRICS COMPARISON")
metrics = ['sepsis', 'sepsis_wo_lactate', 'presumed_infection']
print(f"{'Metric':<40} {'Production':>15} {'Notebook':>15} {'Match':>10}")
print("-"*80)
for m in metrics:
    p_val = ase_prod[m].sum()
    d_val = ase_dev[m].sum()
    match = "✓" if p_val == d_val else "✗"
    print(f"{m:<40} {p_val:>15,} {d_val:>15,} {match:>10}")

_banner("TYPE DISTRIBUTION COMPARISON")
print("\nProduction 'type' distribution:")
print(ase_prod['type'].value_counts(dropna=False))
print("\nNotebook 'type' distribution:")
print(ase_dev['type'].value_counts(dropna=False))

_banner("NO_SEPSIS_REASON COMPARISON")
print("\nProduction:")
print(ase_prod['no_sepsis_reason'].value_counts(dropna=False))
print("\nNotebook:")
print(ase_dev['no_sepsis_reason'].value_counts(dropna=False))

_banner("COLUMN-BY-COLUMN VALUE COMPARISON (matching rows only)")

# Merge for comparison: every column shared by both frames (other than _key)
# comes back twice, suffixed _prod / _dev.
merged = ase_prod.merge(
    ase_dev,
    on='_key',
    suffixes=('_prod', '_dev'),
    how='inner'
)
n_matched = len(merged)
print(f"Matched rows: {n_matched:,}")

# Compare all columns
compare_cols = [
    'sepsis', 'sepsis_wo_lactate', 'presumed_infection', 'type',
    'total_qad', 'no_sepsis_reason',
    'ase_onset_w_lactate_dttm', 'ase_onset_wo_lactate_dttm',
    'ase_first_criteria_w_lactate', 'ase_first_criteria_wo_lactate',
    'presumed_infection_onset_dttm', 'blood_culture_dttm',
    'vasopressor_dttm', 'imv_dttm', 'aki_dttm',
    'hyperbilirubinemia_dttm', 'thrombocytopenia_dttm', 'lactate_dttm'
]

mismatch_summary = []
for col in compare_cols:
    prod_col = f"{col}_prod"
    dev_col = f"{col}_dev"

    if prod_col not in merged.columns or dev_col not in merged.columns:
        print(f"{col}: COLUMN MISSING")
        continue

    # Convert datetime columns to string for comparison. The string form is
    # written back to `merged`, so later sections see the same representation.
    if 'dttm' in col or 'date' in col.lower():
        merged[prod_col] = pd.to_datetime(merged[prod_col], errors='coerce').astype(str)
        merged[dev_col] = pd.to_datetime(merged[dev_col], errors='coerce').astype(str)

    # NA-safe, dtype-insensitive comparison
    prod_vals = _comparable_str(merged[prod_col])
    dev_vals = _comparable_str(merged[dev_col])

    mismatches = merged[prod_vals != dev_vals]
    n_mismatch = len(mismatches)
    # With no matched rows there is nothing to divide by; the count is 0 too.
    pct = n_mismatch / n_matched * 100 if n_matched else 0.0

    status = "✓" if n_mismatch == 0 else f"✗ {n_mismatch:,} ({pct:.2f}%)"
    print(f"{col:<40} {status}")

    if n_mismatch > 0:
        mismatch_summary.append({
            'column': col,
            'n_mismatches': n_mismatch,
            'pct': pct,
            'sample_keys': mismatches['_key'].head(5).tolist()
        })

_banner("DETAILED MISMATCH ANALYSIS")

# Focus on key columns with mismatches
key_cols = ['type', 'ase_onset_w_lactate_dttm', 'ase_first_criteria_w_lactate',
            'sepsis', 'presumed_infection']

for col in key_cols:
    prod_col = f"{col}_prod"
    dev_col = f"{col}_dev"

    if prod_col not in merged.columns or dev_col not in merged.columns:
        continue

    prod_vals = _comparable_str(merged[prod_col])
    dev_vals = _comparable_str(merged[dev_col])
    mismatches = merged[prod_vals != dev_vals]

    if len(mismatches) > 0:
        print(f"\n--- {col} mismatches ({len(mismatches):,}) ---")

        # The prod -> dev cross-tabulation is intentionally not printed: its
        # output was too long to be useful in the notebook.
        print("\nValue mapping (prod -> dev):")
        print("skipped because too long...")

        # Show sample rows, with the sepsis flags alongside for context
        # (not repeated when sepsis itself is the column being inspected).
        print(f"\nSample mismatched rows:")
        sample_cols = ['_key', prod_col, dev_col]
        sample_cols += [
            c for c in ('sepsis_prod', 'sepsis_dev')
            if c in merged.columns and c not in sample_cols
        ]
        print(mismatches[sample_cols].head(2))

_banner("SAMPLE PATIENT DEEP DIVE")

# Pick a patient with a 'type' mismatch, if any column mismatched at all
if len(mismatch_summary) > 0 and {'type_prod', 'type_dev'} <= set(merged.columns):
    type_mismatches = merged[
        _comparable_str(merged['type_prod']) != _comparable_str(merged['type_dev'])
    ]

    if len(type_mismatches) > 0:
        # Read the id from its own column; splitting the key on '_' would
        # truncate any hospitalization_id that itself contains an underscore.
        sample_hid = type_mismatches['hospitalization_id_prod'].iloc[0]
        detail_cols = [
            'hospitalization_id', 'bc_id', 'type', 'sepsis', 'presumed_infection',
            'blood_culture_dttm', 'ase_onset_w_lactate_dttm', 'ase_first_criteria_w_lactate'
        ]

        print(f"\nSample hospitalization with 'type' mismatch: {sample_hid}")

        print("\n--- Production ---")
        prod_sample = ase_prod[ase_prod['hospitalization_id'] == sample_hid][detail_cols]
        print(prod_sample.to_string())

        print("\n--- Notebook ---")
        dev_sample = ase_dev[ase_dev['hospitalization_id'] == sample_hid][detail_cols]
        print(dev_sample.to_string())

# Cleanup
ase_prod.drop('_key', axis=1, inplace=True)
ase_dev.drop('_key', axis=1, inplace=True)

_banner("COMPARISON COMPLETE")

In [None]:
# Hospitalization-level sepsis prevalence - PRODUCTION results.
# A hospitalization counts once no matter how many of its rows have sepsis == 1.
unique_hosp_sepsis = ase_production_results.loc[
    ase_production_results['sepsis'] == 1, 'hospitalization_id'
].nunique()

# Denominator: every distinct hospitalization present in the results
total_hosp = ase_production_results['hospitalization_id'].nunique()

# An empty result has no defined rate; report NaN instead of raising
# ZeroDivisionError.
sepsis_percentage = (unique_hosp_sepsis / total_hosp) * 100 if total_hosp else float('nan')

# Display results
print(f"Unique hospitalizations with sepsis: {unique_hosp_sepsis:,}")
print(f"Total unique hospitalizations: {total_hosp:,}")
print(f"Percentage with sepsis: {sepsis_percentage:.2f}%")

In [None]:
# Hospitalization-level sepsis prevalence - DEVELOPMENT (notebook) results.
# A hospitalization counts once no matter how many of its rows have sepsis == 1.
unique_hosp_sepsis = ase_final.loc[
    ase_final['sepsis'] == 1, 'hospitalization_id'
].nunique()

# Denominator: every distinct hospitalization present in ase_final
total_hosp = ase_final['hospitalization_id'].nunique()

# An empty table has no defined rate; report NaN instead of raising
# ZeroDivisionError.
sepsis_percentage = (unique_hosp_sepsis / total_hosp) * 100 if total_hosp else float('nan')

# Display results
print(f"Unique hospitalizations with sepsis: {unique_hosp_sepsis:,}")
print(f"Total unique hospitalizations: {total_hosp:,}")
print(f"Percentage with sepsis: {sepsis_percentage:.2f}%")