# Table of Contents
1. Create icustay_detail.parquet
2. Create gcs_first_day.parquet
   * Create chartevents_part1.parquet
   * Create gcs_base.parquet
3. Create vitals_first_day.parquet
4. Create urine_output_first_day.parquet
5. Create ventilation_classification.parquet
6. Create ventilation_durations.parquet
7. Create ventilation_first_day.parquet
8.  Create oasis.parquet
9.  Create vasopressors_duration.parquet
10. Create vent_df.parquet
11. Create vaso_df.parquet
12. Create discharge.parquet
13. Create demographics.parquet
14. Create oasis_df.parquet

In [1]:
import polars as pl
import duckdb
import sys, gc

DATA_LOCATION = 'Mimic3_Data'


In [None]:
icustay_detail_query = f'''SELECT ie.subject_id, ie.hadm_id, ie.icustay_id

-- patient level factors
, pat.gender, pat.dod

-- hospital level factors
, adm.admittime, adm.dischtime
, DATE_DIFF('DAY', adm.dischtime, adm.admittime) as los_hospital
, DATE_DIFF('YEAR', ie.intime, pat.dob) as admission_age
, adm.ethnicity
, case when ethnicity in
  (
       'WHITE' --  40996
     , 'WHITE - RUSSIAN' --    164
     , 'WHITE - OTHER EUROPEAN' --     81
     , 'WHITE - BRAZILIAN' --     59
     , 'WHITE - EASTERN EUROPEAN' --     25
  ) then 'white'
  when ethnicity in
  (
      'BLACK/AFRICAN AMERICAN' --   5440
    , 'BLACK/CAPE VERDEAN' --    200
    , 'BLACK/HAITIAN' --    101
    , 'BLACK/AFRICAN' --     44
    , 'CARIBBEAN ISLAND' --      9
  ) then 'black'
  when ethnicity in
    (
      'HISPANIC OR LATINO' --   1696
    , 'HISPANIC/LATINO - PUERTO RICAN' --    232
    , 'HISPANIC/LATINO - DOMINICAN' --     78
    , 'HISPANIC/LATINO - GUATEMALAN' --     40
    , 'HISPANIC/LATINO - CUBAN' --     24
    , 'HISPANIC/LATINO - SALVADORAN' --     19
    , 'HISPANIC/LATINO - CENTRAL AMERICAN (OTHER)' --     13
    , 'HISPANIC/LATINO - MEXICAN' --     13
    , 'HISPANIC/LATINO - COLOMBIAN' --      9
    , 'HISPANIC/LATINO - HONDURAN' --      4
  ) then 'hispanic'
  when ethnicity in
  (
      'ASIAN' --   1509
    , 'ASIAN - CHINESE' --    277
    , 'ASIAN - ASIAN INDIAN' --     85
    , 'ASIAN - VIETNAMESE' --     53
    , 'ASIAN - FILIPINO' --     25
    , 'ASIAN - CAMBODIAN' --     17
    , 'ASIAN - OTHER' --     17
    , 'ASIAN - KOREAN' --     13
    , 'ASIAN - JAPANESE' --      7
    , 'ASIAN - THAI' --      4
  ) then 'asian'
  when ethnicity in
  (
       'AMERICAN INDIAN/ALASKA NATIVE' --     51
     , 'AMERICAN INDIAN/ALASKA NATIVE FEDERALLY RECOGNIZED TRIBE' --      3
  ) then 'native'
  when ethnicity in
  (
      'UNKNOWN/NOT SPECIFIED' --   4523
    , 'UNABLE TO OBTAIN' --    814
    , 'PATIENT DECLINED TO ANSWER' --    559
  ) then 'unknown'
  else 'other' end as ethnicity_grouped
  -- , 'OTHER' --   1512
  -- , 'MULTI RACE ETHNICITY' --    130
  -- , 'PORTUGUESE' --     61
  -- , 'MIDDLE EASTERN' --     43
  -- , 'NATIVE HAWAIIAN OR OTHER PACIFIC ISLANDER' --     18
  -- , 'SOUTH AMERICAN' --      8
, adm.hospital_expire_flag
, DENSE_RANK() OVER (PARTITION BY adm.subject_id ORDER BY adm.admittime) AS hospstay_seq
, CASE
    WHEN DENSE_RANK() OVER (PARTITION BY adm.subject_id ORDER BY adm.admittime) = 1 THEN True
    ELSE False END AS first_hosp_stay

-- icu level factors
, ie.intime, ie.outtime
, ABS(DATE_DIFF('DAY', ie.outtime, ie.intime)) as los_icu
, DENSE_RANK() OVER (PARTITION BY ie.hadm_id ORDER BY ie.intime) AS icustay_seq

-- first ICU stay *for the current hospitalization*
, CASE
    WHEN DENSE_RANK() OVER (PARTITION BY ie.hadm_id ORDER BY ie.intime) = 1 THEN True
    ELSE False END AS first_icu_stay

FROM read_csv('{DATA_LOCATION}/icustays.csv', types={{'INTIME': 'DATETIME', 'OUTTIME': 'DATETIME'}}, ignore_errors=True) ie
INNER JOIN read_csv('{DATA_LOCATION}/admissions.csv', types={{'ADMITTIME': 'DATETIME', 'DISCHTIME': 'DATETIME'}}, ignore_errors=True) adm
    ON ie.hadm_id = adm.hadm_id
INNER JOIN read_csv('{DATA_LOCATION}/patients.csv', types={{'DOB': 'DATETIME'}}, ignore_errors=True) pat
    ON ie.subject_id = pat.subject_id
WHERE adm.has_chartevents_data = 1
ORDER BY ie.subject_id, adm.admittime, ie.intime'''
duckdb.sql(f'''COPY ({icustay_detail_query}) TO '{DATA_LOCATION}/icustay_detail.parquet' (FORMAT parquet);''')

In [3]:
# we need to make a temporary table
chartevents_part1_query = f'''SELECT ICUSTAY_ID, ITEMID, VALUE, VALUENUM, CHARTTIME FROM read_csv('{DATA_LOCATION}/CHARTEVENTS.csv', types={{'VALUE': 'VARCHAR', 'CHARTTIME': 'DATETIME'}}) WHERE ITEMID IN (184, 454, 723, 223900, 223901, 220739) AND (ERROR IS NULL OR ERROR = 0)'''
duckdb.sql(f'''COPY ({chartevents_part1_query}) TO '{DATA_LOCATION}/chartevents_part1.parquet' (FORMAT parquet);''')

