### Import libraries

In [0]:
# Read and echo the MLflow run name passed in through the notebook widget.
# The value is read again before training because the restartPython() call in
# the next cell discards Python variables defined before it.
RUN_NAME = dbutils.widgets.get("RUN_NAME")
print(RUN_NAME)

In [0]:
# Pin xgboost to 1.2.0 for this notebook session, then restart the Python
# process so the freshly installed version is the one imported in the next cells.
# NOTE(review): installPyPI is deprecated on newer Databricks runtimes in favour
# of `%pip install` — confirm it is still available on the target runtime.
dbutils.library.installPyPI("xgboost", version="1.2.0" )
dbutils.library.restartPython()

In [0]:
import pandas as pd
import xgboost as xgb

In [0]:
# Confirm which library versions were actually loaded after the restart
# (xgboost first, then pandas).
for _lib in (xgb, pd):
  print(_lib.__version__)

In [0]:
# Show the notebook-scoped libraries (the value is rendered as the cell output).
dbutils.library.list()

In [0]:
%run "./common_module"

### Read in tables

In [0]:
# Load the modelling table, make the target an integer flag and keep a single
# row per ID (which duplicate survives is not specified by dropDuplicates).
winback_base_dummy = (
  spark.sql("""select * from DB.table_dummy""")
  .withColumn("Y_GA_FLAG", col("Y_GA_FLAG").cast("int"))
  .dropDuplicates(['ID'])
)


In [0]:
# Per-column count of NaN values. isnan only flags NaN, so null values are not
# included in these counts.
_nan_count_exprs = [count(when(isnan(c), c)).alias(c) for c in winback_base_dummy.columns]
display(winback_base_dummy.select(_nan_count_exprs))

In [0]:
# Preview the de-duplicated base table.
display(winback_base_dummy)

In [0]:
# Short alias for the base table, plus the number of distinct IDs it holds.
df = winback_base_dummy
cnt_all = df.agg(countDistinct('ID').alias("cnt")).first()["cnt"]
print(cnt_all)


#### Check the Y rate in the data

In [0]:
# Count positive IDs and derive the class-imbalance ratio (negatives / positives)
# that is passed to XGBoost as scale_pos_weight.
y_cnt = df.where(col('Y_GA_FLAG') == 1).agg(countDistinct('ID').alias("cnt")).collect()[0].cnt
if y_cnt == 0 or y_cnt == cnt_all:
  # With no positives the ratio divides by zero; with no negatives it is 0 and
  # the later ROC AUC evaluation needs both classes, so fail early and clearly.
  raise ValueError(
    "Y_GA_FLAG must contain both classes: {} positive IDs out of {}".format(y_cnt, cnt_all)
  )
scale_pos_wei = (cnt_all - y_cnt) / y_cnt
print("Y rate :{:.6f}".format(y_cnt/cnt_all))
print("scale_pos_weight: {:.2f}".format(scale_pos_wei))


### Convert Spark DF to Pandas DF

#### Convert datatype to numeric for Pandas

In [0]:
def convert_datatype_for_pandas(full_data):
  """Cast Spark column types to numeric types that pandas/XGBoost can consume.

  decimal and double columns are cast to float and boolean columns to int;
  every other column is passed through unchanged. The resulting schema is
  printed.

  Args:
    full_data: Spark DataFrame to convert.

  Returns:
    The converted Spark DataFrame. Spark DataFrames are immutable, so the input
    is not modified; callers must use the returned frame, e.g.
    ``df = convert_datatype_for_pandas(df)``.
  """
  select_expr = []
  for c, t in full_data.dtypes:
    # Match primitive type names exactly, so complex types such as
    # array<double> or struct<...> are left alone instead of being cast.
    if t.startswith("decimal") or t == "double":
      select_expr.append(col(c).cast("float"))
    elif t == "boolean":
      select_expr.append(col(c).cast("int"))
    else:
      select_expr.append(col(c))
  converted = full_data.select(*select_expr)
  converted.printSchema()
  return converted

