# Init

In [1]:
import os
os.chdir('..')
os.getcwd()

'/Users/wliao0504/code/clif/CLIF-epi-of-sedation'

## Import

In [2]:
from clifpy import ClifOrchestrator
import pandas as pd
import duckdb
from pathlib import Path

In [3]:
site = 'mimic'

In [4]:
q = """
SET TimeZone='US/Eastern'
"""
duckdb.sql(q)

## Utils

In [5]:
def run_query_from_file(sql_file_path: str) -> pd.DataFrame:
    """
    Loads a query from a .sql file and executes it using the given DuckDB connection.

    Args:
        con: An active DuckDB connection.
        sql_file_path: The absolute path to the .sql file.

    Returns:
        A pandas DataFrame with the results of the query.
    """
    print(f"--- Loading and executing query from {sql_file_path} ---")
    
    # Read the entire content of the .sql file
    query = Path(sql_file_path).read_text()

    # Execute the query and return as a pandas DataFrame
    result_df = duckdb.sql(query).df()

    print("Query executed successfully.")
    return result_df

# Proprocess

In [6]:
cohort_hosp_ids_df = pd.read_csv('data/cohort_hosp_ids.csv')
cohort_hosp_ids = cohort_hosp_ids_df['hospitalization_id'].astype(str).tolist()

## Resp

In [7]:
co = ClifOrchestrator(config_path="config/mimic_config.json")
co.initialize(tables = ['respiratory_support'])

📢 ClifOrchestrator initialized
📢 Initialized respiratory_support table
📢 Data directory: /Users/wliao0504/code/clif/CLIF-MIMIC/output/rclif-dev-test
📢 File type: parquet
📢 Timezone: US/Eastern
📢 Output directory: /Users/wliao0504/code/clif/CLIF-epi-of-sedation/output
📢 Loaded schema from /Users/wliao0504/code/clif/CLIF-epi-of-sedation/.venv/lib/python3.12/site-packages/clifpy/schemas/respiratory_support_schema.yaml
📢 Loaded outlier configuration


In [8]:
# processed_bf = co.respiratory_support.waterfall(bfill = True)

In [9]:
# processed_bf.df.to_parquet(f"output/intermediate/{site}_resp_processed_bf.parquet")

In [10]:
# resp_p = processed_bf.df
resp_p = pd.read_parquet(f"output/intermediate/{site}_resp_processed_bf.parquet")

In [11]:
q = f"""
SELECT hospitalization_id
    , recorded_dttm
    , device_category, device_name
    , mode_category, mode_name
    , fio2_set
    , peep_set
    , tracheostomy
    , _mode_prev: LAG(mode_category) OVER (PARTITION BY hospitalization_id ORDER BY recorded_dttm)
    , _switch_to_ps_cpap: CASE 
        -- last mode is a control mode
        WHEN contains(_mode_prev, 'control') 
            AND mode_category in ('pressure support/cpap') 
            THEN 1 ELSE 0 END
    , 
FROM resp_p
INNER JOIN cohort_hosp_ids_df USING (hospitalization_id)
WHERE hospitalization_id in ('20001361', '20004088', '20005024')
"""
resp_view_0 = duckdb.sql(q).df()

