# Precision Vent (Van Gogh) — Final Dataset Builder

This notebook builds the **analysis-ready reinforcement-learning (RL) dataset** from the OMOP-formatted AmsterdamUMCdb **Van Gogh** release. The dataset supports optimizing **PEEP** to reduce a **driving-pressure surrogate**.

## Outputs (in your GCP project)
- `pv_concepts_checked`
- `pv_vent_episodes_v2`
- `pv_hourly_features_v2`
- `pv_hourly_pressors_v2`
- `pv_learning_dataset_v2`

## Locked definitions (Van Gogh)
- **Action**: `PEEP` cleaned to `[1,30]` cmH₂O
- **Outcome**: `dp_dyn_next`
- **Driving pressure surrogate**: `dp_dyn = Pmax − PEEP` (plateau pressure not present)
- **FiO₂** normalized to `[0,1]`
- **MAP**: computed as `(SBP + 2×DBP)/3` when both SBP and DBP are available
- **Pressors**: ingredient-expanded via `concept_ancestor`


## 0) Config

In [1]:
# ====== USER CONFIG ======
PROJECT_ID = "getting-started-datathon-26"
DATASET_PROJECT_ID = "amsterdamumcdb"
DATASET_ID = "van_gogh_2026_datathon"
LOCATION = "EU"

USER_DATASET = "precision_vent"
TABLE_PREFIX = "pv"

SAMPLE_RATE_EPISODES = 1.0   # set 0.05 for fast iteration
ADULT_ONLY = False
ADULT_MIN_AGE = 18

# Cleaning bounds
PEEP_MIN, PEEP_MAX = 1.0, 30.0
PMAX_MIN, PMAX_MAX = 5.0, 80.0
FIO2_MIN, FIO2_MAX = 0.0, 1.0
SBP_MIN, SBP_MAX = 40.0, 250.0
DBP_MIN, DBP_MAX = 20.0, 150.0
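
`SAMPLE_RATE_EPISODES` is applied in section 8 as `RAND() < @sample_rate`, so each run with a rate below 1.0 selects a different subset of episodes. If reproducible subsets matter, one option is to derive the decision from a hash of the episode key. The sketch below shows the idea in plain Python; the function name and the `salt` value are hypothetical. On the SQL side, BigQuery's `FARM_FINGERPRINT` could play a similar role, which is an assumption and is not used anywhere in this notebook.

```python
import hashlib


def keep_episode(person_id: int, icu_stay_id: int, episode_id: int,
                 sample_rate: float, salt: str = "pv-v2") -> bool:
    """Deterministically keep roughly `sample_rate` of all episodes.

    The episode key is hashed to a 64-bit integer; the same key and salt
    always give the same decision, unlike a fresh random draw per run.
    """
    key = f"{salt}:{person_id}:{icu_stay_id}:{episode_id}".encode("utf-8")
    bucket = int.from_bytes(hashlib.sha256(key).digest()[:8], "big")
    # Integer comparison avoids float rounding at the extremes (rate 0.0 or 1.0).
    return bucket < int(sample_rate * 2**64)
```

Changing the salt yields a different, but still reproducible, subset.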


## 1) Auth + BigQuery client

In [2]:
import os
from google.cloud import bigquery

os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID

try:
    from google.colab import auth
    auth.authenticate_user()
    print("✅ Colab authentication complete.")
except Exception:
    print("ℹ️ Not running in Colab (or auth skipped). If local, run:")
    print("   gcloud auth login")
    print("   gcloud auth application-default login")

client = bigquery.Client(project=PROJECT_ID, location=LOCATION)
print("✅ BigQuery client ready:", client.project)


ℹ️ Not running in Colab (or auth skipped). If local, run:
   gcloud auth login
   gcloud auth application-default login
✅ BigQuery client ready: getting-started-datathon-26


## 2) Create output dataset (if missing)

In [3]:
from google.api_core.exceptions import Conflict
from google.cloud import bigquery

ds = bigquery.Dataset(f"{PROJECT_ID}.{USER_DATASET}")
ds.location = LOCATION
try:
    client.create_dataset(ds)
    print(f"✅ Created dataset: {PROJECT_ID}.{USER_DATASET}")
except Conflict:
    print(f"ℹ️ Dataset already exists: {PROJECT_ID}.{USER_DATASET}")


ℹ️ Dataset already exists: getting-started-datathon-26.precision_vent


## 3) Detect schema (does measurement have visit_occurrence_id?)

In [4]:
# Detect schema differences in Van Gogh (not all OMOP columns exist)
meas_cols = client.query(f"""
SELECT column_name
FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name="measurement"
""").to_dataframe()["column_name"].tolist()

drug_cols = client.query(f"""
SELECT column_name
FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name="drug_exposure"
""").to_dataframe()["column_name"].tolist()

MEAS_COLS = set(meas_cols)
DRUG_COLS = set(drug_cols)

HAS_VISIT_IN_MEAS = "visit_occurrence_id" in MEAS_COLS
HAS_VISIT_IN_DRUG = "visit_occurrence_id" in DRUG_COLS

# Pick best available unit column for measurement (if any)
if "unit_concept_id" in MEAS_COLS:
    MEAS_UNIT_COL = "unit_concept_id"
elif "unit_source_value" in MEAS_COLS:
    MEAS_UNIT_COL = "unit_source_value"
elif "unit" in MEAS_COLS:
    MEAS_UNIT_COL = "unit"
else:
    MEAS_UNIT_COL = None

print("HAS visit_occurrence_id in measurement?", HAS_VISIT_IN_MEAS)
print("HAS visit_occurrence_id in drug_exposure?", HAS_VISIT_IN_DRUG)
print("Measurement unit column:", MEAS_UNIT_COL)




HAS visit_occurrence_id in measurement? False
HAS visit_occurrence_id in drug_exposure? False
Measurement unit column: unit_source_value
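
The `if`/`elif` chain that selects the unit column can also be written as a small preference-list lookup. This is a sketch only, and `first_available` is a hypothetical helper name.

```python
from typing import Iterable, Optional, Sequence


def first_available(preferred: Sequence[str], available: Iterable[str]) -> Optional[str]:
    """Return the first name in `preferred` that is present in `available`, else None."""
    have = set(available)
    for name in preferred:
        if name in have:
            return name
    return None


# Same preference order as the cell above.
UNIT_PREFERENCE = ["unit_concept_id", "unit_source_value", "unit"]
```

With the detected schema, `first_available(UNIT_PREFERENCE, MEAS_COLS)` would be expected to return the same value as `MEAS_UNIT_COL`.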


## 4) Concept IDs (final MVP set)

In [5]:
# Measurements (measurement_concept_id)
MEAS = {
    # PEEP (action)
    "peep_ordered": 3022875,
    "peep_actual": 21490855,
    "peep_total": 42527140,

    # Peak/Max airway pressure (DP surrogate numerator)
    "pmax": 3016078,

    # FiO2
    "fio2_vent": 3024882,
    "fio2_inhaled": 42869590,

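    # BP (DBP confirmed; SBP IDs still need validation).
    # NOTE: in pv_concepts_checked (section 5), 21490852 resolves to
    # "Invasive Mean blood pressure", not a systolic concept; verify before use.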
    "dbp_invasive": 21490851,
    "dbp_noninvasive": 21492240,
    "sbp_invasive": 21490852,
    "sbp_noninvasive": 21492241,
}

# Pressors ingredient name resolution (we'll query concept table)
PRESSOR_INGREDIENT_NAMES = {
    "norepi": ["norepinephrine", "noradrenaline"],
    "epi": ["epinephrine", "adrenaline"],
    "dopamine": ["dopamine"],
    "terlipressin": ["terlipressin"],
}


## 5) Write concept metadata table (`pv_concepts_checked`)

In [6]:
from google.cloud import bigquery

CONCEPTS_TABLE = f"`{PROJECT_ID}.{USER_DATASET}.{TABLE_PREFIX}_concepts_checked`"
ids_to_check = list(MEAS.values()) + [2907531, 19076867, 36411287, 19119253]  # plus four pressor drug concepts as a spot check

client.query(f"""
CREATE OR REPLACE TABLE {CONCEPTS_TABLE} AS
SELECT concept_id, concept_name, domain_id, vocabulary_id, concept_class_id,
       standard_concept, invalid_reason
FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.concept`
WHERE concept_id IN UNNEST(@ids)
""", job_config=bigquery.QueryJobConfig(
    query_parameters=[bigquery.ArrayQueryParameter("ids", "INT64", ids_to_check)]
)).result()

print("✅ Wrote:", CONCEPTS_TABLE)
client.query(f"SELECT * FROM {CONCEPTS_TABLE} ORDER BY domain_id, concept_id").to_dataframe()


✅ Wrote: `getting-started-datathon-26.precision_vent.pv_concepts_checked`


| | concept_id | concept_name | domain_id | vocabulary_id | concept_class_id | standard_concept | invalid_reason |
|---|---|---|---|---|---|---|---|
| 0 | 2907531 | 50 ML norepinephrine 0.2 MG/ML Injection | Drug | RxNorm Extension | Quant Clinical Drug | S | |
| 1 | 19076867 | epinephrine 0.1 MG/ML Injectable Solution | Drug | RxNorm | Clinical Drug | S | |
| 2 | 19119253 | terlipressin | Drug | RxNorm | Ingredient | S | |
| 3 | 36411287 | 50 ML Dopamine 4 MG/ML Injectable Solution | Drug | RxNorm Extension | Quant Clinical Drug | S | |
| 4 | 3016078 | Maximum [Pressure] Respiratory system airway o... | Measurement | LOINC | Clinical Observation | S | |
| 5 | 3022875 | Positive end expiratory pressure setting Venti... | Measurement | LOINC | Clinical Observation | S | |
| 6 | 3024882 | Oxygen/Total gas setting [Volume Fraction] Ven... | Measurement | LOINC | Clinical Observation | S | |
| 7 | 21490851 | Invasive Diastolic blood pressure | Measurement | LOINC | Clinical Observation | S | |
| 8 | 21490852 | Invasive Mean blood pressure | Measurement | LOINC | Clinical Observation | S | |
| 9 | 21490855 | PEEP Respiratory system --on ventilator | Measurement | LOINC | Clinical Observation | S | |
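
The concept names returned above can be compared against the role each ID plays in `MEAS`. The sketch below does this with a simple keyword test on three of the rows shown (only rows whose names are not truncated). The expected keywords and the `role_mismatches` helper are assumptions made for illustration.

```python
# Names copied from the pv_concepts_checked output above (untruncated rows only).
CONCEPT_NAMES = {
    21490851: "Invasive Diastolic blood pressure",
    21490852: "Invasive Mean blood pressure",
    21490855: "PEEP Respiratory system --on ventilator",
}

# Hypothetical expectations: (concept_id, keyword that should appear in its name).
EXPECTED = {
    "dbp_invasive": (21490851, "diastolic"),
    "sbp_invasive": (21490852, "systolic"),
    "peep_actual": (21490855, "peep"),
}


def role_mismatches(expected, names):
    """List (role, concept_id, concept_name) where the keyword is absent from the name."""
    bad = []
    for role, (cid, keyword) in expected.items():
        name = names.get(cid)
        if name is None or keyword not in name.lower():
            bad.append((role, cid, name))
    return bad


print(role_mismatches(EXPECTED, CONCEPT_NAMES))
# -> [('sbp_invasive', 21490852, 'Invasive Mean blood pressure')]
```

The flagged entry suggests that `sbp_invasive` may currently point at a mean-pressure concept, which would affect `map_calc` downstream. The correct systolic ID should be confirmed against the `concept` table rather than guessed.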


## 6) Resolve pressor ingredient IDs (ingredient-expanded)

In [7]:
from google.cloud import bigquery

name_list = sorted({n for v in PRESSOR_INGREDIENT_NAMES.values() for n in v})

df_ing = client.query(f"""
SELECT concept_id, concept_name, concept_class_id, domain_id, standard_concept, invalid_reason
FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.concept`
WHERE domain_id='Drug'
  AND concept_class_id='Ingredient'
  AND LOWER(concept_name) IN UNNEST(@names)
ORDER BY concept_name
""", job_config=bigquery.QueryJobConfig(
    query_parameters=[bigquery.ArrayQueryParameter("names", "STRING", name_list)]
)).to_dataframe()

df_ing




| | concept_id | concept_name | concept_class_id | domain_id | standard_concept | invalid_reason |
|---|---|---|---|---|---|---|
| 0 | 1337860 | dopamine | Ingredient | Drug | S | |
| 1 | 1343916 | epinephrine | Ingredient | Drug | S | |
| 2 | 1321341 | norepinephrine | Ingredient | Drug | S | |
| 3 | 19119253 | terlipressin | Ingredient | Drug | S | |


### Build ingredient ID lists (edit if any are missing)

In [8]:
def pick_ids(names):
    s = set()
    for nm in names:
        match = df_ing[df_ing["concept_name"].str.lower() == nm]
        for cid in match["concept_id"].tolist():
            s.add(int(cid))
    return sorted(s)

ING = {
    "norepi": pick_ids(PRESSOR_INGREDIENT_NAMES["norepi"]),
    "epi": pick_ids(PRESSOR_INGREDIENT_NAMES["epi"]),
    "dopamine": pick_ids(PRESSOR_INGREDIENT_NAMES["dopamine"]),
    "terlipressin": pick_ids(PRESSOR_INGREDIENT_NAMES["terlipressin"]),
}
ING_ALL = sorted({cid for ids in ING.values() for cid in ids})

print("Ingredient IDs:", ING)
print("All ingredient IDs:", ING_ALL)


Ingredient IDs: {'norepi': [1321341], 'epi': [1343916], 'dopamine': [1337860], 'terlipressin': [19119253]}
All ingredient IDs: [1321341, 1337860, 1343916, 19119253]
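
Since the pressor flags in section 10 depend on every group resolving to at least one ingredient ID, a quick guard can catch an empty group before the SQL runs. A minimal sketch, with `unresolved_groups` as a hypothetical helper name:

```python
def unresolved_groups(ing: dict) -> list:
    """Return the names of pressor groups whose ingredient ID list is empty."""
    return sorted(name for name, ids in ing.items() if not ids)


# Values copied from the printed output above.
ING_EXAMPLE = {
    "norepi": [1321341],
    "epi": [1343916],
    "dopamine": [1337860],
    "terlipressin": [19119253],
}

missing = unresolved_groups(ING_EXAMPLE)
if missing:
    raise ValueError(f"No ingredient concept found for: {missing}")
```

An empty group would otherwise silently produce an all-zero flag column for that drug.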


## 7) Measurement coverage + deciles (sanity check)

In [9]:
from google.cloud import bigquery

# Reuse the unit column detected in section 3 instead of querying the schema again
unit_col = MEAS_UNIT_COL
print("Detected unit column:", unit_col)

meas_ids = list(MEAS.values())
top_units_sql = f", APPROX_TOP_COUNT({unit_col}, 5) AS top_units" if unit_col else ""

sql = f"""
SELECT
  measurement_concept_id,
  COUNT(*) AS n_total,
  COUNTIF(value_as_number IS NULL) AS n_null,
  MIN(SAFE_CAST(value_as_number AS FLOAT64)) AS min_val,
  APPROX_QUANTILES(SAFE_CAST(value_as_number AS FLOAT64), 10) AS deciles,
  MAX(SAFE_CAST(value_as_number AS FLOAT64)) AS max_val
  {top_units_sql}
FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.measurement`
WHERE measurement_concept_id IN UNNEST(@meas_ids)
GROUP BY measurement_concept_id
ORDER BY n_total DESC
"""

df_stats = client.query(
    sql,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[bigquery.ArrayQueryParameter("meas_ids", "INT64", meas_ids)]
    ),
).to_dataframe()

df_stats




Detected unit column: unit_source_value


| | measurement_concept_id | n_total | n_null | min_val | deciles | max_val | top_units |
|---|---|---|---|---|---|---|---|
| 0 | 21490852 | 17383466 | 15306 | -3261332.0 | [-3261332.0, 57.0, 63.0, 67.0, 71.0, 75.0, 79.... | 6395321.0 | [{'value': 'mmHg', 'count': 9734443}, {'value'... |
| 1 | 3022875 | 10425042 | 939 | -200.0 | [-200.0, 0.2, 4.9, 5.0, 5.0, 5.0, 6.0, 7.00000... | 6540.6 | [{'value': '', 'count': 8018367}, {'value': 'c... |
| 2 | 21490851 | 7175006 | 4199 | -32700.0 | [-32700.003160202417, 45.99999977109935, 50.0,... | 8812609.0 | [{'value': '', 'count': 5094997}, {'value': 'm... |
| 3 | 42527140 | 1667756 | 89 | -99.0 | [-99.0, 20.0, 39.0, 47.0, 49.0, 51.0, 53.0, 57... | 795.0 | [{'value': '', 'count': 1660258}, {'value': 'c... |
| 4 | 21492241 | 1473652 | 2672 | -90.0 | [-90.0, 63.0, 70.0, 75.0, 79.0, 84.0, 88.0, 94... | 239810.0 | [{'value': 'mmHg', 'count': 1050362}, {'value'... |
| 5 | 21492240 | 121663 | 8 | 0.0 | [0.0, 48.000001831205225, 54.99999622313922, 5... | 20265.0 | [{'value': 'mmHg', 'count': 119363}, {'value':... |
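
The summary above, as displayed, contains rows for six concept IDs, while `MEAS` lists ten. A concept with no measurement rows produces no group at all, so it is easy to overlook. The sketch below lists the `MEAS` keys that are absent from the displayed result; `concepts_without_rows` is a hypothetical helper, and `SEEN_IDS` is copied by hand from the table.

```python
MEAS = {
    "peep_ordered": 3022875, "peep_actual": 21490855, "peep_total": 42527140,
    "pmax": 3016078,
    "fio2_vent": 3024882, "fio2_inhaled": 42869590,
    "dbp_invasive": 21490851, "dbp_noninvasive": 21492240,
    "sbp_invasive": 21490852, "sbp_noninvasive": 21492241,
}

# measurement_concept_id values that appear in the coverage table above.
SEEN_IDS = {21490852, 3022875, 21490851, 42527140, 21492241, 21492240}


def concepts_without_rows(meas: dict, seen: set) -> list:
    """Return the MEAS keys whose concept ID has no row in the coverage result."""
    return sorted(name for name, cid in meas.items() if cid not in seen)


print(concepts_without_rows(MEAS, SEEN_IDS))
# -> ['fio2_inhaled', 'fio2_vent', 'peep_actual', 'pmax']
```

In the notebook the same check could use `set(df_stats["measurement_concept_id"])` in place of `SEEN_IDS`. If these four concepts really have no rows, `pmax`, `fio2`, and therefore `dp_dyn` would come out empty downstream, so this is worth confirming before building the hourly table.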


## 8) Ventilation episodes from clean PEEP (`pv_vent_episodes_v2`)

In [10]:
from google.cloud import bigquery

VENT_EPISODES_TABLE = f"`{PROJECT_ID}.{USER_DATASET}.{TABLE_PREFIX}_vent_episodes_v2`"
PEEP_IDS = [MEAS["peep_total"], MEAS["peep_actual"], MEAS["peep_ordered"]]

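# Optional adult filter. It references alias `s` (icu_stays), which exists only in the
# fallback branch below, so it is not applied when HAS_VISIT_IN_MEAS is True.
adult_filter = ""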
if ADULT_ONLY:
    adult_filter = f"""
AND (EXTRACT(YEAR FROM s.visit_start_datetime) - s.year_of_birth) >= {ADULT_MIN_AGE}
"""

if HAS_VISIT_IN_MEAS:
    peep_src_cte = """
peep_src AS (
  SELECT
    person_id,
    visit_occurrence_id,
    measurement_datetime AS ts,
    SAFE_CAST(value_as_number AS FLOAT64) AS peep_val
  FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.measurement`
  WHERE measurement_concept_id IN UNNEST(@peep_ids)
    AND SAFE_CAST(value_as_number AS FLOAT64) BETWEEN @peep_min AND @peep_max
)
""".format(DATASET_PROJECT_ID=DATASET_PROJECT_ID, DATASET_ID=DATASET_ID)
else:
    peep_src_cte = """
icu_stays AS (
  SELECT v.visit_occurrence_id, v.person_id, v.visit_start_datetime, v.visit_end_datetime, p.year_of_birth
  FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.visit_occurrence` v
  LEFT JOIN `{DATASET_PROJECT_ID}.{DATASET_ID}.person` p
    ON p.person_id = v.person_id
),
peep_src AS (
  SELECT
    m.person_id,
    s.visit_occurrence_id,
    m.measurement_datetime AS ts,
    SAFE_CAST(m.value_as_number AS FLOAT64) AS peep_val
  FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.measurement` m
  JOIN icu_stays s
    ON s.person_id = m.person_id
   AND m.measurement_datetime BETWEEN s.visit_start_datetime AND s.visit_end_datetime
  WHERE m.measurement_concept_id IN UNNEST(@peep_ids)
    AND SAFE_CAST(m.value_as_number AS FLOAT64) BETWEEN @peep_min AND @peep_max
  {adult_filter}
)
""".format(DATASET_PROJECT_ID=DATASET_PROJECT_ID, DATASET_ID=DATASET_ID, adult_filter=adult_filter)

vent_sql = f"""
CREATE OR REPLACE TABLE {VENT_EPISODES_TABLE} AS
WITH
{peep_src_cte},
peep_lag AS (
  SELECT
    person_id,
    visit_occurrence_id,
    ts,
    LAG(ts) OVER (PARTITION BY person_id, visit_occurrence_id ORDER BY ts) AS prev_ts
  FROM peep_src
),
flags AS (
  SELECT
    *,
    CASE
      WHEN prev_ts IS NULL THEN 1
      WHEN TIMESTAMP_DIFF(ts, prev_ts, HOUR) >= 24 THEN 1
      ELSE 0
    END AS new_episode
  FROM peep_lag
),
numbered AS (
  SELECT
    *,
    SUM(new_episode) OVER (PARTITION BY person_id, visit_occurrence_id ORDER BY ts) AS episode_number
  FROM flags
),
episodes AS (
  SELECT
    person_id,
    visit_occurrence_id AS icu_stay_id,
    episode_number AS imv_episode_id,
    MIN(ts) AS imv_start_time,
    MAX(ts) AS imv_end_time,
    TIMESTAMP_DIFF(MAX(ts), MIN(ts), MINUTE) / 60.0 AS ventilation_duration_hours
  FROM numbered
  GROUP BY person_id, icu_stay_id, imv_episode_id
)
SELECT * FROM episodes
WHERE ventilation_duration_hours > 0
  AND RAND() < @sample_rate
"""

client.query(
    vent_sql,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("peep_ids", "INT64", PEEP_IDS),
            bigquery.ScalarQueryParameter("peep_min", "FLOAT64", PEEP_MIN),
            bigquery.ScalarQueryParameter("peep_max", "FLOAT64", PEEP_MAX),
            bigquery.ScalarQueryParameter("sample_rate", "FLOAT64", SAMPLE_RATE_EPISODES),
        ]
    ),
).result()

print("✅ Built:", VENT_EPISODES_TABLE)
client.query(f"SELECT * FROM {VENT_EPISODES_TABLE} ORDER BY ventilation_duration_hours DESC LIMIT 20").to_dataframe()


✅ Built: `getting-started-datathon-26.precision_vent.pv_vent_episodes_v2`




| | person_id | icu_stay_id | imv_episode_id | imv_start_time | imv_end_time | ventilation_duration_hours |
|---|---|---|---|---|---|---|
| 0 | 38624 | 59677 | 2 | 2010-02-13 07:41:00+00:00 | 2010-08-26 01:41:00+00:00 | 4650.0 |
| 1 | 15460 | 75753 | 1 | 2010-01-01 13:51:00+00:00 | 2010-07-01 01:51:00+00:00 | 4332.0 |
| 2 | 60588 | 59105 | 1 | 2017-01-01 00:25:00+00:00 | 2017-06-23 09:47:00+00:00 | 4161.366667 |
| 3 | 4335 | 44620 | 1 | 2002-01-01 00:38:00+00:00 | 2002-06-20 16:38:00+00:00 | 4096.0 |
| 4 | 55431 | 56676 | 2 | 2002-05-01 16:34:00+00:00 | 2002-08-18 05:34:00+00:00 | 2605.0 |
| 5 | 25107 | 60388 | 1 | 2017-01-01 00:11:00+00:00 | 2017-04-18 16:11:00+00:00 | 2584.0 |
| 6 | 62565 | 62650 | 1 | 2010-01-01 04:51:00+00:00 | 2010-04-17 20:01:00+00:00 | 2559.166667 |
| 7 | 43307 | 74113 | 1 | 2017-01-01 00:56:00+00:00 | 2017-04-15 20:56:00+00:00 | 2516.0 |
| 8 | 58263 | 49703 | 2 | 2002-01-03 05:44:00+00:00 | 2002-04-05 19:29:00+00:00 | 2221.75 |
| 9 | 57807 | 76624 | 1 | 2002-01-01 04:51:00+00:00 | 2002-04-01 22:13:00+00:00 | 2177.366667 |
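
The episode logic in the SQL (a new episode starts when the gap since the previous in-range PEEP reading is at least 24 hours, and episodes with zero duration are dropped) can be mirrored in plain Python for testing on a single stay. This is a sketch under those two rules only; `split_episodes` is a hypothetical name and it ignores sampling.

```python
from datetime import datetime, timedelta


def split_episodes(timestamps, gap=timedelta(hours=24)):
    """Group PEEP timestamps of one stay into (start, end) ventilation episodes."""
    episodes = []
    start = prev = None
    for ts in sorted(timestamps):
        if prev is None or ts - prev >= gap:
            if prev is not None:
                episodes.append((start, prev))  # close the running episode
            start = ts                          # open a new one
        prev = ts
    if prev is not None:
        episodes.append((start, prev))
    # The SQL computes duration in whole minutes, so anything under a minute counts as 0.
    return [(s, e) for s, e in episodes if e - s >= timedelta(minutes=1)]
```

For instance, readings at hours 0, 1, and 2 followed by a single reading at hour 30 yield one episode covering hours 0 to 2. The isolated reading forms its own episode of zero duration and is discarded.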


## 9) Hourly features (`pv_hourly_features_v2`)

In [11]:
from google.cloud import bigquery

HOURLY_FEATURES_TABLE = f"`{PROJECT_ID}.{USER_DATASET}.{TABLE_PREFIX}_hourly_features_v2`"
NEEDED_MEAS_IDS = [
    MEAS["peep_total"], MEAS["peep_actual"], MEAS["peep_ordered"],
    MEAS["pmax"],
    MEAS["fio2_vent"], MEAS["fio2_inhaled"],
    MEAS["dbp_invasive"], MEAS["dbp_noninvasive"],
    MEAS["sbp_invasive"], MEAS["sbp_noninvasive"],
]

# --- optional unit guards (soft filters) ---
# These guards only exclude rows where a *non-empty* unit is clearly wrong.
# If unit is blank/null (common in Van Gogh), we keep the row.
unit_guard = ""
if MEAS_UNIT_COL == "unit_source_value":
    unit_guard = f"""
   AND (
        m.{MEAS_UNIT_COL} IS NULL OR m.{MEAS_UNIT_COL} = ''
        OR (m.measurement_concept_id IN (@sbp_invasive, @sbp_noninv, @dbp_invasive, @dbp_noninv)
            AND LOWER(m.{MEAS_UNIT_COL}) IN ('mmhg','mm hg','mm[hg]'))
        OR (m.measurement_concept_id IN (@peep_total, @peep_actual, @peep_ordered, @pmax)
            AND (LOWER(m.{MEAS_UNIT_COL}) LIKE '%h2o%' OR LOWER(m.{MEAS_UNIT_COL}) LIKE '%cm%'))
        OR (m.measurement_concept_id IN (@fio2_vent, @fio2_inhaled)
            AND (LOWER(m.{MEAS_UNIT_COL}) LIKE '%/%' OR LOWER(m.{MEAS_UNIT_COL}) LIKE '%fraction%' OR LOWER(m.{MEAS_UNIT_COL}) LIKE '%percent%' OR STRPOS(LOWER(m.{MEAS_UNIT_COL}), '%') > 0))
      )
"""

# --- measurement join: restrict to ICU stay when visit_occurrence_id is missing ---
if HAS_VISIT_IN_MEAS:
    meas_join_sql = f"""
  JOIN `{DATASET_PROJECT_ID}.{DATASET_ID}.measurement` m
    ON m.person_id = g.person_id
   AND m.visit_occurrence_id = g.icu_stay_id
"""
    meas_stay_time_guard = ""  # already tied to visit id
else:
    meas_join_sql = f"""
  JOIN `{DATASET_PROJECT_ID}.{DATASET_ID}.visit_occurrence` v
    ON v.visit_occurrence_id = g.icu_stay_id
   AND v.person_id = g.person_id
  JOIN `{DATASET_PROJECT_ID}.{DATASET_ID}.measurement` m
    ON m.person_id = g.person_id
   AND m.measurement_datetime BETWEEN v.visit_start_datetime AND v.visit_end_datetime
"""
    meas_stay_time_guard = ""  # baked into join

hourly_sql = f"""
CREATE OR REPLACE TABLE {HOURLY_FEATURES_TABLE} AS
WITH episodes AS (
  SELECT * FROM {VENT_EPISODES_TABLE}
),
grid AS (
  SELECT e.person_id, e.icu_stay_id, e.imv_episode_id, t AS hour_ts
  FROM episodes e,
  UNNEST(GENERATE_TIMESTAMP_ARRAY(
    TIMESTAMP_TRUNC(e.imv_start_time, HOUR),
    TIMESTAMP_TRUNC(e.imv_end_time, HOUR),
    INTERVAL 1 HOUR
  )) AS t
),
meas_hour AS (
  SELECT
    g.person_id, g.icu_stay_id, g.imv_episode_id, g.hour_ts,
    m.measurement_concept_id,
    ARRAY_AGG(SAFE_CAST(m.value_as_number AS FLOAT64) ORDER BY m.measurement_datetime DESC LIMIT 1)[OFFSET(0)] AS val
  FROM grid g
{meas_join_sql}
   AND m.measurement_concept_id IN UNNEST(@meas_ids)
   AND m.value_as_number IS NOT NULL
   AND m.measurement_datetime >= g.hour_ts
   AND m.measurement_datetime < TIMESTAMP_ADD(g.hour_ts, INTERVAL 1 HOUR)
{unit_guard}
  GROUP BY g.person_id, g.icu_stay_id, g.imv_episode_id, g.hour_ts, m.measurement_concept_id
),
pivoted AS (
  SELECT
    person_id, icu_stay_id, imv_episode_id, hour_ts,
    MAX(IF(measurement_concept_id=@peep_total,   val, NULL)) AS peep_total,
    MAX(IF(measurement_concept_id=@peep_actual,  val, NULL)) AS peep_actual,
    MAX(IF(measurement_concept_id=@peep_ordered, val, NULL)) AS peep_ordered,
    MAX(IF(measurement_concept_id=@pmax, val, NULL)) AS pmax,
    MAX(IF(measurement_concept_id=@fio2_vent,    val, NULL)) AS fio2_vent,
    MAX(IF(measurement_concept_id=@fio2_inhaled, val, NULL)) AS fio2_inhaled,
    MAX(IF(measurement_concept_id=@dbp_invasive, val, NULL)) AS dbp_invasive,
    MAX(IF(measurement_concept_id=@dbp_noninv,   val, NULL)) AS dbp_noninv,
    MAX(IF(measurement_concept_id=@sbp_invasive, val, NULL)) AS sbp_invasive,
    MAX(IF(measurement_concept_id=@sbp_noninv,   val, NULL)) AS sbp_noninv
  FROM meas_hour
  GROUP BY person_id, icu_stay_id, imv_episode_id, hour_ts
),
filled AS (
  SELECT
    *,
    LAST_VALUE(COALESCE(peep_total, peep_actual, peep_ordered) IGNORE NULLS) OVER w AS peep_raw,
    LAST_VALUE(pmax IGNORE NULLS) OVER w AS pmax_raw,
    LAST_VALUE(COALESCE(fio2_vent, fio2_inhaled) IGNORE NULLS) OVER w AS fio2_raw,
    LAST_VALUE(COALESCE(dbp_invasive, dbp_noninv) IGNORE NULLS) OVER w AS dbp_raw,
    LAST_VALUE(COALESCE(sbp_invasive, sbp_noninv) IGNORE NULLS) OVER w AS sbp_raw
  FROM pivoted
  WINDOW w AS (
    PARTITION BY person_id, icu_stay_id, imv_episode_id
    ORDER BY hour_ts
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  )
),
cleaned AS (
  SELECT
    person_id, icu_stay_id, imv_episode_id, hour_ts,
    CASE WHEN peep_raw BETWEEN @peep_min AND @peep_max THEN peep_raw ELSE NULL END AS peep,
    CASE WHEN pmax_raw BETWEEN @pmax_min AND @pmax_max THEN pmax_raw ELSE NULL END AS pmax,
    CASE
      WHEN fio2_raw IS NULL THEN NULL
      WHEN fio2_raw > 1.5 THEN fio2_raw/100.0
      ELSE fio2_raw
    END AS fio2_pre,
    CASE
      WHEN fio2_raw IS NULL THEN NULL
      WHEN (CASE WHEN fio2_raw > 1.5 THEN fio2_raw/100.0 ELSE fio2_raw END) BETWEEN @fio2_min AND @fio2_max
        THEN (CASE WHEN fio2_raw > 1.5 THEN fio2_raw/100.0 ELSE fio2_raw END)
      ELSE NULL
    END AS fio2,
    CASE WHEN sbp_raw BETWEEN @sbp_min AND @sbp_max THEN sbp_raw ELSE NULL END AS sbp,
    CASE WHEN dbp_raw BETWEEN @dbp_min AND @dbp_max THEN dbp_raw ELSE NULL END AS dbp
  FROM filled
),
with_map AS (
  SELECT
    *,
    CASE WHEN sbp IS NOT NULL AND dbp IS NOT NULL THEN (sbp + 2.0*dbp)/3.0 ELSE NULL END AS map_calc
  FROM cleaned
)
SELECT
  *,
  CASE WHEN pmax IS NOT NULL AND peep IS NOT NULL THEN (pmax - peep) ELSE NULL END AS dp_dyn
FROM with_map
"""

client.query(
    hourly_sql,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("meas_ids", "INT64", NEEDED_MEAS_IDS),

            bigquery.ScalarQueryParameter("peep_total", "INT64", MEAS["peep_total"]),
            bigquery.ScalarQueryParameter("peep_actual", "INT64", MEAS["peep_actual"]),
            bigquery.ScalarQueryParameter("peep_ordered", "INT64", MEAS["peep_ordered"]),
            bigquery.ScalarQueryParameter("pmax", "INT64", MEAS["pmax"]),
            bigquery.ScalarQueryParameter("fio2_vent", "INT64", MEAS["fio2_vent"]),
            bigquery.ScalarQueryParameter("fio2_inhaled", "INT64", MEAS["fio2_inhaled"]),
            bigquery.ScalarQueryParameter("dbp_invasive", "INT64", MEAS["dbp_invasive"]),
            bigquery.ScalarQueryParameter("dbp_noninv", "INT64", MEAS["dbp_noninvasive"]),
            bigquery.ScalarQueryParameter("sbp_invasive", "INT64", MEAS["sbp_invasive"]),
            bigquery.ScalarQueryParameter("sbp_noninv", "INT64", MEAS["sbp_noninvasive"]),

            bigquery.ScalarQueryParameter("peep_min", "FLOAT64", PEEP_MIN),
            bigquery.ScalarQueryParameter("peep_max", "FLOAT64", PEEP_MAX),
            bigquery.ScalarQueryParameter("pmax_min", "FLOAT64", PMAX_MIN),
            bigquery.ScalarQueryParameter("pmax_max", "FLOAT64", PMAX_MAX),
            bigquery.ScalarQueryParameter("fio2_min", "FLOAT64", FIO2_MIN),
            bigquery.ScalarQueryParameter("fio2_max", "FLOAT64", FIO2_MAX),
            bigquery.ScalarQueryParameter("sbp_min", "FLOAT64", SBP_MIN),
            bigquery.ScalarQueryParameter("sbp_max", "FLOAT64", SBP_MAX),
            bigquery.ScalarQueryParameter("dbp_min", "FLOAT64", DBP_MIN),
            bigquery.ScalarQueryParameter("dbp_max", "FLOAT64", DBP_MAX),
        ]
    )
).result()

print("✅ Built:", HOURLY_FEATURES_TABLE)
client.query(f"""
SELECT person_id, icu_stay_id, imv_episode_id, hour_ts, peep, pmax, dp_dyn, fio2, sbp, dbp, map_calc
FROM {HOURLY_FEATURES_TABLE}
ORDER BY hour_ts DESC
LIMIT 50
""").to_dataframe()


✅ Built: `getting-started-datathon-26.precision_vent.pv_hourly_features_v2`




| | person_id | icu_stay_id | imv_episode_id | hour_ts | peep | pmax | dp_dyn | fio2 | sbp | dbp | map_calc |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 57957 | 32081 | 1 | 2023-09-04 01:00:00+00:00 | 5.0 | | | | 93.0 | 59.0 | 70.333333 |
| 1 | 57957 | 32081 | 1 | 2023-09-04 00:00:00+00:00 | 5.0 | | | | 100.0 | 63.0 | 75.333333 |
| 2 | 57957 | 32081 | 1 | 2023-09-03 23:00:00+00:00 | 5.0 | | | | 112.0 | 73.0 | 86.0 |
| 3 | 57957 | 32081 | 1 | 2023-09-03 22:00:00+00:00 | 5.0 | | | | 100.0 | 61.0 | 74.0 |
| 4 | 57957 | 32081 | 1 | 2023-09-03 21:00:00+00:00 | 5.0 | | | | 97.0 | 62.0 | 73.666667 |
| 5 | 57957 | 32081 | 1 | 2023-09-03 20:00:00+00:00 | 5.0 | | | | 100.0 | 65.0 | 76.666667 |
| 6 | 57957 | 32081 | 1 | 2023-09-03 19:00:00+00:00 | 5.0 | | | | 111.0 | 75.0 | 87.0 |
| 7 | 57957 | 32081 | 1 | 2023-09-03 18:00:00+00:00 | 5.0 | | | | 98.0 | 68.0 | 78.0 |
| 8 | 57957 | 32081 | 1 | 2023-09-03 17:00:00+00:00 | 5.0 | | | | 98.0 | 65.0 | 76.0 |
| 9 | 57957 | 32081 | 1 | 2023-09-03 16:00:00+00:00 | 5.0 | | | | 101.0 | 65.0 | 77.0 |
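
In the query above, the `filled` CTE carries the last raw value forward and the `cleaned` CTE applies the range bounds afterwards. One consequence is that an out-of-range reading is also carried forward and then blanks every hour it reaches. The sketch below contrasts that order with the reverse (clean first, then fill) on a toy series. The helper names are hypothetical, and whether the reverse order is preferable is a modelling decision, not something the notebook prescribes.

```python
def forward_fill(values):
    """Replace each None with the most recent non-None value (LAST_VALUE IGNORE NULLS)."""
    out, last = [], None
    for v in values:
        if v is not None:
            last = v
        out.append(last)
    return out


def clean(values, lo, hi):
    """Set values outside [lo, hi] to None (the BETWEEN guards)."""
    return [v if v is not None and lo <= v <= hi else None for v in values]


hourly_peep = [5.0, 6540.6, None, None, 8.0]  # 6540.6 is an implausible reading

fill_then_clean = clean(forward_fill(hourly_peep), 1.0, 30.0)
clean_then_fill = forward_fill(clean(hourly_peep, 1.0, 30.0))

print(fill_then_clean)  # -> [5.0, None, None, None, 8.0]
print(clean_then_fill)  # -> [5.0, 5.0, 5.0, 5.0, 8.0]
```

With the notebook's current order, three hours lose their PEEP value because of one bad reading. Cleaning first would instead carry the last plausible value forward.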


## 10) Hourly pressor flags (`pv_hourly_pressors_v2`)

In [19]:
from google.cloud import bigquery

# ---------- 0) Detect whether drug_exposure has visit_occurrence_id ----------
drug_cols = client.query(f"""
SELECT column_name
FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'drug_exposure'
""").to_dataframe()["column_name"].tolist()

HAS_VISIT_IN_DRUG = "visit_occurrence_id" in set(drug_cols)
print("HAS_VISIT_IN_DRUG?", HAS_VISIT_IN_DRUG)

# ---------- 1) Table name ----------
HOURLY_PRESSORS_TABLE = f"`{PROJECT_ID}.{USER_DATASET}.{TABLE_PREFIX}_hourly_pressors_v2`"

# ---------- 2) Join logic ----------
rx_visit_select = "d.visit_occurrence_id AS icu_stay_id," if HAS_VISIT_IN_DRUG else ""
rx_visit_join = "r.icu_stay_id = g.icu_stay_id" if HAS_VISIT_IN_DRUG else "TRUE"

# If we *can't* join by visit id, restrict drug exposures to the ICU stay window
stay_guard_join = ""
stay_guard_where = ""
if not HAS_VISIT_IN_DRUG:
    stay_guard_join = f"""
JOIN `{DATASET_PROJECT_ID}.{DATASET_ID}.visit_occurrence` v
  ON v.visit_occurrence_id = g.icu_stay_id
 AND v.person_id = g.person_id
"""
    # These conditions are appended to the LEFT JOIN's ON clause (not a WHERE clause),
    # so grid hours without a matching exposure are kept and get flags of 0.
    stay_guard_where = """
 AND r.start_ts < v.visit_end_datetime
 AND r.end_ts   >= v.visit_start_datetime
"""

# ---------- 3) SQL (the CTE is named drug_desc because DESC is a reserved keyword) ----------
pressor_sql = f"""
CREATE OR REPLACE TABLE {HOURLY_PRESSORS_TABLE} AS
WITH grid AS (
  SELECT person_id, icu_stay_id, imv_episode_id, hour_ts
  FROM {HOURLY_FEATURES_TABLE}
),
drug_desc AS (
  SELECT ancestor_concept_id, descendant_concept_id AS drug_concept_id
  FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.concept_ancestor`
  WHERE ancestor_concept_id IN UNNEST(@ing_all)
),
rx AS (
  SELECT
    d.person_id,
    {rx_visit_select}
    d.drug_concept_id,
    d.drug_exposure_start_datetime AS start_ts,
    COALESCE(d.drug_exposure_end_datetime, d.drug_exposure_start_datetime) AS end_ts
  FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.drug_exposure` d
  WHERE d.drug_concept_id IN (SELECT drug_concept_id FROM drug_desc)
)
SELECT
  g.person_id, g.icu_stay_id, g.imv_episode_id, g.hour_ts,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_norepi)
  ) THEN 1 ELSE 0 END) AS norepi_on,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_epi)
  ) THEN 1 ELSE 0 END) AS epi_on,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_dopamine)
  ) THEN 1 ELSE 0 END) AS dopamine_on,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_terli)
  ) THEN 1 ELSE 0 END) AS terlipressin_on,

  MAX(CASE WHEN r.drug_concept_id IS NOT NULL THEN 1 ELSE 0 END) AS any_pressor_on

FROM grid g
{stay_guard_join}
LEFT JOIN rx r
  ON r.person_id = g.person_id
 AND ({rx_visit_join})
 AND r.start_ts < TIMESTAMP_ADD(g.hour_ts, INTERVAL 1 HOUR)
 AND r.end_ts >= g.hour_ts
{stay_guard_where}
GROUP BY g.person_id, g.icu_stay_id, g.imv_episode_id, g.hour_ts
"""

# ---------- 4) Execute ----------
job = client.query(
    pressor_sql,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("ing_all", "INT64", ING_ALL),
            bigquery.ArrayQueryParameter("ing_norepi", "INT64", ING["norepi"]),
            bigquery.ArrayQueryParameter("ing_epi", "INT64", ING["epi"]),
            bigquery.ArrayQueryParameter("ing_dopamine", "INT64", ING["dopamine"]),
            bigquery.ArrayQueryParameter("ing_terli", "INT64", ING["terlipressin"]),
        ]
    ),
)
job.result()
print("✅ Built:", HOURLY_PRESSORS_TABLE)

# ---------- 5) Verify table exists ----------
check = client.query(f"""
SELECT table_name
FROM `{PROJECT_ID}.{USER_DATASET}.INFORMATION_SCHEMA.TABLES`
WHERE table_name = '{TABLE_PREFIX}_hourly_pressors_v2'
""").to_dataframe()
print(check)

# ---------- 6) Quick sanity summary ----------
client.query(f"""
SELECT any_pressor_on, COUNT(*) AS n
FROM {HOURLY_PRESSORS_TABLE}
GROUP BY any_pressor_on
ORDER BY any_pressor_on
""").to_dataframe()




HAS_VISIT_IN_DRUG? False
✅ Built: `getting-started-datathon-26.precision_vent.pv_hourly_pressors_v2`
              table_name
0  pv_hourly_pressors_v2




| | any_pressor_on | n |
|---|---|---|
| 0 | 0 | 2015860 |
| 1 | 1 | 783474 |
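
The join condition that marks an hour as "on pressor" is an interval-overlap test. An exposure `[start_ts, end_ts]` counts for the hour `[hour_ts, hour_ts + 1h)` when it starts before the hour ends and ends at or after the hour begins, with a missing end time treated as equal to the start. A small Python sketch of the same rule, where `pressor_on` is a hypothetical name:

```python
from datetime import datetime, timedelta


def pressor_on(hour_ts, exposures):
    """Return 1 if any (start, end) exposure overlaps [hour_ts, hour_ts + 1h), else 0."""
    hour_end = hour_ts + timedelta(hours=1)
    for start, end in exposures:
        if end is None:
            end = start  # same as COALESCE(end_datetime, start_datetime)
        if start < hour_end and end >= hour_ts:
            return 1
    return 0
```

An exposure with no end time therefore flags only the hour that contains its start, which may undercount continuous infusions if end times are often missing in `drug_exposure`.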


In [18]:
from google.cloud import bigquery

# ---------- 0) Detect whether drug_exposure has visit_occurrence_id ----------
drug_cols = client.query(f"""
SELECT column_name
FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'drug_exposure'
""").to_dataframe()["column_name"].tolist()

HAS_VISIT_IN_DRUG = "visit_occurrence_id" in set(drug_cols)
print("HAS_VISIT_IN_DRUG?", HAS_VISIT_IN_DRUG)

# ---------- 1) Table name ----------
HOURLY_PRESSORS_TABLE = f"`{PROJECT_ID}.{USER_DATASET}.{TABLE_PREFIX}_hourly_pressors_v2`"

# ---------- 2) Join logic ----------
rx_visit_select = "d.visit_occurrence_id AS icu_stay_id," if HAS_VISIT_IN_DRUG else ""
rx_visit_join = "r.icu_stay_id = g.icu_stay_id" if HAS_VISIT_IN_DRUG else "TRUE"

# If we *can't* join by visit id, restrict drug exposures to the ICU stay window
stay_guard_join = ""
stay_guard_where = ""
if not HAS_VISIT_IN_DRUG:
    stay_guard_join = f"""
JOIN `{DATASET_PROJECT_ID}.{DATASET_ID}.visit_occurrence` v
  ON v.visit_occurrence_id = g.icu_stay_id
 AND v.person_id = g.person_id
"""
    # These conditions are appended to the LEFT JOIN's ON clause (not a WHERE clause),
    # so grid hours without a matching exposure are kept and get flags of 0.
    stay_guard_where = """
 AND r.start_ts < v.visit_end_datetime
 AND r.end_ts   >= v.visit_start_datetime
"""

# ---------- 3) SQL (the CTE is named drug_desc because DESC is a reserved keyword) ----------
pressor_sql = f"""
CREATE OR REPLACE TABLE {HOURLY_PRESSORS_TABLE} AS
WITH grid AS (
  SELECT person_id, icu_stay_id, imv_episode_id, hour_ts
  FROM {HOURLY_FEATURES_TABLE}
),
drug_desc AS (
  SELECT ancestor_concept_id, descendant_concept_id AS drug_concept_id
  FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.concept_ancestor`
  WHERE ancestor_concept_id IN UNNEST(@ing_all)
),
rx AS (
  SELECT
    d.person_id,
    {rx_visit_select}
    d.drug_concept_id,
    d.drug_exposure_start_datetime AS start_ts,
    COALESCE(d.drug_exposure_end_datetime, d.drug_exposure_start_datetime) AS end_ts
  FROM `{DATASET_PROJECT_ID}.{DATASET_ID}.drug_exposure` d
  WHERE d.drug_concept_id IN (SELECT drug_concept_id FROM drug_desc)
)
SELECT
  g.person_id, g.icu_stay_id, g.imv_episode_id, g.hour_ts,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_norepi)
  ) THEN 1 ELSE 0 END) AS norepi_on,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_epi)
  ) THEN 1 ELSE 0 END) AS epi_on,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_dopamine)
  ) THEN 1 ELSE 0 END) AS dopamine_on,

  MAX(CASE WHEN r.drug_concept_id IN (
      SELECT drug_concept_id FROM drug_desc WHERE ancestor_concept_id IN UNNEST(@ing_terli)
  ) THEN 1 ELSE 0 END) AS terlipressin_on,

  MAX(CASE WHEN r.drug_concept_id IS NOT NULL THEN 1 ELSE 0 END) AS any_pressor_on

FROM grid g
{stay_guard_join}
LEFT JOIN rx r
  ON r.person_id = g.person_id
 AND ({rx_visit_join})
 AND r.start_ts < TIMESTAMP_ADD(g.hour_ts, INTERVAL 1 HOUR)
 AND r.end_ts >= g.hour_ts
{stay_guard_where}
GROUP BY g.person_id, g.icu_stay_id, g.imv_episode_id, g.hour_ts
"""

# ---------- 4) Execute ----------
job = client.query(
    pressor_sql,
    job_config=bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ArrayQueryParameter("ing_all", "INT64", ING_ALL),
            bigquery.ArrayQueryParameter("ing_norepi", "INT64", ING["norepi"]),
            bigquery.ArrayQueryParameter("ing_epi", "INT64", ING["epi"]),
            bigquery.ArrayQueryParameter("ing_dopamine", "INT64", ING["dopamine"]),
            bigquery.ArrayQueryParameter("ing_terli", "INT64", ING["terlipressin"]),
        ]
    ),
)
job.result()
print("✅ Built:", HOURLY_PRESSORS_TABLE)

# ---------- 5) Verify table exists ----------
check = client.query(f"""
SELECT table_name
FROM `{PROJECT_ID}.{USER_DATASET}.INFORMATION_SCHEMA.TABLES`
WHERE table_name = '{TABLE_PREFIX}_hourly_pressors_v2'
""").to_dataframe()
print(check)

# ---------- 6) Quick sanity summary ----------
client.query(f"""
SELECT any_pressor_on, COUNT(*) AS n
FROM {HOURLY_PRESSORS_TABLE}
GROUP BY any_pressor_on
ORDER BY any_pressor_on
""").to_dataframe()


HAS_VISIT_IN_DRUG? False
✅ Built: `getting-started-datathon-26.precision_vent.pv_hourly_pressors_v2`
              table_name
0  pv_hourly_pressors_v2




Unnamed: 0,any_pressor_on,n
0,0,2015860
1,1,783474
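
The pressor join above marks a drug as "on" for an hour when the exposure interval touches the bucket `[hour_ts, hour_ts + 1h)`, with a missing end time replaced by the start time. A minimal pure-Python sketch of that predicate follows; the helper name `overlaps_hour` is hypothetical and only mirrors the two SQL conditions, it is not part of the notebook.

```python
from datetime import datetime, timedelta


def overlaps_hour(start_ts, end_ts, hour_ts):
    """Return True if exposure [start_ts, end_ts] touches [hour_ts, hour_ts + 1h)."""
    if end_ts is None:
        # Mirrors COALESCE(drug_exposure_end_datetime, drug_exposure_start_datetime)
        end_ts = start_ts
    # Same two conditions as the LEFT JOIN: start before the bucket ends,
    # and end at or after the bucket starts.
    return start_ts < hour_ts + timedelta(hours=1) and end_ts >= hour_ts


hour = datetime(2023, 9, 3, 22)

# Exposure ending exactly at the bucket start still counts (end_ts >= hour_ts).
print(overlaps_hour(datetime(2023, 9, 3, 21, 30), datetime(2023, 9, 3, 22, 0), hour))
# Point exposure (no end time) inside the bucket counts.
print(overlaps_hour(datetime(2023, 9, 3, 22, 59), None, hour))
# Point exposure at the start of the next bucket does not.
print(overlaps_hour(datetime(2023, 9, 3, 23, 0), None, hour))
```

Because exposures without an end time collapse to a single instant, an infusion recorded only by its start would be flagged for one hour at most under this rule.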


In [20]:
client.query(f"""
SELECT table_name
FROM `{PROJECT_ID}.{USER_DATASET}.INFORMATION_SCHEMA.TABLES`
WHERE table_name = '{TABLE_PREFIX}_hourly_pressors_v2'
""").to_dataframe()




Unnamed: 0,table_name
0,pv_hourly_pressors_v2


## 11) Final RL dataset (`pv_learning_dataset_v2`)

In [21]:
LEARNING_TABLE = f"`{PROJECT_ID}.{USER_DATASET}.{TABLE_PREFIX}_learning_dataset_v2`"

client.query(f"""
CREATE OR REPLACE TABLE {LEARNING_TABLE} AS
WITH f AS (
  SELECT person_id, icu_stay_id, imv_episode_id, hour_ts,
         fio2, peep, pmax, dp_dyn, sbp, dbp, map_calc
  FROM {HOURLY_FEATURES_TABLE}
),
p AS (
  SELECT * FROM {HOURLY_PRESSORS_TABLE}
),
j AS (
  SELECT f.*,
         p.norepi_on, p.epi_on, p.dopamine_on, p.terlipressin_on, p.any_pressor_on
  FROM f
  LEFT JOIN p USING (person_id, icu_stay_id, imv_episode_id, hour_ts)
),
seq AS (
  SELECT
    *,
    peep AS action_peep,
    (peep - LAG(peep) OVER w) AS delta_peep,
    LEAD(dp_dyn) OVER w AS dp_dyn_next,
    LEAD(pmax) OVER w AS pmax_next,
    LEAD(fio2) OVER w AS fio2_next,
    LEAD(map_calc) OVER w AS map_next,
    LEAD(any_pressor_on) OVER w AS pressor_next
  FROM j
  WINDOW w AS (PARTITION BY person_id, icu_stay_id, imv_episode_id ORDER BY hour_ts)
)
SELECT * FROM seq
WHERE action_peep IS NOT NULL
""").result()

print("✅ Built:", LEARNING_TABLE)

# Preview the 100 most recent rows (last expression, so the notebook renders it)
client.query(f"""
SELECT person_id, icu_stay_id, imv_episode_id, hour_ts,
       action_peep, delta_peep, dp_dyn, dp_dyn_next,
       fio2, fio2_next, map_calc, map_next,
       any_pressor_on, pressor_next
FROM {LEARNING_TABLE}
ORDER BY hour_ts DESC
LIMIT 100
""").to_dataframe()


✅ Built: `getting-started-datathon-26.precision_vent.pv_learning_dataset_v2`




Unnamed: 0,person_id,icu_stay_id,imv_episode_id,hour_ts,action_peep,delta_peep,dp_dyn,dp_dyn_next,fio2,fio2_next,map_calc,map_next,any_pressor_on,pressor_next
0,57957,32081,1,2023-09-04 01:00:00+00:00,5.0,0.0,,,,,70.333333,,0,
1,57957,32081,1,2023-09-04 00:00:00+00:00,5.0,0.0,,,,,75.333333,70.333333,0,0
2,57957,32081,1,2023-09-03 23:00:00+00:00,5.0,0.0,,,,,86.000000,75.333333,0,0
3,57957,32081,1,2023-09-03 22:00:00+00:00,5.0,0.0,,,,,74.000000,86.000000,0,0
4,57957,32081,1,2023-09-03 21:00:00+00:00,5.0,0.0,,,,,73.666667,74.000000,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,57957,49409,1,2023-08-28 16:00:00+00:00,9.0,0.0,,,,,86.333333,69.666667,0,0
96,57957,49409,1,2023-08-28 15:00:00+00:00,9.0,1.0,,,,,78.333333,86.333333,0,0
97,57957,49409,1,2023-08-28 14:00:00+00:00,8.0,1.0,,,,,78.333333,78.333333,0,0
98,57957,49409,1,2023-08-28 13:00:00+00:00,7.0,0.0,,,,,67.666667,78.333333,0,0
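
The `seq` CTE above derives `delta_peep` with `LAG` and the `*_next` columns with `LEAD`, both partitioned by episode and ordered by `hour_ts`. These functions are row-based: they return the previous or next row in the partition, whatever its timestamp. The pandas sketch below reproduces that behavior on toy data and adds an optional check that the next row is exactly one hour later. The toy values and the columns `next_gap_h` and `dp_dyn_next_strict` are illustrative assumptions, and whether the real hourly grid contains gaps is not established here.

```python
import pandas as pd

# Toy data: episode 1 is contiguous, episode 2 has a 3-hour gap.
toy = pd.DataFrame({
    "imv_episode_id": [1, 1, 1, 2, 2],
    "hour_ts": pd.to_datetime([
        "2023-09-03 10:00", "2023-09-03 11:00", "2023-09-03 12:00",
        "2023-09-03 10:00", "2023-09-03 13:00",
    ]),
    "peep": [5.0, 6.0, 6.0, 8.0, 10.0],
    "dp_dyn": [12.0, 11.0, 10.0, 15.0, 14.0],
})
toy = toy.sort_values(["imv_episode_id", "hour_ts"]).reset_index(drop=True)

# LAG(peep) OVER w  ->  shift(1) within the episode
toy["delta_peep"] = toy["peep"] - toy.groupby("imv_episode_id")["peep"].shift(1)

# LEAD(dp_dyn) OVER w  ->  shift(-1) within the episode
toy["dp_dyn_next"] = toy.groupby("imv_episode_id")["dp_dyn"].shift(-1)

# Hours until the next row of the same episode (NaN on the last row)
next_ts = toy.groupby("imv_episode_id")["hour_ts"].shift(-1)
toy["next_gap_h"] = (next_ts - toy["hour_ts"]).dt.total_seconds() / 3600

# Keep the outcome only when the next row really is the following hour
toy["dp_dyn_next_strict"] = toy["dp_dyn_next"].where(toy["next_gap_h"] == 1.0)

print(toy)
```

The first row of each episode has no `delta_peep` and the last row has no `dp_dyn_next`, which matches the empty `*_next` values on the most recent hour of an episode in the preview above.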


## 12) QA summary

In [22]:
client.query(f"""
SELECT table_name
FROM `{PROJECT_ID}.{USER_DATASET}.INFORMATION_SCHEMA.TABLES`
ORDER BY table_name
""").to_dataframe()




Unnamed: 0,table_name
0,pv_concepts_checked
1,pv_hourly_features_v1
2,pv_hourly_features_v2
3,pv_hourly_pressors_v1
4,pv_hourly_pressors_v2
5,pv_learning_dataset_v1
6,pv_learning_dataset_v2
7,pv_vent_episodes_v1
8,pv_vent_episodes_v2


In [23]:
qa = client.query(f"""
SELECT
  COUNT(*) AS n_rows,
  COUNTIF(action_peep IS NULL) AS n_action_null,
  COUNTIF(dp_dyn IS NULL) AS n_dp_null,
  COUNTIF(dp_dyn_next IS NULL) AS n_dp_next_null,
  COUNTIF(pmax IS NULL) AS n_pmax_null,
  COUNTIF(fio2 IS NULL) AS n_fio2_null,
  COUNTIF(map_calc IS NULL) AS n_map_null,
  APPROX_QUANTILES(action_peep, 10) AS peep_deciles,
  APPROX_QUANTILES(dp_dyn, 10) AS dp_deciles,
  APPROX_QUANTILES(pmax, 10) AS pmax_deciles,
  APPROX_QUANTILES(fio2, 10) AS fio2_deciles,
  APPROX_QUANTILES(map_calc, 10) AS map_deciles
FROM {LEARNING_TABLE}
""").to_dataframe()
qa




Unnamed: 0,n_rows,n_action_null,n_dp_null,n_dp_next_null,n_pmax_null,n_fio2_null,n_map_null,peep_deciles,dp_deciles,pmax_deciles,fio2_deciles,map_deciles
0,2781019,0,2781019,2781019,2781019,2781019,356333,"[1.0, 5.0, 5.0, 5.0, 6.0, 8.0, 8.0, 10.0, 10.0...",[],[],[],"[26.666666666666668, 53.333333333333336, 57.33..."
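
In the QA output above, `n_dp_null`, `n_dp_next_null`, `n_pmax_null`, and `n_fio2_null` all equal `n_rows` (2,781,019), and the corresponding decile arrays are empty, so those columns hold no values in this build. A small pandas sketch for flagging fully-null columns on a dataframe pulled from the table is shown below; the helper `null_report` and the toy frame are hypothetical and not part of the notebook.

```python
import pandas as pd


def null_report(df: pd.DataFrame, cols: list) -> pd.DataFrame:
    """Per-column null count, null fraction, and an all-null flag."""
    n = len(df)
    rows = []
    for c in cols:
        n_null = int(df[c].isna().sum())
        rows.append({
            "column": c,
            "n_null": n_null,
            "frac_null": n_null / n if n else float("nan"),
            "all_null": n > 0 and n_null == n,
        })
    return pd.DataFrame(rows)


# Toy frame shaped like the QA result: dp_dyn entirely missing, map_calc partly.
toy_qa = pd.DataFrame({
    "action_peep": [5.0, 5.0, 8.0, 10.0],
    "dp_dyn": [None, None, None, None],
    "map_calc": [70.0, None, 80.0, 75.0],
})

report = null_report(toy_qa, ["action_peep", "dp_dyn", "map_calc"])
print(report)
```

A column flagged `all_null` points to an upstream problem (for example, a concept that matched no measurements) and is not something the learning table can repair on its own.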


## 13) Export a sample (optional)

In [24]:
sample_df = client.query(f"""
SELECT *
FROM {LEARNING_TABLE}
WHERE dp_dyn_next IS NOT NULL
ORDER BY RAND()
LIMIT 200000
""").to_dataframe()

out_csv = "pv_learning_dataset_v2_sample_200k.csv"
sample_df.to_csv(out_csv, index=False)
print("✅ wrote", out_csv)




✅ wrote pv_learning_dataset_v2_sample_200k.csv
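
The export cell above prints its success message regardless of how many rows the query returned. Since the QA summary in section 12 reports `n_dp_next_null` equal to `n_rows`, the filter `dp_dyn_next IS NOT NULL` may match nothing in this run, in which case `to_csv` would write a header-only file. A minimal guard, assuming a hypothetical helper `write_sample` that is not part of the notebook, could look like this:

```python
import os
import tempfile

import pandas as pd


def write_sample(df: pd.DataFrame, path: str) -> bool:
    """Write df to CSV only if it has rows; return True when a file was written."""
    if df.empty:
        print("⚠️ query returned 0 rows; nothing written to", path)
        return False
    df.to_csv(path, index=False)
    print("✅ wrote", path, "rows:", len(df))
    return True


# Demonstration on toy frames inside a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    empty_path = os.path.join(tmp, "empty.csv")
    full_path = os.path.join(tmp, "full.csv")

    wrote_empty = write_sample(pd.DataFrame(columns=["dp_dyn_next"]), empty_path)
    wrote_full = write_sample(pd.DataFrame({"dp_dyn_next": [12.0]}), full_path)

    empty_exists = os.path.exists(empty_path)
    full_exists = os.path.exists(full_path)
```

In the notebook this would replace the direct `sample_df.to_csv(...)` call, for example `write_sample(sample_df, out_csv)`.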
