 # Diabetes Prediction 

In [1]:
! pip install pyspark



In [4]:
from pyspark.sql import SparkSession

# Start a Spark session for this notebook (or reuse one that already exists);
# the application is registered under the name 'spark'.
session_builder = SparkSession.builder.appName('spark')
spark = session_builder.getOrCreate()

In [5]:
import os

# Location of the training CSV. Set the DIABETES_CSV environment variable to point
# at your own copy; the default keeps the path this notebook was written with.
DIABETES_CSV = os.environ.get("DIABETES_CSV", "/Users/srivarshinig/Downloads/diabetes.csv")

# header=True: column names come from the first row.
# inferSchema=True: Spark infers each column's type from the data.
df = spark.read.csv(DIABETES_CSV, header=True, inferSchema=True)

In [6]:
# Preview the first rows of the freshly loaded data.
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [7]:
# Check the column types that inferSchema assigned.
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [8]:
# Report the dataset shape as a (rows, columns) tuple.
row_total = df.count()
column_total = len(df.columns)
print((row_total, column_total))

(768, 9)


In [9]:
# Class balance: number of rows for each value of the Outcome label.
outcome_counts = df.groupBy('Outcome').count()
outcome_counts.show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



In [15]:
# Summary statistics (count, mean, stddev, ...) for the columns.
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

### Data Cleaning & Preparation

In [16]:
from pyspark.sql import functions as F

# Count nulls for every column in ONE aggregation instead of launching a separate
# Spark job per column. count() ignores nulls, and when() without otherwise()
# yields null for non-matching rows, so each expression counts only the null cells.
null_counts = df.agg(
    *[F.count(F.when(df[name].isNull(), 1)).alias(name) for name in df.columns]
).first()

# `name` (not `col`) is used as the loop variable so it cannot collide with
# pyspark.sql.functions.col.
for name in df.columns:
    print(name + ":", null_counts[name])

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [17]:
def count_zeros(data=None, columns_list=None):
    """Print, for each column, the number of rows whose value equals 0.

    Args:
        data: DataFrame to inspect. Defaults to the notebook-level ``df``.
        columns_list: Names of the columns to check. Defaults to the five
            measurement columns 'Glucose', 'BloodPressure', 'SkinThickness',
            'Insulin' and 'BMI'.

    Returns:
        None. One ``<column>: <count>`` line is printed per column.
    """
    if data is None:
        # Fall back to the global DataFrame so the original zero-argument call still works.
        data = df
    if columns_list is None:
        columns_list = ['Glucose','BloodPressure','SkinThickness', 'Insulin','BMI']
    for name in columns_list:
        print(name + ':', data[data[name] == 0].count())

In [18]:
# Count the zero entries in the measurement columns before they are replaced below.
count_zeros()

Glucose: 5
BloodPressure: 35
SkinThickness: 227
Insulin: 374
BMI: 11


In [19]:
# Import only what the notebook uses. A star import from pyspark.sql.functions
# rebinds Python built-ins such as sum, min, max, abs and round to Spark column
# functions and hides where names come from.
from pyspark.sql.functions import when

In [20]:
# Mean BMI over all rows; the aggregate comes back as a one-row result whose
# first field is the value.
bmi_mean_row = df.agg({'BMI': 'mean'}).first()
bmi_mean_row[0]

31.992578124999977

In [21]:
from pyspark.sql import functions as F

# Columns in which 0 is treated as "missing". Named explicitly instead of the
# positional slice df.columns[1:6], so a change in column order cannot silently
# impute the wrong columns.
ZERO_AS_MISSING_COLS = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']

# All means in a single Spark job. Each column is only modified in its own loop
# iteration, so computing the means up front gives the same values as before.
column_means = df.agg(
    *[F.mean(name).alias(name) for name in ZERO_AS_MISSING_COLS]
).first()

# NOTE(review): the means still include the zero placeholders being replaced, and
# the value is truncated to int even for the double-typed BMI column. Both are kept
# to preserve the existing results; consider excluding zeros from the mean.
# `name` (not `col`) is the loop variable so pyspark.sql.functions.col is not clobbered.
for name in ZERO_AS_MISSING_COLS:
    fill_value = int(column_means[name])
    print(f'Mean value for {name} is {fill_value}')
    df = df.withColumn(name, F.when(df[name] == 0, fill_value).otherwise(df[name]))

Mean value for Glucose is 120
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 79
Mean value for BMI is 31


In [22]:
# Preview the data after the zero values were replaced with column means.
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|           20|     79|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


### Correlation Analysis & Feature Selection

In [23]:
# Correlation of every feature with the Outcome label. The features are selected
# by excluding the label by name rather than by the positional slice [:8], and the
# loop variable is not called `col` so pyspark.sql.functions.col stays usable.
feature_names = [name for name in df.columns if name != 'Outcome']
for name in feature_names:
    print(f'Correlation to target for {name} feature is {df.stat.corr("Outcome", name)}')

