# Setting Transformation and Modelling Pipeline

In [53]:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature._
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification._
import org.apache.spark.sql._

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature._
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification._
import org.apache.spark.sql._


In [None]:
// pipeline inputs: dataframe, model type, cutoff, weighted? (y/n)

## Reading Data

In [9]:
// Load LCLoan_Wrangled.csv: the first row is read as the header and
// column types are inferred from the data.
val df = spark.read
  .format("csv")
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .load("LCLoan_Wrangled.csv")

df: org.apache.spark.sql.DataFrame = [loan_amnt: int, term: string ... 63 more fields]


## Transformations

In [49]:
// Names of the columns treated as categorical features
val categorical_features = Array(
  "emp_length", "term", "zip_code", "grade", "sub_grade", "title", "purpose",
  "application_type", "addr_state", "home_ownership",
  "verification_status", "initial_list_status"
)

// Two pipeline stages per categorical column:
//   1. StringIndexer:          <name>       -> <name>_index
//   2. OneHotEncoderEstimator: <name>_index -> <name>_vec
val encodeCategoric = categorical_features.flatMap { colName =>
  val indexedCol = colName + "_index"
  val encodedCol = colName + "_vec"

  val indexer = new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(indexedCol)
    .setHandleInvalid("skip") // alternatives: "keep" or "error"

  val encoder = new OneHotEncoderEstimator()
    .setInputCols(Array(indexedCol))
    .setOutputCols(Array(encodedCol))
    .setDropLast(false)

  Array(indexer, encoder)
}

// Names of the one-hot vector columns ("<feature>_vec") that the encoding stages create
val categoricVectorCols = categorical_features.map(feature => feature + "_vec")

// Assemble all categorical vector columns into the single "categorical_features" column
val categoricVectorCombine = new VectorAssembler()
  .setOutputCol("categorical_features")
  .setInputCols(categoricVectorCols)

// Names of every integer- or double-typed column in the DataFrame
val numericColumns = df.dtypes.collect {
  case (name, dataType) if dataType == "IntegerType" || dataType == "DoubleType" => name
}

// Numeric feature columns: the numeric columns minus the target variable.
// Declared as its own val; the original assigned to a `numberColumns` that was
// never declared, so the cell could not compile in a fresh session.
val numberColumns = numericColumns.filterNot(_ == "loan_status")

// Assemble the numeric feature columns into the single "numerical_features" vector.
// handleInvalid = "skip" drops rows that contain invalid (null/NaN) values.
val numericVec = new VectorAssembler()
    .setInputCols(numberColumns)
    .setOutputCol("numerical_features")
    .setHandleInvalid("skip")
    
// Min-max scaler applied to the assembled numeric vector
val scaler = new MinMaxScaler()
  .setOutputCol("scaledFeatures")
  .setInputCol("numerical_features")

// Join the categorical vector and the scaled numeric vector into "features"
val combineAllVec = new VectorAssembler()
  .setOutputCol("features")
  .setInputCols(Array("categorical_features", "scaledFeatures"))

// Stage order: numeric assembly and scaling, then categorical encoding,
// then the categorical assembly and the final feature combination
val transformOrder =
  Array(numericVec, scaler) ++ encodeCategoric ++ Array(categoricVectorCombine, combineAllVec)




