# Pipelines

This document explains what a pipeline is in Azure Machine Learning (AML).

## Introduction

### But I've already heard of pipelines?

In AML, we run a process as an experiment. This experiment can pull data from datastores and use Azure compute.

In a traditional ML project, you normally split your overall process into tasks that complete the data wrangling, modeling and visualisation/deployment separately. These can be split into smaller tasks and orchestrated together to form a `pipeline`.

Pipeline is a common term in ML. You will come across it in the following areas:
- scikit-learn pipeline: combines data preprocessing transformations and ML modeling
- Azure DevOps pipeline: defines your build or release process
- Azure Machine Learning pipeline: groups the steps needed to run tasks in AML

Hence, you could have a scikit-learn pipeline, running in an AML pipeline, that is initiated by an Azure DevOps pipeline.

### What are pipelines?
A pipeline in AML refers to a `workflow` where each task within the workflow is referred to as a `step`. These steps can run in sequence or in parallel.
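
To picture how steps can run in sequence or in parallel, here is a minimal sketch that uses only the Python standard library. It is not how AML implements scheduling; the step names and the dependency map are hypothetical. Steps that depend on another step's output wait for it, while steps with no dependency on each other land in the same batch and could run side by side.

```python
from graphlib import TopologicalSorter

# Hypothetical steps, each mapped to the steps it depends on.
dependencies = {
    "prepare data": set(),
    "train model": {"prepare data"},
    "profile data": {"prepare data"},
    "register model": {"train model", "profile data"},
}


def execution_batches(deps):
    """Group steps into batches; steps within one batch could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        # Every step whose dependencies are all finished is ready now.
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


batches = execution_batches(dependencies)
for number, batch in enumerate(batches, start=1):
    print(number, batch)
```

Here `train model` and `profile data` share a batch because both only need the output of `prepare data`.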

#### Steps of an ML pipeline
Step types:
- PythonScriptStep: runs a Python script
- EstimatorStep: runs an estimator
- DataTransferStep: uses Azure Data Factory (ADF) to copy data between datastores
- DatabricksStep: runs a notebook, script or JAR on a Databricks cluster
- AdlaStep: runs a U-SQL job in Azure Data Lake Analytics

### Example: let's create a pipeline


In [None]:
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# construct experiment
experiment = Experiment(workspace=ws, name='training-pipeline')

# step1: run python script
step1 = PythonScriptStep(
    name='prepare data',
    source_directory='scripts',
    script_name='data_prep.py',
    compute_target='aml-cluster',
    runconfig=run_config
)

# step2: run estimator
step2 = EstimatorStep(
    name='train model',
    estimator= sk_estimator,
    compute_target='aml-cluster'
)

# construct pipeline
train_pipeline= Pipeline(workspace=ws, steps=[step1, step2])

# run pipeline
pipeline_run = experiment.submit(train_pipeline)

## What if we want to pass data between pipeline steps?

We use a `PipelineData` object, which:
- references a location in a datastore
- creates a data dependency between pipeline steps; it is a place to store intermediate data between steps

To use a `PipelineData` object to pass data between two steps in a pipeline, you have to:
- define a `PipelineData` object that references a location in a datastore
- pass it as an input or an output to the steps that use it
- provide it as a script argument to the scripts that use it


For example, suppose you have the following Python script:

In [None]:
%%writefile somefolder/somescript.py
# This is the Python script (the cell magic above must be the first line of the cell)
from azureml.core import Run
import argparse
import os

# get experiment run context
run = Run.get_context()

# get dataset that is input into df
raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()

# get pipeline arguments
parser = argparse.ArgumentParser()
parser.add_argument(
    '--folder', type=str, dest='folder'
)
args = parser.parse_args()
output_folder= args.folder

# code to prep data
prepped_df = raw_df[['col1', 'col2', 'col3']]

# save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)

In [None]:
from azureml.core import Dataset
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep

# get dataset for the initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

# get default datastore
data_store = ws.get_default_datastore()

# define PipelineData object to pass between pipeline steps
prepped_data = PipelineData('prepped', datastore=data_store)

# pipeline step1: python script
step1 = PythonScriptStep(
    name='prepped data',
    source_directory='scripts',
    script_name='data_prep.py',
    compute_target='aml-cluster',
    runconfig=run_config,
    # input dataset
    inputs=[
        raw_ds.as_named_input('raw_data')
    ],
    # pipelinedata obj for output
    outputs = [
        prepped_data
    ],
    arguments=['--folder', prepped_data]
)

# pipeline step2: estimator
step2 = EstimatorStep(
    name='train model',
    estimator=sk_estimator,
    compute_target='aml-cluster',
    inputs=[prepped_data],
    estimator_entry_script_arguments=['--folder', prepped_data]
)
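
The entry script for the second step is not shown above. As a rough sketch, assuming the data prep step wrote `prepped_data.csv` into the folder it received, the consuming script could read it back as below. The function name `load_prepped_rows` is hypothetical, and a temporary local folder stands in for the `PipelineData` location so the example runs without AML.

```python
import csv
import os
import tempfile


def load_prepped_rows(folder, filename="prepped_data.csv"):
    """Read the CSV that the data prep step wrote into the shared folder."""
    path = os.path.join(folder, filename)
    with open(path, newline="") as handle:
        return list(csv.DictReader(handle))


# Local stand-in for the PipelineData location. In a real run the folder
# path would arrive through the script's --folder argument.
with tempfile.TemporaryDirectory() as demo_folder:
    demo_path = os.path.join(demo_folder, "prepped_data.csv")
    with open(demo_path, "w", newline="") as handle:
        csv.writer(handle).writerows(
            [["col1", "col2", "col3"], [1, 2, 3], [4, 5, 6]]
        )
    rows = load_prepped_rows(demo_folder)

print(len(rows), rows[0]["col1"])
```

The only contract between the two steps is the folder path and the file name, so both scripts need to agree on them.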

<hr>

## Reusing pipeline steps

AML pipelines can cache and reuse step outputs, which helps with long-running pipelines that have multiple steps.

### Managing step output reuse
By default, the output from a previous run of a step is reused.
- However, the step is rerun if its script, source directory or parameters change

Reuse can save time, but it can lead to stale results when changes to downstream data sources are not accounted for.

To control reuse, specify the `allow_reuse` parameter.

In [None]:
stepX = PythonScriptStep(
    name='prep data',
    source_directory='scripts',
    script_name='data_prep.py',
    compute_target='aml-cluster',
    runconfig=run_config,
    inputs=[raw_ds.as_named_input('raw_data')],
    outputs=[prepped_data],
    arguments=['--folder', prepped_data],
    # disable step reuse, forcing this step to rerun
    allow_reuse = False
)

You can also force every step in the pipeline to rerun by setting the `regenerate_outputs` parameter:
```python
pipeline_run = experiment.submit(train_pipeline, regenerate_outputs=True)
```
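
The reuse rule in this section can be pictured with a small cache keyed on a step's script and parameters. This is only a conceptual sketch under that assumption, not AML's actual caching mechanism; `StepCache` and `step_fingerprint` are hypothetical names.

```python
import hashlib
import json


def step_fingerprint(script_text, parameters):
    """Hash the things whose change should invalidate a cached step output."""
    payload = json.dumps(
        {"script": script_text, "parameters": parameters}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class StepCache:
    """Toy cache: returns (output, was_reused) for each step run."""

    def __init__(self):
        self._outputs = {}

    def run(self, script_text, parameters, compute, allow_reuse=True):
        key = step_fingerprint(script_text, parameters)
        if allow_reuse and key in self._outputs:
            return self._outputs[key], True
        output = compute()
        self._outputs[key] = output
        return output, False


cache = StepCache()
script = "prepped = raw[['col1', 'col2', 'col3']]"
first = cache.run(script, {"folder": "prepped"}, lambda: "output-v1")
second = cache.run(script, {"folder": "prepped"}, lambda: "output-v2")
changed = cache.run(script, {"folder": "other"}, lambda: "output-v3")
forced = cache.run(
    script, {"folder": "prepped"}, lambda: "output-v4", allow_reuse=False
)
print(first, second, changed, forced)
```

The second call returns the first call's output even though rerunning would have produced something new, which is the stale-result risk. Changing a parameter or passing `allow_reuse=False` forces a fresh run.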

# Publishing pipelines

How can we publish a pipeline, so that others can use our amazing code?

The answer is by creating a REST endpoint. This allows the pipeline to be run on demand.

There are two ways to do this:
- call the `publish` method of the pipeline before the run
- call the `publish_pipeline` method on a successfully completed pipeline run

```python
published_pipeline = train_pipeline.publish(
    name='training-pipeline',
    description='model training pipeline',
    version='1.0'
)
```

***Or***

```python
published_pipeline = pipeline_run.publish_pipeline(
    name='training-pipeline',
    description='model training pipeline',
    version='1.0'
)
```

You can check whether you have successfully published a pipeline by:
- looking in the AML studio portal
- checking the endpoint URI

```python
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)
```

But how can I then interact with and call this REST endpoint? That is the easy part.

You just have to make an HTTP request to the endpoint, passing:
- an authorization header: with a token for a service principal with permission to run the pipeline
- a JSON payload: specifying the experiment name

Example:
```python
import requests

response = requests.post(
    rest_endpoint,
    headers=auth_header,
    json = {
        'ExperimentName': 'run_training_pipeline'
    }
)
```
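
The example above leaves `auth_header` undefined. As a hedged sketch of what the request carries, the code below assembles the same POST with the standard library and does not send it. The endpoint URL and token are placeholders, `build_pipeline_request` is a hypothetical helper, and the bearer-token header format is an assumption about what the endpoint expects.

```python
import json
import urllib.request


def build_pipeline_request(endpoint, token, experiment_name):
    """Assemble (but do not send) the POST request for a published pipeline."""
    body = json.dumps({"ExperimentName": experiment_name}).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        headers={
            # Assumed format: a bearer token obtained for the service principal.
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder values for illustration only.
request = build_pipeline_request(
    "https://example.invalid/pipelines/placeholder-id",
    "placeholder-token",
    "run_training_pipeline",
)
print(request.get_method(), request.full_url)
```

With the `requests` library, the equivalent `auth_header` would be the dictionary holding the `Authorization` entry.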

## What about passing parameters to a pipeline?

You can use a `PipelineParameter` object to pass parameters to a pipeline.

Here is an example:
```python
from azureml.pipeline.core.graph import PipelineParameter

reg_param = PipelineParameter(name='reg_rate',default_value=0.1)

# some more code

# pipeline step
step2 = EstimatorStep(
    name='train model',
    estimator=sk_estimator,
    compute_target='aml-cluster',
    inputs=[prepped_data],
    estimator_entry_script_arguments=[
        '--folder', prepped_data,
        '--reg', reg_param
    ]
)
```
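
On the receiving side, the training script has to read `--reg` from its command line. Here is a minimal sketch of that parsing, assuming the script takes the same `--folder` and `--reg` arguments as the step above. The function name `parse_training_args` and the local fallback default are hypothetical.

```python
import argparse


def parse_training_args(argv=None):
    """Parse the arguments that the pipeline step passes to the training script."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--folder", type=str, dest="folder", required=True)
    # Fallback default used only when the script is run without --reg.
    parser.add_argument("--reg", type=float, dest="reg_rate", default=0.1)
    return parser.parse_args(argv)


default_args = parse_training_args(["--folder", "prepped"])
override_args = parse_training_args(["--folder", "prepped", "--reg", "0.5"])
print(default_args.reg_rate, override_args.reg_rate)
```

Passing an explicit list to `parse_args` makes the function easy to try locally; inside a pipeline run you would call it with no arguments so it reads `sys.argv`.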

For a published pipeline, you can also pass parameter values in the JSON payload of the REST request.

Here is an example:
```python
import requests

response = requests.post(
    rest_endpoint,
    headers=auth_header,
    json={
        'ExperimentName':'run_training_pipeline',
        'ParameterAssignments': {
            'reg_rate':0.1
        }
    }
)
```

## What about if I want to schedule my pipeline?

Once you have published your pipeline, you can:
- call it on demand
- run it automatically, either on a periodic schedule or in response to a data update

### Schedule for periodic intervals
To create a periodic `Schedule`, you must define a `ScheduleRecurrence` that determines the run frequency.

Example:

```python
from azureml.pipeline.core import ScheduleRecurrence, Schedule

daily = ScheduleRecurrence(
    frequency='Day',
    interval=1
)

pipeline_schedule = Schedule.create(
    ws,
    name='Daily Training',
    description='trains model every day',
    pipeline_id = published_pipeline.id,
    experiment_name='Training_Pipeline',
    recurrence=daily
)
```

### Schedule on data change
What if we want the pipeline to run when data changes? Here, we have to create a `Schedule` that monitors a specific path on a datastore.

```python
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

training_datastore = Datastore(workspace=ws, name='blob-data')
pipeline_schedule=Schedule.create(
    ws,
    name='Reactive Training',
    description='trains model on data change',
    pipeline_id=published_pipeline.id,
    experiment_name='Training_Pipeline',
    datastore=training_datastore,
    path_on_datastore='data/training'
)
```
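
To build intuition for what "monitoring a path" means, here is a conceptual sketch that compares two snapshots of a folder and reports added or modified files. It is an illustration only, not how AML watches a datastore: a temporary local folder stands in for the datastore path, and `snapshot` and `changed_files` are hypothetical helpers.

```python
import os
import tempfile


def snapshot(path):
    """Map each file under path to its (size, modification time)."""
    state = {}
    for root, _dirs, files in os.walk(path):
        for name in files:
            full = os.path.join(root, name)
            info = os.stat(full)
            state[os.path.relpath(full, path)] = (info.st_size, info.st_mtime_ns)
    return state


def changed_files(before, after):
    """Return files that were added or modified between two snapshots."""
    return sorted(
        name for name, meta in after.items() if before.get(name) != meta
    )


with tempfile.TemporaryDirectory() as watched:
    before = snapshot(watched)
    # Simulate new training data arriving in the monitored folder.
    with open(os.path.join(watched, "new_batch.csv"), "w") as handle:
        handle.write("col1,col2,col3\n1,2,3\n")
    after = snapshot(watched)
    changes = changed_files(before, after)
    unchanged = changed_files(after, snapshot(watched))

if changes:
    print("would trigger a pipeline run for:", changes)
```

A second check with no new files reports nothing, so no run would be triggered.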