In [None]:
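# Uncomment to install/upgrade the dependencies into this kernel's environment
# (%pip targets the running kernel, whereas !pip may install into a different Python):
#%pip install --upgrade azureml-sdk azureml-dataprep matplotlib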

In [None]:
# Import the AzureML SDK classes used throughout this notebook.
# Set up the environment first by following the samples at: https://github.com/Azure/MachineLearningNotebooks/tree/master/setup-environment

import azureml.core
from azureml.core import Workspace, Experiment,Dataset
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.data.datapath import DataPath, DataPathComputeBinding

from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.core import Pipeline, PipelineRun
from azureml.pipeline.steps import PythonScriptStep


In [None]:
# Load the AzureML workspace from the workspace config file (config.json)
# created during environment setup. Its repr is deliberately not printed:
# it includes subscription and resource-group identifiers.
workspace = Workspace.from_config()

In [None]:
# The Event Hub capture output (Avro files) must already be registered in the
# workspace as a file dataset. Register it with a "folder/**" path glob so that
# every partition and nested folder underneath is picked up.
dataset_name = "eventhub-capture-avro"
dataset = Dataset.get_by_name(workspace=workspace, name=dataset_name)
print(dataset.name)

In [None]:
# Name of the AmlCompute cluster the pipeline step runs on.
amlcompute_cluster_name = "cpucluster"

# workspace.compute_targets maps compute-target name -> compute target object.
cts = workspace.compute_targets
existing_target = cts.get(amlcompute_cluster_name)

if existing_target is not None and existing_target.type != 'AmlCompute':
    # The name is already taken by a different kind of compute target; creating
    # a new AmlCompute cluster under the same name would be rejected, so fail
    # early with an actionable message instead.
    raise ValueError(
        "Compute target '{}' already exists with type '{}', not 'AmlCompute'; "
        "choose a different cluster name.".format(amlcompute_cluster_name, existing_target.type))

# Kept for compatibility with the original cell, which exposed this flag.
found = existing_target is not None

if found:
    print('Found existing compute target.')
    compute_target = existing_target
else:
    print('Creating a new compute target...')
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_D2_V2",  # for GPU, use "STANDARD_NC6"
        # vm_priority='lowpriority',  # optional
        max_nodes=1)

    # Create the cluster.
    compute_target = ComputeTarget.create(workspace, amlcompute_cluster_name, provisioning_config)

    # Wait (up to 10 minutes) for provisioning to finish. No min_node_count is
    # given, so the cluster's own scale settings decide what "complete" means.
    compute_target.wait_for_completion(show_output=True, timeout_in_minutes=10)

In [None]:
# Runs submitted through this Experiment are grouped under this name in the
# workspace's run history.
experiment_name = 'avro-test-pipeline'
# Directory handed to PythonScriptStep as source_directory; it must contain
# the step script (avro-mount-script.py).
source_directory = '.'

experiment = Experiment(workspace=workspace, name=experiment_name)
experiment

In [None]:
%%writefile conda_env.yaml
# Conda specification for the pipeline step's runtime environment; it is loaded
# with Environment.from_conda_specification when building the run configuration.
# Production environments should pin explicit package versions for reproducible
# builds; they are deliberately left unpinned here for demo purposes.
# See the note on the azureml-defaults package at https://docs.microsoft.com/en-us/azure/machine-learning/concept-environments#types-of-environments
# NOTE(review): Python 3.6 is end-of-life; with unpinned pip packages the resolver
# may fall back to older releases that still support it — consider a newer Python.
name: custom-env
dependencies:
  - python=3.6
  - scikit-learn
  - pip
  - pip:
    - azureml-defaults
    - pandas
    - fastavro

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import RunConfiguration

# Build the step runtime from the conda spec in conda_env.yaml, then attach it
# to a run configuration shared by the pipeline's steps.
run_env = Environment.from_conda_specification(name="run-environment", file_path='conda_env.yaml')
run_config = RunConfiguration()
run_config.environment = run_env

In [None]:
# Pipeline parameter forwarded to the step script as "--minute". The default,
# '07', is used unless a submission overrides "minute_param" via pipeline_parameters.
# NOTE(review): presumably selects which capture minute to process — confirm in avro-mount-script.py.
minute_pipeline_param = PipelineParameter(
    name="minute_param",
    default_value='07',
)

In [None]:
# Single pipeline step: run avro-mount-script.py from source_directory on the
# AML cluster. The minute pipeline parameter is forwarded as "--minute", and the
# registered dataset is attached as a named input called 'input_dataset'.
process_avro_step = PythonScriptStep(
    script_name="avro-mount-script.py",
    name='process_avro_step',
    source_directory=source_directory,
    compute_target=compute_target,
    runconfig=run_config,
    arguments=["--minute", minute_pipeline_param],
    inputs=[dataset.as_named_input('input_dataset')],
)
print("process_avro_step created")

# Assemble the pipeline from its one step.
pipeline = Pipeline(workspace=workspace, steps=[process_avro_step])
print("pipeline with the process_avro_step created")

In [None]:
# Submit the pipeline with every pipeline parameter at its default value.
pipeline_run = experiment.submit(config=pipeline)
print("Pipeline is submitted for execution")

In [None]:
# Re-submit the same pipeline, overriding minute_param for this run only.
pipeline_run_with_params = experiment.submit(
    pipeline,
    pipeline_parameters={'minute_param': '08'},
)
