<a href="https://colab.research.google.com/github/darkfish619/Data-scrapping-Wrangling/blob/master/Diabetes_Prediction_with_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# install pyspark

! pip install pyspark



Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 53 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 68.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=75452b72bfc5dcf1580e855d5a3785d306509e8e48ce59f92e22b2eece01a74b
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
# Create the SparkSession used throughout the notebook

from pyspark.sql import SparkSession

# getOrCreate() hands back the session that is already running, or starts a new one if none exists
sc = (
    SparkSession.builder
    .appName('spark')
    .getOrCreate()
)



In [3]:
# Task 2. Clone & Explore dataset

In [4]:
# Clone the dataset repository from GitHub into ./diabetes_dataset

! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (6/6), done.


In [6]:
# List the cloned directory to confirm the dataset files are present

! ls diabetes_dataset

diabetes.csv  new_test.csv


In [8]:
# Load the training CSV into a Spark DataFrame (Spark DataFrames are immutable).
# header=True takes the column names from the first row; inferSchema=True lets Spark detect each column's data type.
# The path is relative so that it matches the directory created by `git clone`,
# instead of assuming the working directory is Colab's /content.

df = sc.read.csv("diabetes_dataset/diabetes.csv", header=True, inferSchema=True)
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [9]:
# Print the schema (column names, inferred types and nullability)

df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [10]:
# Report the DataFrame's dimensions: number of rows, then number of columns

row_total = df.count()
column_total = len(df.columns)
print(row_total, column_total)

2000 9


In [11]:
# Class balance: number of patients with diabetes (Outcome == 1) vs. without diabetes (Outcome == 0)

