### Bank Classification


#####   Loading Libraries


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean, col, split, col, regexp_extract, when, lit
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, VectorIndexer
from pyspark.ml.feature import QuantileDiscretizer, OneHotEncoderEstimator, OneHotEncoder, StringIndexer, VectorIndexer

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from datetime import datetime
from dateutil import parser
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [3]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("Spark ML applied on Bank Marketing dataset")
    .getOrCreate()
)

#### get data


In [4]:
# Where the raw CSV lives in the primary ADLS Gen2 storage.
account_name = 'synapse11datalake'  # primary ADLS account name https://synapse11datalake.dfs.core.windows.net
container_name = 'root'             # primary ADLS Gen2 file system from the Synapse home page
relative_path = 'Raw'               # folder path relative to the container root
filename = 'bank.csv'

# Resulting URI shape: abfss://<container>@<account>.dfs.core.windows.net/<folder>/<file>
bank_data_path = 'abfss://%s@%s.dfs.core.windows.net/%s/%s' % (
    container_name,
    account_name,
    relative_path,
    filename,
)

# Read the CSV with a header row and let Spark infer the column types.
bank_df = (
    spark.read
    .options(header='True', inferSchema='True')
    .csv(bank_data_path)
)

In [5]:
# Print the column names and types of the loaded DataFrame.
bank_df.printSchema()

In [6]:
# Preview the first five rows of the raw data.
bank_df.show(n=5)

In [7]:
# Count the rows for each distinct value of the "deposit" column.
groupBy_clients = bank_df.groupBy("deposit").count()

# Print the per-value counts as a text table.
groupBy_clients.show()

In [8]:
# Render the per-value counts; `display` is presumably supplied by the notebook environment.
display(groupBy_clients)

In [9]:
# Summary statistics for every column whose Spark type is 'int'.
bank_df.describe(
    [name for name, dtype in bank_df.dtypes if dtype == 'int']
).show()

In [10]:
# Render the number of rows per "job" value; `display` is presumably supplied by the notebook environment.
display(bank_df.groupBy("job").count())


##### Preprocess Data


In [11]:
def get_dummy(df, categoricalCols, continuousCols, labelCol):
    """Encode categorical columns, assemble a feature vector and index the label.

    Args:
        df: Input DataFrame containing every column named below.
        categoricalCols: Names of the columns to string-index and one-hot encode.
        continuousCols: List of column names added to the feature vector
            without any encoding.
        labelCol: Name of the target column.

    Returns:
        A tuple ``(data, label_model)``. ``data`` holds the columns
        ``features``, ``indexedLabel`` and ``label`` (a copy of ``labelCol``);
        ``label_model`` is a StringIndexer model fitted on the ``label`` column.
    """
    # One indexer per categorical column: <name> -> <name>_indexed.
    cat_indexers = []
    for name in categoricalCols:
        cat_indexers.append(
            StringIndexer(inputCol=name, outputCol="{0}_indexed".format(name)))

    # One encoder per indexer: <name>_indexed -> <name>_indexed_encoded.
    cat_encoders = []
    for idx in cat_indexers:
        indexed_name = idx.getOutputCol()
        cat_encoders.append(
            OneHotEncoder(inputCol=indexed_name,
                          outputCol="{0}_encoded".format(indexed_name)))

    # Encoded categorical columns come first, then the continuous columns.
    encoded_names = [enc.getOutputCol() for enc in cat_encoders]
    feature_assembler = VectorAssembler(inputCols=encoded_names + continuousCols,
                                        outputCol="features")

    label_indexer = StringIndexer(inputCol=labelCol, outputCol='indexedLabel')

    stages = cat_indexers + cat_encoders + [feature_assembler] + [label_indexer]
    fitted = Pipeline(stages=stages).fit(df)

    # Keep the original label values next to their numeric index.
    transformed = fitted.transform(df).withColumn('label', col(labelCol))

    result = transformed.select('features', 'indexedLabel', 'label')
    label_model = StringIndexer(inputCol='label').fit(transformed)
    return result, label_model

#### Transform data


In [12]:
# Columns that are string-indexed and one-hot encoded.
categoricalColumns = [
    'job', 'marital', 'education', 'default',
    'housing', 'loan', 'contact', 'poutcome',
]
# Columns that go into the feature vector unchanged.
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']

# Replace the raw DataFrame with its (features, indexedLabel, label) form and
# keep the fitted label indexer for mapping predictions back to label strings.
bank_df, labelindexer = get_dummy(bank_df, categoricalColumns, numericCols, 'deposit')
bank_df.show(n=5)