In [4]:
gcs_base_query = f'''SELECT pvt.ICUSTAY_ID
  , pvt.CHARTTIME

  -- Easier names - note we coalesced Metavision and CareVue IDs below
  , max(case when pvt.ITEMID = 454 then pvt.VALUENUM else null end) as GCSMotor
  , max(case when pvt.ITEMID = 723 then pvt.VALUENUM else null end) as GCSVerbal
  , max(case when pvt.ITEMID = 184 then pvt.VALUENUM else null end) as GCSEyes

  -- If verbal was set to 0 in the below select, then this is an intubated patient
  , case
      when max(case when pvt.ITEMID = 723 then pvt.VALUENUM else null end) = 0
    then 1
    else 0
    end as EndoTrachFlag

  , ROW_NUMBER ()
          OVER (PARTITION BY pvt.ICUSTAY_ID ORDER BY pvt.CHARTTIME ASC) as rn

  FROM  (
  select l.ICUSTAY_ID
  -- merge the ITEMIDs so that the pivot applies to both metavision/carevue data
  , case
      when l.ITEMID in (723,223900) then 723
      when l.ITEMID in (454,223901) then 454
      when l.ITEMID in (184,220739) then 184
      else l.ITEMID end
    as ITEMID

  -- convert the data into a number, reserving a value of 0 for ET/Trach
  , case
      -- endotrach/vent is assigned a value of 0, later parsed specially
      when l.ITEMID = 723 and l.VALUE = '1.0 ET/Trach' then 0 -- carevue
      when l.ITEMID = 223900 and l.VALUE = 'No Response-ETT' then 0 -- metavision

      else VALUENUM
      end
    as VALUENUM
  , l.CHARTTIME
  FROM '{DATA_LOCATION}/chartevents_part1.parquet' l

  -- get intime for CHARTTIME subselection
  inner join read_csv('{DATA_LOCATION}/icustays.csv', types={{'INTIME': 'DATETIME'}}) b
    on l.ICUSTAY_ID = b.ICUSTAY_ID

  -- Only get data for the first 24 hours
  and l.CHARTTIME between b.INTIME and DATE_ADD(b.INTIME, INTERVAL '1' DAY)
  -- DATETIME_ADD(b.INTIME, INTERVAL '1' DAY)
  ) pvt
  group by pvt.ICUSTAY_ID, pvt.CHARTTIME'''
duckdb.sql(f'''COPY ({gcs_base_query}) TO '{DATA_LOCATION}/gcs_base.parquet' (FORMAT parquet);''')

In [5]:
gcs_final_query = f'''
with gcs as (
  select b.*
  , b2.GCSVerbal as GCSVerbalPrev
  , b2.GCSMotor as GCSMotorPrev
  , b2.GCSEyes as GCSEyesPrev
  , case
      -- replace GCS during sedation with 15
      when b.GCSVerbal = 0
        then 15
      when b.GCSVerbal is null and b2.GCSVerbal = 0
        then 15
      when b2.GCSVerbal = 0
        then
            coalesce(b.GCSMotor,6)
          + coalesce(b.GCSVerbal,5)
          + coalesce(b.GCSEyes,4)
      else
            coalesce(b.GCSMotor,coalesce(b2.GCSMotor,6))
          + coalesce(b.GCSVerbal,coalesce(b2.GCSVerbal,5))
          + coalesce(b.GCSEyes,coalesce(b2.GCSEyes,4))
      end as GCS

  from '{DATA_LOCATION}/gcs_base.parquet' b
  -- join to itself within 6 hours to get previous value
  left join '{DATA_LOCATION}/gcs_base.parquet' b2
    on b.ICUSTAY_ID = b2.ICUSTAY_ID and b.rn = b2.rn+1 and b2.charttime > DATE_ADD(b.charttime, INTERVAL '-6' HOUR)
),
gcs_final as (
  select gcs.*
  -- This sorts the data by GCS, so rn=1 is the the lowest GCS values to keep
  , ROW_NUMBER ()
          OVER (PARTITION BY gcs.ICUSTAY_ID
                ORDER BY gcs.GCS
               ) as IsMinGCS
  from gcs
)
select ie.subject_id, ie.hadm_id, ie.icustay_id
-- The minimum GCS is determined by the above row partition, we only join if IsMinGCS=1
, GCS as mingcs
, coalesce(GCSMotor,GCSMotorPrev) as gcsmotor
, coalesce(GCSVerbal,GCSVerbalPrev) as gcsverbal
, coalesce(GCSEyes,GCSEyesPrev) as gcseyes
, EndoTrachFlag as endotrachflag

-- subselect down to the cohort of eligible patients
FROM '{DATA_LOCATION}/icustays.csv' ie
left join gcs_final gs
  on ie.icustay_id = gs.icustay_id and gs.IsMinGCS = 1
ORDER BY ie.icustay_id'''
duckdb.sql(f'''COPY ({gcs_final_query}) TO '{DATA_LOCATION}/gcs_first_day.parquet' (FORMAT parquet);''')

