# Create batch pipeline

In [2]:
# Connect to your workspace
from azureml.core import Workspace
ws = Workspace.from_config()

# Provision inference compute

In [19]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "mlopsbootcamp"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='Standard_DS2_v2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Creating......
SucceededProvisioning operation finished, operation "Succeeded"
Succeeded
AmlCompute wait for completion finished

Minimum number of nodes requested have been provisioned


# Create a pipeline for batch inferencing

In [3]:
import os
# Create a folder for the experiment files
experiment_folder = 'batch_pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

batch_pipeline


# Get path to the model

In [4]:
from azureml.core import Model
model_list = Model.list(ws)
model_path = Model.get_model_path('fourier_regression', _workspace=ws)
print(model_list, model_path)

[Model(workspace=Workspace.create(name='mlopsworkspace', subscription_id='e7d71274-b7c4-47ed-9751-2505b563b918', resource_group='mlopsgroup'), name=fourier_regression, id=fourier_regression:1, version=1, tags={}, properties={})] azureml-models\fourier_regression\1\fourier.pkl


# Load the model

In [5]:
import joblib
model = joblib.load(model_path)
model

LinearRegression()

# Check the batch data

In [6]:
mini_batch = list()
for (dirpath, dirnames, filenames) in os.walk("batch-data"):
    mini_batch += [os.path.join(dirpath, file) for file in filenames]
for elem in mini_batch:
    print(elem)

batch-data\1.csv
batch-data\2.csv
batch-data\3.csv
batch-data\4.csv
batch-data\5.csv
batch-data\6.csv
batch-data\7.csv


# Make prediction per batch

In [7]:
import numpy as np
def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        print(data)
        # Reshape into a 2-dimensional array for model input
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

result = run(mini_batch)
result

# Summarize into python script

In [24]:
%%writefile $experiment_folder\score.py

import os
import numpy as np
from azureml.core import Model
import joblib

def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('fourier_regression')
    model = joblib.load(model_path)

def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for model input
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

Overwriting batch_pipeline\score.py


# Create conda environment

In [21]:
%%writefile $experiment_folder\batch_environment.yml
name: batch_environment
dependencies:
- python=3.8
- numpy
- pandas
- scikit-learn
- pip:
    - azureml-core
    - azureml-dataset-runtime[fuse]

Overwriting batch_pipeline\batch_environment.yml


# Define run using environment

In [17]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Create an Environment for the experiment
batch_env = Environment.from_conda_specification("experiment_env", experiment_folder + "/batch_environment.yml")
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


# Configure batch pipeline steps

In [25]:
from datetime import datetime

from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig
from azureml.core.runconfig import DockerConfiguration

# Get the batch dataset for input
batch_data_set = ws.datasets['batch-data']

# Set the output location
default_ds = ws.get_default_datastore()
output_dir = OutputFileDatasetConfig(name='inferences')

# Define the parallel run step step configuration
parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="score.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallel_step_name = "batchscoring-" + datetime.now().strftime("%Y%m%d%H%M")

# Create the parallel run step
parallelrun_step = ParallelRunStep(
    name=parallel_step_name,
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('batch_data')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

Steps defined


# Run the pipeline

In [34]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

# Create the pipeline
pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])

# Run the pipeline as an experiment
pipeline_run = Experiment(ws, 'pytown-energy-demand-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

Created step batchscoring-202107311955 [f431e354][591b0fbb-7303-48d3-a748-32bfa9cd5f1d], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun b7f54fc5-eb51-44cf-8a51-8290e746fcd6
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/b7f54fc5-eb51-44cf-8a51-8290e746fcd6?wsid=/subscriptions/e7d71274-b7c4-47ed-9751-2505b563b918/resourcegroups/mlopsgroup/workspaces/mlopsworkspace&tid=a0f1cacd-618c-4403-b945-76fb3d6874e5
PipelineRunId: b7f54fc5-eb51-44cf-8a51-8290e746fcd6
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/b7f54fc5-eb51-44cf-8a51-8290e746fcd6?wsid=/subscriptions/e7d71274-b7c4-47ed-9751-2505b563b918/resourcegroups/mlopsgroup/workspaces/mlopsworkspace&tid=a0f1cacd-618c-4403-b945-76fb3d6874e5
PipelineRun Status: NotStarted
PipelineRun Status: Running

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'b7f54fc5-eb51-44cf-8a51-8290e746fcd6', 'status': 'Completed', 'startTimeUtc': '2021-07-31T18:58:14.199303Z', 

AttributeError: 'Run' object has no attribute 'get_output_data'

# Retrieve predictions

In [35]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('batch-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='batch-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('batch-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

AttributeError: 'Run' object has no attribute 'get_output_data'

# Publish the pipeline

In [36]:
published_pipeline = pipeline_run.publish_pipeline(name='Fourier_regression_batch_prediction_pipeline',
                                                   description='Batch scoring using linear regression model with Fourier ML features',
                                                   version='1.0')

published_pipeline

Name,Id,Status,Endpoint
Fourier_regression_batch_prediction_pipeline,f2695278-3a6d-4542-ae2a-39537a7a3376,Active,REST Endpoint


# Get REST endpoint

In [37]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://westeurope.api.azureml.ms/pipelines/v1.0/subscriptions/e7d71274-b7c4-47ed-9751-2505b563b918/resourceGroups/mlopsgroup/providers/Microsoft.MachineLearningServices/workspaces/mlopsworkspace/PipelineRuns/PipelineSubmit/f2695278-3a6d-4542-ae2a-39537a7a3376


# Schedule the pipeline to run every Monday at 04:00 in the morning (02:00 UTC)

In [38]:
from azureml.pipeline.core import ScheduleRecurrence, Schedule

weekly = ScheduleRecurrence(frequency='Week', interval=1, week_days=["Monday"], time_of_day="02:00")
pipeline_schedule = Schedule.create(ws, name='Weekly Predictions',
                                        description='batch inferencing',
                                        pipeline_id=published_pipeline.id,
                                        experiment_name='Batch_Prediction',
                                        recurrence=weekly)

# Disable pipeline with active schedule

In [39]:
ss = Schedule.list(ws)
for s in ss:
    print(s)

Pipeline(Name: Weekly Predictions,
Id: e14c5ce5-db2c-462b-97f4-8c1c21faa31b,
Status: Active,
Pipeline Id: f2695278-3a6d-4542-ae2a-39537a7a3376,
Pipeline Endpoint Id: None,
Recurrence Details: Runs at 2:00 on Monday every Week)


In [40]:
def stop_by_schedule_id(ws, schedule_id):
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

schedule_id = 'e14c5ce5-db2c-462b-97f4-8c1c21faa31b'
stop_by_schedule_id(ws, schedule_id)

Name,Id,Status,Pipeline Id,Pipeline Endpoint Id,Recurrence Details
Weekly Predictions,e14c5ce5-db2c-462b-97f4-8c1c21faa31b,Disabled,f2695278-3a6d-4542-ae2a-39537a7a3376,,Runs at 2:00 on Monday every Week
