In [10]:
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
from pyspark.sql import Row
import numpy as np
import pandas as pd
from pyspark.sql.functions import isnan, isnull, when, count, col
from pyspark.sql import functions as fn
import matplotlib.pyplot as plt
from pyspark.ml import feature
# Funcionality for classification
from pyspark.ml import Pipeline
import seaborn as sns
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, MultilayerPerceptronClassifier, DecisionTreeClassifier,GBTClassifier
from pyspark.sql.types import IntegerType
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [11]:
# Do not delete or change this cell

# grading import statements
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import os

# Define a function to determine if we are running on data bricks
# Return true if running in the data bricks environment, false otherwise
def is_databricks():
    # get the databricks runtime version
    db_env = os.getenv("DATABRICKS_RUNTIME_VERSION")
    
    # if running on data bricks
    if db_env != None:
        return True
    else:
        return False

# Define a function to read the data file.  The full path data file name is constructed
# by checking runtime environment variables to determine if the runtime environment is 
# databricks, or a student's personal computer.  The full path file name is then
# constructed based on the runtime env.
# 
# Params
#   data_file_name: The base name of the data file to load
# 
# Returns the full path file name based on the runtime env
#
# Correct Usage Example (pass ONLY the full file name):
#   file_name_to_load = get_training_filename("sms_spam.csv") # correct - pass ONLY the full file name  
#   
# Incorrect Usage Example
#   file_name_to_load = get_training_filename("/sms_spam.csv") # incorrect - pass ONLY the full file name
#   file_name_to_load = get_training_filename("sms_spam.csv/") # incorrect - pass ONLY the full file name
#   file_name_to_load = get_training_filename("c:/users/will/data/sms_spam.csv") incorrect -pass ONLY the full file name
def get_training_filename(data_file_name):    
    # if running on data bricks
    if is_databricks():
        # build the full path file name assuming data brick env
        full_path_name = "/FileStore/tables/%s" % data_file_name
    # else the data is assumed to be in the same dir as this notebook
    else:
        # Assume the student is running on their own computer and load the data
        # file from the same dir as this notebook
        full_path_name = data_file_name
    
    # return the full path file name to the caller
    return full_path_name

In [20]:
file_path = "W:/Syracuse/Assignements/BigData/FinalProject/Hospital-Readmission-based-on-Diabetes/Data/"
file_name = "hospital_readmission_cleaned.csv"
diabetes_df = spark.read.csv(get_training_filename('Data/diabetes.csv'), header=True, inferSchema=True)
diabetes_df.toPandas().head()

Unnamed: 0,_c0,race,diag_1,diag_2,diag_3,admission_type_name,dischage_disposition_name,admission_source_name,gender,age,time_in_hospital,num_lab_procedures,num_procedures,num_medications,number_outpatient,number_emergency,number_inpatient,number_diagnoses,max_glu_serum,A1Cresult,metformin,repaglinide,nateglinide,chlorpropamide,glimepiride,glipizide,glyburide,pioglitazone,rosiglitazone,acarbose,miglitol,insulin,glyburide-metformin,change,diabetesMed,readmitted
0,0,Caucasian,250.83,276.0,276,Emergency,Not Mapped,Physician Referral,Female,[0-10),1,41,0,1,0,0,0,1,,,No,No,No,No,No,No,No,No,No,No,No,No,No,No,No,NO
1,1,Caucasian,276.0,250.01,255,Emergency,Discharged to home,Emergency Room,Female,[10-20),3,59,0,18,0,0,0,9,,,No,No,No,No,No,No,No,No,No,No,No,Up,No,Ch,Yes,YES
2,2,AfricanAmerican,648.0,250.0,V27,Emergency,Discharged to home,Emergency Room,Female,[20-30),2,11,5,13,2,0,1,6,,,No,No,No,No,No,Steady,No,No,No,No,No,No,No,No,Yes,NO
3,3,Caucasian,8.0,250.43,403,Emergency,Discharged to home,Emergency Room,Male,[30-40),2,44,1,16,0,0,0,7,,,No,No,No,No,No,No,No,No,No,No,No,Up,No,Ch,Yes,NO
4,4,Caucasian,197.0,157.0,250,Emergency,Discharged to home,Emergency Room,Male,[40-50),1,51,0,8,0,0,0,5,,,No,No,No,No,No,Steady,No,No,No,No,No,Steady,No,Ch,Yes,NO


