This tutorial demonstrates how to use the
[ACES](https://eventstreamaces.readthedocs.io/en/latest/) package to extract a prediction task from the MIMIC-IV
Demo dataset.

## Tutorial Set-up

In [1]:
import os
from pathlib import Path

DEMO_DIR = Path(os.getenv("MEDS_DEMO_DIR", "./demo_output"))

### MIMIC-IV Demo Dataset

You can use the [MIMIC_IV_MEDS](https://github.com/Medical-Event-Data-Standard/MIMIC_IV_MEDS) package to easily download and automatically transform the MIMIC-IV-Demo dataset into MEDS:

In [2]:
OUTPUT_DIR = DEMO_DIR / "meds/"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

!MEDS_extract-MIMIC_IV root_output_dir=$OUTPUT_DIR do_demo=True do_copy=True

[2025-03-25 02:14:37,416][MIMIC_IV_MEDS.__main__][INFO] - Downloading demo data.
[2025-03-25 02:14:38,293][MIMIC_IV_MEDS.download][INFO] - Downloaded: demo_output/meds/raw_input/hosp/admissions.csv.gz
[2025-03-25 02:14:39,570][MIMIC_IV_MEDS.download][INFO] - Downloaded: demo_output/meds/raw_input/hosp/d_hcpcs.csv.gz
[2025-03-25 02:14:45,225][MIMIC_IV_MEDS.download][INFO] - Downloaded: demo_output/meds/raw_input/hosp/d_icd_diagnoses.csv.gz
[2025-03-25 02:14:48,035][MIMIC_IV_MEDS.download][INFO] - Downloaded: demo_output/meds/raw_input/hosp/d_icd_procedures.csv.gz
[2025-03-25 02:14:48,165][MIMIC_IV_MEDS.download][INFO] - Downloaded: demo_output/meds/raw_input/hosp/d_labitems.csv.gz
[2025-03-25 02:14:48,340][MIMIC_IV_MEDS.download][INFO] - Downloaded: demo_output/meds/raw_input/hosp/diagnoses_icd.csv.gz
[2025-03-25 02:14:48,461][MIMIC_IV_MEDS.download][INFO] - Downloaded: demo_output/meds/raw_input/hosp/drgcodes.csv.gz
[2025-03-25 02:14:50,354][MIMIC_IV_MEDS.download][INFO] - Downloaded: 

## Extracting the Prediction Task

### In-ICU Mortality

For this tutorial, we'll extract a cohort for a basic in-ICU mortality prediction task. Let's first define our
task parameters.

Our goal is to predict the mortality outcome for a patient's entire ICU admission using historical patient 
data plus the initial 24 hours of data after the patient was first admitted into the ICU. 

Suppose we only want to consider patients whose ICU admission was longer than 48 hours. As such, patients who
died or were discharged from the ICU within 48 hours of being admitted are excluded.

**Note:** This task is distinct from in-***hospital*** mortality, ***30-day*** mortality, or ***imminent***
mortality.

We can visualize this task as a series of inter-related windows using the timeline below:

![In-ICU Mortality](../../static/img/icu_mortality.svg)


- The "blue" region represents the "**input**" data window. All historical patient data up to and including the first 24 hours of a patient's ICU admission will serve as input into a downstream model.
- The "red" bar represents the ICU admission, and would "**trigger**" the start of our prediction task.
- The "yellow" region represents the first 48 hours of a patient's ICU admission, and we stipulate a "**gap**" whereby the patient must not have died or been discharged during that period.
- The "magenta" region represents our prediction "**target**" window, which can be of varying length as it ends whenever a patient dies or is discharged.
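
The fixed-length boundaries in this timeline reduce to simple date arithmetic. The following is a minimal sketch, not part of ACES: the function name `task_windows` and the admission time are hypothetical and chosen only for illustration.

```python
from datetime import datetime, timedelta


def task_windows(icu_admission: datetime) -> dict:
    """Return the fixed-length window boundaries anchored on an ICU admission."""
    return {
        # End of the "input" window: the model predicts at this time.
        "input_end": icu_admission + timedelta(hours=24),
        # End of the "gap" window: the "target" window starts here.
        "gap_end": icu_admission + timedelta(hours=48),
    }


admit = datetime(2150, 1, 1, 8, 0)  # hypothetical admission time
bounds = task_windows(admit)
print(bounds["input_end"])  # 2150-01-02 08:00:00
print(bounds["gap_end"])    # 2150-01-03 08:00:00
```

The end of the "target" window has no fixed offset, since it depends on when the patient dies or is discharged.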

### ACES Configuration File

ACES uses a configuration language to capture cohort and task definitions. Details about this configuration language are available in the [ACES Documentation](https://eventstreamaces.readthedocs.io/en/latest/readme.html#task-configuration-file).

Let's walk through the construction of an ACES configuration file. At a minimum, a configuration file contains the `predicates`, `trigger`, and `windows` sections.

#### Predicates

To capture our task definition, we must first define at least three simple concepts, or ACES plain [predicates](https://eventstreamaces.readthedocs.io/en/latest/notebooks/predicates.html), for our demo dataset.

For starters, as we are specifically interested in mortality "in the ICU", we need an `icu_admission` and an `icu_discharge` predicate to represent events where patients are officially admitted to and discharged from the ICU. We also need a `death` predicate to capture death events, which provides the mortality component of the task.

To define predicates, we would need to find how these concepts are represented in our dataset. For MIMIC-IV-Demo in MEDS format, these concepts can be found using simple regular expressions:

```yaml
predicates:
  icu_admission:
    code: { regex: "^ICU_ADMISSION//.*" }
  icu_discharge:
    code: { regex: "^ICU_DISCHARGE//.*" }
  death:
    code: { regex: "MEDS_DEATH.*" }
```
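
To get a feel for what these patterns select, they can be tried against a few code strings with Python's `re` module. This is only a sketch: the sample codes below are hypothetical, and `re.search` is used as a stand-in, so ACES's own matching behavior may differ in detail.

```python
import re

# Predicate patterns copied from the configuration above.
PREDICATES = {
    "icu_admission": r"^ICU_ADMISSION//.*",
    "icu_discharge": r"^ICU_DISCHARGE//.*",
    "death": r"MEDS_DEATH.*",
}


def matching_predicates(code: str) -> list[str]:
    """Return the names of all predicates whose regex is found in `code`."""
    return [name for name, pattern in PREDICATES.items() if re.search(pattern, code)]


# Hypothetical code strings, for illustration only.
samples = [
    "ICU_ADMISSION//Medical Intensive Care Unit (MICU)",
    "MEDS_DEATH",
    "LAB//50912//mg/dL",
]
for code in samples:
    print(code, "->", matching_predicates(code))
```

Note that the `death` pattern has no `^` anchor, so under `re.search` it would also match a code that merely contains `MEDS_DEATH`.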

Since patients can either die or be discharged from the ICU, we may also create a more complex concept, or an ACES derived predicate, by joining the above simple concepts using an `OR` relationship:
```yaml
predicates:
  discharge_or_death:
    expr: or(icu_discharge, death)
```
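
Conceptually, the derived predicate holds for an event whenever at least one of its inputs holds. A minimal sketch over hypothetical per-event flags (the event records here are invented, and this is not how ACES stores predicates internally):

```python
# Hypothetical per-event predicate flags, for illustration only.
events = [
    {"time": "t1", "icu_discharge": False, "death": False},
    {"time": "t2", "icu_discharge": True, "death": False},
    {"time": "t3", "icu_discharge": False, "death": True},
]

for event in events:
    # or(icu_discharge, death): true when at least one input predicate is true.
    event["discharge_or_death"] = event["icu_discharge"] or event["death"]

print([event["discharge_or_death"] for event in events])  # [False, True, True]
```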

#### Trigger

As mentioned above, a patient's admission into the ICU triggers our prediction task. A designated field defines this ACES [trigger](https://eventstreamaces.readthedocs.io/en/latest/readme.html#trigger-event), and its value must always be one of the specified predicates. For our task, this predicate would be `icu_admission`:

```yaml
trigger: icu_admission
```


#### Windows

The `windows` section defines the three windows described previously: `input`, `gap`, and `target`.

For details on the configuration language syntax for windows, please see the [documentation](https://eventstreamaces.readthedocs.io/en/latest/readme.html#windows). Briefly, certain fields are present in all windows:

```yaml
windows:
  window_name:
    start:
    end:
    start_inclusive:
    end_inclusive:
```
However, some windows also have optional parameters, such as the `has` field, which captures predicate count criteria for that particular window:
```yaml
windows:
  window_name:
    ...
    has:
      predicate_a: (min, max)
      predicate_b: (min, max)
      ...
```
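
A `(min, max)` pair can be read as a range check on how many times a predicate occurs in the window, with `None` leaving that side unbounded. The sketch below uses a hypothetical helper, `satisfies`, and assumes both bounds are inclusive; consult the ACES documentation for the authoritative semantics.

```python
from typing import Optional


def satisfies(count: int, min_count: Optional[int], max_count: Optional[int]) -> bool:
    """Check a predicate count against (min, max); None means unbounded.

    Assumption: both bounds are inclusive.
    """
    if min_count is not None and count < min_count:
        return False
    if max_count is not None and count > max_count:
        return False
    return True


print(satisfies(0, None, 0))  # True: (None, 0) allows zero occurrences
print(satisfies(1, None, 0))  # False: one occurrence exceeds the maximum of 0
print(satisfies(2, 1, None))  # True: at least one occurrence, no upper limit
```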

For our in-ICU mortality prediction task, we can define:

1. `input`, which begins at the start of a patient's record (i.e., `null`) and ends 24 hours past `trigger` (i.e., `icu_admission`). As we'd like to include the events at both the start and end of `input`, if present, we set both `start_inclusive` and `end_inclusive` to `True`.

    **Note**: Since we'd like our model to make a prediction at the end of `input`, we can set `index_timestamp` to be `end`, which corresponds to the timestamp of `trigger + 24h`.

    ```yaml
    windows:
      input:
        start: null
        end: trigger + 24h
        start_inclusive: True
        end_inclusive: True
        index_timestamp: end
    ```

2. `gap`, which begins at `trigger` and ends 48 hours later. Because the left boundary event is `trigger` itself (i.e., `icu_admission`), it should **not** count toward `gap`, so we set `start_inclusive` to `False`. To require that the ICU admission last at least 48 hours, we add constraints specifying that no additional `icu_admission`, and no `icu_discharge` or `death` (captured together by `discharge_or_death`), may occur in `gap`.

    ```yaml
    windows:
      gap:
        start: trigger
        end: start + 48h
        start_inclusive: False
        end_inclusive: True
        has:
          icu_admission: (None, 0)
          discharge_or_death: (None, 0)
    ```

3. `target`, which begins at the end of `gap` and ends at the next discharge or death event (i.e., the `discharge_or_death` predicate). ACES recognizes arrow notation (`->` and `<-`) as event references (see [Time Range Fields](https://eventstreamaces.readthedocs.io/en/latest/technical.html#time-range-fields)), so `start -> discharge_or_death` ends `target` at the next `discharge_or_death` event. As any event at the end of `gap` is already included in `gap`, we set `start_inclusive` to `False`.

    **Note**: Since we'd like to make a binary mortality prediction, we can extract the `death` predicate as a label from `target`, by specifying the `label` field to be `death`.

    ```yaml
    windows:
      target:
        start: gap.end
        end: start -> discharge_or_death
        start_inclusive: False
        end_inclusive: True
        label: death
    ```

Now, we can put all the components together to form a complete ACES configuration file that captures everything we need for our cohort and task:

```yaml
predicates:
  icu_admission:
    code: { regex: "^ICU_ADMISSION//.*" }
  icu_discharge:
    code: { regex: "^ICU_DISCHARGE//.*" }
  death:
    code: { regex: "MEDS_DEATH.*" }
  discharge_or_death:
    expr: or(icu_discharge, death)

trigger: icu_admission

windows:
  input:
    start: null
    end: trigger + 24h
    start_inclusive: True
    end_inclusive: True
    index_timestamp: end
  gap:
    start: trigger
    end: start + 48h
    start_inclusive: False
    end_inclusive: True
    has:
      icu_admission: (None, 0)
      discharge_or_death: (None, 0)
  target:
    start: gap.end
    end: start -> discharge_or_death
    start_inclusive: False
    end_inclusive: True
    label: death
```
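
To make the combined rules concrete, the sketch below applies the `gap` and `target` logic to hand-made timelines for a single ICU admission. It is a simplified re-implementation for illustration only, not ACES's algorithm; the function `evaluate_stay` and all event times are hypothetical.

```python
from datetime import datetime, timedelta

CLOSING = ("icu_discharge", "death")  # the discharge_or_death predicate


def evaluate_stay(admit, events):
    """Apply the gap and target rules to one ICU admission.

    `events` is a list of (timestamp, predicate_name) tuples.
    Returns None if the stay is excluded, else the prediction time and label.
    """
    gap_end = admit + timedelta(hours=48)
    # gap: (trigger, trigger + 48h] must contain no admission, discharge, or death.
    blocked = ("icu_admission",) + CLOSING
    if any(admit < t <= gap_end and name in blocked for t, name in events):
        return None
    # target: (gap.end, next discharge_or_death].
    closing_times = [t for t, name in events if t > gap_end and name in CLOSING]
    if not closing_times:
        return None  # no closing event, so the target window cannot be built
    target_end = min(closing_times)
    # label: does a death event fall inside the target window?
    died = any(name == "death" and gap_end < t <= target_end for t, name in events)
    return {"prediction_time": admit + timedelta(hours=24), "label": died}


admit = datetime(2150, 1, 1, 8, 0)  # hypothetical admission time
short_stay = [(admit + timedelta(hours=30), "icu_discharge")]
fatal_stay = [(admit + timedelta(hours=72), "death")]
survived_stay = [
    (admit + timedelta(hours=100), "icu_discharge"),
    (admit + timedelta(hours=200), "death"),  # after discharge, outside target
]

print(evaluate_stay(admit, short_stay))              # None (discharged within 48h)
print(evaluate_stay(admit, fatal_stay)["label"])     # True
print(evaluate_stay(admit, survived_stay)["label"])  # False
```

The last case shows why this task differs from other mortality tasks: a death that occurs after ICU discharge falls outside `target` and does not produce a positive label.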

### End-to-End Extraction using the ACES CLI

With the configuration file ready, extracting the cohort from our demo dataset is extremely straightforward. All we need to do is run a simple command-line tool.

In [3]:
in_icu = """
predicates:
  icu_admission:
    code: { regex: "^ICU_ADMISSION//.*" }
  icu_discharge:
    code: { regex: "^ICU_DISCHARGE//.*" }
  death:
    code: { regex: "MEDS_DEATH.*" }
  discharge_or_death:
    expr: or(icu_discharge, death)

trigger: icu_admission

windows:
  input:
    start: null
    end: trigger + 24h
    start_inclusive: True
    end_inclusive: True
    index_timestamp: end
  gap:
    start: trigger
    end: start + 48h
    start_inclusive: False
    end_inclusive: True
    has:
      icu_admission: (None, 0)
      discharge_or_death: (None, 0)
  target:
    start: gap.end
    end: start -> discharge_or_death
    start_inclusive: False
    end_inclusive: True
    label: death
"""

Let's save the final configuration to a YAML file in our demo directory:

In [4]:
COHORT_NAME = "in_icu"
COHORT_DIR = DEMO_DIR / "cohorts"
COHORT_DIR.mkdir(parents=True, exist_ok=True)

with open(COHORT_DIR / f"{COHORT_NAME}.yaml", "w") as f:
    f.write(in_icu)

We can now set some variables for CLI parameters. For more information on CLI arguments, please see the [documentation](https://eventstreamaces.readthedocs.io/en/latest/usage.html#detailed-instructions), including [instructions](https://eventstreamaces.readthedocs.io/en/latest/usage.html#multiple-shards) for using `expand_shards` for simultaneous extraction of cohorts over multiple MEDS shards.

In [5]:
DATA_STANDARD = "meds"
DATA_ROOT = OUTPUT_DIR / "MEDS_cohort/data/"
DATA_SHARD = "$(expand_shards train/1 tuning/1 held_out/1)"

In [6]:
!aces-cli \
    cohort_name=$COHORT_NAME \
    cohort_dir=$COHORT_DIR \
    data=sharded \
    data.standard=$DATA_STANDARD \
    data.root=$DATA_ROOT \
    data.shard=$DATA_SHARD -m

[2025-03-25 02:15:42,893][HYDRA] Launching 3 jobs locally
[2025-03-25 02:15:42,893][HYDRA] 	#0 : cohort_name=in_icu cohort_dir=demo_output/cohorts data=sharded data.standard=meds data.root=demo_output/meds/MEDS_cohort/data data.shard=train/0
2025-03-25 02:15:43.313 | INFO     | aces.__main__:main:149 - Loading config from 'demo_output/cohorts/in_icu.yaml'
2025-03-25 02:15:43.318 | INFO     | aces.config:load:1341 - Parsing windows...
2025-03-25 02:15:43.318 | INFO     | aces.config:load:1350 - Parsing trigger event...
2025-03-25 02:15:43.318 | INFO     | aces.config:load:1392 - Parsing predicates...
2025-03-25 02:15:43.323 | INFO     | aces.__main__:main:159 - Attempting to get predicates dataframe given:
standard: meds
ts_format: '%m

#### Inspecting Results

The CLI outputs a Parquet file for each data shard. Let's examine the output for the train split:

In [7]:
import pandas as pd
results = pd.read_parquet(COHORT_DIR / COHORT_NAME / "train" / "0.parquet")
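
To see which shard outputs exist before loading one, the cohort directory can be scanned for Parquet files. This is a sketch under assumptions: the helper `list_cohort_outputs` is hypothetical, the layout `<split>/<shard>.parquet` is inferred from the path used above, and the `tuning/0` and `held_out/0` names in the demonstration are assumed rather than taken from the CLI output.

```python
import tempfile
from pathlib import Path


def list_cohort_outputs(cohort_root: Path) -> list[str]:
    """Return shard names such as 'train/0' for every Parquet file under cohort_root."""
    return sorted(
        path.relative_to(cohort_root).with_suffix("").as_posix()
        for path in cohort_root.rglob("*.parquet")
    )


# Demonstration on a throwaway directory that mimics the assumed layout.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for shard in ["train/0", "tuning/0", "held_out/0"]:
        shard_file = root / f"{shard}.parquet"
        shard_file.parent.mkdir(parents=True, exist_ok=True)
        shard_file.touch()
    found = list_cohort_outputs(root)

print(found)  # ['held_out/0', 'train/0', 'tuning/0']
```

In this notebook, the equivalent call would be `list_cohort_outputs(COHORT_DIR / COHORT_NAME)`.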

Here is the label distribution of the patients that met our cohort criteria in the train split:

In [8]:
results['boolean_value'].value_counts()

boolean_value
False    52
True      8
Name: count, dtype: int64
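
The counts above imply a fairly imbalanced label. As a quick arithmetic check, using only the two numbers printed in the output:

```python
# Counts copied from the value_counts() output above.
counts = {False: 52, True: 8}

total = sum(counts.values())
positive_rate = counts[True] / total
print(f"{counts[True]}/{total} = {positive_rate:.1%}")  # 8/60 = 13.3%
```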

Recall that our `label` predicate was set to `death` in our configuration file. Each row corresponds to one qualifying ICU admission (a `subject_id` can appear more than once), so we can interpret this as 8 ICU admissions that ended in death and 52 that ended in discharge from the ICU. The full results provide the `subject_id` for every row that meets our cohort criteria, and `prediction_time` corresponds to the `index_timestamp` defined in our configuration file.

In [9]:
results

| | subject_id | prediction_time | boolean_value | integer_value | float_value | categorical_value |
|---|---|---|---|---|---|---|
| 0 | 10002428 | 2156-04-13 16:24:18 | False | | | |
| 1 | 10002428 | 2156-04-20 18:11:19 | False | | | |
| 2 | 10002428 | 2156-05-01 21:53:00 | False | | | |
| 3 | 10002428 | 2156-05-12 14:49:34 | False | | | |
| 4 | 10002495 | 2141-05-23 20:18:01 | False | | | |
| 5 | 10003400 | 2137-02-26 23:37:19 | False | | | |
| 6 | 10003400 | 2137-08-11 19:54:51 | False | | | |
| 7 | 10003400 | 2137-08-18 17:36:37 | True | | | |
| 8 | 10004235 | 2196-02-25 17:07:00 | False | | | |
| 9 | 10004422 | 2111-01-18 09:44:50 | False | | | |