categorical_features: Array[String] = Array(emp_length, term, zip_code, grade, sub_grade, title, purpose, application_type, addr_state, home_ownership, verification_status, initial_list_status)
encodeCategoric: Array[org.apache.spark.ml.Estimator[_ >: org.apache.spark.ml.feature.OneHotEncoderModel with org.apache.spark.ml.feature.StringIndexerModel <: org.apache.spark.ml.Model[_ >: org.apache.spark.ml.feature.OneHotEncoderModel with org.apache.spark.ml.feature.StringIndexerModel <: org.apache.spark.ml.Transformer with org.apache.spark.ml.param.shared.HasHandleInvalid with org.apache.spark.ml.util.MLWritable] with org.apache.spark.ml.param.shared.HasHandleInvalid with org.apache.spark.ml.util.MLWritable] with org.apache.spark.ml.param.shared.HasHandleInvalid with org.apache.spark.ml.util...

## Setting the Modelling Options
The function accepts inputs that determine which pipeline is built.

In [50]:
/** Builds the transformation + model pipeline and fits it on `df`.
  *
  * @param df        data to fit on; for "lr" it must contain the `loan_status` label column
  * @param modelType "lr" appends a logistic regression stage; any other value
  *                  (including "dt", whose implementation is still commented out)
  *                  fits the transformation stages only
  * @param threshold prediction threshold passed to the logistic regression
  * @param weighted  not used yet (weighting is not implemented)
  * @return the pipeline fitted on `df`
  */
def runModel ( df : DataFrame, modelType : String, threshold : Double, weighted : Boolean ) : PipelineModel = {
    import org.apache.spark.ml.PipelineStage

    // Model-specific stages appended after the shared transformation stages.
    // Built as an expression: the original declared a second `pipelineOrder` inside the
    // `if` branch, which only shadowed the outer one, so the model never reached the pipeline.
    val modelStages: Array[PipelineStage] = modelType match {

        case "lr" =>
            //logistic regression option
            val model = new LogisticRegression()
                .setLabelCol("loan_status") // the target column; the default would be "label"
                .setFeaturesCol("features")
                .setMaxIter(10)
                .setRegParam(0.001)
                .setThreshold(threshold)

            //****not sure about the weighted option just yet

            Array[PipelineStage](model)

        case "dt" =>
            /*decisiontree option - still working on this
            val model = new DecisionTreeClassifier()
                .setLabelCol("loan_status")
                .setFeaturesCol("features")

            //Label Converter
            val labelConverter = new IndexToString()
                .setInputCol("prediction")
                .setOutputCol("predictedLabel")
                .setLabels(labelIndexer.labels)

            //stages for this option
            Array[PipelineStage](model, labelConverter)*/
            Array.empty[PipelineStage]

        case _ =>
            //default: transformation stages only
            Array.empty[PipelineStage]
    }

    //full stage order: shared transformations followed by the chosen model
    val pipelineOrder: Array[PipelineStage] = transformOrder ++ modelStages

    //set the pipeline for the model
    val pipeline = new Pipeline()
        .setStages(pipelineOrder)

    //the fitted pipeline is the result of the function
    pipeline.fit(df)
}

runModel: (df: org.apache.spark.sql.DataFrame, modelType: String, threshold: Double, weighted: Boolean)org.apache.spark.ml.PipelineModel


## Training/Test Split

In [76]:
//nothing here yet

## Running the model

In [51]:
// Fit the pipeline with the desired parameters
var testmodel = runModel(df, modelType = "lr", threshold = 0.5, weighted = false)

testmodel: org.apache.spark.ml.PipelineModel = pipeline_8522a0474fb7


In [75]:
// Apply the fitted pipeline to the data with `loan_status` renamed to `label`,
// then print label, probability and prediction without truncating the values
testmodel
  .transform(df.withColumnRenamed("loan_status", "label"))
  .select("label", "probability", "prediction")
  .show(truncate = false)

+-----+------------------------------------------+----------+
|label|probability                               |prediction|
+-----+------------------------------------------+----------+
|1    |[0.0013796581227724393,0.9986203418772276]|1.0       |
|1    |[0.010189470424057854,0.9898105295759422] |1.0       |
|1    |[0.18936247706655956,0.8106375229334405]  |1.0       |
|0    |[0.9172074055718759,0.0827925944281242]   |0.0       |
|1    |[4.260359179034007E-4,0.9995739640820965] |1.0       |
|0    |[0.9999898327118802,1.0167288119700322E-5]|0.0       |
|1    |[0.0017418781360447172,0.9982581218639552]|1.0       |
|1    |[1.61385106990636E-4,0.9998386148930095]  |1.0       |
|1    |[0.006648019148728157,0.993351980851272]  |1.0       |
|0    |[0.025892942657991594,0.9741070573420085] |1.0       |
|0    |[0.7029207943029315,0.29707920569706847]  |0.0       |
|1    |[0.34132342318276887,0.6586765768172311]  |1.0       |
|1    |[0.2000653841423633,0.7999346158576367]   |1.0       |
|1    |[