In [2]:
,#@title Project Configuration { display-mode: "form" }

PROJECT_ID = "datathon-484308" #@param {type:"string"}
DATASET_PROJECT_ID = 'amsterdamumcdb'
DATASET_ID = 'van_gogh_2026_datathon_update'
LOCATION = 'eu'

import os
from google.colab import auth
from google.cloud import bigquery

# Set environment variable for Google Cloud Project
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID

print(f"Project ID set to: {PROJECT_ID}")
print(f"Dataset: {DATASET_PROJECT_ID}.{DATASET_ID} (Location: {LOCATION})")

#@title Authenticate User { display-mode: "form" }
auth.authenticate_user()
print('Successfully authenticated')

#@title Initialize Client { display-mode: "form" }
from google.cloud import bigquery

# Default query job configuration
def_config = bigquery.job.QueryJobConfig(default_dataset=f"{DATASET_PROJECT_ID}.{DATASET_ID}")

# Create the BigQuery client
client = bigquery.Client(project=PROJECT_ID, location=LOCATION, default_query_job_config=def_config)

# Enable data table display in Colab
%load_ext google.colab.data_table
from google.colab.data_table import DataTable
DataTable.max_columns = 50
DataTable.max_rows = 20000

print("BigQuery client initialized and ready.")

import pandas as pd
import pandas_gbq
import matplotlib.pyplot as plt
import numpy as np

config_gbq = {
    "query": {
        "defaultDataset": {"projectId": DATASET_PROJECT_ID, "datasetId": DATASET_ID},
        "location": LOCATION,
    }
}

src_config = f"{DATASET_PROJECT_ID}.{DATASET_ID}"

MY_NEW_DATASET_ID = "my_ventilation_study"

def q(sql: str) -> pd.DataFrame:
    """Run `sql` on BigQuery via pandas_gbq and return the result as a DataFrame.

    Unqualified table names resolve against the source dataset configured in
    ``config_gbq``. The job runs in project ``PROJECT_ID`` and in ``LOCATION``,
    which must match the location of the datasets the query reads. Results are
    downloaded through the BigQuery Storage API.
    """
    return pandas_gbq.read_gbq(
        sql,
        project_id=PROJECT_ID,
        configuration=config_gbq,
        location=LOCATION,
        use_bqstorage_api=True,
    )

Project ID set to: datathon-484308
Dataset: amsterdamumcdb.van_gogh_2026_datathon_update (Location: eu)
Successfully authenticated
BigQuery client initialized and ready.


In [22]:
# --- B. Standard Concept Mapping (ID -> Column Name) ---
# These are concepts where the ID alone is sufficient to identify the parameter.
# The SQL generator turns each one into its own column with
# MAX(CASE WHEN measurement_concept_id = <id> THEN value_as_number END).
# NOTE(review): 3024171 ('rr'), 2000000209 ('pinsp_above_peep'), 3005629 ('o2_flow')
# and 36304672 ('insp_time_set') also appear in NIV_SPECIAL_CONFIG with a
# source-value filter. These standard columns filter on the ID only, so they also
# aggregate the NIV rows of those IDs. Confirm this is intended.

# Core Mechanical Ventilation (MV) Settings
MV_CORE = {
    3022875: 'peep_set',
    3012410: 'tv_set',
    3007469: 'rr_set'
}

# Airway Dimensions
AIRWAY = {
    2000000629: 'ett_depth',
    21491186: 'ett_diameter',
    36305611: 'trach_diameter'
}

# Extended MV Settings
MV_EXTENDED = {
    3000461: 'ps_set',
    2000000209: 'pinsp_above_peep',
    3045410: 'minute_vol_set',
    36304672: 'insp_time_set',
}

# Measured Ventilation Values
MV_MEASURED = {
    3016226: 'peep_meas',
    2000000238: 'peak_pressure',
    21490854: 'tv_meas'
}

