### Connect to the workspace

In [1]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.47.0 to work with mlw-test


### Generate and upload batch data
Generate a random sample from our diabetes CSV file, upload that data to a datastore in the Azure Machine Learning workspace, and register a dataset for it.

In [2]:
from azureml.core import Datastore, Dataset
import pandas as pd
import os

# Set default data store
ws.set_default_datastore('workspaceblobstore')
default_ds = ws.get_default_datastore()

# Enumerate all datastores, indicating which is the default
for ds_name in ws.datastores:
    print(ds_name, "- Default =", ds_name == default_ds.name)

# Load the diabetes data
diabetes = pd.read_csv('../../data/diabetes/diabetes.csv')

# Get a 100-item sample of the feature columns (not the diabetic label)
sample = diabetes[['Pregnancies','PlasmaGlucose','DiastolicBloodPressure','TricepsThickness','SerumInsulin','BMI','DiabetesPedigree','Age']].sample(n=100).values

# Create a folder
batch_folder = './diabetes-batch-data'
os.makedirs(batch_folder, exist_ok=True)
print("Folder created!")

# Save each sample as a separate file
print("Saving files...")
for i in range(100):
    fname = str(i+1) + '.csv'
    sample[i].tofile(os.path.join(batch_folder, fname), sep=",")
print("files saved!")

# Upload the files to the default datastore
print("Uploading files to datastore...")
default_ds = ws.get_default_datastore()
default_ds.upload(src_dir="diabetes-batch-data", target_path="diabetes-batch-data", overwrite=True, show_progress=True)

# Register a dataset for the input data
batch_data_set = Dataset.File.from_files(path=(default_ds, 'diabetes-batch-data/'), validate=False)
try:
    batch_data_set = batch_data_set.register(workspace=ws, 
                                             name='diabetes-batch-data',
                                             description='Diabetes batch data',
                                             create_new_version=True)
except Exception as ex:
    print(ex)

print("Done!")

workspaceworkingdirectory - Default = False
workspaceartifactstore - Default = False
workspacefilestore - Default = False
workspaceblobstore - Default = True
Folder created!
Saving files...
files saved!
Uploading files to datastore...
Uploading an estimated of 102 files
Uploading diabetes-batch-data/.amlignore
Uploaded diabetes-batch-data/.amlignore, 1 files out of an estimated total of 102
Uploading diabetes-batch-data/.amlignore.amltmp
Uploaded diabetes-batch-data/.amlignore.amltmp, 2 files out of an estimated total of 102
Uploading diabetes-batch-data/1.csv
Uploaded diabetes-batch-data/1.csv, 3 files out of an estimated total of 102
Uploading diabetes-batch-data/10.csv
Uploaded diabetes-batch-data/10.csv, 4 files out of an estimated total of 102
Uploading diabetes-batch-data/100.csv
Uploaded diabetes-batch-data/100.csv, 5 files out of an estimated total of 102
Uploading diabetes-batch-data/11.csv
Uploaded diabetes-batch-data/11.csv, 6 files out of an estimated total of 102
Uploading

### Let's take a look at models registered in the workspace

In [3]:
from azureml.core import Model

for model in Model.list(ws):
    print(model.name, 'version:', model.version)
    for tag_name in model.tags:
        tag = model.tags[tag_name]
        print ('\t',tag_name, ':', tag)
    for prop_name in model.properties:
        prop = model.properties[prop_name]
        print ('\t',prop_name, ':', prop)
    print('\n')

diabetes-model version: 7
	 Algorithm : Decision Tree Classifier
	 Training context : Pipeline
	 AUC : 0.8832546157718848
	 Accuracy : 0.898


diabetes-model version: 6
	 Algorithm : Logistic Regression
	 Training context : Tabular diabetes data asset
	 AUC : 0.8568650620553335
	 Accuracy : 0.7893333333333333
	 Regularization Rate : 0.1


diabetes-model version: 5
	 Algorithm : Decision Tree Classifier
	 Training context : Pipeline
	 AUC : 0.88375696004516
	 Accuracy : 0.8986666666666666


diabetes-model version: 4
	 Algorithm : Logistic Regression
	 Training context : Tabular diabetes data asset
	 AUC : 0.8568650620553335
	 Accuracy : 0.7893333333333333
	 Regularization Rate : 0.1


diabetes-model version: 3
	 Training context : Tabular diabetes data asset
	 AUC : 0.8568650620553335
	 Accuracy : 0.7893333333333333
	 Regularization Rate : 0.1


diabetes-model version: 2
	 Training context : Tabular diabetes data asset
	 AUC : 0.8568650620553335
	 Accuracy : 0.7893333333333333
	 Regular

### Get the model that we want to deploy. By default, if we specify a model name, the latest version will be returned.

In [4]:
model = ws.models['diabetes-model']
print(model.name, 'version', model.version)

diabetes-model version 7


### Create compute

In [5]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

