## Orchestrate machine learning with pipelines

An experiment defined with a ScriptRunConfig runs every step of a machine learning workflow in a single script. It is often better to split those steps up and run them on one or more compute targets, especially in enterprise solutions. Pipelines are key to implementing an MLOps solution in Azure.

In Azure Machine Learning, a pipeline is a workflow of machine learning tasks in which each task is implemented as a step.

Pipelines let you arrange tasks to run sequentially or in parallel, and choose the compute target on which each step runs.
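As a rough mental model of how sequential and parallel steps relate: in Azure Machine Learning the run order follows the dependencies between steps (for example, one step consuming another step's output), so steps that do not depend on each other can run in parallel. The sketch is plain Python, not the Azure ML SDK; the step names and the dependency map are hypothetical, and `graphlib` (Python 3.9+) groups the steps into "waves" whose members could run concurrently.

```python
# Toy model of step ordering, NOT the Azure ML SDK.
# Each hypothetical step lists the steps whose outputs it consumes.
from graphlib import TopologicalSorter

dependencies = {
    "prepare data": set(),
    "extract features": {"prepare data"},
    "compute statistics": {"prepare data"},
    "train model": {"extract features", "compute statistics"},
}

def execution_waves(deps):
    """Group steps into waves; steps in the same wave do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for deterministic output
        waves.append(ready)
        sorter.done(*ready)
    return waves

for i, wave in enumerate(execution_waves(dependencies), start=1):
    print(f"wave {i}: {wave}")
```

Here "extract features" and "compute statistics" land in the same wave because neither needs the other's output, while "train model" has to wait for both.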

A pipeline can be executed as an experiment.

Azure Machine Learning provides several kinds of pipeline step, including:
- PythonScriptStep: runs a Python script.
- DataTransferStep: uses Azure Data Factory to copy data between data stores.
- DatabricksStep: runs a notebook, script, or compiled JAR on a Databricks cluster.
- AdlaStep: runs a U-SQL job in Azure Data Lake Analytics.
- ParallelRunStep: runs a Python script as a distributed task on multiple compute nodes.

### Define a pipeline

First, define the steps.

```
from azureml.pipeline.steps import PythonScriptStep

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster')

# Step to train a model
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster')
```

Then, construct a pipeline from the steps and submit it as an experiment, which runs and tracks the pipeline.

```
from azureml.core import Workspace, Experiment
from azureml.pipeline.core import Pipeline

# Load the workspace from its config.json file
ws = Workspace.from_config()

# Construct the pipeline
train_pipeline = Pipeline(workspace = ws, steps = [step1,step2])

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'training-pipeline')
pipeline_run = experiment.submit(train_pipeline)
```

### Manage results between steps

OutputFileDatasetConfig objects pass intermediate data between the steps of a pipeline. An OutputFileDatasetConfig references a location in a datastore: one step writes its output there, and a later step reads it as an input.

```
from azureml.core import Dataset
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get a dataset for the initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

# Define an OutputFileDatasetConfig object to pass data between steps
prepped_data = OutputFileDatasetConfig('prepped')

# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         # Script arguments include the input dataset and the output location
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),
                                      '--out_folder', prepped_data])

# Step to train a model
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                         # Pass the output of step1 as a script argument
                         arguments=['--training-data', prepped_data.as_input()])
```
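The data_prep.py script used by the first step reads the input dataset and writes its output to the folder it receives as an argument: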
```
# code in data_prep.py
from azureml.core import Run
import argparse
import os

# Get the experiment run context
run = Run.get_context()

# Get arguments
parser = argparse.ArgumentParser()
parser.add_argument('--raw-ds', type=str, dest='raw_dataset_id')
parser.add_argument('--out_folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

# Get input dataset as dataframe
raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()

# code to prep data (in this case, just select specific columns)
prepped_df = raw_df[['col1', 'col2', 'col3']]

# Save prepped data to the OutputFileDatasetConfig location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)
```
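To see the hand-off between the two steps without Azure, the flow can be simulated locally. This is a minimal sketch under assumptions: the hypothetical `prep_step` and `train_step` functions stand in for data_prep.py and train_model.py, a temporary folder stands in for the OutputFileDatasetConfig location, and the standard `csv` module replaces pandas so the sketch has no dependencies. The point it shows is that both scripts only see a folder path passed on the command line.

```python
# Local simulation of two pipeline steps sharing an output folder (no Azure involved).
# Assumption: step 1 writes prepped_data.csv into --out_folder and step 2 reads the
# same file from --training-data, mirroring the arguments used in the pipeline above.
import argparse
import csv
import os
import tempfile

COLUMNS = ['col1', 'col2', 'col3']

def prep_step(argv, rows):
    """Hypothetical stand-in for data_prep.py: keep selected columns, write a CSV."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--out_folder', type=str, dest='folder')
    args = parser.parse_args(argv)
    os.makedirs(args.folder, exist_ok=True)
    path = os.path.join(args.folder, 'prepped_data.csv')
    with open(path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS)
        writer.writeheader()
        for row in rows:
            writer.writerow({k: row[k] for k in COLUMNS})
    return path

def train_step(argv):
    """Hypothetical stand-in for train_model.py: read the prepped CSV."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--training-data', type=str, dest='training_data')
    args = parser.parse_args(argv)
    path = os.path.join(args.training_data, 'prepped_data.csv')
    with open(path, newline='') as f:
        return list(csv.DictReader(f))

raw_rows = [
    {'col1': '1', 'col2': '2', 'col3': '3', 'extra': 'x'},
    {'col1': '4', 'col2': '5', 'col3': '6', 'extra': 'y'},
]

with tempfile.TemporaryDirectory() as tmp:
    shared = os.path.join(tmp, 'prepped')  # plays the role of the OutputFileDatasetConfig location
    prep_step(['--out_folder', shared], raw_rows)
    prepped = train_step(['--training-data', shared])

print(prepped)
```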

### Reuse option

Pipelines can take a long time to run. To reduce run time, Azure Machine Learning can reuse the output of a step from a previous run when the step's script, settings, and inputs have not changed. This can save a lot of time, but results may be stale if the underlying data has changed since the previous run. Reuse is enabled by default; the allow_reuse parameter enables or disables it for each step.

```
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',
                         compute_target = 'aml-cluster',
                         # run_config is assumed to be an existing RunConfiguration
                         runconfig = run_config,
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),
                                      '--out_folder', prepped_data],
                         # Disable step reuse
                         allow_reuse = False)
```

Alternatively, you can force all of the pipeline's steps to run, regardless of their reuse settings:

```
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)
```
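To make the reuse trade-off concrete, here is a toy model in plain Python. It is not how Azure Machine Learning implements reuse; it only assumes that a cached result is looked up by the step's script and the identifiers of its inputs. The names `cache`, `step_key`, and `run_step` are hypothetical. The demo shows a stale result when the data behind an unchanged input identifier changes, and how forcing regeneration (the role `regenerate_outputs=True` plays above) recomputes it.

```python
# Toy model of step reuse, NOT Azure Machine Learning's implementation.
# Assumption: a cached result is keyed by the step's script text and the
# identifiers (not the contents) of its inputs.
import hashlib

cache = {}

def step_key(script_source, input_ids):
    """Hash the script and input identifiers into a cache key."""
    h = hashlib.sha256(script_source.encode("utf-8"))
    for input_id in input_ids:
        h.update(b"\0" + input_id.encode("utf-8"))  # separator avoids ambiguous concatenation
    return h.hexdigest()

def run_step(script_source, input_ids, compute, allow_reuse=True, regenerate=False):
    """Return (result, status), reusing a cached result when allowed."""
    key = step_key(script_source, input_ids)
    if allow_reuse and not regenerate and key in cache:
        return cache[key], "reused"
    result = compute()
    cache[key] = result
    return result, "ran"

# Hypothetical datastore contents and step script
datastore = {"data/training": [1, 2, 3]}
script = "result = sum(rows)"

def compute_total():
    return sum(datastore["data/training"])

print(run_step(script, ["data/training"], compute_total))  # (6, 'ran')
print(run_step(script, ["data/training"], compute_total))  # (6, 'reused')
datastore["data/training"].append(4)  # contents change, identifier stays the same
print(run_step(script, ["data/training"], compute_total))  # (6, 'reused'): stale result
print(run_step(script, ["data/training"], compute_total, regenerate=True))  # (10, 'ran')
```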

### Publish

Publishing a pipeline creates a REST endpoint through which the pipeline can be run on demand:

```
published_pipeline = train_pipeline.publish(name='training_pipeline',
                                            description='Model training pipeline',
                                            version='1.0')
```

Alternatively, you can publish the pipeline from one of its runs in the experiment:

```
# Get the most recent run of the pipeline
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0]

# Publish the pipeline from the run
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')
```
You can then retrieve the URI of the published pipeline's REST endpoint:
```
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)
```

To run a published pipeline, you make an HTTP POST request to its REST endpoint, passing an authorization header with a token for a service principal that has permission to run the pipeline, and a JSON payload specifying the experiment name. The pipeline runs asynchronously, so the response from a successful REST call includes the run ID. You can use the run ID to track the run in Azure Machine Learning studio.

For example, the following Python code makes a REST request to run a pipeline and displays the returned run ID.

```
import requests

response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline"})
run_id = response.json()["Id"]
print(run_id)
```

### Use pipeline parameters

A PipelineParameter defines a named value for the whole pipeline, with a default that can be overridden each time the pipeline runs.

```
from azureml.pipeline.core.graph import PipelineParameter

reg_param = PipelineParameter(name='reg_rate', default_value=0.01)

...

step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                         # Pass parameter as script argument
                         arguments=['--in_folder', prepped_data.as_input(),
                                    '--reg', reg_param])
```
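The training script still has to read the parameter from its command line. A minimal sketch of that parsing in train_model.py, assuming the argument names used above (`--in_folder`, `--reg`); `parse_training_args` is a hypothetical helper, the folder path in the usage line is a placeholder, and the default only mirrors `default_value=0.01` for local runs.

```python
# Hypothetical sketch of the argument parsing in train_model.py.
import argparse

def parse_training_args(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--in_folder', type=str, dest='in_folder')
    # Command-line arguments are always strings, so convert the regularization rate explicitly
    parser.add_argument('--reg', type=float, dest='reg_rate', default=0.01)
    return parser.parse_args(argv)

args = parse_training_args(['--in_folder', '/mnt/prepped', '--reg', '0.1'])
print(args.in_folder, args.reg_rate)  # /mnt/prepped 0.1
```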

Once a parameterized pipeline is published, you can pass parameter values in the JSON payload of the REST request:

```
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline",
                               "ParameterAssignments": {"reg_rate": 0.1}})
```
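The same request can be built with Python's standard library, which makes its shape explicit: a POST with a bearer token in the Authorization header and the JSON payload, optionally with "ParameterAssignments". This is a sketch under assumptions: `build_pipeline_request` is a hypothetical helper, the endpoint URL and token are placeholders, and in practice the header would come from an authentication object such as the `InteractiveLoginAuthentication` used later in this section, whose header should already have this form.

```python
# Sketch of the REST call with urllib instead of requests.
# The endpoint URL and token below are hypothetical placeholders.
import json
import urllib.request

def build_pipeline_request(rest_endpoint, token, experiment_name, parameters=None):
    """Build (but do not send) a POST request that runs a published pipeline."""
    payload = {"ExperimentName": experiment_name}
    if parameters:
        payload["ParameterAssignments"] = parameters
    return urllib.request.Request(
        rest_endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
        method="POST",
    )

req = build_pipeline_request("https://example.invalid/pipelines/hypothetical-id",
                             "hypothetical-token",
                             "run_training_pipeline",
                             {"reg_rate": 0.1})
print(req.get_method(), json.loads(req.data))
# To send it: with urllib.request.urlopen(req) as resp: run_id = json.load(resp)["Id"]
```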

### Schedule pipelines

After a pipeline has been published, you can schedule it to run at regular intervals:

```
from azureml.pipeline.core import ScheduleRecurrence, Schedule

daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(ws, name='Daily Training',
                                        description='trains model every day',
                                        pipeline_id=published_pipeline.id,
                                        experiment_name='Training_Pipeline',
                                        recurrence=daily)
```

To schedule a pipeline to run whenever data changes, you must create a Schedule that monitors a specified path on a datastore, like this:

```
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(ws, name='Reactive Training',
                                    description='trains model on data change',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='Training_Pipeline',
                                    datastore=training_datastore,
                                    path_on_datastore='data/training')
```
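A reactive schedule has to notice when data under the monitored path changes. As a toy illustration of that idea (plain Python against a local folder, not how Azure Machine Learning monitors a datastore), the sketch records the size and modification time of each file and reports the paths that differ between two snapshots; `snapshot` and `changed_files` are hypothetical names.

```python
# Toy change detector, NOT Azure Machine Learning's implementation.
# Snapshot file sizes and modification times under a folder, then compare snapshots.
import os
import tempfile

def snapshot(root):
    """Map each file's path (relative to root) to its (size, mtime in ns)."""
    state = {}
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            st = os.stat(path)
            state[os.path.relpath(path, root)] = (st.st_size, st.st_mtime_ns)
    return state

def changed_files(before, after):
    """Paths that were added, removed, or modified between two snapshots."""
    return sorted(p for p in before.keys() | after.keys() if before.get(p) != after.get(p))

with tempfile.TemporaryDirectory() as root:
    first = snapshot(root)
    with open(os.path.join(root, "new_batch.csv"), "w") as f:
        f.write("col1,col2\n1,2\n")
    second = snapshot(root)
    print(changed_files(first, second))  # ['new_batch.csv']
```

A scheduler built on this would poll periodically and trigger a pipeline run whenever `changed_files` returns a non-empty list.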

## Create a Pipeline

```
from azureml.core import Workspace

ws = Workspace.from_config()
```

```
from azureml.core import Environment

env = Environment.get(workspace = ws, name = 'training_environment')
```

```
from azureml.core.compute import ComputeTarget

pipeline_cluster = ComputeTarget(workspace=ws, name='aml-cluster')
```

```
from azureml.core.runconfig import RunConfiguration

pipeline_run_config = RunConfiguration()
pipeline_run_config.target = pipeline_cluster
pipeline_run_config.environment = env
```

```
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                             source_directory = 'Script',
                             script_name = "7_Pipeline_I.py",
                             arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                          '--prepped-data', prepped_data],
                             compute_target = pipeline_cluster,
                             runconfig = pipeline_run_config,
                             allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                              source_directory = 'Script',
                              script_name = "8_Pipeline_II.py",
                              arguments = ['--training-data', prepped_data.as_input()],
                              compute_target = pipeline_cluster,
                              runconfig = pipeline_run_config,
                              allow_reuse = True)

print("Pipeline steps defined")
```

```
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)

experiment = Experiment(workspace = ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
pipeline_run.wait_for_completion(show_output=True)
```

```
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])
```

```
# Publish the pipeline from the run.
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline
```

```
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)
```

```
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")
```

```
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id
```