In [1]:
import numpy as np
import pandas as pd
from datetime import timedelta
from scipy import stats

In [2]:
THRESH_SPO2_LOW = 92
THRESH_SPO2_CRITICAL = 88
THRESH_SHOCK_INDEX_WARNING = 0.9
THRESH_SHOCK_INDEX_CRITICAL = 1.0
THRESH_MAP_LOW = 65  # Mean Arterial Pressure threshold for shock
THRESH_LACTATE_HIGH = 2.0  # mmol/L for tissue hypoperfusion
THRESH_LACTATE_CRITICAL = 4.0
THRESH_UO_LOW = 0.5  # mL/kg/hr (oliguria)

# Trend Analysis
TREND_WINDOW = 6  # Number of readings for short-term trend analysis


# AGE-SPECIFIC VITAL SIGN THRESHOLDS (Low, Normal, High)

AGE_THRESHOLDS = {
    'neonate': {
        'rr_low': 30, 'rr_normal': 40, 'rr_high': 60,
        'hr_low': 100, 'hr_normal': 140, 'hr_high': 160,
        'sbp_low': 60, 'sbp_normal': 70, 'sbp_high': 90,
        'temp_low': 36.0, 'temp_normal': 37.2, 'temp_high': 38.0
    },
    'infant': {
        'rr_low': 24, 'rr_normal': 30, 'rr_high': 40,
        'hr_low': 80, 'hr_normal': 120, 'hr_high': 140,
        'sbp_low': 70, 'sbp_normal': 85, 'sbp_high': 100,
        'temp_low': 36.0, 'temp_normal': 37.2, 'temp_high': 38.0
    },
    'child': {
        'rr_low': 16, 'rr_normal': 20, 'rr_high': 30,
        'hr_low': 70, 'hr_normal': 90, 'hr_high': 110,
        'sbp_low': 80, 'sbp_normal': 95, 'sbp_high': 110,
        'temp_low': 36.0, 'temp_normal': 37.0, 'temp_high': 38.0
    },
    'adolescent': {
        'rr_low': 12, 'rr_normal': 16, 'rr_high': 20,
        'hr_low': 60, 'hr_normal': 75, 'hr_high': 100,
        'sbp_low': 90, 'sbp_normal': 105, 'sbp_high': 120,
        'temp_low': 35.8, 'temp_normal': 36.8, 'temp_high': 37.8
    },
    'adult': {
        'rr_low': 12, 'rr_normal': 16, 'rr_high': 20,
        'hr_low': 60, 'hr_normal': 80, 'hr_high': 100,
        'sbp_low': 90, 'sbp_normal': 115, 'sbp_high': 130,
        'temp_low': 35.5, 'temp_normal': 36.8, 'temp_high': 38.0
    },
    'geriatric': {  # Added for elderly patients who may have different baselines
        'rr_low': 12, 'rr_normal': 16, 'rr_high': 24, # Often higher RR baseline
        'hr_low': 55, 'hr_normal': 70, 'hr_high': 90, # Often lower HR
        'sbp_low': 90, 'sbp_normal': 125, 'sbp_high': 140, # Often higher SBP
        'temp_low': 35.5, 'temp_normal': 36.5, 'temp_high': 37.5 # Often lower temp
    }
}
# CPE-specific slope thresholds (per hour)
CPE_RR_SLOPE_THRESH = 1.0    # >= +1 respirations/hour over 12h
CPE_HR_SLOPE_THRESH = 2.0    # >= +2 bpm/hour over 12h
CPE_SPO2_SLOPE_THRESH = -0.5 # <= -0.5 %/hour over 12h (falling)
CPE_TREND_WINDOW_HRS = 12    # Lookback window for long-term trends
CPE_MIN_READINGS = 4         # Min data points to calculate a reliable trend

In [3]:
def assign_age_category(df):
    """
    Assigns an age category based on the 'age' column in the DataFrame.
    Now includes more granular pediatric categories and a geriatric category.
    """
    df = df.copy()
    
    def _categorize(age):
        if age <= 0.083: return 'neonate'     # < 1 month
        elif age <= 1:   return 'infant'      # 1 month - 1 year
        elif age < 5:    return 'child'       # 1 - 5 years
        elif age < 13:   return 'adolescent'  # 5 - 12 years
        elif age < 65:   return 'adult'       # 13 - 64 years
        else:            return 'geriatric'   # 65+ years
    
    if 'age' in df.columns:
        df['age_category'] = df['age'].apply(_categorize)
    else:
        # Default to adult if age is not provided
        df['age_category'] = 'adult'
    
    return df

