# Spark ML Pipelines - Lending Club Demo

Using the Lending Club data set, this demo will demonstrate how to:

1. Load the research dataset from s3
2. Perform feature extraction using Spark ML APIs
3. Train a Logistic Regression and a Random Forest Classifier
4. Perform Hyperparameter tuning using ParamGrid, Binary Classification Evaluator, and a Cross Validator
5. Export the best pipeline (feature transformers and LR/RF models) to an MLeap Bundle, which we'll use to deploy the pipeline to an RESTful API service.



# Cluster Configuration

In [1]:
%%configure -f
{"kind": "spark",
"driverMemory": "2048M",
"executorCores": 2,
 "conf":{"spark.jars.packages":"org.apache.hadoop:hadoop-aws:2.7.3,ml.combust.mleap:mleap-spark-extension_2.11:0.7.0,com.databricks:spark-avro_2.11:3.0.1", "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"}
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
69,application_1497282488964_0069,spark,idle,Link,Link,


# Part 2: Spark ML Feature Extraction

### Load Required Libraries

In [2]:
// Spark Training Pipeline Libraries
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.feature.{StandardScaler, StringIndexer, VectorAssembler, PolynomialExpansion}
import org.apache.spark.ml.classification.{RandomForestClassifier, LogisticRegression}
import org.apache.spark.ml.{Pipeline, PipelineModel, Transformer, PipelineStage}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator}
import com.databricks.spark.avro._

// MLeap/Bundle.ML Serialization Libraries
import ml.combust.mleap.spark.SparkSupport._
import resource._
import ml.combust.bundle.BundleFile
import org.apache.spark.ml.bundle.SparkBundleContext

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
70,application_1497282488964_0070,spark,idle,Link,Link,✔


SparkSession available as 'spark'.
import org.apache.spark.ml.bundle.SparkBundleContext

### Step 1: Import and Explore the Research Dataset

In [7]:
val filePath = "s3a://blah@blah://mleap-demo/datasources/lending_club.avro"
val dataset = spark.read.avro(filePath)

println("Total N Records: " + dataset.count())
println("Total N Approved Records: " + dataset.filter("approved == 1.0").count())

com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: 641FEF2AF9BA6B4E, AWS Error Code: null, AWS Error Message: Forbidden
  at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
  at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
  at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
  at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
  at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
  at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
  at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
  at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.sc

In [9]:
sys.env("fs.s3a.access.key")

java.util.NoSuchElementException: key not found: fs.s3a.access.key
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
  ... 53 elided


In [None]:
// View the individual records
dataset.select("loan_amount", "fico_score_group_fnl", "dti", "emp_length", "state", "approved", "loan_title").show(5)

### Alter the Data Slightly For The Demo

- Cap DTI
- Limit the number of states for OneHotEncoding
- Keep only certain loan categories

In [None]:
dataset.registerTempTable("df")
println(dataset.count())

val datasetFnl = spark.sqlContext.sql(f"""
    select
        loan_amount,
        fico_score_group_fnl,
        case when dti >= 10.0
            then 10.0
            else dti
        end as dti,
        emp_length,
        case when state in ('CA', 'NY', 'MN', 'IL', 'FL', 'WA', 'MA', 'TX', 'GA', 'OH', 'NJ', 'VA', 'MI')
            then state
            else 'Other'
        end as state,
        loan_title,
        approved
    from df
    where loan_title in('Debt Consolidation', 'Other', 'Home/Home Improvement', 'Payoff Credit Card', 'Car Payment/Loan',
    'Business Loan', 'Health/Medical', 'Moving', 'Wedding/Engagement', 'Vacation', 'College', 'Renewable Energy', 'Payoff Bills',
    'Personal Loan', 'Motorcycle')
""")

datasetFnl.registerTempTable("dfFnl")
println(datasetFnl.count())

### Summary Statistics

In [None]:
// Most popular cities (original dataset)