cluster_name = "compute-cluster-ds3-v2"

try:
    # Check for existing compute target
    inference_cluster = ComputeTarget(workspace=ws, name=cluster_name)
    print('Found existing cluster, use it.')
except ComputeTargetException:
    # If it doesn't already exist, create it
    try:
        compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS3_V2', max_nodes=2)
        inference_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
        inference_cluster.wait_for_completion(show_output=True)
    except Exception as ex:
        print(ex)

Found existing cluster, use it.


### Create a pipeline for batch inferencing

In [6]:
import os
# Create a folder for the experiment files
experiment_folder = 'diabetes-batch-pipeline'
os.makedirs(experiment_folder, exist_ok=True)

print(experiment_folder)

diabetes-batch-pipeline


### Create the batch inferencing script

In [7]:
%%writefile $experiment_folder/batch_diabetes.py
import os
import numpy as np
from azureml.core import Model
import joblib


def init():
    # Runs when the pipeline step is initialized
    global model

    # load the model
    model_path = Model.get_model_path('diabetes_model')
    model = joblib.load(model_path)


def run(mini_batch):
    # This runs for each batch
    resultList = []

    # process each file in the batch
    for f in mini_batch:
        # Read the comma-delimited data into an array
        data = np.genfromtxt(f, delimiter=',')
        # Reshape into a 2-dimensional array for prediction (model expects multiple items)
        prediction = model.predict(data.reshape(1, -1))
        # Append prediction to results
        resultList.append("{}: {}".format(os.path.basename(f), prediction[0]))
    return resultList

Overwriting diabetes-batch-pipeline/batch_diabetes.py


### Create a conda environment for the pipeline

In [None]:
%%writefile $experiment_folder/diabetes_batch_environment.yml
name: diabetes-batch-environment
dependencies:
- python=3.6.2
- scikit-learn
- pip
- pip:
  - azureml-defaults

### Define a run context that includes the conda environment

In [8]:
from azureml.core import Environment
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

# Create an Environment for the experiment
batch_env = Environment.from_conda_specification("diabetes-batch-experiment_env", experiment_folder + "/diabetes_batch_environment.yml")
batch_env.docker.base_image = DEFAULT_CPU_IMAGE
print('Configuration ready.')

Configuration ready.


### Define the batch inferencing pipeline
You're going to use a pipeline to run the batch prediction script, generate predictions from the input data, and save the results as a text file in the output folder. To do this, you can use a ParallelRunStep, which enables the batch data to be processed in parallel and the results collated in a single output file named parallel_run_step.txt.

In [9]:
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep
from azureml.data import OutputFileDatasetConfig

output_dir = OutputFileDatasetConfig(name='inferences')

parallel_run_config = ParallelRunConfig(
    source_directory=experiment_folder,
    entry_script="batch_diabetes.py",
    mini_batch_size="5",
    error_threshold=10,
    output_action="append_row",
    environment=batch_env,
    compute_target=inference_cluster,
    node_count=2)

parallelrun_step = ParallelRunStep(
    name='batch-score-diabetes',
    parallel_run_config=parallel_run_config,
    inputs=[batch_data_set.as_named_input('diabetes_batch')],
    output=output_dir,
    arguments=[],
    allow_reuse=True
)

print('Steps defined')

Steps defined


### Run the pipeline

In [10]:
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[parallelrun_step])
pipeline_run = Experiment(ws, 'diabetes-batch').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

