In [None]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
import warnings
warnings.filterwarnings('ignore')

def connect_db():
    """Establish database connection to MIMIC IV"""
    try:
        engine = create_engine('****')
        print("Successfully connected to MIMIC IV database!")
        return engine
    except Exception as e:
        print(f"Connection failed: {e}")
        return None

def get_clabsi_cohort(engine):
    """
    Get validated CLABSI cases with enhanced organism classification
    and culture validation criteria.
    """
    clabsi_query = text("""
    WITH clabsi_icd AS (
        SELECT DISTINCT hadm_id, MIN(icd_code) as icd_code
        FROM mimiciv_hosp.diagnoses_icd
        WHERE icd_code IN ('99931', '99932', 'T80211A')
        GROUP BY hadm_id
    ),
    microbe_classifications AS (
        SELECT 
            m.hadm_id,
            m.charttime,
            m.org_name,
            COUNT(*) OVER (
                PARTITION BY m.hadm_id, m.org_name
            ) as cultures_of_org,
            COUNT(*) OVER (
                PARTITION BY m.hadm_id, 
                DATE_TRUNC('day', m.charttime),
                m.org_name
            ) as daily_org_cultures,
            CASE
                -- New microbe filter: exclude common contaminants unless >=2 cultures
                WHEN m.org_name IN ('Bacillus species', 'Micrococcus species')
                     AND COUNT(*) OVER (
                         PARTITION BY m.hadm_id, 
                         DATE_TRUNC('day', m.charttime),
                         m.org_name
                     ) < 2 THEN 'Contaminant'
                
                -- Rest of existing classification logic
                WHEN m.org_name IN (
                    'STAPH AUREUS COAG +', 'ESCHERICHIA COLI',
                    'KLEBSIELLA PNEUMONIAE', 'PSEUDOMONAS AERUGINOSA',
                    'ENTEROCOCCUS FAECIUM', 'CANDIDA ALBICANS'
                ) THEN 'Primary Pathogen'
                
                WHEN m.org_name IN (
                    'STAPHYLOCOCCUS, COAGULASE NEGATIVE',
                    'STAPHYLOCOCCUS EPIDERMIDIS'
                ) AND COUNT(*) OVER (
                    PARTITION BY m.hadm_id, 
                    DATE_TRUNC('day', m.charttime),
                    m.org_name
                ) >= 2 THEN 'Validated Common Contaminant'
                
                WHEN m.org_name LIKE 'CANDIDA%' OR
                     m.org_name IN (
                        'ENTEROCOCCUS FAECALIS',
                        'SERRATIA MARCESCENS',
                        'BACTEROIDES FRAGILIS GROUP'
                     ) THEN 'Other Pathogen'
                ELSE 'Requires Review'
            END as organism_classification
        FROM mimiciv_hosp.microbiologyevents m
        WHERE m.spec_type_desc = 'BLOOD CULTURE'
        AND m.org_name IS NOT NULL
    ),
    validated_cases AS (
        SELECT DISTINCT
            i.subject_id,
            i.hadm_id,
            i.stay_id,
            i.intime as icu_admission,
            i.outtime as icu_discharge,
            l.starttime as line_start,
            l.endtime as line_end,
            l.line_type,
            EXTRACT(EPOCH FROM (l.endtime - l.starttime))/86400 as line_duration_days,
            m.charttime as infection_time,
            m.org_name,
            m.cultures_of_org,
            m.daily_org_cultures,
            m.organism_classification
        FROM mimiciv_icu.icustays i
        INNER JOIN clabsi_icd c ON i.hadm_id = c.hadm_id
        INNER JOIN mimiciv_derived.invasive_line l ON i.stay_id = l.stay_id
        INNER JOIN microbe_classifications m ON i.hadm_id = m.hadm_id
        WHERE
            -- New line validation filter
            l.line_type IN (
                'PICC', 'Multi Lumen', 'Dialysis', 'Triple Introducer',
                'Pre-Sep', 'Hickman', 'Portacath', 'Cordis/Introducer',
                'Continuous Cardiac Output PA', 'PA'
            )
            AND l.line_type NOT IN ('Peripheral IV', 'Arterial Line')
            
            AND m.charttime > l.starttime
            AND m.charttime <= l.starttime + INTERVAL '14 days'
            AND EXTRACT(EPOCH FROM (l.endtime - l.starttime))/86400 >= 2
            AND m.organism_classification IN (
                'Primary Pathogen',
                'Validated Common Contaminant',
                'Other Pathogen'
            )
    )
    SELECT *,
        CASE
            WHEN organism_classification = 'Primary Pathogen' THEN 'Confirmed CLABSI'
            WHEN organism_classification = 'Validated Common Contaminant' 
                 AND daily_org_cultures >= 2 THEN 'Confirmed CLABSI'
            WHEN organism_classification = 'Other Pathogen' 
                 AND cultures_of_org >= 2 THEN 'Confirmed CLABSI'
            ELSE 'Requires Review'
        END as clabsi_status
    FROM validated_cases v
    WHERE line_duration_days >= 2
    ORDER BY stay_id, infection_time;
    """)
    
    clabsi_df = pd.read_sql(clabsi_query, engine)
    return clabsi_df

# Initialize database connection and load cohort
print("Loading CLABSI cohort...")
engine = connect_db()
if engine:
    clabsi_cohort = get_clabsi_cohort(engine)