In [6]:
vitals_query = f'''
SELECT pvt.subject_id, pvt.hadm_id, pvt.icustay_id
, min(case when VitalID = 1 then valuenum ELSE NULL END) AS heartrate_min
, max(case when VitalID = 1 then valuenum ELSE NULL END) AS heartrate_max
, avg(case when VitalID = 1 then valuenum ELSE NULL END) AS heartrate_mean
, min(case when VitalID = 2 then valuenum ELSE NULL END) AS sysbp_min
, max(case when VitalID = 2 then valuenum ELSE NULL END) AS sysbp_max
, avg(case when VitalID = 2 then valuenum ELSE NULL END) AS sysbp_mean
, min(case when VitalID = 3 then valuenum ELSE NULL END) AS diasbp_min
, max(case when VitalID = 3 then valuenum ELSE NULL END) AS diasbp_max
, avg(case when VitalID = 3 then valuenum ELSE NULL END) AS diasbp_mean
, min(case when VitalID = 4 then valuenum ELSE NULL END) AS meanbp_min
, max(case when VitalID = 4 then valuenum ELSE NULL END) AS meanbp_max
, avg(case when VitalID = 4 then valuenum ELSE NULL END) AS meanbp_mean
, min(case when VitalID = 5 then valuenum ELSE NULL END) AS resprate_min
, max(case when VitalID = 5 then valuenum ELSE NULL END) AS resprate_max
, avg(case when VitalID = 5 then valuenum ELSE NULL END) AS resprate_mean
, min(case when VitalID = 6 then valuenum ELSE NULL END) AS tempc_min
, max(case when VitalID = 6 then valuenum ELSE NULL END) AS tempc_max
, avg(case when VitalID = 6 then valuenum ELSE NULL END) AS tempc_mean
, min(case when VitalID = 7 then valuenum ELSE NULL END) AS spo2_min
, max(case when VitalID = 7 then valuenum ELSE NULL END) AS spo2_max
, avg(case when VitalID = 7 then valuenum ELSE NULL END) AS spo2_mean
, min(case when VitalID = 8 then valuenum ELSE NULL END) AS glucose_min
, max(case when VitalID = 8 then valuenum ELSE NULL END) AS glucose_max
, avg(case when VitalID = 8 then valuenum ELSE NULL END) AS glucose_mean

FROM  (
  select ie.subject_id, ie.hadm_id, ie.icustay_id
  , case
    when itemid in (211,220045) and valuenum > 0 and valuenum < 300 then 1 -- HeartRate
    when itemid in (51,442,455,6701,220179,220050) and valuenum > 0 and valuenum < 400 then 2 -- SysBP
    when itemid in (8368,8440,8441,8555,220180,220051) and valuenum > 0 and valuenum < 300 then 3 -- DiasBP
    when itemid in (456,52,6702,443,220052,220181,225312) and valuenum > 0 and valuenum < 300 then 4 -- MeanBP
    when itemid in (615,618,220210,224690) and valuenum > 0 and valuenum < 70 then 5 -- RespRate
    when itemid in (223761,678) and valuenum > 70 and valuenum < 120  then 6 -- TempF, converted to degC in valuenum call
    when itemid in (223762,676) and valuenum > 10 and valuenum < 50  then 6 -- TempC
    when itemid in (646,220277) and valuenum > 0 and valuenum <= 100 then 7 -- SpO2
    when itemid in (807,811,1529,3745,3744,225664,220621,226537) and valuenum > 0 then 8 -- Glucose

    else null end as vitalid
      -- convert F to C
  , case when itemid in (223761,678) then (valuenum-32)/1.8 else valuenum end as valuenum

  from read_csv('{DATA_LOCATION}/icustays.csv', types={{'INTIME': 'DATETIME'}}) ie
  left join read_csv('{DATA_LOCATION}/chartevents.csv', types={{'CHARTTIME': 'DATETIME'}}) ce
  on ie.icustay_id = ce.icustay_id
  and ce.charttime between ie.intime and DATE_ADD(ie.intime, INTERVAL '1' DAY)
  and ABS(DATE_DIFF('SECOND', ce.charttime, ie.intime)) >= 60
  -- and DATE_DIFF('HOUR', ce.charttime, ie.intime) <= 24
  -- exclude rows marked as error
  and (ce.error IS NULL or ce.error = 0)
  where ce.itemid in
  (
  -- HEART RATE
  211, --"Heart Rate"
  220045, --"Heart Rate"

  -- Systolic/diastolic

  51, --	Arterial BP [Systolic]
  442, --	Manual BP [Systolic]
  455, --	NBP [Systolic]
  6701, --	Arterial BP #2 [Systolic]
  220179, --	Non Invasive Blood Pressure systolic
  220050, --	Arterial Blood Pressure systolic

  8368, --	Arterial BP [Diastolic]
  8440, --	Manual BP [Diastolic]
  8441, --	NBP [Diastolic]
  8555, --	Arterial BP #2 [Diastolic]
  220180, --	Non Invasive Blood Pressure diastolic
  220051, --	Arterial Blood Pressure diastolic


  -- MEAN ARTERIAL PRESSURE
  456, --"NBP Mean"
  52, --"Arterial BP Mean"
  6702, --	Arterial BP Mean #2
  443, --	Manual BP Mean(calc)
  220052, --"Arterial Blood Pressure mean"
  220181, --"Non Invasive Blood Pressure mean"
  225312, --"ART BP mean"

  -- RESPIRATORY RATE
  618,--	Respiratory Rate
  615,--	Resp Rate (Total)
  220210,--	Respiratory Rate
  224690, --	Respiratory Rate (Total)


  -- SPO2, peripheral
  646, 220277,

  -- GLUCOSE, both lab and fingerstick
  807,--	Fingerstick Glucose
  811,--	Glucose (70-105)
  1529,--	Glucose
  3745,--	BloodGlucose
  3744,--	Blood Glucose
  225664,--	Glucose finger stick
  220621,--	Glucose (serum)
  226537,--	Glucose (whole blood)

  -- TEMPERATURE
  223762, -- "Temperature Celsius"
  676,	-- "Temperature C"
  223761, -- "Temperature Fahrenheit"
  678 --	"Temperature F"

  )
) pvt
group by pvt.subject_id, pvt.hadm_id, pvt.icustay_id
order by pvt.subject_id, pvt.hadm_id, pvt.icustay_id'''

duckdb.sql(f'''COPY ({vitals_query}) TO '{DATA_LOCATION}/vitals_first_day.parquet' (FORMAT parquet);''')

In [7]:
urine_output_query = f'''select
  -- patient identifiers
  ie.subject_id, ie.hadm_id, ie.icustay_id

  -- volumes associated with urine output ITEMIDs
  , sum(
      -- we consider input of GU irrigant as a negative volume
      case
        when oe.itemid = 227488 and oe.value > 0 then -1*oe.value
        else oe.value
    end) as urineoutput
FROM read_csv('{DATA_LOCATION}/icustays.csv', types={{'INTIME': 'DATETIME'}}) ie
-- Join to the outputevents table to get urine output
left join read_csv('{DATA_LOCATION}/outputevents.csv', types={{'CHARTTIME': 'DATETIME'}}) oe
-- join on all patient identifiers
on ie.subject_id = oe.subject_id and ie.hadm_id = oe.hadm_id and ie.icustay_id = oe.icustay_id
-- and ensure the data occurs during the first day
and oe.charttime between ie.intime and (DATE_ADD(ie.intime, INTERVAL '1' DAY)) -- first ICU day
where itemid in
(
-- these are the most frequently occurring urine output observations in CareVue
40055, -- "Urine Out Foley"
43175, -- "Urine ."
40069, -- "Urine Out Void"
40094, -- "Urine Out Condom Cath"
40715, -- "Urine Out Suprapubic"
40473, -- "Urine Out IleoConduit"
40085, -- "Urine Out Incontinent"
40057, -- "Urine Out Rt Nephrostomy"
40056, -- "Urine Out Lt Nephrostomy"
40405, -- "Urine Out Other"
40428, -- "Urine Out Straight Cath"
40086,--	Urine Out Incontinent
40096, -- "Urine Out Ureteral Stent #1"
40651, -- "Urine Out Ureteral Stent #2"

-- these are the most frequently occurring urine output observations in MetaVision
226559, -- "Foley"
226560, -- "Void"
226561, -- "Condom Cath"
226584, -- "Ileoconduit"
226563, -- "Suprapubic"
226564, -- "R Nephrostomy"
226565, -- "L Nephrostomy"
226567, --	Straight Cath
226557, -- R Ureteral Stent
226558, -- L Ureteral Stent
227488, -- GU Irrigant Volume In
227489  -- GU Irrigant/Urine Volume Out
)
group by ie.subject_id, ie.hadm_id, ie.icustay_id
order by ie.subject_id, ie.hadm_id, ie.icustay_id'''
duckdb.sql(f'''COPY ({urine_output_query}) TO '{DATA_LOCATION}/urine_output_first_day.parquet' (FORMAT parquet);''')

