# Loading in Required Libraries

In [0]:
# Standard libraries/functions
import pandas as pd
import numpy as np

# Pyspark libraries/functions
import pyspark.sql.functions as F
from pyspark.sql.functions import col, sum, isnan, when, count, year, month, dayofmonth, date_format, concat_ws, acos, cos, radians, sin, udf, concat
from pyspark.sql.types import IntegerType, DateType, DoubleType, StringType, FloatType
from pyspark.sql.window import Window

# from pyspark.ml.linalg import Vectors#, VectorUDT
# from pyspark.ml.linalg import VectorType
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder, Imputer
from pyspark.ml.functions import vector_to_array
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Correlation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import PCA
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

# Sklearn libraries/functions
from sklearn.utils import shuffle
# from sklearn.linear_model import LogisticRegression
# from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier, LinearSVC
import time

import mlflow
import mlflow.spark
# from pyspark.mllib.linalg import Vectors, VectorUDT
from xgboost.spark import SparkXGBClassifier


# Setting Up Storage

In [0]:
## Place this cell in any team notebook that needs access to the team cloud storage.

# The following blob storage is accessible to team members only (read and write)
# access key is valid til TTL
# after that you will need to create a new SAS key and authenticate access again via DataBrick command line
blob_container  = "smsj-261"       # The name of your container created in https://portal.azure.com
storage_account = "smsj"  # The name of your Storage account created in https://portal.azure.com
secret_scope    = "smsjscope"           # The name of the scope created in your local computer using the Databricks CLI
secret_key      = "smsjkey"             # The name of the secret key created in your local computer using the Databricks CLI
team_blob_url        = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"  #points to the root of your team storage bucket

# the 261 course blob storage is mounted here on the DataBricks workspace.
mids261_mount_path      = "/mnt/mids-w261"

# SAS Token: Grant the team limited access to Azure Storage resources
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)
import pandas as pd
pdf = pd.DataFrame([[1, 2, 3, "Jane"], [2, 2,2, None], [12, 12,12, "John"]], columns=["x", "y", "z", "a_string"])
df = spark.createDataFrame(pdf) # Create a Spark dataframe from a pandas DF

# The following can write the dataframe to the team's Cloud Storage  
# Navigate back to your Storage account in https://portal.azure.com, to inspect the partitions/files.
# df.write.parquet(f"{team_blob_url}/test")

# see what's in the blob storage root folder 
display(dbutils.fs.ls(f"{team_blob_url}"))

path,name,size,modificationTime
wasbs://smsj-261@smsj.blob.core.windows.net/test/,test/,0,1689534418000


# Loading the Data

In [0]:
# import spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ReadingParquet").getOrCreate()

df_test_3m = spark.read.parquet("dbfs:/user/hive/warehouse/test_60m")
df_val = spark.sql("SELECT * FROM train_60m where YEAR(FL_DATE) = 2018")
df_train_3m = spark.read.parquet("dbfs:/user/hive/warehouse/train_60m")

df_test_3m.createOrReplaceTempView("test_60m")
df_train_3m.createOrReplaceTempView("df_train_60m")
# Use the DataFrame in your ML Flow project

In [0]:
# df_train_3m.count()
# df_val.count()

9192041

# Modelling Functions

In [0]:
def extract(row):
    return tuple(row.probability.toArray().tolist()) +  (row.label,) + (row.prediction,)

In [0]:
def RegressionEvaluator(preds):

    rdd_preds_m = preds.select(['prediction', 'label']).rdd

    preds = preds.select("probability", "label", "prediction")
    preds = preds.rdd.map(extract).toDF(["p0", "p1", "label", "prediction"])

    # Create an binary evaluator
    evaluator_auc = BinaryClassificationEvaluator(labelCol='label')
    evaluator_auc.setRawPredictionCol('p1')

    # Compute the areaUnderROC on the test data
    areaUnderROC = evaluator_auc.evaluate(preds, {evaluator_auc.metricName: "areaUnderROC"})
    areaUnderPR = evaluator_auc.evaluate(preds, {evaluator_auc.metricName: "areaUnderPR"})

    multi_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
    multi_evaluator2 = MulticlassMetrics(rdd_preds_m)
    # Compute various evaluation metrics
    accuracy = multi_evaluator.evaluate(preds, {multi_evaluator.metricName: "accuracy"})
    precision = multi_evaluator.evaluate(preds, {multi_evaluator.metricName: "precisionByLabel"})
    recall = multi_evaluator.evaluate(preds, {multi_evaluator.metricName: "recallByLabel"})
    f1 = multi_evaluator.evaluate(preds, {multi_evaluator.metricName: "f1"})

    f2 = np.round(multi_evaluator2.fMeasure(label=1.0, beta=2.0), 5)
    # pr = binary_evaluator.areaUnderPR

    return accuracy, precision, recall, f1, f2 ,areaUnderROC,areaUnderPR