#### Identify categorical features and index them


In [13]:
# Fit a VectorIndexer (maxCategories=4) on the assembled feature vectors.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(bank_df)

# Preview the data with the extra "indexedFeatures" column.
featureIndexer.transform(bank_df).show(n=5)

In [14]:

# Show five rows without truncating the long feature vectors.
bank_df.show(5, truncate=False)

### Create Training and Test datasets


In [15]:
# 80/20 random split; the fixed seed is passed so the split is repeatable.
trainingData, testData = bank_df.randomSplit([0.8, 0.2], seed=10)

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

In [16]:

# Print five untruncated rows from each split.
print("The first 5 samples of the Training Dataset:")
trainingData.show(5, truncate=False)

print("The first 5 samples of the Test Dataset:")
testData.show(5, truncate=False)

#### Evaluate Models



In [17]:

# Logistic regression on the assembled "features" column.
# To train on the VectorIndexer output instead, pass featuresCol="indexedFeatures".
lr = LogisticRegression(
    labelCol="indexedLabel",
    featuresCol="features",
)

##### Pipeline


In [18]:
# Map numeric predictions back to label strings using the fitted label indexer.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=labelindexer.labels,
)

# Feature indexing -> logistic regression -> label conversion.
pipeline = Pipeline(stages=[featureIndexer, lr, labelConverter])

lrModel = pipeline.fit(trainingData)

In [19]:
# Apply the fitted pipeline to the test split and preview the result.
predictions = lrModel.transform(testData)
predictions.show(n=5)

In [20]:
# Preview only the columns needed to compare actual and predicted labels.
predictions.select(
    "features", "label", "probability", "predictedLabel"
).show(n=5)

In [21]:
# Row counts per actual label, then per predicted label.
cm = predictions.select("label", "predictedLabel")
for column_name in ("label", "predictedLabel"):
    cm.groupby(column_name).agg({column_name: 'count'}).show()

In [22]:
# Accuracy of the predicted class index against the true label index.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))
 

#### Create a Confusion Matrix


In [23]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

# Hard 0/1 predictions paired with the true label index: the input for the
# confusion matrix and the per-class precision / recall / F1.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# PR and ROC curves need a continuous score, so pair the predicted probability
# of class 1.0 with the true label index instead of the thresholded prediction.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics; precision, recall and F1 all refer to class 1.0.
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
# fMeasure() without a label is not the F1 of class 1.0: older Spark releases
# return a deprecated overall figure and newer ones require the label.
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Area under precision-recall curve
print("Area under PR = %s" % metricsBinary.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metricsBinary.areaUnderROC)


#### Model tuning



In [24]:

# Print the text returned by the estimator's explainParams(), to see which parameters can be tuned.
print(lr.explainParams())

#### Hyperparameter Tuning



In [25]:
# Search grid for logistic regression: 3 x 3 x 3 = 27 parameter combinations.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

# Score candidates on the predicted class probabilities rather than on the
# thresholded 0/1 "prediction" column; the latter collapses the ROC curve to
# a single operating point and makes model selection much coarser.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")


#### Create and run 5-fold CrossValidator


In [26]:
# Cross-validate the whole pipeline (feature indexer -> classifier -> label
# converter) rather than embedding a CrossValidator as one pipeline stage.
pipeline = Pipeline(stages=[featureIndexer, lr, labelConverter])
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=10,
    seed=100,
)
cvModel = cv.fit(trainingData)

##### Use the new data for testing


In [27]:
# Score the test split with the cross-validated model and preview the result.
predictions = cvModel.transform(testData)
predictions.select(
    "features", "label", "probability", "predictedLabel"
).show(n=5)


#### Evaluate the best model


In [28]:
# Accuracy of the cross-validated model on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

