## Install and Initialize

In [None]:
# Refresh the apt package index, then install a headless Java 8 JDK.
# Standard output of both commands is discarded to keep the cell quiet.
!apt update > /dev/null
!apt install openjdk-8-jdk-headless -qq > /dev/null







## Download and unpack Spark 3.1.2 (pre-built for Hadoop 3.2)

In [None]:
!wget -q http://apache.osuosl.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"

[K     |████████████████████████████████| 281.4 MB 30 kB/s 
[K     |████████████████████████████████| 198 kB 44.2 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


## Import SparkSession from pyspark library

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a local session that uses every available core;
# the Spark UI port is set to 4050.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [None]:
# Grab the SparkContext behind the session; leaving `sc` as the last expression
# makes the notebook display it as this cell's output.
# NOTE(review): the original comment referred to a "Tunnel" step for reaching the
# Spark console UI, but no tunnel is set up in the visible cells — confirm.

sc = spark.sparkContext
sc

## Importing the necessary libraries

In [None]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean, col, split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, VectorIndexer
from pyspark.ml.feature import QuantileDiscretizer, OneHotEncoder

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

## Importing the data file from local machine

In [None]:
from google.colab import files
 
 
uploaded = files.upload()

Saving bank-full.csv to bank-full.csv


In [None]:
import io
# Wrap the uploaded file's bytes in an in-memory binary stream.
# NOTE(review): `data` does not appear to be read by later cells (the CSV is
# loaded from the file path instead) — confirm before removing.
data = io.BytesIO(uploaded['bank-full.csv'])

In [None]:
# bank-full.csv is ';'-delimited with a header row; let Spark infer the column types.
# Pass real booleans rather than the strings 'True' so the options are unambiguous.
bank_df = spark.read.csv('bank-full.csv', header=True, inferSchema=True, sep=';')

## Checking the data types of each column

In [None]:
bank_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)



## Check the contents of the DataFrame

In [None]:
bank_df.show()

+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management| married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician|  single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur| married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar| married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
| 33|     unknown|  single|  unknown|     no|      1|     no|  no|unknown|  5|  may

## Checking the Count and verifying the same

In [None]:
bank_df.count()

45211

## Checking the summary statistics

In [None]:
# Summary statistics restricted to the integer-typed columns.
bank_df.describe(
    [name for name, dtype in bank_df.dtypes if dtype == 'int']
).show()

+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|               age|           balance|              day|          duration|         campaign|             pdays|          previous|
+-------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|             45211|             45211|            45211|             45211|            45211|             45211|             45211|
|   mean| 40.93621021432837|1362.2720576850766|15.80641879188693| 258.1630797814691|2.763840658246887| 40.19782796222158|0.5803233726305546|
| stddev|10.618762040975408|3044.7658291685243|8.322476153044596|257.52781226517095|3.098020883279184|100.12874599059813|2.3034410449312204|
|    min|                18|             -8019|                1|                 0|                1|                -1|                 0|
|    max|    

## Renaming the target column from 'y' to 'deposit', since 'y' is not a descriptive name

In [None]:
# Give the target column a descriptive name, then print the schema to confirm it.
bank_df = bank_df.withColumnRenamed(existing="y", new="deposit")
bank_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



## Number of customers who subscribed and did not subscribe for a term deposit

In [None]:
groupBy_clients = bank_df.groupBy("deposit").count()

In [None]:
groupBy_clients.show()

+-------+-----+
|deposit|count|
+-------+-----+
|     no|39922|
|    yes| 5289|
+-------+-----+



## Data Preprocessing

### The following function code indexes each categorical column using the StringIndexer, and then converts the indexed categories into one-hot encoded variables. The resulting output has the binary vectors appended to the end of each row.

### We then Run the stages as a Pipeline. This puts the data through all of the feature transformations we described in a single call.

### Reference taken from the link 'https://runawayhorse001.github.io/LearningApacheSpark/classification.html'

In [None]:
def get_dummy(df, categoricalCols, continuousCols, labelCol):
  """Build the model-ready DataFrame: one-hot encoded categoricals, assembled features, indexed label.

  Each categorical column is string-indexed and one-hot encoded, the encoded
  vectors are concatenated with the continuous columns into a single
  'features' vector, and the label column is string-indexed into
  'indexedLabel'. All stages are fitted and applied as one Pipeline.

  Args:
    df: Spark DataFrame containing every column named in the other arguments.
    categoricalCols: names of the columns to index and one-hot encode.
    continuousCols: names of the columns appended to the feature vector as-is.
    labelCol: name of the target column.

  Returns:
    A tuple ``(data, labelIndexerModel)``. ``data`` has the columns
    'features', 'indexedLabel' and 'label' (a copy of ``labelCol``).
    ``labelIndexerModel`` is the fitted label indexer; its ``labels`` attribute
    maps 'indexedLabel' values back to the original label strings.
  """
  indexers = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in categoricalCols]

  encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(),
                             outputCol="{0}_encoded".format(indexer.getOutputCol()))
              for indexer in indexers]

  # list(...) lets callers pass any sequence (e.g. a tuple) of continuous column names.
  assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                              + list(continuousCols), outputCol="features")

  labelIndexer = StringIndexer(inputCol=labelCol, outputCol='indexedLabel')

  pipeline = Pipeline(stages=indexers + encoders + [assembler, labelIndexer])

  model = pipeline.fit(df)
  data = model.transform(df).withColumn('label', col(labelCol))

  # The label indexer is the last pipeline stage. Returning its fitted model avoids
  # fitting a second StringIndexer over the whole dataset, and guarantees that the
  # returned labels are exactly the ones that produced 'indexedLabel'.
  return data.select('features', 'indexedLabel', 'label'), model.stages[-1]

