In [0]:
# Read and load the input file into a dataframe
rawstrokeDF = spark.read.format('csv').option("inferSchema", True).option("header", True).option("sep",',').load("/FileStore/tables/SparkMLib/HeartStroke.csv")

In [0]:
# Check the schema and first few records
rawstrokeDF.printSchema()
rawstrokeDF.show(5, False)

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)

+------+----+--------+------------+------+-------------+---------------+-----+
|gender|age |diabetes|hypertension|stroke|heart_disease|smoking_history|BMI  |
+------+----+--------+------------+------+-------------+---------------+-----+
|Female|80.0|0       |0           |No    |Yes          |never          |25.19|
|Female|54.0|0       |0           |No    |No           |null           |null |
|Male  |28.0|0       |0           |No    |No           |never          |null |
|Female|36.0|0       |0           |No    |No           |current        |23.45|
|Male  |76.0|0       |1           |No    |Yes          |current        |20.14|
+------+----+--------+------------+---

In [0]:
# Note you can use display funciton only in Databricks which displays records in a tabular form
display(rawstrokeDF)

gender,age,diabetes,hypertension,stroke,heart_disease,smoking_history,BMI
Female,80.0,0,0,No,Yes,never,25.19
Female,54.0,0,0,No,No,,
Male,28.0,0,0,No,No,never,
Female,36.0,0,0,No,No,current,23.45
Male,76.0,0,1,No,Yes,current,20.14
Female,20.0,0,0,No,No,never,
Female,44.0,1,0,No,No,never,19.31
Female,79.0,0,0,No,No,,23.86
Male,42.0,0,0,No,No,never,33.64
Female,32.0,0,0,No,No,never,


In [0]:
# Check the count
rawstrokeDF.count()

Out[7]: 100000

In [0]:
# Get a summary description of the dataframe
rawstrokeDF.describe().show()

+-------+------+-----------------+-------------------+------------------+------+-------------+---------------+------------------+
|summary|gender|              age|           diabetes|      hypertension|stroke|heart_disease|smoking_history|               BMI|
+-------+------+-----------------+-------------------+------------------+------+-------------+---------------+------------------+
|  count|100000|           100000|             100000|            100000|100000|       100000|          64184|             74556|
|   mean|  null|41.88585600000013|              0.085|           0.07485|  null|         null|           null|27.321028891034764|
| stddev|  null|22.51683987161704|0.27888308976661896|0.2631504702289171|  null|         null|           null| 7.686295651045002|
|    min|Female|             0.08|                  0|                 0|    No|           No|        current|             10.01|
|    max| Other|             80.0|                  1|                 1|   Yes|          

In [0]:
# Two of the columns have nulls in some records, hence their count is lesser
 
from pyspark.sql.functions import isnull, when, count, col

In [0]:

# Filter for the null records in one of the columns and get the count
rawstrokeDF.filter(col('smoking_history').isNull()).count()

Out[10]: 35816

In [0]:
# Filter for the null records in the other column and get the count
rawstrokeDF.filter(col('BMI').isNull()).count()

Out[11]: 25444

In [0]:
# Get count of non-null records for the first column
rawstrokeDF.filter(col('smoking_history').isNotNull()).count()

Out[12]: 64184

In [0]:
# Get count of non-null records for the second column
rawstrokeDF.filter(col('BMI').isNotNull()).count()

Out[13]: 74556

In [0]:
# Get the list of columns
rawstrokeDF.columns

Out[14]: ['gender',
 'age',
 'diabetes',
 'hypertension',
 'stroke',
 'heart_disease',
 'smoking_history',
 'BMI']

In [0]:
# Use list comprehension of Python language to get the columns and the respective count of nulls
rawstrokeDF.select([count(when(isnull(c), c)).alias(c) for c in rawstrokeDF.columns]).show()

+------+---+--------+------------+------+-------------+---------------+-----+
|gender|age|diabetes|hypertension|stroke|heart_disease|smoking_history|  BMI|
+------+---+--------+------------+------+-------------+---------------+-----+
|     0|  0|       0|           0|     0|            0|          35816|25444|
+------+---+--------+------------+------+-------------+---------------+-----+