In [None]:
def get_control_cohort(engine, clabsi_cohort, matching_ratio=4):
    # Calculate target numbers based on unique ICU stays per line type
    unique_stays = clabsi_cohort.drop_duplicates('stay_id')[['stay_id', 'line_type']]
    line_targets = unique_stays['line_type'].value_counts() * matching_ratio
    
    control_query = text("""
    WITH line_patients AS (
        -- Get all patients with qualifying central lines 
        SELECT DISTINCT
            i.subject_id,
            i.hadm_id,
            i.stay_id,
            i.intime as icu_admission,
            i.outtime as icu_discharge,
            l.starttime as line_start,
            l.endtime as line_end,
            l.line_type,
            EXTRACT(EPOCH FROM (l.endtime - l.starttime))/86400 as line_duration_days,
            -- Add demographics
            p.gender,
            p.anchor_age + EXTRACT(EPOCH FROM adm.admittime - MAKE_TIMESTAMP(p.anchor_year, 1, 1, 0, 0, 0))/31556908.8 AS admission_age,
            adm.race,
            adm.admission_type
        FROM mimiciv_icu.icustays i
        INNER JOIN mimiciv_derived.invasive_line l 
            ON i.stay_id = l.stay_id
        INNER JOIN mimiciv_hosp.admissions adm 
            ON i.hadm_id = adm.hadm_id
        INNER JOIN mimiciv_hosp.patients p
            ON i.subject_id = p.subject_id
        WHERE l.line_type IN (
            'PICC', 'Multi Lumen', 'Dialysis', 'Triple Introducer',
            'Pre-Sep', 'Hickman', 'Portacath', 'Cordis/Introducer',
            'Continuous Cardiac Output PA', 'PA'
        )
        AND EXTRACT(EPOCH FROM (l.endtime - l.starttime))/86400 >= 2
    ),
    excluded_patients AS (
        -- Exclude CLABSI cases and positive cultures
        SELECT DISTINCT p.stay_id
        FROM line_patients p
        LEFT JOIN mimiciv_hosp.diagnoses_icd d 
            ON p.hadm_id = d.hadm_id
        LEFT JOIN mimiciv_hosp.microbiologyevents m
            ON p.hadm_id = m.hadm_id
            AND m.charttime BETWEEN p.line_start AND p.line_end
        WHERE d.icd_code IN ('99931', '99932', 'T80211A')
        OR (
            m.spec_type_desc = 'BLOOD CULTURE'
            AND m.org_name IS NOT NULL
        )
    ),
    eligible_controls AS (
        SELECT 
            p.*,
            CASE 
                WHEN p.line_type = 'Multi Lumen' THEN :multi_lumen_target
                WHEN p.line_type = 'Dialysis' THEN :dialysis_target
                WHEN p.line_type = 'PICC' THEN :picc_target
                WHEN p.line_type = 'PA' THEN :pa_target
                WHEN p.line_type = 'Cordis/Introducer' THEN :cordis_target
                WHEN p.line_type = 'Hickman' THEN :hickman_target
                WHEN p.line_type = 'Portacath' THEN :portacath_target
                WHEN p.line_type = 'Continuous Cardiac Output PA' THEN :cco_pa_target
                ELSE 40
            END as target_n,
            ROW_NUMBER() OVER (
                PARTITION BY p.line_type 
                ORDER BY random()
            ) as type_rank
        FROM line_patients p
        WHERE NOT EXISTS (
            SELECT 1 
            FROM excluded_patients e 
            WHERE p.stay_id = e.stay_id
        )
    )
    SELECT *
    FROM eligible_controls
    WHERE type_rank <= target_n
    ORDER BY line_type, type_rank;
    """)
    
    # Execute query with parameters
    controls_df = pd.read_sql(
        control_query, 
        engine,
        params={
            'multi_lumen_target': int(line_targets.get('Multi Lumen', 0)),
            'dialysis_target': int(line_targets.get('Dialysis', 0)),
            'picc_target': int(line_targets.get('PICC', 0)),
            'pa_target': int(line_targets.get('PA', 0)),
            'cordis_target': int(line_targets.get('Cordis/Introducer', 0)),
            'hickman_target': int(line_targets.get('Hickman', 0)),
            'portacath_target': int(line_targets.get('Portacath', 0)),
            'cco_pa_target': int(line_targets.get('Continuous Cardiac Output PA', 0))
        }
    )
    
    # Print summary
    print("\nControl Cohort Summary:")
    print("-" * 30)
    print(f"Total controls: {len(controls_df)}")
    print(f"Unique patients: {controls_df['subject_id'].nunique()}")
    print(f"Unique ICU stays: {controls_df['stay_id'].nunique()}")
    
    print("\nMatching Ratios by Line Type:")
    unique_clabsi = unique_stays['line_type'].value_counts()
    unique_controls = controls_df['line_type'].value_counts()
    ratios = pd.DataFrame({
        'CLABSI_stays': unique_clabsi,
        'Control_stays': unique_controls,
        'Ratio': unique_controls / unique_clabsi
    }).round(2)
    print(ratios)
    
    return controls_df

# Get matched controls
if 'clabsi_cohort' in locals():
    control_cohort = get_control_cohort(engine, clabsi_cohort)

In [None]:
def get_demographic_features(engine, cohort_df):
    demo_query = text("""
    WITH demographics AS (
        SELECT DISTINCT
            icu.stay_id,
            icu.hadm_id,
            icu.subject_id,
            -- Demographics
            p.gender,
            p.anchor_age + EXTRACT(EPOCH FROM adm.admittime - MAKE_TIMESTAMP(p.anchor_year, 1, 1, 0, 0, 0))/31556908.8 AS admission_age,
            -- Simplified ethnicity grouping
            CASE
                WHEN adm.race LIKE '%BLACK%' OR adm.race LIKE '%AFRICAN%' THEN 'BLACK'
                WHEN adm.race LIKE '%ASIAN%' THEN 'ASIAN'
                WHEN adm.race LIKE '%HISPANIC%' OR adm.race LIKE '%LATINO%' THEN 'HISPANIC'
                WHEN adm.race LIKE '%WHITE%' THEN 'WHITE'
                ELSE 'OTHER'
            END AS ethnicity
        FROM mimiciv_icu.icustays icu
        INNER JOIN mimiciv_hosp.admissions adm 
            ON icu.hadm_id = adm.hadm_id
        INNER JOIN mimiciv_hosp.patients p 
            ON icu.subject_id = p.subject_id
        WHERE icu.stay_id IN :stay_ids
    ),
    comorbidities AS (
        SELECT 
            icu.stay_id,
            MAX(CASE WHEN d.icd_code LIKE 'I10%' OR d.icd_code LIKE '401%' THEN 1 ELSE 0 END) as has_htn,
            MAX(CASE WHEN d.icd_code LIKE 'J44%' OR d.icd_code LIKE '496%' THEN 1 ELSE 0 END) as has_copd,
            MAX(CASE WHEN d.icd_code LIKE 'K70%' OR d.icd_code LIKE 'K72%' OR d.icd_code LIKE 'K76%' THEN 1 ELSE 0 END) as has_liver,
            MAX(CASE WHEN d.icd_code LIKE 'I21%' OR d.icd_code LIKE 'I22%' OR d.icd_code LIKE '410%' THEN 1 ELSE 0 END) as has_mi,
            MAX(CASE WHEN d.icd_code LIKE 'I50%' OR d.icd_code LIKE '428%' THEN 1 ELSE 0 END) as has_chf,
            MAX(CASE WHEN d.icd_code LIKE 'I63%' OR d.icd_code LIKE '433%' OR d.icd_code LIKE '434%' THEN 1 ELSE 0 END) as has_cva,
            MAX(CASE WHEN d.icd_code LIKE 'C%' OR d.icd_code LIKE '14%' OR d.icd_code LIKE '20%' THEN 1 ELSE 0 END) as has_cancer,
            MAX(CASE WHEN d.icd_code LIKE 'B20%' OR d.icd_code LIKE 'B21%' OR d.icd_code LIKE '042%' THEN 1 ELSE 0 END) as has_aids,
            MAX(CASE WHEN d.icd_code LIKE 'E11%' OR d.icd_code LIKE '250%' THEN 1 ELSE 0 END) as has_diabetes
        FROM mimiciv_icu.icustays icu
        LEFT JOIN mimiciv_hosp.diagnoses_icd d
            ON icu.hadm_id = d.hadm_id
        WHERE icu.stay_id IN :stay_ids
        GROUP BY icu.stay_id
    )
    SELECT 
        d.*,
        COALESCE(c.has_htn, 0) as has_htn,
        COALESCE(c.has_copd, 0) as has_copd,
        COALESCE(c.has_liver, 0) as has_liver,
        COALESCE(c.has_mi, 0) as has_mi,
        COALESCE(c.has_chf, 0) as has_chf,
        COALESCE(c.has_cva, 0) as has_cva,
        COALESCE(c.has_cancer, 0) as has_cancer,
        COALESCE(c.has_aids, 0) as has_aids,
        COALESCE(c.has_diabetes, 0) as has_diabetes
    FROM demographics d
    LEFT JOIN comorbidities c ON d.stay_id = c.stay_id
    ORDER BY d.stay_id;
    """)
    
    # Execute query with parameters
    stay_ids = tuple(int(x) for x in cohort_df['stay_id'].unique())
    demo_df = pd.read_sql(demo_query, engine, params={'stay_ids': stay_ids})
    
    # Create binary features from gender and ethnicity
    demo_df['gender'] = (demo_df['gender'] == 'M').astype(int)
    demo_df = pd.get_dummies(demo_df, columns=['ethnicity'], prefix='ethnicity')
    
    print("\nDemographic Features Summary:")
    print("-" * 30)
    print(f"Features extracted for {len(demo_df)} cases")
    
    print("\nNumerical Feature Statistics:")
    print(demo_df[['admission_age']].describe().round(2))
    
    print("\nComorbidity Frequencies:")
    comorbidity_cols = [col for col in demo_df.columns if col.startswith('has_')]
    print(demo_df[comorbidity_cols].sum().sort_values(ascending=False))
    
    return demo_df

