---
<div align="center">

# LSDS | Machine Learning Pipeline
</div>

---

> ADD PROJECT OVERVIEW

---
## Project Dependencies
---

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Remove Warnings
import warnings
warnings.filterwarnings('ignore')
from time import (time)

In [3]:
# Get the starting time before importing the modules
startTime = time()

# Import modules
import numpy as np
import pandas as pd
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.sql.types import ArrayType, StringType, BooleanType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pprint import pprint

# Custom Imports
from Utils import (loadConfig, loadPathsConfig)
from DataPreprocessing.DatasetManager import (DatasetManager)

# Print the amount of time it took to import everything
print(f"Took{(time() - startTime) : .3e}(s) to import all the modules!")

Took 1.169e+00(s) to import all the modules!


In [4]:
# Load the configs
config = loadConfig()
pathsConfig = loadPathsConfig()

In [5]:
# Print the configs
pprint(config)
pprint(pathsConfig)

{'seed': 14, 'targetFeature': 'LOS'}
{'Datasets': {'ADMISSIONS': './Datasets/ADMISSIONS.csv',
              'CHARTEVENTS': './Datasets/CHARTEVENTS.csv',
              'DIAGNOSIS_ICD': './Datasets/DIAGNOSES_ICD.csv',
              'ICUSTAYS': './Datasets/ICUSTAYS.csv',
              'PATIENTS': './Datasets/PATIENTS.csv'}}


In [6]:
# Create a Apache Spark Session
spark = SparkSession.builder \
    .appName("Intensive Care Unit Data Analysis") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .config("spark.memory.fraction", "0.9") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

# check spark configs to only errors:

spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.shuffle.partitions", "500")
spark.conf.set("spark.sql.debug.maxToStringFields", "1000")

In [11]:
# Create a dataset manager
dsManager = DatasetManager(sparkSession=spark, config=config, pathsConfig=pathsConfig)

In [12]:
dsManager.loadAllDataFrames()

In [13]:
dsManager.dataframes
# type(dsManager.dataframes['ADMISSIONS'])

{'ADMISSIONS': DataFrame[SUBJECT_ID: int, HADM_ID: int, ADMITTIME: timestamp, DISCHTIME: timestamp, DEATHTIME: timestamp, ADMISSION_TYPE: string, ADMISSION_LOCATION: string, DISCHARGE_LOCATION: string, INSURANCE: string, LANGUAGE: string, RELIGION: string, MARITAL_STATUS: string, ETHNICITY: string, EDREGTIME: timestamp, EDOUTTIME: timestamp, DIAGNOSIS: string, HOSPITAL_EXPIRE_FLAG: int, HAS_CHARTEVENTS_DATA: int],
 'DIAGNOSIS_ICD': DataFrame[SUBJECT_ID: int, HADM_ID: int, SEQ_NUM: int, ICD9_CODE: string],
 'ICUSTAYS': DataFrame[SUBJECT_ID: int, HADM_ID: int, ICUSTAY_ID: int, DBSOURCE: string, FIRST_CAREUNIT: string, LAST_CAREUNIT: string, FIRST_WARDID: int, LAST_WARDID: int, INTIME: timestamp, OUTTIME: timestamp, LOS: double],
 'PATIENTS': DataFrame[SUBJECT_ID: int, GENDER: string, DOB: timestamp, DOD: timestamp, DOD_HOSP: timestamp, DOD_SSN: timestamp, EXPIRE_FLAG: int]}

In [36]:
# Testing the timit decorator
# result = dsManager.printConfig()
df = dsManager.loadDataFrame(filename='ADMISSIONS')

In [11]:
# %%timeit
admissions = dsManager.loadDataFrame(filename='ADMISSIONS')
print("\n", admissions.describe(), "\n")
admissions.limit(3).toPandas()


 DataFrame[summary: string, SUBJECT_ID: string, HADM_ID: string, ADMISSION_TYPE: string, ADMISSION_LOCATION: string, DISCHARGE_LOCATION: string, INSURANCE: string, LANGUAGE: string, RELIGION: string, MARITAL_STATUS: string, ETHNICITY: string, DIAGNOSIS: string, HOSPITAL_EXPIRE_FLAG: string, HAS_CHARTEVENTS_DATA: string] 



