In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import xgboost as xgb
import mlflow
import mlflow.sklearn
import mlflow.xgboost
import mlflow.pyfunc
from mlflow.tracking import MlflowClient
from sklearn.metrics import roc_auc_score
from math import exp
from hyperopt import fmin, hp, tpe, SparkTrials, STATUS_OK
import tempfile
import warnings
warnings.filterwarnings('ignore')

In [None]:
def calculate_feature_importance(model, x):
  """Build a per-feature importance table for a fitted XGBoost sklearn-style model.

  Parameters
  ----------
  model : fitted estimator exposing ``feature_importances_`` and ``get_booster()``.
  x : pandas.DataFrame
      Frame whose ``columns`` label the entries of ``model.feature_importances_``
      (matched by position).

  Returns
  -------
  pandas.DataFrame
      Columns ``feature``, ``feature_importance``, ``cumulative_feature_importance``
      and ``gain``, sorted by ``gain`` descending. Features missing from the booster's
      gain scores get NaN gain and sort last.
  """
  feat_imp = pd.DataFrame(model.feature_importances_, index = x.columns).reset_index()
  feat_imp.columns = ['feature', 'feature_importance']
  feat_imp = feat_imp.sort_values(by = 'feature_importance', ascending = False)
  # Cumulative share is taken in descending-importance order, before the re-sort by gain below.
  feat_imp['cumulative_feature_importance'] = feat_imp['feature_importance'].cumsum()

  gain_scores = model.get_booster().get_score(importance_type = 'gain')
  # Explicit two-column schema: an empty score dict (no feature carries a gain score) still
  # yields a mergeable frame instead of failing on a column-count mismatch.
  feat_gain = pd.DataFrame({
    'feature': pd.Series(list(gain_scores.keys()), dtype = object),
    'gain': pd.Series(list(gain_scores.values()), dtype = float),
  })

  return feat_imp.merge(feat_gain, on = 'feature', how = 'left').sort_values('gain', ascending = False)