# Standard NIV Parameters (ID based). A non-null value in any of these columns
# also marks an hour as NIV in the SQL fallback classification.
NIV_STANDARD = {
    2000000188: 'bipap_high',
    2000000191: 'bipap_low',
    2000000634: 'niv_leckage',
    42868484: 'niv_device'
}

# Oxygen Therapy
O2_THERAPY = {
    3005629: 'o2_flow'
}

# Blood Gas Analysis
BGA_OTHER = {
    3027801: 'pao2',
    3027946: 'paco2',
    3019977: 'ph'
}

# Simple Vitals (1 ID only)
VITALS_SIMPLE = {
    3024171: 'rr' # Respiratory Rate (User specified ID)
}

# Combine all simple concepts. Dict unpacking keeps only the LAST value for an ID
# that occurs in several groups, so a clash would be lost silently; the asserts
# below catch that, and also catch two IDs sharing one SQL column name.
_STANDARD_GROUPS = (MV_CORE, AIRWAY, MV_EXTENDED, MV_MEASURED,
                    NIV_STANDARD, O2_THERAPY, BGA_OTHER, VITALS_SIMPLE)
STANDARD_CONCEPTS = {**MV_CORE, **AIRWAY, **MV_EXTENDED, **MV_MEASURED,
                     **NIV_STANDARD, **O2_THERAPY, **BGA_OTHER, **VITALS_SIMPLE}

assert len(STANDARD_CONCEPTS) == sum(len(group) for group in _STANDARD_GROUPS), \
    "A concept ID is defined in more than one standard concept group"
assert len(set(STANDARD_CONCEPTS.values())) == len(STANDARD_CONCEPTS), \
    "Two standard concepts map to the same column name"


# --- C. Complex Merged Mappings (N-to-1) ---
# Format: {column_name: [concept_id, ...]}. The SQL generator merges every listed
# ID into one column by taking MAX(value_as_number) over all of them.
# Both 'spo2' and 'fio2' are additionally rescaled by the generator: values <= 1
# are multiplied by 100, so the column is expressed on a 0-100 scale.
VITALS_COMPLEX = dict(
    # Heart rate: monitor (3027018), palpation (3003841), plus 3001376 and
    # 21490872 (sources not documented here).
    hr=[3027018, 3003841, 3001376, 21490872],
    # SpO2: pulse oximetry (40762499) or blood gas analysis (3016502).
    spo2=[40762499, 3016502],
    # Systolic BP: invasive (21490853) or non-invasive (21492239).
    sbp=[21490853, 21492239],
    # Diastolic BP: invasive (21490851) or non-invasive (21492240).
    dbp=[21490851, 21492240],
    # FiO2 from three source concepts.
    fio2=[2000000205, 2000000204, 2000000203],
)

# --- Special Parameter Logic (NIV Source Value Filtering) ---
# These NIV parameters reuse concept IDs that also carry other measurements, so
# each one is selected by concept ID AND an exact `measurement_source_value`
# match. Each entry becomes its own column named by 'label'.
# Every entry is a dict {'id': concept_id, 'source': exact source string, 'label': column name}.
# NOTE: the source strings must match the data byte-for-byte (note the space in
# "Tidal volume gemeten ;R ...").
NIV_SPECIAL_CONFIG = [
    {"id": concept_id, "source": source_value, "label": column}
    for concept_id, source_value, column in (
        (2000000186, "NIV Backup RR (Set)", "niv_backup_rr"),
        (3024171, "Frequentie ingesteld;R UMCA NIV FREQUENTIE S", "niv_freq_set"),
        (2000000209, "Pinsp/IPAP ingesteld;R UMCA NIV IPAP", "niv_ipap"),
        (3024171, "Frequentie gemeten;R UMCA NIV FREQUENTIE GEMETEN", "niv_freq_meas"),
        (36303816, "Tidal volume gemeten ;R UMCA NIV TIDAL VOLUME M", "niv_tv_meas"),
        (21490754, "AMV gemeten;R UMCA NIV MINUUTVOLUME M", "niv_mv_meas"),
        (3005629, "O2 flow;R UMCA NIV O2 FLOW", "niv_o2_flow"),
        (36304672, "Insp tijd (s);R UMCA NIV INSPIRATIE TIJD", "niv_insp_time"),
    )
]