spark.sqlContext.sql(f"""
    select 
        state,
        count(*) as n,
        cast(avg(loan_amount) as decimal(12,2)) as loan_amount,
        cast(avg(dti) as decimal(12,2)) as dti,
        avg(cast(approved as decimal(12,2))) as approved
    from dfFnl
    group by state
    order by avg(cast(approved as decimal(12,2))) desc
""").show(15)

In [None]:
// Most popular cities (original dataset)

spark.sqlContext.sql(f"""
    select 
        loan_title,
        count(*) as n,
        cast(avg(loan_amount) as decimal(12,2)) as loan_amount,
        cast(avg(dti) as decimal(12,2)) as dti,
        avg(cast(approved as decimal(12,2))) as approved
    from dfFnl
    group by loan_title
    order by avg(cast(approved as decimal(12,2))) desc
""").show(15)

### Step 2: Define continous and categorical features and filter nulls

In [None]:
// Step 2. Create our feature pipeline and train it on the entire dataset
val continuousFeatures = Array("loan_amount", "dti")

val categoricalFeatures = Array("loan_title",
  "emp_length",
  "state",
  "fico_score_group_fnl")

val allFeatures = continuousFeatures.union(categoricalFeatures)

In [None]:
// Filter all null values
val allCols = allFeatures.union(Seq("approved")).map(datasetFnl.col)
val nullFilter = allCols.map(_.isNotNull).reduce(_ && _)
val datasetImputedFiltered = datasetFnl.select(allCols: _*).filter(nullFilter).persist()

println(datasetImputedFiltered.count())

### Step 3: Split data into training and validation

In [None]:
val Array(trainingDataset, validationDataset) = datasetImputedFiltered.randomSplit(Array(0.7, 0.3))

### Step 4: Continous Feature Pipelines

Pipeline 1: Scale Features
    - VectorAssembler
    - StandardScaler
    
Pipeline 2: Polynomial Expansion
    - VectorAssembler
    - PolynomialExpansion

In [None]:
// Pipeline 1
val continuousFeatureAssembler = new VectorAssembler(uid = "continuous_feature_assembler").
    setInputCols(continuousFeatures).
    setOutputCol("unscaled_continuous_features")

val continuousFeatureScaler = new StandardScaler(uid = "continuous_feature_scaler").
    setInputCol("unscaled_continuous_features").
    setOutputCol("scaled_continuous_features")

// Pipeline 2
val polyExpansionAssembler = new VectorAssembler(uid = "poly_expansion_feature_assembler").
    setInputCols(Array("loan_amount", "dti")).
    setOutputCol("poly_expansions_features")

val continuousFeaturePolynomialExpansion = new PolynomialExpansion(uid = "polynomial_expansion_loan_amount").
    setInputCol("poly_expansions_features").
    setOutputCol("loan_amount_polynomial_expansion_features")

### Step 5: Categorical Feature Pipeline

Pipeline 3: String Index + One-Hot-Encode each categorical feature
    - StringIndexer
    - OneHotEncoder

In [None]:
val categoricalFeatureIndexers = categoricalFeatures.map {
    feature => new StringIndexer(uid = s"string_indexer_$feature").
      setInputCol(feature).
      setOutputCol(s"${feature}_index")
}

val categoricalFeatureOneHotEncoders = categoricalFeatureIndexers.map {
    indexer => new OneHotEncoder(uid = s"oh_encoder_${indexer.getOutputCol}").
      setInputCol(indexer.getOutputCol).
      setOutputCol(s"${indexer.getOutputCol}_oh")
}

### Step 6: Assemble our features and feature pipeline

Combine Pipeline 1, Pipeline 2, and Pipeline 3

In [None]:
// Assemble Feature Vector For Random Forest Classifier
val featureColsRf = categoricalFeatureIndexers.map(_.getOutputCol).union(Seq("scaled_continuous_features", "loan_amount_polynomial_expansion_features"))