In [8]:
ventilation_classification_query = f'''select
  icustay_id, charttime
  -- case statement determining whether it is an instance of mech vent
  , max(
    case
      when itemid is null or value is null then 0 -- can't have null values
      when itemid = 720 and value != 'Other/Remarks' THEN 1  -- VentTypeRecorded
      when itemid = 223848 and value != 'Other' THEN 1
      when itemid = 223849 then 1 -- ventilator mode
      when itemid = 467 and value = 'Ventilator' THEN 1 -- O2 delivery device == ventilator
      when itemid in
        (
        445, 448, 449, 450, 1340, 1486, 1600, 224687 -- minute volume
        , 639, 654, 681, 682, 683, 684,224685,224684,224686 -- tidal volume
        , 218,436,535,444,459,224697,224695,224696,224746,224747 -- High/Low/Peak/Mean/Neg insp force ("RespPressure")
        , 221,1,1211,1655,2000,226873,224738,224419,224750,227187 -- Insp pressure
        , 543 -- PlateauPressure
        , 5865,5866,224707,224709,224705,224706 -- APRV pressure
        , 60,437,505,506,686,220339,224700 -- PEEP
        , 3459 -- high pressure relief
        , 501,502,503,224702 -- PCV
        , 223,667,668,669,670,671,672 -- TCPCV
        , 224701 -- PSVlevel
        )
        THEN 1
      else 0
    end
    ) as MechVent
    , max(
      case
        -- initiation of oxygen therapy indicates the ventilation has ended
        when itemid = 226732 and value in
        (
          'Nasal cannula', -- 153714 observations
          'Face tent', -- 24601 observations
          'Aerosol-cool', -- 24560 observations
          'Trach mask ', -- 16435 observations
          'High flow neb', -- 10785 observations
          'Non-rebreather', -- 5182 observations
          'Venti mask ', -- 1947 observations
          'Medium conc mask ', -- 1888 observations
          'T-piece', -- 1135 observations
          'High flow nasal cannula', -- 925 observations
          'Ultrasonic neb', -- 9 observations
          'Vapomist' -- 3 observations
        ) then 1
        when itemid = 467 and value in
        (
          'Cannula', -- 278252 observations
          'Nasal Cannula', -- 248299 observations
          -- 'None', -- 95498 observations
          'Face Tent', -- 35766 observations
          'Aerosol-Cool', -- 33919 observations
          'Trach Mask', -- 32655 observations
          'Hi Flow Neb', -- 14070 observations
          'Non-Rebreather', -- 10856 observations
          'Venti Mask', -- 4279 observations
          'Medium Conc Mask', -- 2114 observations
          'Vapotherm', -- 1655 observations
          'T-Piece', -- 779 observations
          'Hood', -- 670 observations
          'Hut', -- 150 observations
          'TranstrachealCat', -- 78 observations
          'Heated Neb', -- 37 observations
          'Ultrasonic Neb' -- 2 observations
        ) then 1
      else 0
      end
    ) as OxygenTherapy
    , max(
      case when itemid is null or value is null then 0
        -- extubated indicates ventilation event has ended
        when itemid = 640 and value = 'Extubated' then 1
        when itemid = 640 and value = 'Self Extubation' then 1
      else 0
      end
      )
      as Extubated
    , max(
      case when itemid is null or value is null then 0
        when itemid = 640 and value = 'Self Extubation' then 1
      else 0
      end
      )
      as SelfExtubated
from read_csv('{DATA_LOCATION}/chartevents.csv', types={{'CHARTTIME': 'DATETIME', 'VALUE': 'VARCHAR'}}) ce
where ce.value is not null
-- exclude rows marked as error
and (ce.error != 1 or ce.error IS NULL)
and itemid in
(
    -- the below are settings used to indicate ventilation
      720, 223849 -- vent mode
    , 223848 -- vent type
    , 445, 448, 449, 450, 1340, 1486, 1600, 224687 -- minute volume
    , 639, 654, 681, 682, 683, 684,224685,224684,224686 -- tidal volume
    , 218,436,535,444,224697,224695,224696,224746,224747 -- High/Low/Peak/Mean ("RespPressure")
    , 221,1,1211,1655,2000,226873,224738,224419,224750,227187 -- Insp pressure
    , 543 -- PlateauPressure
    , 5865,5866,224707,224709,224705,224706 -- APRV pressure
    , 60,437,505,506,686,220339,224700 -- PEEP
    , 3459 -- high pressure relief
    , 501,502,503,224702 -- PCV
    , 223,667,668,669,670,671,672 -- TCPCV
    , 224701 -- PSVlevel

    -- the below are settings used to indicate extubation
    , 640 -- extubated

    -- the below indicate oxygen/NIV, i.e. the end of a mechanical vent event
    , 468 -- O2 Delivery Device#2
    , 469 -- O2 Delivery Mode
    , 470 -- O2 Flow (lpm)
    , 471 -- O2 Flow (lpm) #2
    , 227287 -- O2 Flow (additional cannula)
    , 226732 -- O2 Delivery Device(s)
    , 223834 -- O2 Flow

    -- used in both oxygen + vent calculation
    , 467 -- O2 Delivery Device
)
group by icustay_id, charttime
UNION DISTINCT
-- add in the extubation flags from procedureevents_mv
-- note that we only need the start time for the extubation
-- (extubation is always charted as ending 1 minute after it started)
select
  icustay_id, starttime as charttime
  , 0 as MechVent
  , 0 as OxygenTherapy
  , 1 as Extubated
  , case when itemid = 225468 then 1 else 0 end as SelfExtubated
from read_csv('{DATA_LOCATION}/procedureevents_mv.csv', types={{'STARTTIME': 'DATETIME'}}) pe
where itemid in
(
  227194 -- "Extubation"
, 225468 -- "Unplanned Extubation (patient-initiated)"
, 225477 -- "Unplanned Extubation (non-patient initiated)"
)'''
duckdb.sql(f'''COPY ({ventilation_classification_query}) TO '{DATA_LOCATION}/ventilation_classification.parquet' (FORMAT parquet);''')

