# Connect to Workspace

In [None]:
import azureml.core
from azureml.telemetry import set_diagnostics_collection

from azureml.core.workspace import Workspace
# Load the workspace described by the local configuration and echo its identity
ws = Workspace.from_config()
workspace_summary = (
    'Workspace name : ' + ws.name,
    'Azure region   : ' + ws.location,
    'Subscription id: ' + ws.subscription_id,
    'Resource group : ' + ws.resource_group,
)
print(*workspace_summary, sep='\n')

# Package scoring code to be sent to all scoring nodes

In [None]:
import os, shutil

# Stage, in a freshly created directory, all the code that must be
# accessible on the compute targets when the pipeline steps execute
source_directory = './scoring_runtime'
if os.path.exists(source_directory):
    # start from a clean directory so stale files are never shipped
    shutil.rmtree(source_directory)
os.makedirs(source_directory, exist_ok=True)

runtime_scripts = (
    './pre_processing.py',  # the pre_processing code
    './scoring.py',  # the scoring code
    './scoring_custom_package.py',  # a custom package you may want to 'import' in your scoring.py file
)
for script_path in runtime_scripts:
    shutil.copy(script_path, source_directory)

# Scoring Pipeline Definition

In [None]:
from azureml.core import Environment,Datastore
from azureml.core.compute import ComputeTarget
from azureml.core.dataset import Dataset
from azureml.pipeline.core import Pipeline,PipelineData
from azureml.pipeline.core.pipeline_output_dataset import PipelineOutputFileDataset
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig

# Environments Definition
def _docker_conda_environment(name, conda_file, base_image):
    """Create an Environment from a conda file with Docker enabled on base_image."""
    env = Environment.from_conda_specification(name=name, file_path=conda_file)
    env.docker.base_image = base_image
    env.docker.enabled = True
    return env


cpu_env = _docker_conda_environment(
    name="ubuntu",
    conda_file="./conda-scoring-cpu.yml",
    base_image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
)
gpu_env = _docker_conda_environment(
    name="ubuntu-tf-gpu-1.15.3",
    conda_file="./conda-scoring-gpu.yml",
    base_image="mcr.microsoft.com/azureml/openmpi3.1.2-cuda10.1-cudnn7-ubuntu18.04",
)

# Compute Targets: look up the existing CPU and GPU targets by name in the workspace
cpu_compute_target = ComputeTarget(workspace=ws, name="f8s-cc")
gpu_compute_target = ComputeTarget(workspace=ws, name="nv12-cc")
for kind, target in (("CPU", cpu_compute_target), ("GPU", gpu_compute_target)):
    print(f"{kind} Compute Target: {target.name}")

# Control parallelism for each compute target type.
#   CPU: set the process count to the number of vcores per compute node
#   GPU: set the process count to the number of gpus per compute node
cpu_node_count, cpu_process_count_per_node = 1, 8
gpu_node_count, gpu_process_count_per_node = 1, 2

# Scoring input: the registered dataset, exposed to the step under a named input
images_to_score_ds = Dataset.get_by_name(ws, name='images-to-score')
images_to_score_ni = images_to_score_ds.as_named_input('images_to_score')

# Intermediate data: the pre-processing step in this example identifies
# images with problems before scoring and sets them aside
data_store = Datastore.get_default(ws)
images_pre_processed_data = PipelineData(
    name="images_pre_processed",
    datastore=data_store,
)
images_pre_processed_pd = PipelineOutputFileDataset(images_pre_processed_data)

# Scoring output
images_scored_pd = PipelineData(
    name="images_scored",
    datastore=data_store,
)

# pre_processing step: runs pre_processing.py on the CPU compute target
pre_processing_run_config = ParallelRunConfig(
    # what to run
    source_directory=source_directory,
    entry_script="pre_processing.py",
    environment=cpu_env,
    # where to run it and with how much parallelism
    compute_target=cpu_compute_target,
    node_count=cpu_node_count,
    process_count_per_node=cpu_process_count_per_node,
    # batching and failure tolerance
    mini_batch_size="4",
    error_threshold=1,
    # output handling
    output_action="append_row",
    append_row_file_name="pre_processing.csv",
)
pre_processing_arguments = [
    "--images-pre-processed-folder", images_pre_processed_pd,
]
pre_processing_step = ParallelRunStep(
    name="pre-processing",
    parallel_run_config=pre_processing_run_config,
    inputs=[images_to_score_ni],
    output=images_pre_processed_pd,
    arguments=pre_processing_arguments,
    allow_reuse=True,
)

# scoring step: runs scoring.py on the GPU compute target
model_name = "tfod_model"
model_version = 11
# how many images are scored at once on each GPU: try to max out the
# memory of the GPU for maximum speed
inference_batch_size = 4

scoring_run_config = ParallelRunConfig(
    # what to run
    source_directory=source_directory,
    entry_script="scoring.py",
    environment=gpu_env,
    # where to run it and with how much parallelism
    compute_target=gpu_compute_target,
    node_count=gpu_node_count,
    process_count_per_node=gpu_process_count_per_node,
    # batching and failure tolerance
    mini_batch_size="4",
    error_threshold=1,
    # output handling
    output_action="append_row",
    append_row_file_name="scoring.csv",
)
scoring_arguments = [
    "--images-scored-folder", images_scored_pd,
    "--model-name", model_name,
    "--model-version", model_version,
    "--inference-batch-size", inference_batch_size,
]
scoring_step = ParallelRunStep(
    name="scoring",
    parallel_run_config=scoring_run_config,
    inputs=[images_pre_processed_pd],
    output=images_scored_pd,
    arguments=scoring_arguments,
    allow_reuse=True,
)

# Only the final step is listed here.
# NOTE(review): the pre-processing step is presumably pulled in through the
# data dependency on images_pre_processed_pd -- confirm against the Pipeline docs.
pipeline_steps = [scoring_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print(f"Pipeline: {pipeline}")

# Execute the Pipeline

In [None]:
from azureml.core import Experiment

# Submit the pipeline under a named experiment, tagging the run with the
# node counts used, then block until it finishes while streaming its output
experiment_name = 'tf-batch-scoring'
experiment = Experiment(ws, name=experiment_name)
run_tags = {
    'cpu_nodes': str(cpu_node_count),
    'gpu_nodes': str(gpu_node_count),
}
run = experiment.submit(pipeline, tags=run_tags)
run.wait_for_completion(show_output=True)

# Publish the Pipeline as an Endpoint

In [None]:
# NOTE: the publishing code below is disabled -- it is wrapped in a
# triple-quoted string literal, so none of it executes. Remove the enclosing
# quotes to run it: it would call PipelineEndpoint.publish for `pipeline`
# under the name "tf-batch-scoring" and print the resulting endpoint.
'''from azureml.pipeline.core import PipelineEndpoint

pipeline_endpoint_name = "tf-batch-scoring"
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws,
                                                name=pipeline_endpoint_name,
                                                pipeline=pipeline,
                                                description="Tensorflow Batch Scoring")
print(f"Pipeline endpoint: {pipeline_endpoint}")'''