# Big Data and Cloud Computing - Assignment #2

Group AC

- Bárbara Nóbrega Galiza - 202408654
- Carolina Nunes Valente Pires - 202408704
- Cláudia Oliveira -  202005668

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, stddev, when, isnan, ceil, coalesce, to_timestamp, countDistinct, collect_list, lit, mean, min, max, sum, first, udf
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import time
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np 
from functools import reduce
from pyspark.ml.feature import Imputer
from pyspark.sql.types import IntegerType
import joblib
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    mean_absolute_error, mean_squared_error, r2_score,
    mean_absolute_percentage_error, make_scorer
)
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
from sklearn.tree import DecisionTreeRegressor

In [None]:
#import os
#os.environ["PYSPARK_PYTHON"] = r"C:\Users\claud\AppData\Local\Programs\Python\Python39\python.exe"
#os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\claud\AppData\Local\Programs\Python\Python39\python.exe"

In [3]:
spark = SparkSession.builder \
    .appName("ICU Length of Stay Prediction") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

This next chunk of code, differently from pandas, does not load the data immediatly to memory, it only builds a logical plan for the computation. However, it still take some time because it also does file discovery and infers schema from a sample (still an I/O operation).

In [4]:
start_time = time.time()

icustays = spark.read.csv("tables/ICUSTAYS.csv", header=True, inferSchema=True)
chartevents = spark.read.csv("tables/CHARTEVENTS_BIG.csv", header=True, inferSchema=True)
diagnoses = spark.read.csv('tables/DIAGNOSES_ICD.csv', header=True, inferSchema=True)
diagnosis_names = spark.read.csv('tables/D_ICD_DIAGNOSES.csv', header=True, inferSchema=True)
admissions = spark.read.csv('tables/ADMISSIONS.csv', header=True, inferSchema=True)
patients = spark.read.csv('tables/PATIENTS.csv', header=True, inferSchema=True)
items = spark.read.csv('tables/D_ITEMS.csv', header=True, inferSchema=True)


end_time = time.time()

print(f"Total CSV load time: {end_time - start_time:.2f} seconds")

Total CSV load time: 394.53 seconds


Checking if inferSchema worked:

In [5]:
print(icustays.printSchema())
print(chartevents.printSchema())
print(diagnoses.printSchema())
print(admissions.printSchema())
print(patients.printSchema())
print(items.printSchema())

root
 |-- ROW_ID: integer (nullable = true)
 |-- SUBJECT_ID: integer (nullable = true)
 |-- HADM_ID: integer (nullable = true)
 |-- ICUSTAY_ID: integer (nullable = true)
 |-- DBSOURCE: string (nullable = true)
 |-- FIRST_CAREUNIT: string (nullable = true)
 |-- LAST_CAREUNIT: string (nullable = true)
 |-- FIRST_WARDID: integer (nullable = true)
 |-- LAST_WARDID: integer (nullable = true)
 |-- INTIME: timestamp (nullable = true)
 |-- OUTTIME: timestamp (nullable = true)
 |-- LOS: double (nullable = true)

