## Brain Stroke Prediction

In [1]:
import findspark 
findspark.find() 
findspark.init() 
import pyspark 
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)  

22/07/29 16:00:43 WARN Utils: Your hostname, Dave-GF65 resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlo1)
22/07/29 16:00:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/29 16:00:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt  
from pyspark.ml.feature import StringIndexer 
from pyspark.ml.feature import OneHotEncoder 
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.linalg import Vectors 
from pyspark.ml.feature import Normalizer 

In [3]:
df = spark.read.csv('brain_stroke_df.csv', header=True)

In [4]:
df.show() 

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+------------------+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|               bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+------------------+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|              36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21|28.894559902200502|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|              32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|

In [38]:
indexer = StringIndexer(inputCols=['gender','ever_married', 'Residence_type', 'work_type', 'smoking_status'
                                  ], 
                        outputCols=['gender_index', 'ever_married_index', 'Residence_type_index', 
                                   'work_type_index', 'smoking_status_index']) 

In [39]:
indexed.dtypes

[('id', 'string'),
 ('gender', 'string'),
 ('age', 'int'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'int'),
 ('bmi', 'int'),
 ('smoking_status', 'string'),
 ('stroke', 'string'),
 ('gender_index', 'double'),
 ('ever_married_index', 'double'),
 ('Residence_type_index', 'double'),
 ('work_type_index', 'double'),
 ('smoking_status_index', 'double'),
 ('stroke_index', 'double')]

In [40]:
encoder = OneHotEncoder(inputCols=['work_type_index', 'smoking_status_index','stroke'], 
                       outputCols=['work_type_hot', 'smoking_status_hot','stroke_hot']) 


In [41]:
vectorAssembler = VectorAssembler(inputCols=['gender_index', 'age', 'hypertension', 'heart_disease', 
                                            'ever_married_index', 'work_type_hot', 'Residence_type_index', 
                                            'avg_glucose_level', 'bmi', 'smoking_status_hot'], 
                                 outputCol='features') 

In [42]:
normalizer = Normalizer(inputCol='features', outputCol='features_norm', p=1.0)   

In [43]:
from pyspark.ml.classification import GBTClassifier

classifier = GBTClassifier(labelCol='stroke', featuresCol='features_norm', maxIter=10)


In [44]:
from pyspark.ml import Pipeline 

pipeline = Pipeline(stages=[indexer,encoder,vectorAssembler,normalizer, classifier]) 

model = pipeline.fit(df) 
prediction = model.transform(df) 

prediction.show() 

+-----+------+---+------------+-------------+------------+-------------+--------------+-----------------+---+---------------+------+------------+------------------+--------------------+---------------+--------------------+-------------+------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|   id|gender|age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|bmi| smoking_status|stroke|gender_index|ever_married_index|Residence_type_index|work_type_index|smoking_status_index|work_type_hot|smoking_status_hot|stroke_hot|            features|       features_norm|       rawPrediction|         probability|prediction|
+-----+------+---+------------+-------------+------------+-------------+--------------+-----------------+---+---------------+------+------------+------------------+--------------------+---------------+--------------------+-------------+------------------+----------+----------

In [37]:
from pyspark.sql.types import IntegerType
df = df.withColumn("age",df["age"].cast(IntegerType())) 
df = df.withColumn("hypertension", df["hypertension"].cast(IntegerType())) 
df = df.withColumn("heart_disease", df["heart_disease"].cast(IntegerType())) 
df = df.withColumn("avg_glucose_level", df["avg_glucose_level"].cast(IntegerType())) 
df = df.withColumn("bmi", df["bmi"].cast(IntegerType())) 
df = df.withColumn("stroke", df["stroke"].cast(IntegerType()))

In [69]:
df.select(df['work_type']).dtypes

[('work_type', 'int')]

In [45]:
df_train = prediction.drop('features').drop('gender').drop('age').drop('hypertension').drop('heart_disease').drop(
'ever_married').drop('work_type').drop('Residence_type').drop('avg_glucose_level').drop('bmi').drop('smoking_status')

In [46]:
df_train = df_train.drop('gender_index').drop('ever_married_index').drop('Residence_type_index').drop('work_type_index').drop('smoking_status_index').drop('work_type_hot').drop('smoking_status_hot')

In [47]:
df_train.show()

+-----+------+----------+--------------------+--------------------+--------------------+----------+
|   id|stroke|stroke_hot|       features_norm|       rawPrediction|         probability|prediction|
+-----+------+----------+--------------------+--------------------+--------------------+----------+
| 9046|     1| (1,[],[])|(15,[0,1,3,5,10,1...|[0.65015430224937...|[0.78588691609292...|       0.0|
|51676|     1| (1,[],[])|(15,[1,6,9,10,11,...|[0.83598702900468...|[0.84183883689666...|       0.0|
|31112|     1| (1,[],[])|(15,[0,1,3,5,9,10...|[0.84268231965137...|[0.84361359172168...|       0.0|
|60182|     1| (1,[],[])|(15,[1,5,10,11],[...|[1.18102016165574...|[0.91388651043276...|       0.0|
| 1665|     1| (1,[],[])|(15,[1,2,6,9,10,1...|[0.67709707642098...|[0.79481447501208...|       0.0|
|56669|     1| (1,[],[])|(15,[0,1,5,10,11,...|[0.68736663693932...|[0.79814380883965...|       0.0|
|53882|     1| (1,[],[])|[0.00564971751412...|[0.62357556017918...|[0.77680631690799...|       0.0|


In [52]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.mllib.evaluation import MulticlassMetrics
binEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol(
    "prediction").setLabelCol("stroke")
    
binEval.evaluate(prediction) 



0.9540027402622823

In [63]:
preds_and_labels = df_train.select(['prediction','stroke']).withColumn('label', F.col('stroke').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels = preds_and_labels.select(['prediction','label'])

metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())





[[4.859e+03 1.000e+00]
 [2.340e+02 1.500e+01]]


In [62]:
import pyspark.sql.functions as F  
from pyspark.sql.types import FloatType  

In [65]:
df_train.count() 

5109