# Azure Machine Learning Service example
### Deploy and publish scoring pipeline

*This notebook shows you how to:*
- Build a pipeline using ParallelRunStep or PythonScriptStep
- Publish the pipeline
- Deploy the pipeline as a REST endpoint
- Request scoring via the HTTP endpoint with an Azure Active Directory authentication header

This is similar to the [tutorial](https://docs.microsoft.com/en-us/azure/machine-learning/tutorial-pipeline-batch-scoring-classification) published in the Microsoft documentation.

Differences between the existing batch scoring example and this example:

- This version focuses on demonstrating the pipeline code using a simple problem
- Assets such as the Dataset and Compute are much easier to register and configure through the Azure Machine Learning Service web interface. This notebook therefore assumes they already exist, and the pipeline code uses **get** rather than **register** to obtain references to those assets in your workspace.

## Set up workspace
- Create a workspace in Azure Machine Learning service if you do not have one yet - [howto](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-manage-workspace?tabs=python)
- Update the cell below with your workspace details and run it to connect to the workspace

In [1]:
import json
import os
config_file = '_config.json'

subscription_id='xxxxx' # Subscription ID of the workspace
resource_group='xxxxx' # Resource group of the workspace
workspace_name = "xxxxxx" # Name of your workspace

if os.path.isfile(config_file):
    with open(config_file, 'r') as f:
        configs = json.load(f)
        subscription_id = configs['subscription_id']
        resource_group = configs['resource_group']
        workspace_name = configs['workspace_name']

from azureml.core import Workspace

ws = Workspace.get(name=workspace_name,
               subscription_id=subscription_id,
               resource_group=resource_group)
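
The cell above reads `_config.json` when it exists and uses its values in place of the placeholders. As a minimal sketch, such a file could be created as follows. The three keys are the ones the cell reads; the values are placeholders, and `write_workspace_config` is a hypothetical helper name, not part of the SDK.

```python
import json
import os
import tempfile


def write_workspace_config(path, subscription_id, resource_group, workspace_name):
    """Write the three keys that the connection cell reads from the config file."""
    configs = {
        "subscription_id": subscription_id,
        "resource_group": resource_group,
        "workspace_name": workspace_name,
    }
    with open(path, "w") as f:
        json.dump(configs, f, indent=2)
    return configs


# Use a temporary directory so that no real config file is overwritten
demo_path = os.path.join(tempfile.mkdtemp(), "_config.json")
write_workspace_config(demo_path, "xxxxx", "xxxxx", "xxxxxx")

with open(demo_path) as f:
    loaded = json.load(f)
print(sorted(loaded))
```

Keeping these values in a file rather than in the notebook makes it easier to share the notebook without sharing subscription details.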


### Connect to Workspace

### Initialise
- Update the asset names in the cell below (optional)
- Execute the cell to initialise variables and create a temp directory for storing the files to upload

In [2]:
# Asset names 
model_dir = 'simple_model'
model_name = model_dir
model_file = 'model_v{}.json'
experiment_name = "pipeline_simple"
compute_name = "DS3-4c-14G"
environment_name = "test_minimal"
dataset_parent_path = "data"
dataset_name = experiment_name

# Temp local directory for storing created files that will be uploaded to AML for the pipeline
local_temp_dir = './_temp'

import os 
local_path_to_model = os.path.join(local_temp_dir, model_dir)
if not os.path.isdir(local_path_to_model):
    os.makedirs(local_path_to_model)
    
import azureml.core
print("SDK version:", azureml.core.VERSION)



SDK version: 1.20.0


### Register Dataset
1. Generate a sample input dataset CSV file by executing the next cell

In [3]:
%%writefile ./_temp/pipeline_simple_input.csv
id,gender,age
1,F,30
2,F,50
3,M,55
4,M,23

Overwriting ./_temp/pipeline_simple_input.csv


2. Use the Azure ML web interface to register the dataset
    - Load your AML workspace in the web browser
    - Click on the 'Dataset' option
    - Use the register action to register this input CSV as a dataset in your workspace, using 'pipeline_simple' as the dataset name

### Get reference to input dataset

In [4]:
from azureml.core.dataset import Dataset 
input_data = Dataset.get_by_name(ws, dataset_name)

### Create and register model 

In [5]:
import json

# Create model
class MyModel:

    @staticmethod
    def load_model(file_path):
        # Load the parameters from a JSON file and wrap them in a model
        with open(file_path, 'r') as f:
            loaded_params = json.load(f)
        return MyModel(loaded_params)
    
    def __init__(self, params):
        self._params = params
    
    def predict(self, data):
        data['prediction'] = data.age < self._params['age_average']
        return data[['id', 'prediction']]

    

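params = {'age_average': 40}
# The scoring script loads '<model_name>.json' from the model directory,
# so name the file accordingly
local_model_path = os.path.join(local_path_to_model, model_name + '.json')

with open(local_model_path, 'w') as f:
    json.dump(params, f)
# Note: the scoring script calls Model.get_model_path('simple_model'), which assumes
# this directory has been registered as a model with that name in the workspace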
# Test model code
mymodel = MyModel.load_model(local_model_path)
print(str(mymodel._params))



{'age_average': 40}
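
To make the model's rule concrete: it flags every row whose `age` is below the stored `age_average`. The sketch below applies the same comparison to the sample rows using only the standard library. `predict_rows` is a hypothetical stand-in for `MyModel.predict`, which operates on a pandas DataFrame instead.

```python
import csv
import io

# Same rows as the sample input file generated earlier
SAMPLE_CSV = """id,gender,age
1,F,30
2,F,50
3,M,55
4,M,23
"""


def predict_rows(rows, params):
    """Return (id, prediction) pairs, where prediction is age < age_average."""
    threshold = params["age_average"]
    return [(int(row["id"]), int(row["age"]) < threshold) for row in rows]


rows = list(csv.DictReader(io.StringIO(SAMPLE_CSV)))
predictions = predict_rows(rows, {"age_average": 40})
print(predictions)  # [(1, True), (2, False), (3, False), (4, True)]
```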


### Get compute target

In [6]:
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.exceptions import ComputeTargetException

# checks to see if compute target already exists in workspace, else create it
try:
    compute_target = ComputeTarget(workspace=ws, name=compute_name)
except ComputeTargetException as e:
    config = AmlCompute.provisioning_configuration(vm_size="STANDARD_NC6",
                                                   vm_priority="lowpriority", 
                                                   min_nodes=0, 
                                                   max_nodes=1)

    compute_target = ComputeTarget.create(workspace=ws, name=compute_name, provisioning_configuration=config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

### Define environment
- Create a requirements file with the dependencies required for running your model.

In [7]:
%%writefile ./_temp/requirements.txt
pandas
numpy
azureml-core
azureml-dataset-runtime

Overwriting ./_temp/requirements.txt


In [9]:
from azureml.core import Environment
# Pipeline doesn't like using curated environment, create a new one 
env = Environment.from_pip_requirements(name = environment_name, file_path = os.path.join(local_temp_dir, 'requirements.txt'))

### Build & run pipeline
#### 1. Create scoring file
This is a Python script that makes predictions with your model. This [section](https://docs.microsoft.com/en-au/azure/machine-learning/how-to-deploy-existing-model#entry-script-scorepy) of the Azure documentation explains how this file works. Note that the scoring script uses azureml.core modules, which is why azureml-core and azureml-dataset-runtime were included as dependencies when defining the environment above.

In [10]:
%%writefile ./_temp/scoring.py

from azureml.core import Run
from azureml.core.model import Model
from azureml.core.dataset import Dataset

import os
from datetime import datetime
import argparse


import pandas as pd
import json


def init():
    global model
    
    class MyModel:

        @staticmethod
        def load_model(file_path):
            with open(file_path, 'r') as f:
                loaded_params = json.load(f)
            return MyModel(loaded_params)

        def __init__(self, params):
            self._params = params

        def predict(self, data):
            data['prediction'] = data.age < self._params['age_average']
            return data[['id', 'prediction']]
        
    # Read parameters from the command-line arguments, e.g. which model to use
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_name', dest="model_name", required=True)
    args, _ = parser.parse_known_args()

    
    model_dir = 'simple_model'
    model_name = args.model_name
    print(str(datetime.now()) + ': init()')

    model_path = Model.get_model_path(model_dir) + '/' + model_name + '.json'
    model = MyModel.load_model(model_path)
    print(str(datetime.now()) + ': Model loaded')


def run(data):
    print(str(datetime.now()) + ': run()')
    output = model.predict(data)
    print(str(datetime.now()) + ': run completed()')
    return output

def local_test():
    # Local smoke test; init() still expects --model_name on the command line
    init()
    input_data = pd.read_csv(os.path.join('_temp', 'pipeline_simple_input.csv'))
    print(run(input_data))


Writing ./_temp/scoring.py
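
ParallelRunStep is documented to call `init()` once per worker process and then `run(mini_batch)` once for each mini-batch. The following sketch imitates that contract locally, with plain Python lists in place of DataFrames. `simulate_parallel_run`, `mini_batch_size`, and the hard-coded parameters are assumptions made for illustration and are not part of the Azure ML SDK.

```python
# Local stand-in for the init()/run() contract; no Azure ML packages needed.
model_params = None
call_log = []


def init():
    """One-time setup; the real script loads the registered model here."""
    global model_params
    model_params = {"age_average": 40}  # assumed parameters for the demo
    call_log.append("init")


def run(mini_batch):
    """Score one mini-batch and return one result per input row."""
    call_log.append("run")
    return [(row["id"], row["age"] < model_params["age_average"]) for row in mini_batch]


def simulate_parallel_run(rows, mini_batch_size):
    """Call init() once, then run() per mini-batch, collecting all results."""
    init()
    results = []
    for start in range(0, len(rows), mini_batch_size):
        results.extend(run(rows[start:start + mini_batch_size]))
    return results


rows = [{"id": 1, "age": 30}, {"id": 2, "age": 50},
        {"id": 3, "age": 55}, {"id": 4, "age": 23}]
results = simulate_parallel_run(rows, mini_batch_size=3)
print(call_log)  # ['init', 'run', 'run']
print(results)   # [(1, True), (2, False), (3, False), (4, True)]
```

Loading the model in `init()` rather than in `run()` keeps the per-batch work limited to prediction, which matters once there are many mini-batches.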


#### 2. Define the pipeline
- Here we build a pipeline with only one step. A pipeline can contain multiple steps, for example additional steps that perform post-processing.

In [11]:
from azureml.core import Datastore
from azureml.core import Experiment
from azureml.pipeline.core import Pipeline, PipelineParameter

def_data_store = Datastore.get_default(ws)
score_script = os.path.join(local_temp_dir, "scoring.py")

def parallel_run_step():
    from azureml.pipeline.steps import ParallelRunConfig
    from azureml.pipeline.core import PipelineData

    parallel_run_config = ParallelRunConfig(
        environment=env,
        entry_script=score_script,
        source_directory=".",
        output_action="append_row",
        append_row_file_name="parallel_run_step.txt",
        compute_target=compute_target,
        error_threshold=1,
        node_count=1,
        process_count_per_node=2
    )

    from azureml.pipeline.steps import ParallelRunStep
    from datetime import datetime

    parallel_step_name = "scoring-" + datetime.now().strftime("%Y%m%d%H%M")
    output_dir = PipelineData(name=dataset_name, datastore=def_data_store)
    model_name_param = PipelineParameter(name="model_arg", default_value=model_name)

    score_step = ParallelRunStep(
        name=parallel_step_name,
        inputs=[input_data.as_named_input("input")],
        output=output_dir,
        arguments=["--model_name", model_name_param],
        parallel_run_config=parallel_run_config,
        allow_reuse=False
    )
    return score_step

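def python_step_step():
    # Alternative single-step definition using PythonScriptStep
    from azureml.pipeline.steps import PythonScriptStep

    model_name_param = PipelineParameter(name="model_arg", default_value=model_name)
    return PythonScriptStep(script_name=score_script,
                            arguments=["--model_name", model_name_param],
                            compute_target=compute_target,
                            source_directory=".")

score_step = parallel_run_step()
pipeline = Pipeline(workspace=ws, steps=[score_step])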

ParallelRunStep requires azureml-dataset-runtime[fuse,pandas] for tabular dataset.
Please add relevant package in CondaDependencies.
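
The `arguments` list given to the step is passed on to the scoring script as command-line arguments. The sketch below illustrates why the script uses `parse_known_args()`: it picks out `--model_name` and tolerates any other arguments that appear on the command line. The extra flag `--some_other_flag` is made up for the demonstration.

```python
import argparse


def parse_model_name(argv):
    """Extract --model_name and return it with the arguments left unparsed."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_name", dest="model_name", required=True)
    args, unknown = parser.parse_known_args(argv)
    return args.model_name, unknown


# '--some_other_flag' is a hypothetical extra argument, not a real SDK flag
model_name, unknown = parse_model_name(
    ["--model_name", "simple_model", "--some_other_flag", "value"]
)
print(model_name)  # simple_model
print(unknown)     # ['--some_other_flag', 'value']
```

With `parse_args()` instead, the unrecognised flag would make the script exit with an error.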


#### 3. Submit the pipeline as an experiment 

In [12]:
import time
t0 = time.time()
pipeline_run = Experiment(ws, experiment_name).submit(pipeline)
print(pipeline_run)
pipeline_run.wait_for_completion(show_output=True)
print('Completed in {}'.format(time.time() - t0))

Created step batchscoring-202101151600 [7d1d18b8][6cf11253-38c3-4064-8892-1fc492853955], (This step will run and generate new outputs)
Submitted PipelineRun 0fc7480b-06ae-4643-a05f-d8e9ad0d18c7
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/pipeline_simple/runs/0fc7480b-06ae-4643-a05f-d8e9ad0d18c7?wsid=/subscriptions/2643b3a6-256f-49a8-81a2-16283d061433/resourcegroups/azuremltest/workspaces/hungap_test_2021
Run(Experiment: pipeline_simple,
Id: 0fc7480b-06ae-4643-a05f-d8e9ad0d18c7,
Type: azureml.PipelineRun,
Status: Running)
PipelineRunId: 0fc7480b-06ae-4643-a05f-d8e9ad0d18c7
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/pipeline_simple/runs/0fc7480b-06ae-4643-a05f-d8e9ad0d18c7?wsid=/subscriptions/2643b3a6-256f-49a8-81a2-16283d061433/resourcegroups/azuremltest/workspaces/hungap_test_2021
PipelineRun Status: Running


StepRunId: 0955963e-984b-4d67-bb42-88de845e1f57
Link to Azure Machine Learning Portal: https://ml.azure.com/experiments/p


Streaming azureml-logs/75_job_post-tvmps_254caf111fca5ea9768480baf7421c59a2a87174afc300de44eb1d1f8ee51319_d.txt
[2021-01-15T06:07:42.552299] Entering job release
[2021-01-15T06:07:43.640686] Starting job release
[2021-01-15T06:07:43.641630] Logging experiment finalizing status in history service.
Starting the daemon thread to refresh tokens in background for process with pid = 1148
[2021-01-15T06:07:43.642255] job release stage : upload_datastore starting...
[2021-01-15T06:07:43.651496] job release stage : start importing azureml.history._tracking in run_history_release.
[2021-01-15T06:07:43.651977] job release stage : copy_batchai_cached_logs starting...
[2021-01-15T06:07:43.652296] job release stage : copy_batchai_cached_logs completed...
[2021-01-15T06:07:43.652336] job release stage : execute_job_release starting...
[2021-01-15T06:07:43.653047] Entering context manager injector.
[2021-01-15T06:07:43.868076] job release stage : upload_datastore completed...
[2021-01-15T06:07:43.946

ActivityFailedException: ActivityFailedException:
	Message: Activity Failed:
{
    "error": {
        "code": "UserError",
        "message": "AzureMLCompute job failed.\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\n\tReason: Job failed with non-zero exit Code",
        "messageFormat": "{Message}",
        "messageParameters": {
            "Message": "AzureMLCompute job failed.\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\n\tReason: Job failed with non-zero exit Code"
        },
        "details": [],
        "innerError": {
            "code": "BadArgument",
            "innerError": {
                "code": "AmlComputeBadRequest"
            }
        }
    },
    "correlation": {
        "operation": null,
        "request": "14c3333c99d0a5c2"
    },
    "environment": "westus2",
    "location": "westus2",
    "time": "2021-01-15T06:07:55.243737Z",
    "componentName": "execution-worker"
}
	InnerException None
	ErrorResponse 
{
    "error": {
        "message": "Activity Failed:\n{\n    \"error\": {\n        \"code\": \"UserError\",\n        \"message\": \"AzureMLCompute job failed.\\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\\n\\tReason: Job failed with non-zero exit Code\",\n        \"messageFormat\": \"{Message}\",\n        \"messageParameters\": {\n            \"Message\": \"AzureMLCompute job failed.\\nJobFailed: Submitted script failed with a non-zero exit code; see the driver log file for details.\\n\\tReason: Job failed with non-zero exit Code\"\n        },\n        \"details\": [],\n        \"innerError\": {\n            \"code\": \"BadArgument\",\n            \"innerError\": {\n                \"code\": \"AmlComputeBadRequest\"\n            }\n        }\n    },\n    \"correlation\": {\n        \"operation\": null,\n        \"request\": \"14c3333c99d0a5c2\"\n    },\n    \"environment\": \"westus2\",\n    \"location\": \"westus2\",\n    \"time\": \"2021-01-15T06:07:55.243737Z\",\n    \"componentName\": \"execution-worker\"\n}"
    }
}
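
The failure payload nests its error codes under repeated `innerError` keys. As a small sketch, the helper below walks that chain and lists the codes from outermost to innermost. `error_code_chain` is a hypothetical helper, and the payload is a trimmed copy of the one shown above. The codes only classify the failure; as the message itself says, the driver log holds the details.

```python
import json

# Trimmed copy of the error payload from the failed run above
payload = json.loads("""
{
    "error": {
        "code": "UserError",
        "details": [],
        "innerError": {
            "code": "BadArgument",
            "innerError": {
                "code": "AmlComputeBadRequest"
            }
        }
    }
}
""")


def error_code_chain(error):
    """Collect 'code' values by following nested 'innerError' objects."""
    codes = []
    while isinstance(error, dict):
        if "code" in error:
            codes.append(error["code"])
        error = error.get("innerError")
    return codes


codes = error_code_chain(payload["error"])
print(" > ".join(codes))  # UserError > BadArgument > AmlComputeBadRequest
```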

### Publish pipeline as a REST endpoint

In [None]:
published_pipeline = pipeline_run.publish_pipeline(
    name=experiment_name, description="Batch scoring", version="1.0")

published_pipeline

### Start a job via REST endpoint

In [None]:
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()

In [None]:
import requests

rest_endpoint = published_pipeline.endpoint
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": experiment_name,
                               # Key must match the PipelineParameter name ("model_arg")
                               "ParameterAssignments": {"model_arg": model_name}})
try:
    response.raise_for_status()
except Exception:    
    raise Exception("Received bad response from the endpoint: {}\n"
                    "Response Code: {}\n"
                    "Headers: {}\n"
                    "Content: {}".format(rest_endpoint, response.status_code, response.headers, response.content))

run_id = response.json().get('Id')
print('Submitted pipeline run: ', run_id)
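
The request body pairs an experiment name with parameter assignments, where each key should correspond to the `name` of a `PipelineParameter` defined in the pipeline (`model_arg` in this notebook). The helper below is a hypothetical sketch that builds that body and rejects unknown parameter names before anything is sent. `build_run_request` and `known_parameters` are illustrative names, not part of the SDK.

```python
import json


def build_run_request(experiment_name, assignments, known_parameters):
    """Build the JSON body for a pipeline run, validating parameter names."""
    unknown = sorted(set(assignments) - set(known_parameters))
    if unknown:
        raise ValueError("Unknown pipeline parameters: {}".format(unknown))
    return {
        "ExperimentName": experiment_name,
        "ParameterAssignments": dict(assignments),
    }


body = build_run_request(
    "pipeline_simple",
    {"model_arg": "simple_model"},
    known_parameters=["model_arg"],
)
print(json.dumps(body, indent=2))
```

Checking the names locally is a design choice for this sketch: it surfaces a mismatched key as an immediate error instead of leaving it to be discovered from the behaviour of the submitted run.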