So far we have seen how to do data analysis and build ML models locally - but how do we actually productionize them? Some key concerns include:
- how do we perform distributed preprocessing on large datasets?
- how do we deploy our algorithm onto an API endpoint that can be easily consumed?
- how do we continuously train the model with new data after deployment?


In this notebook, we demonstrate how to build an ML pipeline that combines Spark feature transformers with the SageMaker XGBoost algorithm. After the model is trained, we deploy the pipeline (feature transformer and XGBoost) as an Inference Pipeline behind a single endpoint for real-time inference, and for batch inference using Amazon SageMaker Batch Transform.

In [19]:
import pandas as pd
import numpy as np
import wget

## Exploratory Data Analysis

The problem we will tackle is to predict the age of an abalone from its physical measurements such as sex, length, diameter and height. The target variable is `rings`, since the ring count corresponds to the age. The dataset we are using is available on the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/abalone).
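The UCI dataset description notes that adding 1.5 to the ring count gives the age in years. A minimal sketch of that conversion follows; the function name `rings_to_age` is ours and is not part of the dataset or any library.

```python
def rings_to_age(rings: int) -> float:
    """Approximate abalone age in years from its ring count (rings + 1.5)."""
    if rings < 0:
        raise ValueError("ring count cannot be negative")
    return rings + 1.5


# The first row of the dataset has 15 rings.
print(rings_to_age(15))  # 16.5
```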

In [2]:
columns = ['sex', 'length', 'diameter', 'height', 'whole_weight', 'shucked_weight', 'viscera_weight', 'shell_weight', 'rings']
data = pd.read_csv('abalone.csv', names=columns)
data

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
0,M,0.455,0.365,0.095,0.5140,0.2245,0.1010,0.1500,15
1,M,0.350,0.265,0.090,0.2255,0.0995,0.0485,0.0700,7
2,F,0.530,0.420,0.135,0.6770,0.2565,0.1415,0.2100,9
3,M,0.440,0.365,0.125,0.5160,0.2155,0.1140,0.1550,10
4,I,0.330,0.255,0.080,0.2050,0.0895,0.0395,0.0550,7
...,...,...,...,...,...,...,...,...,...
4172,F,0.565,0.450,0.165,0.8870,0.3700,0.2390,0.2490,11
4173,M,0.590,0.440,0.135,0.9660,0.4390,0.2145,0.2605,10
4174,M,0.600,0.475,0.205,1.1760,0.5255,0.2875,0.3080,9
4175,F,0.625,0.485,0.150,1.0945,0.5310,0.2610,0.2960,10


In [62]:
data.describe()

Unnamed: 0,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
count,4177.0,4177.0,4177.0,4177.0,4177.0,4177.0,4177.0,4177.0
mean,0.523992,0.407881,0.139516,0.828742,0.359367,0.180594,0.238831,9.933684
std,0.120093,0.09924,0.041827,0.490389,0.221963,0.109614,0.139203,3.224169
min,0.075,0.055,0.0,0.002,0.001,0.0005,0.0015,1.0
25%,0.45,0.35,0.115,0.4415,0.186,0.0935,0.13,8.0
50%,0.545,0.425,0.14,0.7995,0.336,0.171,0.234,9.0
75%,0.615,0.48,0.165,1.153,0.502,0.253,0.329,11.0
max,0.815,0.65,1.13,2.8255,1.488,0.76,1.005,29.0


The Notebook consists of a few high-level steps:
- Using AWS Glue for executing the SparkML feature processing job.
- Using SageMaker XGBoost to train on the processed dataset produced by SparkML job.
- Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint.
- Building an Inference Pipeline consisting of SparkML & XGBoost models for a single Batch Transform job.

Here are the tools that we will use:
 - Amazon SageMaker - end-to-end ML pipeline in the cloud
 - Boto3 - the AWS SDK for Python, used for interfacing with AWS services
 - Amazon S3 - Simple Storage Service buckets for storing data in the cloud
 - AWS Glue - serverless ETL service which can be used to execute standard Spark jobs
 - PySpark
 - XGBoost

## Initializing AWS Sagemaker
Let's begin by setting up our AWS configuration (roles and access keys) before uploading our dataset to S3.

In [8]:
import sagemaker
from sagemaker import get_execution_role
sess = sagemaker.Session() # install awscli, $ aws configure
# session = boto3.session.Session()
#role = get_execution_role()
#print(role)

In [34]:
import boto3
import botocore
from botocore.exceptions import ClientError

boto_session = sess.boto_session
s3 = boto_session.resource('s3')
account = boto_session.client('sts').get_caller_identity()['Account']
region = boto_session.region_name
# Bucket used throughout this notebook; it must be defined before the upload below
default_bucket = 'aws-glue-{}-{}'.format(account, region)
s3.create_bucket(Bucket=default_bucket,
                 CreateBucketConfiguration={'LocationConstraint': region})

# Uploading the training data to S3
sess.upload_data(path='abalone.csv', bucket=default_bucket, key_prefix='input/abalone')

's3://aws-glue-636839656075-ap-southeast-1/input/abalone/abalone.csv'
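One caveat with the `create_bucket` call above: S3 rejects an explicit `LocationConstraint` of `us-east-1`, so the configuration has to be omitted in that region. A small sketch of a helper that builds the keyword arguments either way is shown below. The helper name `bucket_kwargs` and the account number are hypothetical.

```python
def bucket_kwargs(account: str, region: str) -> dict:
    """Build keyword arguments for s3.create_bucket for the given region."""
    kwargs = {"Bucket": "aws-glue-{}-{}".format(account, region)}
    # us-east-1 is the default S3 region and must not be passed as a constraint.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


# Placeholder account number, for illustration only.
print(bucket_kwargs("123456789012", "ap-southeast-1"))
```

With a real session this would be used as `s3.create_bucket(**bucket_kwargs(account, region))`.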

## Building ETL Pipeline

Next, let's build the entire ETL pipeline and turn it into a script, `abalone_processing.py`, consisting of the following steps:

- 1. Define the input schema
- 2. Fetch data from the S3 bucket
- 3. Build the feature processing pipeline
- 4. Perform transform operations on RDDs
- 5. Serialize the ETL model and save it to S3

Let's now upload our ETL script to S3 so that the Glue job can use it later.

In [35]:
bucket = 'aws-glue-{}-{}'.format(account, region)
script_location = sess.upload_data(path='abalone_processing.py', 
                                   bucket=bucket, key_prefix='codes')
script_location

's3://aws-glue-636839656075-ap-southeast-1/codes/abalone_processing.py'

For our job, we will also have to pass MLeap dependencies to Glue. MLeap is an additional library we are using which does not come bundled with default Spark.
Similar to most of the packages in the Spark ecosystem, MLeap is also implemented as a Scala package with a front-end wrapper written in Python so that it can be used from PySpark. 

We need to make sure that the MLeap Python library as well as the JAR are available within the Glue job environment. In the following cell, we download the MLeap Python dependency and JAR from a SageMaker-hosted bucket and upload them to the S3 bucket we created above in your account.

In [21]:
wget.download('https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip')
wget.download('https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar')
              

'mleap_spark_assembly.jar'

In [23]:
python_dep_location = sess.upload_data(path='python.zip', 
                                       bucket=default_bucket, 
                                       key_prefix='dependencies/python')
jar_dep_location = sess.upload_data(path='mleap_spark_assembly.jar', 
                                    bucket=default_bucket, 
                                    key_prefix='dependencies/jar')
print(python_dep_location, jar_dep_location)

s3://aws-glue-636839656075-ap-southeast-1/dependencies/python/python.zip s3://aws-glue-636839656075-ap-southeast-1/dependencies/jar/mleap_spark_assembly.jar


## Executing ETL Jobs with AWS Glue

Next we define the output location where the transformed dataset should be uploaded. We also specify a model location where the MLeap-serialized model will be uploaded. These locations are consumed inside the Spark script using the `getResolvedOptions` method of the AWS Glue library (see abalone_processing.py for details).
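Inside a Glue job, `getResolvedOptions(sys.argv, [...])` returns a dictionary keyed by argument name without the leading `--`. For experimenting with the script outside Glue, a rough stand-in can be written with `argparse`. The function below (`resolve_options`, a hypothetical name) only approximates that behavior and is not the library's implementation. The bucket name is a placeholder.

```python
import argparse


def resolve_options(argv, names):
    """Parse `--NAME value` pairs from argv into a dict, ignoring other arguments."""
    parser = argparse.ArgumentParser(add_help=False)
    for name in names:
        parser.add_argument("--" + name, required=True)
    # argv[0] is the script name, as in sys.argv
    known, _unknown = parser.parse_known_args(argv[1:])
    return vars(known)


argv = ["abalone_processing.py",
        "--S3_INPUT_BUCKET", "my-bucket",
        "--S3_INPUT_KEY_PREFIX", "input/abalone"]
args = resolve_options(argv, ["S3_INPUT_BUCKET", "S3_INPUT_KEY_PREFIX"])
print(args["S3_INPUT_BUCKET"], args["S3_INPUT_KEY_PREFIX"])
```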

We'll create a Glue client via Boto so that we can invoke the `create_job` API, which lets us define a mutable job definition for execution. Note that this requires passing the code location as well as the dependency locations to Glue.

In [67]:
from time import gmtime, strftime
import time

timestamp = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
glue_client = boto_session.client('glue')
job_name = 'sparkml-abalone-' + timestamp
role = 'arn:aws:iam::636839656075:role/AWSGlueServiceSageMakerNotebookRole-Default'# get_execution_role()

response = glue_client.create_job(
    Name=job_name,
    Description='PySpark job to featurize the Abalone dataset',
    Role=role, # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location
    },
    DefaultArguments={
        '--job-language': 'python',
        '--extra-jars' : jar_dep_location,
        '--extra-py-files': python_dep_location
    },
    AllocatedCapacity=5,
    Timeout=60,
)
glue_job_name = response['Name']
print(glue_job_name)

sparkml-abalone-2020-04-12-13-36-14


Our **ETL Spark job** is now executed by calling the `start_job_run` API. This API creates an immutable run/execution corresponding to the job definition created above. We need the `job_run_id` of that execution to check its status. We pass the data and model locations as part of the job execution parameters.

We then poll the job status to see whether the run has succeeded, failed or stopped. Once the job has succeeded, the transformed data is available in S3 in CSV format, ready to be used with XGBoost for training. If the job fails, go to the AWS Glue console, click the Jobs tab on the left, and select this particular job; the link under Logs opens the CloudWatch logs, which can help you see what exactly went wrong during the job execution.

In [68]:
s3_input_bucket = bucket
s3_input_key = 'input/abalone'
s3_output_bucket = bucket
s3_output_key = timestamp + '/abalone'
s3_model_bucket = bucket
s3_model_key = s3_output_key + '/mleap'

job_run_id = glue_client.start_job_run(JobName=job_name,
                                       Arguments = {
                                        '--S3_INPUT_BUCKET': s3_input_bucket,
                                        '--S3_INPUT_KEY_PREFIX': s3_input_key,
                                        '--S3_OUTPUT_BUCKET': s3_output_bucket,
                                        '--S3_OUTPUT_KEY_PREFIX': s3_output_key,
                                        '--S3_MODEL_BUCKET': s3_model_bucket,
                                        '--S3_MODEL_KEY_PREFIX': s3_model_key
                                       })['JobRunId']
print(job_run_id)
job_run_status = glue_client.get_job_run(
                            JobName=job_name,
                            RunId=job_run_id
                        )['JobRun']['JobRunState']

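# TIMEOUT and ERROR are also final states; without them the loop would never exit
while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED', 'TIMEOUT', 'ERROR'):
    job_run_status = glue_client.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
    print (job_run_status)
    time.sleep(30)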

jr_606e8ca86c60e203f98fa04dc6b1975dc689f7567f3199ed0533966560b2beb0
RUNNING
RUNNING
RUNNING
RUNNING
RUNNING
SUCCEEDED
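The polling loop above can be factored into a small reusable helper that also caps the number of polls. This is only a sketch: `wait_for_job` and its parameters are hypothetical names, and the state source is simulated so that it runs without AWS access.

```python
import time

# Final states a Glue job run can end in.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def wait_for_job(get_state, poll_seconds=30, max_polls=120, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or max_polls is reached."""
    for _ in range(max_polls):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
    raise TimeoutError("job did not reach a terminal state in time")


# Simulated state source; the sleep is replaced by a no-op for the demo.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_job(lambda: next(states), sleep=lambda s: None))
```

Against Glue, `get_state` would be a function that returns `glue_client.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']['JobRunState']`.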


## Train and Deploy Model

Now we will use the SageMaker XGBoost algorithm to train on this dataset. We already know the S3 location where the Glue job uploaded the preprocessed training data.

In [46]:
from sagemaker.amazon.amazon_estimator import get_image_uri

xboost = get_image_uri(region, 'xgboost', repo_version='0.90-1')
s3_train_data = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key, 'train')
s3_validation_data = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key, 'validation')
s3_output_location = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key, 'xgboost_model')

xgb_model = sagemaker.estimator.Estimator(xboost, role, train_instance_count=1, train_instance_type='ml.m4.xlarge',
                                          train_volume_size = 5,train_max_run = 3600,input_mode= 'File', 
                                          output_path=s3_output_location, sagemaker_session=sess)

xgb_model.set_hyperparameters(objective="reg:linear", eta=.2, gamma=4, max_depth=5,
                              num_round=10,subsample=0.7,silent=0, min_child_weight=6)

train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')

validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

xgb_model.fit(inputs={'train': train_data, 'validation': validation_data}, logs=True)


2020-04-12 10:46:49 Starting - Starting the training job...
2020-04-12 10:46:51 Starting - Launching requested ML instances...
2020-04-12 10:47:51 Starting - Preparing the instances for training......
2020-04-12 10:48:54 Downloading - Downloading input data
2020-04-12 10:48:54 Training - Downloading the training image.....
INFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training
INFO:sagemaker-containers:Failed to parse hyperparameter objective value reg:linear to Json.
Returning the value itself
INFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)
INFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
[10:49:28] 3327x9 matrix with 29943 entries loaded from /opt/ml/input/data/train?form

Next we will deploy the models in SageMaker to create an Inference Pipeline. You can create an Inference Pipeline with up to five containers.
Deploying a model in SageMaker requires two components:
- **Model Docker image in ECR** - the XGBoost container image, used together with the model artifact fitted during training
- **ETL Pipeline** - the serialized ETL pipeline that was saved to S3 earlier

The SparkML serving container needs to know the schema of the request that will be passed to it when the predict method is called. To spare you from passing the schema with every request, sagemaker-sparkml-serving lets you pass it via an environment variable when creating the model definition.

In [48]:
import json
schema = {
    "input": [
        {
            "name": "sex",
            "type": "string"
        }, 
        {
            "name": "length",
            "type": "double"
        }, 
        {
            "name": "diameter",
            "type": "double"
        }, 
        {
            "name": "height",
            "type": "double"
        }, 
        {
            "name": "whole_weight",
            "type": "double"
        }, 
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        }, 
        {
            "name": "shell_weight",
            "type": "double"
        }, 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "sex", "type": "string"}, {"name": "length", "type": "double"}, {"name": "diameter", "type": "double"}, {"name": "height", "type": "double"}, {"name": "whole_weight", "type": "double"}, {"name": "shucked_weight", "type": "double"}, {"name": "viscera_weight", "type": "double"}, {"name": "shell_weight", "type": "double"}], "output": {"name": "features", "type": "double", "struct": "vector"}}
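Since every feature except `sex` is a double, the schema above can also be generated from the column list instead of being written out by hand. A sketch, assuming the same column names as earlier in the notebook; the helper name `build_schema` and its `string_columns` parameter are ours.

```python
import json


def build_schema(columns, string_columns=("sex",)):
    """Build a SparkML serving schema: string_columns are strings, the rest doubles."""
    inputs = [{"name": c, "type": "string" if c in string_columns else "double"}
              for c in columns]
    output = {"name": "features", "type": "double", "struct": "vector"}
    return {"input": inputs, "output": output}


# Feature columns only; the target `rings` is not part of the request.
feature_columns = ['sex', 'length', 'diameter', 'height', 'whole_weight',
                   'shucked_weight', 'viscera_weight', 'shell_weight']
print(json.dumps(build_schema(feature_columns)))
```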


Next we'll create a SageMaker PipelineModel with SparkML and XGBoost. The PipelineModel ensures that both containers are deployed behind a single API endpoint in the correct order. The same model can later be used for Batch Transform as well, so that a single job is sufficient to run predictions against the whole pipeline.

In [52]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel

sparkml_data = 's3://{}/{}/{}'.format(s3_model_bucket, s3_model_key, 'model.tar.gz')
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
xgb_model = Model(model_data=xgb_model.model_data, image=xboost)
name = 'inference-pipeline-' + timestamp
model = PipelineModel(name=name, role=role, models=[sparkml_model, xgb_model])
model.deploy(initial_instance_count=1,instance_type='ml.c4.xlarge',endpoint_name='deployment-'+name)

-------------!

## Making Predictions Using Our Deployed Model

Now we will invoke the endpoint with a valid payload that SageMaker SparkML Serving can recognize. There are three ways in which the input payload can be passed in the request:
- 1. Pass it as a valid CSV string. In this case, the schema passed via the environment variable will be used to determine the schema. For CSV format, every column in the input has to be a basic datatype (e.g. int, double, string) and it cannot be a Spark Array or Vector.
- 2. Pass it as a valid JSON string. In this case as well, the schema passed via the environment variable will be used to infer the schema. With JSON format, every column in the input can be a basic datatype or a Spark Vector or Array, provided that the corresponding entry in the schema mentions the correct value.
- 3. Pass the request in JSON format along with the schema and the data. In this case, the schema passed in the payload will take precedence over the one passed via the environment variable (if any).
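The cells below exercise the first two options. For the third, a payload could look roughly like the sketch below. The layout (a top-level `schema` key next to `data`) is our assumption based on the description above and should be checked against the sagemaker-sparkml-serving documentation before use.

```python
import json

numeric_columns = ["length", "diameter", "height", "whole_weight",
                   "shucked_weight", "viscera_weight", "shell_weight"]

# Assumed layout: the schema travels with the data in the same JSON document.
payload3 = {
    "schema": {
        "input": [{"name": "sex", "type": "string"}]
                 + [{"name": c, "type": "double"} for c in numeric_columns],
        "output": {"name": "features", "type": "double", "struct": "vector"},
    },
    "data": ["F", 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255],
}
body = json.dumps(payload3)
print(body)
```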

In [59]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

payload = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255"
endpoint = 'deployment-'+name
predictor = RealTimePredictor(endpoint=endpoint, sagemaker_session=sess, 
                              serializer=csv_serializer, content_type=CONTENT_TYPE_CSV, 
                              accept=CONTENT_TYPE_CSV)
print(predictor.predict(payload))

b'11.457910537719727'
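The CSV payload has to list the values in the same order as the schema, and the prediction comes back as raw bytes. A small sketch of helpers for both directions follows; the names `to_csv_payload` and `parse_prediction` are hypothetical and not part of the SageMaker SDK.

```python
FEATURE_ORDER = ["sex", "length", "diameter", "height", "whole_weight",
                 "shucked_weight", "viscera_weight", "shell_weight"]


def to_csv_payload(record: dict) -> str:
    """Serialize a record to a CSV row in the order the schema expects."""
    return ",".join(str(record[name]) for name in FEATURE_ORDER)


def parse_prediction(response: bytes) -> float:
    """Decode the endpoint's single-value response into a float."""
    return float(response.decode("utf-8"))


record = {"sex": "F", "length": 0.515, "diameter": 0.425, "height": 0.14,
          "whole_weight": 0.766, "shucked_weight": 0.304,
          "viscera_weight": 0.1725, "shell_weight": 0.255}
print(to_csv_payload(record))
print(parse_prediction(b'11.457910537719727'))
```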


In [60]:
payload2 = {"data": ["F",0.515,0.425,0.14,0.766,0.304,0.1725,0.255]}
predictor = RealTimePredictor(endpoint=endpoint, sagemaker_session=sess, 
                              serializer=json_serializer, content_type=CONTENT_TYPE_JSON, 
                              accept=CONTENT_TYPE_JSON)

print(predictor.predict(payload2))

b'11.457910537719727'


In [61]:
sm_client = boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint)

{'ResponseMetadata': {'RequestId': '3f67da50-e2f8-4f78-9604-249e991adf5e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '3f67da50-e2f8-4f78-9604-249e991adf5e',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Sun, 12 Apr 2020 11:28:20 GMT'},
  'RetryAttempts': 0}}