# --- D. Classification Logic (Categories) ---
# Concept ID of the measurement rows whose `value_source_value` holds the ventilator
# mode as text. Only rows of this concept receive a mode rank in the SQL generator.
VENTILATION_MODE_CONCEPT_ID = 3004921
# Category -> exact `value_source_value` strings that belong to it.
# The SQL generator matches with `value_source_value IN (...)`. That match is exact
# and case-sensitive, which is why spelling/case/whitespace variants are listed
# separately (e.g. "Standby" / "Stand By" / "stand-by", "PC" vs "PC ").
# Entries containing a comma ("SIMV, SIMV-VC", "CPAP, CPAP/PSV") are single strings.
# CASE branches are emitted in this dict's order, so a string listed under two
# categories would get the rank of the first one.
# Mode strings not listed anywhere get DEFAULT_PRIORITY and go through the SQL
# fallback classification.
# NOTE(review): the strings appear to be copied verbatim from the data (e.g.
# "Cocitr.", "02 Therapie"). Verify against the source before "correcting" spellings.
VENTILATION_MODE_CATEGORIES = {
    # Hours whose best mode is NO_VENTILATION are excluded from the final table.
    'NO_VENTILATION': [
        "Standby", "Stand By", "stand-by", "Stand by", "Ambient",
        "HME (kunstneus)", "Ventilation mode not supported by CIE",
        "Optional", "anders (opm.)"
    ],
    'NIV': [
        "NIV", "PS/CPAP in NIV", "Pressure Support / CPAP in NIV",
        "PS/ CPAP in NIV", "PC in NIV", "Pressure Control in NIV",
        "NIV-ST", "nCPAP", "nCPAP-PS", "nCPAP-IPPV", "Nasal CPAP",
        "Nasale CPAP/IMV", "BiPAP/ST", "S/T"
    ],
    'HIGH_FLOW_O2': [
        "Hoge flow", "High flow adult", "High flow groot",
        "High flow klein", "High flow junior", "02 Therapie"
    ],
    'ASSISTED': [
        "PS/CPAP", "Spont", "SPONT", "Pressure Support / CPAP", "ASV",
        "SIMV-PCVG", "PS", "PS/CPAP (trig)", "PSV-Pro",
        "PC - PS Automode trigger on", "NAVA", "CPAP/ASB", "CPAP_ASB",
        "PC - PS Automode trigger off", "SIMV-PC",
        "SIMV (Press. Contr.) + Pressure Support", "SIMV(PC)+PS",
        "BIPAP/ASB", "BIPAP", "CPAP", "SIMV, SIMV-VC", "(P)-CMV/ PCV+",
        "SIMV(VC)+PS", "P-SIMV/ P-SIMV+", "ASB", "PSIMV", "Bi Vente",
        "Bipap", "automode on: PS/ CPAP", "BiLevel",
        "SIMV (Vol. Cocitr.) + Pressure Support", "CPAP/PSV", "PSV",
        "DuoPAP", "P-CMV + APV/ (S)CMV+", "APRV", "Bi-level", "MMV/ASB",
        "SIMV_ASB", "SIMV (PC) + PS", "Pressure Support",
        "SIMV (Pressure Reg. Volume Control) + Pressure Support",
        "P-SIMV- APV/ SIMV+", "Volume Support", "APRV/BiPhasic",
        "CPAP, CPAP/PSV", "PC-SIMV", "MMV", "Pressure Support Mode",
        "PS.Tv", "CPAP op MT", "Auto Pressure Support", "MMV_ASB", "VS",
        "SB", "Bivent", "Pressure SIMV", "BIPAP-APRV", "PRVC SIMV",
        "SIMV/ASB/AutoFlow", "PRVC (trig)", "automode on: PRVC",
        "Biphasic", "BIPAP-SIMV", "CPAP/Apnea", "SIMV/AutoFlow",
        "Volume geassisteerd", "SIMV", "IPPV/IMV", "APVSIMV"
    ],
    # "PC " and "VC " (with a trailing space) are distinct source values from
    # "PC" / "VC". "PC " is listed here; "VC " is listed under CONTROLLED_VOLUME.
    'CONTROLLED_PRESSURE': [
        "PC ", "PCV-VG", "Pressure control mode (PCV)", "PC", "Pressure Control",
        "pressure control mode (PCV)", "PC (No trig)", "PCMV",
        "Pressure Controled", "Druk gecontroleerd", "Pressure controlled Mode",
        "(A)PCV", "HFO", "Pressure Reg. Volume Control", "PRVC Assist/Control",
        "PSC", "Pressure Assist/Control", "P-CMV", "PRVC (No trig)",
        "PRVC (bij VC)", "PRVC", "APVCMV", "HLM-MODE", "BACKUP: (P)CMV",
        "Backup", "PCV", "Autoflow", "IPPV/AutoFlow", "IPPV/ASSIST/AutoFlow"
    ],
    'CONTROLLED_VOLUME': [
        "VC ", "volume control mode (VCV, CMV)", "Bag only",
        "Volume control mode (VCV, CMV)", "(S)CMV", "(S)-CMV", "VC",
        "Volume Control", "CPPV_Assist", "Volume gecontroleerd",
        "IPPV/ASSIST", "IPPV", "CPPV/ASSIST", "CPPV",
        "Volume controlled Ventilation Mode", "VCV", "IPPV_Assist",
        "Volume Assist/Control", "V(A)C", "VC (No trig)", "VC (trig)"
    ],
    # Non-mode values found in the mode field (e.g. units such as "kPa", "mmHg", "%").
    # Hours classified UNCERTAIN are excluded from the final table.
    'UNCERTAIN': [
        "kPa", "Adults", "mmHg", "%"
    ]
}

