# Tracking ML Model Training with MLflow and Delta Lake

It's a common story - a data team trains a model, deploys it to production, and all is good for a time. Then the model begins to make strange predictions, and it quickly becomes necessary to inspect and debug the model.

This notebook demonstrates how to use [MLflow](http://mlflow.org) and [Delta Lake](http://delta.io) to track, visualize, and reproduce model training runs so that models are easier to debug. It demonstrates how to:

1. Track and reproduce the exact snapshot of data used to build an ML pipeline.
2. Identify models that were trained on a particular snapshot of data.
3. Rerun training on a past snapshot of data (e.g. to reproduce an old model).

The notebook uses Delta Lake to provide data versioning and "time-travel" capabilities (restoring old versions of data), and MLflow to track data and query for runs that used a particular dataset.

**Requirements**:
* A cluster running Databricks Runtime 7.0 ML or above with the Maven library `org.mlflow:mlflow-spark:1.11.0` installed.

## Problem Statement: Classifying "bad loans" for a lender

This notebook tackles a classification problem on the Lending Club dataset, with the goal of identifying "bad loans" (loans likely to be unprofitable) based on a combination of credit scores, credit history, and other features.

The end goal is to produce an interpretable model that a loan officer can use before deciding whether to approve a loan. Such a model provides an informative view for the lender as well as an immediate estimate and response for the prospective borrower.

### The Data

The data used is public data from Lending Club. It includes all funded loans from 2012 to 2017. Each loan record includes information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and the latest payment information. For a full description of the columns, see the [data dictionary](https://resources.lendingclub.com/LCDataDictionary.xlsx).

![Loan_Data](https://preview.ibb.co/d3tQ4R/Screen_Shot_2018_02_02_at_11_21_51_PM.png)


Source: [Lending Club loan data on Kaggle](https://www.kaggle.com/wendykan/lending-club-loan-data)

### Set up: create a Delta table in DBFS

Generate some example data in Delta Lake format by converting an existing Parquet table stored in DBFS.

In [0]:
from pyspark.sql.functions import *

# Remove table if it exists
DELTA_TABLE_DEFAULT_PATH = "/ml/loan_stats.delta"
dbutils.fs.rm(DELTA_TABLE_DEFAULT_PATH, recurse=True)
# Load & munge Lending Club data, then write to DBFS in Delta Lake format
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"
data = spark.read.parquet(lspq_path)
# Select only the columns needed & apply other preprocessing
features = ["loan_amnt",  "annual_inc", "dti", "delinq_2yrs","total_acc", "total_pymnt", "issue_d", "earliest_cr_line"]
raw_label = "loan_status"
loan_stats_ce = data.select(*(features + [raw_label]))
print("------------------------------------------------------------------------------------------------")
print("Create bad loan label, this will include charged off, defaulted, and late repayments on loans...")
loan_stats_ce = loan_stats_ce.filter(loan_stats_ce.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))\
                       .withColumn("bad_loan", (~(loan_stats_ce.loan_status == "Fully Paid")).cast("string"))
loan_stats_ce = loan_stats_ce.orderBy(rand()).limit(10000) # Limit rows loaded to facilitate running on Community Edition
print("------------------------------------------------------------------------------------------------")
print("Casting numeric columns into the appropriate types...")
loan_stats_ce = loan_stats_ce.withColumn('issue_year',  substring(loan_stats_ce.issue_d, 5, 4).cast('double')) \
                       .withColumn('earliest_year', substring(loan_stats_ce.earliest_cr_line, 5, 4).cast('double')) \
                       .withColumn('total_pymnt', loan_stats_ce.total_pymnt.cast('double'))
loan_stats_ce = loan_stats_ce.withColumn('credit_length_in_years', (loan_stats_ce.issue_year - loan_stats_ce.earliest_year))   
# Save table in Delta Lake format
loan_stats_ce.write.format("delta").mode("overwrite").save(DELTA_TABLE_DEFAULT_PATH)

------------------------------------------------------------------------------------------------
Create bad loan label, this will include charged off, defaulted, and late repayments on loans...
------------------------------------------------------------------------------------------------
Casting numeric columns into the appropriate types...
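
The year extraction in the setup cell relies on Spark's `substring` being 1-based: `substring(issue_d, 5, 4)` takes four characters starting at the fifth, which is the year in a string such as `Jan-2016`. The following is a minimal plain-Python sketch of the same arithmetic, without Spark. The helper names `year_from_month_year` and `credit_length_in_years` are hypothetical and exist only for this illustration.

```python
def year_from_month_year(value: str) -> float:
    """Mimic substring(col, 5, 4).cast('double') for strings like 'Jan-2016'."""
    # Spark's substring is 1-based, so position 5 corresponds to Python index 4.
    return float(value[4:8])


def credit_length_in_years(issue_d: str, earliest_cr_line: str) -> float:
    """Difference between the loan issue year and the earliest credit line year."""
    return year_from_month_year(issue_d) - year_from_month_year(earliest_cr_line)


# A loan issued in Jan-2016 to a borrower whose first credit line dates from Dec-2005
print(credit_length_in_years("Jan-2016", "Dec-2005"))  # 11.0
```

Because only the year is used, the result ignores months: a credit line opened in December 2005 and a loan issued in January 2016 count as 11 years.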


## 1. Tracking Data Version and Location For Reproducibility

This notebook accepts data version and data path as input parameters via widgets, allowing for reproducing a run of the notebook against an explicitly-specified data version and path in the future. The ability to specify data version is an advantage of using Delta Lake, which preserves previous versions of datasets so that you can restore them later.

In [0]:
# Pull data path and version from notebook params
# An empty version means "use the latest version of the table"
dbutils.widgets.text(name="deltaVersion", defaultValue="", label="Table version, default=latest")
dbutils.widgets.text(name="deltaPath", defaultValue="", label="Table path")

data_version = None if dbutils.widgets.get("deltaVersion") == "" else int(dbutils.widgets.get("deltaVersion"))
DELTA_TABLE_DEFAULT_PATH = "/ml/loan_stats.delta"
data_path = DELTA_TABLE_DEFAULT_PATH if dbutils.widgets.get("deltaPath")  == "" else dbutils.widgets.get("deltaPath")
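
The widget handling above boils down to two rules: an empty version means "latest" (represented as `None`), and an empty path falls back to the default table location. A minimal sketch of that logic as a standalone function follows. It makes the rules easy to check outside a notebook. The function name `resolve_data_params` is hypothetical, and unlike the cell above it also strips surrounding whitespace, which is an assumption about the desired behavior.

```python
from typing import Optional, Tuple

DEFAULT_PATH = "/ml/loan_stats.delta"


def resolve_data_params(
    version_text: str, path_text: str, default_path: str = DEFAULT_PATH
) -> Tuple[Optional[int], str]:
    """Turn raw widget strings into a (version, path) pair.

    An empty version string maps to None, meaning "use the latest version".
    An empty path string maps to the default table path.
    """
    version = int(version_text) if version_text.strip() else None
    path = path_text.strip() or default_path
    return version, path


print(resolve_data_params("", ""))             # (None, '/ml/loan_stats.delta')
print(resolve_data_params("1", "/tmp/other"))  # (1, '/tmp/other')
```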

### Load Data From Delta Table
Load data back in Delta Lake format, using the data path and version specified in the widgets.

In [0]:
# Use the latest version of the table by default, unless a version parameter is explicitly provided
if data_version is None:
  from delta.tables import DeltaTable  
  delta_table = DeltaTable.forPath(spark, data_path)
  version_to_load = delta_table.history(1).select("version").collect()[0].version  
else:
  version_to_load = data_version

loan_stats = spark.read.format("delta").option("versionAsOf", version_to_load).load(data_path)  

# Review data
display(loan_stats)

loan_amnt,annual_inc,dti,delinq_2yrs,total_acc,total_pymnt,issue_d,earliest_cr_line,loan_status,bad_loan,issue_year,earliest_year,credit_length_in_years
20275.0,54000.0,16.0,0.0,34.0,23287.5357564959,Jan-2016,Dec-2005,Fully Paid,False,2016.0,2005.0,11.0
18000.0,86000.0,7.87,0.0,35.0,20172.7607755799,Oct-2014,Mar-1999,Fully Paid,False,2014.0,1999.0,15.0
23000.0,135000.0,13.12,0.0,18.0,24889.5143698578,Sep-2015,Oct-1998,Fully Paid,False,2015.0,1998.0,17.0
12000.0,62000.0,15.87,0.0,19.0,13235.1854983258,May-2015,Aug-2005,Fully Paid,False,2015.0,2005.0,10.0
20000.0,95335.0,31.65,0.0,38.0,7664.81,Nov-2015,Jul-1991,Charged Off,True,2015.0,1991.0,24.0
28000.0,100000.0,16.64,1.0,18.0,28429.55,Nov-2015,Oct-1995,Fully Paid,False,2015.0,1995.0,20.0
31500.0,70000.0,29.68,0.0,24.0,34505.1200049334,Dec-2015,Dec-1985,Fully Paid,False,2015.0,1985.0,30.0
9000.0,60000.0,20.54,1.0,18.0,10418.0517147723,Jun-2012,Oct-1999,Fully Paid,False,2012.0,1999.0,13.0
6600.0,40000.0,20.52,0.0,29.0,7355.2000011204,Feb-2014,Apr-1987,Fully Paid,False,2014.0,1987.0,27.0
17475.0,90000.0,8.0,0.0,12.0,23876.8283334905,Jul-2015,Oct-2003,Fully Paid,False,2015.0,2003.0,12.0


### Review Delta Table History
Delta Lake records every transaction against this table in the table's own history, including the initial write and any subsequent inserts, updates, deletes, and merges.

In [0]:
spark.sql("DROP TABLE IF EXISTS loan_stats")
spark.sql("CREATE TABLE loan_stats USING DELTA LOCATION '" + DELTA_TABLE_DEFAULT_PATH + "'")

Out[4]: DataFrame[]

In [0]:
%sql
DESCRIBE HISTORY loan_stats

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2022-10-01T06:09:09.000+0000,8787448846386860,zaid.haddad@slalom.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(2541115443740120),0802-021749-3bpim4jg,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 10000, numOutputBytes -> 197828)",,Databricks-Runtime/11.1.x-cpu-ml-scala2.12


### Train a Model with Cross Validation for Hyperparameter Tuning
Train an ML pipeline using Spark MLlib. The metrics and params from your tuning runs are automatically tracked to MLflow for later inspection.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import mlflow
import mlflow.spark

# Enable Spark datasource autologging so that MLflow records the path, version, and format
# of the data read during training (stored in the sparkDatasourceInfo tag)
mlflow.spark.autolog()

def _fit_crossvalidator(train, features, target):
  """
  Helper function that fits a CrossValidator model to predict a binary label
  `target` on the passed-in training DataFrame using the columns in `features`
  :param: train: Spark DataFrame containing training data
  :param: features: List of strings containing column names to use as features from `train`
  :param: target: String name of binary target column of `train` to predict
  """
  train = train.select(features + [target])
  model_matrix_stages = [
    Imputer(inputCols = features, outputCols = features),
    VectorAssembler(inputCols=features, outputCol="features"),
    StringIndexer(inputCol=target, outputCol="label")
  ]
  lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "features")
  pipeline = Pipeline(stages=model_matrix_stages + [lr])
  paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
  crossval = CrossValidator(estimator=pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=BinaryClassificationEvaluator(),
                            numFolds=5)

  cvModel = crossval.fit(train)
  return cvModel.bestModel
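
To get a feel for how much work the cross-validator does, it helps to count the fits. With a grid of two `regParam` values and five folds, the pipeline is fitted once per combination of parameter map and fold, and then once more on the full training data using the best parameter map. The sketch below reproduces that count in plain Python. `build_param_grid` and `count_fits` are hypothetical helpers written for this illustration, not part of Spark MLlib.

```python
from itertools import product


def build_param_grid(grid):
    """Expand {param_name: [values]} into a list of parameter maps (one dict each)."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


def count_fits(param_maps, num_folds):
    """One fit per (parameter map, fold), plus one final refit of the best map."""
    return len(param_maps) * num_folds + 1


param_maps = build_param_grid({"regParam": [0.1, 0.01]})
print(param_maps)                           # [{'regParam': 0.1}, {'regParam': 0.01}]
print(count_fits(param_maps, num_folds=5))  # 11
```

Adding a second hyperparameter with three values would triple the grid to six maps, or 31 fits with the same five folds, so the grid size is worth keeping in mind on small clusters.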

In [0]:
# Fit model & display ROC
features = ["loan_amnt",  "annual_inc", "dti", "delinq_2yrs","total_acc", "credit_length_in_years"]
glm_model = _fit_crossvalidator(loan_stats, features, target="bad_loan")
lr_summary = glm_model.stages[-1].summary
display(lr_summary.roc)

In [0]:
print("ML Pipeline accuracy: %s" % lr_summary.accuracy)

### View Training Results in the MLflow Experiment Runs sidebar

The model training code above automatically logged metrics and params under an MLflow run, which you can view using the [MLflow Runs Sidebar](https://databricks.com/blog/2019/04/30/introducing-mlflow-run-sidebar-in-databricks-notebooks.html). Click Experiment at the upper right to display the Experiment Runs sidebar.

### Feature Engineering: Evolve Data Schema

You can do some feature engineering to potentially improve model performance, using Delta Lake to track older versions of the dataset. First, add a feature tracking the total amount of money earned or lost per loan:

In [0]:
print("------------------------------------------------------------------------------------------------")
print("Calculate the total amount of money earned or lost per loan...")
loan_stats_new = loan_stats.withColumn('net', round( loan_stats.total_pymnt - loan_stats.loan_amnt, 2))
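
As a quick sanity check on the new column, here is the same calculation in plain Python, applied to two rows from the sample data displayed earlier: one fully paid loan and one charged-off loan. The function name `net_return` is hypothetical. Note that the cell above uses Spark's `round` column function, while this sketch uses Python's built-in `round` on floats.

```python
def net_return(total_pymnt: float, loan_amnt: float) -> float:
    """Total payments received minus the amount lent, rounded to cents."""
    return round(total_pymnt - loan_amnt, 2)


# Fully paid loan: payments exceed the principal, so net is positive
print(net_return(23287.5357564959, 20275.0))

# Charged-off loan: payments fall short of the principal, so net is negative
print(net_return(7664.81, 20000.0))
```

The first loan earns roughly 3012.54 and the second loses roughly 12335.19, which shows how strongly the sign of `net` tracks the loan outcome.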

Save the updated table, passing the `mergeSchema` option to safely evolve its schema.

In [0]:
loan_stats_new.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(DELTA_TABLE_DEFAULT_PATH)

In [0]:
# See the difference between the original & modified schemas
set(loan_stats_new.schema.fields) - set(loan_stats.schema.fields)

Retrain the model on the updated data and compare its performance to the original.

In [0]:
# Return ROC
glm_model_new = _fit_crossvalidator(loan_stats_new, features + ["net"], target="bad_loan")
lr_summary_new = glm_model_new.stages[-1].summary
display(lr_summary_new.roc)

FPR,TPR
0.0,0.0
0.0,0.048076923076923
0.0,0.0961538461538461
0.0,0.1442307692307692
0.0,0.1923076923076923
0.0,0.2403846153846154
0.0,0.2884615384615384
0.0,0.3365384615384615
0.0,0.3846153846153846
0.0,0.4326923076923077
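
When comparing two ROC curves, a single number such as the area under the curve is often easier to read than the raw points. The logistic regression summary exposes this as `areaUnderROC`. The sketch below shows how such a value can be derived from (FPR, TPR) points using the trapezoidal rule. The points are made up for illustration and are not taken from the runs in this notebook, and `auc_trapezoid` is a hypothetical helper.

```python
def auc_trapezoid(points):
    """Approximate the area under a ROC curve given (FPR, TPR) points."""
    pts = sorted(points)  # order by FPR, then TPR
    area = 0.0
    for (x0, y0), (x1, y1) in zip(pts, pts[1:]):
        # Area of the trapezoid between two consecutive points
        area += (x1 - x0) * (y0 + y1) / 2.0
    return area


# Hypothetical ROC points, for illustration only
roc_points = [(0.0, 0.0), (0.0, 0.5), (0.5, 1.0), (1.0, 1.0)]
print(auc_trapezoid(roc_points))  # 0.875
```

A value of 0.5 corresponds to random guessing (the diagonal) and 1.0 to a perfect classifier.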


In [0]:
print("ML Pipeline accuracy: %s" % lr_summary_new.accuracy)

## 2. Find runs that used the original data version

Model accuracy improved from ~80% to ~95% after the feature engineering step. You might therefore wonder: what if you retrained all models built off of the original dataset against the feature-engineered dataset? Would there be similar improvements in model performance?

To identify other runs launched against the original dataset, use MLflow's `mlflow.search_runs` API:

In [0]:
mlflow.search_runs(filter_string="tags.sparkDatasourceInfo LIKE 'path=%{path},version={version},%'".format(path=data_path, version=0))

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,params.estimator,params.mlModelClass,params.mlEstimatorUid,params.estimatorParamMapsLength,params.numFolds,params.evaluator,tags.mlflow.user,tags.mlflow.databricks.notebookRevisionID,tags.mlflow.source.name,tags.mlflow.databricks.notebookPath,tags.mlflow.source.type,tags.fit_uuid,tags.sparkDatasourceInfo,tags.mlflow.databricks.notebookID,tags.mlflow.databricks.webappURL,tags.mlflow.rootRunId,tags.runSource,tags.mlflow.databricks.cluster.id,tags.mlflow.databricks.cluster.info
0,bd21c63ba33f4e5ca038fe66da956b90,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 21:29:43.278000+00:00,2020-10-22 21:29:43.406000+00:00,Pipeline,CrossValidator,CrossValidator_d0e58808b792,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603402183526,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,e3363b,"path=dbfs:/ml/loan_stats.delta,version=0,forma...",3576320062614005,https://oregon.cloud.databricks.com,bd21c63ba33f4e5ca038fe66da956b90,mllibAutoTracking,,
1,ebcbc7fd172b4c1c9649cc2018fce5f3,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 21:29:14.974000+00:00,2020-10-22 21:29:15.111000+00:00,Pipeline,CrossValidator,CrossValidator_0e9cfa50feec,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603402155209,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,0b2cdb,"path=dbfs:/ml/loan_stats.delta,version=0,forma...",3576320062614005,https://oregon.cloud.databricks.com,ebcbc7fd172b4c1c9649cc2018fce5f3,mllibAutoTracking,,
2,4d971564294c4ed0a6984cdcbfd2af2e,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 21:26:10.609000+00:00,2020-10-22 21:26:10.783000+00:00,Pipeline,CrossValidator,CrossValidator_54569541fab2,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603401970877,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,ef27c7,"path=dbfs:/ml/loan_stats.delta,version=0,forma...",3576320062614005,https://oregon.cloud.databricks.com,4d971564294c4ed0a6984cdcbfd2af2e,mllibAutoTracking,,
3,7d1f863793ab47d3917d8594918b5b19,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 21:25:38.465000+00:00,2020-10-22 21:25:38.590000+00:00,Pipeline,CrossValidator,CrossValidator_7d353687e3de,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603401938746,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,df8a04,"path=dbfs:/ml/loan_stats.delta,version=0,forma...",3576320062614005,https://oregon.cloud.databricks.com,7d1f863793ab47d3917d8594918b5b19,mllibAutoTracking,,
4,25c6aff7657b40c987826140d43444d8,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 20:49:56.909000+00:00,2020-10-22 20:49:57.054000+00:00,Pipeline,CrossValidator,CrossValidator_f4a85085d435,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603399797157,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,096ffa,path=dbfs:/ml/loan_stats.delta/_delta_log/0000...,3576320062614005,https://oregon.cloud.databricks.com,25c6aff7657b40c987826140d43444d8,mllibAutoTracking,0814-210843-users204,"{""cluster_id"":""0814-210843-users204"",""creator_..."
5,e69e0d1744864442b705195524c44d91,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 20:49:21.598000+00:00,2020-10-22 20:49:21.818000+00:00,Pipeline,CrossValidator,CrossValidator_de972bb75afc,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603399761914,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,e57996,path=dbfs:/ml/loan_stats.delta/_delta_log/0000...,3576320062614005,https://oregon.cloud.databricks.com,e69e0d1744864442b705195524c44d91,mllibAutoTracking,0814-210843-users204,"{""cluster_id"":""0814-210843-users204"",""creator_..."
6,f2601b993b58445f9c01bce4dd43b24e,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 20:42:24.019000+00:00,2020-10-22 20:42:24.187000+00:00,Pipeline,CrossValidator,CrossValidator_1619e089858e,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603399344284,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,cb5c93,path=dbfs:/ml/loan_stats.delta/_delta_log/0000...,3576320062614005,https://oregon.cloud.databricks.com,f2601b993b58445f9c01bce4dd43b24e,mllibAutoTracking,0814-210843-users204,"{""cluster_id"":""0814-210843-users204"",""creator_..."
7,ef64474c88b74a76873f29f964dc1c11,3576320062614005,FINISHED,dbfs:/databricks/mlflow-tracking/3576320062614...,2020-10-22 20:41:52.023000+00:00,2020-10-22 20:41:52.193000+00:00,Pipeline,CrossValidator,CrossValidator_80e5f319035a,2,5,BinaryClassificationEvaluator,andrea.kress@databricks.com,1603399312324,/Users/andrea.kress@databricks.com/__Brooke_ne...,/Users/andrea.kress@databricks.com/__Brooke_ne...,NOTEBOOK,497d7c,path=dbfs:/ml/loan_stats.delta/_delta_log/0000...,3576320062614005,https://oregon.cloud.databricks.com,ef64474c88b74a76873f29f964dc1c11,mllibAutoTracking,0814-210843-users204,"{""cluster_id"":""0814-210843-users204"",""creator_..."
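
The filter string passed to `mlflow.search_runs` matches the `sparkDatasourceInfo` tag, which has the form `path=...,version=...,format=...`. The `%` after `path=` lets the pattern match regardless of a scheme prefix such as `dbfs:`. If you query for several versions, it may be convenient to build the filter with a small helper. The function name `datasource_filter` below is hypothetical. It only constructs the string and does not call MLflow.

```python
def datasource_filter(path: str, version: int) -> str:
    """Build an MLflow filter string matching runs that read `path` at `version`."""
    return (
        "tags.sparkDatasourceInfo LIKE 'path=%{path},version={version},%'"
        .format(path=path, version=version)
    )


print(datasource_filter("/ml/loan_stats.delta", 0))
# tags.sparkDatasourceInfo LIKE 'path=%/ml/loan_stats.delta,version=0,%'

# In the notebook, the result would be passed on, for example:
# mlflow.search_runs(filter_string=datasource_filter(data_path, 1))
```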


## 3. Load back and reproduce runs against a snapshot of data
Finally, you can load a specific version of the data for use in model retraining. To do this, set the `deltaVersion` widget above to 1 (corresponding to the feature-engineered data) and rerun the notebook from Section 1.