# Create a Pipeline

You can perform the various steps required to ingest data, train a model, and register the model individually by using the Azure ML SDK to run script-based experiments. However, in an enterprise environment it is common to encapsulate the sequence of discrete steps required to build a machine learning solution into a *pipeline* that can be run on one or more compute targets, either on demand by a user, from an automated build process, or on a schedule.

In this notebook, you'll bring together all of these elements to create a simple pipeline that pre-processes data and then trains and registers a model.

## Connect to your workspace

To get started, connect to your workspace.

> **Note**: If you haven't already established an authenticated session with your Azure subscription, you'll be prompted to authenticate by clicking a link, entering an authentication code, and signing into Azure.

In [2]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.33.0 to work with wsag


## Prepare data

In your pipeline, you'll use a dataset containing details of diabetes patients. Run the cell below to create this dataset (if you created it previously, the code will find the existing version).

In [3]:
from azureml.core import Dataset

default_ds = ws.get_default_datastore()

if 'diabetes dataset' not in ws.datasets:
    default_ds.upload_files(files=['data/diabetes.csv', 'data/diabetes2.csv'], # Upload the diabetes csv files in /data
                        target_path='diabetes-data/', # Put it in a folder path in the datastore
                        overwrite=True, # Replace existing files of the same name
                        show_progress=True)

    #Create a tabular dataset from the path on the datastore (this may take a short while)
    tab_data_set = Dataset.Tabular.from_delimited_files(path=(default_ds, 'diabetes-data/*.csv'))

    # Register the tabular dataset
    try:
        tab_data_set = tab_data_set.register(workspace=ws, 
                                name='diabetes dataset',
                                description='diabetes data',
                                tags = {'format':'CSV'},
                                create_new_version=True)
        print('Dataset registered.')
    except Exception as ex:
        print(ex)
else:
    print('Dataset already registered.')

Uploading an estimated of 2 files
Uploading data/diabetes.csv
Uploaded data/diabetes.csv, 1 files out of an estimated total of 2
Uploading data/diabetes2.csv
Uploaded data/diabetes2.csv, 2 files out of an estimated total of 2
Uploaded 2 files
Dataset registered.


## Create scripts for pipeline steps

Pipelines consist of one or more *steps*, which can be Python scripts, or specialized steps like a data transfer step that copies data from one location to another. Each step can run in its own compute context. In this exercise, you'll build a simple pipeline that contains two Python script steps: one to pre-process some training data, and another to use the pre-processed data to train and register a model.

First, let's create a folder for the script files we'll use in the pipeline steps.

In [4]:
import os
# Create a folder for the pipeline step files
experiment_folder = 'diabetes_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

diabetes_pipeline


Now let's create the first script, which will read data from the diabetes dataset and apply some simple pre-processing to remove any rows with missing data and normalize the numeric features so they're on a similar scale.

The script includes an argument named **--prepped-data**, which references the folder where the resulting data should be saved.
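To see how such arguments reach a script, here is a minimal sketch that parses a hand-written argument list using the same argument definitions as the script below. The values `'raw-data-id'` and `'/tmp/prepped'` are made-up placeholders for illustration; in the pipeline, the real values are supplied when the step runs.

```python
import argparse

# Same argument definitions as the data preparation script
parser = argparse.ArgumentParser()
parser.add_argument('--input-data', type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data',
                    default='prepped_data', help='Folder for results')

# Hypothetical values, passed explicitly instead of being read from sys.argv
args = parser.parse_args(['--input-data', 'raw-data-id', '--prepped-data', '/tmp/prepped'])
print(args.raw_dataset_id, args.prepped_data)

# When --prepped-data is omitted, the default folder name is used
defaults = parser.parse_args([])
print(defaults.prepped_data)
```

Note that the `dest` setting decides the attribute name on `args`, which is why `--input-data` is read back as `args.raw_dataset_id`.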

In [5]:
%%writefile $experiment_folder/prep_diabetes.py
# Import libraries
import os
import argparse
import pandas as pd
from azureml.core import Run
from sklearn.preprocessing import MinMaxScaler

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--input-data", type=str, dest='raw_dataset_id', help='raw dataset')
parser.add_argument('--prepped-data', type=str, dest='prepped_data', default='prepped_data', help='Folder for results')
args = parser.parse_args()
save_folder = args.prepped_data

# Get the experiment run context
run = Run.get_context()

# load the data (passed as an input dataset)
print("Loading Data...")
diabetes = run.input_datasets['raw_data'].to_pandas_dataframe()

# Log raw row count
row_count = (len(diabetes))
run.log('raw_rows', row_count)

# remove nulls
diabetes = diabetes.dropna()

# Normalize the numeric columns
scaler = MinMaxScaler()
num_cols = ['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree']
diabetes[num_cols] = scaler.fit_transform(diabetes[num_cols])

# Log processed rows
row_count = (len(diabetes))
run.log('processed_rows', row_count)

# Save the prepped data
print("Saving Data...")
os.makedirs(save_folder, exist_ok=True)
save_path = os.path.join(save_folder,'data.csv')
diabetes.to_csv(save_path, index=False, header=True)

# End the run
run.complete()

Overwriting diabetes_pipeline/prep_diabetes.py
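If you want to see what the two pre-processing operations in the script above do without pandas or scikit-learn, the following sketch reproduces them on a handful of made-up rows. The helper names `drop_incomplete` and `min_max_scale` are hypothetical, and missing values are represented here as `None`; this is an illustration of the idea, not the library implementation.

```python
def drop_incomplete(rows):
    """Keep only rows in which no value is missing (None)."""
    return [row for row in rows if all(v is not None for v in row.values())]

def min_max_scale(values):
    """Rescale values linearly so the smallest maps to 0 and the largest to 1."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column carries no spread, so map every value to 0
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]

# Made-up sample rows, one of which has a missing BMI value
rows = [
    {'BMI': 20.0, 'Age': 30},
    {'BMI': None, 'Age': 41},
    {'BMI': 30.0, 'Age': 52},
    {'BMI': 40.0, 'Age': 63},
]

clean = drop_incomplete(rows)
scaled_bmi = min_max_scale([row['BMI'] for row in clean])
print(len(rows), len(clean), scaled_bmi)
```

After scaling, every numeric feature lies in the range 0 to 1, so features measured in large units do not dominate those measured in small units.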


Now you can create the script for the second step, which will train a model. The script includes an argument named **--training-data**, which references the location where the prepared data was saved by the previous step.

In [6]:
%%writefile $experiment_folder/train_diabetes.py
# Import libraries
from azureml.core import Run, Model
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

# Get parameters
parser = argparse.ArgumentParser()
parser.add_argument("--training-data", type=str, dest='training_data', help='training data')
args = parser.parse_args()
training_data = args.training_data

# Get the experiment run context
run = Run.get_context()

# load the prepared data file in the training folder
print("Loading Data...")
file_path = os.path.join(training_data,'data.csv')
diabetes = pd.read_csv(file_path)

# Separate features and labels
X, y = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure',
                 'TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].values, diabetes['Diabetic'].values

# Split data into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=0)

# Train a decision tree model
print('Training a decision tree model...')
model = DecisionTreeClassifier().fit(X_train, y_train)

# calculate accuracy
y_hat = model.predict(X_test)
acc = np.average(y_hat == y_test)
print('Accuracy:', acc)
run.log('Accuracy', float(acc))

# calculate AUC
y_scores = model.predict_proba(X_test)
auc = roc_auc_score(y_test,y_scores[:,1])
print('AUC: ' + str(auc))
run.log('AUC', float(auc))

# plot ROC curve
fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
fig = plt.figure(figsize=(6, 4))
# Plot the diagonal 50% line
plt.plot([0, 1], [0, 1], 'k--')
# Plot the FPR and TPR achieved by our model
plt.plot(fpr, tpr)
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
run.log_image(name = "ROC", plot = fig)
plt.show()

# Save the trained model in the outputs folder
print("Saving model...")
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'diabetes_model.pkl')
joblib.dump(value=model, filename=model_file)

# Register the model
print('Registering model...')
Model.register(workspace=run.experiment.workspace,
               model_path = model_file,
               model_name = 'diabetes_model',
               tags={'Training context':'Pipeline'},
               properties={'AUC': float(auc), 'Accuracy': float(acc)})


run.complete()

Overwriting diabetes_pipeline/train_diabetes.py
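The two metrics logged by the training script above can be computed by hand on a tiny example, which may help when interpreting them. The sketch below uses made-up labels and scores and two hypothetical helper functions; the AUC is calculated as the fraction of (positive, negative) pairs in which the positive case receives the higher score, with ties counting as half, which is equivalent to the area under the ROC curve.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    matches = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return matches / len(y_true)

def roc_auc(y_true, y_score):
    """Probability that a random positive case outscores a random negative case."""
    pos = [s for t, s in zip(y_true, y_score) if t == 1]
    neg = [s for t, s in zip(y_true, y_score) if t == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Made-up labels and predicted probabilities for the positive class
y_true = [0, 0, 1, 1]
y_score = [0.1, 0.4, 0.35, 0.8]
y_pred = [1 if s >= 0.5 else 0 for s in y_score]

print('Accuracy:', accuracy(y_true, y_pred))
print('AUC:', roc_auc(y_true, y_score))
```

Accuracy depends on the 0.5 threshold used to turn scores into predictions, while AUC depends only on how the scores rank the cases.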


## Prepare a compute environment for the pipeline steps

In this exercise, you'll use the same compute for both steps, but it's important to realize that each step is run independently, so you could specify different compute contexts for each step if appropriate.

First, get the compute target you created in a previous lab (if it doesn't exist, it will be created).

> **Important**: Change the cluster name in the code below (*agcluster* in this example) to the name of your compute cluster before running it! Cluster names must be globally unique and between 2 and 16 characters in length. Valid characters are letters, digits, and the - character.

In [7]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "agcluster"

try:
    # Check for existing compute target
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
        pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        pipeline_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)
    

InProgress......
SucceededProvisioning operation finished, operation "Succeeded"
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned
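The naming rules stated before the cell above (2 to 16 characters; letters, digits, and the - character) can be checked locally before you try to create a cluster. The sketch below is a hypothetical helper that tests only those stated rules; it cannot check uniqueness, and the service may enforce additional constraints that are not covered here.

```python
import re

# Letters, digits, and hyphens only, 2 to 16 characters in total
NAME_PATTERN = re.compile(r'[A-Za-z0-9-]{2,16}')

def is_valid_cluster_name(name):
    """Return True if the name satisfies the length and character rules."""
    return NAME_PATTERN.fullmatch(name) is not None

for candidate in ['agcluster', 'a', 'my_cluster', 'this-name-is-far-too-long']:
    print(candidate, is_valid_cluster_name(candidate))
```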


> **Note**: Compute instances and clusters are based on standard Azure virtual machine images. For this exercise, the *Standard_DS11_v2* image is recommended to achieve the optimal balance of cost and performance. If your subscription has a quota that does not include this image, choose an alternative image; but bear in mind that a larger image may incur higher cost and a smaller image may not be sufficient to complete the tasks. Alternatively, ask your Azure administrator to extend your quota.

The compute will require a Python environment with the necessary package dependencies installed.

In [8]:
%%writefile $experiment_folder/experiment_env.yml
name: experiment_env
dependencies:
- python=3.6.2
- scikit-learn
- ipykernel
- matplotlib
- pandas
- pip
- pip:
  - azureml-defaults
  - pyarrow

Overwriting diabetes_pipeline/experiment_env.yml


Now that you have a Conda configuration file, you can create an environment and use it in the run configuration for the pipeline.

In [9]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Create a Python environment for the experiment (from a .yml file)
experiment_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/experiment_env.yml")

# Register the environment 
experiment_env.register(workspace=ws)
registered_env = Environment.get(ws, 'experiment_env')

# Create a new runconfig object for the pipeline
pipeline_run_config = RunConfiguration()

# Use the compute you created above. 
pipeline_run_config.target = pipeline_cluster

# Assign the environment to the run configuration
pipeline_run_config.environment = registered_env

print ("Run configuration created.")

Run configuration created.


## Create and run a pipeline

Now you're ready to create and run a pipeline.

First you need to define the steps for the pipeline, and any data references that need to be passed between them. In this case, the first step must write the prepared data to a folder that can be read from by the second step. Since the steps will be run on remote compute (and in fact, could each be run on different compute), the folder path must be passed as a data reference to a location in a datastore within the workspace. The **OutputFileDatasetConfig** object is a special kind of data reference that is used for interim storage locations that can be passed between pipeline steps, so you'll create one and use it as the output for the first step and the input for the second step. Note that you need to pass it as a script argument so your code can access the datastore location referenced by the data reference.

In [10]:
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep

# Get the training dataset
diabetes_ds = ws.datasets.get("diabetes dataset")

# Create an OutputFileDatasetConfig (temporary Data Reference) for data passed from step 1 to step 2
prepped_data = OutputFileDatasetConfig("prepped_data")

# Step 1, Run the data prep script
prep_step = PythonScriptStep(name = "Prepare Data",
                                source_directory = experiment_folder,
                                script_name = "prep_diabetes.py",
                                arguments = ['--input-data', diabetes_ds.as_named_input('raw_data'),
                                             '--prepped-data', prepped_data],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

# Step 2, run the training script
train_step = PythonScriptStep(name = "Train and Register Model",
                                source_directory = experiment_folder,
                                script_name = "train_diabetes.py",
                                arguments = ['--training-data', prepped_data.as_input()],
                                compute_target = pipeline_cluster,
                                runconfig = pipeline_run_config,
                                allow_reuse = True)

print("Pipeline steps defined")

Pipeline steps defined
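The hand-off between the two steps can be mimicked locally to make the idea concrete. In the sketch below, a temporary folder stands in for the interim storage location, and two hypothetical functions stand in for the two scripts: the first writes `data.csv` to the folder it is given, and the second reads `data.csv` from the folder it is given. The rows are made up, and this is only an analogy for what the pipeline arranges on remote compute.

```python
import csv
import os
import tempfile

def prep_step(rows, prepped_folder):
    """Stand-in for step 1: write prepared rows to data.csv in the given folder."""
    os.makedirs(prepped_folder, exist_ok=True)
    path = os.path.join(prepped_folder, 'data.csv')
    with open(path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)

def train_step(training_folder):
    """Stand-in for step 2: read data.csv back from the folder it was given."""
    path = os.path.join(training_folder, 'data.csv')
    with open(path, newline='') as f:
        return list(csv.DictReader(f))

# The temporary folder plays the role of the shared interim location
with tempfile.TemporaryDirectory() as shared_folder:
    prep_step([{'BMI': '0.5', 'Diabetic': '1'},
               {'BMI': '0.2', 'Diabetic': '0'}], shared_folder)
    loaded = train_step(shared_folder)

print(len(loaded), loaded[0]['Diabetic'])
```

Neither function hard-codes a location; both receive the folder as a parameter, which is why the pipeline passes the data reference to each script as an argument.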


OK, you're ready to build the pipeline from the steps you've defined and run it as an experiment.

In [11]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline
from azureml.widgets import RunDetails

# Construct the pipeline
pipeline_steps = [prep_step, train_step]
pipeline = Pipeline(workspace=ws, steps=pipeline_steps)
print("Pipeline is built.")

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name = 'mslearn-diabetes-pipeline')
pipeline_run = experiment.submit(pipeline, regenerate_outputs=True)
print("Pipeline submitted for execution.")
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

Pipeline is built.
Created step Prepare Data [7e0e6b92][816010c3-1309-4730-bbdf-45b42f5380a5], (This step will run and generate new outputs)
Created step Train and Register Model [a8f6abd2][c94153b9-da54-4fd4-bb12-3ad9a9488c0a], (This step will run and generate new outputs)
Submitted PipelineRun f3a9be5f-c90c-41ca-8435-c89f1b3f7c37
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/f3a9be5f-c90c-41ca-8435-c89f1b3f7c37?wsid=/subscriptions/6ea869be-bab3-4204-94c3-1fc677f7d2de/resourcegroups/rgag/workspaces/wsag&tid=0f942ca0-ebef-4f26-80f8-f501d599ba90
Pipeline submitted for execution.


_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

PipelineRunId: f3a9be5f-c90c-41ca-8435-c89f1b3f7c37
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/f3a9be5f-c90c-41ca-8435-c89f1b3f7c37?wsid=/subscriptions/6ea869be-bab3-4204-94c3-1fc677f7d2de/resourcegroups/rgag/workspaces/wsag&tid=0f942ca0-ebef-4f26-80f8-f501d599ba90
PipelineRun Status: NotStarted
PipelineRun Status: Running


StepRunId: 5bae0eed-1d23-4487-81d8-af48acc034e4
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/5bae0eed-1d23-4487-81d8-af48acc034e4?wsid=/subscriptions/6ea869be-bab3-4204-94c3-1fc677f7d2de/resourcegroups/rgag/workspaces/wsag&tid=0f942ca0-ebef-4f26-80f8-f501d599ba90
StepRun( Prepare Data ) Status: NotStarted
StepRun( Prepare Data ) Status: Running

Streaming azureml-logs/20_image_build_log.txt
2021/08/22 03:59:04 Downloading source code...
2021/08/22 03:59:05 Finished downloading source code
2021/08/22 03:59:05 Creating Docker network: acb_default_network, driver: 'bridge'
2021/08/22 03:59:06 Successfully set up Docker network




Removing intermediate container 239dce1fa6f4
 ---> 9e8aa674dfc4
Step 9/18 : ENV PATH /azureml-envs/azureml_0c5a9aa2def4b3c2501c1f40287a356b/bin:$PATH
 ---> Running in 391c884bd921
Removing intermediate container 391c884bd921
 ---> a124638669cd
Step 10/18 : COPY azureml-environment-setup/send_conda_dependencies.py azureml-environment-setup/send_conda_dependencies.py
 ---> 244b76760521
Step 11/18 : COPY azureml-environment-setup/environment_context.json azureml-environment-setup/environment_context.json
 ---> 53c8f4eb242d
Step 12/18 : RUN python /azureml-environment-setup/send_conda_dependencies.py -p /azureml-envs/azureml_0c5a9aa2def4b3c2501c1f40287a356b
 ---> Running in a5913b8941aa
Report materialized dependencies for the environment
Reading environment context
Exporting conda environment
Sending request with materialized conda environment details
Successfully sent materialized environment details
Removing intermediate container a5913b8941aa
 ---> 9f96327f12f3
Step 13/18 : ENV AZUREM

latest: Pulling from azureml/azureml_1e5b59c0734bdc528077f509e1d397fe
92473f7ef455: Pulling fs layer
fb52bde70123: Pulling fs layer
64788f86be3f: Pulling fs layer
33f6d5f2e001: Pulling fs layer
eeb715f1b6ae: Pulling fs layer
fe519cf36537: Pulling fs layer
58ff99196c15: Pulling fs layer
9b13f06a8eff: Pulling fs layer
2d4e93adbf58: Pulling fs layer
6ee7c3767844: Pulling fs layer
62cfc3ccb8ab: Pulling fs layer
4a7af9d757ee: Pulling fs layer
886c27cf0865: Pulling fs layer
7c9062f12448: Pulling fs layer
7268e886f68a: Pulling fs layer
e1fdaab561c7: Pulling fs layer
ccb2816215bd: Pulling fs layer
55d70b17f345: Pulling fs layer
6ee7c3767844: Waiting
62cfc3ccb8ab: Waiting
4a7af9d757ee: Waiting
886c27cf0865: Waiting
7c9062f12448: Waiting
7268e886f68a: Waiting
e1fdaab561c7: Waiting
ccb2816215bd: Waiting
55d70b17f345: Waiting
33f6d5f2e001: Waiting
eeb715f1b6ae: Waiting
fe519cf36537: Waiting
58ff99196c15: Waiting
9b13f06a8eff: Waiting
2d4e93adbf58: Waiting
64788f86be3f: Verifying Checksum
64788f86b


Streaming azureml-logs/75_job_post-tvmps_15999753d9cbf062a77ff62962351986939d769bd06ad32925c4abff9dd587a3_d.txt
[2021-08-22T04:14:50.384831] Entering job release
[2021-08-22T04:14:51.184351] Starting job release
[2021-08-22T04:14:51.185017] Logging experiment finalizing status in history service.[2021-08-22T04:14:51.185199] job release stage : upload_datastore starting...
Starting the daemon thread to refresh tokens in background for process with pid = 225
[2021-08-22T04:14:51.185608] job release stage : start importing azureml.history._tracking in run_history_release.

[2021-08-22T04:14:51.185918] job release stage : execute_job_release starting...
[2021-08-22T04:14:51.186285] job release stage : copy_batchai_cached_logs starting...
[2021-08-22T04:14:51.186329] job release stage : copy_batchai_cached_logs completed...
[2021-08-22T04:14:51.189767] Entering context manager injector.
[2021-08-22T04:14:51.203815] job release stage : upload_datastore completed...
[2021-08-22T04:14:51.2799




StepRunId: 1c11d6b2-1e28-47b3-a7e9-1d1bfcdee7f2
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/1c11d6b2-1e28-47b3-a7e9-1d1bfcdee7f2?wsid=/subscriptions/6ea869be-bab3-4204-94c3-1fc677f7d2de/resourcegroups/rgag/workspaces/wsag&tid=0f942ca0-ebef-4f26-80f8-f501d599ba90
StepRun( Train and Register Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_15999753d9cbf062a77ff62962351986939d769bd06ad32925c4abff9dd587a3_d.txt
2021-08-22T04:15:25Z Successfully mounted a/an Blobfuse File System at /mnt/batch/tasks/shared/LS_root/jobs/wsag/azureml/1c11d6b2-1e28-47b3-a7e9-1d1bfcdee7f2/mounts/workspaceblobstore
2021-08-22T04:15:26Z The vmsize standard_ds11_v2 is not a GPU VM, skipping get GPU count by running nvidia-smi command.
2021-08-22T04:15:26Z Starting output-watcher...
2021-08-22T04:15:26Z IsDedicatedCompute == True, won't poll for Low Pri Preemption
2021-08-22T04:15:26Z Executing 'Copy ACR Details file' on 10.0.0.5
2021-08-22T04:15:26Z Copy ACR Details


Streaming azureml-logs/70_driver_log.txt
2021/08/22 04:15:49 Starting App Insight Logger for task:  runTaskLet
2021/08/22 04:15:49 Version: 3.0.01678.0001 Branch: 2021-08-06 Commit: fee6fc3
2021/08/22 04:15:49 Attempt 1 of http call to http://10.0.0.5:16384/sendlogstoartifacts/info
2021/08/22 04:15:49 Send process info logs to master server succeeded
2021/08/22 04:15:49 Attempt 1 of http call to http://10.0.0.5:16384/sendlogstoartifacts/status
2021/08/22 04:15:49 Send process info logs to master server succeeded
[2021-08-22T04:15:49.304874] Entering context manager injector.
[2021-08-22T04:15:49.847808] context_manager_injector.py Command line Options: Namespace(inject=['ProjectPythonPath:context_managers.ProjectPythonPath', 'Dataset:context_managers.Datasets', 'RunHistory:context_managers.RunHistory', 'TrackUserError:context_managers.TrackUserError'], invocation=['train_diabetes.py', '--training-data', 'DatasetConsumptionConfig:input_c7f96f24'])
Script type = None
[2021-08-22T04:15:4



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'f3a9be5f-c90c-41ca-8435-c89f1b3f7c37', 'status': 'Completed', 'startTimeUtc': '2021-08-22T03:58:44.27973Z', 'endTimeUtc': '2021-08-22T04:16:15.210657Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'SDK', 'runType': 'SDK', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://wsag8535920275.blob.core.windows.net/azureml/ExperimentRun/dcid.f3a9be5f-c90c-41ca-8435-c89f1b3f7c37/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=VPlGDYBjzPZaJgUeJCHg1ozrUC5L6tWLndzYSE49YR8%3D&st=2021-08-22T03%3A48%3A59Z&se=2021-08-22T11%3A58%3A59Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://wsag8535920275.blob.core.windows.net/azureml/ExperimentRun/dcid.f3a9be5f-c90c-41ca-8435-c89f1b3f7c37/logs/azureml/stderrlogs.txt?sv=2019-07-07&sr=b&sig=d1ZVq87x%2F%2BJ%2FyPdWJDsihbA33EhSnq9R%2FCG%2FkuZD19c%3D&st=2021-08-22T03%3A48%3A59Z&se=2021-

'Finished'

A graphical representation of the pipeline experiment will be displayed in the widget as it runs. Keep an eye on the kernel indicator at the top right of the page; when it turns from **⚫** to **◯**, the code has finished running. You can also monitor pipeline runs in the **Experiments** page in [Azure Machine Learning studio](https://ml.azure.com).

When the pipeline has finished, you can examine the metrics recorded by its child runs.

In [12]:
for run in pipeline_run.get_children():
    print(run.name, ':')
    metrics = run.get_metrics()
    for metric_name in metrics:
        print('\t',metric_name, ":", metrics[metric_name])

Train and Register Model :
	 Accuracy : 0.8975555555555556
	 AUC : 0.8824313108907177
	 ROC : aml://artifactId/ExperimentRun/dcid.1c11d6b2-1e28-47b3-a7e9-1d1bfcdee7f2/ROC_1629605752.png
Prepare Data :
	 raw_rows : 15000
	 processed_rows : 15000


Assuming the pipeline was successful, a new model should be registered with a *Training context* tag indicating it was trained in a pipeline. Run the following code to verify this.

In [13]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes_model version: 1
	 Training context : Pipeline
	 AUC : 0.8824313108907177
	 Accuracy : 0.8975555555555556




## Publish the pipeline

After you've created and tested a pipeline, you can publish it as a REST service.

In [14]:
# Publish the pipeline from the run
published_pipeline = pipeline_run.publish_pipeline(
    name="diabetes-training-pipeline", description="Trains diabetes model", version="1.0")

published_pipeline

Name,Id,Status,Endpoint
diabetes-training-pipeline,0ffd4abe-3ce5-43cd-842c-5f1778d3ac4a,Active,REST Endpoint


Note that the published pipeline has an endpoint, which you can see in the **Endpoints** page (on the **Pipeline Endpoints** tab) in [Azure Machine Learning studio](https://ml.azure.com). You can also find its URI as a property of the published pipeline object:

In [15]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://centralindia.api.azureml.ms/pipelines/v1.0/subscriptions/6ea869be-bab3-4204-94c3-1fc677f7d2de/resourceGroups/rgag/providers/Microsoft.MachineLearningServices/workspaces/wsag/PipelineRuns/PipelineSubmit/0ffd4abe-3ce5-43cd-842c-5f1778d3ac4a


## Call the pipeline endpoint

To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. A real application would authenticate with a service principal, but to test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:

In [16]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print("Authentication header ready.")

Authentication header ready.


Now we're ready to call the REST interface. The pipeline runs asynchronously, so we'll get an identifier back, which we can use to track the pipeline experiment as it runs:

In [17]:
import requests

experiment_name = 'mslearn-diabetes-pipeline'

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": experiment_name})
run_id = response.json()["Id"]
run_id

'011144ad-bc6d-4e57-a487-cd7f0db70972'
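To make the shape of that HTTP request explicit, the sketch below builds an equivalent request with the standard library but does not send it. The endpoint URL and the token are placeholders, and the header is assumed to have the form `{'Authorization': 'Bearer <token>'}`; substitute the real endpoint and header from your own session if you adapt it.

```python
import json
import urllib.request

# Placeholders for illustration only, not a real endpoint or token
endpoint = 'https://example.invalid/pipelines/submit'
auth_header = {'Authorization': 'Bearer <token>'}

# The request body is a JSON document naming the experiment to run under
body = json.dumps({'ExperimentName': 'mslearn-diabetes-pipeline'}).encode('utf-8')

headers = dict(auth_header)
headers['Content-Type'] = 'application/json'

# Build the POST request; nothing is sent over the network here
request = urllib.request.Request(endpoint, data=body, headers=headers, method='POST')

print(request.get_method(), request.full_url)
print(json.loads(request.data.decode('utf-8')))
```

Passing `json=...` to `requests.post`, as in the cell above, performs the same serialization and sets the content type for you.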

Since you have the run ID, you can use it to wait for the run to complete.

> **Note**: The pipeline should complete quickly, because each step was configured to allow output reuse. This was done primarily for convenience and to save time in this course. In reality, you'd likely want the first step to run every time in case the data has changed, and trigger the subsequent steps only if the output from step one changes.

In [18]:
from azureml.pipeline.core.run import PipelineRun

published_pipeline_run = PipelineRun(ws.experiments[experiment_name], run_id)
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 011144ad-bc6d-4e57-a487-cd7f0db70972
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/011144ad-bc6d-4e57-a487-cd7f0db70972?wsid=/subscriptions/6ea869be-bab3-4204-94c3-1fc677f7d2de/resourcegroups/rgag/workspaces/wsag&tid=0f942ca0-ebef-4f26-80f8-f501d599ba90

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '011144ad-bc6d-4e57-a487-cd7f0db70972', 'status': 'Completed', 'startTimeUtc': '2021-08-22T04:17:39.35602Z', 'endTimeUtc': '2021-08-22T04:17:40.95673Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'Unavailable', 'runType': 'HTTP', 'azureml.parameters': '{}', 'azureml.pipelineid': '0ffd4abe-3ce5-43cd-842c-5f1778d3ac4a'}, 'inputDatasets': [], 'outputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://wsag8535920275.blob.core.windows.net/azureml/ExperimentRun/dcid.011144ad-bc6d-4e57-a487-cd7f0db70972/logs/azureml/executionlogs.txt?sv=2019-07-07&sr=b&sig=jJHEhcumGdnjDx0BQkGzKBT%2B8UWwQplRSmCq

'Finished'
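The run above finished in under two seconds because the step outputs were reused. As a rough analogy, assuming that a step is skipped whenever its script and its inputs are unchanged, output reuse behaves like the cache in the sketch below. The `run_step` function and its cache are hypothetical and are not how the service is implemented.

```python
import hashlib
import json

_cache = {}

def run_step(name, script_text, inputs, func):
    """Run func(inputs) unless the same step, script, and inputs ran before.

    Returns a (result, reused) pair.
    """
    key_source = json.dumps({'name': name, 'script': script_text, 'inputs': inputs},
                            sort_keys=True)
    key = hashlib.sha256(key_source.encode('utf-8')).hexdigest()
    if key in _cache:
        return _cache[key], True
    result = func(inputs)
    _cache[key] = result
    return result, False

def total(data):
    return sum(data['rows'])

first, reused_first = run_step('prep', 'v1', {'rows': [1, 2, 3]}, total)
second, reused_second = run_step('prep', 'v1', {'rows': [1, 2, 3]}, total)
third, reused_third = run_step('prep', 'v1', {'rows': [1, 2, 3, 4]}, total)

print(first, reused_first)
print(second, reused_second)
print(third, reused_third)
```

The second call returns the cached result without running the function again, while the third call runs because its input changed.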

## Schedule the Pipeline

Suppose the clinic for the diabetes patients collects new data each week, and adds it to the dataset. You could run the pipeline every week to retrain the model with the new data.

In [19]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

# Submit the Pipeline every Monday at 00:00 UTC
recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Monday"], time_of_day="00:00")
weekly_schedule = Schedule.create(ws, name="weekly-diabetes-training", 
                                  description="Based on time",
                                  pipeline_id=published_pipeline.id, 
                                  experiment_name='mslearn-diabetes-pipeline', 
                                  recurrence=recurrence)
print('Pipeline scheduled.')

Pipeline scheduled.
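To reason about when a weekly recurrence like this one would next fire, you can compute the date yourself. The sketch below is a hypothetical helper that returns the first Monday at 00:00 UTC strictly after a given moment; it illustrates the recurrence rule only and makes no claim about how the scheduling service computes its trigger times.

```python
from datetime import datetime, timedelta, timezone

def next_weekly_run(now, weekday=0, hour=0, minute=0):
    """Return the first occurrence of the weekday and time strictly after `now`.

    Weekdays follow datetime.weekday(), so Monday is 0.
    """
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # The slot for this week has already passed, so move to next week
        candidate += timedelta(days=7)
    return candidate

# A Sunday morning in UTC, chosen only as an example
now = datetime(2021, 8, 22, 4, 17, tzinfo=timezone.utc)
print(next_weekly_run(now).isoformat())
```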


You can retrieve the schedules that are defined in the workspace like this:

In [20]:
schedules = Schedule.list(ws)
schedules

[Pipeline(Name: weekly-diabetes-training,
 Id: 9ed09a93-852c-4436-9dd8-bffceff09ac1,
 Status: Active,
 Pipeline Id: 0ffd4abe-3ce5-43cd-842c-5f1778d3ac4a,
 Pipeline Endpoint Id: None,
 Recurrence Details: Runs at 0:00 on Monday every Week)]

You can check the latest run like this:

In [21]:
pipeline_experiment = ws.experiments.get('mslearn-diabetes-pipeline')
latest_run = list(pipeline_experiment.get_runs())[0]

latest_run.get_details()

{'runId': '8dff8bf7-f352-40cb-b39e-9bca18151142',
 'status': 'Completed',
 'startTimeUtc': '2021-08-22T04:17:51.268176Z',
 'endTimeUtc': '2021-08-22T04:17:52.943038Z',
 'properties': {'azureml.git.repository_uri': 'https://github.com/MicrosoftLearning/mslearn-dp100',
  'mlflow.source.git.repoURL': 'https://github.com/MicrosoftLearning/mslearn-dp100',
  'azureml.git.branch': 'main',
  'mlflow.source.git.branch': 'main',
  'azureml.git.commit': 'd82038dd1ffb2a2d0fa0f91dba7ca84099924eb4',
  'mlflow.source.git.commit': 'd82038dd1ffb2a2d0fa0f91dba7ca84099924eb4',
  'azureml.git.dirty': 'True',
  'azureml.runsource': 'azureml.PipelineRun',
  'runSource': 'Unavailable',
  'runType': 'Schedule',
  'azureml.parameters': '{}',
  'azureml.pipelineid': '0ffd4abe-3ce5-43cd-842c-5f1778d3ac4a'},
 'inputDatasets': [],
 'outputDatasets': [],
 'logFiles': {'logs/azureml/executionlogs.txt': 'https://wsag8535920275.blob.core.windows.net/azureml/ExperimentRun/dcid.8dff8bf7-f352-40cb-b39e-9bca18151142/logs/

This is a simple example, designed to demonstrate the principle. In reality, you could build more sophisticated logic into the pipeline steps - for example, evaluating the model against some test data to calculate a performance metric like AUC or accuracy, comparing the metric to that of any previously registered versions of the model, and only registering the new model if it performs better.
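The "register only if better" logic could be as small as the following sketch. The function name `should_register` is hypothetical, and the previous metric values are converted with `float()` on the assumption that they may have been stored as text; in a real training step you would gather them from the previously registered model versions before deciding.

```python
def should_register(new_auc, previous_aucs):
    """Return True if the new model beats every previously recorded AUC."""
    if not previous_aucs:
        # Nothing registered yet, so the first model is always accepted
        return True
    best_previous = max(float(value) for value in previous_aucs)
    return new_auc > best_previous

print(should_register(0.88, []))
print(should_register(0.88, ['0.85', '0.87']))
print(should_register(0.88, ['0.85', '0.91']))
```

A strict comparison is used here, so a new model that merely ties the best previous AUC is not registered; whether that is the right policy depends on your scenario.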

You can use the [Azure Machine Learning extension for Azure DevOps](https://marketplace.visualstudio.com/items?itemName=ms-air-aiagility.vss-services-azureml) to combine Azure ML pipelines with Azure DevOps pipelines (yes, it *is* confusing that they have the same name!) and integrate model retraining into a *continuous integration/continuous deployment (CI/CD)* process. For example, you could use an Azure DevOps *build* pipeline to trigger an Azure ML pipeline that trains and registers a model, and when the model is registered it could trigger an Azure DevOps *release* pipeline that deploys the model as a web service, along with the application or service that consumes the model.