In [38]:
#to install pyspark
#pip install pyspark

In [210]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler,StringIndexer,StandardScaler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import DecisionTreeClassifier

In [211]:
# Create a SparkSession
spark = SparkSession.builder.appName("HeartFailurePredictor").master("local").getOrCreate()

In [212]:
# Load the dataset
data = spark.read.csv("C:/Users/eobot/OneDrive - Coventry University/Desktop/7153/Assessment/Coursework/heart.csv", header=True, inferSchema=True)

In [213]:
data.show(10)

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| 40|  M|          ATA|      140|        289|        0|    Normal|  172|             N|    0.0|      Up|           0|
| 49|  F|          NAP|      160|        180|        0|    Normal|  156|             N|    1.0|    Flat|           1|
| 37|  M|          ATA|      130|        283|        0|        ST|   98|             N|    0.0|      Up|           0|
| 48|  F|          ASY|      138|        214|        0|    Normal|  108|             Y|    1.5|    Flat|           1|
| 54|  M|          NAP|      150|        195|        0|    Normal|  122|             N|    0.0|      Up|           0|
| 39|  M|          NAP|      120|        339|        0| 

In [214]:
import pyspark.sql.functions as fc
print((data.count(), len(data.columns)))
data.describe().show()


# check for null values in each column
data_null = data.agg(*[fc.count(fc.when(fc.isnull(c), c)).alias(c) for c in data.columns])
data_null.show()   # no null values

(918, 12)
+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|summary|               Age| Sex|ChestPainType|         RestingBP|       Cholesterol|          FastingBS|RestingECG|             MaxHR|ExerciseAngina|           Oldpeak|ST_Slope|       HeartDisease|
+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|  count|               918| 918|          918|               918|               918|                918|       918|               918|           918|               918|     918|                918|
|   mean|53.510893246187365|NULL|         NULL|132.39651416122004| 198.7995642701525|0.23311546840958605|      NULL|136.80936819172112|          NULL|0.8873638344226581|    NULL| 0.5533769063180

In [215]:
data.summary
data.describe().show()
data.dtypes

+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|summary|               Age| Sex|ChestPainType|         RestingBP|       Cholesterol|          FastingBS|RestingECG|             MaxHR|ExerciseAngina|           Oldpeak|ST_Slope|       HeartDisease|
+-------+------------------+----+-------------+------------------+------------------+-------------------+----------+------------------+--------------+------------------+--------+-------------------+
|  count|               918| 918|          918|               918|               918|                918|       918|               918|           918|               918|     918|                918|
|   mean|53.510893246187365|NULL|         NULL|132.39651416122004| 198.7995642701525|0.23311546840958605|      NULL|136.80936819172112|          NULL|0.8873638344226581|    NULL| 0.5533769063180828|
| std

[('Age', 'int'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'int'),
 ('Cholesterol', 'int'),
 ('FastingBS', 'int'),
 ('RestingECG', 'string'),
 ('MaxHR', 'int'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'double'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'int')]

In [216]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

#label encoding of categorical columns
categorical_cols = ["Sex","ChestPainType","RestingECG","ExerciseAngina","ST_Slope"]
label_encoders = [StringIndexer(inputCol=col, outputCol=col + "_encoded").fit(data) for col in categorical_cols]
pipeline = Pipeline(stages=label_encoders)
data = pipeline.fit(data).transform(data)

In [217]:
data.dtypes

[('Age', 'int'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'int'),
 ('Cholesterol', 'int'),
 ('FastingBS', 'int'),
 ('RestingECG', 'string'),
 ('MaxHR', 'int'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'double'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'int'),
 ('Sex_encoded', 'double'),
 ('ChestPainType_encoded', 'double'),
 ('RestingECG_encoded', 'double'),
 ('ExerciseAngina_encoded', 'double'),
 ('ST_Slope_encoded', 'double')]

In [218]:
#drop duplicate columns
new_data = data.drop("Sex","ChestPainType","RestingECG","ExerciseAngina","ST_Slope")
new_data.dtypes

[('Age', 'int'),
 ('RestingBP', 'int'),
 ('Cholesterol', 'int'),
 ('FastingBS', 'int'),
 ('MaxHR', 'int'),
 ('Oldpeak', 'double'),
 ('HeartDisease', 'int'),
 ('Sex_encoded', 'double'),
 ('ChestPainType_encoded', 'double'),
 ('RestingECG_encoded', 'double'),
 ('ExerciseAngina_encoded', 'double'),
 ('ST_Slope_encoded', 'double')]

