In [None]:

import csv
import math
import os
from pathlib import Path

import duckdb
import numpy as np
import pandas as pd

# Root directory of the local MIMIC-IV 3.1 CSV extract. The MIMIC_IV_DIR
# environment variable overrides the built-in default, so the notebook can be
# run on another machine without editing this cell.
DATA = Path(os.environ.get("MIMIC_IV_DIR", r"D:\Data wrangling data\mimic-iv-3.1"))

# Source CSVs, keyed by the name of the DuckDB view each one is exposed under.
paths = {
    "icustays":       DATA / "icu"  / "icustays.csv",
    "chartevents":    DATA / "icu"  / "chartevents.csv",
    "procedures_icd": DATA / "hosp" / "procedures_icd.csv",
    "diagnoses_icd":  DATA / "hosp" / "diagnoses_icd.csv",
    "labevents":      DATA / "hosp" / "labevents.csv",
}

# File-backed DuckDB database; the relative name resolves against the kernel's
# working directory.
con = duckdb.connect("mimic_pipeline.duckdb")

# ICD procedure dictionary, as a forward-slash string for interpolation into SQL.
D_ICD = (DATA / "hosp" / "d_icd_procedures.csv").as_posix()

# Expose the ICD procedure dictionary as a view over its CSV file.
con.execute(f"""
CREATE OR REPLACE VIEW d_icd_procedures AS
SELECT * FROM read_csv_auto('{D_ICD}', header=True, union_by_name=True);
""")

# Engine settings; all three values are machine-specific.
for engine_setting in (
    "PRAGMA threads=8;",                       # use available cores
    "PRAGMA memory_limit='8GB';",              # tune to your machine
    "PRAGMA temp_directory='D:/duckdb_tmp';",  # fast SSD location if possible
):
    con.execute(engine_setting)

# ICU stays, exposed as a view over the raw CSV.
icustays_view_sql = f"""
CREATE OR REPLACE VIEW icustays AS
SELECT * FROM read_csv_auto('{paths["icustays"]}', union_by_name=True);
"""
con.execute(icustays_view_sql)
# Keyword screen for *candidate* vascular procedure codes.
# A code qualifies when its lower-cased long_title contains any keyword below.
# This is a plain substring match, so any title that merely contains e.g.
# "graft", "stent" or "bypass" is included; treat the result as a list to
# review, not as a finished phenotype.
VASC_TITLE_KEYWORDS = (
    # Aortic / aneurysm / endovascular
    "aort", "aneurysm", "endovascular", "stent", "graft",
    # Bypass / revascularization
    "bypass", "revascular",
    # Carotid / endarterectomy
    "carotid", "endarterectomy",
    # Peripheral vascular
    "femoral", "iliac", "popliteal", "tibial", "peripheral vascular",
)

# One LIKE predicate per keyword, OR-ed together.
_title_filter = "\n  OR ".join(
    f"lower(long_title) LIKE '%{keyword}%'" for keyword in VASC_TITLE_KEYWORDS
)

# icd_version is cast to INTEGER and icd_code is trimmed so the stored pairs
# have a fixed type / no surrounding whitespace; DISTINCT removes repeats.
con.execute(f"""
CREATE OR REPLACE TABLE vasc_proc_candidates AS
SELECT DISTINCT
  CAST(icd_version AS INTEGER) AS icd_version,
  TRIM(icd_code) AS icd_code,
  long_title
FROM d_icd_procedures
WHERE
  {_title_filter};
""")

# Make sure the phenotype folder exists, then export the (icd_version, icd_code)
# pairs of the candidate list as a CSV with a header row.
phenotype_dir = DATA / "phenotypes"
phenotype_dir.mkdir(parents=True, exist_ok=True)
OUT_CODES = (phenotype_dir / "vascular_procedure_codes_generated.csv").as_posix()

export_codes_sql = f"""
COPY (
  SELECT icd_version, icd_code
  FROM vasc_proc_candidates
  ORDER BY icd_version, icd_code
) TO '{OUT_CODES}' (HEADER, DELIMITER ',');
"""
con.execute(export_codes_sql)
print("Wrote:", OUT_CODES)

