Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.


# How to use EstimatorStep in AML Pipeline

This notebook shows how to use `EstimatorStep` with Azure Machine Learning Pipelines. An Estimator is a convenience object in Azure Machine Learning that wraps run configuration information, which simplifies specifying how a script is executed.


## Prerequisites
* Understand the [architecture and terms](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture) introduced by Azure Machine Learning
* If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, go through the [configuration notebook](../../../configuration.ipynb) to:
    * install the AML SDK
    * create a workspace and its configuration file (`config.json`)

Let's get started. First let's import some Python libraries.

In [3]:
import azureml.core
# check core SDK version number
print("Azure ML SDK Version: ", azureml.core.VERSION)

Azure ML SDK Version:  1.0.41


## Initialize workspace
Initialize a [Workspace](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#workspace) object from the existing workspace you created in the Prerequisites step. `Workspace.from_config()` creates a workspace object from the details stored in `config.json`.

In [4]:
from azureml.core import Workspace
ws = Workspace.from_config(path="C:\\Users\\anders.swanson\\Documents\\attrition\\compute\\aml_config\\config.json")

print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

Workspace name: avadevitsmlsvc
Azure region: westus2
Subscription id: ff2e23ae-7d7c-4cbd-99b8-116bb94dca6e
Resource group: RG-ITSMLTeam-Dev


## Get default AmlCompute
You can create a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for training your model. In this tutorial, you use an `AmlCompute` cluster as your training compute resource. The helper function in the next cell reuses the cluster if it already exists and creates it otherwise.

In [90]:
from azureml.core.compute import AmlCompute, ComputeTarget, DataFactoryCompute
from azureml.exceptions import ComputeTargetException

def get_or_create_compute(workspace, compute_target_name, **kwargs):
    try:
        compute_target = ComputeTarget(workspace=workspace, name=compute_target_name)
        print('Found existing cluster,{},use it.'.format(compute_target_name))
    except ComputeTargetException:
        compute_target = ComputeTarget.create(
            workspace,
            compute_target_name,
            AmlCompute.provisioning_configuration(**kwargs))

        compute_target.wait_for_completion(show_output=True)
    return compute_target

cpu_cluster = get_or_create_compute(workspace=ws,
                                       compute_target_name='mpi-test2',
                                       vm_size='STANDARD_D2_V2',
                                       max_nodes=3
                                       )
# use get_status() to get a detailed status for the current cluster. 
print(cpu_cluster.get_status().serialize())

Found existing cluster,mpi-test2,use it.
{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-06-12T21:49:51.507000+00:00', 'errors': None, 'creationTime': '2019-06-12T21:49:40.574046+00:00', 'modifiedTime': '2019-06-12T21:49:56.586313+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 3, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D2_V2'}


Now that the compute target exists, the workspace's `compute_targets` property should include an entry named 'mpi-test2' of type `AmlCompute`.

## Use a simple script
We have already created a simple "hello world" script. This is the script that we will submit through the estimator pattern. It prints a hello-world message and, if the Azure ML SDK is installed, it also logs an array of values ([Fibonacci numbers](https://en.wikipedia.org/wiki/Fibonacci_number)).
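
The script itself is not reproduced in this notebook. As a rough illustration only, a script of that shape might look like the sketch below. The function name `fibonacci`, the metric name, and the count of ten values are assumptions made for this example, not details taken from the original script.

```python
def fibonacci(n):
    """Return the first n Fibonacci numbers, starting from 0 and 1."""
    values = []
    a, b = 0, 1
    for _ in range(n):
        values.append(a)
        a, b = b, a + b
    return values


print("Hello world!")
fib_values = fibonacci(10)  # assumed length, chosen for illustration

try:
    # Logging only happens when the Azure ML SDK can be imported.
    from azureml.core import Run

    run = Run.get_context()
    run.log_list("Fibonacci numbers", fib_values)
except ImportError:
    print("Azure ML SDK not installed; skipping metric logging.")
```

Guarding the import keeps the script runnable on a machine without the SDK, which matches the behavior described above.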

## Build an Estimator object
By default, the Estimator attempts Docker-based execution; if you do not supply an image, it picks the default CPU image supplied by Azure ML. You can target an `AmlCompute` cluster (or any other supported compute target type), and you can customize the conda environment by adding conda and/or pip packages.

> Note: Specify the arguments to the Estimator's entry script as a *list* through the `estimator_entry_script_arguments` parameter when instantiating `EstimatorStep`. The Estimator object's `script_params` parameter accepts a dictionary, whereas `estimator_entry_script_arguments` expects a list.
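
To make the dictionary-versus-list difference concrete, here is a minimal sketch that flattens a `script_params`-style dictionary into the flat list form. The helper `script_params_to_list` and the placeholder string values are hypothetical and not part of the SDK. In a real pipeline the values would typically be `DataReference` or `PipelineData` objects rather than strings.

```python
def script_params_to_list(script_params):
    """Flatten a {flag: value} dictionary into [flag, value, flag, value, ...]."""
    args = []
    for flag, value in script_params.items():
        args.append(flag)
        # A value of None is treated as a bare flag with no argument.
        if value is not None:
            args.append(value)
    return args


# Dictionary form, as accepted by the Estimator's script_params parameter.
script_params = {
    '--input_dir': 'iris',
    '--output_dir': 'outputs',
    '--script_dir': '.',
}

# List form, as expected by estimator_entry_script_arguments.
entry_script_arguments = script_params_to_list(script_params)
print(entry_script_arguments)
```

Because dictionaries preserve insertion order in Python 3.7 and later, the flags come out in the order they were written.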

> Note: An Estimator object can be initialized with a list of `DataReference` objects in its `inputs` parameter. In Pipelines, however, a step can take either another step's output or `DataReference` objects as input. When creating an `EstimatorStep`, set its `inputs` and `outputs` parameters explicitly; they override the `inputs` parameter specified in the Estimator object.

In [91]:
from azureml.core import Datastore
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import PipelineData

def_blob_store = Datastore(ws, "workspaceblobstore")

def_blob_store.upload_files(['./iris.csv'], 
                                  target_path = 'iris', 
                                  overwrite = True, 
                                  show_progress = True)

Uploading ./iris.csv
Uploaded ./iris.csv, 1 files out of an estimated total of 1


$AZUREML_DATAREFERENCE_b5baefca9e994496ac78d936ba0c32a2

In [92]:
input_data = DataReference(
    datastore=def_blob_store,
    data_reference_name="input_data",
    path_on_datastore="iris")

output = PipelineData("output", datastore=def_blob_store)

In [93]:
from azureml.core.runconfig import MpiConfiguration
mpi_config = MpiConfiguration()
mpi_config.process_count_per_node = 2

In [94]:
config_est = {
    'OG':{
        'conda_packages':['scikit-learn'],
        'entry_script':'dummy_train.py'
    },
    'MML_image':{
        'node_count':1,
        # distributed_training=mpi_config,
        'use_gpu':True,
        'conda_packages':['scikit-learn','pyspark', 'pandas', 'numpy'],
        'pip_packages':['pip', 'findspark'],
        'custom_docker_image':'microsoft/mmlspark:gpu-0.12',
        'entry_script':'dummy_train.py'
    },
    'MML_use_spark':{
        'node_count':1,
        'use_gpu':True,
        'conda_packages':['scikit-learn','pyspark', 'pandas', 'numpy'],
        'pip_packages':['pip', 'findspark'],
        'custom_docker_image':'microsoft/mmlspark:gpu-0.12',
        'entry_script':'roll_iris.py'
    },
    'MML_MPI':{
        'node_count':2,
        'distributed_training':mpi_config,
        'use_gpu':True,
        'conda_packages':['scikit-learn','pyspark', 'pandas', 'numpy'],
        'pip_packages':['pip', 'findspark'],
        'custom_docker_image':'microsoft/mmlspark:gpu-0.12',
        'entry_script':'roll_iris.py'
    }
}

In [95]:
from azureml.train.estimator import Estimator

est = Estimator(source_directory='.', 
                compute_target=cpu_cluster, 
               **config_est['MML_MPI'])
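
The `**config_est['MML_MPI']` expression above unpacks the selected dictionary into keyword arguments, so switching configurations only requires changing the key. The sketch below shows the same mechanism with a hypothetical stand-in function, `fake_estimator`, which is not part of the SDK and simply returns what it receives. The abbreviated `configs` dictionary is also illustrative.

```python
def fake_estimator(source_directory, compute_target, **kwargs):
    """Hypothetical stand-in for a constructor: returns the arguments it received."""
    received = {'source_directory': source_directory,
                'compute_target': compute_target}
    received.update(kwargs)
    return received


# Abbreviated, illustrative configurations keyed by name.
configs = {
    'OG': {
        'conda_packages': ['scikit-learn'],
        'entry_script': 'dummy_train.py',
    },
    'MML_use_spark': {
        'node_count': 1,
        'use_gpu': True,
        'entry_script': 'roll_iris.py',
    },
}

# Each key/value pair in the chosen dictionary becomes a keyword argument.
received = fake_estimator(source_directory='.',
                          compute_target='mpi-test2',
                          **configs['OG'])
print(received)
```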

## Create an EstimatorStep
[EstimatorStep](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.estimator_step.estimatorstep?view=azure-ml-py) adds a step to run Estimator in a Pipeline.

- **name:** Name of the step
- **estimator:** Estimator object
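- **estimator_entry_script_arguments:** List of command-line arguments passed to the Estimator's entry script
- **runconfig_pipeline_params:** Overrides runconfig properties at runtime using key-value pairs, each consisting of the name of a runconfig property and the PipelineParameter for that property
- **inputs:** List of inputs to the step (DataReference objects or another step's output)
- **outputs:** List of PipelineData objects produced by the step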
- **compute_target:** Compute target to use 
- **allow_reuse:** Whether the step should reuse previous results when run with the same settings/inputs. If this is false, a new run will always be generated for this step during pipeline execution.
- **version:** Optional version tag to denote a change in functionality for the step

In [96]:
from azureml.pipeline.steps import EstimatorStep

est_step = EstimatorStep(name="Estimator_Train", 
                         estimator=est, 
                         runconfig_pipeline_params=None, 
                         estimator_entry_script_arguments=[
                            '--input_dir', input_data,
                            '--output_dir', output,
                            '--script_dir', ".",
                         ],
                         inputs=[input_data], 
                         outputs=[output], 
                         compute_target=cpu_cluster,
                        )
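
On the receiving side, the entry script has to parse the arguments listed in `estimator_entry_script_arguments`. The entry scripts are not shown in this notebook, so the following is only a sketch of how such a script might read the three flags with `argparse`. The function `parse_args` and the example paths are assumptions for illustration.

```python
import argparse


def parse_args(argv=None):
    """Parse the three flags passed through estimator_entry_script_arguments."""
    parser = argparse.ArgumentParser(description="Hypothetical entry script")
    parser.add_argument('--input_dir', required=True,
                        help="Location where the input data is made available")
    parser.add_argument('--output_dir', required=True,
                        help="Location where the step should write its output")
    parser.add_argument('--script_dir', default='.',
                        help="Directory containing the script's supporting files")
    return parser.parse_args(argv)


# Example invocation with made-up paths; in a pipeline run the actual
# values are supplied at runtime.
args = parse_args(['--input_dir', '/mnt/input/iris',
                   '--output_dir', '/mnt/output',
                   '--script_dir', '.'])
print(args.input_dir, args.output_dir, args.script_dir)
```

Passing `argv` explicitly keeps the function easy to exercise outside a pipeline run; calling `parse_args()` with no arguments reads from `sys.argv` as usual.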

## Build and Submit the Experiment

In [97]:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment
pipeline = Pipeline(workspace=ws, steps=[est_step])

In [98]:
pipeline_run = Experiment(ws, 'Estimator_sample').submit(pipeline,
                                                        regenerate_outputs=True)

Created step Estimator_Train [5762d517][542d5e28-fe81-4a91-b7f1-1e88dffc9960], (This step will run and generate new outputs)
Created data reference input_data for StepId [a28f10c5][4794d99c-87ab-41f4-85fa-d7ce08b888fa], (Consumers of this data will generate new runs.)
Submitted pipeline run: b9c0ef19-58a2-4e34-9018-200ae07c6639


## View Run Details

In [99]:
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': True, 'log_level': 'INFO', '…

In [68]:
pipeline_run.tag("script","roll_iris")
pipeline_run.tag("est","shouldn't work")
pipeline_run.tag("compute","holy roller")