# Install Dependencies

In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=4b0da805ca1c387279e71a648f770a340dad1ef70a83186846d5660d09591f7a
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


# Run a SparkSession

In [2]:
from pyspark.sql import SparkSession

"""
SparkSession: he class we're using to create our Spark session.
.builder: builder pattern used to construct a SparkSession. It allows you to set various configuration options.
.appName("spark"): sets the name of your Spark application.
.getOrCreate(): either creates a new SparkSession, or returns an existing one. You typically only want one SparkSession per application.
"""
spark = SparkSession.builder.appName("spark").getOrCreate()

# Clone Diabetest Dataset

In [3]:
! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (6/6), 13.02 KiB | 13.02 MiB/s, done.


In [4]:
! ls diabetes_dataset/

diabetes.csv  new_test.csv


In [5]:
df = spark.read.csv('/content/diabetes_dataset/diabetes.csv', header=True, inferSchema=True)

In [6]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [7]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [10]:
print(df.count(), len(df.columns))

2000 9


In [11]:
df.groupBy('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [12]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

# Cleaning Data

In [13]:
# find for null values
for col in df.columns:
  print(col + ":", df[df[col].isNull()].count()) # count null or missing value of each column in df

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [15]:
# find total number of 0 values in columns: Glucose, Bloodpressure, SkinThickness, Insulin and BMI
def count_zeros():
  column_list = ['Glucose', 'Bloodpressure', 'SkinThickness', 'Insulin', 'BMI']
  for i in column_list:
    print(i + ':', df[df[i] == 0].count())

In [16]:
count_zeros()

Glucose: 13
Bloodpressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [18]:
from pyspark.sql.functions import *
for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print('Mean value for {} is {}'.format(i, int(data)))
  df = df.withColumn(i, when(df[i] == 0, int(data)).otherwise(df[i]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [19]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


In [21]:
for col in df.columns:
  print("Correlation to outcome for {} is {}".format(col, df.stat.corr('Outcome', col)))

Correlation to outcome for Pregnancies is 0.22443699263363961
Correlation to outcome for Glucose is 0.48796646527321064
Correlation to outcome for BloodPressure is 0.17171333286446713
Correlation to outcome for SkinThickness is 0.1659010662889893
Correlation to outcome for Insulin is 0.1711763270226193
Correlation to outcome for BMI is 0.2827927569760082
Correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
Correlation to outcome for Age is 0.23650924717620253
Correlation to outcome for Outcome is 1.0


In [25]:
from pyspark.ml.feature import VectorAssembler # a feature transformer that merges multiple columns into a vector column
assembler = VectorAssembler(inputCols = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'], outputCol='features')
output_data = assembler.transform(df)

In [26]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [27]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

In [28]:
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features', 'Outcome')


In [29]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [30]:
train, test = final_data.randomSplit([0.7, 0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [31]:
summary = model.summary

In [32]:
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1423|               1423|
|   mean|0.34012649332396344|0.26774420238931834|
| stddev| 0.4739180269038876|0.44293918179486763|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



# Evaluation & Test Model

In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [34]:
predictions.predictions.show(10)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[4.40970130328403...|[0.98798725114188...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.34812126873711...|[0.98723399448201...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.34812126873711...|[0.98723399448201...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.81426511646464...|[0.97842196461101...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.81426511646464...|[0.97842196461101...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.75038375538202...|[0.93993501923035...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.55457336795204...|[0.92788015740649...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.55457336795204...|[0.92788015740649...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.66255453710941...|[0.93478057917706...|    

In [35]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.828037135278515

In [36]:
model.save('model')

In [None]:
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('model')