# Chapter 12. Machine Learning: Automating and Consuming

[*Applied Machine Learning for Health and Fitness*](https://www.apress.com/9781484257715) by Kevin Ashley (Apress, 2020).

[*Video Course*](http://ai-learning.vhx.tv) Need a deep dive? Watch my [*video course*](http://ai-learning.vhx.tv) that complements this book with additional examples and video-walkthroughs. 

[*Web Site*](http://activefitness.ai) for research and supplemental materials.

> There\'s a way to do it better - find it.
>
> Thomas A. Edison

![](images/ch12/fig_12-1.PNG)

Overview
========

In the previous chapter you used some of the cloud tools to label and process our data, train the models and register them in the cloud. As a researcher, you are likely to train quite a few models, changing your training script, experimenting with data, before making them available to your customers. This chapter is about making AI a high quality, automated process, that makes it easy to manage your code, publish models and consume them. You'll see the term CI/CD (Continuous Integration/Continuous Delivery) many times referring to the development cycle in machine learning, and in this chapter I'll be going over some practical examples of taking your research to the level of best practices and standards used in modern data science.

Managing models
===============

In the last chapter you trained a classification model that can classify images of sport activities. As a data scientist you feel happy: your model converges, it predicts sport activities with good accuracy, and now your customer wants to use it. To make it available for your client, the model needs to be deployed, so you can give your clients something like a link to an API, and then they can use it in their own apps.

# Project: Registering, Deploying and Comsuming Models in the Cloud

The first step in deploying your models is registering them in the workspace, this saves them in the cloud so they can be used later from your code:

In [1]:
import azureml.core
azureml.core.VERSION

'1.1.5'

In [None]:
import azureml.core
from azureml.core import Workspace

workspace = Workspace.from_config()

In [15]:
# initial experiment configuration
experiment_name = 'activity-classification'
script_folder = 'activity-classification'
cluster_name = "pipeline-cluster"
model_file_name = 'activities.pkl'
labeled_dataset_name = 'Classifying activities-2020-03-15 00:54:26'
output_folder =  'outputs'
local_download_folder = './download/' 
env_name = 'pipeline-env'
experiment_name = 'pipeline-experiment'
model_name = 'activities'

In [3]:
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.core import Workspace

auth = InteractiveLoginAuthentication(tenant_id = '72f988bf-86f1-41af-91ab-2d7cd011db47')
workspace = Workspace.from_config(auth = auth)

In [9]:
from azureml.core.model import Model

model = Model.register(model_path = "./models",
                       model_name = "activities_classifier",
                       description = "Activity Classification",
                       tags = {'area':'classification'},
                       workspace = workspace)

Registering model activities_classifier


Now, referencing your models becomes super-easy, simply pass your workspace and model name and in your code, you have a reference to the model:

In [10]:
model = Model(workspace, 'activities_classifier')
print(model.name, model.version, model.tags)

activities_classifier 3 {'area': 'classification'}


You can check the path in the cloud of the model you just deployed, note that registration automatically versions your models:

In [12]:
Model.get_model_path('activities_classifier', _workspace=workspace)

'azureml-models\\activities_classifier\\3\\models\\activities.pkl'

The location of your registered model in the workspace will become important in the next steps, because you'll need to reference this model in your scoring script's initialization, when this model is loaded by your service.

Creating a scoring script
=========================

> Knowledge is a treasure, but practice is the key to it.
>
> Lao Tzu

You already created a script to train your model, another script you'll need is the scoring, or inferencing script, typically named score.py. The script is often specific to your model, in our case we use PyTorch and torchvision, but if you use a different machine learning model library, your script will have a similar structure but use methods specific to your environment and model for inference. The idea is, that the script runs in the context of a Web service or an API. The scoring script needs two methods: init() and run(). The first one, init() is executed once when the container with your model is started, and loads the model and classes into a global variable.

The run() method is invoked each time your model is called to predict something. For the run method, since our model classifies an activity based on an image, the image needs to be decoded first from an HTTP request, and then transformed according to the size, mean and standard deviation that our model was trained with in the previous chapter.

In [16]:
%%writefile $script_folder/score.py

import torch
import torch.nn as nn
from torchvision import transforms
import json
import base64
from io import BytesIO
from PIL import Image
import os
import pickle

from azureml.core.model import Model

def transform(image_file):
    t = transforms.Compose([transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(), 
        transforms.Normalize(mean = [0.485, 0.456, 0.406], 
        std = [0.229, 0.224, 0.225])])
    image = Image.open(image_file)
    image = t(image).float()
    image = torch.tensor(image)
    image = image.unsqueeze(0)
    return image

def decode_base64_to_img(base64_string):
    base64_image = base64_string.encode('utf-8')
    decoded_img = base64.b64decode(base64_image)
    return BytesIO(decoded_img)

def init():
    global model, classes
    #model_path = Model.get_model_path('activities')
    model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), 'models', 'activities.pkl')
    model = torch.load(model_path, map_location=lambda storage, loc: storage)
    model.eval()
    #pkl_file = open(os.path.join(model_path,'class_names.pkl'), 'rb')
    #classes = pickle.load(pkl_file)
    #pkl_file.close() 
    classes = ['surfing','tennis']

def run(input_data):
    image = decode_base64_to_img(json.loads(input_data)['data'])
    image = transform(image)

    output = model(image)

    softmax = nn.Softmax(dim=1)
    pred_probs = softmax(model(image)).detach().numpy()[0]
    index = torch.argmax(output, 1)

    result = json.dumps({"label": classes[index], "probability": str(pred_probs[index])})
    return result

Overwriting activity-classification/score.py


Defining an environment
=======================

Your inference environment defines your machine learning Web service configuration. You can use either Anaconda or pip requirements file to create your environment. For example, for Anaconda use a YAML file similar to what conda env export command generates:

In [17]:
%%writefile $script_folder/activities.yml
name: Activities-PyTorch
dependencies:
  - python=3.6.2
  - pip:
    - azureml-defaults
    - azureml-core
    - azureml-contrib-dataset
    - azureml-dataprep[pandas,fuse]
    - inference-schema[numpy-support]
    - torch
    - torchvision
    - pillow

Overwriting activity-classification/activities.yml


Next, you create the environment by using either from\_pip\_requirements or from\_conda\_specification method:

In [18]:
from azureml.core.environment import Environment

env = Environment.from_conda_specification(name='Activities-PyTorch',file_path=script_folder+"/activities.yml")
env.register(workspace=workspace)

{
    "name": "Activities-PyTorch",
    "version": "2",
    "environmentVariables": {
        "EXAMPLE_ENV_VAR": "EXAMPLE_VALUE"
    },
    "python": {
        "userManagedDependencies": false,
        "interpreterPath": "python",
        "condaDependenciesFile": null,
        "baseCondaEnvironment": null,
        "condaDependencies": {
            "dependencies": [
                "python=3.6.2",
                {
                    "pip": [
                        "azureml-defaults",
                        "azureml-core",
                        "azureml-contrib-dataset",
                        "azureml-dataprep[pandas,fuse]",
                        "inference-schema[numpy-support]",
                        "torch",
                        "torchvision",
                        "pillow"
                    ]
                }
            ],
            "name": "azureml_29be90bc0029fbe93f78eab2d8a6383f"
        }
    },
    "docker": {
        "enabled": false,
        "baseImage"

Since you are in a big friendly cloud, there're many environments that you can easily reuse, just run this script to see a large number of environments that are available:

In [26]:
# List environments availablel in the cloud
envs = Environment.list(workspace=workspace)
for env in list(envs)[0:4]:
    if env.startswith("AzureML"):
        print("Name",env)
        print("packages", envs[env].python.conda_dependencies.serialize_to_string())

Name AzureML-TensorFlow-2.0-CPU
packages channels:
- conda-forge
dependencies:
- python=3.6.2
- pip:
  - azureml-core==1.2.0
  - azureml-defaults==1.2.0
  - azureml-telemetry==1.2.0
  - azureml-train-restclients-hyperdrive==1.2.0
  - azureml-train-core==1.2.0
  - tensorflow==2.0
  - horovod==0.18.1
name: azureml_a685c8fa2729bbbf4932e75b8eb0df54

Name AzureML-Chainer-5.1.0-GPU
packages channels:
- conda-forge
dependencies:
- python=3.6.2
- pip:
  - azureml-core==1.2.0
  - azureml-defaults==1.2.0
  - azureml-telemetry==1.2.0
  - azureml-train-restclients-hyperdrive==1.2.0
  - azureml-train-core==1.2.0
  - chainer==5.1.0
  - cupy-cuda90==5.1.0
  - mpi4py==3.0.0
name: azureml_43ae3494b9b7666919116b4a25139bcf

Name AzureML-VowpalWabbit-8.8.0
packages channels:
- conda-forge
dependencies:
- python=3.6.2
- pip:
  - azureml-core==1.2.0
  - azureml-defaults==1.2.0
  - azureml-dataprep[fuse,pandas]
name: azureml_eed6129d1cdd3d18a4f0f2b746ad4d83



Deploying models
================

There're many ways to deploy your models, as a local Web service, Azure Kubernetes Service (AKS), Azure Container Instances (ACI), Azure Functions and more. Each deployment type has advantages: for example, a Kubernetes based deployment is best for production level scalable deployments, while container instances is a fast and easy way to deploy.

In this example our model is trained with PyTorch and saved as a Python pickle .pkl file, Keras models are often saved as HDF5 .h5 files and Tensorflow saves models as protocol buffer .pb files. Open Neural Network Exchange or ONNX is a promising standard that deals with interoperability of model formats and AI tools: the initialization function of your scoring script is responsible for loading the model.

It is often easy to start with a local deployment while you're developing your model, this allows you to check for any problems in your scoring and initialization script. Let's test deploying our model on the local Web server, using port 8891 as an endpoint:

In [None]:
from azureml.core.environment import Environment
from azureml.core.model import InferenceConfig, Model
from azureml.core.webservice import LocalWebservice

def deploy_locally(model_name, port):
    model = Model(workspace, model_name)
    myenv = Environment.from_conda_specification(name="env", file_path=script_folder+"/activities.yml")
    inference_config = InferenceConfig(entry_script=script_folder+"/score.py", environment=myenv)
    deployment_config = LocalWebservice.deploy_configuration(port=port)
    return Model.deploy(workspace, model_name, [model], inference_config, deployment_config)

service = deploy_locally('activities', 8891)
service.wait_for_deployment(True)
print(service.port)

Behind the scenes, everything that happens locally will also be happening in the cloud, with our next steps. When you call Model.deploy method, your environment specification is used to build and start a docker container, the model is copied to the container, and the scoring script you created earlier is invoked at the initialization method. Let's deploy our service to the cloud now:

In [None]:
from azureml.core.webservice import AciWebservice, Webservice
from azureml.core.model import Model

service_name = 'activity-classification'
model = Model(workspace, 'activities')
deployment_config = AciWebservice.deploy_configuration(cpu_cores = 1, memory_gb = 1)
service = Model.deploy(workspace, service_name, [model], inference_config, deployment_config)
service.wait_for_deployment(show_output = True)
print(service.state)
print(service.get_logs())

This script looks very similar to the local deployment you just did with Model.deploy() but notice that deployment configuration is created with AciWebservice instead of LocalWebservice, and instead of the port you specified cpu\_cores and memory\_gb as parameters to size your deployment.

In the above example, you loaded a single model, but what if your model doesn't generalize well, or your API exposes multiple models? It often happens in machine learning that you need to package many models into the same service API. Generally, you can register multiple models and let your initialization script load them.

Calling your model
===================

![](images/ch12/fig_12-2.png)

The model is successfully deployed and is now ready for users to call it. Our model accepts an image as an argument, so we need to encode it before it is sent inside JSON, as Base64 string. In the receiving script run() of your score.py file you created earlier, the image is decoded again and then our model is called to predict the activity. You can test how our model works by calling it:

In [None]:
import json
import base64

image_path = 'download/workspaceblobstore/activities/surfing/resize-DSC04631.JPG'

with open(image_path, 'rb') as file:
    byte_content = file.read()
    
base64_bytes = base64.b64encode(byte_content)
base64_string = base64_bytes.decode('utf-8')
request = json.dumps({'data': base64_string })
prediction = service.run(input_data=request)
print(prediction)

The call to our model will return the predicted activity: surfing and the probability of the prediction. Getting back to the goal we stated at the beginning of this chapter, we need to give to our customer a simple URI link they can use in a multitude of apps. To get a link to the service you just deployed, you can use service.scoring\_uri:

In [None]:
print("Model inference URI: ", service.scoring_uri)

Continuous Machine Learning Delivery
====================================

>  A pile of rocks ceases to be a rock pile when somebody contemplates it with the idea of a cathedral in mind.
>
> Antoine Saint-Exupery

In the previous sections we stepped through the process of registering a model, creating a scoring script that we used to initialize the model and provide an inferencing endpoint for the API that we published as a Web service. It turns out that this process is highly repeatable in data science. As your data science team works on the models and data, improving the models accuracy, we need to make sure we keep track of changes, issues, new models are deployed. It's important to follow engineering best practices to make our project continuously deliver value to customers.

Machine Learning Pipelines
==========================

![](images/ch12/fig_12-3.png)

Machine learning workflow needs an architecture and the level of automation that applies to all stages of AI projects: from source code management, to data integration, model development, unit testing, releasing models to QA and production environments, monitoring and scaling the models. In the early stages of the project you may be dealing with Jupyter notebooks, but AI projects require a solid level of automation and process management to be successful.

Source code
-----------

It all starts with the source code integration: most machine learning CI/CD frameworks integrate with Github, DevOps or other source control systems. Typically, as model scripts are checked into the source control by data scientists, the pipeline may be triggered to train, package and deploy the model. The premise of continuous delivery cycle is automating this process.

Automating model delivery
-------------------------------

![](images/ch12/fig_12-4.png)

To start with a continuous model training and delivery, frameworks such as Azure Python SDK offer some neat tools that make it easy to wrap the process into a repeatable set of steps, conveniently called a pipeline. If you are familiar with ETL processes dealing with data, then the concept should look very familiar to you. In fact, Machine Learning pipelines are often based on the same architecture and involve data transformation steps and repeatable processes that can be scheduled or triggered to run. For the purpose of creating the pipeline, you can re-use most scripts, such as your model training script.

## Project: Creating a continuous model training pipeline

Runtime environment 
--------------------

Before you build the pipeline, let's create an environment with dependencies we need in our model, such as PyTorch and torchvision, and runtime configuration that will be used in the pipeline. Setting docker.enabled in the environment also ensures that the environment supports containers:

In [28]:
import azureml.core
from azureml.core import Workspace, Dataset, Environment
from azureml.core.runconfig import RunConfiguration
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.pipeline.steps import PythonScriptStep, EstimatorStep
from azureml.train.estimator import Estimator
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.core.conda_dependencies import CondaDependencies

# Create environment and runtime configuration
environment = Environment(env_name)
environment.docker.enabled = True
environment.python.conda_dependencies = CondaDependencies.create(pip_packages=['azureml-sdk',
                                                                        'azureml-contrib-dataset',
                                                                        'torch','torchvision',
                                                                        'azureml-dataprep[pandas,fuse]'])

In [31]:
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

try:
    compute_target = ComputeTarget(workspace=workspace, name=cluster_name)
except ComputeTargetException:
    print('Creating a new compute target...')
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_D3_V2', 
                                                           max_nodes=4)

    compute_target = ComputeTarget.create(workspace, cluster_name, compute_config)
    compute_target.wait_for_completion(show_output=True, min_node_count=None, timeout_in_minutes=20)

print(compute_target.get_status().serialize())

{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2020-03-24T16:59:21.234000+00:00', 'errors': None, 'creationTime': '2020-03-24T13:33:40.051086+00:00', 'modifiedTime': '2020-03-24T13:33:56.048041+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_D3_V2'}


In [33]:
config = RunConfiguration()
config.target = compute_target
config.environment = environment

Our model is based on classification, and in the previous chapter you created a labelled dataset as part of the workspace. We will reference that dataset in the pipeline as input for model training. The output of our pipeline is a trained model, so I created another object for model\_folder of type PipelineData where the model will be placed at the end of the pipeline:

In [29]:
dataset = Dataset.get_by_name(workspace, labeled_dataset_name)
model_folder = PipelineData("model_folder", datastore=workspace.get_default_datastore())

Creating training step 
-----------------------

The first step in the pipeline is similar to our training procedure and uses Estimator object wrapped into an EstimatorStep. This step calls train.py script, takes the input of our labeled dataset and is executed in the compute environment:

In [34]:
estimator = Estimator(source_directory=script_folder, 
                      compute_target=compute_target,
                      environment_definition=config.environment,
                      entry_script='train.py')

train_step = EstimatorStep(name = "Train Model Step",
                           estimator=estimator, 
                           estimator_entry_script_arguments=['--output-folder', model_folder, '--model-file', model_file_name],
                           outputs=[model_folder],
                           compute_target=compute_target,
                           inputs=[dataset.as_named_input('activities')],
                           allow_reuse = True)

In the previous sections, you registered the model from a Jupyter notebook. To make that registration part of continuous model delivery, we also need to create an additional script to register the model. The easiest way to do this is creating a Python script file, this script will call Model.register method to register the trained model in the workspace:

In [35]:
%%writefile $script_folder/register_model.py
import argparse
from azureml.core import Workspace, Model, Run

parser = argparse.ArgumentParser()
parser.add_argument('--model_name', type=str, dest='model_name', default="activities", help='Model name.')
parser.add_argument('--model_folder', type=str, dest='model_folder', default="outputs", help='Model folder.')
parser.add_argument('--model_file', type=str, dest='model_file', default="activities.pkl", help='Model file.')
args = parser.parse_args()
model_name = args.model_name
model_folder = args.model_folder
model_file = args.model_file

run = Run.get_context()

print("Model folder:",model_folder)
print("Model file:",model_file)

Model.register(workspace=run.experiment.workspace,
               model_name = model_name,
               model_path = model_folder+"/"+model_file)

run.complete()

Overwriting activity-classification/register_model.py


Defining deployment step
------------------------

To run this script, you can add another step to the pipeline, using a generic PythonScriptStep step:

In [36]:
register_step = PythonScriptStep(name = "Register Model Step",
                                source_directory = script_folder,
                                script_name = "register_model.py",
                                arguments = ['--model_name', model_name, '--model_folder', model_folder, '--model_file', model_file_name],
                                inputs=[model_folder],
                                compute_target = compute_target,
                                runconfig = config,
                                allow_reuse = True)

In [37]:
steps = [train_step, register_step]
print("Steps created")

pipeline = Pipeline(workspace=workspace, steps=steps)
print ("Pipeline created")

Steps created
Pipeline created


Running the pipeline
--------------------

Everything you've done so far was defining the pipeline and preparing to run it: the longest running part in the workflow is also the shortest in terms of the code. To run your workflow, create a new experiment and submit your pipeline to it, this may take a while!

In [None]:
from azureml.core import Experiment

pipeline_run = Experiment(workspace, experiment_name).submit(pipeline)
pipeline_run.wait_for_completion()

Summary
=======

In this chapter I evolved our sport classification data science experiment from simple Jupyter notebooks to the level of a professional grade project that follows best engineering practices with continuous model training and deployment. I started with a practical example of deploying the model trained in the previous chapter to the cloud and explaining how to register and manage it. Then I created a scoring script including methods for initialization and inference. I demonstrated how to define our experimental environment, including compute target and dependencies, such as ML framework that will be used in a container running our model. Then I showed how to consume the model via a Web service endpoint: both locally and from the cloud. To create a complete CI/CD automated model training and delivery, I also wrapped these steps into a machine learning pipeline.

## Reference

[*Video Course*](http://ai-learning.vhx.tv) Need a deep dive? Watch my [*video course*](http://ai-learning.vhx.tv) that complements this book with additional examples and video-walkthroughs. 

[*Web Site*](http://activefitness.ai) for research and supplemental materials.