# ðŸ§ª labs_data_cleaning_param.ipynb

Streaming-friendly lab data cleaner for large MIMIC-III LABEVENTS files.

This notebook:
- Reads LABEVENTS.csv in chunks (memory-safe)
- Filters for CBC tests
- Keeps only rows where FLAG == 'abnormal' or NaN (normal)
- Encodes FLAG as 1 (abnormal) or 0 (normal)
- Formats SUBJECT_ID as 'pXXXXXX'
- Uses configs/labs_cleaning.yaml for settings

In [4]:

import sys, importlib
print('Python:', sys.version.split()[0])
for pkg in ['pandas','numpy','yaml']:
    try:
        m = importlib.import_module(pkg)
        print(pkg, getattr(m, '__version__', 'n/a'))
    except Exception as e:
        print(pkg, 'not installed:', e)


Python: 3.11.9
pandas 2.2.2
numpy 2.0.2
yaml 6.0.1


In [5]:

# Load configuration
from pathlib import Path
import yaml, os

CFG_PATH = os.environ.get("CFG", "configs/labs_cleaning.yaml")
with open(CFG_PATH, "r") as f:
    cfg = yaml.safe_load(f)
L = cfg["labs_cleaning"]
print("Loaded configuration from:", CFG_PATH)
print(L)


