In [1]:


import azureml.core
from azureml.core import Dataset, Datastore, Experiment, Workspace
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.data import OutputFileDatasetConfig

# Report which azureml-core release this kernel is running against
print("SDK version:", azureml.core.VERSION)

from azureml.pipeline.core import Pipeline
from azureml.pipeline.core.graph import PipelineParameter
from azureml.pipeline.steps import PythonScriptStep

print("Pipeline SDK-specific imports completed")

# Connect to the workspace described by the local config file and print
# its identifying details, one per line.
ws = Workspace.from_config()
print(
    ws.name,
    ws.resource_group,
    ws.location,
    ws.subscription_id,
    sep="\n",
)

# Fetch the blob datastore by its registered name.
# (Alternative: def_blob_store = ws.get_default_datastore())
def_blob_store = Datastore(workspace=ws, name="workspaceblobstore")
print("Blobstore's name: {}".format(def_blob_store.name))



SDK version: 1.45.0
Pipeline SDK-specific imports completed
Preparation-AI102-Florian
cloud-shell-storage-westeurope
westeurope
0252a218-2d27-4d77-a4f1-b638272c95e0
Blobstore's name: workspaceblobstore


In [2]:
from azureml.core.compute_target import ComputeTargetException

# Name of the AmlCompute cluster that the pipeline steps below run on
aml_compute_target = "cpu-cluster"

try:
    # Look the cluster up by name; a ComputeTargetException is treated as
    # "no such cluster yet" and triggers creation below.
    aml_compute = AmlCompute(ws, aml_compute_target)
except ComputeTargetException:
    print("creating new compute target")

    # Fixed-size cluster: min_nodes == max_nodes == 1 on STANDARD_D2_V2
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_D2_V2",
        min_nodes=1,
        max_nodes=1,
    )
    aml_compute = ComputeTarget.create(ws, aml_compute_target, provisioning_config)
    # Wait for provisioning to finish, with a 20-minute timeout
    aml_compute.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=20,
    )
else:
    print("found existing compute target.")



found existing compute target.


In [3]:
# Location of the input data: a publicly readable CSV file addressed by URL.
data_path = "https://dprepdata.blob.core.windows.net/demo/Titanic.csv"
# Alternative: upload a local file to the datastore and use the returned reference instead:
# data_path = def_blob_store.upload_files(["./your_data.pkl"], target_path="your_path", overwrite=True)

In [4]:
# Create a file dataset from data_path and give it the input name
# "test_data"; trainStep mounts this object as its --input_data argument.
blob_input_data = (
    Dataset.File
    .from_files(path=data_path)
    .as_named_input(name="test_data")
)
print("Dataset created")

Dataset created


In [5]:
# Intermediate output named "processed_data1": written by trainStep and
# read by the extract and compare steps.
processed_data1 = OutputFileDatasetConfig("processed_data1")
print("Output dataset object created")

Output dataset object created


In [6]:
# trainStep runs train.py on the compute cluster: it receives the mounted
# blob_input_data as --input_data and writes to processed_data1 via
# --output_train.

source_directory = "publish_run_train"

trainStep = PythonScriptStep(
    source_directory=source_directory,
    script_name="train.py",
    compute_target=aml_compute,
    arguments=[
        "--input_data", blob_input_data.as_mount(),
        "--output_train", processed_data1,
    ],
)
print("trainStep created")

trainStep created


In [7]:


# extractStep runs extract.py: it reads the intermediate data written by
# trainStep (processed_data1) and writes a second intermediate output,
# processed_data2.
processed_data2 = OutputFileDatasetConfig("processed_data2")
source_directory = "publish_run_extract"

extractStep = PythonScriptStep(
    source_directory=source_directory,
    script_name="extract.py",
    compute_target=aml_compute,
    arguments=[
        "--input_extract", processed_data1.as_input(),
        "--output_extract", processed_data2,
    ],
)
print("extractStep created")



extractStep created


In [8]:
# Pipeline-level parameter "pipeline_arg" (defaults to 10); it is handed to
# compareStep as --pipeline_param and is used later when publishing the pipeline.
pipeline_param = PipelineParameter("pipeline_arg", default_value=10)
print("pipeline parameter created")

pipeline parameter created


In [9]:
# compareStep runs compare.py with both intermediate outputs
# (processed_data1 and processed_data2) plus the pipeline parameter as
# inputs, and writes processed_data3. That output is set to be registered
# as the dataset "compare_result" when the job completes.
processed_data3 = OutputFileDatasetConfig(name="processed_data3").register_on_complete(
    "compare_result"
)

source_directory = "publish_run_compare"

compareStep = PythonScriptStep(
    source_directory=source_directory,
    script_name="compare.py",
    compute_target=aml_compute,
    arguments=[
        "--compare_data1", processed_data1.as_input(),
        "--compare_data2", processed_data2.as_input(),
        "--output_compare", processed_data3,
        "--pipeline_param", pipeline_param,
    ],
)
print("compareStep created")

compareStep created


In [10]:


# Only compareStep is listed; trainStep and extractStep produce its inputs and
# are presumably pulled in through those data dependencies (TODO confirm
# against the Pipeline docs).
# Building the pipeline fails with a ValueError if a step's script is not
# found under its source_directory, so the publish_run_* folders must exist
# relative to the notebook's working directory.
pipeline1 = Pipeline(ws, steps=[compareStep])
print("Pipeline is built")



ValueError: Step [compare.py]: script not found at: c:\Users\fuetu\gitlocal\DP100\mslearn-dp100\publish_run_compare\compare.py. Make sure to specify an appropriate source_directory on the Step or default_source_directory on the Pipeline.