In [0]:
import pyspark
import pyspark.sql
import pyspark.streaming
import pyspark.mllib
import pyspark.ml
from pyspark.sql import SparkSession

In [0]:
# Show the installed PySpark version as the cell's output, so the environment
# a run was executed with is visible in the notebook.
pyspark.__version__

In [0]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

In [0]:
# Session-level Spark settings, applied to the builder in the order listed.
spark_settings = {
    'spark.driver.memory': '8G',
    'spark.driver.maxResultSize': '2G',
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.kryoserializer.buffer.max': '800M',
}

# Local-mode session; 'local[*]' asks Spark for one worker thread per core.
session_builder = (
    SparkSession.builder
    .appName('spark_for_conservative_credit_score')
    .master('local[*]')
)
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

# Returns the already-running session if there is one, otherwise starts it.
spark = session_builder.getOrCreate()

In [0]:
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType

In [0]:
# Read the training CSV, using its first row as the column names.
# NOTE(review): the next cell assigns `train` again from the same file, so this
# read looks redundant - confirm before removing.
train = spark.read.format('csv').option('header', True).load('/data/paper_train1.csv')

In [0]:
# CSV reader options shared by the training and validation files.
file_type = "csv"
infer_schema = "false"        # no schema inference; numeric columns are cast explicitly later on
first_row_is_header = "true"
delimiter = ","


def load_dataset(file_location):
    """Read one delimited file with the shared reader options and cache it.

    Args:
        file_location: path of the file to read.

    Returns:
        A cached Spark DataFrame. Caching is lazy: the data is materialized
        by the first action run on the frame.
    """
    # The applied options are for CSV files. For other file types, these will be ignored.
    return (
        spark.read.format(file_type)
        .option("inferSchema", infer_schema)
        .option("header", first_row_is_header)
        .option("sep", delimiter)
        .load(file_location)
        .cache()
    )


# Training data set.
train = load_dataset("/data/paper_train1.csv")

# Validation data set.
valid = load_dataset("/data/paper_valid1.csv")

Feature Engineering

In [0]:
from pyspark.sql.functions import *
# Columns that only need a cast to float.
FLOAT_COLUMNS = [
    "loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs", "total_acc",
    "credit_length_in_years", "remain", "issue_year",
    "phi_loan_amnt", "phi_emp_length", "phi_annual_inc", "phi_dti",
    "phi_delinq_2yrs", "phi_total_acc", "phi_credit_length_in_years",
    "CRI", "train_flag",
]

# Columns whose values have any "%" character removed before the cast.
PERCENT_COLUMNS = ["revol_util", "int_rate", "phi_revol_util", "phi_int_rate"]


def cast_numeric_columns(df):
    """Return `df` with every numeric feature column converted to float.

    The same conversion is applied to the training and the validation frame,
    so it lives in one place instead of two hand-copied `withColumn` chains.
    `withColumn` on an existing name replaces that column in place, so the
    column order of the frame is unchanged.

    Args:
        df: Spark DataFrame containing all names listed in FLOAT_COLUMNS and
            PERCENT_COLUMNS.

    Returns:
        A new DataFrame; values that cannot be parsed as a number become null.
    """
    for name in FLOAT_COLUMNS:
        df = df.withColumn(name, df[name].cast("float"))
    for name in PERCENT_COLUMNS:
        df = df.withColumn(name, regexp_replace(name, "%", "").cast("float"))
    return df


train = cast_numeric_columns(train)
valid = cast_numeric_columns(valid)

In [0]:
# Expose both frames to Spark SQL under the names "train" and "valid".
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and registers the view in the same way.
train.createOrReplaceTempView("train")
valid.createOrReplaceTempView("valid")

# Persist each frame to its OWN Parquet location. Previously both writes used
# 'AA_DFW_ALL.parquet' with mode='overwrite', so the validation write silently
# destroyed the training copy. The validation output keeps the original path
# (that is what the file ended up holding before); the training output gets a
# separate one.
train.write.parquet('AA_DFW_ALL_train.parquet', mode='overwrite')
valid.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