# 2. Define Order of Priority (Top = Most Invasive / Lowest Rank Number)
# The SQL keeps MIN(rank) per patient-hour, so when several mode strings occur in
# the same hour, the category listed first here wins.
MODE_HIERARCHY = [
    'CONTROLLED_VOLUME',
    'CONTROLLED_PRESSURE',
    'ASSISTED',
    'NIV',
    'HIGH_FLOW_O2',
    # Fallback only: it has no keyword list. The SQL assigns it when no mode
    # is recognised, no NIV column is populated and peep_set > 0.
    'INVASIVE/UNCERTAIN',
    'NO_VENTILATION',
    'UNCERTAIN',
]

# 3. Rank per category in list order, spaced by 10:
#    {'CONTROLLED_VOLUME': 10, 'CONTROLLED_PRESSURE': 20, ..., 'UNCERTAIN': 80}
PRIORITY_MAP = dict(zip(MODE_HIERARCHY, range(10, 10 * len(MODE_HIERARCHY) + 1, 10)))

print("✅ Configuration loaded successfully.")

✅ Configuration loaded successfully.


In [23]:
# @title 2. Generate and Execute SQL (With FiO2 & OR-Detection)
from google.cloud import bigquery

# --- CONFIGURATION ---
# Destination of the generated table: <billing project>.<study dataset>.<table>.
TABLE_NAME = "ventilation_hours_final_complete"
FULL_TABLE_ID = f"{PROJECT_ID}.{MY_NEW_DATASET_ID}.{TABLE_NAME}"

# LIKE pattern applied to measurement_source_value; rows matching it are
# flagged as Operating Room (Anesthesiology) rows.
OR_MARKER_STRING = '%UMCA AN %'

# Rank used for a mode category / mode string that has no entry in PRIORITY_MAP.
DEFAULT_PRIORITY = 99


