# Azure ML - Sample Batch Prediction Pipeline
- Parallel run step leveraged
- Output collected & saved to blob storage

In [None]:
import azureml.core
from azureml.core import Workspace
import os, shutil
from azureml.core import Workspace, Experiment, Datastore, Environment, Dataset
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.runconfig import RunConfiguration
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE
from azureml.pipeline.core import Pipeline, PipelineParameter, PipelineData, PublishedPipeline
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import PipelineParameter, PipelineData, PipelineEndpoint
from azureml.data.output_dataset_config import OutputTabularDatasetConfig, OutputDatasetConfig, OutputFileDatasetConfig
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
from azureml.core.experiment import Experiment

# Connect to the Azure ML workspace described by the saved config file,
# then report the SDK version and workspace name.
ws = Workspace.from_config()
print(f'Ready to use Azure ML {azureml.core.VERSION} to work with {ws.name}')

In [None]:
import os, shutil
# Working folder (under the current directory) for the files written by later cells.
folder_name = 'batch-inferencing-full'
script_folder = os.path.join(os.getcwd(), folder_name)
print(script_folder)
os.makedirs(script_folder, exist_ok=True)  # no error when it already exists

## Connect to AML Workspace

In [None]:
# Workspace default datastore: holds the input emails and the pipeline's outputs.
default_ds = ws.get_default_datastore()

## Create Cluster

In [None]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException

# Compute cluster used by both pipeline steps.
compute_name = "email-cluster4"
print(compute_name)

# Reuse the cluster when it already exists in the workspace; provision it otherwise.
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException:
    # NOTE(review): min_nodes=2 keeps two nodes allocated even while idle;
    # confirm that is intended rather than scaling down to 0.
    config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_D13_V2",
        min_nodes=2,
        max_nodes=10,
    )
    compute_target = ComputeTarget.create(
        workspace=ws,
        name=compute_name,
        provisioning_configuration=config,
    )
    # Block (up to 120 minutes) until provisioning finishes, streaming status.
    compute_target.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=120,
    )

In [None]:
%%writefile $script_folder/email_classification_inference.yml
# Conda spec for the batch-inference environment; loaded by
# Environment.from_conda_specification in the next cell.
name: email_classification_inference
dependencies:
  # The python interpreter version.
  # Currently Azure ML only supports 3.5.2 and later.
  # NOTE(review): Python 3.6 is end-of-life; confirm this pin is still
  # compatible with the azureml-defaults release that pip resolves.
- python=3.6.2
  # presumably needed to unpickle the model loaded via joblib in the
  # scoring script — confirm against the training environment.
- scikit-learn
  # NOTE(review): ipykernel and matplotlib are not imported by the scripts
  # written in this notebook; they may be removable.
- ipykernel
- matplotlib
  # Imported by both the scoring and the organize scripts.
- pandas
- pip
- pip:
  - azureml-defaults
  # presumably required by the Azure ML dataset APIs — confirm.
  - pyarrow

In [None]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Python environment for both pipeline steps, built from the conda spec
# written to script_folder in the previous cell, on the default CPU base image.
batch_env = Environment.from_conda_specification(
    name="email_classification_inference",
    file_path=script_folder + "/email_classification_inference.yml",
)
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

# Run configuration for the PythonScriptStep; it reuses the same environment.
run_config = RunConfiguration()
run_config.environment = batch_env

# Define Output Datasets

Below we define the configuration for datasets that will be passed between steps in our pipeline (the definitions live in the step-definition cell further down). In all cases we specify the datastore that should hold the datasets and whether they should be registered after the step completes. Registration can optionally be disabled by removing the `register_on_complete()` call.


# Define Pipeline Parameters

PipelineParameter objects serve as variable inputs to an Azure ML pipeline and can be specified at runtime. Below we specify a pipeline parameter object, `model_name`, which identifies the trained model that was uploaded and registered in the Azure ML workspace. Multiple pipeline parameters can be created in the same way; each can be passed into, and consumed by, the various pipeline steps through their arguments.

In [None]:
# Registered model name; the default can be overridden when the pipeline is submitted.
model_name = PipelineParameter(default_value='email_classifier', name='model_name')

# Define Pipeline Steps

The pipeline below consists of a scoring step (a ParallelRunStep), in which the registered model is used to make predictions on the loaded input data, and a step that organizes the scored data and exports it to a remote data store. (The input data itself is gathered and registered in a notebook cell before the pipeline is built.) Each step has a corresponding *.py file which is referenced in the step definition. Also, any PipelineParameters defined above can be passed to and consumed within these steps.


