In [None]:
##################################################################################
# Model Training Notebook
#
# This notebook shows an example of a Model Training pipeline using Delta tables.
# It is configured and can be executed as the "Train" task in the model_training_job workflow defined under
# ``fraud_detection/resources/model-workflow-resource.yml``
#
# Parameters:
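# * env (required):                 - Environment the notebook is run in (dev, staging, or prod). Defaults to "staging".
# * training_data_path (required)   - Path to the training data (CSV file).
# * experiment_name (required)      - MLflow experiment name for the training runs. Will be created if it doesn't exist.
# * model_name (required)           - Three-level name (<catalog>.<schema>.<model_name>) to register the trained model in Unity Catalog.
# * result_table (optional)         - Three-level Unity Catalog table name where predictions and SHAP values are written.
# * training_table (optional)       - Three-level Unity Catalog table name the training CSV is loaded into.
# * tracking_uri (optional)         - MLflow tracking server URI. Defaults to "databricks".
#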
##################################################################################

In [None]:
# Reload edited Python modules automatically before each cell executes.
%load_ext autoreload
%autoreload 2

In [None]:
import os
# Resolve the workspace directory that contains this notebook and make it the current
# working directory, so the relative paths used further down resolve against it.
notebook_path =  '/Workspace/' + os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get())
%cd $notebook_path

In [None]:
# Install the dependencies listed in the project's requirements file (path is relative to the notebook directory).
%pip install -r ../../requirements.txt

In [None]:
# Restart the Python process so the packages installed by the previous cell are picked up.
dbutils.library.restartPython()

In [None]:
from databricks.connect import DatabricksSession
from databricks.sdk import WorkspaceClient

# Obtain (or reuse) a Spark session and create a workspace client.
# `w.dbutils` is what the rest of the notebook uses for widgets, task values and exit.
spark = DatabricksSession.builder.getOrCreate()
w = WorkspaceClient()

In [None]:
import numpy as np                   # array, vector, matrix calculations
import pandas as pd                  # DataFrame handling
import xgboost as xgb                # gradient boosting machines (GBMs)
import mlflow
import os
import mlflow.pyfunc
import mlflow.spark
import sklearn

In [None]:
# Input arguments needed to run this notebook as a job.
# Each one is exposed as a Databricks widget so it can be passed as a notebook argument.

# Notebook environment selector.
w.dbutils.widgets.dropdown("env", "staging", ["dev","staging", "prod"], "Environment Name")
env = w.dbutils.widgets.get("env")

# Free-text parameters, registered in this order as (widget name, default value, label).
_text_widgets = (
    # Location of the CSV file holding the raw training data.
    ("training_data_path", "/Volumes/aaron_dev/fraud_detection/raw/Fraud_final-1.csv", "Path to the training data"),
    # MLflow experiment name.
    ("experiment_name", "/dev-fraud_detection-experiment", "MLflow experiment name"),
    # Unity Catalog registered model name to use for the trained model.
    ("model_name", "aaron_dev.fraud_detection.fraud_detection-model", "Full (Three-Level) Model Name"),
    # Unity Catalog result table.
    ("result_table", "aaron_dev.fraud_detection.training_result", "Training Result Table"),
    # Unity Catalog training data table.
    ("training_table", "aaron_dev.fraud_detection.training_input", "Training Input Table"),
    # MLflow tracking server.
    ("tracking_uri", "databricks", "MLflow Tracking Server"),
)
for _widget_name, _widget_default, _widget_label in _text_widgets:
    w.dbutils.widgets.text(_widget_name, _widget_default, label=_widget_label)

In [None]:
# Read the current widget values into the variables used by the rest of the notebook.
(
    input_table_path,
    experiment_name,
    model_name,
    result_table,
    training_table,
    tracking_uri,
) = (
    w.dbutils.widgets.get(widget_name)
    for widget_name in (
        "training_data_path",
        "experiment_name",
        "model_name",
        "result_table",
        "training_table",
        "tracking_uri",
    )
)

In [None]:
# Copy the bundled CSV from the current working directory to the configured training data path.
%cp Fraud_final-1.csv $input_table_path

In [None]:
# Load the raw CSV (header row, inferred column types) and overwrite the Delta table
# that the rest of the notebook trains from, replacing its schema as well.
(
    spark.read
    .option("inferSchema", "true")
    .option("header", "true")
    # "sep" is the option name Spark's CSV reader recognises; the former "delim" key is
    # not a CSV option and was silently ignored.
    .option("sep", ",")
    .csv(input_table_path)
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(training_table)
)

Let's first define an outline for feature preprocessing and modeling. We will call the respective preprocessing and modeling functions after we have imported our data.

In [None]:
# Scaling helper built on a simple out-of-the-box sklearn scaler; it is also re-used inside the PyFunc class.
def preprocess_data(source_df,
                    numeric_columns,
                    fitted_scaler):
  """
  Keep only the selected columns and scale them with an already-fitted scaler.

  :param source_df: pandas.DataFrame containing at least ``numeric_columns``
  :param numeric_columns: list of column names to keep and scale
  :param fitted_scaler: fitted scaler object exposing ``transform``
  :return: new DataFrame holding only ``numeric_columns``, scaled; ``source_df`` is left untouched
  """
  # work on a copy so the caller's frame is never modified
  selected = source_df[numeric_columns].copy()

  # centre and scale with the pre-built scaler, then write the values back column-wise
  scaled_values = fitted_scaler.transform(selected[numeric_columns])
  selected[numeric_columns] = scaled_values

  return selected

In [None]:
class XGBWrapper(mlflow.pyfunc.PythonModel):
  '''
    XGBClassifier model with embedded pre-processing.

    MLflow custom python function (pyfunc) wrapper around an already-fitted XGB model.
    The wrapper scales the numeric columns itself, so the model can be applied to a
    raw input dataframe directly.
    :Input: pandas dataframe containing ``numeric_columns``
    :Output: pandas dataframe with a single ``predicted`` column (fraud probability)

    On construction the supplied data is split 70/30 (random_state=2019), a
    StandardScaler is fitted on the 70% part and the model's ROC AUC on the 30% part
    is stored in ``auc``. That AUC is only an unbiased estimate if ``model`` was not
    trained on rows of the 30% part; the caller is responsible for that.

    The data frames kept on the instance are for inspection in the training session.
    They are left out when the wrapper is pickled (see ``__getstate__``), so the
    serialized model carries only what ``predict`` needs.
  '''

  # Attributes holding row-level data; never written into the serialized model.
  _TRANSIENT_ATTRIBUTES = ('X_train', 'X_test', 'y_train', 'y_test',
                           'X_train_processed', 'X_test_processed', '_df')

  def __init__(self,
               model,
               X,
               y,
               numeric_columns):
    '''
      :param model: fitted classifier exposing ``predict_proba``
      :param X: pandas.DataFrame of features (must contain ``numeric_columns``)
      :param y: target values aligned with ``X``
      :param numeric_columns: list of feature column names to scale and score
    '''
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler

    self.model = model
    self.numeric_columns = numeric_columns

    self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(X, y, test_size=0.30, random_state=2019)

    # Fit the scaler on the training part only and reuse it for the test part and at predict time.
    self.fitted_scaler = StandardScaler().fit(self.X_train[self.numeric_columns])
    self.X_train_processed = preprocess_data(self.X_train, self.numeric_columns, self.fitted_scaler)
    self.X_test_processed  = preprocess_data(self.X_test, self.numeric_columns, self.fitted_scaler)

    self.auc = self._accuracy_metrics(model=self.model, X=self.X_test_processed, y=self.y_test)

  @staticmethod
  def _accuracy_metrics(model, X, y):
    '''
      Compute, print and return the area under the ROC curve of ``model`` on (X, y).
    '''
    from sklearn.metrics import auc, roc_curve
    positive_scores = model.predict_proba(X)[:, 1]
    fpr, tpr, _thresholds = roc_curve(y, positive_scores)
    area = auc(fpr, tpr)
    print("Model AUC is:", area)
    return area

  def __getstate__(self):
    '''
      Pickle everything except the row-level data frames, so neither the training data
      nor the last scored batch ends up inside the logged model artifact.
    '''
    state = self.__dict__.copy()
    for attribute in self._TRANSIENT_ATTRIBUTES:
      state.pop(attribute, None)
    return state

  def predict(self,  model_input):
    '''
      Generate predictions from the input df
      Subset input df with selected columns
      Use the fitted scaler to center and scale the numeric columns
      :param model_input: pandas.DataFrame with numeric_columns to be scored. The
                          columns must have the same schema as numeric_columns of X_train
      :return: pandas.DataFrame with one column ``predicted`` holding the fraud probabilities

      NOTE(review): MLflow documents PythonModel.predict as predict(context, model_input, params=None);
      confirm the MLflow version pinned for this project supports the context-free form used here.
    '''
    input_processed = self._preprocess_data(X=model_input, numeric_columns=self.numeric_columns, fitted_scaler=self.fitted_scaler )
    return pd.DataFrame(self.model.predict_proba(input_processed)[:,1], columns=['predicted'])


  def _preprocess_data(self,
                      X,
                      numeric_columns,
                      fitted_scaler):
    '''
      Scale ``X`` via the shared ``preprocess_data`` helper and remember the result in ``_df``.
    '''
    res_df = preprocess_data(X, numeric_columns, fitted_scaler)
    # kept for interactive inspection of the last scored batch; excluded from pickling
    self._df = res_df

    return res_df

In [None]:
# Training routine used inside the MLflow experiment run.
# AUROC is the metric chosen to summarise model quality.
def fit(X, y):
  """
  Fit the XGBoost classifier on (X, y) and score the same configuration by cross-validation.

  :param X: feature matrix
  :param y: binary target
  :return: dict with fields 'loss' (negated mean cross-validated ROC AUC) and
           'model' (the classifier instance fitted on all of X, y)
  """
  from sklearn.model_selection import cross_val_score
  from xgboost import XGBClassifier

  hyperparameters = {
      'learning_rate': 0.3,
      'gamma': 5,
      'max_depth': 8,
      'n_estimators': 15,
      'min_child_weight': 9,
      'objective': 'binary:logistic',
  }
  classifier = XGBClassifier(**hyperparameters)

  # fit on the full input first, then cross-validate the same estimator configuration
  fitted_classifier = classifier.fit(X, y)
  fold_scores = cross_val_score(classifier, X, y, scoring='roc_auc')

  return {'loss': -fold_scores.mean(), 'model': fitted_classifier}

Our input dataset has several fields which will be used for rule-based modeling and machine learning. In this notebook we rely on our machine learning model to identify the features that are effective at predicting fraud. The descriptions of these features below help in understanding the downstream modeling and interpretability results.
<br>
<br>
* LAST_ADR_CHNG_DUR     - Duration in days since the last address change on the account.
<br>
* AVG_DLY_AUTHZN_AMT    - The average daily authorization amount on the plastic since the day of first use.
<br>
* DISTANCE_FROM_HOME	  - Approximate distance of customer's home from merchant.
<br>
* HOME_PHN_NUM_CHNG_DUR - Duration in days since the home phone number was changed on the account.

In [None]:
from pyspark.sql.functions import * 

import cloudpickle
import pandas as pd
import numpy as np


# Pull the training table into pandas and drop the columns that are not used as features.
df = spark.read.table(training_table)

dropped_columns = ['AUTH_ID', 'ACCT_ID_TOKEN']
data = df.toPandas().drop(columns=dropped_columns)

# Every remaining column except the FRD_IND label is treated as a numeric feature.
numeric_columns = list(data.columns)
numeric_columns.remove('FRD_IND')

data.head()

In [None]:
# Start from MLflow's default pyfunc conda environment.
# NOTE(review): the next cell repeats this call before extending the result, so this cell looks redundant.
conda_env = mlflow.pyfunc.get_default_conda_env()

In [None]:
# Start from MLflow's default pyfunc environment and pin the xgboost / scikit-learn
# versions installed in this session, so the logged model records the versions it was trained with.
conda_env = mlflow.pyfunc.get_default_conda_env()

# Locate the pip section by its key instead of a hard-coded list position, which depends
# on the exact layout MLflow generates for the default environment.
_pip_section = next(
    dep for dep in conda_env['dependencies'] if isinstance(dep, dict) and 'pip' in dep
)
_pip_section['pip'] += [
    f'xgboost=={xgb.__version__}',
    f'scikit-learn=={sklearn.__version__}',
]

In [None]:
import mlflow

# Point MLflow at the tracking server and the Unity Catalog registry before selecting the
# experiment: set_experiment looks the experiment up (and creates it if missing) on
# whichever tracking URI is active at the time of the call.
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_registry_uri('databricks-uc')
mlflow.set_experiment(experiment_name)

In [None]:
import mlflow
from mlflow.models import infer_signature

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# XGBWrapper re-splits the full data set internally with exactly these settings and fits
# its own scaler on the resulting training part. Training on the identical split keeps
#   (a) the rows the wrapper evaluates AUC on unseen by the model, and
#   (b) the scaler the wrapper applies at predict time equal to the one used for training.
# An assertion below fails the run if the two splits ever drift apart.
WRAPPER_TEST_SIZE = 0.30
WRAPPER_SPLIT_SEED = 2019

mlflow.set_experiment(experiment_name)
model_run_name = 'fraud-xgb-wrapper'

with mlflow.start_run(run_name=model_run_name) as run:
  mlflow.log_param('Input-data-location', training_table)
  X_train, X_test, y_train, y_test = train_test_split(
      data.drop(["FRD_IND"], axis=1),
      data["FRD_IND"],
      test_size=WRAPPER_TEST_SIZE,
      random_state=WRAPPER_SPLIT_SEED,
  )

  # create a scaler for our numeric variables
  # only fit it on the training rows; the same fitted scaler is applied to any other data
  scaler = StandardScaler()
  fitted_scaler = scaler.fit(X_train[numeric_columns])
  X_train_processed = preprocess_data(source_df=X_train, numeric_columns=numeric_columns, fitted_scaler=fitted_scaler )

  # train a model and get the loss
  train_dict = fit(X=X_train_processed, y=y_train)
  xgb_model = train_dict['model']
  mlflow.log_metric('loss', train_dict['loss'])

  ##------- log pyfunc custom model -------##
  # make an instance of the Pyfunc Class
  myXGB = XGBWrapper(model = xgb_model,
                     X = data[numeric_columns].copy(),
                     y = data['FRD_IND'],
                     numeric_columns = numeric_columns)

  # The wrapper's evaluation rows must be exactly the rows held out from training above,
  # otherwise the logged 'auroc' would be measured on data the model has already seen.
  assert myXGB.X_test.index.equals(X_test.index), \
      "XGBWrapper split no longer matches the training split; update WRAPPER_TEST_SIZE / WRAPPER_SPLIT_SEED"

  signature = infer_signature(X_train_processed, myXGB.predict(model_input = X_train_processed))

  mlflow.pyfunc.log_model(model_run_name, python_model=myXGB, conda_env=conda_env, signature=signature,registered_model_name=model_name)

  mlflow.log_metric('auroc', myXGB.auc)

# The run handle identifies the run created above. Searching the experiment and sorting by
# end_time could return somebody else's run, or a still-running one (missing end_time sorts last).
latest_run_id = run.info.run_id
print('The latest run id: ', latest_run_id)

After running SHAP on the model we can see that features such as the duration since the last address change, the transaction amount and the available cash in the account proved to be the most important. While this is a purely machine-learning-driven approach, we will look at ways to improve customer satisfaction by combining it with rule-based modeling rather than relying entirely on an ML-based approach.

In [None]:
# Score the complete data set with the wrapped model; the wrapper applies the scaling itself,
# so it receives the unscaled feature columns.
X, y = data[numeric_columns].copy(), data['FRD_IND']

predictions = myXGB.predict(model_input=X)
predictions.head()

In [None]:
import shap
from pyspark.sql import *
# xgb_model was trained on scaled features, so SHAP has to be evaluated on inputs scaled
# with the same training scaler; raw values would be compared against split thresholds
# that live in the scaled feature space.
X_scaled = preprocess_data(X, numeric_columns, fitted_scaler)

explainer = shap.TreeExplainer(xgb_model)
shap_values = explainer.shap_values(X_scaled, y=y.values)

# Rank features by mean absolute SHAP value and show the top eight.
mean_abs_shap = np.absolute(shap_values).mean(axis=0).tolist()
top_features = sorted(list(zip(mean_abs_shap, X.columns)), reverse=True)[:8]
display(spark.createDataFrame(top_features, ["Mean |SHAP|", "Column"]))

In [None]:
# Compute SHAP values on features scaled with the training scaler, because that is the
# feature space xgb_model was fitted in; passing raw X yields meaningless attributions.
shap_values = explainer.shap_values(preprocess_data(X, numeric_columns, fitted_scaler), y=y.values)
print(shap_values.shape)

In [None]:
# Render a SHAP force plot (matplotlib backend) for the first row of X.
display(shap.force_plot(explainer.expected_value, shap_values[0,:], X.iloc[0,:],matplotlib=True))

In [None]:
import pandas as pd 
schema = spark.createDataFrame(X).schema
df = spark.createDataFrame(pd.DataFrame(shap_values, columns=X.columns)).withColumn("id", monotonically_increasing_id())
for col in df.columns:
  df = df.withColumnRenamed(col, 'shap_v_' + col)
df.createOrReplaceTempView("fraud_shap_values")

In [None]:
spark.createDataFrame(pd.concat([pd.DataFrame(X, columns=X.columns), pd.DataFrame(predictions, columns=['predicted']), pd.DataFrame(y, columns=['FRD_IND'])], axis=1)).withColumn("id", monotonically_increasing_id()).createOrReplaceTempView("txns")


### Model Result Saving 

In addition to saving model fraud scores, we want to be able to interactively query SHAP values on each observation also. We will persist these values on each observation so we can query in tabular form using SQL Analytics.

In [None]:
# Join every transaction row with its SHAP values on the row id and persist the result
# as a Delta table, replacing both data and schema of any earlier version.
joined_result = spark.sql("""select t.*, 
       s.*
from txns t join fraud_shap_values s 
on t.id = s.shap_v_id""")
(
    joined_result.write
    .format("delta")
    .option('overwriteSchema', 'true')
    .mode('overwrite')
    .saveAsTable(result_table)
)

In [None]:
# Count rows per (prediction thresholded at 0.5, actual label) pair from the persisted results.
confusion_query = f'''
select case when predicted > 0.5 then 1 else 0 end predicted_Ind, frd_ind, count(1) ct
from {result_table}
group by case when predicted > 0.5 then 1 else 0 end, frd_ind
'''
display(spark.sql(confusion_query))


In [None]:
from mlflow.tracking import MlflowClient
import mlflow.pyfunc


def get_latest_model_version(model_name):
    """
    Return the highest registered version number of ``model_name``.

    The registry is queried through an ``MlflowClient`` bound to the notebook-level
    ``tracking_uri``. The result is never lower than 1: that value is also returned
    when the search yields no versions at all.
    """
    client = MlflowClient(tracking_uri=tracking_uri)

    # Seed with 1 so the fallback value takes part in the comparison like any other version.
    version_numbers = [1]
    version_numbers.extend(
        int(found.version)
        for found in client.search_model_versions(f"name='{model_name}'")
    )
    version_numbers.sort()
    return version_numbers[-1]


In [None]:
# Look up and print the highest registered version of the model.
# NOTE(review): the next cell performs the same lookup again before building the model URI.
model_version = get_latest_model_version(model_name)
print(model_version)

In [None]:
# Build the model URI and publish it, together with the model name and version, as job
# task values; the model URI is needed by the model deployment notebook.
model_version = get_latest_model_version(model_name)
model_uri = f"models:/{model_name}/{model_version}"

for _task_key, _task_value in (
    ("model_uri", model_uri),
    ("model_name", model_name),
    ("model_version", model_version),
):
    w.dbutils.jobs.taskValues.set(_task_key, _task_value)

In [None]:
# End the notebook run, passing the model URI as the notebook's exit value.
w.dbutils.notebook.exit(model_uri)