# Evaluating Risk for Loan Approvals

## Using a GLM Prediction Model

## Business Value

Being able to accurately assess the risk of a loan application can save a lender the cost of holding too many risky assets. Rather than relying only on a credit score or credit history, which track how reliable borrowers are, we will generate a score of how profitable a loan is likely to be compared to past loans. The combination of credit scores, credit history, and a profitability score will help increase the bottom line for a financial institution.

Having an interpretable model that a loan officer can use before performing a full underwriting can provide an immediate estimate and response for the borrower and an informative view for the lender.

In this notebook we're playing the role of a Data Scientist and building an elementary model which our Dev Ops team will publish.

The data is again the Lending Club loan dataset, which is free and publicly available. More: https://www.kaggle.com/wendykan/lending-club-loan-data

### Tracking Experiments with MLflow

Over the course of the machine learning lifecycle, data scientists test many different models from various libraries with different hyperparameters.  Tracking these results poses an organizational challenge: experiments, results, models, supplementary artifacts, and code all need to be stored and kept together.

MLflow Tracking is a logging API specific for machine learning and agnostic to libraries and environments that do the training.  It is organized around the concept of **runs**, which are executions of data science code.  Runs are aggregated into **experiments** where many runs can be a part of a given experiment and an MLflow server can host many experiments.

Each run can record the following information:

- **Parameters:** Key-value pairs of input parameters such as the number of trees in a random forest model
- **Metrics:** Evaluation metrics such as RMSE or Area Under the ROC Curve
- **Artifacts:** Arbitrary output files in any format.  This can include images, pickled models, and data files
- **Source:** The code that originally ran the experiment

MLflow also provides a **model registry**, so tracked models can easily be stored and, as necessary, deployed into production.

Experiments can be tracked using libraries in Python, R, and Java as well as by using the CLI and REST calls.
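
To make the hierarchy concrete, here is a minimal in-memory sketch of how an experiment groups runs and what each run records. It is plain Python, not the MLflow API: the `Run` and `Experiment` classes, the run names, and the metric values are hypothetical and exist only to illustrate the data model described above.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Run:
    """Hypothetical stand-in for one execution of data science code."""
    name: str
    params: Dict[str, str] = field(default_factory=dict)     # input settings
    metrics: Dict[str, float] = field(default_factory=dict)  # evaluation results
    artifacts: List[str] = field(default_factory=list)       # paths to output files
    source: str = ""                                         # code that produced the run


@dataclass
class Experiment:
    """Hypothetical container that aggregates many runs."""
    name: str
    runs: List[Run] = field(default_factory=list)

    def best_run(self, metric: str) -> Run:
        # Pick the run with the highest value of the given metric
        return max(self.runs, key=lambda r: r.metrics[metric])


# Illustrative values only, not results from this notebook
experiment = Experiment(name="loan-risk")
experiment.runs.append(Run("glm", params={"regParam": "0.01"}, metrics={"auc": 0.70}))
experiment.runs.append(Run("gbt", params={"maxDepth": "5"}, metrics={"auc": 0.72}))

best = experiment.best_run("auc")
print(best.name)
```

Comparing runs inside one experiment by a shared metric is the same idea the MLflow UI applies when you sort or compare runs.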

<div><img src="https://pages.databricks.com/rs/094-YMS-629/images/mlflow-tracking.png" style="height: 300px; margin: 20px"/></div>
<div><img src="https://pages.databricks.com/rs/094-YMS-629/images/3 - Unify data and ML across the full lifecycle.png" width="950"></div>

### Databricks MLflow Integration
A managed MLflow tracking server is available in Community Edition; the model registry is not.
This notebook walks you through creating a model, tracking it with MLflow, and **shows** you how to register it with the registry when not in Community Edition.

Automated MLflow tracking for hyperparameter tuning demo:
- Where this notebook uses features that aren't available in Community Edition, we'll leave them in markdown or comment them out
- Here we've separated out the deployment of the model to AWS, but a data scientist could deploy the model if they had the correct access to AWS

In [0]:
# Imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

import mlflow, pyspark
import mlflow.mleap
import mlflow.pyspark.ml  # explicit import so mlflow.pyspark.ml.autolog() below resolves
import os

mlf_ver = mlflow.__version__
mlf_max_ver = (1,11,0)

mlf_ver_i = tuple(map(int, mlf_ver.split(".")))

#assert mlf_ver_i <= mlf_max_ver, f"Current MLFlow Version {mlf_ver}. Must at most {mlf_max_ver}. Please use a cluster with max runtime 6.3ML"

mlflow.pyspark.ml.autolog()

print(f"MLFlow Version:{mlflow.__version__}\n" +\
      f"Pyspark Version:{pyspark.__version__}")

# Data Engineering
- clean up the features for modelling
- review the feature characteristics
- add additional features we may wish to include in our model
- choose a unique database name, Delta table location path, and registered model name. Rule of thumb: your first name followed by four random digits and `_ml_devday` (example: `aws_sko`). Use find and replace in the notebook's Edit menu to replace `naseer0814` with your unique name.
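
Most of the clean-up step turns string-encoded numbers into numeric values. As a rough sketch in plain Python (no Spark required), the helpers below mirror the kind of conversions applied to the percentage columns and to employment length. The function names `pct_to_float` and `emp_length_to_years` and the sample strings are assumptions for illustration.

```python
import re
from typing import Optional


def pct_to_float(value: str) -> float:
    """Convert a percentage string such as '13.5%' to 13.5."""
    return float(value.strip().rstrip('%'))


def emp_length_to_years(value: str) -> Optional[float]:
    """Convert an employment-length string to a number of years, or None if missing."""
    # Drop everything from the first letter onwards (e.g. ' years'); 'n/a' becomes ''
    cleaned = re.sub(r"[ ]*[a-zA-Z].*", "", value)
    cleaned = cleaned.replace("< 1", "0")   # less than one year counts as 0
    cleaned = cleaned.replace("10+", "10")  # '10+' is capped at 10
    return float(cleaned) if cleaned else None


print(pct_to_float("13.5%"))
print(emp_length_to_years("10+ years"))
print(emp_length_to_years("< 1 year"))
print(emp_length_to_years("n/a"))
```

Returning `None` for missing values keeps them distinguishable from a genuine zero, which matters if an imputation step runs afterwards.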

In [0]:
%sql 
DROP DATABASE IF EXISTS aws_ml_devday CASCADE;
CREATE DATABASE IF NOT EXISTS aws_ml_devday;
USE aws_ml_devday;

In [0]:
%sql
DESCRIBE DATABASE aws_ml_devday;

In [0]:
# Configure location of loanstats_2012_2017.parquet
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"

# Read loanstats_2012_2017.parquet
data = spark.read.parquet(lspq_path)

<img src="https://raw.githubusercontent.com/databricks/koalas/master/icons/koalas-logo.png" width=150/>

Commonly used by data scientists, pandas is a Python package that provides easy-to-use data structures and data analysis tools for the Python programming language. However, pandas does not scale out to big data. Koalas fills this gap by providing pandas equivalent APIs that work on Apache Spark. Koalas is useful not only for pandas users but also PySpark users, because Koalas supports many tasks that are difficult to do with PySpark, for example plotting data directly from a PySpark DataFrame.

In [0]:
import databricks.koalas as ks
kdf = data.to_koalas()

In [0]:
kdf[['loan_amnt','loan_status']].head(5)

In [0]:
kdf.groupby(['loan_status']).agg({'loan_amnt': ['sum','count']})

In [0]:
import seaborn as sns
sns.set_style('whitegrid')

%matplotlib inline
kdf['bad_loan'] = (kdf['loan_status'] != 'Fully Paid').astype('int')

sns.barplot(data=
  (kdf[(~kdf.grade.isnull()) & (kdf.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))]
 .pivot_table(index=['grade'],columns='bad_loan',values='loan_amnt',aggfunc='count')
 .assign(df_ratio=lambda x: x[1]/(x[0]+x[1]))
 .sort_index()
 .reset_index()
 .to_pandas()
),
  x='grade',y='df_ratio'
)

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

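def clean_pct(x: pd.Series) -> pd.Series:
  # Strip the trailing '%' and cast to double, e.g. '13.5%' -> 13.5
  return x.str.strip('%').astype('double')

# Optional Spark UDF wrapper; clean_loan_stats below calls clean_pct directly on Koalas Series
clean_pct_udf = pandas_udf(clean_pct, returnType=DoubleType())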

def clean_loan_stats(df):
  kdf = df.to_koalas()
  kdf = kdf[["loan_status", "int_rate", "revol_util", "issue_d", "earliest_cr_line", "emp_length", "verification_status", "total_pymnt", "loan_amnt", "grade", "annual_inc", "dti", "addr_state", "term", "home_ownership", "purpose", "application_type", "delinq_2yrs", "total_acc"]]
  
  kdf = kdf[kdf.loan_status.isin(["Default", "Charged Off", "Fully Paid"])]
  
  # Create bad loan label: after the filter above, this covers charged-off and defaulted loans
  kdf["bad_loan"] = (kdf['loan_status'] != 'Fully Paid').astype('int')
  
  # Clean and convert some of the string data into numerical data
  for c in ['int_rate', 'revol_util']:
    kdf[c] = clean_pct(kdf[c])
  
  kdf['issue_year'] = kdf['issue_d'].str.slice(4,9).astype('double') 
  kdf['earliest_year'] = kdf['earliest_cr_line'].str.slice(4,9).astype('double')
  kdf['total_pymnt'] = kdf['total_pymnt'].astype('float')
  
  kdf["credit_length_in_years"] = kdf['issue_year'] - kdf['earliest_year']
  
  # Converting emp_length column into numeric...
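  kdf['emp_length'] = (kdf['emp_length']
                       .str.replace(r"([ ]*[a-zA-Z].*)|(n\/a)", "")  # drop trailing text such as ' years'; 'n/a' becomes ''
                       .str.replace("< 1", "0")                      # less than one year counts as 0
                       .str.replace(r"10\+", "10")                   # '10+' is treated as 10
                       .astype('float')
                      )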
  
  return kdf.to_spark()

In [0]:
loan_stats = clean_loan_stats(data)
display(loan_stats)

## ![Delta Lake Tiny Logo](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Leverage Databricks Delta for Data Versioning
We leverage Delta Lake for storing the cleaned data. Delta Lake comes with a number of very handy features in the context of machine learning. For example, ACID transactions allow us to merge new data into the data set while maintaining a consistent state, and time travel allows us to restore previous versions of a table that may have been used to train a model.

In [0]:

# Configure Path and temp view name
DELTALAKE_GOLD_PATH = ('/ml/aws_sko/loan_stats.delta')

#tviewname = dbutils.widgets.get('tviewname')

# Remove table if it exists
dbutils.fs.rm(DELTALAKE_GOLD_PATH, recurse=True)

# Save table as Delta Lake
(loan_stats
 .write
 .format("delta")
 .option('path',DELTALAKE_GOLD_PATH)
 .mode("overwrite")
 .saveAsTable('loan_stats_delta')
)

In [0]:
%sql
DESCRIBE TABLE EXTENDED loan_stats_delta;

In [0]:
%sql
select * from loan_stats_delta;

In [0]:
display(dbutils.fs.ls(DELTALAKE_GOLD_PATH))

In [0]:
%sql
select issue_year, count(*) 
from loan_stats_delta
group by 1
order by 1

In [0]:
%sql
delete
from loan_stats_delta
where issue_year <= 2013

In [0]:
%sql
select issue_year, count(*) 
from loan_stats_delta
group by 1
order by 1

In [0]:
%sql
describe history loan_stats_delta

In [0]:
%sql
select issue_year, count(*) 
from loan_stats_delta@v0
group by 1
order by 1

In [0]:
%sql
RESTORE TABLE loan_stats_delta TO VERSION AS OF 0

In [0]:
%sql
select *
from loan_stats_delta
where issue_year <= 2013
limit 20

## ![Delta Lake Logo Tiny](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) Schema Evolution
With the `mergeSchema` option, you can evolve your Delta Lake table schema. Imagine that we decide to add two additional features:

In [0]:
from pyspark.sql.functions import *

print("------------------------------------------------------------------------------------------------")
print("Map multiple levels into one factor level for verification_status...")
loan_stats = loan_stats.withColumn('verification_status', trim(regexp_replace(loan_stats.verification_status, 'Source Verified', 'Verified')))

print("------------------------------------------------------------------------------------------------")
print("Calculate the total amount of money earned or lost per loan...")
loan_stats = loan_stats.withColumn('net', round( loan_stats.total_pymnt - loan_stats.loan_amnt, 2))

In [0]:
# Add the mergeSchema option
loan_stats.write.option("mergeSchema","true").format("delta").mode("overwrite").save(DELTALAKE_GOLD_PATH)

In [0]:
%sql 
select * from loan_stats_delta

In [0]:
# note the positively skewed distribution
display(loan_stats)

# First iteration over the model

Acknowledging our data skew, we're ready to take a look at our first model.
Logistic regression is a staple in classification. It assumes the log-odds of the label are linear in the features, which skewed inputs can strain; we're making several leaps here in the interest of showing the tooling.

In [0]:
# again, for expediency, we're splitting on date - and caching
print("------------------------------------------------------------------------------------------------")
print("Setting variables to predict bad loans")
myY = "bad_loan"
categoricals = ["term", "home_ownership", "purpose", "addr_state",
                "verification_status","application_type"]
numerics = ["loan_amnt","emp_length", "annual_inc","dti",
            "delinq_2yrs","revol_util","total_acc",
            "credit_length_in_years"]
myX = categoricals + numerics

loan_stats2 = loan_stats.select(myX + [myY, "int_rate", "net", "issue_year"])

# note the use of cache here: if using a delta optimised instance this isn't required
# we'll accept that splitting on date isn't best practice
train = loan_stats2.filter(loan_stats2.issue_year <= 2015).cache()
valid = loan_stats2.filter(loan_stats2.issue_year > 2015).cache()

train.count(), valid.count()

In [0]:
display(train) # intriguingly the distribution is less skewed
# display(valid) # seems more positively skewed

### Logistic Regression Notes
* We will be using the Apache Spark pre-installed GLM and GBTClassifier models in this notebook
* **GLM** refers to *generalized linear models*; the Apache Spark *logistic regression* model is a special case of a [generalized linear model](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#logistic-regression)
* We will also use BinaryClassificationEvaluator, CrossValidator, and ParamGridBuilder to tune our models.
* References to the max F1 threshold use the F1 score (also called the F-score or F-measure), the harmonic mean of precision and recall, as a measure of our logistic regression model's accuracy; more information can be found at [F1 score](https://en.wikipedia.org/wiki/F1_score).
* **GBTClassifier** refers to the *gradient-boosted tree classifier*, a popular classification and regression method using ensembles of decision trees; more information can be found at [Gradient Boosted Tree Classifier](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#gradient-boosted-tree-classifier)
* In a subsequent notebook, we will be using XGBoost, an optimized distributed gradient boosting library.  
  * Under the covers, we will be using *XGBoost4J-Spark*, a project aiming to seamlessly integrate XGBoost and Apache Spark by fitting XGBoost to Apache Spark's MLlib framework.  More information can be found at [XGBoost4J-Spark Tutorial](https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html).
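
As a small illustration of the F1 note above, the sketch below computes precision, recall, and F1 for a handful of made-up scores and picks the threshold with the highest F1. It is plain Python rather than Spark, and the function names and sample data are assumptions for illustration only.

```python
from typing import List, Tuple


def f1_at_threshold(scores: List[float], labels: List[int], threshold: float) -> float:
    """F1 score when every score >= threshold is predicted positive (label 1)."""
    tp = sum(1 for s, y in zip(scores, labels) if s >= threshold and y == 1)
    fp = sum(1 for s, y in zip(scores, labels) if s >= threshold and y == 0)
    fn = sum(1 for s, y in zip(scores, labels) if s < threshold and y == 1)
    if tp == 0:
        return 0.0  # no true positives, so F1 is taken as 0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def max_f1_threshold(scores: List[float], labels: List[int]) -> Tuple[float, float]:
    """Try each distinct score as a threshold and return (threshold, f1) with the highest F1."""
    candidates = sorted(set(scores))
    return max(((t, f1_at_threshold(scores, labels, t)) for t in candidates),
               key=lambda pair: pair[1])


# Made-up predicted probabilities of a bad loan and their true labels
scores = [0.9, 0.8, 0.6, 0.4, 0.2]
labels = [1, 1, 0, 1, 0]

threshold, f1 = max_f1_threshold(scores, labels)
print(threshold, round(f1, 3))
```

Lowering the threshold raises recall at the cost of precision; the max F1 threshold is the point where their harmonic mean peaks.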
  
### MLFlow
* we don't need to log the individual parameters or metrics; with 'trackMLlib.enabled' these are tracked.
* MLflow autotracking is also available (see more [here](https://www.mlflow.org/docs/latest/python_api/mlflow.keras.html))
* results from the next cell will be captured under the 'runs' button

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.feature import StandardScaler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Current possible ways to handle invalid categoricals in StringIndexer are 'error', 'keep', and 'skip'
indexers = map(lambda c: StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid = 'keep'), categoricals)
ohes = map(lambda c: OneHotEncoder(inputCol=c + "_idx", outputCol=c+"_class"),categoricals)
imputers = Imputer(inputCols = numerics, outputCols = numerics)

# Establish features columns
featureCols = list(map(lambda c: c+"_class", categoricals)) + numerics

# Build the stage for the ML pipeline
model_matrix_stages = list(indexers) + list(ohes) + [imputers] + \
                     [VectorAssembler(inputCols=featureCols, outputCol="features"), StringIndexer(inputCol="bad_loan", outputCol="label")]

# Apply StandardScaler to create scaledFeatures
scaler = StandardScaler(inputCol="features",
                        outputCol="scaledFeatures",
                        withStd=True,
                        withMean=True)

# Use logistic regression 
lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "scaledFeatures")

# Build our ML pipeline
pipeline = Pipeline(stages=model_matrix_stages+[scaler]+[lr])

# Build the parameter grid for model tuning
# (building with one parameter to reduce training time on Community Edition)
paramGrid = ParamGridBuilder() \
              .addGrid(lr.regParam, [0.1, 0.01]) \
              .build()

# uncomment for the elasticNetParam for a more interesting grid (increases the training time)
# paramGrid = ParamGridBuilder() \
#              .addGrid(lr.regParam, [0.1, 0.01]) \
#              .addGrid(lr.elasticNetParam, [0.5, 0.2]) \
#              .build()

# Execute CrossValidator for model tuning
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)

# this will auto track our params, also inherits the current active run
cvModel = crossval.fit(train) 

glm_model = cvModel.bestModel

# Return ROC
lr_summary = glm_model.stages[-1].summary

# you may need to adjust the following to get the FPR on the X axis, and TPR on the Y-Axis
display(lr_summary.roc)

## Review the results in the run window

Using the comparison tool we can now make a choice about our model
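
The cross-validation runs are scored with `BinaryClassificationEvaluator`, whose default metric is the area under the ROC curve, so that is typically the number being compared between runs. As a rough, Spark-free sketch of what that number measures, the function below computes AUC as the share of (bad loan, good loan) pairs in which the bad loan gets the higher score. The function name and the sample data are assumptions for illustration.

```python
from typing import List


def auc_by_pairs(scores: List[float], labels: List[int]) -> float:
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up predicted probabilities of a bad loan and their true labels
scores = [0.9, 0.8, 0.6, 0.4, 0.2]
labels = [1, 1, 0, 1, 0]

print(round(auc_by_pairs(scores, labels), 3))
```

The pairwise loop is quadratic, which is fine for a toy example; it is only meant to show the meaning of the metric, not how Spark computes it.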

# MLflow and Model Registry
* log the model and register it in the model registry so it is catalogued
* this time we want to log the model as part of the run
* we'll use the UI to explore the model registry from our DS persona

In [0]:
registered_model_name = 'aws-ml-devday'

# we need a run to attach to and record and log the model:
with mlflow.start_run(run_name='production_run'):
  
  cvModel = crossval.fit(train) # this will auto track our params, also inherits the current active run
  
  glm_model = cvModel.bestModel
  
  #from mlflow import spark
  
  # log the model and register it with the model repository 
  # (registration is not available on Community Edition, so fall back to logging only)
  try:
    mlflow.spark.log_model(glm_model, artifact_path='glm_model', registered_model_name=registered_model_name) 
  except Exception:
    mlflow.spark.log_model(glm_model, artifact_path='glm_model')


In [0]:
# note that no metrics are captured
from pyspark.ml.classification import GBTClassifier

# Establish stages for our GBT model
indexers = map(lambda c: StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid = 'keep'), categoricals)
imputers = Imputer(inputCols = numerics, outputCols = numerics)
featureCols = list(map(lambda c: c+"_idx", categoricals)) + numerics

# Define vector assemblers
model_matrix_stages = list(indexers) + [imputers] + \
                     [VectorAssembler(inputCols=featureCols, outputCol="features"), StringIndexer(inputCol="bad_loan", outputCol="label")]

# Define a GBT model.
gbt = GBTClassifier(featuresCol="features",
                    labelCol="label",
                    lossType = "logistic",
                    maxBins = 52,
                    maxIter=20,
                    maxDepth=5)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=model_matrix_stages+[gbt])

# Train model.  This also runs the indexer.
gbt_model = pipeline.fit(train)

In [0]:

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.linalg import Vectors

def extract(row):
  return (row.net,) + tuple(row.probability.toArray().tolist()) +  (row.label,) + (row.prediction,)

def score(model,data):
  pred = model.transform(data).select("net", "probability", "label", "prediction")
  pred = pred.rdd.map(extract).toDF(["net", "p0", "p1", "label", "prediction"])
  return pred 

def auc(pred):
  metric = BinaryClassificationMetrics(pred.select("p1", "label").rdd)
  return metric.areaUnderROC

glm_train = score(glm_model, train)
glm_valid = score(glm_model, valid)
gbt_train = score(gbt_model, train)
gbt_valid = score(gbt_model, valid)

glm_train.createOrReplaceTempView("glm_train")
glm_valid.createOrReplaceTempView("glm_valid")
gbt_train.createOrReplaceTempView("gbt_train")
gbt_valid.createOrReplaceTempView("gbt_valid")

# instead of running this, we'll use MLflow to bring it all together in a single experiment
# print("GLM Training AUC: " + str(auc(glm_train)))
# print("GLM Validation AUC: " + str(auc(glm_valid)))
# print("GBT Training AUC: " + str(auc(gbt_train)))
# print("GBT Validation AUC: " + str(auc(gbt_valid)))

In [0]:
%scala
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.Row
// import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.functions.{array, lit, map, struct}

// Build ROC curve points (FPR, TPR) for one scored dataset and tag them with a model id
def roc(pred: org.apache.spark.sql.DataFrame, model_id: String): org.apache.spark.sql.DataFrame = {
  val testScoreAndLabel = pred.select("p1", "label").map { case Row(p: Double, l: Double) => (p, l) }
  val metrics = new BinaryClassificationMetrics(testScoreAndLabel.rdd, 100)  // 100 bins
  metrics.roc().toDF().withColumn("model", lit(model_id))
}

val glm_train = roc( spark.table("glm_train"), "glm_train")
val glm_valid = roc( spark.table("glm_valid"), "glm_valid")
val gbt_train = roc( spark.table("gbt_train"), "gbt_train")
val gbt_valid = roc( spark.table("gbt_valid"), "gbt_valid")

val roc_curves = glm_train.union(glm_valid).union(gbt_train).union(gbt_valid)

//settings for the chart: Series grouping : model, x-axis: _1, y-axis: _2
display(roc_curves)

## Quantify the Business Value

A great way to quickly understand the business value of this model is to use the confusion matrix to evaluate the cost of a poor loan decision.  The definition of our matrix is as follows:

* Prediction=1, Label=1 (Blue) : Correctly found bad loans. sum_net = loss avoided.
* Prediction=1, Label=0 (Orange) : Incorrectly labeled bad loans. sum_net = profit forfeited.
* Prediction=0, Label=1 (Green) : Incorrectly labeled good loans. sum_net = loss still incurred.
* Prediction=0, Label=0 (Red) : Correctly found good loans. sum_net = profit retained.

The following code snippet calculates this confusion matrix.
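
The grouping behind this matrix can also be sketched without Spark. The plain-Python example below sums `net` per (label, prediction) cell for a few made-up loans. The sample rows and the helper name `sum_net_by_cell` are assumptions for illustration, not output from the model.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def sum_net_by_cell(rows: Iterable[Tuple[int, int, float]]) -> Dict[Tuple[int, int], float]:
    """Sum net profit/loss for each (label, prediction) cell of the confusion matrix."""
    totals: Dict[Tuple[int, int], float] = defaultdict(float)
    for label, prediction, net in rows:
        totals[(label, prediction)] += net
    return dict(totals)


# Made-up loans as (label, prediction, net); label 1 means a bad loan
loans = [
    (1, 1, -5000.0),  # bad loan caught: loss avoided
    (0, 1, 1200.0),   # good loan rejected: profit forfeited
    (1, 0, -3000.0),  # bad loan approved: loss still incurred
    (0, 0, 1500.0),   # good loan approved: profit retained
    (0, 0, 800.0),
]

cells = sum_net_by_cell(loans)
for cell in sorted(cells):
    print(cell, cells[cell])
```

Reading the four totals side by side shows whether the losses avoided by rejecting bad loans outweigh the profit given up on good loans that were wrongly rejected.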

In [0]:
# configure plot as bar; series groupings: label, prediction, value; sum_net
display(glm_valid.groupBy("label", "prediction").agg((sum(col("net"))).alias("sum_net")))

<img src="https://pages.databricks.com/rs/094-YMS-629/images/stop.png?raw=true" width=50/>
### Stop the notebook for setting experiment location

In [0]:
dbutils.notebook.exit("stop")

In [0]:
# By setting the experiment we now log to a single location in the workspace.
# In the sidebar, click the Home button, select your user name, then right-click and create a folder named mlflow, and set the experiment below.
# Example: mlflow.set_experiment('/Users/userfirstname.userlastname@emaildomain.com/mlflow/gbt-glm-new')

In [0]:
dbutils.fs.ls('/')

In [0]:
user_email=dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
mlflow.set_experiment(f"/Users/{user_email}/gbt-glm-new1")  # creates the experiment if needed and makes it the active one

with mlflow.start_run(run_name="glm"):
  mlflow.log_param("regParam", 0.01)
  mlflow.log_metric("auc", auc(glm_valid))
  
  # we can log via the API or through the UI (we've already registered one model)
  # will not register for community edition
  try:
    mlflow.spark.log_model(glm_model, artifact_path='glm_model', registered_model_name=registered_model_name) 
  except Exception:
    mlflow.spark.log_model(glm_model, 'glm_model')
    
with mlflow.start_run(run_name="gbt"):
  mlflow.log_param("maxDepth", 5)
  mlflow.log_metric("auc", auc(gbt_valid))
  mlflow.spark.log_model(gbt_model, 'gbt_model')