In [12]:
q = f"""
WITH base AS (
    FROM resp_p
    INNER JOIN cohort_hosp_ids_df USING (hospitalization_id)
    SELECT hospitalization_id
         , recorded_dttm
         , device_category, device_name
         , mode_category, mode_name
         , mode_cat_id
         , fio2_set
         , peep_set
         , pressure_support_set
         , tracheostomy
         , _mode_prev: LAG(mode_category) OVER w_mode
         , _switch_to_ps_cpap: CASE
             WHEN contains(_mode_prev, 'control')
              AND mode_category IN ('pressure support/cpap')
             THEN 1 ELSE 0 END
    WHERE hospitalization_id IN ('20001361', '20004088', '20005024')
    WINDOW w_mode AS (PARTITION BY hospitalization_id ORDER BY recorded_dttm)
),
-- segment attributes keyed by existing mode_cat_id
seg_bounds AS (
    FROM base
    SELECT hospitalization_id
         , _seg_id: mode_cat_id 
         , _seg_start: MIN(recorded_dttm)
         , _seg_last_seen: MAX(recorded_dttm)
         , _seg_mode: ANY_VALUE(mode_category)
    GROUP BY hospitalization_id, _seg_id
),
-- compute segment end and previous segment mode
seg_timing AS (
    FROM seg_bounds
    SELECT hospitalization_id
         , _seg_id
         , _seg_mode
         , _seg_start
         , _seg_last_seen -- timing of the last row for this segment
         , _next_seg_start: LEAD(_seg_start) OVER w_seg -- timing of the first row of the next segment
         , _seg_end: COALESCE(_next_seg_start, _seg_last_seen)
         , _prev_seg_mode: LAG(_seg_mode) OVER w_seg
    WINDOW w_seg AS (PARTITION BY hospitalization_id ORDER BY _seg_start)
),
seg_enriched AS (
    FROM seg_timing
    SELECT hospitalization_id
         , _seg_id
         , _seg_mode
         , _seg_start
         , _seg_end
         , _prev_seg_mode
         , _seg_duration_min: date_diff('minute', _seg_start, _seg_end)
         , _is_ps_cpap_after_control: CASE
             WHEN _seg_mode = 'pressure support/cpap'
              AND contains(_prev_seg_mode, 'control')
             THEN 1 ELSE 0 END
         , _persists_30min: CASE
             WHEN _seg_mode = 'pressure support/cpap'
              AND contains(_prev_seg_mode, 'control')
              AND date_diff('minute', _seg_start, _seg_end) >= 30
             THEN 1 ELSE 0 END
),
final AS (
    FROM base AS s
    LEFT JOIN seg_enriched AS se
      ON se.hospitalization_id = s.hospitalization_id
     AND se._seg_id           = s.mode_cat_id
    SELECT s.hospitalization_id
         , s.recorded_dttm
         , s.device_category, s.device_name
         , s.mode_category, s.mode_name
         , s.fio2_set
         , s.peep_set
         , s.pressure_support_set
         , s.tracheostomy
         , s._switch_to_ps_cpap
         , se._is_ps_cpap_after_control
         , se._persists_30min
         , _sbt_done: CASE 
             WHEN se._persists_30min = 1 
              AND s.peep_set <= 8
              AND s.pressure_support_set <= 8
             THEN 1 ELSE 0 END
)
SELECT *
FROM seg_enriched
ORDER BY hospitalization_id, _seg_id
"""
seg_view = duckdb.sql(q).df()

In [13]:
sbt_blocks_df = run_query_from_file('code/sbt.sql')

--- Loading and executing query from code/sbt.sql ---
Query executed successfully.


In [65]:
resp_view = run_query_from_file('code/sbt.sql')

--- Loading and executing query from code/sbt.sql ---
Query executed successfully.


## ADT

In [66]:
adt = co.load_table(
    'adt',
    columns = ['hospitalization_id', 'in_dttm', 'out_dttm', 'location_name', 'location_category'],
    filters = {
        'hospitalization_id': cohort_hosp_ids
    }
)
adt_df = adt.df

📢 Initialized adt table
📢 Data directory: /Users/wliao0504/code/clif/CLIF-MIMIC/output/rclif-dev-test
📢 File type: parquet
📢 Timezone: US/Eastern
📢 Output directory: /Users/wliao0504/code/clif/CLIF-epi-of-sedation/output
📢 Loaded schema from /Users/wliao0504/code/clif/CLIF-epi-of-sedation/.venv/lib/python3.12/site-packages/clifpy/schemas/adt_schema.yaml
📢 Loaded outlier configuration


## Vitals

In [67]:
vitals = co.load_table(
    'vitals', 
    columns = ['hospitalization_id', 'recorded_dttm', 'vital_category', 'vital_value'],
    filters = {
        'vital_category': ['spo2', 'weight_kg', 'respiratory_rate', 'heart_rate'],
        'hospitalization_id': cohort_hosp_ids
    }
    )
vitals_df = vitals.df

📢 Initialized vitals table
📢 Data directory: /Users/wliao0504/code/clif/CLIF-MIMIC/output/rclif-dev-test
📢 File type: parquet
📢 Timezone: US/Eastern
📢 Output directory: /Users/wliao0504/code/clif/CLIF-epi-of-sedation/output
📢 Loaded schema from /Users/wliao0504/code/clif/CLIF-epi-of-sedation/.venv/lib/python3.12/site-packages/clifpy/schemas/vitals_schema.yaml
📢 Loaded outlier configuration