print(" >>>>>>> " + str(train.count())+ " loans opened by TRAIN data_set for model training!")
print(" >>>>>>> " + str(valid.count())+ " loans opened by VALID data_set for model validation!")

In [0]:
# Report how many rows fall into each `default_loan` class, per dataset.
print(" == imbalancement of the loan train and valid datasets  ==")
for caption, frame in (("Train", train), ("Test", valid)):
    label_counts = frame.groupBy('default_loan').count().collect()
    print(f" >>>>>>> {caption} dataset: {label_counts}")

In [0]:
# Response variable and predictor variables used by the models below.
Y = "default_loan"

# Predictors handled as categories.
categoricals = [
    "phi_term_month",
    "home_ownership",
    "purpose",
    "addr_state",
    "verification_status",
    "application_type",
]

# Predictors handled as numbers.
numerics = [
    "CRI",
    "phi_loan_amnt",
    "phi_emp_length",
    "phi_annual_inc",
    "phi_dti",
    "phi_delinq_2yrs",
    "phi_revol_util",
    "phi_total_acc",
    "phi_credit_length_in_years",
    "phi_int_rate",
]

# Full predictor list: categoricals first, then numerics.
X = [*categoricals, *numerics]

# Now we can save the valid set to use it as input data for our final model.

Ridge regression

In [0]:
# (1) define the model function
# to build Grid of GLM models and Standardization + CrossValidation

import sklearn.metrics as metrics
import pandas as pd
from plotnine import *
from plotnine.data import meat
from mizani.breaks import date_breaks
from mizani.formatters import date_format
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, StringIndexer, OneHotEncoder, Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow
import mlflow.spark
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.linalg import Vectors

# Hyper-parameters of the logistic regression.
# elasticNetParam = 0 selects a pure L2 (ridge) penalty; regParam is its strength.
maxIter = 10
elasticNetParam = 0
regParam = 0.3