In [4]:
def apply_vital_range_flags(df):
    """
    Applies age-specific thresholds to flag abnormal vital signs.
    Now includes flags for all parameters needed across pipelines.
    """
    df = df.copy()
    df = assign_age_category(df)  # Inject age group

    # SpO₂ flags (Absolute threshold)
    df['flag_spo2_low'] = df['spo2'] < THRESH_SPO2_LOW
    df['flag_spo2_critical'] = df['spo2'] < THRESH_SPO2_CRITICAL

    # Temperature flags
    df['flag_temp_high'] = df.apply(lambda row: row['temperature'] >= AGE_THRESHOLDS[row['age_category']]['temp_high'], axis=1)
    df['flag_temp_low'] = df.apply(lambda row: row['temperature'] < AGE_THRESHOLDS[row['age_category']]['temp_low'], axis=1)

    # Respiratory Rate flags
    df['flag_rr_low'] = df.apply(lambda row: row['resp_rate'] < AGE_THRESHOLDS[row['age_category']]['rr_low'], axis=1)
    df['flag_rr_high'] = df.apply(lambda row: row['resp_rate'] >= AGE_THRESHOLDS[row['age_category']]['rr_high'], axis=1)

    # Heart Rate flags
    df['flag_hr_low'] = df.apply(lambda row: row['heart_rate'] < AGE_THRESHOLDS[row['age_category']]['hr_low'], axis=1)
    df['flag_hr_high'] = df.apply(lambda row: row['heart_rate'] >= AGE_THRESHOLDS[row['age_category']]['hr_high'], axis=1)

    # Calculate Shock Index (handle division by zero)
    df['shock_index'] = df['heart_rate'] / np.clip(df['sbp'], a_min=1, a_max=None)    
    # Flag based on Shock Index
    df['flag_si_warning'] = df['shock_index'] >= THRESH_SHOCK_INDEX_WARNING
    df['flag_si_critical'] = df['shock_index'] >= THRESH_SHOCK_INDEX_CRITICAL

    # Blood Pressure flags
    df['flag_sbp_low'] = df.apply(lambda row: row['sbp'] < AGE_THRESHOLDS[row['age_category']]['sbp_low'], axis=1)
    df['flag_sbp_high'] = df.apply(lambda row: row['sbp'] >= AGE_THRESHOLDS[row['age_category']]['sbp_high'], axis=1)
    df['flag_dbp_low'] = df.apply(lambda row: row['dbp'] < (AGE_THRESHOLDS[row['age_category']]['sbp_low'] * 0.6), axis=1) # Estimate DBP low
    df['flag_dbp_high'] = df.apply(lambda row: row['dbp'] >= (AGE_THRESHOLDS[row['age_category']]['sbp_high'] * 0.6), axis=1) # Estimate DBP high

    return df