def get_prio(key):
    """Return PRIORITY_MAP[key], or DEFAULT_PRIORITY when `key` is not ranked."""
    try:
        return PRIORITY_MAP[key]
    except KeyError:
        return DEFAULT_PRIORITY

# 1. Build Rank Logic
# One `WHEN value_source_value IN (...) THEN <rank>` branch per mode category.
# CASE evaluates branches in order, so a string listed under two categories
# gets the rank of the one that comes first in VENTILATION_MODE_CATEGORIES.

# A category missing from MODE_HIERARCHY would silently receive DEFAULT_PRIORITY
# and become indistinguishable from an unrecognised mode string. Fail loudly instead.
assert set(VENTILATION_MODE_CATEGORIES) <= set(PRIORITY_MAP), (
    "Categories missing from MODE_HIERARCHY: "
    f"{sorted(set(VENTILATION_MODE_CATEGORIES) - set(PRIORITY_MAP))}"
)

rank_case_parts = []
for category, keywords in VENTILATION_MODE_CATEGORIES.items():
    prio = get_prio(category)
    # Escape for a BigQuery single-quoted string literal: backslash first, then quote.
    clean_keywords = [k.replace("\\", "\\\\").replace("'", "\\'") for k in keywords]
    sql_list = ", ".join(f"'{k}'" for k in clean_keywords)
    rank_case_parts.append(f"WHEN value_source_value IN ({sql_list}) THEN {prio}")
rank_sql_snippet = "\n        ".join(rank_case_parts)

# 2. Build Pivots

# A) Collect every concept ID the query needs, so the table scan can be
#    restricted with a single `measurement_concept_id IN (...)` filter.
std_ids = list(STANDARD_CONCEPTS.keys())
special_ids = [item['id'] for item in NIV_SPECIAL_CONFIG]
complex_ids = [cid for ids in VITALS_COMPLEX.values() for cid in ids]

# Sorted so the generated SQL is stable between runs (IN semantics are unaffected).
all_ids = sorted(set(std_ids + special_ids + complex_ids + [VENTILATION_MODE_CONCEPT_ID]))
all_ids_sql = ", ".join(map(str, all_ids))

pivot_parts = []

# B) Standard Concepts: one column per concept ID.
#    NOTE(review): IDs shared with NIV_SPECIAL_CONFIG are not excluded here, so
#    e.g. `rr` also aggregates the NIV frequency rows of concept 3024171.
for cid, label in STANDARD_CONCEPTS.items():
    pivot_parts.append(f"MAX(CASE WHEN measurement_concept_id = {cid} THEN value_as_number END) AS {label}")

# C) Special NIV Parameters: concept ID plus an exact measurement_source_value match.
#    The source text is escaped for a BigQuery string literal (backslash, then quote),
#    consistent with the rank builder.
for item in NIV_SPECIAL_CONFIG:
    source_sql = item['source'].replace("\\", "\\\\").replace("'", "\\'")
    pivot_parts.append(f"MAX(CASE WHEN measurement_concept_id = {item['id']} AND measurement_source_value = '{source_sql}' THEN value_as_number END) AS {item['label']}")

# D) Complex Vitals: several IDs merged into one column via MAX.
#    Columns listed here are rescaled: values <= 1 are multiplied by 100
#    (presumably some sources store a 0-1 fraction instead of a percentage).
PERCENT_SCALED_COLUMNS = ('spo2', 'fio2')
for col_name, ids in VITALS_COMPLEX.items():
    ids_sql = ", ".join(map(str, ids))

    if col_name in PERCENT_SCALED_COLUMNS:
        # Normalize: If <= 1, multiply by 100
        pivot_parts.append(f"""
        MAX(CASE
            WHEN measurement_concept_id IN ({ids_sql}) THEN
                CASE
                    WHEN value_as_number <= 1 THEN value_as_number * 100
                    ELSE value_as_number
                END
        END) AS {col_name}
        """)
    else:
        pivot_parts.append(f"MAX(CASE WHEN measurement_concept_id IN ({ids_sql}) THEN value_as_number END) AS {col_name}")