In [9]:
ventilation_durations_query = f'''with vd0 as
(
  select
    icustay_id
    -- this carries over the previous charttime which had a mechanical ventilation event
    , case
        when MechVent=1 then
          LAG(CHARTTIME, 1) OVER (partition by icustay_id, MechVent order by charttime)
        else null
      end as charttime_lag
    , charttime
    , MechVent
    , OxygenTherapy
    , Extubated
    , SelfExtubated
  from read_parquet('{DATA_LOCATION}/ventilation_classification.parquet') ce
)
, vd1 as
(
  select
      icustay_id
      , charttime_lag
      , charttime
      , MechVent
      , OxygenTherapy
      , Extubated
      , SelfExtubated

      -- if this is a mechanical ventilation event, we calculate the time since the last event
      , case
          -- if the current observation indicates mechanical ventilation is present
          -- calculate the time since the last vent event
          when MechVent=1 then
            ABS(DATE_DIFF('MINUTE', CHARTTIME, charttime_lag))/60
          else null
        end as ventduration

      , LAG(Extubated,1)
      OVER
      (
      partition by icustay_id, case when MechVent=1 or Extubated=1 then 1 else 0 end
      order by charttime
      ) as ExtubatedLag

      -- now we determine if the current mech vent event is a "new", i.e. they've just been intubated
      , case
        -- if there is an extubation flag, we mark any subsequent ventilation as a new ventilation event
          --when Extubated = 1 then 0 -- extubation is *not* a new ventilation event, the *subsequent* row is
          when
            LAG(Extubated,1)
            OVER
            (
            partition by icustay_id, case when MechVent=1 or Extubated=1 then 1 else 0 end
            order by charttime
            )
            = 1 then 1
          -- if patient has initiated oxygen therapy, and is not currently vented, start a newvent
          when MechVent = 0 and OxygenTherapy = 1 then 1
            -- if there is less than 8 hours between vent settings, we do not treat this as a new ventilation event
          when CHARTTIME > DATE_ADD(charttime_lag, INTERVAL '8' HOUR)
            then 1
        else 0
        end as newvent
  -- use the staging table with only vent settings from chart events
  FROM vd0 ventsettings
)
, vd2 as
(
  select vd1.*
  -- create a cumulative sum of the instances of new ventilation
  -- this results in a monotonic integer assigned to each instance of ventilation
  , case when MechVent=1 or Extubated = 1 then
      SUM( newvent )
      OVER ( partition by icustay_id order by charttime )
    else null end
    as ventnum
  --- now we convert CHARTTIME of ventilator settings into durations
  from vd1
)
-- create the durations for each mechanical ventilation instance
select icustay_id
  -- regenerate ventnum so it's sequential
  , ROW_NUMBER() over (partition by icustay_id order by ventnum) as ventnum
  , min(charttime) as starttime
  , max(charttime) as endtime
  , ABS(DATE_DIFF('MINUTE', max(charttime), min(charttime)))/60 AS duration_hours
from vd2
group by icustay_id, vd2.ventnum
having min(charttime) != max(charttime)
-- patient had to be mechanically ventilated at least once
-- i.e. max(mechvent) should be 1
-- this excludes a frequent situation of NIV/oxygen before intub
-- in these cases, ventnum=0 and max(mechvent)=0, so they are ignored
and max(mechvent) = 1
order by icustay_id, ventnum'''
duckdb.sql(f'''COPY ({ventilation_durations_query}) TO '{DATA_LOCATION}/ventilation_durations.parquet' (FORMAT parquet);''')

In [10]:
ventilation_first_day_query = f'''-- Determines if a patient is ventilated on the first day of their ICU stay.
-- Creates a table with the result.
-- Requires the `ventilation_durations` table, generated by ../ventilation-durations.sql

select
  ie.subject_id, ie.hadm_id, ie.icustay_id
  -- if vd.icustay_id is not null, then they have a valid ventilation event
  -- in this case, we say they are ventilated
  -- otherwise, they are not
  , max(case
      when vd.icustay_id is not null then 1
    else 0 end) as vent
FROM read_csv('{DATA_LOCATION}/icustays.csv', types={{'INTIME': 'DATETIME'}}) ie
left join read_parquet('{DATA_LOCATION}/ventilation_durations.parquet') vd
  on ie.icustay_id = vd.icustay_id
  and
  (
    -- ventilation duration overlaps with ICU admission -> vented on admission
    (vd.starttime <= ie.intime and vd.endtime >= ie.intime)
    -- ventilation started during the first day
    OR (vd.starttime >= ie.intime and vd.starttime <= DATE_ADD(ie.intime, INTERVAL '1' DAY))
  )
group by ie.subject_id, ie.hadm_id, ie.icustay_id
order by ie.subject_id, ie.hadm_id, ie.icustay_id'''
duckdb.sql(f'''COPY ({ventilation_first_day_query}) TO '{DATA_LOCATION}/ventilation_first_day.parquet' (FORMAT parquet);''')

