In [8]:
# !pip install pyspark



In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

print('PySpark Version : ',pyspark.__version__)

PySpark Version :  3.4.0


In [2]:
# JAVA_HOME = 'C:\Program Files\Java\jdk1.8.0_201'
# PATH = %PATH%;C:\Program Files\Java\jdk1.8.0_201\bin

In [4]:
# Make spark session
spark = SparkSession.builder.appName("Diabetes classification").getOrCreate()
spark

# Import data with PySpark

In [5]:
# df = spark.read.csv('D:\Work\Documentation\PyCaret\Dataset\diabetes.csv')

In [63]:
df = spark.read.option('header', 'true').csv('Dataset\diabetes.csv', inferSchema=True)

In [64]:
df.show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [65]:
df.head(5)

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0),
 Row(Pregnancies=8, Glucose=183, BloodPressure=64, SkinThickness=0, Insulin=0, BMI=23.3, DiabetesPedigreeFunction=0.672, Age=32, Outcome=1),
 Row(Pregnancies=1, Glucose=89, BloodPressure=66, SkinThickness=23, Insulin=94, BMI=28.1, DiabetesPedigreeFunction=0.167, Age=21, Outcome=0),
 Row(Pregnancies=0, Glucose=137, BloodPressure=40, SkinThickness=35, Insulin=168, BMI=43.1, DiabetesPedigreeFunction=2.288, Age=33, Outcome=1)]

In [66]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [67]:
type(df)

pyspark.sql.dataframe.DataFrame

In [68]:
df.select(['Glucose', 'BloodPressure']).show(3)

+-------+-------------+
|Glucose|BloodPressure|
+-------+-------------+
|    148|           72|
|     85|           66|
|    183|           64|
+-------+-------------+
only showing top 3 rows



In [69]:
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

There is no missing value in the dataset, but we will delete 0 values in the Glucose, BloodPressure, SkinThickness, Insulin, and BMI columns

In [70]:
deleted_columns = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
for columns in deleted_columns :
    df = df.filter(df[columns] != 0)

df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------------+------------------+-------------------+
|summary|       Pregnancies|           Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|            Outcome|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------------+------------------+-------------------+
|  count|               392|               392|               392|               392|               392|               392|                     392|               392|                392|
|   mean|3.3010204081632653|122.62755102040816| 70.66326530612245|29.145408163265305|156.05612244897958|33.086224489795896|      0.5230459183673473|30.864795918367346|0.33163265306122447|
| stddev|   3.2114244562103|30.860780631724722|12.4960915649

In [71]:
df.groupBy('Outcome').mean()

DataFrame[Outcome: int, avg(Pregnancies): double, avg(Glucose): double, avg(BloodPressure): double, avg(SkinThickness): double, avg(Insulin): double, avg(BMI): double, avg(DiabetesPedigreeFunction): double, avg(Age): double, avg(Outcome): double]

In [72]:
type(df['Glucose'])

pyspark.sql.column.Column

**Train model**

In [95]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

col = ['Pregnancies',  'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age']

# Index labels, adding metadata to the label column.
# labelIndexer = StringIndexer(inputCol='Outcome', 
#                              outputCol="indexedLabel").fit(df)

# Automatically identify categorical features, and index them.
# featureIndexer = VectorIndexer(inputCol=['Pregnancies',  'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'], 
#                                outputCol="indexedFeatures").fit(df)

assembler = VectorAssembler(inputCols=col, outputCol="features")

# Split data to 75% training and 25% testing
train_data, test_data = df.randomSplit([0.75, 0.25], seed=42)

# Train random forest model
clf = RandomForestClassifier(featuresCol="features", labelCol="Outcome")

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Convert indexed labels back to original labels.
# labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
#                                labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[assembler, clf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(train_data)

In [96]:
# Make predictions.
predictions = model.transform(test_data)
predictions.select("predictedLabel", "Outcome", "features").show(5)



+--------------+-------+--------------------+
|predictedLabel|Outcome|            features|
+--------------+-------+--------------------+
|             0|      0|[0.0,84.0,64.0,22...|
|             0|      0|[0.0,93.0,100.0,3...|
|             0|      0|[0.0,95.0,64.0,39...|
|             0|      0|[0.0,95.0,80.0,45...|
|             0|      0|[0.0,100.0,70.0,2...|
+--------------+-------+--------------------+
only showing top 5 rows



In [97]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy*100))


Accuracy = 71.6216


In [98]:
rfModel = model.stages[2]
print(rfModel) 

IndexToString_9b961c1764d9