In [13]:
'''
diabetes_df = diabetes_df.withColumn("diag_1", diabetes_df["diag_1"].cast(IntegerType()))
diabetes_df = diabetes_df.withColumn("diag_2", diabetes_df["diag_2"].cast(IntegerType()))
diabetes_df = diabetes_df.withColumn("diag_3", diabetes_df["diag_3"].cast(IntegerType()))
diabetes_df.dtypes
'''

'\ndiabetes_df = diabetes_df.withColumn("diag_1", diabetes_df["diag_1"].cast(IntegerType()))\ndiabetes_df = diabetes_df.withColumn("diag_2", diabetes_df["diag_2"].cast(IntegerType()))\ndiabetes_df = diabetes_df.withColumn("diag_3", diabetes_df["diag_3"].cast(IntegerType()))\ndiabetes_df.dtypes\n'

In [14]:
# Split the dataset into training and testing
train, test = diabetes_df.randomSplit([0.8, 0.2], 0)
train.dtypes

[('_c0', 'int'),
 ('race', 'string'),
 ('diag_1', 'string'),
 ('diag_2', 'string'),
 ('diag_3', 'string'),
 ('admission_type_name', 'string'),
 ('dischage_disposition_name', 'string'),
 ('admission_source_name', 'string'),
 ('gender', 'string'),
 ('age', 'string'),
 ('time_in_hospital', 'int'),
 ('num_lab_procedures', 'int'),
 ('num_procedures', 'int'),
 ('num_medications', 'int'),
 ('number_outpatient', 'int'),
 ('number_emergency', 'int'),
 ('number_inpatient', 'int'),
 ('number_diagnoses', 'int'),
 ('max_glu_serum', 'string'),
 ('A1Cresult', 'string'),
 ('metformin', 'string'),
 ('repaglinide', 'string'),
 ('nateglinide', 'string'),
 ('chlorpropamide', 'string'),
 ('glimepiride', 'string'),
 ('glipizide', 'string'),
 ('glyburide', 'string'),
 ('pioglitazone', 'string'),
 ('rosiglitazone', 'string'),
 ('acarbose', 'string'),
 ('miglitol', 'string'),
 ('insulin', 'string'),
 ('glyburide-metformin', 'string'),
 ('change', 'string'),
 ('diabetesMed', 'string'),
 ('readmitted', 'string')

In [24]:
# build the pipelines
pipe_model = Pipeline(stages = [feature.StringIndexer(inputCol = 'gender', outputCol = 'gender_en'),
                                feature.StringIndexer(inputCol = 'admission_type_name', outputCol = 'ad_type_en'),
                                feature.StringIndexer(inputCol = 'race', outputCol = 'race_en'),
                                feature.StringIndexer(inputCol = 'admission_source_name', outputCol = 'source_en'),
                                feature.StringIndexer(inputCol = 'readmitted', outputCol = 'target'),
                                feature.VectorAssembler(inputCols = ['gender_en','ad_type_en','race_en','source_en','time_in_hospital','number_outpatient','number_emergency','number_inpatient','number_diagnoses'],outputCol = 'features'),
                                feature.StandardScaler(withMean=True, inputCol='features',outputCol = 'zfeatures')])

In [25]:
# Logistic regression pipeline
pipe_logit = Pipeline(stages = [pipe_model,LogisticRegression(labelCol='target',featuresCol = 'zfeatures')])

In [26]:
# Fitiing logitstic model
fitted_model = pipe_logit.fit(train)
fitted_model.transform(diabetes_df).toPandas().head()

Unnamed: 0,_c0,race,diag_1,diag_2,diag_3,admission_type_name,dischage_disposition_name,admission_source_name,gender,age,time_in_hospital,num_lab_procedures,num_procedures,num_medications,number_outpatient,number_emergency,number_inpatient,number_diagnoses,max_glu_serum,A1Cresult,metformin,repaglinide,nateglinide,chlorpropamide,glimepiride,glipizide,glyburide,pioglitazone,rosiglitazone,acarbose,miglitol,insulin,glyburide-metformin,change,diabetesMed,readmitted,gender_en,ad_type_en,race_en,source_en,target,features,zfeatures,rawPrediction,probability,prediction
0,0,Caucasian,250.83,276.0,276,Emergency,Not Mapped,Physician Referral,Female,[0-10),1,41,0,1,0,0,0,1,,,No,No,No,No,No,No,No,No,No,No,No,No,No,No,No,NO,0.0,0.0,0.0,1.0,0.0,"(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0)","[-0.9275920023263226, -0.7426376626049093, -0....","[0.9902374336719668, -0.9902374336719668]","[0.7291348173941973, 0.2708651826058027]",0.0
1,1,Caucasian,276.0,250.01,255,Emergency,Discharged to home,Emergency Room,Female,[10-20),3,59,0,18,0,0,0,9,,,No,No,No,No,No,No,No,No,No,No,No,Up,No,Ch,Yes,YES,0.0,0.0,0.0,0.0,1.0,"(0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0, 9.0)","[-0.9275920023263226, -0.7426376626049093, -0....","[0.23690375055860308, -0.23690375055860308]","[0.5589504867749511, 0.44104951322504904]",0.0
2,2,AfricanAmerican,648.0,250.0,V27,Emergency,Discharged to home,Emergency Room,Female,[20-30),2,11,5,13,2,0,1,6,,,No,No,No,No,No,Steady,No,No,No,No,No,No,No,No,Yes,NO,0.0,0.0,1.0,0.0,0.0,"[0.0, 0.0, 1.0, 0.0, 2.0, 2.0, 0.0, 1.0, 6.0]","[-0.9275920023263226, -0.7426376626049093, 1.0...","[-0.0005450749801048926, 0.0005450749801048926]","[0.49986373125834765, 0.5001362687416524]",1.0
3,3,Caucasian,8.0,250.43,403,Emergency,Discharged to home,Emergency Room,Male,[30-40),2,44,1,16,0,0,0,7,,,No,No,No,No,No,No,No,No,No,No,No,Up,No,Ch,Yes,NO,1.0,0.0,0.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 7.0)","[1.077833868781897, -0.7426376626049093, -0.46...","[0.4632706666651081, -0.4632706666651081]","[0.6137897818038783, 0.38621021819612167]",0.0
4,4,Caucasian,197.0,157.0,250,Emergency,Discharged to home,Emergency Room,Male,[40-50),1,51,0,8,0,0,0,5,,,No,No,No,No,No,Steady,No,No,No,No,No,Steady,No,Ch,Yes,NO,1.0,0.0,0.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 5.0)","[1.077833868781897, -0.7426376626049093, -0.46...","[0.6353248651499642, -0.6353248651499642]","[0.6536958767566865, 0.34630412324331356]",0.0


In [27]:
# validate logistic model
fitted_model.transform(test).select(fn.avg(fn.expr('target = prediction').cast('float'))).show()

+-----------------------------------------+
|avg(CAST((target = prediction) AS FLOAT))|
+-----------------------------------------+
|                       0.6146293568973982|
+-----------------------------------------+



In [28]:
# Random forest pipeline
pipe_rf = Pipeline(stages = [pipe_model,RandomForestClassifier(labelCol="target", featuresCol="zfeatures", numTrees=10)])

In [29]:
# Fitiing RF model
fitted_model = pipe_rf.fit(train)
fitted_model.transform(diabetes_df).toPandas().head()

Unnamed: 0,_c0,race,diag_1,diag_2,diag_3,admission_type_name,dischage_disposition_name,admission_source_name,gender,age,time_in_hospital,num_lab_procedures,num_procedures,num_medications,number_outpatient,number_emergency,number_inpatient,number_diagnoses,max_glu_serum,A1Cresult,metformin,repaglinide,nateglinide,chlorpropamide,glimepiride,glipizide,glyburide,pioglitazone,rosiglitazone,acarbose,miglitol,insulin,glyburide-metformin,change,diabetesMed,readmitted,gender_en,ad_type_en,race_en,source_en,target,features,zfeatures,rawPrediction,probability,prediction
0,0,Caucasian,250.83,276.0,276,Emergency,Not Mapped,Physician Referral,Female,[0-10),1,41,0,1,0,0,0,1,,,No,No,No,No,No,No,No,No,No,No,No,No,No,No,No,NO,0.0,0.0,0.0,1.0,0.0,"(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0)","[-0.9275920023263226, -0.7426376626049092, -0....","[6.667353951804931, 3.332646048195069]","[0.6667353951804931, 0.33326460481950687]",0.0
1,1,Caucasian,276.0,250.01,255,Emergency,Discharged to home,Emergency Room,Female,[10-20),3,59,0,18,0,0,0,9,,,No,No,No,No,No,No,No,No,No,No,No,Up,No,Ch,Yes,YES,0.0,0.0,0.0,0.0,1.0,"(0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0, 9.0)","[-0.9275920023263226, -0.7426376626049092, -0....","[6.1896021981054625, 3.8103978018945366]","[0.6189602198105463, 0.38103978018945367]",0.0
2,2,AfricanAmerican,648.0,250.0,V27,Emergency,Discharged to home,Emergency Room,Female,[20-30),2,11,5,13,2,0,1,6,,,No,No,No,No,No,Steady,No,No,No,No,No,No,No,No,Yes,NO,0.0,0.0,1.0,0.0,0.0,"[0.0, 0.0, 1.0, 0.0, 2.0, 2.0, 0.0, 1.0, 6.0]","[-0.9275920023263226, -0.7426376626049092, 1.0...","[3.6360438223315237, 6.363956177668476]","[0.3636043822331524, 0.6363956177668476]",1.0
3,3,Caucasian,8.0,250.43,403,Emergency,Discharged to home,Emergency Room,Male,[30-40),2,44,1,16,0,0,0,7,,,No,No,No,No,No,No,No,No,No,No,No,Up,No,Ch,Yes,NO,1.0,0.0,0.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 0.0, 7.0)","[1.077833868781897, -0.7426376626049092, -0.46...","[6.1896021981054625, 3.8103978018945366]","[0.6189602198105463, 0.38103978018945367]",0.0
4,4,Caucasian,197.0,157.0,250,Emergency,Discharged to home,Emergency Room,Male,[40-50),1,51,0,8,0,0,0,5,,,No,No,No,No,No,Steady,No,No,No,No,No,Steady,No,Ch,Yes,NO,1.0,0.0,0.0,0.0,0.0,"(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 5.0)","[1.077833868781897, -0.7426376626049092, -0.46...","[6.634437388523745, 3.365562611476255]","[0.6634437388523745, 0.3365562611476255]",0.0


In [32]:
# validate random forest model
fitted_model.transform(test).select(fn.avg(fn.expr('target = prediction').cast('float'))).show()

+-----------------------------------------+
|avg(CAST((target = prediction) AS FLOAT))|
+-----------------------------------------+
|                       0.6164948453608248|
+-----------------------------------------+



In [33]:
# MLP pipeline
layers = [4,5,5,2]
pipe_mlp = Pipeline(stages = [pipe_model,MultilayerPerceptronClassifier(maxIter=100, layers=layers,labelCol = 'target',featuresCol = 'zfeatures', blockSize=128)])


In [36]:
# Fitiing MLP model
fitted_model = pipe_mlp.fit(train)
fitted_model.transform(test).toPandas().head()

Py4JJavaError: An error occurred while calling o3678.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 641.0 failed 1 times, most recent failure: Lost task 4.0 in stage 641.0 (TID 3089, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:41)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:508)
	at org.apache.spark.ml.ann.FeedForwardModel.predictRaw(Layer.scala:561)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:323)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:280)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
	... 18 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:41)
	at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
	at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:508)
	at org.apache.spark.ml.ann.FeedForwardModel.predictRaw(Layer.scala:561)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:323)
	at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:280)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
	at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
	... 18 more