In [0]:
# Apply the numeric casts before toPandas(). The function prints the converted
# schema; when it returns the converted DataFrame, keep it. Spark DataFrames are
# immutable, so discarding the result would leave df unconverted.
_converted_df = convert_datatype_for_pandas(df)
if _converted_df is not None:
  df = _converted_df


### Prepare data for the train/test split

#### Convert to Pandas and select the feature columns for XGB

In [0]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame; the whole
# table must fit in driver memory.
df_pd = df.toPandas()

In [0]:
# Every column except the ID key and the target is used as a model feature,
# kept in the DataFrame's column order.
rm_elems = ["ID","Y_GA_FLAG"]
_excluded_cols = set(rm_elems)
feature_list = [name for name in df_pd.columns.tolist() if name not in _excluded_cols]
print(feature_list)

### Train Model

#### Define Parameters

In [0]:
# MLflow experiment path from the notebook widget; passed to
# mlflow.set_experiment before training.
MLflow_path = dbutils.widgets.get("MLflow_path")
print(MLflow_path)

In [0]:
# Raw hyperparameter values from the notebook widgets. All of them are strings
# and are parsed into their proper types when param_dist is built.
(booster, n_estimators, max_depth, min_child_weight,
 learning_rate, objective, eval_metric, silent) = [
  dbutils.widgets.get(widget_name)
  for widget_name in ("booster", "n_estimators", "max_depth", "min_child_weight",
                      "learning_rate", "objective", "eval_metric", "silent")
]

In [0]:
# XGBClassifier keyword arguments, parsed from the string widget values above.
# The dict is also logged to MLflow as run parameters.
param_dist = {
      'booster': booster,
      'n_estimators': int(n_estimators),  # number of boosting rounds (trees)
      'scale_pos_weight': scale_pos_wei,  # negatives / positives, from the Y-rate cell
      'max_depth': int(max_depth),  # max tree depth; deeper trees overfit more easily
      # Minimum sum of instance weight (hessian) in a child. XGBoost treats this
      # as a float, so parse with float() to accept values such as "0.5".
      'min_child_weight': float(min_child_weight),
      'learning_rate': float(learning_rate),  # step-size shrinkage (eta); XGBoost default is 0.3
      'objective': objective,
      'eval_metric': eval_metric,
      # NOTE(review): XGBoost 1.x deprecates 'silent' in favour of 'verbosity';
      # confirm it is still honoured by the pinned version.
      'silent': int(silent),
    }
print(param_dist)

In [0]:
# Re-read the run name: the earlier dbutils.library.restartPython() call
# discards Python variables, including the RUN_NAME read in the first cell.
RUN_NAME = dbutils.widgets.get("RUN_NAME")
print(RUN_NAME)

#### Define helper functions

In [0]:
def eval_pct_qcut10(y_proba, y_test):
  """Compute per-decile positive rates and top-decile lift for a binary classifier.

  Rows are ranked by the positive-class probability (highest first) and split
  into 10 equal-size deciles. The share of actual positives is then computed
  for each decile.

  Args:
    y_proba: 2-D array as returned by ``predict_proba``; column 1 is used as
      the positive-class probability.
    y_test: actual 0/1 labels, matched to ``y_proba`` row by row (position).

  Returns:
    Tuple ``(lift, test_base_rate, pct)``:
      - lift: the decile-0 positive rate divided by the overall positive rate.
      - test_base_rate: the overall positive rate.
      - pct: a Series indexed by decile code (0 = highest scores) holding the
        positive rate of each decile.
  """
  df_proba = pd.DataFrame({"predict_proba": y_proba[:, 1], "actual": y_test})
  # method='first' gives every row a distinct rank, so qcut's bin edges stay
  # unique even when many rows share the same probability.
  df_proba["rank"] = df_proba["predict_proba"].rank(method="first", ascending=False)
  df_proba["decile"] = pd.qcut(df_proba["rank"].values, 10).codes
  # Mean of a 0/1 column is the positive rate (sum / count).
  pct = df_proba.groupby("decile")["actual"].mean()
  test_base_rate = df_proba["actual"].mean()
  top10 = pct.loc[0]  # label 0 = top decile (highest probabilities)
  lift = top10 / test_base_rate
  return lift, test_base_rate, pct

