In [None]:
from azureml.core import Workspace, Dataset, Datastore
from azureml.core import Experiment, Environment, Model
from azureml.core.compute import ComputeTarget

In [None]:
from azureml.core.runconfig import RunConfiguration, CondaDependencies, DEFAULT_CPU_IMAGE
from azureml.pipeline.steps import PythonScriptStep, ParallelRunStep, ParallelRunConfig
from azureml.pipeline.core import Pipeline, PublishedPipeline, PipelineData
from azureml.pipeline.core import StepSequence
from azureml.widgets import RunDetails

In [None]:
import argparse
import os
import random as r

import numpy as np
import pandas as pd

In [None]:
# Connect to the Azure ML workspace using the locally available config file.
ws = Workspace.from_config()

In [None]:
# Look up the compute target that the pipeline steps will run on.
compute_name = 'compute-cluster'
compute_target = ComputeTarget(workspace=ws, name=compute_name)

In [None]:
# Default datastore of the workspace: used for the dataset and pipeline output.
datastore = Datastore.get_default(workspace=ws)

# The same kind of handle, fetched explicitly by datastore name.
my_datastore_name = 'workspaceblobstore'
my_datastore = Datastore.get(workspace=ws, datastore_name=my_datastore_name)

In [None]:
# Grids of admissible values (0.1 step) for each Iris measurement.
sepal_length_range = np.arange(4.3, 7.9, 0.1)
sepal_width_range = np.arange(2, 4.4, 0.1)
petal_length_range = np.arange(1, 6.9, 0.1)
petal_width_range = np.arange(0.1, 2.5, 0.1)
columns = ['sepal_length','sepal_width','petal_length','petal_width']

# Size of the synthetic scoring set and the seed that makes it reproducible
# across "Restart & Run All".
N_ROWS = 1000000
SEED = 42
rng = np.random.default_rng(SEED)

# Draw every column in a single vectorized call instead of building one
# single-row DataFrame per sample and appending them: DataFrame.append was
# removed in pandas 2.0 and the row-by-row approach is orders of magnitude
# slower. Values are rounded to one decimal to remove the floating-point
# noise introduced by np.arange.
feature_ranges = [
    sepal_length_range,
    sepal_width_range,
    petal_length_range,
    petal_width_range,
]
IrisDF = pd.DataFrame({
    column: np.round(rng.choice(value_range, size=N_ROWS), 1)
    for column, value_range in zip(columns, feature_ranges)
})

In [None]:
# Register the generated frame as a tabular dataset in the workspace.
Dataset.Tabular.register_pandas_dataframe(
    dataframe=IrisDF,
    target=datastore,
    name='Iris Parallel Scoring',
)

In [None]:
# Folder that receives the scripts written by the %%writefile cells below.
os.makedirs('Scoring_Scripts', exist_ok=True)

In [None]:
%%writefile Scoring_Scripts/Iris_Parallel_Scoring.py
from azureml.core import Run, Workspace
from azureml.core import Dataset, Datastore, Model

import os
import joblib
import argparse
import numpy as np
import pandas as pd

# Handle to the current Azure ML run. It is deliberately NOT called `run`:
# the `run(input_data)` entry point defined further down in this script would
# silently overwrite a module-level variable of that name.
current_run = Run.get_context()

def init():
    """Load the registered model named by ``--model_name`` into the global ``model``.

    Only ``--model_name`` is read from the command line; every other argument
    is ignored because ``parse_known_args`` is used.
    """
    global model

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--model_name', dest="model_name", required=True)
    known_args, _ = arg_parser.parse_known_args()

    # Resolve the model's local path, then deserialize it with joblib.
    model = joblib.load(Model.get_model_path(known_args.model_name))
    
def run(input_data):
    """Score one mini-batch and return it with an added ``Prediction`` column.

    Args:
        input_data: pandas DataFrame holding the feature columns of the
            mini-batch (assumed to be what the parallel run hands over for a
            tabular input -- confirm against the pipeline configuration).

    Returns:
        The same DataFrame object, mutated in place, with one extra
        ``Prediction`` column containing the model output for each row.
    """
    predictions = model.predict(input_data)
    # Assign positionally. Wrapping the predictions in a fresh pd.Series would
    # give them a 0..n-1 index and pandas would then align on index, producing
    # NaN / misplaced predictions whenever the mini-batch index is not 0..n-1.
    input_data['Prediction'] = np.asarray(predictions)
    print('Data written to parallel_run_step.txt')
    return input_data

In [None]:
%%writefile Scoring_Scripts/Iris_Parallel_Output_Creation.py
from azureml.core import Run, Workspace
from azureml.core import Dataset, Datastore

import pandas as pd
import numpy as np
import os
import argparse

# Handle to the current Azure ML run; main() uses it to reach the workspace.
run = Run.get_context()

# Command line: the folder that holds the output of the parallel scoring step.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--input_data_folder",
    type=str,
)
args = parser.parse_args()