In [None]:
import os, shutil
# Source directory for the pipeline step scripts written by the cells below.
folder_name = 'batch-inferencing'
script_folder = os.path.join(os.getcwd(), folder_name)
print(script_folder)
os.makedirs(script_folder, exist_ok=True)  # no error when it already exists

In [None]:
%%writefile $script_folder/batch_inferencing_data_silly.py

import os
import numpy as np
from azureml.core import Model
import joblib
import time
import pandas as pd


def init():
    """Load the registered model; runs once when the pipeline step is initialized.

    The model name defaults to ``'email_classifier'`` and can be overridden by
    passing ``--model_name <name>`` to this entry script (for example from the
    ``model_name`` PipelineParameter). Unrecognised arguments are ignored so
    that any other command-line arguments do not break parsing.
    """
    import argparse

    global model

    # add_help/allow_abbrev disabled so unrelated arguments (e.g. "-h" or a
    # prefix such as "--mode") can never be captured by this parser.
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument('--model_name', type=str, default='email_classifier')
    args, _ = parser.parse_known_args()

    # load the model
    model_path = Model.get_model_path(args.model_name)
    model = joblib.load(model_path)
    print(f'****loaded model {args.model_name}**********')


def run(mini_batch):
    """Score one mini-batch of email text files.

    Args:
        mini_batch: iterable of file paths; each file is read in full as
            UTF-8 text and classified by the global ``model`` set in init().

    Returns:
        pandas.DataFrame with one row per input file and a single column
        holding ``"<file name>: <prediction>"``. With output_action
        "append_row" these rows are collated into parallel_run_step.txt.
    """
    print(f'run method start: {__file__}, run({mini_batch})')

    file_names = []
    texts = []
    for file_path in mini_batch:
        # Explicit encoding: the default depends on the container locale
        # (ASCII under a C/POSIX locale on Python 3.6). `with` closes the
        # handle even when reading fails.
        with open(file_path, 'r', encoding='utf-8') as text_file:
            texts.append(text_file.read())
        file_names.append(os.path.basename(file_path))

    if not texts:
        # Nothing to score; don't call predict() on an empty batch.
        return pd.DataFrame([])

    # One predict() call for the whole mini-batch instead of one per file.
    predictions = model.predict(texts)
    result_list = [f'{name}: {pred}' for name, pred in zip(file_names, predictions)]

    # Return all rows formatted as a Pandas dataframe
    return pd.DataFrame(result_list)



You're going to use a pipeline to run the batch prediction script, generate predictions from the input data, and save the results as a text file in the output folder. To do this, you can use a **ParallelRunStep**, which enables the batch data to be processed in parallel and the results collated in a single output file named *parallel_run_step.txt*.

In [None]:
# Register a FileDataset over the input emails on the default datastore.
batch_data_set = Dataset.File.from_files(path=(default_ds, 'spam-data-inferencing/'), validate=False)
try:
    batch_data_set = batch_data_set.register(workspace=ws,
                                             name='spam-batch-data-inference',
                                             description='inference batch data',
                                             create_new_version=True)
except Exception as ex:
    # Best-effort: on failure, keep the unregistered dataset object so the
    # pipeline below can still consume it, but make the failure visible.
    print(f'WARNING: dataset registration failed; continuing with the unregistered dataset: {ex}')
else:
    print("Done!")

In [None]:
# Display the absolute path of the current script folder as the cell output.
script_folder

In [None]:
%%writefile $script_folder/organize_data_silly.py

import pandas as pd
import os
import datetime
import argparse

def organize_results(input_dir, output_dir):
    """Convert the ParallelRunStep output into a CSV file.

    Args:
        input_dir: directory containing ``parallel_run_step.txt`` (the
            previous step's output, passed to this step as an input).
        output_dir: directory for ``processed_data.csv``; created if missing.

    Returns:
        Path of the CSV file that was written.
    """
    # Get output data from previous step - saved as parallel_run_step.txt
    pipeline_data_file = os.path.join(input_dir, 'parallel_run_step.txt')

    # Parse as a space-delimited file without a header row
    df_pipeline_data = pd.read_csv(pipeline_data_file, header=None, delimiter=" ")
    print(df_pipeline_data.columns)

    # Note: additional DataFrame formatting operations can be done here

    # Only the output directory may need creating; the input directory
    # already exists (the file above was read from it).
    os.makedirs(output_dir, exist_ok=True)

    # Save output file to blob storage (the mounted output location)
    output_file = os.path.join(output_dir, 'processed_data.csv')
    df_pipeline_data.to_csv(output_file, index=False)
    return output_file