# fetchone() returns a 1-tuple, so the count is printed in tuple form.
candidate_count = con.execute("SELECT COUNT(*) FROM vasc_proc_candidates;").fetchone()
print("n candidates:", candidate_count)

# Register the remaining raw CSVs as views. Each view is named after its key in
# `paths`; the data stays in the CSV files rather than being copied into the
# database.
for view_name in ("chartevents", "procedures_icd", "diagnoses_icd"):
    con.execute(f"""
CREATE OR REPLACE VIEW {view_name} AS
SELECT * FROM read_csv_auto('{paths[view_name]}', union_by_name=True);
""")

# Copy of the candidate list that keeps long_title, for manual review.
# NOTE(review): the target is a relative path, so the file is written to the
# kernel's working directory rather than under DATA — confirm that is intended.
con.execute("""
COPY (
  SELECT icd_version, icd_code, long_title
  FROM vasc_proc_candidates
  ORDER BY icd_version, icd_code
) TO 'vascular_procedure_candidates_with_titles.csv' (HEADER, DELIMITER ',');
""")



Wrote: D:/Data wrangling data/mimic-iv-3.1/phenotypes/vascular_procedure_codes_generated.csv
n candidates: (11171,)


<duckdb.duckdb.DuckDBPyConnection at 0x211d7f74630>

In [None]:

# Lab events view. Unlike the other views, the CSV dialect (header, delimiter,
# quote and escape characters) is spelled out and strict_mode is enabled
# instead of relying on read_csv_auto.
labevents_view_sql = f"""
CREATE OR REPLACE VIEW labevents AS
SELECT * FROM read_csv(
  '{paths["labevents"]}',
  header=true,
  delim=',',
  quote='"',
  escape='"',
  union_by_name=true,
  strict_mode=true
);
"""
con.execute(labevents_view_sql)

# chartevents itemids treated as mean arterial pressure readings.
# NOTE(review): confirm each itemid's label against d_items before relying on it.
MAP_ITEMIDS = (220052, 220181, 225312)  # ABP mean, NIBP mean, etc.
CREAT_ITEMID = 50912                   # creatinine
# Code set exported earlier in the notebook; it can be edited by hand before
# this cell is run.
CODESET = DATA / "phenotypes" / "vascular_procedure_codes_generated.csv"

# Column types are pinned instead of sniffed: if the file ever contained only
# digit-only codes, type inference could read icd_code as a number, dropping
# leading zeros ('0003' -> 3) and silently breaking the join against
# procedures_icd.icd_code.
con.execute(f"""
CREATE OR REPLACE VIEW vasc_proc_codes AS
SELECT * FROM read_csv_auto(
  '{CODESET.as_posix()}',
  header=True,
  union_by_name=True,
  types={{'icd_version': 'INTEGER', 'icd_code': 'VARCHAR'}}
);
""")
# Admissions with at least one procedure whose (icd_version, icd_code) pair is
# in the code set, together with the earliest matching chartdate.
vascular_hadm_sql = """
CREATE OR REPLACE TABLE vascular_hadm AS
SELECT
    p.subject_id,
    p.hadm_id,
    MIN(p.chartdate) AS first_proc_date
FROM procedures_icd p
INNER JOIN vasc_proc_codes c
  ON p.icd_version = c.icd_version
 AND p.icd_code    = c.icd_code
GROUP BY p.subject_id, p.hadm_id;
"""
con.execute(vascular_hadm_sql)

# ICU stays belonging to those admissions, with intime/outtime cast to
# TIMESTAMP. The join is on subject_id + hadm_id only: no condition relates the
# stay's timing to first_proc_date, so every ICU stay of a qualifying admission
# is kept.
vasc_icu_sql = """
CREATE OR REPLACE TABLE vasc_icu AS
SELECT
    i.subject_id,
    i.hadm_id,
    i.stay_id,
    CAST(i.intime AS TIMESTAMP) AS intime,
    CAST(i.outtime AS TIMESTAMP) AS outtime,
    v.first_proc_date
FROM icustays i
INNER JOIN vascular_hadm v
  ON i.subject_id = v.subject_id
 AND i.hadm_id   = v.hadm_id;
"""
con.execute(vasc_icu_sql)
# Render the itemid list explicitly instead of interpolating the Python tuple:
# a one-element tuple prints as "(x,)", which is not a portable SQL IN list.
# For the current three ids the generated SQL is unchanged.
_map_itemid_list = ", ".join(str(int(itemid)) for itemid in MAP_ITEMIDS)
if not _map_itemid_list:
    raise ValueError("MAP_ITEMIDS must contain at least one itemid")

