Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# Azure Machine Learning Pipelines

## Overview

Read the [Azure Machine Learning Pipelines](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines) overview or the [readme article](../README.md) for more information on Azure Machine Learning Pipelines.
 

This notebook shows the basic construction of a **pipeline** that runs jobs unattended on different compute clusters.

## Prerequisites and Azure Machine Learning Basics
Make sure you go through the configuration notebook located at https://github.com/Azure/MachineLearningNotebooks first if you haven't already. It sets you up with a working config file that contains information about your workspace, subscription ID, and so on.


### Azure Machine Learning Imports

In this first code cell, we import key Azure Machine Learning modules that we will use below. 

In [None]:
from datetime import datetime
import azureml.core
from azureml.core import Workspace, Run, Experiment, Datastore
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.core.compute import DataFactoryCompute
from azureml.widgets import RunDetails

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

### Pipeline-specific SDK imports

Here, we import key pipeline modules, whose use will be illustrated in the examples below.

In [None]:
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData, StepSequence
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.steps import DataTransferStep
from azureml.pipeline.core import PublishedPipeline
from azureml.pipeline.core.graph import PipelineParameter

print("Pipeline SDK-specific imports completed")

### Initialize Workspace

Initialize a [workspace](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.workspace(class%29) object from persisted configuration.

In [None]:
ws = Workspace.from_config()

print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

# Default datastore (Azure file storage)
def_file_store = ws.get_default_datastore() 
# The above call is equivalent to Datastore(ws, "workspacefilestore") or simply Datastore(ws)
print("Default datastore's name: {}".format(def_file_store.name))

# Blob storage associated with the workspace
# The following call GETS the Azure Blob Store associated with your workspace.
# Note that workspaceblobstore is **the name of this store and CANNOT BE CHANGED and must be used as is** 
def_blob_store = Datastore(ws, "workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))

In [None]:
# project folder
project_folder = '.'
    
print('Sample projects will be created in {}.'.format(project_folder))

### Required data and script files for the tutorial
Sample files required to finish this tutorial are already copied to the project folder specified above. The provided .py files contain very little "ML work," although as a data scientist you will work on such scripts extensively. To complete this tutorial, the contents of these files are not very important. The one-line files are for demonstration purposes only.

### Datastore concepts
A [Datastore](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.datastore(class%29) is a place where data can be stored and then made accessible to a compute target, either by mounting it or by copying the data to the compute target.

A Datastore can either be backed by an Azure File Storage (default) or by an Azure Blob Storage.

In this next step, we will upload the training and test set into the workspace's default storage (File storage), and another piece of data to Azure Blob Storage. When to use [Azure Blobs](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction), [Azure Files](https://docs.microsoft.com/en-us/azure/storage/files/storage-files-introduction), or [Azure Disks](https://docs.microsoft.com/en-us/azure/virtual-machines/linux/managed-disks-overview) is [detailed here](https://docs.microsoft.com/en-us/azure/storage/common/storage-decide-blobs-files-disks).

**Please take good note of the concept of the datastore.**

#### Upload data to default datastore
The default datastore on the workspace is Azure File storage. The workspace also has a Blob storage associated with it. The cell below uploads a file to the Blob storage.

In [None]:
# get_default_datastore() returned the default Azure File Store (def_file_store) earlier.
# This cell does not upload anything to it.

# Reuse the def_blob_store object obtained earlier to upload the sample file to Blob storage
def_blob_store.upload_files(["./20news.pkl"], target_path="pipeline", overwrite=True)

print("Upload calls completed")

#### (Optional) See your files using Azure Portal
Once you have successfully uploaded the files, you can browse to them (or upload more files) using [Azure Portal](https://portal.azure.com). At the portal, make sure you have selected **AzureML Nursery** as your subscription (click *Resource Groups* and then select the subscription). Then look for your **Machine Learning Workspace** (it has your *alias* as the name). It has a link to your storage. Click on the storage link. It will take you to a page where you can see [Blobs](https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction), [Files](https://docs.microsoft.com/en-us/azure/storage/files/storage-files-introduction), [Tables](https://docs.microsoft.com/en-us/azure/storage/tables/table-storage-overview), and [Queues](https://docs.microsoft.com/en-us/azure/storage/queues/storage-queues-introduction). The cell above uploaded a file to the Blob storage, so you should be able to see it under **Blobs**.

### Compute Targets
A compute target specifies where to execute your program such as a remote Docker on a VM, or a cluster. A compute target needs to be addressable and accessible by you.

**You need at least one compute target to send your payload to. We are planning to use Azure Machine Learning Compute exclusively for this tutorial for all steps. However in some cases you may require multiple compute targets as some steps may run in one compute target like Azure Machine Learning Compute, and some other steps in the same pipeline could run in a different compute target.**

*The examples below show how to create, retrieve, or attach to an Azure Machine Learning Compute target.*

#### List of Compute Targets on the workspace

In [None]:
cts = ws.compute_targets
for ct in cts:
    print(ct)

#### Retrieve or create an Azure Machine Learning compute
Azure Machine Learning Compute is a service for provisioning and managing clusters of Azure virtual machines for running machine learning workloads. Let's create a new Azure Machine Learning Compute in the current workspace, if it doesn't already exist. We will then run the training script on this compute target.

If we could not find the compute with the given name in the previous cell, then we will create a new compute here. We will create an Azure Machine Learning Compute containing **STANDARD_D2_V2 CPU VMs**. This process is broken down into the following steps:

1. Create the configuration
2. Create the Azure Machine Learning compute

**This process takes about 3 minutes and produces only sparse output while it runs. Please wait until the call returns before moving to the next cell.**

In [None]:
from azureml.core.compute_target import ComputeTargetException

aml_compute_target = "p100cluster"
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("found existing compute target.")
except ComputeTargetException:
    # Raised when no compute target with this name exists in the workspace
    print("creating new compute target")
    
    provisioning_config = AmlCompute.provisioning_configuration(vm_size = "STANDARD_D2_V2",
                                                                min_nodes = 1, 
                                                                max_nodes = 4)    
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    aml_compute.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)
    
print("Azure Machine Learning Compute attached")


In [None]:
# For a more detailed view of the current Azure Machine Learning Compute status,
# inspect the 'status' property.
aml_compute.status.serialize()

**Wait for this call to finish before proceeding (you will see the asterisk turning to a number).**

Now that you have created the compute target, let's see what the workspace's `compute_targets` property returns. You should now see an entry named 'p100cluster' of type AmlCompute.

**Now that we have completed learning the basics of Azure Machine Learning (AML), let's go ahead and start understanding the Pipeline concepts.**

### Creating a Step in a Pipeline
A Step is a unit of execution. A Step typically needs a target of execution (compute target) and a script to execute. It may also require script arguments and inputs, and it can produce outputs. A Step can take a number of other parameters as well. Azure Machine Learning Pipelines provides the following built-in Steps:

- [**PythonScriptStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.python_script_step.pythonscriptstep?view=azure-ml-py): Add a step to run a Python script in a Pipeline.
- [**AdlaStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.adla_step.adlastep?view=azure-ml-py): Adds a step to run U-SQL script using Azure Data Lake Analytics.
- [**DataTransferStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.data_transfer_step.datatransferstep?view=azure-ml-py): Transfers data between Azure Blob and Data Lake accounts.
- [**DatabricksStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.databricks_step.databricksstep?view=azure-ml-py): Adds a Databricks notebook as a step in a Pipeline.
- [**HyperDriveStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.hyper_drive_step.hyperdrivestep?view=azure-ml-py): Creates a Hyper Drive step for Hyper Parameter Tuning in a Pipeline.
- [**EstimatorStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.estimator_step.estimatorstep?view=azure-ml-py): Adds a step to run Estimator in a Pipeline.
- [**MpiStep**](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-steps/azureml.pipeline.steps.mpi_step.mpistep?view=azure-ml-py): Adds a step to run an MPI job in a Pipeline.

The following code will create a PythonScriptStep to be executed in the Azure Machine Learning Compute we created above using train.py, one of the files already made available in the project folder.

A **PythonScriptStep** is a basic, built-in step to run a Python script on a compute target. It takes a script name and, optionally, other parameters such as arguments for the script, a compute target, inputs, and outputs. If no compute target is specified, the default compute target for the workspace is used.

In [None]:
# Uses default values for PythonScriptStep construct.

# Syntax
# PythonScriptStep(
#     script_name, 
#     name=None, 
#     arguments=None, 
#     compute_target=None, 
#     runconfig=None, 
#     inputs=None, 
#     outputs=None, 
#     params=None, 
#     source_directory=None, 
#     allow_reuse=True, 
#     version=None, 
#     hash_paths=None)
# This returns a Step
trainStep = PythonScriptStep(name="train_step",
                         script_name="train.py", 
                         compute_target=aml_compute, 
                         source_directory=project_folder,
                         allow_reuse=True)
print("trainStep created")

**Note:** In the above call to PythonScriptStep(), the flag *allow_reuse* determines whether the step should reuse previous results when run with the same settings/inputs. This flag's default value is *True*; the default is set to *True* because, when inputs and parameters have not changed, we typically do not want to re-run a given pipeline step. 

If *allow_reuse* is set to *False*, a new run will always be generated for this step during pipeline execution. Leaving *allow_reuse* at *True* comes in handy when you do *not* want to re-run a pipeline step whose inputs and settings are unchanged.
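
To make the reuse decision concrete, here is a minimal, self-contained sketch of the idea. It is a toy model only, not how the Azure Machine Learning service implements reuse: the fingerprint fields, the in-memory `cache` dictionary, and the `--epochs` argument are all assumptions made for illustration.

```python
import hashlib
import json


def step_fingerprint(script_name, arguments, inputs):
    """Hash the settings that identify a step's work (toy model)."""
    payload = json.dumps(
        {"script": script_name, "arguments": arguments, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_step(cache, script_name, arguments, inputs, allow_reuse=True):
    """Return (result, reused) for a simulated step execution."""
    key = step_fingerprint(script_name, arguments, inputs)
    if allow_reuse and key in cache:
        # Same settings and inputs as a previous run: hand back the old result.
        return cache[key], True
    # Otherwise "execute" the step and remember its result for later runs.
    result = "output of {} ({})".format(script_name, key[:8])
    cache[key] = result
    return result, False


cache = {}
first = run_step(cache, "train.py", ["--epochs", "1"], ["20news.pkl"])
second = run_step(cache, "train.py", ["--epochs", "1"], ["20news.pkl"])
forced = run_step(cache, "train.py", ["--epochs", "1"], ["20news.pkl"], allow_reuse=False)
changed = run_step(cache, "train.py", ["--epochs", "2"], ["20news.pkl"])
print(first[1], second[1], forced[1], changed[1])
```

Only the second call is served from the cache: the first call has nothing to reuse, the third opts out with `allow_reuse=False`, and the fourth changes an argument and therefore gets a different fingerprint.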

## Running a few steps in parallel
Here we are looking at a simple scenario where we run a few steps (all of them PythonScriptStep) in parallel. Running steps in **parallel** is the default behavior for steps in a pipeline.

We already have one step defined earlier. Let's define a few more steps.

In [None]:
# All steps use files already available in the project_folder
# All steps use the same Azure Machine Learning compute target as well
compareStep = PythonScriptStep(name="compare_step",
                         script_name="compare.py", 
                         compute_target=aml_compute, 
                         source_directory=project_folder)

extractStep = PythonScriptStep(name="extract_step",
                         script_name="extract.py", 
                         compute_target=aml_compute, 
                         source_directory=project_folder)

# list of steps to run
steps = [trainStep, compareStep, extractStep]
print("Step lists created")

### Build the pipeline
Once we have the steps (or steps collection), we can build the [pipeline](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py). By default, all these steps will run in **parallel** once we submit the pipeline for run.

A pipeline is created with a list of steps and a workspace. Submit a pipeline using [submit](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment%28class%29?view=azure-ml-py#submit). When submit is called, a [PipelineRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinerun?view=azure-ml-py) is created which in turn creates [StepRun](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.steprun?view=azure-ml-py) objects for each step in the workflow.

In [None]:
# Syntax
# Pipeline(workspace, 
#          steps, 
#          description=None, 
#          default_datastore_name=None, 
#          default_source_directory=None, 
#          resolve_closure=True, 
#          _workflow_provider=None, 
#          _service_endpoint=None)

pipeline1 = Pipeline(workspace=ws, steps=steps)
print ("Pipeline is built")

### Validate the pipeline
You have the option to [validate](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#validate) the pipeline prior to submitting it for a run. The platform runs validation steps, such as checking for circular dependencies and checking parameters, even if you do not explicitly call the validate method.
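
As an illustration of what a circular-dependency check involves, the following sketch runs a topological sort over a small step graph using the standard library's `graphlib` (Python 3.9 or later). It is not the validation code used by the platform; the `validate_graph` helper and the dictionary representation of the graph are assumptions made for this example.

```python
from graphlib import CycleError, TopologicalSorter


def validate_graph(run_after):
    """Check a step graph for cycles.

    run_after maps each step name to the set of steps it must run after.
    Returns (True, execution_order) or (False, steps_forming_the_cycle).
    """
    try:
        order = list(TopologicalSorter(run_after).static_order())
        return True, order
    except CycleError as err:
        # The second element of args holds the list of nodes in the cycle.
        return False, err.args[1]


# A valid chain: train_step -> compare_step -> extract_step
ok_graph = {
    "train_step": set(),
    "compare_step": {"train_step"},
    "extract_step": {"compare_step"},
}
# An invalid graph: train_step also waits for extract_step, closing a loop.
bad_graph = {
    "train_step": {"extract_step"},
    "compare_step": {"train_step"},
    "extract_step": {"compare_step"},
}

print(validate_graph(ok_graph))
print(validate_graph(bad_graph)[0])
```

A graph with a cycle has no valid execution order, which is why such a pipeline has to be rejected before any step is scheduled.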

In [None]:
pipeline1.validate()
print("Pipeline validation complete")

### Submit the pipeline
[Submitting](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipeline.pipeline?view=azure-ml-py#submit) the pipeline involves creating an [Experiment](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.experiment?view=azure-ml-py) object and providing the built pipeline for submission. 
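
The submit cells in this notebook build the experiment name from a timestamp prefix and a fixed suffix. The sketch below isolates that naming scheme so it can be tried without a workspace; `make_experiment_name` is a hypothetical helper, and the fixed date is only there to make the output reproducible.

```python
from datetime import datetime


def make_experiment_name(suffix, now=None):
    """Prefix suffix with month, day, hour, and minute, e.g. 'Mar05_14_07_'."""
    now = now or datetime.now()
    return now.strftime("%b%d_%H_%M_") + suffix


# A fixed timestamp keeps the example deterministic.
name = make_experiment_name("Hello_World1", now=datetime(2019, 3, 5, 14, 7))
print(name)
```

Because the prefix has minute resolution, two submissions made within the same minute produce the same experiment name and therefore land in the same experiment.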

In [None]:
date_object = datetime.now()
time_format = date_object.strftime('%b%d_%H_%M_')
exp_name = time_format + "Hello_World1"
# Submit syntax
# submit(experiment_name, 
#        pipeline_parameters=None, 
#        continue_on_node_failure=False, 
#        regenerate_outputs=False)

pipeline_run1 = Experiment(ws, exp_name).submit(pipeline1, regenerate_outputs=False)
print("Pipeline is submitted for execution")

**Note:** If regenerate_outputs is set to True, a new submit will always force generation of all step outputs, and disallow data reuse for any step of this run. Once this run is complete, however, subsequent runs may reuse the results of this run.

### Examine the pipeline run

#### Use RunDetails Widget
We are going to use the RunDetails widget to examine the run of the pipeline. You can click each row below to get more details on the step runs.

In [None]:
RunDetails(pipeline_run1).show()

#### Use Pipeline SDK objects
You can cycle through the step run objects and examine the job logs, stdout, and stderr of each step.

In [None]:
step_runs = pipeline_run1.get_children()
for step_run in step_runs:
    status = step_run.get_status()
    print('Script:', step_run.name, 'status:', status)
    
    # Change this if you want to see details even if the Step has succeeded.
    if status == "Failed":
        joblog = step_run.get_job_log()
        print('job log:', joblog)

#### Get additional run details
If you wait until the pipeline run has finished, you can get additional details on the run. **Since this is a blocking call, the following code is commented out.**

In [None]:
#pipeline_run1.wait_for_completion()
#for step_run in pipeline_run1.get_children():
#    print("{}: {}".format(step_run.name, step_run.get_metrics()))
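
If blocking indefinitely is a concern, one alternative is to poll the status with a timeout. The sketch below shows the pattern against a stand-in status function so that it runs without a workspace. `wait_for_status` is a hypothetical helper, and the set of terminal status names is an assumption; with a real run you would pass something like `pipeline_run1.get_status`.

```python
import time

# Assumed terminal states; adjust to the statuses your runs actually report.
TERMINAL_STATES = {"Finished", "Failed", "Canceled"}


def wait_for_status(get_status, timeout_seconds=60.0, poll_seconds=1.0):
    """Poll get_status() until it returns a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "run still '{}' after {} seconds".format(status, timeout_seconds)
            )
        time.sleep(poll_seconds)


# Stand-in for a real run: reports a few intermediate states, then finishes.
statuses = iter(["NotStarted", "Running", "Running", "Finished"])
print(wait_for_status(lambda: next(statuses), timeout_seconds=5, poll_seconds=0))
```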

## Running a few steps in sequence
Now let's see how we run a few steps in sequence. We already have three steps defined earlier. Let's *reuse* those steps for this part.

We will reuse trainStep, compareStep, and extractStep, but build the pipeline in such a way that we chain extractStep after compareStep and compareStep after trainStep. Note that there is no explicit data dependency between these steps, but steps can still be made dependent on each other by using the [run_after](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.builder.pipelinestep?view=azure-ml-py#run-after) construct.
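
The pipeline in the next cell is built from the last step only, yet all three steps run. The sketch below shows why that can work: starting from the final step and following the run-after links backwards recovers every upstream step in execution order. This is a simplified model rather than the SDK's implementation; `collect_steps` and the dictionary of links are assumptions made for illustration.

```python
def collect_steps(final_step, run_after):
    """Walk run-after links backwards from final_step.

    run_after maps a step name to the list of steps it must run after.
    Returns every reachable step, ordered so that upstream steps come first.
    """
    ordered = []
    seen = set()

    def visit(step):
        if step in seen:
            return
        seen.add(step)
        for upstream in run_after.get(step, []):
            visit(upstream)
        # All upstream steps are already listed, so this step can follow them.
        ordered.append(step)

    visit(final_step)
    return ordered


run_after = {
    "compare_step": ["train_step"],
    "extract_step": ["compare_step"],
}
print(collect_steps("extract_step", run_after))
# prints ['train_step', 'compare_step', 'extract_step']
```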

In [None]:
compareStep.run_after(trainStep)
extractStep.run_after(compareStep)

pipeline2 = Pipeline(workspace=ws, steps=[extractStep])
print ("Pipeline is built")

pipeline2.validate()
print("Simple validation complete")

In [None]:
date_object = datetime.now()
time_format = date_object.strftime('%b%d_%H_%M_')
exp_name = time_format + "Hello_World2"

pipeline_run2 = Experiment(ws, exp_name).submit(pipeline2)
print("Pipeline is submitted for execution")

In [None]:
RunDetails(pipeline_run2).show()

## Building Pipeline Steps with Inputs and Outputs
As mentioned earlier, a step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline.

### Datasources
A datasource is represented by a **[DataReference](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.data.data_reference.datareference?view=azure-ml-py)** object and points to data that lives in, or is accessible from, a Datastore. A DataReference can point to a file or a directory.

In [None]:
# Reference the data uploaded to blob storage using DataReference
# Assign the datasource to blob_input_data variable

# DataReference(datastore, 
#               data_reference_name=None, 
#               path_on_datastore=None, 
#               mode='mount', 
#               path_on_compute=None, 
#               overwrite=False)

blob_input_data = DataReference(
    datastore=def_blob_store,
    data_reference_name="test_data",
    path_on_datastore="pipeline/20news.pkl")  # matches target_path="pipeline" used in the upload cell
print("DataReference object created")

### Intermediate/Output Data
Intermediate data (or output of a Step) is represented by **[PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py)** object. PipelineData can be produced by one step and consumed in another step by providing the PipelineData object as an output of one step and the input of one or more steps.
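
Sharing a PipelineData object between steps is what creates the dependency between them. The following sketch models that idea with plain dictionaries: each step lists the names of the data it reads and writes, and the dependencies are derived by matching inputs to producers. It is an illustration only; `data_dependencies` is a hypothetical helper, and the step and data names mirror the ones used later in this notebook.

```python
def data_dependencies(steps):
    """Derive step dependencies from shared data names.

    steps maps a step name to {"inputs": [...], "outputs": [...]}.
    Returns a dict mapping each step to the set of steps that produce its inputs.
    """
    producer = {}
    for name, spec in steps.items():
        for output in spec["outputs"]:
            producer[output] = name

    deps = {}
    for name, spec in steps.items():
        # Inputs without a producer (such as a datasource) add no dependency.
        deps[name] = {producer[i] for i in spec["inputs"] if i in producer}
    return deps


steps = {
    "train": {"inputs": ["test_data"], "outputs": ["processed_data1"]},
    "extract": {"inputs": ["processed_data1"], "outputs": ["processed_data2"]},
    "compare": {
        "inputs": ["processed_data1", "processed_data2"],
        "outputs": ["processed_data3"],
    },
}
deps = data_dependencies(steps)
for name in ("train", "extract", "compare"):
    print(name, "depends on", sorted(deps[name]))
```

In this model no `run_after` call is needed: `compare` has to wait for both `train` and `extract` simply because it consumes what they produce.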

#### Constructing PipelineData
- **name:** [*Required*] Name of the data item within the pipeline graph
- **datastore:** The Datastore to write this output to
- **output_name:** Name of the output
- **output_mode:** Specifies "upload" or "mount" modes for producing output (default: mount)
- **output_path_on_compute:** For "upload" mode, the path to which the module writes this output during execution
- **output_overwrite:** Flag to overwrite pre-existing data

In [None]:
# Define intermediate data using PipelineData
# Syntax

# PipelineData(name, 
#              datastore=None, 
#              output_name=None, 
#              output_mode='mount', 
#              output_path_on_compute=None, 
#              output_overwrite=None, 
#              data_type=None, 
#              is_directory=None)

# Naming the intermediate data as processed_data1 and assigning it to the variable processed_data1.
processed_data1 = PipelineData("processed_data1",datastore=def_blob_store)
print("PipelineData object created")

### Pipeline steps using datasources and intermediate data
Machine learning pipelines can have many steps and these steps could use or reuse datasources and intermediate data. Here's how we construct such a pipeline:

#### Define a Step that consumes a datasource and produces intermediate data.
In this step, we define a step that consumes a datasource and produces intermediate data.

**Open `train.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.** 
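
For readers who do not have the file at hand, here is a hedged sketch of how a script like `train.py` could read the two arguments passed by the step. The actual contents of `train.py` are not shown in this notebook, so the parser below and the example paths are assumptions. Only the argument names `--input_data` and `--output_train` come from the step definition.

```python
import argparse


def parse_train_args(argv=None):
    """Parse the arguments that the pipeline step passes to the script."""
    parser = argparse.ArgumentParser(description="Hypothetical argument handling for train.py")
    parser.add_argument("--input_data", required=True,
                        help="path at which the input data is available on the compute target")
    parser.add_argument("--output_train", required=True,
                        help="path to which the step output should be written")
    return parser.parse_args(argv)


# Example invocation with made-up paths; at run time the pipeline fills in real ones.
args = parse_train_args([
    "--input_data", "/mnt/in/20news.pkl",
    "--output_train", "/mnt/out/processed_data1",
])
print(args.input_data)
print(args.output_train)
```

The argument names in the step's `arguments` list must match the names the script declares. Otherwise the script fails while parsing its arguments and never reaches its actual work.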

In [None]:
# trainStep consumes the datasource (blob_input_data, a DataReference) defined earlier
# and produces processed_data1
trainStep = PythonScriptStep(
    script_name="train.py", 
    arguments=["--input_data", blob_input_data, "--output_train", processed_data1],
    inputs=[blob_input_data],
    outputs=[processed_data1],
    compute_target=aml_compute, 
    source_directory=project_folder
)
print("trainStep created")

#### Define a Step that consumes intermediate data and produces intermediate data
In this step, we define a step that consumes intermediate data and produces intermediate data.

**Open `extract.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**

In [None]:
# extractStep uses the intermediate data produced by trainStep
# This step also produces an output, processed_data2
processed_data2 = PipelineData("processed_data2", datastore=def_blob_store)

extractStep = PythonScriptStep(
    script_name="extract.py",
    arguments=["--input_extract", processed_data1, "--output_extract", processed_data2],
    inputs=[processed_data1],
    outputs=[processed_data2],
    compute_target=aml_compute, 
    source_directory=project_folder)
print("extractStep created")

#### Define a Step that consumes multiple intermediate data and produces intermediate data
In this step, we define a step that consumes multiple intermediate data and produces intermediate data.

**Open `compare.py` in the local machine and examine the arguments, inputs, and outputs for the script. That will give you a good sense of why the script argument names used below are important.**


This step also has a [PipelineParameter](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.graph.pipelineparameter?view=azure-ml-py) argument that helps with calling the REST endpoint of the published pipeline.
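
Conceptually, a pipeline parameter is a named value with a default that a caller can override for a single run. The sketch below models that resolution with plain dictionaries. It is not the service's implementation, and `resolve_parameters`, including its choice to reject unknown names, is an assumption made for illustration.

```python
def resolve_parameters(defaults, assignments=None):
    """Merge per-run assignments over the declared defaults."""
    assignments = assignments or {}
    unknown = set(assignments) - set(defaults)
    if unknown:
        # Design choice of this sketch: fail loudly on names that were never declared.
        raise KeyError("unknown pipeline parameter(s): {}".format(sorted(unknown)))
    resolved = dict(defaults)
    resolved.update(assignments)
    return resolved


defaults = {"pipeline_arg": 10}
print(resolve_parameters(defaults))                        # prints {'pipeline_arg': 10}
print(resolve_parameters(defaults, {"pipeline_arg": 55}))  # prints {'pipeline_arg': 55}
```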

In [None]:
# We will use this later in publishing pipeline
pipeline_param = PipelineParameter(name="pipeline_arg", default_value=10)
print("pipeline parameter created")

In [None]:
# Define compareStep, which takes two inputs (both intermediate data) and a pipeline parameter, and produces an output
processed_data3 = PipelineData("processed_data3", datastore=def_blob_store)

compareStep = PythonScriptStep(
    script_name="compare.py",
    arguments=["--compare_data1", processed_data1, "--compare_data2", processed_data2, "--output_compare", processed_data3, "--pipeline_param", pipeline_param],
    inputs=[processed_data1, processed_data2],
    outputs=[processed_data3],    
    compute_target=aml_compute, 
    source_directory=project_folder)
print("compareStep created")

In [None]:
pipeline1 = Pipeline(workspace=ws, steps=[compareStep])
print ("Pipeline is built")

pipeline1.validate()
print("Simple validation complete") 

date_object = datetime.now()
time_format = date_object.strftime('%b%d_%H_%M_')
exp_name = time_format + "Data_dependency"

pipeline_run1 = Experiment(ws, exp_name).submit(pipeline1)
print("Pipeline is submitted for execution")

In [None]:
RunDetails(pipeline_run1).show()

## Publish the pipeline

In [None]:
date_object = datetime.now()
time_format = date_object.strftime('%b%d_%H_%M_')
pipeline_name = time_format + "Data_dependency"

published_pipeline1 = pipeline1.publish(name=pipeline_name, description="My_Published_Pipeline_Description")
print(published_pipeline1.id)

### Run published pipeline using its REST endpoint

In [None]:
from azureml.core.authentication import AzureCliAuthentication
import requests

cli_auth = AzureCliAuthentication()
aad_token = cli_auth.get_authentication_header()

rest_endpoint1 = published_pipeline1.endpoint

print(rest_endpoint1)

date_object = datetime.now()
time_format = date_object.strftime('%b%d_%H_%M_')
exp_name = time_format + "My_Pipeline"


# specify the param when running the pipeline
response = requests.post(rest_endpoint1, 
                         headers=aad_token, 
                         json={"ExperimentName": exp_name,
                               "RunSource": "SDK",
                               "ParameterAssignments": {"pipeline_arg": 55}})
response.raise_for_status()  # fail early if the service rejected the request
run_id = response.json()["Id"]

print(run_id)
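
The request body and the response handling can be separated from the network call, which makes them easy to inspect and test offline. The sketch below does that with the field names used in the cell above (`ExperimentName`, `RunSource`, `ParameterAssignments`, `Id`). The helper functions are hypothetical, and the sample response text is made up for the example.

```python
import json


def build_run_request(experiment_name, parameter_assignments=None):
    """Build the JSON body for triggering a published pipeline run."""
    body = {"ExperimentName": experiment_name, "RunSource": "SDK"}
    if parameter_assignments:
        body["ParameterAssignments"] = dict(parameter_assignments)
    return body


def extract_run_id(response_text):
    """Pull the run id out of a response body, failing clearly if it is absent."""
    payload = json.loads(response_text)
    if "Id" not in payload:
        raise ValueError("response has no 'Id' field: {}".format(payload))
    return payload["Id"]


body = build_run_request("My_Pipeline", {"pipeline_arg": 55})
print(json.dumps(body, indent=2))
print(extract_run_id('{"Id": "example-run-id"}'))
```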

## Data Transfer
In certain cases, you will need to transfer data from one data location to another. For example, your data may be in Files storage and you may want to move it to Blob storage, or your data may be in an ADLS account and you may want to make it available in Blob storage. The built-in **DataTransferStep** class helps you transfer data in these situations.

### Register Datastores

In the code cell below, you will need to fill in the appropriate values for the workspace name, datastore name, subscription id, resource group, store name, tenant id, client id, and client secret that are associated with your ADLS datastore. 

For background on registering your data store, consult this article:

https://docs.microsoft.com/en-us/azure/data-lake-store/data-lake-store-service-to-service-authenticate-using-active-directory

To obtain the keys for the ADLS account, please go to [this OneNote](https://microsoft.sharepoint.com/teams/azuremlnursery/_layouts/15/WopiFrame.aspx?sourcedoc={4c394733-8bc8-4a24-9ba6-5200de206450}&wd=target%28Workshop.one%7C265d85d5-44c8-9d40-b556-a31fa098e708%2FPipeline%20Datatransfer%7C3b63da03-f100-4843-96bf-c9c4805ada5b%2F%29&wdorigin=703) and replace the following cell with its content.
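
If you prefer not to paste secrets into the notebook, one option is to read them from environment variables. The sketch below shows that pattern. The `ADLS_` prefix, the variable names, and the `load_adls_settings` helper are assumptions made for this example, not something the SDK defines.

```python
import os

REQUIRED_SETTINGS = (
    "subscription_id",
    "resource_group",
    "store_name",
    "tenant_id",
    "client_id",
    "client_secret",
)


def load_adls_settings(env=None, prefix="ADLS_"):
    """Read the ADLS settings from environment variables such as ADLS_TENANT_ID."""
    env = os.environ if env is None else env
    settings = {}
    missing = []
    for name in REQUIRED_SETTINGS:
        variable = prefix + name.upper()
        value = env.get(variable)
        if value:
            settings[name] = value
        else:
            missing.append(variable)
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return settings


# Demonstration with a stand-in environment; in practice call load_adls_settings().
fake_env = {"ADLS_" + name.upper(): "<" + name + ">" for name in REQUIRED_SETTINGS}
settings = load_adls_settings(fake_env)
print(sorted(settings))
```

The returned dictionary uses the same keys as the variables in the next cell, so its values can be assigned to them directly.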

#### Source of the data copy (ADLS)

In [None]:
## Need to register ADLS; get the details from the shared OneNote.

adl_datastore_name = 'MigratedADLS'

# Replace each placeholder with the details of your migrated ADLS account.
subscription_id = "<subscription-id>"  # subscription id of ADLS account
resource_group = "<resource-group>"  # resource group of ADLS account
store_name = "<store-name>"  # ADLS account name
tenant_id = "<tenant-id>"  # tenant id of service principal
client_id = "<client-id>"  # client id of service principal
client_secret = "<client-secret>"  # the secret of service principal

In [None]:
try:
    adls_datastore = Datastore.get(ws, adl_datastore_name)
    print("found datastore with name: %s" % adl_datastore_name)
except:
    adls_datastore = Datastore.register_azure_data_lake(
        workspace=ws,
        datastore_name=adl_datastore_name,
        subscription_id=subscription_id, # subscription id of ADLS account
        resource_group=resource_group, # resource group of ADLS account
        store_name=store_name, # ADLS account name
        tenant_id=tenant_id, # tenant id of service principal
        client_id=client_id, # client id of service principal
        client_secret=client_secret) # the secret of service principal
    print("registered datastore with name: %s" % adl_datastore_name)

In [None]:
adls_data = DataReference(
    datastore=adls_datastore,
    data_reference_name="adlsdata",
    path_on_datastore="local/AMLTest/input.tsv")

#### Destination of the data copy (Blob)

In [None]:
blob_data = DataReference(datastore=def_blob_store,
                       path_on_datastore="pipeline",
                       data_reference_name="blobdata")

### Copy data using DataTransferStep

In [None]:
from azureml.core.compute import DataFactoryCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.pipeline.steps import DataTransferStep

data_factory_name = 'adftest'

try:
    data_factory_compute = DataFactoryCompute(ws, data_factory_name)
    print("Found existing data factory compute.")
except ComputeTargetException:
    # Raised when no compute target with this name exists in the workspace
    print("Creating new data factory compute")
    
    provisioning_config = DataFactoryCompute.provisioning_configuration()
    data_factory_compute = ComputeTarget.create(ws, data_factory_name, provisioning_config)
    data_factory_compute.wait_for_completion(show_output=True)

print("Data Factory compute attached")

# Copy the file referenced by adls_data (ADLS) to the location referenced by blob_data (Blob)
transfer_adls_to_blob = DataTransferStep(
    name="Copy_from_ADLS_to_Blob",
    source_data_reference=adls_data,
    destination_data_reference=blob_data,
    source_reference_type='file',
    compute_target=data_factory_compute)
print("data transfer step created")

In [None]:
pipeline3 = Pipeline(
    description="Data Transfer",
    workspace=ws, 
    steps=[transfer_adls_to_blob])

In [None]:
date_object = datetime.now()
time_format = date_object.strftime('%b%d_%H_%M_')
exp_name = time_format + "Data_Transfer"

pipeline_run = Experiment(ws, exp_name).submit(pipeline3)
from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()