# Binary classification using Logistic Regression on Diabetes dataset

In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [19]:
# Load the diabetes CSV: the first row is the header and column types are
# inferred from the data instead of defaulting to strings.
raw_data = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("diabetes2.csv")
)

In [20]:
# List the column names of the loaded DataFrame (eight predictors plus the
# "Outcome" label, as shown in the output below).
raw_data.columns

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

In [21]:
# Preview the first five rows of the raw data.
raw_data.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [22]:
# Summary statistics (count, mean, stddev, min, max) for SkinThickness and
# Insulin. Both report a minimum of 0; zeros in these columns are treated as
# missing values later in the notebook.
raw_data.describe().select("Summary","SkinThickness","Insulin").show()

+-------+------------------+------------------+
|Summary|     SkinThickness|           Insulin|
+-------+------------------+------------------+
|  count|               768|               768|
|   mean|20.536458333333332| 79.79947916666667|
| stddev|15.952217567727642|115.24400235133803|
|    min|                 0|                 0|
|    max|                99|               846|
+-------+------------------+------------------+



In [23]:
# Summary statistics (count, mean, stddev, min, max) for BMI,
# DiabetesPedigreeFunction and Age. BMI reports a minimum of 0.0; zeros in BMI
# are treated as missing values later in the notebook.
raw_data.describe().select("Summary","BMI","DiabetesPedigreeFunction","Age").show()

+-------+------------------+------------------------+------------------+
|Summary|               BMI|DiabetesPedigreeFunction|               Age|
+-------+------------------+------------------------+------------------+
|  count|               768|                     768|               768|
|   mean|31.992578124999977|      0.4718763020833327|33.240885416666664|
| stddev| 7.884160320375441|       0.331328595012775|11.760231540678689|
|    min|               0.0|                   0.078|                21|
|    max|              67.1|                    2.42|                81|
+-------+------------------+------------------------+------------------+



In [24]:
# Summary statistics (count, mean, stddev, min, max) for Pregnancies, Glucose
# and BloodPressure. Glucose and BloodPressure report a minimum of 0; zeros in
# those two columns are treated as missing values later in the notebook.
raw_data.describe().select("Summary","Pregnancies","Glucose","BloodPressure").show()

+-------+------------------+-----------------+------------------+
|Summary|       Pregnancies|          Glucose|     BloodPressure|
+-------+------------------+-----------------+------------------+
|  count|               768|              768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|
|    min|                 0|                0|                 0|
|    max|                17|              199|               122|
+-------+------------------+-----------------+------------------+



In [25]:
import pyspark.sql.functions as F

In [26]:
import numpy as np

In [27]:
# Treat 0 as a missing-value marker in the measurement columns below and
# replace it with NaN so the values can be imputed afterwards.
zero_means_missing = ["Glucose", "BloodPressure", "SkinThickness", "BMI", "Insulin"]
for column_name in zero_means_missing:
    # withColumn with an existing name overwrites that column in place. Because
    # NaN is a float, the resulting column is a double (e.g. 148 -> 148.0).
    raw_data = raw_data.withColumn(
        column_name,
        F.when(raw_data[column_name] == 0, np.nan).otherwise(raw_data[column_name]),
    )
raw_data.select("Insulin", "Glucose", "BloodPressure", "SkinThickness", "BMI").show(5)

+-------+-------+-------------+-------------+----+
|Insulin|Glucose|BloodPressure|SkinThickness| BMI|
+-------+-------+-------------+-------------+----+
|    NaN|  148.0|         72.0|         35.0|33.6|
|    NaN|   85.0|         66.0|         29.0|26.6|
|    NaN|  183.0|         64.0|          NaN|23.3|
|   94.0|   89.0|         66.0|         23.0|28.1|
|  168.0|  137.0|         40.0|         35.0|43.1|
+-------+-------+-------------+-------------+----+
only showing top 5 rows



In [28]:
# Replace the NaN markers with each column's mean (the Imputer default
# strategy, spelled out here). The columns are imputed in place: input and
# output column names are identical.
# NOTE(review): the imputer is fitted on the full dataset, before the
# train/test split performed later in the notebook.
from pyspark.ml.feature import Imputer

imputed_columns = ["Glucose", "BloodPressure", "SkinThickness", "BMI", "Insulin"]
imputer = Imputer(
    strategy="mean",
    inputCols=imputed_columns,
    outputCols=list(imputed_columns),
)
model = imputer.fit(raw_data)
raw_data = model.transform(raw_data)
raw_data.show(5)

+-----------+-------+-------------+------------------+-----------------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|     SkinThickness|          Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+------------------+-----------------+----+------------------------+---+-------+
|          6|  148.0|         72.0|              35.0|155.5482233502538|33.6|                   0.627| 50|      1|
|          1|   85.0|         66.0|              29.0|155.5482233502538|26.6|                   0.351| 31|      0|
|          8|  183.0|         64.0|29.153419593345657|155.5482233502538|23.3|                   0.672| 32|      1|
|          1|   89.0|         66.0|              23.0|             94.0|28.1|                   0.167| 21|      0|
|          0|  137.0|         40.0|              35.0|            168.0|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+------------------+-----------------+----+---

