# Parallel Batch Scoring pipeline example

In this example, we'll build a `ParallelRunStep` pipeline that can train many small models for time series. The potential product IDs, SKUs, etc. for which a model should be trained are configured via a configuration dataset, which needs to contain every permutation for which a model should be trained. See [`../sample-data/config.csv`](../sample-data/config.csv) for an example, where in total 1,000 models would be trained, one for each code and company permutation.
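A config file like this can be generated from the cross product of all codes and companies. The sketch below is only an illustration: the column names `code` and `company` and the 100 × 10 identifier values are hypothetical, and the real `config.csv` may use different columns.

```python
import csv
import io
import itertools


def build_config_csv(codes, companies):
    """Return CSV text with one row per (code, company) permutation.

    The column names 'code' and 'company' are hypothetical placeholders.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["code", "company"])
    # itertools.product yields every (code, company) pair exactly once
    for code, company in itertools.product(codes, companies):
        writer.writerow([code, company])
    return buffer.getvalue()


# Hypothetical identifiers: 100 codes x 10 companies = 1,000 models
codes = [f"code_{i:03d}" for i in range(100)]
companies = [f"company_{j}" for j in range(10)]
config_text = build_config_csv(codes, companies)

rows = list(csv.DictReader(io.StringIO(config_text)))
print(len(rows))  # → 1000, one row per model to train
```

Writing `config_text` to a file in `../sample-data` would give `ParallelRunStep` one row, and therefore one model, per permutation.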

In [None]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Dataset, RunConfiguration
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import ParallelRunStep, ParallelRunConfig
from azureml.data import OutputFileDatasetConfig
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

print("Azure ML SDK version:", azureml.core.VERSION)


First, we connect to the workspace. `Workspace.from_config()` will either:
* Read the local `config.json` containing the workspace reference (if it exists), or
* Use the workspace that was attached to the current folder via the `az` CLI with `az ml folder attach -g <resource group> -w <workspace name>`

In [None]:
ws = Workspace.from_config()
print(f'WS name: {ws.name}\nRegion: {ws.location}\nSubscription id: {ws.subscription_id}\nResource group: {ws.resource_group}')
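The config-file lookup that `Workspace.from_config()` performs can be approximated as follows. This is a simplified sketch, not the SDK's implementation: the candidate locations (`config.json` and `.azureml/config.json`) and the walk up through parent directories are assumptions, and the placeholder values stand in for the keys a downloaded workspace `config.json` typically contains.

```python
import json
import tempfile
from pathlib import Path

# Locations checked in each directory (simplified assumption, not the SDK's list)
CANDIDATES = ("config.json", ".azureml/config.json")


def find_workspace_config(start="."):
    """Walk from `start` up to the filesystem root and return the parsed
    contents of the first workspace config file found, or None."""
    current = Path(start).resolve()
    for directory in (current, *current.parents):
        for candidate in CANDIDATES:
            path = directory / candidate
            if path.is_file():
                with path.open() as f:
                    return json.load(f)
    return None


# Demo: a config stored two levels above the working directory is still found
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / ".azureml").mkdir()
    sample = {
        "subscription_id": "<subscription id>",
        "resource_group": "<resource group>",
        "workspace_name": "<workspace name>",
    }
    (root / ".azureml" / "config.json").write_text(json.dumps(sample))
    nested = root / "notebooks" / "batch"
    nested.mkdir(parents=True)
    found = find_workspace_config(nested)

print(found["workspace_name"])  # → <workspace name>
```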

# Preparation

Let's create the config dataset that drives the pipeline. In this case it is just a CSV file that we register as a `TabularDataset`. A dataset is required as input to `ParallelRunStep`, which parallelizes either over rows (when using a `TabularDataset`) or over a set of files (when using a `FileDataset`).

In [None]:
from azureml.core import Dataset

datastore = ws.get_default_datastore()
datastore.upload(src_dir='../sample-data', target_path='config', overwrite=True)
ds = Dataset.Tabular.from_delimited_files(path=[(datastore, 'config/config.csv')])
ds.register(ws, name='config', description='Configuration dataset for ParallelRunStep', create_new_version=True)
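Row-wise parallelism can be pictured as splitting the config rows into mini-batches and handing each mini-batch to a worker independently. The sketch below simulates this locally with a thread pool. It is a simplification under stated assumptions: `run` is a hypothetical stand-in for the entry script, the column names are assumed, and batching by row count is only illustrative (for a `TabularDataset`, the `mini_batch_size` setting of `ParallelRunConfig` is an approximate data size such as `"1MB"`, not a row count).

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def mini_batches(rows, batch_size):
    """Yield consecutive lists of at most `batch_size` rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def run(mini_batch):
    """Hypothetical stand-in for the entry script: one result per row."""
    return [f"trained:{row['code']}/{row['company']}" for row in mini_batch]


# Six hypothetical config rows: 3 codes x 2 companies
rows = [{"code": c, "company": co} for c in ("A", "B", "C") for co in ("X", "Y")]

# Simulate two parallel workers; pool.map returns results in mini-batch order
with ThreadPoolExecutor(max_workers=2) as pool:
    batch_results = list(pool.map(run, mini_batches(rows, 4)))
results = [item for batch in batch_results for item in batch]
print(len(batch_results), len(results))  # → 2 6
```

Because each mini-batch is processed independently, the total work scales out with the number of workers, which is what makes training one small model per row a good fit for this pattern.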

Now we can prepare our config dataset as input for `ParallelRunStep`:

In [None]:
config_dataset = Dataset.get_by_name(ws, "config")
config_dataset_consumption = DatasetConsumptionConfig("config_dataset", config_dataset)

Now let's create an output dataset that will contain our predictions. This gives us full control over where on the datastore the predictions are stored:

In [None]:
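# Alternative using PipelineData (requires `from azureml.pipeline.core import PipelineData`):
# output_dataset = PipelineData(name='batch_output', datastore=ws.get_default_datastore()).as_dataset()
# output_dataset = output_dataset.register(name='batch-scoring-results', create_new_version=True)

# Write the step output to prs_results/<run id> on the default datastore
# and register it as the 'prs_results' dataset once the run completes
datastore = ws.get_default_datastore()
output_dataset = OutputFileDatasetConfig(
    name='prs_results',
    destination=(datastore, 'prs_results/{run-id}'),
).register_on_complete(name='prs_results')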
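Because the destination path contains the `{run-id}` placeholder, which is replaced with the run's ID at runtime, each pipeline run writes its predictions to a separate folder instead of overwriting earlier results. The helper below is a hypothetical illustration of that substitution, not part of the SDK, and the run IDs are made up:

```python
def resolve_destination(template: str, run_id: str) -> str:
    """Hypothetical helper: expand the {run-id} placeholder in a destination path."""
    return template.replace("{run-id}", run_id)


# Two runs with made-up IDs end up in two distinct folders
paths = [resolve_destination("prs_results/{run-id}", rid) for rid in ("run_1", "run_2")]
print(paths)  # → ['prs_results/run_1', 'prs_results/run_2']
```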


Next, we create a `ParallelRunStep` that runs our batch scoring code in parallel on one or more nodes. In this case, we load the `ParallelRunConfig` from a YAML file that defines our batch scoring job (source script, environment, parallelization, target cluster, etc.).

In [None]:
parallel_run_config = ParallelRunConfig.load_yaml(workspace=ws, path="parallel_runconfig.yml")

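prs_step = ParallelRunStep(
    name="parallel-run-step",
    parallel_run_config=parallel_run_config,
    # Passed as command-line arguments to the entry script
    arguments=['--forecast_horizon', '4'],
    # The config dataset: its rows are distributed across the workers
    inputs=[config_dataset_consumption],
    side_inputs=[],
    output=output_dataset,
    # Always rerun instead of reusing results from a previous run
    allow_reuse=False
)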

steps = [prs_step]

Finally, we create our pipeline object, validate it, and then submit it to an experiment:

In [None]:
pipeline = Pipeline(workspace=ws, steps=steps)
pipeline.validate()

pipeline_run = Experiment(ws, 'prs-pipeline').submit(pipeline)
pipeline_run.wait_for_completion()