In [None]:
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd

import dask.dataframe as dd
import bigframes.pandas as bf
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
import seaborn as sb 
from sklearn.model_selection import train_test_split
from google.colab import drive


from keras.callbacks import ModelCheckpoint
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten

from sklearn.metrics import confusion_matrix

# CDLE Project

# Context

MIMIC-III is a large, freely-available database comprising deidentified health-related data associated with over forty thousand patients who stayed in critical care units of the Beth Israel Deaconess Medical Center between 2001 and 2012. The database includes information such as demographics, vital sign measurements made at the bedside, laboratory test results, procedures, medications, caregiver notes, imaging reports, and mortality. 

The objectives for this project are:    
- Apply the use of large scale data processing libraries to analyze and process the data on the dataset.
- Develop a machine learning pipeline to predict length of stay.

In this notebook, we detail every step we took, from handling the compressed data to building a working pipeline that prepares the data for the learning models.

# Data Exploration and Visualization

## Reading data directly from GCS bucket

We load the GCP credentials and access the compressed file directly from the bucket. We start with pandas and read only the first rows because of the size of the data: approximately 330 million entries (~32 GB).

In [None]:
# Mount Google Drive (it holds the service-account key used below); force a
# fresh mount even if one already exists in this runtime.
drive.mount(mountpoint='/content/drive', force_remount=True)

Mounted at /content/drive


In [None]:
# Confirm the service-account key is present on Drive WITHOUT printing it:
# dumping the JSON (as `%cat` did) writes the private key into the saved
# notebook output, where anyone with the notebook can read it.
# TODO(security): the key that was previously printed in this cell must be
# treated as compromised - revoke it in the GCP console and issue a new one.
import os

os.path.isfile('drive/MyDrive/bigdata-415811-b29e8958abf8.json')

In [None]:
# Copy the key from Drive into the working directory so the credentials
# environment variable can point at it with a relative path.
# NOTE(review): keep this copied key out of version control.
%cp drive/MyDrive/bigdata-415811-b29e8958abf8.json .

In [None]:
import os

# Application Default Credentials read this variable; the path is relative to
# the current working directory, where the key was copied in the previous cell.
os.environ.update(GOOGLE_APPLICATION_CREDENTIALS="bigdata-415811-b29e8958abf8.json")


In [None]:
# Peek at the compressed CHARTEVENTS file straight from the bucket; only the
# first 10 rows are parsed, so the ~32 GB file is never fully loaded.
df = pd.read_csv(filepath_or_buffer='gs://cdle2324/MIMIC-III-DATABASE/CHARTEVENTS.csv.gz', nrows=10)

In [None]:
# Summary statistics of the numeric columns in the 10-row sample
# (quartiles passed explicitly; they are pandas' defaults).
df.describe(percentiles=[0.25, 0.5, 0.75])

Unnamed: 0,ROW_ID,SUBJECT_ID,HADM_ID,ICUSTAY_ID,ITEMID,CGID,VALUE,VALUENUM,WARNING,ERROR,RESULTSTATUS,STOPPED
count,10.0,10.0,10.0,10.0,10.0,10.0,10.0,10.0,10.0,10.0,0.0,0.0
mean,792.5,36.0,165660.0,241249.0,223887.1,19180.6,19.398,19.398,0.1,0.0,,
std,3.02765,0.0,0.0,0.0,1317.058883,1745.266564,33.237855,33.237855,0.316228,0.0,,
min,788.0,36.0,165660.0,241249.0,220224.0,17525.0,0.0,0.0,0.0,0.0,,
25%,790.25,36.0,165660.0,241249.0,223958.25,17525.0,1.4575,1.4575,0.0,0.0,,
50%,792.5,36.0,165660.0,241249.0,224329.5,19174.0,4.5,4.5,0.0,0.0,,
75%,794.75,36.0,165660.0,241249.0,224331.75,20823.0,13.25,13.25,0.0,0.0,,
max,797.0,36.0,165660.0,241249.0,224665.0,20889.0,100.0,100.0,1.0,0.0,,


In [None]:
# First five rows of the sample, to see the raw column layout.
df.head(n=5)

