# Create a Pipeline

You can perform the various steps required to ingest data, train a model, and register the model individually by using the Azure ML SDK to run script-based experiments. However, in an enterprise environment it is common to encapsulate the sequence of discrete steps required to build a machine learning solution into a *pipeline* that can be run on one or more compute targets: on demand by a user, from an automated build process, or on a schedule.

In this notebook, you'll bring together all of these elements to create a simple pipeline that pre-processes data and then trains and registers a model.

## Connect to your workspace

To get started, connect to your workspace.

> **Note**: If you haven't already established an authenticated session with your Azure subscription, you'll be prompted to authenticate by clicking a link, entering an authentication code, and signing into Azure.

In [1]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.44.0 to work with mllearningworkspace


## Prepare data

In your pipeline, you'll use a dataset containing details of diabetes patients. Run the cell below to create this dataset (if you created it previously, the code will find the existing version).

In [2]:
from azureml.core import Dataset
from azureml.data.datapath import DataPath

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    Dataset.File.upload_directory(src_dir='data',
                                  target=DataPath(default_ds, 'diabetes-data/'))

    # Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

Dataset already registered.


## Create scripts for pipeline steps

Pipelines consist of one or more *steps*, which can be Python scripts, or specialized steps like a data transfer step that copies data from one location to another. Each step can run in its own compute context. In this exercise, you'll build a simple pipeline that contains two Python script steps: one to pre-process some training data, and another to use the pre-processed data to train and register a model.

First, let's create a folder for the script files we'll use in the pipeline steps.

In [3]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

diabetes_pipeline


Now let's create the first script, which will read data from the diabetes dataset and apply some simple pre-processing to remove any rows with missing data and normalize the numeric features so they're on a similar scale.

The script includes two arguments: **--input-data**, which receives the raw diabetes dataset, and **--prepped-data**, which references the folder where the resulting data should be saved.
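You can exercise the script's argument definitions locally before submitting anything. This minimal sketch copies the two `add_argument` calls from the script in the cell below and parses an explicit argument list; the values passed are hypothetical stand-ins for the strings Azure ML supplies at run time (for the output, a folder path on the compute).

```python
import argparse

# Same argument definitions as prep_diabetes.py
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')

# Hypothetical values standing in for what the pipeline passes at run time
args = parser.parse_args(['--input-data', 'example-dataset-id',
                          '--prepped-data', '/tmp/example/prepped'])
print(args.raw_dataset_id)  # example-dataset-id
print(args.prepped_data)    # /tmp/example/prepped

# If --prepped-data is omitted, the default folder name is used
defaults = parser.parse_args([])
print(defaults.prepped_data)  # prepped_data
```

Note that `dest` controls the attribute name, which is why the script reads `args.prepped_data` rather than `args.prepped-data`.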

In [4]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Writing diabetes_pipeline/prep_diabetes.py
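To make the two transformations concrete, here's a small standard-library sketch of what the script does with pandas and scikit-learn: drop any row with a missing value, then rescale each numeric column to the range [0, 1] using (x - min) / (max - min). The sample rows are made up for illustration, and the helper names `drop_missing` and `min_max_scale` are hypothetical.

```python
import math

def drop_missing(rows):
    """Keep only rows with no missing (None or NaN) values, like DataFrame.dropna()."""
    def is_missing(value):
        return value is None or (isinstance(value, float) and math.isnan(value))
    return [row for row in rows if not any(is_missing(v) for v in row.values())]

def min_max_scale(rows, columns):
    """Rescale each listed column to [0, 1] with (x - min) / (max - min)."""
    scaled = [dict(row) for row in rows]  # copy so the input rows are unchanged
    for col in columns:
        values = [row[col] for row in rows]
        lo, hi = min(values), max(values)
        span = hi - lo
        for row in scaled:
            # Guard against division by zero: a constant column becomes 0.0
            row[col] = (row[col] - lo) / span if span else 0.0
    return scaled

rows = [
    {'PlasmaGlucose': 100.0, 'BMI': 20.0},
    {'PlasmaGlucose': None, 'BMI': 25.0},  # dropped: missing value
    {'PlasmaGlucose': 150.0, 'BMI': 30.0},
    {'PlasmaGlucose': 200.0, 'BMI': 40.0},
]
clean = drop_missing(rows)
scaled = min_max_scale(clean, ['PlasmaGlucose', 'BMI'])
print(len(rows), len(clean))  # 4 3
print(scaled)
# [{'PlasmaGlucose': 0.0, 'BMI': 0.0}, {'PlasmaGlucose': 0.5, 'BMI': 0.5}, {'PlasmaGlucose': 1.0, 'BMI': 1.0}]
```

In the real script, `MinMaxScaler.fit_transform` applies the same per-column rescaling to the whole DataFrame at once, and the `raw_rows` and `processed_rows` metrics record how many rows the `dropna()` call removed.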


Now you can create the script for the second step, which will train a model. The script includes an argument named **--training-data**, which references the location where the prepared data was saved by the previous step.

In [5]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': float(auc), 'Accuracy': float(acc)})


run.complete()

Writing diabetes_pipeline/train_diabetes.py
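The script computes accuracy with `np.average(y_hat == y_test)` and AUC with `roc_auc_score`. As a sketch of what those numbers mean, the following pure-Python versions compute accuracy as the fraction of matching predictions, and AUC as the probability that a randomly chosen diabetic patient gets a higher score than a randomly chosen non-diabetic one (ties count as half). The function names and sample data are illustrative only, and the predictions here are made by thresholding the scores at 0.5.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that equal the label (what np.average(y_hat == y_test) computes)."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)

def roc_auc(y_true, scores):
    """Probability that a random positive outscores a random negative; ties count 0.5."""
    pos = [s for t, s in zip(y_true, scores) if t == 1]
    neg = [s for t, s in zip(y_true, scores) if t == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

y_true = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]  # predicted probability of class 1
y_pred = [1 if s >= 0.5 else 0 for s in scores]
print(accuracy(y_true, y_pred))  # 0.75
print(roc_auc(y_true, scores))   # 0.75
```

The pairwise loop is O(positives × negatives), which is fine for illustration; for real test sets, `roc_auc_score` is the one to use. Tie handling matters for a decision tree, because `predict_proba` returns leaf proportions and many test rows share the same score.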


## Prepare a compute environment for the pipeline steps

In this exercise, you'll use the same compute for both steps, but it's important to realize that each step runs independently, so you could specify a different compute context for each step if appropriate.

First, get the compute target you created in a previous lab (if it doesn't exist, it will be created).

> **Important**: Change *my-compute-cluster* (the value of `cluster_name`) in the code below to the name of your compute cluster before running it! Cluster names must be globally unique names between 2 and 16 characters in length. Valid characters are letters, digits, and the - character.

In [6]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "my-compute-cluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

Found existing cluster, use it.


> **Note**: Compute instances and clusters are based on standard Azure virtual machine images. For this exercise, the *Standard_DS11_v2* VM size is recommended to achieve the optimal balance of cost and performance. If your subscription has a quota that does not include this size, choose an alternative size, but bear in mind that a larger size may incur higher cost and a smaller size may not be sufficient to complete the tasks. Alternatively, ask your Azure administrator to extend your quota.

The compute will require a Python environment with the necessary package dependencies installed.

In [7]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

Writing diabetes_pipeline/experiment_env.yml


Now that you have a Conda configuration file, you can create an environment and use it in the run configuration for the pipeline.

In [8]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## Create and run a pipeline

Now you're ready to create and run a pipeline.

First you need to define the steps for the pipeline, and any data references that need to be passed between them. In this case, the first step must write the prepared data to a folder that the second step can read from. Since the steps will be run on remote compute (and in fact, could each be run on different compute), the folder path must be passed as a data reference to a location in a datastore within the workspace. The **OutputFileDatasetConfig** object is a special kind of data reference that is used for interim storage locations that can be passed between pipeline steps, so you'll create one and use it as the output for the first step and the input for the second step. Note that you need to pass it as a script argument so your code can access the datastore location referenced by the data reference.
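The handoff can be pictured with a local sketch in which a temporary folder stands in for the OutputFileDatasetConfig location: the first function writes `data.csv` into the folder it's given (as `--prepped-data`), and the second reads it back from the same folder (as `--training-data`). The two functions are hypothetical stand-ins for the real scripts; on Azure ML the folder lives in a datastore rather than on local disk.

```python
import csv
import os
import tempfile

def prep_step(output_dir):
    """Stand-in for prep_diabetes.py: write data.csv into the folder it was given."""
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, 'data.csv'), 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['PlasmaGlucose', 'Diabetic'])
        writer.writerows([[0.2, 0], [0.9, 1], [0.5, 0]])

def train_step(training_data):
    """Stand-in for train_diabetes.py: read data.csv from the folder it was given."""
    with open(os.path.join(training_data, 'data.csv'), newline='') as f:
        return list(csv.DictReader(f))

# The shared folder plays the role of the OutputFileDatasetConfig location
with tempfile.TemporaryDirectory() as shared:
    prepped = os.path.join(shared, 'prepped_data')
    prep_step(prepped)          # step 1 receives it as --prepped-data
    rows = train_step(prepped)  # step 2 receives it as --training-data

print(len(rows), rows[1]['Diabetic'])  # 3 1
```

The key point is that neither step hard-codes the path: both receive it as an argument, which is what lets Azure ML substitute a datastore-backed location at run time.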

In [9]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined


OK, you're ready to build the pipeline from the steps you've defined and run it as an experiment.
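The order of the list passed to `Pipeline` is not, by itself, what makes training wait for data preparation: Azure ML builds the pipeline graph from the data dependencies between steps, and **Train and Register Model** depends on **Prepare Data** because it consumes `prepped_data`. The sketch below illustrates that idea with Python's `graphlib` (Python 3.9+, run locally, not in the pipeline environment); the `steps` dictionary is a hypothetical description of the two steps, not an Azure ML data structure.

```python
from graphlib import TopologicalSorter

# Hypothetical description of each step's data inputs and outputs
steps = {
    "Train and Register Model": {"inputs": {"prepped_data"}, "outputs": set()},
    "Prepare Data": {"inputs": {"raw_data"}, "outputs": {"prepped_data"}},
}

# Map each data output to the step that produces it
producers = {out: name for name, s in steps.items() for out in s["outputs"]}

# A step depends on whichever step produces one of its inputs
graph = {
    name: {producers[i] for i in s["inputs"] if i in producers}
    for name, s in steps.items()
}

order = list(TopologicalSorter(graph).static_order())
print(order)  # ['Prepare Data', 'Train and Register Model']
```

Even though training is listed first in `steps`, it comes out second because it needs `prepped_data`. Steps with no data link between them have no such constraint.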

In [10]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built.
Created step Prepare Data [97b0da5c][2b7ad44b-064f-4bb5-92ba-a7504eb71fd9], (This step will run and generate new outputs)
Created step Train and Register Model [af939e74][1238808b-3f9e-4268-b117-83ef2d388f7a], (This step will run and generate new outputs)
Submitted PipelineRun 2b657331-8f19-4048-8743-5054fe89b53d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/2b657331-8f19-4048-8743-5054fe89b53d?wsid=/subscriptions/376621af-6a06-4df6-b902-d25c6239f015/resourcegroups/learning/workspaces/mllearningworkspace&tid=16b3c013-d300-468d-ac64-7eda0820b6d3
Pipeline submitted for execution.



PipelineRunId: 2b657331-8f19-4048-8743-5054fe89b53d
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/2b657331-8f19-4048-8743-5054fe89b53d?wsid=/subscriptions/376621af-6a06-4df6-b902-d25c6239f015/resourcegroups/learning/workspaces/mllearningworkspace&tid=16b3c013-d300-468d-ac64-7eda0820b6d3
PipelineRun Status: Running


StepRunId: 6d2cbcab-d186-4752-9f45-19ed7d0048e2
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/6d2cbcab-d186-4752-9f45-19ed7d0048e2?wsid=/subscriptions/376621af-6a06-4df6-b902-d25c6239f015/resourcegroups/learning/workspaces/mllearningworkspace&tid=16b3c013-d300-468d-ac64-7eda0820b6d3
StepRun( Prepare Data ) Status: NotStarted
StepRun( Prepare Data ) Status: Running

StepRun(Prepare Data) Execution Summary
StepRun( Prepare Data ) Status: Finished
{'runId': '6d2cbcab-d186-4752-9f45-19ed7d0048e2', 'target': 'my-compute-cluster', 'status': 'Completed', 'startTimeUtc': '2022-12-13T11:46:31.437788Z', 'endTimeUtc': '2022-12-13T11:48:05.016536Z'

'Finished'

A graphical representation of the pipeline experiment will be displayed in the widget as it runs. Keep an eye on the kernel indicator at the top right of the page; when it turns from **⚫** to **◯**, the code has finished running. You can also monitor pipeline runs in the **Experiments** page in [Azure Machine Learning studio](https://ml.azure.com).

When the pipeline has finished, you can examine the metrics recorded by its child runs.

In [11]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

Train and Register Model :
	 Accuracy : 0.8988888888888888
	 AUC : 0.8834359994372682
	 ROC : aml://artifactId/ExperimentRun/dcid.614dbebe-96c5-4ac1-9ad0-43818b9f6538/ROC_1670932102.png
Prepare Data :
	 raw_rows : 15000
	 processed_rows : 15000


Assuming the pipeline was successful, a new model should be registered with a *Training context* tag indicating it was trained in a pipeline. Run the following code to verify this.

In [12]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes_model version: 6
	 Training context : Pipeline
	 AUC : 0.8834359994372682
	 Accuracy : 0.8988888888888888


diabetes_model version: 5
	 Training context : Compute cluster
	 AUC : 0.8814452031098893
	 Accuracy : 0.8971111111111111


diabetes_model version: 4
	 Training context : File dataset
	 AUC : 0.8468514889078735
	 Accuracy : 0.7788888888888889


diabetes_model version: 3
	 Training context : Tabular dataset
	 AUC : 0.8568650620553335
	 Accuracy : 0.7893333333333333


diabetes_model version: 2
	 Training context : Parameterized script
	 AUC : 0.8483198169063138
	 Accuracy : 0.774


diabetes_model version: 1
	 Training context : Script
	 AUC : 0.8484929598487486
	 Accuracy : 0.774




## Publish the pipeline

After you've created and tested a pipeline, you can publish it as a REST service.

In [13]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

| Name | Id | Status | Endpoint |
|---|---|---|---|
| diabetes-training-pipeline | 5afb8457-0092-4fb1-90b8-87f52f00a483 | Active | REST Endpoint |


Note that the published pipeline has an endpoint, which you can see in the **Endpoints** page (on the **Pipeline Endpoints** tab) in [Azure Machine Learning studio](https://ml.azure.com). You can also find its URI as a property of the published pipeline object:

In [14]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://westeurope.api.azureml.ms/pipelines/v1.0/subscriptions/376621af-6a06-4df6-b902-d25c6239f015/resourceGroups/learning/providers/Microsoft.MachineLearningServices/workspaces/mllearningworkspace/PipelineRuns/PipelineSubmit/5afb8457-0092-4fb1-90b8-87f52f00a483


## Call the pipeline endpoint

To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. A real application would authenticate with a service principal, but to test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

In [15]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

Authentication header ready.


Now we're ready to call the REST interface. The pipeline runs asynchronously, so we'll get an identifier back, which we can use to track the pipeline experiment as it runs:

In [16]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": experiment_name})
response.raise_for_status()  # Fail fast on an HTTP error, e.g. an expired token
run_id = response.json()["Id"]
run_id

'16a4fd4f-71dc-4a6d-a26c-3c95bbb92f10'
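If you'd rather not depend on `requests`, the same call can be built with the standard library's `urllib`. This sketch constructs the POST request without sending it; the endpoint URL and bearer token are placeholders, and in the notebook you'd pass `rest_endpoint` and `auth_header` instead.

```python
import json
import urllib.request

def build_pipeline_submit_request(endpoint, auth_header, experiment_name):
    """Build (without sending) a POST request that starts a published pipeline run."""
    body = json.dumps({"ExperimentName": experiment_name}).encode("utf-8")
    headers = {"Content-Type": "application/json", **auth_header}
    return urllib.request.Request(endpoint, data=body, headers=headers, method="POST")

# Placeholder endpoint and token, for illustration only
req = build_pipeline_submit_request(
    "https://example.invalid/PipelineRuns/PipelineSubmit/pipeline-id",
    {"Authorization": "Bearer <token>"},
    "mslearn-diabetes-pipeline",
)
print(req.get_method(), req.full_url)
print(json.loads(req.data))  # {'ExperimentName': 'mslearn-diabetes-pipeline'}
```

Sending it with `urllib.request.urlopen(req)` should return a JSON body containing the run's `Id`, just as the `requests` call above does.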

Since you have the run ID, you can use it to wait for the run to complete.

> **Note**: The pipeline should complete quickly, because each step was configured to allow output reuse. This was done primarily for convenience and to save time in this course. In reality, you'd likely want the first step to run every time in case the data has changed, and trigger the subsequent steps only if the output from step one changes.
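Conceptually, output reuse works by recognizing that a step has already been run with the same script, arguments, and inputs. The sketch below illustrates that idea with a SHA-256 fingerprint and an in-memory cache; the exact criteria Azure ML uses are determined by the service, so treat `step_fingerprint` and `run_step` as hypothetical helpers rather than a description of its implementation.

```python
import hashlib
import json

def step_fingerprint(script_source, arguments, input_bytes):
    """Hash everything the step's output depends on (hypothetical reuse key)."""
    h = hashlib.sha256()
    # JSON-encode script and arguments together so field boundaries are unambiguous
    h.update(json.dumps([script_source, arguments]).encode("utf-8"))
    h.update(input_bytes)
    return h.hexdigest()

cache = {}  # fingerprint -> previously computed output

def run_step(script_source, arguments, input_bytes, compute):
    """Return (output, reused): reuse the cached output if nothing changed."""
    key = step_fingerprint(script_source, arguments, input_bytes)
    if key in cache:
        return cache[key], True
    output = compute(input_bytes)
    cache[key] = output
    return output, False

def count_rows(data):
    """Toy 'step': count the lines in the input data."""
    return len(data.splitlines())

script = "print('prep')"
args = ["--prepped-data", "out"]
first = run_step(script, args, b"a\nb\n", count_rows)     # runs
second = run_step(script, args, b"a\nb\n", count_rows)    # reused
third = run_step(script, args, b"a\nb\nc\n", count_rows)  # data changed, runs again
print(first, second, third)  # (2, False) (2, True) (3, False)
```

This is also why the note above suggests running the first step every time: if the step can't see that its source data changed, a reused output would silently be stale.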

In [17]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 16a4fd4f-71dc-4a6d-a26c-3c95bbb92f10
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/16a4fd4f-71dc-4a6d-a26c-3c95bbb92f10?wsid=/subscriptions/376621af-6a06-4df6-b902-d25c6239f015/resourcegroups/learning/workspaces/mllearningworkspace&tid=16b3c013-d300-468d-ac64-7eda0820b6d3
PipelineRun Status: Running


StepRunId: a7b90cbf-4a08-4e75-a535-1278d50bc66c
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/a7b90cbf-4a08-4e75-a535-1278d50bc66c?wsid=/subscriptions/376621af-6a06-4df6-b902-d25c6239f015/resourcegroups/learning/workspaces/mllearningworkspace&tid=16b3c013-d300-468d-ac64-7eda0820b6d3
StepRun( Prepare Data ) Status: NotStarted
StepRun( Prepare Data ) Status: Running

StepRun(Prepare Data) Execution Summary
StepRun( Prepare Data ) Status: Finished
{'runId': 'a7b90cbf-4a08-4e75-a535-1278d50bc66c', 'target': 'my-compute-cluster', 'status': 'Completed', 'startTimeUtc': '2022-12-13T11:51:33.91711Z', 'endTimeUtc': '2022-12-13T11:51:59.691269Z',

'Finished'

## Schedule the Pipeline

Suppose the clinic for the diabetes patients collects new data each week and adds it to the dataset. You could run the pipeline every week to retrain the model with the new data.

In [18]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

Pipeline scheduled.
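To check when the recurrence will next fire, you can compute the next Monday at 00:00 UTC yourself. This is an illustrative sketch of the schedule's arithmetic using `datetime`, not how the Azure ML scheduler evaluates `ScheduleRecurrence`; the function `next_weekly_run` is hypothetical, and the starting time is taken from the run timestamps shown in this notebook.

```python
from datetime import datetime, timedelta, timezone

def next_weekly_run(now, weekday=0, hour=0, minute=0):
    """Next occurrence of the given weekday and time strictly after `now` (Monday is 0)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # Today is the target weekday but the time has passed: go to next week
        candidate += timedelta(days=7)
    return candidate

now = datetime(2022, 12, 13, 11, 53, tzinfo=timezone.utc)  # a Tuesday
print(next_weekly_run(now))  # 2022-12-19 00:00:00+00:00
```

The defaults (`weekday=0`, `hour=0`, `minute=0`) correspond to `week_days=["Monday"]` and `time_of_day="00:00"` in the recurrence above.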


You can retrieve the schedules that are defined in the workspace like this:

In [19]:
schedules = Schedule.list(ws)
schedules

[Pipeline(Name: weekly-diabetes-training,
 Id: 4e5c08d6-78a9-4d29-a44d-ab462308f1c9,
 Status: Active,
 Pipeline Id: 5afb8457-0092-4fb1-90b8-87f52f00a483,
 Pipeline Endpoint Id: None,
 Recurrence Details: Runs at 0:00 on Monday every Week)]

You can check the latest run like this:

In [20]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = next(pipeline_experiment.get_runs())  # get_runs() yields the newest run first

latest_run.get_details()

{'runId': '504b7c07-3b46-4bb3-aacc-d3e503ab2f12',
 'status': 'Running',
 'startTimeUtc': '2022-12-13T11:53:51.734969Z',
 'services': {},
 'properties': {'azureml.runsource': 'azureml.PipelineRun',
  'runSource': 'Unavailable',
  'runType': 'Schedule',
  'azureml.parameters': '{}',
  'azureml.continue_on_step_failure': 'False',
  'azureml.continue_on_failed_optional_input': 'True',
  'azureml.pipelineid': '5afb8457-0092-4fb1-90b8-87f52f00a483',
  'azureml.pipelineComponent': 'pipelinerun'},
 'inputDatasets': [],
 'outputDatasets': [],
 'logFiles': {'logs/azureml/executionlogs.txt': 'https://mllearningwork3345488027.blob.core.windows.net/azureml/ExperimentRun/dcid.504b7c07-3b46-4bb3-aacc-d3e503ab2f12/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=W2Gqx4RA4LizNd9FjFGS7sDHvs%2BH5tw6iZx2e7GoyqU%3D&skoid=530a969d-1f1a-4d2a-9866-6f18d23e261b&sktid=16b3c013-d300-468d-ac64-7eda0820b6d3&skt=2022-12-13T08%3A59%3A06Z&ske=2022-12-14T17%3A09%3A06Z&sks=b&skv=2019-07-07&st=2022-12-13T11%3A44%3A

This is a simple example, designed to demonstrate the principle. In reality, you could build more sophisticated logic into the pipeline steps: for example, evaluating the model against some test data to calculate a performance metric like AUC or accuracy, comparing the metric to that of any previously registered versions of the model, and only registering the new model if it performs better.
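The comparison could be as simple as the following sketch, which approves registration only if the new model's AUC beats the best AUC among previously registered versions (one reasonable reading of "performs better"). The function `should_register` is hypothetical; in a real step you'd read the previous values from the registered models' `properties`, converting them with `float()` in case they're stored as strings. The sample values are the AUCs of versions 6 and 5 from the model listing earlier.

```python
def should_register(new_auc, previous_models, metric="AUC"):
    """Return True if new_auc beats the best metric among previously registered versions.

    previous_models is a list of property dicts (hypothetical shape, like
    Model.properties); values are passed through float() in case they're strings.
    """
    previous = [float(props[metric]) for props in previous_models if metric in props]
    return not previous or new_auc > max(previous)

registered = [
    {"AUC": "0.8834359994372682", "Accuracy": "0.8988888888888888"},  # version 6
    {"AUC": "0.8814452031098893", "Accuracy": "0.8971111111111111"},  # version 5
]
print(should_register(0.89, registered))  # True
print(should_register(0.88, registered))  # False
print(should_register(0.75, []))          # True: nothing registered yet
```

In `train_diabetes.py`, a check like this would wrap the `Model.register` call so that a weaker retrained model is logged but not registered.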

You can use the [Azure Machine Learning extension for Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) to combine Azure ML pipelines with Azure DevOps pipelines (yes, it *is* confusing that they have the same name!) and integrate model retraining into a *continuous integration/continuous deployment (CI/CD)* process. For example, you could use an Azure DevOps *build* pipeline to trigger an Azure ML pipeline that trains and registers a model, and when the model is registered it could trigger an Azure DevOps *release* pipeline that deploys the model as a web service, along with the application or service that consumes the model.