# Model Pipeline (Modelling & Hyperparameter Tuning)

# ML Flow Model

In [0]:
xgb = SparkXGBClassifier(features_col="allFeatures")
df_train = df_train_3m
df_test = df_test_3m

pipeline = Pipeline(stages=[xgb])

from hyperopt import hp
search_space = {
    "num_workers": hp.quniform("num_workers", 3,5,1),
    "n_estimators": hp.quniform("n_estimators", 90,120.0,10.0),
    "max_bin": hp.quniform("max_bin", 25.0,35.0,3.0),
    "max_depth": hp.quniform("max_depth", 6,16,2),
    "learning_rate": hp.quniform("learning_rate", 0.2,0.6,0.1),
    "max_leaves": hp.quniform("max_leaves", 6,16,1),
    "gamma": hp.quniform("gamma", 0,20,2),
    "scale_pos_weight": hp.quniform("scale_pos_weight", 0.8,2,0.2),
    "colsample_bytree": hp.quniform("colsample_bytree", 0.75,1,0.05),
}
search_space

{'num_workers': <hyperopt.pyll.base.Apply at 0x7f1840b49990>,
 'n_estimators': <hyperopt.pyll.base.Apply at 0x7f1840bbf550>,
 'max_bin': <hyperopt.pyll.base.Apply at 0x7f1840bbdf30>,
 'max_depth': <hyperopt.pyll.base.Apply at 0x7f1840bc9450>,
 'learning_rate': <hyperopt.pyll.base.Apply at 0x7f1840bc9300>,
 'max_leaves': <hyperopt.pyll.base.Apply at 0x7f1840bc91b0>,
 'gamma': <hyperopt.pyll.base.Apply at 0x7f1840bca6b0>,
 'scale_pos_weight': <hyperopt.pyll.base.Apply at 0x7f1840bca350>,
 'colsample_bytree': <hyperopt.pyll.base.Apply at 0x7f1840bca200>}

