# **Classification with PySpark**

**Dataset:** Pima Indians Diabetes Database

# About Data

The dataset consists of several medical predictor variables and one target variable, Outcome. The predictor variables include the number of pregnancies the patient has had, their BMI, insulin level, age, and so on.

Pregnancies : Number of times pregnant

Glucose : Plasma glucose concentration at 2 hours in an oral glucose tolerance test

BloodPressure: Diastolic blood pressure (mm Hg)

SkinThickness: Triceps skin fold thickness (mm)

Insulin : 2-Hour serum insulin (mu U/ml)

BMI : Body mass index (weight in kg/(height in m)²)

DiabetesPedigreeFunction : Diabetes pedigree function

Age : Age (years)

Outcome : Class variable (0 or 1); 268 of the 768 records are 1, the others are 0
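
The class counts above fix a useful baseline: a classifier that always predicts the majority class (0) is already correct for 500 of the 768 rows. A quick check of that arithmetic, using only the counts stated above (the variable names are illustrative):

```python
# Class counts taken from the dataset description above.
total_rows = 768
positive_rows = 268                          # Outcome == 1
negative_rows = total_rows - positive_rows   # Outcome == 0

positive_rate = positive_rows / total_rows
majority_baseline_accuracy = max(positive_rows, negative_rows) / total_rows

print(f"Positive rate: {positive_rate:.4f}")
print(f"Majority-class baseline accuracy: {majority_baseline_accuracy:.4f}")
```

Any model accuracy reported later is best read against this baseline rather than against 50%.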

# Necessary Installation

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [3]:
import pandas as pd
import numpy as np

In [4]:
!pip install -q findspark
!pip install pyspark



In [5]:
from pyspark.sql import SparkSession

In [6]:
APP_NAME = "diabetes"
SPARK_URL = "local[*]"
SPARK_HOME = os.environ["SPARK_HOME"]

In [7]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

In [8]:
spark

# Data Preparation

In [9]:
diabetes = spark.read.format('csv').option("header", "true").option("inferSchema", "true").load('/content/diabetes.csv')

In [10]:
diabetes.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [11]:
print("There are", diabetes.count(), "rows", len(diabetes.columns), "Columns")

There are 768 rows 9 Columns


In [12]:
diabetes.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [13]:
# Use describe() to get summary statistics for the integer columns
# (BMI and DiabetesPedigreeFunction are doubles, so this filter leaves them out)
numeric = [t[0] for t in diabetes.dtypes if t[1] == 'int']
diabetes.select(numeric).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Pregnancies,768,3.8450520833333335,3.36957806269887,0,17
Glucose,768,120.89453125,31.97261819513622,0,199
BloodPressure,768,69.10546875,19.355807170644777,0,122
SkinThickness,768,20.536458333333332,15.952217567727642,0,99
Insulin,768,79.79947916666667,115.24400235133803,0,846
Age,768,33.240885416666664,11.760231540678689,21,81
Outcome,768,0.3489583333333333,0.476951377242799,0,1


In [14]:
diabetes.groupBy('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



In [15]:
from pyspark.sql.functions import isnan, when, count, col
# Count NaN values per column; isnan() does not flag nulls, so those are not counted here
diabetes.select([count(when(isnan(c), c)).alias(c) for c in diabetes.columns]).toPandas().head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,0,0,0,0,0,0,0,0,0


In [16]:
# Use a user-defined function (UDF) to change the "Outcome" values from 0 to "No" and from 1 to "yes"

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
y_udf = udf(lambda y: "No" if y==0 else "yes", StringType())

df= diabetes.withColumn('HasDiabities', y_udf('Outcome')).drop('Outcome')


In [17]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- HasDiabities: string (nullable = true)



In [26]:
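def udf_multiple(age):
  # Map an age in years to an age-group label.
  # The first matching branch wins, so 25 is 'Under 25' and 35 is 'Between 25 and 35'.
  # An age of exactly 50 matches no branch and falls through to 'N/A'.
  if (age <=25):
    return 'Under 25'
  elif (age >=25 and age<=35):
    return 'Between 25 and 35'
  elif (age >=35 and age < 50):
    return 'Between 36 and 49'
  elif (age >50):
    return 'Over 50'
  else: return 'N/A'

# Wrap the function as a Spark UDF (the return type defaults to string)
e_udf = udf(udf_multiple)
df=df.withColumn("Age_udf", e_udf('Age'))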
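
The branches in `udf_multiple` overlap at 25 and 35 and leave a gap at exactly 50, which is why the 50-year-old patient in the first row of the `df.show()` output that follows is labelled `N/A`. A possible gap-free variant is sketched here. The function name `age_group` and its bin labels are hypothetical, and swapping it in would change the `Age_udf` categories and therefore every downstream output recorded in this notebook.

```python
def age_group(age):
    """Map an age in years to one of four non-overlapping bins.

    Hypothetical alternative to udf_multiple: every integer age falls
    into exactly one bin, and missing values map to 'N/A'.
    """
    if age is None:
        return 'N/A'
    if age <= 25:
        return '25 and under'
    elif age <= 35:
        return '26 to 35'
    elif age < 50:
        return '36 to 49'
    else:
        return '50 and over'


# Boundary ages that the original branches treat inconsistently.
for age in (25, 35, 50, 51):
    print(age, '->', age_group(age))

# To use it in Spark, it could be wrapped the same way as before:
#   e_udf = udf(age_group, StringType())
#   df = df.withColumn("Age_udf", e_udf('Age'))
```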

In [27]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+------------+-----------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|HasDiabities|          Age_udf|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+------------+-----------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|         yes|              N/A|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|          No|Between 25 and 35|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|         yes|Between 25 and 35|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|          No|         Under 25|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|         yes|Between 25 and 35|
|          5|   

In [28]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

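# Build the pipeline stages: index and one-hot encode the categorical column,
# index the label, then assemble everything into a single feature vector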

categoricalColumns= ['Age_udf']
stages=[]
for categoricalCol in categoricalColumns:
  stringIndexer = StringIndexer(inputCol= categoricalCol, outputCol = categoricalCol+ 'Index')
  encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols= [categoricalCol+"classVec"])
  stages+= [stringIndexer, encoder]
label_stringIdx= StringIndexer(inputCol = 'HasDiabities', outputCol = 'label')
stages +=[label_stringIdx]  
numericCols= ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin','BMI','DiabetesPedigreeFunction']
assemblerInputs = [c + 'classVec' for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "features")
stages+= [assembler]

In [30]:
pipeline= Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df_transformed = pipelineModel.transform(df)

In [32]:
df_transformed.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+------------+-----------------+------------+---------------+-----+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|HasDiabities|          Age_udf|Age_udfIndex|Age_udfclassVec|label|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+------------+-----------------+------------+---------------+-----+--------------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|         yes|              N/A|         4.0|      (4,[],[])|  1.0|(11,[4,5,6,7,9,10...|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|          No|Between 25 and 35|         1.0|  (4,[1],[1.0])|  0.0|[0.0,1.0,0.0,0.0,...|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|         yes|B
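
The `(4,[],[])` vector in the first row above follows from two defaults: `StringIndexer` assigns indices by descending label frequency, and `OneHotEncoder` drops the last category (`dropLast=True`), so the rarest category is encoded as an all-zero vector. The plain-Python sketch below imitates that behaviour on a made-up column. `string_index` and `one_hot_drop_last` are hypothetical helpers, not Spark APIs, and the sample values are invented for illustration.

```python
from collections import Counter


def string_index(values):
    """Assign 0, 1, 2, ... to labels by descending frequency.

    Ties are broken alphabetically here so the result is deterministic.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """One-hot encode an index, dropping the slot for the last category."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector


# Invented sample column: 'N/A' is the rarest label, so it gets the last index.
ages = ['Under 25'] * 4 + ['Between 25 and 35'] * 3 + ['N/A']
mapping = string_index(ages)

for label, index in mapping.items():
    print(label, index, one_hot_drop_last(index, len(mapping)))
```

In the notebook's data there are five `Age_udf` categories, so the encoded vector has four slots, and together with the seven numeric columns that gives the 11-element `features` vector shown above.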

In [33]:
cols= df.columns
selectedcols = ['label','features']+cols
df= df_transformed.select(selectedcols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- HasDiabities: string (nullable = true)
 |-- Age_udf: string (nullable = true)



In [34]:
train, test = df.randomSplit([0.8, 0.2], seed= 99999)
print("Training Dataset count: ", str(train.count()))
print("Test Dataset count: ", str(test.count()))

Training Dataset count:  617
Test Dataset count:  151


# Models

## LogisticRegression

In [35]:
from pyspark.ml.classification import LogisticRegression

In [36]:
lr= LogisticRegression(featuresCol='features', labelCol='label', maxIter=5)
lrmodel= lr.fit(train)

In [37]:
lrmodel.summary.accuracy  # accuracy on the training data, not on the test set

0.7406807131280388

In [38]:
prediction= lrmodel.transform(test)
prediction.select('label', 'features', 'rawPrediction', 'prediction', 'probability').toPandas().head(5)

Unnamed: 0,label,features,rawPrediction,prediction,probability
0,0.0,"(1.0, 0.0, 0.0, 0.0, 2.0, 90.0, 60.0, 0.0, 0.0...","[2.383231419263786, -2.383231419263786]",0.0,"[0.9155396453519554, 0.08446035464804458]"
1,0.0,"(1.0, 0.0, 0.0, 0.0, 2.0, 91.0, 62.0, 0.0, 0.0...","[2.226893311514571, -2.226893311514571]",0.0,"[0.902638678029611, 0.09736132197038905]"
2,0.0,"(1.0, 0.0, 0.0, 0.0, 2.0, 111.0, 60.0, 0.0, 0....","[1.8809071371086512, -1.8809071371086512]",0.0,"[0.8677152873526073, 0.13228471264739272]"
3,0.0,"(1.0, 0.0, 0.0, 0.0, 3.0, 80.0, 0.0, 0.0, 0.0,...","[1.075615097595022, -1.075615097595022]",0.0,"[0.7456632847348876, 0.25433671526511237]"
4,0.0,"(1.0, 0.0, 0.0, 0.0, 7.0, 105.0, 0.0, 0.0, 0.0...","[0.5641027934161749, -0.5641027934161749]",0.0,"[0.6374013150218999, 0.3625986849781001]"


In [40]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(prediction))

Test Area Under ROC 0.7983682983682975
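
`BinaryClassificationEvaluator` defaults to the `areaUnderROC` metric. One way to read that number: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it by brute force in plain Python on invented scores. `roc_auc` is a hypothetical helper, and this is not how Spark computes the metric internally.

```python
def roc_auc(labels, scores):
    """Area under the ROC curve via pairwise comparison.

    Counts, over all (positive, negative) pairs, how often the positive
    example is scored higher; ties count as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one example of each class")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Invented labels and predicted probabilities of the positive class.
labels = [0, 0, 1, 1, 0, 1]
scores = [0.10, 0.40, 0.35, 0.80, 0.20, 0.70]
print("AUC:", roc_auc(labels, scores))
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which is the scale on which the test result above should be read.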


In [42]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

ParamGrid = (ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.5, 2]).addGrid(lr.elasticNetParam, [0.0, 0.5,1.0]).addGrid(lr.maxIter,[1,5,10]).build())


In [43]:
cv = CrossValidator(estimator=lr, estimatorParamMaps=ParamGrid, evaluator=evaluator, numFolds=5)
cvModel= cv.fit(train)
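
Grid search cost grows multiplicatively: the grid above has 3 × 3 × 3 = 27 parameter combinations, and with `numFolds=5` each one is fitted five times, followed by one final refit of the best combination on the full training set. The snippet below reproduces that count in plain Python; the variable names are illustrative.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder call above.
reg_params = [0.01, 0.5, 2]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10]
num_folds = 5

grid = list(product(reg_params, elastic_net_params, max_iters))
cv_fits = len(grid) * num_folds      # one fit per combination per fold
total_fits = cv_fits + 1             # plus the final refit of the best combination

print("Combinations:", len(grid))
print("Model fits during cross-validation:", cv_fits)
print("Total fits including the final refit:", total_fits)
```

The same arithmetic applies to the larger grids used for the tree models later on, so it is worth checking the count before launching a search.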

In [44]:
#Best Model Performance
Predictions= cvModel.transform(test)
print("Best Model Test Area Under ROC", evaluator.evaluate(Predictions))

Best Model Test Area Under ROC 0.853729603729603


In [50]:
#Best Model Feature Weights
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]

# createDataFrame is available directly on the SparkSession, so no SQLContext is needed
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])

weightsDF.toPandas().head(5)

Unnamed: 0,Feature Weight
0,-0.281617
1,-0.00431
2,0.206675
3,0.125383
4,0.027253


In [None]:
#Best Model Parameters

In [51]:
best_model= cvModel.bestModel

# The fitted model exposes its parameters through public getters
print('Best Param (regParam): ', best_model.getRegParam())
print('Best Param (MaxIter): ', best_model.getMaxIter())
print('Best Param (elasticNetParam): ', best_model.getElasticNetParam())

Best Param (regParam):  0.5
Best Param (MaxIter):  10
Best Param (elasticNetParam):  0.0


## DecisionTreeClassifier

In [53]:
from pyspark.ml.classification import DecisionTreeClassifier
dc = DecisionTreeClassifier()

In [56]:
ParamGrid = (ParamGridBuilder().addGrid(dc.maxDepth, [2,3,5,10,15, 20]).addGrid(dc.maxBins, [5,10,20,30,40,50]).build())
cv = CrossValidator(estimator=dc, estimatorParamMaps=ParamGrid, evaluator=evaluator, numFolds=5)
cvModel= cv.fit(train)

#Best Model Performance
Predictions= cvModel.transform(test)
print("Best Model Test Area Under ROC", evaluator.evaluate(Predictions))

Best Model Test Area Under ROC 0.7142579642579643


In [74]:
#Best Model Parameters
best_model= cvModel.bestModel


print('Best Param (maxDepth): ', best_model.getMaxDepth())
print('Best Param (maxBins): ', best_model.getMaxBins())



Best Param (maxDepth):  10
Best Param (maxBins):  10


## RandomForestClassifier

In [76]:
from pyspark.ml.classification import RandomForestClassifier
rfc= RandomForestClassifier()

In [77]:


ParamGrid = (ParamGridBuilder().addGrid(rfc.numTrees, [2,3,5,10,15,20]).addGrid(rfc.maxDepth, [2,3,5,10,15, 20]).addGrid(rfc.maxBins, [5,10,20,30,40,50]).build())
cv = CrossValidator(estimator=rfc, estimatorParamMaps=ParamGrid, evaluator=evaluator, numFolds=5)
cvModel= cv.fit(train)

#Best Model Performance
Predictions= cvModel.transform(test)
print("Best Model Test Area Under ROC", evaluator.evaluate(Predictions))

Best Model Test Area Under ROC 0.827214452214452


In [79]:
#Best Model Parameters
best_model= cvModel.bestModel


print('Best Param (maxDepth): ', best_model.getMaxDepth())
print('Best Param (maxBins): ', best_model.getMaxBins())


Best Param (maxDepth):  5
Best Param (maxBins):  40


## Gradient-boosted tree classifier

In [83]:
from pyspark.ml.classification import GBTClassifier
gbt= GBTClassifier()

In [85]:
ParamGrid = (ParamGridBuilder().addGrid(gbt.maxIter, [5,10,15,20,25]).build())
cv = CrossValidator(estimator=gbt, estimatorParamMaps=ParamGrid, evaluator=evaluator, numFolds=5)
cvModel= cv.fit(train)

#Best Model Performance
Predictions= cvModel.transform(test)
print("Best Model Test Area Under ROC", evaluator.evaluate(Predictions))

Best Model Test Area Under ROC 0.7855477855477851


In [86]:
#Best Model Parameters
best_model= cvModel.bestModel


print('Best Param (MaxIter): ', best_model.getMaxIter())



Best Param (MaxIter):  20


## Linear Support Vector Machine

In [103]:
from pyspark.ml.classification import LinearSVC
l_SVC =LinearSVC()

In [104]:
ParamGrid = (ParamGridBuilder().addGrid(l_SVC.maxIter, [2,3,5,10,15, 20]).addGrid(l_SVC.regParam, [0.1, 0.2,0.5]).build())
cv = CrossValidator(estimator=l_SVC, estimatorParamMaps=ParamGrid, evaluator=evaluator, numFolds=5)
cvModel= cv.fit(train)

#Best Model Performance
Predictions= cvModel.transform(test)
print("Best Model Test Area Under ROC", evaluator.evaluate(Predictions))

Best Model Test Area Under ROC 0.7299922299922301


In [116]:
#Best Model Parameters
best_model= cvModel.bestModel


print('Best Param (maxIter): ', best_model.getMaxIter())
print('Best Param (regParam): ', best_model.getRegParam())


Best Param (maxIter):  20
Best Param (regParam):  0.1


## One-vs-Rest

In [115]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
lr =LogisticRegression()
ovr=OneVsRest().setClassifier(lr)


In [122]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
Model= ovr.fit(train)

# Test-set performance; MulticlassClassificationEvaluator defaults to the weighted F1 metric
Predictions= Model.transform(test)
evaluator=MulticlassClassificationEvaluator()
print("Test F1", evaluator.evaluate(Predictions))

Test F1 0.7606037096586798
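
`MulticlassClassificationEvaluator` defaults to `metricName="f1"`, the F1 score averaged over classes and weighted by class frequency, so the value printed by this cell is a weighted F1 rather than an area under the ROC curve. A plain-Python sketch of that metric on invented labels follows; `weighted_f1` is a hypothetical helper written for illustration.

```python
from collections import Counter


def weighted_f1(y_true, y_pred):
    """Per-class F1 averaged with weights equal to each class's share of y_true."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for cls, count in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p == cls)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != cls and p == cls)
        fn = sum(1 for t, p in zip(y_true, y_pred) if t == cls and p != cls)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        score += f1 * count / total
    return score


# Invented example: six true labels and six predictions.
y_true = [0, 0, 0, 0, 1, 1]
y_pred = [0, 0, 0, 1, 1, 0]
print("Weighted F1:", weighted_f1(y_true, y_pred))
```

Because F1 and area under ROC measure different things, scores from this evaluator should not be compared directly with the `BinaryClassificationEvaluator` results in the earlier sections.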


## Naive Bayes

In [126]:
from pyspark.ml.classification import NaiveBayes
nb= NaiveBayes()

In [127]:
Model= nb.fit(train)

# Test-set performance; MulticlassClassificationEvaluator defaults to the weighted F1 metric
Predictions= Model.transform(test)
evaluator=MulticlassClassificationEvaluator()
print("Test F1", evaluator.evaluate(Predictions))

Test F1 0.5854209416256826


## Factorization machines classifier

In [128]:
from pyspark.ml.classification import FMClassifier
fmc= FMClassifier()

In [129]:
ParamGrid = (ParamGridBuilder().addGrid(fmc.stepSize, [0.001,0.01,0.1]).build())
cv = CrossValidator(estimator=fmc, estimatorParamMaps=ParamGrid, evaluator=evaluator, numFolds=5)
cvModel= cv.fit(train)

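#Best Model Performance
# `evaluator` is now the MulticlassClassificationEvaluator created above, so cross-validation
# selects on, and this cell reports, weighted F1 rather than area under ROC
Predictions= cvModel.transform(test)
print("Best Model Test F1", evaluator.evaluate(Predictions))

Best Model Test F1 0.7387148738142116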
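
The five cross-validated models that were scored with `BinaryClassificationEvaluator` can be ranked directly from the test-set values printed in their sections. The snippet below only sorts those recorded numbers. The One-vs-Rest, Naive Bayes and factorization machines figures are left out because they were produced by `MulticlassClassificationEvaluator` and are not on the same scale.

```python
# Test-set area under ROC copied from the cell outputs above.
test_auc = {
    "LogisticRegression": 0.853729603729603,
    "DecisionTreeClassifier": 0.7142579642579643,
    "RandomForestClassifier": 0.827214452214452,
    "GBTClassifier": 0.7855477855477851,
    "LinearSVC": 0.7299922299922301,
}

# Sort from highest to lowest test AUC.
ranking = sorted(test_auc.items(), key=lambda item: item[1], reverse=True)
for name, auc in ranking:
    print(f"{name:<24} {auc:.4f}")
```

These values come from a single 151-row test split, so small gaps between models may not be meaningful.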
