In [1]:
# Point findspark at a local Spark 2.1.1 installation; this is done before the
# pyspark imports that follow, presumably so they can be resolved.
# NOTE(review): the install path is hard-coded to one machine's home directory —
# confirm whether it should come from configuration (e.g. SPARK_HOME) instead.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
# Create (or reuse) the SparkSession that every DataFrame below is built on.
spark = (
    SparkSession.builder
    .appName('Iteration4.1')
    .getOrCreate()
)
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import StructType, IntegerType, StringType, StructField, BooleanType
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

In [2]:
# Load ./apacheApsVar.csv (header row, inferred column types) and discard the
# columns that are not used further down. Each column name is listed exactly once.
df_1 = spark.read.load('./apacheApsVar.csv', format='csv', header='true', inferSchema=True)
df1_todrop = ['apacheapsvarid', 'creatinine', 'bun', 'glucose', 'bilirubin', 'meds', 'ph',
              'hematocrit', 'urine', 'albumin', 'pao2', 'pco2', 'fio2', 'sodium']
df1 = df_1.drop(*df1_todrop)
# Read ./apachePatientResult.csv (header row, inferred column types), then
# remove every column named in df2_todrop.
df_2 = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema=True)
    .load('./apachePatientResult.csv')
)
df2_todrop = [
    'apachepatientresultsid',
    'physicianspeciality',
    'physicianinterventioncategory',
    'acutephysiologyscore',
    'apachescore',
    'apacheversion',
    'predictedicumortality',
    'predictediculos',
    'predictedhospitalmortality',
    'predictedhospitallos',
    'preopmi',
    'preopcardiaccath',
    'ptcawithin24h',
    'unabridgedunitlos',
    'unabridgedhosplos',
    'actualventdays',
    'predventdays',
    'unabridgedactualventdays',
]
df2 = df_2.drop(*df2_todrop)
# Read ./patient.csv (header row, inferred column types), then remove every
# column named in df3_todrop.
df_3 = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema=True)
    .load('./patient.csv')
)
df3_todrop = [
    'patienthealthsystemstayid',
    'hospitalid',
    'wardid',
    'apacheadmissiondx',
    'hospitaladmittime24',
    'hospitaldischargetime24',
    'hospitaladmitoffset',
    'hospitaladmitsource',
    'hospitaldischargeyear',
    'hospitaldischargeoffset',
    'hospitaldischargelocation',
    'hospitaldischargestatus',
    'unittype',
    'unitadmittime24',
    'unitadmitsource',
    'unitvisitnumber',
    'unitstaytype',
    'unitdischargetime24',
    'unitdischargeoffset',
    'unitdischargelocation',
    'unitdischargestatus',
    'uniquepid',
    'dischargeweight',
]
df3 = df_3.drop(*df3_todrop)

In [3]:
# Replace the -1 marker in the three score components with fixed values
# (eyes -> 4, motor -> 6, verbal -> 5), then add their sum as 'GCS'.
# NOTE(review): -1 looks like the "not recorded" sentinel and 4/6/5 like the
# best possible component scores — confirm against the data dictionary.
#
# The pattern is anchored to the whole cell so only an exact -1 (or -1.0, in
# case the column was inferred as a float) is rewritten; an unanchored '-1'
# would also rewrite any other value that merely contains "-1".
missing_marker = r'^-1(\.0+)?$'
df_filled1 = df1.withColumn('eyes', regexp_replace('eyes', missing_marker, '4'))
df_filled2 = df_filled1.withColumn('motor', regexp_replace('motor', missing_marker, '6'))
df_filled = df_filled2.withColumn('verbal', regexp_replace('verbal', missing_marker, '5'))

# regexp_replace yields string columns; Spark coerces them to numbers for the addition.
df_filled = df_filled.withColumn('GCS', col('motor') + col('eyes') + col('verbal'))

In [4]:
# Keep the stay id, the three score components, the three flag columns and GCS.
data_1 = df_filled.select(
    'patientunitstayid',
    "eyes",
    'motor',
    'verbal',
    'intubated',
    'vent',
    'dialysis',
    'GCS',
)

In [5]:
# Clean the 'wbc' column: turn the -1.0 marker into null, forward-fill within
# each patientunitstayid, and keep only rows with wbc <= 120.
dfw = df1.withColumn(
    "wbc",
    when(col("wbc").isin('-1.0'), None).otherwise(col("wbc")))
