# Machine Learning Pipelines

A Machine Learning pipeline is a set of independent steps, each of which can have its own compute, data, and environment dependencies. Each step is treated as a separate run, and all four aspects of a run (metrics, logs, snapshot and outputs) are stored independently. After the pipeline is created, it can be published as an API for later reuse.

This gives you fine-grained control: you can configure each step in the way that best suits that particular step, for example by running data preparation on a CPU cluster and training on a GPU cluster.

ML Pipelines are a type of workflow orchestration tool. The main difference between ML Pipelines and general-purpose workflow engines (such as Azure Data Factory or Airflow) is that ML Pipelines are designed specifically for ML needs. Therefore, if your workflow is not ML-related, a generic workflow engine is recommended. You can also call an ML Pipeline as a step from a generic pipeline.

In this tutorial, we treat the MNIST model we trained in the previous example like a serious ML problem. We break the task into four parts:
1. Download the dataset from Yann LeCun's website, unzip and normalize it, and save it to shared storage
2. Train our 2-layer neural net on the normalized data
3. Register the model if the new model performs better than the latest version of the model in the Model Registry
4. Publish the pipeline, which bundles the above steps, as an API endpoint

### 1. Defining an ML Pipeline

In [1]:
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep
print("Pipeline SDK-specific imports completed")

SDK version: 1.0.65
Pipeline SDK-specific imports completed


In [2]:
# Your subscription ID will be different; replace the string with yours
subscription_id = "ed70929a-e125-4daf-8945-04f709d2c75e" 
resource_group = "MLOpsWorkshop"
workspace_name = "FirstExample"
workspace_region = "westus2"

In [3]:
# Import the Workspace class and connect to the existing workspace.
# The constructor loads a workspace that already exists; it does not create one.

from azureml.core import Workspace

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

# persist the subscription id, resource group name, and workspace name in aml_config/config.json.
ws.write_config()

In [4]:
ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep = '\n')


Workspace name: FirstExample
Azure region: westus2
Subscription id: ed70929a-e125-4daf-8945-04f709d2c75e
Resource group: MLOpsWorkshop


### Data Stores

As described in the previous section, Datastores are attached to workspaces and are used to store connection information to Azure storage services so you can refer to them by name and don't need to remember the connection information and secret used to connect to the storage services.

https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore.datastore?view=azure-ml-py

To explore the Datastores registered in your Workspace, log in to **ml.azure.com** and, in the left pane under the **Manage** section, click **Datastores**. By default, two datastores are registered: **workspaceblobstore** and **workspacefilestore**.

We skip **workspacefilestore** for now and use only **workspaceblobstore** in this exercise. **workspaceblobstore** refers to the default Blob storage that is created together with the Workspace. **Blob storage** is a type of storage account that can hold any kind of data, from binary files (such as images) to CSV or Parquet (it is similar to **AWS S3**). You can register an additional Azure storage account with your workspace at any time.

In [5]:
# Retrieve the pointer to the default Blob storage.

def_blob_store = Datastore(ws, "workspaceblobstore")

## The code below also yields the same result:
# def_blob_store = ws.get_default_datastore()

print("Blobstore's name: {}".format(def_blob_store.name))

Blobstore's name: workspaceblobstore


In [6]:
# An object of the DataReference class represents a path within a Datastore. In the example below, you declare that the MNIST data should be available under the **mnist_datainput** path in the **workspaceblobstore** container of the Azure storage account.

In [7]:
# blob_input_data = DataReference(
#     datastore=def_blob_store,
#     data_reference_name="mnist_datainput",
#     path_on_datastore="mnist_datainput")
# 
# print("DataReference object created")

Next, make sure the compute targets are created.

In this example, we want two compute clusters: one CPU cluster and one GPU cluster, each with 1 node.

In [8]:
# Create a CPU cluster of type STANDARD_D2_V2 with 1 node. (due to the subscription's limitations we stick to 1 node)

from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# choose a name for your cluster
cluster_name = "cpucluster"

try:
    compute_target_cpu = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target.')
except ComputeTargetException:
    print('Creating a new compute target...')
    # CPU: Standard_D3_v2
    # GPU: Standard_NV6
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D2_V2', 
                                                           max_nodes=1,
                                                           min_nodes=1)

    # create the cluster
    compute_target_cpu = ComputeTarget.create(ws, cluster_name, compute_config)

    compute_target_cpu.wait_for_completion(show_output=True)

# use get_status() to get a detailed status for the current cluster. 
print(compute_target_cpu.get_status().serialize())

Found existing compute target.
{'currentNodeCount': 1, 'targetNodeCount': 1, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 1, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-10-17T15:35:14.803000+00:00', 'errors': None, 'creationTime': '2019-10-17T15:33:55.622217+00:00', 'modifiedTime': '2019-10-17T15:34:12.672529+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 1, 'maxNodeCount': 1, 'nodeIdleTimeBeforeScaleDown': ''}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D2_V2'}


In [9]:
# choose a name for your cluster
cluster_name = "gpucluster"

try:
    compute_target_gpu = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing compute target.')
except ComputeTargetException:
    print('Creating a new compute target...')
    # CPU: Standard_D3_v2
    # GPU: Standard_NV6
    compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_NV6', 
                                                           max_nodes=1,
                                                           min_nodes=1)

    # create the cluster
    compute_target_gpu = ComputeTarget.create(ws, cluster_name, compute_config)

    compute_target_gpu.wait_for_completion(show_output=True)

# use get_status() to get a detailed status for the current cluster. 
print(compute_target_gpu.get_status().serialize())

Found existing compute target.
{'currentNodeCount': 1, 'targetNodeCount': 1, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 1, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-10-17T15:36:51.535000+00:00', 'errors': None, 'creationTime': '2019-10-17T15:35:17.993935+00:00', 'modifiedTime': '2019-10-17T15:35:33.992268+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 1, 'maxNodeCount': 1, 'nodeIdleTimeBeforeScaleDown': ''}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_NV6'}


In [10]:
cts = ws.compute_targets
for ct in cts:
    print(ct)


cpucluster
gpucluster


PipelineData is a way to define data dependencies in an ML Pipeline. In this example, we first download the MNIST data into a directory called raw_data and then save the processed and normalized numpy objects into a subdirectory called Processed. This PipelineData object is used as the output of the first step, Data Extraction.

In [13]:
processed_mnist_data = PipelineData("processed_mnist_data", datastore=def_blob_store)
processed_mnist_data

$AZUREML_DATAREFERENCE_processed_mnist_data

Because the first step is a regular Python script that can run on a CPU node with no prepackaged ML environment requirement, we stick to the default configuration.

The configuration below first deploys a CPU-based Linux Docker image on the VM and then installs the 'azureml-sdk' and 'numpy' packages.

In [14]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# create a new runconfig object
run_config = RunConfiguration()

# enable Docker 
run_config.environment.docker.enabled = True

# set Docker base image to the default CPU-based image
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# use conda_dependencies.yml to create a conda environment in the Docker image for execution
run_config.environment.python.user_managed_dependencies = False

# specify CondaDependencies obj
run_config.environment.python.conda_dependencies = CondaDependencies.create(pip_packages=['azureml-sdk',
                                                                                          'numpy'])

Here we define the first step by creating an object of the PythonScriptStep class. Under the hood, it calls extract.py as the entry script, and we pass the **processed_mnist_data** object as an argument to the script.

The **outputs** parameter defines the step's output data dependencies. In this case, the PipelineData object **processed_mnist_data** is the output dependency.

Because the script can run on a CPU node, there is no reason to pay for a GPU node. Therefore, we select **compute_target_cpu** as the compute target.

In [15]:
# source directory
source_directory = 'DataExtraction'

extractDataStep = PythonScriptStep(
    script_name="extract.py", 
    arguments=["--output_extract", processed_mnist_data],
    outputs=[processed_mnist_data],
    compute_target=compute_target_cpu, 
    source_directory=source_directory,
    runconfig=run_config)

print("Data Extraction Step created")

Data Extraction Step created
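
The contents of extract.py are not shown in this notebook. As a rough sketch of what such an entry script could look like, the block below assumes the script receives the resolved output path through `--output_extract` and writes normalized pixel values there. The helper names `parse_args`, `normalize` and `save_processed`, the file name `sample.json`, and the placeholder pixel values are all hypothetical; a real script would download and unzip MNIST and would most likely use numpy.

```python
import argparse
import json
import os
import tempfile


def parse_args(argv=None):
    """Parse the argument that the pipeline step passes to the script."""
    parser = argparse.ArgumentParser()
    # At run time the PipelineData object is expected to resolve to a directory path.
    parser.add_argument("--output_extract", type=str, required=True)
    return parser.parse_args(argv)


def normalize(pixels):
    """Scale raw 8-bit pixel values (0-255) into the range [0.0, 1.0]."""
    return [p / 255.0 for p in pixels]


def save_processed(output_dir, name, values):
    """Write values to <output_dir>/<name>, creating the directory if needed."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, name)
    with open(path, "w") as f:
        json.dump(values, f)
    return path


# Local demonstration: a temporary directory stands in for the mounted output path.
demo_dir = tempfile.mkdtemp()
args = parse_args(["--output_extract", demo_dir])
saved_path = save_processed(args.output_extract, "sample.json", normalize([0, 51, 255]))
print("Saved:", saved_path)
```

Creating the output directory explicitly is a defensive choice in this sketch, since the script should not assume the path already exists on the compute node.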


The next step is to run our TensorFlow job to train our MNIST classifier. As this is a TF job, we can leverage Estimator classes such as **azureml.train.dnn.TensorFlow**. Moreover, the inputs argument tells the Pipeline which dependencies must be satisfied before this step executes. Because the **processed_mnist_data** PipelineData object is the output of the step above and an input of this step, the Pipeline engine executes the Training step after the Data Extraction step.

As this is a TF job, we can use our GPU node to speed up the training step. So we provide the GPU cluster as the compute target.

The TF estimator supports TF 1. If your script is based on TF 2, you can use **PythonScriptStep** instead and either provide your custom Docker image or pip install TF 2 on a base GPU image.

Among the script arguments, two deserve explanation: **release_id** and **model_name**. **release_id** lets us logically tag the run with a number that can be retrieved later. You can think of **release_id** as the version number. On day 3, you'll use the release pipeline to populate the release_id. **model_name**, as its name suggests, tells the code what name to use when saving the model in the run->output section.


In [16]:
from azureml.train.dnn import TensorFlow

source_directory = 'Training'
est = TensorFlow(source_directory=source_directory,
                 compute_target=compute_target_gpu,
                 entry_script='train.py', 
                 use_gpu=True, 
                 framework_version='1.13')

In [17]:
from azureml.pipeline.steps import EstimatorStep

trainingStep = EstimatorStep(name="Training-Step",
                             estimator=est,
                             estimator_entry_script_arguments=["--input_data_location", processed_mnist_data,
                                                               '--batch-size', 50,
                                                               '--first-layer-neurons', 300,
                                                               '--second-layer-neurons', 100,
                                                               '--learning-rate', 0.01,
                                                               "--release_id", 0,
                                                               '--model_name', 'tf_mnist_pipeline.model'],
                             runconfig_pipeline_params=None,
                             inputs=[processed_mnist_data],
                             compute_target=compute_target_gpu)

print("Model Training Step is Completed")

Model Training Step is Completed
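
train.py itself is not listed here. The sketch below shows one way the entry script could read the arguments passed by the step, assuming it uses `argparse`. The function name `parse_training_args`, the default values and the sample path are assumptions made for illustration; only the flag names are taken from the cell above.

```python
import argparse


def parse_training_args(argv=None):
    """Parse the flags passed via estimator_entry_script_arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data_location", type=str)
    parser.add_argument("--batch-size", type=int, default=50)
    parser.add_argument("--first-layer-neurons", type=int, default=300)
    parser.add_argument("--second-layer-neurons", type=int, default=100)
    parser.add_argument("--learning-rate", type=float, default=0.01)
    parser.add_argument("--release_id", type=int, default=0)
    parser.add_argument("--model_name", type=str, default="tf_mnist_pipeline.model")
    return parser.parse_args(argv)


# Local demonstration with a made-up input path.
args = parse_training_args([
    "--input_data_location", "/tmp/processed_mnist_data",
    "--batch-size", "50",
    "--learning-rate", "0.01",
    "--release_id", "0",
    "--model_name", "tf_mnist_pipeline.model",
])

# argparse turns dashes in flag names into underscores in attribute names,
# so "--batch-size" is read back as args.batch_size.
print(args.batch_size, args.learning_rate, args.model_name)
```

Note that the step mixes dashed flags (`--batch-size`) with underscored ones (`--release_id`); both end up as underscored attributes, but the flag spelling on the command line must match exactly.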


Finally, we want to evaluate and register the model. As in the first step, a CPU node is enough, because we're running a regular Python script with no ML dependencies. So we instantiate an object of the **PythonScriptStep** class and provide **evaluate_model.py** as the entry script.

The two arguments we provided in the step above are used here to retrieve the model saved in the run->output section of the experiment. Using the release id, we can retrieve all other models and check whether this model outperforms them. If it does not, we don't register it in the model registry.


In [18]:
# source directory
source_directory = 'RegisterModel'

modelEvalReg = PythonScriptStep(
    name="Evaluate and Register Model",
    script_name="evaluate_model.py", 
    arguments=["--release_id", 0,
               '--model_name', 'tf_mnist_pipeline.model'],
    compute_target=compute_target_cpu, 
    source_directory=source_directory,
    runconfig=run_config)
print("Model Evaluation and Registration Step is Created")

Model Evaluation and Registration Step is Created
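
The registration rule used by evaluate_model.py is only described in prose above. A minimal sketch of that decision, assuming accuracy is the comparison metric and that the accuracies of previously registered models are already available as a list, might look like this. The function name `should_register` and the sample numbers are hypothetical.

```python
def should_register(new_accuracy, registered_accuracies):
    """Return True if the new model beats every previously registered model."""
    if not registered_accuracies:
        # Nothing registered yet, so the first model is always registered.
        return True
    return new_accuracy > max(registered_accuracies)


# Illustrative values only.
print(should_register(0.97, [0.95, 0.96]))  # new model is better
print(should_register(0.95, [0.96]))        # new model is worse
print(should_register(0.90, []))            # empty registry
```

Using a strict `>` means a model that merely ties the best registered one is not registered again; whether ties should count is a design choice the original script may make differently.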


As the evaluation and registration step has no input or output data dependencies, the Pipeline engine would execute it in parallel with the previous two steps. However, logically it should run after the training step. Therefore, we use the command below to instruct the Pipeline engine:

In [19]:
modelEvalReg.run_after(trainingStep)
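
To illustrate why the explicit ordering matters, the sketch below models the three steps as a dependency graph and asks which steps are ready to start immediately. It uses the standard-library `graphlib` module (Python 3.9+) purely as an illustration; it is not how the Azure ML engine is implemented, and the helper name `initially_ready` is made up.

```python
from graphlib import TopologicalSorter


def initially_ready(dependencies):
    """Return the set of steps that can start before anything else has run."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    return set(sorter.get_ready())


# Each step maps to the set of steps it must wait for.
without_run_after = {
    "extract.py": set(),
    "Training-Step": {"extract.py"},          # consumes processed_mnist_data
    "Evaluate and Register Model": set(),     # no data dependency declared
}

with_run_after = dict(without_run_after)
with_run_after["Evaluate and Register Model"] = {"Training-Step"}  # effect of run_after

print(initially_ready(without_run_after))
print(initially_ready(with_run_after))

# With the explicit dependency the three steps form a single chain.
execution_order = list(TopologicalSorter(with_run_after).static_order())
print(execution_order)
```

Without the extra edge, the evaluation step is ready at the same time as data extraction, which is why it would start before a model exists.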

Now that all of the pipeline steps are defined, we can build the Pipeline object, which defines how the pipeline should be executed.

Pipelines are loosely coupled with Experiments. At run time you can choose which Experiment this pipeline execution is attached to. Here we create, or connect to, the **MNIST-Model-Manual-Pipeline** experiment:

In [20]:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment
pipeline = Pipeline(workspace=ws, steps=[extractDataStep, trainingStep, modelEvalReg])
pipeline_run = Experiment(ws, 'MNIST-Model-Manual-Pipeline').submit(pipeline)



Created step extract.py [0fb292b5][ad2254cc-a96e-4d66-8cc7-904bc65d13dd], (This step is eligible to reuse a previous run's output)
Created step Training-Step [4a160f07][48133b84-c041-4533-8567-3bbcf8c20c18], (This step is eligible to reuse a previous run's output)
Created step Evaluate and Register Model [5743555b][fd36ff04-5ce3-47e9-873e-5298cc424cd2], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun 286d1a23-bfe0-4c43-a3c3-bcfed9b6ec82
Link to Azure Portal: https://mlworkspace.azure.ai/portal/subscriptions/ed70929a-e125-4daf-8945-04f709d2c75e/resourceGroups/MLOpsWorkshop/providers/Microsoft.MachineLearningServices/workspaces/FirstExample/experiments/MNIST-Model-Manual-Pipeline/runs/286d1a23-bfe0-4c43-a3c3-bcfed9b6ec82


In [21]:
RunDetails(pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

Like any run, Pipeline runs are non-blocking. However, if you automate this job, you will want your code to wait until the pipeline has been constructed and executed. Use the command below to make the run execution blocking:

In [90]:
pipeline_run.wait_for_completion(show_output=True, raise_on_error=True)

PipelineRunId: c53161a7-6585-4d45-95ee-f074e3e7e28b
Link to Portal: https://mlworkspace.azure.ai/portal/subscriptions/ed70929a-e125-4daf-8945-04f709d2c75e/resourceGroups/MLOpsWorkshop/providers/Microsoft.MachineLearningServices/workspaces/FirstExample/experiments/MNIST-Model-Manual-Pipeline/runs/c53161a7-6585-4d45-95ee-f074e3e7e28b
PipelineRun Status: Running


StepRunId: b3438f6a-9cd0-4a4b-9698-9daf3d049153
Link to Portal: https://mlworkspace.azure.ai/portal/subscriptions/ed70929a-e125-4daf-8945-04f709d2c75e/resourceGroups/MLOpsWorkshop/providers/Microsoft.MachineLearningServices/workspaces/FirstExample/experiments/MNIST-Model-Manual-Pipeline/runs/b3438f6a-9cd0-4a4b-9698-9daf3d049153

StepRun(Training-Step) Execution Summary
StepRun( Training-Step ) Status: Finished
{'runId': 'b3438f6a-9cd0-4a4b-9698-9daf3d049153', 'target': 'gpucluster', 'status': 'Completed', 'startTimeUtc': '2019-10-17T17:54:22.091836Z', 'endTimeUtc': '2019-10-17T17:54:22.17208Z', 'properties': {'azureml.reusedruni




StepRunId: d35eec6e-a9cf-47fb-b2b9-032b6f87fb7e
Link to Portal: https://mlworkspace.azure.ai/portal/subscriptions/ed70929a-e125-4daf-8945-04f709d2c75e/resourceGroups/MLOpsWorkshop/providers/Microsoft.MachineLearningServices/workspaces/FirstExample/experiments/MNIST-Model-Manual-Pipeline/runs/d35eec6e-a9cf-47fb-b2b9-032b6f87fb7e
StepRun( Evaluate and Register Model ) Status: Running

Streaming azureml-logs/55_azureml-execution-tvmps_df3edb6d473a93f5e4858a69a3045c995e5260becc6d0079551a95c1edfa2df8_d.txt
2019-10-17T17:54:43Z Starting output-watcher...
Login Succeeded
Using default tag: latest
latest: Pulling from azureml/azureml_2f781612c8dcf36929784fc87648a323
Digest: sha256:d3a5ca68d19135170f1990120982b2003a6d66f6d6d6fcd751716173b029fb8a
Status: Image is up to date for firstexamplebb2402fa.azurecr.io/azureml/azureml_2f781612c8dcf36929784fc87648a323:latest

Streaming azureml-logs/65_job_prep-tvmps_df3edb6d473a93f5e4858a69a3045c995e5260becc6d0079551a95c1edfa2df8_d.txt
bash: /azureml-en



PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'c53161a7-6585-4d45-95ee-f074e3e7e28b', 'status': 'Completed', 'startTimeUtc': '2019-10-17T17:54:17.632135Z', 'endTimeUtc': '2019-10-17T17:55:29.406801Z', 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': None, 'runType': 'HTTP', 'azureml.parameters': '{}'}, 'inputDatasets': [], 'logFiles': {'logs/azureml/executionlogs.txt': 'https://firstexample7340056478.blob.core.windows.net/azureml/ExperimentRun/dcid.c53161a7-6585-4d45-95ee-f074e3e7e28b/logs/azureml/executionlogs.txt?sv=2018-11-09&sr=b&sig=bSO8KX2De%2BoWeGEOHMtaiMk%2FKQJ%2FjeqqlBP84etUEyc%3D&st=2019-10-17T17%3A45%3A37Z&se=2019-10-18T01%3A55%3A37Z&sp=r', 'logs/azureml/stderrlogs.txt': 'https://firstexample7340056478.blob.core.windows.net/azureml/ExperimentRun/dcid.c53161a7-6585-4d45-95ee-f074e3e7e28b/logs/azureml/stderrlogs.txt?sv=2018-11-09&sr=b&sig=Ly9oOGvLnYUjL5%2FsNmLhbSUuJ0Wa%2BifrhULfWFV5g9s%3D&st=2019-10-17T17%3A45%3A37Z&se=2019-10-18

'Finished'

### 2. Publish and trigger a pipeline

This Pipeline has been executed once under an experiment. For later use, you may want to publish the Pipeline as an endpoint. The publish_pipeline method publishes a pipeline under the Pipeline section of the Workspace. The published pipeline can later be called from anywhere, inside or outside of Azure.

One use case is calling the Pipeline from within a Data Engineering pipeline: the Data Engineering team can trigger the published pipeline using its URI.

In [91]:
published_pipeline = pipeline_run.publish_pipeline(name="MNIST-Pipeline-Manually-Built", 
                                                   description="Steps are: data preparation, training, model validation and model registration", 
                                                   version="0.1", 
                                                   continue_on_step_failure=False)

In [92]:
from azureml.pipeline.core import PublishedPipeline

pipeline_id = published_pipeline.id # use your published pipeline id
published_pipeline = PublishedPipeline.get(ws, pipeline_id)
published_pipeline


Name,Id,Status,Endpoint
MNIST-Pipeline-Manually-Built,cb8ff274-d418-4e4d-b39b-94508a3efaf8,Active,REST Endpoint


Here is the endpoint that is callable:

In [93]:
rest_endpoint = published_pipeline.endpoint
rest_endpoint

'https://westus2.aether.ms/api/v1.0/subscriptions/ed70929a-e125-4daf-8945-04f709d2c75e/resourceGroups/MLOpsWorkshop/providers/Microsoft.MachineLearningServices/workspaces/FirstExample/PipelineRuns/PipelineSubmit/cb8ff274-d418-4e4d-b39b-94508a3efaf8'

Now you can call the endpoint from anywhere. To call it, you need to authenticate yourself. There are two ways to do that:

1. InteractiveLoginAuthentication
2. ServicePrincipalAuthentication

The first requires you to authenticate interactively or to be authenticated already. As we're already authenticated, we can use the first approach. The second approach is described in the next section.

The InteractiveLoginAuthentication class can help us generate the authentication header required to connect to the ML Pipeline endpoint, using the **get_authentication_header** method:

In [94]:
from azureml.core.authentication import InteractiveLoginAuthentication
import requests

auth = InteractiveLoginAuthentication()

In [96]:
aad_token = auth.get_authentication_header()
aad_token

{'Authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6ImFQY3R3X29kdlJPb0VOZzNWb09sSWgydGlFcyIsImtpZCI6ImFQY3R3X29kdlJPb0VOZzNWb09sSWgydGlFcyJ9.eyJhdWQiOiJodHRwczovL21hbmFnZW1lbnQuY29yZS53aW5kb3dzLm5ldC8iLCJpc3MiOiJodHRwczovL3N0cy53aW5kb3dzLm5ldC81ZWNkMzA2OS1jMmMyLTQzZjUtOGZhOS05YzIyYjVhMzdkYTMvIiwiaWF0IjoxNTcxMzMxNzY4LCJuYmYiOjE1NzEzMzE3NjgsImV4cCI6MTU3MTMzNTY2OCwiYWNyIjoiMSIsImFpbyI6IkFVUUF1LzhOQUFBQUd2eGxNTVg3Z05adWkzeFducmZlQmdvZVB3Y3hlR0tBVy82TFBJVzg4WXJSTzdySGlZVDJVZW9UeElDZUtLU0Y4bERiSG55K2VMVVhCZys4b0N4Ynd3PT0iLCJhbHRzZWNpZCI6IjE6bGl2ZS5jb206MDAwMzAwMDAyMzMwRDg0OSIsImFtciI6WyJwd2QiXSwiYXBwaWQiOiIwNGIwNzc5NS04ZGRiLTQ2MWEtYmJlZS0wMmY5ZTFiZjdiNDYiLCJhcHBpZGFjciI6IjAiLCJlbWFpbCI6ImF6dXJlLmZyZWUuY3JlZGl0QGdtYWlsLmNvbSIsImZhbWlseV9uYW1lIjoiQ3JlZGl0IiwiZ2l2ZW5fbmFtZSI6IkZyZWUgQXp1cmUiLCJncm91cHMiOlsiM2YzNDIyNTktZDM0ZS00NTg0LThkNTktZGJjNzNkMjJlZDFiIl0sImlkcCI6ImxpdmUuY29tIiwiaXBhZGRyIjoiNjkuMTY1LjE2MS42MiIsIm5hbWUiOiJGcmVlIEF6dXJlIENyZWRpdCIsIm9pZCI6ImY0ZTU3NjMwLTJhNmMtN

Finally, you can trigger the pipeline with a simple POST request. POST requests are available in any modern programming language, so you can trigger the pipeline from anywhere.

In [97]:
# specify the param when running the pipeline
response = requests.post(rest_endpoint, 
                         headers=aad_token, 
                         json={"ExperimentName": "Kicked_MNist_Pipeline_Remotely",
                               "RunSource": "SDK"})

In [98]:
run_id = response.json()["Id"]

print(run_id)

61f5bf5b-17ac-474e-8d8c-00532f32a41f


In [108]:
response.json()

{'Description': None,
 'Status': {'StatusCode': 0,
  'StatusDetail': None,
  'CreationTime': '2019-10-17T18:01:32.7895765Z',
  'EndTime': None},
 'GraphId': 'f5d8a1b2-f02b-4b23-bbb3-b6fb93147ff5',
 'IsSubmitted': False,
 'HasErrors': False,
 'UploadState': 0,
 'ParameterAssignments': {},
 'DataSetDefinitionValueAssignment': None,
 'RunHistoryExperimentName': 'Kicked_MNist_Pipeline_Remotely',
 'PipelineId': 'cb8ff274-d418-4e4d-b39b-94508a3efaf8',
 'RunSource': 'SDK',
 'RunType': 0,
 'TotalRunSteps': 3,
 'ScheduleId': None,
 'tags': {},
 'Properties': {},
 'CreatedBy': {'UserObjectId': 'f4e57630-2a6c-465a-ad6e-e41b89d072a1',
  'UserTenantId': '5ecd3069-c2c2-43f5-8fa9-9c22b5a37da3',
  'UserName': 'Free Azure Credit'},
 'EntityStatus': 0,
 'Id': '61f5bf5b-17ac-474e-8d8c-00532f32a41f',
 'Etag': '"f200dfd8-0000-0800-0000-5da8ac7c0000"',
 'CreatedDate': '1970-01-01T00:00:00',
 'LastModifiedDate': '1970-01-01T00:00:00'}

In [106]:
# Retrieving the Pipeline Run:

from azureml.pipeline.core import PipelineRun

exp = Experiment(name="Kicked_MNist_Pipeline_Remotely", workspace=ws)
pipeline_run = PipelineRun(experiment=exp, run_id=run_id)

### 3. Schedule the Pipeline

Another way of using a Pipeline is to schedule it. In the example below, the pipeline is scheduled to be triggered weekly on Fridays at 15:30 UTC:

In [113]:
# from azureml.pipeline.core import Schedule, ScheduleRecurrence
# 
# recurrence = ScheduleRecurrence(frequency="Week", interval=1, week_days=["Friday"], time_of_day="15:30")
# schedule = Schedule.create(ws, name="ScheduledPipeline", pipeline_id=pipeline_id,
#                            experiment_name="MNIST-Pipeline-Weekly-Scheduled", recurrence=recurrence)
# 
## Get the list of scheduled Pipelines
# Schedule.list(ws)
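
To make the recurrence concrete, the sketch below computes when a "weekly on Fridays at 15:30 UTC" schedule would next fire relative to a given moment. It is plain `datetime` arithmetic written for illustration, not the logic Azure ML uses internally, and the function name `next_weekly_run` is an assumption.

```python
from datetime import datetime, timedelta, timezone


def next_weekly_run(now, weekday=4, hour=15, minute=30):
    """Return the first weekday/hour:minute occurrence strictly after `now`.

    `weekday` follows datetime.weekday(): Monday is 0, Friday is 4.
    """
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - now.weekday()) % 7)
    if candidate <= now:
        # The slot for this week has already passed, so move to next week.
        candidate += timedelta(days=7)
    return candidate


thursday_evening = datetime(2019, 10, 17, 18, 0, tzinfo=timezone.utc)
friday_afternoon = datetime(2019, 10, 18, 16, 0, tzinfo=timezone.utc)

print(next_weekly_run(thursday_evening))  # the Friday of the same week
print(next_weekly_run(friday_afternoon))  # the following Friday
```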

In [116]:
! git add . && git commit -m "Pipeline published and scheduled"

The file will have its original line endings in your working directory.
The file will have its original line endings in your working directory.
