<a href="https://colab.research.google.com/github/Praxis-QR/BDSN/blob/main/Pipelines_with_Spark_Predicting_Diabetes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![CC-BY-SA](https://licensebuttons.net/l/by-sa/3.0/88x31.png)<br>
<hr>

![alt text](http://1.bp.blogspot.com/-nqAGzznZQNo/UwS8rxjfXeI/AAAAAAAABTA/nunmRLowpps/s1600/PraxisLogo.gif)<br>
[Data Science Program](http://praxis.ac.in/Programs/business-analytics/)

<hr>

[Prithwis Mukerjee](http://www.yantrajaal.com)<br>

# Spark Install

In [1]:
!pip3 -q install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Praxis').master("local[*]").getOrCreate()
sc = spark.sparkContext
#sc



## Imports

In [82]:
from pyspark.sql.functions import mean
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder, StringIndexer)
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Loading data

In [2]:
!wget -O heart.csv -q https://raw.githubusercontent.com/Praxis-QR/BDSN/main/Documents/BigML_Heart_Dataset.csv

In [3]:
#loading dataset 
heartDF = spark.read.csv('heart.csv', inferSchema=True,header=True)
heartDF.show(5)

+------+----+------------+-------------+---------------+-----+--------+
|gender| age|hypertension|heart_disease|smoking_history|  BMI|diabetes|
+------+----+------------+-------------+---------------+-----+--------+
|Female|80.0|           0|            1|          never|25.19|       0|
|Female|54.0|           0|            0|           null| null|       0|
|  Male|28.0|           0|            0|          never| null|       0|
|Female|36.0|           0|            0|        current|23.45|       0|
|  Male|76.0|           1|            1|        current|20.14|       0|
+------+----+------------+-------------+---------------+-----+--------+
only showing top 5 rows



# Data Quick Look


In [4]:
print((heartDF.count(),len(heartDF.columns)))

(100000, 7)


In [5]:
heartDF.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- smoking_history: string (nullable = true)
 |-- BMI: double (nullable = true)
 |-- diabetes: integer (nullable = true)



In [6]:
#descriptive analysis 
heartDF.describe().show()

+-------+------+-----------------+------------------+------------------+---------------+------------------+-------------------+
|summary|gender|              age|      hypertension|     heart_disease|smoking_history|               BMI|           diabetes|
+-------+------+-----------------+------------------+------------------+---------------+------------------+-------------------+
|  count|100000|           100000|            100000|            100000|          64184|             74556|             100000|
|   mean|  null|41.88585600000013|           0.07485|           0.03942|           null|27.321028891034764|              0.085|
| stddev|  null|22.51683987161704|0.2631504702289171|0.1945930169980986|           null| 7.686295651045002|0.27888308976661896|
|    min|Female|             0.08|                 0|                 0|        current|             10.01|                  0|
|    max| Other|             80.0|                 1|                 1|    not current|             95.

In [7]:
#diabetic and non-diabetic count 
heartDF.groupBy('diabetes').count().show()

+--------+-----+
|diabetes|count|
+--------+-----+
|       1| 8500|
|       0|91500|
+--------+-----+



## Using Spark SQL

In [9]:
# create DataFrame as a temporary view
heartDF.createOrReplaceTempView('heart_T')

In [10]:
#group by gender 
spark.sql(\
          "SELECT \
           gender, count(gender) as count_gender, \
           count(gender)*100/sum(count(gender)) over() as percent  \
           FROM heart_T GROUP BY gender" \
           ).show()

+------+------------+-------+
|gender|count_gender|percent|
+------+------------+-------+
|Female|       58552| 58.552|
| Other|          18|  0.018|
|  Male|       41430|  41.43|
+------+------------+-------+



In [12]:
#group by gender having diabetes 
spark.sql(\
          "SELECT gender, count(gender), \
          round((COUNT(gender) * 100.0) /(SELECT count(gender) FROM heart_T ),2) as percentage \
          FROM heart_T WHERE diabetes = '1'  GROUP BY gender"\
          ).show()

+------+-------------+----------+
|gender|count(gender)|percentage|
+------+-------------+----------+
|Female|         4461|      4.46|
|  Male|         4039|      4.04|
+------+-------------+----------+



In [14]:
#group by gender having heart disease 
spark.sql(\
          "SELECT gender, count(gender), \
          round((COUNT(gender) * 100.0) /(SELECT count(gender) FROM heart_T ),2) as percentage \
          FROM heart_T WHERE heart_disease = '1'  GROUP BY gender"\
          ).show()

+------+-------------+----------+
|gender|count(gender)|percentage|
+------+-------------+----------+
|Female|         1562|      1.56|
|  Male|         2380|      2.38|
+------+-------------+----------+



In [16]:
#group by gender having hypertension 
spark.sql(\
          "SELECT gender, count(gender), \
          round((COUNT(gender) * 100.0) /(SELECT count(gender) FROM heart_T ),2) as percentage \
          FROM heart_T WHERE hypertension = '1'  GROUP BY gender"\
          ).show()

+------+-------------+----------+
|gender|count(gender)|percentage|
+------+-------------+----------+
|Female|         4197|      4.20|
|  Male|         3288|      3.29|
+------+-------------+----------+



In [17]:
#count of different types of smoker
heartDF.groupBy('smoking_history').count().show()

+---------------+-----+
|smoking_history|count|
+---------------+-----+
|    not current| 6447|
|           null|35816|
|         former| 9352|
|        current| 9286|
|          never|35095|
|           ever| 4004|
+---------------+-----+



In [20]:
#count and percentage by smoking history
spark.sql(\
          "SELECT smoking_history, count(smoking_history) as count, \
          round((COUNT(smoking_history) * 100.0) /(SELECT count(smoking_history) FROM heart_T ),2) as percentage \
          FROM heart_T   GROUP BY smoking_history"\
          ).show()

+---------------+-----+----------+
|smoking_history|count|percentage|
+---------------+-----+----------+
|    not current| 6447|     10.04|
|           null|    0|      0.00|
|         former| 9352|     14.57|
|        current| 9286|     14.47|
|          never|35095|     54.68|
|           ever| 4004|      6.24|
+---------------+-----+----------+
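The `null` row shows a count of 0 because SQL `count(column)` skips nulls. The percentages above are therefore shares of the rows that have a recorded smoking history, not of all 100,000 rows. A minimal plain-Python sketch (no Spark needed) reproduces both views from the counts in the earlier `groupBy` output; the variable names are illustrative only.

```python
# Counts taken from the groupBy('smoking_history') output above; None stands for null.
counts = {
    "not current": 6447,
    None: 35816,
    "former": 9352,
    "current": 9286,
    "never": 35095,
    "ever": 4004,
}

non_null_total = sum(n for label, n in counts.items() if label is not None)
all_rows_total = sum(counts.values())

# Share among rows with a recorded value (what the SQL query above computes)
pct_of_non_null = {label: round(100.0 * n / non_null_total, 2)
                   for label, n in counts.items() if label is not None}

# Share among all rows, nulls included
pct_of_all = {label: round(100.0 * n / all_rows_total, 2)
              for label, n in counts.items()}

print(non_null_total, all_rows_total)
print(pct_of_non_null)
print(pct_of_all)
```

Which denominator is right depends on the question being asked; the point is only that the two differ once a column contains nulls.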



In [21]:
#Age vs Diabetes 
spark.sql("SELECT age, count(age) as age_count FROM heart_T WHERE diabetes == 1 GROUP BY age ORDER BY age_count DESC").show()

+----+---------+
| age|age_count|
+----+---------+
|80.0|     1024|
|62.0|      258|
|61.0|      250|
|66.0|      241|
|67.0|      236|
|65.0|      234|
|57.0|      233|
|59.0|      216|
|60.0|      213|
|64.0|      211|
|68.0|      208|
|69.0|      206|
|58.0|      205|
|63.0|      202|
|55.0|      201|
|71.0|      192|
|54.0|      191|
|56.0|      187|
|74.0|      184|
|70.0|      183|
+----+---------+
only showing top 20 rows



In [22]:
#count of diabetic patients over age 50 
heartDF.filter((heartDF['diabetes'] == 1) & (heartDF['age'] > 50)).count()


6650

# Data Preprocessing

In [23]:
#checking null values
heartDF.toPandas().isnull().sum()

gender                 0
age                    0
hypertension           0
heart_disease          0
smoking_history    35816
BMI                25444
diabetes               0
dtype: int64

In [27]:
# fill in missing values for smoking_history
heartDF2 = heartDF.na.fill('No Info', subset=['smoking_history'])
# fill in missing values for BMI with the column mean

cmean = heartDF2.select(mean(heartDF2['BMI'])).collect()
meanBMI = cmean[0][0]
heartDF2 = heartDF2.na.fill(meanBMI,['BMI'])

In [29]:
heartDF2.describe().show()
# note, mean BMI has not changed, but std of BMI has reduced, as expected

+-------+------+-----------------+------------------+------------------+---------------+------------------+-------------------+
|summary|gender|              age|      hypertension|     heart_disease|smoking_history|               BMI|           diabetes|
+-------+------+-----------------+------------------+------------------+---------------+------------------+-------------------+
|  count|100000|           100000|            100000|            100000|         100000|            100000|             100000|
|   mean|  null|41.88585600000013|           0.07485|           0.03942|           null|27.321028891031315|              0.085|
| stddev|  null|22.51683987161704|0.2631504702289171|0.1945930169980986|           null|  6.63678340151884|0.27888308976661896|
|    min|Female|             0.08|                 0|                 0|        No Info|             10.01|                  0|
|    max| Other|             80.0|                 1|                 1|    not current|             95.
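Why the mean stays put while the standard deviation shrinks: every imputed value sits exactly at the mean, so the sum of squared deviations is unchanged while the row count grows. For a sample standard deviation this suggests a shrink factor of sqrt((n_observed - 1) / (n_total - 1)). The sketch below, in plain Python with made-up toy values, checks the idea and then applies the factor to the figures printed by the two `describe()` calls.

```python
import math
import statistics

# Toy data (made up for illustration): None marks a missing value
values = [18.0, 22.0, 30.0, None, None]
observed = [v for v in values if v is not None]
fill = statistics.mean(observed)
imputed = [fill if v is None else v for v in values]

print(statistics.mean(observed), statistics.mean(imputed))    # means agree
print(statistics.stdev(observed), statistics.stdev(imputed))  # stdev shrinks

# Same reasoning applied to the BMI column, using numbers from the outputs above
n_observed, n_total = 74556, 100000
std_before = 7.686295651045002
predicted_std_after = std_before * math.sqrt((n_observed - 1) / (n_total - 1))
print(predicted_std_after)  # compare with the stddev reported after imputation
```

The predicted value lands very close to the 6.6368 reported after imputation, which is a useful reminder that mean imputation artificially narrows a column's spread.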

In [30]:
heartDF2.toPandas().isnull().sum()

gender             0
age                0
hypertension       0
heart_disease      0
smoking_history    0
BMI                0
diabetes           0
dtype: int64

In [32]:
heartDF2.dtypes

[('gender', 'string'),
 ('age', 'double'),
 ('hypertension', 'int'),
 ('heart_disease', 'int'),
 ('smoking_history', 'string'),
 ('BMI', 'double'),
 ('diabetes', 'int')]

# Serial Data Transformations

## String Indexer

In [34]:
# we have two categorical variables: gender and smoking history
# indexing all categorical columns in the dataset

GenderIndexer = StringIndexer(inputCol="gender", outputCol="genderIndex")
SmokeHistIndexer = StringIndexer(inputCol="smoking_history", outputCol="smoking_statusIndex")

In [50]:
heartDF2.show(5)

+------+----+------------+-------------+---------------+------------------+--------+
|gender| age|hypertension|heart_disease|smoking_history|               BMI|diabetes|
+------+----+------------+-------------+---------------+------------------+--------+
|Female|80.0|           0|            1|          never|             25.19|       0|
|Female|54.0|           0|            0|        No Info|27.321028891034764|       0|
|  Male|28.0|           0|            0|          never|27.321028891034764|       0|
|Female|36.0|           0|            0|        current|             23.45|       0|
|  Male|76.0|           1|            1|        current|             20.14|       0|
+------+----+------------+-------------+---------------+------------------+--------+
only showing top 5 rows



In [70]:
# Using any one String Indexer
#GenderIndexer.fit(heartDF2).transform(heartDF2).show()
SmokeHistIndexer.fit(heartDF2).transform(heartDF2).show(5)

+------+----+------------+-------------+---------------+------------------+--------+-------------------+
|gender| age|hypertension|heart_disease|smoking_history|               BMI|diabetes|smoking_statusIndex|
+------+----+------------+-------------+---------------+------------------+--------+-------------------+
|Female|80.0|           0|            1|          never|             25.19|       0|                1.0|
|Female|54.0|           0|            0|        No Info|27.321028891034764|       0|                0.0|
|  Male|28.0|           0|            0|          never|27.321028891034764|       0|                1.0|
|Female|36.0|           0|            0|        current|             23.45|       0|                3.0|
|  Male|76.0|           1|            1|        current|             20.14|       0|                3.0|
+------+----+------------+-------------+---------------+------------------+--------+-------------------+
only showing top 5 rows
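The index values are not arbitrary: by default `StringIndexer` orders labels by descending frequency, so the most common label gets 0.0. A plain-Python sketch of that ordering, using the smoking-history counts seen earlier (with the nulls now labelled `No Info`), reproduces the indices in the table above. The alphabetical tie-break is an assumption of this sketch; there are no ties in this data.

```python
# Label counts after filling nulls with 'No Info' (from the earlier groupBy output)
counts = {
    "not current": 6447,
    "No Info": 35816,
    "former": 9352,
    "current": 9286,
    "never": 35095,
    "ever": 4004,
}

# Most frequent label first; ties broken alphabetically (assumption, unused here)
ordered = sorted(counts, key=lambda label: (-counts[label], label))
index_of = {label: float(i) for i, label in enumerate(ordered)}

for label in ordered:
    print(label, index_of[label])
```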



In [69]:
# Putting TWO indexers, the start of the so-called Pipeline
GenderIndexer.fit(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).transform(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).show(5)

+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------+
|gender| age|hypertension|heart_disease|smoking_history|               BMI|diabetes|smoking_statusIndex|genderIndex|
+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------+
|Female|80.0|           0|            1|          never|             25.19|       0|                1.0|        0.0|
|Female|54.0|           0|            0|        No Info|27.321028891034764|       0|                0.0|        0.0|
|  Male|28.0|           0|            0|          never|27.321028891034764|       0|                1.0|        1.0|
|Female|36.0|           0|            0|        current|             23.45|       0|                3.0|        0.0|
|  Male|76.0|           1|            1|        current|             20.14|       0|                3.0|        1.0|
+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------+

## One Hot Encoder

In [56]:
OHE_Gender = OneHotEncoder(inputCols=["genderIndex"], outputCols=["genderVec"])
#ohe.fit(??).transform(??).show()
OHE_Gender.fit(GenderIndexer.fit(heartDF2).transform(heartDF2)).transform(GenderIndexer.fit(heartDF2).transform(heartDF2)).show(5)

+------+----+------------+-------------+---------------+------------------+--------+-----------+-------------+
|gender| age|hypertension|heart_disease|smoking_history|               BMI|diabetes|genderIndex|    genderVec|
+------+----+------------+-------------+---------------+------------------+--------+-----------+-------------+
|Female|80.0|           0|            1|          never|             25.19|       0|        0.0|(2,[0],[1.0])|
|Female|54.0|           0|            0|        No Info|27.321028891034764|       0|        0.0|(2,[0],[1.0])|
|  Male|28.0|           0|            0|          never|27.321028891034764|       0|        1.0|(2,[1],[1.0])|
|Female|36.0|           0|            0|        current|             23.45|       0|        0.0|(2,[0],[1.0])|
|  Male|76.0|           1|            1|        current|             20.14|       0|        1.0|(2,[1],[1.0])|
+------+----+------------+-------------+---------------+------------------+--------+-----------+-------------+
only showing top 5 rows

In [58]:
OHE_SmokeStat = OneHotEncoder(inputCols=["smoking_statusIndex"], outputCols=["smoking_statusVec"])
#ohe.fit(??).transform(??).show()
OHE_SmokeStat.fit(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).transform(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).show(5)

+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------------+
|gender| age|hypertension|heart_disease|smoking_history|               BMI|diabetes|smoking_statusIndex|smoking_statusVec|
+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------------+
|Female|80.0|           0|            1|          never|             25.19|       0|                1.0|    (5,[1],[1.0])|
|Female|54.0|           0|            0|        No Info|27.321028891034764|       0|                0.0|    (5,[0],[1.0])|
|  Male|28.0|           0|            0|          never|27.321028891034764|       0|                1.0|    (5,[1],[1.0])|
|Female|36.0|           0|            0|        current|             23.45|       0|                3.0|    (5,[3],[1.0])|
|  Male|76.0|           1|            1|        current|             20.14|       0|                3.0|    (5,[3],[1.0])|
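+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------------+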
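Reading the encoder output: `(5,[3],[1.0])` is a sparse vector of length 5 with a single 1.0 at position 3. The length is 5 rather than 6 because `OneHotEncoder` drops the last category by default, so that category is represented by an all-zero vector. The helper functions below are hypothetical, written in plain Python only to mimic the printed notation.

```python
def one_hot_drop_last(index, num_categories):
    """Return (size, indices, values), mimicking the sparse vectors printed above."""
    size = num_categories - 1          # the last category gets no slot of its own
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])              # last category: all zeros


def to_dense(sparse):
    """Expand (size, indices, values) into a plain list of floats."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


current = one_hot_drop_last(3, 6)      # 'current' among 6 smoking categories
last = one_hot_drop_last(5, 6)         # the least frequent label, index 5
print(current, to_dense(current))
print(last, to_dense(last))
```

The same rule explains why the three gender values (Female, Male, Other) produce vectors of length 2.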

In [59]:
# Encoding both simultaneously
OHE_Gender_Smoke = OneHotEncoder(inputCols=["genderIndex","smoking_statusIndex"],
                                 outputCols=["genderVec","smoking_statusVec"])

In [62]:
#The 'Pipeline' becomes even longer
# but still ohe.fit(??).transform(??).show()
OHE_Gender_Smoke.fit(GenderIndexer.fit(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).transform(SmokeHistIndexer.fit(heartDF2).transform(heartDF2))).transform(GenderIndexer.fit(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).transform(SmokeHistIndexer.fit(heartDF2).transform(heartDF2))).show(5)

+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------+-------------+-----------------+
|gender| age|hypertension|heart_disease|smoking_history|               BMI|diabetes|smoking_statusIndex|genderIndex|    genderVec|smoking_statusVec|
+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------+-------------+-----------------+
|Female|80.0|           0|            1|          never|             25.19|       0|                1.0|        0.0|(2,[0],[1.0])|    (5,[1],[1.0])|
|Female|54.0|           0|            0|        No Info|27.321028891034764|       0|                0.0|        0.0|(2,[0],[1.0])|    (5,[0],[1.0])|
|  Male|28.0|           0|            0|          never|27.321028891034764|       0|                1.0|        1.0|(2,[1],[1.0])|    (5,[1],[1.0])|
|Female|36.0|           0|            0|        current|             23.45|       0|                3.0|  

## Assembler

In [65]:
F_assembler = VectorAssembler(inputCols=['genderVec',
 'age',
 'hypertension',
 'heart_disease',
 'BMI',
 'smoking_statusVec'],outputCol='features')

In [67]:
#assembler.transform(??).show()
F_assembler.transform(OHE_Gender_Smoke.fit(GenderIndexer.fit(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).transform(SmokeHistIndexer.fit(heartDF2).transform(heartDF2))).transform(GenderIndexer.fit(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)).transform(SmokeHistIndexer.fit(heartDF2).transform(heartDF2)))).show(5)

+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------+-------------+-----------------+--------------------+
|gender| age|hypertension|heart_disease|smoking_history|               BMI|diabetes|smoking_statusIndex|genderIndex|    genderVec|smoking_statusVec|            features|
+------+----+------------+-------------+---------------+------------------+--------+-------------------+-----------+-------------+-----------------+--------------------+
|Female|80.0|           0|            1|          never|             25.19|       0|                1.0|        0.0|(2,[0],[1.0])|    (5,[1],[1.0])|(11,[0,2,4,5,7],[...|
|Female|54.0|           0|            0|        No Info|27.321028891034764|       0|                0.0|        0.0|(2,[0],[1.0])|    (5,[0],[1.0])|(11,[0,2,5,6],[1....|
|  Male|28.0|           0|            0|          never|27.321028891034764|       0|                1.0|        1.0|(2,[1],[1.0])|    (5,[1],[1.0])|(1
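The `features` column packs the inputs in the order given to `F_assembler`: slots 0-1 hold `genderVec`, 2 is `age`, 3 is `hypertension`, 4 is `heart_disease`, 5 is `BMI`, and 6-10 hold `smoking_statusVec`, for 11 slots in total. Zero entries are omitted from the sparse printout. The function below is a hypothetical plain-Python stand-in that rebuilds the index lists for the first two rows.

```python
def assemble(gender_idx, age, hypertension, heart_disease, bmi, smoke_idx):
    """Build (size, indices, values) in the input order given to F_assembler."""
    dense = [0.0] * 11
    if gender_idx < 2:            # genderVec occupies slots 0-1 (last category dropped)
        dense[gender_idx] = 1.0
    dense[2] = float(age)
    dense[3] = float(hypertension)
    dense[4] = float(heart_disease)
    dense[5] = float(bmi)
    if smoke_idx < 5:             # smoking_statusVec occupies slots 6-10
        dense[6 + smoke_idx] = 1.0
    indices = [i for i, v in enumerate(dense) if v != 0.0]
    return (11, indices, [dense[i] for i in indices])


# First two rows of the table above: (Female, 80, 0, 1, 25.19, never) and
# (Female, 54, 0, 0, mean BMI, No Info)
row1 = assemble(0, 80.0, 0, 1, 25.19, 1)
row2 = assemble(0, 54.0, 0, 0, 27.321028891034764, 0)
print(row1)
print(row2)
```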

In [96]:
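basePipe = Pipeline(stages=[GenderIndexer, SmokeHistIndexer, OHE_Gender_Smoke, F_assembler])
# Fit on the cleaned heartDF2, not the raw heartDF: the nulls left in smoking_history and BMI
# are the likely cause of the Py4JJavaError this cell raised when it was run on heartDF
basePipe.fit(heartDF2).transform(heartDF2).show(5)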
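The long nested `fit(...).transform(...)` expressions in the earlier cells are what a `Pipeline` automates: each stage is fitted on the output of the stages before it, and the fitted stages are then applied in order. The toy classes below are hypothetical plain-Python stand-ins, not Spark's implementation (Spark's `Pipeline.fit` returns a separate `PipelineModel`, for instance); they only illustrate the chaining.

```python
class ToyIndexer:
    """Toy estimator: learns a label -> index map, most frequent label first."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col
        self.mapping = None

    def fit(self, rows):
        freq = {}
        for row in rows:
            label = row[self.input_col]
            freq[label] = freq.get(label, 0) + 1
        ordered = sorted(freq, key=lambda k: (-freq[k], k))
        self.mapping = {label: i for i, label in enumerate(ordered)}
        return self

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]}
                for row in rows]


class ToyPipeline:
    """Fits each stage on the data as transformed by all earlier stages."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        current = rows
        for stage in self.stages:
            current = stage.fit(current).transform(current)
        return self

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


rows = [
    {"gender": "Female", "smoking_history": "never"},
    {"gender": "Female", "smoking_history": "No Info"},
    {"gender": "Male", "smoking_history": "never"},
]
pipe = ToyPipeline([ToyIndexer("gender", "genderIndex"),
                    ToyIndexer("smoking_history", "smoking_statusIndex")])
result = pipe.fit(rows).transform(rows)
for row in result:
    print(row)
```

Each stage is written once and fitted once, which is also why the same stage list can be reused with a different final estimator in the model sections that follow.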

# Train-Test Split 

In [71]:
# splitting training and validation data
train_heart,val_heart = heartDF2.randomSplit([0.7,0.3])
print(train_heart.count())
print(val_heart.count())

69945
30055
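The counts are close to, but not exactly, 70,000 and 30,000 because `randomSplit` assigns rows at random rather than cutting at a fixed position, and with no seed the split changes on every run. The sketch below is a plain-Python illustration of per-row random assignment with a fixed seed; `random_split` is a hypothetical helper, not Spark's sampler.

```python
import random


def random_split(rows, train_weight, seed):
    """Assign each row to train with probability train_weight, independently."""
    rng = random.Random(seed)
    train, val = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            val.append(row)
    return train, val


rows = list(range(100000))
train_a, val_a = random_split(rows, 0.7, seed=42)
train_b, val_b = random_split(rows, 0.7, seed=42)

print(len(train_a), len(val_a))   # near 70000 / 30000, rarely exact
print(train_a == train_b)         # same seed, same split
```

In the notebook itself, passing a seed, for example `heartDF2.randomSplit([0.7, 0.3], seed=42)`, should make the split repeatable so that the three models are compared on the same rows across runs.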


# Alternative Models

## Logistic Regression Model Pipeline 

In [73]:
lr = LogisticRegression(labelCol='diabetes',featuresCol='features',maxIter=5)
lr_pipeline = Pipeline(stages=[GenderIndexer, SmokeHistIndexer, OHE_Gender_Smoke, F_assembler,lr])
# training model pipeline with data
lr_model = lr_pipeline.fit(train_heart)
lr_predictions=lr_model.transform(val_heart)

In [76]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="diabetes", predictionCol="prediction", metricName="accuracy")
lr_acc=acc_evaluator.evaluate(lr_predictions)

#print('A Logistic Regression algorithm had an accuracy of: {0:2.2f}%'.format(lr_acc*100))
print(round(lr_acc,3), 'is the accuracy of the LR pipeline')


0.912 is the accuracy of the LR pipeline


In [98]:
# Let's use the run-of-the-mill evaluator
evaluator = BinaryClassificationEvaluator(labelCol='diabetes')

# We have only two choices: area under ROC and PR curves :-(
lr_auroc = evaluator.evaluate(lr_predictions, {evaluator.metricName: "areaUnderROC"})
#auprc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(lr_auroc))
#print("Area under PR Curve: {:.4f}".format(auprc))


Area under ROC Curve: 0.8326
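Area under the ROC curve can be read as the probability that a randomly chosen positive case receives a higher score than a randomly chosen negative one, which makes it independent of any single decision threshold. The function below is a small plain-Python sketch of that pairwise definition on made-up scores; it is not how Spark computes the metric internally.

```python
def auroc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up example: 3 of the 4 positive/negative pairs are ordered correctly
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
print(auroc(toy_labels, toy_scores))  # 0.75
```

A value of 0.5 corresponds to random ranking and 1.0 to perfect ranking, so the 0.83 reported above indicates useful but imperfect separation.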


## Random Forest Pipeline

In [79]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'diabetes')
rf_pipeline = Pipeline(stages=[GenderIndexer, SmokeHistIndexer, OHE_Gender_Smoke, F_assembler, rf])
# training model pipeline with data
rf_model = rf_pipeline.fit(train_heart)
rf_predictions=rf_model.transform(val_heart)

In [86]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="diabetes", predictionCol="prediction", metricName="accuracy")
rf_acc=acc_evaluator.evaluate(rf_predictions)

print('A Random Forest algorithm had an accuracy of: {0:2.2f}%'.format(rf_acc*100))

A Random Forest algorithm had an accuracy of: 91.30%


In [99]:
# Let's use the run-of-the-mill evaluator
evaluator = BinaryClassificationEvaluator(labelCol='diabetes')

# We have only two choices: area under ROC and PR curves :-(
rf_auroc = evaluator.evaluate(rf_predictions, {evaluator.metricName: "areaUnderROC"})
#auprc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(rf_auroc))
#print("Area under PR Curve: {:.4f}".format(auprc))

Area under ROC Curve: 0.8198


## Gradient Boost Model Pipeline

In [83]:
gbt = GBTClassifier(labelCol='diabetes',featuresCol='features')
gbt_pipeline = Pipeline(stages=[GenderIndexer, SmokeHistIndexer, OHE_Gender_Smoke, F_assembler, gbt])
gbt_model = gbt_pipeline.fit(train_heart)
gbt_predictions = gbt_model.transform(val_heart)

In [84]:
gbt_acc = acc_evaluator.evaluate(gbt_predictions)
print('Gradient Boost algorithm had an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

Gradient Boost algorithm had an accuracy of: 91.40%


In [100]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Let's use the run-of-the-mill evaluator
evaluator = BinaryClassificationEvaluator(labelCol='diabetes')

# We have only two choices: area under ROC and PR curves :-(
gbt_auroc = evaluator.evaluate(gbt_predictions, {evaluator.metricName: "areaUnderROC"})
#auprc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print("Area under ROC Curve: {:.4f}".format(gbt_auroc))
#print("Area under PR Curve: {:.4f}".format(auprc))

Area under ROC Curve: 0.8346


# Summary

In [101]:
print(round(lr_acc,3), 'is the accuracy of the LR pipeline')
print(round(rf_acc,3), 'is the accuracy of the RF pipeline')
print(round(gbt_acc,3), 'is the accuracy of the GBT pipeline')

print(round(lr_auroc,3), 'is area under ROC curve of the LR pipeline')
print(round(rf_auroc,3), 'is area under ROC curve of the RF pipeline')
print(round(gbt_auroc,3), 'is area under ROC curve of the GBT pipeline')

0.912 is the accuracy of the LR pipeline
0.913 is the accuracy of the RF pipeline
0.914 is the accuracy of the GBT pipeline
0.833 is area under ROC curve of the LR pipeline
0.82 is area under ROC curve of the RF pipeline
0.835 is area under ROC curve of the GBT pipeline
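These accuracies need a reference point. Only 8.5% of the rows are diabetic, so a model that always predicts "not diabetic" would already be right about 91.5% of the time on the full dataset. The sketch below compares the reported accuracies with that majority-class baseline; the baseline on the validation split may differ slightly, since the split is random.

```python
# Class counts from the groupBy('diabetes') output near the top of the notebook
n_diabetic, n_non_diabetic = 8500, 91500
baseline_acc = n_non_diabetic / (n_diabetic + n_non_diabetic)

# Accuracies as printed in the summary above
reported = {"LR": 0.912, "RF": 0.913, "GBT": 0.914}

print("majority-class baseline:", baseline_acc)
for name, acc in reported.items():
    print(name, acc, "difference from baseline:", round(acc - baseline_acc, 3))
```

On this reading none of the three pipelines beats the trivial baseline on accuracy, which is why the area under the ROC curve is the more informative comparison for this imbalanced dataset.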

