# Creating an ML pipeline

In [1]:
from azureml.core import Workspace

# Handle to the Azure ML workspace that every later cell uses. No arguments are
# passed, so the workspace details are not set in this notebook; they come from
# whatever configuration Workspace.from_config() locates.
workspace = Workspace.from_config()

If you run your code in unattended mode, i.e., where you can't give a user input, then we recommend to use ServicePrincipalAuthentication or MsiAuthentication.
Please refer to aka.ms/aml-notebook-auth for different authentication mechanisms in azureml-sdk.


In [3]:
from pathlib import Path

from azureml.core import RunConfiguration, Environment

# Directory holding the model code; it is also passed as the source directory
# of the pipeline step defined further down.
model_dir = Path("../model")

# Run configuration handed to the pipeline step. Its environment ("model-env")
# is built from the conda specification file that lives next to the model code.
run_config = RunConfiguration()
run_config.environment = Environment.from_conda_specification(
    "model-env", model_dir / "environment.yml"
)

In [4]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException


def _get_or_create_cluster(
    workspace,
    name,
    vm_size="STANDARD_D2_V2",
    vm_priority="lowpriority",
    min_nodes=0,
    max_nodes=4,
    idle_seconds_before_scaledown="300",
    wait=False,
):
    """Return the compute target called ``name``, creating a cluster if needed.

    The function first tries to look up an existing compute target with the
    given name. Only when that lookup raises ``ComputeTargetException`` is a
    new ``AmlCompute`` cluster provisioned with the remaining arguments.

    Args:
        workspace: Workspace in which the compute target lives / is created.
        name: Name of the compute target.
        vm_size: VM size used when a new cluster is created.
        vm_priority: VM priority used when a new cluster is created.
        min_nodes: Minimum number of nodes of a newly created cluster.
        max_nodes: Maximum number of nodes of a newly created cluster.
        idle_seconds_before_scaledown: Idle time before scale-down for a newly
            created cluster. Integers and numeric strings (such as the default
            ``"300"``) are accepted; ``None`` is forwarded unchanged.
        wait: If true, block until a *newly created* cluster has finished
            provisioning. Has no effect when the cluster already exists.

    Returns:
        The existing or newly created compute target.

    Raises:
        ValueError: If ``idle_seconds_before_scaledown`` is not ``None`` and
            cannot be converted to an integer (only checked when creating).
    """

    try:
        # pylint: disable=abstract-class-instantiated
        target = ComputeTarget(workspace=workspace, name=name)
    except ComputeTargetException:
        print(f"Creating cluster {name}")

        # The provisioning API documents this value as an integer number of
        # seconds, so normalise numeric strings before handing it over.
        idle_seconds = idle_seconds_before_scaledown
        if idle_seconds is not None:
            idle_seconds = int(idle_seconds)

        config = AmlCompute.provisioning_configuration(
            vm_size=vm_size,
            vm_priority=vm_priority,
            min_nodes=min_nodes,
            max_nodes=max_nodes,
            idle_seconds_before_scaledown=idle_seconds,
        )
        target = ComputeTarget.create(workspace, name, config)

        if wait:
            target.wait_for_completion(show_output=False)
    else:
        print(f"Using existing cluster {name}")

    return target


# Compute target on which the pipeline step runs. With wait=True the helper
# calls wait_for_completion() if it has to create the cluster.
compute_target = _get_or_create_cluster(workspace, name="my-cluster", wait=True)

In [5]:
from azureml.data.data_reference import DataReference

# The workspace's default datastore is used for the input data as well as for
# the pipeline outputs declared in the next cell.
datastore = workspace.get_default_datastore()

# Reference to the "titanic" path on that datastore; it is handed to the
# preprocess step as its input.
train_data = DataReference(
    datastore=datastore,
    data_reference_name="train_data",
    path_on_datastore="titanic",
)

In [6]:
from azureml.pipeline.core import PipelineData

# Output of the preprocess step (passed to the script as --output_dir).
preprocessed_data = PipelineData(
    "preprocessed_data", 
    datastore=datastore,
)

# Placeholder for the model output, exposed under the pipeline output name
# "model". No step in the cells shown here produces or consumes it yet.
model_data = PipelineData(
    "model_data", 
    datastore=datastore, 
    pipeline_output_name="model",
)

In [7]:
from azureml.pipeline.steps import PythonScriptStep