# Frame runs from the start of the partition to the current row, so
# last(..., ignorenulls=True) picks the most recent non-null value seen so far.
# NOTE(review): the ordering key equals the partition key, so the row order
# inside a partition is not determined by this spec — confirm that is intended.
window = Window.partitionBy('patientunitstayid')\
               .orderBy('patientunitstayid')\
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)
filled_column_wbc = last(dfw['wbc'], ignorenulls=True).over(window)
df_filled_wbc = dfw.withColumn('wbc', filled_column_wbc)
wbc = df_filled_wbc.select('patientunitstayid', 'wbc')
# Rows whose wbc is still null do not satisfy the predicate and are removed too.
wbc = wbc.where('wbc<=120')

In [6]:
# Clean the 'temperature' column: turn the -1.0 marker into null, forward-fill
# within each patientunitstayid, and keep only rows with 30.0 <= temperature <= 41.0.
dft = df1.withColumn(
    "temperature",
    when(col("temperature").isin('-1.0'), None).otherwise(col("temperature")))
# Frame runs from the start of the partition to the current row, so
# last(..., ignorenulls=True) picks the most recent non-null value seen so far.
# NOTE(review): the ordering key equals the partition key, so the row order
# inside a partition is not determined by this spec — confirm that is intended.
window = Window.partitionBy('patientunitstayid')\
               .orderBy('patientunitstayid')\
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)
filled_column_temperature = last(dft['temperature'], ignorenulls=True).over(window)
df_filled_temperature = dft.withColumn('temperature', filled_column_temperature)
temperature = df_filled_temperature.select('patientunitstayid', 'temperature')
# Rows whose temperature is still null fail both predicates and are removed too.
temperature = temperature.where('temperature<=41.0')
temperature = temperature.where('temperature>=30.0')

In [7]:
# Null out the -1.0 marker in 'respiratoryrate' and forward-fill the column
# within each patientunitstayid. No range filter is applied to this column.
respiratoryrate_or_null = when(
    col("respiratoryrate").isin('-1.0'), None
).otherwise(col("respiratoryrate"))
dfr = df1.withColumn("respiratoryrate", respiratoryrate_or_null)

# Running frame: start of the partition up to and including the current row.
window = (
    Window.partitionBy('patientunitstayid')
    .orderBy('patientunitstayid')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
filled_column_respiratoryrate = last(dfr['respiratoryrate'], ignorenulls=True).over(window)
df1_filled_respiratoryrate = dfr.withColumn('respiratoryrate', filled_column_respiratoryrate)
respiratoryrate = df1_filled_respiratoryrate.select('patientunitstayid', 'respiratoryrate')

In [8]:
# Clean the 'heartrate' column: turn the -1 marker into null, forward-fill
# within each patientunitstayid, and keep only rows with heartrate <= 200.
dfh = df1.withColumn(
    "heartrate",
    when(col("heartrate").isin('-1'), None).otherwise(col("heartrate")))
# Frame runs from the start of the partition to the current row, so
# last(..., ignorenulls=True) picks the most recent non-null value seen so far.
# NOTE(review): the ordering key equals the partition key, so the row order
# inside a partition is not determined by this spec — confirm that is intended.
window = Window.partitionBy('patientunitstayid')\
               .orderBy('patientunitstayid')\
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)
filled_column_heartrate = last(dfh['heartrate'], ignorenulls=True).over(window)
df1_filled_heartrate = dfh.withColumn('heartrate', filled_column_heartrate)
heartrate = df1_filled_heartrate.select('patientunitstayid', 'heartrate')
# Rows whose heartrate is still null do not satisfy the predicate and are removed too.
heartrate = heartrate.where('heartrate<=200')

In [9]:
# Clean the 'meanbp' column: turn the -1.0 marker into null, forward-fill
# within each patientunitstayid, and keep only rows with meanbp <= 200.
dfm = df1.withColumn(
    "meanbp",
    when(col("meanbp").isin('-1.0'), None).otherwise(col("meanbp")))
# Frame runs from the start of the partition to the current row, so
# last(..., ignorenulls=True) picks the most recent non-null value seen so far.
# NOTE(review): the ordering key equals the partition key, so the row order
# inside a partition is not determined by this spec — confirm that is intended.
window = Window.partitionBy('patientunitstayid')\
               .orderBy('patientunitstayid')\
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)
filled_column_meanbp = last(dfm['meanbp'], ignorenulls=True).over(window)
df1_filled_meanbp = dfm.withColumn('meanbp', filled_column_meanbp)
meanbp = df1_filled_meanbp.select('patientunitstayid', 'meanbp')
# Rows whose meanbp is still null do not satisfy the predicate and are removed too.
meanbp = meanbp.where('meanbp<=200')

In [10]:
# Rebuild the base feature frame from df_filled and attach the cleaned wbc
# column, matching rows on patientunitstayid.
data_1 = (
    df_filled
    .select('patientunitstayid', "eyes", 'motor', 'verbal',
            'intubated', 'vent', 'dialysis', 'GCS')
    .join(wbc, on=['patientunitstayid'])
)