In [219]:
FeaturesData = new_data.drop("HeartDisease")
print((FeaturesData.count(), len(FeaturesData.columns)))

(918, 11)


In [220]:
Target = new_data.drop("Age", "RestingBP","Cholesterol","FastingBS","MaxHR","Oldpeak","ExerciseAngina_encoded","Sex_encoded","ChestPainType_encoded","RestingECG_encoded","ST_Slope_encoded")
print((Target.count(), len(Target.columns)))
Target.groupBy('HeartDisease').count().orderBy('count').show()

(918, 1)
+------------+-----+
|HeartDisease|count|
+------------+-----+
|           0|  410|
|           1|  508|
+------------+-----+



In [221]:
features_columns = FeaturesData.columns
print(features_columns)
VAssembler = VectorAssembler(inputCols=features_columns, outputCol="VAfeatures")
new_data = VAssembler.transform(new_data)
new_data = new_data.select("VAfeatures", "HeartDisease")


['Age', 'RestingBP', 'Cholesterol', 'FastingBS', 'MaxHR', 'Oldpeak', 'Sex_encoded', 'ChestPainType_encoded', 'RestingECG_encoded', 'ExerciseAngina_encoded', 'ST_Slope_encoded']


In [222]:
new_data.show(5)

+--------------------+------------+
|          VAfeatures|HeartDisease|
+--------------------+------------+
|(11,[0,1,2,4,7,10...|           0|
|[49.0,160.0,180.0...|           1|
|[37.0,130.0,283.0...|           0|
|[48.0,138.0,214.0...|           1|
|(11,[0,1,2,4,7,10...|           0|
+--------------------+------------+
only showing top 5 rows



In [223]:
#standardizing the data
scaled_data = StandardScaler(inputCol="VAfeatures", outputCol="features")
new_data = scaled_data.fit(new_data).transform(new_data)

In [224]:
new_data.show(5)

+--------------------+------------+--------------------+
|          VAfeatures|HeartDisease|            features|
+--------------------+------------+--------------------+
|(11,[0,1,2,4,7,10...|           0|(11,[0,1,2,4,7,10...|
|[49.0,160.0,180.0...|           1|[5.19474103129591...|
|[37.0,130.0,283.0...|           0|[3.92255955424385...|
|[48.0,138.0,214.0...|           1|[5.08872590820824...|
|(11,[0,1,2,4,7,10...|           0|(11,[0,1,2,4,7,10...|
+--------------------+------------+--------------------+
only showing top 5 rows



In [225]:
new_data = new_data.select("features", "HeartDisease")
new_data = new_data.withColumnRenamed("HeartDisease","label")
new_data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(11,[0,1,2,4,7,10...|    0|
|[5.19474103129591...|    1|
|[3.92255955424385...|    0|
|[5.08872590820824...|    1|
|(11,[0,1,2,4,7,10...|    0|
+--------------------+-----+
only showing top 5 rows



In [226]:
#splitting into test and train data
train_data, test_data = new_data.randomSplit([0.8, 0.2], seed=42)

In [228]:
# Logistic Regression Model
LR=LogisticRegression().fit(train_data)

#Get Predictions for Logistic Regression Model
prediction = LR.transform(test_data)
multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label")

#Metrics for evaluation
auc = evaluator.evaluate(prediction)
accuracy = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})
sensitivity = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedRecall"})
f1score = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "f1"})
print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Sensitivity: ", sensitivity)
print("F1-Score: ", f1score)

AUC-ROC:  0.9154929577464785
Accuracy:  0.8523489932885906
Precision:  0.8607363591551966
Sensitivity:  0.8523489932885906
F1-Score:  0.8508075816800649


In [229]:
#Display the Logistic Regression predictions
prediction.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(11,[0,1,2,3,4,5]...|    1|[-1.9765736975550...|[0.12168455786611...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[-2.5642192262587...|[0.07147701365420...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[-2.1431794373647...|[0.10497030110055...|       1.0|
|(11,[0,1,2,4],[4....|    1|[-0.1044147375503...|[0.47392000593331...|       1.0|
|(11,[0,1,2,4],[5....|    1|[-0.4093443818699...|[0.39906933649413...|       1.0|
|(11,[0,1,2,4],[6....|    1|[-1.0969145465464...|[0.25031846173779...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[-1.0665928358751...|[0.25605157238773...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[-1.0982620016845...|[0.25006568456086...|       1.0|
|(11,[0,1,2,4,5,7]...|    1|[-0.6055305346470...|[0.35307941627607...|       1.0|
|(11,[0,1,2,4,5,

In [232]:
# Gradient Boost Model
gradient_boost_class = GBTClassifier(labelCol="label", featuresCol="features")
model = gradient_boost_class.fit(train_data)

#Get predictions for Gradient Boost model
predictionsGBT = model.transform(test_data)

#Metrics for evaluation
auc = evaluator.evaluate(predictionsGBT)
accuracy = multi_evaluator.evaluate(predictionsGBT, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionsGBT, {multi_evaluator.metricName: "weightedPrecision"})
sensitivity = multi_evaluator.evaluate(predictionsGBT, {multi_evaluator.metricName: "weightedRecall"})
f1score = multi_evaluator.evaluate(predictionsGBT, {multi_evaluator.metricName: "f1"})
print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Sensitivity: ", sensitivity)
print("F1-Score: ", f1score)

AUC-ROC:  0.914409534127844
Accuracy:  0.8859060402684564
Precision:  0.888868189893407
Sensitivity:  0.8859060402684564
F1-Score:  0.8853979190208447


In [233]:
#Display the Gradient Boost predictions
predictionsGBT.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(11,[0,1,2,3,4,5]...|    1|[-0.1975587198008...|[0.40248599068022...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[-1.4665496852419...|[0.05054138821990...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[-1.5212945755071...|[0.04553850148072...|       1.0|
|(11,[0,1,2,4],[4....|    1|[-1.4399676927821...|[0.05315438828405...|       1.0|
|(11,[0,1,2,4],[5....|    1|[-0.9464326388474...|[0.13091811735594...|       1.0|
|(11,[0,1,2,4],[6....|    1|[-1.1492157641999...|[0.09125294448692...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[-1.2485247306508...|[0.07606528277765...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[-0.7350970369494...|[0.18691311748380...|       1.0|
|(11,[0,1,2,4,5,7]...|    1|[-0.2311986871358...|[0.38641725436498...|       1.0|
|(11,[0,1,2,4,5,

In [234]:
#Linear Support Vector Classifier Model
lsvc = LinearSVC(labelCol="label", maxIter=50)
model = lsvc.fit(train_data)

#Get predictions for Support Vector Machine Classifier
predictionsLSVC = model.transform(test_data)

#Metrics for evaluation
auc = evaluator.evaluate(predictionsLSVC)
accuracy = multi_evaluator.evaluate(predictionsLSVC, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionsLSVC, {multi_evaluator.metricName: "weightedPrecision"})
sensitivity = multi_evaluator.evaluate(predictionsLSVC, {multi_evaluator.metricName: "weightedRecall"})
f1score = multi_evaluator.evaluate(predictionsLSVC, {multi_evaluator.metricName: "f1"})
print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Sensitivity: ", sensitivity)
print("F1-Score: ", f1score)

AUC-ROC:  0.9142289635247379
Accuracy:  0.8590604026845637
Precision:  0.863629237557017
Sensitivity:  0.8590604026845637
F1-Score:  0.8581355768921999


In [235]:
#Display the Linear Support Vector Classifier predictions
predictionsLSVC.show()

+--------------------+-----+--------------------+----------+
|            features|label|       rawPrediction|prediction|
+--------------------+-----+--------------------+----------+
|(11,[0,1,2,3,4,5]...|    1|[-1.1797294937039...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[-1.5779064502089...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[-1.2373276731106...|       1.0|
|(11,[0,1,2,4],[4....|    1|[0.02798379541442...|       0.0|
|(11,[0,1,2,4],[5....|    1|[-0.1982245787043...|       1.0|
|(11,[0,1,2,4],[6....|    1|[-0.6840884421823...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[-0.6271911434717...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[-0.6493538274875...|       1.0|
|(11,[0,1,2,4,5,7]...|    1|[-0.3886628594374...|       1.0|
|(11,[0,1,2,4,5,7]...|    0|[0.16449453891078...|       0.0|
|(11,[0,1,2,4,5,7]...|    1|[0.77735077284634...|       0.0|
|(11,[0,1,2,4,5,7]...|    1|[-0.9344799232149...|       1.0|
|(11,[0,1,2,4,5,7]...|    1|[-0.9711073955980...|       1.0|
|(11,[0,1,2,4,5,8]...|  

In [236]:
#Random Forest Model
random_forest = RandomForestClassifier(labelCol="label", featuresCol="features")
model = random_forest.fit(train_data)

#Get predictions for Random Forest model
predictionsRDF = model.transform(test_data)

#Metrics for evaluation
auc = evaluator.evaluate(predictionsRDF)
accuracy = multi_evaluator.evaluate(predictionsRDF, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionsRDF, {multi_evaluator.metricName: "weightedPrecision"})
sensitivity = multi_evaluator.evaluate(predictionsRDF, {multi_evaluator.metricName: "weightedRecall"})
f1score = multi_evaluator.evaluate(predictionsRDF, {multi_evaluator.metricName: "f1"})
print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Sensitivity: ", sensitivity)
print("F1-Score: ", f1score)

AUC-ROC:  0.9232574936800284
Accuracy:  0.8590604026845637
Precision:  0.8602488048594623
Sensitivity:  0.8590604026845637
F1-Score:  0.8586770152865075


In [237]:
#Display the Random Forest predictions
predictionsRDF.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(11,[0,1,2,3,4,5]...|    1|[6.40696759591695...|[0.32034837979584...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[2.35887518843032...|[0.11794375942151...|       1.0|
|(11,[0,1,2,3,4,8]...|    1|[2.98116607583748...|[0.14905830379187...|       1.0|
|(11,[0,1,2,4],[4....|    1|[3.23511970718328...|[0.16175598535916...|       1.0|
|(11,[0,1,2,4],[5....|    1|[5.17610857800937...|[0.25880542890046...|       1.0|
|(11,[0,1,2,4],[6....|    1|[2.00173233128746...|[0.10008661656437...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[2.35079409291290...|[0.11753970464564...|       1.0|
|(11,[0,1,2,4,5],[...|    0|[4.65547503527542...|[0.23277375176377...|       1.0|
|(11,[0,1,2,4,5,7]...|    1|[11.3093241329396...|[0.56546620664698...|       0.0|
|(11,[0,1,2,4,5,

In [238]:
#Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth = 7)
model = dt.fit(train_data)

#Get predictions for Decision Tree model
predictionsDT = model.transform(test_data)

#Metrics for evaluation
auc = evaluator.evaluate(predictionsDT)
accuracy = multi_evaluator.evaluate(predictionsDT, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictionsDT, {multi_evaluator.metricName: "weightedPrecision"})
sensitivity = multi_evaluator.evaluate(predictionsDT, {multi_evaluator.metricName: "weightedRecall"})
f1score = multi_evaluator.evaluate(predictionsDT, {multi_evaluator.metricName: "f1"})
print("AUC-ROC: ", auc)
print("Accuracy: ", accuracy)
print("Precision: ", precision)
print("Sensitivity: ", sensitivity)
print("F1-Score: ", f1score)

AUC-ROC:  0.8768508486818345
Accuracy:  0.87248322147651
Precision:  0.8737757619863225
Sensitivity:  0.87248322147651
F1-Score:  0.8721363471639829


In [239]:
#Display the Decision Tree predictions
predictionsDT.show()

+--------------------+-----+-------------+--------------------+----------+
|            features|label|rawPrediction|         probability|prediction|
+--------------------+-----+-------------+--------------------+----------+
|(11,[0,1,2,3,4,5]...|    1|    [5.0,4.0]|[0.55555555555555...|       0.0|
|(11,[0,1,2,3,4,8]...|    1|   [0.0,24.0]|           [0.0,1.0]|       1.0|
|(11,[0,1,2,3,4,8]...|    1|   [0.0,24.0]|           [0.0,1.0]|       1.0|
|(11,[0,1,2,4],[4....|    1|   [0.0,24.0]|           [0.0,1.0]|       1.0|
|(11,[0,1,2,4],[5....|    1|   [0.0,24.0]|           [0.0,1.0]|       1.0|
|(11,[0,1,2,4],[6....|    1|   [0.0,24.0]|           [0.0,1.0]|       1.0|
|(11,[0,1,2,4,5],[...|    0|  [4.0,117.0]|[0.03305785123966...|       1.0|
|(11,[0,1,2,4,5],[...|    0|    [5.0,4.0]|[0.55555555555555...|       0.0|
|(11,[0,1,2,4,5,7]...|    1|   [1.0,15.0]|     [0.0625,0.9375]|       1.0|
|(11,[0,1,2,4,5,7]...|    0|  [12.0,17.0]|[0.41379310344827...|       1.0|
|(11,[0,1,2,4,5,7]...|   