In [29]:
# Assemble every predictor column into a single vector column named "features".
from pyspark.ml.feature import VectorAssembler

# Derive the input columns from the DataFrame itself: everything except the
# "Outcome" label, in the DataFrame's column order. This replaces a hard-coded
# duplicate of the same list (the previously computed `cols` was never used).
cols = [name for name in raw_data.columns if name != "Outcome"]
assembler = VectorAssembler(inputCols=cols, outputCol="features")
raw_data = assembler.transform(raw_data)
raw_data.select("features").show()

+--------------------+
|            features|
+--------------------+
|[6.0,148.0,72.0,3...|
|[1.0,85.0,66.0,29...|
|[8.0,183.0,64.0,2...|
|[1.0,89.0,66.0,23...|
|[0.0,137.0,40.0,3...|
|[5.0,116.0,74.0,2...|
|[3.0,78.0,50.0,32...|
|[10.0,115.0,72.40...|
|[2.0,197.0,70.0,4...|
|[8.0,125.0,96.0,2...|
|[4.0,110.0,92.0,2...|
|[10.0,168.0,74.0,...|
|[10.0,139.0,80.0,...|
|[1.0,189.0,60.0,2...|
|[5.0,166.0,72.0,1...|
|[7.0,100.0,72.405...|
|[0.0,118.0,84.0,4...|
|[7.0,107.0,74.0,2...|
|[1.0,103.0,30.0,3...|
|[1.0,115.0,70.0,3...|
+--------------------+
only showing top 20 rows



In [30]:
# Standardise the assembled feature vector into "Scaled_features".
# With the default settings the scaler divides by the standard deviation but
# does not centre the data (a raw 0.0 stays 0.0 in the output below).
# NOTE(review): the scaler is fitted on the full dataset, before the
# train/test split performed later in the notebook.
from pyspark.ml.feature import StandardScaler

standardscaler = StandardScaler(inputCol="features", outputCol="Scaled_features")
raw_data = standardscaler.fit(raw_data).transform(raw_data)
raw_data.select("features", "Scaled_features").show(5)

+--------------------+--------------------+
|            features|     Scaled_features|
+--------------------+--------------------+
|[6.0,148.0,72.0,3...|[1.78063837321943...|
|[1.0,85.0,66.0,29...|[0.29677306220323...|
|[8.0,183.0,64.0,2...|[2.37418449762590...|
|[1.0,89.0,66.0,23...|[0.29677306220323...|
|[0.0,137.0,40.0,3...|[0.0,4.5012560836...|
+--------------------+--------------------+
only showing top 5 rows



In [32]:
# Show the first five scaled feature vectors in full (truncate=False disables
# the default column-width truncation).
raw_data.select("Scaled_features").show(5,truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------+
|Scaled_features                                                                                                                                       |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1.7806383732194306,4.862670805688543,5.952210601826984,3.9813708583353558,1.8295247783934943,4.887165154544966,1.8923811872495484,4.251616970894646] |
|[0.29677306220323846,2.7927501248886903,5.456193051674735,3.29885013976358,1.8295247783934943,3.869005747348098,1.0593712866420917,2.6360025219546803]|
|[2.3741844976259077,6.0126267394662385,5.290853868290652,3.316302148279125,1.8295247783934943,3.3890163125267176,2.0281980188703295,2.721034861372573]|
|[0.29677306220323846,2.9241736601775696,5.456193051674735,2.616329421191805,1.105

In [33]:
# Splitting the dataset into roughly 75% train / 25% test (the weights are
# sampling probabilities, so the split sizes are approximate).
# The seed must be an integer: PySpark casts it with int(), so the previous
# value 0.8 was silently truncated to 0. Writing 0 explicitly keeps the same
# split while making the effective seed visible.
train, test = raw_data.randomSplit([0.75, 0.25], seed=0)

In [34]:
# Measure class imbalance in the training split: how many rows are positive
# (Outcome == 1) and what share of the training set they make up.
dataset_size = float(train.count())
numPositives = train.filter(train.Outcome == 1).count()
# dataset_size is already a float, so these are true (non-integer) divisions.
per_ones = (numPositives / dataset_size) * 100
numNegatives = dataset_size - numPositives
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 214
Percentage of ones are 36.02693602693603


In [35]:
# Balancing ratio = fraction of negative (Outcome == 0) rows in the training
# set. It is used below as the weight of the minority (positive) class.
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0.6397306397306397


In [37]:
# Attach a per-row weight to the training set so both classes contribute
# comparably to the loss: positives are weighted by the share of negatives
# (BalancingRatio) and negatives by the share of positives (1 - BalancingRatio).
train = train.withColumn(
    "classWeights",
    F.when(F.col("Outcome") == 1, BalancingRatio).otherwise(1 - BalancingRatio),
)
train.select("classWeights").show(5)

+------------------+
|      classWeights|
+------------------+
|0.3602693602693603|
|0.3602693602693603|
|0.3602693602693603|
|0.3602693602693603|
|0.3602693602693603|
+------------------+
only showing top 5 rows



In [38]:
# Feature selection using ChiSqSelector: writes the selected features of
# "Scaled_features" to a new vector column "Aspect".
from pyspark.ml.feature import ChiSqSelector

# NOTE(review): selectorType is left at its default, so `fpr` presumably has no
# effect and the default top-N selection applies (all 8 features are kept in
# the output below). Pass selectorType="fpr" if p-value selection is intended.
css = ChiSqSelector(featuresCol='Scaled_features', outputCol='Aspect', labelCol='Outcome', fpr=0.05)

# Fit the selector on the training split ONLY and reuse that fitted model for
# the test split. Re-fitting on `test` would use test labels for feature
# selection and could pick a different feature subset than the one the
# classifier is trained on.
selector_model = css.fit(train)
train = selector_model.transform(train)
test = selector_model.transform(test)
test.select("Aspect").show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------+
|Aspect                                                                                                                                   |
+-----------------------------------------------------------------------------------------------------------------------------------------+
|[0.0,1.8727853778665335,4.960175501522486,3.316302148279125,1.8295247783934943,3.1562941623102905,2.2183415831394226,5.697166740998825]  |
|[0.0,2.4313354028442715,4.2988187679861545,1.1375345309529588,0.42342426421586205,4.043547360010418,0.8118828379108909,1.870711467193644]|
|[0.0,2.7598942410664704,5.290853868290652,2.5025759680965094,0.7762778177290804,5.207158111092553,1.6448927385183476,1.785679127775751]  |
|[0.0,3.2855883822219885,7.274924068899646,6.825207185717752,1.2937963628818008,6.8071228938304875,2.903462044870918,2.6360025219546803]  |
|[0.0,3.318444266044

In [39]:
# Train a class-weighted logistic regression on the selected features and
# score both splits with it.
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="Aspect",
    labelCol="Outcome",
    weightCol="classWeights",  # per-row weights added to `train` earlier
    maxIter=10,
)
model = lr.fit(train)
predict_train = model.transform(train)
predict_test = model.transform(test)
# Compare true labels with predicted labels on the test split.
predict_test.select("Outcome", "prediction").show()

+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       1.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       1.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
|      0|       0.0|
|      1|       1.0|
+-------+----------+
only showing top 20 rows



In [41]:
# Show one fully scored test row, including the columns added by the model
# (rawPrediction, probability, prediction).
predict_test.show(1)

+-----------+-------+-------------+------------------+-----------------+----+------------------------+---+-------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|     SkinThickness|          Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|     Scaled_features|              Aspect|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+------------------+-----------------+----+------------------------+---+-------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|          0|   57.0|         60.0|29.153419593345657|155.5482233502538|21.7|                   0.735| 67|      0|[0.0,57.0,60.0,29...|[0.0,1.8727853778...|[0.0,1.8727853778...|[3.84066523834802...|[0.97897235159879...|       0.0|
+-----------+-------+-------------+------------------+-----------------+----

In [43]:
# Evaluate the fitted model with the area under the ROC curve (the evaluator's
# default metric), computed from the rawPrediction column against "Outcome".
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="Outcome", rawPredictionCol="rawPrediction")
predict_test.select("Outcome", "rawPrediction", "prediction", "probability").show(5)

train_auc = evaluator.evaluate(predict_train)
test_auc = evaluator.evaluate(predict_test)
print("The area under ROC for train set is {}".format(train_auc))
print("The area under ROC for test set is {}".format(test_auc))

+-------+--------------------+----------+--------------------+
|Outcome|       rawPrediction|prediction|         probability|
+-------+--------------------+----------+--------------------+
|      0|[3.84066523834802...|       0.0|[0.97897235159879...|
|      0|[2.86492478535243...|       0.0|[0.94608505614275...|
|      0|[1.69781957503435...|       0.0|[0.84524974428250...|
|      0|[-0.1056353100376...|       1.0|[0.47361570272801...|
|      0|[2.70237566566379...|       0.0|[0.93716668108294...|
+-------+--------------------+----------+--------------------+
only showing top 5 rows

The area under ROC for train set is 0.8270167240531232
The area under ROC for test set is 0.8746913580246912


In [44]:
# Hyperparameter tuning and K fold cross validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Grid over the hyperparameters that actually change the fitted model.
# aggregationDepth is deliberately NOT tuned: it only sets the depth of the
# distributed aggregation tree (a performance knob), so including it tripled
# the number of fits (162 combinations x 5 folds) without changing any model.
# This grid has 3 * 2 * 3 * 3 = 54 combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.maxIter, [10, 100, 1000])
    .addGrid(lr.regParam, [0.01, 0.5, 2.0])
    .build()
)

# A fixed seed makes the fold assignment, and therefore the selected model,
# reproducible across runs.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=0,
)
cvModel = cv.fit(train)

# cvModel.transform scores with the best parameter combination found.
predict_train = cvModel.transform(train)
predict_test = cvModel.transform(test)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(predict_test)))