In [11]:
oasis_query = f'''with surgflag as
(
  select ie.icustay_id
    , max(case
        when lower(curr_service) like '%surg%' then 1
        when curr_service = 'ORTHO' then 1
    else 0 end) as surgical
  FROM read_csv('{DATA_LOCATION}/icustays.csv', types={{'INTIME': 'DATETIME'}}) ie
  left join read_csv('{DATA_LOCATION}/services.csv', types={{'TRANSFERTIME': 'DATETIME'}}) se
    on ie.hadm_id = se.hadm_id
    and se.transfertime < DATE_ADD(ie.intime, INTERVAL '1' DAY)
  group by ie.icustay_id
)
, cohort as
(
select ie.subject_id, ie.hadm_id, ie.icustay_id
      , ie.intime
      , ie.outtime
      , adm.deathtime
      , ABS(DATE_DIFF('MINUTE', ie.intime, adm.admittime)) as preiculos
      , ABS(DATE_DIFF('YEAR', ie.intime, pat.dob)) as age
      , gcs.mingcs
      , vital.heartrate_max
      , vital.heartrate_min
      , vital.meanbp_max
      , vital.meanbp_min
      , vital.resprate_max
      , vital.resprate_min
      , vital.tempc_max
      , vital.tempc_min
      , vent.vent as mechvent
      , uo.urineoutput

      , case
          when adm.ADMISSION_TYPE = 'ELECTIVE' and sf.surgical = 1
            then 1
          when adm.ADMISSION_TYPE is null or sf.surgical is null
            then null
          else 0
        end as electivesurgery

      -- age group
      , case
        when ABS(DATE_DIFF('YEAR', ie.intime, pat.dob)) <= 1 then 'neonate'
        when ABS(DATE_DIFF('YEAR', ie.intime, pat.dob)) <= 15 then 'middle'
        else 'adult' end as icustay_age_group

      -- mortality flags
      , case
          when adm.deathtime between ie.intime and ie.outtime
            then 1
          when adm.deathtime <= ie.intime -- sometimes there are typographical errors in the death date
            then 1
          when adm.dischtime <= ie.outtime and adm.discharge_location = 'DEAD/EXPIRED'
            then 1
          else 0 end
        as icustay_expire_flag
      , adm.hospital_expire_flag
FROM read_csv('{DATA_LOCATION}/icustays.csv', types={{'INTIME': 'DATETIME', 'OUTTIME': 'DATETIME'}}) ie
inner join read_csv('{DATA_LOCATION}/admissions.csv', types={{'DISCHTIME': 'DATETIME', 'DEATHTIME': 'DATETIME', 'ADMITTIME': 'DATETIME'}}, ignore_errors=True) adm
  on ie.hadm_id = adm.hadm_id
inner join read_csv('{DATA_LOCATION}/patients.csv', types={{'DOB': 'DATETIME'}}) pat
  on ie.subject_id = pat.subject_id
left join surgflag sf
  on ie.icustay_id = sf.icustay_id
-- join to custom tables to get more data....
left join read_parquet('{DATA_LOCATION}/gcs_first_day.parquet') gcs
  on ie.icustay_id = gcs.icustay_id
left join read_parquet('{DATA_LOCATION}/vitals_first_day.parquet') vital
  on ie.icustay_id = vital.icustay_id
left join read_parquet('{DATA_LOCATION}/urine_output_first_day.parquet') uo
  on ie.icustay_id = uo.icustay_id
left join read_parquet('{DATA_LOCATION}/ventilation_first_day.parquet') vent
  on ie.icustay_id = vent.icustay_id
)
, scorecomp as
(
select co.subject_id, co.hadm_id, co.icustay_id
, co.icustay_age_group
, co.icustay_expire_flag
, co.hospital_expire_flag

-- Below code calculates the component scores needed for oasis
, case when preiculos is null then null
     when preiculos < 10.2 then 5
     when preiculos < 297 then 3
     when preiculos < 1440 then 0
     when preiculos < 18708 then 2
     else 1 end as preiculos_score
,  case when age is null then null
      when age < 24 then 0
      when age <= 53 then 3
      when age <= 77 then 6
      when age <= 89 then 9
      when age >= 90 then 7
      else 0 end as age_score
,  case when mingcs is null then null
      when mingcs <= 7 then 10
      when mingcs < 14 then 4
      when mingcs = 14 then 3
      else 0 end as gcs_score
,  case when heartrate_max is null then null
      when heartrate_max > 125 then 6
      when heartrate_min < 33 then 4
      when heartrate_max >= 107 and heartrate_max <= 125 then 3
      when heartrate_max >= 89 and heartrate_max <= 106 then 1
      else 0 end as heartrate_score
,  case when meanbp_min is null then null
      when meanbp_min < 20.65 then 4
      when meanbp_min < 51 then 3
      when meanbp_max > 143.44 then 3
      when meanbp_min >= 51 and meanbp_min < 61.33 then 2
      else 0 end as meanbp_score
,  case when resprate_min is null then null
      when resprate_min <   6 then 10
      when resprate_max >  44 then  9
      when resprate_max >  30 then  6
      when resprate_max >  22 then  1
      when resprate_min <  13 then 1 else 0
      end as resprate_score
,  case when tempc_max is null then null
      when tempc_max > 39.88 then 6
      when tempc_min >= 33.22 and tempc_min <= 35.93 then 4
      when tempc_max >= 33.22 and tempc_max <= 35.93 then 4
      when tempc_min < 33.22 then 3
      when tempc_min > 35.93 and tempc_min <= 36.39 then 2
      when tempc_max >= 36.89 and tempc_max <= 39.88 then 2
      else 0 end as temp_score
,  case when UrineOutput is null then null
      when UrineOutput < 671.09 then 10
      when UrineOutput > 6896.80 then 8
      when UrineOutput >= 671.09
       and UrineOutput <= 1426.99 then 5
      when UrineOutput >= 1427.00
       and UrineOutput <= 2544.14 then 1
      else 0 end as urineoutput_score
,  case when mechvent is null then null
      when mechvent = 1 then 9
      else 0 end as mechvent_score
,  case when electivesurgery is null then null
      when electivesurgery = 1 then 0
      else 6 end as electivesurgery_score


-- The below code gives the component associated with each score
-- This is not needed to calculate oasis, but provided for user convenience.
-- If both the min/max are in the normal range (score of 0), then the average value is stored.
, preiculos
, age
, mingcs as gcs
,  case when heartrate_max is null then null
      when heartrate_max > 125 then heartrate_max
      when heartrate_min < 33 then heartrate_min
      when heartrate_max >= 107 and heartrate_max <= 125 then heartrate_max
      when heartrate_max >= 89 and heartrate_max <= 106 then heartrate_max
      else (heartrate_min+heartrate_max)/2 end as heartrate
,  case when meanbp_min is null then null
      when meanbp_min < 20.65 then meanbp_min
      when meanbp_min < 51 then meanbp_min
      when meanbp_max > 143.44 then meanbp_max
      when meanbp_min >= 51 and meanbp_min < 61.33 then meanbp_min
      else (meanbp_min+meanbp_max)/2 end as meanbp
,  case when resprate_min is null then null
      when resprate_min <   6 then resprate_min
      when resprate_max >  44 then resprate_max
      when resprate_max >  30 then resprate_max
      when resprate_max >  22 then resprate_max
      when resprate_min <  13 then resprate_min
      else (resprate_min+resprate_max)/2 end as resprate
,  case when tempc_max is null then null
      when tempc_max > 39.88 then tempc_max
      when tempc_min >= 33.22 and tempc_min <= 35.93 then tempc_min
      when tempc_max >= 33.22 and tempc_max <= 35.93 then tempc_max
      when tempc_min < 33.22 then tempc_min
      when tempc_min > 35.93 and tempc_min <= 36.39 then tempc_min
      when tempc_max >= 36.89 and tempc_max <= 39.88 then tempc_max
      else (tempc_min+tempc_max)/2 end as temp
,  UrineOutput
,  mechvent
,  electivesurgery
from cohort co
)
, score as
(
select s.*
    , coalesce(age_score,0)
    + coalesce(preiculos_score,0)
    + coalesce(gcs_score,0)
    + coalesce(heartrate_score,0)
    + coalesce(meanbp_score,0)
    + coalesce(resprate_score,0)
    + coalesce(temp_score,0)
    + coalesce(urineoutput_score,0)
    + coalesce(mechvent_score,0)
    + coalesce(electivesurgery_score,0)
    as oasis
from scorecomp s
)
select
  subject_id, hadm_id, icustay_id
  , icustay_age_group
  , hospital_expire_flag
  , icustay_expire_flag
  , oasis
  -- Calculate the probability of in-hospital mortality
  , 1 / (1 + exp(- (-6.1746 + 0.1275*(oasis) ))) as oasis_PROB
  , age, age_score
  , preiculos, preiculos_score
  , gcs, gcs_score
  , heartrate, heartrate_score
  , meanbp, meanbp_score
  , resprate, resprate_score
  , temp, temp_score
  , urineoutput, urineoutput_score
  , mechvent, mechvent_score
  , electivesurgery, electivesurgery_score
from score
order by icustay_id'''
duckdb.sql(f'''COPY ({oasis_query}) TO '{DATA_LOCATION}/oasis.parquet' (FORMAT parquet);''')