if 'clabsi_cohort' in locals():
    print("Testing demographic feature extraction...")
    try:
        demographic_features = get_demographic_features(engine, clabsi_cohort)
        print("\nShape of extracted features:", demographic_features.shape)
        print("\nFeature names:")
        print(demographic_features.columns.tolist())
        print("\nMissing values summary:")
        print(demographic_features.isnull().sum()[demographic_features.isnull().sum() > 0])
    except Exception as e:
        print("Error during feature extraction:", str(e))
else:
    print("Please run the CLABSI cohort creation first")

In [None]:
def get_device_features(engine, cohort_df):
   device_query = text("""
   WITH device_timing AS (
       -- Get infection timing for each case
       SELECT 
           icu.stay_id,
           icu.hadm_id,
           icu.subject_id,
           COALESCE(
               (SELECT MIN(charttime) 
               FROM mimiciv_hosp.microbiologyevents me 
               WHERE me.hadm_id = icu.hadm_id 
               AND me.org_name IS NOT NULL 
               AND me.spec_type_desc = 'BLOOD CULTURE'),
               icu.outtime
           ) as end_time
       FROM mimiciv_icu.icustays icu
       WHERE icu.stay_id IN :stay_ids
   ),
   trach_status AS (
       -- Identify tracheostomy before infection
       SELECT DISTINCT
           dt.stay_id,
           MAX(CASE WHEN 
               LOWER(ce.value) IN ('tracheostomy tube', 'trach mask')
               AND ce.charttime < dt.end_time
           THEN 1 ELSE 0 END) as has_tracheostomy
       FROM device_timing dt
       LEFT JOIN mimiciv_icu.chartevents ce ON dt.stay_id = ce.stay_id
       LEFT JOIN mimiciv_icu.d_items di ON ce.itemid = di.itemid
       WHERE (LOWER(di.label) LIKE '%o2%' OR LOWER(di.label) LIKE '%delivery%')
       GROUP BY dt.stay_id
   ),
   ostomy_status AS (
        -- Identify ostomies before infection using specific itemids
        SELECT 
            dt.stay_id,
            MAX(CASE WHEN 
                ce.itemid IN (
                    227458,  -- Ostomy
                    227637,  -- Ostomy Care
                    228341,  -- Ostomy Bag
                    228342,  -- Ostomy Output
                    228343,  -- Ostomy Type
                    228344,  -- Ostomy/RNWL
                    228345,  -- Ostomy/RNTL
                    228346,  -- Ostomy/RNY
                    228347   -- Ostomy/ROY
                )
                AND ce.charttime < dt.end_time
            THEN 1 ELSE 0 END) as has_ostomy
        FROM device_timing dt
        LEFT JOIN mimiciv_icu.chartevents ce 
            ON dt.stay_id = ce.stay_id
        GROUP BY dt.stay_id
    )
   SELECT 
       dt.stay_id,
       COALESCE(t.has_tracheostomy, 0) as has_tracheostomy,
       COALESCE(o.has_ostomy, 0) as has_ostomy
   FROM device_timing dt
   LEFT JOIN trach_status t ON dt.stay_id = t.stay_id
   LEFT JOIN ostomy_status o ON dt.stay_id = o.stay_id
   ORDER BY dt.stay_id;
   """)
   
   # Execute query with parameters
   stay_ids = tuple(int(x) for x in cohort_df['stay_id'].unique())
   device_df = pd.read_sql(device_query, engine, params={'stay_ids': stay_ids})
   
   print("\nDevice Features Summary:")
   print("-" * 30)
   print(f"Features extracted for {len(device_df)} cases")
   
   print("\nDevice Frequencies:")
   device_cols = ['has_tracheostomy', 'has_ostomy']
   print(device_df[device_cols].sum().sort_values(ascending=False))
   
   return device_df

# Test the function if clabsi_cohort exists
if 'clabsi_cohort' in locals():
   print("Testing device feature extraction...")
   try:
       device_features = get_device_features(engine, clabsi_cohort)
       print("\nShape of extracted features:", device_features.shape)
       print("\nFeature names:")
       print(device_features.columns.tolist())
       print("\nMissing values summary:")
       print(device_features.isnull().sum()[device_features.isnull().sum() > 0])
   except Exception as e:
       print("Error during feature extraction:", str(e))
else:
   print("Please run the CLABSI cohort creation first")

In [None]:
def get_line_care(engine, cohort_df):
    line_care_query = text("""
    WITH line_events AS (
        SELECT 
            ie.stay_id,
            ie.hadm_id,
            il.line_type,
            il.line_site,
            CASE 
                WHEN LOWER(il.line_site) LIKE '%femoral%' THEN 'Femoral'
                WHEN LOWER(il.line_site) LIKE '%subclavian%' THEN 'Subclavian'
                WHEN LOWER(il.line_site) LIKE '%jugular%' OR LOWER(il.line_site) LIKE '%ij%' THEN 'Internal Jugular'
                WHEN LOWER(il.line_site) LIKE '%picc%' OR LOWER(il.line_site) LIKE '%peripherally inserted%' THEN 'PICC'
                ELSE 'Other'
            END as site_category,
            CASE 
                WHEN LOWER(il.line_site) LIKE '%femoral%' THEN 'High Risk'
                WHEN LOWER(il.line_site) LIKE '%subclavian%' THEN 'Low Risk'
                WHEN LOWER(il.line_site) LIKE '%jugular%' OR LOWER(il.line_site) LIKE '%ij%' THEN 'Moderate Risk'
                ELSE 'Standard Risk'
            END as risk_category,
            il.starttime,
            il.endtime,
            EXTRACT(EPOCH FROM (il.endtime - il.starttime))/86400.0 as line_duration_days,
            LEAD(il.starttime) OVER (
                PARTITION BY ie.stay_id, il.line_type 
                ORDER BY il.starttime
            ) as next_line_start,
            LAG(il.endtime) OVER (
                PARTITION BY ie.stay_id, il.line_type 
                ORDER BY il.starttime
            ) as prev_line_end
        FROM mimiciv_icu.icustays ie
        INNER JOIN mimiciv_derived.invasive_line il 
            ON ie.stay_id = il.stay_id
        WHERE ie.stay_id IN :stay_ids
        AND il.line_type IN (
            'PICC', 'Multi Lumen', 'Dialysis', 'Triple Introducer',
            'Pre-Sep', 'Hickman', 'Portacath', 'Cordis/Introducer',
            'Continuous Cardiac Output PA', 'PA'
        )
    ),
    site_complications AS (
        SELECT 
            ce.stay_id,
            il.line_type,
            il.line_site,
            COUNT(CASE 
                WHEN LOWER(ce.value) LIKE '%redness%' OR LOWER(ce.value) LIKE '%erythema%' THEN 1 
            END) as redness_count,
            COUNT(CASE 
                WHEN LOWER(ce.value) LIKE '%drainage%' OR LOWER(ce.value) LIKE '%purulent%' OR LOWER(ce.value) LIKE '%exudate%' THEN 1 
            END) as drainage_count,
            COUNT(CASE 
                WHEN LOWER(ce.value) LIKE '%tender%' OR LOWER(ce.value) LIKE '%pain%' THEN 1 
            END) as tenderness_count,
            COUNT(CASE 
                WHEN LOWER(ce.value) LIKE '%swelling%' OR LOWER(ce.value) LIKE '%edema%' THEN 1 
            END) as swelling_count,
            COUNT(CASE 
                WHEN LOWER(ce.value) LIKE '%blood%' OR LOWER(ce.value) LIKE '%oozing%' THEN 1 
            END) as bleeding_count,
            COUNT(CASE 
                WHEN LOWER(ce.value) LIKE '%infiltrat%' OR LOWER(ce.value) LIKE '%migration%' OR LOWER(ce.value) LIKE '%dislodg%' THEN 1 
            END) as severe_complication_count
        FROM mimiciv_derived.invasive_line il
        INNER JOIN mimiciv_icu.chartevents ce 
            ON il.stay_id = ce.stay_id
            AND ce.charttime BETWEEN il.starttime AND il.endtime
        WHERE ce.itemid IN (
            224188, 224269, 224267, 224270, 224263, 224264, 224268, 225315
        )
        AND il.line_type IN (
            'PICC', 'Multi Lumen', 'Dialysis', 'Triple Introducer',
            'Pre-Sep', 'Hickman', 'Portacath', 'Cordis/Introducer',
            'Continuous Cardiac Output PA', 'PA'
        )
        AND ce.stay_id IN :stay_ids
        GROUP BY ce.stay_id, il.line_type, il.line_site
    )
    SELECT 
        le.stay_id,
        le.line_type,
        le.site_category,
        le.risk_category,
        COUNT(DISTINCT le.line_site) as unique_sites,
        SUM(CASE WHEN le.site_category = 'Femoral' THEN 1 ELSE 0 END) as femoral_count,
        SUM(CASE WHEN le.site_category = 'Subclavian' THEN 1 ELSE 0 END) as subclavian_count,
        SUM(CASE WHEN le.site_category = 'Internal Jugular' THEN 1 ELSE 0 END) as ij_count,
        SUM(CASE WHEN le.site_category = 'PICC' THEN 1 ELSE 0 END) as picc_count,
        MIN(le.line_duration_days) as shortest_duration,
        MAX(le.line_duration_days) as longest_duration,
        AVG(le.line_duration_days) as avg_duration,
        COALESCE(MAX(sc.redness_count), 0) as total_redness_events,
        COALESCE(MAX(sc.drainage_count), 0) as total_drainage_events,
        COALESCE(MAX(sc.tenderness_count), 0) as total_tenderness_events,
        COALESCE(MAX(sc.swelling_count), 0) as total_swelling_events,
        COALESCE(MAX(sc.bleeding_count), 0) as total_bleeding_events,
        COALESCE(MAX(sc.severe_complication_count), 0) as total_severe_complications
    FROM line_events le
    LEFT JOIN site_complications sc 
        ON le.stay_id = sc.stay_id 
        AND le.line_type = sc.line_type
        AND le.line_site = sc.line_site
    GROUP BY 
        le.stay_id,
        le.line_type,
        le.site_category,
        le.risk_category
    ORDER BY 
        le.stay_id, 
        le.line_type;
    """)

    # Convert stay IDs for SQL parameterization
    stay_ids = tuple(int(x) for x in cohort_df['stay_id'].unique())

    # Execute query and return the DataFrame
    line_care_df = pd.read_sql(line_care_query, engine, params={'stay_ids': stay_ids})
    
    # Print summary stats
    print("\nCentral Line Care Metrics Summary:")
    print("-" * 40)
    print(f"Total unique stays: {line_care_df['stay_id'].nunique()}")
    print(f"Line types captured: {line_care_df['line_type'].nunique()}")
    print("\nRisk Category Distribution:")
    print(line_care_df['risk_category'].value_counts(dropna=False))
    
    return line_care_df