df.groupBy('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [12]:
# Summary statistics (count, mean, stddev, ...) for every column

df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

In [13]:
# Task.3 Data cleaning & Preparation

In [14]:
# Count the null values in each column

for column_name in df.columns:
  null_rows = df.filter(df[column_name].isNull())
  print(column_name + ":", null_rows.count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [15]:
# Look for the unnecessary values present

def count_zeros(data=None, columns=None):
  """Print, for each requested column, the number of rows whose value is 0.

  Args:
    data: DataFrame to inspect. Defaults to the notebook-level ``df``,
      looked up at call time.
    columns: Iterable of column names to check. Defaults to Glucose,
      BloodPressure, SkinThickness, Insulin and BMI.

  Returns:
    None. One ``<column>: <count>`` line is printed per column.
  """
  if data is None:
    data = df  # fall back to the notebook's DataFrame so a bare count_zeros() keeps working
  if columns is None:
    columns = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
  for column_name in columns:
    # rows where the column equals 0, counted
    print(column_name + ":", data[data[column_name] == 0].count())

In [16]:
count_zeros()   # print how many zero values each of the checked columns contains

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [18]:
# Insulin alone has almost 50% zero values, so the zeros are replaced rather than dropping those rows.

# Compute each column's mean and replace its zeros with that mean.

from pyspark.sql.functions import when    # import only what is used; `import *` would shadow builtins such as sum/min/max

for i in df.columns[1:6]:    # Glucose through BMI
  # NOTE(review): the mean is taken over all rows, zeros included, and is then truncated to an int
  # (so BMI is filled with 32 rather than ~32.19) — confirm this is the intended imputation.
  mean_value = df.agg({i:'mean'}).first()[0]    # agg gives a one-row result; first()[0] extracts the mean
  print("mean value for {} is {}" .format(i, int(mean_value)))

  # replace the 0's with the truncated mean; every other value is kept as it is
  df = df.withColumn(i,when(df[i]==0,int(mean_value)).otherwise(df[i]))

mean value for Glucose is 121
mean value for BloodPressure is 69
mean value for SkinThickness is 20
mean value for Insulin is 80
mean value for BMI is 32


In [19]:
# Display the DataFrame again
# The zeros in the cleaned columns have now been replaced by the column mean
df.show()


+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


In [20]:
#Task.4. Correlation Analysis & Feature Selection

In [23]:
# Correlation between the output column 'Outcome' and every column of df
# (the final line is Outcome against itself, hence 1.0)

for column_name in df.columns:
  corr_to_outcome = df.stat.corr('Outcome', column_name)
  print('Correlation to outcome for {} is {}'.format(column_name, corr_to_outcome))

# corr() takes two column names: the first is the output column 'Outcome', the second is the column being compared

Correlation to outcome for Pregnancies is 0.22443699263363961
Correlation to outcome for Glucose is 0.48796646527321064
Correlation to outcome for BloodPressure is 0.17171333286446713
Correlation to outcome for SkinThickness is 0.1659010662889893
Correlation to outcome for Insulin is 0.1711763270226193
Correlation to outcome for BMI is 0.2827927569760082
Correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
Correlation to outcome for Age is 0.23650924717620253
Correlation to outcome for Outcome is 1.0


In [25]:
# None of the columns is highly correlated, so all of them are kept

# feature selection

# VectorAssembler is a feature transformer that merges multiple columns into a single vector column
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[
        'Pregnancies',
        'Glucose',
        'BloodPressure',
        'SkinThickness',
        'Insulin',
        'BMI',
        'DiabetesPedigreeFunction',
        'Age',
    ],
    outputCol='features',
)

# Adds the vector column 'features', built from the eight input columns, to df
output_data = assembler.transform(df)

In [26]:
# Print the schema; it now includes the 'features' vector column

output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [27]:
# Display the assembled data, including the new 'features' column

output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

In [28]:
# Task.5. Split the dataset & build the Model


In [29]:
# Create the final data
## Logistic regression is the classifier used for this binary (0/1) outcome

from pyspark.ml.classification import LogisticRegression

# keep only the model input ('features') and the label column
final_data = output_data.select(['features', 'outcome'])


In [30]:
# Print the schema of the two-column modelling DataFrame

final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- outcome: integer (nullable = true)



In [31]:
# Split the data into train and test sets, then build the model

# 70:30 split; the fixed seed makes the split (and everything computed from it) repeatable across runs
train, test = final_data.randomSplit([0.7,0.3], seed=42)

# create the model

models = LogisticRegression(labelCol='outcome')  # estimator (not yet fitted); 'outcome' is the label column

model = models.fit(train)   # fit on the ~70% training split; the remaining ~30% is kept for testing

In [32]:
# Training summary of the fitted model

summary = model.summary

# describe() reports statistics for the label and for the predictions in the summary
summary.predictions.describe().show()



+-------+-------------------+-------------------+
|summary|            outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1362|               1362|
|   mean|0.34801762114537443| 0.2709251101321586|
| stddev|0.47651660324132755|0.44460074995832793|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [33]:
# Task.6. Evaluate and Save the Model

In [34]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.evaluate(test)    # evaluate the model on the test data; the result exposes a .predictions DataFrame

In [35]:
# Show the first 20 test rows with the model's raw prediction, probability and predicted class
predictions.predictions.show(20)





+--------------------+-------+--------------------+--------------------+----------+
|            features|outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,73.0,69.0,20...|      0|[4.36044238559781...|[0.98738834935813...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.87405602372724...|[0.97964883512597...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.87405602372724...|[0.97964883512597...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.79974842064439...|[0.94266222771291...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.58475159561349...|[0.92987374741549...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.58475159561349...|[0.92987374741549...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.58475159561349...|[0.92987374741549...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.75039641848450...|[0.93993573414941...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.75039641848450...|[0.93993573414941...|    

In [37]:
# Evaluate the model on the test split

evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='outcome',
)
# score the predictions made on the test data; no metricName is set, so the evaluator's default metric is reported
evaluator.evaluate(model.transform(test))

0.8241544281263907

In [42]:
# Save the model

# overwrite() replaces a model directory left over from a previous run;
# a plain save() raises an error when the target path already exists
model.write().overwrite().save('diabetes_model')

In [43]:
 # Load the saved model back into the environment

from pyspark.ml.classification import LogisticRegressionModel

# `model` now refers to the copy restored from the 'diabetes_model' directory
model = LogisticRegressionModel.load('diabetes_model')

In [44]:
# Task.7. Prediction on New Data with the saved model

In [45]:
# Create a new DataFrame from the unlabeled test file
# (relative path, so it resolves against the directory created by `git clone` rather than assuming Colab's /content)

new_df = sc.read.csv('diabetes_dataset/new_test.csv', header=True, inferSchema=True)


In [46]:
# Display the schema of the new data

new_df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)



In [47]:
# The new data has the same feature columns as the training data but no Outcome column.
# Reuse the assembler to add the merged 'features' vector column.
# NOTE(review): the zero-to-mean replacement applied to the training data is not applied to new_df — confirm whether it should be.

test_data = assembler.transform(new_df)

In [48]:
# Print the schema

test_data.printSchema()   # now includes the additional 'features' column

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)



In [49]:
# Use the model to make predictions on the new data

results = model.transform(test_data)
results.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [50]:
# Display the feature vector and the predicted class for each new row

results.select('features','prediction').show()

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[1.0,190.0,78.0,3...|       1.0|
|[0.0,80.0,84.0,36...|       0.0|
|[2.0,138.0,82.0,4...|       1.0|
|[1.0,110.0,63.0,4...|       1.0|
+--------------------+----------+



In [51]:
# Show our model is able to make predictions on unlabeled data