In [5]:
def compute_recent_trends_delta(df):
    """
    Computes trends for each vital by differencing consecutive readings.
    Applies stricter interpretation using age-specific thresholds.
    """
    df = df.copy().sort_values("timestamp").reset_index(drop=True)

    if 'age_category' not in df.columns:
        df = assign_age_category(df)

    trends = {}
    recent = df.tail(TREND_WINDOW)
    age_group = recent['age_category'].iloc[-1]
    thresholds = AGE_THRESHOLDS[age_group]

    vital_map = {
        'resp_rate': ('rr_low', 'rr_normal', 'rr_high'),
        'heart_rate': ('hr_low', 'hr_normal', 'hr_high'),
        'sbp': ('sbp_low', 'sbp_normal', 'sbp_high'),
        'temperature': ('temp_low', 'temp_normal', 'temp_high'),
        'spo2': (None, None, None)  # handled separately
    }

    for vital in ['resp_rate', 'heart_rate', 'sbp', 'temperature', 'spo2']:
        if vital not in recent.columns or recent[vital].isnull().all():
            continue

        y = recent[vital].dropna().values
        if len(y) < 2:
            continue

        avg_delta = np.mean(np.diff(y))
        latest = y[-1]
        trends[f"{vital}_trend"] = round(avg_delta, 3)

        if vital == 'spo2':
            if latest < THRESH_SPO2_LOW:
                if avg_delta > 0:
                    flag = "Still abnormal — but improving"
                elif avg_delta < 0:
                    flag = "Abnormal and worsening"
                else:
                    flag = "Abnormal and flat"
            else:
                if avg_delta < 0:
                    flag = "Normal but deteriorating"
                else:
                    flag = "Normal and stable"

        else:
            low_key, norm_key, high_key = vital_map[vital]
            low = thresholds[low_key]
            normal = thresholds[norm_key]
            high = thresholds[high_key]

            if latest < low or latest > high:
                if (latest > high and avg_delta < 0) or (latest < low and avg_delta > 0):
                    flag = "Still abnormal — but improving"
                else:
                    flag = "Abnormal and worsening"
            else:
                if avg_delta < 0:
                    flag = "Normal but deteriorating"
                else:
                    flag = "Normal and stable"

        trends[f"{vital}_trend_flag"] = flag

    # Shock Index trend
    if all(col in recent.columns for col in ['heart_rate', 'sbp']):
        hr = recent['heart_rate'].values
        sbp = np.clip(recent['sbp'].values, a_min=1, a_max=None)
        si = hr / sbp

        if len(si) >= 2:
            avg_si_delta = np.mean(np.diff(si))
            trends['shock_index_trend'] = round(avg_si_delta, 3)

            latest_si = si[-1]
            # FIXED: Replaced undefined THRESH_SHOCK_INDEX_HIGH with THRESH_SHOCK_INDEX_CRITICAL
            if latest_si >= THRESH_SHOCK_INDEX_CRITICAL:
                flag = "Shock Index critical — improving" if avg_si_delta < 0 else "Shock Index critical — worsening"
            else:
                flag = "Normal but improving" if avg_si_delta < 0 else "Normal but rising"

            trends['shock_index_trend_flag'] = flag

    return trends

In [6]:
def calculate_long_term_slope(df, vital_col, window_hours=CPE_TREND_WINDOW_HRS, min_readings=CPE_MIN_READINGS):
    """
    Calculates the linear regression slope of a vital sign over a specified time window.
    The slope is returned as the change per hour.

    Args:
        df (pd.DataFrame): The patient's DataFrame, sorted by timestamp.
        vital_col (str): The name of the vital sign column (e.g., 'resp_rate').
        window_hours (int): The number of hours to look back.
        min_readings (int): Minimum number of data points required to compute a slope.

    Returns:
        float: The slope (change per hour), or None if not enough data.
    """
    if df.empty or vital_col not in df.columns:
        return None

    # Create a copy and ensure the DataFrame is sorted
    df = df.copy().sort_values('timestamp')
    now = df['timestamp'].iloc[-1]
    cutoff = now - timedelta(hours=window_hours)

    # Get data from the relevant window for the specific vital sign
    mask = (df['timestamp'] >= cutoff) & (df['timestamp'] <= now)
    # We need both the timestamp and the vital value to perform the regression
    trend_data = df.loc[mask, ['timestamp', vital_col]].dropna(subset=[vital_col])

    if len(trend_data) < min_readings:
        return None

    # Convert timestamps to numerical values (hours since the first point in the window)
    # This is the essential fix: using the actual timestamps, not the DataFrame index.
    x_timestamps = trend_data['timestamp']
    start_time = x_timestamps.iloc[0]
    # Calculate the time difference in hours for each point
    x_hours = np.array([(ts - start_time).total_seconds() / 3600.0 for ts in x_timestamps])

    y_values = trend_data[vital_col].values

    # Perform linear regression (y = mx + b). The slope (m) is the rate of change per hour.
    try:
        slope, _ = np.polyfit(x_hours, y_values, 1)
        return slope
    except (np.linalg.LinAlgError, TypeError, ValueError):
        # Handle cases where regression fails (e.g., all x values are the same)
        return None

In [7]:
def compute_cpe_trends(df):
    """
    Computes the long-term (12h) trends needed for CPE detection and adds them as columns.
    """
    # Ensure we are working on a copy of the main DataFrame
    df = df.copy()

    # Calculate and add the long-term slopes for CPE-specific criteria
    df['cpe_rr_slope_12h'] = calculate_long_term_slope(df, 'resp_rate', window_hours=12)
    df['cpe_hr_slope_12h'] = calculate_long_term_slope(df, 'heart_rate', window_hours=12)
    df['cpe_spo2_slope_12h'] = calculate_long_term_slope(df, 'spo2', window_hours=12)
    df['cpe_sbp_slope_12h'] = calculate_long_term_slope(df, 'sbp', window_hours=12)

    return df


