In [18]:
%env SPARK_LOCAL_HOSTNAME=localhost
import findspark
findspark.init(r'C:\Users\Downloads\spark-3.1.2-bin-hadoop3.2')
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.context import SQLContext
# Starting the spark session
spark = SparkSession.builder.appName("HeartPrediction").config("spark.driver.host", "localhost").getOrCreate()

# load a csv file and converting it to a dataframe
data = spark.read.format('com.databricks.spark.csv').option("delimiter", ";").option("header", True).load(r'C:\Users\Desktop\heartdata.csv')

env: SPARK_LOCAL_HOSTNAME=localhost


In [19]:
# reading the values of the first 5 columns
import pandas as pd
pd.DataFrame(data.take(5), columns=data.columns).transpose()

Unnamed: 0,0,1,2,3,4
Age,40,49,37,48,54
Sex,M,F,M,F,M
ChestPainType,ATA,NAP,ATA,ASY,NAP
RestingBP,140,160,130,138,150
Cholesterol,289,180,283,214,195
FastingBS,0,0,0,0,0
RestingECG,Normal,Normal,ST,Normal,Normal
MaxHR,172,156,98,108,122
ExerciseAngina,N,N,N,Y,N
Oldpeak,0,1,0,1.5,0


In [20]:
print(data.printSchema()) 