def main():
    """Turn the parallel step's raw output into a CSV and upload it.

    Reads the space-delimited, header-less ``parallel_run_step.txt`` from
    ``args.input_data_folder``, names its five columns, writes the result as
    ``Iris_Parallel_Predictions.csv`` and uploads that file to the
    ``Output_Folder`` path of the workspace's default datastore.
    """
    # Local import so this function carries its own dependency.
    import tempfile

    input_data_path = os.path.join(args.input_data_folder, "parallel_run_step.txt")
    result = pd.read_csv(input_data_path, delimiter=" ", header=None)
    result.columns = ['sepal_length','sepal_width','petal_length','petal_width', 'Prediction']

    ws = run.experiment.workspace
    datastore = Datastore.get_default(ws)

    output_datastore_path = 'Output_Folder'
    output_file_name = "Iris_Parallel_Predictions.csv"

    # Stage the CSV in a throw-away directory. The context manager removes it
    # even when the upload raises, and it cannot collide with (or fail on) a
    # pre-existing, non-empty local 'Output_Folder'.
    with tempfile.TemporaryDirectory() as staging_dir:
        OutputPath = os.path.join(staging_dir, output_file_name)
        result.to_csv(OutputPath, index = False, sep=',')

        # relative_root pins the uploaded name to
        # '<output_datastore_path>/<output_file_name>' regardless of where the
        # staging directory lives.
        datastore.upload_files(
            files=[OutputPath],
            relative_root=staging_dir,
            target_path=output_datastore_path,
            overwrite=True,
        )

if __name__ == '__main__':
    main()

In [None]:
# Alternative reader for a parquet input folder.
# parse_known_args (rather than parse_args) is used so that the extra
# command-line arguments a Jupyter kernel is started with do not abort the
# cell; inside a notebook `args.input_data_folder` is simply None.
parser = argparse.ArgumentParser()
parser.add_argument("--input_data_folder",type=str)
args, _unknown_args = parser.parse_known_args()

def main():
    """Read the parquet data found at ``args.input_data_folder``.

    Returns:
        The pandas DataFrame produced by ``pd.read_parquet``.
    """
    input_data_path = os.path.join(args.input_data_folder)
    # The notebook imports pandas as `pd`; the bare name `pandas` is undefined.
    result = pd.read_parquet(input_data_path)
    return result

In [None]:
# Fetch the registered environment that both pipeline steps will run in.
Env = Environment.get(workspace=ws, name='AutoML Environment')

In [None]:
# Intermediate pipeline output: written by the parallel scoring step and
# consumed by the output-creation step.
parallel_run_output = PipelineData(
    name='parallel_predictions',
    datastore=datastore,
)

In [None]:
# The parallel step uses the very same Environment object (an alias, not a
# copy), switched to run in Docker on the default CPU base image.
parallel_environment = Env
parallel_docker = parallel_environment.docker
parallel_docker.enabled = True
parallel_docker.base_image = DEFAULT_CPU_IMAGE

In [None]:
# Run configuration for the plain Python script step: same environment,
# Docker enabled, default CPU base image.
run_config = RunConfiguration()
run_config.environment = Env
run_config_docker = run_config.environment.docker
run_config_docker.enabled = True
run_config_docker.base_image = DEFAULT_CPU_IMAGE

In [None]:
# Configuration of the parallel scoring run.
parallel_run_config = ParallelRunConfig(
    # What to run and where.
    source_directory='Scoring_Scripts/',
    entry_script="Iris_Parallel_Scoring.py",
    environment=parallel_environment,
    compute_target=compute_target,
    node_count=4,
    # How the input is split and how the results are collected.
    mini_batch_size="1MB",
    output_action="append_row",
    # Failure handling and diagnostics.
    error_threshold=5,
    run_invocation_timeout=60,
    logging_level="DEBUG",
)

In [None]:
# Registered dataset to score, exposed to the step under a named input.
dataset = Dataset.get_by_name(workspace=ws, name='Iris Parallel Scoring')
input_data = dataset.as_named_input('Iris_Parallel_Scoring')

# Name of the registered model that the scoring script loads.
model_name = 'Iris-Multi-Classification-AutoML'

In [None]:
# Step 1: score the dataset in parallel; the model name is forwarded to the
# entry script on its command line.
parallel_scoring_step = ParallelRunStep(
    name="iris-parallel-scoring-step",
    inputs=[input_data],
    output=parallel_run_output,
    arguments=['--model_name', model_name],
    parallel_run_config=parallel_run_config,
    allow_reuse=False,
)

In [None]:
# Step 2: post-process the parallel step's output and upload the final CSV.
output_step = PythonScriptStep(
    name='iris-output-step',
    source_directory='Scoring_Scripts',
    script_name='Iris_Parallel_Output_Creation.py',
    inputs=[parallel_run_output],
    arguments=["--input_data_folder", parallel_run_output],
    runconfig=run_config,
    compute_target=compute_target,
    allow_reuse=False,
)

In [None]:
# Chain the two steps in order and wrap them in a pipeline.
step_sequence = StepSequence(
    steps=[
        parallel_scoring_step,
        output_step,
    ]
)
pipeline = Pipeline(steps=step_sequence, workspace=ws)

In [None]:
# Submit the pipeline as a run of a named experiment.
pipeline_experiment = Experiment(
    workspace=ws,
    name='Iris-Paralell-Scoring-Pipeline-Run',
)
pipeline_run = pipeline_experiment.submit(pipeline, show_output=True)

In [None]:
# Show the run widget, then wait for the pipeline run to complete while
# showing its output.
RunDetails(pipeline_run).show()
pipeline_run.wait_for_completion(show_output=True)

In [None]:
# Publish the pipeline from the submitted run.
published_pipeline = pipeline_run.publish_pipeline(
    name='Iris-Parallel-Scoring-Pipeline',
    description='Pipeline that Scores Iris Data in Parallel',
    version='1.0',
)

# Display the published pipeline as the cell output.
published_pipeline