# Prediction of Stroke Probabibility

# Install Spark

In [136]:
!apt-get update > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [137]:
!wget -q http://apache.osuosl.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

In [138]:
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [139]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [140]:
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

In [141]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [142]:
sc = spark.sparkContext
sc

# Load Data

Data is available at : https://drive.google.com/drive/folders/1AbFCiys28K_xe4UjK9CzIVRqf2ynth9B?usp=sharing

Steps to Read Data:-

1. Download Data (csv file) from the above link
2. Upload the csv file in your google drive
3. Mount your drive in colab and get the path of the csv file from the drive
4. Execute below code, after pasting the path inside quotes, to read the file in colab


In [143]:
train = spark.read.csv('/content/drive/MyDrive/Praxis/Course-Foundations of Data Science/Exams/Healthcare_Dataset.csv', inferSchema=True,header=True)

In [144]:
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- _c12: string (nullable = true)



# Exploratory Data Analysis

In [145]:
# Converting data-type of feature "bmi" from string to double

train=train.withColumn("bmi",train["bmi"].cast("double"))

In [146]:
train.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  249|
|     0| 4861|
+------+-----+



OBSERVATION :
As can be seen from this observation. This is an Imbalanced dataset, where the number of observations belonging to one class is significantly lower than those belonging to the other classes.

In [147]:
# create DataFrame as a temporary view

train.createOrReplaceTempView('table')

In [148]:
spark.sql("SELECT work_type, count(work_type) as work_type_count FROM table WHERE stroke == 1 GROUP BY work_type \
ORDER BY work_type_count DESC").show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            149|
|Self-employed|             65|
|     Govt_job|             33|
|     children|              2|
+-------------+---------------+



OBSERVATION :
Private occupation is the most dangerous work type in this dataset.

In [149]:
spark.sql("SELECT gender, count(gender) as count_gender, count(gender)*100/sum(count(gender)) over() \
 as percent  FROM table GROUP BY gender").show()


+------+------------+--------------------+
|gender|count_gender|             percent|
+------+------------+--------------------+
|Female|        2994|  58.590998043052835|
| Other|           1|0.019569471624266144|
|  Male|        2115|    41.3894324853229|
+------+------------+--------------------+



OBSERVATION :
59% of all people are Female and only 41% are Male that participated in stroke research.

In [150]:
spark.sql("SELECT gender, count(gender), (COUNT(gender) * 100.0) /(SELECT count(gender) FROM table WHERE gender == 'Male') as \
percentage FROM table WHERE stroke = '1' and gender = 'Male' GROUP BY gender").show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|  Male|          108|5.10638297872340|
+------+-------------+----------------+



 OBSERVATION :
 5% Male have had a stroke.

In [151]:
spark.sql("SELECT gender, count(gender), (COUNT(gender) * 100.0) /(SELECT count(gender) FROM table WHERE gender == 'Female') as \
percentage FROM table WHERE stroke = '1' and gender = 'Female' GROUP BY gender").show()

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|Female|          141|4.70941883767535|
+------+-------------+----------------+



OBSERVATION : 4.7% Female have had a stroke.

In [152]:
# RISK BY AGE

spark.sql("SELECT age, count(age) as age_count FROM table WHERE stroke == 1 GROUP BY age ORDER BY age_count DESC").show()

+----+---------+
| age|age_count|
+----+---------+
|78.0|       21|
|79.0|       17|
|80.0|       17|
|81.0|       14|
|57.0|       11|
|76.0|       10|
|68.0|        9|
|74.0|        9|
|63.0|        9|
|82.0|        9|
|59.0|        8|
|77.0|        8|
|71.0|        7|
|58.0|        7|
|69.0|        6|
|70.0|        6|
|72.0|        6|
|61.0|        6|
|54.0|        6|
|75.0|        6|
+----+---------+
only showing top 20 rows



In [153]:
train.filter((train['stroke'] == 1) & (train['age'] > '50')).count()


226

OBSERVATION: 
using filter operation to calculate the number of stroke cases for people after 50 years.

As we can see Age is an important risk factor for developing a stroke.

# Dealing with Null Values

In [154]:
#checking Null values

train.toPandas().isnull().sum()


id                      0
gender                  0
age                     0
hypertension            0
heart_disease           0
ever_married            0
work_type               0
Residence_type          0
avg_glucose_level       0
bmi                   201
smoking_status          0
stroke                  0
_c12                 5110
dtype: int64

In [155]:
# fill in miss values with mean for feature "bmi"

from pyspark.sql.functions import mean
mean = train.select(mean(train['bmi'])).collect()
mean_bmi = mean[0][0]
train_f = train.na.fill(mean_bmi,['bmi'])

In [156]:
# Recheck if any further null values are present

train_f.toPandas().isnull().sum()

id                      0
gender                  0
age                     0
hypertension            0
heart_disease           0
ever_married            0
work_type               0
Residence_type          0
avg_glucose_level       0
bmi                     0
smoking_status          0
stroke                  0
_c12                 5110
dtype: int64

In [157]:
train_f.describe().show()