# Call the function
line_care_df = get_line_care(engine, cohort_df)
display(line_care_df.head())


In [None]:
def get_lab_values(engine, cohort_df):
    lab_query = text("""
    WITH base_labs AS (
        -- Complete Blood Count
        SELECT 
            ie.stay_id,
            cb.charttime,
            'CBC' as panel,
            cb.wbc,
            cb.hemoglobin,
            cb.platelet,
            NULL::double precision as creatinine,
            NULL::double precision as sodium,
            NULL::double precision as potassium,
            NULL::double precision as chloride,
            NULL::double precision as bicarbonate,
            NULL::double precision as inr,
            NULL::double precision as pt,
            NULL::double precision as ptt
        FROM mimiciv_icu.icustays ie
        INNER JOIN mimiciv_derived.complete_blood_count cb
            ON ie.subject_id = cb.subject_id
        WHERE ie.stay_id = ANY(:stay_ids)
            
        UNION ALL
        
        -- Chemistry Panel
        SELECT 
            ie.stay_id,
            ch.charttime,
            'CHEM' as panel,
            NULL::double precision as wbc,
            NULL::double precision as hemoglobin,
            NULL::double precision as platelet,
            ch.creatinine,
            ch.sodium,
            ch.potassium,
            ch.chloride,
            ch.bicarbonate,
            NULL::double precision as inr,
            NULL::double precision as pt,
            NULL::double precision as ptt
        FROM mimiciv_icu.icustays ie
        INNER JOIN mimiciv_derived.chemistry ch
            ON ie.subject_id = ch.subject_id
        WHERE ie.stay_id = ANY(:stay_ids)
            
        UNION ALL
        
        -- Coagulation Panel
        SELECT 
            ie.stay_id,
            cg.charttime,
            'COAG' as panel,
            NULL::double precision as wbc,
            NULL::double precision as hemoglobin,
            NULL::double precision as platelet,
            NULL::double precision as creatinine,
            NULL::double precision as sodium,
            NULL::double precision as potassium,
            NULL::double precision as chloride,
            NULL::double precision as bicarbonate,
            cg.inr,
            cg.pt,
            cg.ptt
        FROM mimiciv_icu.icustays ie
        INNER JOIN mimiciv_derived.coagulation cg
            ON ie.subject_id = cg.subject_id
        WHERE ie.stay_id = ANY(:stay_ids)
    )
    SELECT 
        stay_id,
        -- CBC metrics
        MIN(wbc) as wbc_min,
        MAX(wbc) as wbc_max,
        AVG(wbc) as wbc_mean,
        MIN(platelet) as plt_min,
        MAX(platelet) as plt_max,
        AVG(platelet) as plt_mean,
        MIN(hemoglobin) as hgb_min,
        MAX(hemoglobin) as hgb_max,
        AVG(hemoglobin) as hgb_mean,
        
        -- Chemistry metrics
        MIN(creatinine) as creat_min,
        MAX(creatinine) as creat_max,
        AVG(creatinine) as creat_mean,
        MIN(sodium) as sodium_min,
        MAX(sodium) as sodium_max,
        AVG(sodium) as sodium_mean,
        
        -- Coagulation metrics
        MIN(inr) as inr_min,
        MAX(inr) as inr_max,
        AVG(inr) as inr_mean,
        MIN(pt) as pt_min,
        MAX(pt) as pt_max,
        AVG(pt) as pt_mean,
        
        -- Lab counts
        COUNT(DISTINCT charttime) as lab_count,
        SUM(CASE WHEN wbc > 12 THEN 1 ELSE 0 END) as high_wbc_count,
        SUM(CASE WHEN wbc < 4 THEN 1 ELSE 0 END) as low_wbc_count,
        SUM(CASE WHEN platelet < 150 THEN 1 ELSE 0 END) as low_plt_count
    FROM base_labs
    GROUP BY stay_id
    ORDER BY stay_id;
    """)

    
    stay_ids = [int(x) for x in cohort_df['stay_id'].unique()]
    
    
    lab_df = pd.read_sql(lab_query, engine, params={'stay_ids': stay_ids})
    
    if len(lab_df) > 0:
        print("\nLaboratory Values Summary:")
        print("-" * 30)
        print(f"Features extracted for {len(lab_df)} cases")
        
        if len(lab_df) > 1:  # Only show these stats if we have multiple rows
            print("\nLab Measurement Frequencies:")
            print(lab_df['lab_count'].describe().round(2))
            
            print("\nKey Lab Value Statistics:")
            key_metrics = ['wbc_mean', 'plt_mean', 'hgb_mean',
                         'high_wbc_count', 'low_plt_count']
            print(lab_df[key_metrics].describe().round(2))
    
    return lab_df