In [0]:
# Drop the records which have nulls in any column 
rawstrokeDF = rawstrokeDF.na.drop()
rawstrokeDF.count()

Out[16]: 52175

In [0]:

# Use list comprehension again to get the columns and the respective count of nulls
# The number of nulls for all columns should be 0 now as the records with nulls are dropped
rawstrokeDF.select([count(when(isnull(c), c)).alias(c) for c in rawstrokeDF.columns]).show()

+------+---+--------+------------+------+-------------+---------------+---+
|gender|age|diabetes|hypertension|stroke|heart_disease|smoking_history|BMI|
+------+---+--------+------------+------+-------------+---------------+---+
|     0|  0|       0|           0|     0|            0|              0|  0|
+------+---+--------+------------+------+-------------+---------------+---+



In [0]:
# Get a summary description of the dataframe
rawstrokeDF.describe().show()

+-------+------+------------------+-------------------+-------------------+------+-------------+---------------+------------------+
|summary|gender|               age|           diabetes|       hypertension|stroke|heart_disease|smoking_history|               BMI|
+-------+------+------------------+-------------------+-------------------+------+-------------+---------------+------------------+
|  count| 52175|             52175|              52175|              52175| 52175|        52175|          52175|             52175|
|   mean|  null|46.532175946334455|0.11603258265452802|0.10464781983708672|  null|         null|           null|28.678252803066417|
| stddev|  null|19.574104259028466| 0.3202670578127496| 0.3061020246420571|  null|         null|           null| 7.203400132179493|
|    min|Female|              0.16|                  0|                  0|    No|           No|        current|             10.08|
|    max| Other|              80.0|                  1|                  1| 

In [0]:
# Display  few records to check
display(rawstrokeDF)

gender,age,diabetes,hypertension,stroke,heart_disease,smoking_history,BMI
Female,80.0,0,0,No,Yes,never,25.19
Female,36.0,0,0,No,No,current,23.45
Male,76.0,0,1,No,Yes,current,20.14
Female,44.0,1,0,No,No,never,19.31
Male,42.0,0,0,No,No,never,33.64
Female,54.0,0,0,No,No,former,54.7
Female,78.0,0,0,No,No,former,36.05
Female,67.0,0,0,No,No,never,25.69
Male,15.0,0,0,No,No,never,30.36
Female,42.0,0,0,No,No,never,24.48


In [0]:
# The variable values for any supervised ML algorithm has to be of type double.
# Take a closer look at the data type of each column

rawstrokeDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)



In [0]:
# Let us convert the columns "diabetes", "hypertension" and target varaible "stroke" data type into type double

from pyspark.sql.types import DoubleType

strokeDF = rawstrokeDF.withColumn("diabetes", col("diabetes").cast(DoubleType())).withColumn("hypertension", col("hypertension").cast(DoubleType()))

In [0]:

# Check the schema again. The data type for these two columns should be double now.
strokeDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: double (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)



In [0]:
# ### Transformations

# #### Binarizer
# Let us use divide the BMi into two groups: Obese and healthy. 1 represents 'obese' and 0 represents 'healthy' (If your BMI is 30.0 or higher, it falls within the obese range)
# We will use the Binarizer transformer to create a new variable 'Body Type' (1- obese and 0- healthy) by binarizing the 'BMI' variable by setting the obesity threshold value 30.0. Binarization is used for thresholding numerical feature to binary feature (0 or 1)

from pyspark.ml.feature import Binarizer
binarizer = Binarizer(inputCol="BMI", outputCol="BodyType", threshold=30.0)
binarizedDF = binarizer.transform(strokeDF)
binarizedDF.select('BMI', 'BodyType').show(5,False)

binarizedDF.printSchema()

binarizedDF.show(5)

+-----+--------+
|BMI  |BodyType|
+-----+--------+
|25.19|0.0     |
|23.45|0.0     |
|20.14|0.0     |
|19.31|0.0     |
|33.64|1.0     |
+-----+--------+
only showing top 5 rows

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: double (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)