In [12]:
vasopressors_durations_query = f'''with io_cv as
(
  select
    icustay_id, charttime, itemid, stopped
    -- ITEMIDs (42273, 42802) accidentally store rate in amount column
    , case
        when itemid in (42273, 42802)
          then amount
        else rate
      end as rate
    , case
        when itemid in (42273, 42802)
          then rate
        else amount
      end as amount
  FROM read_csv('{DATA_LOCATION}/inputevents_cv.csv', types={{'CHARTTIME': 'DATETIME', 'ITEMID': 'INT', 'AMOUNT': 'FLOAT', 'RATE': 'FLOAT'}})
  where itemid in
  (
    30047,30120,30044,30119,30309,30127
  , 30128,30051,30043,30307,30042,30306,30125
  , 42273, 42802
  )
)
-- select only the ITEMIDs from the inputevents_mv table related to vasopressors
, io_mv as
(
  select
    icustay_id, linkorderid, starttime, endtime
  FROM read_csv('{DATA_LOCATION}/inputevents_mv.csv', types={{'STARTTIME': 'DATETIME', 'ENDTIME': 'DATETIME'}}) io
  -- Subselect the vasopressor ITEMIDs
  where itemid in
  (
  221906,221289,221749,222315,221662,221653,221986
  )
  and statusdescription != 'Rewritten' -- only valid orders
)
, vasocv1 as
(
  select
    icustay_id, charttime, itemid
    -- case statement determining whether the ITEMID is an instance of vasopressor usage
    , 1 as vaso

    -- the 'stopped' column indicates if a vasopressor has been disconnected
    , max(case when (stopped = 'Stopped' OR stopped like 'D/C%') then 1
          else 0 end) as vaso_stopped

    , max(case when rate is not null then 1 else 0 end) as vaso_null
    , max(rate) as vaso_rate
    , max(amount) as vaso_amount

  from io_cv
  group by icustay_id, charttime, itemid
)
, vasocv2 as
(
  select v.*
    , sum(vaso_null) over (partition by icustay_id, itemid order by charttime) as vaso_partition
  from
    vasocv1 v
)
, vasocv3 as
(
  select v.*
    , first_value(vaso_rate) over (partition by icustay_id, itemid, vaso_partition order by charttime) as vaso_prevrate_ifnull
  from
    vasocv2 v
)
, vasocv4 as
(
select
    icustay_id
    , charttime
    , itemid
    -- , (CHARTTIME - (LAG(CHARTTIME, 1) OVER (partition by icustay_id, vaso order by charttime))) AS delta

    , vaso
    , vaso_rate
    , vaso_amount
    , vaso_stopped
    , vaso_prevrate_ifnull

    -- We define start time here
    , case
        when vaso = 0 then null

        -- if this is the first instance of the vasoactive drug
        when vaso_rate > 0 and
          LAG(vaso_prevrate_ifnull,1)
          OVER
          (
          partition by icustay_id, itemid, vaso, vaso_null
          order by charttime
          )
          is null
          then 1

        -- you often get a string of 0s
        -- we decide not to set these as 1, just because it makes vasonum sequential
        when vaso_rate = 0 and
          LAG(vaso_prevrate_ifnull,1)
          OVER
          (
          partition by icustay_id, itemid, vaso
          order by charttime
          )
          = 0
          then 0

        -- sometimes you get a string of NULL, associated with 0 volumes
        -- same reason as before, we decide not to set these as 1
        -- vaso_prevrate_ifnull is equal to the previous value *iff* the current value is null
        when vaso_prevrate_ifnull = 0 and
          LAG(vaso_prevrate_ifnull,1)
          OVER
          (
          partition by icustay_id, itemid, vaso
          order by charttime
          )
          = 0
          then 0

        -- If the last recorded rate was 0, newvaso = 1
        when LAG(vaso_prevrate_ifnull,1)
          OVER
          (
          partition by icustay_id, itemid, vaso
          order by charttime
          ) = 0
          then 1

        -- If the last recorded vaso was D/C'd, newvaso = 1
        when
          LAG(vaso_stopped,1)
          OVER
          (
          partition by icustay_id, itemid, vaso
          order by charttime
          )
          = 1 then 1

        -- ** not sure if the below is needed
        --when (CHARTTIME - (LAG(CHARTTIME, 1) OVER (partition by icustay_id, vaso order by charttime))) > (interval '4 hours') then 1
      else null
      end as vaso_start

FROM
  vasocv3
)
-- propagate start/stop flags forward in time
, vasocv5 as
(
  select v.*
    , SUM(vaso_start) OVER (partition by icustay_id, itemid, vaso order by charttime) as vaso_first
FROM
  vasocv4 v
)
, vasocv6 as
(
  select v.*
    -- We define end time here
    , case
        when vaso = 0
          then null

        -- If the recorded vaso was D/C'd, this is an end time
        when vaso_stopped = 1
          then vaso_first

        -- If the rate is zero, this is the end time
        when vaso_rate = 0
          then vaso_first

        -- the last row in the table is always a potential end time
        -- this captures patients who die/are discharged while on vasopressors
        -- in principle, this could add an extra end time for the vasopressor
        -- however, since we later group on vaso_start, any extra end times are ignored
        when LEAD(CHARTTIME,1)
          OVER
          (
          partition by icustay_id, itemid, vaso
          order by charttime
          ) is null
          then vaso_first

        else null
        end as vaso_stop
    from vasocv5 v
)

-- -- if you want to look at the results of the table before grouping:
-- select
--   icustay_id, charttime, vaso, vaso_rate, vaso_amount
--     , case when vaso_stopped = 1 then 'Y' else '' end as stopped
--     , vaso_start
--     , vaso_first
--     , vaso_stop
-- from vasocv6 order by charttime;


, vasocv as
(
-- below groups together vasopressor administrations into groups
select
  icustay_id
  , itemid
  -- the first non-null rate is considered the starttime
  , min(case when vaso_rate is not null then charttime else null end) as starttime
  -- the *first* time the first/last flags agree is the stop time for this duration
  , min(case when vaso_first = vaso_stop then charttime else null end) as endtime
from vasocv6
where
  vaso_first is not null -- bogus data
and
  vaso_first != 0 -- sometimes *only* a rate of 0 appears, i.e. the drug is never actually delivered
and
  icustay_id is not null -- there are data for "floating" admissions, we don't worry about these
group by icustay_id, itemid, vaso_first
having -- ensure start time is not the same as end time
 min(charttime) != min(case when vaso_first = vaso_stop then charttime else null end)
and
  max(vaso_rate) > 0 -- if the rate was always 0 or null, we consider it not a real drug delivery
)
-- we do not group by ITEMID in below query
-- this is because we want to collapse all vasopressors together
, vasocv_grp as
(
SELECT
  s1.icustay_id,
  s1.starttime,
  MIN(t1.endtime) AS endtime
FROM vasocv s1
INNER JOIN vasocv t1
  ON  s1.icustay_id = t1.icustay_id
  AND s1.starttime <= t1.endtime
  AND NOT EXISTS(SELECT * FROM vasocv t2
                 WHERE t1.icustay_id = t2.icustay_id
                 AND t1.endtime >= t2.starttime
                 AND t1.endtime < t2.endtime)
WHERE NOT EXISTS(SELECT * FROM vasocv s2
                 WHERE s1.icustay_id = s2.icustay_id
                 AND s1.starttime > s2.starttime
                 AND s1.starttime <= s2.endtime)
GROUP BY s1.icustay_id, s1.starttime
ORDER BY s1.icustay_id, s1.starttime
)
-- now we extract the associated data for metavision patients
-- do not need to group by itemid because we group by linkorderid
, vasomv as
(
  select
    icustay_id, linkorderid
    , min(starttime) as starttime, max(endtime) as endtime
  from io_mv
  group by icustay_id, linkorderid
)
, vasomv_grp as
(
SELECT
  s1.icustay_id,
  s1.starttime,
  MIN(t1.endtime) AS endtime
FROM vasomv s1
INNER JOIN vasomv t1
  ON  s1.icustay_id = t1.icustay_id
  AND s1.starttime <= t1.endtime
  AND NOT EXISTS(SELECT * FROM vasomv t2
                 WHERE t1.icustay_id = t2.icustay_id
                 AND t1.endtime >= t2.starttime
                 AND t1.endtime < t2.endtime)
WHERE NOT EXISTS(SELECT * FROM vasomv s2
                 WHERE s1.icustay_id = s2.icustay_id
                 AND s1.starttime > s2.starttime
                 AND s1.starttime <= s2.endtime)
GROUP BY s1.icustay_id, s1.starttime
ORDER BY s1.icustay_id, s1.starttime
)
select
  icustay_id
  -- generate a sequential integer for convenience
  , ROW_NUMBER() over (partition by icustay_id order by starttime) as vasonum
  , starttime, endtime
  , ABS(DATE_DIFF('HOUR', endtime, starttime)) AS duration_hours
  -- add durations
from
  vasocv_grp

UNION ALL

select
  icustay_id
  , ROW_NUMBER() over (partition by icustay_id order by starttime) as vasonum
  , starttime, endtime
  , ABS(DATE_DIFF('HOUR', endtime, starttime)) AS duration_hours
  -- add durations
from
  vasomv_grp

order by icustay_id, vasonum'''
duckdb.sql(f'''COPY ({vasopressors_durations_query}) TO '{DATA_LOCATION}/vasopressors_durations.parquet' (FORMAT parquet);''')

