importing SparkContext

In [1]:
!pip install pyspark
from pyspark import SparkContext



In [2]:
spark = SparkContext(master='local[2]')

In [3]:
spark

importing sparkSesssion to perform operations

In [4]:
from pyspark.sql import SparkSession

In [5]:
sc = SparkSession.builder.appName("cardiovascular diseases prediction").getOrCreate()

we'll read it as a dataframe not an rdd since the dataset is not that huge

In [6]:
df = sc.read.csv("HeartData.csv", inferSchema=True, header=True)

Data Profiling

In [7]:
df.show(5)

+--------------+--------------------+--------+-------------+-----------+------------+----------+--------+---------+------+------------+-----------+-----------+-----+---------------+-------------------+-----------------+----------------------------+-----------------------+
|General_Health|             Checkup|Exercise|Heart_Disease|Skin_Cancer|Other_Cancer|Depression|Diabetes|Arthritis|   Sex|Age_Category|Height_(cm)|Weight_(kg)|  BMI|Smoking_History|Alcohol_Consumption|Fruit_Consumption|Green_Vegetables_Consumption|FriedPotato_Consumption|
+--------------+--------------------+--------+-------------+-----------+------------+----------+--------+---------+------+------------+-----------+-----------+-----+---------------+-------------------+-----------------+----------------------------+-----------------------+
|          Poor|Within the past 2...|      No|           No|         No|          No|        No|      No|      Yes|Female|       70-74|      150.0|      32.66|14.54|            Yes|

In [8]:
df.printSchema()

root
 |-- General_Health: string (nullable = true)
 |-- Checkup: string (nullable = true)
 |-- Exercise: string (nullable = true)
 |-- Heart_Disease: string (nullable = true)
 |-- Skin_Cancer: string (nullable = true)
 |-- Other_Cancer: string (nullable = true)
 |-- Depression: string (nullable = true)
 |-- Diabetes: string (nullable = true)
 |-- Arthritis: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age_Category: string (nullable = true)
 |-- Height_(cm): double (nullable = true)
 |-- Weight_(kg): double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoking_History: string (nullable = true)
 |-- Alcohol_Consumption: double (nullable = true)
 |-- Fruit_Consumption: double (nullable = true)
 |-- Green_Vegetables_Consumption: double (nullable = true)
 |-- FriedPotato_Consumption: double (nullable = true)



In [9]:
df.count()

308854

In [10]:
df.groupBy("Heart_Disease").count().show()

+-------------+------+
|Heart_Disease| count|
+-------------+------+
|           No|283883|
|          Yes| 24971|
+-------------+------+



In [11]:
df.dtypes

[('General_Health', 'string'),
 ('Checkup', 'string'),
 ('Exercise', 'string'),
 ('Heart_Disease', 'string'),
 ('Skin_Cancer', 'string'),
 ('Other_Cancer', 'string'),
 ('Depression', 'string'),
 ('Diabetes', 'string'),
 ('Arthritis', 'string'),
 ('Sex', 'string'),
 ('Age_Category', 'string'),
 ('Height_(cm)', 'double'),
 ('Weight_(kg)', 'double'),
 ('BMI', 'double'),
 ('Smoking_History', 'string'),
 ('Alcohol_Consumption', 'double'),
 ('Fruit_Consumption', 'double'),
 ('Green_Vegetables_Consumption', 'double'),
 ('FriedPotato_Consumption', 'double')]

Converting strings to numbers

In [12]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [13]:
dise = StringIndexer(inputCol='Heart_Disease', outputCol='diseases').fit(df)
df = dise.transform(df)

In [14]:
dise.labels

['No', 'Yes']

In [15]:
df.select(['diseases']).distinct().show()

+--------+
|diseases|
+--------+
|     0.0|
|     1.0|
+--------+



In [16]:
df.dtypes

[('General_Health', 'string'),
 ('Checkup', 'string'),
 ('Exercise', 'string'),
 ('Heart_Disease', 'string'),
 ('Skin_Cancer', 'string'),
 ('Other_Cancer', 'string'),
 ('Depression', 'string'),
 ('Diabetes', 'string'),
 ('Arthritis', 'string'),
 ('Sex', 'string'),
 ('Age_Category', 'string'),
 ('Height_(cm)', 'double'),
 ('Weight_(kg)', 'double'),
 ('BMI', 'double'),
 ('Smoking_History', 'string'),
 ('Alcohol_Consumption', 'double'),
 ('Fruit_Consumption', 'double'),
 ('Green_Vegetables_Consumption', 'double'),
 ('FriedPotato_Consumption', 'double'),
 ('diseases', 'double')]