In [68]:
q = """
WITH w AS (
PIVOT_WIDER vitals_df
ON vital_category
USING MAX(vital_value)
)
SELECT *
    , _tachy: CASE WHEN heart_rate > 130 THEN 1 ELSE 0 END
    , _brady: CASE WHEN heart_rate < 60 THEN 1 ELSE 0 END
    , _rr_high: CASE WHEN respiratory_rate > 35 THEN 1 ELSE 0 END
    , _rr_low: CASE WHEN respiratory_rate < 8 THEN 1 ELSE 0 END
FROM w
ORDER BY hospitalization_id, recorded_dttm
"""
vitals_w = duckdb.sql(q).df()
vitals_w.head()

Unnamed: 0,hospitalization_id,recorded_dttm,heart_rate,respiratory_rate,spo2,weight_kg,_tachy,_brady,_rr_high,_rr_low
0,20001305,2178-03-25 02:59:00-05:00,,,,44.0,0,0,0,0
1,20001305,2178-03-25 05:32:00-05:00,76.0,18.0,,,0,0,0,0
2,20001305,2178-03-25 05:33:00-05:00,,,,43.9,0,0,0,0
3,20001305,2178-03-25 05:49:00-05:00,,,100.0,,0,0,0,0
4,20001305,2178-03-25 06:00:00-05:00,73.0,25.0,100.0,,0,0,0,0


## PA

In [69]:
co.load_table(
    'patient_assessments',
    columns = ['hospitalization_id', 'recorded_dttm', 'assessment_category', 'numerical_value'],
    filters = {
        'assessment_category': ['gcs_total'],
        'hospitalization_id': cohort_hosp_ids
    }
)

pa_df = co.patient_assessments.df

📢 Initialized patient_assessments table
📢 Data directory: /Users/wliao0504/code/clif/CLIF-MIMIC/output/rclif-dev-test
📢 File type: parquet
📢 Timezone: US/Eastern
📢 Output directory: /Users/wliao0504/code/clif/CLIF-epi-of-sedation/output
📢 Loaded schema from /Users/wliao0504/code/clif/CLIF-epi-of-sedation/.venv/lib/python3.12/site-packages/clifpy/schemas/patient_assessments_schema.yaml
📢 Loaded outlier configuration


In [70]:
q = """
PIVOT_WIDER pa_df
ON assessment_category
USING MAX(numerical_value)
"""
pa_w = duckdb.sql(q).df()
pa_w.head()

Unnamed: 0,hospitalization_id,recorded_dttm,gcs_total
0,28672136,2165-04-28 16:00:00-05:00,15.0
1,28907618,2152-09-01 20:00:00-05:00,10.0
2,28907618,2152-09-08 06:00:00-05:00,10.0
3,29010365,2130-04-23 08:00:00-05:00,15.0
4,23386436,2155-09-11 04:13:00-05:00,12.0


## Meds

In [71]:
mac = co.load_table(
    'medication_admin_continuous',
    columns = ['hospitalization_id', 'admin_dttm', 'med_name', 'med_category', 'med_dose', 'med_dose_unit'],
    filters = {
        'med_group': ['vasoactives'], 
        'hospitalization_id': cohort_hosp_ids
    }
    )

📢 Initialized medication_admin_continuous table
📢 Data directory: /Users/wliao0504/code/clif/CLIF-MIMIC/output/rclif-dev-test
📢 File type: parquet
📢 Timezone: US/Eastern
📢 Output directory: /Users/wliao0504/code/clif/CLIF-epi-of-sedation/output
📢 Loaded schema from /Users/wliao0504/code/clif/CLIF-epi-of-sedation/.venv/lib/python3.12/site-packages/clifpy/schemas/medication_admin_continuous_schema.yaml
📢 Loaded outlier configuration


In [72]:
from clifpy.utils.unit_converter import convert_dose_units_by_med_category

preferred_units = {
    'dopamine': 'mcg/kg/min',
    'dobutamine': 'mcg/kg/min',
    'norepinephrine': 'mcg/kg/min',
    'epinephrine': 'mcg/kg/min',
    'phenylephrine': 'mcg/kg/min',
    'angiotensin': 'mcg/kg/min',
    'vasopressin': 'u/min',
    'milrinone': 'mcg/kg/min'
    }