# MAP readings charted in the first 24 hours of each cohort ICU stay
# (intime inclusive, intime + 24 h exclusive). Readings that are NULL, <= 0 or
# >= 300 are discarded.
con.execute(f"""
CREATE OR REPLACE TABLE map_events_24h AS
SELECT
  c.subject_id,
  c.hadm_id,
  c.stay_id,
  CAST(c.charttime AS TIMESTAMP) AS charttime,
  c.valuenum AS map_value
FROM chartevents c
INNER JOIN vasc_icu v
  ON c.stay_id = v.stay_id
WHERE c.itemid IN ({_map_itemid_list})
  AND c.valuenum IS NOT NULL
  AND c.valuenum > 0 AND c.valuenum < 300
  AND CAST(c.charttime AS TIMESTAMP) >= v.intime
  AND CAST(c.charttime AS TIMESTAMP) <  v.intime + INTERVAL 24 HOUR;
""")
# Time-weighted average MAP per stay over the first-24-hour readings.
# Each reading is weighted by the number of seconds until the next reading of
# the same stay. The last reading has no successor and therefore carries no
# weight, and a stay with only one distinct charttime produces no row here.
con.execute("""
CREATE OR REPLACE TABLE map_twa_24h AS
WITH per_time AS (
  -- Rows for different MAP itemids may share a charttime. Collapse them to one
  -- value per (stay_id, charttime) so the result does not depend on the
  -- arbitrary order in which LEAD() would otherwise visit tied rows.
  SELECT
    stay_id,
    charttime,
    AVG(map_value) AS map_value
  FROM map_events_24h
  GROUP BY stay_id, charttime
),
ordered AS (
  SELECT
    stay_id,
    charttime AS t_start,
    LEAD(charttime) OVER (PARTITION BY stay_id ORDER BY charttime) AS t_next,
    map_value
  FROM per_time
),
intervals AS (
  SELECT
    stay_id,
    map_value,
    (epoch(t_next) - epoch(t_start)) AS dt_seconds
  FROM ordered
  WHERE t_next IS NOT NULL
    AND t_next > t_start
)
SELECT
  stay_id,
  SUM(map_value * dt_seconds) / NULLIF(SUM(dt_seconds), 0) AS map_twa_24h
FROM intervals
GROUP BY stay_id;
""")