+------+----+--------+------------+------+-------------+---------------+-----+--------+
|gender| age|diabetes|hypertension|stroke|heart_disease|smoking_history|  BMI|BodyType|
+------+----+--------+------------+------+-------------+---------------+-----+--------+
|Female|80.0|     0.0|         0.0|    No|          Yes|          never|25.19|     0.0|
|Female|36.0|     0.0|         0.0|    No|           No|        current|23.45|     0.0|
|  Male|76.0| 

In [0]:
# From the above result we can see that the value of the target feature label is now converted to binary values

In [0]:
# #### Bucketizer
# We now group the patients based on their age group. Here, we will use the Bucketizer transformer. Bucketizer is used for creating group of values of a continuous feature

from pyspark.ml.feature import Bucketizer
# lets define the age age group splits
splits = [0, 25.0, 50.0, 75.0, 100.0]
bucketizer = Bucketizer(inputCol="age", outputCol="ageGroup", splits=splits)
bucketizedDF = bucketizer.transform(binarizedDF)

bucketizedDF.printSchema()
bucketizedDF.select('age', 'ageGroup').show(10,False)

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: double (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)
 |-- ageGroup: double (nullable = true)

+----+--------+
|age |ageGroup|
+----+--------+
|80.0|3.0     |
|36.0|1.0     |
|76.0|3.0     |
|44.0|1.0     |
|42.0|1.0     |
|54.0|2.0     |
|78.0|3.0     |
|67.0|2.0     |
|15.0|0.0     |
|42.0|1.0     |
+----+--------+
only showing top 10 rows



In [0]:
# #### StringIndexer
# There are three categorical variables in our dataset viz., 'gender', 'heart disease' and 'smoking history'. These variables cannot be directly passed to our ML algorithms. We will converet them into indexes and to do that we will use StringIndexer transformer. StringIndexer converts a string column to an index column. The most frequent label gets index 0

from pyspark.ml.feature import StringIndexer
indexers = StringIndexer(inputCols= ['stroke','gender', 'heart_disease', 'smoking_history'], 
                         outputCols=['label', 'gender_indexed', 'heart_disease_indexed', 'smoking_history_indexed'])
strindexedDF = indexers.fit(bucketizedDF).transform(bucketizedDF)

strindexedDF.printSchema()
strindexedDF.select('stroke', 'label', 'gender', 'gender_indexed', 'heart_disease', 'heart_disease_indexed', 
                    'smoking_history', 'smoking_history_indexed').show(5, False)

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: double (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)
 |-- ageGroup: double (nullable = true)
 |-- label: double (nullable = false)
 |-- gender_indexed: double (nullable = false)
 |-- heart_disease_indexed: double (nullable = false)
 |-- smoking_history_indexed: double (nullable = false)

+------+-----+------+--------------+-------------+---------------------+---------------+-----------------------+
|stroke|label|gender|gender_indexed|heart_disease|heart_disease_indexed|smoking_history|smoking_history_indexed|
+------+-----+------+--------------+-------------+---------------------+---------------+-----------------------+
|No    |0.0  |Female|0.0           |Yes          |1.0     

In [0]:
### New Stage

# ### VectorAssembler
# MLlib expects all features to be contained within a single column. VectorAssembler combines multiple columns and gives single column as output

# Import VectorAssembler from pyspark.ml.feature package

from pyspark.ml.feature import VectorAssembler

# Create a list of all the variables that are required in features vector
# These features are then further used for training model

# features_col = ["age", "diabetes", "hypertension", "BMI", "BodyType", "ageGroup", "gender_indexed","heart_disease_indexed","smoking_history_indexed"]

features_col = ["diabetes", "hypertension", "BodyType", "ageGroup", "gender_indexed","heart_disease_indexed","smoking_history_indexed"]

# Create the VectorAssembler object and use it to transform the dataframe to add a vector type column features

