Licensed under the MIT License.
# Build an Azure Machine Learning pipeline for batch scoring

<b>Note:</b>

Select Kernel = Python 3.6 - Azure ML when prompted.

In this lab, you learn how to build an Azure Machine Learning pipeline to run a batch scoring job. Machine learning pipelines optimize your workflow with speed, portability, and reuse, so you can focus on machine learning instead of infrastructure and automation.

The example uses a pretrained Inception-V3 convolutional neural network model implemented in TensorFlow to classify unlabeled images.

In this lab, you complete the following tasks:

- Configure workspace
- Download and store sample data
- Create dataset objects to fetch and output data
- Download, prepare, and register the model in your workspace
- Use provisioned compute target and create a scoring script
- Use the ParallelRunStep class for async batch scoring


The batch scoring example in this lab uses only one pipeline step.

## Configure workspace and create a datastore
Create a workspace object from the existing Azure Machine Learning workspace.

In [None]:
from azureml.core import Workspace
ws = Workspace.from_config()

Note: This code snippet expects the workspace configuration to be saved in the current directory or its parent.
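
As a rough illustration of that lookup, the sketch below walks from a starting directory up through its parents and returns the first configuration file it finds. The helper name `find_workspace_config` and the locations it checks (the directory itself and a `.azureml` subfolder) are assumptions made for illustration; this is not the SDK's actual implementation.

```python
from pathlib import Path
from typing import Optional


def find_workspace_config(start: Path, filename: str = "config.json") -> Optional[Path]:
    """Hypothetical helper: search `start` and its parents for a config file."""
    start = start.resolve()
    for directory in [start, *start.parents]:
        # Assumed candidate locations; the real SDK may check others.
        for candidate in (directory / filename, directory / ".azureml" / filename):
            if candidate.is_file():
                return candidate
    return None


if __name__ == "__main__":
    # Prints the path of the nearest config.json, or None if there is none.
    print(find_workspace_config(Path.cwd()))
```

If a check like this returns `None`, `Workspace.from_config()` is unlikely to find a configuration from the same directory either.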

## Create a datastore for sample images

Get the ImageNet evaluation public data sample from the sampledata public blob container. Call register_azure_blob_container() to make the data available to the workspace under the name images_datastore. Then, get the workspace default datastore to use as the output datastore, where the pipeline stores its scoring output.

In [None]:
from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="images_datastore", 
                      container_name="sampledata", 
                      account_name="pipelinedata", 
                      overwrite=True)

def_data_store = ws.get_default_datastore()

## Create dataset objects
When building pipelines, Dataset objects are used for reading data from workspace datastores, and PipelineData objects are used for transferring intermediate data between pipeline steps.

In this scenario, you create Dataset objects that correspond to the datastore directories for both the input images and the classification labels (y-test values). You also create a PipelineData object for the batch scoring output data.

In [None]:
from azureml.core.dataset import Dataset
from azureml.pipeline.core import PipelineData

input_images = Dataset.File.from_files((batchscore_blob, "batchscoring/images/"))
label_ds = Dataset.File.from_files((batchscore_blob, "batchscoring/labels/"))
output_dir = PipelineData(name="scores", 
                          datastore=def_data_store, 
                          output_path_on_compute="batchscoring/results")

Register the datasets to the workspace if you want to reuse them later. This step is optional.

In [None]:
input_images = input_images.register(workspace = ws, name = "input_images")
label_ds = label_ds.register(workspace = ws, name = "label_ds")

## Download and register the model

Download the pretrained TensorFlow model to use it for batch scoring in a pipeline. First, create a local directory where you store the model. Then, download and extract the model.

In [None]:
import os
import tarfile
import urllib.request

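# Create the local model directory if it does not already exist.
os.makedirs("models", exist_ok=True)

# Download the pretrained Inception-V3 checkpoint archive.
urllib.request.urlretrieve("http://download.tensorflow.org/models/inception_v3_2016_08_28.tar.gz", "model.tar.gz")

# Extract the archive; the context manager closes the file handle afterwards.
with tarfile.open("model.tar.gz", "r:gz") as tar:
    tar.extractall("models")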
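
Because the registration step that follows expects a checkpoint file inside the models directory, it can help to confirm that the archive really contains it. The sketch below is one possible way to do that; the helper name `extract_and_check` is hypothetical, and the assumption that the member is stored under a plain relative name should be checked against the actual archive.

```python
import os
import tarfile


def extract_and_check(archive_path: str, target_dir: str, expected_member: str) -> str:
    """Hypothetical helper: extract a .tar.gz and confirm an expected file is in it."""
    os.makedirs(target_dir, exist_ok=True)
    with tarfile.open(archive_path, "r:gz") as tar:
        # Normalize names so "./file" and "file" compare equal.
        names = [os.path.normpath(name) for name in tar.getnames()]
        if os.path.normpath(expected_member) not in names:
            raise FileNotFoundError(
                f"{expected_member} not found in {archive_path}; members: {names}"
            )
        tar.extractall(target_dir)
    return os.path.join(target_dir, expected_member)


# Example call (assumes model.tar.gz has already been downloaded):
# model_path = extract_and_check("model.tar.gz", "models", "inception_v3.ckpt")
```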

Next, register the model to your workspace, so you can easily retrieve the model in the pipeline process. In the register() static function, the model_name parameter is the key you use to locate your model throughout the SDK.

In [None]:
from azureml.core.model import Model
 
model = Model.register(model_path="models/inception_v3.ckpt",
                       model_name="inception",
                       tags={"pretrained": "inception"},
                       description="Imagenet trained tensorflow inception",
                       workspace=ws)

## Attach the remote compute target
Machine learning pipelines can't be run locally, so you run them on cloud resources or remote compute targets. A remote compute target is a reusable virtual compute environment where you run experiments and machine learning workflows.
<br><br>
Update the compute target name in the cell below before you run it.

In [None]:
compute_target = ws.compute_targets['CPU-Cluster-XX']

## Write a scoring script

To do the scoring, create a batch scoring script called batch_scoring.py, and then write it to the current directory. The script takes input images, applies the classification model, and then outputs the predictions to a results file.

The batch_scoring.py script takes the following parameters, which get passed from the ParallelRunStep you create later:

- --model_name: The name of the model being used.
- --labels_dir: The location of the labels.txt file.
<br><br>
The pipeline infrastructure uses the ArgumentParser class to pass parameters into pipeline steps. For example, in the following code, the first argument --model_name is given the property identifier model_name. In the init() function, Model.get_model_path(args.model_name) is used to access this property.

In [None]:
%%writefile batch_scoring.py

import os
import argparse
import tensorflow as tf
from tensorflow.contrib.slim.python.slim.nets import inception_v3

from azureml.core.model import Model

slim = tf.contrib.slim

image_size = 299
num_channel = 3


def get_class_label_dict(labels_dir):
    label = []
    labels_path = os.path.join(labels_dir, 'labels.txt')
    proto_as_ascii_lines = tf.gfile.GFile(labels_path).readlines()
    for l in proto_as_ascii_lines:
        label.append(l.rstrip())
    return label


def init():
    global g_tf_sess, probabilities, label_dict, input_images

    parser = argparse.ArgumentParser(description="Start a tensorflow model serving")
    parser.add_argument('--model_name', dest="model_name", required=True)
    parser.add_argument('--labels_dir', dest="labels_dir", required=True)
    args, _ = parser.parse_known_args()

    label_dict = get_class_label_dict(args.labels_dir)
    classes_num = len(label_dict)

    with slim.arg_scope(inception_v3.inception_v3_arg_scope()):
        input_images = tf.placeholder(tf.float32, [1, image_size, image_size, num_channel])
        logits, _ = inception_v3.inception_v3(input_images,
                                              num_classes=classes_num,
                                              is_training=False)
        probabilities = tf.argmax(logits, 1)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    g_tf_sess = tf.Session(config=config)
    g_tf_sess.run(tf.global_variables_initializer())
    g_tf_sess.run(tf.local_variables_initializer())

    model_path = Model.get_model_path(args.model_name)
    saver = tf.train.Saver()
    saver.restore(g_tf_sess, model_path)


def file_to_tensor(file_path):
    image_string = tf.read_file(file_path)
    image = tf.image.decode_image(image_string, channels=3)

    image.set_shape([None, None, None])
    image = tf.image.resize_images(image, [image_size, image_size])
    image = tf.divide(tf.subtract(image, [0]), [255])
    image.set_shape([image_size, image_size, num_channel])
    return image


def run(mini_batch):
    result_list = []
    for file_path in mini_batch:
        test_image = file_to_tensor(file_path)
        out = g_tf_sess.run(test_image)
        result = g_tf_sess.run(probabilities, feed_dict={input_images: [out]})
        result_list.append(os.path.basename(file_path) + ": " + label_dict[result[0]])
    return result_list
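
The init()/run() contract that batch_scoring.py follows can be tried locally without TensorFlow. The sketch below is a simplified stand-in, not the script above: the labels and the "prediction" rule are made up, `init` takes an explicit argument list (the real script reads the process arguments), and the extra `--hypothetical_extra` flag is an assumption used only to show that `parse_known_args()` tolerates flags the script does not declare.

```python
import argparse
import os
from typing import List

label_dict: List[str] = []


def init(argv: List[str]) -> None:
    """Runs once per worker process: parse arguments and load shared state."""
    global label_dict
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", dest="model_name", required=True)
    parser.add_argument("--labels_dir", dest="labels_dir", required=True)
    # parse_known_args returns undeclared flags separately instead of failing.
    args, unknown = parser.parse_known_args(argv)
    # Stand-in for reading labels.txt and restoring the model.
    label_dict = ["cat", "dog"]


def run(mini_batch: List[str]) -> List[str]:
    """Runs once per mini-batch: return one result row per input file."""
    rows = []
    for file_path in mini_batch:
        name = os.path.basename(file_path)
        # Stand-in for the model's argmax output (made-up rule).
        predicted_index = len(name) % len(label_dict)
        rows.append(name + ": " + label_dict[predicted_index])
    return rows


if __name__ == "__main__":
    init(["--model_name", "inception", "--labels_dir", "labels",
          "--hypothetical_extra", "1"])
    print(run(["/data/a.jpg", "/data/bb.jpg"]))
```

Each string returned by `run` corresponds to one line of output, which is why the real script returns `filename: label` pairs.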

## Build the pipeline

Before you run the pipeline, create an object that defines the Python environment and the dependencies that your batch_scoring.py script requires. The main dependency is TensorFlow, but you also install azureml-core and azureml-dataprep[fuse], which ParallelRunStep requires. Also, specify a Docker base image with GPU support.

In [None]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_GPU_IMAGE

cd = CondaDependencies.create(pip_packages=["tensorflow-gpu==1.15.2",
                                            "azureml-core", "azureml-dataprep[fuse]"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_GPU_IMAGE

## Create the configuration to wrap the script

Create the ParallelRunConfig object using the script, the environment configuration, and the parallelization parameters. Specify the compute target you already attached to your workspace.

In [None]:
from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    environment=env,
    entry_script="batch_scoring.py",
    source_directory=".",
    output_action="append_row",
    mini_batch_size="20",
    error_threshold=1,
    compute_target=compute_target,
    process_count_per_node=2,
    node_count=2
)
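
To get a feel for the numbers in this configuration, the sketch below shows the arithmetic only: how a list of files divides into mini-batches of 20, and how many worker processes two nodes with two processes each provide. The file names and the helper `split_into_mini_batches` are hypothetical, and the service decides for itself how files are actually grouped and assigned.

```python
from typing import List, Sequence


def split_into_mini_batches(files: Sequence[str], mini_batch_size: int) -> List[List[str]]:
    """Group files into consecutive mini-batches of at most mini_batch_size items."""
    return [
        list(files[i:i + mini_batch_size])
        for i in range(0, len(files), mini_batch_size)
    ]


# Hypothetical input: 45 image files.
files = [f"image_{i:03d}.jpg" for i in range(45)]
mini_batches = split_into_mini_batches(files, mini_batch_size=20)

node_count = 2
process_count_per_node = 2
# Number of processes that can each be handling a mini-batch at the same time.
worker_count = node_count * process_count_per_node

print(len(mini_batches), [len(batch) for batch in mini_batches], worker_count)
```

With 45 files this yields three mini-batches (20, 20, and 5 files) for four workers, so one worker would stay idle in this simplified picture.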

## Create the pipeline step

A pipeline step is an object that encapsulates everything you need to run a pipeline, including:

- Environment and dependency settings
- The compute resource to run the pipeline on
- Input and output data, and any custom parameters
- Reference to a script or SDK logic to run during the step

Multiple classes inherit from the parent class PipelineStep. You can choose classes to use specific frameworks or stacks to build a step. In this example, you use the ParallelRunStep class to define your step logic by using a custom Python script. If an argument to your script is either an input to the step or an output of the step, the argument must be defined both in the arguments array and in either the input or the output parameter, respectively.

In scenarios where there is more than one step, an object reference in the outputs array becomes available as an input for a subsequent pipeline step.

In [None]:
from azureml.pipeline.steps import ParallelRunStep
from datetime import datetime

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

label_config = label_ds.as_named_input("labels_input")

batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--model_name", "inception",
               "--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
    allow_reuse=False
)

## Submit the pipeline

Now, run the pipeline. First, create a Pipeline object by using your workspace reference and the pipeline step you created. The steps parameter is an array of steps. In this case, there's only one step for batch scoring. To build pipelines that have multiple steps, place the steps in order in this array.

Next, use the Experiment.submit() function to submit the pipeline for execution. The wait_for_completion function outputs logs during the pipeline build process. You can use the logs to see current progress.

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'Lab-05-batch_scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

## Download and review output

In [None]:
%conda install pandas

In [None]:
import pandas as pd

batch_run = next(pipeline_run.get_children())
batch_output = batch_run.get_output_data("scores")
batch_output.download(local_path="inception_results")

for root, dirs, files in os.walk("inception_results"):
    for file in files:
        if file.endswith("parallel_run_step.txt"):
            result_file = os.path.join(root, file)

df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["Filename", "Prediction"]
print("Prediction has ", df.shape[0], " rows")
df.head(10)
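
If pandas is not available, the same `filename: prediction` lines can be read with the standard library. The sketch below is an illustration under assumptions: the sample lines are invented, and the helper `parse_score_lines` is hypothetical. It splits each line at the first colon and counts how often each label was predicted.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def parse_score_lines(lines: Iterable[str]) -> List[Tuple[str, str]]:
    """Split 'filename: prediction' lines into (filename, prediction) pairs."""
    rows = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        filename, separator, prediction = line.partition(":")
        if not separator:
            continue  # skip lines without a colon
        rows.append((filename.strip(), prediction.strip()))
    return rows


# Invented sample lines in the format produced by the scoring script.
sample = [
    "img_001.JPEG: tabby cat",
    "img_002.JPEG: golden retriever",
    "",
    "img_003.JPEG: tabby cat",
]
rows = parse_score_lines(sample)
counts = Counter(prediction for _, prediction in rows)
print(len(rows), counts.most_common(1))
```

To use it on real output, pass the open result file instead of `sample`, for example `parse_score_lines(open(result_file))`.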

### --- End ---
