# ECDF Generator with Yearly Breakdown

This notebook processes **Labs, Vitals, and Respiratory Support** using **DuckDB**.

## Imports and Setup

In [None]:
import json
import yaml
import os
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional
import duckdb
import pandas as pd
import numpy as np
import time

# Project root is taken to be two directory levels above the current working
# directory; every default config/output path below is built from it.
# NOTE(review): this depends on where the notebook kernel was started - confirm.
PROJECT_ROOT = Path(os.getcwd()).parent.parent
print(f"Project root: {PROJECT_ROOT}")
print(f"DuckDB version: {duckdb.__version__}")

## Configuration Loading

In [None]:
def load_configs(
    clif_config_path: Optional[str] = None,
    outlier_config_path: Optional[str] = None,
    lab_vital_config_path: Optional[str] = None
) -> Tuple[Dict, Dict, Dict]:
    """
    Load all required configuration files.

    Args:
        clif_config_path: JSON file with the CLIF settings. Defaults to
            ``PROJECT_ROOT/config/config.json``.
        outlier_config_path: YAML file with outlier ranges. Defaults to
            ``PROJECT_ROOT/modules/ecdf/config/outlier_config.yaml``.
        lab_vital_config_path: YAML file with lab/vital settings. Defaults to
            ``PROJECT_ROOT/modules/ecdf/config/lab_vital_config.yaml``.

    Returns:
        Tuple ``(clif_config, outlier_config, lab_vital_config)``. An empty
        YAML document is returned as an empty dict rather than ``None`` so
        that callers can use ``.get()`` on the result.

    Raises:
        FileNotFoundError: if any of the three files does not exist.
    """
    if clif_config_path is None:
        clif_config_path = PROJECT_ROOT / 'config' / 'config.json'
    if outlier_config_path is None:
        outlier_config_path = PROJECT_ROOT / 'modules' / 'ecdf' / 'config' / 'outlier_config.yaml'
    if lab_vital_config_path is None:
        lab_vital_config_path = PROJECT_ROOT / 'modules' / 'ecdf' / 'config' / 'lab_vital_config.yaml'

    # Read explicitly as UTF-8: the platform default encoding (e.g. cp1252 on
    # Windows) would corrupt non-ASCII characters such as a degree sign.
    with open(clif_config_path, 'r', encoding='utf-8') as f:
        clif_config = json.load(f)

    with open(outlier_config_path, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty document
        outlier_config = yaml.safe_load(f) or {}

    with open(lab_vital_config_path, 'r', encoding='utf-8') as f:
        lab_vital_config = yaml.safe_load(f) or {}

    return clif_config, outlier_config, lab_vital_config

In [None]:
# Load the three configuration files from their default locations.
clif_config, outlier_config, lab_vital_config = load_configs()

# Echo the settings the rest of the notebook relies on, so a wrong path or an
# unexpected file type is visible before any data is read.
print("CLIF Config:")
print(f"  tables_path: {clif_config.get('tables_path')}")
print(f"  file_type: {clif_config.get('file_type')}")
print()
# Top-level keys of each config section (empty list when a section is missing).
print(f"Outlier config tables: {list(outlier_config.get('tables', {}).keys())}")
print(f"Lab categories in config: {list(lab_vital_config.get('labs', {}).keys())}")
print(f"Vital categories in config: {list(lab_vital_config.get('vitals', {}).keys())}")

## DuckDB Connection

In [None]:
# Create an in-memory DuckDB connection.
# All tables and temp tables created by the functions below live on this one
# shared connection; it is closed in the final cell.
con = duckdb.connect(database=':memory:')

print("DuckDB connection established")

## Helper Functions

In [None]:
def sanitize_unit_for_filename(unit) -> str:
    """
    Sanitize a unit for use in a filename.

    Args:
        unit: A unit string, a list/tuple of units (joined with ``_``;
            ``None`` entries are dropped), or ``None``.

    Returns:
        A lowercase string containing only alphanumeric characters and single
        underscores, with ``/`` mapped to ``_``, ``%`` to ``pct`` and the
        degree sign to ``deg``. Returns ``"unknown"`` when *unit* is ``None``,
        empty, or contains nothing that survives sanitization.
    """
    if unit is None:
        return "unknown"

    if isinstance(unit, (list, tuple)):
        unit = '_'.join(str(u) for u in unit if u is not None)

    if not unit:
        return "unknown"

    # Tolerate non-string scalars instead of failing on .replace()
    unit = str(unit)

    sanitized = (
        unit.replace('/', '_')
        .replace('%', 'pct')
        # The mojibake form (UTF-8 degree sign decoded as Latin-1/cp1252) must
        # be replaced first because it contains the real degree sign.
        .replace('\u00c2\u00b0', 'deg')
        .replace('\u00b0', 'deg')
    )
    sanitized = ''.join(c if c.isalnum() or c == '_' else '_' for c in sanitized)
    while '__' in sanitized:
        sanitized = sanitized.replace('__', '_')
    sanitized = sanitized.strip('_').lower()

    # A unit made only of punctuation would otherwise give an empty suffix
    return sanitized or "unknown"

## Lab Standardization (using clifpy)

Note: clifpy returns pandas DataFrames, so we still use pandas for this step.
DuckDB can efficiently read the output parquet file.

In [None]:
# clifpy supplies the Labs table wrapper used for unit standardization below.
import clifpy
from clifpy.tables import Labs
# Print the install location to show which clifpy copy the kernel picked up.
print(f"clifpy location: {clifpy.__file__}")

In [None]:
def standardize_labs(
    tables_path: str,
    file_type: str,
    output_path: str,
    reference_units_dir: str = "../../output/final/ecdf_yearly/labs"
) -> None:
    """
    Standardize lab reference units and save as parquet.
    Uses pandas/clifpy for standardization, then saves for DuckDB consumption.

    Args:
        tables_path: Directory containing the CLIF tables.
        file_type: File extension of the labs table. The file is read with
            ``pd.read_parquet``, so it must be a parquet file.
        output_path: Destination parquet file for the standardized labs. Its
            parent directory is also passed to clifpy as the output directory
            for ``standardize_reference_units``.
        reference_units_dir: Directory passed to clifpy's
            ``get_lab_reference_units``. Defaults to the location that was
            previously hard-coded.
    """
    labs_path = os.path.join(tables_path, f'clif_labs.{file_type}')
    print(f"Loading labs from {labs_path}...")

    labs_df = pd.read_parquet(labs_path)
    print(f"Loaded {len(labs_df):,} rows")

    # Make sure both output locations exist before anything is written to them
    intermediate_dir = str(Path(output_path).parent)
    os.makedirs(reference_units_dir, exist_ok=True)
    os.makedirs(intermediate_dir, exist_ok=True)

    labs_inst = Labs(data=labs_df)
    labs_inst.get_lab_reference_units(save=True, output_directory=reference_units_dir)

    labs_standard = labs_inst.standardize_reference_units(
        save=True,
        output_directory=intermediate_dir,
        lowercase=True,
        inplace=False
    )

    labs_standard.to_parquet(output_path, index=False)
    print(f"Saved standardized labs to {output_path}")

In [None]:
# Standardize labs (run once)
STANDARDIZED_LABS_PATH = '../../output/intermediate/clif_labs_standardized.parquet'
os.makedirs(os.path.dirname(STANDARDIZED_LABS_PATH), exist_ok=True)

# The final labs directory receives the CSV copy written below; create it up
# front so the write cannot fail on a missing directory.
FINAL_LABS_CSV_PATH = '../../output/final/ecdf_yearly/labs/clif_labs_standardized.csv'
os.makedirs(os.path.dirname(FINAL_LABS_CSV_PATH), exist_ok=True)

# Run standardization
standardize_labs(
    clif_config['tables_path'],
    clif_config['file_type'],
    STANDARDIZED_LABS_PATH
)

# Also save a CSV copy of the reference-unit table next to the final outputs.
# NOTE(review): the source CSV is presumably written by clifpy during
# standardization (save=True) - confirm the filename against clifpy.
labs_standardized = pd.read_csv('../../output/intermediate/lab_reference_unit_standardized.csv')
labs_standardized.to_csv(FINAL_LABS_CSV_PATH, index=False)

## Load ICU Windows into Memory Table

Materialize ICU windows ONCE as a table, not a view.

In [None]:
def load_icu_windows_duckdb(
    con: duckdb.DuckDBPyConnection,
    tables_path: str,
    file_type: str
) -> None:
    """
    Build the ``icu_windows`` table from the ADT file.

    Reads ``clif_adt.<file_type>`` under *tables_path* and keeps, for rows
    whose ``location_category`` is ``icu`` (case-insensitive), the
    hospitalization id and the in/out timestamps. The result is stored as a
    real table (not a view), replacing any existing ``icu_windows``.
    """
    adt_file = os.path.join(tables_path, f'clif_adt.{file_type}')
    print(f"Loading ICU time windows from {adt_file}...")

    # CREATE TABLE ... AS runs the scan once and stores the rows
    create_sql = f"""
        CREATE OR REPLACE TABLE icu_windows AS
        SELECT 
            hospitalization_id,
            in_dttm::TIMESTAMP AS in_dttm,
            out_dttm::TIMESTAMP AS out_dttm
        FROM read_parquet('{adt_file}')
        WHERE LOWER(location_category) = 'icu'
    """
    con.execute(create_sql)

    (n_windows,) = con.execute("SELECT COUNT(*) FROM icu_windows").fetchone()
    print(f"Loaded {n_windows:,} ICU time windows into memory")

In [None]:
# Build the icu_windows table that the processing functions below join against.
load_icu_windows_duckdb(
    con,
    clif_config['tables_path'],
    clif_config['file_type']
)

print("\nSample ICU time windows:")
# Last expression of the cell: the notebook displays the returned DataFrame.
con.execute("SELECT * FROM icu_windows LIMIT 10").df()

## ECDF Computation with Yearly Breakdown (DuckDB)

Computes:
- Overall ECDF across all years
- Per-year ECDF for each year in the data

In [None]:
def compute_yearly_ecdf_duckdb(
    con: duckdb.DuckDBPyConnection, 
    table_name: str
) -> pd.DataFrame:
    """
    Build an ECDF table with an overall column plus one column per year.

    *table_name* must expose the columns ``value`` and ``year``.

    Returns a DataFrame sorted by ``value`` with one row per distinct value
    and the columns ``value``, ``overall_probability`` and
    ``<year>_probability`` for every distinct year found. Because the yearly
    columns are attached with a left join on ``value``, a value that has no
    rows in a given year has a missing entry in that year's column.
    """
    # Which years are present?
    year_rows = con.execute(f"SELECT DISTINCT year FROM {table_name} ORDER BY year").fetchall()
    years = [int(row[0]) for row in year_rows]
    print(f"  Years found: {years}")

    # ECDF over all rows, collapsed to one row per distinct value
    overall_sql = f"""
        WITH ranked AS (
            SELECT 
                value,
                CUME_DIST() OVER (ORDER BY value) as probability
            FROM {table_name}
        )
        SELECT 
            value,
            MAX(probability) as overall_probability
        FROM ranked
        GROUP BY value
        ORDER BY value
    """
    combined = con.execute(overall_sql).df()

    # One extra column per year, computed on that year's rows only
    for year in years:
        yearly_sql = f"""
            WITH year_data AS (
                SELECT value FROM {table_name} WHERE year = {year}
            ),
            ranked AS (
                SELECT 
                    value,
                    CUME_DIST() OVER (ORDER BY value) as probability
                FROM year_data
            )
            SELECT 
                value,
                MAX(probability) as probability
            FROM ranked
            GROUP BY value
            ORDER BY value
        """
        per_year = con.execute(yearly_sql).df().rename(
            columns={'probability': f'{year}_probability'}
        )
        # Left join keeps every value of the overall ECDF
        combined = combined.merge(per_year, on='value', how='left')

    return combined.sort_values('value').reset_index(drop=True)

## Binning Functions

In [None]:
def compute_quantile_edges(con: duckdb.DuckDBPyConnection, table_name: str, num_bins: int) -> List[float]:
    """
    Compute quantile bin edges in a single query.

    Args:
        con: DuckDB connection holding *table_name*.
        table_name: Table with a numeric ``value`` column.
        num_bins: Number of equal-probability bins; ``num_bins + 1`` quantiles
            (0, 1/num_bins, ..., 1) are requested.

    Returns:
        The edges in ascending quantile order with consecutive edges that
        differ by at most 1e-10 collapsed into one. The list is empty when the
        table has no non-NULL values.

    Raises:
        ValueError: if *num_bins* is smaller than 1.
    """
    if num_bins < 1:
        raise ValueError(f"num_bins must be >= 1, got {num_bins}")

    quantile_str = ', '.join(
        f"QUANTILE_CONT(value, {i / num_bins})" for i in range(num_bins + 1)
    )
    row = con.execute(f"SELECT {quantile_str} FROM {table_name}").fetchone()

    # Remove duplicates while preserving order
    unique_edges: List[float] = []
    for edge in row or ():
        if edge is None:
            # Quantiles of an empty table come back as NULL
            continue
        edge = float(edge)
        if not unique_edges or abs(edge - unique_edges[-1]) > 1e-10:
            unique_edges.append(edge)

    return unique_edges

In [None]:
def count_all_bins_single_query(
    con: duckdb.DuckDBPyConnection,
    table_name: str,
    bin_edges: List[float]
) -> List[int]:
    """
    Count all bins in a SINGLE query using CASE WHEN.

    Args:
        con: DuckDB connection holding *table_name*.
        table_name: Table with a numeric ``value`` column.
        bin_edges: Ascending bin edges. The first bin is closed on both ends,
            every later bin is open at its lower edge and closed at its upper
            edge.

    Returns:
        One count per bin (``len(bin_edges) - 1`` entries). With fewer than
        two edges, a single-element list holding the table's row count.
    """
    if len(bin_edges) < 2:
        count = con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
        return [count]

    # One conditional SUM per bin, all evaluated in the same table scan
    case_parts = []
    for i, (lower, upper) in enumerate(zip(bin_edges, bin_edges[1:])):
        # Only the first bin includes its lower edge
        lower_op = ">=" if i == 0 else ">"
        case_parts.append(
            f"SUM(CASE WHEN value {lower_op} {lower} AND value <= {upper} THEN 1 ELSE 0 END) as bin_{i}"
        )

    query = f"SELECT {', '.join(case_parts)} FROM {table_name}"
    result = con.execute(query).fetchone()

    # SUM over zero rows is NULL; report that as an empty bin instead of failing
    return [int(c) if c is not None else 0 for c in result]

In [None]:
def create_flat_bins_duckdb(
    con: duckdb.DuckDBPyConnection,
    table_name: str,
    num_bins: int = 10
) -> List[Dict[str, Any]]:
    """
    Split the ``value`` column of *table_name* into quantile bins.

    Returns one dict per bin with the keys ``bin_num``, ``bin_min``,
    ``bin_max``, ``count``, ``percentage`` (of all rows) and ``interval``.
    An empty table gives an empty list. When the table holds fewer than
    ``2 * num_bins`` rows the bin count is reduced to ``max(1, rows // 2)``.
    When the quantile edges collapse to fewer than two, a single bin spanning
    min..max is returned.
    """
    total = con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
    if total == 0:
        return []

    # Shrink the bin count for small tables
    effective_bins = num_bins if total >= num_bins * 2 else max(1, total // 2)

    edges = compute_quantile_edges(con, table_name, effective_bins)

    if len(edges) < 2:
        lo, hi = con.execute(f"SELECT MIN(value), MAX(value) FROM {table_name}").fetchone()
        return [{
            'bin_num': 1,
            'bin_min': float(lo),
            'bin_max': float(hi),
            'count': total,
            'percentage': 100.0,
            'interval': f"[{lo:.2f}, {hi:.2f}]"
        }]

    # All bin counts come from one query
    counts = count_all_bins_single_query(con, table_name, edges)

    flat_bins = []
    for idx, (lower, upper) in enumerate(zip(edges[:-1], edges[1:])):
        n_in_bin = counts[idx]
        # Only the first bin is closed at its lower edge
        label = f"[{lower:.2f}, {upper:.2f}]" if idx == 0 else f"({lower:.2f}, {upper:.2f}]"
        flat_bins.append({
            'bin_num': idx + 1,
            'bin_min': float(lower),
            'bin_max': float(upper),
            'count': n_in_bin,
            'percentage': float(n_in_bin / total * 100),
            'interval': label
        })

    return flat_bins

In [None]:
def create_bins_for_segment_duckdb(
    con: duckdb.DuckDBPyConnection,
    table_name: str,
    segment_min: float,
    segment_max: float,
    num_bins: int,
    segment_name: str,
    extra_bins_last: int = 0,
    split_first: bool = False
) -> List[Dict[str, Any]]:
    """
    Quantile-bin the values of *table_name* lying in [segment_min, segment_max].

    Each returned dict has the keys ``segment``, ``bin_num``, ``bin_min``,
    ``bin_max``, ``count`` and ``percentage`` (relative to the segment's row
    count). An empty segment gives an empty list. A single min..max bin is
    returned when ``num_bins == 1``, when the segment has fewer than
    ``2 * num_bins`` rows, or when the quantile edges collapse.

    When ``extra_bins_last > 0`` and more than one bin was produced, the
    outermost bin (the first one if *split_first*, otherwise the last one) is
    subdivided into up to ``extra_bins_last`` quantile bins, provided it holds
    at least ``2 * extra_bins_last`` rows. Bins are renumbered consecutively.

    Side effect: (re)creates the temp tables ``segment_data`` and, when a
    split is attempted, ``extreme_data`` on *con*.
    """
    # Copy the segment's rows into a temp table (materialized, not a view)
    con.execute(f"""
        CREATE OR REPLACE TEMP TABLE segment_data AS
        SELECT value FROM {table_name}
        WHERE value >= {segment_min} AND value <= {segment_max}
    """)

    n_rows = con.execute("SELECT COUNT(*) FROM segment_data").fetchone()[0]
    if n_rows == 0:
        return []

    def whole_segment_bin() -> List[Dict[str, Any]]:
        # One bin covering the observed range of the segment
        lo, hi = con.execute("SELECT MIN(value), MAX(value) FROM segment_data").fetchone()
        return [{
            'segment': segment_name,
            'bin_num': 1,
            'bin_min': float(lo),
            'bin_max': float(hi),
            'count': n_rows,
            'percentage': 100.0
        }]

    def bins_from(edges, counts, first_num) -> List[Dict[str, Any]]:
        # Consecutive edge pairs become bins numbered from first_num
        return [
            {
                'segment': segment_name,
                'bin_num': first_num + k,
                'bin_min': float(edges[k]),
                'bin_max': float(edges[k + 1]),
                'count': counts[k],
                'percentage': float(counts[k] / n_rows * 100)
            }
            for k in range(len(edges) - 1)
        ]

    if num_bins == 1 or n_rows < num_bins * 2:
        return whole_segment_bin()

    edges = compute_quantile_edges(con, 'segment_data', num_bins)
    if len(edges) < 2:
        return whole_segment_bin()

    # All bin counts come from one query
    counts = count_all_bins_single_query(con, 'segment_data', edges)
    bins = bins_from(edges, counts, 1)

    # Nothing to refine unless extra bins were requested and there is an outer bin
    if not (extra_bins_last > 0 and len(bins) > 1):
        return bins

    if split_first:
        # The first bin is closed on both ends
        outer = bins[0]
        con.execute(f"""
            CREATE OR REPLACE TEMP TABLE extreme_data AS
            SELECT value FROM segment_data
            WHERE value >= {outer['bin_min']} AND value <= {outer['bin_max']}
        """)
    else:
        # The last bin excludes its lower edge
        outer = bins[-1]
        con.execute(f"""
            CREATE OR REPLACE TEMP TABLE extreme_data AS
            SELECT value FROM segment_data
            WHERE value > {outer['bin_min']} AND value <= {outer['bin_max']}
        """)

    n_extreme = con.execute("SELECT COUNT(*) FROM extreme_data").fetchone()[0]
    if n_extreme < extra_bins_last * 2:
        return bins

    tail_edges = compute_quantile_edges(con, 'extreme_data', extra_bins_last)
    if len(tail_edges) <= 1:
        return bins

    tail_counts = count_all_bins_single_query(con, 'extreme_data', tail_edges)

    if split_first:
        # Sub-bins go first; the remaining bins are renumbered after them
        tail_bins = bins_from(tail_edges, tail_counts, 1)
        remaining = bins[1:]
        for number, kept in enumerate(remaining, start=len(tail_bins) + 1):
            kept['bin_num'] = number
        return tail_bins + remaining

    # Sub-bins replace the last bin and continue its numbering
    head = bins[:-1]
    return head + bins_from(tail_edges, tail_counts, len(head) + 1)

In [None]:
def create_all_bins_duckdb(
    con: duckdb.DuckDBPyConnection,
    table_name: str,
    normal_lower: float,
    normal_upper: float,
    outlier_min: float,
    outlier_max: float,
    bins_below: int,
    bins_normal: int,
    bins_above: int,
    extra_bins_below: int = 0,
    extra_bins_above: int = 0
) -> List[Dict[str, Any]]:
    """
    Bin the below-normal, normal and above-normal segments, in that order.

    A segment is processed only when its bin count is positive; the below
    and above segments additionally require a non-empty range
    (``outlier_min < normal_lower`` and ``normal_upper < outlier_max``).
    The below segment refines its first bin and the above segment its last
    bin when extra bins are requested. Segment ranges are inclusive on both
    ends, so a value equal to ``normal_lower`` or ``normal_upper`` is counted
    in both adjacent segments.
    """
    def planned_segments():
        # Yielded lazily so each condition is checked right before its segment runs:
        # (segment_min, segment_max, num_bins, name, extra_bins, split_first)
        if bins_below > 0 and outlier_min < normal_lower:
            yield outlier_min, normal_lower, bins_below, 'below', extra_bins_below, True
        if bins_normal > 0:
            yield normal_lower, normal_upper, bins_normal, 'normal', 0, False
        if bins_above > 0 and normal_upper < outlier_max:
            yield normal_upper, outlier_max, bins_above, 'above', extra_bins_above, False

    collected = []
    for seg_min, seg_max, n_bins, label, extra, refine_first in planned_segments():
        collected.extend(
            create_bins_for_segment_duckdb(
                con, table_name,
                seg_min, seg_max, n_bins, label,
                extra_bins_last=extra,
                split_first=refine_first
            )
        )

    return collected

## Process Labs/Vitals with Yearly ECDF

In [None]:
def _sql_string_literal(text: Any) -> str:
    """Render *text* as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(text).replace("'", "''") + "'"


def _build_unit_filter(unit) -> str:
    """Build the ``AND ...`` clause on ``reference_unit``; empty string when *unit* is falsy."""
    if not unit:
        return ""

    if isinstance(unit, list):
        # '(no units)' is a marker meaning "reference_unit IS NULL"
        wants_null = '(no units)' in unit
        named = [_sql_string_literal(u) for u in unit if u != '(no units)']
        if named:
            clause = f"AND (reference_unit IN ({', '.join(named)})"
            clause += " OR reference_unit IS NULL)" if wants_null else ")"
            return clause
        return "AND reference_unit IS NULL" if wants_null else ""

    if unit == '(no units)':
        return "AND reference_unit IS NULL"
    return f"AND reference_unit = {_sql_string_literal(unit)}"


def process_category_duckdb(
    con: duckdb.DuckDBPyConnection,
    table_type: str,
    category: str,
    unit: Optional[str],
    tables_path: str,
    file_type: str,
    outlier_range: Dict[str, float],
    cat_config: Dict[str, Any],
    output_dir: str = None,
    extreme_bins_count: int = 5,
    save_output: bool = False
) -> Tuple[Dict[str, Any], Any, Any]:
    """
    Process a single lab/vital category using DuckDB.

    The matching rows (inside an ICU window and within the outlier range) are
    materialized ONCE into the temp table ``clean_data`` with the columns
    ``value`` and ``year``; the yearly ECDF and the bins are computed from
    that table, which is dropped again before returning, also on the
    empty-data path and on errors.

    Args:
        con: DuckDB connection that already holds the ``icu_windows`` table.
        table_type: ``'labs'`` reads the standardized labs parquet; any other
            value reads ``clif_vitals.<file_type>`` under *tables_path*.
        category: Category value to select.
        unit: Labs only. A reference unit, a list of units, or ``None``. The
            marker ``'(no units)'`` selects rows whose unit is NULL.
        tables_path: Directory containing the CLIF tables.
        file_type: File extension of the vitals table.
        outlier_range: Dict with ``min`` and ``max`` bounds (inclusive).
        cat_config: Category config; ``bins`` and ``normal_range`` are read.
        output_dir: Root directory for saved parquet files.
        extreme_bins_count: Extra bins used to subdivide the outermost bin of
            the below/above segments when that segment has more than one bin.
        save_output: Write ECDF and bins to *output_dir* when true.

    Returns:
        ``(stats, ecdf_df, bins_df)``. When no rows match, the two DataFrames
        are ``None`` and ``stats`` reports zero counts.

    Raises:
        FileNotFoundError: if the data file does not exist.
    """
    if table_type == 'labs':
        file_path = '../../output/intermediate/clif_labs_standardized.parquet'
        category_col = 'lab_category'
        value_col = 'lab_value_numeric'
        datetime_col = 'lab_result_dttm'
    else:
        file_path = os.path.join(tables_path, f'clif_vitals.{file_type}')
        category_col = 'vital_category'
        value_col = 'vital_value'
        datetime_col = 'recorded_dttm'

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Data file not found: {file_path}")

    display_name = f"{category} ({unit})" if table_type == 'labs' and unit else category
    print(f"Processing {display_name}...")

    # Unit filtering only applies to labs
    unit_filter = _build_unit_filter(unit) if table_type == 'labs' else ""

    # CRITICAL: Materialize into a TEMP TABLE, not a view!
    # Include year column for yearly ECDF. String values are escaped so that
    # an apostrophe in a path, category or unit cannot break the statement.
    con.execute(f"""
        CREATE OR REPLACE TEMP TABLE clean_data AS
        SELECT 
            d.{value_col} as value,
            EXTRACT(YEAR FROM d.{datetime_col}::TIMESTAMP)::INTEGER as year
        FROM read_parquet({_sql_string_literal(file_path)}) d
        INNER JOIN icu_windows w ON d.hospitalization_id = w.hospitalization_id
        WHERE d.{category_col} = {_sql_string_literal(category)}
            {unit_filter}
            AND d.{datetime_col}::TIMESTAMP >= w.in_dttm
            AND d.{datetime_col}::TIMESTAMP <= w.out_dttm
            AND d.{value_col} >= {outlier_range['min']}
            AND d.{value_col} <= {outlier_range['max']}
    """)

    try:
        clean_count = con.execute("SELECT COUNT(*) FROM clean_data").fetchone()[0]
        print(f"  Clean count: {clean_count:,}")

        if clean_count == 0:
            print(f"  WARNING: No data found for {display_name}")
            return (
                {'category': category, 'unit': unit, 'original_count': 0, 'clean_count': 0},
                None,
                None
            )

        # Compute Yearly ECDF
        ecdf_df = compute_yearly_ecdf_duckdb(con, 'clean_data')
        print(f"  ECDF: {len(ecdf_df):,} distinct values (compression: {clean_count / len(ecdf_df):.1f}x)")

        # Compute Bins ("or 0" turns an explicit null in the config into 0)
        bins_config = cat_config.get('bins', {})
        bins_below = bins_config.get('below_normal', 0) or 0
        bins_normal = bins_config.get('normal', 0) or 0
        bins_above = bins_config.get('above_normal', 0) or 0

        # The outermost bin is only subdivided when its segment has several bins
        extra_bins_below = extreme_bins_count if bins_below > 1 else 0
        extra_bins_above = extreme_bins_count if bins_above > 1 else 0

        # Without a configured normal range the whole outlier range is "normal"
        normal_range = cat_config.get('normal_range', {})
        normal_lower = normal_range.get('lower', outlier_range['min'])
        normal_upper = normal_range.get('upper', outlier_range['max'])

        bins = create_all_bins_duckdb(
            con=con,
            table_name='clean_data',
            normal_lower=normal_lower,
            normal_upper=normal_upper,
            outlier_min=outlier_range['min'],
            outlier_max=outlier_range['max'],
            bins_below=bins_below,
            bins_normal=bins_normal,
            bins_above=bins_above,
            extra_bins_below=extra_bins_below,
            extra_bins_above=extra_bins_above
        )

        # The first bin of each segment is closed at its lower edge
        for bin_info in bins:
            if bin_info['bin_num'] == 1:
                interval = f"[{bin_info['bin_min']:.2f}, {bin_info['bin_max']:.2f}]"
            else:
                interval = f"({bin_info['bin_min']:.2f}, {bin_info['bin_max']:.2f}]"
            bin_info['interval'] = interval

        bins_df = pd.DataFrame(bins) if bins else pd.DataFrame()
        print(f"  Bins: {len(bins_df)}")

        if save_output and output_dir:
            if table_type == 'labs' and unit:
                # For a list of units, the first one names the file
                unit_for_filename = unit[0] if isinstance(unit, list) else unit
                unit_safe = sanitize_unit_for_filename(unit_for_filename)
                filename = f'{category}_{unit_safe}.parquet'
            else:
                filename = f'{category}.parquet'

            ecdf_dir = os.path.join(output_dir, 'ecdf_yearly', table_type)
            os.makedirs(ecdf_dir, exist_ok=True)
            ecdf_df.to_parquet(os.path.join(ecdf_dir, filename), index=False)

            bins_dir = os.path.join(output_dir, 'bins', table_type)
            os.makedirs(bins_dir, exist_ok=True)
            bins_df.to_parquet(os.path.join(bins_dir, filename), index=False)

            print(f"  Saved to {output_dir}")

        stats = {
            'category': category,
            'unit': unit if table_type == 'labs' else None,
            'clean_count': clean_count,
            'ecdf_distinct_values': len(ecdf_df),
            'num_bins': len(bins)
        }

        return stats, ecdf_df, bins_df
    finally:
        # Cleanup temp table on every exit path
        con.execute("DROP TABLE IF EXISTS clean_data")

## Process Respiratory Support with Yearly ECDF

In [None]:
def process_respiratory_column_duckdb(
    con: duckdb.DuckDBPyConnection,
    column_name: str,
    tables_path: str,
    file_type: str,
    outlier_range: Dict[str, float],
    output_dir: str = None,
    num_bins: int = 10,
    save_output: bool = False
) -> Tuple[Dict[str, Any], Any, Any]:
    """
    Process a single respiratory support column using DuckDB.

    Non-NULL values of *column_name* that fall inside an ICU window and within
    the outlier range are materialized into the temp table ``clean_resp``
    (columns ``value`` and ``year``). The yearly ECDF and flat quantile bins
    are computed from it, and the table is dropped before returning, also on
    the empty-data path and on errors.

    Args:
        con: DuckDB connection that already holds the ``icu_windows`` table.
        column_name: Column of the respiratory support table to process. It is
            inserted into the SQL as an identifier, so it must be a plain
            column name.
        tables_path: Directory containing the CLIF tables.
        file_type: File extension of the respiratory support table.
        outlier_range: Dict with ``min`` and ``max`` bounds (inclusive).
        output_dir: Root directory for saved parquet files.
        num_bins: Number of flat quantile bins requested.
        save_output: Write ECDF and bins to *output_dir* when true.

    Returns:
        ``(stats, ecdf_df, bins_df)``. When no rows match, the two DataFrames
        are ``None`` and ``stats`` reports a zero count.

    Raises:
        FileNotFoundError: if the respiratory support file does not exist.
    """
    file_path = os.path.join(tables_path, f'clif_respiratory_support.{file_type}')

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Respiratory support file not found: {file_path}")

    print(f"Processing {column_name}...")

    # Double any apostrophe so the path stays a valid SQL string literal
    sql_path = file_path.replace("'", "''")

    # CRITICAL: Materialize into a TEMP TABLE with year column
    con.execute(f"""
        CREATE OR REPLACE TEMP TABLE clean_resp AS
        SELECT 
            d.{column_name} as value,
            EXTRACT(YEAR FROM d.recorded_dttm::TIMESTAMP)::INTEGER as year
        FROM read_parquet('{sql_path}') d
        INNER JOIN icu_windows w ON d.hospitalization_id = w.hospitalization_id
        WHERE d.recorded_dttm::TIMESTAMP >= w.in_dttm
            AND d.recorded_dttm::TIMESTAMP <= w.out_dttm
            AND d.{column_name} IS NOT NULL
            AND d.{column_name} >= {outlier_range['min']}
            AND d.{column_name} <= {outlier_range['max']}
    """)

    try:
        clean_count = con.execute("SELECT COUNT(*) FROM clean_resp").fetchone()[0]
        print(f"  Clean count: {clean_count:,}")

        if clean_count == 0:
            print(f"  WARNING: No data found for {column_name}")
            return (
                {'column': column_name, 'clean_count': 0},
                None,
                None
            )

        # Compute Yearly ECDF
        ecdf_df = compute_yearly_ecdf_duckdb(con, 'clean_resp')
        print(f"  ECDF: {len(ecdf_df):,} distinct values (compression: {clean_count / len(ecdf_df):.1f}x)")

        # Compute Flat Bins
        bins = create_flat_bins_duckdb(con, 'clean_resp', num_bins=num_bins)
        bins_df = pd.DataFrame(bins) if bins else pd.DataFrame()
        print(f"  Bins: {len(bins_df)}")

        if save_output and output_dir:
            ecdf_dir = os.path.join(output_dir, 'ecdf_yearly', 'respiratory_support')
            os.makedirs(ecdf_dir, exist_ok=True)
            ecdf_df.to_parquet(os.path.join(ecdf_dir, f'{column_name}.parquet'), index=False)

            bins_dir = os.path.join(output_dir, 'bins', 'respiratory_support')
            os.makedirs(bins_dir, exist_ok=True)
            bins_df.to_parquet(os.path.join(bins_dir, f'{column_name}.parquet'), index=False)

            print(f"  Saved to {output_dir}")

        stats = {
            'column': column_name,
            'clean_count': clean_count,
            'ecdf_distinct_values': len(ecdf_df),
            'num_bins': len(bins)
        }

        return stats, ecdf_df, bins_df
    finally:
        # Cleanup temp table on every exit path
        con.execute("DROP TABLE IF EXISTS clean_resp")

## Configuration References

In [None]:
# Per-category lab settings and the per-category outlier ranges for lab values.
# The outlier lookups use direct indexing, so a missing 'labs' or 'vitals'
# table in the outlier config raises KeyError here.
labs_config = lab_vital_config.get('labs', {})
labs_outlier = outlier_config['tables']['labs']['lab_value_numeric']

# Same pair for vitals.
vitals_config = lab_vital_config.get('vitals', {})
vitals_outlier = outlier_config['tables']['vitals']['vital_value']

# Respiratory outlier ranges are optional; default to an empty mapping.
resp_outlier = outlier_config['tables'].get('respiratory_support', {})

# Respiratory support columns to process; a column without an entry in
# resp_outlier is skipped by the processing loop.
resp_columns = [
    'fio2_set', 'lpm_set', 'tidal_volume_set', 'resp_rate_set',
    'pressure_control_set', 'pressure_support_set', 'flow_rate_set',
    'peak_inspiratory_pressure_set', 'inspiratory_time_set', 'peep_set',
    'tidal_volume_obs', 'resp_rate_obs', 'plateau_pressure_obs',
    'peak_inspiratory_pressure_obs', 'peep_obs', 'minute_vent_obs',
    'mean_airway_pressure_obs'
]

print(f"Labs: {len(labs_config)} categories")
print(f"Vitals: {len(vitals_config)} categories")
print(f"Respiratory: {len(resp_columns)} columns")

# Run Full Pipeline

In [None]:
# Root directory for all final outputs; the processing functions create the
# 'ecdf_yearly/<table>' and 'bins/<table>' subdirectories beneath it.
OUTPUT_DIR = str(PROJECT_ROOT / 'output' / 'final')
os.makedirs(OUTPUT_DIR, exist_ok=True)
print(f"Output directory: {OUTPUT_DIR}")

# Start of the wall-clock measurement reported in the summary cell.
start_time = time.time()

## Process All Labs

In [None]:
print("="*80)
print("Processing Labs - Yearly ECDF")
print("="*80)

# One stats dict per lab category that ran without raising
labs_stats = []

for category, cat_config in labs_config.items():
    if cat_config is None:
        print(f"WARNING: Category '{category}' has None config, skipping")
        continue

    if not isinstance(cat_config, dict):
        # Report the skip instead of dropping the category silently
        print(
            f"WARNING: Category '{category}' config is not a mapping "
            f"({type(cat_config).__name__}), skipping"
        )
        continue

    if category not in labs_outlier:
        print(f"WARNING: Category '{category}' not in outlier config, skipping")
        continue

    config_unit = cat_config.get('reference_unit')
    if config_unit is None:
        print(f"WARNING: Category '{category}' has no reference_unit in config, skipping")
        continue

    # Units are lowercased because the standardized labs file was written
    # with lowercase=True
    if isinstance(config_unit, str):
        unit = config_unit.lower()
    elif isinstance(config_unit, list):
        unit = [u.lower() if isinstance(u, str) else u for u in config_unit]
    else:
        unit = config_unit

    # Best effort: a failure in one category is reported and the loop continues
    try:
        stats, ecdf_df, bins_df = process_category_duckdb(
            con=con,
            table_type='labs',
            category=category,
            unit=unit,
            tables_path=clif_config['tables_path'],
            file_type=clif_config['file_type'],
            outlier_range=labs_outlier[category],
            cat_config=cat_config,
            output_dir=OUTPUT_DIR,
            extreme_bins_count=5,
            save_output=True
        )
        labs_stats.append(stats)
    except Exception as e:
        print(f"ERROR processing {category}: {e}")

print(f"\nProcessed {len(labs_stats)} lab categories")

## Process All Vitals

In [None]:
print("="*80)
print("Processing Vitals - Yearly ECDF")
print("="*80)

# One stats dict per vital category that ran without raising
vitals_stats = []

for category, cat_config in vitals_config.items():
    if cat_config is None:
        print(f"WARNING: Category '{category}' has None config, skipping")
        continue

    if not isinstance(cat_config, dict):
        # Report the skip instead of dropping the category silently
        print(
            f"WARNING: Category '{category}' config is not a mapping "
            f"({type(cat_config).__name__}), skipping"
        )
        continue

    if category not in vitals_outlier:
        print(f"WARNING: Category '{category}' not in outlier config, skipping")
        continue

    # Height and weight use fewer extreme-value bins than the other vitals
    extreme_bins = 5 if category in ['height_cm', 'weight_kg'] else 10

    # Best effort: a failure in one category is reported and the loop continues
    try:
        stats, ecdf_df, bins_df = process_category_duckdb(
            con=con,
            table_type='vitals',
            category=category,
            unit=None,
            tables_path=clif_config['tables_path'],
            file_type=clif_config['file_type'],
            outlier_range=vitals_outlier[category],
            cat_config=cat_config,
            output_dir=OUTPUT_DIR,
            extreme_bins_count=extreme_bins,
            save_output=True
        )
        vitals_stats.append(stats)
    except Exception as e:
        print(f"ERROR processing {category}: {e}")

print(f"\nProcessed {len(vitals_stats)} vital categories")

## Process All Respiratory Support

In [None]:
print("="*80)
print("Processing Respiratory Support - Yearly ECDF")
print("="*80)

# One stats dict per respiratory column that ran without raising
resp_stats = []

for column_name in resp_columns:
    # A column can only be processed when an outlier range is configured for it
    if column_name not in resp_outlier:
        print(f"WARNING: Column '{column_name}' not in outlier config, skipping")
        continue
    
    # Best effort: a failure in one column is reported and the loop continues
    try:
        stats, ecdf_df, bins_df = process_respiratory_column_duckdb(
            con=con,
            column_name=column_name,
            tables_path=clif_config['tables_path'],
            file_type=clif_config['file_type'],
            outlier_range=resp_outlier[column_name],
            output_dir=OUTPUT_DIR,
            num_bins=10,
            save_output=True
        )
        resp_stats.append(stats)
    except Exception as e:
        print(f"ERROR processing {column_name}: {e}")

print(f"\nProcessed {len(resp_stats)} respiratory columns")

## Summary

In [None]:
# Wall-clock time since the "Run Full Pipeline" cell
elapsed = time.time() - start_time

print("="*80)
print("Processing Summary")
print("="*80)
print(f"\nLabs: {len(labs_stats)} categories processed")
print(f"Vitals: {len(vitals_stats)} categories processed")
print(f"Respiratory: {len(resp_stats)} columns processed")
print(f"\nOutput saved to: {OUTPUT_DIR}")
# ECDF files are written under the 'ecdf_yearly' subdirectory, not the root
print(f"  - ECDF (yearly): {OUTPUT_DIR}/ecdf_yearly/")
print(f"  - Bins: {OUTPUT_DIR}/bins/")
print(f"\nElapsed: {elapsed:.1f}s")

## Example Output

In [None]:
# Show example output for one lab.
# ECDF files are saved under <OUTPUT_DIR>/ecdf_yearly/<table_type>/, so the
# example must be looked up there.
example_file = os.path.join(OUTPUT_DIR, 'ecdf_yearly', 'labs', 'hemoglobin_g_dl.parquet')
if os.path.exists(example_file):
    example_df = pd.read_parquet(example_file)
    print("Example output (hemoglobin):")
    print(f"Columns: {list(example_df.columns)}")
    print()
    print(example_df.head(20))
else:
    print("Example file not found - run the pipeline first")

## Cleanup

In [None]:
# Close the DuckDB connection when done.
# The database is in-memory, so icu_windows and any remaining temp tables are
# discarded with it; rerun the connection cell before reusing earlier cells.
con.close()
print("DuckDB connection closed")