### Bayesian Optimization Search in Spark 
This notebook demonstrates the implementation of Bayesian optimization in Pyspark, Spark's Machine Learning library. Because Spark does not offer an inbuilt Bayesian optimization module like GridSearch, this implementation is done in combination with a third party library called HyperOpt. 

It works by:
1. Defining the domain
2. Setting up the optimization algorithm
3. Objective function to minimize

In [1]:
import findspark
findspark.init()

print('Success!')

Success!


In [2]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
from sklearn.metrics import confusion_matrix, roc_auc_score, accuracy_score
from hyperopt import hp, tpe, fmin, SparkTrials, STATUS_OK

In [3]:
spark = SparkSession.builder.appName('search').getOrCreate()
df = spark.read.csv('credit-card-full.csv', header=True, inferSchema = True)
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- default payment next month: inte

In [4]:
pd.DataFrame(df.take(2))

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,15,16,17,18,19,20,21,22,23,24
0,1,20000,2,2,1,24,2,2,-1,-1,...,0,0,0,0,689,0,0,0,0,1
1,2,120000,2,2,2,26,-1,2,0,0,...,3272,3455,3261,0,1000,1000,1000,0,2000,1


In [5]:
pd.DataFrame(df.take(2), columns=df.columns).transpose()

Unnamed: 0,0,1
ID,1,2
LIMIT_BAL,20000,120000
SEX,2,2
EDUCATION,2,2
MARRIAGE,1,2
AGE,24,26
PAY_0,2,-1
PAY_2,2,2
PAY_3,-1,0
PAY_4,-1,0


In [6]:
#Check if classes are perfectly balanced
df.groupby('default payment next month').count().toPandas()

Unnamed: 0,default payment next month,count
0,1,6636
1,0,23364


In [7]:
df_renamed = df.withColumnRenamed("default payment next month", "label")

In [8]:
print(df_renamed)

DataFrame[ID: int, LIMIT_BAL: int, SEX: int, EDUCATION: int, MARRIAGE: int, AGE: int, PAY_0: int, PAY_2: int, PAY_3: int, PAY_4: int, PAY_5: int, PAY_6: int, BILL_AMT1: int, BILL_AMT2: int, BILL_AMT3: int, BILL_AMT4: int, BILL_AMT5: int, BILL_AMT6: int, PAY_AMT1: int, PAY_AMT2: int, PAY_AMT3: int, PAY_AMT4: int, PAY_AMT5: int, PAY_AMT6: int, label: int]


In [9]:
df = df_renamed.drop('ID')

In [11]:
# show summary statistics
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features).describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
LIMIT_BAL,30000,167484.32266666667,129747.66156720246,10000,1000000
SEX,30000,1.6037333333333332,0.4891291960902602,1,2
EDUCATION,30000,1.8531333333333333,0.7903486597207269,0,6
MARRIAGE,30000,1.5518666666666667,0.5219696006132467,0,3
AGE,30000,35.4855,9.217904068090155,21,79
PAY_0,30000,-0.0167,1.1238015279973335,-2,8
PAY_2,30000,-0.13376666666666667,1.1971859730345495,-2,8
PAY_3,30000,-0.1662,1.1968675684465686,-2,8
PAY_4,30000,-0.22066666666666668,1.1691386224023357,-2,8


In [12]:
numeric_features_df=df.select(numeric_features)
numeric_features_df.toPandas().head()

Unnamed: 0,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,...,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,label
0,20000,2,2,1,24,2,2,-1,-1,-2,...,0,0,0,0,689,0,0,0,0,1
1,120000,2,2,2,26,-1,2,0,0,0,...,3272,3455,3261,0,1000,1000,1000,0,2000,1
2,90000,2,2,2,34,0,0,0,0,0,...,14331,14948,15549,1518,1500,1000,1000,1000,5000,0
3,50000,2,2,1,37,0,0,0,0,0,...,28314,28959,29547,2000,2019,1200,1100,1069,1000,0
4,50000,1,2,1,57,-1,0,-1,0,0,...,20940,19146,19131,2000,36681,10000,9000,689,679,0


In [13]:
column_list = df.columns
print(column_list)

['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6', 'label']


In [14]:
# Prepare data for ML Model
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
inputColums = ['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6']
stages = []

assembler = VectorAssembler(inputCols=inputColums, outputCol="vectorized_features")
stages += [assembler]

scaler = StandardScaler(inputCol="vectorized_features", outputCol="features")
stages += [scaler]

## Pipeline
The pipeline is use to chain multiple Transformers and Estimators together to specify our machine learning workflow. A Pipeline’s stages are specified as an ordered array.

