<h1>Pipeline Model Deployment</h1>

Once we have built and trained our models for feature engineering (using AWS Glue and SparkML) and binary classification (using the XGBoost built-in algorithm in Amazon SageMaker), we can choose to deploy them in a pipeline using Amazon SageMaker Inference Pipelines.
https://docs.aws.amazon.com/sagemaker/latest/dg/inference-pipelines.html

This notebook demonstrates how to create a pipeline with the SparkML model for feature engineering and the Amazon SageMaker XGBoost model for binary classification.

<span style="color: red"> Please replace your initials in the bucket_name variable defined in next cell.</span>

In [1]:
import boto3
import sagemaker

role = sagemaker.get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.Session()

print(region)
print(role)

# replace [your-initials] according to the bucket name you have defined.
bucket_name = 'endtoendml-workshop-ad82'

eu-west-1
arn:aws:iam::041631420165:role/service-role/AmazonSageMaker-ExecutionRole-20180507T143636


First, we need to create two Amazon SageMaker **Model** objects, which combine the artifacts of training (serialized model artifacts in Amazon S3) and the Docker container used for inference.

In order to do that, we need to get the paths to our serialized models in S3.
<ul>
    <li>For the SparkML model, we defined the path where the artifacts have been stored in step 02</li>
    <li>For the XGBoost model, we need to find the path based on Amazon SageMaker's naming convention</li>
</ul>

In [3]:
import boto3

def get_latest_training_job_name(base_job_name):
    client = boto3.client('sagemaker')
    response = client.list_training_jobs(NameContains=base_job_name, SortBy='CreationTime', 
                                         SortOrder='Descending', StatusEquals='Completed')
    if len(response['TrainingJobSummaries']) > 0 :
        return response['TrainingJobSummaries'][0]['TrainingJobName']
    else:
        raise Exception('Training job not found.')

def get_training_job_s3_model_artifacts(job_name):
    client = boto3.client('sagemaker')
    response = client.describe_training_job(TrainingJobName=job_name)
    s3_model_artifacts = response['ModelArtifacts']['S3ModelArtifacts']
    return s3_model_artifacts

# SparkML model path.
sparkml_model_path = 's3://{0}/output/sparkml/model.tar.gz'.format(bucket_name)

training_base_job_name = 'predmain-train-xgb'
latest_training_job_name = get_latest_training_job_name(training_base_job_name)
# XGBoost model path.
xgboost_model_path = get_training_job_s3_model_artifacts(latest_training_job_name)

print('SparkML model path: ' + sparkml_model_path)
print('XGBoost model path: ' + xgboost_model_path)

SparkML model path: s3://endtoendml-workshop-ad82/output/sparkml/model.tar.gz
XGBoost model path: s3://endtoendml-workshop-ad82/output/predmain-train-xgb-2019-09-03-14-45-08-105/output/model.tar.gz


Then we need to find the ECR (Elastic Container Registry) path of the XGBoost Docker containers that will be used for hosting (actually, this is the same container we used for training).

For SparkML serving, the container is provided by AWS so we do not need to worry of finding the ECR path since it will be looked up automatically by the Amazon SageMaker Python SDK.
For more info, please see: https://github.com/aws/sagemaker-sparkml-serving-container.

In [4]:
import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri

xgboost_container = get_image_uri(boto3.Session().region_name, 'xgboost', repo_version="latest")
print(xgboost_container)

685385470294.dkr.ecr.eu-west-1.amazonaws.com/xgboost:latest


SparkML serving container needs to know the schema of the request that'll be passed to it while calling the predict method. In order to alleviate the pain of not having to pass the schema with every request, _sagemaker-sparkml-serving_ allows you to pass it via an environment variable while creating the model definitions. This schema definition will be required in our next step for creating a model.

You can overwrite this schema on a per request basis by passing it as part of the individual request payload as well.

