In [57]:
!pip install pyspark
!pip install findspark
import findspark
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes



In [58]:
spark = SparkSession.builder.appName("Classification of Diabetis Dataset with Spark").getOrCreate()

In [59]:
dataset = spark.read.csv("diabetes.csv",header=True)

In [60]:
dataset.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [61]:
dataset.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [62]:
from pyspark.sql.functions import col
new_data = dataset.select(*(col(c).cast("float").alias(c) for c in dataset.columns))

In [63]:
new_data.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [64]:
from pyspark.sql.functions import col, count, isnan, when
#checking for null ir nan type vals
new_data.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [65]:
cols=new_data.columns
cols.remove("Outcome")
assembler = VectorAssembler(inputCols=cols,outputCol="features")

In [66]:
# transform method to transform dataset
data=assembler.transform(new_data)
data.select("features",'Outcome').show(truncate=False)

+-----------------------------------------------------------------------+-------+
|features                                                               |Outcome|
+-----------------------------------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.35100001096725464,31.0]   |0.0    |
|[8.0,183.0,64.0,0.0,0.0,23.299999237060547,0.671999990940094,32.0]     |1.0    |
|[1.0,89.0,66.0,23.0,94.0,28.100000381469727,0.16699999570846558,21.0]  |0.0    |
|[0.0,137.0,40.0,35.0,168.0,43.099998474121094,2.2880001068115234,33.0] |1.0    |
|[5.0,116.0,74.0,0.0,0.0,25.600000381469727,0.20100000500679016,30.0]   |0.0    |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.24799999594688416,26.0]                |1.0    |
|[10.0,115.0,0.0,0.0,0.0,35.29999923706055,0.1340000033378601,29.0]     |0.0    |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.15800000727176666,53.0]              |1.0    |
|[8.0,125.0,96.0

In [67]:
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
data=standardscaler.fit(data).transform(data)

In [68]:
data.select("features",'Outcome','Scaled_features').show(truncate=False)

+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                               |Outcome|Scaled_features                                                                                                                                          |
+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |[1.7806383732194306,4.628960915766174,3.7198138711154307,2.1940523222807116,0.0,4.261709202425419,1.8923810993699686,4.251616970894646]                  |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.3510000109

In [69]:
assembled_data = data.select("Scaled_features","Outcome")
assembled_data.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [70]:
train, test = assembled_data.randomSplit([0.7, 0.3])

In [71]:
train.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[1.7...|    0.0|
|(8,[0,1,6,7],[2.0...|    0.0|
|(8,[0,1,6,7],[2.9...|    1.0|
|(8,[1,5,6,7],[2.2...|    0.0|
|(8,[1,5,6,7],[3.6...|    0.0|
|(8,[1,5,6,7],[3.7...|    1.0|
|(8,[1,5,6,7],[4.3...|    1.0|
|(8,[1,5,6,7],[4.4...|    1.0|
|(8,[1,5,6,7],[4.5...|    1.0|
|[0.0,1.7827754878...|    0.0|
|[0.0,2.0955431172...|    0.0|
|[0.0,2.4395875096...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.6898016132...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9087389538...|    0.0|
+--------------------+-------+
only showing top 20 rows



In [72]:
test.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.8...|    0.0|
|(8,[1,5,6,7],[3.0...|    0.0|
|(8,[1,5,6,7],[4.0...|    1.0|
|(8,[1,5,6,7],[5.2...|    1.0|
|(8,[1,6,7],[2.940...|    0.0|
|[0.0,2.3144804578...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.9712924797...|    0.0|
|[0.0,3.0338460056...|    0.0|
|[0.0,3.0651227685...|    0.0|
|[0.0,3.1276762944...|    0.0|
|[0.0,3.1589530573...|    0.0|
|[0.0,3.1902298203...|    0.0|
|[0.0,3.1902298203...|    0.0|
|[0.0,3.2527833462...|    0.0|
|[0.0,3.2527833462...|    1.0|
|[0.0,3.3466136350...|    0.0|
|[0.0,3.3466136350...|    1.0|
|[0.0,3.3778903979...|    0.0|
|[0.0,3.5342742127...|    1.0|
+--------------------+-------+
only showing top 20 rows



## Logistic Regression

In [73]:
log_reg = LogisticRegression(labelCol="Outcome", featuresCol="Scaled_features",maxIter=40)
model=log_reg.fit(train)

In [74]:
prediction_test=model.transform(test)

In [75]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.8...|    0.0|[4.24279922245683...|[0.98583617795574...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[2.24338928345768...|[0.90407878090324...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-0.1689812582506...|[0.45785492428771...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-1.3358261538824...|[0.20819728399933...|       1.0|
|(8,[1,6,7],[2.940...|    0.0|[4.04608549830297...|[0.98280995779903...|       0.0|
|[0.0,2.3144804578...|    0.0|[3.52820533082519...|[0.97147972970657...|       0.0|
|[0.0,2.6272480873...|    0.0|[2.47637893966886...|[0.92246921635009...|       0.0|
|[0.0,2.9712924797...|    0.0|[2.25004399122462...|[0.90465432962595...|       0.0|
|[0.0,3.0338460056...|    0.0|[1.80897382974160...|[0.85923780656902...|    

In [76]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [77]:
# Compute raw scores on the test set
prednLabels = prediction_test.select("Outcome","prediction").rdd

In [78]:
prednLabels.collect()

[Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, pr

In [79]:
mtrcs = BinaryClassificationMetrics(prednLabels)

# AUC
print("Area under ROC = %s" % mtrcs.areaUnderROC)

Area under ROC = 0.7806060606060606


In [80]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
acc_LR = evaluator.evaluate(prediction_test)
print ("Accuracy = " ,acc_LR)

Accuracy =  0.7777777777777778


## NaiveBayes

In [81]:
naive_bayes = NaiveBayes(featuresCol='Scaled_features',labelCol='Outcome',smoothing=1.0)

In [82]:
model = naive_bayes.fit(train)

In [83]:
# select example rows to display.
prediction_test = model.transform(test)

In [84]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.8...|    0.0|[-11.957858948040...|[0.58524347464391...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[-15.767686404680...|[0.64824948392723...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-21.646189446351...|[0.65925242334085...|       0.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-26.590028659131...|[0.58441608992677...|       0.0|
|(8,[1,6,7],[2.940...|    0.0|[-11.255421224969...|[0.61439097940903...|       0.0|
|[0.0,2.3144804578...|    0.0|[-22.217687115413...|[0.73321016257367...|       0.0|
|[0.0,2.6272480873...|    0.0|[-30.180895122098...|[0.73144667999712...|       0.0|
|[0.0,2.9712924797...|    0.0|[-35.902388395634...|[0.76149512243939...|       0.0|
|[0.0,3.0338460056...|    0.0|[-35.453665987275...|[0.71612846879933...|    

In [85]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [86]:
prednLabels = prediction_test.select("Outcome","prediction").rdd

In [87]:
# computing test error
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
acc_NB = evaluator.evaluate(prediction_test)

In [88]:
print ("Accuracy:",acc_NB)

Accuracy: 0.6203703703703703


In [89]:
metrics = BinaryClassificationMetrics(prednLabels)

# AUC
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7238095238095239


## Random Forest Classifier

In [90]:
random_forest_classifier = RandomForestClassifier(labelCol="Outcome", featuresCol="Scaled_features", numTrees=40)

In [91]:
model = random_forest_classifier.fit(train)

In [92]:
prediction_test = model.transform(test)

In [93]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.8...|    0.0|[38.5180006019978...|[0.96295001504994...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[38.5688287802784...|[0.96422071950696...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[17.2453017387244...|[0.43113254346811...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[7.57086684038422...|[0.18927167100960...|       1.0|
|(8,[1,6,7],[2.940...|    0.0|[38.6303850206482...|[0.96575962551620...|       0.0|
|[0.0,2.3144804578...|    0.0|[38.3098106053958...|[0.95774526513489...|       0.0|
|[0.0,2.6272480873...|    0.0|[37.1046860363802...|[0.92761715090950...|       0.0|
|[0.0,2.9712924797...|    0.0|[35.9613671862257...|[0.89903417965564...|       0.0|
|[0.0,3.0338460056...|    0.0|[36.8295729335206...|[0.92073932333801...|    

In [94]:
prediction_test.select("prediction","Outcome").show(10)

+----------+-------+
|prediction|Outcome|
+----------+-------+
|       0.0|    0.0|
|       0.0|    0.0|
|       1.0|    1.0|
|       1.0|    1.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
|       0.0|    0.0|
+----------+-------+
only showing top 10 rows



In [95]:
prednLabels = prediction_test.select("Outcome","prediction").rdd

In [96]:
mtrcs = BinaryClassificationMetrics(prednLabels)
# AUC
print("Area under ROC = %s" % mtrcs.areaUnderROC)

Area under ROC = 0.793707033315706


In [97]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator( labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
acc_RF= evaluator.evaluate(prediction_test)

In [98]:
print ("Accuracy",accuracy_RF)

Accuracy 0.7782805429864253


#Gradient Boosting Classifier

In [99]:
gradient_boost_class = GBTClassifier(labelCol="Outcome", featuresCol="Scaled_features")

In [100]:
model = gradient_boost_class.fit(train)

In [101]:
prediction_test = model.transform(test)

In [102]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.8...|    0.0|[1.47093762589766...|[0.94987808236303...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[1.47093762589766...|[0.94987808236303...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-0.3740370057387...|[0.32124110882603...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-1.6332364020929...|[0.03673945078387...|       1.0|
|(8,[1,6,7],[2.940...|    0.0|[1.47093762589766...|[0.94987808236303...|       0.0|
|[0.0,2.3144804578...|    0.0|[1.28453187287597...|[0.92884384094078...|       0.0|
|[0.0,2.6272480873...|    0.0|[1.48854807207414...|[0.95152861537456...|       0.0|
|[0.0,2.9712924797...|    0.0|[1.38821115740729...|[0.94138835239435...|       0.0|
|[0.0,3.0338460056...|    0.0|[1.40843782490981...|[0.94358096908830...|    

In [103]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [104]:
prednLabels = prediction_test.select("Outcome","prediction").rdd

In [105]:
mtrcs = BinaryClassificationMetrics(prednLabels)

# AUC
print("Area under ROC = %s" % mtrcs.areaUnderROC)

Area under ROC = 0.709055394170089


In [106]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator( labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
acc_GBT = evaluator.evaluate(prediction_test)

In [107]:
print ("Accuracy",acc_GBT)

Accuracy 0.7175925925925926


In [109]:
print("Accuracy of Naive Bayes classifier: ",acc_NB)
print("Accuracy of Random Forest classifier: ",acc_RF)
print("Accuracy of Gradient Boosting classifier: ",acc_GBT)
print("Accuracy of Logistic Regression : ",acc_LR)

Accuracy of Naive Bayes classifier:  0.6203703703703703
Accuracy of Random Forest classifier:  0.7824074074074074
Accuracy of Gradient Boosting classifier:  0.7175925925925926
Accuracy of Logistic Regression :  0.7777777777777778
