# Creating a Pipeline

In the previous labs, you used the Azure Machine Learning SDK to explore the entire model training process from accessing data through to running training experiments and registering machine learning models. Up until now, you have performed the various steps required to create a machine learning solution interactively. In this lab, you'll explore automation of these steps using *pipelines*.

## Connect to Your Workspace

The first thing you need to do is to connect to your workspace using the Azure ML SDK.

> **Note**: If the authenticated session with your Azure subscription has expired since you completed the previous exercise, you'll be prompted to reauthenticate.
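
`Workspace.from_config()` loads the workspace details from a JSON configuration file, by default named *config.json*. You can download this file from your workspace in the Azure portal. The sketch below writes one with hypothetical placeholder values and reads it back, to show roughly what the file contains. The three keys shown are the ones the SDK looks for, but treat the file you download as the authoritative version.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical placeholder values: replace them with your own workspace details.
config = {
    "subscription_id": "<your-subscription-id>",
    "resource_group": "<your-resource-group>",
    "workspace_name": "<your-workspace-name>",
}

# Save the file (a temporary folder is used here instead of your notebook folder).
config_path = Path(tempfile.mkdtemp()) / "config.json"
config_path.write_text(json.dumps(config, indent=2))

# Read it back and confirm the three keys are present.
loaded = json.loads(config_path.read_text())
missing = {"subscription_id", "resource_group", "workspace_name"} - set(loaded)
print("Missing keys:", sorted(missing) if missing else "none")  # Missing keys: none
```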

In [1]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.13.0 to work with BSM_MLWorkspace3


## Prepare Data for an Experiment

In this lab, you'll use a dataset containing details of diabetes patients. Run the cell below to create this dataset. If you created it in a previous lab, the code will find the existing version.

In [2]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['./data/diabetes.csv', './data/diabetes2.csv'], # Upload the diabetes csv files in /data
                            target_path='diabetes-data/', # Put it in a folder path in the datastore
                            overwrite=True, # Replace existing files of the same name
                            show_progress=True)

    # Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

Dataset already registered.


## Create Scripts for Pipeline Steps

Pipelines consist of one or more *steps*. A step can be a Python script or a specialized step, such as an AutoML training step or a data transfer step that copies data from one location to another. Each step can run in its own compute context.

In this exercise, you'll build a simple pipeline that contains an estimator step (to train a model) and a Python script step (to register the trained model).

In [3]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

diabetes_pipeline


Now you can create the script for the first step, which will train a model. The script includes a parameter named **output_folder**, which references the folder where the trained model should be saved.

In [4]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder', default="diabetes_model", help='output folder')
args = parser.parse_args()
output_folder = args.output_folder

# Get the experiment run context
run = Run.get_context()

# load the diabetes data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['diabetes_train'].to_pandas_dataframe()

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))  # built-in float; np.float was removed in NumPy 1.24

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model
os.makedirs(output_folder, exist_ok=True)
output_path = output_folder + "/model.pkl"
joblib.dump(value=model, filename=output_path)

run.complete()

Writing diabetes_pipeline/train_diabetes.py
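
**output_folder** is an ordinary argparse option with a default value. That means the script works on its own and also inside the pipeline. The sketch below reuses the same argument definition and parses two hypothetical argument lists. The first is empty, as when you run the script locally. The second supplies a folder path, similar to what the pipeline passes at runtime; the path shown is made up.

```python
import argparse

# Same argument definition as in train_diabetes.py
parser = argparse.ArgumentParser()
parser.add_argument('--output_folder', type=str, dest='output_folder',
                    default="diabetes_model", help='output folder')

# Run locally with no arguments: the default folder name is used.
local_args = parser.parse_args([])
print(local_args.output_folder)  # diabetes_model

# Inside a pipeline, a concrete path is supplied instead (hypothetical path shown).
pipeline_args = parser.parse_args(['--output_folder', '/mnt/hypothetical/model_folder'])
print(pipeline_args.output_folder)  # /mnt/hypothetical/model_folder
```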


The script for the second step of the pipeline will load the model from where it was saved, and then register it in the workspace. It includes a single **model_folder** parameter that contains the path where the model was saved.

In [5]:
%%writefile $experiment_folder/register_diabetes.py
# Import libraries
import argparse
import joblib
from azureml.core import Workspace, Model, Run

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument('--model_folder', type=str, dest='model_folder', default="diabetes_model", help='model location')
args = parser.parse_args()
model_folder = args.model_folder

# Get the experiment run context
run = Run.get_context()

# load the model
print("Loading model from " + model_folder)
model_file = model_folder + "/model.pkl"
model = joblib.load(model_file)

Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'})

run.complete()

Writing diabetes_pipeline/register_diabetes.py
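
The two scripts communicate only through the files in the shared folder. train_diabetes.py writes *model.pkl* into its **output_folder**, and register_diabetes.py reads *model.pkl* from its **model_folder**. The sketch below models that contract using the standard-library pickle module in place of joblib, which the real scripts use. A placeholder dictionary stands in for the trained model, and the helper names `save_model` and `load_model` are hypothetical.

```python
import os
import pickle
import tempfile

def save_model(model, output_folder):
    """Mimics the end of train_diabetes.py: write model.pkl into the output folder."""
    os.makedirs(output_folder, exist_ok=True)
    output_path = os.path.join(output_folder, "model.pkl")
    with open(output_path, "wb") as f:
        pickle.dump(model, f)
    return output_path

def load_model(model_folder):
    """Mimics register_diabetes.py: read model.pkl back from the same folder."""
    with open(os.path.join(model_folder, "model.pkl"), "rb") as f:
        return pickle.load(f)

# The shared folder stands in for the pipeline's intermediate storage (a temp dir here).
shared_folder = os.path.join(tempfile.mkdtemp(), "model_folder")
stand_in_model = {"type": "DecisionTreeClassifier", "note": "placeholder object"}
save_model(stand_in_model, shared_folder)
print(load_model(shared_folder) == stand_in_model)  # True
```

If either side changes the file name or folder layout, the registration step fails. That is why both scripts build the path the same way.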


## Prepare a Compute Environment for the Pipeline Steps

In this exercise, you'll use the same compute for both steps. However, each step runs independently, so you could specify a different compute context for each step if appropriate.

First, get the compute target you created in a previous lab (if it doesn't exist, it will be created).

> **Important**: Before running the code below, change the value of **cluster_name** to the name of your compute cluster! Cluster names must be globally unique and between 2 and 16 characters in length. Valid characters are letters, digits, and the - character.
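
The length and character rules in the note above can be checked locally before you try to create a cluster. The sketch below encodes only those stated rules and assumes "letters" means ASCII letters. It cannot check global uniqueness, and Azure may enforce further constraints. The function name is hypothetical.

```python
import re

# Only the rules stated in the note: 2-16 characters, letters, digits, or '-'.
_CLUSTER_NAME = re.compile(r"[A-Za-z0-9-]{2,16}")

def looks_like_valid_cluster_name(name: str) -> bool:
    """Return True if the name satisfies the documented length/character rules."""
    return _CLUSTER_NAME.fullmatch(name) is not None

for candidate in ["B-ML3-CmpCluster", "x", "my_cluster", "a-very-long-cluster-name"]:
    print(candidate, looks_like_valid_cluster_name(candidate))
```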

In [8]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "B-ML3-CmpCluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

Found existing cluster, use it.


The compute will require a Python environment with the necessary package dependencies installed, so you'll need to create a run configuration.
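
An environment's Python dependencies are ultimately described by a conda specification. The sketch below is a rough, hand-written equivalent of the package lists passed to `CondaDependencies.create` in the next cell, rendered as conda-style YAML. It only illustrates how conda packages and pip packages are separated. The file the SDK actually generates may differ, for example by pinning a Python version or adding channels.

```python
# Hand-written approximation of the packages passed to CondaDependencies.create:
# conda packages at the top level, pip packages in a nested "pip" list.
conda_spec = {
    "name": "diabetes-experiment-env",
    "dependencies": [
        "scikit-learn",
        "ipykernel",
        "matplotlib",
        "pandas",
        "pip",  # conda needs pip itself to install the pip section
        {"pip": ["azureml-defaults", "azureml-dataprep[pandas]", "pyarrow"]},
    ],
}

def to_conda_yaml(spec):
    """Render the spec as conda-style YAML text (no third-party YAML library needed)."""
    lines = [f"name: {spec['name']}", "dependencies:"]
    for dep in spec["dependencies"]:
        if isinstance(dep, dict):
            # pip-installed packages are listed under a nested "pip" entry
            lines.append("  - pip:")
            lines.extend(f"    - {pkg}" for pkg in dep["pip"])
        else:
            lines.append(f"  - {dep}")
    return "\n".join(lines)

print(to_conda_yaml(conda_spec))
```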

In [9]:
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment
diabetes_env = Environment("diabetes-experiment-env")
diabetes_env.python.user_managed_dependencies = False # Let Azure ML manage dependencies
diabetes_env.docker.enabled = True # Use a docker container

# Create a set of package dependencies
diabetes_packages = CondaDependencies.create(conda_packages=['scikit-learn','ipykernel','matplotlib', 'pandas'],
                                             pip_packages=['azureml-defaults','azureml-dataprep[pandas]','pyarrow'])

# Add the dependencies to the environment
diabetes_env.python.conda_dependencies = diabetes_packages

# Register the environment (in case the previous lab wasn't completed)
diabetes_env.register(workspace=ws)
registered_env = Environment.get(ws, 'diabetes-experiment-env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## Create and Run a Pipeline

Now you're ready to create and run a pipeline.

First, define the steps for the pipeline and any data references that need to be passed between them. In this case, the first step must write the model to a folder that the second step can read from. The steps run on remote compute, and each step could even run on different compute. For that reason, the folder path must be passed as a data reference to a location in a datastore within the workspace. The **PipelineData** object is a special kind of data reference used for interim storage locations that can be passed between pipeline steps. You'll create one and use it as the output of the first step and the input of the second step. You also need to pass it as a script argument, so that your code can access the datastore location that the data reference points to.
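
This sketch shows why `model_folder` appears both in the step inputs/outputs and in the argument lists. It uses a hypothetical `IntermediateData` class, which is not part of the Azure ML SDK. The class is a named placeholder that is swapped for a concrete folder path when the steps run. In the real service, Azure ML chooses and provides that location. Here, a temporary directory stands in for the datastore.

```python
import os
import tempfile

class IntermediateData:
    """Hypothetical stand-in for PipelineData: a named placeholder whose concrete
    location is only decided when the pipeline runs."""

    def __init__(self, name):
        self.name = name


def resolve_arguments(arguments, datastore_root):
    """Swap every placeholder in a step's argument list for a concrete folder path."""
    return [
        os.path.join(datastore_root, arg.name) if isinstance(arg, IntermediateData) else arg
        for arg in arguments
    ]


model_folder = IntermediateData("model_folder")
datastore_root = tempfile.mkdtemp()  # stands in for the workspace's default datastore

# The same placeholder object appears in both steps' argument lists...
train_argv = resolve_arguments(["--output_folder", model_folder], datastore_root)
register_argv = resolve_arguments(["--model_folder", model_folder], datastore_root)

# ...so both scripts receive the same concrete path, which links the two steps.
print(train_argv[1] == register_argv[1])  # True
```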

In [10]:
from azureml.pipeline.core import PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create a PipelineData (temporary Data Reference) for the model folder
model_folder = PipelineData("model_folder", datastore=ws.get_default_datastore())

estimator = Estimator(source_directory=experiment_folder,
                        compute_target = pipeline_cluster,
                        environment_definition=pipeline_run_config.environment,
                        entry_script='train_diabetes.py')

train_step = EstimatorStep(name = "Train Model",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output_folder', model_folder],
                           inputs=[diabetes_ds.as_named_input('diabetes_train')],
                           outputs=[model_folder],
                           compute_target = pipeline_cluster,
                           allow_reuse = True)

# Step 2: run the model registration script
register_step = PythonScriptStep(name = "Register Model",
                                source_directory = experiment_folder,
                                script_name = "register_diabetes.py",
                                arguments = ['--model_folder', model_folder],
                                inputs=[model_folder],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


OK, you're ready to build the pipeline from the steps you've defined and run it as an experiment.
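
Azure ML pipelines schedule steps according to their data dependencies. Because `register_step` takes the `model_folder` that `train_step` produces as an input, it can only start after training has finished. The sketch below models that ordering with Python's standard `graphlib` module (Python 3.9+), applied to hypothetical step descriptions. It illustrates the idea and is not how the SDK is implemented. The steps are deliberately listed in reverse order to show that list position does not decide the result.

```python
from graphlib import TopologicalSorter

# Hypothetical descriptions mirroring train_step and register_step (note the order).
steps = {
    "Register Model": {"inputs": ["model_folder"], "outputs": []},
    "Train Model": {"inputs": ["diabetes_train"], "outputs": ["model_folder"]},
}

def execution_order(steps):
    """Order steps so that each runs after the steps producing its inputs."""
    # Map each data name to the step that produces it.
    producers = {out: name for name, spec in steps.items() for out in spec["outputs"]}
    sorter = TopologicalSorter()
    for name, spec in steps.items():
        # External inputs (like the registered dataset) have no producing step.
        predecessors = [producers[i] for i in spec["inputs"] if i in producers]
        sorter.add(name, *predecessors)
    return list(sorter.static_order())

print(execution_order(steps))  # ['Train Model', 'Register Model']
```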

In [11]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Construct the pipeline
pipeline_steps = [train_step, register_step]
pipeline = Pipeline(workspace = ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'diabetes-training-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built.
Created step Train Model [4b69b241][230c85c3-a426-4786-81c9-bb8b26d19599], (This step will run and generate new outputs)
Created step Register Model [01aafa58][5448278e-926d-42a5-9ee1-1ba3ab98344a], (This step will run and generate new outputs)
Submitted PipelineRun 3b8d6a6c-e927-4e07-9905-d3fb38fa4a15
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/3b8d6a6c-e927-4e07-9905-d3fb38fa4a15?wsid=/subscriptions/94670d96-c488-4d89-a5e8-ea4fef48bf6a/resourcegroups/MS_SolutionCenter/workspaces/BSM_MLWorkspace3
Pipeline submitted for execution.
PipelineRunId: 3b8d6a6c-e927-4e07-9905-d3fb38fa4a15
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/3b8d6a6c-e927-4e07-9905-d3fb38fa4a15?wsid=/subscriptions/94670d96-c488-4d89-a5e8-ea4fef48bf6a/resourcegroups/MS_SolutionCenter/workspaces/BSM_MLWorkspace3
PipelineRun Status: NotStarted
PipelineRun Status: Running


Training a decision tree model
Accuracy: 0.8993333333333333
AUC: 0.8853989246160638

StepRunId: 8a031ad4-c291-4085-a340-3f9fbfa06b02
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/diabetes-training-pipeline/runs/8a031ad4-c291-4085-a340-3f9fbfa06b02?wsid=/subscriptions/94670d96-c488-4d89-a5e8-ea4fef48bf6a/resourcegroups/MS_SolutionCenter/workspaces/BSM_MLWorkspace3
StepRun( Register Model ) Status: NotStarted
StepRun( Register Model ) Status: Running

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '3b8d6a6c-e927-4e07-9905-d3fb38fa4a15', 'status': 'Completed', 'startTimeUtc': '2020-09-03T03:25:35.111762Z', 'endTimeUtc': '2020-09-03T03:40:51.17896Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://bsmmlworkspace6998297976.blob.core.windows.net/azureml/ExperimentRun/dcid.3b8d6a6c-e927-4e07-9905-d3fb38fa4a15/logs/azureml/executionlogs.txt?sv=2019-02-02&sr=b&sig=IWxa2y6xe%2Bn1V9MR5TEV2iRTCiDMB3huDuqnRDBcxGI%3D&st=2020-09-03T03%3A30%3A52Z&se=2020-09-03T11%3A40%3A52Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://bsmmlworkspace6998297976.blob.core.windows.net/azureml/ExperimentRun/dcid.3b8d6a6c-e927-4e07-9905-d3fb38fa4a15/logs/azureml/stderrlogs.txt?sv=2019-02-02&sr=b&sig=1OJ3QgKRsTh3VOVSYzXsGZKWPmfzyD9FXWMXtvHcVh4%3D&st=2020-09-03T03%3A30%3A

'Finished'

The output from the pipeline experiment is displayed as it runs. Keep an eye on the kernel indicator at the top right of the page: when it turns from **⚫** to **◯**, the code has finished running. You can also monitor pipeline runs on the **Experiments** page in [Azure Machine Learning studio](https://ml.azure.com).

When the pipeline has finished, a new model should be registered with a *Training context* tag indicating it was trained in a pipeline. Run the following code to verify this.

In [12]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes_model version: 9
	 Training context : Pipeline


diabetes_model version: 8
	 Training context : Azure ML compute
	 AUC : 0.8863989679711839
	 Accuracy : 0.9004444444444445


diabetes_model version: 7
	 Training context : Estimator + Environment (Decision Tree)
	 AUC : 0.8822545724167647
	 Accuracy : 0.8968888888888888


diabetes_model version: 6
	 Training context : Estimator + Environment (Decision Tree)
	 AUC : 0.8841057918016353
	 Accuracy : 0.8997777777777778


diabetes_model version: 5
	 Training context : SKLearn Estimator (file dataset)
	 AUC : 0.8568655044545174
	 Accuracy : 0.7893333333333333


diabetes_model version: 4
	 Training context : SKLearn Estimator (tabular dataset)
	 AUC : 0.8568613016622707
	 Accuracy : 0.7891111111111111


diabetes_model version: 3
	 Training context : Using Datastore
	 AUC : 0.8568655044545174
	 Accuracy : 0.7893333333333333


diabetes_model version: 2
	 Training context : Parameterized SKLearn Estimator
	 AUC : 0.8483904671874223
	 Accu
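
When a workspace holds many versions, you may only want the newest one that carries a particular tag. The pure-Python sketch below shows that filtering on a few records copied from the listing above (name, version, and tags only). The helper `latest_with_tag` is illustrative and is not part of the SDK.

```python
# A few records copied from the model listing above (name, version, tags only).
models = [
    {"name": "diabetes_model", "version": 9, "tags": {"Training context": "Pipeline"}},
    {"name": "diabetes_model", "version": 8, "tags": {"Training context": "Azure ML compute"}},
    {"name": "diabetes_model", "version": 7,
     "tags": {"Training context": "Estimator + Environment (Decision Tree)"}},
]

def latest_with_tag(models, key, value):
    """Return the highest-version record whose tag `key` equals `value`, or None."""
    matches = [m for m in models if m["tags"].get(key) == value]
    return max(matches, key=lambda m: m["version"]) if matches else None

latest = latest_with_tag(models, "Training context", "Pipeline")
print(latest["name"], "version:", latest["version"])  # diabetes_model version: 9
```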

This is a simple example, designed to demonstrate the principle. In practice, you could build more sophisticated logic into the pipeline steps. For example, a step could evaluate the model against some test data to calculate a performance metric like AUC or accuracy. It could then compare that metric to the metrics of any previously registered versions of the model, and register the new model only if it performs better.
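
The "register only if better" logic reduces to a simple comparison. The sketch below uses a hypothetical `should_register` helper with two sets of numbers from this lab: the AUC values shown in the model listing for versions 2 to 8, and the AUC printed by the pipeline's training step. Versions without a recorded AUC, such as the pipeline-registered version 9, would simply be skipped.

```python
def should_register(new_auc, previous_aucs):
    """Return True if new_auc beats every previously recorded AUC.
    Entries of None (versions with no recorded AUC) are ignored."""
    known = [auc for auc in previous_aucs if auc is not None]
    return not known or new_auc > max(known)

# AUC properties from the model listing in this lab (versions 8 down to 2).
previous_aucs = [
    0.8863989679711839,  # version 8
    0.8822545724167647,  # version 7
    0.8841057918016353,  # version 6
    0.8568655044545174,  # version 5
    0.8568613016622707,  # version 4
    0.8568655044545174,  # version 3
    0.8483904671874223,  # version 2
]
new_auc = 0.8853989246160638  # AUC printed by the pipeline's training step

print(should_register(new_auc, previous_aucs))  # False: version 8 scored higher
```

In a real pipeline, this check would sit in the registration script before the call to `Model.register`. It would read the new AUC from the training step rather than from a hard-coded value.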

You can use the [Azure Machine Learning extension for Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) to combine Azure ML pipelines with Azure DevOps pipelines (yes, it *is* confusing that they have the same name!). This lets you integrate model retraining into a *continuous integration/continuous deployment (CI/CD)* process. For example, an Azure DevOps *build* pipeline could trigger an Azure ML pipeline that trains and registers a model. When the model is registered, it could trigger an Azure DevOps *release* pipeline that deploys the model as a web service, along with the application or service that consumes the model.