In [0]:
def xgbClassifierModel(df_pd, y_col_name, run_name, param_dist, features=None):
  """Train an XGBClassifier on a train/test split and log the run to MLflow.

  Steps:
    - Split ``df_pd`` 67/33 with a fixed seed.
    - Fit ``xgb.XGBClassifier(**param_dist)``, reporting the eval metric on
      both the train and the test set.
    - Compute test/train ROC AUC and top-decile lift (``eval_pct_qcut10``).
    - Plot gain-based feature importance.
    - Log params, split sizes, metrics and the target tag to the active
      MLflow run.

  Args:
    df_pd: pandas DataFrame holding the feature columns and ``y_col_name``.
    y_col_name: name of the binary target column.
    run_name: MLflow run name.
    param_dist: keyword arguments for ``xgb.XGBClassifier``; also logged as
      MLflow params.
    features: optional list of feature column names. Defaults to the
      notebook-level ``feature_list``.

  Returns:
    Tuple ``(model, x_col_name)``: the fitted classifier and the list of
    feature column names it was trained on.
  """
  if features is None:
    features = feature_list  # notebook-level global built before training
  with mlflow.start_run(run_name=run_name):

    # Split to train/test (fixed seed so reruns produce the same split)
    train_df, test_df = train_test_split(df_pd, test_size=0.33, random_state=20210804)
    X_train, y_train = train_df[features], train_df[y_col_name]
    X_test, y_test = test_df[features], test_df[y_col_name]

    # Initialize model and fit; eval_set reports the eval metric on both sets.
    print("***fitting XGBClasifier model...")
    model = xgb.XGBClassifier(**param_dist)
    model.fit(X=X_train, y=y_train, eval_set=[(X_train, y_train), (X_test, y_test)])
    print(model)

    # Evaluate result
    print("***predicting probability...")
    y_proba = model.predict_proba(X_test)
    lift, base_rate, pct = eval_pct_qcut10(y_proba, y_test)

    print("Eval Set result:")
    metric_dist = {
      'ROC_AUC_test': roc_auc_score(y_test, y_proba[:, -1]),
      'ROC_AUC_train': roc_auc_score(y_train, model.predict_proba(X_train)[:, -1]),
      'lift': lift,
      'base_rate': base_rate,
    }

    # Gain-based feature importance, highest first.
    feature_important = model.get_booster().get_score(importance_type='gain')
    featr_imp = pd.DataFrame(
      data=list(feature_important.values()),
      index=list(feature_important.keys()),
      columns=["gain"],
    ).sort_values(by="gain", ascending=False)
    # get_score can return no features (e.g. trees without splits); plotting an
    # empty frame raises, so only plot when there is something to show.
    if not featr_imp.empty:
      featr_imp.plot(figsize=(15, 10), kind='barh')

    # Log data into MLflow
    print("***logging data to mlflow...")
    mlflow.set_tag("Y", y_col_name)
    mlflow.log_params(param_dist)
    mlflow.log_params({"training_size": len(y_train), "test_size": len(y_test)})
    mlflow.log_metrics(metric_dist)
    for metric, value in metric_dist.items():
      print(metric, value)
    print("pct: ", pct)

    print(featr_imp)

    x_col_name = list(X_train.columns)
  return model, x_col_name

#### Training

In [0]:
import mlflow
# Send runs to the Databricks workspace tracking server, under the experiment
# path taken from the MLflow_path widget.
mlflow.set_tracking_uri("databricks")
mlflow.set_experiment(MLflow_path)

In [0]:
# Train, evaluate and log the model; keep the fitted model and its feature columns.
model, x_col_name = xgbClassifierModel(
  df_pd,
  y_col_name="Y_GA_FLAG",
  run_name=RUN_NAME,
  param_dist=param_dist,
)