In [8]:
def detect_cpe(df):
    """
    Main function to detect Cardiogenic Pulmonary Edema.
    Integrates long-term trends with existing single-point flags for a comprehensive assessment.
    """
    if df.empty:
        return {"flag_cpe": False, "label": "Insufficient data", "evidence": {}}

    latest = df.iloc[-1]
    results = {"timestamp": latest.get('timestamp'), "evidence": {}}

    flags = {}
    
    # 1. Check Long-Term CPE Trend Flags (Primary Criteria)
    flags['rr_trend_long'] = (latest.get('cpe_rr_slope_12h') is not None and
                             latest.get('cpe_rr_slope_12h') >= CPE_RR_SLOPE_THRESH)
    results['evidence']['rr_slope_12h'] = latest.get('cpe_rr_slope_12h')

    flags['hr_trend_long'] = (latest.get('cpe_hr_slope_12h') is not None and
                             latest.get('cpe_hr_slope_12h') >= CPE_HR_SLOPE_THRESH)
    results['evidence']['hr_slope_12h'] = latest.get('cpe_hr_slope_12h')

    flags['spo2_trend_long'] = (latest.get('cpe_spo2_slope_12h') is not None and
                               latest.get('cpe_spo2_slope_12h') <= CPE_SPO2_SLOPE_THRESH)
    results['evidence']['spo2_slope_12h'] = latest.get('cpe_spo2_slope_12h')
    
    # 2. Incorporate existing single-point flags (from apply_vital_range_flags)
    flags['rr_high'] = latest.get('flag_rr_high', False)
    flags['spo2_low'] = latest.get('flag_spo2_low', False)
    flags['hr_high'] = latest.get('flag_hr_high', False)
    
    # 2.b More Sophisticated SBP Logic: Rising then Drop
    flags['sbp_swing'] = False
    sbp_slope = latest.get('cpe_sbp_slope_12h')
    results['evidence']['sbp_slope_12h'] = sbp_slope

    if sbp_slope is not None:
        # Get the SBP data from the last 12 hours
        now = latest['timestamp']
        window_start = now - timedelta(hours=CPE_TREND_WINDOW_HRS)
        # Use the original DF passed into detect_cpe, which has the full history
        sbp_data = df[(df['timestamp'] >= window_start) & (df['timestamp'] <= now)][['timestamp', 'sbp']].dropna()
        if not sbp_data.empty:
            max_sbp = sbp_data['sbp'].max()
            current_sbp = latest.get('sbp')
            # Check if there was a high value that has since dropped significantly
            if current_sbp is not None and max_sbp > 160: # Check for a prior peak
                drop_magnitude = max_sbp - current_sbp
                if drop_magnitude > 25:  # mm Hg drop from peak
                    flags['sbp_swing'] = True
                    results['evidence']['sbp_peak_drop_mmHg'] = drop_magnitude

    # 3. (Optional) Use short-term trend flags from your function for context
    short_term_trends = compute_recent_trends_delta(df.tail(TREND_WINDOW)) # Get trends for recent data
    results['evidence']['short_term_trend'] = short_term_trends.get('resp_rate_trend_flag', 'N/A')

    # 4. COMBINATION LOGIC (Now based primarily on long-term trends)
    # FIXED: cardiovascular_stress is correctly defined here and not overwritten
    respiratory_distress = (flags['rr_trend_long'] or flags['rr_high'])
    oxygen_drop = (flags['spo2_trend_long'] or flags['spo2_low'])
    cardiovascular_stress = (flags['hr_trend_long'] or flags['hr_high'] or flags['sbp_swing']) # Includes SBP swing

    high_confidence = (respiratory_distress and oxygen_drop and cardiovascular_stress)
    results['flag_cpe'] = high_confidence

    # 5. Generate Narrative
    narrative_parts = []
    if results['flag_cpe']:
        narrative_parts.append("Sustained trends over 12 hours suggest fluid overload:")
        if flags.get('rr_trend_long'):
            narrative_parts.append(f"RR steadily climbing (+{latest.get('cpe_rr_slope_12h', 0):.1f}/hr)")
        if flags.get('hr_trend_long'):
            narrative_parts.append(f"HR steadily climbing (+{latest.get('cpe_hr_slope_12h', 0):.1f}/hr)")
        if flags.get('spo2_trend_long'):
            narrative_parts.append(f"SpO₂ steadily falling ({latest.get('cpe_spo2_slope_12h', 0):.1f}%/hr)")
        if flags.get('sbp_swing'):
            narrative_parts.append("SBP peaked then dropped")
        # Add context from short-term trend
        narrative_parts.append(f"Recent short-term trend: {results['evidence']['short_term_trend']}")

        results['label'] = " | ".join(narrative_parts) # Better formatting
    else:
        results['label'] = "No strong evidence of cardiogenic pulmonary edema."

    return results