In [None]:
def get_all_severity_scores(engine, cohort_df):
    
    queries = {
        'sofa': """
        SELECT
            stay_id,
            sofa AS sofa_score,
            respiration,
            coagulation,
            liver,
            cardiovascular,
            cns,
            renal
        FROM mimiciv_derived.first_day_sofa
        WHERE stay_id = ANY(:stay_ids)
        """,
        'apsiii': """
        -- Integrated APS-III SQL logic
        WITH apsiii_cte AS (
            SELECT
                ie.subject_id,
                ie.hadm_id,
                ie.stay_id,
                apsiii,
                CAST(1 AS DOUBLE PRECISION) / (
                    1 + EXP(-(-4.4360 + 0.04726 * (apsiii)))
                ) AS apsiii_prob
            FROM mimiciv_icu.icustays AS ie
            LEFT JOIN mimiciv_derived.apsiii AS apsiii_table
            ON ie.stay_id = apsiii_table.stay_id
        )
        SELECT * FROM apsiii_cte
        WHERE stay_id = ANY(:stay_ids);
        """,
        'sapsii': """
        -- Integrated SAPS-II SQL logic
        WITH sapsii_cte AS (
            SELECT
                s.subject_id,
                s.hadm_id,
                s.stay_id,
                sapsii,
                CAST(1 AS DOUBLE PRECISION) / (
                    1 + EXP(-(-7.7631 + 0.0737 * (sapsii) + 0.9971 * LN(sapsii + 1)))
                ) AS sapsii_prob
            FROM mimiciv_derived.sapsii AS s
        )
        SELECT * FROM sapsii_cte
        WHERE stay_id = ANY(:stay_ids);
        """
    }

 
    stay_ids = [int(x) for x in cohort_df['stay_id'].unique()]

    
    results = {}

   
    for score_type, query in queries.items():
        try:
            print(f"Retrieving {score_type.upper()} scores...")
            results[score_type] = pd.read_sql(
                text(query),
                engine,
                params={'stay_ids': stay_ids}
            )
            print(f"Successfully retrieved {len(results[score_type])} rows for {score_type.upper()} scores.")
        except Exception as e:
            print(f"Error retrieving {score_type.upper()} scores: {e}")
            results[score_type] = None

    return results


severity_scores = get_all_severity_scores(engine, clabsi_cohort)

if severity_scores['sofa'] is not None:
    print("\nFirst-Day SOFA Scores:")
    display(severity_scores['sofa'].head())

if severity_scores['apsiii'] is not None:
    print("\nAPS-III Scores:")
    display(severity_scores['apsiii'].head())

if severity_scores['sapsii'] is not None:
    print("\nSAPS-II Scores:")
    display(severity_scores['sapsii'].head())



In [None]:
def get_line_type_risk_analysis(clabsi_cohort):
    """
    Analyze total line duration instead of duration before CLABSI
    """
    # Calculate total line duration
    clabsi_cohort['line_duration_total'] = (
        (clabsi_cohort['line_end'] - clabsi_cohort['line_start']).dt.total_seconds() / 86400
    )
    
    # Remove invalid durations
    clabsi_cohort = clabsi_cohort[clabsi_cohort['line_duration_total'] > 0]

    # Group by line type
    line_risk_summary = clabsi_cohort.groupby('line_type').agg(
        avg_duration=('line_duration_total', 'mean'),
        median_duration=('line_duration_total', 'median'),
        max_duration=('line_duration_total', 'max'),
        line_count=('line_type', 'count')
    ).reset_index()
    
    return line_risk_summary




In [None]:
def threshold_analysis(clabsi_cohort, thresholds=[3, 5, 7, 10]):
    if 'line_duration_before_clabsi' not in clabsi_cohort.columns:
        clabsi_cohort['adjusted_endtime'] = clabsi_cohort[['line_end', 'infection_time']].min(axis=1)
        clabsi_cohort['line_duration_before_clabsi'] = (
            (clabsi_cohort['adjusted_endtime'] - clabsi_cohort['line_start']).dt.total_seconds() / 86400
        )
        clabsi_cohort = clabsi_cohort[clabsi_cohort['line_duration_before_clabsi'] > 0]

    # Initialize result storage
    results = []

    # Perform threshold analysis for each line type
    for line_type, group in clabsi_cohort.groupby('line_type'):
        total_lines = len(group)
        for threshold in thresholds:
            exceed_count = (group['line_duration_before_clabsi'] > threshold).sum()
            exceed_rate = exceed_count / total_lines * 100
            results.append({
                'line_type': line_type,
                'threshold': threshold,
                'exceed_count': exceed_count,
                'total_lines': total_lines,
                'exceed_rate': exceed_rate
            })

    # Convert results to a DataFrame
    threshold_df = pd.DataFrame(results)
    return threshold_df

# Perform threshold analysis
thresholds = [3, 5, 7, 10]
threshold_summary = threshold_analysis(clabsi_cohort, thresholds)

# Display results
print("\nThreshold Analysis Summary:")
display(threshold_summary)


In [None]:
def get_chg_bath_features(engine, cohort_df):
    bath_query = text("""
    WITH bath_events AS (
        SELECT 
            ce.stay_id,
            ce.charttime,
            EXTRACT(EPOCH FROM (
                ce.charttime - LAG(ce.charttime) OVER (
                    PARTITION BY ce.stay_id ORDER BY ce.charttime
                )
            ))/86400 AS days_since_last_bath
        FROM mimiciv_icu.chartevents ce
        INNER JOIN mimiciv_derived.invasive_line il 
            ON ce.stay_id = il.stay_id
            AND ce.charttime BETWEEN il.starttime - INTERVAL '2 hours' 
                             AND il.endtime + INTERVAL '2 hours'
        WHERE ce.itemid = 228137  -- CHG Bath
    ),
    bath_analysis AS (
        SELECT
            stay_id,
            COUNT(*) AS total_baths,
            MAX(days_since_last_bath) AS max_gap_between_baths
        FROM bath_events
        GROUP BY stay_id
    )
    SELECT 
        ba.*,
        CASE WHEN ba.total_baths = 0 THEN 1 ELSE 0 END as no_bath_flag,
        CASE WHEN ba.max_gap_between_baths > 3 THEN 1 ELSE 0 END as irregular_bathing
    FROM bath_analysis ba
    WHERE ba.stay_id IN :stay_ids
    """)
    
    stay_ids = tuple(int(x) for x in cohort_df['stay_id'].unique())
    return pd.read_sql(bath_query, engine, params={'stay_ids': stay_ids})

