In [57]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd
import os
os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jdk1.8.0_101'

In [58]:
spark = SparkSession.builder.appName("spark").getOrCreate()

In [59]:
df = spark.read.csv("diabetes.csv",header=True,inferSchema=True)

In [60]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [61]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [62]:
print((df.count(),len(df.columns)))

(2000, 9)


In [63]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [64]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

In [65]:
for col in df.columns:
    print(col+":",df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [66]:
def count_zeros():
    columns_list = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
    for i in columns_list:
        print(i+":",df[df[i]==0].count())
count_zeros()        

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [67]:
from pyspark.sql.functions import *

for i in df.columns[1:6]:
    data = df.agg({i:'mean'}).first()[0]
    print("Mean  value for  {} is {}".format(i,int(data)))
    
    df = df.withColumn(i,when(df[i]==0,int(data)).otherwise(df[i]))

Mean  value for  Glucose is 121
Mean  value for  BloodPressure is 69
Mean  value for  SkinThickness is 20
Mean  value for  Insulin is 80
Mean  value for  BMI is 32


In [68]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


In [69]:
from pyspark.sql.functions import *
for col in df.columns:
    print("Coorelation to Outcomes for  {} is {} ".format(col,df.stat.corr('Outcome',col)))

Coorelation to Outcomes for  Pregnancies is 0.22443699263363961 
Coorelation to Outcomes for  Glucose is 0.48796646527321064 
Coorelation to Outcomes for  BloodPressure is 0.17171333286446713 
Coorelation to Outcomes for  SkinThickness is 0.1659010662889893 
Coorelation to Outcomes for  Insulin is 0.1711763270226193 
Coorelation to Outcomes for  BMI is 0.2827927569760082 
Coorelation to Outcomes for  DiabetesPedigreeFunction is 0.1554590791569403 
Coorelation to Outcomes for  Age is 0.23650924717620253 
Coorelation to Outcomes for  Outcome is 1.0 


In [70]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'],outputCol='features')
output_data = assembler.transform(df)

In [71]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [72]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

In [73]:
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','Outcome')

In [74]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [75]:
train , test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [76]:
summary = model.summary

In [77]:
summary.predictions.describe().show()

+-------+------------------+-------------------+
|summary|           Outcome|         prediction|
+-------+------------------+-------------------+
|  count|              1405|               1405|
|   mean|0.3323843416370107| 0.2498220640569395|
| stddev|0.4712356555477142|0.43306406452829255|
|    min|               0.0|                0.0|
|    max|               1.0|                1.0|
+-------+------------------+-------------------+



In [78]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [79]:
predictions.predictions.show(20)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,67.0,76.0,20...|      0|[2.02566123377385...|[0.88346512509350...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.34158744282428...|[0.98715138575666...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.54138744991223...|[0.97184270371704...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.54138744991223...|[0.97184270371704...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.33252853400971...|[0.91153544641202...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.33252853400971...|[0.91153544641202...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.56771450807437...|[0.92875461421466...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.06980453045052...|[0.88793351221408...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.33971207666541...|[0.91211300714857...|    

In [80]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8368322239289984

In [81]:
train.count()

train.explain(extended=True)

== Parsed Logical Plan ==
Sample 0.0, 0.7, false, 971595430145716579
+- Sort [features#4029 ASC NULLS FIRST, Outcome#2852 ASC NULLS FIRST], false
   +- Project [features#4029, Outcome#2852]
      +- Project [Pregnancies#2844, Glucose#3725, BloodPressure#3752, SkinThickness#3779, Insulin#3806, BMI#3833, DiabetesPedigreeFunction#2850, Age#2851, Outcome#2852, UDF(struct(Pregnancies_double_VectorAssembler_95b8b81ac381, cast(Pregnancies#2844 as double), Glucose_double_VectorAssembler_95b8b81ac381, cast(Glucose#3725 as double), BloodPressure_double_VectorAssembler_95b8b81ac381, cast(BloodPressure#3752 as double), SkinThickness_double_VectorAssembler_95b8b81ac381, cast(SkinThickness#3779 as double), Insulin_double_VectorAssembler_95b8b81ac381, cast(Insulin#3806 as double), BMI, BMI#3833, DiabetesPedigreeFunction, DiabetesPedigreeFunction#2850, Age_double_VectorAssembler_95b8b81ac381, cast(Age#2851 as double))) AS features#4029]
         +- Project [Pregnancies#2844, Glucose#3725, BloodPressur

In [None]:
model.save('Model')

In [None]:
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('model')