def calculate_model_evaluation_metrics(y, y_hat, wt, bins = 20, mode = None):
  """Build a weighted gains / KS table over equal-width buckets of cumulative weight.

  Parameters
  ----------
  y : observed outcome, used as the ``bad`` flag (``good`` is computed as ``1 - y``).
      Assumed to hold 0/1 values -- TODO confirm with callers.
  y_hat : predicted value per row, stored as ``probability``.
  wt : per-row weight; drives the bucketing, the good/bad totals and the weighted AUC.
  bins : int
      Number of buckets handed to ``pd.cut`` over the cumulative weight.
  mode : ``'score'`` ranks rows by ascending probability; any other value ranks descending.

  Returns
  -------
  pandas.DataFrame
      One row per bucket with columns ``decile``, ``minimum_probability``,
      ``maximum_probability``, ``bads``, ``goods``, ``total``, ``bad_rate``,
      ``cumulative_bad_rate``, ``cumulative_good_rate``, ``ks``, ``max_ks``
      (``'<'`` on the row(s) with the largest KS), plus ``roc_auc`` and ``gini``
      repeated on every row.
  """
  report_tmp = pd.DataFrame(y)
  report_tmp['probability'] = y_hat
  report_tmp.columns = ['bad', 'probability']
  report_tmp['weight'] = wt
  report_tmp['good'] = 1 - report_tmp['bad']

  # Within equal probabilities, bad rows are always ranked first.
  report_tmp = report_tmp.sort_values(
    by = ['probability', 'bad'], ascending = [mode == 'score', False]
  ).reset_index(drop = True)

  report_tmp['weight_cumulative_sum'] = report_tmp['weight'].cumsum()
  # Vectorized split of each row's weight into its good / bad contribution.
  report_tmp['weight_good'] = np.where(report_tmp['bad'] == 1, 0, report_tmp['weight'])
  report_tmp['weight_bad'] = np.where(report_tmp['good'] == 1, 0, report_tmp['weight'])
  report_tmp['bucket'] = pd.cut(report_tmp['weight_cumulative_sum'], bins)

  # observed=False keeps empty buckets as rows so the labels below stay aligned with buckets.
  report_grp = report_tmp.groupby('bucket', as_index = False, observed = False)
  bucket_min = report_grp.min()
  bucket_max = report_grp.max()
  bucket_sum = report_grp.sum()

  report = pd.DataFrame(index = bucket_min.index)
  # Label bucket i (1-based) with i * 100 // bins, so the last bucket is always 100.
  # The previous range(10, 110, step) only produced correct labels for bins == 10.
  report['decile'] = pd.Series([(i * 100) // bins for i in range(1, bins + 1)])
  report['minimum_probability'] = bucket_min['probability']
  report['maximum_probability'] = bucket_max['probability']
  report['bads'] = bucket_sum['weight_bad']
  report['goods'] = bucket_sum['weight_good']
  report['total'] = report['bads'] + report['goods']
  report['bad_rate'] = report['bads'] / report['total']
  report['cumulative_bad_rate'] = (report['bads'] / report_tmp['weight_bad'].sum()).cumsum()
  report['cumulative_good_rate'] = (report['goods'] / report_tmp['weight_good'].sum()).cumsum()
  report['ks'] = np.abs(np.round(report['cumulative_bad_rate'] - report['cumulative_good_rate'], 4))
  report['max_ks'] = np.where(report['ks'] == report['ks'].max(), '<', '')

  roc_auc = roc_auc_score(y, y_hat, sample_weight = wt)
  report['roc_auc'] = roc_auc
  report['gini'] = 2 * roc_auc - 1

  return report[["decile",
                 "minimum_probability",
                 "maximum_probability",
                 "bads",
                 "goods",
                 "total",
                 "bad_rate",
                 'cumulative_bad_rate',
                 'cumulative_good_rate',
                 'ks',
                 'max_ks',
                 'roc_auc',
                 'gini']]

def model_evaluation_report(y_train, y_hat_train, y_val, y_hat_val, w_train, w_val, bins = 10 , mode = None):
  """Produce the train and validation evaluation tables plus a side-by-side summary.

  Parameters
  ----------
  y_train, y_hat_train, w_train : outcome, prediction and weight for the training sample.
  y_val, y_hat_val, w_val : the same three inputs for the validation sample.
  bins, mode : forwarded unchanged to ``calculate_model_evaluation_metrics``.

  Returns
  -------
  tuple
      ``(train_report, val_report, summary)``. ``summary`` has columns ``metric``,
      ``train`` and ``validation`` with rows, in order: ``roc_auc``, ``gini``, ``ks``,
      ``capture_rate_10%``, ``capture_rate_20%``. A capture rate is NaN when the report
      has no row for that decile label (possible when ``bins`` is not 10).
  """
  train_report = calculate_model_evaluation_metrics(y_train, y_hat_train, w_train, bins, mode)
  val_report = calculate_model_evaluation_metrics(y_val, y_hat_val, w_val, bins, mode)

  def _capture_rate(report, decile):
    # Cumulative bad rate at the bucket labelled `decile`, or NaN if no such bucket exists.
    matches = report.loc[report['decile'] == decile, 'cumulative_bad_rate']
    return matches.iloc[0] if len(matches) else np.nan

  def _headline_metrics(report):
    # roc_auc and gini are constant down the report, so the first row is representative.
    return [
      report['roc_auc'].iloc[0],
      report['gini'].iloc[0],
      report['ks'].max(),
      _capture_rate(report, 10),
      _capture_rate(report, 20),
    ]

  # Built in one shot: cell-by-cell chained assignment (frame[col][row] = value) is not
  # guaranteed to write through to the frame under pandas Copy-on-Write.
  summary = pd.DataFrame({
    'metric': ['roc_auc', 'gini', 'ks', 'capture_rate_10%', 'capture_rate_20%'],
    'train': _headline_metrics(train_report),
    'validation': _headline_metrics(val_report),
  })
  return train_report, val_report, summary

def get_temporary_directory_path(prefix, suffix):
  """Open a named temporary file and return the open file object.

  Despite the function name, the return value is the ``tempfile.NamedTemporaryFile``
  object itself (neither a directory nor a path string); its ``.name`` attribute holds
  the path. ``prefix`` and ``suffix`` are applied to the generated file name.
  """
  return tempfile.NamedTemporaryFile(prefix = prefix, suffix = suffix)

def save_artifact(artifact, file_name, file_type, directory):
  """Write ``artifact`` to a temporary CSV and log it to the active MLflow run.

  Parameters
  ----------
  artifact : object with a ``to_csv(path, index=False)`` method (e.g. a pandas DataFrame).
  file_name : prefix for the temporary file name.
  file_type : suffix for the temporary file name (e.g. ``".csv"``).
  directory : artifact sub-directory passed to ``mlflow.log_artifact``.
  """
  # The context manager closes the temporary file handle once the artifact has been
  # logged, instead of leaving it open until garbage collection.
  with get_temporary_directory_path(file_name, file_type) as temp_file:
    artifact.to_csv(temp_file.name, index = False)
    mlflow.log_artifact(temp_file.name, directory)

def mlflow_run(run_name, params, n, x_train, y_train, w_train, x_val, y_val, w_val):
  """Train one XGBoost classifier inside an MLflow run and log everything about it.

  The model is fitted on the weighted training sample with early stopping on the
  weighted validation AUC. Parameters, summary metrics, the model and four CSV reports
  (feature importance, train report, validation report, summary) are logged to the run.

  Parameters
  ----------
  run_name : name given to the MLflow run.
  params : dict of keyword parameters applied to ``xgb.XGBClassifier`` via ``set_params``.
  n : zero-based iteration index; logged as ``n + 1``.
  x_train, y_train, w_train : training features, target and sample weights.
  x_val, y_val, w_val : validation features, target and sample weights.

  Returns
  -------
  tuple
      ``(run_id, experiment_id)`` of the MLflow run that was created.
  """
  with mlflow.start_run(run_name = run_name) as run:
    run_id = run.info.run_uuid
    experiment_id = run.info.experiment_id

    model = xgb.XGBClassifier()
    model.set_params(**params)
    # eval_set takes (X, y) pairs; the validation weights must go through
    # sample_weight_eval_set rather than as a third tuple element.
    model.fit(
      x_train,
      y_train,
      sample_weight = w_train,
      eval_set = [(x_val, y_val)],
      sample_weight_eval_set = [w_val],
      eval_metric = 'auc',
      early_stopping_rounds = 8,
    )

    # Score with the number of trees selected by early stopping.
    best_n_tree = model.best_ntree_limit
    y_hat_train = model.predict_proba(x_train, ntree_limit = best_n_tree)[:, 1]
    y_hat_val = model.predict_proba(x_val, ntree_limit = best_n_tree)[:, 1]

    feature_importance = calculate_feature_importance(model, x_train)

    train_report, val_report, summary = model_evaluation_report(
      y_train, y_hat_train, y_val, y_hat_val, w_train, w_val, bins = 10, mode = None
    )

    # Summary rows arrive in this fixed order; each one yields train / validation /
    # difference metrics under the given prefix.
    metrics = {}
    summary_prefixes = ["roc_auc_score", "gini", "ks", "capture_rate_10", "capture_rate_20"]
    for row, prefix in enumerate(summary_prefixes):
      train_value = summary['train'].iloc[row]
      val_value = summary['validation'].iloc[row]
      metrics[prefix + "_train"] = train_value
      metrics[prefix + "_validation"] = val_value
      metrics[prefix + "_difference"] = train_value - val_value

    # Unweighted means / extremes of the observed target and predicted probabilities.
    metrics["actual_bad_rate_train"] = y_train.mean()
    metrics["predicted_bad_rate_train"] = y_hat_train.mean()
    metrics["actual_bad_rate_validation"] = y_val.mean()
    metrics["predicted_bad_rate_validation"] = y_hat_val.mean()

    metrics["minimum_probability_train"] = y_hat_train.min()
    metrics["maximum_probability_train"] = y_hat_train.max()
    metrics["minimum_probability_validation"] = y_hat_val.min()
    metrics["maximum_probability_validation"] = y_hat_val.max()

    mlflow.log_params(params)
    mlflow.log_param("iteration", n + 1)

    for metric_name, metric_value in metrics.items():
      mlflow.log_metric(metric_name, metric_value)

    tracking_client = MlflowClient()
    tracking_client.set_experiment_tag(experiment_id, "mlflow.note.content", "US Export Propensity Model Development")
    tracking_client.set_tag(run_id, "mlflow.note.content", "Iteration Number: {}".format(n + 1))

    mlflow.xgboost.log_model(xgb_model = model, artifact_path = "xgboost-model")

    # The CSV reports are stored alongside the model under the same artifact directory.
    save_artifact(feature_importance, "feature_importance", ".csv", "xgboost-model")
    save_artifact(train_report, "model_evaluation_report_train", ".csv", "xgboost-model")
    save_artifact(val_report, "model_evaluation_report_validation", ".csv", "xgboost-model")
    save_artifact(summary, "model_evaluation_summary", ".csv", "xgboost-model")

  return (run_id, experiment_id)
      

def build_models(run_name, parameters, niter, x_train, y_train, w_train, x_val, y_val, w_val):
  """Run ``niter`` training iterations, drawing a fresh hyper-parameter set for each.

  Parameters
  ----------
  run_name : name given to every MLflow run.
  parameters : str or callable
      Either a Python expression string that evaluates to a dict of hyper-parameters
      (re-evaluated on every iteration, so random draws inside it differ per run), or a
      zero-argument callable returning such a dict.
  niter : number of iterations to run.
  x_train, y_train, w_train, x_val, y_val, w_val : forwarded to ``mlflow_run``.

  Returns
  -------
  tuple
      ``(run_id, experiment_id)`` of the LAST run only, or ``(None, None)`` when
      ``niter`` is zero or negative and no run was started.
  """
  tuned_keys = ['colsample_bylevel', 'colsample_bytree', 'learning_rate', 'max_depth',
                'min_child_weight', 'n_estimators', 'subsample']

  # Defined up front so that niter <= 0 returns cleanly instead of raising UnboundLocalError.
  run_id, experiment_id = None, None

  for n in range(niter):
    # NOTE(security): eval() executes arbitrary code. Only pass expression strings written
    # in this notebook; prefer the callable form for anything that comes from outside.
    params_tmp = parameters() if callable(parameters) else eval(parameters)

    # Keep only the tuned keys; a missing key raises KeyError.
    params = {}
    for key in tuned_keys:
      params[key] = params_tmp[key]

    run_id, experiment_id = mlflow_run(run_name, params, n, x_train, y_train, w_train, x_val, y_val, w_val)

  return run_id, experiment_id

In [None]:
# Load the model-development dataset (identifiers, weight, target, predictors and an
# existing prediction column that is dropped below).
df1 = pd.read_csv('/dbfs/FileStore/sahayk/us_export_propensity/output_data/us_export_propensity_model_output_override.csv')

identifiers = ['duns',
'append_year',
'append_month',
'sample_type']

weight = 'weight'

target = 'export'

prediction = 'predicted_export'

predictors = ['gctrs_ttl_signal_3yrs',
'ind_gctrs_3yrs',
'gctrs_cnt_unq_yrs',
'npayexp',
'sic4_score',
'nloc',
'gctrs_cnt_unq_customer_country',
'nrectyp',
'satis',
'sales',
'drp_paydex1_loc_decile',
'location_growth_score',
'ncomptype',
'nimptexpt',
'foreign_trade_buyer_ind',
'location_cluster_score',
'miny_ownd_ind',
'export_job_title_ind',
'ba_sum_excl_12m',
'loc_pct_rent_1',
'sml_bus_ind',
'loc_pct_comptype_g',
'export_business_name_ind',
'ucc_flng_3yr_cnt',
'ba_count_info_src_12m',
'chg_tot_emp',
'inds_norm_pydx_scr',
'drp_sales_loc_decile',
'foreign_trade_ind']

# Remove the previously stored prediction so it cannot leak into training.
df2 = df1.drop(prediction, axis = 1)

# Explicit copies: a later cell adds a column to `test`, which would otherwise be an
# assignment onto a filtered slice of df2.
train = df2[df2['sample_type'] == 'train'].copy()
val = df2[df2['sample_type'] == 'val'].copy()
test = df2[df2['sample_type'] == 'test'].copy()

# NOTE(review): "Hyperparamter" is misspelled; left unchanged so new runs keep the same
# name as any runs already logged under it.
run_name = "US Export Propensity Hyperparamter Tuning Experiment"

# Expression string re-evaluated by build_models on every iteration, so each run draws a
# new random hyper-parameter combination.
parameters = """{'colsample_bylevel': np.random.randint(5,10)/10,
               'colsample_bytree': np.random.randint(5,10)/10,
               'learning_rate': np.random.choice([0.1, 0.01, 0.05, 0.075]),
               'max_depth': np.random.randint(3,6),
               'min_child_weight': np.random.randint(3,10),
               'n_estimators': np.random.choice([300, 400, 500, 600]),
               'subsample': np.random.randint(5,10)/10}"""

niter = 20

# Seed NumPy's global generator so the sequence of sampled hyper-parameters is
# reproducible after Restart Kernel -> Run All.
random_seed = 42
np.random.seed(random_seed)

run_id, experiment_id = build_models(run_name, parameters, niter, train[predictors], train[target], train[weight], val[predictors], val[target], val[weight])

In [None]:
# Run chosen by hand from the tuning experiment; model_source is that run's logged model.
best_model_run_id = 'cc75d84cd2bd4f16a4f8bf3d4e86a0ea'
model_name = 'us_export_propensity'
model_source = 'dbfs:/databricks/mlflow-tracking/1890214478686766/cc75d84cd2bd4f16a4f8bf3d4e86a0ea/artifacts/xgboost-model'

client = mlflow.tracking.MlflowClient()

# Re-running this cell must not fail just because the registered model already exists.
try:
  client.create_registered_model(model_name)
except mlflow.exceptions.MlflowException as exc:
  if exc.error_code != 'RESOURCE_ALREADY_EXISTS':
    raise

# create_model_version returns a ModelVersion entity; the stage transitions and the
# models:/ URI need its version number, not the entity itself.
registered_version = client.create_model_version(model_name, model_source, best_model_run_id)
model_version = registered_version.version

client.transition_model_version_stage(name = model_name, version = model_version, stage = "Staging")

model = mlflow.pyfunc.load_model(model_uri = f"models:/{model_name}/{model_version}")

# assign() returns a new frame, avoiding a column write onto a filtered slice.
test = test.assign(predicted_export = model.predict(test[predictors]))

# NOTE(review): promotion to Production is unconditional -- nothing in this cell checks
# the test-set predictions first.
client.transition_model_version_stage(name = model_name, version = model_version, stage = "Production")