mac_converted, mac_summary = convert_dose_units_by_med_category(
    mac.df,
    vitals_df = vitals_df,
    preferred_units = preferred_units
)

No weight_kg column found, adding the most recent from vitals


In [73]:
q = """
WITH t1 AS (
    SELECT hospitalization_id
        , admin_dttm
        , med_category_unit: med_category || '_' || REPLACE(med_dose_unit_converted, '/', '_')
        , med_dose_converted
    FROM mac_converted
)
, t2 AS (
    PIVOT_WIDER t1
    ON med_category_unit
    USING FIRST(med_dose_converted)
)
SELECT *
FROM t2
ORDER BY hospitalization_id, admin_dttm
"""
mac_w = duckdb.sql(q).df()

## Wide

In [None]:
# co.create_wide_dataset(
#     tables_to_load = ['vitals', 'patient_assessments'],
#     batch_size = -1
# )
# wide = co.wide_df

📢 🚀 WIDE DATASET CREATION STARTED
📢 Phase 1: Initialization
📢 Phase 2: Encounter Processing
📢 Phase 3: Table Loading
📢   3.1: Auto-loading base tables
📢   3.2: Loading optional tables: ['vitals', 'patient_assessments']
📢 Phase 4: Calling Wide Dataset Utility
📢 Phase 4: Wide Dataset Processing (utility function)
⚠️ optional_tables parameter is deprecated. Converting to category_filters format
📢 Processing all 546028 hospitalizations
📢 Loading and filtering base tables
📢        - Base tables filtered - Hospitalization: 546028, Patient: 364627, ADT: 57335
📢   4.2: Determining processing mode
📢        - Single mode: Processing all 546028 hospitalizations at once
📢   4.S: === SINGLE PROCESSING MODE ===
📢            - Base cohort created with 546028 records
📢     4.S.3: Processing tables
📢            - Processing vitals
📢 Loaded 8478277 records from vitals
📢            === PIVOTING VITALS ===
📢 Pivoted vitals: 3085653 combo_ids with 4 category columns
📢            - Processing patient_assess

# Join

## Timestamps

In [75]:
q = """
SELECT hospitalization_id, recorded_dttm AS event_dttm FROM resp_view
UNION
SELECT hospitalization_id, recorded_dttm AS event_dttm FROM vitals_w
UNION
SELECT hospitalization_id, admin_dttm AS event_dttm FROM mac_w
UNION
SELECT hospitalization_id, recorded_dttm AS event_dttm FROM pa_w
--UNION
--SELECT hospitalization_id, in_dttm AS event_dttm FROM adt_df
--UNION
--SELECT hospitalization_id, out_dttm AS event_dttm FROM adt_df
"""
all_timestamps = duckdb.sql(q).df()

all_timestamps['_dh'] = all_timestamps['event_dttm'].dt.floor('h')
all_timestamps['_hr'] = all_timestamps['event_dttm'].dt.hour

In [76]:
q = """
WITH day_starts AS (
    FROM all_timestamps
    SELECT *
        , _shift: CASE WHEN _hr >= 7 AND _hr < 19 THEN 'day' ELSE 'night' END
        , _is_day_start: CASE
            WHEN _hr = 7 AND COALESCE(LAG(_hr) OVER w, -1) != 7 THEN 1
            ELSE 0 END
    WINDOW w AS (PARTITION BY hospitalization_id ORDER BY event_dttm)       
)
FROM day_starts
INNER JOIN cohort_hosp_ids_df USING (hospitalization_id)
SELECT *
    , _nth_day: SUM(_is_day_start) OVER w
    , _day_shift_id: 'day' || _nth_day::INT::TEXT || '_' || _shift
WINDOW w AS (PARTITION BY hospitalization_id ORDER BY event_dttm)       
ORDER BY hospitalization_id, event_dttm
"""
timestamps_w_ids = duckdb.sql(q).df()

