# Step 3: Create a Scoring Pipeline

The scoring pipeline consists of two steps:

  1. Transform the raw data into a scoring data set.
  2. Score the scoring data set.
  
We assume that new data arrives in the data store as it is generated. The scoring workflow polls the data store for new data on whatever schedule is convenient (real time, hourly, daily, ...), applies the same transformations used in the training step, and then predicts the label for the new observations.
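
As a rough illustration of the scheduling idea above, the sketch below derives the `start_date` and `to_date` parameters for one scheduled run. The helper name `scoring_window` and the `lookback_days` parameter are hypothetical and not part of the original notebooks. Whether the feature engineering notebook treats either bound as inclusive is also an assumption to verify.

```python
from datetime import date, timedelta


def scoring_window(run_date: date, lookback_days: int = 1) -> dict:
    """Return date parameters covering the lookback_days before run_date.

    Hypothetical helper: the keys mirror the widget names used later in
    this notebook, but the windowing policy itself is an assumption.
    """
    if lookback_days < 1:
        raise ValueError("lookback_days must be at least 1")
    start = run_date - timedelta(days=lookback_days)
    # ISO format (YYYY-MM-DD) matches the date strings used in the widgets.
    return {"start_date": start.isoformat(), "to_date": run_date.isoformat()}


# Example: a weekly run scheduled for 2016-04-30.
window = scoring_window(date(2016, 4, 30), lookback_days=7)
print(window)  # {'start_date': '2016-04-23', 'to_date': '2016-04-30'}
```

A scheduled job could pass the resulting dictionary entries to the feature engineering notebook in place of fixed dates.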

First, we run the feature engineering notebook (`./notebooks/2a_feature_engineering`) with the appropriate parameters to extract the new data and transform it into the observation format the model expects. Second, we run the scoring notebook (`./notebooks/3a_model_scoring`) to generate a prediction for each of those observations.

We use the model created in the model training pipeline and store the predictions in the table named by the `results_data` parameter (`HPresults_output` in the cells below).

In [2]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# The scoring uses the same feature engineering script used to train the model
scoring_table = 'HPscoring_input'
results_table = 'HPresults_output'
model_type = 'RandomForest' # Use 'DecisionTree' or 'RandomForest'

In [3]:
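# Reset the notebook widgets, then expose the scoring parameters as text widgets.
dbutils.widgets.removeAll()
dbutils.widgets.text("results_data", results_table)
dbutils.widgets.text("model_type", model_type)
# Date range of the raw data to score.
dbutils.widgets.text("start_date", '2015-11-15')
dbutils.widgets.text("to_date", '2016-04-30')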

In [4]:
# Build the scoring data set with the feature engineering notebook (timeout: 600 seconds).
dbutils.notebook.run("2a_feature_engineering", 600, {"features_table": scoring_table, 
                                                     "start_date": dbutils.widgets.get("start_date"), 
                                                     "to_date": dbutils.widgets.get("to_date")})

In [5]:
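# Score the new observations and write the predictions to the results table (timeout: 600 seconds).
dbutils.notebook.run("3a_model_scoring", 600, {"scoring_data": scoring_table, 
                                               "results_data": dbutils.widgets.get("results_data"), 
                                               "model_type": dbutils.widgets.get("model_type")})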

In [6]:
# Drop the scoring data set created above to keep the workspace clean.
spark = SparkSession.builder.getOrCreate()
spark.sql("DROP TABLE IF EXISTS " + scoring_table)
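
In a scheduled pipeline, a `dbutils.notebook.run` call raises an exception when the child notebook fails or exceeds its timeout. One possible way to make the two notebook runs above more robust is a small retry wrapper. The sketch below is an assumption, not part of the original pipeline: `run_with_retry` is a hypothetical helper that takes any zero-argument callable, so it can be tried outside Databricks.

```python
import time


def run_with_retry(run_fn, max_retries=2, wait_seconds=0):
    """Call run_fn() and retry on any exception.

    Hypothetical helper: allows up to max_retries additional attempts
    after the first failure, then re-raises the last exception.
    """
    attempt = 0
    while True:
        try:
            return run_fn()
        except Exception as err:
            if attempt >= max_retries:
                raise
            attempt += 1
            print(f"Attempt {attempt} failed ({err}); retrying")
            time.sleep(wait_seconds)


# Inside Databricks, the scoring run could be wrapped like this (not executed here):
# run_with_retry(lambda: dbutils.notebook.run(
#     "3a_model_scoring", 600,
#     {"scoring_data": scoring_table,
#      "results_data": dbutils.widgets.get("results_data"),
#      "model_type": dbutils.widgets.get("model_type")}))
```

Retrying is only safe if the child notebooks can be rerun without side effects, for example if they overwrite their output tables rather than append to them; that property is not confirmed by this notebook.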