<a href="https://cocl.us/Data_Science_with_Scalla_top"><img src="https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/SC0103EN/adds/Data_Science_with_Scalla_notebook_top.png" width="750" align="center"></a>
 <br/>
<a><img src="https://ibm.box.com/shared/static/ugcqz6ohbvff804xp84y4kqnvvk3bq1g.png" width="200" align="center"></a>

# Module 5: Pipeline and Grid Search

## Predicting Grant Applications: Building a Pipeline

### Lesson Objectives

* After completing this lesson, you should be able to extract useful information from the results of the grid search, including:
  - the average area under the ROC curve for each combination of parameters
  - the parameters of the best model
  - the feature importances of the best model
  
### avgMetrics

In [None]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Pick up the session that is already running, or create one if there is none.
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Read the grants file: tab-separated, first row holds the column names,
// and column types are inferred from the data.
val data = spark.read.
  format("com.databricks.spark.csv").
  options(Map(
    "delimiter" -> "\t",
    "header" -> "true",
    "inferSchema" -> "true"
  )).
  load("/resources/data/grantsPeople.csv")

// Print the first rows as a quick check of the parsed columns.
data.show()

In [None]:

// Substitutes 0 for an aggregate whose result is null.
def zeroIfNull(aggregate: org.apache.spark.sql.Column): org.apache.spark.sql.Column =
  when(aggregate.isNull, 0).otherwise(aggregate)

// Per-researcher derived columns:
//   phd        - With_PHD == "Yes", cast to Int
//   CI         - Role == "CHIEF_INVESTIGATOR", cast to Int
//   paperscore - weighted publication count: 4 per A2 paper plus 3 per A paper
val researchers = data.
  withColumn("phd", data("With_PHD").equalTo("Yes").cast("Int")).
  withColumn("CI", data("Role").equalTo("CHIEF_INVESTIGATOR").cast("Int")).
  withColumn("paperscore", data("A2") * 4 + data("A") * 3)

// Collapse the researcher rows into one row per grant application.
val grants = researchers.groupBy("Grant_Application_ID").agg(
  max("Grant_Status").as("Grant_Status"),
  max("Grant_Category_Code").as("Category_Code"),
  max("Contract_Value_Band").as("Value_Band"),
  sum("phd").as("PHDs"),
  // Multiplying by CI restricts the paper score to chief investigators.
  zeroIfNull(max(expr("paperscore * CI"))).as("paperscore"),
  count("*").as("teamsize"),
  zeroIfNull(sum("Number_of_Successful_Grant")).as("successes"),
  zeroIfNull(sum("Number_of_Unsuccessful_Grant")).as("failures")
)

grants.show()

In [None]:

import org.apache.spark.ml.feature.StringIndexer

// Fits a StringIndexer on `grants` that reads the column `source` and
// writes its index to a new column named `target`.
def fitIndexer(source: String, target: String): org.apache.spark.ml.feature.StringIndexerModel =
  new StringIndexer().setInputCol(source).setOutputCol(target).fit(grants)

val value_band_indexer = fitIndexer("Value_Band", "Value_index")

val category_indexer = fitIndexer("Category_Code", "Category_index")

// The indexed Grant_Status is written to "status", the column the classifier uses as its label.
val label_indexer = fitIndexer("Grant_Status", "status")



In [None]:
import org.apache.spark.ml.feature.VectorAssembler

// Packs the listed columns into a single vector column named "assembled".
// The order matters: position i of the vector holds the i-th column listed here.
val assembler = new VectorAssembler().
  setOutputCol("assembled").
  setInputCols(Array(
    "Value_index",
    "Category_index",
    "PHDs",
    "paperscore",
    "teamsize",
    "successes",
    "failures"
  ))


In [None]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.RandomForestClassifier

// Random forest that reads the "assembled" feature vector and the "status" label,
// with an explicitly fixed seed.
val rf = new RandomForestClassifier().
  setSeed(42).
  setLabelCol("status").
  setFeaturesCol("assembled")

// Stages run in the listed order: the three indexers, then the assembler,
// then the classifier.
val pipeline = new Pipeline().
  setStages(Array(value_band_indexer, category_indexer, label_indexer, assembler, rf))

In [None]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
// Evaluator that scores the "rawPrediction" column against the "status" label.
val auc_eval = new BinaryClassificationEvaluator().
  setRawPredictionCol("rawPrediction").
  setLabelCol("status")

// Shows which metric the evaluator reports.
auc_eval.getMetricName

// Split on application id: ids below 6635 are used for training, the rest are held out.
val tr = grants.where("Grant_Application_ID < 6635")
val te = grants.where("Grant_Application_ID >= 6635")