# Pipeline step that runs scripts/preprocess.py from the model directory on the
# compute target, using the shared run configuration.
# The data objects appear twice on purpose: once in `arguments`, so the script
# receives them as --input_dir / --output_dir, and once in `inputs` / `outputs`,
# which declare them as the step's input and output.
# allow_reuse=False makes the step run again on every pipeline run instead of
# reusing the outputs of an earlier run.
preprocess_step = PythonScriptStep(
    name="preprocess",
    script_name="scripts/preprocess.py",
    arguments=["--input_dir", train_data, "--output_dir", preprocessed_data],
    inputs=[train_data],
    outputs=[preprocessed_data],
    compute_target=compute_target,
    runconfig=run_config,
    source_directory=str(model_dir),
    allow_reuse=False,
)

In [8]:
from azureml.pipeline.core import Pipeline

# Assemble the pipeline; at this point it consists of the preprocess step only.
pipeline = Pipeline(
    workspace=workspace,
    steps=[
        preprocess_step, 
    ],
)
# validate() is the last expression of the cell, so its return value is shown
# as the cell output (an empty list in the recorded run).
pipeline.validate()

Step preprocess is ready to be created [aa25c266]


[]

In [9]:
# Publish the pipeline; the returned object exposes the REST endpoint that is
# used further down to trigger a run.
published_pipeline = pipeline.publish(
    name="my-first-pipeline",
    description="Description of my first pipeline.",
    continue_on_step_failure=False,
) 

# The printed representation lists the pipeline's name, id, status and endpoint.
print(published_pipeline)

Created step preprocess [aa25c266][60bc56cf-b688-4500-920f-805f22e0b1b7], (This step will run and generate new outputs)
Using data reference train_data for StepId [00af37ce][bc992f3e-4530-44e8-8a0b-28d75f6a510b], (Consumers of this data are eligible to reuse prior runs.)
Pipeline(Name: my-first-pipeline,
Id: 591b06a0-83c5-4a1e-9798-befc5c0197e8,
Status: Active,
Endpoint: https://westeurope.aether.ms/api/v1.0/subscriptions/5ddf05c0-b972-44ca-b90a-3e49b5de80dd/resourceGroups/julian-playground/providers/Microsoft.MachineLearningServices/workspaces/julian-ml/PipelineRuns/PipelineSubmit/591b06a0-83c5-4a1e-9798-befc5c0197e8)


In [None]:
# A published pipeline can be fetched again later by its ID, as follows:
# from azureml.pipeline.core import PipelineRun, PublishedPipeline
# pipeline = PublishedPipeline.get(workspace=workspace, id=pipeline_id)

In [None]:
import time

import requests

from azureml.pipeline.core import PipelineRun
from azureml.core.authentication import InteractiveLoginAuthentication


def _wait_for_run_completion(pipeline_run):
    """Helper function that waits for the pipeline to finish."""
    
    JOB_STATUS = {
        "not_started": {0, "NotStarted"},
        "running": {1, "Running"},
        "failed": {2, "Failed"},
        "cancelled": {3, "Cancelled"},
        "finished": {4, "Finished"},
    }
    
    print("Waiting for job to start...")
    status = pipeline_run.get_status()
    while status in JOB_STATUS["not_started"]:
        time.sleep(1)
        status = pipeline_run.get_status()

    if status in JOB_STATUS["running"]:
        print("Job started, waiting for completion...")
        while status in JOB_STATUS["running"]:
            time.sleep(1)
            status = pipeline_run.get_status()

    if status in JOB_STATUS["finished"]:
        print("Job finished successfully!")
    elif status in JOB_STATUS["failed"]:
        print("ERROR: Job failed!")
    elif status in JOB_STATUS["cancelled"]:
        print("WARNING: Job was cancelled.")
    else:
        raise ValueError(f"Unexpected status '{status}'")
        
    
# Name of the experiment under which the triggered run is recorded.
experiment_name = "my-first-experiment"

# Interactive login; the returned authentication header is passed unchanged as
# the HTTP headers of the REST call below.
auth = InteractiveLoginAuthentication()
aad_token = auth.get_authentication_header()

# Body of the submit request: the experiment name plus (no) pipeline parameters.
request_payload = {
    "ExperimentName": experiment_name,
    "ParameterAssignments": {},
}

# Trigger the published pipeline through its REST endpoint. An explicit timeout
# keeps the kernel from hanging forever if the service does not answer.
response = requests.post(
    published_pipeline.endpoint,
    headers=aad_token,
    json=request_payload,
    timeout=60,
)
response.raise_for_status()

# The response carries the ID of the run that was started.
run_id = response.json()["Id"]
print("Job ID: %s" % run_id)

# Look the run up by its ID and block until it has finished.
pipeline_run = PipelineRun.get(workspace, run_id)
_wait_for_run_completion(pipeline_run)

### Exercise

Build the rest of the pipeline.

TODO: Screenshot


In [16]:
%load answers/pipeline.py

Assignment: See if you can find the running pipeline in your Azure ML portal. 