root
 |-- Age: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: string (nullable = true)
 |-- Cholesterol: string (nullable = true)
 |-- FastingBS: string (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: string (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: string (nullable = true)
 |-- ST_Slope: string (nullable = true)
 |-- HeartDisease: string (nullable = true)

None


In [21]:
# creating the label and features columns that will be used by the decision trees model
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import VectorAssembler
labelIndexer = StringIndexer(inputCol='HeartDisease', outputCol="label").fit(data)
features = data.columns[1:11]

In [22]:
# Number of the dataset's instances
data.count()

918

In [23]:
data.dtypes

[('Age', 'string'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'string'),
 ('Cholesterol', 'string'),
 ('FastingBS', 'string'),
 ('RestingECG', 'string'),
 ('MaxHR', 'string'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'string'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'string')]

In [8]:
# dataset summary
data.describe().toPandas()

Unnamed: 0,summary,Age,Sex,ChestPainType,RestingBP,Cholesterol,FastingBS,RestingECG,MaxHR,ExerciseAngina,Oldpeak,ST_Slope,HeartDisease
0,count,918.0,918,918,918.0,918.0,918.0,918,918.0,918,918.0,918,918.0
1,mean,53.510893246187365,,,132.39651416122004,198.7995642701525,0.233115468409586,,136.80936819172112,,0.8873638344226581,,0.5533769063180828
2,stddev,9.43261650673202,,,18.514154119907808,109.38414455220344,0.4230456247393029,,25.46033413825029,,1.0665701510493264,,0.497413738284597
3,min,28.0,F,ASY,0.0,0.0,0.0,LVH,100.0,N,-0.1,Down,0.0
4,max,77.0,M,TA,98.0,85.0,1.0,ST,99.0,Y,6.2,Up,1.0


In [25]:
# Data pre-processing phase
# casting the strings to numerical values
from pyspark.sql.functions import col
dataset = data.select(col('Age').cast('float'),col('Sex'),col('ChestPainType'),
                    col('RestingBP').cast('float'),col('Cholesterol').cast('float'),
                    col('FastingBS').cast('float'),col('RestingECG'),
                    col('MaxHR').cast('float'),col('ExerciseAngina'),
                    col('Oldpeak').cast('float'),col('ST_Slope')
                    ,col('HeartDisease')
                        )
dataset.show()

+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
| Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+----+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|40.0|  M|          ATA|    140.0|      289.0|      0.0|    Normal|172.0|             N|    0.0|      Up|           0|
|49.0|  F|          NAP|    160.0|      180.0|      0.0|    Normal|156.0|             N|    1.0|    Flat|           1|
|37.0|  M|          ATA|    130.0|      283.0|      0.0|        ST| 98.0|             N|    0.0|      Up|           0|
|48.0|  F|          ASY|    138.0|      214.0|      0.0|    Normal|108.0|             Y|    1.5|    Flat|           1|
|54.0|  M|          NAP|    150.0|      195.0|      0.0|    Normal|122.0|             N|    0.0|      Up|           0|
|39.0|  M|          NAP|    120.0|      339.0|  

In [26]:
# checking whether the dataframe has null values
from pyspark.sql.functions import isnull, when, count, col
dataset.select([count(when(isnull(c), c)).alias(c) for c in dataset.columns]).show()

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|Oldpeak|ST_Slope|HeartDisease|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+
|  0|  0|            0|        0|          0|        0|         0|    0|             0|      0|       0|           0|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------+--------+------------+



In [27]:
dataset.dtypes

[('Age', 'float'),
 ('Sex', 'string'),
 ('ChestPainType', 'string'),
 ('RestingBP', 'float'),
 ('Cholesterol', 'float'),
 ('FastingBS', 'float'),
 ('RestingECG', 'string'),
 ('MaxHR', 'float'),
 ('ExerciseAngina', 'string'),
 ('Oldpeak', 'float'),
 ('ST_Slope', 'string'),
 ('HeartDisease', 'string')]

In [35]:
# mapping our string columns to label indices columns
from pyspark.ml.feature import StringIndexer
dataset = StringIndexer(inputCol='HeartDisease',outputCol='label',handleInvalid='keep').fit(dataset).transform(dataset)
dataset = StringIndexer(inputCol='Sex',outputCol='sex',handleInvalid='keep').fit(dataset).transform(dataset)
dataset = StringIndexer(inputCol='ChestPainType',outputCol='Chest_Pain',handleInvalid='keep').fit(dataset).transform(dataset)
dataset = StringIndexer(inputCol='RestingECG',outputCol='Resting_ECG',handleInvalid='keep').fit(dataset).transform(dataset)
dataset = StringIndexer(inputCol='ExerciseAngina',outputCol='Ex_Angina',handleInvalid='keep').fit(dataset).transform(dataset)
dataset = StringIndexer(inputCol='ST_Slope',outputCol='Slope_St',handleInvalid='keep').fit(dataset).transform(dataset)
dataset.show()

+----+---+---------+-----------+---------+-----+-------+-----+----------+-----------+---------+--------+
| Age|sex|RestingBP|Cholesterol|FastingBS|MaxHR|Oldpeak|label|Chest_Pain|Resting_ECG|Ex_Angina|Slope_St|
+----+---+---------+-----------+---------+-----+-------+-----+----------+-----------+---------+--------+
|40.0|0.0|    140.0|      289.0|      0.0|172.0|    0.0|  1.0|       2.0|        0.0|      0.0|     1.0|
|49.0|1.0|    160.0|      180.0|      0.0|156.0|    1.0|  0.0|       1.0|        0.0|      0.0|     0.0|
|37.0|0.0|    130.0|      283.0|      0.0| 98.0|    0.0|  1.0|       2.0|        2.0|      0.0|     1.0|
|48.0|1.0|    138.0|      214.0|      0.0|108.0|    1.5|  0.0|       0.0|        0.0|      1.0|     0.0|
|54.0|0.0|    150.0|      195.0|      0.0|122.0|    0.0|  1.0|       1.0|        0.0|      0.0|     1.0|
|39.0|0.0|    120.0|      339.0|      0.0|170.0|    0.0|  1.0|       1.0|        0.0|      0.0|     1.0|
|45.0|1.0|    130.0|      237.0|      0.0|170.0|    0.0

In [33]:
# removing the initial string columns
dataset = dataset.drop('HeartDisease')
dataset = dataset.drop('ChestPainType')
dataset = dataset.drop('RestingECG')
dataset = dataset.drop('ExerciseAngina')
dataset = dataset.drop('ST_Slope')
dataset.show()

+----+---+---------+-----------+---------+-----+-------+-----+----------+-----------+---------+--------+
| Age|Sex|RestingBP|Cholesterol|FastingBS|MaxHR|Oldpeak|label|Chest_Pain|Resting_ECG|Ex_Angina|Slope_St|
+----+---+---------+-----------+---------+-----+-------+-----+----------+-----------+---------+--------+
|40.0|  M|    140.0|      289.0|      0.0|172.0|    0.0|  1.0|       2.0|        0.0|      0.0|     1.0|
|49.0|  F|    160.0|      180.0|      0.0|156.0|    1.0|  0.0|       1.0|        0.0|      0.0|     0.0|
|37.0|  M|    130.0|      283.0|      0.0| 98.0|    0.0|  1.0|       2.0|        2.0|      0.0|     1.0|
|48.0|  F|    138.0|      214.0|      0.0|108.0|    1.5|  0.0|       0.0|        0.0|      1.0|     0.0|
|54.0|  M|    150.0|      195.0|      0.0|122.0|    0.0|  1.0|       1.0|        0.0|      0.0|     1.0|
|39.0|  M|    120.0|      339.0|      0.0|170.0|    0.0|  1.0|       1.0|        0.0|      0.0|     1.0|
|45.0|  F|    130.0|      237.0|      0.0|170.0|    0.0

In [36]:
dataset.dtypes

[('Age', 'float'),
 ('sex', 'double'),
 ('RestingBP', 'float'),
 ('Cholesterol', 'float'),
 ('FastingBS', 'float'),
 ('MaxHR', 'float'),
 ('Oldpeak', 'float'),
 ('label', 'double'),
 ('Chest_Pain', 'double'),
 ('Resting_ECG', 'double'),
 ('Ex_Angina', 'double'),
 ('Slope_St', 'double')]

In [37]:
# Assembling all the dataframe's attributes into a one dimentional vector 'features' to facilitate the model's work 
required_features = ['Age','sex','RestingBP','Cholesterol','FastingBS','MaxHR','Oldpeak','Chest_Pain','Resting_ECG'
                    ,'Ex_Angina','Slope_St']
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=required_features, outputCol='features')
transformed_data = assembler.transform(dataset)

In [38]:
# random data splitting into two subsets, testing and training groups 
(training_data, test_data) = transformed_data.randomSplit([0.7,0.3])

In [40]:
# Initializing the decision tree model based on these two subsets
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
decisionTree = DecisionTreeClassifier(labelCol='label',featuresCol='features')

In [41]:
# fitting/training the build model on the training subset
model=decisionTree.fit(training_data)

In [42]:
# testing the model on the testing subset and storing the result (values of the predicted class 'HeartDisease') into a new variable
predictions=model.transform(test_data)

In [43]:
# Evaluation phase by computing the model's accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')

In [44]:
accuracy = evaluator.evaluate(predictions)
print('Test Accuracy: ', accuracy)

Test Accuracy:  0.8424657534246576
