## Setup Spark Enviornment

In [0]:
# setup spark enviornment on google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

## Load Spark Session

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")  # SPARK_HOME
#from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import col, asc, desc
import pyspark.sql.functions as F
import sys

In [0]:
# from pyspark import SparkContext
# sc = SparkContext("local", "BD4H")

In [0]:
# spark._conf.get('spark.driver.memory')
# spark.sparkContext.stop()
spark = SparkSession.builder.master("local[*]").getOrCreate()

spark.sparkContext._conf.getAll()

[('spark.app.id', 'local-1575620858464'),
 ('spark.driver.host', 'e2f181e313d4'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.port', '38743'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [0]:
# config sparksession

# conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '16g'),
#                                         ('spark.app.name', 'Spark Updated Conf'),
#                                         ('spark.executor.cores', '4'),
#                                         ('spark.cores.max', '4'),
#                                         ('spark.driver.memory','16g')])
# spark.sparkContext.stop()
# spark = SparkSession.builder.master("local[*]").config(conf=conf).getOrCreate()

## Load data from google drive

In [0]:
# connect google drive
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
# copy data from google drive to colab
cp -rvf drive/My\ Drive/mimic-iii-clinical-database-1.4 .

'drive/My Drive/mimic-iii-clinical-database-1.4' -> './mimic-iii-clinical-database-1.4'
'drive/My Drive/mimic-iii-clinical-database-1.4/.DS_Store' -> './mimic-iii-clinical-database-1.4/.DS_Store'
'drive/My Drive/mimic-iii-clinical-database-1.4/CAREGIVERS.csv' -> './mimic-iii-clinical-database-1.4/CAREGIVERS.csv'
'drive/My Drive/mimic-iii-clinical-database-1.4/CALLOUT.csv' -> './mimic-iii-clinical-database-1.4/CALLOUT.csv'
'drive/My Drive/mimic-iii-clinical-database-1.4/ADMISSIONS.csv' -> './mimic-iii-clinical-database-1.4/ADMISSIONS.csv'
'drive/My Drive/mimic-iii-clinical-database-1.4/NOTEEVENTS.csv' -> './mimic-iii-clinical-database-1.4/NOTEEVENTS.csv'


# **Data Preparation**

### Load admission table

In [0]:
# import org.apache.spark.sql.functions._
# import sqlContext.implicits._

admission_file_path = './mimic-iii-clinical-database-1.4/ADMISSIONS.csv'
df_admission = spark.read.csv(admission_file_path, inferSchema=True, header=True)

In [0]:
df_admission.printSchema()

root
 |-- ROW_ID: integer (nullable = true)
 |-- SUBJECT_ID: integer (nullable = true)
 |-- HADM_ID: integer (nullable = true)
 |-- ADMITTIME: timestamp (nullable = true)
 |-- DISCHTIME: timestamp (nullable = true)
 |-- DEATHTIME: timestamp (nullable = true)
 |-- ADMISSION_TYPE: string (nullable = true)
 |-- ADMISSION_LOCATION: string (nullable = true)
 |-- DISCHARGE_LOCATION: string (nullable = true)
 |-- INSURANCE: string (nullable = true)
 |-- LANGUAGE: string (nullable = true)
 |-- RELIGION: string (nullable = true)
 |-- MARITAL_STATUS: string (nullable = true)
 |-- ETHNICITY: string (nullable = true)
 |-- EDREGTIME: timestamp (nullable = true)
 |-- EDOUTTIME: timestamp (nullable = true)
 |-- DIAGNOSIS: string (nullable = true)
 |-- HOSPITAL_EXPIRE_FLAG: integer (nullable = true)
 |-- HAS_CHARTEVENTS_DATA: integer (nullable = true)



In [0]:
df_admission.show(10)

+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|ROW_ID|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|          DEATHTIME|ADMISSION_TYPE|  ADMISSION_LOCATION|  DISCHARGE_LOCATION|INSURANCE|LANGUAGE|         RELIGION|MARITAL_STATUS|           ETHNICITY|          EDREGTIME|          EDOUTTIME|           DIAGNOSIS|HOSPITAL_EXPIRE_FLAG|HAS_CHARTEVENTS_DATA|
+------+----------+-------+-------------------+-------------------+-------------------+--------------+--------------------+--------------------+---------+--------+-----------------+--------------+--------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|    21|        22| 165315|2196-04-09

In [0]:
selected_columns = ('SUBJECT_ID', 'HADM_ID', 'ADMITTIME', 'DISCHTIME', 'DEATHTIME', 'ADMISSION_TYPE')
df_admission_selected = df_admission.select(*selected_columns)
df_admission_selected.show(10)

+----------+-------+-------------------+-------------------+-------------------+--------------+
|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|          DEATHTIME|ADMISSION_TYPE|
+----------+-------+-------------------+-------------------+-------------------+--------------+
|        22| 165315|2196-04-09 12:26:00|2196-04-10 15:54:00|               null|     EMERGENCY|
|        23| 152223|2153-09-03 07:15:00|2153-09-08 19:10:00|               null|      ELECTIVE|
|        23| 124321|2157-10-18 19:34:00|2157-10-25 14:00:00|               null|     EMERGENCY|
|        24| 161859|2139-06-06 16:14:00|2139-06-09 12:48:00|               null|     EMERGENCY|
|        25| 129635|2160-11-02 02:06:00|2160-11-05 14:55:00|               null|     EMERGENCY|
|        26| 197661|2126-05-06 15:16:00|2126-05-13 15:00:00|               null|     EMERGENCY|
|        27| 134931|2191-11-30 22:16:00|2191-12-03 14:45:00|               null|       NEWBORN|
|        28| 162569|2177-09-01 07:15:00|

In [0]:
# sort by subject_ID and admission date
df_admission_sorted = df_admission_selected.sort(asc('SUBJECT_ID'), asc('ADMITTIME'))
df_admission_sorted.show(10)

+----------+-------+-------------------+-------------------+-------------------+--------------+
|SUBJECT_ID|HADM_ID|          ADMITTIME|          DISCHTIME|          DEATHTIME|ADMISSION_TYPE|
+----------+-------+-------------------+-------------------+-------------------+--------------+
|         2| 163353|2138-07-17 19:04:00|2138-07-21 15:48:00|               null|       NEWBORN|
|         3| 145834|2101-10-20 19:08:00|2101-10-31 13:58:00|               null|     EMERGENCY|
|         4| 185777|2191-03-16 00:28:00|2191-03-23 18:41:00|               null|     EMERGENCY|
|         5| 178980|2103-02-02 04:31:00|2103-02-04 12:15:00|               null|       NEWBORN|
|         6| 107064|2175-05-30 07:15:00|2175-06-15 16:00:00|               null|      ELECTIVE|
|         7| 118037|2121-05-23 15:05:00|2121-05-27 11:57:00|               null|       NEWBORN|
|         8| 159514|2117-11-20 10:22:00|2117-11-24 14:20:00|               null|       NEWBORN|
|         9| 150750|2149-11-09 13:06:00|

In [0]:
df_admission_sorted.createOrReplaceTempView("admission_sorted")
df_admission_row_num = spark.sql("""
    select *, row_number() over(partition by SUBJECT_ID order by ADMITTIME) as row_num from admission_sorted
""")

df_admission_row_num.createOrReplaceTempView("admission_row_num")

In [0]:
df_admission_shifted = spark.sql(""" 
select SUBJECT_ID,
    HADM_ID,
    DISCHTIME,
    DEATHTIME,
    ADMITTIME,
    ADMISSION_TYPE,
    case when NEXT_ADMISSION_TYPE IS NULL then NULL else NEXT_ADMITTIME end as NEXT_ADMITTIME,
    NEXT_ADMISSION_TYPE
from(
select t1.*, 
       t2.ADMITTIME AS NEXT_ADMITTIME, 
       CASE WHEN t2.ADMISSION_TYPE='ELECTIVE' THEN NULL else t2.ADMISSION_TYPE end as NEXT_ADMISSION_TYPE 
from admission_row_num t1 left join admission_row_num t2 on t1.SUBJECT_ID = t2.SUBJECT_ID AND t2.row_num -1 = t1.row_num) subt
""")

In [0]:
df_admission_shifted.head()

Row(SUBJECT_ID=148, HADM_ID=199488, DISCHTIME=datetime.datetime(2107, 10, 8, 13, 35), DEATHTIME=None, ADMITTIME=datetime.datetime(2107, 9, 5, 14, 58), ADMISSION_TYPE='EMERGENCY', NEXT_ADMITTIME=None, NEXT_ADMISSION_TYPE=None)

In [0]:
# backfill data
def fill_backward(df, id_column, key_column, fill_column):

    # Fill null's with last *non null* value in the window
    ff = df.withColumn(
        'fill_bwd',
        F.last(fill_column, True) # True: fill with last non-null
        .over(
            Window.partitionBy(id_column)
            .orderBy(desc(key_column))
            .rowsBetween(-sys.maxsize, 0))
        )

    # Drop the old column and rename the new column
    ff_out = ff.drop(fill_column).withColumnRenamed('fill_bwd', fill_column)

    return ff_out

In [0]:
df_admission_shifted = df_admission_shifted.sort(asc('SUBJECT_ID'), asc('ADMITTIME'))
df_readmit = fill_backward(df_admission_shifted, "SUBJECT_ID", "ADMITTIME", "NEXT_ADMITTIME")
df_readmit = fill_backward(df_readmit, "SUBJECT_ID", "ADMITTIME", "NEXT_ADMISSION_TYPE")

In [0]:
df_readmit.printSchema()

root
 |-- SUBJECT_ID: integer (nullable = true)
 |-- HADM_ID: integer (nullable = true)
 |-- DISCHTIME: timestamp (nullable = true)
 |-- DEATHTIME: timestamp (nullable = true)
 |-- ADMITTIME: timestamp (nullable = true)
 |-- ADMISSION_TYPE: string (nullable = true)
 |-- NEXT_ADMITTIME: timestamp (nullable = true)
 |-- NEXT_ADMISSION_TYPE: string (nullable = true)



In [0]:
# res.show(100)
timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
timeDiff = (F.unix_timestamp('NEXT_ADMITTIME', format=timeFmt)
            - F.unix_timestamp('ADMITTIME', format=timeFmt))

# res = res.withColumn("DAYS_NEXT_ADMIT", timeDiff / 86400)

df_readmit = df_readmit.withColumn("DAYS_NEXT_ADMIT", F.datediff('NEXT_ADMITTIME', 'ADMITTIME'))

In [0]:
df_readmit.show(10)

+----------+-------+-------------------+-------------------+-------------------+--------------+--------------+-------------------+---------------+
|SUBJECT_ID|HADM_ID|          DISCHTIME|          DEATHTIME|          ADMITTIME|ADMISSION_TYPE|NEXT_ADMITTIME|NEXT_ADMISSION_TYPE|DAYS_NEXT_ADMIT|
+----------+-------+-------------------+-------------------+-------------------+--------------+--------------+-------------------+---------------+
|       148| 199488|2107-10-08 13:35:00|               null|2107-09-05 14:58:00|     EMERGENCY|          null|               null|           null|
|       463| 197296|2198-10-09 16:41:00|               null|2198-10-05 16:43:00|     EMERGENCY|          null|               null|           null|
|       471| 135879|2122-07-30 17:50:00|2122-07-30 17:50:00|2122-07-22 14:04:00|     EMERGENCY|          null|               null|           null|
|       833| 179120|2137-05-27 14:19:00|               null|2137-05-23 04:46:00|       NEWBORN|          null|        

In [0]:
# res.show(100)
df_readmit[['DAYS_NEXT_ADMIT']].describe().show()

+-------+------------------+
|summary|   DAYS_NEXT_ADMIT|
+-------+------------------+
|  count|             11399|
|   mean|420.37599789455214|
| stddev| 638.6542232864784|
|    min|                 0|
|    max|              4121|
+-------+------------------+



In [0]:
# res.write.csv('processed_admit.csv')
# df_readmit.toPandas().to_csv('processed_admit_2.csv')

In [0]:
# load csv to spark dataframe, got some errors
# noteevents_file_loc = './mimic-iii-clinical-database-1.4/NOTEEVENTS.csv'
# df_noteevents = spark.read.csv(noteevents_file_loc, inferSchema=True, header=True, delimiter)

### Load notes table from pandas dataframe

In [0]:
import pandas as pd
from pyspark.sql.types import *

# use pandas to load data, got out of memory error when using spark
note_file_path = "./mimic-iii-clinical-database-1.4/NOTEEVENTS.csv"
df_notes = pd.read_csv(note_file_path)  # , dtype={'TEXT': str}
# df_notes['TEXT'] = df_notes.TEXT.values
df_notes.TEXT = df_notes.TEXT.fillna(' ')
df_notes.TEXT = df_notes.TEXT.str.replace('\n',' ')
df_notes.TEXT = df_notes.TEXT.str.replace('\r',' ')


  interactivity=interactivity, compiler=compiler, result=result)


In [0]:
# convert pandas df to spark df
# ref: https://stackoverflow.com/questions/37513355/converting-pandas-dataframe-into-spark-dataframe-error
# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: 
        typo = equivalent_type(format_type)
    except: 
        typo = StringType()
    return StructField(string, typo)


# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    # print(p_schema)
    return spark.createDataFrame(pandas_df, p_schema)

In [0]:
# you may get OOM error, limit the number of rows convert to spark dataframe for demo purpose
df_notes_spark = pandas_to_spark(df_notes)
# df_notes_spark = pandas_to_spark(df_notes.head(10000))
df_notes_spark.show(10)

In [0]:
df_notes_spark_selected = df_notes_spark.filter("CATEGORY == 'Discharge summary'").groupBy('SUBJECT_ID','HADM_ID').agg(F.last("TEXT"))

# df_notes_spark_selected.groupBy('SUBJECT_ID','HADM_ID').agg(F.last("TEXT")).show()
df_notes_spark_selected = df_notes_spark_selected.withColumnRenamed("last(TEXT, false)", "TEXT")
df_notes_spark_selected.show()

### Merge admission and notes tables

In [0]:
# join two dataframes
df_readmit_notes_spark = df_readmit.join(df_notes_spark_selected, ['SUBJECT_ID','HADM_ID'], "left_outer")
df_readmit_notes_spark.show()

+----------+-------+-------------------+-------------------+-------------------+--------------+-------------------+-------------------+---------------+--------------------+
|SUBJECT_ID|HADM_ID|          DISCHTIME|          DEATHTIME|          ADMITTIME|ADMISSION_TYPE|     NEXT_ADMITTIME|NEXT_ADMISSION_TYPE|DAYS_NEXT_ADMIT|                TEXT|
+----------+-------+-------------------+-------------------+-------------------+--------------+-------------------+-------------------+---------------+--------------------+
|       168| 141436|2139-06-13 11:14:00|               null|2139-06-11 12:55:00|       NEWBORN|               null|               null|           null|                null|
|       305| 133059|2125-05-03 18:12:00|               null|2125-04-26 11:45:00|     EMERGENCY|2125-12-31 18:40:00|          EMERGENCY|            249|                null|
|       319| 124954|2156-08-16 17:52:00|               null|2156-08-12 14:01:00|     EMERGENCY|               null|               null|

In [0]:
# remove newborn type
df_readmit_notes_clean = df_readmit_notes_spark.filter("ADMISSION_TYPE != 'NEWBORN'")
df_readmit_notes_clean.show()

+----------+-------+-------------------+-------------------+-------------------+--------------+-------------------+-------------------+---------------+--------------------+
|SUBJECT_ID|HADM_ID|          DISCHTIME|          DEATHTIME|          ADMITTIME|ADMISSION_TYPE|     NEXT_ADMITTIME|NEXT_ADMISSION_TYPE|DAYS_NEXT_ADMIT|                TEXT|
+----------+-------+-------------------+-------------------+-------------------+--------------+-------------------+-------------------+---------------+--------------------+
|       305| 133059|2125-05-03 18:12:00|               null|2125-04-26 11:45:00|     EMERGENCY|2125-12-31 18:40:00|          EMERGENCY|            249|                null|
|       319| 124954|2156-08-16 17:52:00|               null|2156-08-12 14:01:00|     EMERGENCY|               null|               null|           null|                null|
|      1006| 108462|2159-08-29 14:43:00|2159-08-29 14:43:00|2159-08-20 22:15:00|     EMERGENCY|               null|               null|

In [0]:
df_readmit_notes_clean_label = df_readmit_notes_clean.withColumn('OUTPUT_LABEL', df_readmit_notes_clean.DAYS_NEXT_ADMIT<30)
df_readmit_notes_clean_label.show()

+----------+-------+-------------------+-------------------+-------------------+--------------+-------------------+-------------------+---------------+--------------------+------------+
|SUBJECT_ID|HADM_ID|          DISCHTIME|          DEATHTIME|          ADMITTIME|ADMISSION_TYPE|     NEXT_ADMITTIME|NEXT_ADMISSION_TYPE|DAYS_NEXT_ADMIT|                TEXT|OUTPUT_LABEL|
+----------+-------+-------------------+-------------------+-------------------+--------------+-------------------+-------------------+---------------+--------------------+------------+
|       305| 133059|2125-05-03 18:12:00|               null|2125-04-26 11:45:00|     EMERGENCY|2125-12-31 18:40:00|          EMERGENCY|            249|                null|       false|
|       319| 124954|2156-08-16 17:52:00|               null|2156-08-12 14:01:00|     EMERGENCY|               null|               null|           null|                null|        null|
|      1006| 108462|2159-08-29 14:43:00|2159-08-29 14:43:00|2159-08-20

### Generate training data (method 1 with pyspark)

In [0]:
# generate training/testing data
# shuffle the samples
df_readmit_notes_clean = df_readmit_notes_clean.toPandas().sample(n=len(df_readmit_notes_clean.toPandas()), random_state=1024).reset_index(drop=True)

# prepare validation and test data 
df_valid_test = df_readmit_notes_clean.sample(frac=0.30, random_state=1024)
df_test = df_valid_test.sample(frac=0.5, random_state=1024)
df_valid = df_valid_test.drop(df_test.index)

# prepare training data
df_train_all = df_readmit_notes_clean.drop(df_valid_test.index)

In [0]:
df_train_all.head()

Unnamed: 0,SUBJECT_ID,HADM_ID,DISCHTIME,DEATHTIME,ADMITTIME,ADMISSION_TYPE,NEXT_ADMITTIME,NEXT_ADMISSION_TYPE,DAYS_NEXT_ADMIT,TEXT
0,27087,159446,2143-05-13 15:03:00,NaT,2143-05-06 07:15:00,ELECTIVE,NaT,,,
1,20412,145783,2133-08-11 13:08:00,NaT,2133-08-06 16:45:00,EMERGENCY,NaT,,,
2,86018,114855,2198-03-03 17:52:00,NaT,2198-03-02 19:31:00,EMERGENCY,2198-04-09 17:35:00,EMERGENCY,38.0,
3,15967,171844,2108-09-16 14:30:00,NaT,2108-09-14 15:13:00,EMERGENCY,2111-08-14 14:51:00,EMERGENCY,1064.0,
4,78509,129981,2134-02-04 13:43:00,NaT,2134-01-29 07:15:00,ELECTIVE,NaT,,,


### Generate training data (method 2 with pandas)

In [0]:
## use pandas to generate data

# merge df_readmit and df_notes
df_notes_selected = df_notes.loc[df_notes.CATEGORY == 'Discharge summary']

# choose the last note summary
df_notes_selected = (df_notes_selected.groupby(['SUBJECT_ID','HADM_ID']).nth(-1)).reset_index()

df_readmit_notes = pd.merge(df_readmit.toPandas()[['SUBJECT_ID','HADM_ID','ADMITTIME','DISCHTIME','DAYS_NEXT_ADMIT',\
                                'NEXT_ADMITTIME','ADMISSION_TYPE','DEATHTIME']],
                        df_notes_selected[['SUBJECT_ID','HADM_ID','TEXT']], 
                        on = ['SUBJECT_ID','HADM_ID'],
                        how = 'left')
# assert len(df_readmit.toPandas()) == len(df_adm_notes), 'Number of rows increased'

df_readmit_notes.groupby('ADMISSION_TYPE').apply(
    lambda g: g.TEXT.isnull().sum())/df_readmit_notes.groupby('ADMISSION_TYPE').size()

# remove newborn type
df_readmit_notes_clean = df_readmit_notes.loc[df_readmit_notes.ADMISSION_TYPE != 'NEWBORN'].copy()

# consider 30 re-admission
df_readmit_notes_clean['OUTPUT_LABEL'] = (df_readmit_notes_clean.DAYS_NEXT_ADMIT < 30).astype('int')
print(df_readmit_notes_clean[['OUTPUT_LABEL']].sum())

In [0]:
# generate training/testing data
# shuffle the samples
df_readmit_notes_clean = df_readmit_notes_clean.sample(n=len(df_readmit_notes_clean), random_state=1024).reset_index(drop=True)

# prepare validation and test data 
df_valid_test = df_readmit_notes_clean.sample(frac=0.30, random_state=1024)
df_test = df_valid_test.sample(frac=0.5, random_state=1024)
df_valid = df_valid_test.drop(df_test.index)

# prepare training data
df_train_all = df_readmit_notes_clean.drop(df_valid_test.index)