# Authoring repeatable processes, a.k.a. AzureML pipelines

In [None]:
from azureml.core import Workspace

# Obtain the workspace handle from its saved configuration.
ws = Workspace.from_config()

# Look up, by their workspace names, the compute target the steps run on
# and the tabular dataset that feeds the pipeline.
compute_target = ws.compute_targets["cpu-cluster"]
dataset = ws.datasets["diabetes-tabular"]

In [None]:
from azureml.core import RunConfiguration

# For simplicity, reuse a large ready-made LightGBM environment from the
# workspace rather than building a specialized one. The same run
# configuration is shared by every step, although that is not a requirement.
lightgbm_environment = ws.environments["AzureML-lightgbm-3.2-ubuntu18.04-py37-cpu"]

runconfig = RunConfiguration()
runconfig.environment = lightgbm_environment

## Step 1 - Convert data into LightGBM dataset

In [None]:
from azureml.pipeline.core import PipelineData

# Directory-shaped intermediate data on the workspace's default datastore;
# it is declared as the output of the data preparation step.
step01_output = PipelineData(
    name="training_data",
    is_directory=True,
    datastore=ws.get_default_datastore(),
)

In [None]:
from azureml.pipeline.core import PipelineParameter
from azureml.data.dataset_consumption_config import DatasetConsumptionConfig

# Expose the dataset as a pipeline parameter named "dataset"; the workspace
# dataset looked up earlier serves as the default value.
ds_pipeline_param = PipelineParameter(default_value=dataset, name="dataset")

# Consumption config through which steps receive the parameterized dataset.
step01_input_dataset = DatasetConsumptionConfig(
    name="input_dataset",
    dataset=ds_pipeline_param,
)

In [None]:
from azureml.pipeline.steps import PythonScriptStep

# Data preparation step: runs step01_data_prep.py from the 040_scripts
# directory, handing it the input dataset and the training-data output
# location on the command line.
step_01 = PythonScriptStep(
    name="Prepare data",
    script_name="step01_data_prep.py",
    source_directory="040_scripts",
    compute_target=compute_target,
    runconfig=runconfig,
    inputs=[step01_input_dataset],
    outputs=[step01_output],
    arguments=[
        "--dataset-id",
        step01_input_dataset,
        "--output-path",
        step01_output,
    ],
    allow_reuse=True,
)

## Step 2 - Train the LightGBM model

In [None]:
from azureml.pipeline.core import PipelineParameter

# Learning rate exposed as a pipeline parameter, with 0.05 as its default
# value.
learning_rate_param = PipelineParameter(
    default_value=0.05,
    name="learning_rate",
)

In [None]:
# Directory-shaped intermediate data on the workspace's default datastore;
# it is declared as the output of the training step.
step02_output = PipelineData(
    name="model_output",
    is_directory=True,
    datastore=ws.get_default_datastore(),
)

In [None]:
# Training step: runs step02_train.py from the 040_scripts directory. It
# takes the learning-rate parameter, reads the prepared data from step 1 and
# writes to the model output location.
step_02 = PythonScriptStep(
    name="Train model",
    script_name="step02_train.py",
    source_directory="040_scripts",
    compute_target=compute_target,
    runconfig=runconfig,
    inputs=[step01_output],
    outputs=[step02_output],
    arguments=[
        "--learning-rate",
        learning_rate_param,
        "--input-path",
        step01_output,
        "--output-path",
        step02_output,
    ],
)

## Step 3 - Register model

In [None]:
# Registration step: runs step03_register.py from the 040_scripts directory,
# receiving the training step's output together with the input dataset.
# It declares no outputs of its own.
step_03 = PythonScriptStep(
    name="Register model",
    script_name="step03_register.py",
    source_directory="040_scripts",
    compute_target=compute_target,
    runconfig=runconfig,
    inputs=[step01_input_dataset, step02_output],
    arguments=[
        "--input-path",
        step02_output,
        "--dataset-id",
        step01_input_dataset,
    ],
)

## Create pipeline

In [None]:
from azureml.pipeline.core import Pipeline

# Assemble the three steps into one pipeline bound to the workspace.
pipeline = Pipeline(
    steps=[step_01, step_02, step_03],
    workspace=ws,
)

## Trigger pipeline through SDK

In [None]:
from azureml.core import Experiment

# Submit the pipeline through the SDK under the "pipeline-run" experiment,
# supplying 0.5 for the learning_rate parameter, then wait for the run
# to complete.
experiment = Experiment(workspace=ws, name="pipeline-run")
pipeline_run = experiment.submit(
    pipeline,
    pipeline_parameters={"learning_rate": 0.5},
)
pipeline_run.wait_for_completion()

## Publish the pipeline for reuse

In [None]:
# Publish the pipeline under a name and description; the resulting object
# provides the endpoint and id used when triggering or scheduling it.
published_pipeline = pipeline.publish(
    name="Training pipeline",
    description="A pipeline to train a LightGBM model",
)

## Trigger published pipeline through REST

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

# Authenticate interactively and fetch the authentication header. Despite
# its name, `aad_token` is passed whole as the `headers` argument of the
# REST request that triggers the published pipeline.
auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

In [None]:
import requests

# Trigger the published pipeline through its REST endpoint, supplying the
# experiment name and a value for the learning_rate pipeline parameter.
response = requests.post(
    published_pipeline.endpoint,
    headers=aad_token,
    json={
        "ExperimentName": "pipeline-run",
        "ParameterAssignments": {"learning_rate": 0.02},
    },
    # Without a timeout, requests waits indefinitely on an unresponsive
    # endpoint.
    timeout=120,
)

print(
    f"Made a POST request to {published_pipeline.endpoint} and got {response.status_code}."
)
# Surface a rejected request as the HTTP error itself, instead of an
# unrelated JSON decoding error or KeyError when reading the body below.
response.raise_for_status()
print(f"The portal url for the run is {response.json()['RunUrl']}")

## Scheduling a pipeline

In [None]:
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
from datetime import datetime

# Monthly recurrence (every 1 month) starting from the current time;
# datetime.now() yields a naive timestamp without tzinfo.
recurrence = ScheduleRecurrence(
    frequency="Month",
    interval=1,
    start_time=datetime.now(),
)

# Create a schedule that runs the published pipeline on that recurrence
# under the "pipeline-schedule-run" experiment, waiting for provisioning.
schedule = Schedule.create(
    name="pipeline-schedule",
    description="Schedule to retrain model",
    workspace=ws,
    pipeline_id=published_pipeline.id,
    experiment_name="pipeline-schedule-run",
    recurrence=recurrence,
    wait_for_provisioning=True,
)

print("Created schedule with id: {}".format(schedule.id))

In [None]:
from azureml.pipeline.core.schedule import Schedule

# Disable every active schedule found in the workspace.
schedules = Schedule.list(ws, active_only=True)
print("Your workspace has the following schedules set up:")
# A dedicated loop name keeps the `schedule` object created earlier in the
# notebook from being overwritten by the last listed schedule.
for active_schedule in schedules:
    print(
        f"Disabling {active_schedule.id} "
        f"(Published pipeline: {active_schedule.pipeline_id})"
    )
    active_schedule.disable(wait_for_provisioning=True)