# Publish a Training Pipeline
In this notebook, we show how to automate the training and retraining of a model with HyperDrive, and how to register the best model. Once this training pipeline is published, it provides a REST endpoint that can be called to run the pipeline, and the pipeline can also be scheduled to run regularly.

The steps in this notebook are:
- [import libraries](#import),
- [read in the Azure ML workspace](#workspace),
- [upload the data to the cloud](#upload),
- [define a hyperparameter search configuration](#configuration),
- [create an estimator](#estimator),
- [Azure Machine Learning Pipelines overview](#aml_pipeline_overview),
- [create the pipeline tuning step](#aml_pipeline_tune_step),
- [create the pipeline best parameters step](#aml_pipeline_bh_step),
- [create the pipeline best model step](#aml_pipeline_estimator_step),
- [create the pipeline register model step](#aml_pipeline_rm_step),
- [create the pipeline itself](#create_aml_pipeline),
- [publish the pipeline](#publish_aml_pipeline),
- [run the published pipeline using its REST endpoint](#run_publish_aml_pipeline),
- [schedule the published pipeline to regularly run](#schedule_aml_pipeline), and
- [write out the pipeline's URI](#write_uri_key)

## Imports  <a id='import'></a>

In [None]:
import os
import pandas as pd
import requests

from azureml.core import Workspace, Experiment
from azureml.core.datastore import Datastore
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.exceptions import ComputeTargetException
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import HyperDriveStep, PythonScriptStep, EstimatorStep
from azureml.pipeline.core import Pipeline, PipelineData, PipelineParameter
from azureml.core.runconfig import RunConfiguration, CondaDependencies
from azureml.train.estimator import Estimator
from azureml.train.hyperdrive import (
    RandomParameterSampling, choice, PrimaryMetricGoal,
    HyperDriveConfig, MedianStoppingPolicy)
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
import azureml.core
from get_auth import get_auth
from MetricsUtils.hpStatisticsCollection import statisticsCollector, CollectionEntry
from MetricsUtils.storageutils import storageConnection

print('azureml.core.VERSION={}'.format(azureml.core.VERSION))

## Read in the Azure ML workspace  <a id='workspace'></a>
Read in the workspace created in a previous notebook.

In [None]:
auth = get_auth()
ws = Workspace.from_config(auth=auth)
ws_details = ws.get_details()
print('Name:\t\t{}\nLocation:\t{}'
      .format(ws_details['name'],
              ws_details['location']))

## Upload the data to the cloud <a id='upload'></a>
We put the data in a particular directory on the workspace's default datastore. This directory shows up at the same location in the file system of every job running on the compute cluster.

Get a handle to the workspace's default data store.

In [None]:
ds = ws.get_default_datastore()

Upload the data. We use `overwrite=False` to avoid spending time re-uploading files whose names are already present on the datastore. If you change the data and want to refresh what's uploaded, use `overwrite=True`.

In [None]:
ds.upload(src_dir=os.path.join('.', 'data'), target_path='data', overwrite=False, show_progress=True)
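
To make the `overwrite` behavior concrete, here is a local analogue that copies a directory tree and skips files already present at the target unless `overwrite=True`. It is only a sketch of the semantics described above, using the standard library on the local file system; `upload_dir` is a hypothetical helper, not part of the Azure ML SDK.

```python
import os
import shutil


def upload_dir(src_dir, target_dir, overwrite=False):
    """Copy every file under src_dir into target_dir, mirroring the tree.

    With overwrite=False, files that already exist at the target are skipped,
    which is what makes repeated uploads cheap.
    Returns (copied, skipped) as sorted lists of relative paths.
    """
    copied, skipped = [], []
    for root, _dirs, files in os.walk(src_dir):
        for name in files:
            src = os.path.join(root, name)
            rel = os.path.relpath(src, src_dir)
            dst = os.path.join(target_dir, rel)
            if os.path.exists(dst) and not overwrite:
                skipped.append(rel)  # same name already present: leave it alone
                continue
            os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.copy2(src, dst)
            copied.append(rel)
    return sorted(copied), sorted(skipped)
```

Calling it twice on the same directories copies the files the first time and skips all of them the second time, which is the time saving the paragraph above refers to.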

## Define a hyperparameter search configuration <a id='configuration'></a>
Define the hyperparameter space for a random search. We use a constant number of estimators that is large enough to reliably identify the best of the parameter configurations. Once we have the best combination, we build a model with a larger number of estimators to improve its performance. The table below shows the trade-off between the number of estimators and the modeling run time, model size, and model gain.

| Estimators | Run time (s) | Size (MB) | Gain@1 | Gain@2 | Gain@3 |
|------------|--------------|-----------|------------|------------|------------|
|        100 |           40 |  2 | 25.02% | 38.72% | 47.83% |
|       1000 |          177 |  4 | 46.79% | 60.80% | 69.11% |
|       2000 |          359 |  7 | 51.38% | 65.93% | 73.09% |
|       4000 |          628 | 12 | 53.39% | 67.40% | 74.74% |
|       8000 |          904 | 22 | 54.62% | 67.77% | 75.35% |
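
The diminishing returns in the table can be quantified with a few lines of arithmetic. The sketch below uses only the Gain@3 and estimator counts copied from the table and computes how many Gain@3 percentage points each additional 1,000 estimators buys between consecutive rows; `marginal_gain` is a hypothetical helper.

```python
# (estimators, run time in seconds, Gain@3 in percent), copied from the table above.
rows = [
    (100, 40, 47.83),
    (1000, 177, 69.11),
    (2000, 359, 73.09),
    (4000, 628, 74.74),
    (8000, 904, 75.35),
]


def marginal_gain(rows):
    """Return (from, to, Gain@3 points per 1,000 extra estimators) for consecutive rows."""
    out = []
    for (n0, _t0, g0), (n1, _t1, g1) in zip(rows, rows[1:]):
        out.append((n0, n1, (g1 - g0) / ((n1 - n0) / 1000)))
    return out


for n0, n1, per_k in marginal_gain(rows):
    print(f"{n0:>5} -> {n1:>5}: {per_k:6.2f} Gain@3 points per 1,000 estimators")
```

The gain per 1,000 estimators falls from roughly 23.6 points to about 0.15 points across the table, which is why a moderate estimator count is enough to rank configurations while a larger count is reserved for the final model.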


In [None]:
hyperparameter_sampling = RandomParameterSampling({
    'ngrams': choice(range(1, 5)),
    'match': choice(range(2, 41)),
    'min_child_samples': choice(range(1, 31)),
    'unweighted': choice('Yes', 'No')
})

This hyperparameter space specifies a grid of 9,360 unique configuration points (4 `ngrams` X 39 `match` X 30 `min_child_samples` X 2 `unweighted`). We control the resources used by the search through specifying a maximum number of configuration points to sample as `max_total_runs`.

In [None]:
max_total_runs = 96
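
As a check of the arithmetic above, and to show what random sampling over this discrete space means, the sketch below counts the grid and draws `max_total_runs` configurations by choosing each hyperparameter independently and uniformly. It only mirrors the idea: `sample_configs` is a hypothetical helper, not HyperDrive's sampler, and it may return duplicate configurations.

```python
import math
import random

# Same discrete choices as the RandomParameterSampling cell above.
search_space = {
    "ngrams": list(range(1, 5)),
    "match": list(range(2, 41)),
    "min_child_samples": list(range(1, 31)),
    "unweighted": ["Yes", "No"],
}

grid_size = math.prod(len(values) for values in search_space.values())
print(f"{grid_size:,} configurations")  # 9,360 configurations


def sample_configs(space, n, seed=0):
    """Draw n configurations, picking each hyperparameter independently and uniformly."""
    rng = random.Random(seed)
    return [{name: rng.choice(values) for name, values in space.items()} for _ in range(n)]


configs = sample_configs(search_space, n=96)  # 96 = max_total_runs
print(configs[0])
```

With 96 samples out of 9,360 points, the search covers about 1% of the grid, which is the resource control `max_total_runs` provides.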

It is also possible to cap the duration of the tuning experiment by setting `max_duration_minutes`. If both `max_total_runs` and `max_duration_minutes` are specified, the search ends when the first of the two limits is reached, and any runs still in progress are terminated.

Specify the primary metric to be optimized as the gain at 3, and that it should be maximized. This metric is logged by the training script.

In [None]:
primary_metric_name = "gain@3"
primary_metric_goal = PrimaryMetricGoal.MAXIMIZE

The training script logs the metric throughout training, so we may specify an early termination policy. If no policy is specified, the hyperparameter tuning service will let all training runs run to completion. We use a median stopping policy that terminates runs whose best metrics on the tune dataset are worse than the median of the running averages of the metrics on all training runs, and we delay the policy's application until each run's fifth metric report.

In [None]:
policy = MedianStoppingPolicy(delay_evaluation=5)
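
For intuition, here is a simplified, pure-Python sketch of the median stopping rule as described above, for a metric that is maximized. It is not HyperDrive's implementation: the function `median_stopping` and the toy metric histories are hypothetical, and the real policy also has an `evaluation_interval` setting that this sketch ignores.

```python
from statistics import median


def median_stopping(histories, interval, delay_evaluation=5):
    """Return the ids of runs to stop at the given reporting interval.

    histories maps run id -> list of primary-metric values (higher is better),
    one per report. A run is stopped when its best value so far is below the
    median of every run's running average. Nothing is stopped before
    `delay_evaluation` reports.
    """
    if interval < delay_evaluation:
        return set()
    # Only consider runs that have reported at least `interval` values.
    seen = {rid: h[:interval] for rid, h in histories.items() if len(h) >= interval}
    if not seen:
        return set()
    threshold = median(sum(h) / len(h) for h in seen.values())
    return {rid for rid, h in seen.items() if max(h) < threshold}


# Hypothetical gain@3 histories for three runs.
histories = {
    "a": [0.50, 0.55, 0.60, 0.62, 0.65],
    "b": [0.40, 0.42, 0.44, 0.45, 0.46],
    "c": [0.30, 0.31, 0.32, 0.33, 0.34],
}
print(median_stopping(histories, interval=5))  # {'c'}
```

At the fifth report the running averages are 0.584, 0.434, and 0.32, so the median is 0.434; only run `c` has a best value below it.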

## Create an estimator <a id='estimator'></a>
Create an estimator that specifies the location of the training script, the compute target, and the packages needed to run the script. The script's arguments, including the location of the data, are supplied later by the pipeline steps. It may take a while to prepare the run environment the first time an estimator is used, but that environment is reused until the list of packages changes.

In [None]:
compute_target = 'hypetuning'
estimator = Estimator(source_directory=os.path.join('.', 'scripts'),
                      entry_script='TrainClassifier.py',
                      compute_target=compute_target,
                      conda_packages=['pandas==0.23.4',
                                      'scikit-learn==0.21.3',
                                      'lightgbm==2.2.1'])

Combine the estimator and the configuration information into a HyperDrive run configuration object.

In [None]:
hyperdrive_run_config = HyperDriveConfig(
    estimator=estimator,
    hyperparameter_sampling=hyperparameter_sampling,
    policy=policy,
    primary_metric_name=primary_metric_name,
    primary_metric_goal=primary_metric_goal,
    max_total_runs=max_total_runs)

## Azure Machine Learning Pipelines: Overview <a id='aml_pipeline_overview'></a>

A common scenario when using machine learning components is to have a data workflow that includes the following steps:

- Preparing/preprocessing a given dataset for training, followed by
- Training a machine learning model on this data, and then
- Deploying this trained model in a separate environment, and finally
- Running a batch scoring task on another data set, using the trained model.

Azure Machine Learning pipelines give you a way to combine multiple steps like these into one configurable workflow, so that multiple agents or users can share and reuse this workflow. Machine learning pipelines thus provide a consistent, reproducible mechanism for building, evaluating, deploying, and running ML systems.

To get more information about Azure machine learning pipelines, please read our [Azure Machine Learning Pipelines overview](https://aka.ms/pl-concept), or the [getting started notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-getting-started.ipynb).

Create a data reference named `data_folder` to the default datastore, where the raw data used by the HyperDrive run was uploaded.

In [None]:
data_folder = DataReference(datastore=ds, data_reference_name="data_folder")

Define a pipeline parameter for the name under which the model will be registered.

In [None]:
model_name = PipelineParameter(name="model_name", default_value="FAQ_ranker")

## Create AML Pipeline Tuning Step <a id='aml_pipeline_tune_step'></a>
We create a HyperDrive step in the AML pipeline to search for the best hyperparameters. The `tune_estimators` pipeline parameter, which controls the number of estimators used during tuning, deliberately has a low default value so that test runs of the pipeline finish quickly.

In [None]:
tune_step_name="tune_model"
tune_estimators = PipelineParameter(name="tune_estimators", default_value=1)  # Set to 1000 when running the pipeline.
tune_step = HyperDriveStep(
    name=tune_step_name,
    hyperdrive_config=hyperdrive_run_config,
    estimator_entry_script_arguments=["--data-folder", data_folder,
                                      "--estimators", tune_estimators],
    inputs=[data_folder],
    allow_reuse=False)

## Create AML Pipeline Best Parameters Step <a id='aml_pipeline_bh_step'></a>
This Python script step gets the best hyperparameters found by a HyperDrive step and writes them out to a file.

In [None]:
%%writefile scripts/Best_Hyperparameters.py

from __future__ import print_function
import os
import json
import argparse
import pandas as pd

from azureml.core import Run
from azureml.pipeline.core import PipelineRun
import azureml.core

if __name__ == "__main__":
    
    print("azureml.core.VERSION={}".format(azureml.core.VERSION))

    parser = argparse.ArgumentParser(description="Retrieve the hyperparameters "
                                     "of the best run")
    parser.add_argument("--hd-step", dest="hd_step",
                        help="the name of the HyperDrive step")
    parser.add_argument("--output-steps-data", dest="output_steps_data",
                        help="to share data between different steps in a pipeline",
                        default="outputs")
    parser.add_argument("--hyperparameters", help="the hyperparameters of the best run",
                        default="hyperparameters.json")
    parser.add_argument("--delete",
                        help="comma-separated list of hyperparameters to remove",
                        default="estimators")
    args = parser.parse_args()
    
    # Path to write best hyperparameters.
    os.makedirs(args.output_steps_data, exist_ok=True)
    hyperparameters_path = os.path.join(args.output_steps_data, args.hyperparameters)
    
    # Get the HyperDrive run.
    run = Run.get_context()
    print(run)
    pipeline_run = PipelineRun(run.experiment, run.parent.id)
    print(pipeline_run)
    hd_step_run = pipeline_run.find_step_run(args.hd_step)[0]
    print(hd_step_run)
    hd_run = list(hd_step_run.get_children())[0]
    print(hd_run)
    
    # Get the best run.
    hd_run.wait_for_completion(show_output=True, wait_post_processing=True)
    best_run = hd_run.get_best_run_by_primary_metric()
    if best_run is None:
        raise Exception("No best run was found")
    print(best_run)
    
    # Get its hyperparameters as a dict.
    parameter_values = best_run.get_details()["runDefinition"]["arguments"]
    best_parameters = dict(zip(map(str.strip, parameter_values[::2], 
                                   len(parameter_values[::2]) * ["-"]),
                               parameter_values[1::2]))
    
    # Remove these hyperparameters.
    for key in args.delete.split(","):
        if key in best_parameters:
            del best_parameters[key]
        
    # Print out the hyperparameters.
    print("Best run hyperparameters:")
    print(pd.Series(best_parameters, name="Value").to_frame())
    
    # Write them out to file.
    print("Writing best run hyperparameters to {}".format(hyperparameters_path))
    with open(hyperparameters_path, "w") as fp:
        json.dump(best_parameters, fp)

Create a `PythonScriptStep` for the AML pipeline that gets the best run's hyperparameters.

In [None]:
bh_step_name = "best_parameters"
bh_steps_data = PipelineData("bh_steps_data", datastore=ds)
bh_hyperparameters_file = "hyperparameters.json"
bh_run_config = RunConfiguration(conda_dependencies=CondaDependencies.create(
    conda_packages=["pandas"],
    pip_packages=["azure-cli", "azureml-sdk", "azureml-pipeline"]))
bh_run_config.environment.docker.enabled = True
bh_step = PythonScriptStep(
    name=bh_step_name,
    script_name="Best_Hyperparameters.py",
    compute_target=compute_target,
    source_directory=os.path.join(".", "scripts"),
    arguments=["--hd-step", tune_step_name,
               "--output-steps-data", bh_steps_data,
               "--hyperparameters", bh_hyperparameters_file],
    outputs=[bh_steps_data],
    runconfig=bh_run_config,
    allow_reuse=False)
bh_step.run_after(tune_step)
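
The dictionary construction in `Best_Hyperparameters.py` packs several ideas into one `zip`/`map` expression. The standalone sketch below spells out the same logic: even positions of the run's argument list hold flags, odd positions hold their values, and `str.strip(flag, "-")` removes the dashes. The helpers `args_to_params` and `drop_keys` and the sample argument list are hypothetical.

```python
def args_to_params(arguments):
    """Turn ["--name", value, ...] into {"name": value, ...}.

    Even positions hold flags, odd positions hold their values, and
    stripping "-" from both ends of each flag leaves the bare name.
    """
    flags, values = arguments[::2], arguments[1::2]
    return {flag.strip("-"): value for flag, value in zip(flags, values)}


def drop_keys(params, delete="estimators"):
    """Remove a comma-separated list of keys, mirroring the --delete option."""
    to_delete = delete.split(",")
    return {k: v for k, v in params.items() if k not in to_delete}


# Hypothetical argument list of a best run.
arguments = ["--estimators", "1", "--ngrams", "3", "--match", "12", "--unweighted", "No"]
print(drop_keys(args_to_params(arguments)))
# {'ngrams': '3', 'match': '12', 'unweighted': 'No'}
```

Dropping `estimators` matters because the tuning value is deliberately small; the best model step supplies its own, larger value.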

## Create AML Pipeline Best Model Step <a id='aml_pipeline_estimator_step'></a>
This step passes the hyperparameters file from the previous step to the training script to create the best model. The `best_estimators` pipeline parameter, which controls the number of estimators used to build the best model, deliberately has a low default value so that test runs of the pipeline finish quickly.

In [None]:
bm_step_name="best_model"
bm_estimators = PipelineParameter(name="best_estimators", default_value=1)  # Set to 8000 when running the pipeline
bm_estimator = Estimator(source_directory=os.path.join('.', 'scripts'),  # Use a new Estimator as a bug workaround
                         entry_script='TrainClassifier.py',
                         compute_target=compute_target,
                         conda_packages=['pandas==0.23.4',
                                         'scikit-learn==0.21.3',
                                         'lightgbm==2.2.1'])
bm_step = EstimatorStep(
    name=bm_step_name,
    estimator=bm_estimator,
    estimator_entry_script_arguments=["--data-folder", data_folder,
                                      "--estimators", bm_estimators,
                                      "--input-steps-data", bh_steps_data,
                                      "--hyperparameters", bh_hyperparameters_file,
                                      "--save", model_name],
    compute_target=compute_target,
    inputs=[data_folder, bh_steps_data],
    allow_reuse=False)
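
`TrainClassifier.py` is not shown in this notebook, so the following is only a hypothetical sketch of how a training script could consume the arguments passed above: it reads `--estimators` from the command line and merges in the values from the hyperparameters file written by the previous step. The function `load_hyperparameters`, the default of 100 estimators, and the merge order are assumptions.

```python
import argparse
import json
import os


def load_hyperparameters(argv):
    """Parse the step arguments and merge in the saved best hyperparameters."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--estimators", type=int, default=100)  # assumed default
    parser.add_argument("--input-steps-data", dest="input_steps_data")
    parser.add_argument("--hyperparameters", default="hyperparameters.json")
    # Ignore arguments this sketch does not handle, such as --data-folder and --save.
    args, _unknown = parser.parse_known_args(argv)
    params = {"estimators": args.estimators}
    if args.input_steps_data:
        path = os.path.join(args.input_steps_data, args.hyperparameters)
        with open(path) as fp:
            params.update(json.load(fp))  # best values from the tuning step
    return params
```

Because `Best_Hyperparameters.py` deletes `estimators` by default, the value passed with `--estimators` is not overwritten by the tuning value in this sketch.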

## Create AML Pipeline Register Model Step <a id='aml_pipeline_rm_step'></a>
This Python script step registers the best model created by the estimator step.

In [None]:
%%writefile scripts/Register_Model.py

from __future__ import print_function
import os
import json
import argparse
import pandas as pd

from azureml.core import Run
from azureml.pipeline.core import PipelineRun
import azureml.core

if __name__ == "__main__":
    
    print("azureml.core.VERSION={}".format(azureml.core.VERSION))

    parser = argparse.ArgumentParser(description="Register the model created by"
                                     " an estimator step")
    parser.add_argument("--es-step", dest="es_step",
                        help="the name of the estimator step")
    parser.add_argument("--outputs", help="the model file outputs directory")
    parser.add_argument("--model-name", dest="model_name",
                        help="the model file base name")
    args = parser.parse_args()
    
    model_name = args.model_name
    model_file = "{}.pkl".format(model_name)
    model_path = os.path.join(args.outputs, model_file)
    
    # Get the Estimator run.
    run = Run.get_context()
    print(run)
    pipeline_run = PipelineRun(run.experiment, run.parent.id)
    print(pipeline_run)
    es_run = pipeline_run.find_step_run(args.es_step)[0]
    print(es_run)
    
    # Register the Estimator step's model.
    es_run.wait_for_completion(show_output=True)
    model = es_run.register_model(model_name=model_name, model_path=model_path)
    print("Estimator model registered")

Create a `PythonScriptStep` for the AML pipeline that registers the best model.

In [None]:
rm_step_name = "register_model"
rm_run_config = RunConfiguration(conda_dependencies=CondaDependencies.create(
    conda_packages=["pandas"],
    pip_packages=["azure-cli", "azureml-sdk", "azureml-pipeline"]))
rm_run_config.environment.docker.enabled = True
rm_step = PythonScriptStep(
    name=rm_step_name,
    script_name="Register_Model.py",
    compute_target=compute_target,
    source_directory=os.path.join(".", "scripts"),
    arguments=["--es-step", bm_step_name,
               "--outputs", "outputs",
               "--model-name", model_name],
    runconfig=rm_run_config,
    allow_reuse=False)
rm_step.run_after(bm_step)

## Create & Run a Pipeline <a id='create_aml_pipeline'></a>
When we specify `rm_step`, the pipeline walks the dependency graph to include the other steps.

In [None]:
experiment_name = "hypetuning"
exp = Experiment(workspace=ws, name=experiment_name)
pipeline = Pipeline(workspace=ws, steps=[rm_step])
pipeline.validate()
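
The dependency walk can be pictured as a graph search followed by a topological sort. The toy model below encodes the wiring built in this notebook (the `run_after` calls and the `bh_steps_data` input) and recovers every step reachable from the register step in a valid execution order. It is an illustration only, not how Azure ML implements `Pipeline`; `collect_steps` is hypothetical, and `graphlib` requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps it depends on, as wired in this notebook.
depends_on = {
    "register_model": {"best_model"},   # rm_step.run_after(bm_step)
    "best_model": {"best_parameters"},  # bm_step consumes bh_steps_data
    "best_parameters": {"tune_model"},  # bh_step.run_after(tune_step)
    "tune_model": set(),
}


def collect_steps(root, graph):
    """Return every step reachable from root, in a valid execution order."""
    reachable, stack = set(), [root]
    while stack:
        step = stack.pop()
        if step not in reachable:
            reachable.add(step)
            stack.extend(graph.get(step, ()))
    sub_graph = {s: graph.get(s, set()) & reachable for s in reachable}
    return list(TopologicalSorter(sub_graph).static_order())


print(collect_steps("register_model", depends_on))
# ['tune_model', 'best_parameters', 'best_model', 'register_model']
```

Passing only the last step is enough because every other step is reachable from it.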

Run the pipeline before publishing it. Wait for the run to complete.

In [None]:
pipeline_run = exp.submit(pipeline, continue_on_step_failure=True)
pipeline_run.wait_for_completion(show_output=True)

## Publish The Pipeline  <a id='publish_aml_pipeline'></a>
To learn why you might publish a pipeline and how a published pipeline can be triggered, see the [Azure Machine Learning pipelines concept documentation](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines).

In [None]:
published_pipeline = pipeline.publish(name="HyperDrive Pipeline",
                                      description="HyperDrive Pipeline",
                                      continue_on_step_failure=True)
published_pipeline.endpoint

## Run published pipeline using its REST endpoint <a id='run_publish_aml_pipeline'></a>
This step shows how to call the REST endpoint of a published pipeline to trigger a pipeline run. The code is commented out; uncomment it to submit a run through the endpoint.

In [None]:
# aad_token = auth.get_authentication_header()
# rest_endpoint = published_pipeline.endpoint
# print("You can perform HTTP POST on URL {} to trigger this pipeline".format(rest_endpoint))
# response = requests.post(rest_endpoint, 
#                          headers=aad_token, 
#                          json={"ExperimentName": experiment_name,
#                                "RunSource": "SDK"})
# run_id = response.json()["Id"]
# print(run_id)
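
If `requests` is not available, the same call can be made with the standard library. The sketch below only builds the POST request and parses a response body; the JSON fields `ExperimentName`, `RunSource`, and `Id` come from the cell above, `auth_header` is assumed to be the dictionary returned by `auth.get_authentication_header()`, and the helper names are hypothetical.

```python
import json
import urllib.request


def build_trigger_request(endpoint, auth_header, experiment_name):
    """Build (but do not send) the POST that triggers a published pipeline."""
    body = json.dumps({"ExperimentName": experiment_name, "RunSource": "SDK"}).encode("utf-8")
    headers = dict(auth_header)  # e.g. the Authorization header from get_authentication_header()
    headers["Content-Type"] = "application/json"
    return urllib.request.Request(endpoint, data=body, headers=headers, method="POST")


def run_id_from_response(response_body):
    """Extract the run id from the JSON body returned by the endpoint."""
    return json.loads(response_body)["Id"]
```

To send it: `with urllib.request.urlopen(build_trigger_request(rest_endpoint, aad_token, experiment_name)) as resp: run_id = run_id_from_response(resp.read())`.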

## Schedule the published pipeline to run regularly <a id='schedule_aml_pipeline'></a>
This step shows how to schedule the published pipeline to run regularly. This also submits an initial run, because no start time is supplied for the schedule.

In [None]:
schedule_name = "hypetuning"
frequency = "Hour"
interval = 1
recurrence = ScheduleRecurrence(frequency=frequency, interval=interval)
schedule = Schedule.create(
    workspace=ws,
    name=schedule_name,
    pipeline_id=published_pipeline.id,
    experiment_name=experiment_name,
    recurrence=recurrence,
    description=schedule_name)
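
With `frequency="Hour"` and `interval=1`, the schedule fires once an hour. The sketch below lists the trigger times such a fixed-length recurrence produces from a given first run. It is an illustration only: `upcoming_runs` is hypothetical, and it ignores time zones and the variable-length `Month` frequency.

```python
from datetime import datetime, timedelta

# Fixed-length units only; "Month" is left out because its length varies.
UNIT = {
    "Minute": timedelta(minutes=1),
    "Hour": timedelta(hours=1),
    "Day": timedelta(days=1),
    "Week": timedelta(weeks=1),
}


def upcoming_runs(start, frequency="Hour", interval=1, count=3):
    """Return the first `count` trigger times of a fixed-length recurrence."""
    step = UNIT[frequency] * interval
    return [start + i * step for i in range(count)]


for t in upcoming_runs(datetime(2024, 1, 1, 9, 30)):
    print(t.isoformat())
# 2024-01-01T09:30:00
# 2024-01-01T10:30:00
# 2024-01-01T11:30:00
```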

## Write Out the URI <a id='write_uri_key'></a>
Write the URI to the statistics tracker.

In [None]:
statisticsCollector.addEntry(CollectionEntry.AKS_REALTIME_ENDPOINT,
                             published_pipeline.endpoint)


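Get a connection string for the workspace's storage account, which is used to save the statistics. Replace the placeholder below with your connection string, or leave it unchanged to have the following cell look it up with your Azure credentials.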

In [None]:
storageConnString = "YOUR_STORAGE_CONNECTION_STRING"

In [None]:
if storageConnString == "YOUR_STORAGE_CONNECTION_STRING":
    resource_group = ws.resource_group
    stgAcctName = ws.get_details()['storageAccount'].split('/')[-1]
    storageConnString = storageConnection.getConnectionStringWithAzCredentials(resource_group, stgAcctName)
print("storage_conn_string: {}".format(storageConnString))
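
The cell above derives the storage account name from the last segment of the workspace's storage resource ID, and then prints the full connection string, which includes the account key. The sketch below shows both pieces in isolation and masks secret fields before printing. The helpers and the sample values are hypothetical; the `key=value;` layout is the standard Azure Storage connection string format.

```python
def account_name_from_resource_id(resource_id):
    """Last path segment of an ARM resource ID, as in the cell above."""
    return resource_id.rstrip("/").split("/")[-1]


def mask_secrets(conn_str, secret_keys=("AccountKey", "SharedAccessSignature")):
    """Replace the values of secret fields in a `key=value;...` connection string."""
    parts = []
    for segment in conn_str.split(";"):
        # Split on the first "=" only, since base64 keys can end with "=".
        key, sep, value = segment.partition("=")
        if key in secret_keys and value:
            value = "***"
        parts.append(key + sep + value)
    return ";".join(parts)


# Hypothetical values for illustration.
rid = "/subscriptions/0000/resourceGroups/rg/providers/Microsoft.Storage/storageAccounts/mystorage"
conn = ("DefaultEndpointsProtocol=https;AccountName=mystorage;"
        "AccountKey=abc123==;EndpointSuffix=core.windows.net")
print(account_name_from_resource_id(rid))  # mystorage
print(mask_secrets(conn))
```

Printing `mask_secrets(storageConnString)` instead of the raw string keeps the account key out of the notebook's saved output.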

Save the statistics collected so far.

In [None]:
statisticsCollector.uploadContent(storageConnString)

The [next notebook](08_Tear_Down.ipynb) shows how to delete the components created by this tutorial.