In [15]:
from pyspark.ml import Pipeline
cols = df.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['vectorized_features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- vectorized_features: vector (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- label: integer (

In [16]:
#Randomly split data into train and test sets, and set seed for reproducibility
train, test = df.randomSplit([0.7, 0.3], seed=2018)
print("Training Dataset Count:" + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count:21035
Test Dataset Count: 8965


## Model Definition

In [17]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'vectorized_features', labelCol = 'label')
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.select('LIMIT_BAL', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

+---------+-----+--------------------+----------+--------------------+
|LIMIT_BAL|label|       rawPrediction|prediction|         probability|
+---------+-----+--------------------+----------+--------------------+
|    10000|    0|[15.5275636301864...|       0.0|[0.77637818150932...|
|    20000|    0|[15.5275636301864...|       0.0|[0.77637818150932...|
|    20000|    0|[15.8040613134055...|       0.0|[0.79020306567027...|
|    20000|    1|[15.5275636301864...|       0.0|[0.77637818150932...|
|    20000|    0|[15.5275636301864...|       0.0|[0.77637818150932...|
|    20000|    1|[15.6061128017402...|       0.0|[0.78030564008701...|
|    20000|    1|[15.5275636301864...|       0.0|[0.77637818150932...|
|    30000|    1|[15.3296151185211...|       0.0|[0.76648075592605...|
|    30000|    0|[15.5275636301864...|       0.0|[0.77637818150932...|
|    30000|    0|[15.5275636301864...|       0.0|[0.77637818150932...|
+---------+-----+--------------------+----------+--------------------+
only s

In [18]:
class_names = [1, 0]
y_true = predictions.select("label")
y_true = y_true.toPandas()

y_pred = predictions.select("prediction")
y_pred = y_pred.toPandas()

cnf_matrix = confusion_matrix(y_true, y_pred,labels=class_names)
print(cnf_matrix)

[[ 571 1339]
 [ 291 6764]]


In [19]:
# Check accuracy
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
evaluator.getMetricName()

Test Area Under ROC: 0.7626305653782361


'areaUnderROC'

In [20]:
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy : ",accuracy)

Accuracy :  0.8181818181818182


## Model Finetuning 


In [67]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from datetime import datetime, timedelta

def train_reg(maxDepth, numTrees):
    randomforest = RandomForestClassifier(featuresCol = 'vectorized_features', 
                                          labelCol = 'label',
                                          maxDepth=maxDepth,
                                          numTrees=numTrees
                                         )
    model = randomforest.fit(train)
    
    pred = model.transform(test)
    
    evaluator_auc = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
    val_auc = evaluator_auc.evaluate(pred)
    
    evaluator_loss = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
    val_acc = evaluator_loss.evaluate(pred)
    
    return model, val_auc, val_acc    

In [68]:
def train_with_hyperopt(params):
    maxDepth = int(params['maxDepth'])
    numTrees = int(params['numTrees'])
    
    model, val_auc, val_acc = train_reg(maxDepth, numTrees)
    loss = 1 - val_acc
    return {'loss': loss, 'status': STATUS_OK}    

In [71]:
space = {
    'maxDepth': hp.choice('max_depth', [2, 4, 8, 15]),
    'numTrees': hp.choice('n_estimators', [10, 20, 50])
}

In [76]:
from datetime import datetime, timedelta
start_time = datetime.now()
print("%-20s %s" % ("Start Time", start_time))

algo = tpe.suggest
best_params = fmin(fn=train_with_hyperopt, space=space, algo=algo, max_evals=20)
2
end_time = datetime.now()
print("%-20s %s" % ("End Time", end_time))
print(str(timedelta(seconds=(end_time-start_time).seconds)))

Start Time           2023-01-17 19:15:37.196201
100%|███████████████████████████████████████████████| 20/20 [02:07<00:00,  6.37s/trial, best loss: 0.17902955939765752]
End Time             2023-01-17 19:17:44.692304
0:02:07


In [78]:
## Evaluate Best Model
print('Best Model Test Area Under ROC', best_params)

Best Model Test Area Under ROC {'max_depth': 2, 'n_estimators': 2}


In [81]:
maxDepth = int(best_params['max_depth'])
numTrees = int(best_params['n_estimators'])
model, val_auc, val_acc = train_reg(maxDepth, numTrees)  # We can train this on full dataset
print('Validation AUC:', val_auc)
print('Validation Accuracy:', val_acc)

Validation AUC: 0.6855070667641308
Validation Accuracy: 0.8155047406581148


In [80]:
#print best model parameters
print('Best Model - Maximum Depth', best_params['max_depth'])
print('Best Model - Estimators:', best_params['n_estimators'])

Best Model - Maximum Depth 2
Best Model - Estimators: 2
