# Step 3: Model Operationalization & Deployment

In this notebook, a model is saved as a .model file along with the relevant schema for deployment. The scoring functions are first tested locally, and the model is then operationalized with the Azure Machine Learning Model Management environment for real-time use in production.

In [1]:
import keras

In [2]:
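# import the libraries
import os
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import urllib
import json
import shutil

# Start the timer and the run logger used by the run-time report at the end of the notebook
from azureml.logging import get_azureml_logger
logger = get_azureml_logger()
tic = time.time()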

# Setup the pyspark environment
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
spark = SparkSession.builder.getOrCreate()

# Setting seed for reproducibility
np.random.seed(1234)  
PYTHONHASHSEED = 0
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix, recall_score, precision_score
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM, Activation
%matplotlib inline

# For Azure blob storage access
from azure.storage.blob import BlockBlobService
from azure.storage.blob import PublicAccess

from azureml.api.schema.dataTypes import DataTypes
from azureml.api.schema.sampleDefinition import SampleDefinition
from azureml.api.realtime.services import generate_schema

## Load feature data set

We have previously created the labeled feature data set in the `Code\1_data_ingestion_and_preparation.ipynb` Jupyter notebook. Since the Azure Blob storage account name and account key are not passed between notebooks, you'll need your credentials here again.

In [6]:
# Enter your Azure blob storage details here 
ACCOUNT_NAME = "<your blob storage account name>"

# You can find the account key under the _Access Keys_ link in the 
# [Azure Portal](portal.azure.com) page for your Azure storage container.
ACCOUNT_KEY = "<your blob storage account key>"

#-------------------------------------------------------------------------------------------
# The data from the Data Ingestion and Preparation notebook is stored in the sensordata ingestion container.
CONTAINER_NAME = 'sensordataingestiontest'
FE_DIRECTORY = 'PM_test_files.parquet'
#FE_DIRECTORY = 'sensordataingestiontest_files.parquet'

MODEL_CONTAINER = 'modeldeploy'

# Connect to your blob service     
az_blob_service = BlockBlobService(account_name=ACCOUNT_NAME, account_key=ACCOUNT_KEY)

# Create a new container if necessary, otherwise you can use an existing container.
# This command creates the container if it does not already exist. Else it does nothing.
az_blob_service.create_container(CONTAINER_NAME, 
                                 fail_on_exist=False, 
                                 public_access=PublicAccess.Container)

# Create a local directory to hold the downloaded parquet files.
if not os.path.exists(FE_DIRECTORY):
    os.makedirs(FE_DIRECTORY)

# Download the entire parquet result folder to the local directory for a new run.
# Blobs are matched on the folder name, since blob names do not have to contain the container name.
for blob in az_blob_service.list_blobs(CONTAINER_NAME):
    if FE_DIRECTORY in blob.name:
        local_file = os.path.join(FE_DIRECTORY, os.path.basename(blob.name))
        az_blob_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)

test_df = spark.read.parquet(FE_DIRECTORY)
#test_df.limit(5).toPandas().head(5)

test_df = test_df.toPandas()
test_df.head(10)

AnalysisException: 'Unable to infer schema for Parquet. It must be specified manually.;'
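Spark generally raises this "Unable to infer schema for Parquet" error when the path it reads contains no Parquet data files, so it is worth checking what the download loop actually selected. The following is a minimal sketch of that filtering step using only the standard library. The helper names `select_blobs` and `has_parquet_parts` and the blob names are hypothetical, chosen for illustration.

```python
import os

def select_blobs(blob_names, marker):
    """Return (blob_name, local_file_name) pairs for blobs whose name contains marker."""
    return [(name, os.path.basename(name)) for name in blob_names if marker in name]

def has_parquet_parts(file_names):
    """True if at least one file name looks like a Parquet data file."""
    return any(name.endswith(".parquet") for name in file_names)

# Hypothetical blob listing, for illustration only.
blob_names = [
    "PM_test_files.parquet/_SUCCESS",
    "PM_test_files.parquet/part-00000.snappy.parquet",
    "other_output/part-00000.snappy.parquet",
]

selected = select_blobs(blob_names, "PM_test_files.parquet")
local_files = [local for _, local in selected]
print(local_files)
print(has_parquet_parts(local_files))
```

If the marker string matches no blob names, the selection is empty and the local directory stays empty, which is one way to end up with the error above.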

## Define init and run functions
Start by defining the `init()` and `run()` functions as shown in the cell below, then write them to the score.py file. This file loads the model, performs the prediction, and returns the result.

The `init()` function initializes your web service, loading any data or models needed to score your inputs. In the example below, it loads the trained model. It runs once, when the Docker container hosting your service initializes.
The `run()` function defines what is executed on each scoring call. In this example, it takes the input as a data frame, runs the pipeline on that input, and returns the prediction.
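The contract between the two functions can be sketched without Spark or a trained model. The example below is only an illustration: `StubModel` is a hypothetical stand-in for the saved pipeline, and the input is a plain list of rows rather than a data frame.

```python
import json

model = None  # populated once by init()

class StubModel:
    """Hypothetical stand-in for the real pipeline: labels a row 1.0 if its sum exceeds 1."""
    def predict(self, rows):
        return [1.0 if sum(row) > 1.0 else 0.0 for row in rows]

def init():
    # In the real service, this is where the saved model is loaded from disk.
    global model
    model = StubModel()

def run(rows):
    try:
        preds = [str(p) for p in model.predict(rows)]
        response = ",".join(preds)
    except Exception as e:
        # On failure, the error message is returned instead of predictions.
        return str(e)
    # On success, the comma-separated predictions are returned as a JSON string.
    return json.dumps(response)

init()
result = run([[0.2, 0.3], [0.9, 0.8]])
print(result)
```

The model is loaded once in `init()` and reused by every `run()` call, which avoids reloading it on each request.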

In [7]:
def init():
    # Load the saved model once, when the service starts
    from pyspark.ml import PipelineModel
    global pipeline
    
    pipeline = PipelineModel.load(os.path.join(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY'], 'pdmrfull.model'))

def gen_sequence(id_df, seq_length, seq_cols):
    """ Only sequences that meet the window-length are considered, no padding is used. This means for testing
    we need to drop those which are below the window-length. An alternative would be to pad sequences so that
    we can use shorter ones """
    data_array = id_df[seq_cols].values
    num_elements = data_array.shape[0]
    for start, stop in zip(range(0, num_elements-seq_length), range(seq_length, num_elements)):
        yield data_array[start:stop, :]

# pick the feature columns 
sensor_cols = ['s' + str(i) for i in range(1,22)]
sequence_cols = ['setting1', 'setting2', 'setting3', 'cycle_norm']
sequence_cols.extend(sensor_cols)

# generator for the sequences
# NOTE: sequence_length is not defined in this notebook; set it to the window length used to train the model.
seq_gen = (list(gen_sequence(test_df[test_df['id']==id], sequence_length, sequence_cols)) 
           for id in test_df['id'].unique())

# generate sequences and convert to numpy array
seq_array = np.concatenate(list(seq_gen)).astype(np.float32)
seq_array.shape

def run(input_df):
    import json
    response = ''
    try:
        # Score the input passed to the service
        score = pipeline.transform(input_df)
        predictions = score.collect()

        # Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        response = ",".join(preds)
    except Exception as e:
        print("Error: {0}".format(str(e)))
        return (str(e))

    # Return results (outside the except block, so the success path returns them)
    print(json.dumps(response))
    return json.dumps(response)
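To see which windows `gen_sequence` produces, the same index ranges can be applied to a plain list of rows. This is a small sketch of the windowing logic only. The name `sliding_windows` and the toy data are hypothetical, and it uses Python lists instead of a pandas data frame.

```python
def sliding_windows(rows, seq_length):
    """Yield consecutive windows of seq_length rows, using the same index ranges as gen_sequence."""
    num_elements = len(rows)
    for start, stop in zip(range(0, num_elements - seq_length),
                           range(seq_length, num_elements)):
        yield rows[start:stop]

# Toy series with 5 time steps and 2 features per step.
rows = [[t, t * 10] for t in range(5)]
windows = list(sliding_windows(rows, seq_length=3))

for window in windows:
    print(window)

# A series no longer than the window length yields nothing.
print(list(sliding_windows(rows[:3], seq_length=3)))
```

With these ranges, a series of n rows yields n - seq_length windows, so the window ending on the final row is not produced and any series with seq_length rows or fewer is dropped, as the docstring describes.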

## Create schema and schema file
Create a schema for the input to the web service and generate the schema file. This will be used to create a Swagger file for your web service which can be used to discover its input and sample data when calling it.

In [10]:
# define the input data frame and keep the generated schema for saving later
inputs = {"input_df": SampleDefinition(DataTypes.PANDAS, test_df)}
json_schema = generate_schema(run_func=run, inputs=inputs, filepath='service_schema.json')

NameError: name 'train_df' is not defined

In [58]:
import importlib
import score
importlib.reload(score)

<module 'score' from 'C:\\Users\\lazzeri\\AppData\\Local\\Temp\\azureml_runs\\lstmpm_v2_1510243980846\\score.py'>

In [59]:
score.init()
score.run(test_df)

'"        id  cycle  setting1  setting2  setting3   s1        s2        s3  \\\\\\n0        1      1  0.459770  0.166667       0.0  0.0  0.183735  0.406802   \\n1        1      2  0.609195  0.250000       0.0  0.0  0.283133  0.453019   \\n2        1      3  0.252874  0.750000       0.0  0.0  0.343373  0.369523   \\n3        1      4  0.540230  0.500000       0.0  0.0  0.343373  0.256159   \\n4        1      5  0.390805  0.333333       0.0  0.0  0.349398  0.257467   \\n5        1      6  0.252874  0.416667       0.0  0.0  0.268072  0.292784   \\n6        1      7  0.557471  0.583333       0.0  0.0  0.382530  0.463920   \\n7        1      8  0.304598  0.750000       0.0  0.0  0.406627  0.259865   \\n8        1      9  0.545977  0.583333       0.0  0.0  0.274096  0.434707   \\n9        1     10  0.310345  0.583333       0.0  0.0  0.150602  0.440375   \\n10       1     11  0.603448  0.250000       0.0  0.0  0.322289  0.233486   \\n11       1     12  0.591954  0.666667       0.0  0.0  0.256

## Persist model assets
Next we persist the assets we have created to disk for use in operationalization.

In [None]:
# save the schema file for deployment
out = json.dumps(json_schema)
schema_path = os.path.join(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY'], 'service_schema.json')
with open(schema_path, 'w') as f:
    f.write(out)

Now we use the `%%writefile` cell magic to save the `init()` and `run()` functions to the pdmscore.py file.

In [None]:
%%writefile {os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']}/pdmscore.py

import json
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier

# for creating pipelines and model
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexer

def init():
    # read in the model file
    from pyspark.ml import PipelineModel
    global pipeline
    pipeline = PipelineModel.load('pdmrfull.model')
    
def run(input_df):
    response = ''
    try:
        # Score the input passed to the service
        score = pipeline.transform(input_df)
        predictions = score.collect() 

        # Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        response = ",".join(preds)
    except Exception as e:
        print("Error: {0}".format(str(e)))
        return (str(e))
    
    # Return results
    print(json.dumps(response))
    return json.dumps(response)

if __name__ == "__main__":
    init()
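Because `run()` returns the predictions as a JSON-encoded, comma-separated string, a caller has to decode the response in two steps. The sketch below shows one way to do that. The helper name `decode_predictions` and the sample values are hypothetical.

```python
import json

def decode_predictions(response_text):
    """Turn the JSON-encoded, comma-separated string back into a list of floats."""
    joined = json.loads(response_text)
    if not joined:
        return []
    return [float(value) for value in joined.split(",")]

# Build a sample response the same way run() does on success.
sample = json.dumps(",".join(["0.0", "1.0", "1.0"]))
decoded = decode_predictions(sample)
print(decoded)
```

On failure, `run()` returns the plain error message rather than JSON, so a caller would typically need to handle a `json.JSONDecodeError` as well.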


These files are stored in the `os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']` location on the kernel host machine, together with the model saved by the `Code\3_model_building_and_evaluation.ipynb` notebook. To share these assets and operationalize the model, we create a new blob container and store a compressed file containing the assets for later retrieval.

In [None]:
# Compress the operationalization assets for easy blob storage transfer
MODEL_O16N = shutil.make_archive('o16n', 'zip', os.environ['AZUREML_NATIVE_SHARE_DIRECTORY'])

# Create a new container if necessary, otherwise you can use an existing container.
# This command creates the container if it does not already exist. Else it does nothing.
az_blob_service.create_container(MODEL_CONTAINER, 
                                 fail_on_exist=False, 
                                 public_access=PublicAccess.Container)

# Transfer the compressed operationalization assets into the blob container.
az_blob_service.create_blob_from_path(MODEL_CONTAINER, "o16n.zip", str(MODEL_O16N) ) 


# Time the notebook execution. 
# This will only make sense if you "Run All" cells
toc = time.time()
print("Full run took %.2f minutes" % ((toc - tic)/60))

logger.log("Operationalization Run time", ((toc - tic)/60))
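Before uploading, it can help to confirm what `shutil.make_archive` actually packed. The following sketch builds a throwaway directory and lists the archive contents with `zipfile`. The file names inside it are placeholders, not the real assets.

```python
import os
import shutil
import tempfile
import zipfile

with tempfile.TemporaryDirectory() as workdir:
    # Hypothetical asset directory with placeholder files, for illustration only.
    share_dir = os.path.join(workdir, "share")
    os.makedirs(os.path.join(share_dir, "example.model"))
    for rel in ("score.py", "service_schema.json", os.path.join("example.model", "metadata")):
        with open(os.path.join(share_dir, rel), "w") as f:
            f.write("placeholder")

    # make_archive appends ".zip" to the base name and returns the archive path.
    archive_path = shutil.make_archive(os.path.join(workdir, "o16n"), "zip", share_dir)

    # List the files in the archive, skipping directory entries.
    with zipfile.ZipFile(archive_path) as zf:
        archived = sorted(n for n in zf.namelist() if not n.endswith("/"))

print(os.path.basename(archive_path))
print(archived)
```

Paths inside the archive are relative to the directory passed as `root_dir`, so the assets sit at the top level of the zip file. Writing the archive outside that directory keeps it from being packed into itself.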

## Deployment

Once the assets are stored, we can download them into a local compute context for operationalization on an Azure web service.

We demonstrate how to set up this web service through a CLI window opened in the AML Workbench application.

## Download the model

To download the model we've saved, follow these instructions on a local computer.

- Open the [Azure Portal](http://portal.azure.com)
- In the left-hand pane, click on __All resources__.
- Search for the storage account using the name you provided earlier in this notebook.
- Choose the storage account from the search result list. This opens the storage account panel.
- On the storage account panel, choose __Blobs__.
- On the Blobs panel, choose the container __modeldeploy__.
- Select the file o16n.zip and, on the properties pane for that blob, choose download.

Once downloaded, unzip the file into the directory of your choosing. The zip file contains three deployment assets:

- the `lstmscore.py` file
- a `lstm.model` directory
- the `service_schema.json` file
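A quick check that the unzipped directory contains the three assets listed above can save a failed deployment later. This is a minimal sketch: the helper `missing_assets` is hypothetical, and a temporary directory stands in for the location where you unzipped the file.

```python
import os
import tempfile

def missing_assets(directory, expected):
    """Return the expected names that are not present in directory."""
    present = set(os.listdir(directory))
    return [name for name in expected if name not in present]

EXPECTED = ["lstmscore.py", "lstm.model", "service_schema.json"]

with tempfile.TemporaryDirectory() as unzip_dir:
    # Simulate an incomplete extraction: the schema file is absent.
    open(os.path.join(unzip_dir, "lstmscore.py"), "w").close()
    os.makedirs(os.path.join(unzip_dir, "lstm.model"))
    missing = missing_assets(unzip_dir, EXPECTED)

print(missing)
```

An empty list means all three assets are in place.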



## Create a model management endpoint 

Create a model management account under your Azure account. We will call this `pdmmodelmanagement`, the name used in the commands below. The remaining defaults are acceptable.

`az ml account modelmanagement create --location <ACCOUNT_REGION> --resource-group <RESOURCE_GROUP> --name pdmmodelmanagement`


## Check environment settings

Show what environment is currently active:

`az ml env show`

If nothing is set, we first set up the environment with the existing model management context:

`az ml env setup --location <ACCOUNT_REGION> --resource-group <RESOURCE_GROUP> --name pdmmodelmanagement`

then set the current environment:

`az ml env set --resource-group <RESOURCE_GROUP> --cluster-name pdmmodelmanagement`

Check that the environment is now set:

`az ml env show`


## Deploy your web service 

Once the environment is set up, we'll deploy the web service from the CLI.

These commands assume the current directory contains the web service assets we created throughout the notebooks in this scenario (`lstmscore.py`, `service_schema.json` and `lstm.model`). If your kernel has run locally, the assets will be in the directory given by `os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']`.

On Windows this points to:

```
cd C:\Users\<username>\.azureml\share\<team account>\<Project Name>
```

On Linux variants this points to:

```
cd ~/.azureml/share/<team account>/<Project Name>
```


The command to create a web service (`<SERVICE_ID>`) with these operationalization assets in the current directory is:

```
az ml service create realtime -f <filename> -r <TARGET_RUNTIME> -m <MODEL_FILE> -s <SCHEMA_FILE> -n <SERVICE_ID> --cpu 0.1
```

The default cluster has only 2 nodes with 2 cores each, and some cores are taken by system components. AML Workbench asks for 1 core per service, so to deploy multiple services into this cluster we pass `--cpu 0.1` in the service create command to request 10% of a core.
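The effect of the CPU request can be estimated with simple arithmetic. The sketch below is a rough upper bound only: the number of cores reserved for system components (`reserved_cores=1`) is an assumed value for illustration, and the calculation ignores memory limits and how services are packed onto individual nodes.

```python
def max_services(nodes, cores_per_node, reserved_cores, cpu_per_service_milli):
    """Upper bound on the number of services, counted in millicores to avoid float rounding."""
    available_milli = (nodes * cores_per_node - reserved_cores) * 1000
    return max(available_milli, 0) // cpu_per_service_milli

# 2 nodes with 2 cores each; reserved_cores=1 is an assumption, not a documented figure.
default_fit = max_services(2, 2, reserved_cores=1, cpu_per_service_milli=1000)  # 1 core each
small_fit = max_services(2, 2, reserved_cores=1, cpu_per_service_milli=100)     # --cpu 0.1

print(default_fit, small_fit)
```

Under these assumptions, requesting a tenth of a core raises the ceiling from a handful of services to a few dozen.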

For this example, we will call our webservice `amlworkbenchpdmwebservice`. This `SERVICE_ID` must be all lowercase, with no spaces:

```
az ml service create realtime -f lstmscore.py -r spark-py -m lstm.model -s service_schema.json --cpu 0.1 -n amlworkbenchpdmwebservice
```

This command will take some time to execute. 

Once complete, the command returns sample usage commands to test the service for both PowerShell and the cmd prompt. We can execute these commands from the command line as well.
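The `SERVICE_ID` naming rule given earlier (all lowercase, no spaces) can be checked before running the create command. This sketch covers only those two constraints. The CLI may enforce others, and the helper name `is_valid_service_id` is hypothetical.

```python
def is_valid_service_id(service_id):
    """Check the two constraints stated in this notebook: all lowercase and no whitespace."""
    if not service_id:
        return False
    has_space = any(ch.isspace() for ch in service_id)
    return service_id == service_id.lower() and not has_space

for candidate in ("amlworkbenchpdmwebservice", "AmlWorkbenchPdM", "pdm web service"):
    print(candidate, is_valid_service_id(candidate))
```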