<img src=https://i.imgur.com/4q8SuvB.jpg height="850" width="850">

# ML Life Cycle Management with MLFlow and Delta Lake

It's a common story - a data team trains a model, deploys it to production, and all is good for a time. Then the model begins to make strange predictions, and it quickly becomes necessary to inspect and debug the model.

This notebook demonstrates how to use [MLflow](http://mlflow.org) and [Delta Lake](http://delta.io) to easily track, visualize, and reproduce model training runs for ease of debugging. It demonstrates how to:

1. Track and reproduce the exact snapshot of data used to build an ML pipeline.
2. Identify models that were trained on a particular snapshot of data.
3. Rerun training on a past snapshot of data (e.g. to reproduce an old model).

The notebook uses Delta Lake to provide data versioning and "time-travel" capabilities (restoring old versions of data), and MLflow to track data and query for runs that used a particular dataset.

**Prerequisites**:
* Databricks Runtime 6.3 ML or above
* Python 3

## Setup

1. Ensure you are using or create a cluster specifying 
  * **Databricks Runtime Version:** Databricks Runtime 6.0 or above 
  * **Python Version:** Python 3
1. Install a library with Source **PyPI** and enter `mlflow`.
1. Attach this notebook to the cluster.

## Problem Statement: Classifying "bad loans" for a lender

This notebook tackles a classification problem on the Lending Club dataset, with the goal of identifying "bad loans" (loans likely to be unprofitable) based on a combination of credit scores, credit history, and other features.

The end goal is to produce an interpretable model that a loan officer can use before deciding whether to approve a loan. Such a model provides an informative view for the lender as well as an immediate estimate and response for the prospective borrower.

### The Data

The data used is public data from Lending Club. It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full view of the data view the [data dictionary](https://resources.lendingclub.com/LCDataDictionary.xlsx).

![Loan_Data](https://preview.ibb.co/d3tQ4R/Screen_Shot_2018_02_02_at_11_21_51_PM.png)


https://www.kaggle.com/wendykan/lending-club-loan-data

## 1. Tracking Data Version and Location For Reproducibility

This notebook accepts data version and data path as input parameters via widgets, allowing for reproducing a run of the notebook against an explicitly-specified data version and path in the future. The ability to specify data version is an advantage of using Delta Lake, which preserves previous versions of datasets so that you can restore them later.

### Set up: create a Delta table in DBFS

Generate some example data in Delta Lake format by converting an existing Parquet table stored in DBFS.

In [9]:
%fs ls /databricks-datasets/samples/lending_club/parquet/

path,name,size
dbfs:/databricks-datasets/samples/lending_club/parquet/_SUCCESS,_SUCCESS,0
dbfs:/databricks-datasets/samples/lending_club/parquet/_committed_4376965328215018897,_committed_4376965328215018897,514
dbfs:/databricks-datasets/samples/lending_club/parquet/_started_4376965328215018897,_started_4376965328215018897,0
dbfs:/databricks-datasets/samples/lending_club/parquet/part-00000-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-443-c000.snappy.parquet,part-00000-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-443-c000.snappy.parquet,72106652
dbfs:/databricks-datasets/samples/lending_club/parquet/part-00001-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-444-c000.snappy.parquet,part-00001-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-444-c000.snappy.parquet,57676765
dbfs:/databricks-datasets/samples/lending_club/parquet/part-00002-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-445-c000.snappy.parquet,part-00002-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-445-c000.snappy.parquet,43307289
dbfs:/databricks-datasets/samples/lending_club/parquet/part-00003-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-446-c000.snappy.parquet,part-00003-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-446-c000.snappy.parquet,21780130
dbfs:/databricks-datasets/samples/lending_club/parquet/part-00004-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-447-c000.snappy.parquet,part-00004-tid-4376965328215018897-67dfb805-a14c-42f6-99f0-1a6608ac4723-447-c000.snappy.parquet,60919


In [10]:
DELTA_TABLE_DEFAULT_PATH = "/ml/loan_stats.delta"
data_path = DELTA_TABLE_DEFAULT_PATH

In [11]:
from pyspark.sql.functions import *

# Remove table if it exists
dbutils.fs.rm(DELTA_TABLE_DEFAULT_PATH, recurse=True)
# Load & munge Lending Club data, then write to DBFS in Delta Lake format
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"
data = spark.read.parquet(lspq_path)
# Select only the columns needed & apply other preprocessing
features = ["loan_amnt",  "annual_inc", "dti", "delinq_2yrs","total_acc", "total_pymnt", "issue_d", "earliest_cr_line"]
raw_label = "loan_status"
loan_stats_ce = data.select(*(features + [raw_label]))
print("------------------------------------------------------------------------------------------------")
print("Create bad loan label, this will include charged off, defaulted, and late repayments on loans...")
loan_stats_ce = loan_stats_ce.filter(loan_stats_ce.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))\
                       .withColumn("bad_loan", (~(loan_stats_ce.loan_status == "Fully Paid")).cast("string"))
loan_stats_ce = loan_stats_ce.orderBy(rand()).limit(10000) # Limit rows loaded to facilitate running on Community Edition
print("------------------------------------------------------------------------------------------------")
print("Casting numeric columns into the appropriate types...")
loan_stats_ce = loan_stats_ce.withColumn('issue_year',  substring(loan_stats_ce.issue_d, 5, 4).cast('double')) \
                       .withColumn('earliest_year', substring(loan_stats_ce.earliest_cr_line, 5, 4).cast('double')) \
                       .withColumn('total_pymnt', loan_stats_ce.total_pymnt.cast('double'))
loan_stats_ce = loan_stats_ce.withColumn('credit_length_in_years', (loan_stats_ce.issue_year - loan_stats_ce.earliest_year))   
# Save table in Delta Lake format
loan_stats_ce.write.format("delta").mode("overwrite").save(DELTA_TABLE_DEFAULT_PATH)

### Load Data From Delta Table
Load data back in Delta Lake format, using the data path and version specified in the widgets.

In [13]:
# Use the latest version of the table by default, unless a version parameter is explicitly provided

loan_stats_train,loan_stats_test = loan_stats_ce.randomSplit([0.25, 0.75], seed=123)
# Review data
display(loan_stats_train)

loan_amnt,annual_inc,dti,delinq_2yrs,total_acc,total_pymnt,issue_d,earliest_cr_line,loan_status,bad_loan,issue_year,earliest_year,credit_length_in_years
1000.0,23290.0,20.97,0.0,3.0,1288.3113285272,Aug-2012,Feb-2006,Fully Paid,False,2012.0,2006.0,6.0
1000.0,25000.0,8.64,1.0,15.0,1104.49,Feb-2014,Aug-2004,Fully Paid,False,2014.0,2004.0,10.0
1000.0,29000.0,13.74,0.0,15.0,1141.51,Oct-2014,Jun-2004,Fully Paid,False,2014.0,2004.0,10.0
1000.0,32000.0,24.46,1.0,44.0,482.19,Jul-2015,Aug-2000,Charged Off,True,2015.0,2000.0,15.0
1000.0,40000.0,19.41,0.0,15.0,1121.1418091796,Mar-2016,May-2002,Fully Paid,False,2016.0,2002.0,14.0
1000.0,60000.0,22.42,1.0,29.0,1029.29,Mar-2016,Mar-2001,Fully Paid,False,2016.0,2001.0,15.0
1000.0,75000.0,19.63,0.0,25.0,1117.5,May-2014,Jun-1998,Fully Paid,False,2014.0,1998.0,16.0
1000.0,85000.0,21.09,3.0,26.0,1062.56,Nov-2014,Mar-2002,Fully Paid,False,2014.0,2002.0,12.0
1000.0,94704.0,4.32,0.0,15.0,1047.331275467,Jan-2017,Oct-1995,Fully Paid,False,2017.0,1995.0,22.0
1000.0,117000.0,16.58,0.0,13.0,1016.28,Jan-2016,Jul-2001,Fully Paid,False,2016.0,2001.0,15.0


### Review Delta Table History
All the transactions for this table are stored within this table including the initial set of insertions, update, delete, merge, and inserts.

In [15]:
spark.sql("DROP TABLE IF EXISTS loan_stats")
spark.sql("CREATE TABLE loan_stats USING DELTA LOCATION '" + DELTA_TABLE_DEFAULT_PATH + "'")

In [16]:
%sql

desc extended loan_stats

col_name,data_type,comment
loan_amnt,float,
annual_inc,float,
dti,float,
delinq_2yrs,float,
total_acc,float,
total_pymnt,double,
issue_d,string,
earliest_cr_line,string,
loan_status,string,
bad_loan,string,


In [17]:
%sql
DESCRIBE HISTORY loan_stats

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend
0,2020-03-03T13:16:04.000+0000,1950142778633211,bhavin.kukadia@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(2858831332095950),1108-185305-true25,,WriteSerializable,False


### Train a Model with Cross Validation for Hyperparameter Tuning
Train an ML pipeline using Spark MLlib. The metrics and params from your tuning runs are automatically tracked to MLflow for later inspection.

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from mlflow import spark as sparkm
import mlflow

def _evaluate_clf(bestModel,test):
    predictions = bestModel.transform(test)
    evaluator=BinaryClassificationEvaluator()
    objective_metric = evaluator.evaluate(predictions)
    return objective_metric


def _fit_crossvalidator(clf, param_grid,test,train,features,target, tag):
  """
  Helper function that fits a CrossValidator model to predict a binary label
  `target` on the passed-in training DataFrame using the columns in `features`
  :param: train: Spark DataFrame containing training data
  :param: features: List of strings containing column names to use as features from `train`
  :param: target: String name of binary target column of `train` to predict
  """
  train = train.select(features + [target])
  model_matrix_stages = [
    Imputer(inputCols = features, outputCols = features),
    VectorAssembler(inputCols=features, outputCol="features"),
    StringIndexer(inputCol="bad_loan", outputCol="label")
  ]
# lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "features")
# ParamGridBuilder().addGrid(clf.regParam, [0.1, 0.01]).build()
  pipeline = Pipeline(stages=model_matrix_stages + [clf])
  paramGrid = param_grid 
  crossval = CrossValidator(estimator=pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=BinaryClassificationEvaluator(),
                            numFolds=5)
  with mlflow.start_run(run_name="bk-loan-clf") as run:
    run_id = run.info.run_uuid
    mlflow.log_param("data_path", DELTA_TABLE_DEFAULT_PATH)
    mlflow.set_tag("model_type",tag)
    cvModel = crossval.fit(train)
    accuracy = _evaluate_clf(cvModel.bestModel,test)
    mlflow.log_metric("accuracy",accuracy)
    sparkm.log_model(spark_model=cvModel.bestModel, artifact_path="bk-loan-clf")
    return cvModel.bestModel,accuracy,run_id

In [20]:
# Fit model
lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "features")
param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
features = ["loan_amnt",  "annual_inc", "dti", "delinq_2yrs","total_acc", "credit_length_in_years"]
glm_model,accuracy,run_id = _fit_crossvalidator(lr,param_grid,loan_stats_test,loan_stats_train,features,target="bad_loan",tag="glm_model")
print("accuracy: ",accuracy)
print(f"{run_id}")

### View Training Results in the MLflow Runs Sidebar

The model training code above automatically logged metrics and params under an MLflow run, which you can view using the [MLflow Runs Sidebar](https://databricks.com/blog/2019/04/30/introducing-mlflow-run-sidebar-in-databricks-notebooks.html).

![](https://pages.databricks.com/rs/094-YMS-629/images/db-mlflow-integration.gif)

In [22]:
model_uri = "runs:/" + run_id + "/bk-loan-clf"
print(model_uri)

In [23]:
model = mlflow.spark.load_model(model_uri)
predictions = model.transform(loan_stats_test)
display(predictions)

loan_amnt,annual_inc,dti,delinq_2yrs,total_acc,total_pymnt,issue_d,earliest_cr_line,loan_status,bad_loan,issue_year,earliest_year,credit_length_in_years,features,label,rawPrediction,probability,prediction
1000.0,11856.0,24.39,0.0,39.0,1305.9818214111,Aug-2013,Dec-1993,Fully Paid,False,2013.0,1993.0,20.0,"List(1, 6, List(), List(1000.0, 11856.0, 24.389999389648438, 0.0, 39.0, 20.0))",0.0,"List(1, 2, List(), List(1.6511959617567114, -1.6511959617567114))","List(1, 2, List(), List(0.8390526225658662, 0.16094737743413384))",0.0
1000.0,17000.0,22.23,0.0,23.0,1235.111500455999,Jan-2012,Nov-1998,Fully Paid,False,2012.0,1998.0,14.0,"List(1, 6, List(), List(1000.0, 17000.0, 22.229999542236328, 0.0, 23.0, 14.0))",0.0,"List(1, 2, List(), List(1.456784976700993, -1.456784976700993))","List(1, 2, List(), List(0.8110404527775369, 0.18895954722246316))",0.0
1000.0,18000.0,31.87,0.0,20.0,460.72,Jul-2015,Oct-1999,Charged Off,True,2015.0,1999.0,16.0,"List(1, 6, List(), List(1000.0, 18000.0, 31.8700008392334, 0.0, 20.0, 16.0))",1.0,"List(1, 2, List(), List(1.1407253667925399, -1.1407253667925399))","List(1, 2, List(), List(0.757812792326072, 0.24218720767392798))",0.0
1000.0,18500.0,34.96,1.0,18.0,1216.7999999993,Aug-2012,Aug-1998,Fully Paid,False,2012.0,1998.0,14.0,"List(1, 6, List(), List(1000.0, 18500.0, 34.959999084472656, 1.0, 18.0, 14.0))",0.0,"List(1, 2, List(), List(0.9477104930644107, -0.9477104930644107))","List(1, 2, List(), List(0.7206545066721547, 0.27934549332784536))",0.0
1000.0,22000.0,30.71,0.0,16.0,1195.4566365132,Jul-2012,Aug-2005,Fully Paid,False,2012.0,2005.0,7.0,"List(1, 6, List(), List(1000.0, 22000.0, 30.709999084472656, 0.0, 16.0, 7.0))",0.0,"List(1, 2, List(), List(1.024115327545561, -1.024115327545561))","List(1, 2, List(), List(0.7357734398500779, 0.2642265601499221))",0.0
1000.0,23000.0,9.92,0.0,16.0,1279.7230687248,Dec-2014,Jan-1994,Fully Paid,False,2014.0,1994.0,20.0,"List(1, 6, List(), List(1000.0, 23000.0, 9.920000076293945, 0.0, 16.0, 20.0))",0.0,"List(1, 2, List(), List(1.8283148617250526, -1.8283148617250526))","List(1, 2, List(), List(0.8615608565049786, 0.13843914349502134))",0.0
1000.0,24000.0,17.55,3.0,14.0,183.55,Apr-2015,Mar-2007,Charged Off,True,2015.0,2007.0,8.0,"List(1, 6, List(), List(1000.0, 24000.0, 17.549999237060547, 3.0, 14.0, 8.0))",1.0,"List(1, 2, List(), List(1.279301171277882, -1.279301171277882))","List(1, 2, List(), List(0.7823307968340352, 0.21766920316596494))",0.0
1000.0,25000.0,7.87,1.0,13.0,1039.2215462115,Nov-2016,Mar-2008,Fully Paid,False,2016.0,2008.0,8.0,"List(1, 6, List(), List(1000.0, 25000.0, 7.869999885559082, 1.0, 13.0, 8.0))",0.0,"List(1, 2, List(), List(1.668684019892717, -1.668684019892717))","List(1, 2, List(), List(0.841400287967941, 0.15859971203205903))",0.0
1000.0,28000.0,14.66,0.0,8.0,1029.42,Dec-2014,Jan-2006,Fully Paid,False,2014.0,2006.0,8.0,"List(1, 6, List(), List(1000.0, 28000.0, 14.65999984741211, 0.0, 8.0, 8.0))",0.0,"List(1, 2, List(), List(1.4432829627569492, -1.4432829627569492))","List(1, 2, List(), List(0.8089625222771974, 0.19103747772280263))",0.0
1000.0,29000.0,31.54,0.0,23.0,1043.11,Jan-2015,Feb-2000,Fully Paid,False,2015.0,2000.0,15.0,"List(1, 6, List(), List(1000.0, 29000.0, 31.540000915527344, 0.0, 23.0, 15.0))",0.0,"List(1, 2, List(), List(1.173250657609099, -1.173250657609099))","List(1, 2, List(), List(0.763732084677646, 0.23626791532235406))",0.0


In [24]:
pyfunc_m = mlflow.pyfunc.load_model(model_uri)
pyfunc_m
pandas_df = loan_stats_test.toPandas()
print("loaded model as pyfunc: " , pyfunc_m)
pyfunc_preds = pyfunc_m.predict(pandas_df)
print(pyfunc_preds)