## Setup

In [1]:
from pyspark.sql import SparkSession

Spark can be configured with many settings.  
In particular, we highlight `local[*]`, which runs Spark in local mode: the driver and executor run in a single JVM on the local machine, using as many worker threads as there are logical cores.

In [2]:
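spark = (
    SparkSession.builder
    .appName("Setup")
    # local mode: driver and executor share one JVM, using all available cores
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    # has no effect in local mode, where tasks run inside the driver JVM
    .config("spark.executor.memory", "4g")
    # bind the driver to the loopback interface
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .getOrCreate()
)
sc = spark.sparkContext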

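Because the dataset is large (4.2 GB compressed), we load it with Spark. However, Spark processes gzip-compressed CSV files inefficiently: gzip is not splittable, so each file is read by a single task no matter how many cores are available. Decompressing the file only partly solves this, because an uncompressed CSV is not always splittable either (for example, when the `multiLine` option is enabled). As shown in the code below, loading the CSV file took over 7 minutes, which is considerable; part of that cost comes from `inferSchema`, which requires an extra pass over the whole file.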

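`spark.read.csv` is lazy: rows are only processed when an action (such as `show()`, `count()` or a write) is called. There is one exception in the cell below: with `inferSchema` enabled, Spark scans the data once at load time to determine the column types.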

In [4]:
file_path = "dataset/CHARTEVENTS.csv"

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load(file_path)

df.printSchema()
df.show(5)


root
 |-- ROW_ID: integer (nullable = true)
 |-- SUBJECT_ID: integer (nullable = true)
 |-- HADM_ID: integer (nullable = true)
 |-- ICUSTAY_ID: integer (nullable = true)
 |-- ITEMID: integer (nullable = true)
 |-- CHARTTIME: timestamp (nullable = true)
 |-- STORETIME: timestamp (nullable = true)
 |-- CGID: integer (nullable = true)
 |-- VALUE: string (nullable = true)
 |-- VALUENUM: double (nullable = true)
 |-- VALUEUOM: string (nullable = true)
 |-- ERROR: integer (nullable = true)
 |-- RESULTSTATUS: string (nullable = true)
 |-- STOPPED: string (nullable = true)

+------+----------+-------+----------+------+-------------------+-------------------+-----+-----+--------+--------+-------+-----+------------+-------+
+------+----------+-------+----------+------+-------------------+-------------------+-----+-----+--------+--------+-------+-----+------------+-------+
|   788|        36| 165660|    241249|223834|2134-05-12 12:00:00|2134-05-12 13:56:00|17525|   15|    15.0|   L/min|      0|  

                                                                                

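To improve performance, we convert the dataset to the Parquet file format with Snappy compression. Unlike a gzip-compressed CSV, a Snappy-compressed Parquet file stays splittable, because compression is applied to chunks inside the file rather than to the file as a whole, so it can be processed in parallel by all local cores (or across multiple nodes on a cluster).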

In [None]:
parquet_file_path = "dataset/CHARTEVENTS.parquet"

df.write.format("parquet") \
    .option("compression", "snappy") \
    .save(parquet_file_path)

#spark.stop()

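Now it's instant!?  
Not quite: Parquet reads are also lazy, so the data is only scanned when an action requires it. Unlike the CSV read with `inferSchema`, no upfront pass over the data is needed, because Parquet stores the schema in the file metadata.  
Parquet is also columnar storage: it organizes data by columns rather than by rows, so a query that uses only a few columns reads only those columns from disk.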

Now we repeat the process for "ICUSTAYS", "D_ICD_DIAGNOSES" and "PATIENTS".

In [15]:
icu_file_path = "dataset/ICUSTAYS.csv"

df_icustays = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load(icu_file_path)

df_icustays.printSchema()
df_icustays.show(5)

root
 |-- ROW_ID: integer (nullable = true)
 |-- SUBJECT_ID: integer (nullable = true)
 |-- HADM_ID: integer (nullable = true)
 |-- ICUSTAY_ID: integer (nullable = true)
 |-- DBSOURCE: string (nullable = true)
 |-- FIRST_CAREUNIT: string (nullable = true)
 |-- LAST_CAREUNIT: string (nullable = true)
 |-- FIRST_WARDID: integer (nullable = true)
 |-- LAST_WARDID: integer (nullable = true)
 |-- INTIME: timestamp (nullable = true)
 |-- OUTTIME: timestamp (nullable = true)
 |-- LOS: double (nullable = true)

+------+----------+-------+----------+--------+--------------+-------------+------------+-----------+-------------------+-------------------+------+
|ROW_ID|SUBJECT_ID|HADM_ID|ICUSTAY_ID|DBSOURCE|FIRST_CAREUNIT|LAST_CAREUNIT|FIRST_WARDID|LAST_WARDID|             INTIME|            OUTTIME|   LOS|
+------+----------+-------+----------+--------+--------------+-------------+------------+-----------+-------------------+-------------------+------+
|   365|       268| 110404|    280836| carev

In [17]:
icu_parquet_file_path = "dataset/ICUSTAYS.parquet"

df_icustays.write.format("parquet") \
    .option("compression", "snappy") \
    .save(icu_parquet_file_path)

In [11]:
d_icd_diagnoses_file_path = "dataset/D_ICD_DIAGNOSES.csv"

df_d_icd_diagnoses = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load(d_icd_diagnoses_file_path)

df_d_icd_diagnoses.printSchema()
df_d_icd_diagnoses.show(5)

root
 |-- ROW_ID: integer (nullable = true)
 |-- ICD9_CODE: string (nullable = true)
 |-- SHORT_TITLE: string (nullable = true)
 |-- LONG_TITLE: string (nullable = true)

+------+---------+--------------------+--------------------+
|ROW_ID|ICD9_CODE|         SHORT_TITLE|          LONG_TITLE|
+------+---------+--------------------+--------------------+
|   174|    01166|TB pneumonia-oth ...|Tuberculous pneum...|
|   175|    01170|TB pneumothorax-u...|Tuberculous pneum...|
|   176|    01171|TB pneumothorax-n...|Tuberculous pneum...|
|   177|    01172|TB pneumothorx-ex...|Tuberculous pneum...|
|   178|    01173|TB pneumothorax-m...|Tuberculous pneum...|
+------+---------+--------------------+--------------------+
only showing top 5 rows



In [12]:
d_icd_diagnoses_parquet_file_path = "dataset/D_ICD_DIAGNOSES.parquet"

df_d_icd_diagnoses.write.format("parquet") \
    .option("compression", "snappy") \
    .save(d_icd_diagnoses_parquet_file_path)

In [3]:
file_path = "dataset/PATIENTS.csv"

df_patients = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .load(file_path)

df_patients.printSchema()
df_patients.show(5)

root
 |-- ROW_ID: integer (nullable = true)
 |-- SUBJECT_ID: integer (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- DOB: timestamp (nullable = true)
 |-- DOD: timestamp (nullable = true)
 |-- DOD_HOSP: timestamp (nullable = true)
 |-- DOD_SSN: timestamp (nullable = true)
 |-- EXPIRE_FLAG: integer (nullable = true)

+------+----------+------+-------------------+-------------------+-------------------+-------+-----------+
|ROW_ID|SUBJECT_ID|GENDER|                DOB|                DOD|           DOD_HOSP|DOD_SSN|EXPIRE_FLAG|
+------+----------+------+-------------------+-------------------+-------------------+-------+-----------+
|   234|       249|     F|2075-03-13 00:00:00|               NULL|               NULL|   NULL|          0|
|   235|       250|     F|2164-12-27 00:00:00|2188-11-22 00:00:00|2188-11-22 00:00:00|   NULL|          1|
|   236|       251|     M|2090-03-15 00:00:00|               NULL|               NULL|   NULL|          0|
|   237|       252|     M|2

In [6]:
patients_file_path = "dataset/PATIENTS.parquet"

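# Write timestamps as-is (proleptic Gregorian calendar). Without this, some Spark 3.x
# versions raise an error when writing timestamps before 1900-01-01 as INT96,
# which is Spark's default Parquet timestamp type.
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")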

df_patients.write.format("parquet") \
    .option("compression", "snappy") \
    .save(patients_file_path)

In [9]:
spark.stop()