Converting sex to double

In [17]:
gender = StringIndexer(inputCol='Sex', outputCol="gender").fit(df)
df= gender.transform(df)

In [18]:
df.show(5)

+--------------+--------------------+--------+-------------+-----------+------------+----------+--------+---------+------+------------+-----------+-----------+-----+---------------+-------------------+-----------------+----------------------------+-----------------------+--------+------+
|General_Health|             Checkup|Exercise|Heart_Disease|Skin_Cancer|Other_Cancer|Depression|Diabetes|Arthritis|   Sex|Age_Category|Height_(cm)|Weight_(kg)|  BMI|Smoking_History|Alcohol_Consumption|Fruit_Consumption|Green_Vegetables_Consumption|FriedPotato_Consumption|diseases|gender|
+--------------+--------------------+--------+-------------+-----------+------------+----------+--------+---------+------+------------+-----------+-----------+-----+---------------+-------------------+-----------------+----------------------------+-----------------------+--------+------+
|          Poor|Within the past 2...|      No|           No|         No|          No|        No|      No|      Yes|Female|       70-7

In [19]:
from pyspark.ml.feature import IndexToString

In [20]:
its=IndexToString(inputCol='diseases' , outputCol='target')
itsdf=its.transform(df)

In [21]:
itsdf.select('target').distinct().show()

+------+
|target|
+------+
|    No|
|   Yes|
+------+



In [22]:
itsdf.dtypes

[('General_Health', 'string'),
 ('Checkup', 'string'),
 ('Exercise', 'string'),
 ('Heart_Disease', 'string'),
 ('Skin_Cancer', 'string'),
 ('Other_Cancer', 'string'),
 ('Depression', 'string'),
 ('Diabetes', 'string'),
 ('Arthritis', 'string'),
 ('Sex', 'string'),
 ('Age_Category', 'string'),
 ('Height_(cm)', 'double'),
 ('Weight_(kg)', 'double'),
 ('BMI', 'double'),
 ('Smoking_History', 'string'),
 ('Alcohol_Consumption', 'double'),
 ('Fruit_Consumption', 'double'),
 ('Green_Vegetables_Consumption', 'double'),
 ('FriedPotato_Consumption', 'double'),
 ('diseases', 'double'),
 ('gender', 'double'),
 ('target', 'string')]

In [23]:
coll=itsdf.columns[0:11]
coll.append('Alcohol_Consumption')

we need to convert all the string datatype columns to integer and we'll use a pipeline to do it all at once

In [24]:
from pyspark.ml import Pipeline

In [25]:
indexers = []

for column in coll:
    indexer = StringIndexer(inputCol=column, outputCol=f"{column}_new").fit(df)
    indexers.append(indexer)

pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

In [26]:
df.show(5)

+--------------+--------------------+--------+-------------+-----------+------------+----------+--------+---------+------+------------+-----------+-----------+-----+---------------+-------------------+-----------------+----------------------------+-----------------------+--------+------+------------------+-----------+------------+-----------------+---------------+----------------+--------------+------------+-------------+-------+----------------+-----------------------+
|General_Health|             Checkup|Exercise|Heart_Disease|Skin_Cancer|Other_Cancer|Depression|Diabetes|Arthritis|   Sex|Age_Category|Height_(cm)|Weight_(kg)|  BMI|Smoking_History|Alcohol_Consumption|Fruit_Consumption|Green_Vegetables_Consumption|FriedPotato_Consumption|diseases|gender|General_Health_new|Checkup_new|Exercise_new|Heart_Disease_new|Skin_Cancer_new|Other_Cancer_new|Depression_new|Diabetes_new|Arthritis_new|Sex_new|Age_Category_new|Alcohol_Consumption_new|
+--------------+--------------------+--------+----

In [27]:
df.columns