In [37]:
gbt = GBTClassifier(labelCol="target",featuresCol="zfeatures")
gbt_pipeline = Pipeline(stages=[pipe_model, gbt]).fit(train)
gbt_pipeline.transform(test).select(fn.avg(fn.expr('target = prediction').cast('float'))).show()

+-----------------------------------------+
|avg(CAST((target = prediction) AS FLOAT))|
+-----------------------------------------+
|                       0.6189003436426117|
+-----------------------------------------+



In [38]:
dt = DecisionTreeClassifier(labelCol="target", featuresCol="zfeatures")
dt_pipeline = Pipeline(stages=[pipe_model, dt]).fit(train)
dt_pipeline.transform(test).select(fn.avg(fn.expr('target = prediction').cast('float'))).show()

+-----------------------------------------+
|avg(CAST((target = prediction) AS FLOAT))|
+-----------------------------------------+
|                        0.614972999509082|
+-----------------------------------------+



In [None]:
#diabetes_df = diabetes_df.withColumn("metformin", diabetes_df["metformin"].cast(IntegerType()))
#diabetes_df = diabetes_df.withColumn("diag_2", hosp_readmit_id_mapped_df.diag_2.cast('Int'))
#print(diabetes_df.count())
#diabetes_df = diabetes_df.na.drop()
#diabetes_df.where(diabetes_df.metformin.isNull()).count()



'''
NULL VALUES IN 
diag_1
diag_2
diag_3
max_glu_serum
A1Cresult
metformin_en
repaglinide_en
'''
#diabetes_df = diabetes_df.withColumn('readmitted',when((diabetes_df.readmitted == '<30') | (diabetes_df.readmitted == '>30'),'YES').otherwise('NO'))
#diabetes_df.toPandas().to_csv('Data/diabetes.csv')

#52.169
#discharge_en reduces to 50.73
#source_en 55.24
#time in hosp 55.95
# number_outpatient 57.69
#number_emergency 59.31
# number_inpatient 63.46
# number_diagnoses 63.92