# EMAR Feature Extraction & Risk Factor Analysis

## Description
This script processes the Electronic Medication Administration Record (EMAR) from MIMIC-IV to engineer features related to pharmacological risk factors for Hospital-Acquired Pressure Injuries (HAPI).

## Clinical Justification
Medications are a critical proxy for patient physiological state and immobility levels, directly mapping to the Braden Scale sub-categories:
* **Perfusion (Tissue Tolerance):** "Vasoactive Agents" (e.g., Norepinephrine, Dopamine) cause peripheral vasoconstriction, reducing blood flow to skin and increasing ischemia risk.
* **Sensory Perception:** "Sedatives" (e.g., Propofol, Versed) reduce a patient's ability to feel pressure or pain.
* **Mobility/Activity:** "Opioids" and Sedatives depress the central nervous system, reducing spontaneous body movement and repositioning.

## Inputs & Outputs
* **Input:** `emar.csv` (Linking admission IDs to medication events)
* **Input:** `emar_detail.csv` (Detailed drug names and administration info)
* **Output:** `emar_feat.csv` (Aggregated medication features per admission)

In [None]:
import os
from collections import defaultdict
import pandas as pd
import gc

In [2]:
# Configuration
BASE_DIR = r"D:\School\5141"

# Each input CSV is located inside a directory that carries the same name as
# the file itself, hence the repeated path component.
EMAR_PATH, EMAR_DETAIL_PATH = (
    os.path.join(BASE_DIR, table_file, table_file)
    for table_file in ("emar.csv", "emar_detail.csv")
)
OUTPUT_PATH = os.path.join(BASE_DIR, "emar_feat.csv")

# Number of emar_detail rows pulled into memory per chunk
CHUNKSIZE = 500_000

In [3]:
# High-risk medication classes.
# Keywords are lowercase generic and brand names.

# VASOACTIVE AGENTS
# Peripheral Vasoconstriction leads to Ischemia.
VASOACTIVE_KEYWORDS = [
    "norepinephrine",
    "levophed",
    "epinephrine",
    "vasopressin",
    "dopamine",
    "dobutamine",
    "phenylephrine",
    "neosynephrine",
]

# SEDATIVES
# Immobility & Sensory Loss: Patients on these drips cannot move to relieve pressure.
SEDATION_KEYWORDS = [
    "propofol",
    "midazolam",
    "versed",
    "lorazepam",
    "ativan",
    "dexmedetomidine",
    "precedex",
]

# OPIOIDS
# CNS Depression: Reduces spontaneous movement during sleep/rest.
OPIOID_KEYWORDS = [
    "fentanyl",
    "morphine",
    "hydromorphone",
    "dilaudid",
    "oxycodone",
]

# GENERAL HIGH RISK
# Combined list of all drugs known to impact perfusion or mobility:
# vasoactive agents first, then sedatives, then opioids.
HIGH_RISK_MED_KEYWORDS = VASOACTIVE_KEYWORDS + SEDATION_KEYWORDS + OPIOID_KEYWORDS



In [None]:
# Loader Functions
def load_emar(path: str):
    """
    Builds the lookup table that ties each EMAR event to a hospital admission.

    Operations:
    1. Peeks at the CSV header to learn which columns the file provides.
    2. Reads 'emar_id' and 'hadm_id', plus 'subject_id' when the file has it;
       every other column is skipped to keep memory use down.
    3. Discards rows whose 'hadm_id' is missing.
    4. Casts 'hadm_id' to the nullable Int64 dtype.

    Args:
        path (str): The file path to `emar.csv`.

    Returns:
        pd.DataFrame: The selected columns, limited to rows that carry an
        admission ID. The original row index is kept (not reset).
    """
    # Header-only read: cheap way to discover the available columns
    available = pd.read_csv(path, nrows=0).columns

    wanted = ["emar_id", "hadm_id"]
    wanted += ["subject_id"] if "subject_id" in available else []

    table = pd.read_csv(path, usecols=wanted, low_memory=False)

    # Filter first, then convert, so the cast only touches surviving rows
    return table.dropna(subset=["hadm_id"]).assign(
        hadm_id=lambda frame: frame["hadm_id"].astype("Int64")
    )

