Copyright (c) Microsoft Corporation. All rights reserved. 
Licensed under the MIT License.


In [76]:
# Check core SDK version number
import azureml.core

print("SDK version:", azureml.core.VERSION)

SDK version: 1.19.0


### Connect to workspace
Create a workspace object from the existing workspace. Workspace.from_config() reads the file config.json and loads the details into an object named ws.

In [92]:
from azureml.core import Workspace, Datastore

ws = Workspace.from_config()
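`Workspace.from_config()` looks for `config.json` starting from the current directory. As a rough illustration of that lookup, the sketch below uses a hypothetical helper, `find_workspace_config`, that checks each directory and its `.azureml` subfolder while walking up through the parent directories. It then reads the three fields the file is expected to contain (`subscription_id`, `resource_group`, `workspace_name`). This is an approximation for illustration, not the SDK's actual implementation.

```python
import json
from pathlib import Path


def find_workspace_config(start_dir=".", file_name="config.json"):
    """Hypothetical helper approximating the config.json lookup done by Workspace.from_config()."""
    current = Path(start_dir).resolve()
    # check the starting directory first, then each parent directory in turn
    for directory in [current, *current.parents]:
        for candidate in (directory / file_name, directory / ".azureml" / file_name):
            if candidate.is_file():
                return candidate
    return None


def load_workspace_details(config_path):
    """Return the fields that identify the workspace to connect to."""
    with open(config_path) as f:
        config = json.load(f)
    return config["subscription_id"], config["resource_group"], config["workspace_name"]
```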

### Create or attach an existing compute resource
By using Azure Machine Learning Compute, a managed service, data scientists can train machine learning models on clusters of Azure virtual machines, including VMs with GPU support. In this notebook, you use Azure Machine Learning Compute as the environment for running batch inference. The code below creates the compute cluster for you if it doesn't already exist in your workspace.

**Creating the compute takes approximately 5 minutes. If an AmlCompute target with that name is already in your workspace, the code skips the creation process.**

In [93]:
import os
from azureml.core.compute import AmlCompute, ComputeTarget

# choose a name for your cluster
compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "cpu-cluster")
# environment variables are strings, so convert the node counts to int
compute_min_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0))
compute_max_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 4))

# This example uses a CPU VM. To use a GPU VM, set the SKU to STANDARD_NC6
vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D2_V2")


if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and isinstance(compute_target, AmlCompute):
        print('found compute target. just use it. ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size=vm_size,
                                                                min_nodes=compute_min_nodes,
                                                                max_nodes=compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)

    # can poll for a minimum number of nodes and for a specific timeout.
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

found compute target. just use it. cpu-cluster
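Environment variables are always strings, so when `AML_COMPUTE_CLUSTER_MAX_NODES` is set, `os.environ.get` returns a value such as `"4"` rather than `4`; only the fallback default keeps its original type. A small hypothetical helper, `env_int`, makes the conversion explicit and fails early on a malformed value:

```python
import os


def env_int(name, default):
    """Read an integer setting from the environment, falling back to `default` if unset (hypothetical helper)."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
```

For example, `compute_max_nodes = env_int("AML_COMPUTE_CLUSTER_MAX_NODES", 4)` always yields an `int`, whether or not the variable is set.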


### Create a FileDataset
A [FileDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.filedataset?view=azure-ml-py) references single or multiple files in your datastores or public URLs. The files can be of any format. FileDataset provides you with the ability to download or mount the files to your compute. By creating a dataset, you create a reference to the data source location. If you applied any subsetting transformations to the dataset, they will be stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred.
You can use dataset objects as inputs. Register the datasets with the workspace if you want to reuse them later.

In [94]:
from azureml.core.dataset import Dataset
from azureml.data.datapath import DataPath
from azureml.pipeline.core import PipelineParameter

def_data_store = Datastore.get(ws, datastore_name='amlpoc_datastore')
mnist_ds_name = 'mnist_version_1_ds'

path_on_datastore = DataPath(datastore=def_data_store, path_on_datastore='mnist/version_1')

input_mnist_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)

The input dataset can be specified as a pipeline parameter, so that you can pass in new data when you rerun the ParallelRunStep (PRS) pipeline.

In [95]:
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig
from azureml.pipeline.core import PipelineParameter

pipeline_param = PipelineParameter(name="mnist_param", default_value=input_mnist_ds)
input_mnist_ds_consumption = DatasetConsumptionConfig("mnist_param_config", pipeline_param).as_mount()
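Conceptually, each `PipelineParameter` carries a default that is used unless a value is passed under the same name when the pipeline is submitted. The sketch below models that resolution with plain dictionaries; `resolve_parameters` is a hypothetical helper, not part of the Azure ML SDK, and the placeholder strings stand in for the dataset objects.

```python
def resolve_parameters(defaults, overrides=None):
    """Hypothetical model of parameter resolution: an override wins, otherwise the default is used."""
    overrides = overrides or {}
    return {name: overrides.get(name, default) for name, default in defaults.items()}


# Defaults mirror the PipelineParameter definitions in this notebook;
# the dataset objects are represented by placeholder strings.
defaults = {"mnist_param": "input_mnist_ds", "batch_size_param": "5", "process_count_param": 2}

first_run = resolve_parameters(defaults)
single_image_run = resolve_parameters(
    defaults, {"mnist_param": "single_image_ds", "batch_size_param": "1", "process_count_param": 1}
)
```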

### Intermediate/Output Data
Intermediate data (or the output of a step) is represented by a [PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed by another: pass the PipelineData object as an output of one step and as an input of one or more later steps. This notebook instead uses `OutputFileDatasetConfig`, an experimental class in this SDK version, which writes the step's output to a path on the datastore.

In [96]:
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.data import OutputFileDatasetConfig

# Alternative: output_dir = PipelineData(name="inferences", datastore=def_data_store)
output_dir = OutputFileDatasetConfig(name='batch_results',
                                     destination=(def_data_store, 'mnist/batch-scoring-results/{run-id}'))
# To also register the output as a dataset when the run completes, chain
# .register_on_complete(name='batch-scoring-results') onto the call above.

Class OutputFileDatasetConfig: This is an experimental class, and may change at any time. For more information, see https://aka.ms/azuremlexperimental.
Class OutputDatasetConfig: This is an experimental class, and may change at any time. For more information, see https://aka.ms/azuremlexperimental.
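The destination path contains a `{run-id}` placeholder, which Azure ML replaces with the run's ID at runtime, so each run writes to its own folder instead of overwriting earlier results. The expansion amounts to the string replacement sketched below; `resolve_destination` and the example ID are hypothetical.

```python
def resolve_destination(path_template, run_id):
    """Hypothetical illustration of how the {run-id} placeholder expands into a per-run folder."""
    return path_template.replace("{run-id}", run_id)


template = "mnist/batch-scoring-results/{run-id}"
print(resolve_destination(template, "example-run-id"))  # mnist/batch-scoring-results/example-run-id
```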


### Download the Model

Download the model archive from https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz and extract it into the `models` directory.

In [70]:
import os
import tarfile
import urllib.request

# create the directory for the model
model_dir = 'models'
os.makedirs(model_dir, exist_ok=True)

# download the archive, then extract it into the model directory
url = "https://pipelinedata.blob.core.windows.net/mnist-model/mnist-tf.tar.gz"
urllib.request.urlretrieve(url, "model.tar.gz")
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall(model_dir)

os.listdir(model_dir)

['mnist-tf.model.data-00000-of-00001',
 'mnist-tf.model.index',
 'mnist-tf.model.meta',
 'saved_model.pb']
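`tar.extractall()` trusts the member paths stored in the archive, so an entry such as `../outside.txt` could write outside `models`. If you adapt this cell to archives from less trusted sources, a check like the following rejects members that would resolve outside the target directory before anything is extracted. This is a sketch using only the standard library; `safe_extract` is a hypothetical helper, it does not inspect symbolic-link targets, and newer Python versions also offer `extractall(..., filter="data")` for this purpose.

```python
import os
import tarfile


def safe_extract(tar, target_dir):
    """Extract all members of `tar` into `target_dir`, refusing paths that escape it."""
    target = os.path.realpath(target_dir)
    for member in tar.getmembers():
        destination = os.path.realpath(os.path.join(target, member.name))
        # the resolved destination must be the target itself or lie inside it
        if os.path.commonpath([target, destination]) != target:
            raise ValueError(f"Blocked path traversal in archive member: {member.name}")
    tar.extractall(target)
```

Usage would look like `with tarfile.open("model.tar.gz", "r:gz") as tar: safe_extract(tar, model_dir)`.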

### Register the model with Workspace
A registered model is a logical container for one or more files that make up your model. For example, if you have a model that's stored in multiple files, you can register them as a single model in the workspace. After you register the files, you can then download or deploy the registered model and receive all the files that you registered.

Using tags, you can track useful information such as the name and version of the machine learning library used to train the model. Note that tags must be alphanumeric. Learn more about registering models [here](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-deploy-and-where#registermodel) 

In [84]:
from azureml.core.model import Model

# register downloaded model 
model = Model.register(model_path="models/",
                       model_name="mnist-prs", # this is the name the model is registered as
                       tags={'pretrained': "mnist"},
                       description="Mnist trained tensorflow model",
                       workspace=ws)

Registering model mnist-prs


### Using your model to make batch predictions
To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:

#### An entry script
This script accepts requests, scores the requests by using the model, and returns the results.
- __init()__ - Typically this function loads the model into a global object. It runs only once per worker node/process, at the start of batch processing. `init()` can use the following environment variable (a ParallelRunStep input):
    1. `AZUREML_BI_OUTPUT_PATH`: output folder path
- __run(mini_batch)__ - The method to be parallelized. Each invocation receives one mini-batch.
    - __mini_batch__: Batch inference invokes `run()` and passes the mini-batch as its argument: a list of file paths if the input is a FileDataset, or a pandas DataFrame if the input is a TabularDataset.
    - __run__ method response: `run()` should return a pandas DataFrame or an array. For the `append_row` output action, the returned elements are appended to the common output file. For `summary_only`, the contents of the elements are ignored. For all output actions, each returned output element indicates one successful inference of an input element in the mini-batch.
    - Make sure the inference result includes enough data to map each input to its inference. Inference output is written to the output file in no guaranteed order, so include a key in each result that maps it back to its input.
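The `init()`/`run(mini_batch)` contract described above can be sketched as follows for a FileDataset input. This is a minimal, hypothetical entry script, not the `digit_identification.py` used in this notebook: the stub model always predicts `0` and stands in for loading the registered model in `init()`. Each returned string starts with the input file name so results can be matched to inputs despite the unordered output.

```python
import os

g_model = None  # populated once per worker process by init()


def init():
    """Runs once per worker process before any mini-batches are processed."""
    global g_model
    # Hypothetical stub: a real script would load the registered model here.
    g_model = lambda file_path: 0


def run(mini_batch):
    """Runs once per mini-batch; for a FileDataset input, mini_batch is a list of file paths."""
    results = []
    for file_path in mini_batch:
        prediction = g_model(file_path)
        # Prefix each result with the file name so it can be mapped back to its input.
        results.append(f"{os.path.basename(file_path)}: {prediction}")
    return results
```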
    

#### Dependencies
Helper scripts or Python/Conda packages required to run the entry script.

In [97]:
scripts_folder = "Code"
script_file = "digit_identification.py"

# peek at contents
with open(os.path.join(scripts_folder, script_file)) as inference_file:
    print(inference_file.read())


# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license.

import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model
from opencensus.ext.azure.log_exporter import AzureLogHandler
import logging
from azureml.core import Run

run = Run.get_context(allow_offline=False)

custom_dimensions = {
    "parent_run_id": run.parent.id,
    "step_id": run.id,
    "step_name": run.name,
    "experiment_name": run.experiment.name,
    "run_url": run.parent.get_portal_url(),
    "run_type": "inference"
}

def init():
    global g_tf_sess

    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler())

    # Assumes the environment variable APPLICATIONINSIGHTS_CONNECTION_STRING is already set
    logger.addHandler(AzureLogHandler())
    # Assumes AzureLogHandler was already registered above
    logger.info("Inferring image", extra= {"custom_dimensions":custom_dimensions})
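The script passes `custom_dimensions` through the standard `logging` `extra` argument. `extra` sets each of its keys as an attribute on the `LogRecord`, which is where `AzureLogHandler` picks up `custom_dimensions` and sends them to Application Insights as custom properties. The sketch below shows that mechanism with the standard library only, using a hypothetical capturing handler in place of `AzureLogHandler` and placeholder dimension values.

```python
import logging


class CapturingHandler(logging.Handler):
    """Hypothetical stand-in for AzureLogHandler that keeps records in memory."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


logger = logging.getLogger("custom_dimensions_demo")
logger.setLevel(logging.INFO)
handler = CapturingHandler()
logger.addHandler(handler)

# Placeholder values; the entry script fills these from Run.get_context().
custom_dimensions = {"step_name": "example-step", "run_type": "inference"}
logger.info("Inferring image", extra={"custom_dimensions": custom_dimensions})

record = handler.records[-1]
print(record.getMessage(), record.custom_dimensions)
```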

## Build and run the batch inference pipeline
The data, models, and compute resource are now available. Let's put all these together in a pipeline.

### Specify the environment to run the script
Specify the conda dependencies for your script. This allows you to install pip packages as well as configure the inference environment.
* Always include **azureml-core** and **azureml-dataset-runtime\[fuse\]** in the pip package list so that ParallelRunStep runs properly.

If you're using a custom image (`batch_env.python.user_managed_dependencies = True`), you need to install these packages in your image yourself.

In [98]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies, DEFAULT_CPU_IMAGE

batch_conda_deps = CondaDependencies.create(pip_packages=["tensorflow==1.15.2", "pillow", "pandas",
                                                          "azureml-core", "azureml-dataset-runtime[fuse]","opencensus-ext-azure>=1.0.1"])
batch_env = Environment(name="batch_environment")
batch_env.python.conda_dependencies = batch_conda_deps
batch_env.environment_variables = {
    # read the connection string from the local environment instead of hard-coding the key in the notebook
    "APPLICATIONINSIGHTS_CONNECTION_STRING": os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
}
batch_env.docker.enabled = True
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
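The requirement to list `azureml-core` and `azureml-dataset-runtime[fuse]` can be checked before submitting. The sketch below is a hypothetical helper, `missing_prs_packages`, that parses each pip specification (name plus optional extras, ignoring version pins) with a simple regular expression; it is a rough check, not a full PEP 508 parser.

```python
import re

# Packages this notebook says ParallelRunStep needs, with the extras each must include.
REQUIRED_PACKAGES = {"azureml-core": set(), "azureml-dataset-runtime": {"fuse"}}

_SPEC = re.compile(r"^\s*([A-Za-z0-9_.-]+)\s*(?:\[([^\]]*)\])?")


def parse_spec(spec):
    """Split a pip spec such as 'azureml-dataset-runtime[fuse]' into (name, extras)."""
    match = _SPEC.match(spec)
    if match is None:
        raise ValueError(f"Unrecognized package spec: {spec!r}")
    name = match.group(1).lower()
    extras = {e.strip().lower() for e in match.group(2).split(",")} if match.group(2) else set()
    return name, extras


def missing_prs_packages(pip_packages):
    """Return the required packages (with extras) that are absent from pip_packages."""
    found = {}
    for spec in pip_packages:
        name, extras = parse_spec(spec)
        found.setdefault(name, set()).update(extras)
    missing = []
    for name, extras in REQUIRED_PACKAGES.items():
        if name not in found or not extras <= found[name]:
            missing.append(f"{name}[{','.join(sorted(extras))}]" if extras else name)
    return missing
```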

### Create the configuration to wrap the inference script

In [99]:
from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script=script_file,
    mini_batch_size=PipelineParameter(name="batch_size_param", default_value="5"),
    error_threshold=10,
    output_action="append_row",
    append_row_file_name="mnist_outputs.txt",
    environment=batch_env,
    compute_target=compute_target,
    process_count_per_node=PipelineParameter(name="process_count_param", default_value=2),
    node_count=2
)
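For a FileDataset input, `mini_batch_size` is a number of files, and `node_count` × `process_count_per_node` sets how many worker processes call `run()` at once. The arithmetic below is a rough estimate that assumes every mini-batch takes about the same time; `prs_workload` is a hypothetical helper, and the 100-file input is an arbitrary example, not the size of the dataset used here.

```python
import math


def prs_workload(n_files, mini_batch_size=5, node_count=2, process_count_per_node=2):
    """Rough ParallelRunStep workload estimate for a FileDataset input (hypothetical helper)."""
    workers = node_count * process_count_per_node  # processes calling run() concurrently
    mini_batches = math.ceil(n_files / mini_batch_size)  # total number of run() calls
    # Lower bound on sequential "rounds", assuming evenly sized, evenly timed mini-batches.
    rounds = math.ceil(mini_batches / workers)
    return workers, mini_batches, rounds


print(prs_workload(100))  # (4, 20, 5)
```

By the same estimate, the single-image resubmission later in this notebook (`batch_size_param="1"`, `process_count_param=1`, `node_count` still 2) gives `(2, 1, 1)`: one mini-batch, so one of the two nodes has nothing to process.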

### Create the pipeline step
Create the pipeline step using the script, environment configuration, and parameters. The compute target you already attached to your workspace was specified in `parallel_run_config` as the target that executes the script. We use ParallelRunStep to create the pipeline step.

In [100]:
parallelrun_step = ParallelRunStep(
    name="predict-digits-mnist",
    parallel_run_config=parallel_run_config,
    inputs=[ input_mnist_ds_consumption ],
    output=output_dir,
    allow_reuse=False
)

### Run the pipeline
At this point you can run the pipeline and examine the output it produced. The Experiment object is used to track the pipeline run.

In [101]:
from azureml.core import Experiment

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
experiment = Experiment(ws, 'digit_identification')
pipeline_run = experiment.submit(pipeline)

Created step predict-digits-mnist [092a9b35][64692e11-a32d-4498-a264-3534a612f303], (This step will run and generate new outputs)
Submitted PipelineRun 82488159-bd4f-4faf-9f9b-7e93a0ab9c9a
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/digit_identification/runs/82488159-bd4f-4faf-9f9b-7e93a0ab9c9a?wsid=/subscriptions/d661a889-c8b8-41f2-93ab-99b3ed99b6e7/resourcegroups/AMLPoC/workspaces/amlpocmlws


### Monitor the run

The pipeline run status can be checked in the Azure Machine Learning portal (https://ml.azure.com). The link to the pipeline run can be retrieved by inspecting the `pipeline_run` object.

In [62]:
# Display information about the pipeline run, including a link to its details page in the portal.
pipeline_run

| Experiment | Id | Type | Status | Details Page | Docs Page |
|---|---|---|---|---|---|
| digit_identification | f9ce4d75-688b-4311-a1d3-8ebc1f94c840 | azureml.PipelineRun | NotStarted | Link to Azure Machine Learning studio | Link to Documentation |
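`wait_for_completion()` blocks the notebook until the run ends. If you would rather poll, the loop below sketches the idea with a hypothetical `get_status` callable, for example the bound method `pipeline_run.get_status`. The set of terminal status strings is an assumption: `Finished` appears in this notebook's output, while `Failed` and `Canceled` are assumed.

```python
import time


def poll_until_done(get_status, terminal=("Finished", "Failed", "Canceled"),
                    interval_seconds=30.0, timeout_seconds=3600.0, sleep=time.sleep):
    """Call get_status() until it returns a terminal status or the timeout elapses (hypothetical helper)."""
    waited = 0.0
    while True:
        status = get_status()
        if status in terminal:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Run still '{status}' after {waited:.0f} seconds")
        sleep(interval_seconds)
        waited += interval_seconds
```

The `sleep` parameter exists only so the loop can be exercised without real waiting.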


### Optional: View detailed logs (streaming) 

In [34]:
# Wait for the run to complete and stream its output log to the console
pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: ab497d76-e504-4937-b0a0-6fc26137824e
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/digit_identification/runs/ab497d76-e504-4937-b0a0-6fc26137824e?wsid=/subscriptions/d661a889-c8b8-41f2-93ab-99b3ed99b6e7/resourcegroups/AMLPoC/workspaces/amlpocmlws
PipelineRun Status: Running


Expected a StepRun object but received <class 'azureml.core.run.Run'> instead.
This usually indicates a package conflict with one of the dependencies of azureml-core or azureml-pipeline-core.
Please check for package conflicts in your python environment






PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'ab497d76-e504-4937-b0a0-6fc26137824e', 'status': 'Completed', 'startTimeUtc': '2020-12-17T14:48:55.844577Z', 'endTimeUtc': '2020-12-17T15:07:15.18294Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{"batch_size_param":"5","process_count_param":"2"}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://amlpocwsstorage.blob.core.windows.net/azureml/ExperimentRun/dcid.ab497d76-e504-4937-b0a0-6fc26137824e/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=6iWTXU5jT9OuZd9SloQT4oLLTDM%2B8hT%2FkdcbDZEH1c8%3D&st=2020-12-17T14%3A39%3A20Z&se=2020-12-17T22%3A49%3A20Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://amlpocwsstorage.blob.core.windows.net/azureml/ExperimentRun/dcid.ab497d76-e504-4937-b0a0-6fc26137824e/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=TI3UDTFv%2B%2FyKtMHuJlHomZ43O%2BSLfdk

'Finished'

### Resubmit with a different dataset
Since we made the input a `PipelineParameter`, we can resubmit with a different dataset without having to create an entirely new experiment. We'll use the same datastore but use only a single image.

In [41]:
path_on_datastore = def_data_store.path('mnist/landing/0.png')
single_image_ds = Dataset.File.from_files(path=path_on_datastore, validate=False)

In [42]:
pipeline_run_2 = experiment.submit(pipeline, 
                                   pipeline_parameters={"mnist_param": single_image_ds, 
                                                        "batch_size_param": "1",
                                                        "process_count_param": 1}
)

Submitted PipelineRun 9dc73476-5d35-4d70-a14a-cb9ffc1031f9
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/digit_identification/runs/9dc73476-5d35-4d70-a14a-cb9ffc1031f9?wsid=/subscriptions/d661a889-c8b8-41f2-93ab-99b3ed99b6e7/resourcegroups/AMLPoC/workspaces/amlpocmlws


In [39]:
# Display information about the pipeline run, including a link to its details page in the portal.
pipeline_run_2

| Experiment | Id | Type | Status | Details Page | Docs Page |
|---|---|---|---|---|---|
| digit_identification | d6d5325b-42eb-4a14-bb35-ee49877ab916 | azureml.PipelineRun | Running | Link to Azure Machine Learning studio | Link to Documentation |


In [40]:
# Wait for the run to complete and stream its output log to the console
pipeline_run_2.wait_for_completion(show_output=True)

PipelineRunId: d6d5325b-42eb-4a14-bb35-ee49877ab916
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/digit_identification/runs/d6d5325b-42eb-4a14-bb35-ee49877ab916?wsid=/subscriptions/d661a889-c8b8-41f2-93ab-99b3ed99b6e7/resourcegroups/AMLPoC/workspaces/amlpocmlws
PipelineRun Status: Running


Expected a StepRun object but received <class 'azureml.core.run.Run'> instead.
This usually indicates a package conflict with one of the dependencies of azureml-core or azureml-pipeline-core.
Please check for package conflicts in your python environment






ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "Dataset initialization failed: DatasetExecutionException:\n\tMessage: \nError Code: ScriptExecution.StreamAccess.Authentication.AzureIdentityAccessTokenResolution\nFailed Step: 38503558-00ce-4a26-9486-c77ce3565f3b\nError Message: ScriptExecutionException was caused by StreamAccessException.\n  Unable to authenticate data access to 'https://amlpocwsstorage.blob.core.windows.net/data/0.png'.\n    AuthenticationException was caused by AzureIdentityAccessTokenResolutionException.\n      Compute has no identity provisioned.\n| session_id=bf434ffe-a397-4d78-acd0-3b866db3dd1c\n\tInnerException None\n\tErrorResponse \n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"\\nError Code: ScriptExecution.StreamAccess.Authentication.AzureIdentityAccessTokenResolution\\nFailed Step: 38503558-00ce-4a26-9486-c77ce3565f3b\\nError Message: ScriptExecutionException was caused by StreamAccessException.\\n  Unable to authenticate data access to 'https://amlpocwsstorage.blob.core.windows.net/data/0.png'.\\n    AuthenticationException was caused by AzureIdentityAccessTokenResolutionException.\\n      Compute has no identity provisioned.\\n| session_id=bf434ffe-a397-4d78-acd0-3b866db3dd1c\"\n    }\n}",
        "messageParameters": {},
        "details": []
    },
    "time": "0001-01-01T00:00:00.000Z"
}

## Clean up compute resources

For recurring jobs, it may be wise to keep the compute resources and allow the compute nodes to scale down to 0. However, since this is just a single-run job, we are free to release the allocated compute resources.

In [None]:
# uncomment the line below and run it if the compute resources are no longer needed
# compute_target.delete()