assembler = VectorAssembler(inputCols= features_col, outputCol= "features")
assembledDF = assembler.transform(strindexedDF)

assembledDF.printSchema()
assembledDF.select("features").show(5, False)

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: double (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)
 |-- ageGroup: double (nullable = true)
 |-- label: double (nullable = false)
 |-- gender_indexed: double (nullable = false)
 |-- heart_disease_indexed: double (nullable = false)
 |-- smoking_history_indexed: double (nullable = false)
 |-- features: vector (nullable = true)

+-----------------------------+
|features                     |
+-----------------------------+
|(7,[3,5],[3.0,1.0])          |
|(7,[3,6],[1.0,2.0])          |
|[0.0,1.0,0.0,3.0,1.0,1.0,2.0]|
|(7,[0,3],[1.0,1.0])          |
|(7,[2,3,4],[1.0,1.0,1.0])    |
+-----------------------------+
only showing top 5 rows



In [0]:
# As we see for some records, the vector column displays all the values. This is called dense vector.
# For example [0.0,1.0,0.0,3.0,1.0,1.0,2.0]
# For some records, the vector column displays the size of the vector, then the list of non-zero value positions and lastly the non-zero values. This is nown as sparse vector.
# For example (7,[3,5],[3.0,1.0]) 
# In the above vector size is 7 (0 to 6). In these 7 positions position 3, and 5 have non-zero values. These are 3.0 and 1.0
# This in dense format would be [0.0,0.0,0.0,3.0,0.0,1.0,0.0] which occupies more space than sparse vector.

In [0]:
# Now all required features are vectorized.

# ## Spark ML Decision Tree Classification
# We will now train the ML model with the data that we have transformed so far. We will build classification model since, given the data, we need to determine if a person will get a stroke or not.

# ### Train-Test Split
# We split the output of  data into training and test sets (30% held out for testing)
# Note: This train-test split of for logistic regression

# We spilt the data into 70-30 set
# Training Set - 70% obesevations
# Testing Set - 30% observations
trainDF, testDF =  assembledDF.randomSplit([0.7,0.3], seed = 2020)

# print the count of observations in each set
print("Observations in training set = ", trainDF.count())
print("Observations in testing set = ", testDF.count())

Observations in training set =  36680
Observations in testing set =  15495


In [0]:
# ### Supervised Learning - Decision Tree Classification 

# import the DecisionTree function from the pyspark.ml.classification package
from pyspark.ml.classification import DecisionTreeClassifier

# Create the DecisionTreeClassifier object 'dtc' by setting the required parameters
# We will pass the VectorIndexed columns as featureCol and maxDepth which is a stopping criterion to Decision Tree Classifier.

dtc = DecisionTreeClassifier(featuresCol="features", labelCol="label",maxDepth= 10)

# Fit the DecisionTreeClassifier object on the training data to produce the model

dtcmodel = dtc.fit(trainDF)

In [0]:
type(dtc)

Out[29]: pyspark.ml.classification.DecisionTreeClassifier

In [0]:
type(dtcmodel)

Out[30]: pyspark.ml.classification.DecisionTreeClassificationModel

In [0]:
# This DecisionTreeClassificationModel can be used as a transformer to perform prediction on the testing data

dtcpredictionDF = dtcmodel.transform(testDF)

dtcpredictionDF.printSchema()

