# TASK 1 : Install Dependencies & Run Spark Session

In [167]:
#install pyspark
! pip install pyspark



In [168]:
#create a sparksession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark').getOrCreate()


# TASK 2: Clone & Explore dataset

In [169]:
#clone the diabetes dataset from the github repository
! git clone https://github.com/education454/diabetes_dataset

fatal: destination path 'diabetes_dataset' already exists and is not an empty directory.


In [170]:
#check if the dataset exists
! ls diabetes_dataset

diabetes.csv  new_test.csv


In [171]:
#create spark dataframe
df = spark.read.csv('/content/diabetes_dataset/diabetes.csv', header=True, inferSchema=True)


In [172]:
#display the dataframe
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [173]:
#print the schema
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [174]:
#count the total no. of diabetic and non-diabetic class
print( df.count(), len(df.columns) )
print(df.groupBy('Outcome').count().show())

2000 9
+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+

None


In [175]:
#get the summary statistics
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

# TASK 3: Data Cleaning & Preparation

In [176]:
#check for null values
for col in df.columns:
  print(col, ': ', df[df[col].isNull()].count() )


Pregnancies :  0
Glucose :  0
BloodPressure :  0
SkinThickness :  0
Insulin :  0
BMI :  0
DiabetesPedigreeFunction :  0
Age :  0
Outcome :  0


In [177]:
#look for the unnecessary values present

col_list = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']

for col in col_list:
  print(col, ": ", df[df[col] == 0].count() )

Glucose :  13
BloodPressure :  90
SkinThickness :  573
Insulin :  956
BMI :  28


In [178]:
#calculate and replace the unnecessary values by the mean value
from pyspark.sql.functions import *

for col in col_list:
  col_mean = df.agg({col: 'mean'}).first()[0]
  print(f"mean value for {col} : {col_mean}")

  df = df.withColumn( col, when(df[col]==0, int(col_mean)).otherwise(df[col]))


mean value for Glucose : 121.1825
mean value for BloodPressure : 69.1455
mean value for SkinThickness : 20.935
mean value for Insulin : 80.254
mean value for BMI : 32.192999999999984


In [179]:
#display the dataframe
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


# TASK 4: Correlation Analysis & Feature Selection

In [180]:
#find the correlation among the set of input & output variables
for col in df.columns:
  print(f"correlation between {col} and Outcome: {df.stat.corr('Outcome', col)}")

correlation between Pregnancies and Outcome: 0.22443699263363961
correlation between Glucose and Outcome: 0.48796646527321064
correlation between BloodPressure and Outcome: 0.17171333286446713
correlation between SkinThickness and Outcome: 0.1659010662889893
correlation between Insulin and Outcome: 0.1711763270226193
correlation between BMI and Outcome: 0.2827927569760082
correlation between DiabetesPedigreeFunction and Outcome: 0.1554590791569403
correlation between Age and Outcome: 0.23650924717620253
correlation between Outcome and Outcome: 1.0


In [181]:
#feature selection
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df.columns, outputCol='features')
output_data = assembler.transform(df)

In [182]:
#print the schema
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [183]:
#display dataframe
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

# TASK 5: Split Dataset & Build the Model

In [184]:
#create final data
from pyspark.ml.classification import LogisticRegression

final_data = output_data.select('features', 'Outcome')


In [185]:
#print schema of final data
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [186]:
#split the dataset ; build the model
train, test = final_data.randomSplit([0.7, 0.3])
clf = LogisticRegression(labelCol='Outcome')
model = clf.fit(train)

In [187]:
#summary of the model
summary = model.summary
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1434|               1434|
|   mean|0.32496513249651326|0.32496513249651326|
| stddev|0.46852521230823363|0.46852521230823363|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



# TASK 6: Evaluate and Save the Model

In [188]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
pred = model.evaluate(test)

In [189]:
pred.predictions.show(20)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[20.0700973275015...|[0.99999999807838...|       0.0|
|[0.0,67.0,76.0,20...|      0|[19.6449019469560...|[0.99999999706013...|       0.0|
|[0.0,73.0,69.0,20...|      0|[20.2483480525545...|[0.99999999839211...|       0.0|
|[0.0,74.0,52.0,10...|      0|[20.2752633372681...|[0.99999999843481...|       0.0|
|[0.0,84.0,64.0,22...|      0|[19.7932146208917...|[0.99999999746536...|       0.0|
|[0.0,93.0,60.0,20...|      0|[19.7677745161188...|[0.99999999740005...|       0.0|
|[0.0,93.0,60.0,20...|      0|[19.7677745161188...|[0.99999999740005...|       0.0|
|[0.0,93.0,60.0,25...|      0|[19.8415536417678...|[0.99999999758496...|       0.0|
|[0.0,93.0,100.0,3...|      0|[18.9888422996185...|[0.99999999433433...|    

In [190]:
eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='prediction')
eval.evaluate(model.transform(test))

1.0

In [192]:
# save model
model.save('model')

In [193]:
# load saved model back to the environment
from pyspark.ml.classification import LogisticRegressionModel

model = LogisticRegressionModel.load('/content/model')

# TASK 7: Prediction on New Data with the saved model

In [194]:
#create a new spark dataframe
new_data = spark.read.csv('/content/diabetes_dataset/new_test.csv', header=True, inferSchema=True)

In [195]:
#print the schema
new_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)



In [196]:
print(new_data.count(), len(new_data.columns))

4 8


In [197]:
#create an additional feature merged column
new_df = assembler.transform(new_data)

IllegalArgumentException: Outcome does not exist. Available: Pregnancies, Glucose, BloodPressure, SkinThickness, Insulin, BMI, DiabetesPedigreeFunction, Age

In [None]:
#print the schema
new_df.printSchema()

In [None]:
#use model to make predictions
result = model.transform(new_df)
result.printSchema)

In [None]:
#display the predictions
result.select('features', 'prediction')