In [11]:
# Attach the cleaned temperature column, matching rows on patientunitstayid.
data_1 = data_1.join(temperature, on=['patientunitstayid'])

In [12]:
# Attach the forward-filled respiratoryrate column, matching rows on patientunitstayid.
data_1 = data_1.join(respiratoryrate, on=['patientunitstayid'])

In [13]:
# Attach the cleaned heartrate column, matching rows on patientunitstayid.
data_1 = data_1.join(heartrate, on=['patientunitstayid'])

In [14]:
# Attach the cleaned meanbp column, matching rows on patientunitstayid.
data_1 = data_1.join(meanbp, on=['patientunitstayid'])

In [15]:
# Narrow df2 to the stay id plus the two outcome columns.
# Note that this rebinds df_2, which previously held the raw CSV load.
df_2 = df2.select(
    'patientunitstayid',
    'actualicumortality',
    'actualiculos',
)

In [16]:
# Attach the outcome columns held in df_2, matching rows on patientunitstayid.
data_1 = data_1.join(df_2, on=['patientunitstayid'])

In [17]:
# Start the age feature from the stay id and the raw 'age' column of df3.
age = df3.select(['patientunitstayid', 'age'])

In [18]:
# Rewrite the textual '> 89' bucket as '90' so the column can later be cast to a number.
age = age.withColumn('age', regexp_replace(col('age'), '> 89', '90'))

In [19]:
# Discard rows that have no age value.
age = age.dropna(subset=['age'])

In [20]:
# Treat 'Other' / 'Unknown' gender values as missing, then forward-fill the
# column within each patientunitstayid. Note that df3 itself is rebound here.
gender_or_null = when(
    col("gender").isin('Other', 'Unknown'), None
).otherwise(col("gender"))
df3 = df3.withColumn("gender", gender_or_null)