['General_Health',
 'Checkup',
 'Exercise',
 'Heart_Disease',
 'Skin_Cancer',
 'Other_Cancer',
 'Depression',
 'Diabetes',
 'Arthritis',
 'Sex',
 'Age_Category',
 'Height_(cm)',
 'Weight_(kg)',
 'BMI',
 'Smoking_History',
 'Alcohol_Consumption',
 'Fruit_Consumption',
 'Green_Vegetables_Consumption',
 'FriedPotato_Consumption',
 'diseases',
 'gender',
 'General_Health_new',
 'Checkup_new',
 'Exercise_new',
 'Heart_Disease_new',
 'Skin_Cancer_new',
 'Other_Cancer_new',
 'Depression_new',
 'Diabetes_new',
 'Arthritis_new',
 'Sex_new',
 'Age_Category_new',
 'Alcohol_Consumption_new']

In [28]:
coll1=['Height_(cm)',
 'Weight_(kg)',
 'BMI',
 'Alcohol_Consumption',
 'Fruit_Consumption',
 'Green_Vegetables_Consumption',
 'FriedPotato_Consumption',
 'gender',
 'General_Health_new',
 'Checkup_new',
 'Exercise_new',
 'Skin_Cancer_new',
 'Other_Cancer_new',
 'Depression_new',
 'Diabetes_new',
 'Arthritis_new',
 'Sex_new',
 'Age_Category_new',
 'Alcohol_Consumption_new']

In [29]:
coll2=coll1.copy()
coll2.append('diseases')

In [30]:
df1 = df.select(coll2)

In [31]:
df1.show(5)

+-----------+-----------+-----+-------------------+-----------------+----------------------------+-----------------------+------+------------------+-----------+------------+---------------+----------------+--------------+------------+-------------+-------+----------------+-----------------------+--------+
|Height_(cm)|Weight_(kg)|  BMI|Alcohol_Consumption|Fruit_Consumption|Green_Vegetables_Consumption|FriedPotato_Consumption|gender|General_Health_new|Checkup_new|Exercise_new|Skin_Cancer_new|Other_Cancer_new|Depression_new|Diabetes_new|Arthritis_new|Sex_new|Age_Category_new|Alcohol_Consumption_new|diseases|
+-----------+-----------+-----+-------------------+-----------------+----------------------------+-----------------------+------+------------------+-----------+------------+---------------+----------------+--------------+------------+-------------+-------+----------------+-----------------------+--------+
|      150.0|      32.66|14.54|                0.0|             30.0|          

In [32]:
df1.dtypes

[('Height_(cm)', 'double'),
 ('Weight_(kg)', 'double'),
 ('BMI', 'double'),
 ('Alcohol_Consumption', 'double'),
 ('Fruit_Consumption', 'double'),
 ('Green_Vegetables_Consumption', 'double'),
 ('FriedPotato_Consumption', 'double'),
 ('gender', 'double'),
 ('General_Health_new', 'double'),
 ('Checkup_new', 'double'),
 ('Exercise_new', 'double'),
 ('Skin_Cancer_new', 'double'),
 ('Other_Cancer_new', 'double'),
 ('Depression_new', 'double'),
 ('Diabetes_new', 'double'),
 ('Arthritis_new', 'double'),
 ('Sex_new', 'double'),
 ('Age_Category_new', 'double'),
 ('Alcohol_Consumption_new', 'double'),
 ('diseases', 'double')]

In [33]:
df1=df1.toPandas().replace('NA',0).astype(float)

In [34]:
df1.dtypes

Height_(cm)                     float64
Weight_(kg)                     float64
BMI                             float64
Alcohol_Consumption             float64
Fruit_Consumption               float64
Green_Vegetables_Consumption    float64
FriedPotato_Consumption         float64
gender                          float64
General_Health_new              float64
Checkup_new                     float64
Exercise_new                    float64
Skin_Cancer_new                 float64
Other_Cancer_new                float64
Depression_new                  float64
Diabetes_new                    float64
Arthritis_new                   float64
Sex_new                         float64
Age_Category_new                float64
Alcohol_Consumption_new         float64
diseases                        float64
dtype: object

Visualization: it has a lot of data and we dont have enough computing power or time to build a pairplot to identify patterns and predict which model would be the most suitable based on the graphs

In [35]:
# import seaborn as sns
# import pandas as pd
# import matplotlib.pyplot as plt
# import numpy as np
# %matplotlib inline

In [36]:
# sns.pairplot(df1, hue='diseases')
# plt.show()

Converting features into feature vectors

In [37]:
from pyspark.ml.feature import VectorAssembler

In [38]:
print(df1.columns)

