# DIABETES PREDICTION BUT USING PYSPARK INSTEAD OF PANDAS

We will be making predictions on the Diabetes dataset, but not in the traditional way with pandas. Instead, we will use PySpark, the Python API for Apache Spark.
PySpark is similar to pandas in that it also works with DataFrames, but a key difference is that a pandas DataFrame is mutable, whereas a PySpark DataFrame is immutable: every transformation returns a new DataFrame instead of changing the existing one. This matters when the integrity of the dataset is crucial to the business application, and that is where PySpark is useful.

If you are completely new to PySpark but know a little about the pandas library, I suggest you visit this site:
 https://sparkbyexamples.com/pyspark/pandas-vs-pyspark-dataframe-with-examples/

In [1]:
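#Installing and importing all the important libraries
!pip install pyspark
from pyspark.sql import SparkSession
#Explicit import; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import when
from pyspark.ml.feature import VectorAssembler
#Creating (or reusing) the Spark session
spark = SparkSession.builder.appName("spark").getOrCreate()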

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=b8a4cb2b822d2cbc6d68f1b65687091e97227ea64e23d7f6209a56216878b417
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
!git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.
remote: Counting objects: 100% (6/6), done.
remote: Compressing objects: 100% (5/5), done.
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (6/6), done.


In [3]:
! ls diabetes_dataset 

diabetes.csv  new_test.csv


In [4]:
sdf = spark.read.csv('/content/diabetes_dataset/diabetes.csv',header=True,inferSchema=True)

In [5]:
sdf.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [6]:
sdf.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



Here we can see the schema of the dataset. Understanding the schema is important before you start analyzing the data: the column names and types alone already tell you that every feature is numeric and that Outcome is stored as an integer label.

In [7]:
print(sdf.count(),len(sdf.columns))

2000 9


In [8]:
#Counting the number of 0's and 1's in the output column of the dataset.
sdf.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [9]:
sdf.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

As the output above suggests, some feature columns such as Glucose and BloodPressure contain the value 0, which does not make sense: these measurements can never be 0 unless the patient is dead. Jokes apart, these zeros most likely stand for missing readings and need to be replaced. We will count them and then replace them with the mean of the respective column, which we have already seen in the output above.

In [10]:
#Checking for missing or null values in every column.
#(The 0 values are replaced with the column means further below.)
for cl in sdf.columns:
  print(cl+":",sdf.filter(sdf[cl].isNull()).count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


We can see that there are no null values in the dataset, which makes half of our job easy. At least that's what I think.

In [11]:
#Checking the total number of 0's in the columns of Glucose, BloodPressure, SkinThickness, Insulin and BMI.
def check_zeros():
  feature_names = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
  #Printing the name of each column and the number of rows where it is 0.
  for i in feature_names:
    print(i,sdf.filter(sdf[i]==0).count())
check_zeros()

Glucose 13
BloodPressure 90
SkinThickness 573
Insulin 956
BMI 28


Let's iterate through each of these columns and find its mean value. Then we will find the rows where the column is 0 and replace that 0 with the (truncated) mean of the column.

In [12]:
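#sdf.columns[1:6] selects Glucose, BloodPressure, SkinThickness, Insulin and BMI
for i in sdf.columns[1:6]:
  #The mean is computed over all rows, including the 0 values
  value = sdf.agg({i:'mean'}).first()[0]
  print("Mean value for {} is {}".format(i,int(value)))
  #int() truncates the mean, e.g. 121.1825 becomes 121
  sdf = sdf.withColumn(i,when(sdf[i]==0,int(value)).otherwise(sdf[i]))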

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [13]:
sdf.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


We can see that the 0 values have been replaced by the mean values of the respective columns.
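
One detail worth keeping in mind: the means used for the replacement are computed over every row, including the zeros that are being replaced, which pulls them downward. The following is a minimal plain-Python sketch of that effect, not the notebook's method. The `glucose` list is made-up toy data and `impute_zeros` is a hypothetical helper name; it compares a mean taken over all rows with a mean taken over the non-zero rows only.

```python
def impute_zeros(values, exclude_zeros=False):
    """Replace zeros with the truncated integer mean of the column."""
    # Decide which values contribute to the mean.
    basis = [v for v in values if v != 0] if exclude_zeros else list(values)
    if not basis:
        # Nothing to average over; return the column unchanged.
        return list(values)
    mean = int(sum(basis) / len(basis))  # int() truncates, as in the notebook
    return [mean if v == 0 else v for v in values]

# Hypothetical toy column, not taken from diabetes.csv.
glucose = [138, 0, 145, 135, 0, 99]

with_zeros = impute_zeros(glucose)                         # mean over all rows
without_zeros = impute_zeros(glucose, exclude_zeros=True)  # mean over non-zero rows

print(with_zeros)     # [138, 86, 145, 135, 86, 99]
print(without_zeros)  # [138, 129, 145, 135, 129, 99]
```

Whether to exclude the zeros is a modelling choice; the rest of this notebook keeps the means computed over all rows.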

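Let's now figure out the correlation of each column with the Outcome column. Correlation basically means how strongly one column is related to another; the Pearson coefficient computed here ranges from -1 to 1, and values near 0 indicate a weak linear relationship.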

In [14]:
for cl in sdf.columns:
  print("Correlation to outcome for {} is {} ".format(cl,sdf.stat.corr('Outcome',cl)))

Correlation to outcome for Pregnancies is 0.22443699263363961 
Correlation to outcome for Glucose is 0.48796646527321064 
Correlation to outcome for BloodPressure is 0.17171333286446713 
Correlation to outcome for SkinThickness is 0.1659010662889893 
Correlation to outcome for Insulin is 0.1711763270226193 
Correlation to outcome for BMI is 0.2827927569760082 
Correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403 
Correlation to outcome for Age is 0.23650924717620253 
Correlation to outcome for Outcome is 1.0 
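
To make the numbers above less of a black box, here is a minimal plain-Python sketch of the Pearson correlation coefficient, which is the default method of `stat.corr`. The helper name `pearson` and the toy `glucose` and `outcome` lists are assumptions made for illustration; they are not part of the notebook or the dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long number lists."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of co-deviations and sums of squared deviations from the means.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical toy data: higher glucose values tend to go with outcome 1.
glucose = [85, 90, 120, 150, 160]
outcome = [0, 0, 0, 1, 1]

r = pearson(glucose, outcome)
print(r)
```

A column correlated with itself gives 1.0, which is why Outcome shows 1.0 in the output above.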


VectorAssembler from the Spark ML library is a feature transformer that combines numerical feature columns into a single vector column, which is the input format the machine learning models expect.
VectorAssembler takes two main parameters:
1.   inputCols – list of features to combine into a single vector column.
2.   outputCol – the new column that will contain the transformed vector.
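
The idea behind these two parameters can be mimicked in plain Python. The sketch below is only an illustration of the concept: `assemble` is a hypothetical helper, a dictionary stands in for a DataFrame row, and a Python list stands in for the Spark ML vector type.

```python
def assemble(row, input_cols, output_col="features"):
    """Copy the row and add one column holding the selected values as floats."""
    assembled = dict(row)
    # Every selected value is converted to float, like the entries of a vector.
    assembled[output_col] = [float(row[c]) for c in input_cols]
    return assembled

# A few values from the first row of the dataset shown earlier.
row = {"Pregnancies": 2, "Glucose": 138, "BMI": 33.6, "Outcome": 1}

out = assemble(row, ["Pregnancies", "Glucose", "BMI"])
print(out["features"])  # [2.0, 138.0, 33.6]
```

The original columns stay in place and the label (Outcome) is left out of the feature list, which is the same pattern used with VectorAssembler in this notebook.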




In [15]:
assembler = VectorAssembler(inputCols = ['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'], outputCol='features')
output_data = assembler.transform(sdf)

In [16]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In the printSchema output, we can see at the end a new vector column, which holds the data from all the columns that we passed in inputCols.

In [17]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

# Logistic Regression Analysis of the dataset.

We will be using Logistic Regression to predict the outcome. There are other methods available for classification, such as Naive Bayes, but Logistic Regression is a simple and widely used choice for binary classification problems like this one.

In [18]:
#We will be using Logistic Regression
from pyspark.ml.classification import LogisticRegression

In [19]:
final_data = output_data.select('features','Outcome')

In [20]:
final_data.printSchema()


root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



We will split the data into two parts, training and testing data. The training data will have about 70% of the overall data, and the testing data the remaining 30%. These numbers are not mandatory; you can choose another ratio, just make sure that the training split is larger than the testing split, so the model has more data to learn from and can give good prediction results. Since randomSplit assigns rows randomly, the split sizes are approximate and change between runs unless a seed is passed.

In [21]:
train, test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)
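
The following plain-Python sketch illustrates why a weighted random split gives only approximately 70% and 30%, and how a seed makes it repeatable. It is an illustration of the idea, not Spark's internal implementation; `random_split` and the stand-in `rows` list are hypothetical.

```python
import random

def random_split(rows, train_weight=0.7, seed=None):
    """Send each row to train with probability train_weight, otherwise to test."""
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    train, test = [], []
    for row in rows:
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test

# 2000 stand-in rows, the same number of rows as the dataset.
rows = list(range(2000))

train_rows, test_rows = random_split(rows, 0.7, seed=42)
print(len(train_rows), len(test_rows))
```

PySpark's `randomSplit` also accepts a `seed` argument, for example `final_data.randomSplit([0.7,0.3], seed=42)`, which is useful when the results need to be reproducible.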

In [22]:
summary = model.summary

In [23]:
summary.predictions.describe().show()



+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1374|               1374|
|   mean|0.34643377001455605|0.26346433770014555|
| stddev| 0.4760066386956732| 0.4406724565359349|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



BinaryClassificationEvaluator is an evaluator for binary classification, which expects the input columns rawPrediction, label and an optional weight column. The rawPrediction column can be of type double (binary 0/1 prediction, or probability of label 1) or of type vector (length-2 vector of raw predictions, scores, or label probabilities). Its default metric is the area under the ROC curve (areaUnderROC).

You can get more details at this link:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.BinaryClassificationEvaluator.html

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [25]:
predictions = model.evaluate(test)

In [26]:
predictions.predictions.show(30)



+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,73.0,69.0,20...|      0|[4.04756103990063...|[0.98283486865127...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.48665316118600...|[0.97030561623887...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.41536709537680...|[0.91799164036427...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.58731957673575...|[0.93004101702774...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.58731957673575...|[0.93004101702774...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.21797861111311...|[0.90185241848705...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.31842695869104...|[0.91039169688789...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.30710677193522...|[0.90946391095931...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.30710677193522...|[0.90946391095931...|    
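
The rawPrediction and probability columns above are linked by the logistic (sigmoid) function, which is what gives Logistic Regression its name. A minimal sketch, assuming the first entry of each vector refers to class 0 and using the value of the first row as displayed (the table shortens it, so the match is approximate):

```python
import math

def sigmoid(z):
    """Logistic function: maps any real number into the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# First entry of rawPrediction in the first row, as displayed in the table.
raw = 4.04756103990063

p = sigmoid(raw)
print(p)  # close to the 0.98283486865127... shown in the probability column
```

A large positive raw score for class 0 therefore means a probability close to 1 for class 0, and the prediction column shows 0.0.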

In [27]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8541129922708871
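
The number above is the area under the ROC curve, the evaluator's default metric. One way to read it: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one. The sketch below computes that pairwise definition in plain Python on made-up toy data; the helper `auc` is hypothetical, and Spark computes the area from the ROC curve itself, so its result can differ slightly from this definition.

```python
def auc(labels, scores):
    """Fraction of (positive, negative) pairs that are ranked correctly."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # ties count as half
    return wins / (len(pos) * len(neg))

# Hypothetical toy labels and predicted scores.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]

print(auc(labels, scores))  # 0.75
```

A value of 0.5 corresponds to random guessing and 1.0 to a perfect ranking, so roughly 0.85 indicates that the model separates the two classes reasonably well.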

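We can save the model using the model.save() command, with whichever model name you like. Note that save() fails if the target path already exists; model.write().overwrite().save() can be used to overwrite it.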

In [28]:
model.save("Binary_model")

To load the saved model, you need the LogisticRegressionModel class. The code below shows how to import and use it.

In [29]:
from pyspark.ml.classification import LogisticRegressionModel

In [30]:
model = LogisticRegressionModel.load("Binary_model")