Unnamed: 0,ROW_ID,SUBJECT_ID,HADM_ID,ICUSTAY_ID,ITEMID,CHARTTIME,STORETIME,CGID,VALUE,VALUENUM,VALUEUOM,WARNING,ERROR,RESULTSTATUS,STOPPED
0,788,36,165660,241249,223834,2134-05-12 12:00:00,2134-05-12 13:56:00,17525,15.0,15.0,L/min,0,0,,
1,789,36,165660,241249,223835,2134-05-12 12:00:00,2134-05-12 13:56:00,17525,100.0,100.0,,0,0,,
2,790,36,165660,241249,224328,2134-05-12 12:00:00,2134-05-12 12:18:00,20823,0.37,0.37,,0,0,,
3,791,36,165660,241249,224329,2134-05-12 12:00:00,2134-05-12 12:19:00,20823,6.0,6.0,min,0,0,,
4,792,36,165660,241249,224330,2134-05-12 12:00:00,2134-05-12 12:19:00,20823,2.5,2.5,,0,0,,


Given that the challenge lies in the size of the data, our goal became finding strategies to reduce or process it without using too much memory or processing power.

As discussed earlier, many libraries and techniques can be used to handle data at this scale. We tried two approaches:
- **BigFrames**: a pandas-like API over BigQuery, which lets us turn the dataset into a table that is queried remotely
- **Dask**: a library for manipulating data larger than memory by processing it lazily, without loading the whole dataset into memory at once

## Using BigFrames

An alternative to reading the file with pandas is to use GCP's BigQuery.

Using Cloud Compute, we downloaded, decompressed and uploaded the data to our own bucket through the command line using gsutil:

```gsutil cat gs://originbucket/file.csv.gz | zcat | gsutil cp - gs://destinationbucket/file.csv```

We then loaded the CSV file from the bucket into a BigQuery table through the GCP console. This lets us query entries from the table instead of processing the data locally:

In [None]:
# BigFrames session options are set before the first read_gbq call.
# NOTE(review): the service-account key configured earlier belongs to project
# "bigdata-415811", while these queries target "dsls-415811" - confirm that
# account has BigQuery access in this project.
bf.options.bigquery.project = "dsls-415811"
bf.options.bigquery.location = "us"

# BigQuery-backed DataFrame over the decompressed CHARTEVENTS table.
df = bf.read_gbq("dsls-415811.datasetcdle.chartevents")

df.head()

Querying does take some time, but it allows us to process information without using resources from our own local machine. 

Unfortunately, querying is also an expensive service, and GCP can block us if we overuse it. To mitigate this, we add filters to the query, written in the code as ordinary pandas operations, to trim the data down to the content we will use.

To predict the length of stay, we began by dropping empty or irrelevant columns and by finding the 15 most frequently recorded items in the database. This allows us to remove entries that are of little use to the prediction, since we want to generalize to the tests most commonly performed on every patient.

In [None]:
columns_to_drop = ["ROW_ID", "SUBJECT_ID", "STORETIME", "WARNING", "ERROR", "RESULTSTATUS", "STOPPED"]

# Drop only the columns that are still present, so re-running this cell does
# not raise a KeyError (after the first run `df` no longer has them).
df = df.drop(columns=[col for col in columns_to_drop if col in df.columns])

# The 15 most frequently charted ITEMIDs (value_counts sorts by count,
# descending). Named `top_15` because it holds 15 items and because the
# pre-processing step uses `top_15`; the old `top_10` name was misleading and
# unused, and the unused `bottom_10` computation has been removed.
top_15 = df["ITEMID"].value_counts().head(15).to_pandas().index

# Hand-picked ITEMIDs to keep.
# TODO(review): this list (20 ids) differs from the one in the Dask cell
# (11 ids) and is not derived from `top_15`; confirm which list is intended.
item_ids_to_keep = [211, 742, 646, 618, 212, 161, 128, 550, 1125, 220045, 8220, 8225, 8268, 223059, 224726, 225657, 226741, 227039, 227122, 228181]

df = df[df['ITEMID'].isin(item_ids_to_keep)]

## Using Dask

In [None]:
# Explicit dtypes: Dask infers column types from a sample at the start of the
# file. In the sampled rows RESULTSTATUS/STOPPED are entirely empty and
# WARNING/ERROR hold only 0/1 (see the describe() output), so they are inferred
# as float/int. Per the MIMIC-III schema these columns (and VALUE) later
# contain text or missing values, which makes Dask abort mid-computation with
# a dtype-mismatch error.
chartevents_dtypes = {
    "ICUSTAY_ID": "float64",
    "CGID": "float64",
    "VALUE": "object",
    "VALUENUM": "float64",
    "VALUEUOM": "object",
    "WARNING": "float64",
    "ERROR": "float64",
    "RESULTSTATUS": "object",
    "STOPPED": "object",
}
# CHARTTIME is parsed to datetimes so stay durations can be computed by
# subtracting timestamps.
df = dd.read_csv("CHARTEVENTS.csv", dtype=chartevents_dtypes, parse_dates=["CHARTTIME"])

