# PySpark - Diabetes Prediction - End to End ML Project

![](https://miro.medium.com/max/500/1*5C4UQznqEiN3D6Xutlgwlg.png)

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark').getOrCreate()

In [3]:
! git clone https://github.com/education454/diabetes_dataset

fatal: destination path 'diabetes_dataset' already exists and is not an empty directory.


In [4]:
df = spark.read.csv('diabetes_dataset/diabetes.csv', header=True, inferSchema=True)

In [5]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [6]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [7]:
print((df.count(), len(df.columns)))

(2000, 9)


In [8]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [9]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

### Data Cleaning & Preparation

In [10]:
for col in df.columns:
  print(col + ":" , df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [11]:
def count_zeros():
  columns_list  = ['Glucose','BloodPressure','SkinThickness', 'Insulin','BMI']
  for col in columns_list:
    print(col + ':', df[df[col]==0].count() )

In [12]:
count_zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [13]:
from pyspark.sql.functions import *

In [14]:
df.agg({'BMI':'mean'}).first()[0]

32.192999999999984

In [15]:
for col in df.columns[1:6]:
  data = df.agg({col:'mean'}).first()[0]
  print(f'Mean value for {col} is {int(data)}')
  df = df.withColumn(col, when(df[col]== 0, int(data)).otherwise(df[col]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [16]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


### Correlation Analysis & Feature Selection

In [17]:
for col in df.columns[:8]:
  print(f'Correlation to target for {col} feature is {df.stat.corr("Outcome", col)}')

Correlation to target for Pregnancies feature is 0.22443699263363961
Correlation to target for Glucose feature is 0.48796646527321064
Correlation to target for BloodPressure feature is 0.17171333286446713
Correlation to target for SkinThickness feature is 0.1659010662889893
Correlation to target for Insulin feature is 0.1711763270226193
Correlation to target for BMI feature is 0.2827927569760082
Correlation to target for DiabetesPedigreeFunction feature is 0.1554590791569403
Correlation to target for Age feature is 0.23650924717620253


In [18]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pregnancies','Glucose','BloodPressure',
                                       'SkinThickness','Insulin','BMI',
                                       'DiabetesPedigreeFunction','Age'],
                            outputCol='features')
output_data = assembler.transform(df)

In [19]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [20]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

### Build the Model

In [21]:
from pyspark.ml.classification import LogisticRegression

final_data = output_data.select(['features','Outcome'])

In [22]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [23]:
train, test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [24]:
summary = model.summary
summary.predictions.describe().show()

+-------+-------------------+------------------+
|summary|            Outcome|        prediction|
+-------+-------------------+------------------+
|  count|               1413|              1413|
|   mean|0.32767162066525124|0.2448690728945506|
| stddev| 0.4695305651187454|0.4301617891645938|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.evaluate(test)

In [26]:
predictions.predictions.show(10)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[3.94560780883172...|[0.98102746052899...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.33156702723973...|[0.98702366916679...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.52989351531504...|[0.92621107612588...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.78630666914082...|[0.94193136123399...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.84586048932699...|[0.94510431135053...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.25721955053158...|[0.90527146201976...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.47762404590054...|[0.92255821922179...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.47762404590054...|[0.92255821922179...|       0.0|
|[0.0,93.0,60.0,25...|      0|[2.82851430358475...|[0.94419737430974...|    

In [27]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8304032836337554

In [37]:
import json
# Save model parameters (coefficients, intercept, feature order)
params = {
    'coefficients': model.coefficients.toArray().tolist(),
    'intercept': float(model.intercept),
    'feature_cols': assembler.getInputCols()
}
with open('model_params.json', 'w') as f:
    json.dump(params, f)
print("Model parameters saved to model_params.json")

Model parameters saved to model_params.json


In [38]:
import json
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import math

class SimpleLRModel:
    def __init__(self, coefficients, intercept):
        self.coefficients = coefficients
        self.intercept = intercept
    def transform(self, df):
        coef = self.coefficients
        intercept = self.intercept
        def pred(v):
            s = intercept
            for i, c in enumerate(coef):
                s += float(v[i]) * c
            prob = 1.0 / (1.0 + math.exp(-s))
            return 1 if prob > 0.5 else 0
        pred_udf = udf(pred, IntegerType())
        return df.withColumn('prediction', pred_udf(df['features']))

with open('model_params.json', 'r') as f:
    params = json.load(f)
model2 = SimpleLRModel(params['coefficients'], params['intercept'])
print("Model loaded successfully (simple wrapper)")

Model loaded successfully (simple wrapper)


In [40]:
df_test = spark.read.csv('diabetes_dataset/new_test.csv', header=True, inferSchema=True)

In [41]:
df_test.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)



In [42]:
test_data = assembler.transform(df_test)

In [43]:
test_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)



In [44]:
results = model2.transform(test_data)
results.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = true)



In [45]:
results.select(['features','prediction']).show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,190.0,78.0,3...|         1|
|[0.0,80.0,84.0,36...|         0|
|[2.0,138.0,82.0,4...|         1|
|[1.0,110.0,63.0,4...|         1|
+--------------------+----------+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 59704)
Traceback (most recent call last):
  File "c:\Users\sinus\AppData\Local\Python\pythoncore-3.14-64\Lib\socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\sinus\AppData\Local\Python\pythoncore-3.14-64\Lib\socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\sinus\AppData\Local\Python\pythoncore-3.14-64\Lib\socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
    ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\sinus\AppData\Local\Python\pythoncore-3.14-64\Lib\socketserver.py", line 766, in __init__
    self.handle()
    ~~~~~~~~~~~^^
  File "c:\Users\sinus\AppData\Local\Pyt