In [1]:
import findspark
findspark.init()
findspark.find()

'C:\\bin\\spark-3.3.0-bin-hadoop2'

In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
conf=SparkConf().setAppName("practice").setMaster("local")
sc=SparkContext(conf=conf)

In [4]:
from pyspark.sql import SparkSession 
from pyspark.ml.feature import VectorAssembler

In [5]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier

In [6]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [7]:
spark = SparkSession.builder.appName("classification ML algrithms").getOrCreate()

In [8]:
data = spark.read.csv('diabetes.csv',header=True)

In [9]:
data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [10]:
data.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [11]:
from pyspark.sql.functions import col

In [12]:
new_data = data.select(*(col(c).cast("float").alias(c) for c in data.columns))

In [13]:
new_data.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [14]:
# check for missing values

from pyspark.sql.functions import col, count, isnan,when

In [15]:
new_data.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [17]:
cols = new_data.columns
cols.remove("Outcome")
assembler =  VectorAssembler(inputCols=cols, outputCol="features")

#Now let us use transform to transform our dataset

data = assembler.transform(new_data)

data.select("features",'Outcome').show(truncate=False)

+-----------------------------------------------------------------------+-------+
|features                                                               |Outcome|
+-----------------------------------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.35100001096725464,31.0]   |0.0    |
|[8.0,183.0,64.0,0.0,0.0,23.299999237060547,0.671999990940094,32.0]     |1.0    |
|[1.0,89.0,66.0,23.0,94.0,28.100000381469727,0.16699999570846558,21.0]  |0.0    |
|[0.0,137.0,40.0,35.0,168.0,43.099998474121094,2.2880001068115234,33.0] |1.0    |
|[5.0,116.0,74.0,0.0,0.0,25.600000381469727,0.20100000500679016,30.0]   |0.0    |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.24799999594688416,26.0]                |1.0    |
|[10.0,115.0,0.0,0.0,0.0,35.29999923706055,0.1340000033378601,29.0]     |0.0    |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.15800000727176666,53.0]              |1.0    |
|[8.0,125.0,96.0

In [18]:
sdsc = StandardScaler().setInputCol("features").setOutputCol("Scaled_features")

data = sdsc.fit(data).transform(data)

In [19]:
assembled_data= data.select("Scaled_features","Outcome")
assembled_data.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [20]:
train, test = assembled_data.randomSplit([0.7,0.3])

In [23]:
train.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[0.8...|    0.0|
|(8,[0,1,6,7],[1.7...|    0.0|
|(8,[1,5,6,7],[2.2...|    0.0|
|(8,[1,5,6,7],[3.6...|    0.0|
|(8,[1,5,6,7],[4.3...|    1.0|
|(8,[1,5,6,7],[4.4...|    1.0|
|(8,[1,6,7],[2.940...|    0.0|
|[0.0,1.7827754878...|    0.0|
|[0.0,2.0955431172...|    0.0|
|[0.0,2.3144804578...|    0.0|
|[0.0,2.4395875096...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.6898016132...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9087389538...|    0.0|
+--------------------+-------+
only showing top 20 rows



# Logistic Regression

In [25]:
log_reg = LogisticRegression(labelCol = 'Outcome', featuresCol="Scaled_features",maxIter=40)
model = log_reg.fit(train)

In [26]:
prediction_test = model.transform(test)

In [27]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.0...|    0.0|[3.46783269009572...|[0.96975852263817...|       0.0|
|(8,[0,1,6,7],[2.9...|    1.0|[2.65713446288194...|[0.93444936003442...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[2.35799433919003...|[0.91356756608827...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[0.99023415012180...|[0.72913416890163...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-0.6250918262604...|[0.34862428264174...|       1.0|
|(8,[1,5,6,7],[4.5...|    1.0|[-1.6756624900918...|[0.15767068133191...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-1.5991490172416...|[0.16810058505013...|       1.0|
|[0.0,2.9087389538...|    0.0|[1.34858654078930...|[0.79389844884420...|       0.0|
|[0.0,2.9712924797...|    0.0|[1.57195542944224...|[0.82806219154416...|    

In [28]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    1.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
+-------+----------+
only showing top 10 rows



In [29]:
# Compute raw score on test data set

predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [30]:
predictionAndLabels.collect()

[Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=1.0, pr

In [31]:
metrics = BinaryClassificationMetrics(predictionAndLabels)



In [32]:
#Area under ROC curve

print("Area under ROC = %s" %metrics.areaUnderROC)

Area under ROC = 0.7578566384180792


In [33]:
# Check accuracy

evaluator = MulticlassClassificationEvaluator(labelCol='Outcome', predictionCol="prediction" , metricName='accuracy')
accuracy_LR = evaluator.evaluate(prediction_test)

print("accuracy = ", accuracy_LR)
                     
                                 
                                 
                                 

accuracy =  0.7468879668049793


# NaiveBayes

In [34]:
nb = NaiveBayes(featuresCol='Scaled_features', labelCol='Outcome',smoothing=1.0)

In [35]:
model = nb.fit(train)

In [36]:
# select example rows to display

prediction_test = model.transform(test)

In [37]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.0...|    0.0|[-18.017376848163...|[0.50941964807591...|       0.0|
|(8,[0,1,6,7],[2.9...|    1.0|[-21.727106182497...|[0.46359271267294...|       1.0|
|(8,[1,5,6,7],[3.0...|    0.0|[-15.810163801351...|[0.64918126532667...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[-17.708418706707...|[0.65592594256520...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-21.741503489401...|[0.65539741888510...|       0.0|
|(8,[1,5,6,7],[4.5...|    1.0|[-26.272383877234...|[0.62904188707806...|       0.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-26.565276269443...|[0.59018748414582...|       0.0|
|[0.0,2.9087389538...|    0.0|[-44.053418018752...|[0.77447832315169...|       0.0|
|[0.0,2.9712924797...|    0.0|[-35.121373209999...|[0.74792140314729...|    

In [38]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [39]:
# select (prediction true label)  and compute test error

evaluator = MulticlassClassificationEvaluator(labelCol="Outcome",predictionCol="prediction", metricName="accuracy")
accuracy_nb = evaluator.evaluate(prediction_test)

In [40]:
print("accuracy ",accuracy_nb)

accuracy  0.6224066390041494


In [42]:
metrics1 = BinaryClassificationMetrics(predictionAndLabels)

In [43]:
print("Area under ROC = %s" %metrics1.areaUnderROC)

Area under ROC = 0.8063829787234043


# GBT Classifier

In [44]:
gradient_boost_class = GBTClassifier(labelCol='Outcome', featuresCol='Scaled_features')

In [45]:
model = gradient_boost_class.fit(train)

In [46]:
prediction_test = model.transform(test)

In [47]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.0...|    0.0|[0.77113138564045...|[0.82379342485096...|       0.0|
|(8,[0,1,6,7],[2.9...|    1.0|[0.53881071651147...|[0.74604359776962...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[1.54783219690045...|[0.95671355086787...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[1.13902117730786...|[0.90704211613031...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-1.2546490587156...|[0.07520891263918...|       1.0|
|(8,[1,5,6,7],[4.5...|    1.0|[-1.1936405960098...|[0.08414772520035...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-0.9955266132359...|[0.12014548077372...|       1.0|
|[0.0,2.9087389538...|    0.0|[0.34901233015409...|[0.66774966852153...|       0.0|
|[0.0,2.9712924797...|    0.0|[1.40770652107501...|[0.94350305528235...|    

In [49]:
predictionAndLabels = prediction_test.select("outcome","prediction").rdd

In [50]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

In [51]:
print("Area under ROC = %s" %metrics1.areaUnderROC)

Area under ROC = 0.8063829787234043


In [53]:
# select (prediction true label)  and compute test error

evaluator = MulticlassClassificationEvaluator(labelCol="Outcome",predictionCol="prediction",metricName="accuracy" )
accuracy_GBT = evaluator.evaluate(prediction_test)

In [54]:
print("accuracy = ",accuracy_GBT)

accuracy =  0.6804979253112033


# Rnadom Forest Classifier

In [55]:
rfClassifier = RandomForestClassifier(labelCol='Outcome', featuresCol='Scaled_features',numTrees=40)

In [56]:
model = rfClassifier.fit(train)

In [57]:
prediction_test = model.transform(test)

In [58]:
prediction_test.show(5)

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.0...|    0.0|[28.0778814957584...|[0.70194703739396...|       0.0|
|(8,[0,1,6,7],[2.9...|    1.0|[30.4902611649443...|[0.76225652912360...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[38.5727522034549...|[0.96431880508637...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[32.7314357787334...|[0.81828589446833...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[23.9627124635999...|[0.59906781158999...|       0.0|
+--------------------+-------+--------------------+--------------------+----------+
only showing top 5 rows



In [59]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    1.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
+-------+----------+
only showing top 10 rows



In [60]:
predictionAndLabels = prediction_test.select("outcome","prediction").rdd.map(lambda row: row[0:])

In [61]:
metrics2 = BinaryClassificationMetrics(predictionAndLabels)

In [None]:
#area under ROC curve

In [62]:
print("Area under ROC = %s" %metrics2.areaUnderROC)

Area under ROC = 0.7358442871587462


In [63]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome",predictionCol="prediction",metricName="accuracy" )
accuracy_RFC = evaluator.evaluate(prediction_test)

In [64]:
print("accuracy = ",accuracy_RFC)

accuracy =  0.7344398340248963