# Running frame: start of the partition up to and including the current row.
window = (
    Window.partitionBy('patientunitstayid')
    .orderBy('patientunitstayid')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
filled_column_gender = last(df3['gender'], ignorenulls=True).over(window)
df3_filled_gender = df3.withColumn('gender', filled_column_gender)
gender = df3_filled_gender.select('patientunitstayid', 'gender')

In [21]:
# Encode gender as text digits: 'Male' -> '1', then 'Female' -> '0'.
# The patterns are case-sensitive, so 'Male' does not match inside 'Female'.
gender = (
    gender
    .withColumn('gender', regexp_replace('gender', 'Male', '1'))
    .withColumn('gender', regexp_replace('gender', 'Female', '0'))
)

In [22]:
# Null out the -1.0 marker in 'admissionweight' and forward-fill the column
# within each patientunitstayid. Note that df3 itself is rebound here.
admissionweight_or_null = when(
    col("admissionweight").isin('-1.0'), None
).otherwise(col("admissionweight"))
df3 = df3.withColumn("admissionweight", admissionweight_or_null)

# Running frame: start of the partition up to and including the current row.
window = (
    Window.partitionBy('patientunitstayid')
    .orderBy('patientunitstayid')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
filled_column_admissionweight = last(df3['admissionweight'], ignorenulls=True).over(window)
df3_filled_admissionweight = df3.withColumn('admissionweight', filled_column_admissionweight)
admissionweight = df3_filled_admissionweight.select('patientunitstayid', 'admissionweight')

In [23]:
# Attach the age column, matching rows on patientunitstayid.
data_1 = data_1.join(age, on=['patientunitstayid'])

In [24]:
# Attach the encoded gender column, matching rows on patientunitstayid.
data_1 = data_1.join(gender, on=['patientunitstayid'])

In [25]:
# Attach admissionweight; the result is the full modelling frame, named `data`.
data = data_1.join(admissionweight, on=['patientunitstayid'])

In [26]:
# Stratified sample on the outcome column with a fixed seed. Only the strata
# named in `fractions` are sampled, each at its own rate.
# NOTE(review): both fractions are very small — confirm the resulting sample is
# large enough for the tree settings used further down.
data = data.stat.sampleBy(
    'actualicumortality',
    fractions={'ALIVE': 0.000035, 'EXPIRED': 0.0005},
    seed=1,
)

In [27]:
# Encode the outcome label as text digits: 'ALIVE' -> '1', 'EXPIRED' -> '0'.
data = (
    data
    .withColumn('actualicumortality', regexp_replace('actualicumortality', 'ALIVE', '1'))
    .withColumn('actualicumortality', regexp_replace('actualicumortality', 'EXPIRED', '0'))
)

In [28]:
# Encode upper-case gender spellings as text digits.
# 'FEMALE' has to be handled before 'MALE': the pattern 'MALE' also matches the
# tail of 'FEMALE', so the opposite order would turn 'FEMALE' into 'FE1', which
# becomes null once the column is cast to an integer.
data = data.withColumn('gender', regexp_replace('gender', 'FEMALE', '0'))
data = data.withColumn('gender', regexp_replace('gender', 'MALE', '1'))

In [29]:
# Store the stay identifier as a string column.
data = data.withColumn('patientunitstayid', data['patientunitstayid'].cast('string'))

In [30]:
# Cast each listed column to its modelling type: the three flag columns become
# booleans, the remaining columns integers. Each cast replaces the column in place.
for column_name, target_type in [
    ('intubated', BooleanType()),
    ('dialysis', BooleanType()),
    ('vent', BooleanType()),
    ('age', IntegerType()),
    ('verbal', IntegerType()),
    ('eyes', IntegerType()),
    ('motor', IntegerType()),
    ('gender', IntegerType()),
    ('actualicumortality', IntegerType()),
]:
    data = data.withColumn(column_name, data[column_name].cast(target_type))

In [31]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# Assembler that packs the eleven predictor columns into one 'features' vector.
assembler = VectorAssembler(
    inputCols=[
        'intubated',
        'vent',
        'wbc',
        'temperature',
        'gender',
        'respiratoryrate',
        'heartrate',
        'meanbp',
        'age',
        'GCS',
        'admissionweight',
    ],
    outputCol="features",
)

In [32]:
# Build the 'features' vector column.
# VectorAssembler in this Spark line raises "Values to assemble cannot be null"
# when any input is null. Several inputs can still be null at this point: the
# -1 markers in respiratoryrate and admissionweight were nulled but never
# filtered, and 'Other'/'Unknown' genders were nulled. Rows with a null in any
# assembler input are therefore dropped before assembling.
output = assembler.transform(data.na.drop(subset=assembler.getInputCols()))

In [33]:
# Keep only what the classifier needs: the feature vector and the label.
final_data = output.select(['features', 'actualicumortality'])

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Binary evaluator scored against the encoded outcome label; the metric and
# raw-prediction column are left at the library defaults.
my_binary_eval = BinaryClassificationEvaluator(
    labelCol='actualicumortality',
)

In [35]:
# 70/30 train/test split. The seed is fixed so that a fresh Restart & Run All
# reproduces the same split, in line with the seeded sampleBy call earlier.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=1)

In [36]:
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
# Decision-tree classifier configuration (gini impurity, depth <= 8, at least
# 10 instances per node).
dtc = DecisionTreeClassifier(
    labelCol='actualicumortality',
    featuresCol='features',
    maxDepth=8,
    minInstancesPerNode=10,
    impurity='gini',
)
# Disabled alternatives. RandomForestClassifier and GBTClassifier are not
# imported in this cell, so an import is needed before re-enabling them.
#rfc = RandomForestClassifier(labelCol='actualicumortality',featuresCol='features',maxDepth=10, minInstancesPerNode=10,impurity='entropy',numTrees=5)
#gbt = GBTClassifier(labelCol='actualicumortality',featuresCol='features',maxDepth=15, minInstancesPerNode=20)

In [None]:
# Fit the decision tree on the training split (the other models are disabled).
dtc_model = dtc.fit(dataset=train_data)
#rfc_model = rfc.fit(train_data)
#gbt_model = gbt.fit(train_data)

In [None]:
# Score the held-out split with the fitted decision tree.
dtc_predictions = dtc_model.transform(dataset=test_data)
#rfc_predictions = rfc_model.transform(test_data)
#gbt_predictions = gbt_model.transform(test_data)

In [None]:
# Report the binary evaluator's score for the decision tree on the test split.
dtc_binary_score = my_binary_eval.evaluate(dtc_predictions)
print("dtc")
print(dtc_binary_score)
# Disabled reports for the other models:
#print("rfc")
#print(my_binary_eval.evaluate(rfc_predictions))
#my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='actualicumortality', rawPredictionCol='prediction')
#print("GBT")
#print(my_binary_gbt_eval.evaluate(gbt_predictions))

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Accuracy of the decision-tree predictions on the held-out split.
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol="actualicumortality",
    predictionCol="prediction",
    metricName="accuracy",
)
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
#rfc_acc = acc_evaluator.evaluate(rfc_predictions)
#gbt_acc = acc_evaluator.evaluate(gbt_predictions)
# Only the decision tree is trained in this notebook, so only its accuracy is
# reported; the random-forest and GBT lines stay disabled together with their
# evaluations above (rfc_acc / gbt_acc are not defined).
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
#print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
#print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))