# Step 4: Model operationalization & Deployment

In this notebook, the trained model is saved as a .model file together with the schema needed for deployment. The scoring functions are first tested locally, and the model is then operationalized with Azure Machine Learning Model Management for realtime use in production.


In [1]:
## setup our environment by importing required libraries
import os
import csv

import pandas as pd
import io
import requests

import glob
import json
from azure.storage.blob import BlockBlobService
from azure.storage.blob import PublicAccess

# for creating pipelines and model
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# setup the pyspark environment
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [2]:
%%time
# load the previously created final dataset into the workspace
from azure.storage.blob import BlockBlobService
import glob
import os

# define parameters 
ACCOUNT_NAME = "pdmvienna"
ACCOUNT_KEY = "PDuXK61GpmMVWMrWdvr29THbPdlOXa61fN5RfgQV/jBO8berC1zLzZ678Nxrx+D3CRp4+ZvSff9al+lrUh8qUQ=="
CONTAINER_NAME = "featureengineering"

# define your blob service     
my_service = BlockBlobService(account_name=ACCOUNT_NAME, account_key=ACCOUNT_KEY)

# create a local directory to hold the downloaded files
LOCAL_DIRECT = 'model_operationalize.parquet'
if not os.path.exists(LOCAL_DIRECT):
    os.makedirs(LOCAL_DIRECT)
    print('DONE creating a local directory!')

# download the entire parquet result folder to local path for a new run 
for blob in my_service.list_blobs(CONTAINER_NAME):
    if 'featureengineering_files.parquet' in blob.name:
        local_file = os.path.join(LOCAL_DIRECT, os.path.basename(blob.name))
        my_service.get_blob_to_path(CONTAINER_NAME, blob.name, local_file)

data = spark.read.parquet('model_operationalize.parquet')
#data.persist()
data.show(5)
print('Feature engineering final dataset files loaded!')

DONE creating a local directory!
+---------+--------------------+------------------+--------------------+----------------------+-----------------------+-------------------+---------------------+-----------------------+------------------------+------------------+-------------------+---------------------+----------------------+------------------+--------------------+----------------------+-----------------------+------------------------+------------------------+------------------------+------------------------+------------------------+-----------------+-----------------+-----------------+-----------------+------+---+-------------+--------+-------+
|machineID|        dt_truncated|volt_rollingmean_3|rotate_rollingmean_3|pressure_rollingmean_3|vibration_rollingmean_3|volt_rollingmean_24|rotate_rollingmean_24|pressure_rollingmean_24|vibration_rollingmean_24| volt_rollingstd_3|rotate_rollingstd_3|pressure_rollingstd_3|vibration_rollingstd_3|volt_rollingstd_24|rotate_rollingstd_24|pressure_rol
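
The download loop above keeps only the blobs whose name contains the parquet folder name and writes each one under the local directory using just its file name. The selection and path mapping can be sketched without an Azure connection. The blob names below are hypothetical and `plan_downloads` is an illustrative helper, not part of the Azure SDK.

```python
import os


def plan_downloads(blob_names, folder_marker, local_dir):
    """Map each matching blob name to the local file it would be written to."""
    plan = {}
    for name in blob_names:
        if folder_marker in name:
            # Keep only the file name; the blob's folder prefix is dropped.
            plan[name] = os.path.join(local_dir, os.path.basename(name))
    return plan


# Hypothetical blob names, for illustration only.
blobs = [
    "featureengineering_files.parquet/_SUCCESS",
    "featureengineering_files.parquet/part-00000.snappy.parquet",
    "other_folder/readme.txt",
]

plan = plan_downloads(
    blobs, "featureengineering_files.parquet", "model_operationalize.parquet"
)
for remote, local in sorted(plan.items()):
    print(remote, "->", local)
```

Because only the base name is kept, two blobs with the same file name in different folders would overwrite each other locally; that is not a concern here since a single parquet folder is selected.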

# Define the features and labels for the model

In [3]:
# define list of input columns for downstream modeling 
input_features = [
'volt_rollingmean_3',
'rotate_rollingmean_3',
'pressure_rollingmean_3',
'vibration_rollingmean_3',
'volt_rollingmean_24',
'rotate_rollingmean_24',
'pressure_rollingmean_24',
'vibration_rollingmean_24',
'volt_rollingstd_3',
'rotate_rollingstd_3',
'pressure_rollingstd_3',
'vibration_rollingstd_3',
'volt_rollingstd_24',
'rotate_rollingstd_24',
'pressure_rollingstd_24',
'vibration_rollingstd_24',
'error1sum_rollingmean_24',
'error2sum_rollingmean_24',
'error3sum_rollingmean_24',
'error4sum_rollingmean_24',
'error5sum_rollingmean_24',
'comp1sum',
'comp2sum',
'comp3sum',
'comp4sum',
'age'  
]

label_var = ['label_e']
key_cols =['machineID','dt_truncated']
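
The feature names follow a regular pattern (sensor, statistic, window), so the same list could also be generated instead of typed out. This is only a sketch of that idea; the variable names `sensors` and `generated_features` are illustrative, and the explicit list above remains the reference.

```python
# Rebuild the 26 input feature names from their naming pattern.
sensors = ["volt", "rotate", "pressure", "vibration"]

generated_features = []
for stat in ("rollingmean", "rollingstd"):
    for window in (3, 24):
        generated_features += [
            "{}_{}_{}".format(sensor, stat, window) for sensor in sensors
        ]

# Error counts, component replacement counters, and machine age.
generated_features += ["error{}sum_rollingmean_24".format(i) for i in range(1, 6)]
generated_features += ["comp{}sum".format(i) for i in range(1, 5)]
generated_features.append("age")

print(len(generated_features))
```

Generating the list once and reusing it would also avoid keeping a second hand-written copy in sync inside the scoring function later on.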

In [4]:
# assemble features
va = VectorAssembler(inputCols=(input_features), outputCol='features')
data = va.transform(data).select('machineID','dt_truncated','label_e','features')

In [5]:
# set maxCategories so features with > 10 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", 
                               outputCol="indexedFeatures", 
                               maxCategories=10).fit(data)
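
The `maxCategories` rule can be illustrated in plain Python: a feature with at most `maxCategories` distinct values is treated as categorical, and anything above that as continuous. The sample values below are hypothetical, and `split_by_cardinality` is an illustrative helper rather than a Spark function.

```python
def split_by_cardinality(columns, max_categories=10):
    """Classify features by how many distinct values they take."""
    categorical, continuous = [], []
    for name, values in columns.items():
        if len(set(values)) <= max_categories:
            categorical.append(name)
        else:
            continuous.append(name)
    return sorted(categorical), sorted(continuous)


# Hypothetical sample values, for illustration only.
columns = {
    "error1sum_rollingmean_24": [0.0, 0.0, 1.0, 0.0],
    "volt_rollingmean_3": [160.0 + 0.5 * i for i in range(50)],
}

categorical, continuous = split_by_cardinality(columns)
print("categorical:", categorical)
print("continuous:", continuous)
```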

In [6]:
# fit on whole dataset to include all labels in index
labelIndexer = StringIndexer(inputCol="label_e", outputCol="indexedLabel").fit(data)

In [7]:
# split the data into train/test based on date
training = data.filter(data.dt_truncated > "2015-01-01").filter(data.dt_truncated < "2015-09-30")
testing = data.filter(data.dt_truncated > "2015-09-30")

print(training.count())
print(testing.count())

2174000
747000
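
Both filters in the split use strict comparisons, so a record whose timestamp equals the cutoff exactly belongs to neither set. The plain-Python sketch below shows that behaviour with `datetime` values; the rows are hypothetical and `split_by_date` is an illustrative helper, not the Spark code used above.

```python
from datetime import datetime


def split_by_date(rows, train_start, cutoff):
    """Strictly-before-cutoff rows train, strictly-after-cutoff rows test."""
    training = [r for r in rows if train_start < r["dt_truncated"] < cutoff]
    testing = [r for r in rows if r["dt_truncated"] > cutoff]
    return training, testing


# Hypothetical rows, for illustration only.
rows = [
    {"machineID": 1, "dt_truncated": datetime(2015, 3, 1, 12)},
    {"machineID": 1, "dt_truncated": datetime(2015, 9, 30, 0)},  # on the cutoff
    {"machineID": 1, "dt_truncated": datetime(2015, 10, 5, 12)},
]

training, testing = split_by_date(rows, datetime(2015, 1, 1), datetime(2015, 9, 30))
print(len(training), len(testing))
```

Splitting on time rather than at random keeps the test period strictly after the training period, which mirrors how the model is used in production.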


# Train your best model

In [8]:
# train a RandomForest model
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# chain indexers and forest in a Pipeline
pipeline_rf = Pipeline(stages=[labelIndexer, featureIndexer, rf])

# train model
model_rf = pipeline_rf.fit(training)

# Save your model

Once you have a model that performs well, you can package it into a scoring service. To prepare for this, first save your model and dataset schema locally. Before saving, make sure the docker.compute file in the aml_config folder has sharedVolumes set to true, then prepare the environment.

In [9]:
# save model
model_rf.write().overwrite().save(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']+'pdmrfull.model')
print("Model saved")

Model saved


In [10]:
# check to see if the model was saved in the shared location
!ls $AZUREML_NATIVE_SHARE_DIRECTORY

pdmrfull.model
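
The save call builds its path by string concatenation, which only works when the environment variable ends with a path separator. A small helper based on `os.path.join` handles both cases. This is a sketch: `share_path` is a hypothetical helper, and the `/azureml-share/` default is only set here so the example runs outside the project environment.

```python
import os


def share_path(file_name, env_var="AZUREML_NATIVE_SHARE_DIRECTORY"):
    """Return the path of file_name inside the shared directory."""
    share_dir = os.environ[env_var]
    # os.path.join inserts a separator only when one is missing.
    return os.path.join(share_dir, file_name)


# Assumption for this demo only: provide a value if the variable is not set.
os.environ.setdefault("AZUREML_NATIVE_SHARE_DIRECTORY", "/azureml-share/")

print(share_path("pdmrfull.model"))
```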


# Authoring Realtime Web Service

In this section, we author a realtime web service that scores data with the model you saved above. First, make sure the latest version of azure-ml-api-sdk is installed.

In [11]:
!pip install azure-ml-api-sdk

The directory '/home/mmlspark/.cache/pip/http' or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
The directory '/home/mmlspark/.cache/pip' or its parent directory is not owned by the current user and caching wheels has been disabled. check the permissions and owner of that directory. If executing pip with sudo, you may want sudo's -H flag.
Collecting azure-ml-api-sdk
  Downloading azure_ml_api_sdk-0.1.0a11-py2.py3-none-any.whl (80kB)
    100% |################################| 81kB 745kB/s ta 0:00:01
Collecting liac-arff (from azure-ml-api-sdk)
  Downloading liac-arff-2.1.1.tar.gz
Installing collected packages: liac-arff, azure-ml-api-sdk
  Running setup.py install for liac-arff ... done
Successfully installed azure-ml-api-sdk-0.1.0a11 liac-arff-2.1.1
You are using pip version 8.1.2, however vers

In [12]:
from azureml.api.schema.dataTypes import DataTypes
from azureml.api.schema.sampleDefinition import SampleDefinition
from azureml.api.realtime.services import generate_schema

# Define init and run functions
Start by defining the init() and run() functions as shown in the cell below. They are later written to the scoring script (pdmscore.py in this example), which loads the model, performs the prediction, and returns the result.

The init() function initializes your web service, loading any data or models needed to score your inputs. In the example below, it loads the trained model. It runs once, when the Docker container hosting your service starts.

The run() function defines what is executed on each scoring call. In this example, it takes the input data frame, assembles the feature vector, runs the pipeline on it, and returns the predictions.

In [13]:
def init():
    # read in the model file
    from pyspark.ml import PipelineModel
    global pipeline
    
    pipeline = PipelineModel.load(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']+'pdmrfull.model')
    
def run(input_df):
    import json
    response = ''
    try:
        #Get prediction results for the dataframe
        input_features = [
            'volt_rollingmean_3',
            'rotate_rollingmean_3',
            'pressure_rollingmean_3',
            'vibration_rollingmean_3',
            'volt_rollingmean_24',
            'rotate_rollingmean_24',
            'pressure_rollingmean_24',
            'vibration_rollingmean_24',
            'volt_rollingstd_3',
            'rotate_rollingstd_3',
            'pressure_rollingstd_3',
            'vibration_rollingstd_3',
            'volt_rollingstd_24',
            'rotate_rollingstd_24',
            'pressure_rollingstd_24',
            'vibration_rollingstd_24',
            'error1sum_rollingmean_24',
            'error2sum_rollingmean_24',
            'error3sum_rollingmean_24',
            'error4sum_rollingmean_24',
            'error5sum_rollingmean_24',
            'comp1sum',
            'comp2sum',
            'comp3sum',
            'comp4sum',
            'age',
        ]
        
        va = VectorAssembler(inputCols=(input_features), outputCol='features')
        data = va.transform(input_df).select('machineID','features')
        score = pipeline.transform(data)
        predictions = score.collect()

        #Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        response = ",".join(preds)
    except Exception as e:
        print("Error: {0}".format(str(e)))
        return (str(e))
    
    # Return results
    print(json.dumps(response))
    return json.dumps(response)

# Create schema and schema file
Create a schema for the input to the web service and generate the schema file. This will be used to create a Swagger file for your web service which can be used to discover its input and sample data when calling it.

In [14]:
# define the input data frame
inputs = {"input_df": SampleDefinition(DataTypes.SPARK, data.drop("dt_truncated","failure1","label_e", "model","model_encoded"))}

In [15]:
x = generate_schema(run_func=run, inputs=inputs, filepath='service_schema.json')
print(x)

{'input': {'input_df': {'internal': {'fields': [{'metadata': {}, 'type': 'integer', 'name': 'machineID', 'nullable': True}, {'metadata': {'ml_attr': {'num_attrs': 26, 'attrs': {'numeric': [{'idx': 0, 'name': 'volt_rollingmean_3'}, {'idx': 1, 'name': 'rotate_rollingmean_3'}, {'idx': 2, 'name': 'pressure_rollingmean_3'}, {'idx': 3, 'name': 'vibration_rollingmean_3'}, {'idx': 4, 'name': 'volt_rollingmean_24'}, {'idx': 5, 'name': 'rotate_rollingmean_24'}, {'idx': 6, 'name': 'pressure_rollingmean_24'}, {'idx': 7, 'name': 'vibration_rollingmean_24'}, {'idx': 8, 'name': 'volt_rollingstd_3'}, {'idx': 9, 'name': 'rotate_rollingstd_3'}, {'idx': 10, 'name': 'pressure_rollingstd_3'}, {'idx': 11, 'name': 'vibration_rollingstd_3'}, {'idx': 12, 'name': 'volt_rollingstd_24'}, {'idx': 13, 'name': 'rotate_rollingstd_24'}, {'idx': 14, 'name': 'pressure_rollingstd_24'}, {'idx': 15, 'name': 'vibration_rollingstd_24'}, {'idx': 16, 'name': 'error1sum_rollingmean_24'}, {'idx': 17, 'name': 'error2sum_rollingme
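
To check which input fields a generated schema declares, the nested dictionary can be walked directly. The sketch below uses a trimmed excerpt copied from the visible part of the printed output above (only the machineID field); the layout is read from that output and may differ in other SDK versions, and `list_fields` is an illustrative helper.

```python
# Trimmed excerpt of the printed schema: only the first field is reproduced.
schema_excerpt = {
    "input": {
        "input_df": {
            "internal": {
                "fields": [
                    {
                        "metadata": {},
                        "type": "integer",
                        "name": "machineID",
                        "nullable": True,
                    },
                ]
            }
        }
    }
}


def list_fields(schema, input_name="input_df"):
    """Return (name, type) pairs for the fields of one declared input."""
    fields = schema["input"][input_name]["internal"]["fields"]
    return [(f["name"], f["type"]) for f in fields]


print(list_fields(schema_excerpt))
```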

# Test init and run
We can then test the init() and run() functions right here in the notebook, before we decide to actually publish a web service.

In [16]:
# this is how the input data should be
input_data = [[114, 163.375732902,333.149484586,100.183951698,44.0958812638,164.114723991,277.191815232,97.6289110707,50.8853505161,21.0049565219,67.5287259378,12.9361526861,4.61359760918,15.5377738062,67.6519885441,10.528274633,6.94129487555,0.0,0.0,0.0,0.0,0.0,489.0,549.0,549.0,564.0,18.0]]
input_data

[[114,
  163.375732902,
  333.149484586,
  100.183951698,
  44.0958812638,
  164.114723991,
  277.191815232,
  97.6289110707,
  50.8853505161,
  21.0049565219,
  67.5287259378,
  12.9361526861,
  4.61359760918,
  15.5377738062,
  67.6519885441,
  10.528274633,
  6.94129487555,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  489.0,
  549.0,
  549.0,
  564.0,
  18.0]]

In [17]:
df = (spark.createDataFrame(input_data, ["machineID", "volt_rollingmean_3", "rotate_rollingmean_3", "pressure_rollingmean_3", "vibration_rollingmean_3", "volt_rollingmean_24", 
            "rotate_rollingmean_24", "pressure_rollingmean_24", "vibration_rollingmean_24", "volt_rollingstd_3", "rotate_rollingstd_3",
            "pressure_rollingstd_3", "vibration_rollingstd_3", "volt_rollingstd_24", "rotate_rollingstd_24", "pressure_rollingstd_24",
            "vibration_rollingstd_24", "error1sum_rollingmean_24", "error2sum_rollingmean_24", "error3sum_rollingmean_24",
            "error4sum_rollingmean_24", "error5sum_rollingmean_24", "comp1sum", "comp2sum", "comp3sum", "comp4sum",
            "age"]))

In [18]:
# test init() in local notebook
init()

In [19]:
# test run() in local notebook
run(df)

"0.0"


'"0.0"'
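
The value returned by run() is a JSON-encoded string holding the predictions joined by commas, one per input row. A caller could decode it as sketched below; `decode_predictions` is a hypothetical helper, and the three-row response is made up for illustration.

```python
import json


def decode_predictions(response_text):
    """Turn the JSON string returned by run() into a list of floats."""
    joined = json.loads(response_text)  # e.g. "0.0" or "0.0,1.0"
    if not joined:
        return []
    return [float(p) for p in joined.split(",")]


print(decode_predictions('"0.0"'))          # the single row scored above
print(decode_predictions('"0.0,1.0,0.0"'))  # hypothetical three-row response
```

Note that on failure run() returns the raw exception text rather than JSON, so a real caller would need to handle a decoding error as well.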

In [20]:
# save the schema file for deployment
out = json.dumps(x)
with open(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY'] + 'service_schema.json', 'w') as f:
    f.write(out)

In [21]:
!ls $AZUREML_NATIVE_SHARE_DIRECTORY

pdmrfull.model	service_schema.json


Next, navigate to the folder:
```C:\Users\<username>\.azureml\share\<team account>\<Project Name> ```

Copy the file service_schema.json to your project folder for deployment.

Next, use the %%writefile cell magic to save the scoring functions as a .py file.

In [22]:
%%writefile /azureml-share/pdmscore.py
# Scoring script for the web service. It contains the init() and run()
# functions tested above, plus the imports they rely on.

import os
import json

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import SQLContext
import pyspark.sql.functions as F
from pyspark.sql.functions import concat, col, udf, lag, date_add, explode, lit, unix_timestamp
from pyspark.sql.functions import month, weekofyear, dayofmonth
from pyspark.sql.functions import datediff, to_date, lit, unix_timestamp
from pyspark.sql.types import *
from pyspark.sql.types import DateType
from pyspark.sql.dataframe import *
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.ml.classification import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer
from pyspark.ml.feature import StandardScaler, PCA, RFormula
from pyspark.ml import Pipeline, PipelineModel
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import *
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from azureml.api.schema.dataTypes import DataTypes
from azureml.api.schema.sampleDefinition import SampleDefinition
from azureml.api.realtime.services import generate_schema


def init():
    # read in the model file
    from pyspark.ml import PipelineModel
    global pipeline
    pipeline = PipelineModel.load(os.environ['AZUREML_NATIVE_SHARE_DIRECTORY']+'pdmrfull.model')
    
def run(input_df):
    import json
    response = ''
    
    try:
        #Get prediction results for the dataframe
        input_features = [
            'volt_rollingmean_3',
            'rotate_rollingmean_3',
            'pressure_rollingmean_3',
            'vibration_rollingmean_3',
            'volt_rollingmean_24',
            'rotate_rollingmean_24',
            'pressure_rollingmean_24',
            'vibration_rollingmean_24',
            'volt_rollingstd_3',
            'rotate_rollingstd_3',
            'pressure_rollingstd_3',
            'vibration_rollingstd_3',
            'volt_rollingstd_24',
            'rotate_rollingstd_24',
            'pressure_rollingstd_24',
            'vibration_rollingstd_24',
            'error1sum_rollingmean_24',
            'error2sum_rollingmean_24',
            'error3sum_rollingmean_24',
            'error4sum_rollingmean_24',
            'error5sum_rollingmean_24',
            'comp1sum',
            'comp2sum',
            'comp3sum',
            'comp4sum',
            'age',
        ]

        va = VectorAssembler(inputCols=(input_features), outputCol='features')
        data = va.transform(input_df).select('machineID','features')
        score = pipeline.transform(data)
        predictions = score.collect()

        #Get each scored result
        preds = [str(x['prediction']) for x in predictions]
        response = ",".join(preds)
    except Exception as e:
        print("Error: {0}".format(str(e)))
        return (str(e))
    
    # Return results
    print(json.dumps(response))
    return json.dumps(response)

if __name__ == "__main__":
    init()
    run("{\"input_df\":[{\"machineID\":114,\"volt_rollingmean_3\":163.375732902,\"rotate_rollingmean_3\":333.149484586,\"pressure_rollingmean_3\":100.183951698,\"vibration_rollingmean_3\":44.0958812638,\"volt_rollingmean_24\":164.114723991,\"rotate_rollingmean_24\":277.191815232,\"pressure_rollingmean_24\":97.6289110707,\"vibration_rollingmean_24\":50.8853505161,\"volt_rollingstd_3\":21.0049565219,\"rotate_rollingstd_3\":67.5287259378,\"pressure_rollingstd_3\":12.9361526861,\"vibration_rollingstd_3\":4.61359760918,\"volt_rollingstd_24\":15.5377738062,\"rotate_rollingstd_24\":67.6519885441,\"pressure_rollingstd_24\":10.528274633,\"vibration_rollingstd_24\":6.94129487555,\"error1sum_rollingmean_24\":0.0,\"error2sum_rollingmean_24\":0.0,\"error3sum_rollingmean_24\":0.0,\"error4sum_rollingmean_24\":0.0,\"error5sum_rollingmean_24\":0.0,\"comp1sum\":489.0,\"comp2sum\":549.0,\"comp3sum\":549.0,\"comp4sum\":564.0,\"age\":18.0}]}")

Writing /azureml-share/pdmscore.py


In [23]:
!ls $AZUREML_NATIVE_SHARE_DIRECTORY

pdmrfull.model	pdmscore.py  service_schema.json


Next, navigate to the folder:
```C:\Users\<username>\.azureml\share\<team account>\<Project Name> ```

Copy the file pdmscore.py to your project folder for deployment.

# Use the CLI to deploy and manage your web service 

## Pre-requisites 

Use the following commands to set up an environment and account to run the web service. For more info, see the Getting Started Guide and the CLI Command Reference. Append the -h flag to any command to see its help.

• Create the environment (you need to do this once per environment e.g. dev or prod)
```
az ml env setup -c -n <yourclustername> --location <e.g. eastus2>
```

• Create a Model Management account (one time setup)
```
az ml account modelmanagement create --location <e.g. eastus2> -n <your-new-acctname> -g <yourresourcegroupname> --sku-instances 1 --sku-name S1
```

• Set the Model Management account
```
az ml account modelmanagement set -n <youracctname> -g <yourresourcegroupname>
```

• Set the environment. The cluster name is the one used in the environment setup step above. The resource group name is part of that step's output and appears in the command window when the setup completes.
```
az ml env set -n <yourclustername> -g <yourresourcegroupname>
```

## Deploy your web service 

Switch to a bash shell, and run the following commands to deploy your service and run it.

Enter the path where the notebook and other files are saved. Your actual path may be different from this example.
```
cd ~/notebooks/azureml/realtime/
```

Create the realtime web service from the scoring script, the model, and the schema file. This assumes that you saved your model locally.
```
az ml service create realtime -f pdmscore.py -r spark-py -m pdmrfull.model -s service_schema.json -n pdmservice --cpu 0.1
```

The following command returns a sample run command with sample data. You can get the service ID from the output of the create command above.
```
az ml service show realtime -i <yourserviceid>
```

Call the web service to get a prediction:
```
az ml service run realtime -i <yourserviceid> -d "{\"input_df\": [{\"machineID\":114, \"volt_rollingmean_3\":163.375732902, \"rotate_rollingmean_3\":333.149484586, \"pressure_rollingmean_3\":100.183951698, \"vibration_rollingmean_3\":44.0958812638, \"volt_rollingmean_24\":164.114723991, \"rotate_rollingmean_24\":277.191815232, \"pressure_rollingmean_24\":97.6289110707, \"vibration_rollingmean_24\":50.8853505161, \"volt_rollingstd_3\":21.0049565219, \"rotate_rollingstd_3\":67.5287259378, \"pressure_rollingstd_3\":12.9361526861, \"vibration_rollingstd_3\":4.61359760918, \"volt_rollingstd_24\":15.5377738062, \"rotate_rollingstd_24\":67.6519885441, \"pressure_rollingstd_24\":10.528274633, \"vibration_rollingstd_24\":6.94129487555, \"error1sum_rollingmean_24\":0.0, \"error2sum_rollingmean_24\":0.0, \"error3sum_rollingmean_24\":0.0, \"error4sum_rollingmean_24\":0.0, \"error5sum_rollingmean_24\":0.0, \"comp1sum\":489.0, \"comp2sum\":549.0, \"comp3sum\":549.0, \"comp4sum\":564.0, \"age\":18.0}]}"
```
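
Hand-escaping the JSON payload for the run command is error-prone, so it can help to build it with `json.dumps` from the same column names and values used in the local test. This is a sketch that only produces the payload string; the names `columns`, `values`, and `payload` are illustrative, and any further quoting depends on your shell.

```python
import json

# Column names in the order used for the local test data frame.
columns = [
    "machineID",
    "volt_rollingmean_3", "rotate_rollingmean_3",
    "pressure_rollingmean_3", "vibration_rollingmean_3",
    "volt_rollingmean_24", "rotate_rollingmean_24",
    "pressure_rollingmean_24", "vibration_rollingmean_24",
    "volt_rollingstd_3", "rotate_rollingstd_3",
    "pressure_rollingstd_3", "vibration_rollingstd_3",
    "volt_rollingstd_24", "rotate_rollingstd_24",
    "pressure_rollingstd_24", "vibration_rollingstd_24",
    "error1sum_rollingmean_24", "error2sum_rollingmean_24",
    "error3sum_rollingmean_24", "error4sum_rollingmean_24",
    "error5sum_rollingmean_24",
    "comp1sum", "comp2sum", "comp3sum", "comp4sum",
    "age",
]

# The sample row used earlier in the notebook.
values = [
    114, 163.375732902, 333.149484586, 100.183951698, 44.0958812638,
    164.114723991, 277.191815232, 97.6289110707, 50.8853505161,
    21.0049565219, 67.5287259378, 12.9361526861, 4.61359760918,
    15.5377738062, 67.6519885441, 10.528274633, 6.94129487555,
    0.0, 0.0, 0.0, 0.0, 0.0, 489.0, 549.0, 549.0, 564.0, 18.0,
]

record = dict(zip(columns, values))
payload = json.dumps({"input_df": [record]})
print(payload)
```

Pairing names and values with `zip` also makes a length mismatch easy to spot, since `len(record)` should equal the number of columns.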

The predicted output label is as follows:
```
"0.0"
```