columns_to_drop = ["ROW_ID", "SUBJECT_ID", "STORETIME", "WARNING", "ERROR", "RESULTSTATUS", "STOPPED"]

df = df.drop(columns=columns_to_drop)

# The 15 most frequently charted ITEMIDs over the WHOLE file.
# `nlargest(...).compute()` is used instead of `.head(15).to_pandas()`:
# Dask's `head()` already returns a pandas object (which has no
# `.to_pandas()`), and it only inspects the first partition.
top_15 = df["ITEMID"].value_counts().nlargest(15).compute().index

# TODO(review): this hand-picked list differs from the BigFrames cell's list
# and is not derived from `top_15`; kept items outside `top_15` never become
# feature columns during pre-processing.
item_ids_to_keep = [211, 742, 646, 618, 212, 161, 128, 550, 1125, 220045, 220210]

df = df[df['ITEMID'].isin(item_ids_to_keep)]

df.to_csv("processed_data.csv", index=False, single_file=True)

# Dask's head() computes and returns a pandas DataFrame, so it is displayed
# directly (calling .compute() on it would raise AttributeError).
df.head()

## Pre-Processing

With this, Dask processes the entire file without loading it all into memory. It is significantly faster than BigFrames because it does not have to run remote queries, but it does use local resources and requires the entire CSV file to be available locally.

Now we must turn the remaining entries into data suitable for training. We do the following:
1. Group the dataset by hospital admission (`HADM_ID`). Since the same test is performed multiple times per stay, we keep only the first value, so each entry represents the earliest instance of each test (values from later tests would compromise the significance of the predictions).
2. Calculate the stay time as the difference between the last and the first `CHARTTIME` recorded for that admission, in minutes.
3. If an item has no recorded value for an admission, set it to NaN.

In [None]:
def build_admission_features(events, item_ids):
    """Build one feature row per hospital admission (HADM_ID).

    Parameters
    ----------
    events : pandas, Dask or BigFrames DataFrame
        Filtered CHARTEVENTS rows containing HADM_ID, ITEMID, CHARTTIME and
        VALUENUM. It is materialised locally exactly once.
    item_ids : iterable
        ITEMIDs that become feature columns, in output order.

    Returns
    -------
    pandas.DataFrame
        Columns ``HADM_ID``, ``ICU_stay_time`` (minutes between the first and
        last CHARTTIME of the admission) and one ``ITEMID_<id>`` column per
        requested item. Each item column holds the earliest non-null VALUENUM
        recorded for that item during the admission, or NaN if there is none.
        Earliest values are used instead of measurement counts: the number of
        measurements grows with the stay and would leak the prediction target.
    """
    item_ids = list(item_ids)
    feature_cols = [f"ITEMID_{item}" for item in item_ids]
    subset = events[["HADM_ID", "ITEMID", "CHARTTIME", "VALUENUM"]]

    # Materialise once. The previous per-admission loop triggered a separate
    # Dask pass / BigQuery job for every HADM_ID, and called `.to_pandas()`,
    # which Dask frames do not have.
    # NOTE(review): assumes the filtered events fit in local memory.
    if isinstance(subset, pd.DataFrame):
        local = subset.copy()
    elif hasattr(subset, "compute"):  # Dask
        local = subset.compute()
    else:  # BigFrames
        local = subset.to_pandas()

    # CHARTTIME arrives as text when read from CSV; differences need datetimes.
    local["CHARTTIME"] = pd.to_datetime(local["CHARTTIME"])

    if local.empty:
        return pd.DataFrame(columns=["HADM_ID", "ICU_stay_time", *feature_cols])

    span = local.groupby("HADM_ID")["CHARTTIME"].agg(["min", "max"])
    stay_minutes = (span["max"] - span["min"]).dt.total_seconds() / 60

    # Sort by time so GroupBy.first() picks the earliest non-null value.
    first_values = (
        local.sort_values("CHARTTIME", kind="stable")
        .groupby(["HADM_ID", "ITEMID"])["VALUENUM"]
        .first()
        .unstack("ITEMID")
        .reindex(columns=item_ids)  # items never measured become all-NaN columns
    )
    first_values.columns = feature_cols

    features = pd.concat([stay_minutes.rename("ICU_stay_time"), first_values], axis=1)
    return features.rename_axis("HADM_ID").reset_index()


data = build_admission_features(df, top_15)

With the data processed, we can move on to applying learning models in the next notebook.