None
root
 |-- ROW_ID: integer (nullable = true)
 |-- SUBJECT_ID: integer (nullable = true)
 |-- HADM_ID: integer (nullable = true)
 |-- ICUSTAY_ID: integer (nullable = true)
 |-- ITEMID: integer (nullable = true)
 |-- CHARTTIME: timestamp (nullable = true)
 |-- STORETIME: timestamp (nullable = true)
 |-- CGID: integer (nullable = true)
 |-- VALUE: string (nullable = true)
 |-- VALUENUM: double (nullable = true)
 |-- VALUEUOM: string (nullable = true)
 |-- ERROR: integer (nullable = tru

It appears to have worked.

Now, let's remove unnecessary columns to make computation more efficient:

Columns to be kept: 

- icustays: subject_id, hadm_id, icustay_id, intime, los
- chartevents: subject_id, icustay_id, itemid, charttime, value
- diagnoses: subject_id, hadm_id, seq_num, icd9_code
- admissions: subject_id, hadm_id, deathtime
- patients: subject_id, dob
- items: itemid, label

In [6]:
icustays = icustays.select("SUBJECT_ID", "HADM_ID", "ICUSTAY_ID", "INTIME", "LOS")
chartevents = chartevents.select("SUBJECT_ID", "ICUSTAY_ID", "ITEMID", "CHARTTIME", "VALUE")
diagnoses = diagnoses.select("SUBJECT_ID", "HADM_ID", "SEQ_NUM", "ICD9_CODE")
admissions = admissions.select("SUBJECT_ID", "HADM_ID", "DEATHTIME")
patients = patients.select("SUBJECT_ID", "DOB")
items = items.select("ITEMID", "LABEL")

Tables description:

In [7]:
start_time = time.time()

print(icustays.describe().show())
print(diagnoses.describe().show())
print(admissions.describe().show())
print(patients.describe().show())
print(items.describe().show())

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

+-------+------------------+------------------+------------------+-----------------+
|summary|        SUBJECT_ID|           HADM_ID|        ICUSTAY_ID|              LOS|
+-------+------------------+------------------+------------------+-----------------+
|  count|             61532|             61532|             61532|            61522|
|   mean|  33888.6059123708| 149954.4706494182|249962.71024832607|4.917971580897899|
| stddev|28127.690913330127|28898.895903803314|  28890.5748673448| 9.63878425110918|
|    min|                 2|            100001|            200001|           1.0E-4|
|    max|             99999|            199999|            299999|         173.0725|
+-------+------------------+------------------+------------------+-----------------+

None
+-------+------------------+------------------+------------------+------------------+
|summary|        SUBJECT_ID|           HADM_ID|           SEQ_NUM|         ICD9_CODE|
+-------+------------------+------------------+----------

In [8]:
start_time = time.time()

print(chartevents.describe().show())

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

+-------+------------------+------------------+------------------+------------------+
|summary|        SUBJECT_ID|        ICUSTAY_ID|            ITEMID|             VALUE|
+-------+------------------+------------------+------------------+------------------+
|  count|         330712483|         330414954|         330712483|         328641134|
|   mean|31300.602687235125| 250195.2241299678|  75668.3713958992| 77.32892799832888|
| stddev|27009.800314481552|28732.974394786408|104358.27040968732|2130.3461492287856|
|    min|                 2|            200001|                 1|             \tCDI|
|    max|             99999|            299999|            228451|              ~150|
+-------+------------------+------------------+------------------+------------------+

None
Total time: 610.36 seconds


The time necessary to apply describe, that is, calculate summary statistics is much bigger for the chartevents alone then for all the other tables, which is expected due to its size. It confirms once again chartevents will be the bottlencek of this ML pipeline.

## Data preprocessing 

### Filtering based on window

Filter the chartevents to only have events from the first 2 days of the ICU stay:

In [9]:
start_time = time.time()

spark.conf.set("spark.sql.session.timeZone", "UTC")

icustays = icustays.withColumn("INTIME", to_timestamp("INTIME"))
chartevents = chartevents.withColumn("CHARTTIME", to_timestamp("CHARTTIME"))

merged = chartevents.join(
    icustays.select("ICUSTAY_ID", "INTIME", "LOS", "HADM_ID"),
    on="ICUSTAY_ID",
    how="inner"
)

merged = merged.withColumn(
    "DAYS_FROM_INTIME",
    (col("CHARTTIME").cast("long") - col("INTIME").cast("long")) / 86400.0
)

merged = merged.withColumn(
    "DAYS_FROM_INTIME",
    when(col("DAYS_FROM_INTIME") < 0, lit(0)).otherwise(col("DAYS_FROM_INTIME"))
)

chartevents_2nd_day = merged.filter(col("DAYS_FROM_INTIME") < 2).drop("DAYS_FROM_INTIME", "CHARTTIME")

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

Total time: 0.24 seconds


Fast because just the DAG was built. Real loading of the csv, merging, filtering and counting computation happens in the next cell:

In [10]:
start_time = time.time()

print(chartevents_2nd_day.count())

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

117272707
Total time: 300.68 seconds


From 330M to 117M, a meagninful reduction, however 1/3 is still a good amount of  data. It means 1/3 of the chartevents were recorded within the first 48h of the ICU stay.

Checking columns: 

In [11]:
chartevents_2nd_day.columns

['ICUSTAY_ID', 'SUBJECT_ID', 'ITEMID', 'VALUE', 'INTIME', 'LOS', 'HADM_ID']

Since our goal is to predict the LOS of the patient after they stayed there for 2 days, it makes no sense to consider data of patients that died within the first 48h. Therefore, we first cut these patients' stays, then compute the top 5 most common items for the remainig ICU stays. Following this order makes computation more efficient, since we will only compute the top 5 based on a smaller number of chartevents, and assures unbiased results (top 5 are calculated only with valid ICU stays).

In [12]:
joined = chartevents_2nd_day.join(
    admissions.select("HADM_ID", "DEATHTIME"),
    on="HADM_ID",
    how="left"
)

joined = joined.withColumn("DEATHTIME", to_timestamp("DEATHTIME"))
joined = joined.withColumn("INTIME", to_timestamp("INTIME"))

joined = joined.withColumn(
    "HOURS_TO_DEATH",
    (col("DEATHTIME").cast("double") - col("INTIME").cast("double")) / 3600.0
)

alive_48h = joined.filter(
    col("DEATHTIME").isNull() |
    (col("HOURS_TO_DEATH") >= 48)
).drop("DEATHTIME", "INTIME", "HOURS_TO_DEATH")

In [13]:
alive_48h.columns

['HADM_ID', 'ICUSTAY_ID', 'SUBJECT_ID', 'ITEMID', 'VALUE', 'LOS']

In [14]:
start_time = time.time()

print(alive_48h.count())

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

114801472
Total time: 281.26 seconds


About 3M rows were removed, with the total time being very close to when the removal was of 2/3 (~200M rows) of the data. This shows that in Spark, performance is driven more by the complexity of operations (in this case both used joins, colum creation with math and filters), rather than just the number of rows.

### Adding items (chartevents) as columns

Now, we keep only the ICU stays that have the top 5 most common items that will be used for modeling. Since there are multiple items and there will be one or more columns based on each of them, the goal here is to limit the number of columns in the dataset fed to the model. By picking the top 5, we minimize the number of nulls we will have when creating the columns.

In [15]:
start_time = time.time()

top_items = alive_48h.groupBy("ITEMID").count().orderBy(col("count").desc()).limit(15)

top_items_with_labels = top_items.join(
    items.select("ITEMID", "LABEL"),
    on="ITEMID",
    how="left"
)

top_items_with_labels.orderBy(col("count").desc()).show(truncate=False)

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

+------+-------+---------------------------+
|ITEMID|count  |LABEL                      |
+------+-------+---------------------------+
|211   |1542598|Heart Rate                 |
|742   |1336371|calprevflg                 |
|618   |1306673|Respiratory Rate           |
|646   |1299691|SpO2                       |
|212   |1270531|Heart Rhythm               |
|161   |1242103|Ectopy Type                |
|220045|1195309|Heart Rate                 |
|220210|1184042|Respiratory Rate           |
|220277|1154033|O2 saturation pulseoxymetry|
|128   |1148774|Code Status                |
|550   |1142485|Precautions                |
|1125  |1089363|Service Type               |
|159   |956154 |Ectopy Frequency           |
|220048|884684 |Heart Rhythm               |
|227969|822776 |Safety Measures_U_1        |
+------+-------+---------------------------+

Total time: 294.61 seconds


From these, we can see 742 is not actually a measurament but a flag, so we cannot use it. We need also to inspect if the remaining items are valid.

In [16]:
alive_48h.filter(col("ITEMID") == 212).select("VALUE").show(n=5) 

+------------+
|       VALUE|
+------------+
|Normal Sinus|
|Normal Sinus|
|    AV Paced|
|Normal Sinus|
|Normal Sinus|
+------------+
only showing top 5 rows



After testing for all codes, we see the only numerical ones in the top 15 are Heart Rate (212 and 220045), Respiratory Rate (618 and 220210), and Sp02/O2 saturation pulseoxymetry (646 and 220277). The other two valid measurements are Heart Rhythm (212 and 220048) and Ectopy Type (161), but they are categorical. We could create one-hot columns for each value then compute the frequency for each stay. However, we inspected and saw there are different sets of categories for each ID and a very big amount (more than 20) of distintic values for Heart Rhythm. The same happens for Ectopy Type. Therefore we chose to only use the numerical variables.

In [17]:
alive_48h.filter(col("ITEMID") == 161).groupBy("VALUE").count().orderBy("count", ascending=False).show(truncate=False)

+----------------+-------+
|VALUE           |count  |
+----------------+-------+
|None            |1059239|
|PVC's           |134907 |
|PAC's           |43953  |
|Vent. Bigeminy  |1443   |
|NULL            |974    |
|Atrial Bigeminy |421    |
|Vent. Trigeminy |331    |
|Nod/Junc Escape |312    |
|PNC's           |252    |
|Fusion Beats    |112    |
|Vent. Escape    |85     |
|Atrial Trigeminy|32     |
|V Quadrigeminy  |22     |
|Nodal Bigeminy  |18     |
|Nodal Trigeminy |2      |
+----------------+-------+



Filtering based on the selected items:

In [18]:
items_filtered = alive_48h.filter(col("ITEMID").isin([211, 618, 646, 220045, 220210, 220277])).filter(col("VALUE").isNotNull())

start_time = time.time()

print(items_filtered.count())

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

7661849
Total time: 240.73 seconds


The data reduced a lot but is still big, with 7.6M rows. Only applying filter was 20s quicker then applying joins and filters like before.

Normalize column names so they can be grouped:

In [19]:
items_filtered = items_filtered.join(
    items.select("ITEMID", "LABEL"),
    on="ITEMID",
    how="left"
)

items_filtered = items_filtered.replace({
    "O2 saturation pulseoxymetry": "SpO2",
}, subset=["LABEL"])

# items_filtered.filter(col("ITEMID") == 220277).select("LABEL").distinct().show() -> worked, no need to run everytime

The current dataset we have is one row by chartevent, so it has duplicated ICU stays. So, to check how much unique ICU stays we have:

In [20]:
items_filtered.select("ICUSTAY_ID").distinct().count()

58113

It is a much smaller number, but still reasonable.

Pivoting the data to get one row by ICU stay:

In [21]:
start_time = time.time()

num_agg_long = items_filtered.groupBy("ICUSTAY_ID", "LABEL").agg(
    mean("VALUE").alias("val_mean"),
    min("VALUE").alias("val_min"),
    max("VALUE").alias("val_max")
)

icu_features = num_agg_long.groupBy("ICUSTAY_ID").pivot("LABEL").agg(
    first("val_mean").alias("mean"),
    first("val_min").alias("min"),
    first("val_max").alias("max")
)

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

Total time: 232.59 seconds


Instead of computing twice by calling show() and then count(), we use cache so the computation can be triggered just once and run in half of the time.

In [22]:
#icu_features.unpersist()

if not icu_features.is_cached:
    icu_features.cache()

In [23]:
start_time = time.time()

icu_features.show(5)
print(icu_features.count())

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

+----------+-----------------+--------------+--------------+---------------------+--------------------+--------------------+-----------------+--------+--------+
|ICUSTAY_ID|  Heart Rate_mean|Heart Rate_min|Heart Rate_max|Respiratory Rate_mean|Respiratory Rate_min|Respiratory Rate_max|        SpO2_mean|SpO2_min|SpO2_max|
+----------+-----------------+--------------+--------------+---------------------+--------------------+--------------------+-----------------+--------+--------+
|    200166|78.51428571428572|            71|            90|   14.098591549295774|                  10|                   7|            100.0|     100|     100|
|    200379|76.22058823529412|            69|            91|   12.956521739130435|                  10|                   9|            97.25|     100|      99|
|    200625|81.20689655172414|           105|            85|   20.053571428571427|                  13|                  33|95.64814814814815|     100|      99|
|    200687|            143.5|    

Now, we can see there are some null values, due to performing an outer join. So now we check how many rows have any missing values:

In [24]:
null_rows = icu_features.filter(
    reduce(lambda a, b: a | b, (col(c).isNull() for c in icu_features.columns))
)

null_rows.count()

7740

In [25]:
icu_features.select([count(when(col(c).isNull(), c)).alias(c) for c in icu_features.columns]).show()

+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+
|ICUSTAY_ID|Heart Rate_mean|Heart Rate_min|Heart Rate_max|Respiratory Rate_mean|Respiratory Rate_min|Respiratory Rate_max|SpO2_mean|SpO2_min|SpO2_max|
+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+
|         0|              0|             0|             0|                 7728|                7712|                7712|     7699|    7699|    7699|
+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+



Here we see we have 7740 rows with missing values, which represents about 13% of total data. Dropping would mean losing an import amount of data, and we can see we would lose heart rate information, since there are no missing values in those columns. Therefore, we will perform imputation. We will use the median because it is more robust to outliers. Since these measuraments are often taken automatically and in the same machine, their absence might not be related to the patient's state, so it is safer to imput values. Also, since our focus for this project is performance rather then the results themselves, it becomes more acceptable to do it.

Casting to double since some columns had type string:

In [26]:
icu_features = icu_features.withColumn("Heart Rate_min", col("Heart Rate_min").cast("double"))
icu_features = icu_features.withColumn("Heart Rate_max", col("Heart Rate_max").cast("double"))
icu_features = icu_features.withColumn("Respiratory Rate_min", col("Respiratory Rate_min").cast("double"))
icu_features = icu_features.withColumn("Respiratory Rate_max", col("Respiratory Rate_max").cast("double"))
icu_features = icu_features.withColumn("SpO2_min", col("SpO2_min").cast("double"))
icu_features = icu_features.withColumn("SpO2_max", col("SpO2_max").cast("double"))

In [27]:
cols_to_impute = ["Respiratory Rate_mean", "Respiratory Rate_min", "Respiratory Rate_max",
                  "SpO2_mean", "SpO2_min", "SpO2_max"]

imputer = Imputer(
    inputCols=cols_to_impute,
    outputCols=cols_to_impute,  
    strategy="median"
)

icu_features_imputed = imputer.fit(icu_features).transform(icu_features)

icu_features_imputed.select([count(when(col(c).isNull(), c)).alias(c) for c in icu_features.columns]).show()

+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+
|ICUSTAY_ID|Heart Rate_mean|Heart Rate_min|Heart Rate_max|Respiratory Rate_mean|Respiratory Rate_min|Respiratory Rate_max|SpO2_mean|SpO2_min|SpO2_max|
+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+
|         0|              0|             0|             0|                    0|                   0|                   0|        0|       0|       0|
+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+



### Adding disease (diagnose)

First, we need to check how many different diagnoses (ICD9 codes) exist, to decide whether we need to group them or not before transforming them in columns.

In [28]:
diagnoses.select("ICD9_CODE").distinct().count()

6985

~7k is a very high number, so it is better to group these codes. We will use the chapter division defined by the [ICD9 code guidelines](https://www.ama-assn.org/sites/ama-assn.org/files/corp/media-browser/public/cpt/icd9cm_coding_guidelines_08_09_full_0.pdf), which will lead to 19 groups.

In [29]:
groups = [
    ("Infectious", 1, 139),
    ("Neoplasms", 140, 239),
    ("Endocrine_Nutritional", 240, 279),
    ("Blood", 280, 289),
    ("Mental", 290, 319),
    ("Nervous", 320, 389),
    ("Circulatory", 390, 459),
    ("Respiratory", 460, 519),
    ("Digestive", 520, 579),
    ("Genitourinary", 580, 629),
    ("Pregnancy_Childbirth", 630, 679),
    ("Skin", 680, 709),
    ("Musculoskeletal", 710, 739),
    ("Congenital", 740, 759),
    ("Perinatal", 760, 779),
    ("Symptoms", 780, 799),
    ("Injury_Poisoning", 800, 999),
]

def icd9_first3_num(icd9):
    if icd9 is None:
        return None
    if icd9.startswith('V') or icd9.startswith('E'):
        return None
    first3 = icd9[:3]
    try:
        return int(first3)
    except:
        return None
    

icd9_first3_num_udf = udf(icd9_first3_num, IntegerType())

df = diagnoses.withColumn("ICD9_FIRST3", icd9_first3_num_udf(col("ICD9_CODE")))

# V and E codes flags remain the same
df = df.withColumn("V_Code", when(col("ICD9_CODE").startswith("V"), 1).otherwise(0))
df = df.withColumn("E_Code", when(col("ICD9_CODE").startswith("E"), 1).otherwise(0))

# Create group columns using first 3 digits numeric
for group_name, start, end in groups:
    df = df.withColumn(
        group_name,
        when((col("ICD9_FIRST3") >= start) & (col("ICD9_FIRST3") <= end), 1).otherwise(0)
    )

agg_cols = [max(group_name).alias(group_name) for group_name, _, _ in groups] + [
    max("V_Code").alias("V_Code"),
    max("E_Code").alias("E_Code"),
]

df_grouped = df.groupBy("HADM_ID").agg(*agg_cols)

df_grouped.show()

+-------+----------+---------+---------------------+-----+------+-------+-----------+-----------+---------+-------------+--------------------+----+---------------+----------+---------+--------+----------------+------+------+
|HADM_ID|Infectious|Neoplasms|Endocrine_Nutritional|Blood|Mental|Nervous|Circulatory|Respiratory|Digestive|Genitourinary|Pregnancy_Childbirth|Skin|Musculoskeletal|Congenital|Perinatal|Symptoms|Injury_Poisoning|V_Code|E_Code|
+-------+----------+---------+---------------------+-----+------+-------+-----------+-----------+---------+-------------+--------------------+----+---------------+----------+---------+--------+----------------+------+------+
| 192988|         0|        0|                    0|    0|     0|      0|          1|          0|        0|            0|                   0|   0|              0|         0|        0|       0|               0|     0|     0|
| 124411|         0|        0|                    1|    0|     0|      0|          1|          0|   

Now, we merge with the icu_features_imputed and items_filtered dataframes:

In [30]:
patient_info = items_filtered.select('HADM_ID', 'ICUSTAY_ID', 'SUBJECT_ID', 'LOS').distinct()
final_df = icu_features_imputed.join(patient_info, on="ICUSTAY_ID", how="inner")
final_df = final_df.join(df_grouped, on="HADM_ID", how="left")

Again, let`s cache to perform two computations over the same df:

In [31]:
#final_df.unpersist()

if not final_df.is_cached:
    final_df.cache()

In [32]:
start_time = time.time()

print(final_df.count())
print(final_df.select([count(when(col(c).isNull(), c)).alias(c) for c in final_df.columns]).show())

end_time = time.time()

print(f"Total time: {end_time - start_time:.2f} seconds")

58113
+-------+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+----------+---+----------+---------+---------------------+-----+------+-------+-----------+-----------+---------+-------------+--------------------+----+---------------+----------+---------+--------+----------------+------+------+
|HADM_ID|ICUSTAY_ID|Heart Rate_mean|Heart Rate_min|Heart Rate_max|Respiratory Rate_mean|Respiratory Rate_min|Respiratory Rate_max|SpO2_mean|SpO2_min|SpO2_max|SUBJECT_ID|LOS|Infectious|Neoplasms|Endocrine_Nutritional|Blood|Mental|Nervous|Circulatory|Respiratory|Digestive|Genitourinary|Pregnancy_Childbirth|Skin|Musculoskeletal|Congenital|Perinatal|Symptoms|Injury_Poisoning|V_Code|E_Code|
+-------+----------+---------------+--------------+--------------+---------------------+--------------------+--------------------+---------+--------+--------+----------+---+----------+---------+---------------------+

These two joins were faster then some previous operations using joins and filters, possibly due to lower amount of data. 

The data is now ready for modeling and visualization.

## Visualizations

In [33]:
""" los_values = icustays.select("LOS").dropna().rdd.flatMap(lambda x: x).collect() # rdd: more efficient 

max_los = int(np.ceil(max(los_values)))

print(max(los_values))

plt.figure(figsize=(16, 6))
plt.hist(los_values, bins=np.arange(0, max_los + 1, 1), edgecolor='black', color='skyblue')
plt.title("LOS distribution in the MIMIC dataset")
plt.ylabel("Count")
plt.xlabel("LOS (Days)")
plt.xticks(np.arange(0, max_los + 1, step=1), rotation=90)
plt.tight_layout()
plt.show() """

' los_values = icustays.select("LOS").dropna().rdd.flatMap(lambda x: x).collect() # rdd: more efficient \n\nmax_los = int(np.ceil(max(los_values)))\n\nprint(max(los_values))\n\nplt.figure(figsize=(16, 6))\nplt.hist(los_values, bins=np.arange(0, max_los + 1, 1), edgecolor=\'black\', color=\'skyblue\')\nplt.title("LOS distribution in the MIMIC dataset")\nplt.ylabel("Count")\nplt.xlabel("LOS (Days)")\nplt.xticks(np.arange(0, max_los + 1, step=1), rotation=90)\nplt.tight_layout()\nplt.show() '

-----------------------------

## Modelling

We first convert our final dataset to a Pandas DataFrame, since the initial data processing significantly reduced its size. This allows us to efficiently explore the use of scikit-learn pipelines and joblib in this modeling phase

In [50]:
df_model = final_df.toPandas()

X = df_model.drop(columns=['LOS', 'HADM_ID', 'ICUSTAY_ID'])
y = df_model['LOS']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

scorer = make_scorer(lambda y_true, y_pred: np.sqrt(mean_squared_error(y_true, y_pred)), greater_is_better=False)
kf = KFold(n_splits=5, shuffle=True, random_state=42)

results = []

To maintain cleaner and more organized code, here are our evaluation and training functions:

In [51]:
def evaluate_and_store(model_name, y_test, y_pred, duration, mode):
    mae = mean_absolute_error(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, y_pred)
    mape = mean_absolute_percentage_error(y_test, y_pred) * 100

    results.append({
        "Model": model_name,
        "Mode": mode,
        "Time (s)": round(duration, 2),
        "MAE": round(mae, 2),
        "MSE": round(mse, 2),
        "RMSE": round(rmse, 2),
        "R²": round(r2, 2),
        "MAPE (%)": round(mape, 2)
    })

def run_manual(model_name, model):
    start = time.time()
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    model.fit(X_train_scaled, y_train)
    y_pred = model.predict(X_test_scaled)
    duration = time.time() - start

    evaluate_and_store(model_name, y_test, y_pred, duration, "Manual")

def run_pipeline_no_grid(model_name, model):
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('regressor', model)
    ])

    start = time.time()
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)
    duration = time.time() - start

    evaluate_and_store(model_name, y_test, y_pred, duration, "Pipeline (no GridSearch)")

def run_pipeline_grid(model_name, model, param_grid, n_jobs_val):
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('regressor', model)
    ])

    grid = GridSearchCV(
        estimator=pipeline,
        param_grid=param_grid,
        scoring=scorer,
        cv=kf,
        n_jobs=n_jobs_val
    )

    start = time.time()
    grid.fit(X_train, y_train)
    duration = time.time() - start

    y_pred = grid.best_estimator_.predict(X_test)
    label = f"Pipeline + GridSearch (n_jobs={n_jobs_val})"
    evaluate_and_store(model_name, y_test, y_pred, duration, label)

To evaluate both performance and execution time using joblib and Pipeline, we test four different regression models, some more simpler than others. Also, we're testing 3 variations: No pipeline, no parallel; Pipeline, no parallel (n_jobs=1); Pipeline + GridSearch + joblib

In [None]:
models_info = [
    ("Linear Regression", LinearRegression(), {}),
    ("Ridge Regression", Ridge(), {
        'regressor__alpha': [0.01, 0.1, 1.0, 10.0, 100.0],
        'regressor__solver': ['auto'],
        'regressor__max_iter': [1000]
    }),
    ("Decision Tree", DecisionTreeRegressor(random_state=42), {
        'regressor__max_depth': [5, 10, None],
        'regressor__min_samples_split': [2, 5, 10]
    }),
    
    ("Random Forest", RandomForestRegressor(random_state=42), {
        'regressor__n_estimators': [100],
        'regressor__max_depth': [10, None]
    }),
    ("XGBoost", XGBRegressor(random_state=42, verbosity=0), {
        'regressor__n_estimators': [100],
        'regressor__learning_rate': [0.1],
        'regressor__max_depth': [3]
    })
]

In [53]:
for model_name, model, param_grid in models_info:
    run_manual(model_name, model)
    run_pipeline_no_grid(model_name, model)
    run_pipeline_grid(model_name, model, param_grid, n_jobs_val=1)
    run_pipeline_grid(model_name, model, param_grid, n_jobs_val=-1)

results_df = pd.DataFrame(results)
print("\n=== Summary Table ===")
print(results_df.to_string(index=False))


=== Summary Table ===
            Model                              Mode  Time (s)  MAE   MSE  RMSE   R²  MAPE (%)
Linear Regression                            Manual      0.16 4.45 81.60  9.03 0.20    450.08
Linear Regression          Pipeline (no GridSearch)      0.10 4.45 81.60  9.03 0.20    450.08
Linear Regression  Pipeline + GridSearch (n_jobs=1)      0.52 4.45 81.60  9.03 0.20    450.08
Linear Regression Pipeline + GridSearch (n_jobs=-1)      6.08 4.45 81.60  9.03 0.20    450.08
 Ridge Regression                            Manual      0.08 4.44 81.61  9.03 0.20    446.92
 Ridge Regression          Pipeline (no GridSearch)      0.08 4.44 81.61  9.03 0.20    446.92
 Ridge Regression  Pipeline + GridSearch (n_jobs=1)      1.76 4.41 81.87  9.05 0.20    430.96
 Ridge Regression Pipeline + GridSearch (n_jobs=-1)      3.37 4.41 81.87  9.05 0.20    430.96
    Random Forest                            Manual     85.99 3.81 63.42  7.96 0.38    183.56
    Random Forest          Pipeline (

Conclusoes do Modeling :))