Created step batch-score-diabetes [a29e44ca][cc23a2a6-62bf-4bdc-ae99-bc14aaeb6d00], (This step is eligible to reuse a previous run's output)
Submitted PipelineRun ccf6d2a2-02c4-4f01-9823-ed2eaaf37cb2
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ccf6d2a2-02c4-4f01-9823-ed2eaaf37cb2?wsid=/subscriptions/fad2758e-cc1a-4f45-8323-6b466001d50a/resourcegroups/rg-ml-test/workspaces/mlw-test&tid=67119780-27b3-4331-813a-0b239d67524a
PipelineRunId: ccf6d2a2-02c4-4f01-9823-ed2eaaf37cb2
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/ccf6d2a2-02c4-4f01-9823-ed2eaaf37cb2?wsid=/subscriptions/fad2758e-cc1a-4f45-8323-6b466001d50a/resourcegroups/rg-ml-test/workspaces/mlw-test&tid=67119780-27b3-4331-813a-0b239d67524a
PipelineRun Status: NotStarted
PipelineRun Status: Running

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': 'ccf6d2a2-02c4-4f01-9823-ed2eaaf37cb2', 'status': 'Completed', 'startTimeUtc': '2023-02-01T08:52:32.509157Z', 'endTimeUtc': '20

'Finished'

### Retrieve the batch inference results
When the pipeline has finished running, the resulting predictions will have been saved in the outputs of the experiment associated with the first (and only) step in the pipeline. You can retrieve it as follows:

In [11]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('diabetes-batch-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-batch-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('diabetes-batch-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

Unnamed: 0,File,Prediction
0,11.csv,0
1,12.csv,0
2,13.csv,1
3,14.csv,0
4,15.csv,0
5,16.csv,0
6,17.csv,0
7,18.csv,1
8,19.csv,1
9,2.csv,0


### Publish the Pipeline and use its REST Interface
Now that you have a working pipeline for batch inferencing, you can publish it and use a REST endpoint to run it from an application.

In [12]:
published_pipeline = pipeline_run.publish_pipeline(
    name='diabetes-batch-pipeline', description='Batch scoring of diabetes data', version='1.0')

published_pipeline

Name,Id,Status,Endpoint
diabetes-batch-pipeline,fd36a80f-9fbb-45a6-a005-62dbd6c9d04c,Active,REST Endpoint


Note that the published pipeline has an endpoint, which you can see in the Azure portal. You can also find it as a property of the published pipeline object:

In [14]:
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)

https://westeurope.api.azureml.ms/pipelines/v1.0/subscriptions/fad2758e-cc1a-4f45-8323-6b466001d50a/resourceGroups/rg-ml-test/providers/Microsoft.MachineLearningServices/workspaces/mlw-test/PipelineRuns/PipelineSubmit/fd36a80f-9fbb-45a6-a005-62dbd6c9d04c


To use the endpoint, client applications need to make a REST call over HTTP. This request must be authenticated, so an authorization header is required. To test this out, we'll use the authorization header from your current connection to your Azure workspace, which you can get using the following code:
> **Note**: A real application would require a service principal with which to be authenticated.

In [13]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()
print('Authentication header ready.')

Authentication header ready.


Now we're ready to call the REST interface. The pipeline runs asynchronously, so we'll get an identifier back, which we can use to track the pipeline experiment as it runs:

In [15]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint, 
                         headers=auth_header, 
                         json={"ExperimentName": "diabetes-batch"})
run_id = response.json()["Id"]
run_id

'374f5f94-c5b2-401d-b047-8d148eab317c'

Since we have the run ID, we can use the RunDetails widget to view the experiment as it runs:

In [18]:
from azureml.pipeline.core.run import PipelineRun
from azureml.widgets import RunDetails

published_pipeline_run = PipelineRun(ws.experiments['diabetes-batch'], run_id)

# Block until the run completes
published_pipeline_run.wait_for_completion(show_output=True)

PipelineRunId: 374f5f94-c5b2-401d-b047-8d148eab317c
Link to Azure Machine Learning Portal: https://ml.azure.com/runs/374f5f94-c5b2-401d-b047-8d148eab317c?wsid=/subscriptions/fad2758e-cc1a-4f45-8323-6b466001d50a/resourcegroups/rg-ml-test/workspaces/mlw-test&tid=67119780-27b3-4331-813a-0b239d67524a

PipelineRun Execution Summary
PipelineRun Status: Finished
{'runId': '374f5f94-c5b2-401d-b047-8d148eab317c', 'status': 'Completed', 'startTimeUtc': '2023-02-01T08:54:58.54624Z', 'endTimeUtc': '2023-02-01T08:54:59.68383Z', 'services': {}, 'properties': {'azureml.runsource': 'azureml.PipelineRun', 'runSource': 'Unavailable', 'runType': 'HTTP', 'azureml.parameters': '{}', 'azureml.continue_on_step_failure': 'False', 'azureml.continue_on_failed_optional_input': 'True', 'azureml.pipelineid': 'fd36a80f-9fbb-45a6-a005-62dbd6c9d04c', 'azureml.pipelineComponent': 'pipelinerun', 'azureml.pipelines.stages': '{"Initialization":null,"Execution":{"StartTime":"2023-02-01T08:54:58.8475964+00:00","EndTime":"2

'Finished'

Wait for the pipeline run to complete, and then run the following cell to see the results.

As before, the results are in the output of the first pipeline step:

In [19]:
import pandas as pd
import shutil

# Remove the local results folder if left over from a previous run
shutil.rmtree('diabetes-batch-results', ignore_errors=True)

# Get the run for the first step and download its output
prediction_run = next(pipeline_run.get_children())
prediction_output = prediction_run.get_output_data('inferences')
prediction_output.download(local_path='diabetes-batch-results')

# Traverse the folder hierarchy and find the results file
for root, dirs, files in os.walk('diabetes-batch-results'):
    for file in files:
        if file.endswith('parallel_run_step.txt'):
            result_file = os.path.join(root,file)

# cleanup output format
df = pd.read_csv(result_file, delimiter=":", header=None)
df.columns = ["File", "Prediction"]

# Display the first 20 results
df.head(20)

Unnamed: 0,File,Prediction
0,11.csv,0
1,12.csv,0
2,13.csv,1
3,14.csv,0
4,15.csv,0
5,16.csv,0
6,17.csv,0
7,18.csv,1
8,19.csv,1
9,2.csv,0