+-------+-----------------+------+------------------+------------------+-------------------+------------+---------+--------------+------------------+-----------------+--------------+-------------------+----+
|summary|               id|gender|               age|      hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|              bmi|smoking_status|             stroke|_c12|
+-------+-----------------+------+------------------+------------------+-------------------+------------+---------+--------------+------------------+-----------------+--------------+-------------------+----+
|  count|             5110|  5110|              5110|              5110|               5110|        5110|     5110|          5110|              5110|             5110|          5110|               5110|   0|
|   mean|36517.82935420744|  null|43.226614481409015|0.0974559686888454|0.05401174168297456|        null|     null|          null|106.14767710371804|28.89323691179472| 

# One Hot Encoding (Dealing with Categorical Variables)

In [158]:
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,
                                StringIndexer)

In [159]:
# indexing all categorical columns in the dataset
from pyspark.ml.feature import StringIndexer
indexer1 = StringIndexer(inputCol="gender", outputCol="genderIndex")
indexer2 = StringIndexer(inputCol="ever_married", outputCol="ever_marriedIndex")
indexer3 = StringIndexer(inputCol="work_type", outputCol="work_typeIndex")
indexer4 = StringIndexer(inputCol="Residence_type", outputCol="Residence_typeIndex")
indexer5 = StringIndexer(inputCol="smoking_status", outputCol="smoking_statusIndex")

In [160]:
# Doing one hot encoding of indexed data
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=["genderIndex","ever_marriedIndex","work_typeIndex","Residence_typeIndex","smoking_statusIndex"],
                                 outputCols=["genderVec","ever_marriedVec","work_typeVec","Residence_typeVec","smoking_statusVec"])

In [161]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['genderVec',
 'age',
 'hypertension',
 'heart_disease',
 'ever_marriedVec',
 'work_typeVec',
 'Residence_typeVec',
 'avg_glucose_level',
 'bmi',
 'smoking_statusVec'],outputCol='features')

# Train and Test Split

In [162]:
# splitting training and validation data

train_data,val_data = train_f.randomSplit([0.7,0.3],seed=100)

# DECISION TREE CLASSIFICATION

In [163]:
#Importing Decision Tree  Classifiers

from pyspark.ml.classification import DecisionTreeClassifier

In [164]:
dtc = DecisionTreeClassifier(labelCol='stroke',featuresCol='features')

In [165]:
# Creating Pipeline

from pyspark.ml import Pipeline

pipeline_1 = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, indexer5, encoder, assembler,dtc])

In [166]:
# Training Model

model = pipeline_1.fit(train_data)

Predictions

In [167]:
dtc_predictions = model.transform(val_data)

# Decision Tree Model Evaluation

Accuracy Score

In [168]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error

acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")

dtc_acc = acc_evaluator.evaluate(dtc_predictions)

print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 95.09%


AUC-ROC

In [169]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='stroke')

# area under ROC curve

auroc = evaluator.evaluate(dtc_predictions, {evaluator.metricName: "areaUnderROC"})
print("Area under ROC Curve: {:.4f}".format(auroc))

Area under ROC Curve: 0.3578


# LOGISTIC REGRESSION

In [170]:
#Importing Logistic Regression 

from pyspark.ml.classification import LogisticRegression


In [171]:
# Logistic Regression

lr = LogisticRegression(labelCol='stroke',featuresCol='features',maxIter=5)

In [172]:
# Creating Pipeline

from pyspark.ml import Pipeline
pipeline_2 = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, indexer5, encoder, assembler,lr])

In [173]:
# training model pipeline with data

model = pipeline_2.fit(train_data)

Prediction

In [174]:
lr_predictions=model.transform(val_data)


# Logistic Model Evaluation

Accuracy Score

In [175]:
# Accuracy Score

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error

acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
lr_acc=acc_evaluator.evaluate(lr_predictions)

print('A Logistic Regression algorithm had an accuracy of: {0:2.2f}%'.format(lr_acc*100))


A Logistic Regression algorithm had an accuracy of: 95.16%


AUC-ROC

In [176]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='stroke')

# area under ROC curve

auroc = evaluator.evaluate(lr_predictions, {evaluator.metricName: "areaUnderROC"})
print("Area under ROC Curve: {:.4f}".format(auroc))

Area under ROC Curve: 0.8009


# Random Forest Classifier (Ensemble)

In [177]:
#Importing Random Forest

from pyspark.ml.classification import RandomForestClassifier


In [178]:
# RF

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'stroke')

In [179]:
#Creating Pipeline

pipeline_3 = Pipeline(stages=[indexer1, indexer2, indexer3, indexer4, indexer5, encoder, assembler,rf])



In [180]:
# Training

model = pipeline_3.fit(train_data)

Predictions

In [181]:

rf_predictions=model.transform(val_data)

# RF_Model_Evaluation

Accuracy

In [182]:
#Accuracy Score

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")

rf_acc=acc_evaluator.evaluate(rf_predictions)

print('A Random Forest algorithm had an accuracy of: {0:2.2f}%'.format(rf_acc*100))

A Random Forest algorithm had an accuracy of: 95.23%


AUC-ROC

In [183]:
#Area under curve

evaluator = BinaryClassificationEvaluator(labelCol='stroke')

auroc = evaluator.evaluate(rf_predictions, {evaluator.metricName: "areaUnderROC"})

print("Area under ROC Curve: {:.4f}".format(auroc))


Area under ROC Curve: 0.7826
