# TASK 1 : Install Dependencies & Run Spark Session

In [1]:
#install pyspark
! pip install pyspark



In [12]:
from pyspark.sql.functions import col,when,count

In [2]:
#create a sparksession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Diabetes_Prediction").getOrCreate()
spark

# TASK 2: Clone & Explore dataset

In [3]:
#clone the diabetes dataset from the github repository
! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (6/6), 13.02 KiB | 6.51 MiB/s, done.


In [5]:
#check if the dataset exists
! ls diabetes_dataset

diabetes.csv  new_test.csv


In [52]:
#create spark dataframe
path = '/content/diabetes_dataset/diabetes.csv'
df = spark.read.csv(path,inferSchema=True,header = True)

In [53]:
#display the dataframe
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [54]:
#print the schema
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [15]:
#count the total no. of diabetic and non-diabetic class
df.select('Outcome').groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [18]:
df.count(),len(df.columns)

(2000, 9)

In [17]:
#get the summary statistics
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

# TASK 3: Data Cleaning & Preparation

In [55]:
#check for null values
for column in df.columns:
  nullcount = df.filter(col(column).isNull()).count()
  print(f"Null count for {column}:{nullcount}")

Null count for Pregnancies:0
Null count for Glucose:0
Null count for BloodPressure:0
Null count for SkinThickness:0
Null count for Insulin:0
Null count for BMI:0
Null count for DiabetesPedigreeFunction:0
Null count for Age:0
Null count for Outcome:0


In [57]:
#look for the unnecessary values present
def count_zeros():
 columns_list = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
 for column in columns_list:
  print({column},df.filter(col(column)==0).count())

In [58]:
#or
def count_Zeros():
 columns_list = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
 for column in columns_list:
  print(column +":",df[df[column]==0].count())

In [59]:
count_zeros()

{'Glucose'} 13
{'BloodPressure'} 90
{'SkinThickness'} 573
{'Insulin'} 956
{'BMI'} 28


In [60]:
count_Zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [38]:
#calculate and replace the unnecessary values by the mean value
from pyspark.ml.feature import Imputer,StandardScaler

In [49]:
for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print(f"Mean value for {i} is {int(data)}")

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [61]:
zero_cols = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
for i in zero_cols:
  df = df.withColumn(i,when(col(i)==0,int(df.agg({i:'mean'}).first()[0])).otherwise(col(i)))

In [62]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|     SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|              2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|           121.969|           72.2505|            26.665|          118.494|32.640999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|30.533214334373536|11.970354817098098|10.0542189

# TASK 4: Correlation Analysis & Feature Selection

In [67]:
#find the correlation among the set of input & output variables
for i in df.columns:
  print(f"Correlation to outcome for {i} is {df.stat.corr('Outcome',i)}")

Correlation to outcome for Pregnancies is 0.22443699263363961
Correlation to outcome for Glucose is 0.48796646527321064
Correlation to outcome for BloodPressure is 0.17171333286446713
Correlation to outcome for SkinThickness is 0.1659010662889893
Correlation to outcome for Insulin is 0.1711763270226193
Correlation to outcome for BMI is 0.2827927569760082
Correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
Correlation to outcome for Age is 0.23650924717620253
Correlation to outcome for Outcome is 1.0


IllegalArgumentException: requirement failed: Currently correlation calculation for columns with dataType struct<type:tinyint,size:int,indices:array<int>,values:array<double>> not supported.

In [70]:
output_var = ['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age']

In [74]:
#feature selection
#since correlation is relatively low across all features let us take everthing into consideration for building the model
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols =output_var,outputCol='features')
df = assembler.transform(df)
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

In [75]:
#print the schema
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
#display dataframe

# TASK 5: Split Dataset & Build the Model

In [76]:
#create final data
from pyspark.ml.classification import LogisticRegression
final_data = df.select('features','Outcome')


In [None]:
#print schema of final data

In [88]:
#split the dataset ; build the model
train,test = final_data.randomSplit([0.7,0.3])
lr = LogisticRegression(labelCol='Outcome',featuresCol='features')
model = lr.fit(train)

In [89]:
#summary of the model
summary = model.summary

In [90]:
  summary.predictions.describe().show()

+-------+-------------------+------------------+
|summary|            Outcome|        prediction|
+-------+-------------------+------------------+
|  count|               1437|              1437|
|   mean| 0.3354210160055672|0.2567849686847599|
| stddev|0.47230180017588863|0.4370118419092189|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



# TASK 6: Evaluate and Save the Model

In [93]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predict_test = model.transform(test)
predict_test.select(['Outcome','prediction']).show(50)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      1|       0.0|
|      0|       0.0|
|      1|    

In [94]:
# evaluating the model
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',labelCol='Outcome')
evaluator.evaluate(predict_test)

0.8409807739776748

In [95]:
# load saved model back to the environment
model.save("log_reg_diabetes")

# TASK 7: Prediction on New Data with the saved model

In [96]:
#Loading the model
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('log_reg_diabetes')

In [97]:
#create a new spark dataframe
path = '/content/diabetes_dataset/new_test.csv'
test_df = spark.read.csv(path,inferSchema=True,header = True)

In [98]:
#print the schema
test_df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)



In [99]:
#create an additional feature merged column
test_data = assembler.transform(test_df)
test_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+--------------------+
|          1|    190|           78|           38|    150|45.1|                   0.153| 48|[1.0,190.0,78.0,3...|
|          0|     80|           84|           36|    120|50.2|                   0.211| 26|[0.0,80.0,84.0,36...|
|          2|    138|           82|           46|    255|52.3|                   0.315| 30|[2.0,138.0,82.0,4...|
|          1|    110|           63|           44|    480|62.7|                   0.616| 32|[1.0,110.0,63.0,4...|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+--------------------+



In [100]:
#print the schema
test_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)



In [101]:
#use model to make predictions
results = model.transform(test_data)
results.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [103]:
#display the predictions
results.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+--------------------+--------------------+--------------------+----------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|            features|       rawPrediction|         probability|prediction|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+--------------------+--------------------+--------------------+----------+
|          1|    190|           78|           38|    150|45.1|                   0.153| 48|[1.0,190.0,78.0,3...|[-2.0164062524367...|[0.11749110365714...|       1.0|
|          0|     80|           84|           36|    120|50.2|                   0.211| 26|[0.0,80.0,84.0,36...|[1.79655704821887...|[0.85772930982955...|       0.0|
|          2|    138|           82|           46|    255|52.3|                   0.315| 30|[2.0,138.0,82.0,4...|[-0.6362358559521...|[0.34609792423773...|       1.0|
|   