# Attach the 24 h time-weighted MAP to every cohort stay and derive the binary
# target (TWA MAP > 75).
# The LEFT JOIN keeps stays that have no TWA value. For those stays the target
# is left NULL: a bare "CASE WHEN ... > 75 THEN 1 ELSE 0" would send a missing
# MAP down the ELSE branch and label the stay 0, i.e. "not above 75".
con.execute("""
CREATE OR REPLACE TABLE vasc_icu_with_map AS
SELECT
  v.*,
  m.map_twa_24h,
  CASE
    WHEN m.map_twa_24h IS NULL THEN NULL
    WHEN m.map_twa_24h > 75 THEN 1
    ELSE 0
  END AS map_target_gt75
FROM vasc_icu v
LEFT JOIN map_twa_24h m
  ON v.stay_id = m.stay_id;
""")
# Creatinine results for each cohort stay, from 48 hours before ICU admission
# to 7 days after it (both bounds inclusive); rows with a NULL value are
# dropped.
# NOTE(review): labs are matched on subject_id AND hadm_id, so lab rows whose
# hadm_id is NULL can never match — confirm that is intended.
creatinine_window_sql = f"""
CREATE OR REPLACE TABLE creatinine_window AS
SELECT
  v.subject_id,
  v.hadm_id,
  v.stay_id,
  v.intime,
  v.outtime,
  CAST(l.charttime AS TIMESTAMP) AS charttime,
  l.valuenum AS creatinine
FROM vasc_icu_with_map v
INNER JOIN labevents l
  ON v.subject_id = l.subject_id
 AND v.hadm_id   = l.hadm_id
WHERE l.itemid = {CREAT_ITEMID}
  AND l.valuenum IS NOT NULL
  AND CAST(l.charttime AS TIMESTAMP) >= v.intime - INTERVAL 48 HOUR
  AND CAST(l.charttime AS TIMESTAMP) <= v.intime + INTERVAL 7 DAY;
"""
con.execute(creatinine_window_sql)
# Per-admission comorbidity flags derived from ICD code prefixes: 1 when any
# diagnosis row of the admission matches, otherwise 0.
# NOTE(review): the ICD-9 prefix 250 and the ICD-10 prefix E11 may not cover
# the same set of diabetes diagnoses — confirm the intended definition.
dx_flags_sql = """
CREATE OR REPLACE TABLE dx_flags AS
SELECT
  subject_id,
  hadm_id,
  MAX(CASE
        WHEN (icd_version=10 AND icd_code LIKE 'I10%') OR (icd_version=9 AND icd_code LIKE '401%')
        THEN 1 ELSE 0
      END) AS has_htn,
  MAX(CASE
        WHEN (icd_version=10 AND icd_code LIKE 'E11%') OR (icd_version=9 AND icd_code LIKE '250%')
        THEN 1 ELSE 0
      END) AS has_dm
FROM diagnoses_icd
GROUP BY subject_id, hadm_id;
"""
con.execute(dx_flags_sql)

# Final analysis table: one row per cohort ICU stay with the MAP columns plus
# the comorbidity flags. Admissions without a dx_flags row get 0 for both flags
# through COALESCE.
analysis_base_sql = """
CREATE OR REPLACE TABLE analysis_base AS
SELECT
  v.*,
  COALESCE(d.has_htn, 0) AS has_htn,
  COALESCE(d.has_dm,  0) AS has_dm
FROM vasc_icu_with_map v
LEFT JOIN dx_flags d
  ON v.subject_id = d.subject_id
 AND v.hadm_id   = d.hadm_id;
"""
con.execute(analysis_base_sql)

# Sanity checks, printed one result list per line: total stays, stays with a
# MAP value, target class balance, and number of creatinine rows.
for check_sql in (
    "SELECT COUNT(*) AS n_stays FROM analysis_base;",
    "SELECT COUNT(*) AS n_with_map FROM analysis_base WHERE map_twa_24h IS NOT NULL;",
    "SELECT map_target_gt75, COUNT(*) FROM analysis_base GROUP BY 1;",
    "SELECT COUNT(*) AS n_creat_rows FROM creatinine_window;",
):
    print(con.execute(check_sql).fetchall())





[(23691,)]
[(23595,)]
[(0, 11366), (1, 12325)]
[(218351,)]


In [None]:
# Folder for derived artefacts; created on first run.
OUT = DATA / "derived"
OUT.mkdir(parents=True, exist_ok=True)

# Parquet copies of both result tables.
for table_name in ("analysis_base", "creatinine_window"):
    parquet_target = OUT / f"{table_name}.parquet"
    con.execute(f"COPY {table_name} TO '{parquet_target}' (FORMAT PARQUET);")

# CSV copy of the analysis table, with a header row.
csv_target = OUT / "analysis_base.csv"
con.execute(f"COPY analysis_base TO '{csv_target}' (HEADER, DELIMITER ',');")



<duckdb.duckdb.DuckDBPyConnection at 0x211d7f74630>

Columns: [('icd_version', 'INTEGER', 'YES', None, None, None), ('icd_code', 'VARCHAR', 'YES', None, None, None)]
Sample rows: [(9, '0003'), (9, '0016'), (9, '0045'), (9, '0046'), (9, '0047'), (9, '0048'), (9, '0055'), (9, '0058'), (9, '0060'), (9, '0063')]
Row count: 11171


File exists: True
File size (GB): 39.06


RuntimeError: Query interrupted