In [None]:
def get_medications(engine, cohort_df):
    med_query = """
    WITH antibiotics AS (
        SELECT 
            stay_id,
            COUNT(DISTINCT antibiotic) AS distinct_antibiotics,
            COUNT(*) AS total_antibiotic_orders,
            MAX(EXTRACT(EPOCH FROM (stoptime - starttime))/86400) AS max_antibiotic_duration
        FROM mimiciv_derived.antibiotic
        WHERE stay_id = ANY(%(stay_ids)s)
        GROUP BY stay_id
    ),
    tpn AS (
        SELECT 
            stay_id,
            COUNT(*) AS tpn_orders,
            SUM(CASE WHEN rate > 0 THEN 1 ELSE 0 END) AS tpn_days
        FROM mimiciv_icu.inputevents
        WHERE stay_id = ANY(%(stay_ids)s)
        AND itemid IN (
            226089, -- TPN
            227690  -- Lipids
        )
        GROUP BY stay_id
    ),
    high_risk_meds AS (
        SELECT 
            stay_id,
            COUNT(CASE WHEN itemid IN (
                221662, -- Vasopressors
                221653, -- Steroids
                221668  -- Immunosuppressants
            ) THEN 1 ELSE NULL END) AS high_risk_med_count,
            COUNT(DISTINCT itemid) AS distinct_high_risk_meds
        FROM mimiciv_icu.inputevents
        WHERE stay_id = ANY(%(stay_ids)s)
        GROUP BY stay_id
    )
    SELECT 
        COALESCE(a.stay_id, t.stay_id, h.stay_id) AS stay_id,
        COALESCE(distinct_antibiotics, 0) AS distinct_antibiotics,
        COALESCE(total_antibiotic_orders, 0) AS total_antibiotic_orders,
        COALESCE(max_antibiotic_duration, 0) AS max_antibiotic_duration,
        COALESCE(tpn_orders, 0) AS tpn_orders,
        COALESCE(tpn_days, 0) AS tpn_days,
        COALESCE(high_risk_med_count, 0) AS high_risk_med_count,
        COALESCE(distinct_high_risk_meds, 0) AS distinct_high_risk_meds
    FROM antibiotics a
    FULL OUTER JOIN tpn t ON a.stay_id = t.stay_id
    FULL OUTER JOIN high_risk_meds h ON a.stay_id = h.stay_id
    """
    
    
    stay_ids = [int(x) for x in cohort_df['stay_id'].unique()]
    
    
    medication_df = pd.read_sql(
        med_query, 
        engine, 
        params={'stay_ids': stay_ids}
    )
    
    
    print("\nMedication Features Retrieved:")
    print(f"Number of rows: {len(medication_df)}")
    display(medication_df.head())
    
    print("\nSummary Statistics:")
    display(medication_df.describe().round(2))
    
    return medication_df


In [None]:
def get_clinical_features(engine, cohort_df):
    line_care_query = text("""
    WITH line_events AS (
        SELECT 
            ie.stay_id,
            il.line_type,
            il.starttime as line_start,
            il.endtime as line_end,
            MAX(ce.charttime) as last_dressing_change,
            COUNT(CASE WHEN ce.itemid = 228137 THEN 1 END) as actual_baths,
            CEIL(EXTRACT(EPOCH FROM (il.endtime - il.starttime))/86400/2) 
                as expected_baths
        FROM mimiciv_icu.icustays ie
        INNER JOIN mimiciv_derived.invasive_line il 
            ON ie.stay_id = il.stay_id
        LEFT JOIN mimiciv_icu.chartevents ce 
            ON ie.stay_id = ce.stay_id
            AND ce.charttime BETWEEN il.starttime AND il.endtime
        WHERE ie.stay_id IN :stay_ids
        GROUP BY ie.stay_id, il.line_type, il.starttime, il.endtime
    ),
    infection_timing AS (
        SELECT 
            i.stay_id,  -- Corrected: Get stay_id from icustays
            MIN(m.charttime) as infection_time
        FROM mimiciv_hosp.microbiologyevents m
        INNER JOIN mimiciv_icu.icustays i
            ON m.hadm_id = i.hadm_id
            AND m.charttime BETWEEN i.intime AND i.outtime
        WHERE m.spec_type_desc = 'BLOOD CULTURE'
          AND m.org_name IS NOT NULL
        GROUP BY i.stay_id
    )
    SELECT 
        le.stay_id,
        le.line_type,
        EXTRACT(EPOCH FROM (it.infection_time - le.last_dressing_change))/86400 
            as days_since_last_dressing_change,
        le.actual_baths / NULLIF(le.expected_baths, 0) 
            as chg_adherence_ratio,
        EXTRACT(EPOCH FROM (le.line_end - le.line_start))/86400 
            as line_duration_days
    FROM line_events le
    INNER JOIN infection_timing it 
        ON le.stay_id = it.stay_id
    ORDER BY le.stay_id;
    """)
    
    stay_ids = tuple(int(x) for x in cohort_df['stay_id'].unique())
    line_care_df = pd.read_sql(line_care_query, engine, params={'stay_ids': stay_ids})
    return line_care_df

In [None]:
def analyze_line_sites_clabsi_risk(engine, cohort_df, line_care_df):
    site_query = text("""
    WITH line_infections AS (
        SELECT 
            il.stay_id,
            il.line_type,
            il.line_site,
            CASE WHEN il.stay_id = ANY(:clabsi_ids) THEN 1 ELSE 0 END as is_clabsi,
            EXTRACT(EPOCH FROM (il.endtime - il.starttime))/86400 as line_duration_days
        FROM mimiciv_derived.invasive_line il
        WHERE il.stay_id = ANY(:clabsi_ids)
        AND il.line_site IS NOT NULL
        AND il.line_type NOT LIKE 'Arterial'  -- Exclude arterial lines
        AND il.line_type IN (
            'PICC', 'Multi Lumen', 'Dialysis', 'Triple Introducer', 
            'Pre-Sep', 'Hickman', 'Portacath', 'Cordis/Introducer',
            'Continuous Cardiac Output PA', 'PA'
        )
    )
    SELECT 
        line_type,
        line_site,
        COUNT(*) as total_lines,
        SUM(is_clabsi) as clabsi_count,
        ROUND(AVG(line_duration_days), 2) as avg_duration_days
    FROM line_infections
    GROUP BY line_type, line_site
    HAVING COUNT(*) >= 5
    ORDER BY clabsi_count DESC;
    """)

    try:
        clabsi_ids = [int(x) for x in cohort_df['stay_id'].unique()]
        
        site_risk = pd.read_sql(site_query, engine, params={'clabsi_ids': clabsi_ids})
        
        print("\nCentral Line Sites Associated with CLABSI:")
        print("-" * 40)
        display(site_risk)
        
        return site_risk

    except Exception as e:
        print(f"Error executing analysis: {e}")
        return None

# Execute 
site_risk_df = analyze_line_sites_clabsi_risk(engine, clabsi_cohort, line_care_df)

