### **Installing Dependencies.**

In [2]:
! pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 67kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 39.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=aa8a173d365ff6f569aa8395376507874e197f92e6b6efda4ecec97b0410a175
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


### **Running Spark Session**

In [3]:
from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise start one named "spark".
session_builder = SparkSession.builder.appName("spark")
spark = session_builder.getOrCreate()

## **Clone Diabetes Dataset**

In [4]:
! git clone https://github.com/education454/diabetes_dataset


Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (6/6), done.


In [5]:
# List the files that came with the cloned repository.
! ls diabetes_dataset

diabetes.csv  new_test.csv


In [6]:
# Read relative to the working directory, which is where the clone and `ls`
# cells put the data, instead of hard-coding Colab's /content prefix.
# header=True takes column names from the first row; inferSchema=True lets
# Spark derive the column types.
df = spark.read.csv('diabetes_dataset/diabetes.csv', header=True, inferSchema=True)

In [7]:
# Preview the first rows of the loaded data.
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [8]:
# Column names and the types produced by inferSchema=True.
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [9]:
# Dataset shape as a (rows, columns) tuple.
row_count = df.count()
column_count = len(df.columns)
print((row_count, column_count))

(2000, 9)


In [10]:
# Class balance: number of rows for each Outcome value.
outcome_counts = df.groupBy('Outcome').count()
outcome_counts.show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [11]:
# Summary statistics (count, mean, stddev, ...) for every column.
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

## **Data Cleaning**

In [12]:
# Count the null values of every column in a single aggregation (one Spark job)
# rather than running a separate filter + count job per column.
from pyspark.sql import functions as F

# count() ignores nulls, so emitting a non-null marker only for null cells
# makes it count exactly the null cells of each column.
null_counts = df.select(
    [F.count(F.when(F.col(name).isNull(), name)).alias(name) for name in df.columns]
).first()
for column_name in df.columns:
  print(column_name + ":", null_counts[column_name])

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [13]:
def count_zero(data=None, column_list=None):
  """Print, for each column, how many rows hold the value 0.

  Args:
    data: DataFrame to inspect. Defaults to the notebook-level `df`, looked up
      at call time, so existing `count_zero()` calls keep working.
    column_list: Names of the columns to check. Defaults to the five
      measurement columns in which this notebook treats 0 as a missing reading.
  """
  if data is None:
    data = df
  if column_list is None:
    column_list = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
  for name in column_list:
    # Filter to the rows where the column equals 0, then count them.
    print(name + ":", data[data[name] == 0].count())

In [14]:
# Report how many zero entries each measurement column contains.
count_zero()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [15]:
# Replace the 0 placeholders (not nulls: the null check above found none) in the
# measurement columns with the column mean.
# Only `when` is imported: a star import from pyspark.sql.functions would shadow
# Python built-ins such as sum/min/max/round with Spark column functions.
from pyspark.sql.functions import when

# Named explicitly (the same columns as df.columns[1:6]) so the cell does not
# depend on the column order of the CSV.
zero_as_missing_columns = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
for column_name in zero_as_missing_columns:
  # NOTE(review): the mean still includes the 0 placeholders and is truncated to
  # an int (also for the double-typed BMI column); kept unchanged so the
  # recorded output stays valid.
  column_mean = int(df.agg({column_name: 'mean'}).first()[0])
  print("Mean value for {} is {}".format(column_name, column_mean))
  df = df.withColumn(
      column_name,
      when(df[column_name] == 0, column_mean).otherwise(df[column_name]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [16]:
# Preview the data after the zero entries were replaced by column means.
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


In [18]:
# Correlation of every column with the label, to judge which features carry
# signal for the model.
# `col` is avoided as the loop variable: it is also the name of
# pyspark.sql.functions.col, which a module-level loop variable would shadow.
for column_name in df.columns:
  print("Corelation to outcome for {} is {}".format(column_name, df.stat.corr('Outcome', column_name)))

Corelation to outcome for Pregnancies is 0.22443699263363961
Corelation to outcome for Glucose is 0.48796646527321064
Corelation to outcome for BloodPressure is 0.17171333286446713
Corelation to outcome for SkinThickness is 0.1659010662889893
Corelation to outcome for Insulin is 0.1711763270226193
Corelation to outcome for BMI is 0.2827927569760082
Corelation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
Corelation to outcome for Age is 0.23650924717620253
Corelation to outcome for Outcome is 1.0


## **Feature Selection**

In [21]:
from pyspark.ml.feature import VectorAssembler

# All input columns are kept: their correlations with Outcome are of similar size.
feature_columns = [
    'Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
    'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age',
]
# Pack the inputs into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
output_data = assembler.transform(df)

In [22]:
# The schema now also contains the assembled 'features' vector column.
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [23]:
# Preview the rows together with their assembled feature vectors.
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

## **Build & Train Model**

In [26]:
from pyspark.ml.classification import LogisticRegression

# The logistic regression model only needs the assembled feature vector and the label.
model_input_columns = ['features', 'Outcome']
final_data = output_data.select(*model_input_columns)

In [27]:
# Only the feature vector and the label remain.
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [28]:
# Split the data 70/30 into training and test sets. The fixed seed makes the
# split, and therefore every metric computed from it, repeatable across runs.
train, test = final_data.randomSplit([0.7, 0.3], seed=42)
# `models` is the unfitted estimator; `model` is the fitted model returned by fit().
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [29]:
# Summary object of the fitted model; its predictions are inspected in the next cell.
summary = model.summary

In [30]:
# Summary statistics of the label (Outcome) and the predicted class.
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1423|               1423|
|   mean|0.33661278988053406| 0.2550948699929726|
| stddev|0.47271730999279493|0.43606777801978236|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



### **Evaluate and Test Our Model**

In [32]:
# BinaryClassificationEvaluator (used further down) reports the area under the
# ROC curve as its default performance metric.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the fitted model on the held-out test split; the scored rows are
# available through the `.predictions` attribute of the result.
predictions = model.evaluate(test)

In [37]:
# The scored rows carry three extra columns: rawPrediction, probability and prediction.
predictions.predictions.show(10)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[4.18358802440098...|[0.98498516731631...|       0.0|
|[0.0,67.0,76.0,20...|      0|[2.28434823289449...|[0.90757244409546...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.65161618717839...|[0.97470717138332...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.75968874052261...|[0.94045820695036...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.51962611840127...|[0.92550628189044...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.51962611840127...|[0.92550628189044...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.61161975366294...|[0.93160567355690...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.61161975366294...|[0.93160567355690...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.17404797339874...|[0.89789468223857...|    

In [39]:
# Score the test-set predictions with BinaryClassificationEvaluator.
# The metric is spelled out (it is also the default) because the number shown
# is the area under the ROC curve, not classification accuracy.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction', labelCol='Outcome', metricName='areaUnderROC')
evaluator.evaluate(model.transform(test))

0.8327825858903752

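## ***This shows the area under the ROC curve (AUC) of our model on the test set — a measure of how well it ranks positive cases above negative ones, not its accuracy.***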

---




In [40]:
# Persist the fitted model. overwrite() keeps the cell re-runnable: a plain
# save() raises when the "Model" directory already exists.
model.write().overwrite().save("Model")

In [43]:
# Reload the saved model from the "Model" directory for further use in a working environment.
# Note: this rebinds `model` to the loaded copy.
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('Model')

## **End.**