# Parallel Batch Scoring pipeline example

In this example, we'll build a pipeline that batch scores data in parallel on one or more nodes. The same approach can be used either to score large amounts of data or to train many models in parallel.

In [None]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Dataset, RunConfiguration
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.data import OutputFileDatasetConfig
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

print("Azure ML SDK version:", azureml.core.VERSION)


First, we will connect to the workspace. The command `Workspace.from_config()` will either:
* Read the local `config.json` that holds the workspace reference (if it exists), or
* Use the `az` CLI to connect to the workspace that was attached to this folder via `az ml folder attach -g <resource group> -w <workspace name>`
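
To make the first option more concrete, here is a minimal sketch of what reading such a file involves. It assumes the usual layout of a workspace `config.json` with the keys `subscription_id`, `resource_group` and `workspace_name`. The helper `read_workspace_config` is hypothetical and only illustrates the idea; it is not how the SDK itself loads the file.

```python
import json
from pathlib import Path

# Keys that a workspace config.json is assumed to carry.
REQUIRED_KEYS = ("subscription_id", "resource_group", "workspace_name")


def read_workspace_config(path):
    """Load a workspace config file and check that the expected keys exist.

    Hypothetical helper for illustration, not part of the Azure ML SDK.
    """
    config = json.loads(Path(path).read_text(encoding="utf-8"))
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"config file is missing keys: {missing}")
    # Return only the fields that identify the workspace.
    return {key: config[key] for key in REQUIRED_KEYS}
```

If the file is incomplete, the helper fails early with a readable message, which can be handy for checking a `config.json` before calling `Workspace.from_config()`.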

In [None]:
ws = Workspace.from_config()
print(f'WS name: {ws.name}\nRegion: {ws.location}\nSubscription id: {ws.subscription_id}\nResource group: {ws.resource_group}')

# Preparation

Let's register the provided `model.pkl` as a model in our workspace. We'll use this model for batch scoring in the pipeline:

In [None]:
from azureml.core.model import Model
Model.register(model_path="model.pkl",
               model_name="credit_model_tutorial",
               description="Example model for batch scoring tutorial",
               workspace=ws)

Let's also register a dataset with data that we want to use for batch scoring:

In [None]:
from azureml.core import Dataset

datastore = ws.get_default_datastore()
datastore.upload(src_dir='../data-batch-scoring', target_path='german-credit-batch-tutorial', overwrite=True)
ds = Dataset.File.from_files(path=[(datastore, 'german-credit-batch-tutorial')])
ds.register(ws, name='german-credit-batch-tutorial', description='Dataset for batch scoring tutorial', create_new_version=True)

Next, let's reference our newly created batch scoring dataset, so that we can use it as the pipeline input:

In [None]:
batch_dataset = Dataset.get_by_name(ws, "german-credit-batch-tutorial")
batch_dataset_consumption = DatasetConsumptionConfig("batch_dataset", batch_dataset).as_download()

Now let's create an output dataset that will contain our predictions. This gives us full control over where the predictions are stored on the datastore:

In [None]:
datastore = ws.get_default_datastore()

# This will put the output results into a pre-defined folder on our datastore and optionally register it as a dataset (not required)
output_dataset = OutputFileDatasetConfig(name='batch_results',
                                         destination=(datastore, 'batch-scoring-results/{run-id}')).register_on_complete(name='batch-scoring-results')


Next, we can create a `ParallelRunStep` that runs our batch scoring code in parallel on one or more nodes. In this case, we load a `ParallelRunConfig` from a YAML file that defines our batch scoring job (source script, environment, parallelization, target cluster, etc.):

In [None]:
parallel_run_config = ParallelRunConfig.load_yaml(workspace=ws, path="parallel_runconfig.yml")

batch_step = ParallelRunStep(
    name="batch-inference-step",
    parallel_run_config=parallel_run_config,
    arguments=['--model_name', 'credit_model_tutorial'],
    inputs=[batch_dataset_consumption],
    side_inputs=[],
    output=output_dataset,
    allow_reuse=False
)

steps = [batch_step]
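
The source script referenced by the run configuration is not shown in this notebook. A `ParallelRunStep` entry script defines an `init()` function that runs once per worker and a `run(mini_batch)` function that is called for each mini-batch. The following is a minimal sketch of that shape, assuming a file dataset where every mini-batch is a list of file paths. `ThresholdModel` and `split_into_mini_batches` are hypothetical stand-ins so the sketch runs without the SDK; a real script would load the registered model passed via `--model_name` instead.

```python
import argparse
import os

model = None  # populated once per worker process by init()


class ThresholdModel:
    """Hypothetical stand-in for the registered model (not the tutorial's model.pkl)."""

    def predict(self, rows):
        # Flag a row when its first numeric field exceeds an arbitrary cut-off.
        return [int(float(row.split(",")[0]) > 0.5) for row in rows]


def init(argv=None):
    """Run once per worker before any mini-batch is processed."""
    global model
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", type=str, default="credit_model_tutorial")
    args, _ = parser.parse_known_args(argv)
    # A real entry script would load the registered model named args.model_name here.
    model = ThresholdModel()
    return args.model_name


def run(mini_batch):
    """Score one mini-batch, given as a list of file paths."""
    results = []
    for file_path in mini_batch:
        with open(file_path, "r", encoding="utf-8") as handle:
            rows = [line.strip() for line in handle if line.strip()]
        for prediction in model.predict(rows):
            results.append(f"{os.path.basename(file_path)},{prediction}")
    return results


def split_into_mini_batches(files, mini_batch_size):
    """Illustrate how a file list could be cut into mini-batches (assumed behaviour)."""
    return [files[i:i + mini_batch_size] for i in range(0, len(files), mini_batch_size)]
```

Keeping model loading in `init()` means the cost is paid once per worker rather than once per mini-batch, while `run()` stays a pure function of the files it receives.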

Finally, we can create our pipeline object and validate it. Validation checks that the inputs and outputs are properly linked and that the pipeline graph is acyclic:

In [None]:
pipeline = Pipeline(workspace=ws, steps=steps)
pipeline.validate()
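
To illustrate what an acyclic step graph means, here is a small standalone sketch of a cycle check based on Kahn's algorithm. It is only an illustration of the concept and not the SDK's own validation logic. The function `is_acyclic` and the dictionary format (step name mapped to the steps it depends on) are assumptions made for this example.

```python
from collections import deque


def is_acyclic(dependencies):
    """Return True if the step graph has no cycle (Kahn's algorithm).

    `dependencies` maps each step name to the steps it depends on.
    """
    nodes = set(dependencies)
    for upstream in dependencies.values():
        nodes.update(upstream)

    # Count unmet dependencies per step and record which steps wait on which.
    pending = {node: 0 for node in nodes}
    dependents = {node: [] for node in nodes}
    for step, upstream in dependencies.items():
        for parent in set(upstream):
            pending[step] += 1
            dependents[parent].append(step)

    # Repeatedly "run" steps whose dependencies are all satisfied.
    ready = deque(node for node, count in pending.items() if count == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for child in dependents[node]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    # All steps can be ordered only when the graph contains no cycle.
    return visited == len(nodes)
```

Our pipeline has a single step without dependencies, so a check like `is_acyclic({"batch-inference-step": []})` passes trivially; the check becomes relevant once several steps feed into each other.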

Next, we submit the pipeline to an experiment and wait for the run to complete:

In [None]:
pipeline_run = Experiment(ws, 'batch-scoring-pipeline').submit(pipeline)
pipeline_run.wait_for_completion()

Last but not least, we can now download the resulting dataset and have a look at our predictions. For ease of use, we'll just download it here to a folder named `temp`:

In [None]:
Dataset.get_by_name(ws, "batch-scoring-results").download(target_path="temp/", overwrite=True)
with open('temp/batch-predictions.txt','r') as f:
    print(f.read())