In [None]:
class CLABSIFeatureProcessor:

    def __init__(self, engine):
        """
        Initialize the feature processor.
        
        Parameters
        ----------
        engine : SQLAlchemy engine
            Database connection engine
        """
        self.engine = engine
        
        
        self.time_windows = {
            'short': '24 hours',
            'medium': '48 hours',
            'long': '72 hours'
        }
        
        
        self.duration_thresholds = [3, 5, 7, 10]
        
        
        try:
            query = "SELECT stay_id FROM mimiciv_icu.icustays LIMIT 1"
            self.example_stay_id = pd.read_sql(query, self.engine)['stay_id'].iloc[0]
        except Exception as e:
            self.example_stay_id = None
            print(f"Warning: Could not set example_stay_id: {e}")

    def get_all_features(self, stay_id):
       
        try:
            # Create single-row dataframe with stay_id
            stay_df = pd.DataFrame({'stay_id': [stay_id]})
            
            # If this is a CLABSI case, add infection_time from clabsi_cohort
            if 'clabsi_cohort' in globals():
                infection_time = clabsi_cohort[clabsi_cohort['stay_id'] == stay_id]['infection_time'].iloc[0]
                stay_df['infection_time'] = infection_time
            else:
                # For non-CLABSI cases, use outtime as endpoint
                outtime_query = text("""
                    SELECT outtime 
                    FROM mimiciv_icu.icustays 
                    WHERE stay_id = :stay_id
                """)
                outtime = pd.read_sql(outtime_query, self.engine, params={'stay_id': stay_id})['outtime'].iloc[0]
                stay_df['infection_time'] = outtime
            
            # Get individual feature sets
            features = {
                'demographics': get_demographic_features(self.engine, stay_df),
                'devices': get_device_features(self.engine, stay_df),
                'labs': get_lab_values(self.engine, stay_df),
                'severity': get_all_severity_scores(self.engine, stay_df),
                'line_care': get_line_care(self.engine, stay_df),
                'medications': get_medications(self.engine, stay_df),
                'clinical': get_clinical_features(self.engine, stay_df),
                'chg_bath': get_chg_bath_features(self.engine, stay_df)
            }
            
            # Store feature sets as instance variables for feature names access
            for key, value in features.items():
                setattr(self, f"{key}_features", value)
            
            # Combine all features
            combined_features = self._combine_features(features)
            
            return combined_features
            
        except Exception as e:
            print(f"Error extracting features for stay_id {stay_id}: {str(e)}")
            return None
            
    def get_batch_features(self, cohort_df):
        
        try:
            print(f"Processing {len(cohort_df)} stays...")

            # Get features for all stays
            features = {
                'demographics': get_demographic_features(self.engine, cohort_df),
                'devices': get_device_features(self.engine, cohort_df),
                'labs': get_lab_values(self.engine, cohort_df),
                'severity': get_all_severity_scores(self.engine, cohort_df),
                'line_care': get_line_care(self.engine, cohort_df),
                'medications': get_medications(self.engine, cohort_df),
                'clinical': get_clinical_features(self.engine, cohort_df),
                'chg_bath': get_chg_bath_features(self.engine, cohort_df)
            }
            
            # Print status of each feature set
            for name, feature_set in features.items():
                if feature_set is not None:
                    print(f"{name}: {len(feature_set)} rows")
                else:
                    print(f"{name}: None")
            
            # Store feature sets as instance variables
            for key, value in features.items():
                setattr(self, f"{key}_features", value)
            
            # Combine features and preserve original cohort information
            print("Combining features...")
            combined_features = self._combine_features(features)
            if combined_features is not None:
                # Add relevant cohort columns
                cohort_cols = ['line_type', 'line_site', 'line_duration_days'] 
                cohort_info = cohort_df[['stay_id'] + [col for col in cohort_cols if col in cohort_df.columns]]
                combined_features = combined_features.merge(cohort_info, on='stay_id', how='left')
                print(f"Final feature set: {len(combined_features)} rows")
                
            return combined_features
                
        except Exception as e:
            print(f"Error in get_batch_features: {str(e)}")
            import traceback
            traceback.print_exc()
            return None
    
   # In the CLABSIFeatureProcessor class, modify _combine_features method
    def _combine_features(self, features):
        # Keep original line_type
        combined = features['line_care'].merge(features['demographics'], on='stay_id')
        
        # Add simplified version
        combined['line_type_simple'] = combined['line_type'].apply(
            lambda x: 'Multi-Lumen' if 'Multi Lumen' in x 
            else 'Dialysis' if 'Dialysis' in x 
            else 'PICC' if 'PICC' in x 
            else 'Other'
        )
    
    # Rest of your existing merging logic
        try:
            import traceback  # Add import at the top
            
            # Start with demographics as base
            combined = features['demographics'].copy()
            
            feature_sets = {
                'devices': features['devices'],
                'labs': features['labs'],
                'medications': features['medications'],
                'clinical': features['clinical']
            }
            
            # Merge straightforward feature sets
            for name, feature_set in feature_sets.items():
                if feature_set is not None:
                    feature_set = feature_set[feature_set['stay_id'].isin(combined['stay_id'])]
                    if not feature_set.empty:
                        combined = combined.merge(feature_set, on='stay_id', how='left')
            
            # Handle severity scores separately
            if features['severity'] is not None:
                severity_scores = features['severity']
                if severity_scores['sofa'] is not None:
                    combined = combined.merge(severity_scores['sofa'], on='stay_id', how='left')
                if severity_scores['apsiii'] is not None:
                    apsiii_cols = ['stay_id', 'apsiii', 'apsiii_prob']
                    combined = combined.merge(
                        severity_scores['apsiii'][apsiii_cols], 
                        on='stay_id', 
                        how='left'
                    )
                if severity_scores['sapsii'] is not None:
                    sapsii_cols = ['stay_id', 'sapsii', 'sapsii_prob']
                    combined = combined.merge(
                        severity_scores['sapsii'][sapsii_cols], 
                        on='stay_id', 
                        how='left'
                    )
            
            # Handle CHG bath features
            if features['chg_bath'] is not None:
                chg_features = features['chg_bath']
                chg_features = chg_features[chg_features['stay_id'].isin(combined['stay_id'])]
                if not chg_features.empty:
                    # Get actual columns that exist in chg_features
                    available_cols = ['stay_id', 'total_baths']
                    available_cols.extend([col for col in chg_features.columns 
                                        if col not in ['stay_id', 'total_baths'] 
                                        and col in chg_features.columns])
                    
                    combined = combined.merge(
                        chg_features[available_cols], 
                        on='stay_id', 
                        how='left'
                    )
            
            # Handle line care metrics
            if features['line_care'] is not None:
                line_features = features['line_care']
                line_features = line_features[line_features['stay_id'].isin(combined['stay_id'])]
                
                if not line_features.empty:
                    numeric_cols = line_features.select_dtypes(include=['float64', 'int64']).columns
                    numeric_cols = numeric_cols[numeric_cols != 'stay_id']
                    
                    categorical_cols = line_features.select_dtypes(include=['object']).columns
                    categorical_cols = categorical_cols[categorical_cols != 'stay_id']
                    
                    aggs = {}
                    
                    # Add numeric aggregations
                    for col in numeric_cols:
                        aggs[f"{col}_min"] = pd.NamedAgg(column=col, aggfunc='min')
                        aggs[f"{col}_max"] = pd.NamedAgg(column=col, aggfunc='max')
                        aggs[f"{col}_mean"] = pd.NamedAgg(column=col, aggfunc='mean')
                    
                    # Add categorical aggregations
                    for col in categorical_cols:
                        aggs[f"{col}_types"] = pd.NamedAgg(
                            column=col,
                            aggfunc=lambda x: '|'.join(sorted(x.unique()))
                        )
                    
                    line_care_agg = line_features.groupby('stay_id').agg(**aggs).reset_index()
                    combined = combined.merge(line_care_agg, on='stay_id', how='left')
            
            # Handle missing values
            numeric_cols = combined.select_dtypes(include=['float64', 'int64']).columns
            categorical_cols = combined.select_dtypes(include=['object']).columns
            
            # Fill numeric with median, excluding stay_id
            numeric_cols = numeric_cols[numeric_cols != 'stay_id']
            combined[numeric_cols] = combined[numeric_cols].fillna(combined[numeric_cols].median())
            
            # Fill categorical with mode, excluding stay_id
            categorical_cols = categorical_cols[categorical_cols != 'stay_id']
            for col in categorical_cols:
                if combined[col].isnull().any():
                    mode_value = combined[col].mode().iloc[0] if not combined[col].mode().empty else 'Unknown'
                    combined[col] = combined[col].fillna(mode_value)
            
            return combined
                
        except Exception as e:
            print(f"Error combining features: {str(e)}")
            traceback.print_exc()
            return None
    
    def get_feature_names(self):
        example_features = self.get_all_features(self.example_stay_id)
        if example_features is not None:
            return example_features.columns.tolist()
        return None
        
    def get_feature_stats(self, features_df):
        
        try:
            stats = {}
            
            numeric_cols = features_df.select_dtypes(include=['float64', 'int64']).columns
            stats['numeric'] = features_df[numeric_cols].describe()
            
            
            categorical_cols = features_df.select_dtypes(include=['object']).columns
            stats['categorical'] = {
                col: features_df[col].value_counts() 
                for col in categorical_cols
            }
            
            
            stats['missing'] = features_df.isnull().sum()
            
            return stats
            
        except Exception as e:
            print(f"Error calculating feature statistics: {str(e)}")
            return None