full_pivot_sql = ",\n        ".join(pivot_parts)

# 3. Fallback Logic
# Used in the `classified` CTE when best_rank matches no PRIORITY_MAP value, i.e.
# the hour has only unrecognised mode strings or no mode row at all. Such an
# hour becomes NIV if any NIV column below is populated.
niv_cols = [*NIV_STANDARD.values(), *(entry['label'] for entry in NIV_SPECIAL_CONFIG)]
niv_check_condition = " OR ".join(column + " IS NOT NULL" for column in niv_cols)

# Reverse lookup best_rank -> category name, one WHEN branch per ranked category.
mapping_cases = [
    f"WHEN best_rank = {rank} THEN '{mode_name}'"
    for mode_name, rank in PRIORITY_MAP.items()
]
mapping_sql = "\n            ".join(mapping_cases)

# Categories kept in the final table. NO_VENTILATION and UNCERTAIN hours are
# dropped, and so is 'UNKNOWN' because it is not part of MODE_HIERARCHY.
valid_modes = [mode_name for mode_name in MODE_HIERARCHY
               if mode_name not in ('NO_VENTILATION', 'UNCERTAIN')]
valid_modes_sql = ", ".join("'" + mode_name + "'" for mode_name in valid_modes)


# --- 4. ASSEMBLE QUERY ---
# Plausibility filter: rows of the set-PEEP concept (MV_CORE 'peep_set') with a
# value above PEEP_SET_MAX_PLAUSIBLE are discarded before aggregation.
# NOTE(review): `NOT (id = X AND value > N)` evaluates to NULL for a PEEP row whose
# value is NULL, so such rows are dropped as well.
PEEP_SET_CONCEPT_ID = 3022875  # must match the 'peep_set' key in MV_CORE
PEEP_SET_MAX_PLAUSIBLE = 40

# Query outline:
#   hourly_data - scan only the needed concepts, truncate timestamps to the hour,
#                 rank mode strings, flag operating-room (OR) rows;
#   aggregated  - one row per (person_id, hour): pivoted parameter columns,
#                 lowest mode rank (best_rank) and an "any OR row" flag;
#   classified  - map best_rank back to a category, otherwise fall back to
#                 NIV / INVASIVE/UNCERTAIN / UNKNOWN from the populated columns;
#   final       - keep only the categories in valid_modes.
# NOTE(review): ORDER BY in a CREATE TABLE ... AS does not guarantee the stored row
# order in BigQuery; it only adds a sort to the job.
query_final = f"""
CREATE OR REPLACE TABLE `{FULL_TABLE_ID}` AS

WITH hourly_data AS (
    SELECT
        person_id,
        TIMESTAMP_TRUNC(measurement_datetime, HOUR) AS start_time,
        measurement_concept_id,
        measurement_source_value,
        value_as_number,
        -- 1. Calculate Rank for Mode
        CASE
            WHEN measurement_concept_id = {VENTILATION_MODE_CONCEPT_ID} THEN
                CASE
                    {rank_sql_snippet}
                    ELSE {DEFAULT_PRIORITY}
                END
            ELSE NULL
        END AS mode_rank,
        -- 2. Detect Operating Room Source (Cost-efficient check)
        CASE
            WHEN measurement_source_value LIKE '{OR_MARKER_STRING}' THEN 1
            ELSE 0
        END AS is_or_source_row
    FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.measurement`
    WHERE measurement_concept_id IN ({all_ids_sql})
      AND measurement_datetime IS NOT NULL
      AND NOT (measurement_concept_id = {PEEP_SET_CONCEPT_ID} AND value_as_number > {PEEP_SET_MAX_PLAUSIBLE})
),

aggregated AS (
    SELECT
        person_id,
        start_time,
        {full_pivot_sql},
        MIN(mode_rank) as best_rank,
        -- Aggregate location flag: If ANY row in this hour is OR, set to 1
        MAX(is_or_source_row) as is_or
    FROM hourly_data
    GROUP BY person_id, start_time
),

classified AS (
    SELECT
        *,
        TIMESTAMP_ADD(start_time, INTERVAL 1 HOUR) AS end_time,
        CASE
            {mapping_sql}
            ELSE
                CASE
                    WHEN ({niv_check_condition}) THEN 'NIV'
                    WHEN peep_set > 0 THEN 'INVASIVE/UNCERTAIN'
                    ELSE 'UNKNOWN'
                END
        END AS ventilation_mode
    FROM aggregated
)

-- FINAL FILTER
SELECT *
FROM classified
WHERE ventilation_mode IN ({valid_modes_sql})
ORDER BY person_id, start_time
"""