def main():
    """Parse the step's command-line arguments and organize the results."""
    parser = argparse.ArgumentParser("parallel run step results directory")
    parser.add_argument("--processed_dataset_tabular", dest='processed_dataset_tabular', required=True)
    parser.add_argument("--processed_dataset", type=str, required=True)
    args, _ = parser.parse_known_args()
    organize_results(args.processed_dataset, args.processed_dataset_tabular)


if __name__ == "__main__":
    main()

In [None]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.core.runconfig import DockerConfiguration

# Final CSV output: written under processed_data_tabular/{run-id} on the default
# datastore and registered as a tabular dataset when the organize step completes.
processed_dataset_tabular = OutputFileDatasetConfig(
    name='processed_data_tabular',
    destination=(default_ds, 'processed_data_tabular/{run-id}'),
).read_delimited_files().register_on_complete(name='processed_data_tabular')
# Intermediate output of the scoring step (holds parallel_run_step.txt).
processed_dataset_pipeline_data = PipelineData(name='processed_data', datastore=default_ds)

# Score the input files in parallel; rows returned by run() in the entry
# script are appended ("append_row") into a single output file.
parallel_run_config = ParallelRunConfig(
    source_directory=script_folder,
    entry_script="batch_inferencing_data_silly.py",
    mini_batch_size="50",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=compute_target,
    node_count=2,
)

parallelrun_step = ParallelRunStep(
    name='batch-score-email',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('email_batch')],
    output=processed_dataset_pipeline_data,
    # Forward the model_name PipelineParameter so the model to load can be
    # chosen per pipeline run.
    arguments=['--model_name', model_name],
    allow_reuse=False,
)

# Convert parallel_run_step.txt into a CSV in the registered tabular output.
organize_results_step = PythonScriptStep(
    name='organize_results_step',
    script_name='organize_data_silly.py',
    arguments=['--processed_dataset_tabular', processed_dataset_tabular,
               '--processed_dataset', processed_dataset_pipeline_data],
    inputs=[processed_dataset_pipeline_data],
    outputs=[processed_dataset_tabular],
    compute_target=compute_target,
    source_directory=script_folder,
    allow_reuse=False,
    runconfig=run_config,
)

print('Steps defined')

In [None]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# organize_results_step consumes the scoring step's PipelineData output,
# so it runs after parallelrun_step.
pipeline = Pipeline(
    workspace=ws,
    steps=[parallelrun_step, organize_results_step],
)
# NOTE(review): "classifcation" is misspelled; renaming the experiment would
# start a new run history, so it is left as-is.
pipeline_run = Experiment(ws, '03-email-classifcation-batch-inference_full').submit(pipeline)
# Block until the run finishes, streaming step output.
pipeline_run.wait_for_completion(show_output=True)

## Publish the Pipeline

In [None]:
# published_pipeline = pipeline.publish(name = 'Email Batch Prediction Pipeline Silly',
#                                      description = 'Pipeline that generates batch predictions using a registered trained model.',
#                                      continue_on_step_failure = False)

In [None]:
# published_pipeline

In [None]:
#from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
# pipeline_id = '2c8fc5ae-1508-4bf9-9dda-24c21fb2e8aa'
# experiment_name = 'scheduled_silly_email'
# recurrence = ScheduleRecurrence(frequency="Minute", interval=5)
# recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule", 
#                             description="Based on time",
#                             pipeline_id=pipeline_id, 
#                             experiment_name=experiment_name, 
#                             recurrence=recurrence)

## Get published pipeline Info

In [None]:
# experiments = Experiment.list(ws)
# # for experiment in experiments:
# #     print(experiment.name)

# published_pipelines = PublishedPipeline.list(ws)
# for published_pipeline in  published_pipelines:
#     print(f"{published_pipeline.name},'{published_pipeline.id}'")

In [None]:
# ss = Schedule.list(ws)
# for s in ss:
#     print(s)
#     print('****************')

In [None]:
# def stop_by_schedule_id(ws, schedule_id):
#     s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
#     s.disable()
#     return s

# #stop_by_schedule_id(ws, '60166fcd-5276-4557-9a5b-c5a0ce3ec84e')

In [None]:
# pipeline = PublishedPipeline.get(ws, id = '898c1939-7278-4ce8-976f-106b71bbb678')
# pipeline.disable()

# # for published_pipeline in  published_pipelines:
# #     pipeline = PublishedPipeline.get(ws, id = published_pipeline.id)
# #     pipeline.disable()

## Set Schedule for Pipeline

In [None]:
# pipeline_id = published_pipeline.id
# experiment_name = 'silly_scheduled_email'
# recurrence = ScheduleRecurrence(frequency="Minute", interval=5)
# recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule", 
#                             description="Based on time",
#                             pipeline_id=pipeline_id, 
#                             experiment_name=experiment_name, 
#                             recurrence=recurrence)