# Test the feature processor
if engine:
    # Create processor instance
    feature_processor = CLABSIFeatureProcessor(engine)
    
    # Test with a single stay_id from clabsi_cohort
    print("Testing feature extraction with CLABSI case...")
    test_stay_id = clabsi_cohort['stay_id'].iloc[0]
    features = feature_processor.get_all_features(test_stay_id)
    
    if features is not None:
        print(f"\nExtracted {len(features.columns)} features for stay_id {test_stay_id}")
        
        # Display feature categories and first few rows
        print("\nFeature sample:")
        display(features.head())
        
        # Get feature statistics
        stats = feature_processor.get_feature_stats(features)
        if stats:
            print("\nFeature Statistics:")
            display(stats['numeric'])

In [None]:
# Get features for both cohorts
feature_processor = CLABSIFeatureProcessor(engine)

# Process CLABSI cases with cohort info
print("\nProcessing CLABSI cohort...")
clabsi_features = feature_processor.get_batch_features(clabsi_cohort)
if clabsi_features is not None:
    clabsi_features['is_clabsi'] = 1

# Process controls with cohort info 
print("\nProcessing control cohort...")
control_features = feature_processor.get_batch_features(control_cohort)
if control_features is not None:
    control_features['is_clabsi'] = 0

if clabsi_features is not None and control_features is not None:
    # Combine datasets
    combined_df = pd.concat([clabsi_features, control_features], axis=0)
    print(f"\nFinal dataset shape: {combined_df.shape}")
    
    from scipy.stats import ttest_ind, chi2_contingency, mannwhitneyu, fisher_exact
    from statsmodels.stats.multitest import multipletests
    import numpy as np
    
    feature_groups = {
        'Demographics': {
            'continuous': ['admission_age'],
            'categorical': [col for col in combined_df.columns if col.startswith('ethnicity_')]
        },
        'Line Characteristics': {
            'continuous': ['line_duration_days', 'avg_duration'],
            'categorical': ['line_type_types', 'site_category_types', 'risk_category_types']
        },
        'Laboratory Values': {
            'continuous': [col for col in combined_df.columns if any(x in col for x in ['wbc_', 'plt_', 'hgb_', 'creatinine_'])],
            'categorical': []
        },
        'Care Practices': {
            'continuous': [col for col in combined_df.columns if any(x in col for x in ['chg_bath', 'dressing_change', 'site_assessment'])],
            'categorical': []
        },
        'Severity Scores': {
            'continuous': [col for col in combined_df.columns if any(x in col for x in ['sofa', 'apsiii', 'sapsii'])],
            'categorical': []
        }
    }

    def analyze_features(df, feature_groups):
        results = []
        
        for group_name, features in feature_groups.items():
            print(f"\n{' '+group_name+' Analysis ':=^50}")
            
            # Analyze continuous features
            for feature in features['continuous']:
                if feature in df.columns:
                    # Check if the feature is actually numeric
                    if pd.api.types.is_numeric_dtype(df[feature]):
                        clabsi_vals = df[df['is_clabsi']==1][feature].dropna()
                        control_vals = df[df['is_clabsi']==0][feature].dropna()
                        
                        if len(clabsi_vals) > 0 and len(control_vals) > 0:
                            # Calculate descriptive statistics
                            stats = {
                                'Feature': feature,
                                'Group': group_name,
                                'Type': 'continuous',
                                'CLABSI_N': len(clabsi_vals),
                                'Control_N': len(control_vals),
                                'CLABSI_Mean': clabsi_vals.astype(float).mean(),
                                'CLABSI_SD': clabsi_vals.astype(float).std(),
                                'Control_Mean': control_vals.astype(float).mean(),
                                'Control_SD': control_vals.astype(float).std(),
                                'Difference': clabsi_vals.astype(float).mean() - control_vals.astype(float).mean()
                            }
                            
                            # Mann-Whitney U test
                            try:
                                stat, p_value = mannwhitneyu(clabsi_vals.astype(float), 
                                                        control_vals.astype(float), 
                                                        alternative='two-sided')
                                stats['P_Value'] = p_value
                            except Exception as e:
                                print(f"Warning: Mann-Whitney test failed for {feature}: {str(e)}")
                                stats['P_Value'] = np.nan
                                
                            results.append(stats)
                            
                            print(f"\n{feature}")
                            print(f"CLABSI (n={len(clabsi_vals)}): {stats['CLABSI_Mean']:.2f} ± {stats['CLABSI_SD']:.2f}")
                            print(f"Control (n={len(control_vals)}): {stats['Control_Mean']:.2f} ± {stats['Control_SD']:.2f}")
                            print(f"p-value: {stats['P_Value']:.4f}")
                    else:
                        print(f"Warning: {feature} was marked as continuous but contains non-numeric data")
                        # Move it to categorical if it's string data
                        if pd.api.types.is_string_dtype(df[feature]):
                            features['categorical'].append(feature)
            
            # Analyze categorical features
            for feature in features['categorical']:
                if feature in df.columns:
                    contingency = pd.crosstab(df['is_clabsi'], df[feature])
                    
                    if contingency.shape[1] == 2:  # Binary feature
                        try:
                            oddsratio, p_value = fisher_exact(contingency)
                            test_type = "Fisher's exact test"
                        except Exception as e:
                            print(f"Warning: Fisher's exact test failed for {feature}: {str(e)}")
                            oddsratio, p_value = np.nan, np.nan
                            test_type = "Test failed"
                    else:
                        try:
                            chi2, p_value, dof, expected = chi2_contingency(contingency)
                            oddsratio = np.nan
                            test_type = "Chi-square test"
                        except Exception as e:
                            print(f"Warning: Chi-square test failed for {feature}: {str(e)}")
                            p_value = np.nan
                            oddsratio = np.nan
                            test_type = "Test failed"
                    
                    stats = {
                        'Feature': feature,
                        'Group': group_name,
                        'Type': 'categorical',
                        'P_Value': p_value,
                        'Odds_Ratio': oddsratio,
                        'Test_Type': test_type
                    }
                    
                    # Calculate proportions
                    props = pd.crosstab(df['is_clabsi'], df[feature], normalize='index')
                    for col in props.columns:
                        stats[f'CLABSI_Prop_{col}'] = props.loc[1, col]
                        stats[f'Control_Prop_{col}'] = props.loc[0, col]
                    
                    results.append(stats)
                    
                    print(f"\n{feature}")
                    print(props.round(3))
                    print(f"{test_type} p-value: {p_value:.4f}")
                    if not np.isnan(oddsratio):
                        print(f"Odds ratio: {oddsratio:.2f}")
        
        return pd.DataFrame(results)

    # Perform analysis
    results_df = analyze_features(combined_df, feature_groups)
    
    # Multiple testing correction
    p_values = results_df['P_Value'].dropna()
    if len(p_values) > 0:
        rejected, p_corrected, _, _ = multipletests(p_values, method='fdr_bh')
        results_df.loc[p_values.index, 'P_Value_Adjusted'] = p_corrected
    
    # Sort results by significance
    results_df = results_df.sort_values('P_Value')
    
    print("\nSaving results...")
    results_df.to_csv('clabsi_analysis_results.csv', index=False)
    print("Results saved to clabsi_analysis_results.csv")
    
    print("\nSignificant Findings (FDR-adjusted p < 0.05):")
    sig_results = results_df[results_df['P_Value_Adjusted'] < 0.05]
    for _, row in sig_results.iterrows():
        print(f"\n{row['Group']} - {row['Feature']}")
        if row['Type'] == 'continuous':
            print(f"CLABSI: {row['CLABSI_Mean']:.2f} ± {row['CLABSI_SD']:.2f}")
            print(f"Control: {row['Control_Mean']:.2f} ± {row['Control_SD']:.2f}")
        else:
            print(f"Odds Ratio: {row['Odds_Ratio']:.2f}")
        print(f"Adjusted p-value: {row['P_Value_Adjusted']:.4f}")

else:
    print("Error: Failed to extract features for either CLABSI cases or controls")