dtcpredictionDF.select("label","rawPrediction", "probability", "prediction").show(10,False)

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- diabetes: double (nullable = true)
 |-- hypertension: double (nullable = true)
 |-- stroke: string (nullable = true)
 |-- heart_disease: string (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- BodyType: double (nullable = true)
 |-- ageGroup: double (nullable = true)
 |-- label: double (nullable = false)
 |-- gender_indexed: double (nullable = false)
 |-- heart_disease_indexed: double (nullable = false)
 |-- smoking_history_indexed: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+-----+--------------+-----------------------------------------+----------+
|label|rawPrediction |probability                              |prediction|
+-----+--------------+-----------------------------------------+----------

In [0]:
###
# rawPrediction is the raw output of the classifier (array with length equal to the number of classes)
# probability is the result of applying the function to rawPrediction (array of length equal to that of rawPrediction)
# prediction is the argument where the array probability takes its maximum value, and it gives the most probable label (single number)
###

In [0]:
# ##### Model Evaluation

# import MulticlassClassificationEvaluator from the pyspark.ml.evaluation package
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Build the MulticlassClassificationEvaluator object 'evaluator'
multievaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# 1. Accuracy
print("Accuracy: ", multievaluator.evaluate(dtcpredictionDF, {multievaluator.metricName: "accuracy"})) 
# 2. Area under the ROC curve
print('Area under the ROC curve = ', multievaluator.evaluate(dtcpredictionDF))
# 3. Precision (Positive Predictive Value)
print("Precision = ", multievaluator.evaluate(dtcpredictionDF, {multievaluator.metricName: "weightedPrecision"}))
# 4. Recall (True Positive Rate)
print("Recall = ", multievaluator.evaluate(dtcpredictionDF, {multievaluator.metricName: "weightedRecall"}))

Accuracy:  0.9812197483059051
Area under the ROC curve =  0.9720464831258082
Precision =  0.963043148050061
Recall =  0.9812197483059051


In [0]:
print(dtcmodel.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ba31e8a7eed3, depth=8, numNodes=45, numClasses=2, numFeatures=7
  If (feature 3 in {0.0,1.0})
   If (feature 5 in {1.0})
    If (feature 6 in {0.0,1.0,2.0,4.0})
     If (feature 2 in {0.0})
      Predict: 0.0
     Else (feature 2 not in {0.0})
      If (feature 4 in {1.0})
       Predict: 0.0
      Else (feature 4 not in {1.0})
       If (feature 6 in {0.0,1.0})
        Predict: 0.0
       Else (feature 6 not in {0.0,1.0})
        If (feature 0 <= 0.5)
         Predict: 0.0
        Else (feature 0 > 0.5)
         If (feature 6 in {2.0})
          Predict: 0.0
         Else (feature 6 not in {2.0})
          Predict: 1.0
    Else (feature 6 not in {0.0,1.0,2.0,4.0})
     If (feature 2 in {1.0})
      Predict: 0.0
     Else (feature 2 not in {1.0})
      If (feature 3 in {0.0})
       Predict: 1.0
      Else (feature 3 not in {0.0})
       Predict: 0.0
   Else (feature 5 not in {1.0})
    Predict: 0.0
  Else (feature 3 not in {0

In [0]:
# ### Model Persistence
# Model persistence means saving your model to a disk. After you finalize your model for prediction depending upon the performance, you need to save the model to the disk. Let's say, you finalize 'dtpipelinemodel' to be used for in production environment i.e. in your application. We use the following code to save it.

# ##### Saving the model

# use save() method to save the model
# write().overwrite() is usually used when you want to replace the older model with a new one
# It might happen that you wish to retrain your model and save it at the same the place

dtcmodel.write().overwrite().save("/FileStore/tables/SparkMLib/dtcmodel")


In [0]:
display(dbutils.fs.ls("/FileStore/tables/SparkMLib"))

path,name,size,modificationTime
dbfs:/FileStore/tables/SparkMLib/HeartStroke.csv,HeartStroke.csv,2959857,1741514298000
dbfs:/FileStore/tables/SparkMLib/S5_1_PySparkMLlib.zip,S5_1_PySparkMLlib.zip,1736898,1741514275000
dbfs:/FileStore/tables/SparkMLib/dtcmodel/,dtcmodel/,0,0
dbfs:/FileStore/tables/SparkMLib/newheartstroke.csv,newheartstroke.csv,573,1741514297000


In [0]:
display(dbutils.fs.ls("/FileStore/tables/SparkMLib/dtcmodel"))

path,name,size,modificationTime
dbfs:/FileStore/tables/SparkMLib/dtcmodel/data/,data/,0,0
dbfs:/FileStore/tables/SparkMLib/dtcmodel/metadata/,metadata/,0,0