Loaded configuration from: configs/labs_cleaning.yaml
{'labs_csv': 'data/raw/LABEVENTS.csv', 'd_labitems_csv': 'data/raw/D_LABITEMS.csv', 'output_clean_csv': 'data/clean/labs_cbc_clean.csv', 'subject_filter_csv': None, 'columns': {'subject_id': ['SUBJECT_ID', 'subject_id'], 'hadm_id': ['HADM_ID', 'hadm_id'], 'charttime': ['CHARTTIME', 'charttime'], 'itemid': ['ITEMID', 'itemid'], 'label': ['LABEL', 'label', 'TESTNAME', 'test_name'], 'value': ['VALUENUM', 'value', 'VALUE'], 'valueuom': ['VALUEUOM', 'valueuom'], 'flag': ['FLAG', 'flag', 'ABNORMAL', 'abnormal', 'RESULTS', 'results']}, 'cbc_labels': ['Hemoglobin', 'Hematocrit', 'WBC', 'Platelet Count', 'RBC', 'MCV', 'MCH', 'MCHC', 'RDW', 'Neutrophils', 'Lymphocytes', 'Monocytes', 'Eosinophils', 'Basophils'], 'cbc_itemids': [], 'flag_keep': ['positive', 'negative', 'abnormal', 'high', 'low'], 'drop_na_value': True, 'dedupe_keys': ['subject_id', 'charttime', 'label'], 'time_format_out': '%Y-%m-%d %H:%M:%S', 'chunksize': 300000, 'streaming': 

In [7]:

# Streaming cleaner (avoids OOM, processes 18GB+ files safely)
import pandas as pd, numpy as np
from pathlib import Path
from csv import QUOTE_MINIMAL

def choose_col(df, candidates, required=False, name=""):
    for c in candidates:
        if c in df.columns:
            return c
    if required:
        raise KeyError(f"Required column not found for {name}: candidates={candidates}")
    return None

def load_d_labitems(path):
    if path and Path(path).exists():
        d = pd.read_csv(path, low_memory=False)
        label_col = next((c for c in ["LABEL","label","TESTNAME","test_name"] if c in d.columns), None)
        itemid_col = next((c for c in ["ITEMID","itemid"] if c in d.columns), None)
        if label_col and itemid_col:
            return d[[itemid_col,label_col]].rename(columns={itemid_col:"ITEMID", label_col:"LABEL"})
    return None

def normalize_flags(s):
    if s is None: return None
    s2 = s.astype(str).str.lower().str.strip()
    return s2.replace({
        "abnormal": "abnormal",
        "high": "high",
        "low": "low",
        "pos": "positive",
        "neg": "negative",
        "positive": "positive",
        "negative": "negative"
    })

Path(Path(L["output_clean_csv"]).parent).mkdir(parents=True, exist_ok=True)

dli = load_d_labitems(L.get("d_labitems_csv"))
cbc_labels = set(L.get("cbc_labels", []))
cbc_itemids = set(L.get("cbc_itemids", []))
if not cbc_itemids and dli is not None and cbc_labels:
    cbc_itemids = set(dli.loc[dli["LABEL"].isin(cbc_labels), "ITEMID"].astype("Int64").dropna().astype(int).tolist())

sample = pd.read_csv(L["labs_csv"], nrows=1000, low_memory=False)
col_subject = choose_col(sample, L["columns"]["subject_id"]) or "SUBJECT_ID"
col_time    = choose_col(sample, L["columns"]["charttime"]) or "CHARTTIME"
col_itemid  = choose_col(sample, L["columns"]["itemid"]) or "ITEMID"
col_label   = choose_col(sample, L["columns"]["label"])
col_value   = choose_col(sample, L["columns"]["value"]) or "VALUENUM"
col_uom     = choose_col(sample, L["columns"]["valueuom"]) or "VALUEUOM"
col_flag    = choose_col(sample, L["columns"]["flag"])

usecols = sorted(set(filter(None, [col_subject, col_time, col_itemid, col_label, col_value, col_uom, col_flag])))
dtype_map = {}
if col_itemid: dtype_map[col_itemid] = "Int64"
if col_value: dtype_map[col_value] = "float32"
if col_subject: dtype_map[col_subject] = "Int64"

wrote_header = False
kept = 0
total = 0
chunksize = int(L.get("chunksize", 300_000))

for chunk in pd.read_csv(L["labs_csv"], usecols=usecols, dtype=dtype_map, chunksize=chunksize, low_memory=True):
    total += len(chunk)
    mask = pd.Series(False, index=chunk.index)
    if cbc_itemids:
        mask = mask | chunk[col_itemid].isin(list(cbc_itemids))
    if col_label and cbc_labels:
        mask = mask | chunk[col_label].astype(str).isin(cbc_labels)
    chunk = chunk[mask]
    if chunk.empty:
        continue

    out = pd.DataFrame({
        "subject_id": chunk[col_subject].astype("Int64"),
        "charttime": pd.to_datetime(chunk[col_time], errors="coerce"),
        "label": chunk[col_label].astype(str) if col_label else None,
        "itemid": chunk[col_itemid].astype("Int64") if col_itemid else None,
        "value": pd.to_numeric(chunk[col_value], errors="coerce") if col_value else None,
        "valueuom": chunk[col_uom].astype(str) if col_uom else None,
        "flag": normalize_flags(chunk[col_flag]) if col_flag else None,
    })

    # Keep only abnormal or NaN flags, encode abnormal->1, NaN->0
    if "flag" in out.columns:
        out["flag"] = out["flag"].astype("string").str.lower().str.strip()
        out = out[(out["flag"] == "abnormal") | (out["flag"].isna())]
        out["flag"] = out["flag"].map({"abnormal": 1}).fillna(0).astype("Int8")

    # Format subject_id as pXXXXXX
    out["subject_id"] = out["subject_id"].apply(lambda x: f"p{int(x):06d}" if pd.notna(x) else x)

    # Drop missing value rows if configured
    if L.get("drop_na_value", True):
        out = out[out["value"].notna()]

    out["charttime"] = out["charttime"].dt.strftime(L.get("time_format_out", "%Y-%m-%d %H:%M:%S"))
    kept += len(out)
    out.to_csv(L["output_clean_csv"], mode="a", index=False, header=(not wrote_header), quoting=QUOTE_MINIMAL)
    wrote_header = True

print(f"âœ… Streaming complete. Scanned ~{total:,} rows; kept ~{kept:,}.")
print(f"Output saved to: {L['output_clean_csv']}")


âœ… Streaming complete. Scanned ~27,854,055 rows; kept ~2,991,947.
Output saved to: data/clean/labs_cbc_clean.csv