In [77]:
q = """
WITH t3 AS (
    FROM all_timestamps t
    LEFT JOIN mac_w m ON
        t.hospitalization_id = m.hospitalization_id
        AND t.event_dttm = m.admin_dttm
    SELECT t.hospitalization_id, t.event_dttm
        , LAST_VALUE(COLUMNS('_min') IGNORE NULLS) OVER (
            PARTITION BY t.hospitalization_id ORDER BY event_dttm
        )
), t4 AS (
    SELECT hospitalization_id, event_dttm
        , COALESCE(COLUMNS('_min'), 0)
    FROM t3
), t5 AS (
    SELECT *
        -- ref: https://doi.org/10.1016/j.jcrc.2020.11.002
        , _nee: norepinephrine_mcg_kg_min 
            + epinephrine_mcg_kg_min 
            + phenylephrine_mcg_kg_min / 10.0 
            + dopamine_mcg_kg_min / 100.0 
            + vasopressin_u_min * 2.5 
            + angiotensin_mcg_kg_min * 10
        , _hemo_stable_by_nee: CASE WHEN _nee <= 0.2 THEN 1 ELSE 0 END
        -- to cover the two vasos not in the formula: milrinone and dobutamine
        , _hemo_stable_by_abc: CASE WHEN dobutamine_mcg_kg_min < 0.5
            AND milrinone_mcg_kg_min = 0 THEN 1 ELSE 0 END
        , _hemo_stable: CASE WHEN _hemo_stable_by_nee AND _hemo_stable_by_abc THEN 1 ELSE 0 END
    FROM t4
)
SELECT *
FROM t5
ORDER BY hospitalization_id, event_dttm
"""
# a = augmented
mac_wa = duckdb.sql(q).df()

In [None]:
q = """
WITH t1 AS (
FROM timestamps_w_ids t
LEFT JOIN resp_view r ON
    t.hospitalization_id = r.hospitalization_id
    AND t.event_dttm = r.recorded_dttm
LEFT JOIN vitals_w v ON
    t.hospitalization_id = v.hospitalization_id
    AND t.event_dttm = v.recorded_dttm
    AND v.spo2 IS NOT NULL
LEFT JOIN mac_wa m ON
    t.hospitalization_id = m.hospitalization_id
    AND t.event_dttm = m.event_dttm
LEFT JOIN pa_w p ON
    t.hospitalization_id = p.hospitalization_id
    AND t.event_dttm = p.recorded_dttm
SELECT _hospitalization_id: t.hospitalization_id
    , _time: t.event_dttm
    , _nth_day
    , _day_shift_id
    --, _nth_hr: ROW_NUMBER() OVER (PARTITION BY _hospitalization_id ORDER BY _time)
    , device_name, device_category
    , mode_name, mode_category
    , fio2_set
    , peep_set
    , pressure_support_set
    , tracheostomy
    , gcs_total
    , spo2
    , _spo2: LAST_VALUE(v.spo2 IGNORE NULLS) OVER (PARTITION BY _hospitalization_id ORDER BY _time)
    , _resp_stable: CASE
        WHEN fio2_set <= 0.5
            AND peep_set <= 8
            AND _spo2 >= 88
        THEN 1 ELSE 0 END
    , m.* 
    , _stable: CASE WHEN _resp_stable AND _hemo_stable THEN 1 ELSE 0 END
    , sbt_eligible: CASE WHEN device_category = 'imv' AND _stable = 1 AND tracheostomy = 0 
        THEN 1 ELSE 0 END
    , sbt_done
    , extub
)
SELECT *
FROM t1
ORDER BY _hospitalization_id, _time
"""
full_view = duckdb.sql(q).df()

In [79]:
q = """
SELECT _hospitalization_id
    , _time
    , sbt_eligible
    , sbt_done
    , extub
    , device_category, device_name
    , mode_category, mode_name
    , fio2_set
    , peep_set
    , pressure_support_set
    , _spo2
FROM full_view
--WHERE hospitalization_id in ('20001361', '20004088', '20005024')
"""
check_view = duckdb.sql(q).df()

In [80]:
q = """
SELECT _hospitalization_id
    , _nth_day
    , sbt_eligible: COALESCE(MAX(sbt_eligible), 0)
    , sbt_done: COALESCE(MAX(sbt_done), 0)
    , extub: COALESCE(MAX(extub), 0)
FROM full_view
--WHERE hospitalization_id in ('20001361', '20004088', '20005024')
GROUP BY _hospitalization_id, _nth_day
ORDER BY _hospitalization_id, _nth_day
"""
agg_view = duckdb.sql(q).df()