// Fills missing PHDs values with 0.
def zeroFillPhds(frame: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
  frame.na.fill(0, Seq("PHDs"))

val training = zeroFillPhds(tr)
val test = zeroFillPhds(te)

// Baseline: fit the pipeline with the classifier's current parameters and
// score it on the held-out applications.
val model = pipeline.fit(training)
val pipeline_results = model.transform(test)
auc_eval.evaluate(pipeline_results)

// Lists the parameter values the classifier is currently using.
rf.extractParamMap


In [None]:
import org.apache.spark.ml.tuning.ParamGridBuilder

// Search grid: every pairing of a tree depth with a forest size (2 x 2 = 4 combinations).
val paramGrid = {
  val depths = Array(2, 5)
  val treeCounts = Array(1, 20)
  new ParamGridBuilder().
    addGrid(rf.maxDepth, depths).
    addGrid(rf.numTrees, treeCounts).
    build()
}

In [None]:
import org.apache.spark.ml.tuning.CrossValidator

// 3-fold cross-validation of the whole pipeline over every entry of paramGrid,
// scored with auc_eval.
val cv = new CrossValidator().
  setNumFolds(3).
  setEvaluator(auc_eval).
  setEstimatorParamMaps(paramGrid).
  setEstimator(pipeline)

val cvModel = cv.fit(training)

// Predictions of the winning model on the held-out applications.
val cv_results = cvModel.transform(test)

// One averaged cross-validation metric per entry of paramGrid, in the same order.
cvModel.avgMetrics

### Finding the Winning Parameters

In [None]:
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.CrossValidatorModel

/**
 * Adds `bestEstimatorParamMap` to a fitted `CrossValidatorModel`.
 *
 * The grid's parameter maps are paired with `avgMetrics` (one averaged metric per
 * map, same order) and the map with the best metric is returned.
 */
implicit class BestParamMapCrossValidatorModel(cvModel: CrossValidatorModel)
{
  /**
   * @return the parameter map whose average cross-validation metric is best
   *         according to the model's evaluator
   */
  def bestEstimatorParamMap: ParamMap = {
    val scored = cvModel.getEstimatorParamMaps.zip(cvModel.avgMetrics)
    // "Best" depends on the metric: larger wins for e.g. area under ROC,
    // smaller wins for error metrics. Ask the evaluator instead of assuming max.
    val (winner, _) =
      if (cvModel.getEvaluator.isLargerBetter) scored.maxBy(_._2)
      else scored.minBy(_._2)
    winner
  }
}

### Using bestEstimatorParamMap

In [None]:

// Prints the grid-search parameter combination with the best average cross-validation metric.
println(cvModel.bestEstimatorParamMap)


### Best Model

In [None]:
import org.apache.spark.ml.PipelineModel

// The cross-validator's estimator is the Pipeline, so its winning model is a
// fitted PipelineModel; the cast recovers that type to reach the stages.
val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
bestPipelineModel.stages

### Extracting the Winning Classifier

In [None]:
// Locate the fitted forest by type rather than by position, so the lookup keeps
// working if stages are added to or reordered in the pipeline, and fails with a
// clear message instead of a ClassCastException if no forest stage is present.
val bestRandomForest = bestPipelineModel.stages.
  collectFirst { case forest: RandomForestClassificationModel => forest }.
  getOrElse(sys.error("No RandomForestClassificationModel stage found in the best pipeline model"))

// Full textual dump of every tree in the forest.
bestRandomForest.toDebugString

### totalNumNodes

In [None]:
// Total node count, summed across every tree in the winning forest.
bestRandomForest.totalNumNodes

### Feature Importances

In [None]:
// Importance vector; entry i refers to the i-th assembler input column
// (Value_index, Category_index, PHDs, paperscore, teamsize, successes, failures).
bestRandomForest.featureImportances

### Wrapping Up

* Using the default parameters, we had an area under the ROC curve of 0.909
* After a grid search, we got that up to 0.926
* Running the grid search on a cluster was a real timesaver
* Not all of our features proved very useful; maybe you can do better!

### Module Summary

* Having completed this module about Predicting Grant Applications, you should be able to:
  - Understand how to fit together the functions available in Spark's machine learning libraries to solve real problems
  - Fit models in a fraction of the time, using a Spark cluster


### About the Authors

[Petro Verkhogliad](https://www.linkedin.com/in/vpetro) is a Consulting Manager at Lightbend. He holds a Master's degree in Computer Science with a specialization in Intelligent Systems. He is passionate about functional programming and applications of AI.