Index(['Height_(cm)', 'Weight_(kg)', 'BMI', 'Alcohol_Consumption',
       'Fruit_Consumption', 'Green_Vegetables_Consumption',
       'FriedPotato_Consumption', 'gender', 'General_Health_new',
       'Checkup_new', 'Exercise_new', 'Skin_Cancer_new', 'Other_Cancer_new',
       'Depression_new', 'Diabetes_new', 'Arthritis_new', 'Sex_new',
       'Age_Category_new', 'Alcohol_Consumption_new', 'diseases'],
      dtype='object')


Converting pandas Dataframe back to pyspark dataframe

In [39]:
df1= sc.createDataFrame(df1)

In [40]:
fea= VectorAssembler(inputCols=coll1, outputCol='features')

In [41]:
df2=fea.transform(df1)

In [42]:
df2.show(5)

+-----------+-----------+-----+-------------------+-----------------+----------------------------+-----------------------+------+------------------+-----------+------------+---------------+----------------+--------------+------------+-------------+-------+----------------+-----------------------+--------+--------------------+
|Height_(cm)|Weight_(kg)|  BMI|Alcohol_Consumption|Fruit_Consumption|Green_Vegetables_Consumption|FriedPotato_Consumption|gender|General_Health_new|Checkup_new|Exercise_new|Skin_Cancer_new|Other_Cancer_new|Depression_new|Diabetes_new|Arthritis_new|Sex_new|Age_Category_new|Alcohol_Consumption_new|diseases|            features|
+-----------+-----------+-----+-------------------+-----------------+----------------------------+-----------------------+------+------------------+-----------+------------+---------------+----------------+--------------+------------+-------------+-------+----------------+-----------------------+--------+--------------------+
|      150.0|   

In [43]:
traind,testd= df2.randomSplit([0.8,0.2])

Using RandomForestClassifier

In [44]:
from pyspark.ml.classification import RandomForestClassifier

In [45]:
lr= RandomForestClassifier(featuresCol='features', labelCol='diseases')

In [46]:
lrm=lr.fit(traind)

In [47]:
y=lrm.transform(testd)

In [48]:
print(y.columns)

['Height_(cm)', 'Weight_(kg)', 'BMI', 'Alcohol_Consumption', 'Fruit_Consumption', 'Green_Vegetables_Consumption', 'FriedPotato_Consumption', 'gender', 'General_Health_new', 'Checkup_new', 'Exercise_new', 'Skin_Cancer_new', 'Other_Cancer_new', 'Depression_new', 'Diabetes_new', 'Arthritis_new', 'Sex_new', 'Age_Category_new', 'Alcohol_Consumption_new', 'diseases', 'features', 'rawPrediction', 'probability', 'prediction']


In [49]:
y.select(['diseases', 'rawPrediction', 'probability', 'prediction']).show()

+--------+--------------------+--------------------+----------+
|diseases|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+----------+
|     0.0|[18.5571497166680...|[0.92785748583340...|       0.0|
|     0.0|[18.5716676781295...|[0.92858338390647...|       0.0|
|     0.0|[18.5716676781295...|[0.92858338390647...|       0.0|
|     0.0|[18.5716676781295...|[0.92858338390647...|       0.0|
|     0.0|[17.4873012303677...|[0.87436506151838...|       0.0|
|     0.0|[18.5571497166680...|[0.92785748583340...|       0.0|
|     0.0|[18.5716676781295...|[0.92858338390647...|       0.0|
|     0.0|[17.4873012303677...|[0.87436506151838...|       0.0|
|     0.0|[18.1376762029048...|[0.90688381014524...|       0.0|
|     0.0|[18.5571497166680...|[0.92785748583340...|       0.0|
|     0.0|[17.4873012303677...|[0.87436506151838...|       0.0|
|     0.0|[18.5716676781295...|[0.92858338390647...|       0.0|
|     0.0|[18.1435534973552...|[0.907177

Model Evaluation

In [50]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [51]:
eval= MulticlassClassificationEvaluator(labelCol='diseases', metricName="accuracy")

In [52]:
eval.evaluate(y)

0.9186998433740776

In [53]:
from pyspark.mllib.evaluation import MulticlassMetrics

Creating a RDD consisting of dis(disease) and prediction

In [54]:
lrmetrics =MulticlassMetrics(y['diseases', 'prediction'].rdd)



In [55]:
print(f"the accuracy of this model is {round(lrmetrics.accuracy*100,2)}% ")

the accuracy of this model is 91.87% 
