<a href="https://colab.research.google.com/github/alfianhid/Prediksi-Penyakit-Stroke-Pada-Seseorang-Menggunakan-PySpark/blob/main/Prediksi_Penyakit_Stroke_Pada_Seseorang_Menggunakan_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Installing Dependencies & Initiating a New Spark Session**

In [1]:
# Install PySpark
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
[K     |████████████████████████████████| 212.4MB 70kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.1MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=861e30a9542211a43e5eba1dd3c5f33efdb2a72692ee49410b70168a2c08eb01
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
# Creating a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Prediksi Penyakit Stroke Pada Seseorang Menggunakan PySpark").getOrCreate()

**Load & Explore dataset**

In [3]:
# Loading the stroke dataset from Google Drive
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [4]:
# Create a spark dataframe
df = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/datasets/healthcare-dataset-stroke-data.csv', header=True, inferSchema=True)

In [5]:
# Displaying the dataframe
df.show()

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21| N/A|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
| 1665|Female|79.0|           1|            0|         

In [6]:
# Printing the schema
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [7]:
# Count the total of people with stroke and non-stroke class
print((df.count(),len(df.columns)))
df.groupBy('stroke').count().show()

(5100, 12)
+------+-----+
|stroke|count|
+------+-----+
|     1|  244|
|     0| 4856|
+------+-----+



In [8]:
# Displaying the summary of statistics
df.describe().show()

+-------+------------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|summary|                id|gender|               age|       hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi|smoking_status|             stroke|
+-------+------------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+--------------+-------------------+
|  count|              5100|  5100|              5100|               5100|               5100|        5100|     5100|          5100|              5100|              5100|          5100|               5100|
|   mean| 36520.33803921568|  null|43.223529411764716|0.09725490196078432|0.05411764705882353|        null|     null|          null|106.17687058823516|28.896040816326543|      

**Data Preparation**

In [9]:
# Checking for null values
for col in df.columns:
  print(col+":",df[df[col].isNull()].count())

id: 0
gender: 0
age: 0
hypertension: 0
heart_disease: 0
ever_married: 0
work_type: 0
Residence_type: 0
avg_glucose_level: 0
bmi: 0
smoking_status: 0
stroke: 0


In [10]:
# Check for any unnecessary values
def count_zeros():
  columns_list = ['age', 'hypertension', 'heart_disease','avg_glucose_level','bmi']
  for i in columns_list:
    print(i+":",df[df[i]=='N/A'].count())

In [11]:
count_zeros()

age: 0
hypertension: 0
heart_disease: 0
avg_glucose_level: 0
bmi: 200


In [12]:
# Calculate and replace the unnecessary values by the mean value of the respective column
from pyspark.sql.functions import *
for i in df.columns[9:10]:
  data= df.agg({i:'mean'}).first()[0]
  print(f'mean value for {i} is {int(data)}')
  df=df.withColumn(i,when(df[i]=='N/A',int(data)).otherwise(df[i]))

mean value for bmi is 28


In [13]:
# Display the dataframe 
df.show()

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21|  28|   never smoked|     1|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|60182|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|     1|
| 1665|Female|79.0|           1|            0|         

**Performing Correlation Analysis & Feature Selection**

In [15]:
# Convert 'bmi' column from string to double
from pyspark.sql.functions import col , column
changed_df = df.withColumn("bmi", col("bmi").cast("double"))
changed_df.dtypes

[('id', 'int'),
 ('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'double'),
 ('bmi', 'double'),
 ('smoking_status', 'string'),
 ('stroke', 'int')]

In [16]:
# Find the correlation between the set of input & output variables
columns_list = ['age', 'hypertension', 'heart_disease','avg_glucose_level','bmi']
for i in columns_list:
  print(f'Correlation to stroke for {i} is {df.stat.corr("stroke",i)}')

Correlation to stroke for age is 0.24487886380792556
Correlation to stroke for hypertension is 0.1279557579344937
Correlation to stroke for heart_disease is 0.13722522884862864
Correlation to stroke for avg_glucose_level is 0.1360330038652827
Correlation to stroke for bmi is 0.03603127327571626


In [17]:
# Feature selection
from pyspark.ml.feature import VectorAssembler
assembler= VectorAssembler(inputCols=['age', 'hypertension', 'heart_disease','avg_glucose_level','bmi'],outputCol='features')
output_data= assembler.transform(df)

In [18]:
# Print the schema
output_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- features: vector (nullable = true)



In [19]:
# Display the dataframe
output_data.show()

+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+--------------------+
|   id|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level| bmi| smoking_status|stroke|            features|
+-----+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+--------------------+
| 9046|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|[67.0,0.0,1.0,228...|
|51676|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21|28.0|   never smoked|     1|[61.0,0.0,0.0,202...|
|31112|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|[80.0,0.0,1.0,105...|
|60182|Female|49.0|           0|            0|         Yes|     

**Split Dataset & Build the Model**

In [20]:
# Create the final data
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','stroke')

In [21]:
# Print schema of the final data
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- stroke: integer (nullable = true)



In [22]:
# Split the dataset and build the model
train, test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='stroke')
model = models.fit(train)

In [23]:
# Summary of the model
summary = model.summary
summary.predictions.describe().show()

+-------+-------------------+----------+
|summary|             stroke|prediction|
+-------+-------------------+----------+
|  count|               3528|      3528|
|   mean|0.04195011337868481|       0.0|
| stddev| 0.2005036069622285|       0.0|
|    min|                0.0|       0.0|
|    max|                1.0|       0.0|
+-------+-------------------+----------+



**Evaluate and Save the Model**

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [25]:
predictions.predictions.show(10)

+--------------------+------+--------------------+--------------------+----------+
|            features|stroke|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|[0.16,0.0,0.0,69....|     0|[7.83035802869214...|[0.99960267478563...|       0.0|
|[0.16,0.0,0.0,109...|     0|[7.61723343465783...|[0.99950834102130...|       0.0|
|[0.16,0.0,0.0,114...|     0|[7.55563853894242...|[0.99947712156257...|       0.0|
|[0.32,0.0,0.0,55....|     0|[7.86035823882333...|[0.9996144130735,...|       0.0|
|[0.32,0.0,0.0,73....|     0|[7.76664435379704...|[0.99957654694350...|       0.0|
|[0.32,0.0,0.0,108...|     0|[7.55328674711002...|[0.99947589105945...|       0.0|
|[0.32,0.0,0.0,127...|     0|[7.44291388900208...|[0.99941476629455...|       0.0|
|[0.4,0.0,0.0,109....|     0|[7.59557899573624...|[0.99949758371969...|       0.0|
|[0.56,0.0,0.0,57....|     0|[7.79003826250297...|[0.99958633414102...|       0.0|
|[0.

In [26]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='stroke')
evaluator.evaluate(model.transform(test))

0.8276027551942203

In [27]:
# Save the model
model.save("my_model")

In [28]:
# Load saved model back to the environment
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('my_model')

**Prediction on New Data with the saved model**

In [29]:
# Create a new spark dataframe
test_df = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/datasets/new-healthcare-dataset-stroke-data.csv", header= True, inferSchema=True)

In [30]:
# Print the schema
test_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: string (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [31]:
# Convert 'bmi' column from string to double
from pyspark.sql.functions import col , column
test_df = test_df.withColumn("bmi", col("bmi").cast("double"))
test_df.dtypes

[('id', 'int'),
 ('gender', 'string'),
 ('age', 'int'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('ever_married', 'string'),
 ('work_type', 'string'),
 ('Residence_type', 'string'),
 ('avg_glucose_level', 'double'),
 ('bmi', 'double'),
 ('smoking_status', 'string'),
 ('stroke', 'int')]

In [33]:
# Create an additional feature, then merged the column 
test_data = assembler.transform(test_df)

In [34]:
# Print the schema
test_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- features: vector (nullable = true)



In [35]:
# Use model to make predictions
results = model.transform(test_data)
results.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [36]:
results.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [41]:
# Display the predictions
results.show(2)

+-----+------+---+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+--------------------+--------------------+--------------------+----------+
|   id|gender|age|hypertension|heart_disease|ever_married|work_type|Residence_type|avg_glucose_level| bmi|smoking_status|stroke|            features|       rawPrediction|         probability|prediction|
+-----+------+---+------------+-------------+------------+---------+--------------+-----------------+----+--------------+------+--------------------+--------------------+--------------------+----------+
|17739|  Male| 57|           0|            0|         Yes|  Private|         Rural|            84.96|36.7|       Unknown|     1|[57.0,0.0,0.0,84....|[3.38145191423093...|[0.96711980607462...|       0.0|
|49669|Female| 14|           0|            0|          No| children|         Rural|            57.93|30.9|       Unknown|     1|[14.0,0.0,0.0,57....|[6.70597015655595...|[0.99877791027185.