# mlflow.start_run() opens a tracking run: the parameters, the fitted model and
# the AUC metrics logged below are all recorded against it.
with mlflow.start_run():
    labelCol = "default_loan"

    # Categorical predictors: string -> index -> one-hot vector.
    # handleInvalid="keep" puts labels unseen at fit time into an extra index
    # instead of raising during transform.
    indexers = [
        StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="keep")
        for c in categoricals
    ]
    ohes = [
        OneHotEncoder(inputCol=c + "_idx", outputCol=c + "_class")
        for c in categoricals
    ]

    # Numeric predictors: missing values are filled in place (same column
    # names in and out) using Imputer's default strategy.
    imputers = Imputer(inputCols=numerics, outputCols=numerics)

    featureCols = [c + "_class" for c in categoricals] + numerics
    model_matrix_stages = indexers + ohes + [
        imputers,
        VectorAssembler(inputCols=featureCols, outputCol="features"),
        StringIndexer(inputCol=labelCol, outputCol="label"),
    ]

    # Center and scale the assembled feature vector.
    scaler = StandardScaler(inputCol="features",
                            outputCol="scaledFeatures",
                            withStd=True,
                            withMean=True)

    # Logistic regression on the standardized features.
    lr = LogisticRegression(maxIter=maxIter,
                            elasticNetParam=elasticNetParam,
                            regParam=regParam,
                            featuresCol="scaledFeatures")

    # One pipeline: encoding, imputation, assembling, scaling and the classifier.
    pipeline = Pipeline(stages=model_matrix_stages + [scaler] + [lr])

    glm_model = pipeline.fit(train)

    # Log the input parameters so the run can be reviewed and reproduced.
    mlflow.log_param("algorithm", "SparkML_GLM_regression")
    mlflow.log_param("regParam", regParam)
    mlflow.log_param("maxIter", maxIter)
    mlflow.log_param("elasticNetParam", elasticNetParam)

    # Training summary of the fitted classifier (the last pipeline stage).
    lr_model = glm_model.stages[-1]
    lr_summary = lr_model.summary

    # ROC curve on the training data.
    roc_pd = lr_summary.roc.toPandas()
    fpr = roc_pd["FPR"]
    tpr = roc_pd["TPR"]
    roc_auc = metrics.auc(fpr, tpr)

    # Pick the threshold with the highest F-measure.
    # The previous code stored the scalar maximum in a misspelled variable and
    # then compared the pandas column against the Spark Row; it also called
    # float() on a Series, which fails when several thresholds tie for the
    # maximum. idxmax() selects exactly one row (the first maximum).
    fMeasure = lr_summary.fMeasureByThreshold.toPandas()
    best_row = fMeasure.loc[fMeasure["F-Measure"].idxmax()]
    maxFMeasure = float(best_row["F-Measure"])
    bestThreshold = float(best_row["threshold"])

    # Setting the threshold on `lr` only affects future fits; the already
    # fitted stage inside `glm_model` has its own copy of the parameters.
    # Apply the threshold there too so that the predictions scored below
    # actually use it. The guard keeps the cell working on PySpark versions
    # whose fitted model does not expose setThreshold.
    lr.setThreshold(bestThreshold)
    if hasattr(lr_model, "setThreshold"):
        lr_model.setThreshold(bestThreshold)

    # Log the model after tuning, so the logged artifact matches what is scored.
    mlflow.spark.log_model(glm_model, "glm_model")

    def extract(row):
        """Flatten one scored row into (remain, p0, p1, label, prediction)."""
        return (row.remain,) + tuple(row.probability.toArray().tolist()) + (row.label,) + (row.prediction,)

    def score(model, data):
        """Score `data` with `model` and return a flat DataFrame.

        The probability vector is split into the scalar columns p0 and p1.
        """
        pred = model.transform(data).select("remain", "probability", "label", "prediction")
        pred = pred.rdd.map(extract).toDF(["remain", "p0", "p1", "label", "prediction"])
        return pred

    def auc(pred):
        """Area under the ROC curve from the (p1, label) pairs of `pred`."""
        metric = BinaryClassificationMetrics(pred.select("p1", "label").rdd)
        return metric.areaUnderROC

    glm_train = score(glm_model, train)
    glm_valid = score(glm_model, valid)

    # createOrReplaceTempView replaces the deprecated registerTempTable.
    glm_train.createOrReplaceTempView("glm_train")
    glm_valid.createOrReplaceTempView("glm_valid")

    # Compute each AUC once; every call runs Spark jobs over the scored data.
    train_auc = auc(glm_train)
    valid_auc = auc(glm_valid)

    print("GLM Training AUC :" + str(train_auc))
    print("GLM Validation AUC :" + str(valid_auc))

    # Record the AUC of the training and the validation set on the run.
    mlflow.log_metric("train_auc", train_auc)
    mlflow.log_metric("valid_auc", valid_auc)


In [0]:
# Bring the scored validation rows to the driver as a pandas DataFrame.
pandas_df = glm_valid.toPandas()

txt = 'This table represents the "CONFUSION MATRIX" from Ridge Regression'
print(txt.title())

# Number of non-null `remain` values for every (label, prediction) pair.
confusion_counts = pd.crosstab(
    index=pandas_df["label"],
    columns=pandas_df["prediction"],
    values=pandas_df["remain"],
    aggfunc="count",
)
confusion_counts.round(2)

In [0]:
from pyspark.sql import functions as F

# Total of `remain` per (label, prediction) pair, aggregated in Spark before
# the small result is brought to the driver.
# F.sum / F.col are referenced explicitly: the bare name `sum` only resolved to
# the Spark aggregate because a star import had shadowed Python's builtin.
pandas_df_sum_net = (
    glm_valid.groupBy("label", "prediction")
    .agg(F.sum(F.col("remain")).alias("sum_net"))
    .toPandas()
)

txt = 'This table represents the "SUM NET" from Ridge Regression'
print(txt.title())

# Lay the per-pair totals out as a label x prediction table.
pd.crosstab(
    pandas_df_sum_net.label,
    pandas_df_sum_net.prediction,
    values=pandas_df_sum_net.sum_net,
    aggfunc="sum",
).round(2)