In [0]:
def objective_function(params):
    # CHANGES HERE
    num_workers = params["num_workers"]
    n_estimators = params["n_estimators"]
    max_bin = params["max_bin"]
    max_depth = params["max_depth"]
    learning_rate = params["learning_rate"]
    max_leaves = params["max_leaves"]
    gamma = params["gamma"]
    scale_pos_weight = params["scale_pos_weight"]
    # reg_alpha = params["reg_alpha"]
    # min_child_weight = params["min_child_weight"]


    with mlflow.start_run():
        # CHANGES HERE
        estimator = pipeline.copy({ xgb.num_workers:num_workers,
                                    xgb.n_estimators:int(n_estimators),
                                    xgb.max_bin:int(max_bin),
                                    xgb.max_depth: int(max_depth),
                                    xgb.learning_rate:learning_rate,
                                    xgb.max_leaves:int(max_leaves),
                                    xgb.gamma:gamma,
                                    xgb.scale_pos_weight:scale_pos_weight
                                    # ,
                                    # xgb.reg_alpha:reg_alpha,
                                    # xgb.min_child_weight:min_child_weight


                                  })
        

        model = estimator.fit(df_train)

        preds_training = model.transform(df_train)       
        pred_calc_training = RegressionEvaluator(preds_training)  

        train_accuracy = pred_calc_training[0]
        train_precision = pred_calc_training[1]
        train_recall = pred_calc_training[2]
        train_f1_score = pred_calc_training[3]
        train_f2_score = pred_calc_training[4]
        train_areaUnderROC = pred_calc_training[5]
        train_areaUnderPR = pred_calc_training[6]

        mlflow.log_metric('train_accuracy', train_accuracy)
        mlflow.log_metric('train_precision', train_precision)
        mlflow.log_metric('train_recall', train_recall)
        mlflow.log_metric('train_f1_score', train_f1_score)
        mlflow.log_metric('train_f2_score', train_f2_score)
        mlflow.log_metric('train_areaUnderROC', train_areaUnderROC)
        mlflow.log_metric('train_areaUnderPR', train_areaUnderPR)


        print('-------------------')
        print('Train Metrics:')
        print('accuracy:',train_accuracy)
        print('precision:',train_precision)
        print('recall:',train_recall)

        print('f1_score:',train_f1_score)
        print('f2_score:',train_f2_score)

        print('areaUnderROC:',str(train_areaUnderROC))
        print('areaUnderPR:',str(train_areaUnderPR))

        
        preds = model.transform(df_test)
        # preds = model.transform(df_val)
        pred_calc = RegressionEvaluator(preds)
        val_accuracy = pred_calc[0]
        val_precision = pred_calc[1]
        val_recall = pred_calc[2]
        val_f1_score = pred_calc[3]
        val_f2_score = pred_calc[4]
        val_areaUnderROC = pred_calc[5]
        val_areaUnderPR = pred_calc[6]

        mlflow.log_metric('val_accuracy', val_accuracy)
        mlflow.log_metric('val_precision', val_precision)
        mlflow.log_metric('val_recall', val_recall)
        mlflow.log_metric('val_f1_score', val_f1_score)
        mlflow.log_metric('val_f2_score', val_f2_score)
        mlflow.log_metric('val_areaUnderROC', val_areaUnderROC)
        mlflow.log_metric('val_areaUnderPR', val_areaUnderPR)
        print('-------------------')
        print('Validation Metrics:')
        print('accuracy:',val_accuracy)
        print('precision:',val_precision)
        print('recall:',val_recall)
        print('f1_score:',val_f1_score)
        print('f2_score:',val_f2_score)
        print('areaUnderROC:',val_areaUnderROC)
        print('areaUnderPR:',val_areaUnderPR)


        print('-------------------')
        print('Model Params:')
        print('num_workers:',num_workers)
        print('n_estimators:',n_estimators)
        print('max_bin:',max_bin)
        print('max_depth:',max_depth)
        print('learning_rate:',learning_rate)
        print('max_leaves:',max_leaves)
        print('gamma:',gamma)

        mlflow.spark.log_model(model, "XGB_Model_test_JT_60m")
        # print('model_logging_complete' + " --- %s seconds ---" % (time.time() - start_time))
    return val_areaUnderPR

In [0]:
from hyperopt import fmin, tpe, Trials
import mlflow

mlflow.pyspark.ml.autolog(log_models=False)
num_evals = 1
trials = Trials()
best_hyperparam = fmin(fn=objective_function,
                       space = search_space,
                       algo=tpe.suggest,
                       max_evals = num_evals,
                       trials=trials,
                       rstate=np.random.default_rng(42))


  0%|          | 0/1 [00:00<?, ?trial/s, best loss=?]






                                                     -------------------
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]                                                     Train Metrics:
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]                                                     accuracy:
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]                                                     0.8134354492108988
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]                                                     precision:
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]                                                     0.8419836580548854
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]                                                     recall:
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]                                                     0.951528868476467
  0%|          | 0/1 [13:16<?, ?trial/s, best loss=?]       

2023/08/08 21:45:20 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().



100%|██████████| 1/1 [15:36<00:00, 936.13s/trial, best loss: 0.3363465582622245]100%|██████████| 1/1 [15:36<00:00, 936.13s/trial, best loss: 0.3363465582622245]


In [0]:
best_hyperparam

{'colsample_bytree': 0.6000000000000001,
 'gamma': 8.0,
 'learning_rate': 0.5,
 'max_bin': 70.0,
 'max_depth': 8.0,
 'max_leaves': 14.0,
 'n_estimators': 160.0,
 'num_workers': 4.0}