In [1]:
from pyspark.sql.types import BooleanType
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LinearSVC
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import expr
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from helpers.helper_functions import translate_to_file_string
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
from pyspark.ml.feature import IndexToString, Normalizer, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from helpers.helper_functions import translate_to_file_string
from sklearn.metrics import roc_curve, auc
from sklearn.tree import plot_tree
import seaborn as sns
import pandas as pd
import os
import warnings
import matplotlib.pyplot as plt
warnings.filterwarnings('ignore')
    

In [2]:
inputFile = translate_to_file_string("../data/heart_val.csv")

In [3]:
spark = (SparkSession
       .builder
       .appName("HeartDiseaseAnalDT")
       .getOrCreate())

In [4]:
# load data file.
# create a DataFrame using an ifered Schema 
df = spark.read.option("header", "true") \
       .option("inferSchema", "true") \
       .option("delimiter", ";") \
       .csv(inputFile)


In [5]:
#remove the outliner
df_filtered=df.filter(df.age > 30)

In [6]:
#transform labels to number values
labelIndexer = StringIndexer().setInputCol("target").setOutputCol("label").fit(df)
sexIndexer = StringIndexer().setInputCol("sex").setOutputCol("sex_num").fit(df)

In [7]:
#feature columns
featureCols = df.columns.copy()
featureCols.remove("target")
featureCols.remove("sex")
featureCols = featureCols + ["sex_num"]

In [8]:
#vector assembler
assembler =  VectorAssembler(outputCol="features", inputCols=list(featureCols))

In [9]:
#Build feauture Indexer 
featureIndexer = VectorIndexer(inputCol="features",outputCol="indexedFeatures", maxCategories=6)

In [10]:
#Convert Indexed labels back to original labels
predConverter = IndexToString(inputCol="prediction",outputCol="predictedLabel",labels=labelIndexer.labels)

In [11]:
#Build decistion tree model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [12]:
# build a parameter grid for different values
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [ 5, 10 ]) \
				              .addGrid(dt.minInfoGain, [0.05, 0.025, 0.02, 0.01]) \
				              .addGrid(dt.minInstancesPerNode, [5, 10, 15]) \
                              .addGrid(dt.maxBins, [5, 6, 9]) \
				              .build()

In [13]:
#split data for testing

splits = df.randomSplit([0.6, 0.4 ], 3455)
train = splits[0]
test = splits[1]

In [14]:
#Pipelining of all steps 
pipeline = Pipeline(stages= [labelIndexer,sexIndexer,  assembler, featureIndexer,   dt, predConverter])

In [15]:
#build evaluator 
evaluator =  BinaryClassificationEvaluator(labelCol="label",rawPredictionCol="rawPrediction", metricName="areaUnderROC")

In [16]:
#Cross validator
cv = CrossValidator(estimator=pipeline, evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=10, parallelism=2)

In [17]:
#train model
cvModel = cv.fit(train)

In [18]:
#Find out the best model
treeModel = cvModel.bestModel.stages[4] 
print("Learned classification tree model:\n",treeModel)
print("Best Params: \n", treeModel.explainParams())

Learned classification tree model:
 DecisionTreeClassificationModel: uid=DecisionTreeClassifier_6d1df8416897, depth=4, numNodes=13, numClasses=2, numFeatures=13
Best Params: 
 cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default

In [19]:
#show tree
plt.figure(figsize=(20,20))
features = df.columns

classes = ["No Disease","Diasease"]

#plot_tree(treeModel,feature_names=features,class_names=classes,filled=True)
#plt.show()


<Figure size 1440x1440 with 0 Axes>

In [20]:
#test model
predictions = cvModel.transform(test)
predictions.select("prediction", "label", "target", "features").show()

+----------+-----+------+--------------------+
|prediction|label|target|            features|
+----------+-----+------+--------------------+
|       0.0|  0.0|     y|[34.0,1.0,118.0,2...|
|       0.0|  0.0|     y|(13,[0,1,2,3,6,9,...|
|       1.0|  1.0|     n|[39.0,0.0,118.0,2...|
|       0.0|  0.0|     y|(13,[0,1,2,3,6,9,...|
|       1.0|  1.0|     n|[40.0,0.0,110.0,1...|
|       0.0|  0.0|     y|[41.0,1.0,105.0,1...|
|       0.0|  0.0|     y|[41.0,1.0,130.0,2...|
|       0.0|  0.0|     y|[41.0,1.0,110.0,2...|
|       0.0|  0.0|     y|[41.0,2.0,112.0,2...|
|       1.0|  0.0|     y|[41.0,2.0,130.0,2...|
|       0.0|  0.0|     y|[42.0,0.0,102.0,2...|
|       0.0|  0.0|     y|[42.0,2.0,120.0,2...|
|       0.0|  1.0|     n|[42.0,0.0,136.0,3...|
|       0.0|  0.0|     y|(13,[0,2,3,5,6,9,...|
|       0.0|  0.0|     y|[42.0,2.0,130.0,1...|
|       0.0|  0.0|     y|[42.0,3.0,148.0,2...|
|       1.0|  1.0|     n|[43.0,0.0,132.0,3...|
|       0.0|  0.0|     y|[43.0,0.0,115.0,3...|
|       1.0| 

In [21]:
accuracy = evaluator.evaluate(predictions)
print("Test Error = " ,(1.0 - accuracy))

Test Error =  0.1752653301886793


In [22]:
spark.stop()