Unnamed: 0,SUBJECT_ID,HADM_ID,ADMITTIME,DISCHTIME,DEATHTIME,ADMISSION_TYPE,ADMISSION_LOCATION,DISCHARGE_LOCATION,INSURANCE,LANGUAGE,RELIGION,MARITAL_STATUS,ETHNICITY,EDREGTIME,EDOUTTIME,DIAGNOSIS,HOSPITAL_EXPIRE_FLAG,HAS_CHARTEVENTS_DATA
0,22,165315,2196-04-09 11:26:00,2196-04-10 14:54:00,NaT,EMERGENCY,EMERGENCY ROOM ADMIT,DISC-TRAN CANCER/CHLDRN H,Private,,UNOBTAINABLE,MARRIED,WHITE,2196-04-09 09:06:00,2196-04-09 12:24:00,BENZODIAZEPINE OVERDOSE,0,1
1,23,152223,2153-09-03 06:15:00,2153-09-08 18:10:00,NaT,ELECTIVE,PHYS REFERRAL/NORMAL DELI,HOME HEALTH CARE,Medicare,,CATHOLIC,MARRIED,WHITE,NaT,NaT,CORONARY ARTERY DISEASE\CORONARY ARTERY BYPASS...,0,1
2,23,124321,2157-10-18 18:34:00,2157-10-25 13:00:00,NaT,EMERGENCY,TRANSFER FROM HOSP/EXTRAM,HOME HEALTH CARE,Medicare,ENGL,CATHOLIC,MARRIED,WHITE,NaT,NaT,BRAIN MASS,0,1


In [17]:
# %%timeit
# chartEvents = dsManager.loadDataFrame(filename='CHARTEVENTS')
# print("\n", chartEvents.describe(), "\n")
# chartEvents.limit(3).toPandas()

In [14]:
# %%timeit
diagnosis = dsManager.loadDataFrame(filename='DIAGNOSIS_ICD')
print("\n", diagnosis.describe(), "\n")
diagnosis.limit(3).toPandas()


 DataFrame[summary: string, SUBJECT_ID: string, HADM_ID: string, SEQ_NUM: string, ICD9_CODE: string] 



Unnamed: 0,SUBJECT_ID,HADM_ID,SEQ_NUM,ICD9_CODE
0,109,172335,1,40301
1,109,172335,2,486
2,109,172335,3,58281


In [15]:
# %%timeit
icuStays = dsManager.loadDataFrame(filename='ICUSTAYS')
print("\n", icuStays.describe(), "\n")
icuStays.limit(3).toPandas()


 DataFrame[summary: string, SUBJECT_ID: string, HADM_ID: string, ICUSTAY_ID: string, DBSOURCE: string, FIRST_CAREUNIT: string, LAST_CAREUNIT: string, FIRST_WARDID: string, LAST_WARDID: string, LOS: string] 



Unnamed: 0,SUBJECT_ID,HADM_ID,ICUSTAY_ID,DBSOURCE,FIRST_CAREUNIT,LAST_CAREUNIT,FIRST_WARDID,LAST_WARDID,INTIME,OUTTIME,LOS
0,268,110404,280836,carevue,MICU,MICU,52,52,2198-02-14 23:27:38,2198-02-18 05:26:11,3.249
1,269,106296,206613,carevue,MICU,MICU,52,52,2170-11-05 11:05:29,2170-11-08 17:46:57,3.2788
2,270,188028,220345,carevue,CCU,CCU,57,57,2128-06-24 14:05:20,2128-06-27 11:32:29,2.8939


In [16]:
# %%timeit
patients = dsManager.loadDataFrame(filename='PATIENTS')
print("\n", patients.describe(), "\n")
patients.limit(3).toPandas()


 DataFrame[summary: string, SUBJECT_ID: string, GENDER: string, EXPIRE_FLAG: string] 



Unnamed: 0,SUBJECT_ID,GENDER,DOB,DOD,DOD_HOSP,DOD_SSN,EXPIRE_FLAG
0,249,F,2075-03-13,NaT,NaT,NaT,0
1,250,F,2164-12-27,2188-11-22,2188-11-22,NaT,1
2,251,M,2090-03-15,NaT,NaT,NaT,0


---
---
---

## Testing pyspark

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
# spark.read.csv('./Datasets/CHARTEVENTS.csv', header=True).show()
spark.read.csv('./Machine-Learning-Pipeline/Datasets/CHARTEVENTS.csv', header=True)



In [7]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1