In [48]:
pip install --disable-pip-version-check --extra-index-url https://azuremlsdktestpypi.azureedge.net/azureml-contrib-k8s-preview/D58E86006C65 azureml-contrib-k8s

Looking in indexes: https://pypi.org/simple, https://azuremlsdktestpypi.azureedge.net/azureml-contrib-k8s-preview/D58E86006C65
Note: you may need to restart the kernel to use updated packages.


### Set up workspace

In [12]:
from azureml.core.workspace import Workspace

ws = Workspace.from_config()
print('Workspace name: ' + ws.name, 
      'Azure region: ' + ws.location, 
      'Subscription id: ' + ws.subscription_id, 
      'Resource group: ' + ws.resource_group, sep='\n')

Workspace name: akse-attach-ws1
Azure region: eastus2euap
Subscription id: 5abfd9c4-ec8c-4db9-acd4-c762dce93508
Resource group: akse-ws-rg


### Set up K8s target

In [13]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
# from azureml.contrib.core.compute.arckubernetescompute import ArcKubernetesCompute
from azureml.contrib.core.compute.cmakscompute import CmAksCompute

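# Name of the Kubernetes compute target already attached to the workspace.
# ComputeTarget(workspace, name) looks up an existing target; it does not create one.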
compute_name = 'aks-profile-test'
compute_target = ComputeTarget(workspace=ws, name=compute_name)

In [14]:
compute_target

CmAksCompute(workspace=Workspace.create(name='akse-attach-ws1', subscription_id='5abfd9c4-ec8c-4db9-acd4-c762dce93508', resource_group='akse-ws-rg'), name=aks-profile-test, id=/subscriptions/5abfd9c4-ec8c-4db9-acd4-c762dce93508/resourceGroups/akse-ws-rg/providers/Microsoft.MachineLearningServices/workspaces/akse-attach-ws1/computes/aks-profile-test, type=CmAks, provisioning_state=Succeeded, location=eastus2euap, tags=None)

### Azure Machine Learning Imports
In this code cell, we import the key Azure Machine Learning modules used below.

In [25]:
import os
import azureml.core
from azureml.core import Workspace, Experiment, Datastore
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

SDK version: 1.18.0


### Pipeline-specific SDK imports
Here, we import the key pipeline modules whose use is illustrated in the examples below.

In [26]:
from azureml.pipeline.core import Pipeline
from azureml.pipeline.steps import PythonScriptStep

print("Pipeline SDK-specific imports completed")

Pipeline SDK-specific imports completed


In [27]:
# Get the workspace's default datastore
def_blob_store = ws.get_default_datastore() 
# Alternatively, get the Azure Blob store associated with the workspace by name.
# "workspaceblobstore" is the fixed name of this store and must be used as-is.
def_blob_store = Datastore(ws, "workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

akse-attach-ws1
akse-ws-rg
eastus2euap
5abfd9c4-ec8c-4db9-acd4-c762dce93508
Blobstore's name: workspaceblobstore



### Upload data to default datastore
The workspace's default datastore is the Azure Blob storage `workspaceblobstore` retrieved above. Let's upload a file to it.

In [29]:
# get_default_datastore() gets the default Azure Blob Store associated with your workspace.
# Here we are reusing the def_blob_store object we obtained earlier
def_blob_store.upload_files(["./20news.pkl"], target_path="20newsgroups", overwrite=True)
print("Upload call completed")

Uploading an estimated of 1 files
Uploading ./20news.pkl
Uploaded ./20news.pkl, 1 files out of an estimated total of 1
Uploaded 1 files
Upload call completed


## Creating a Step in a Pipeline 
A **PythonScriptStep** is a basic, built-in step that runs a Python script on a compute target. It takes a script name and, optionally, other parameters such as arguments for the script, the compute target, inputs, and outputs. If no compute target is specified, the target from the run configuration is used. You can also use a [**RunConfiguration**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.runconfiguration?view=azure-ml-py) to specify requirements for the PythonScriptStep, such as conda dependencies and a Docker image.

The best practice is to put each step's script and its dependent files in a separate folder and to specify that folder as the step's `source_directory`. This reduces the size of the snapshot created for the step, because only that folder is snapshotted. Since a change to any file in the `source_directory` triggers a re-upload of the snapshot, separate folders also let a step be reused when nothing in its own `source_directory` has changed.
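To make the snapshot argument concrete, here is a minimal sketch that fingerprints a folder by hashing every file path and its contents. This is an illustration under our own assumptions, not Azure ML's actual snapshot algorithm; the `snapshot_fingerprint` helper and the temporary `train`/`compare` folders are hypothetical. It shows why per-step folders help: editing a file in one folder changes only that folder's fingerprint.

```python
import hashlib
import os
import tempfile


def snapshot_fingerprint(source_directory):
    """Hash every relative file path and its bytes under source_directory.

    Illustrative only: this is not how Azure ML computes snapshots.
    """
    digest = hashlib.sha256()
    for root, dirs, files in os.walk(source_directory):
        dirs.sort()  # visit subdirectories in a deterministic order
        for filename in sorted(files):
            path = os.path.join(root, filename)
            digest.update(os.path.relpath(path, source_directory).encode("utf-8"))
            digest.update(b"\0")  # separator between path and content
            with open(path, "rb") as f:
                digest.update(f.read())
    return digest.hexdigest()


# Two hypothetical per-step folders, each with its own script
with tempfile.TemporaryDirectory() as root:
    train_dir = os.path.join(root, "train")
    compare_dir = os.path.join(root, "compare")
    os.makedirs(train_dir)
    os.makedirs(compare_dir)
    with open(os.path.join(train_dir, "train.py"), "w") as f:
        f.write("print('training')\n")
    with open(os.path.join(compare_dir, "compare.py"), "w") as f:
        f.write("print('comparing')\n")

    train_before = snapshot_fingerprint(train_dir)
    compare_before = snapshot_fingerprint(compare_dir)

    # Edit only the compare step's script
    with open(os.path.join(compare_dir, "compare.py"), "a") as f:
        f.write("print('updated')\n")

    train_changed = snapshot_fingerprint(train_dir) != train_before
    compare_changed = snapshot_fingerprint(compare_dir) != compare_before

print("train changed:", train_changed)      # False
print("compare changed:", compare_changed)  # True
```

Had both scripts lived in one shared folder, the edit would have changed the fingerprint of both steps.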

```python
# Available arguments for PythonScriptStep constructor
PythonScriptStep(
    script_name, 
    name=None, 
    arguments=None, 
    compute_target=None, 
    runconfig=None, 
    inputs=None, 
    outputs=None, 
    params=None, 
    source_directory=None, 
    allow_reuse=True, 
    version=None, 
    hash_paths=None)
```
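The `arguments` parameter passes command-line arguments to the script. The notebook does not show the contents of `train.py`, so the following is a hypothetical sketch of how such a script might read them with `argparse`; the `--epochs` and `--learning-rate` flags are our own assumptions.

```python
# train.py: hypothetical contents; the real script is not shown in this notebook.
import argparse


def parse_args(argv=None):
    # Items passed via PythonScriptStep(arguments=[...]) arrive as ordinary
    # command-line arguments, so argparse can read them.
    parser = argparse.ArgumentParser(description="Hypothetical training step")
    parser.add_argument("--epochs", type=int, default=5)
    parser.add_argument("--learning-rate", type=float, default=0.01)
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_args()
    print(f"Training for {args.epochs} epochs at learning rate {args.learning_rate}")
```

With this script, a step could be built as `PythonScriptStep(script_name="train.py", arguments=["--epochs", "10"], ...)`.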

In [33]:
# Create the first step; arguments not passed here keep their default values.

source_directory = './train'
print('Source directory for the step is {}.'.format(os.path.realpath(source_directory)))

step1 = PythonScriptStep(name="train_step",
                         script_name="train.py", 
                         compute_target="aks-profile-test", 
                         source_directory=source_directory,
                         allow_reuse=True)
print("Step1 created")

Source directory for the step is /mnt/batch/tasks/shared/LS_root/mounts/clusters/test-aks-pipeline/code/Users/adsingha/train.
Step1 created


## Running a few steps in parallel
Here we are looking at a simple scenario where we are running a few steps (all involving PythonScriptStep) in **parallel**. Running nodes in parallel is the default behavior for steps in a pipeline.

We already have one step defined earlier. Let's define a few more. For step3, we use customized conda dependencies. A job may fail if none of the meta packages (azureml-sdk, azureml-defaults, or azureml-core) is in the pip package list, so when customizing dependencies we recommend including "azureml-defaults".
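Before submitting a step with custom dependencies, you could check the pip package list for one of the meta packages named above. The `has_meta_package` helper below is hypothetical and not part of the Azure ML SDK; it is a minimal sketch that strips extras and version specifiers from each requirement string before comparing names.

```python
import re

# The meta packages named in the text above
META_PACKAGES = {"azureml-sdk", "azureml-defaults", "azureml-core"}


def has_meta_package(pip_packages):
    """Return True if any pip requirement string names an Azure ML meta package."""
    for requirement in pip_packages:
        # Drop extras and version specifiers, e.g. "azureml-sdk[notebooks]" or "azureml-defaults==1.18.0"
        name = re.split(r"[\[<>=!~;\s]", requirement.strip(), maxsplit=1)[0]
        if name.lower() in META_PACKAGES:
            return True
    return False


print(has_meta_package(["pandas"]))                              # False
print(has_meta_package(["pandas", "azureml-defaults==1.18.0"]))  # True
```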

In [35]:
# For this step, we use a different source_directory
source_directory = './compare'
print('Source directory for the step is {}.'.format(os.path.realpath(source_directory)))

# All steps use the same Kubernetes compute target as well
step2 = PythonScriptStep(name="compare_step",
                         script_name="compare.py", 
                         compute_target="aks-profile-test", 
                         source_directory=source_directory)

# Use a RunConfiguration to specify some additional requirements for this step.
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# create a new runconfig object
run_config = RunConfiguration()

# enable Docker 
run_config.environment.docker.enabled = True

# set Docker base image to the default CPU-based image
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# let Azure ML build a conda environment in the Docker image from the
# CondaDependencies below, instead of using packages already in the image
run_config.environment.python.user_managed_dependencies = False

# specify the CondaDependencies object; azureml-defaults is included as recommended above
run_config.environment.python.conda_dependencies = CondaDependencies.create(
    conda_packages=['scikit-learn'],
    pip_packages=['azureml-defaults'])

# For this step, we use yet another source_directory
source_directory = './extract'
print('Source directory for the step is {}.'.format(os.path.realpath(source_directory)))

step3 = PythonScriptStep(name="extract_step",
                         script_name="extract.py", 
                         compute_target="aks-profile-test", 
                         source_directory=source_directory,
                         runconfig=run_config)

# list of steps to run
steps = [step1, step2, step3]
print("Step lists created")

Source directory for the step is /mnt/batch/tasks/shared/LS_root/mounts/clusters/test-aks-pipeline/code/Users/adsingha/compare.
Source directory for the step is /mnt/batch/tasks/shared/LS_root/mounts/clusters/test-aks-pipeline/code/Users/adsingha/extract.
Step lists created


### Build the pipeline
Once we have the steps (or a collection of steps), we can build the [pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py). By default, all these steps run in **parallel** once we submit the pipeline.

A pipeline is created with a list of steps and a workspace. Submit a pipeline using [submit](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment(class)?view=azure-ml-py#submit-config--tags-none----kwargs-). When submit is called, a [PipelineRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinerun?view=azure-ml-py) is created, which in turn creates [StepRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.steprun?view=azure-ml-py) objects for each step in the workflow.
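Why does parallel execution matter? When steps do not depend on each other, total wall-clock time is roughly that of the slowest step rather than the sum of all of them. The local simulation below is only an analogy, not how Azure ML schedules steps on a cluster. `fake_step` and the step durations are hypothetical, and the steps are stood in for by threads that sleep.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_step(name, seconds):
    # Stand-in for a step's work: sleep instead of running a real script
    time.sleep(seconds)
    return name


# Three independent steps, none waiting on another (hypothetical durations)
step_durations = {"train_step": 0.3, "compare_step": 0.3, "extract_step": 0.3}

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(step_durations)) as pool:
    futures = [pool.submit(fake_step, name, secs) for name, secs in step_durations.items()]
    finished = [future.result() for future in futures]
elapsed = time.perf_counter() - start

# Wall-clock time is close to one step (~0.3 s), not the 0.9 s sum
print(finished, f"{elapsed:.2f}s")
```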

```python
# Available arguments for Pipeline constructor
Pipeline(workspace, 
         steps, 
         description=None, 
         default_datastore_name=None, 
         default_source_directory=None, 
         resolve_closure=True, 
         _workflow_provider=None, 
         _service_endpoint=None)
```

In [36]:
pipeline1 = Pipeline(workspace=ws, steps=steps)
print("Pipeline is built")

Pipeline is built



### Validate the pipeline
You have the option to [validate](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#validate--) the pipeline before submitting it.
The platform runs validation checks, such as detecting circular dependencies and checking parameters, even if you do not call the validate method explicitly.
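Checking for circular dependencies is a standard depth-first search on the step graph. The sketch below is our own illustration of that technique, not the Azure ML implementation. It assumes that the dependencies are given as a dict that maps each step name to the steps it must run after, and `find_cycle` is a hypothetical helper.

```python
def find_cycle(run_after):
    """Return a list of step names forming a cycle, or None if there is none.

    run_after maps each step name to the step names it must run after.
    """
    unvisited, in_progress, done = 0, 1, 2
    state = {}
    path = []

    def visit(step):
        state[step] = in_progress
        path.append(step)
        for prerequisite in run_after.get(step, ()):
            status = state.get(prerequisite, unvisited)
            if status == in_progress:
                # prerequisite is already on the current path: we found a loop
                return path[path.index(prerequisite):] + [prerequisite]
            if status == unvisited:
                cycle = visit(prerequisite)
                if cycle:
                    return cycle
        path.pop()
        state[step] = done
        return None

    for step in run_after:
        if state.get(step, unvisited) == unvisited:
            cycle = visit(step)
            if cycle:
                return cycle
    return None


chain = {"train_step": [], "compare_step": ["train_step"], "extract_step": ["compare_step"]}
print(find_cycle(chain))  # None

# Same chain plus "compare_step runs after extract_step"
loop = {"compare_step": ["train_step", "extract_step"], "extract_step": ["compare_step"], "train_step": []}
print(find_cycle(loop))  # ['compare_step', 'extract_step', 'compare_step']
```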

In [38]:
pipeline1.validate()
print("Pipeline validation complete")

Step train_step is ready to be created [1a09229d]
Step compare_step is ready to be created [3e836eb9]
Step extract_step is ready to be created [39708cfb]
Pipeline validation complete


### Submit the pipeline
[Submitting](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#submit) the pipeline involves creating an [Experiment](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment?view=azure-ml-py) object and providing the built pipeline for submission.

```python
# Available arguments for submit method
submit(experiment_name, 
       pipeline_parameters=None, 
       continue_on_step_failure=False, 
       regenerate_outputs=False)
```

In [39]:
pipeline_run1 = Experiment(ws, 'Hello_World1').submit(pipeline1, regenerate_outputs=False)
print("Pipeline is submitted for execution")

Created step train_step [1a09229d][497b2e5c-2a0d-4bfb-9d26-dfcd587160f4], (This step will run and generate new outputs)
Created step compare_step [3e836eb9][2d7b2536-bca7-4205-b078-0d3d5c3b2d04], (This step will run and generate new outputs)
Created step extract_step [39708cfb][0f99b231-4a9b-4304-9ebc-b362dc708b78], (This step will run and generate new outputs)
Submitted PipelineRun cf0b94ad-fa3e-47d9-b8f6-5f2e068371eb
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Hello_World1/runs/cf0b94ad-fa3e-47d9-b8f6-5f2e068371eb?wsid=/subscriptions/5abfd9c4-ec8c-4db9-acd4-c762dce93508/resourcegroups/akse-ws-rg/workspaces/akse-attach-ws1
Pipeline is submitted for execution


Note: If regenerate_outputs is set to True, a new submit will always force generation of all step outputs, and disallow data reuse for any step of this run. Once this run is complete, however, subsequent runs may reuse the results of this run.
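The reuse rule in this note can be modeled as a small cache keyed by a step fingerprint. The sketch below is a toy model only: `StepCache`, the fingerprint string `"train_step:abc123"`, and the way results are stored are all hypothetical and do not reflect Azure ML internals. It shows that a forced regeneration skips the cache, while its fresh result remains available for reuse by later runs.

```python
class StepCache:
    """Toy model of step-output reuse (hypothetical; not Azure ML's implementation)."""

    def __init__(self):
        self._outputs = {}

    def run(self, fingerprint, compute, allow_reuse=True, regenerate_outputs=False):
        """Return (result, reused) for the step identified by fingerprint."""
        if allow_reuse and not regenerate_outputs and fingerprint in self._outputs:
            return self._outputs[fingerprint], True
        result = compute()
        # Store the fresh result so that later runs can reuse it
        self._outputs[fingerprint] = result
        return result, False


calls = []


def train():
    calls.append("train")
    return f"model-{len(calls)}"


cache = StepCache()
first = cache.run("train_step:abc123", train)
second = cache.run("train_step:abc123", train)
forced = cache.run("train_step:abc123", train, regenerate_outputs=True)
after = cache.run("train_step:abc123", train)
print(first, second, forced, after)
# ('model-1', False) ('model-1', True) ('model-2', False) ('model-2', True)
```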


### Examine the pipeline run
#### Use RunDetails Widget
We are going to use the RunDetails widget to examine the pipeline run. You can click each row below to get more details on the step runs.

In [40]:
RunDetails(pipeline_run1).show()



#### Use Pipeline SDK objects
You can iterate over the step run objects and examine the job log, stdout, and stderr of each step.

In [42]:
step_runs = pipeline_run1.get_children()
for step_run in step_runs:
    status = step_run.get_status()
    print('Script:', step_run.name, 'status:', status)
    
    # Change this if you want to see details even if the Step has succeeded.
    if status == "Failed":
        joblog = step_run.get_job_log()
        print('job log:', joblog)

Script: extract_step status: Finished
Script: compare_step status: Finished
Script: train_step status: Finished


### Running a few steps in sequence

Now let's see how to run a few steps in sequence, reusing the three steps defined earlier.

We build the pipeline so that step2 runs after step1 and step3 runs after step2. Although there is no explicit data dependency between these steps, the `run_after` method can still make one step depend on another.
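The next cell passes only the final step to `Pipeline` and relies on the transitive closure of the `run_after` links to pull in the earlier steps. The sketch below models that idea with plain step names. The `run_after_edges` dict and the `transitive_closure` helper are hypothetical illustrations, not SDK objects. It uses `graphlib.TopologicalSorter`, which requires Python 3.9 or later, to derive the execution order.

```python
from graphlib import TopologicalSorter

# Mirrors step2.run_after(step1) and step3.run_after(step2):
# each key runs after the steps in its set.
run_after_edges = {
    "compare_step": {"train_step"},
    "extract_step": {"compare_step"},
}


def transitive_closure(final_steps, edges):
    """Collect every step reachable by following run_after links backwards."""
    seen = set()
    pending = list(final_steps)
    while pending:
        step = pending.pop()
        if step not in seen:
            seen.add(step)
            pending.extend(edges.get(step, ()))
    return seen


# Starting from extract_step alone still pulls in all three steps
included = transitive_closure(["extract_step"], run_after_edges)
graph = {step: run_after_edges.get(step, set()) for step in included}
order = list(TopologicalSorter(graph).static_order())

print(sorted(included))  # ['compare_step', 'extract_step', 'train_step']
print(order)             # ['train_step', 'compare_step', 'extract_step']
```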

In [43]:
step2.run_after(step1)
step3.run_after(step2)

# To see validation catch a loop, uncomment the next line:
# step2 would then run after step3, which already runs after step2.
#step2.run_after(step3)

# Now, construct the pipeline using the steps.

# We only need to specify the "final step" in the chain;
# Pipeline computes the "transitive closure" and
# picks up the implicit or explicit dependencies.
# https://www.geeksforgeeks.org/transitive-closure-of-a-graph/
pipeline2 = Pipeline(workspace=ws, steps=[step3])
print("Pipeline is built")

pipeline2.validate()
print("Simple validation complete")

Pipeline is built
Step extract_step is ready to be created [d371450d]
Step compare_step is ready to be created [308caa3f]
Simple validation complete


In [44]:
pipeline_run2 = Experiment(ws, 'Hello_World2').submit(pipeline2)
print("Pipeline is submitted for execution")

Created step extract_step [d371450d][b84c2455-a3d4-44ee-8eea-3a98abda78a2], (This step will run and generate new outputs)
Created step compare_step [308caa3f][667db079-02f1-4f81-b9b0-cd161d67e1bd], (This step will run and generate new outputs)
Created step train_step [b54f1823][497b2e5c-2a0d-4bfb-9d26-dfcd587160f4], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun 5b536770-f44d-47e9-a732-38b99c42b904
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/Hello_World2/runs/5b536770-f44d-47e9-a732-38b99c42b904?wsid=/subscriptions/5abfd9c4-ec8c-4db9-acd4-c762dce93508/resourcegroups/akse-ws-rg/workspaces/akse-attach-ws1
Pipeline is submitted for execution


In [46]:
RunDetails(pipeline_run2).show()
