In [0]:
import mlflow
 
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StandardScaler, VectorAssembler, StringIndexer, OneHotEncoder

from hyperopt import fmin, tpe, hp, Trials, STATUS_OK

Loading the data  
Source: https://www.kaggle.com/santoshd3/bank-customers?select=Churn+Modeling.csv

In [0]:
cust_attrition_data = spark.read.csv('/FileStore/datasets/Bank_customer.csv', header = 'true', inferSchema = 'true')

display(cust_attrition_data)

CustomerId,Surname,CreditScore,Geography,Gender,Age,Tenure,Balance,NumOfProducts,HasCrCard,IsActiveMember,EstimatedSalary,Exited
15634602,Hargrave,619,France,Female,42,2,0.0,1,Yes,Yes,101348.88,Yes
15647311,Hill,608,Spain,Female,41,1,83807.86,1,No,Yes,112542.58,No
15619304,Onio,502,France,Female,42,8,159660.8,3,Yes,No,113931.57,Yes
15701354,Boni,699,France,Female,39,1,0.0,2,No,No,93826.63,No
15737888,Mitchell,850,Spain,Female,43,2,125510.82,1,Yes,Yes,79084.1,No
15574012,Chu,645,Spain,Male,44,8,113755.78,2,Yes,No,149756.71,Yes
15592531,Bartlett,822,France,Others,50,7,0.0,2,Yes,Yes,10062.8,No
15656148,Obinna,376,Germany,Female,29,4,115046.74,4,Yes,No,119346.88,Yes
15792365,He,501,France,Male,44,4,142051.07,2,No,Yes,74940.5,No
15592389,H?,684,France,Male,27,2,134603.88,1,Yes,Yes,71725.73,No


Dropping irrelevant columns

In [0]:
cust_attrition_data = cust_attrition_data.drop(*['CustomerId', 'Surname'])

feature_cols = cust_attrition_data.drop('Exited').columns

Randomly splitting the data into training (70%) and test (30%) sets

In [0]:
training_data, test_data = cust_attrition_data.randomSplit([0.7, 0.3], seed = 42)

We use MLflow to track training.
We specify "nested=True" because each model trained here will be logged as a child run of Hyperopt's run.

The train_tree() function:
   - takes hyperparameters as inputs (for tuning later)
   - returns the fitted model and the area under the precision-recall curve (areaUnderPR) on the test dataset. We use areaUnderPR instead of AUC-ROC because the dataset is imbalanced.

Wrapping the code in a function makes it easier to reuse later with Hyperopt.
The stages stringIndexer, encoder, va1, ss (the StandardScaler stages), va2, indexer (the label indexer) and rf are chained together into a single ML Pipeline.
The function also defines the evaluation metric (areaUnderPR) and evaluates the model on the test dataset.
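
To make the choice of metric concrete, here is a small pure-Python sketch on made-up labels and scores (2 positives among 20 rows, not taken from the bank dataset). It summarizes the precision-recall curve as average precision, which is one common way to approximate the area under it. Spark's BinaryClassificationEvaluator builds its curve differently, so its numbers will not match exactly. The function names are illustrative only.

```python
def average_precision(labels, scores):
    """Step-wise area under the precision-recall curve (average precision)."""
    ranked = sorted(zip(scores, labels), key=lambda pair: -pair[0])
    total_pos = sum(labels)
    true_pos = 0
    area = 0.0
    for rank, (_, label) in enumerate(ranked, start=1):
        if label == 1:
            true_pos += 1
            area += true_pos / rank  # precision at each newly recalled positive
    return area / total_pos


def roc_auc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Hypothetical imbalanced data: 2 positives, 18 negatives.
labels = [1, 0, 0, 0, 1] + [0] * 15
scores = [0.9, 0.8, 0.7, 0.6, 0.5] + [0.1] * 15

ap = average_precision(labels, scores)
auc = roc_auc(labels, scores)
print(f'average precision = {ap:.3f}, ROC AUC = {auc:.3f}')
```

On this toy data the ROC AUC is about 0.92 while the average precision is about 0.70. The three negatives ranked above the second positive barely affect ROC AUC because there are many easy negatives, but they lower precision noticeably.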

In [0]:
def train_tree(maxDepth, numTrees , impurity):
    
    with mlflow.start_run(nested = True):

        categoricalCols = ['Geography', 'Gender', 'HasCrCard', 'IsActiveMember']
        numericCols = ['Age', 'Tenure', 'Balance', 'NumOfProducts', 'EstimatedSalary']
 
        stringIndexer = StringIndexer(inputCols = categoricalCols, 
                                      outputCols = [x + 'Index' for x in categoricalCols]) 
        encoder = OneHotEncoder(inputCols = stringIndexer.getOutputCols(), 
                                outputCols = [x + 'OHE' for x in categoricalCols])
        
        featureArr = [x + 'OHE' for x in categoricalCols] + [('scaled_' + f) for f in numericCols]

        va1 = [VectorAssembler(inputCols = [f], 
                               outputCol = ('vec_' + f)) for f in numericCols]
        ss = [StandardScaler(inputCol = 'vec_' + f, outputCol = 'scaled_' + f, 
                             withMean = True, withStd = True) for f in numericCols]

        va2 = VectorAssembler(inputCols = featureArr, outputCol = 'features')
        
        indexer = StringIndexer(inputCol = 'Exited', outputCol = 'indexedLabel')

        rf = RandomForestClassifier(labelCol = 'indexedLabel', maxDepth = maxDepth,
                                    numTrees = numTrees, impurity = impurity)
    
   
        pipeline = Pipeline(stages = [stringIndexer, encoder] + va1 + ss + [va2, indexer, rf] )
        model = pipeline.fit(training_data)
 
        evaluator = BinaryClassificationEvaluator(labelCol = 'indexedLabel',
                                                  metricName = 'areaUnderPR')
 
        predictions = model.transform(test_data)
        test_metric = evaluator.evaluate(predictions)
        
        mlflow.log_param('Max_depth', maxDepth)
        mlflow.log_param('Num_trees', numTrees)
        mlflow.log_param('Impurity', impurity)
        mlflow.log_metric('areaUnderPR', test_metric)
        
        # The logged artifact is a Spark ML PipelineModel, so name the path accordingly
        mlflow.spark.log_model(spark_model = model, artifact_path = 'spark-model')
 
        return model, test_metric

In [0]:
initial_model, test_metric = train_tree(maxDepth = 2, numTrees = 10 , impurity = 'gini')

print(f'The trained RandomForestClassifier achieved an AreaUnderPR score of {test_metric} on the test data')

The trained RandomForestClassifier achieved an AreaUnderPR score of 0.5994082364955033 on the test data


Hyperopt expects you to return a loss (for which lower is better), so take the negative of the AreaUnderPR (for which higher is better).

In [0]:
def train_with_hyperopt(params):
    
    numTrees = params['numTrees']
    maxDepth = params['maxDepth']
    impurity = params['impurity']
 
    model, areaunderpr = train_tree(maxDepth, numTrees ,impurity)

    loss = - areaunderpr
    
    return {'loss': loss, 'status': STATUS_OK}
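
A tiny illustration of the sign flip used above, with made-up scores (the trial names and values are hypothetical, not results from this notebook): minimizing the negated metric selects the same trial as maximizing the metric itself.

```python
# Hypothetical areaUnderPR scores for three trial configurations.
trial_scores = {'trial_a': 0.60, 'trial_b': 0.70, 'trial_c': 0.65}

# Hyperopt minimizes, so each score is negated to form a loss.
losses = {name: -score for name, score in trial_scores.items()}

best_by_loss = min(losses, key=losses.get)               # what a minimizer picks
best_by_score = max(trial_scores, key=trial_scores.get)  # what we actually want

print(best_by_loss, best_by_score)
```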

The search space is defined below. Note that hp.quniform returns floating-point values (quantized to multiples of q), so numTrees and maxDepth are wrapped in scope.int, which casts them to integers before they reach the classifier. The search space is smaller than the one used in demo-05 because the search took too long with demo-05's search space.

In [0]:
from hyperopt.pyll import scope

search_space = {'numTrees':scope.int(hp.quniform('n_trees', 100, 200, 10)),
                'maxDepth':scope.int(hp.quniform('maxDepth', 2, 8, 1 )),
                'impurity':hp.choice('impurity', ['gini', 'entropy'])}   
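
To show why the integer cast is there, the following pure-Python sketch mimics the sampling rule that hyperopt documents for hp.quniform, round(uniform(low, high) / q) * q. It is an approximation for illustration only and does not call hyperopt. The helper name quniform_sketch and the seed are assumptions.

```python
import random


def quniform_sketch(rng, low, high, q):
    # hyperopt's documented rule is round(uniform(low, high) / q) * q, computed
    # with NumPy, so the result is a float; float() reproduces that here.
    return float(round(rng.uniform(low, high) / q) * q)


rng = random.Random(0)  # arbitrary seed, only for repeatability

# Same bounds and step as the numTrees entry in the search space.
raw_samples = [quniform_sketch(rng, 100, 200, 10) for _ in range(5)]

# Equivalent in spirit to wrapping the expression in scope.int.
int_samples = [int(value) for value in raw_samples]

print(raw_samples)
print(int_samples)
```

The raw samples are multiples of 10 but are still floats such as 150.0; the cast turns them into plain integers.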

In [0]:
algo = tpe.suggest
 
with mlflow.start_run():
    best_params = fmin(
        fn = train_with_hyperopt,
        space = search_space,
        algo = algo,
        max_evals = 16)

100%|██████████| 16/16 [14:41<00:00, 55.06s/trial, best loss: -0.6983013285434326]


#### TODO Recording:
- Once training is complete, click on "experiment"
- Expand and show all the runs under the top-level experiment
- Sort by areaUnderPR in descending order
- Click on the best run
- Show "Parameters" and "Metrics"

Retrieving the best parameters found by the search

In [0]:
best_params

Out[25]: {'impurity': 1, 'maxDepth': 8.0, 'n_trees': 150.0}
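
The dictionary above is keyed by the hyperopt labels ('n_trees', not 'numTrees'), holds floats for the quniform parameters, and stores the hp.choice result as an index into the option list rather than the option itself. A minimal sketch of decoding it by hand follows; the values are copied from the output above, and the name decoded is illustrative. Hyperopt also provides space_eval(search_space, best_params) for this mapping.

```python
# Values copied from the notebook output above.
best_params = {'impurity': 1, 'maxDepth': 8.0, 'n_trees': 150.0}

# Must be in the same order as the list passed to hp.choice in the search space.
impurity_options = ['gini', 'entropy']

decoded = {
    'numTrees': int(best_params['n_trees']),
    'maxDepth': int(best_params['maxDepth']),
    'impurity': impurity_options[best_params['impurity']],
}

print(decoded)  # {'numTrees': 150, 'maxDepth': 8, 'impurity': 'entropy'}
```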