In [13]:
vent_df_query = f'''
select i.HADM_ID, v.STARTTIME, v.ENDTIME
FROM '{DATA_LOCATION}/icustay_detail.parquet' i
INNER JOIN '{DATA_LOCATION}/ventilation_durations.parquet' v ON i.icustay_id = v.icustay_id
where v.starttime between intime and outtime
and v.endtime between intime and outtime
'''
duckdb.sql(f'''COPY ({vent_df_query}) TO '{DATA_LOCATION}/vent_df.parquet' (FORMAT parquet);''')

In [14]:
vaso_df_query = f'''
select i.hadm_id, v.starttime, v.endtime
FROM read_parquet('{DATA_LOCATION}/icustay_detail.parquet') i
INNER JOIN read_parquet('{DATA_LOCATION}/vasopressors_durations.parquet') v ON i.icustay_id = v.icustay_id
where v.starttime between intime and outtime
and v.endtime   between intime and outtime
'''
duckdb.sql(f'''COPY ({vaso_df_query}) TO '{DATA_LOCATION}/vaso_df.parquet' (FORMAT parquet);''')

In [15]:
discharge_query = f'''
SELECT DISTINCT SUBJECT_ID, HADM_ID, ETHNICITY, INSURANCE, DISCHARGE_LOCATION, ADMITTIME, DISCHTIME FROM read_csv('{DATA_LOCATION}/admissions.csv', types={{'DISCHTIME': 'DATETIME', 'ADMITTIME': 'DATETIME'}}, ignore_errors=True)'''
duckdb.sql(f'''COPY ({discharge_query}) TO '{DATA_LOCATION}/discharge.parquet' (FORMAT parquet);''')

In [18]:
demographics_query = f'''
SELECT DISTINCT SUBJECT_ID, HADM_ID, GENDER, ETHNICITY, CASE WHEN ADMISSION_AGE > 90 THEN 90 ELSE ADMISSION_AGE END AS AGE FROM read_parquet('{DATA_LOCATION}/icustay_detail.parquet')'''
duckdb.sql(f'''COPY ({demographics_query}) TO '{DATA_LOCATION}/demographics.parquet' (FORMAT parquet);''')

In [19]:
oasis_df_query = f'''
SELECT HADM_ID, MAX(OASIS) AS OASIS FROM read_parquet('{DATA_LOCATION}/oasis.parquet') GROUP BY HADM_ID'''
duckdb.sql(f'''COPY ({oasis_df_query}) TO '{DATA_LOCATION}/oasis_df.parquet' (FORMAT parquet);''')