#HOW TO USE THE FULLY INTEGRATED PIPELINE
# 1. Load and preprocess your data
# df = pd.read_csv('vitals.csv')
# df['timestamp'] = pd.to_datetime(df['timestamp'])
# df = df.sort_values('timestamp').reset_index(drop=True)

# 2. Apply age-based flags (your existing function)
# df = apply_vital_range_flags(df)

# 3. Compute ALL trends: short-term (your function) and long-term (new function)
# df = compute_cpe_trends(df) # This function now internally calls your old one

# 4. Run the CPE detection
# cpe_result = detect_cpe(df)
# print(cpe_result)

In [9]:
import pandas as pd
from datetime import datetime, timedelta

# Create a timestamp index for the last 24 hours, with readings every 2 hours
start_time = datetime.now() - timedelta(hours=24)
timestamps = [start_time + timedelta(hours=i*2) for i in range(13)]  # 13 readings over 24h

# Sample data for a patient developing CPE
# Note the trends: RR and HR slowly rise, SpO2 slowly falls, SBP has a rise then drop.
sample_data = {
    'timestamp': timestamps,
    'age': [65] * 13,
    'resp_rate': [18] * 13,  # No trend
    'heart_rate': [80] * 13, # No trend
    'spo2': [97] * 13,      # No trend
    'sbp': [120] * 13,      # No trend
    'dbp': [80] * 13,
    'temperature': [36.8] * 13,
}

# Create DataFrame
df_sample = pd.DataFrame(sample_data)
print("Sample Patient Data (Developing CPE):")
print(df_sample[['timestamp', 'resp_rate', 'heart_rate', 'spo2', 'sbp']].to_string(index=False))

Sample Patient Data (Developing CPE):
                 timestamp  resp_rate  heart_rate  spo2  sbp
2025-11-07 16:25:22.763782         18          80    97  120
2025-11-07 18:25:22.763782         18          80    97  120
2025-11-07 20:25:22.763782         18          80    97  120
2025-11-07 22:25:22.763782         18          80    97  120
2025-11-08 00:25:22.763782         18          80    97  120
2025-11-08 02:25:22.763782         18          80    97  120
2025-11-08 04:25:22.763782         18          80    97  120
2025-11-08 06:25:22.763782         18          80    97  120
2025-11-08 08:25:22.763782         18          80    97  120
2025-11-08 10:25:22.763782         18          80    97  120
2025-11-08 12:25:22.763782         18          80    97  120
2025-11-08 14:25:22.763782         18          80    97  120
2025-11-08 16:25:22.763782         18          80    97  120


In [10]:
# 1. Load and preprocess your data (using the sample above)
df = df_sample.copy()
df = df.sort_values('timestamp').reset_index(drop=True)

# 2. Apply age-based flags (your existing function)
df = apply_vital_range_flags(df)

# 3. Compute CPE-specific long-term trends
df = compute_cpe_trends(df)

# 4. Run the CPE detection on the processed DataFrame
cpe_result = detect_cpe(df)

# 5. Print the results

print(f"Timestamp: {cpe_result['timestamp']}")
print(f"CPE Alert Triggered: {cpe_result['flag_cpe']}")
print(f"Confidence: {'High' if cpe_result['flag_cpe'] else 'None'}")
print(f"\nClinical Narrative:\n{cpe_result['label']}")
print(f"\nEvidence (Slopes calculated over 12h):")
for key, value in cpe_result['evidence'].items():
    print(f"  {key}: {value}")

Timestamp: 2025-11-08 16:25:22.763782
CPE Alert Triggered: False
Confidence: None

Clinical Narrative:
No strong evidence of cardiogenic pulmonary edema.

Evidence (Slopes calculated over 12h):
  rr_slope_12h: 3.2296613977699864e-16
  hr_slope_12h: 5.686312435689305e-17
  spo2_slope_12h: -4.288085906619806e-16
  sbp_slope_12h: 1.3833026512288869e-15
  short_term_trend: Normal and stable
