# Physionet 2012 Mortality Prediction Challenge

The [Physionet 2012](https://www.physionet.org/content/challenge-2012/1.0.0/) Computing in Cardiology Challenge is to develop methods for patient-specific prediction of in-hospital mortality.

This notebook downloads and prepares the dataset as a tall and a wide table of timeseries observations and as a wide table of static observations. We will use [PySpark](https://spark.apache.org/docs/latest/api/python/index.html) to process the raw data and output parquet formatted datasets.

## Prepare data

Each patient record is tall table in a `.txt` file with columns `Time`, `Parameter`, `Value`. For example:

```
Time,Parameter,Value
00:00,RecordID,132539
00:00,Age,54
00:00,Gender,0
00:00,Height,-1
00:00,ICUType,4
00:00,Weight,-1
00:07,GCS,15
00:07,HR,73
00:07,NIDiasABP,65
00:07,NIMAP,92.33
00:07,NISysABP,147
00:07,RespRate,19
```

We download the data from the Physionet website, concatenate each record into a dataframe and save the table as a `.parquet` file. We will add a column called `Dataset` to indicate the original dataset name. While we process the records, the `Time` column will also be transformed from a timestamp into an integer as minutes from ICU admission.

In [5]:
import os
import io
import urllib
import tarfile
from pathlib import Path
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds

In [6]:
# constants
ROOT = Path("./data")
TIMESERIES_FILE = ROOT / "physionet2012_timeseries.parquet"
STATIC_FILE = ROOT / "physionet2012_static.parquet"
TIMESERIES_WIDE = ROOT / "physionet2012_timeseries_wide.parquet"

Download files

In [7]:
def download_url(url, root, filename=None):
    if not filename:
        filename = os.path.basename(url)
    fpath = os.path.join(root, filename)
    os.makedirs(root, exist_ok=True)
    try:
        urllib.request.urlretrieve(url, fpath)
    except (urllib.error.URLError, IOError) as e:
        if url[:5] == 'https':
            url = url.replace('https:', 'http:')
            urllib.request.urlretrieve(url, fpath)


def unzip(file, root):
    if file.endswith("tar.gz"):
        tar = tarfile.open(file, "r:gz")
        tar.extractall(path=root)
        tar.close()
    if file.endswith("tar"):
        tar = tarfile.open(file, "r:")
        tar.extractall(path=root)

In [9]:
base_url = 'https://physionet.org/files/challenge-2012/1.0.0/'
files = [
    'set-a.tar.gz',
    'set-b.tar.gz',
    'set-c.tar.gz',
    'Outcomes-a.txt',
    'Outcomes-b.txt',
    'Outcomes-c.txt'
]

for file in files:
    print(file)
    download_url(base_url + file, ROOT)
    unzip(os.path.join(ROOT, file), ROOT)

set-a.tar.gz
set-b.tar.gz
set-c.tar.gz
Outcomes-a.txt
Outcomes-b.txt
Outcomes-c.txt


Load each patient record and create a timeseries dataframe and a static variable dataframe.

In [10]:
datasets = ['set-a','set-b','set-c']
outcomes = ['Outcomes-a.txt','Outcomes-c.txt','Outcomes-c.txt']
static_vars = [
    'Age',
    'Gender',
    'Height',
    'ICUType',
    'Weight'
]
id_var = 'RecordID'
time_var = 'Time'
categorical = ['MechVent','Gender','ICUType']


def load_dataset(root, name):
    txt_all = list()
    for f in os.listdir(root / name):
        with open(root / name / f, 'r') as fp:
            txt = fp.readlines()
        # get recordid to add as a column
        recordid = txt[1].rstrip('\n').split(',')[-1]
        txt = [t.rstrip('\n').split(',') + [int(recordid)] for t in txt]
        txt_all.extend(txt[1:])
    df = pd.DataFrame(
        txt_all,
        columns=['Time', 'Parameter', 'Value', 'RecordID']
    )
    df.loc[:,'Dataset'] = name
    return df


def _parse_time_string_to_minutes(s):
    hours, mins = s.split(':')
    return (float(hours) + float(mins)/60) * 60

In [12]:
# load all datasets
df = pd.concat([load_dataset(ROOT, name) for name in datasets])

In [14]:
# drop rows without a parameter name
df = df.loc[df.Parameter!=''].copy()
# convert time to minutes from ICU admission
df.loc[:,'Time'] = df.Time.apply(_parse_time_string_to_minutes)
# drop static vars
cols = static_vars + [id_var]
timeseries = (df.loc[~df.Parameter.isin(cols)].copy())
timeseries = (
    timeseries
    .sort_values(['Dataset','RecordID','Time'])
    .reset_index(drop=True)
)

In [15]:
# static variables as a wide dataframe
static = df.loc[
    df.Parameter.isin(static_vars), 
    ['RecordID','Parameter','Value','Dataset']
].copy()
static = (
    static
    .groupby(['RecordID', 'Parameter', 'Dataset'])
    [['Value']]
    .last()
    .reset_index()
    .pivot(index=['Dataset','RecordID'], columns='Parameter',values='Value')
    .reset_index()
)

In [17]:
# merge outcomes with static vars
out = pd.concat(
    [
        pd.read_csv(ROOT / f).assign(Dataset=datasets[i]) 
        for i, f in enumerate(outcomes)
    ]
)
static = pd.merge(
    static, 
    out, 
    how='inner', 
    left_on=['Dataset','RecordID'],
    right_on=['Dataset','RecordID']
)

Now that we have a timeseries and static dataframe, we can start cleaning the data by replacing missing values and outliers.

In [18]:
for col in static.columns:
    if col == 'Dataset':
        continue
    static.loc[:, col] = pd.to_numeric(static.loc[:, col])
timeseries.loc[:, 'Value'] = pd.to_numeric(timeseries.Value)

static.replace(-1, np.nan, inplace=True)
timeseries.replace(-1, np.nan, inplace=True)
timeseries.dropna(inplace=True)

In [19]:
static.head()

Unnamed: 0,Dataset,RecordID,Age,Gender,Height,ICUType,Weight,SAPS-I,SOFA,Length_of_stay,Survival,In-hospital_death
0,set-a,132539,54,0.0,,4,,6.0,1.0,5.0,,0
1,set-a,132540,76,1.0,175.3,2,81.6,16.0,8.0,8.0,,0
2,set-a,132541,44,0.0,,3,56.7,21.0,11.0,19.0,,0
3,set-a,132543,68,1.0,180.3,3,84.6,7.0,1.0,9.0,575.0,0
4,set-a,132545,88,0.0,,3,,17.0,2.0,4.0,918.0,0


In [20]:
timeseries.head()

Unnamed: 0,Time,Parameter,Value,RecordID,Dataset
0,7.0,GCS,15.0,132539,set-a
1,7.0,HR,73.0,132539,set-a
2,7.0,NIDiasABP,65.0,132539,set-a
3,7.0,NIMAP,92.33,132539,set-a
4,7.0,NISysABP,147.0,132539,set-a


## Plausibility filters

We use a CSV file to store the plausible ranges for every variable.

In [21]:
features_csv = '''
"Variable","NiceName","PlausibilityLower","PlausibilityUpper","Outlier"
"Albumin","Serum Albumin (g/dL)",1.2,4.8,"cap"
"ALP","Alkaline phosphatase (IU/L)",29,400,"cap"
"ALT","Alanine transaminase (IU/L)",6,458,"cap"
"AST","Aspartate transaminase (IU/L)",6,465,"cap"
"Bilirubin","Bilirubin (mg/dL)",0.1,24.8,"cap"
"BUN","Blood urea nitrogen (mg/dL)",3,118,"cap"
"Cholesterol","Cholesterol (mg/dL)",,,"cap"
"Creatinine","Serum creatinine (mg/dL)",0.28,10.23,"cap"
"DiasABP","Invasive diastolic arterial blood pressure (mmHg)",30,120,"cap"
"FiO2","Fractional inspired O2 (0-1)",0.21,1,"cap"
"GCS","Glasgow Coma Score (3-15)",,,"cap"
"Glucose","Serum glucose (mg/dL)",54,447,"cap"
"HCO3","Serum bicarbonate (mmol/L)",5,50,"cap"
"HCT","Hematocrit (%)",18,51,"cap"
"HR","Heart rate (bpm)",46,140,"cap"
"K","Serum potassium (mEq/L)",2.6,6.4,"cap"
"Lactate","Lactate (mmol/L)",0.4,14.2,"cap"
"Mg","Serum magnesium (mmol/L)",1.1,3.5,"cap"
"MAP","Invasive mean arterial blood pressure (mmHg)",40,203,"cap"
"MechVent","Mechanical ventilation respiration",0,1,"cap"
"Na","Serum sodium (mEq/L)",119,157,"cap"
"NIDiasABP","Non-invasive diastolic arterial blood pressure (mmHg)",30,120,"cap"
"NIMAP","Non-invasive mean arterial blood pressure (mmHg)",40,203,"cap"
"NISysABP","Non-invasive systolic arterial blood pressure (mmHg)",40,200,"cap"
"PaCO2","partial pressure of arterial CO2 (mmHg)",18,103,"cap"
"PaO2","Partial pressure of arterial O2 (mmHg)",0,550,"cap"
"pH","Arterial pH (0-14)",6,9,"cap"
"Platelets","Platelets (cells/nL)",,,"cap"
"RespRate","Respiration rate (bpm)",0,42,"cap"
"SaO2","O2 saturation in hemoglobin (%)",80,100,"cap"
"SysABP","Invasive systolic arterial blood pressure (mmHg)",40,200,"cap"
"Temp","Temperature (C)",32.94,40,"cap"
"TroponinI","Troponin-I (ug/L)",,,"cap"
"TroponinT","Troponin-T (ug/L)",,,"cap"
"Urine","Urine output (mL)",,,"cap"
"WBC","White blood cell count (cells/nL)",1.1,39.12,"cap"
"Weight","Weight (kg)",35,299,"cap"
'''

features = pd.read_csv(io.StringIO(features_csv)).set_index('Variable')

We will use [apache-spark](https://spark.apache.org/) to process the datasets. PySpark lets us write modular code that can be reused across similar datasets and scales well as the dataset size gets larger. The PySpark API is also very functional and the builder design pattern to construct queries avoids messy string formatting and concatenation. Unlike Pandas, PySpark has query optimization to efficiently process the data.

To install `apache-spark` on MacOS, run `brew install apache-spark`. The code below also needs the python packages `pyspark` and `findspark`.

In [22]:
def init_spark(appName="MyApp", memory=12):
    """
    This function assumes you already have SPARK_HOME and PYSPARK_SUBMIT_ARGS 
    environment variables set. Requires findspark and PySpark.
    """
    import os
    import findspark

    def _parse_master(pyspark_submit_args):
        sargs = pyspark_submit_args.split()
        for j, sarg in enumerate(sargs):
            if sarg == "--master":
                try:
                    return sargs[j + 1]
                except:
                    raise Exception("Could not parse master from PYSPARK_SUBMIT_ARGS")
        raise Exception("Could not parse master from PYSPARK_SUBMIT_ARGS")

    if "SPARK_HOME" not in os.environ:
        raise Exception("SPARK_HOME environment variable not set.")
    if "PYSPARK_SUBMIT_ARGS" not in os.environ:
        os.environ[
            "PYSPARK_SUBMIT_ARGS"
        ] = f"--master local[12] --driver-memory {memory}g --executor-memory {memory}g pyspark-shell"
    if "PYSPARK_SUBMIT_ARGS" not in os.environ:
        # export PYSPARK_SUBMIT_ARGS = " --master local[8] --driver-memory 8g --executor-memory 8g pyspark-shell"
        raise Exception("PYSPARK_SUNBMIT_ARGS environment variable not set.")
    findspark.init(os.environ["SPARK_HOME"])
    spark_master = _parse_master(os.environ["PYSPARK_SUBMIT_ARGS"])

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master(spark_master).appName(appName).getOrCreate()
    return spark


In [23]:
os.environ.setdefault('SPARK_HOME', '/usr/local/Cellar/apache-spark/3.2.0/libexec')
spark = init_spark(memory=4)
print(spark.sparkContext.uiWebUrl)

http://macc02y44r2jg5j:4040


In [25]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [26]:
# load the dataframes in spark
timeseries = spark.createDataFrame(timeseries)
static = spark.createDataFrame(static)

We process the tall dataframe by applying plausibility filters on all columns with special handling for pH, Temperature, and FiO2.

In [27]:
parameter_var = 'Parameter'
value_var = 'Value'
parameter_col = F.col(parameter_var)
value_col = F.col(value_var)

# special pre-processing
value = (
    F.when((parameter_col == 'pH') & (value_col<0.8) & (value_col>0.65), value_col * 10)
    .when((parameter_col == 'pH') & (value_col<80) & (value_col>65), value_col * 0.1)
    .when((parameter_col == 'pH') & (value_col<800) & (value_col>650), value_col * 0.01)
    .when((parameter_col == 'FiO2') & (value_col>1) & (value_col<=100), value_col * 0.1)
    .when((parameter_col == 'FiO2') & (value_col<0.21), 0.21)
    .when((parameter_col == 'Temp') & (value_col>1) & (value_col<10), value_col*9/5+32)
    .when((parameter_col == 'Temp') & (value_col>95) & (value_col<113), (value_col-32)*5/9)
)

# use the features CSV file to remove outliers
for var in features.index:
    upper, lower = features.loc[var,['PlausibilityUpper','PlausibilityLower']]
    if np.isnan(upper) & np.isnan(lower):
        continue
    if ~np.isnan(lower):
        value = value.when((parameter_col == var) & (value_col < lower), lower)
    if ~np.isnan(upper):
        value = value.when((parameter_col == var) & (value_col > upper), upper)

value = value.otherwise(value_col).alias(value_var)

timeseries = (
    timeseries
    .select(
        'RecordID',
        'Dataset',
        'Time',
        'Parameter',
        value
    )
    .filter(value_col.isNotNull())
)

Save the cleaned tables

In [33]:
timeseries.write.mode('overwrite').partitionBy('Dataset').parquet(str(TIMESERIES_FILE))
static.write.mode('overwrite').parquet(str(STATIC_FILE))

Transform from tall to wide table and resample to the last value in each hour.

In [34]:
wide_cols = ['Dataset','RecordID','Bloc']
for var in features.index:
    wide_cols.append(F.when(parameter_col == var, value_col).otherwise(None).alias(var))

wide = (
    timeseries
    .select(wide_cols)
    .groupBy(['Dataset','RecordID','Time'])
    .agg(*[F.last(F.col(var)).alias(var) for var in features.index])
    .orderBy(['RecordID','Time'], ascending=True)
)

In [35]:
# write to disk
(
    wide
    .write
    .mode('overwrite')
    .partitionBy('Dataset')
    .parquet(str(TIMESERIES_WIDE))
)

Load the wide table

In [53]:
dx = pd.read_parquet(TIMESERIES_WIDE)

In [54]:
dx.head()

Unnamed: 0,RecordID,Bloc,Albumin,ALP,ALT,AST,Bilirubin,BUN,Cholesterol,Creatinine,...,RespRate,SaO2,SysABP,Temp,TroponinI,TroponinT,Urine,WBC,Weight,Dataset
0,132539,1,,,,,,,,,...,,,,,,,60.0,,,set-a
1,132539,2,,,,,,,,,...,,,,,,,30.0,,,set-a
2,132539,3,,,,,,,,,...,,,,,,,170.0,,,set-a
3,132539,4,,,,,,,,,...,,,,,,,60.0,,,set-a
4,132539,5,,,,,,,,,...,20.0,,,,,,,,,set-a


Cleanup: delete downloaded files and keep only the derived parquet files.

In [50]:
import shutil
def delete(file) -> None:
    if os.path.isdir(file):
        shutil.rmtree(file)
    else:
        if os.path.exists(file):
            os.remove(file)
            
keep = [
    TIMESERIES_FILE,
    STATIC_FILE,
    TIMESERIES_WIDE
]
for f in ROOT.glob("*"):
    if f not in keep:
        print(f)
        delete(f)

data/set-b
data/set-c
data/set-a


In [1]:
!jupyter nbconvert --to markdown physionet2012-prepare-data.ipynb

[NbConvertApp] Converting notebook physionet2012-prepare-data.ipynb to markdown
[NbConvertApp] Writing 19549 bytes to physionet2012-prepare-data.md
