Copyright (c) Microsoft Corporation. All rights reserved.  
Licensed under the MIT License.

# Azure Machine Learning Pipelines with Data Dependency
This notebook shows how to build a pipeline in which the execution order of the steps is inferred from the data they exchange (an implicit data dependency).

### Azure Machine Learning and Pipeline SDK-specific Imports

In [40]:
import azureml.core
import azureml.dataprep
from azureml.core import Workspace, Experiment, Datastore, Dataset
from azureml.core.compute import AmlCompute
from azureml.core.compute import ComputeTarget
from azureml.widgets import RunDetails
from azureml.data import TabularDataset
from azureml.data.data_reference import DataReference
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep

# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

SDK version: 1.0.83


### Initialize Workspace and Retrieve a Compute Target

In [41]:
ws = Workspace.from_config()
print("== Workspace:")
print(ws.name, ws.resource_group, ws.location, ws.subscription_id, sep = '\n')

# Default datastore (Azure blob storage)
# def_blob_store = ws.get_default_datastore()
blob_store = Datastore(ws, "workspaceblobstore")
print("== Datastore: {}".format(blob_store.name))

# list compute targets
print("== Compute targets:")
for ct in ws.compute_targets:
    print("  " + ct)
    
# Retrieve a compute target    
from azureml.core.compute_target import ComputeTargetException
aml_compute_target = "agd-training-cpu"
try:
    aml_compute = AmlCompute(ws, aml_compute_target)
    print("== AML compute target attached: " + aml_compute_target)
except ComputeTargetException:
    print("== AML compute target not found: " + aml_compute_target)

== Workspace:
agd-mlws
azure-ml-workshop
westus2
c5ec24ce-9c5f-4da2-bf12-9ca8e9758d60
== Datastore: workspaceblobstore
== Compute targets:
  agd-inference
  agd-inference-v
  agd-training-gpu
  agd-training-cpu
== AML compute target attached: agd-training-cpu


### Create Compute Configuration

The pipeline steps run in a Docker image. Use a [**RunConfiguration**](https://docs.microsoft.com/en-us/python/api/azureml-core/azureml.core.runconfiguration?view=azure-ml-py) to specify the image and its Python dependencies, then pass it to each PythonScriptStep you create.

In [42]:
from azureml.core.runconfig import RunConfiguration
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.core.conda_dependencies import CondaDependencies

# create a new runconfig object
run_config = RunConfiguration()

# enable Docker 
run_config.environment.docker.enabled = True

# set Docker base image to the default CPU-based image
run_config.environment.docker.base_image = DEFAULT_CPU_IMAGE

# use conda_dependencies.yml to create a conda environment in the Docker image for execution
run_config.environment.python.user_managed_dependencies = False

# specify dependencies
#run_config.environment.python.conda_dependencies = CondaDependencies.create(
#    conda_packages=['pandas'],
#    pip_packages=['azureml-sdk', 'azureml-dataprep[fuse,pandas]', 'azureml-train-automl'], 
#    pin_sdk_version=False)
run_config.environment.python.conda_dependencies = CondaDependencies(
    conda_dependencies_file_path='data-prep-pipeline.yml')

#
print("== Run Configuration created")

== Run Configuration created


## Building Pipeline Steps with Inputs and Outputs
A step in the pipeline can take data as input. This data can be a data source that lives in one of the accessible data locations, or intermediate data produced by a previous step in the pipeline.

### Datasources as Datasets

In [43]:
# hourly time series
h_time_series_1_ds = Dataset.get_by_name(ws,"h_time_series_1")
h_time_series_2_ds = Dataset.get_by_name(ws,"h_time_series_2")
h_time_series_3_ds = Dataset.get_by_name(ws,"h_time_series_3")
# daily time series
d_time_series_1_dr = DataReference(datastore=blob_store,
                                   data_reference_name="d_time_series_1",
                                   path_on_datastore="datasets/time-series/X1.csv")
d_time_series_2_dr = DataReference(datastore=blob_store,
                                   data_reference_name="d_time_series_2",
                                   path_on_datastore="datasets/time-series/X2.csv")

print("== Datasets metadata retrieved")

== Datasets metadata retrieved


### Intermediate/Output Data
Intermediate data (the output of a step) is represented by a **[PipelineData](https://docs.microsoft.com/en-us/python/api/azureml-pipeline-core/azureml.pipeline.core.pipelinedata?view=azure-ml-py)** object. A PipelineData object produced by one step can be consumed by other steps: pass it as an output of the producing step and as an input of one or more consuming steps.

In [44]:
# Define intermediate data using PipelineData
# hourly series that need timestamps generated from columns and, if necessary, pivoting
h_time_series_1_pivot_pd = PipelineData("h_time_series_1_pivot",datastore=blob_store)
h_time_series_2_pivot_pd = PipelineData("h_time_series_2_pivot",datastore=blob_store)
h_time_series_3_pivot_pd = PipelineData("h_time_series_3_pivot",datastore=blob_store)
print("== Intermediate PipelineData references created")

== Intermediate PipelineData references created


### Pipeline steps using datasources and intermediate data
Machine learning pipelines can have many steps, and these steps can use or reuse datasources and intermediate data. Here's how we construct such a pipeline:

In [45]:
# Best practice: give each step its own folder for its script and dependent files,
# and specify that folder as the step's source_directory.
# This reduces the size of the snapshot created for the step (only that folder is snapshotted).
# Because a change to any file in the source_directory triggers a re-upload of the snapshot,
# separate folders also preserve step reuse when a step's own source_directory is unchanged.
source_directory_pivot = 'src/pivot'
source_directory_join = 'src/join'
source_directory_groupby_aggregate = 'src/groupby-aggregate'
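
The comments above say that a change to any file in a step's `source_directory` invalidates the snapshot for that step. To make the benefit of separate folders concrete, here is a minimal sketch that fingerprints a folder by hashing its contents. It only illustrates the idea and is not how Azure ML computes snapshots; `folder_fingerprint` and the placeholder scripts are hypothetical.

```python
import hashlib
import os
import tempfile


def folder_fingerprint(folder):
    """Hash the relative path and contents of every file under a folder."""
    digest = hashlib.sha256()
    for root, dirs, files in os.walk(folder):
        dirs.sort()  # make the traversal order deterministic
        for file_name in sorted(files):
            path = os.path.join(root, file_name)
            digest.update(os.path.relpath(path, folder).encode("utf-8"))
            with open(path, "rb") as handle:
                digest.update(handle.read())
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as base:
    # Two per-step folders, each holding one placeholder script.
    for step in ("pivot", "join"):
        os.makedirs(os.path.join(base, step))
        with open(os.path.join(base, step, step + ".py"), "w") as handle:
            handle.write("# placeholder script\n")
    pivot_before = folder_fingerprint(os.path.join(base, "pivot"))
    join_before = folder_fingerprint(os.path.join(base, "join"))
    shared_before = folder_fingerprint(base)

    # Edit only the join script.
    with open(os.path.join(base, "join", "join.py"), "a") as handle:
        handle.write("# edited\n")
    pivot_unchanged = folder_fingerprint(os.path.join(base, "pivot")) == pivot_before
    join_changed = folder_fingerprint(os.path.join(base, "join")) != join_before
    shared_changed = folder_fingerprint(base) != shared_before
```

With per-step folders, editing `join.py` leaves the pivot folder's fingerprint untouched, whereas a single shared folder would change for every step at once.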

In [46]:
# PIVOT all hourly time series + normalize timestamps

h_time_series_1_pivot_step = PythonScriptStep(
    script_name="pivot.py", 
    arguments=["--date_column","MYDATE",
               "--hour_column","HOUR",
               "--datetime_column_name","DATETIME",
               "--pivot_columns","NODE_ID",
               "--value_column","MW",
               "--output", h_time_series_1_pivot_pd],
    inputs=[h_time_series_1_ds.as_named_input("time_series")],
    outputs=[h_time_series_1_pivot_pd],
    compute_target=aml_compute, 
    source_directory=source_directory_pivot,
    runconfig=run_config
)
print("== PythonScriptStep time_series_1_pivot_step created")

h_time_series_2_pivot_step = PythonScriptStep(
    script_name="pivot.py", 
    arguments=["--date_column","MYDATE",
               "--hour_column","HOUR",
               "--datetime_column_name","DATETIME",
               "--pivot_columns","NODE_ID",
               "--value_column","MW",
               "--output", h_time_series_2_pivot_pd],
    inputs=[h_time_series_2_ds.as_named_input("time_series")],
    outputs=[h_time_series_2_pivot_pd],
    compute_target=aml_compute, 
    source_directory=source_directory_pivot,
    runconfig=run_config
)
print("== PythonScriptStep time_series_2_pivot_step created")

h_time_series_3_pivot_step = PythonScriptStep(
    script_name="pivot.py", 
    arguments=["--date_column","DATE",
               "--hour_column","HE",
               "--datetime_column_name","DATETIME",
               "--pivot_columns","",
               "--value_column","CLOAD",
               "--output", h_time_series_3_pivot_pd],
    inputs=[h_time_series_3_ds.as_named_input("time_series")],
    outputs=[h_time_series_3_pivot_pd],
    compute_target=aml_compute, 
    source_directory=source_directory_pivot,
    runconfig=run_config
)
print("== PythonScriptStep time_series_3_pivot_step created")

== PythonScriptStep time_series_1_pivot_step created
== PythonScriptStep time_series_2_pivot_step created
== PythonScriptStep time_series_3_pivot_step created
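
The source of `pivot.py` is not shown in this notebook. As a rough illustration of what "generate a timestamp from the date and hour columns, then pivot" could look like, here is a plain-Python sketch. The function names, the ISO date format, the hour convention, the sample rows, and the treatment of an empty `--pivot_columns` value as "no pivot" are all assumptions rather than the actual script.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def build_datetime(date_text, hour_text):
    """Combine a date string and an hour value into one timestamp.

    Assumption: dates are ISO formatted (YYYY-MM-DD) and hours are integers
    counted from midnight. The real data may use another convention.
    """
    return datetime.strptime(date_text, "%Y-%m-%d") + timedelta(hours=int(hour_text))


def pivot_rows(rows, date_column, hour_column, datetime_column_name,
               pivot_column, value_column):
    """Return one output row per timestamp.

    With a pivot column, each distinct value of that column becomes an
    output column. With an empty pivot column, the value column is kept as is.
    """
    table = defaultdict(dict)
    for row in rows:
        stamp = build_datetime(row[date_column], row[hour_column])
        key = row[pivot_column] if pivot_column else value_column
        table[stamp][key] = float(row[value_column])
    return [{datetime_column_name: stamp, **table[stamp]} for stamp in sorted(table)]


# Made-up sample rows that reuse the column names passed to the first pivot step.
sample = [
    {"MYDATE": "2020-05-01", "HOUR": "1", "NODE_ID": "A", "MW": "10.0"},
    {"MYDATE": "2020-05-01", "HOUR": "1", "NODE_ID": "B", "MW": "12.5"},
    {"MYDATE": "2020-05-01", "HOUR": "2", "NODE_ID": "A", "MW": "11.0"},
]
pivoted = pivot_rows(sample, "MYDATE", "HOUR", "DATETIME", "NODE_ID", "MW")
```

Each pivoted row carries a single `DATETIME` key, which is what lets the later join step align the three hourly series on that column.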


#### Define a Step that consumes intermediate data and produces intermediate data
The join step below consumes the intermediate data produced by the three pivot steps and produces new intermediate data.

**Open `join.py` on your local machine and examine its arguments, inputs, and outputs. That will show why the script argument names used below matter.**

In [47]:
# Joining all HOURLY pivoted time-series

#h_time_series_joined_ds = PipelineData('h_time_series_joined', datastore=blob_store).as_dataset()
#h_time_series_joined_ds = h_time_series_joined_ds.register(name='h_time_series_joined', create_new_version=True)

h_time_series_joined_pd = PipelineData("h_time_series_joined",datastore=blob_store)

# Join Step
h_join_step = PythonScriptStep(
    script_name="join.py",
    arguments=["--join_column", "DATETIME",
               "--input_1", h_time_series_1_pivot_pd,
               "--input_2", h_time_series_2_pivot_pd,
               "--input_3", h_time_series_3_pivot_pd,
               "--output", h_time_series_joined_pd],
    inputs=[h_time_series_1_pivot_pd,
            h_time_series_2_pivot_pd,
            h_time_series_3_pivot_pd],
    outputs=[h_time_series_joined_pd],
    compute_target=aml_compute, 
    source_directory=source_directory_join,
    runconfig=run_config
)

print("== PythonScriptStep h_join_step created")

== PythonScriptStep h_join_step created
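
`join.py` itself is not reproduced in this notebook. The sketch below shows a hypothetical command-line interface that would be consistent with the join steps defined here, which pass `--join_column`, `--input_1`, `--input_2`, `--output`, and in some cases `--input_3` or `--cleanup_date_column`. The function names, the choice of an inner join, and the placeholder paths are assumptions.

```python
import argparse


def parse_join_args(argv):
    """Parse the arguments that the join steps in this notebook pass."""
    parser = argparse.ArgumentParser(description="Hypothetical join.py interface")
    parser.add_argument("--join_column", required=True)
    parser.add_argument("--input_1", required=True)
    parser.add_argument("--input_2", required=True)
    # Only the hourly join passes a third input, so it has to be optional.
    parser.add_argument("--input_3", default=None)
    parser.add_argument("--output", required=True)
    # Only the daily join passes this flag.
    parser.add_argument("--cleanup_date_column", default=None)
    return parser.parse_args(argv)


def join_on(left_rows, right_rows, join_column):
    """Inner-join two lists of dict rows on one shared column (an assumption)."""
    right_by_key = {row[join_column]: row for row in right_rows}
    return [{**row, **right_by_key[row[join_column]]}
            for row in left_rows if row[join_column] in right_by_key]


# At run time each PipelineData or DataReference argument reaches the script
# as a path string; the paths below are placeholders.
args = parse_join_args(["--join_column", "RDATE",
                        "--input_1", "/tmp/in1", "--input_2", "/tmp/in2",
                        "--output", "/tmp/out"])
```

Because the step definition and the script must agree on every flag name, a mismatch such as `--input1` versus `--input_1` would only surface when the step runs on the compute target.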


In [48]:
# Generate stats for HOURLY joined series GROUP BY DAILY
#h_time_series_joined_groupby_aggregated_ds = PipelineData('h_time_series_joined_groupby_aggregated', datastore=blob_store).as_dataset()
#h_time_series_joined_groupby_aggregated_ds = h_time_series_joined_groupby_aggregated_ds.register(name='h_time_series_joined_groupby_aggregated', create_new_version=True)

h_time_series_joined_groupby_aggregated_pd = PipelineData("h_time_series_joined_groupby_aggregated",datastore=blob_store)

# groupby-aggregate step
groupby_aggregate_step = PythonScriptStep(
    script_name="groupby-aggregate.py",
    arguments=["--datetime_column", "DATETIME",
               "--date_column_name","RDATE",
               "--input_pd", h_time_series_joined_pd,
               "--output_pd", h_time_series_joined_groupby_aggregated_pd],
    inputs=[h_time_series_joined_pd],
    outputs=[h_time_series_joined_groupby_aggregated_pd],
    compute_target=aml_compute,
    source_directory=source_directory_groupby_aggregate,
    runconfig=run_config
)

print("== PythonScriptStep groupby_aggregate_step created")

== PythonScriptStep groupby_aggregate_step created
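
`groupby-aggregate.py` is not shown either. The step receives the hourly `DATETIME` column and the name of a new date column, `RDATE`, so one plausible reading is "group hourly rows by calendar day and summarise each series". The sketch below illustrates that reading in plain Python; the function name, the sample values, and the choice of mean, min, and max as statistics are assumptions.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean


def aggregate_by_day(rows, datetime_column, date_column_name):
    """Group hourly rows by calendar day and summarise every other column."""
    by_day = defaultdict(lambda: defaultdict(list))
    for row in rows:
        day = row[datetime_column].date()
        for column, value in row.items():
            if column != datetime_column:
                by_day[day][column].append(value)
    result = []
    for day in sorted(by_day):
        out = {date_column_name: day}
        for column, values in by_day[day].items():
            out[column + "_mean"] = mean(values)
            out[column + "_min"] = min(values)
            out[column + "_max"] = max(values)
        result.append(out)
    return result


# Made-up hourly values for a single series named "A".
hourly = [
    {"DATETIME": datetime(2020, 5, 1, 1), "A": 10.0},
    {"DATETIME": datetime(2020, 5, 1, 2), "A": 14.0},
    {"DATETIME": datetime(2020, 5, 2, 1), "A": 8.0},
]
daily = aggregate_by_day(hourly, "DATETIME", "RDATE")
```

The result has one row per day keyed by `RDATE`, which matches the join column that the daily and final join steps use.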


In [49]:
# Joining all DAILY time-series
#d_time_series_joined_ds = PipelineData('d_time_series_joined', datastore=blob_store).as_dataset()
#d_time_series_joined_ds = d_time_series_joined_ds.register(name='d_time_series_joined', create_new_version=True)

d_time_series_joined_pd = PipelineData("d_time_series_joined",datastore=blob_store)

# Join Step
d_join_step = PythonScriptStep(
    script_name="join.py",
    arguments=["--join_column", "RDATE",
               "--input_1", d_time_series_1_dr,
               "--input_2", d_time_series_2_dr,
               "--output", d_time_series_joined_pd,
               "--cleanup_date_column", "RDATE"],
    inputs=[d_time_series_1_dr,
            d_time_series_2_dr],
    outputs=[d_time_series_joined_pd],
    compute_target=aml_compute,
    source_directory=source_directory_join,
    runconfig=run_config
)

print("== PythonScriptStep d_join_step created")

== PythonScriptStep d_join_step created


In [50]:
# FINAL JOIN TO GET THE USE CASE 1 TRAINING DATA SET READY
# JOIN hourly summarized by day and daily time series
d_use_case_1_pd = PipelineData("d_use_case_1",datastore=blob_store)

# Join Step
use_case_1_join_step = PythonScriptStep(
    script_name="join.py",
    arguments=["--join_column", "RDATE",
               "--input_1", h_time_series_joined_groupby_aggregated_pd,
               "--input_2", d_time_series_joined_pd,
               "--output", d_use_case_1_pd],
    inputs=[h_time_series_joined_groupby_aggregated_pd,
            d_time_series_joined_pd],
    outputs=[d_use_case_1_pd],
    compute_target=aml_compute,
    source_directory=source_directory_join,
    runconfig=run_config
)

print("== PythonScriptStep use_case_1_join_step created")

== PythonScriptStep use_case_1_join_step created


### Build the pipeline and submit an Experiment run

In [51]:
#pipeline = Pipeline(workspace=ws, steps=[h_join_step])
#pipeline = Pipeline(workspace=ws, steps=[groupby_aggregate_step])
#pipeline = Pipeline(workspace=ws, steps=[d_join_step])
pipeline = Pipeline(workspace=ws, steps=[use_case_1_join_step])
print("== Pipeline is built")

== Pipeline is built
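
Only `use_case_1_join_step` is listed in `steps`, yet the submission log in the next cell reports seven created steps. The upstream steps are pulled in because each step's inputs are PipelineData objects produced by other steps. The sketch below mimics that resolution with a plain dictionary, walking backward from the final step. It illustrates the idea only and is not the SDK's implementation; `STEPS` and `resolve` are hypothetical names.

```python
# Each step maps to (inputs, outputs), using the names from this notebook.
STEPS = {
    "h_time_series_1_pivot_step": (["h_time_series_1"], ["h_time_series_1_pivot"]),
    "h_time_series_2_pivot_step": (["h_time_series_2"], ["h_time_series_2_pivot"]),
    "h_time_series_3_pivot_step": (["h_time_series_3"], ["h_time_series_3_pivot"]),
    "h_join_step": (["h_time_series_1_pivot", "h_time_series_2_pivot",
                     "h_time_series_3_pivot"], ["h_time_series_joined"]),
    "groupby_aggregate_step": (["h_time_series_joined"],
                               ["h_time_series_joined_groupby_aggregated"]),
    "d_join_step": (["d_time_series_1", "d_time_series_2"], ["d_time_series_joined"]),
    "use_case_1_join_step": (["h_time_series_joined_groupby_aggregated",
                              "d_time_series_joined"], ["d_use_case_1"]),
}


def resolve(final_steps):
    """Return every required step in an order where producers come first."""
    producer = {out: name for name, (_, outs) in STEPS.items() for out in outs}
    ordered = []

    def visit(name):
        if name in ordered:
            return
        for data in STEPS[name][0]:
            if data in producer:  # source datasets have no producing step
                visit(producer[data])
        ordered.append(name)

    for name in final_steps:
        visit(name)
    return ordered


plan = resolve(["use_case_1_join_step"])
```

Swapping in one of the commented-out alternatives, for example `resolve(["d_join_step"])`, yields a much smaller plan, which is a convenient way to test one branch of the pipeline at a time.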


In [52]:
pipeline_run = Experiment(ws, 'use-case-1-data-prep').submit(pipeline)
print("== Pipeline is submitted for execution")

Created step join.py [60e891b5][94b06719-9a24-470e-bae7-e8294c3b1095], (This step is eligible to reuse a previous run's output)
Created step groupby-aggregate.py [235c200b][b2166c45-bd5f-4559-a6b2-5a9dfe309ac6], (This step is eligible to reuse a previous run's output)
Created step join.py [f658d7b0][7149f3dd-ea20-41b9-9547-46c0b73b41b8], (This step is eligible to reuse a previous run's output)
Created step pivot.py [d1b370f5][72116d10-f8bd-4e27-af07-ccc59ea17957], (This step is eligible to reuse a previous run's output)
Created step pivot.py [179e9160][16c75099-f63b-4757-ae16-d1fda71df017], (This step is eligible to reuse a previous run's output)
Created step pivot.py [4f407ad6][31a4fba2-1b16-41a3-acfb-89c838aef27b], (This step is eligible to reuse a previous run's output)
Created step join.py [8e7c7bb3][466ec3d9-51a3-4e22-8929-b97b80ae24dd], (This step is eligible to reuse a previous run's output)
Using data reference d_time_series_1 for StepId [7db5061c][e8ae75ce-6596-478d-a4a1-95758

In [53]:
RunDetails(pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', …

### Wait for pipeline run to complete

In [54]:
pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: af4de0ad-26e6-453a-a8ba-b4abc5b352bb
Link to Portal: https://ml.azure.com/experiments/use-case-1-data-prep/runs/af4de0ad-26e6-453a-a8ba-b4abc5b352bb?wsid=/subscriptions/c5ec24ce-9c5f-4da2-bf12-9ca8e9758d60/resourcegroups/azure-ml-workshop/workspaces/agd-mlws
PipelineRun Status: Running


StepRunId: 443b3794-cf48-4cc3-b97e-193019354d3c
Link to Portal: https://ml.azure.com/experiments/use-case-1-data-prep/runs/443b3794-cf48-4cc3-b97e-193019354d3c?wsid=/subscriptions/c5ec24ce-9c5f-4da2-bf12-9ca8e9758d60/resourcegroups/azure-ml-workshop/workspaces/agd-mlws

StepRun(join.py) Execution Summary
StepRun( join.py ) Status: Finished
{'runId': '443b3794-cf48-4cc3-b97e-193019354d3c', 'target': 'agd-training-cpu', 'status': 'Completed', 'startTimeUtc': '2020-05-04T20:27:34.908219Z', 'endTimeUtc': '2020-05-04T20:27:35.000115Z', 'properties': {'azureml.reusedrunid': 'f49a72cd-8d2e-4dd0-8fbd-2afd8f74cfa3', 'azureml.reusednodeid': '6efa0f4b', 'azureml.reusedpipeline': '12a0ed4f-8e22-409e




StepRunId: 31da513e-6b53-4b0c-bb80-0a06eb1894fc
Link to Portal: https://ml.azure.com/experiments/use-case-1-data-prep/runs/31da513e-6b53-4b0c-bb80-0a06eb1894fc?wsid=/subscriptions/c5ec24ce-9c5f-4da2-bf12-9ca8e9758d60/resourcegroups/azure-ml-workshop/workspaces/agd-mlws

StepRun(join.py) Execution Summary
StepRun( join.py ) Status: Finished
{'runId': '31da513e-6b53-4b0c-bb80-0a06eb1894fc', 'target': 'agd-training-cpu', 'status': 'Completed', 'startTimeUtc': '2020-05-04T20:27:34.26694Z', 'endTimeUtc': '2020-05-04T20:27:34.343296Z', 'properties': {'azureml.reusedrunid': '672a0b9f-8c9a-40c3-b0e2-1368232b4f39', 'azureml.reusednodeid': '461b09f4', 'azureml.reusedpipeline': '12a0ed4f-8e22-409e-acee-1907daf2774e', 'azureml.reusedpipelinerunid': '12a0ed4f-8e22-409e-acee-1907daf2774e', 'azureml.runsource': 'azureml.StepRun', 'azureml.nodeid': '8e7c7bb3', 'ContentSnapshotId': 'b00ea1cd-a096-4602-b551-bc565f24e8d3', 'StepType': 'PythonScriptStep', 'ComputeTargetType': 'AmlCompute', 'azureml.pip




StepRunId: 1a35ca31-3537-46fb-bd1a-f83afbd23225
Link to Portal: https://ml.azure.com/experiments/use-case-1-data-prep/runs/1a35ca31-3537-46fb-bd1a-f83afbd23225?wsid=/subscriptions/c5ec24ce-9c5f-4da2-bf12-9ca8e9758d60/resourcegroups/azure-ml-workshop/workspaces/agd-mlws

StepRun(pivot.py) Execution Summary
StepRun( pivot.py ) Status: Finished
{'runId': '1a35ca31-3537-46fb-bd1a-f83afbd23225', 'target': 'agd-training-cpu', 'status': 'Completed', 'startTimeUtc': '2020-05-04T20:27:34.179104Z', 'endTimeUtc': '2020-05-04T20:27:34.280126Z', 'properties': {'azureml.reusedrunid': 'f99a9a83-93d8-42a7-b851-1e83f0201016', 'azureml.reusednodeid': 'b83f94f5', 'azureml.reusedpipeline': '12a0ed4f-8e22-409e-acee-1907daf2774e', 'azureml.reusedpipelinerunid': '12a0ed4f-8e22-409e-acee-1907daf2774e', 'azureml.runsource': 'azureml.StepRun', 'azureml.nodeid': '179e9160', 'ContentSnapshotId': 'c666717c-97fe-4e31-9a7e-6a51d4e30f7e', 'StepType': 'PythonScriptStep', 'ComputeTargetType': 'AmlCompute', 'azureml.

{'runId': 'ef7600af-1f05-4a18-9012-cbc05ed0d52a', 'target': 'agd-training-cpu', 'status': 'Completed', 'startTimeUtc': '2020-05-04T20:27:34.172555Z', 'endTimeUtc': '2020-05-04T20:27:34.2544Z', 'properties': {'azureml.reusedrunid': 'd507980b-068f-4bb5-8e38-25a60077f4d5', 'azureml.reusednodeid': '09a0b9c4', 'azureml.reusedpipeline': '12a0ed4f-8e22-409e-acee-1907daf2774e', 'azureml.reusedpipelinerunid': '12a0ed4f-8e22-409e-acee-1907daf2774e', 'azureml.runsource': 'azureml.StepRun', 'azureml.nodeid': 'd1b370f5', 'ContentSnapshotId': 'c666717c-97fe-4e31-9a7e-6a51d4e30f7e', 'StepType': 'PythonScriptStep', 'ComputeTargetType': 'AmlCompute', 'azureml.pipelinerunid': 'af4de0ad-26e6-453a-a8ba-b4abc5b352bb', '_azureml.ComputeTargetType': 'amlcompute', 'AzureML.DerivedImageName': 'azureml/azureml_fbecfa6255cba4c1cc15571eceadee24', 'ProcessInfoFile': 'azureml-logs/process_info.json', 'ProcessStatusFile': 'azureml-logs/process_status.json'}, 'inputDatasets': [], 'runDefinition': {'script': 'pivot.py

'Finished'

### See Outputs

See where outputs of each pipeline step are located on your datastore.

***Wait for the pipeline run to complete to make sure all outputs are ready.***

In [33]:
# Get Steps
for step in pipeline_run.get_steps():
    print("== Outputs of step " + step.name)
    
    # Get a dictionary of StepRunOutputs with the output name as the key 
    output_dict = step.get_outputs()
    
    for name, output in output_dict.items():
        output_reference = output.get_port_data_reference() # Get output port data reference
        print("\tname: " + name)
        print("\tdatastore: " + output_reference.datastore_name)
        print("\tpath on datastore: " + output_reference.path_on_datastore)

== Outputs of step join.py
	name: d_use_case_1
	datastore: workspaceblobstore
	path on datastore: azureml/9bb75485-9888-442b-92b5-6261ae96344d/d_use_case_1
== Outputs of step groupby-aggregate.py
	name: h_time_series_joined_groupby_aggregated
	datastore: workspaceblobstore
	path on datastore: azureml/2fea305e-3368-4fa5-9852-5aedcd63d6fc/h_time_series_joined_groupby_aggregated
== Outputs of step join.py
	name: h_time_series_joined
	datastore: workspaceblobstore
	path on datastore: azureml/f49a72cd-8d2e-4dd0-8fbd-2afd8f74cfa3/h_time_series_joined
== Outputs of step pivot.py
	name: h_time_series_3_pivot
	datastore: workspaceblobstore
	path on datastore: azureml/545cc732-6882-4136-8dca-88643e042a55/h_time_series_3_pivot
== Outputs of step pivot.py
	name: h_time_series_2_pivot
	datastore: workspaceblobstore
	path on datastore: azureml/f99a9a83-93d8-42a7-b851-1e83f0201016/h_time_series_2_pivot
== Outputs of step pivot.py
	name: h_time_series_1_pivot
	datastore: workspaceblobstore
	path on da
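
The paths printed above appear to follow the layout `azureml/<run id>/<output name>`. For reused steps the run id segment is the id of the original run: the `h_time_series_joined` path contains `f49a72cd-8d2e-4dd0-8fbd-2afd8f74cfa3`, which is the `azureml.reusedrunid` reported for the first `join.py` step in the run log earlier. A small helper for splitting such a path might look like the sketch below; the layout is inferred from this output only, and `split_output_path` is a hypothetical name.

```python
def split_output_path(path_on_datastore):
    """Split 'azureml/<run id>/<output name>' into its run id and output name."""
    prefix, run_id, output_name = path_on_datastore.split("/", 2)
    if prefix != "azureml":
        raise ValueError("unexpected path layout: " + path_on_datastore)
    return run_id, output_name


run_id, output_name = split_output_path(
    "azureml/f49a72cd-8d2e-4dd0-8fbd-2afd8f74cfa3/h_time_series_joined")
```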

In [34]:
# REGISTER a new version of the final output as a Dataset

from azureml.core import Dataset, Datastore
from azureml.data.datapath import DataPath

# find output dataset
for step in pipeline_run.get_steps():
    output_dict = step.get_outputs()
    for name, output in output_dict.items():
        if name == 'd_use_case_1':
            # generate a Tabular DataSet for it
            output_reference = output.get_port_data_reference()
            datastore_path = [DataPath(blob_store, output_reference.path_on_datastore)]
            ds = Dataset.Tabular.from_delimited_files(datastore_path)
            dataset_name = 'd_use_case_1'
            ds.register(ws, name=dataset_name, create_new_version=True)
            print("== Registered new version of dataset: " + dataset_name)

== Registered new version of dataset: d_use_case_1