# --- EXECUTION ---
# Creates the study dataset if needed, materialises the table, then prints two
# per-mode summaries. Any failure is reported and re-raised, so "Run All" stops
# here instead of continuing against a missing or stale table.
print(f"Generating Final Table `{TABLE_NAME}`...")

try:
    client.create_dataset(f"{PROJECT_ID}.{MY_NEW_DATASET_ID}", exists_ok=True)
    job = client.query(query_final)
    job.result()  # wait for the CREATE TABLE job to finish
    print(f"✅ Success! Table `{FULL_TABLE_ID}` created.")

    # --- VALIDATION ---
    # Per mode: total hours, hours with at least one OR row, and their share in %.
    print("\n--- Summary: Mode vs. Location (OR) ---")
    loc_query = f"""
        SELECT
            ventilation_mode,
            COUNT(*) as hours,
            SUM(is_or) as or_hours,
            ROUND(SUM(is_or) / COUNT(*) * 100, 1) as pct_in_or
        FROM `{FULL_TABLE_ID}`
        GROUP BY 1
        ORDER BY 1
    """
    display(client.query(loc_query).to_dataframe())

    # Per mode: mean FiO2 / SpO2 (both on the 0-100 scale after normalisation) and SBP.
    print("\n--- Summary: Vitals Averages (FiO2 Check) ---")
    vit_query = f"""
        SELECT
            ventilation_mode,
            ROUND(AVG(fio2), 1) as avg_fio2,
            ROUND(AVG(spo2), 1) as avg_spo2,
            ROUND(AVG(sbp), 0) as avg_sbp
        FROM `{FULL_TABLE_ID}`
        GROUP BY 1
        ORDER BY 1
    """
    display(client.query(vit_query).to_dataframe())

except Exception as e:
    print(f"❌ Error: {e}")
    # Re-raise: swallowing the error would let later cells run as if it had succeeded.
    raise

Generating Final Table `ventilation_hours_final_complete`...
✅ Success! Table `datathon-484308.my_ventilation_study.ventilation_hours_final_complete` created.

--- Summary: Mode vs. Location (OR) ---


Unnamed: 0,ventilation_mode,hours,or_hours,pct_in_or
0,ASSISTED,2668683,13691,0.5
1,CONTROLLED_PRESSURE,735436,50998,6.9
2,CONTROLLED_VOLUME,114681,40621,35.4
3,HIGH_FLOW_O2,24598,8,0.0
4,INVASIVE/UNCERTAIN,47394,17889,37.7
5,NIV,1221657,3265,0.3



--- Summary: Vitals Averages (FiO2 Check) ---


Unnamed: 0,ventilation_mode,avg_fio2,avg_spo2,avg_sbp
0,ASSISTED,43.5,96.0,127.0
1,CONTROLLED_PRESSURE,51.5,96.3,121.0
2,CONTROLLED_VOLUME,63.0,98.0,127.0
3,HIGH_FLOW_O2,64.3,94.6,124.0
4,INVASIVE/UNCERTAIN,56.4,97.5,127.0
5,NIV,45.8,96.1,126.0