In [29]:
# Hard 0/1 predictions paired with the true label index: the input for the
# confusion matrix and the per-class precision / recall / F1.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# PR and ROC curves need a continuous score, so pair the predicted probability
# of class 1.0 with the true label index instead of the thresholded prediction.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics; precision, recall and F1 all refer to class 1.0.
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
# fMeasure() without a label is not the F1 of class 1.0: older Spark releases
# return a deprecated overall figure and newer ones require the label.
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Area under precision-recall curve
print("Area under PR = %s" % metricsBinary.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

In [30]:

# Compute the area under ROC from the predicted class probabilities; the
# thresholded 0/1 "prediction" column would reduce the curve to a single point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

##### Evaluate Decision Tree Algorithms


In [31]:
# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")

# Train model with Training Data.
dtModel = dt.fit(trainingData)

# Make predictions on test data.
predictions = dtModel.transform(testData)

# Evaluate the model by computing the metrics.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

# Hard 0/1 predictions paired with the true label index: the input for the
# confusion matrix and the per-class precision / recall / F1.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# PR and ROC curves need a continuous score, so pair the predicted probability
# of class 1.0 with the true label index instead of the thresholded prediction.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics; precision, recall and F1 all refer to class 1.0.
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
# fMeasure() without a label is not the F1 of class 1.0: older Spark releases
# return a deprecated overall figure and newer ones require the label.
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Area under precision-recall curve
print("Area under PR = %s" % metricsBinary.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

# Area under ROC from the class probabilities, not the thresholded prediction.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))


##### Hyper parameter tuning


In [32]:
# Search grid for the decision tree: 4 x 3 = 12 parameter combinations.
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())

# Select models on an AUC computed from the predicted class probabilities; the
# thresholded 0/1 "prediction" column would reduce the ROC curve to one point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")

pipeline = Pipeline(stages=[featureIndexer, dt, labelConverter])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=10, seed=100)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)

# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

# Hard 0/1 predictions paired with the true label index: the input for the
# confusion matrix and the per-class precision / recall / F1.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# PR and ROC curves need a continuous score, so pair the predicted probability
# of class 1.0 with the true label index instead of the thresholded prediction.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics; precision, recall and F1 all refer to class 1.0.
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
# fMeasure() without a label is not the F1 of class 1.0: older Spark releases
# return a deprecated overall figure and newer ones require the label.
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Area under precision-recall curve
print("Area under PR = %s" % metricsBinary.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

# Area under ROC from the class probabilities, not the thresholded prediction.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))


#### Random Forest


In [33]:
# Create initial Random Forest Classifier
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")

# Train model with Training Data.
rfModel = rf.fit(trainingData)

# Make predictions on test data.
predictions = rfModel.transform(testData)

# Evaluate the model by computing the metrics.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

# Hard 0/1 predictions paired with the true label index: the input for the
# confusion matrix and the per-class precision / recall / F1.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# PR and ROC curves need a continuous score, so pair the predicted probability
# of class 1.0 with the true label index instead of the thresholded prediction.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics; precision, recall and F1 all refer to class 1.0.
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
# fMeasure() without a label is not the F1 of class 1.0: older Spark releases
# return a deprecated overall figure and newer ones require the label.
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Area under precision-recall curve
print("Area under PR = %s" % metricsBinary.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

# Area under ROC from the class probabilities, not the thresholded prediction.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))

#### Hyperparameter Tuning


In [34]:
# Search grid for the random forest: 3 x 2 x 2 = 12 parameter combinations.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())

# Select models on an AUC computed from the predicted class probabilities; the
# thresholded 0/1 "prediction" column would reduce the ROC curve to one point.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")

pipeline = Pipeline(stages=[featureIndexer, rf, labelConverter])
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, parallelism=10, seed=100)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)

# Evaluate the best model
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
print("The Accuracy for test set is {}".format(evaluator.evaluate(predictions)))

print("===============================================")

# Hard 0/1 predictions paired with the true label index: the input for the
# confusion matrix and the per-class precision / recall / F1.
predictionAndLabel = predictions.select("prediction", "indexedLabel").rdd

# PR and ROC curves need a continuous score, so pair the predicted probability
# of class 1.0 with the true label index instead of the thresholded prediction.
scoreAndLabel = predictions.select("probability", "indexedLabel").rdd.map(
    lambda row: (float(row["probability"][1]), float(row["indexedLabel"]))
)

# Instantiate metrics objects
metricsMulti = MulticlassMetrics(predictionAndLabel)
metricsBinary = BinaryClassificationMetrics(scoreAndLabel)

# Overall statistics; precision, recall and F1 all refer to class 1.0.
confusionMatrix = metricsMulti.confusionMatrix()
precision = metricsMulti.precision(1.0)
recall = metricsMulti.recall(1.0)
# fMeasure() without a label is not the F1 of class 1.0: older Spark releases
# return a deprecated overall figure and newer ones require the label.
f1Score = metricsMulti.fMeasure(1.0)
print("Summary Stats")
print("Confusion Matrix = \n %s" % confusionMatrix)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# Area under precision-recall curve
print("Area under PR = %s" % metricsBinary.areaUnderPR)
# Area under ROC curve
print("Area under ROC = %s" % metricsBinary.areaUnderROC)

print("===============================================")

# Area under ROC from the class probabilities, not the thresholded prediction.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="indexedLabel")
print("The area under ROC for test set is {}".format(evaluator.evaluate(predictions)))