# Spark ML Pipelines - Lending Club Demo

Using the Lending Club data set, this demo shows how to:

1. Load the research dataset from S3 or HDFS (this run reads a CSV file from HDFS)
2. Perform feature extraction using Spark ML APIs
3. Train a Logistic Regression and a Random Forest Classifier
4. Perform hyperparameter tuning using ParamGridBuilder, BinaryClassificationEvaluator, and CrossValidator
5. Export the best pipeline (feature transformers and LR/RF models) to an MLeap Bundle, which we'll use to deploy the pipeline to a RESTful API service.



# Cluster Configuration

In [1]:
%%configure -f
{"kind": "spark",
 "driverMemory": "2048M",
 "executorCores": 2,
 "conf": {"spark.jars.packages": "org.apache.hadoop:hadoop-aws:2.7.3,ml.combust.mleap:mleap-spark_2.11:0.8.1,com.databricks:spark-avro_2.11:3.0.1",
          "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
          "spark.yarn.appMasterEnv.JAVA_HOME": "/usr/lib/jvm/java-8-oracle/jre",
          "spark.executorEnv.JAVA_HOME": "/usr/lib/jvm/java-8-oracle/jre"}
}

In [4]:
// sc.stop()

# Part 2: Spark ML Feature Extraction

### Load Required Libraries

In [2]:
// Spark Training Pipeline Libraries
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler, PolynomialExpansion}
import org.apache.spark.ml.classification.{RandomForestClassifier, LogisticRegression, RandomForestClassificationModel}
import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer, PipelineStage}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator}
import com.databricks.spark.avro._

// MLeap/Bundle.ML Serialization Libraries
import ml.combust.mleap.spark.SparkSupport._
import resource._
import ml.combust.bundle.BundleFile
import org.apache.spark.ml.bundle.SparkBundleContext
import ml.combust.bundle.serializer.SerializationFormat

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1500400165929_0592,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
import ml.combust.bundle.serializer.SerializationFormat

### Step 1: Import and Explore the Research Dataset

In [3]:
%%local

! hadoop fs -ls /data/

Found 11 items
-rwxr-xr-x   3 5000 5000    4123652 2017-06-19 16:32 /data/FL_insurance_sample.csv
-rwxr-xr-x   3 5004 5004  102036996 2017-07-18 22:36 /data/LC_data.csv
-rwxr-xr-x   3 5000 5000   12527920 2017-07-10 19:58 /data/avro-tools-1.7.7.jar
drwxr-xr-x   - 5000 5000          1 2017-07-10 18:37 /data/lc_2017060401
drwxr-xr-x   - 5004 5004          2 2017-07-19 23:37 /data/lc_logit
-rwxr-xr-x   3 5000 5000        508 2017-07-10 18:30 /data/lending_club_2017060401.avsc
-rwxr-xr-x   3 5000 5000   48379871 2017-07-10 18:17 /data/lending_club_20170617.avro
-rwxr-xr-x   3 5000 5000        508 2017-07-10 18:31 /data/lending_club_20170617.avsc
-rwxr-xr-x   3 5003 5003   28697490 2017-07-10 19:50 /data/modeldb-scala-client.jar
drwxr-xr-x   - 5003 5003          2 2017-12-05 20:36 /data/rf-model
-rwxr-xr-x   3 5003 5003     104736 2017-12-05 20:36 /data/sample_libsvm_data.txt


In [4]:
//val filePath = "s3a://mleap-demo/datasources/lending_club_2017060401.avro"
// val filePath = "hdfs:///data/lending_club_2017060401.avro"
// val dataset = spark.read.avro(filePath)
// val dataset = spark.table("default.lendingclub2017060401")

val filePath = "hdfs:///data/LC_data.csv"
// val dataset = spark.read.csv(filePath)

val dataset = spark.sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(filePath)

println("Total N Records: " + dataset.count())

Total N Records: 534847

In [5]:
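// Note: this counts rows where default7 == 1.0 (the label column used for training), not approved loans
println("Total N Approved Records: " + dataset.filter("default7 == 1.0").count())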

Total N Approved Records: 109584

In [6]:
dataset.show(1)

[Output truncated: dataset.show(1) prints a single row as a very wide table, one column per field in the dataset]

In [7]:
// View the individual records
// dataset.select("loan_amount", "fico_score_group_fnl", "dti", "emp_length", "state", "approved", "loan_title").show(5)

### Step 2: Define continuous and categorical features and filter nulls

In [8]:
// Step 2. Define the continuous and categorical feature columns
val continuousFeatures = Array("loan_amnt", "dti", "int_rate", "annual_inc")

val categoricalFeatures = Array("delinq_2yrs",
                                "term_ 36 months",
                                "term_ 60 months",
                                "purpose_car", "purpose_credit_card",
                                "purpose_debt_consolidation", "purpose_educational")

// feature set
val allFeatures = continuousFeatures.union(categoricalFeatures)

allFeatures: Array[String] = Array(loan_amnt, dti, int_rate, annual_inc, delinq_2yrs, term_ 36 months, term_ 60 months, purpose_car, purpose_credit_card, purpose_debt_consolidation, purpose_educational)

In [9]:
// Subset dataset to a handful of features
val allCols = allFeatures.union(Seq("default7")).map(dataset.col)
val datasetFiltered = dataset.select(allCols: _*).persist()

datasetFiltered: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_amnt: double, dti: double ... 10 more fields]

### Step 3: Split data into training and validation

In [10]:
val Array(trainingDataset, validationDataset) = datasetFiltered.randomSplit(Array(0.7, 0.3))

trainingDataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_amnt: double, dti: double ... 10 more fields]
validationDataset: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [loan_amnt: double, dti: double ... 10 more fields]

### Step 4: Continuous Feature Pipelines

Pipeline 1: Scale Features
    - VectorAssembler
    - StandardScaler

In [11]:
// Pipeline 1
val continuousFeatureAssembler = new VectorAssembler(uid = "continuous_feature_assembler").
    setInputCols(continuousFeatures).
    setOutputCol("unscaled_continuous_features")

val continuousFeatureScaler = new StandardScaler(uid = "continuous_feature_scaler").
    setInputCol("unscaled_continuous_features").
    setOutputCol("scaled_continuous_features")

continuousFeatureScaler: org.apache.spark.ml.feature.StandardScaler = continuous_feature_scaler
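
To make the scaling step concrete, here is a minimal plain-Scala sketch of what `StandardScaler` does with its default settings (`withStd = true`, `withMean = false`): each value is divided by the column's corrected sample standard deviation, and the mean is not subtracted. The helper names and the sample values are hypothetical and are not part of the notebook or of Spark.

```scala
// Plain-Scala sketch of unit-variance scaling (no Spark required).
// Assumption: mirrors the StandardScaler defaults (withStd = true, withMean = false).

// Corrected sample standard deviation (divides by n - 1).
def sampleStd(xs: Seq[Double]): Double = {
  val mean = xs.sum / xs.size
  math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (xs.size - 1))
}

// Divide each value by the column's standard deviation; the mean is left in place.
// A zero-variance column is mapped to 0.0.
def scaleToUnitStd(xs: Seq[Double]): Seq[Double] = {
  val std = sampleStd(xs)
  if (std == 0.0) xs.map(_ => 0.0) else xs.map(_ / std)
}

// Hypothetical loan amounts, for illustration only.
val loanAmounts = Seq(1000.0, 2000.0, 3000.0)
val scaledAmounts = scaleToUnitStd(loanAmounts)
println(scaledAmounts) // each value divided by the sample std of 1000.0
```

After scaling, the column has a standard deviation of 1 but keeps its offset from zero, which is why sparse inputs stay sparse under the default settings.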

### Step 6: Assemble our features and feature pipeline

In [12]:
// Assemble the feature vector for the Random Forest Classifier.
// The forest uses the raw (unscaled) columns; "scaled_continuous_features" is still
// produced by the pipeline but is not part of "features_rf".
val featureColsRf = allFeatures

val featureAssemblerRf = new VectorAssembler(uid = "feature_assembler_rf").
    setInputCols(featureColsRf).
    setOutputCol("features_rf")

// Define an array of all of our feature estimators that we need to fit
val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler).
    union(Seq(featureAssemblerRf))

// Build our pipeline based on the estimators
val featurePipeline = new Pipeline(uid = "feature_pipeline").
    setStages(estimators)

// Fit our pipeline
val sparkFeaturePipelineModel = featurePipeline.fit(trainingDataset)

println("Finished constructing the pipeline")

Finished constructing the pipeline

In [13]:
// apply pipeline to the training set to get final modeling dataset
val dfWithFeatures = sparkFeaturePipelineModel.transform(trainingDataset)

dfWithFeatures: org.apache.spark.sql.DataFrame = [loan_amnt: double, dti: double ... 13 more fields]

In [14]:
dfWithFeatures.select("features_rf").head(1)

res36: Array[org.apache.spark.sql.Row] = Array([(11,[0,1,2,3,4,5],[500.0,3.04,10.71,7904.04,1.0,1.0])])
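
The output above is a sparse vector written as `(size, [indices], [values])`: 11 slots in total, of which only positions 0 to 5 are non-zero. As a rough illustration, the plain-Scala sketch below expands that representation into a dense array and pairs each slot with its column name. The helper `toDense` is hypothetical; the feature order is taken from `allFeatures` and the numbers are copied from the output above.

```scala
// Expand a (size, indices, values) sparse representation into a dense array.
def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
  require(indices.length == values.length, "indices and values must align")
  val dense = Array.fill(size)(0.0)
  indices.zip(values).foreach { case (i, v) => dense(i) = v }
  dense
}

// Values copied from the features_rf row shown above.
val denseRow = toDense(11,
  Array(0, 1, 2, 3, 4, 5),
  Array(500.0, 3.04, 10.71, 7904.04, 1.0, 1.0))

// Same order as allFeatures in Step 2.
val featureNames = Array("loan_amnt", "dti", "int_rate", "annual_inc", "delinq_2yrs",
  "term_ 36 months", "term_ 60 months", "purpose_car", "purpose_credit_card",
  "purpose_debt_consolidation", "purpose_educational")

featureNames.zip(denseRow).foreach { case (name, value) => println(s"$name = $value") }
```

The same positions are what the random forest's debug output refers to as `feature N`; for example, `feature 2` corresponds to `int_rate`.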

# Part 3: Spark ML Training and Scoring

In this section we train a Random Forest Classifier on the training data and measure its area under the ROC curve on the validation dataset. The Logistic Regression model mentioned in the introduction is not trained in this version of the notebook.

### Step 7: Train Random Forest Classifier

In [15]:
// Step 3.1 Define the random forest estimator (it is fitted as part of the pipeline below)
val randomForest = new RandomForestClassifier(uid = "random_forest_classifier").
    setFeaturesCol("features_rf").
    setLabelCol("default7").
    setPredictionCol("approved_prediction").
    setNumTrees(5).
    setMaxDepth(5)

println("Complete: Defining Random Forest")

Complete: Defining Random Forest

In [16]:
trainingDataset.show(1)

+---------+----+--------+----------+-----------+---------------+---------------+-----------+-------------------+--------------------------+-------------------+--------+
|loan_amnt| dti|int_rate|annual_inc|delinq_2yrs|term_ 36 months|term_ 60 months|purpose_car|purpose_credit_card|purpose_debt_consolidation|purpose_educational|default7|
+---------+----+--------+----------+-----------+---------------+---------------+-----------+-------------------+--------------------------+-------------------+--------+
|    500.0|3.04|   10.71|   7904.04|        1.0|              1|              0|          0|                  0|                         0|                  0|       0|
+---------+----+--------+----------+-----------+---------------+---------------+-----------+-------------------+--------------------------+-------------------+--------+
only showing top 1 row

In [17]:
val sparkPipelineEstimatorRf = new Pipeline().setStages(Array(sparkFeaturePipelineModel, randomForest))

// Fit the random forest pipeline here: the first stage is the feature pipeline, followed by the random forest
val sparkPipelineRf = sparkPipelineEstimatorRf.fit(trainingDataset)

sparkPipelineRf: org.apache.spark.ml.PipelineModel = pipeline_0fadea4be96f

In [18]:
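// Fit a standalone copy of the forest on the transformed training data so its trees can be inspected
val randomForestModel = randomForest.fit(dfWithFeatures)
randomForestModel.toDebugString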

res44: String =
"RandomForestClassificationModel (uid=rfc_a58db9bf0c70) with 5 trees
  Tree 0 (weight 1.0):
    If (feature 1 <= 21.24)
     If (feature 6 <= 0.0)
      If (feature 2 <= 12.29)
       If (feature 2 <= 8.94)
        If (feature 3 <= 51000.0)
         Predict: 0.0
        Else (feature 3 > 51000.0)
         Predict: 0.0
       Else (feature 2 > 8.94)
        If (feature 2 <= 11.12)
         Predict: 0.0
        Else (feature 2 > 11.12)
         Predict: 0.0
      Else (feature 2 > 12.29)
       If (feature 3 <= 43000.0)
        If (feature 2 <= 15.59)
         Predict: 0.0
        Else (feature 2 > 15.59)
         Predict: 0.0
       Else (feature 3 > 43000.0)
        If (feature 2 <= 16.29)
         Predict: 0.0
        Else (feature 2 > 16.29)
         Predict: 0.0
     ...

In [19]:
randomForestModel.extractParamMap

res45: org.apache.spark.ml.param.ParamMap =
{
	rfc_a58db9bf0c70-cacheNodeIds: false,
	rfc_a58db9bf0c70-checkpointInterval: 10,
	rfc_a58db9bf0c70-featureSubsetStrategy: auto,
	rfc_a58db9bf0c70-featuresCol: features_rf,
	rfc_a58db9bf0c70-impurity: gini,
	rfc_a58db9bf0c70-labelCol: default7,
	rfc_a58db9bf0c70-maxBins: 32,
	rfc_a58db9bf0c70-maxDepth: 5,
	rfc_a58db9bf0c70-maxMemoryInMB: 256,
	rfc_a58db9bf0c70-minInfoGain: 0.0,
	rfc_a58db9bf0c70-minInstancesPerNode: 1,
	rfc_a58db9bf0c70-numTrees: 5,
	rfc_a58db9bf0c70-predictionCol: approved_prediction,
	rfc_a58db9bf0c70-probabilityCol: probability,
	rfc_a58db9bf0c70-rawPredictionCol: rawPrediction,
	rfc_a58db9bf0c70-seed: 207336481,
	rfc_a58db9bf0c70-subsamplingRate: 1.0
}

### Step 9: Validate the Random Forest Classifier

In [20]:
// Run the feature transformers and score the validation dataset with RF model
val validationDataWithPrediction = sparkPipelineRf.transform(validationDataset)

validationDataWithPrediction: org.apache.spark.sql.DataFrame = [loan_amnt: double, dti: double ... 16 more fields]

In [21]:
// Compute the Area Under the Receiver Operating Characteristic (ROC) Curve
val evaluator = new BinaryClassificationEvaluator().
    setLabelCol(randomForest.getLabelCol).
    setRawPredictionCol(randomForest.getRawPredictionCol).
    setMetricName("areaUnderROC")

// This metric is the area under the ROC curve, not classification accuracy
val areaUnderRoc = evaluator.evaluate(validationDataWithPrediction)
println("Area under ROC: " + areaUnderRoc)

Area under ROC: 0.6964259045202464
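
The value reported by the evaluator is the area under the ROC curve, which measures how well the model ranks positive examples above negative ones; it is not the fraction of correctly classified rows. The plain-Scala sketch below illustrates the difference on a handful of made-up scores. The helpers `pairwiseAuc` and `accuracyAt` and the sample data are hypothetical. Spark computes the metric by integrating the ROC curve rather than by comparing pairs, so this is only a conceptual illustration.

```scala
// Conceptual sketch: AUC as the fraction of (positive, negative) pairs in which
// the positive example receives the higher score; ties count as half a win.
def pairwiseAuc(scored: Seq[(Double, Int)]): Double = {
  val positives = scored.collect { case (score, 1) => score }
  val negatives = scored.collect { case (score, 0) => score }
  require(positives.nonEmpty && negatives.nonEmpty, "both classes must be present")
  val wins = for (p <- positives; n <- negatives) yield {
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  }
  wins.sum / (positives.size.toDouble * negatives.size)
}

// Accuracy at a fixed decision threshold, for comparison.
def accuracyAt(threshold: Double, scored: Seq[(Double, Int)]): Double = {
  val correct = scored.count { case (score, label) =>
    (if (score >= threshold) 1 else 0) == label
  }
  correct.toDouble / scored.size
}

// Hypothetical (score, label) pairs, for illustration only.
val scored = Seq((0.9, 1), (0.8, 0), (0.7, 1), (0.3, 0), (0.2, 0))
val sketchAuc = pairwiseAuc(scored)              // 5 of 6 pairs ranked correctly
val thresholdAccuracy = accuracyAt(0.5, scored)  // 4 of 5 labels predicted correctly
println(s"AUC = $sketchAuc, accuracy at 0.5 = $thresholdAccuracy")
```

Because AUC does not depend on a decision threshold, it is often a more informative summary than accuracy when one class is much rarer than the other, as the record counts earlier in this notebook suggest is the case here.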

# Part 5: Serializing ML Pipelines for Deployment

In [22]:
import ml.combust.bundle.serializer.SerializationFormat
import ml.combust.bundle.BundleFile

import ml.combust.bundle.BundleFile

In [23]:
import java.io._
val sbcRf = SparkBundleContext().withDataset(sparkPipelineRf.transform(datasetFiltered))
val localpwd = new File(".").getAbsolutePath()
localpwd

res50: String = /tmp/hadoop-mapr/nm-local-dir/usercache/harry/appcache/application_1500400165929_0592/container_e22_1500400165929_0592_01_000001/.
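
The trailing `/.` in the path above comes from calling `getAbsolutePath` on `new File(".")`, which prepends the working directory without resolving the dot. The path still works, but a cleaner alternative is `getCanonicalPath`. The sketch below shows the difference; the value name `bundleUri` is hypothetical, and the file name simply mirrors the one used in this notebook.

```scala
import java.io.File

// getAbsolutePath keeps the literal "." at the end of the path.
val absolutePwd = new File(".").getAbsolutePath

// getCanonicalPath resolves "." and ".." segments.
val canonicalPwd = new File(".").getCanonicalPath

// Hypothetical bundle URI built from the cleaned-up directory.
val bundleUri = s"jar:file:$canonicalPwd/lc_pipeline_rf2.zip"
println(absolutePwd)
println(bundleUri)
```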

In [24]:
// Save the bundle to the driver's local working directory
for (bf <- managed(BundleFile(s"jar:file:$localpwd/lc_pipeline_rf2.zip"))) {
  sparkPipelineRf.writeBundle.save(bf)(sbcRf).get
}


In [25]:
// Copy the bundle from the driver's local directory to the Hadoop file system
import sys.process._
val hadoopresult = s"hadoop fs -copyFromLocal -f $localpwd/lc_pipeline_rf2.zip /tmp/harry_lc_pipeline_rf2.zip".!!
hadoopresult

res54: String = ""

In [26]:
%%local
# Copy file to DS Platform Container

! hadoop fs -ls /tmp
! hadoop fs -get /tmp/harry_lc_pipeline_rf2.zip /home/datascience/loan-risk-demo/SparkExamples/

Found 8 items
-rwxr-xr-x   3 5003 5003  102036996 2017-12-04 23:24 /tmp/LC_data.csv
drwxr-xr-x   - 5005 5005          5 2017-09-15 00:04 /tmp/affinity_df
drwxr-xr-x   - 5005 5005          5 2017-09-15 00:04 /tmp/affinity_df_spark_csv
-rwxr-xr-x   3 5005 5005       9096 2017-09-13 02:12 /tmp/artist_alias_small.txt
-rwxr-xr-x   3 5005 5005     728192 2017-09-13 02:24 /tmp/artist_data_small.txt
-rwxr-xr-x   3 5007 5007      11140 2018-01-11 07:05 /tmp/harry_lc_pipeline_rf2.zip
drwxr-xr-x   - 5005 5005          2 2017-09-14 23:27 /tmp/iris_tbl_tmp
-rwxr-xr-x   3 5005 5005     898234 2017-09-13 02:24 /tmp/user_artist_data_small.txt
get: `/home/datascience/loan-risk-demo/SparkExamples/harry_lc_pipeline_rf2.zip': File exists


In [29]:
%%local
!ls /home/datascience/loan-risk-demo/SparkExamples/

Readme.md	 harry_lc_pipeline_rf2.zip
deploy-scala.py  mleap-scala-train-and-save.ipynb