## Define the lists of categorical and numerical variables, then go ahead and transform the data

In [None]:
# Columns to one-hot encode versus columns passed through as numeric features.
# NOTE(review): 'day' and 'month' from the schema are in neither list — confirm that is intended.
categoricalColumns = [
    'job', 'marital', 'education', 'default',
    'housing', 'loan', 'contact', 'poutcome',
]
numericCols = [
    'age', 'balance', 'duration',
    'campaign', 'pdays', 'previous',
]
# bank_df is rebound to the transformed frame (features / indexedLabel / label).
bank_df, labelindexer = get_dummy(bank_df, categoricalColumns, numericCols, 'deposit')

In [None]:
bank_df.show(5)

+--------------------+------------+-----+
|            features|indexedLabel|label|
+--------------------+------------+-----+
|(30,[1,11,14,16,1...|         0.0|   no|
|(30,[2,12,13,16,1...|         0.0|   no|
|(30,[7,11,13,16,1...|         0.0|   no|
|(30,[0,11,16,17,1...|         0.0|   no|
|(30,[12,16,18,20,...|         0.0|   no|
+--------------------+------------+-----+
only showing top 5 rows



## It is essential to fit the following featureIndexer model on the whole of the bank_df dataframe. This will automatically identify categorical features, and index them. Set maxCategories so features with > 4 distinct values are treated as continuous.

In [None]:
# Fit the indexer on the full DataFrame; maxCategories=4 is the cut-off for
# treating a feature as categorical.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(bank_df)

# Preview the first rows with the added 'indexedFeatures' column.
featureIndexer.transform(bank_df).show(5)

+--------------------+------------+-----+--------------------+
|            features|indexedLabel|label|     indexedFeatures|
+--------------------+------------+-----+--------------------+
|(30,[1,11,14,16,1...|         0.0|   no|(30,[1,11,14,16,1...|
|(30,[2,12,13,16,1...|         0.0|   no|(30,[2,12,13,16,1...|
|(30,[7,11,13,16,1...|         0.0|   no|(30,[7,11,13,16,1...|
|(30,[0,11,16,17,1...|         0.0|   no|(30,[0,11,16,17,1...|
|(30,[12,16,18,20,...|         0.0|   no|(30,[12,16,18,20,...|
+--------------------+------------+-----+--------------------+
only showing top 5 rows



In [None]:
bank_df.show(5, False)

+---------------------------------------------------------------------------------------------------------+------------+-----+
|features                                                                                                 |indexedLabel|label|
+---------------------------------------------------------------------------------------------------------+------------+-----+
|(30,[1,11,14,16,17,18,20,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,58.0,2143.0,261.0,1.0,-1.0])|0.0         |no   |
|(30,[2,12,13,16,17,18,20,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,44.0,29.0,151.0,1.0,-1.0])  |0.0         |no   |
|(30,[7,11,13,16,17,20,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,33.0,2.0,76.0,1.0,-1.0])           |0.0         |no   |
|(30,[0,11,16,17,18,20,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,47.0,1506.0,92.0,1.0,-1.0])        |0.0         |no   |
|(30,[12,16,18,20,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,33.0,1.0,198.0,1.0,-1.0])                       |0.0 

## Splitting the Data

### Split the data randomly in training and testing sets

In [None]:
# 80/20 random split with a fixed seed, followed by the row count of each part.
trainingData, testData = bank_df.randomSplit(weights=[0.8, 0.2], seed=10)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 36169
Test Dataset Count: 9042


## Checking the 1st 10 samples of training set

In [None]:
print("The first 10 samples of the Training Dataset:")
trainingData.show(10, False)

The first 10 samples of the Training Dataset:
+---------------------------------------------------------------------------------------------------------+------------+-----+
|features                                                                                                 |indexedLabel|label|
+---------------------------------------------------------------------------------------------------------+------------+-----+
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,24.0,-220.0,90.0,1.0,-1.0]) |0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,24.0,1470.0,212.0,1.0,-1.0])|0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,25.0,2.0,102.0,2.0,-1.0])   |0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,25.0,59.0,246.0,1.0,-1.0])  |0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0

## Checking the 1st 10 samples of test set

In [None]:
print("The first 10 samples of the Test Dataset:")
testData.show(10, False)

The first 10 samples of the Test Dataset:
+----------------------------------------------------------------------------------------------------------+------------+-----+
|features                                                                                                  |indexedLabel|label|
+----------------------------------------------------------------------------------------------------------+------------+-----+
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,24.0,1222.0,369.0,1.0,-1.0]) |0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,25.0,-23.0,936.0,1.0,-1.0])  |0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,25.0,148.0,119.0,1.0,-1.0])  |0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,26.0,622.0,471.0,2.0,-1.0])  |0.0         |no   |
|(30,[0,11,13,16,17,18,19,21,24,25,26,27,28],[1.0,1.0,1.0,1.0,

## Evaluate Machine Learning Algorithms

### The below steps are used to build the models:

### Create initial model using the training set
### Tune parameters with a ParamGrid and 5-fold Cross Validation
### Evaluate the best model obtained from the Cross Validation using the test set

### We will use some functions to evaluate our models like the BinaryClassificationEvaluator which uses areaUnderROC as the default metric.

### Let us fit a logistic regression model. Train the same with our training data

In [None]:
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="features")

In [None]:
## Converting indexed labels back to original labels

labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelindexer.labels)

In [None]:
## Chaining the feature indexer, the logistic regression estimator and the label converter in a Pipeline
# NOTE(review): lr is configured with featuresCol="features", so the "indexedFeatures"
# column added by featureIndexer does not appear to be read by the classifier — confirm intent.

pipeline = Pipeline(stages=[featureIndexer, lr, labelConverter])

In [None]:
## Training the pipeline on our training set

lrModel = pipeline.fit(trainingData)

## Making predictions on the test data using the transform() method. LogisticRegression.transform() will only use the column given in featuresCol parameter.

In [None]:
predictions = lrModel.transform(testData)

## Let us now check the predictions

In [None]:
predictions.show(10)

+--------------------+------------+-----+--------------------+--------------------+--------------------+----------+--------------+
|            features|indexedLabel|label|     indexedFeatures|       rawPrediction|         probability|prediction|predictedLabel|
+--------------------+------------+-----+--------------------+--------------------+--------------------+----------+--------------+
|(30,[0,11,13,16,1...|         0.0|   no|(30,[0,11,13,16,1...|[2.53914950939763...|[0.92684117862563...|       0.0|            no|
|(30,[0,11,13,16,1...|         0.0|   no|(30,[0,11,13,16,1...|[0.27682534642401...|[0.56876774409702...|       0.0|            no|
|(30,[0,11,13,16,1...|         0.0|   no|(30,[0,11,13,16,1...|[3.56698071014976...|[0.97253465576094...|       0.0|            no|
|(30,[0,11,13,16,1...|         0.0|   no|(30,[0,11,13,16,1...|[2.24351863333884...|[0.90408999758248...|       0.0|            no|
|(30,[0,11,13,16,1...|         1.0|  yes|(30,[0,11,13,16,1...|[-0.3669208225733...|

## Let us view the model's predictions and probabilities of each prediction class

In [None]:
predictions.select("features", "label", "probability", "predictedLabel").show(10)

+--------------------+-----+--------------------+--------------+
|            features|label|         probability|predictedLabel|
+--------------------+-----+--------------------+--------------+
|(30,[0,11,13,16,1...|   no|[0.92684117862563...|            no|
|(30,[0,11,13,16,1...|   no|[0.56876774409702...|            no|
|(30,[0,11,13,16,1...|   no|[0.97253465576094...|            no|
|(30,[0,11,13,16,1...|   no|[0.90408999758248...|            no|
|(30,[0,11,13,16,1...|  yes|[0.40928526839628...|           yes|
|(30,[0,11,13,16,1...|   no|[0.92645279294536...|            no|
|(30,[0,11,13,16,1...|   no|[0.95465600798036...|            no|
|(30,[0,11,13,16,1...|   no|[0.92994236996810...|            no|
|(30,[0,11,13,16,1...|   no|[0.97878351728092...|            no|
|(30,[0,11,13,16,1...|   no|[0.98615327265024...|            no|
+--------------------+-----+--------------------+--------------+
only showing top 10 rows



## Computing the model accuracy

In [None]:
# Keep only the true and predicted labels, then show how many rows fall under
# each value of each of the two columns.
cm = predictions.select("label", "predictedLabel")
for column_name in cm.columns:
    cm.groupby(column_name).agg({column_name: 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|   no|        8017|
|  yes|        1025|
+-----+------------+

+--------------+---------------------+
|predictedLabel|count(predictedLabel)|
+--------------+---------------------+
|            no|                 8534|
|           yes|                  508|
+--------------+---------------------+



In [None]:
predictions.groupBy('label', 'predictedLabel').count().show()

+-----+--------------+-----+
|label|predictedLabel|count|
+-----+--------------+-----+
|   no|            no| 7830|
|   no|           yes|  187|
|  yes|           yes|  321|
|  yes|            no|  704|
+-----+--------------+-----+



## For instance, in the test dataset there are 1025 customers who intend to subscribe to a term deposit and 8017 who do not. The classifier, however, predicted that only 508 clients intend to subscribe. The accuracy of the model is the number of rows whose predicted label matches the true label, divided by the total number of rows.

In [None]:
print("The Accuracy for test set is {}".format(cm.filter(cm.label == cm.predictedLabel).count()/cm.count()))

The Accuracy for test set is 0.9014598540145985


## The accuracy of the model and other metrics computed using the MulticlassClassificationEvaluator() function

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

The Accuracy for test set is 0.9014598540145985


## Let us generate a Confusion Matrix to better see the results of the predictions. ConfusionMatrix() works only with RDDs, so we will have to convert our DataFrame of (prediction, label) into a RDD. 

In [None]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

In [None]:
# Instantiate metrics objects

# Confusion-matrix style metrics are computed from the hard (prediction, label) pairs.
metricsMulti = MulticlassMetrics(predictionAndLabel)

# Ranking metrics (area under ROC / PR) need a continuous score per row. Feeding the
# thresholded 0/1 prediction collapses each curve to a single operating point, so use
# the predicted probability of the positive class (index 1) as the score instead.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"])))
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)



In [None]:
# Overall statistics

confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1) 
recall = metricsMulti.recall(label=1)

In [None]:
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision) 
print("Recall = %s" % recall)

Summary Stats
Confusion Matrix = 
 DenseMatrix([[7830.,  187.],
             [ 704.,  321.]])
Precision = 0.6318897637795275
Recall = 0.3131707317073171


In [None]:
# Area under precision-recall curve

print("Area under PR = %s" % metricsBinary.areaUnderPR) 

# Area under ROC curve

print("Area under ROC = %s" % metricsBinary.areaUnderROC)

Area under PR = 0.45381901211965736
Area under ROC = 0.6449226491267033


## Let us now fine tune the model. Performing Hyperparameter tuning using 5-fold cross validation



In [None]:
# Hyperparameter grid for logistic regression: 3 x 3 x 3 = 27 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

# Rank candidates by area under ROC computed from the classifier's raw scores.
# Pointing rawPredictionCol at the thresholded "prediction" column would reduce the
# ROC curve to a single point and make the model selection far less informative.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

## Create and run 5-fold CrossValidator

In [None]:
# Rebuild the three-stage pipeline and search the grid with 5-fold cross validation.
pipeline = Pipeline(stages=[featureIndexer, lr, labelConverter])
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=10,
    seed=100,
)
cvModel = cv.fit(trainingData)

## CV Model uses the best model found from the Cross Validation. Use test or new data to measure the accuracy of the model

In [None]:
predictions = cvModel.transform(testData)

predictions.select("features", "label", "probability", "predictedLabel").show(10)

+--------------------+-----+--------------------+--------------+
|            features|label|         probability|predictedLabel|
+--------------------+-----+--------------------+--------------+
|(30,[0,11,13,16,1...|   no|[0.92851810967124...|            no|
|(30,[0,11,13,16,1...|   no|[0.59534321366419...|            no|
|(30,[0,11,13,16,1...|   no|[0.97228751332930...|            no|
|(30,[0,11,13,16,1...|   no|[0.90520770026427...|            no|
|(30,[0,11,13,16,1...|  yes|[0.43276525947074...|           yes|
|(30,[0,11,13,16,1...|   no|[0.92672214395198...|            no|
|(30,[0,11,13,16,1...|   no|[0.95503335557052...|            no|
|(30,[0,11,13,16,1...|   no|[0.93145483376752...|            no|
|(30,[0,11,13,16,1...|   no|[0.97443451729485...|            no|
|(30,[0,11,13,16,1...|   no|[0.98423520085167...|            no|
+--------------------+-----+--------------------+--------------+
only showing top 10 rows



## Evaluating the best model

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

The Accuracy for test set is 0.8965936739659367


In [None]:
# Hard (prediction, label) pairs for the confusion matrix, precision and recall.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
# Area under ROC / PR needs a continuous score, not the thresholded 0/1 prediction:
# use the predicted probability of the positive class (index 1).
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"])))
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Overall statistics (precision and recall are reported for the positive class, label 1)
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)



Summary Stats
Confusion Matrix = 
 DenseMatrix([[7890.,  127.],
             [ 808.,  217.]])
Precision = 0.6308139534883721
Recall = 0.21170731707317073


In [None]:
# Area under precision-recall curve

print("Area under PR = %s" % metricsBinary.areaUnderPR) 

# Area under ROC curve
 
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

Area under PR = 0.42686132202366145
Area under ROC = 0.5979329899573164


## Let us now fit a decision tree model to see how it performs

## https://spark.apache.org/docs/latest/mllib-decision-tree.html

In [None]:
# Create initial Decision Tree Model

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")

In [None]:
# Train model with Training Data

dtModel = dt.fit(trainingData)

In [None]:
# Make predictions on test data

predictions = dtModel.transform(testData)

In [None]:
# Evaluate the model by computing the metrics. 
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

The Accuracy for test set is 0.8990267639902676


## Let us generate a Confusion Matrix to better see the results of the predictions. ConfusionMatrix() works only with RDDs, so we will have to convert our DataFrame of (prediction, label) into a RDD.

In [None]:
# Hard (prediction, label) pairs for the confusion matrix, precision and recall.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
# Area under ROC / PR needs a continuous score, not the thresholded 0/1 prediction:
# use the predicted probability of the positive class (index 1).
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"])))
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Overall statistics (precision and recall are reported for the positive class, label 1)
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)



Summary Stats
Confusion Matrix = 
 DenseMatrix([[7797.,  220.],
             [ 693.,  332.]])
Precision = 0.6014492753623188
Recall = 0.32390243902439025


## Hyperparameter tuning with 5-fold cross validation

In [None]:
# Hyperparameter grid for the decision tree: 4 depths x 3 bin counts = 12 candidates.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

# Rank candidates by area under ROC computed from the classifier's raw scores.
# Pointing rawPredictionCol at the thresholded "prediction" column would reduce the
# ROC curve to a single point and make the model selection far less informative.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

In [None]:
# Pipeline around the decision tree, tuned with 5-fold cross validation over the grid.
pipeline = Pipeline(stages=[featureIndexer, dt, labelConverter])
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=10,
    seed=100,
)
cvModel = cv.fit(trainingData)

In [None]:
predictions = cvModel.transform(testData)

In [None]:
# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

The Accuracy for test set is 0.893828798938288


In [None]:
# Hard (prediction, label) pairs for the confusion matrix, precision and recall.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate metrics objects

metricsMulti = MulticlassMetrics(predictionAndLabel)
# Area under ROC / PR needs a continuous score, not the thresholded 0/1 prediction:
# use the predicted probability of the positive class (index 1).
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"])))
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics (precision and recall are reported for the positive class, label 1)

confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)

print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)


# Area under precision-recall curve

print("Area under PR = %s" % metricsBinary.areaUnderPR)

# Area under ROC curve

print("Area under ROC = %s" % metricsBinary.areaUnderROC)



Summary Stats
Confusion Matrix = 
 DenseMatrix([[7727.,  290.],
             [ 670.,  355.]])
Precision = 0.5503875968992248
Recall = 0.3463414634146341
Area under PR = 0.4075541466977763
Area under ROC = 0.6550841656601671


## Let us now fit a Random Forest Model to see how it performs

In [None]:
# Create initial Random Forest Classifier
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")


In [None]:
# Train model with Training Data.
rfModel = rf.fit(trainingData)

In [None]:
# Make predictions on test data.
predictions = rfModel.transform(testData)

In [None]:
# Evaluate the model by computing the metrics. 
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

The Accuracy for test set is 0.8880778588807786


In [None]:
# Hard (prediction, label) pairs for the confusion matrix, precision and recall.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate metrics objects

metricsMulti = MulticlassMetrics(predictionAndLabel)
# Area under ROC / PR needs a continuous score, not the thresholded 0/1 prediction:
# use the predicted probability of the positive class (index 1).
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"])))
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics (precision and recall are reported for the positive class, label 1)

confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)

print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)


# Area under precision-recall curve

print("Area under PR = %s" % metricsBinary.areaUnderPR)

# Area under ROC curve

print("Area under ROC = %s" % metricsBinary.areaUnderROC)



Summary Stats
Confusion Matrix = 
 DenseMatrix([[8.010e+03, 7.000e+00],
             [1.005e+03, 2.000e+01]])
Precision = 0.7407407407407407
Recall = 0.01951219512195122
Area under PR = 0.43317109736016624
Area under ROC = 0.5093195252770789


## Hyperparameter tuning using 5-fold cross validation

In [None]:
# Hyperparameter grid for the random forest: 3 x 2 x 2 = 12 candidate settings.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

# Rank candidates by area under ROC computed from the classifier's raw scores.
# Pointing rawPredictionCol at the thresholded "prediction" column would reduce the
# ROC curve to a single point and make the model selection far less informative.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="indexedLabel")

In [None]:
# Pipeline around the random forest, tuned with 5-fold cross validation over the grid.
pipeline = Pipeline(stages=[featureIndexer, rf, labelConverter])
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=10,
    seed=100,
)
cvModel = cv.fit(trainingData)

In [None]:
predictions = cvModel.transform(testData)

In [None]:
# Evaluate the best model

evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

The Accuracy for test set is 0.8880778588807786


In [None]:
# Hard (prediction, label) pairs for the confusion matrix, precision and recall.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# Instantiate metrics objects

metricsMulti = MulticlassMetrics(predictionAndLabel)
# Area under ROC / PR needs a continuous score, not the thresholded 0/1 prediction:
# use the predicted probability of the positive class (index 1).
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"])))
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)
# Overall statistics (precision and recall are reported for the positive class, label 1)

confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(label=1)
recall = metricsMulti.recall(label=1)

print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)


# Area under precision-recall curve

print("Area under PR = %s" % metricsBinary.areaUnderPR)

# Area under ROC curve

print("Area under ROC = %s" % metricsBinary.areaUnderROC)



Summary Stats
Confusion Matrix = 
 DenseMatrix([[7993.,   24.],
             [ 988.,   37.]])
Precision = 0.6065573770491803
Recall = 0.03609756097560975
Area under PR = 0.3688602400225488
Area under ROC = 0.5165519612287305


## We have fitted logistic regression, decision tree and random forest algorithms to our dataset. Upon comparing we found that Logistic Regression performed the best among them.