// Assemble Feature Vector For Logistic Regression
val featureColsLr = categoricalFeatureOneHotEncoders.map(_.getOutputCol).union(Seq("scaled_continuous_features"))

// Vector-assemble all categorical and continuous features into a single feature vector
val featureAssemblerLr = new VectorAssembler(uid = "feature_assembler_lr").
    setInputCols(featureColsLr).
    setOutputCol("features_lr")
    
val featureAssemblerRf = new VectorAssembler(uid = "feature_assembler_rf").
    setInputCols(featureColsRf).
    setOutputCol("features_rf")

// Define an array of all of our feature estimators that we need to fit
val estimators: Array[PipelineStage] = Array(continuousFeatureAssembler, continuousFeatureScaler, polyExpansionAssembler, continuousFeaturePolynomialExpansion).
    union(categoricalFeatureIndexers).
    union(categoricalFeatureOneHotEncoders).
    union(Seq(featureAssemblerLr, featureAssemblerRf))

// Build our pipeline based on the estimators
val featurePipeline = new Pipeline(uid = "feature_pipeline").
    setStages(estimators)

// Fit our pipeline
val sparkFeaturePipelineModel = featurePipeline.fit(trainingDataset)

println("Finished constructing the pipeline")



In [None]:
val dfWithFeatures = sparkFeaturePipelineModel.transform(trainingDataset)
// Explore our generated features - Linear Regression
dfWithFeatures.select("features_lr").head(1)

In [None]:
// Explore our generated features - Linear Regression
dfWithFeatures.select("features_rf").head(1)

# Part 3: Spark ML Training and Scoring

In this section we are going to train a Logistic Regression model and a Random Forest Classifier model on the training data and test their accuracy on the validation dataset.

### Step 7: Train Random Forest Classifier

In [None]:
// Step 3.1 Create our random forest model
val randomForest = new RandomForestClassifier(uid = "random_forest_classifier").
    setFeaturesCol("features_rf").
    setLabelCol("approved").
    setPredictionCol("approved_prediction")

val sparkPipelineEstimatorRf = new Pipeline().setStages(Array(sparkFeaturePipelineModel, randomForest))
val sparkPipelineRf = sparkPipelineEstimatorRf.fit(datasetImputedFiltered)

println("Complete: Training Random Forest")

### Step 8: Train Logistic Regression Model


In [None]:
val logisticRegression = new LogisticRegression(uid = "logistic_regression").
    setFeaturesCol("features_lr").
    setLabelCol("approved").
    setPredictionCol("approved_prediction")

val sparkPipelineEstimatorLr = new Pipeline().setStages(Array(sparkFeaturePipelineModel, logisticRegression))
val sparkPipelineLr = sparkPipelineEstimatorLr.fit(datasetImputedFiltered)

println("Complete: Training Logistic Regression")

### Step 9: Validate the Random Forest Classifier

In [None]:
// Run the feature transformers and score the validation dataset with RF model
val validationDataWithPrediction = sparkPipelineRf.transform(validationDataset)

In [None]:
// Compute the Area Under the Receiver Operating Characteristics (ROC) Curve
val evaluator = new BinaryClassificationEvaluator().
    setLabelCol(randomForest.getLabelCol).
    setRawPredictionCol(randomForest.getRawPredictionCol).
    setMetricName("areaUnderROC")
    
val accuracy = evaluator.evaluate(validationDataWithPrediction)
println("Accuracy: " + accuracy)

### Step 10: Validate the Logistic Regression

In [None]:
// Run the feature transformers and score the validation dataset with LR model
val validationDataWithPrediction = sparkPipelineLr.transform(validationDataset)

In [None]:
val evaluator = new BinaryClassificationEvaluator().
    setLabelCol(logisticRegression.getLabelCol).
    setRawPredictionCol(logisticRegression.getRawPredictionCol).
    setMetricName("areaUnderROC")
    
val accuracy = evaluator.evaluate(validationDataWithPrediction)
println("Accuracy: " + accuracy)