# **Install Dependencies**

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=b3f644634eecb3d985ee59f23d53a1e5d696d4da6c9967aee14960efdc7d92d5
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# **Run Spark Session**

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Diabetes_Prediction').getOrCreate()

# **Load Diabetes Dataset**

In [4]:
df = spark.read.csv('/content/diabetes.csv', header=True, inferSchema=True)

In [5]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [6]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [7]:
df.groupby('outcome').count().show()

+-------+-----+
|outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



In [8]:
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

# **Cleaning Data**

In [9]:
for col in df.columns:
  print(col+":",df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [10]:
def count_zeros():
  columns_list =['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
  for i in columns_list:
    print(i+":",df[df[i]==0].count())

In [11]:
count_zeros()

Glucose: 5
BloodPressure: 35
SkinThickness: 227
Insulin: 374
BMI: 11


In [12]:
from pyspark.sql.functions import *
for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print("Mean value for {} is {}".format(i,int(data)))
  df = df.withColumn(i,when(df[i]==0,int(data)).otherwise(df[i]))

Mean value for Glucose is 120
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 79
Mean value for BMI is 31


In [13]:
df.describe().show()

+-------+------------------+----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|         Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|             768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|    121.67578125|             72.25|26.447916666666668|118.27083333333333|32.436588541666644|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|30.4362515773519|12.117202529118758| 9.73387187

In [14]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|           20|     79|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


# **CORRELATION**

In [15]:
for col in df.columns:
  print("correlation to outcome for {} is {}".format(col,df.stat.corr('Outcome',col)))

correlation to outcome for Pregnancies is 0.22189815303398638
correlation to outcome for Glucose is 0.49288410274882094
correlation to outcome for BloodPressure is 0.16287909949861834
correlation to outcome for SkinThickness is 0.171856814176564
correlation to outcome for Insulin is 0.17869558803050842
correlation to outcome for BMI is 0.31289043493401536
correlation to outcome for DiabetesPedigreeFunction is 0.17384406565296007
correlation to outcome for Age is 0.23835598302719757
correlation to outcome for Outcome is 1.0


# **FEATURE SELECTION**

In [16]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'],outputCol='features')
output_data = assembler.transform(df)

In [17]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [18]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|[8.0,183.0,64.0,2...|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|[1.0,89.0,66.0,23...|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|[0.0,137.0,40.0,3...|
|          5|    116|           

# **BUILD AND TRAIN MODEL**

In [19]:
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','Outcome')

In [20]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [21]:
train , test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [22]:
summary = model.summary

In [23]:
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|                548|                548|
|   mean| 0.3613138686131387|0.28832116788321166|
| stddev|0.48082016693713303| 0.4533951859599413|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



#  **EVALUATION AND TEST MODEL**

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [25]:
predictions.predictions.show(20)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,78.0,88.0,29...|      0|[2.81870220640832...|[0.94367812876750...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.20127246013034...|[0.90036372003640...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.47908755946855...|[0.92266271465139...|       0.0|
|[0.0,93.0,100.0,3...|      0|[0.90041509429682...|[0.71103479716062...|       0.0|
|[0.0,94.0,69.0,20...|      0|[2.86213851582850...|[0.94594275699161...|       0.0|
|[0.0,97.0,64.0,36...|      0|[1.84390180070618...|[0.86340951335387...|       0.0|
|[0.0,99.0,69.0,20...|      0|[3.40747469625967...|[0.96793732310644...|       0.0|
|[0.0,101.0,65.0,2...|      0|[3.45259290766703...|[0.96930837268021...|       0.0|
|[0.0,101.0,76.0,2...|      0|[2.15588872872606...|[0.89621777556939...|    

In [26]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8089523809523802

In [27]:
print("Test Error = %g" % (1.0 - evaluator.evaluate(model.transform(test))))

Test Error = 0.191048


In [28]:
type(predictions.predictions)

In [29]:
from sklearn.metrics import classification_report, confusion_matrix

y_true = predictions.predictions.select(['Outcome']).collect()
y_pred = predictions.predictions.select(['prediction']).collect()

print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true,y_pred))


              precision    recall  f1-score   support

           0       0.81      0.82      0.82       150
           1       0.61      0.60      0.60        70

    accuracy                           0.75       220
   macro avg       0.71      0.71      0.71       220
weighted avg       0.75      0.75      0.75       220

[[123  27]
 [ 28  42]]


In [30]:
model.save("model")

In [31]:
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('model')