In [1]:
# Check core SDK version number
import azureml.core

print("SDK version:", azureml.core.VERSION)

SDK version: 1.25.0


In [2]:
from azureml.core import Workspace

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.
Workspace name: ml-service
Azure region: westus2
Subscription id: 3e0e14b3-7e28-4da7-97de-0f5cb324f030
Resource group: ml


### Create or attach an existing compute resource
Azure Machine Learning Compute is a managed service that lets data scientists train machine learning models on clusters of Azure virtual machines, including VMs with GPU support. In this tutorial, you create an Azure Machine Learning Compute cluster as your training environment. The code below creates the cluster if it doesn't already exist in your workspace.

**Creating the compute takes approximately 5 minutes. If an AmlCompute cluster with that name already exists in your workspace, the code skips the creation step.**

In [3]:
import os
from azureml.core.compute import AmlCompute, ComputeTarget

# choose a name for your cluster
compute_name = os.environ.get("AML_COMPUTE_CLUSTER_NAME", "ds3cluster")
# environment variables are strings, so convert node counts to int
compute_min_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MIN_NODES", 0))
compute_max_nodes = int(os.environ.get("AML_COMPUTE_CLUSTER_MAX_NODES", 2))

# This example uses a CPU VM. To use a GPU VM, set the SKU to STANDARD_NC6
vm_size = os.environ.get("AML_COMPUTE_CLUSTER_SKU", "STANDARD_D2_V2")


if compute_name in ws.compute_targets:
    compute_target = ws.compute_targets[compute_name]
    if compute_target and type(compute_target) is AmlCompute:
        print('found compute target. just use it. ' + compute_name)
else:
    print('creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = vm_size,
                                                                min_nodes = compute_min_nodes, 
                                                                max_nodes = compute_max_nodes)

    # create the cluster
    compute_target = ComputeTarget.create(ws, compute_name, provisioning_config)
    
    # can poll for a minimum number of nodes and for a specific timeout.
    # if no min node count is provided it will use the scale settings for the cluster
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
    # For a more detailed view of current AmlCompute status, use get_status()
    print(compute_target.get_status().serialize())

found compute target. just use it. ds3cluster


### Create a datastore containing sample data
The input dataset used for this notebook is CSV data that contains attributes of different iris flowers. We have created a public blob container `sampledata` on an account named `pipelinedata`, which contains the iris data set. In the next step, we create a datastore named `iris_datastore_data` that points to this container. In the call to `register_azure_blob_container` below, setting the `overwrite` flag to `True` overwrites any datastore that was previously created with that name.

This step can be changed to point to your blob container by providing your own `datastore_name`, `container_name`, and `account_name`.

In [4]:
from azureml.core.datastore import Datastore

account_name = "pipelinedata"
datastore_name="iris_datastore_data"
container_name="sampledata"

iris_data = Datastore.register_azure_blob_container(ws, 
                      datastore_name=datastore_name, 
                      container_name= container_name, 
                      account_name=account_name, 
                      overwrite=True)

### Create a TabularDataset
A [TabularDataset](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.tabulardataset?view=azure-ml-py) references one or more files in your datastores or at public URLs that contain data in a tabular structure (for example, CSV files). A TabularDataset lets you load that data into a pandas or Spark DataFrame. By creating a dataset, you create a reference to the data source location. If you apply any subsetting transformations to the dataset, they are stored in the dataset as well. The data remains in its existing location, so no extra storage cost is incurred.
You can use dataset objects as inputs. Register the datasets to the workspace if you want to reuse them later.

In [5]:
from azureml.core.dataset import Dataset

iris_ds_name = 'iris_data'

path_on_datastore = iris_data.path('iris/')
input_iris_ds = Dataset.Tabular.from_delimited_files(path=path_on_datastore, validate=False)
named_iris_ds = input_iris_ds.as_named_input(iris_ds_name)

### Intermediate/Output Data
Intermediate data (the output of a step) is represented by a [PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py) object. PipelineData can be produced by one step and consumed by another: provide the PipelineData object as an output of one step and as an input of one or more later steps.

In [6]:
from azureml.pipeline.core import PipelineData

datastore = ws.get_default_datastore()
output_folder = PipelineData(name='inferences', datastore=datastore)

## Registering the Model with the Workspace
Get the pretrained model from a publicly available Azure Blob container, then register it so that you can use it in your workspace.

In [7]:
model_container_name="iris-model"
model_datastore_name="iris_model_datastore"

model_datastore = Datastore.register_azure_blob_container(ws, 
                      datastore_name=model_datastore_name, 
                      container_name= model_container_name, 
                      account_name=account_name, 
                      overwrite=True)

In [8]:
from azureml.core.model import Model

model_datastore.download('iris_model.pkl')

# register downloaded model
model = Model.register(model_path = "iris_model.pkl/iris_model.pkl",
                       model_name = "iris-prs", # this is the name the model is registered as
                       tags = {'pretrained': "iris"},
                       workspace = ws)

Downloading iris_model.pkl/iris_model.pkl
Downloaded iris_model.pkl/iris_model.pkl, 1 files out of an estimated total of 1
Registering model iris-prs


### Using your model to make batch predictions
To use the model to make batch predictions, you need an **entry script** and a list of **dependencies**:

#### An entry script
This script accepts requests, scores the requests by using the model, and returns the results.
- __init()__ - Typically, this function loads the model into a global object. It runs only once per worker node/process, at the start of batch processing. The init method can use the following environment variable (ParallelRunStep input):
    1. AZUREML_BI_OUTPUT_PATH - output folder path
- __run(mini_batch)__ - The method to be parallelized. Each invocation receives one mini-batch.<BR>
__mini_batch__: Batch inference invokes the run method and passes either a list or a Pandas DataFrame as the argument. If the input is a FileDataset, mini_batch is a list of file paths; if the input is a TabularDataset, mini_batch is a Pandas DataFrame.<BR>
__run__ method response: run() should return a Pandas DataFrame or an array. For the append_row output_action, the returned elements are appended to the common output file. For summary_only, the contents of the elements are ignored. For all output actions, each returned element indicates one successful inference for an input element in the mini-batch.
    Make sure the inference result includes enough data to map each output back to its input. Inference output is written to the output file in no guaranteed order, so include a key in the output that identifies the corresponding input.
    

#### Dependencies
Helper scripts or Python/Conda packages required to run the entry script.

## Print inferencing script

In [9]:
scripts_folder = "Code"
script_file = "iris_score.py"

# peek at contents
with open(os.path.join(scripts_folder, script_file)) as inference_file:
    print(inference_file.read())

import io
import pickle
import argparse
import numpy as np

from azureml.core.model import Model
from sklearn.linear_model import LogisticRegression

from azureml_user.parallel_run import EntryScript


def init():
    global iris_model

    logger = EntryScript().logger
    logger.info("init() is called.")

    parser = argparse.ArgumentParser(description="Iris model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    args, unknown_args = parser.parse_known_args()

    model_path = Model.get_model_path(args.model_name)
    with open(model_path, 'rb') as model_file:
        iris_model = pickle.load(model_file)


def run(input_data):
    logger = EntryScript().logger
    logger.info("run() is called with: {}.".format(input_data))

    # make inference
    num_rows, num_cols = input_data.shape
    pred = iris_model.predict(input_data).reshape((num_rows, 1))

    # cleanup output
    result = input_data.drop(input_data.columns[4:], axis=1)
    result['va

## Build and run the batch inference pipeline
The data, models, and compute resource are now available. Let's put all these together in a pipeline.

### Specify the environment to run the script
Specify the conda dependencies for your script. This lets you install pip packages and configure the inference environment.
* Always include **azureml-core** and **azureml-dataset-runtime\[fuse\]** in the pip package list so that ParallelRunStep runs properly.
* For a TabularDataset, add **pandas**, because `run(mini_batch)` receives mini_batch as a `pandas.DataFrame`.

If you're using a custom image (`predict_env.python.user_managed_dependencies = True`), you need to install these packages in your image.

In [10]:
from azureml.core import Environment
from azureml.core.runconfig import CondaDependencies

predict_conda_deps = CondaDependencies.create(pip_packages=["scikit-learn==0.20.3",
                                                            "azureml-core", "azureml-dataset-runtime[pandas,fuse]"])

predict_env = Environment(name="predict_environment")
predict_env.python.conda_dependencies = predict_conda_deps
predict_env.docker.enabled = True
predict_env.spark.precache_packages = False

'enabled' is deprecated. Please use the azureml.core.runconfig.DockerConfiguration object with the 'use_docker' param instead.


### Create the configuration to wrap the inference script

In [11]:
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# In a real-world scenario, tune the number of processes per node and the node count to fit your workload.
parallel_run_config = ParallelRunConfig(
    source_directory=scripts_folder,
    entry_script=script_file,  # the user script to run against each input
    mini_batch_size='1KB',
    error_threshold=5,
    output_action='append_row',
    append_row_file_name="iris_outputs.txt",
    environment=predict_env,
    compute_target=compute_target, 
    node_count=2,
    run_invocation_timeout=600
)
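
For tabular input, `mini_batch_size='1KB'` sets the approximate amount of data passed to each `run()` call. The sketch below illustrates that idea only: `split_by_size` is a hypothetical helper, and the way ParallelRunStep actually partitions a TabularDataset is internal to the service and may differ.

```python
def split_by_size(lines, max_bytes):
    """Group text lines into batches of at most roughly max_bytes each."""
    batches, current, current_size = [], [], 0
    for line in lines:
        line_size = len(line.encode("utf-8")) + 1  # +1 for the newline
        # Start a new batch when adding this line would exceed the budget.
        if current and current_size + line_size > max_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(line)
        current_size += line_size
    if current:
        batches.append(current)
    return batches


# 150 made-up rows, 16 bytes each once the newline is counted.
csv_lines = ["5.1,3.5,1.4,0.2"] * 150
mini_batches = split_by_size(csv_lines, max_bytes=1024)
print(len(mini_batches), [len(batch) for batch in mini_batches])
```

A smaller mini-batch size produces more `run()` invocations with less data each, which spreads work more evenly across nodes at the cost of more per-call overhead.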

### Create the pipeline step
Create the pipeline step using the script, environment configuration, and parameters. Specify the compute target you already attached to your workspace as the target of execution of the script. We will use ParallelRunStep to create the pipeline step.

In [12]:
distributed_csv_iris_step = ParallelRunStep(
    name='example-iris',
    inputs=[named_iris_ds],
    output=output_folder,
    parallel_run_config=parallel_run_config,
    arguments=['--model_name', 'iris-prs'],
    allow_reuse=False
)

'enabled' is deprecated. Please use the azureml.core.runconfig.DockerConfiguration object with the 'use_docker' param instead.
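
The `arguments` list passed to the step is what `init()` in iris_score.py reads with `parse_known_args`. The sketch below reproduces that parsing locally. The extra flag `--some_runtime_flag` is a made-up placeholder for whatever additional arguments a launcher might add; it is not a documented ParallelRunStep argument.

```python
import argparse

# Same parser definition as in the entry script's init().
parser = argparse.ArgumentParser(description="Iris model serving")
parser.add_argument("--model_name", dest="model_name", required=True)

# Hypothetical argument list: the step's own arguments plus one extra,
# made-up flag that the script does not know about.
argv = ["--model_name", "iris-prs", "--some_runtime_flag", "value"]

# parse_known_args() returns unrecognized arguments instead of failing.
args, unknown_args = parser.parse_known_args(argv)
print(args.model_name)
print(unknown_args)
```

Using `parse_known_args` rather than `parse_args` keeps the entry script from exiting with an error when it is started with arguments it did not define.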


### Run the pipeline
At this point, you can run the pipeline and examine the output it produces. The Experiment object is used to track the pipeline run.

In [13]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[distributed_csv_iris_step])

pipeline_run = Experiment(ws, 'iris-prs').submit(pipeline)

Created step example-iris [6be526d3][4e7b9b06-2ad2-48d3-b044-05cd51e40b13], (This step will run and generate new outputs)
Submitted PipelineRun 7ee258d4-451c-4c27-88e7-e8c97a71e276
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/7ee258d4-451c-4c27-88e7-e8c97a71e276?wsid=/subscriptions/3e0e14b3-7e28-4da7-97de-0f5cb324f030/resourcegroups/ml/workspaces/ml-service&tid=72f988bf-86f1-41af-91ab-2d7cd011db47


## View progress of Pipeline run

You can check the status of the pipeline run in the Azure Machine Learning portal (https://ml.azure.com). To get the link to the pipeline run, inspect the `pipeline_run` object.

In [14]:
# This outputs information about the pipeline run, including a link to its details page in the portal.
pipeline_run

Experiment,Id,Type,Status,Details Page,Docs Page
iris-prs,7ee258d4-451c-4c27-88e7-e8c97a71e276,azureml.PipelineRun,Preparing,Link to Azure Machine Learning studio,Link to Documentation


### Optional: View detailed logs (streaming) 

In [15]:
# Wait for the run to complete and stream the output log to the console
pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 7ee258d4-451c-4c27-88e7-e8c97a71e276
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/7ee258d4-451c-4c27-88e7-e8c97a71e276?wsid=/subscriptions/3e0e14b3-7e28-4da7-97de-0f5cb324f030/resourcegroups/ml/workspaces/ml-service&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
PipelineRun Status: Running


StepRunId: 977a5778-f7ff-47d8-bc0b-be5b056d3032
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/977a5778-f7ff-47d8-bc0b-be5b056d3032?wsid=/subscriptions/3e0e14b3-7e28-4da7-97de-0f5cb324f030/resourcegroups/ml/workspaces/ml-service&tid=72f988bf-86f1-41af-91ab-2d7cd011db47
StepRun( example-iris ) Status: NotStarted
StepRun( example-iris ) Status: Queued
StepRun( example-iris ) Status: Running

Streaming azureml-logs/20_image_build_log.txt
2021/03/29 09:19:06 Downloading source code...
2021/03/29 09:19:07 Finished downloading source code
2021/03/29 09:19:08 Creating Docker network: acb_default_network, driver: 'bridge'
2021/03/29 09:19:08 Successfully set u

'Finished'

## View Results
As the iris_score.py file above shows, the script appends the predicted iris variety to the input features of each row from the CSV file and returns the result. These results are written to the datastore specified in the PipelineData object as the output data, which in this case is named *inferences*. The output contains the results from all of the worker nodes used in the compute cluster. You can download this data to view the results. The code below displays a random sample of 20 rows.

In [16]:
import pandas as pd
import tempfile

prediction_run = pipeline_run.find_step_run(distributed_csv_iris_step.name)[0]
prediction_output = prediction_run.get_output_data(output_folder.name)

target_dir = tempfile.mkdtemp()
prediction_output.download(local_path=target_dir)
result_file = os.path.join(target_dir, prediction_output.path_on_datastore, parallel_run_config.append_row_file_name)

# cleanup output format
df = pd.read_csv(result_file, delimiter=" ", header=None)
df.columns = ["sepal.length", "sepal.width", "petal.length", "petal.width", "variety"]
print("Prediction has ", df.shape[0], " rows")

random_subset = df.sample(n=20)
random_subset.head(20)

Prediction has  74  rows


Unnamed: 0,sepal.length,sepal.width,petal.length,petal.width,variety
47,5.3,3.7,1.5,0.2,Iris-setosa
15,4.8,3.0,1.4,0.1,Iris-setosa
4,4.4,3.2,1.3,0.2,Iris-setosa
55,5.8,4.0,1.2,0.2,Iris-setosa
64,5.4,3.4,1.7,0.2,Iris-setosa
65,6.3,2.5,4.9,1.5,Iris-versicolor
66,6.3,3.3,4.7,1.6,Iris-virginica
18,5.1,3.7,1.5,0.4,Iris-setosa
35,4.7,3.2,1.3,0.2,Iris-setosa
45,6.5,2.8,4.6,1.5,Iris-virginica
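
Because the appended rows are not guaranteed to be in input order, it helps to index the output by a key before comparing it with the input. The sketch below does this with the standard library only. The three sample lines are made up to mimic the space-delimited, header-less layout of the output file, and using the four feature values as the key is an assumption that works only when the feature rows are unique.

```python
import csv
import io
from collections import Counter

# Made-up lines standing in for the downloaded append_row output file.
sample_output = io.StringIO(
    "6.3 3.3 4.7 1.6 Iris-versicolor\n"
    "5.1 3.5 1.4 0.2 Iris-setosa\n"
    "4.9 3.0 1.4 0.2 Iris-setosa\n"
)

columns = ["sepal.length", "sepal.width", "petal.length", "petal.width", "variety"]
reader = csv.reader(sample_output, delimiter=" ")
records = [dict(zip(columns, row)) for row in reader]

# Index predictions by feature values so they can be looked up per input row.
by_features = {
    tuple(float(record[name]) for name in columns[:4]): record["variety"]
    for record in records
}

# Summarize how many rows were assigned to each variety.
variety_counts = Counter(record["variety"] for record in records)
print(by_features[(5.1, 3.5, 1.4, 0.2)])
print(variety_counts)
```

If the input rows can repeat, a dedicated identifier column passed through `run()` is a safer key than the feature values.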


## Cleanup compute resources
For recurring jobs, it may be wise to keep the compute resources and let the compute nodes scale down to 0. However, since this is a single-run job, we are free to release the allocated compute resources.

In [None]:
# uncomment below and run if compute resources are no longer needed 
# compute_target.delete()