def detect_emar_detail_text_col(path: str):
    """
    Works out which column of the detail file holds the drug names.

    Operations:
    1. Reads a small sample (up to 1000 rows) of the detail file.
    2. Checks the known candidate column names in priority order:
       'product_description', 'medication', 'drug_name'.
    3. Reports and returns the first candidate present in the file.

    Arguments:
        path (str): The file path to `emar_detail.csv`.

    Returns:
        str: The name of the column containing medication text.

    Raises:
        ValueError: If none of the candidate columns exists in the file.
    """
    sample_columns = pd.read_csv(path, low_memory=False, nrows=1000).columns

    # Earlier entries win when several candidates are present
    priority = ("product_description", "medication", "drug_name")
    chosen = next((name for name in priority if name in sample_columns), None)

    if chosen is None:
        raise ValueError("Could not find medication text column in emar_detail.")

    print(f"Using text column: {chosen}")
    return chosen

In [None]:
# Chunk processing function
def build_emar_features_chunked(emar: pd.DataFrame, detail_path: str, text_col: str, chunksize: int) -> pd.DataFrame:
    """
    Processes the massive EMAR detail file in chunks to extract clinical features.

    Operations:
    1. Iterates through the detail file in chunks of `chunksize` rows, reading
       only 'emar_id' and `text_col`.
    2. Merges each chunk with `emar` to attach the Admission ID (hadm_id);
       rows that cannot be linked to an admission are dropped.
    3. Normalizes drug names to lowercase.
    4. Scans for high-risk keywords (Vasopressors, Sedatives, Opioids) using vectorized regex.
    5. Aggregates counts (total meds, distinct meds, high-risk meds) into dictionaries.
    6. Converts aggregated dictionaries into a final clean DataFrame.

    Rows whose drug description is missing still count towards
    'num_meds_admin', but they are not treated as a medication name: they do
    not add to 'num_distinct_meds' and are never keyword-matched.

    Arguments:
        emar (pd.DataFrame): The lookup dataframe from load_emar; must contain
            'emar_id' and 'hadm_id'.
        detail_path (str): Path to the detailed medication file.
        text_col (str): The column name containing drug descriptions.
        chunksize (int): Number of rows to process at a time.

    Returns:
        pd.DataFrame: A feature matrix with one row per admission, with columns
                      hadm_id, num_meds_admin, num_distinct_meds,
                      num_high_risk_meds, has_high_risk_med,
                      has_vasoactive_med, has_sedation_med, has_opioid_med
                      (the has_* columns are 0/1 integers).
    """
    # Accumulators keyed by hadm_id. They live outside the chunk loop because
    # one admission's rows can be spread across several chunks.
    num_admin = defaultdict(int)
    distinct_meds = defaultdict(set)
    num_high_risk = defaultdict(int)

    has_high_risk = set()
    has_vaso = set()
    has_sed = set()
    has_opioid = set()

    # Alternation patterns ("a|b|c") built from the module-level keyword lists
    pat_high = "|".join(HIGH_RISK_MED_KEYWORDS)
    pat_vaso = "|".join(VASOACTIVE_KEYWORDS)
    pat_sed  = "|".join(SEDATION_KEYWORDS)
    pat_op   = "|".join(OPIOID_KEYWORDS)

    # Prepare mapping table
    emar_map = emar[["emar_id", "hadm_id"]].drop_duplicates()

    chunk_idx = 0

    # Read detail file in chunks
    for chunk in pd.read_csv(detail_path, chunksize=chunksize, usecols=["emar_id", text_col], low_memory=False):
        chunk_idx += 1
        # Periodically force a collection to release memory from earlier chunks
        if chunk_idx % 5 == 0:
            gc.collect()

        # Merge to attach hadm_id
        chunk = chunk.merge(emar_map, on="emar_id", how="left")
        chunk = chunk.dropna(subset=["hadm_id"])

        if chunk.empty:
            continue

        chunk["hadm_id"] = chunk["hadm_id"].astype(int)

        # Record which rows carry a real drug name BEFORE stringifying:
        # astype(str) turns a missing value into the literal text "nan",
        # which would otherwise be counted as a distinct medication.
        has_name = chunk[text_col].notna()
        chunk["text"] = chunk[text_col].astype(str).str.lower()

        # Count admins (every linked row, named or not)
        counts = chunk["hadm_id"].value_counts()
        for hadm, count in counts.items():
            num_admin[hadm] += count

        # Distinct meds (store in set) -- only rows with an actual name
        for hadm, grp in chunk.loc[has_name].groupby("hadm_id")["text"]:
            distinct_meds[hadm].update(grp.unique())

        # High Risk Checks (Vectorized); unnamed rows can never be a match
        mask_high = has_name & chunk["text"].str.contains(pat_high, na=False)
        if mask_high.any():
            high_chunk = chunk[mask_high]

            # Counts
            h_counts = high_chunk["hadm_id"].value_counts()
            for hadm, count in h_counts.items():
                num_high_risk[hadm] += count

            # Flags
            has_high_risk.update(high_chunk["hadm_id"].unique())

            # Sub-flags (only check rows that are already high risk).
            # The `if pat_*` guards skip a class whose keyword list is empty,
            # since an empty pattern would match every row.
            if pat_vaso:
                ids = high_chunk.loc[high_chunk["text"].str.contains(pat_vaso, na=False), "hadm_id"]
                has_vaso.update(ids.unique())

            if pat_sed:
                ids = high_chunk.loc[high_chunk["text"].str.contains(pat_sed, na=False), "hadm_id"]
                has_sed.update(ids.unique())

            if pat_op:
                ids = high_chunk.loc[high_chunk["text"].str.contains(pat_op, na=False), "hadm_id"]
                has_opioid.update(ids.unique())

    # Build Result DataFrame: one row per admission that had at least one linked row
    all_ids = set(num_admin.keys())

    feat = pd.DataFrame({"hadm_id": list(all_ids)})
    feat["hadm_id"] = feat["hadm_id"].astype("Int64")
    feat = feat.set_index("hadm_id")

    # Map accumulated values
    feat["num_meds_admin"] = feat.index.map(num_admin).fillna(0).astype(int)
    # .get() with a default: admissions with only unnamed rows have no entry
    feat["num_distinct_meds"] = feat.index.map(lambda x: len(distinct_meds.get(x, set()))).astype(int)
    feat["num_high_risk_meds"] = feat.index.map(num_high_risk).fillna(0).astype(int)

    # Flags
    feat["has_high_risk_med"] = feat.index.isin(has_high_risk).astype(int)
    feat["has_vasoactive_med"] = feat.index.isin(has_vaso).astype(int)
    feat["has_sedation_med"] = feat.index.isin(has_sed).astype(int)
    feat["has_opioid_med"] = feat.index.isin(has_opioid).astype(int)

    feat = feat.reset_index()
    return feat

In [None]:
# Execute: run the full pipeline (load lookup -> detect text column ->
# build features chunk by chunk -> write the feature CSV).
if __name__ == "__main__":
    # emar_id -> hadm_id lookup table
    emar_map = load_emar(EMAR_PATH)
    # Name of the drug-description column in the detail file
    text_col = detect_emar_detail_text_col(EMAR_DETAIL_PATH)
    # One row of medication features per admission
    feat_df = build_emar_features_chunked(emar_map, EMAR_DETAIL_PATH, text_col, CHUNKSIZE)
    print(f"Saving {len(feat_df)} rows to {OUTPUT_PATH}")
    # index=False: keep the DataFrame's row index out of the CSV
    feat_df.to_csv(OUTPUT_PATH, index=False)
    print("Done.")