# Data Extraction (DuckDB)

Objective: materialize cohort and 48h modality slices required for feature engineering.

## Workflow
1. Configure database path and connectors
2. Load or derive subject ID cohort
3. Extract first admissions (LOS >=54h filter inline)
4. Extract all admissions (for readmission logic if needed later)
5. Pull demographics
6. Pull 0–48h vitals, labs, prescriptions, procedures

## Notes
- All timing windows use hour deltas from admission start.


In [1]:
# Environment & Imports (local)
import os
import sys
import pathlib
from importlib.util import find_spec

# Put the project root on sys.path so project-local imports resolve.
# With __file__ defined (script run) the root is parents[1] of the resolved file
# path; otherwise (notebook kernel) it is the parent of the working directory.
if '__file__' in globals():
    ROOT = pathlib.Path(__file__).resolve().parents[1]
else:
    ROOT = pathlib.Path.cwd().parents[0]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Probe for the required third-party packages and print an install hint for any
# that cannot be located. This only reports; the imports below are still attempted.
missing = [pkg for pkg in ['duckdb', 'pandas', 'matplotlib'] if find_spec(pkg) is None]
if missing:
    print('Missing packages detected:', missing)
    print('Install them (example): pip install ' + ' '.join(missing))

import pandas as pd
import matplotlib.pyplot as plt
try:
    import duckdb
except ModuleNotFoundError:
    # Surface a clearer message than the raw import failure.
    raise RuntimeError('duckdb not installed. Please install and re-run.')

print('Imports ready.')

Imports ready.


In [2]:
# Imports & configuration
import datetime
import json
import math
import sys
from importlib import reload
from pathlib import Path

import duckdb
import numpy as np
import pandas as pd

# Project root: one level up when the kernel was started inside notebooks/,
# otherwise the current working directory itself.
_cwd = Path.cwd()
if _cwd.name == 'notebooks':
    PROJECT_ROOT = _cwd.parent
else:
    PROJECT_ROOT = _cwd

# Locations derived from the project root.
DATA_DIR = PROJECT_ROOT / 'data'
ARTIFACTS_DIR = PROJECT_ROOT / 'project' / 'artifacts'
DB_PATH = DATA_DIR / 'mimiciii.duckdb'  # Adjust if your filename differs

# Run-size and reproducibility knobs.
SUBJECT_SAMPLE_LIMIT = None  # e.g., 500 for a quick run; set None for full
RANDOM_SEED = 42

print('PROJECT_ROOT:', PROJECT_ROOT)
print('DuckDB path exists:', DB_PATH.exists())

# Make sure the artifact output directory is present before later cells write to it.
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)

PROJECT_ROOT: c:\Users\Almog Luz\Documents\GitHub\mlhc-final-project
DuckDB path exists: True


## 1. Setup
Initialize paths and imports.

In [3]:
# Load the subject-ID cohort from CSV, validate it, and optionally subsample it.
cohort_path = DATA_DIR / 'initial_cohort.csv'
if not cohort_path.exists():
    raise FileNotFoundError(f'Missing cohort file: {cohort_path}')

cohort = pd.read_csv(cohort_path)
if 'subject_id' not in cohort.columns:
    raise ValueError('`subject_id` column missing in cohort CSV')

# Discard rows without a subject ID, then store the IDs as integers.
cohort = (
    cohort
    .dropna(subset=['subject_id'])
    .assign(subject_id=lambda frame: frame['subject_id'].astype(int))
)

# Optional reproducible subsample, capped at the number of available rows.
if SUBJECT_SAMPLE_LIMIT is not None:
    _sample_size = min(SUBJECT_SAMPLE_LIMIT, len(cohort))
    cohort = cohort.sample(n=_sample_size, random_state=RANDOM_SEED)

print('Cohort size (after optional sample):', len(cohort))
cohort.head()

Cohort size (after optional sample): 32513


Unnamed: 0,subject_id
0,22392
1,2847
2,12056
3,25600
4,73125


## 2. First Admissions
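Retrieve first admission rows per subject and compute LOS. The cell below only opens the DuckDB connection and lists the available tables; the first-admission extraction itself runs inside the unified pipeline call in section 3.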

In [4]:
# Open the DuckDB database used by the extraction steps.
# duckdb.connect() creates a new, empty database file when the path does not
# exist, so fail fast here rather than hitting missing-table errors later.
if not DB_PATH.exists():
    raise FileNotFoundError(f'DuckDB database not found: {DB_PATH}')
con = duckdb.connect(str(DB_PATH))
print('Connected to DuckDB.')
# (Optional) list tables to verify schema; best-effort, a failure is only reported.
try:
    tbls = con.execute("SHOW TABLES").fetchdf()
    print('Available tables:', tbls['name'].tolist())
except Exception as e:
    print('Could not list tables:', e)

Connected to DuckDB.
Available tables: ['ADMISSIONS', 'CALLOUT', 'CAREGIVERS', 'CHARTEVENTS', 'CPTEVENTS', 'DATETIMEEVENTS', 'DIAGNOSES_ICD', 'DRGCODES', 'D_CPT', 'D_ICD_DIAGNOSES', 'D_ICD_PROCEDURES', 'D_ITEMS', 'D_LABITEMS', 'ICUSTAYS', 'INPUTEVENTS_CV', 'INPUTEVENTS_MV', 'LABEVENTS', 'MICROBIOLOGYEVENTS', 'NOTEEVENTS', 'OUTPUTEVENTS', 'PATIENTS', 'PRESCRIPTIONS', 'PROCEDUREEVENTS_MV', 'PROCEDURES_ICD', 'SERVICES', 'TRANSFERS']
Available tables: ['ADMISSIONS', 'CALLOUT', 'CAREGIVERS', 'CHARTEVENTS', 'CPTEVENTS', 'DATETIMEEVENTS', 'DIAGNOSES_ICD', 'DRGCODES', 'D_CPT', 'D_ICD_DIAGNOSES', 'D_ICD_PROCEDURES', 'D_ITEMS', 'D_LABITEMS', 'ICUSTAYS', 'INPUTEVENTS_CV', 'INPUTEVENTS_MV', 'LABEVENTS', 'MICROBIOLOGYEVENTS', 'NOTEEVENTS', 'OUTPUTEVENTS', 'PATIENTS', 'PRESCRIPTIONS', 'PROCEDUREEVENTS_MV', 'PROCEDURES_ICD', 'SERVICES', 'TRANSFERS']


## 3. All Admissions Reference
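Load all admissions for downstream readmission logic. The cell below runs the unified pipeline (`run_training_side_pipeline`), which replaces the manual admissions and modality extraction cells and returns the feature matrix, the labels, and debug statistics.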

In [None]:
# Unified extraction + feature build (replaces manual admissions + modality cells)
import importlib
import project.pipelines as pc
pc = importlib.reload(pc)  # reload so edits to the pipeline module are picked up
from project.pipelines import run_training_side_pipeline

# `cohort` is the in-memory frame from the cohort-loading step, so it already
# reflects SUBJECT_SAMPLE_LIMIT when one is set.
subject_ids_all = cohort['subject_id'].astype(int).tolist()
print('Subjects in cohort (after optional sample):', len(subject_ids_all))

# Single switch for both the pipeline's debug mode and the stats dump below.
debug = True

features, labels, dbg = run_training_side_pipeline(
    con,
    cohort_subject_ids=subject_ids_all,
    debug=debug,
)

# Only dump stats when debugging was requested and a stats dict actually came back.
if debug and isinstance(dbg, dict):
    print('--- Pipeline Debug Stats ---')
    for k, v in dbg.items():
        print(f"{k}: {v}")

print('Unified features shape:', features.shape)
features.head(3)

Subjects in cohort (pre optional sample): 32513


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

first_adm_filtered shape: (28340, 14)
Distinct admittime: 28238
Demographics columns: ['subject_id', 'gender', 'dob', 'dod', 'expire_flag'] rows: 28340
Vitals rows / subjects: 20086122 24077
Labs rows / subjects: 3253639 27563
Prescriptions rows / subjects: 582577 23381
Procedures rows / subjects: 65178 8243
DEBUG(build_features): first_adm columns -> ['subject_id', 'hadm_id', 'admittime', 'dischtime', 'deathtime', 'admission_type', 'admission_location', 'discharge_location', 'diagnosis', 'insurance', 'language', 'marital_status', 'ethnicity', 'los_hours']
DEBUG(build_features): demo columns -> ['subject_id', 'gender', 'dob', 'dod', 'expire_flag']
Vitals rows / subjects: 20086122 24077
Labs rows / subjects: 3253639 27563
Prescriptions rows / subjects: 582577 23381
Procedures rows / subjects: 65178 8243
DEBUG(build_features): first_adm columns -> ['subject_id', 'hadm_id', 'admittime', 'dischtime', 'deathtime', 'admission_type', 'admission_location', 'discharge_location', 'diagnosis', 'i

Unnamed: 0_level_0,hematocrit (serum)__range,hematocrit (serum)__max,"rbc, other fluid__min","rbc, other fluid__last","rbc, other fluid__mean","rbc, other fluid__max",hematocrit (serum)__std,"rbc, pleural__max","rbc, pleural__mean","rbc, pleural__last",...,abdominal girth__measured,heparin dose__measured,xigris__measured,abi ankle bp l (impella)__measured,abi brachial bp r (impella)__measured,spont. resp. rate__measured,vad beat rate [left]__measured,vd/vt ratio (40-60%)__measured,zzz stim thresh ma [value]__measured,micro-neb treatment__measured
subject_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1000,,,,,,,,,,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
10000,,,,,,,,,,,...,,,,,,,,,,
10003,,,,,,,,,,,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


## 4. Demographics
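Pull patient-level gender, DOB, death date. Demographics are pulled by the unified pipeline call above; the cells below persist the resulting feature matrix and labels as artifacts.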

In [6]:
# Persist artifacts from unified feature matrix with schema guard (no mode flags)
import json
import math

from project.features import build_feature_provenance


def _load_column_list(path):
    """Return the parsed JSON content of *path*, or None if absent or unreadable.

    Reading is best-effort: a failure is reported and then treated like a missing
    file, so a corrupt schema file does not block persisting fresh artifacts.
    """
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text())
    except Exception as exc:
        print(f'WARNING: could not read {path} -> {exc}')
        return None


if features is None or features.empty:
    raise RuntimeError('No features produced; cannot persist artifacts.')

# Pre-persist variance guard (stronger than downstream):
_nuniq = features.nunique(dropna=True)
_zero_var = (_nuniq <= 1)
zero_var_count = int(_zero_var.sum())
zero_frac = zero_var_count / len(_nuniq) if len(_nuniq) else 0
print(f'[pre-persist-guard] columns={len(_nuniq)} zero/constant={zero_var_count} ({zero_frac:.2%})')
if zero_var_count == len(_nuniq):
    raise RuntimeError('Refusing to persist: all features are constant (flat matrix).')
if zero_frac > 0.50:
    raise RuntimeError(f'Refusing to persist: >50% ({zero_frac:.1%}) of columns constant; investigate extraction/pruning path.')

feat_path = ARTIFACTS_DIR / 'features_full.parquet'
prov_path = ARTIFACTS_DIR / 'feature_provenance.json'
cols_path = ARTIFACTS_DIR / 'feature_columns.json'
legacy_cols_path = (PROJECT_ROOT / 'project' / 'artifacts2' / 'feature_columns.json')

# Previously saved schema; loaded for reference, no guard in this cell consults it.
prev_cols = _load_column_list(cols_path)
# Legacy schema used by the size guard below.
legacy_cols = _load_column_list(legacy_cols_path)

# Drop numeric-only column names BEFORE the schema guard, so the guard and the
# reported count describe exactly the schema that gets persisted.
numeric_only = [c for c in features.columns if str(c).isdigit()]
if numeric_only:
    print('Dropping numeric-only columns:', numeric_only)
    features = features.drop(columns=numeric_only, errors='ignore')

current_cols = list(features.columns)
print(f'Current feature count: {len(current_cols)}')
if legacy_cols is not None:
    current_set, legacy_set = set(current_cols), set(legacy_cols)
    inter = len(current_set & legacy_set)
    union = current_set | legacy_set
    # Both sets can be empty (e.g. every column was numeric-only); avoid dividing by zero.
    jacc = inter / len(union) if union else 0.0
    print(f'Legacy comparison -> intersection={inter} jaccard={jacc:.4f} legacy_count={len(legacy_cols)}')
    if len(current_cols) < 0.9 * len(legacy_cols):
        raise RuntimeError('Refusing to overwrite artifacts: feature schema size <90% of legacy (bug likely).')

# Final variance snapshot after any drops
_nuniq2 = features.nunique(dropna=True)
print(f'[pre-persist-guard] post-clean constant columns: {int((_nuniq2<=1).sum())} / {len(_nuniq2)}')

features.to_parquet(feat_path)
prov = build_feature_provenance(features)
prov_path.write_text(json.dumps(prov, indent=2))
cols_path.write_text(json.dumps(current_cols, indent=2))
print('Saved:')
for p in [feat_path, prov_path, cols_path]:
    print('  -', p.resolve())

[pre-persist-guard] columns=1500 zero/constant=0 (0.00%)
Current feature count: 1500
[pre-persist-guard] post-clean constant columns: 0 / 1500
[pre-persist-guard] post-clean constant columns: 0 / 1500
Saved:
  - C:\Users\Almog Luz\Documents\GitHub\mlhc-final-project\project\artifacts\features_full.parquet
  - C:\Users\Almog Luz\Documents\GitHub\mlhc-final-project\project\artifacts\feature_provenance.json
  - C:\Users\Almog Luz\Documents\GitHub\mlhc-final-project\project\artifacts\feature_columns.json
Saved:
  - C:\Users\Almog Luz\Documents\GitHub\mlhc-final-project\project\artifacts\features_full.parquet
  - C:\Users\Almog Luz\Documents\GitHub\mlhc-final-project\project\artifacts\feature_provenance.json
  - C:\Users\Almog Luz\Documents\GitHub\mlhc-final-project\project\artifacts\feature_columns.json


In [7]:
# Persist labels (parallels feature artifact persistence)
if labels is None or labels.empty:
    raise RuntimeError('No labels dataframe returned from pipeline; cannot persist.')

# Write the label table to CSV without its index.
labels_path = ARTIFACTS_DIR / 'labels.csv'
labels.to_csv(labels_path, index=False)
print('Saved labels to:', labels_path.resolve())

# If the pipeline's debug dict carries label metadata, store it alongside the labels.
# A failure here is reported but does not abort the cell.
_has_labels_meta = isinstance(dbg, dict) and 'labels_meta' in dbg
if _has_labels_meta:
    import json
    meta_path = ARTIFACTS_DIR / 'labels_meta.json'
    try:
        _meta_json = json.dumps(dbg['labels_meta'], indent=2)
        meta_path.write_text(_meta_json)
        print('Saved labels meta to:', meta_path.resolve())
    except Exception as e:
        print('WARNING: could not persist labels_meta.json ->', e)

# Quick prevalence / integrity summary for every non-identifier column.
id_cols = {'subject_id','hadm_id'}
print('Label columns summary:')
for name in (c for c in labels.columns if c not in id_cols):
    values = labels[name]
    # A column counts as binary when all of its non-null values are 0 or 1.
    looks_binary = values.dropna().isin([0,1]).all()
    if not looks_binary:
        print(f'  {name}: non-binary or mixed values; describe -> min={values.min()} max={values.max()} nonnull={values.notna().sum()}')
        continue
    if len(values):
        rate = float(values.mean())
    else:
        rate = float('nan')
    print(f'  {name}: prevalence={rate:.4f} (n={len(values)})')
print('Labels persistence complete.')

Saved labels to: C:\Users\Almog Luz\Documents\GitHub\mlhc-final-project\project\artifacts\labels.csv
Label columns summary:
  mortality_label: prevalence=0.1052 (n=28340)
  prolonged_los_label: prevalence=0.5249 (n=28340)
  readmission_label: prevalence=0.0435 (n=28340)
Labels persistence complete.


## 5. 48h Modalities
Vitals, labs, prescriptions, procedures within 0–48h window.

## 6. Summary Snapshots
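Preview shape and sample rows for validation. The cell below additionally checks that the saved `feature_columns.json` matches the column order of the in-memory feature matrix.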

In [9]:
# 5. Parity check: ensure saved feature_columns.json matches in-memory ordering
import json
schema_path = ARTIFACTS_DIR / 'feature_columns.json'
if not schema_path.exists():
    # Nothing to compare against; report and move on.
    print('WARNING: feature_columns.json missing; parity check skipped.')
else:
    saved_cols = json.loads(schema_path.read_text())
    in_memory_cols = list(features.columns)
    # Order-sensitive list comparison: same names in a different order is a failure.
    if in_memory_cols == saved_cols:
        print('Parity check passed: feature column order matches saved schema.')
    else:
        raise AssertionError('Feature column ordering drift detected between in-memory features and saved schema.')

Parity check passed: feature column order matches saved schema.


## 7. Next Steps
- Integrate with feature builder
- Persist cached parquet extracts (optional)
- Proceed to training pipeline notebook