In [11]:
import json
schema = {
    "input": [
        {
            "name": "turbine_id",
            "type": "string"
        },
        {
            "name": "turbine_type",
            "type": "string"
        },
        {
            "name": "wind_speed",
            "type": "double"
        }, 
        {
            "name": "rpm_blade",
            "type": "double"
        }, 
        {
            "name": "oil_temperature",
            "type": "double"
        }, 
        {
            "name": "oil_level",
            "type": "double"
        }, 
        {
            "name": "temperature",
            "type": "double"
        },
        {
            "name": "humidity",
            "type": "double"
        }, 
        {
            "name": "vibrations_frequency",
            "type": "double"
        },
        {
            "name": "pressure",
            "type": "double"
        },
        {
            "name": "wind_direction",
            "type": "string"
        },
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "turbine_id", "type": "string"}, {"name": "turbine_type", "type": "string"}, {"name": "wind_speed", "type": "double"}, {"name": "rpm_blade", "type": "double"}, {"name": "oil_temperature", "type": "double"}, {"name": "oil_level", "type": "double"}, {"name": "temperature", "type": "double"}, {"name": "humidity", "type": "double"}, {"name": "vibrations_frequency", "type": "double"}, {"name": "pressure", "type": "double"}, {"name": "wind_direction", "type": "string"}], "output": {"name": "features", "type": "double", "struct": "vector"}}


We are ready to create our **Model** objects:

In [12]:
from sagemaker.model import Model
from sagemaker.sparkml.model import SparkMLModel

sparkml_preprocessor_model = SparkMLModel(model_data=sparkml_model_path, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json}, 
                                    sagemaker_session=sagemaker_session)
xgboost_model = Model(xgboost_model_path, xgboost_container, sagemaker_session=sagemaker_session)


Once we have models ready, we can deploy them in a pipeline:

In [14]:
import sagemaker
import time
from sagemaker.pipeline import PipelineModel

pipeline_model_name = 'pred-main-sparkml-xgb-pipeline-{0}'.format(str(int(time.time())))

pipeline_model = PipelineModel(
    name=pipeline_model_name, 
    role=role,
    models=[
        sparkml_preprocessor_model, 
        xgboost_model],
    sagemaker_session=sagemaker_session)

endpoint_name = 'pred-main-pipeline-endpoint'

pipeline_model.deploy(initial_instance_count=1, 
                      instance_type='ml.c5.xlarge', 
                      endpoint_name=endpoint_name)

----------------------------------------------------------------------------!

<h2>Getting inferences</h2>

Now we can try invoking our pipeline of models and try getting some inferences:

In [25]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

#predictor = RealTimePredictor(
#    endpoint=endpoint_name,
#    sagemaker_session=sagemaker_session,
#    serializer=csv_serializer,
#    content_type=CONTENT_TYPE_CSV,
#    accept=CONTENT_TYPE_JSON)

#payload = "TID008,VAWT,64,80,46,21,55,55,7,34,NE"
#print(predictor.predict(payload))

payload = {"data": ["TID008", "VAWT", 64.0,80.1,46.0,21.0,55.0,55.0,7.0,34.0,"NE"]}
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received client error (400) from container-1 with message "invalid fields: rpm_blade". See https://eu-west-1.console.aws.amazon.com/cloudwatch/home?region=eu-west-1#logEventViewer:group=/aws/sagemaker/Endpoints/pred-main-pipeline-endpoint in account 041631420165 for more information.

In [26]:
payload = {
    "schema": {
        "input": [
        {
            "name": "turbine_id",
            "type": "string"
        },
        {
            "name": "turbine_type",
            "type": "string"
        },
        {
            "name": "wind_speed",
            "type": "double"
        }, 
        {
            "name": "rpm_blade",
            "type": "double"
        }, 
        {
            "name": "oil_temperature",
            "type": "double"
        }, 
        {
            "name": "oil_level",
            "type": "double"
        }, 
        {
            "name": "temperature",
            "type": "double"
        },
        {
            "name": "humidity",
            "type": "double"
        }, 
        {
            "name": "vibrations_frequency",
            "type": "double"
        },
        {
            "name": "pressure",
            "type": "double"
        },
        {
            "name": "wind_direction",
            "type": "string"
        },
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
    },
    "data": ["TID008", "VAWT", 64.0,80.1,46.0,21.0,55.0,55.0,7.0,34.0,"NE"]
}

predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))


b'0.998887598515'