Correlation to target for Pregnancies feature is 0.22189815303398638
Correlation to target for Glucose feature is 0.49288410274882094
Correlation to target for BloodPressure feature is 0.16287909949861834
Correlation to target for SkinThickness feature is 0.171856814176564
Correlation to target for Insulin feature is 0.17869558803050842
Correlation to target for BMI feature is 0.31289043493401536
Correlation to target for DiabetesPedigreeFunction feature is 0.17384406565296007
Correlation to target for Age feature is 0.23835598302719757


In [24]:
from pyspark.ml.feature import VectorAssembler

# The eight predictor columns, packed into a single vector column named 'features'.
feature_columns = [
    'Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness',
    'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
output_data = assembler.transform(df)

In [25]:
# Confirm that the assembled 'features' vector column was appended.
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [26]:
# Preview the rows together with their assembled feature vectors.
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|[8.0,183.0,64.0,2...|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|[1.0,89.0,66.0,23...|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|[0.0,137.0,40.0,3...|
|          5|    116|           

### Build the Model

In [27]:
from pyspark.ml.classification import LogisticRegression

# Keep only what the classifier needs: the assembled feature vector and the label.
final_data = output_data.select('features', 'Outcome')

In [28]:
# Verify that only the feature vector and the label remain.
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [45]:
# Fix the split seed so the 70/30 train/test partition, and every metric derived
# from it, is reproducible after a kernel restart.
SPLIT_SEED = 42
train, test = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Logistic regression with 'Outcome' as the label; the feature column is left at
# the estimator's default.
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [46]:
# Training summary: describe the label and the predicted class on the rows the
# model was fitted on.
summary = model.summary
training_predictions = summary.predictions
training_predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|                542|                542|
|   mean|0.36531365313653136| 0.3062730627306273|
| stddev|0.48196282420614084|0.46137036014203325|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [47]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the fitted model on the held-out split. Despite its name, `predictions`
# is the evaluation result object; the scored rows are under its `.predictions`
# attribute.
predictions = model.evaluate(test)

In [48]:
# Show the first 10 scored rows of the held-out split.
predictions.predictions.show(10)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,67.0,76.0,20...|      0|[2.54450236514653...|[0.92720330935037...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.88302073380646...|[0.94700068029335...|       0.0|
|[0.0,95.0,80.0,45...|      0|[2.33200853528915...|[0.91149350548116...|       0.0|
|[0.0,97.0,64.0,36...|      0|[1.68887931345106...|[0.84407672183194...|       0.0|
|[0.0,99.0,69.0,20...|      0|[3.29251286634386...|[0.96417106297346...|       0.0|
|[0.0,102.0,86.0,1...|      0|[2.38952866447742...|[0.91602531869977...|       0.0|
|[0.0,104.0,76.0,2...|      0|[3.27906535791165...|[0.96370360497604...|       0.0|
|[0.0,105.0,68.0,2...|      0|[3.51730461309939...|[0.97117614775439...|       0.0|
|[0.0,111.0,65.0,2...|      0|[2.13491103160625...|[0.89425032827860...|    

In [49]:
# Score the held-out rows, then compute the evaluator's default metric from the
# raw prediction column against the Outcome label.
evaluator = BinaryClassificationEvaluator(labelCol='Outcome', rawPredictionCol='rawPrediction')
test_scored = model.transform(test)
evaluator.evaluate(test_scored)

0.8481684981684984

In [50]:
# Persist the fitted model under the relative path 'model', replacing any
# previously saved copy.
model.write().overwrite().save('model')


In [51]:
from pyspark.ml.classification import LogisticRegressionModel

# Reload the model saved above to check that it can be used for inference.
model2 = LogisticRegressionModel.load('model')


In [52]:
import os

In [53]:
# Working directory of the kernel; the new-data path below is built relative to it.
current_dir = os.getcwd()

In [54]:
# Path of the unlabeled CSV to score: <current_dir>/diabetes_dataset/new_test.csv
file_path = os.path.join(current_dir, 'diabetes_dataset', 'new_test.csv')

In [55]:
# Load the new data with the same reader options as the training CSV.
df_test = spark.read.csv(file_path, header=True, inferSchema=True)

In [56]:
# Check the inferred column types of the new data.
df_test.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)



In [57]:
# Build the 'features' vector for the new rows with the same assembler used for training.
# NOTE(review): the training data had zeros in Glucose/BloodPressure/SkinThickness/
# Insulin/BMI replaced by column means, but no such replacement is applied here --
# confirm whether new_test.csv can contain zeros in those columns.
test_data = assembler.transform(df_test)

In [58]:
# Confirm that the 'features' column was added to the new data.
test_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)



In [59]:
# Score the new rows with the reloaded model, then inspect the columns of the result.
results = model2.transform(test_data)
results.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [60]:
# Final output: each feature vector next to its predicted class.
scored_view = results.select('features', 'prediction')
scored_view.show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,190.0,78.0,3...|       1.0|
|[0.0,80.0,84.0,36...|       0.0|
|[2.0,138.0,82.0,4...|       1.0|
|[1.0,110.0,63.0,4...|       1.0|
+--------------------+----------+

