## Creating an Inference Pipeline in SageMaker for Feature Processing, NTM Training, and Deployment

1. [Introduction](#Introduction)
2. [Preprocessing](#Preprocessing)   
   1. [Create-Features-Through-SparkML-jobs](#Create-Features-Through-SparkML-jobs)
3. [Create Training Validation and Test Datasets](#Create-Training-Validation-and-Test-Datasets)
   1. [Store Headlines on S3 in Protobuf format](#Store-Headlines-on-S3-in-Protobuf-format)
4. [Model Training](#Model-Training)
5. [SageMaker Inference Pipeline](#SageMaker-Inference-Pipeline)
   1. [Real Time Predictions](#Real-Time-Predictions)
   2. [Batch Predictions](#Batch-Predictions)

## Introduction
In this notebook, we demonstrate how the SageMaker platform can be used to automate feature processing (through AWS Glue), model training, deployment, and inference. Automating these key stages of the machine learning life cycle enables data scientists and machine learning engineers to create production-ready solutions for business problems relatively quickly.

#### We will follow the process below to illustrate the key ideas
-  Create a processed dataset by running SparkML jobs with the AWS Glue ETL service
-  Identify topics in the processed dataset by training the Neural Topic Model (NTM) algorithm
-  Create an inference pipeline of the SparkML and NTM models for real-time predictions
-  Create an inference pipeline of the SparkML and NTM models for batch predictions


#### About the Dataset
To illustrate the concepts, we use the [A Million News Headlines](https://www.kaggle.com/therohk/million-headlines) dataset, which contains approximately one million news headlines published by the Australian Broadcasting Corporation (ABC).

## Preprocessing

### Create Features Through SparkML jobs

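AWS Glue is a serverless ETL service that can execute PySpark/Spark jobs, so we will use it to run our SparkML feature processing job. The notebook's execution role needs permission to call Glue, and because we also pass this role to the Glue job, the role must list Glue as a trusted entity. We configure both next.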

#### Configure the current notebook to access AWS Glue service

We first retrieve the notebook's current execution role, then go to the [IAM Dashboard](http://console.aws.amazon.com/iam/home) and edit that role to include AWS Glue-specific permissions.

```python
import sagemaker
from sagemaker import get_execution_role

sess = sagemaker.Session()
role = get_execution_role()
default_bucket = 'ai-in-aws'  # replace with an S3 bucket you own

print(role)
```

```
arn:aws:iam::413491515223:role/service-role/AmazonSageMaker-ExecutionRole-20190822T170423
```
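If you prefer to script the trust-relationship change described below rather than click through the console, the sketch that follows builds a trust policy letting both SageMaker and Glue assume the role, and applies it with the boto3 IAM client's `update_assume_role_policy` call. The helper names are hypothetical. This call replaces the role's entire trust policy, so the sketch assumes the role currently trusts only `sagemaker.amazonaws.com`; merge in any other principals your role already has. The identity running it also needs the `iam:UpdateAssumeRolePolicy` permission, which a default notebook execution role may not have.

```python
import json


def build_trust_policy(services=("sagemaker.amazonaws.com", "glue.amazonaws.com")):
    """Return an IAM trust (assume-role) policy allowing the given services to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": list(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def role_name_from_arn(role_arn):
    """Extract the role name (the last path segment) from an IAM role ARN."""
    return role_arn.split("/")[-1]


def apply_trust_policy(role_arn, policy):
    """Overwrite the role's trust policy with `policy` (replaces, does not merge)."""
    import boto3  # imported here so the pure helpers above work without boto3 installed

    iam = boto3.client("iam")
    iam.update_assume_role_policy(
        RoleName=role_name_from_arn(role_arn),
        PolicyDocument=json.dumps(policy),
    )


# Usage in the notebook, where `role` holds the execution role ARN:
# apply_trust_policy(role, build_trust_policy())
```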


Add Glue as a trusted entity to this role:

On the IAM Dashboard, click **Roles** in the left-hand navigation pane and search for this role. Click the role to open its **Summary** page, then open the **Trust relationships** tab to add AWS Glue as an additional trusted entity.

Click **Edit trust relationship** and add the following entry to the "Service" key:

`"glue.amazonaws.com"`

#### Write the feature processing script using SparkML

The processing script expects the source data (the gzip-compressed headlines CSV) in your S3 bucket under the `inference-pipeline/input` prefix; the **Upload ABC News Headlines to S3 bucket** step below puts it there. We upload the feature processing script (`abcheadlines_processing.py`) to S3 so that Glue can run it as a PySpark job.


The feature processing script performs the following main functions:
- Filters the dataset to include only ~100k ABC news headlines
- Uses SparkML feature transformers to tokenize the headlines, remove stop words, and compute term and document frequencies
- Saves the processed data to the designated S3 bucket
- Serializes the SparkML PipelineModel using MLeap
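The processing script itself is not reproduced in this notebook. As a rough illustration of what a tokenize, remove-stop-words, count-vectorize chain produces, the pure-Python sketch below builds a fixed-size vocabulary from the most frequent terms and turns each headline into a term-count vector. It is not the SparkML code: the stop-word list is a tiny hypothetical one, and the real script may additionally apply document-frequency weighting. With a vocabulary size of 200, each headline would become a 200-element vector, matching the 200 feature columns read back later.

```python
from collections import Counter

# Hypothetical, deliberately tiny stop-word list; SparkML's StopWordsRemover
# ships a much larger default English list.
STOP_WORDS = {"a", "an", "the", "of", "in", "on", "over", "to", "for", "at"}


def tokenize(text):
    """Lowercase and split on whitespace, roughly what SparkML's Tokenizer does."""
    return text.lower().split()


def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Drop tokens that appear in the stop-word set."""
    return [t for t in tokens if t not in stop_words]


def fit_vocabulary(docs, vocab_size):
    """Keep the vocab_size most frequent terms across all documents."""
    counts = Counter(term for doc in docs for term in doc)
    return [term for term, _ in counts.most_common(vocab_size)]


def vectorize(doc, vocabulary):
    """Count each vocabulary term in one document; out-of-vocabulary terms are dropped."""
    index = {term: i for i, term in enumerate(vocabulary)}
    vec = [0] * len(vocabulary)
    for term in doc:
        if term in index:
            vec[index[term]] += 1
    return vec


headlines = ["Police charge man over crash", "Man dies in crash", "Police probe crash"]
docs = [remove_stop_words(tokenize(h)) for h in headlines]
vocab = fit_vocabulary(docs, vocab_size=3)
print(vocab)
print([vectorize(d, vocab) for d in docs])
```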

[MLeap](http://mleap-docs.combust.ml/) is a serialization format and execution engine for machine learning pipelines. It serializes a pipeline to an MLeap bundle, which can then be loaded and executed outside the environment it was trained in (here, by the SageMaker SparkML serving container) without a Spark cluster. It supports Spark, scikit-learn, and TensorFlow for training pipelines.

```python
# Upload the feature processing script so Glue can read it from S3
script_location = sess.upload_data(path='abcheadlines_processing.py', bucket=default_bucket, key_prefix='inference-pipeline/codes')
```

#### Upload MLeap Dependencies to S3
The Glue job needs the MLeap Python package and the MLeap Spark assembly JAR. Download them from the following AWS locations, then upload them to your bucket.

```python
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar
```

```
--2019-09-05 01:19:41--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.192.200
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.192.200|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 36872 (36K) [application/zip]
Saving to: ‘python.zip’

2019-09-05 01:19:42 (507 KB/s) - ‘python.zip’ saved [36872/36872]

--2019-09-05 01:19:42--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.192.200
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.192.200|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17319576 (17M) [application/java-archive]
Saving to: ‘mleap_spark_assembly.jar’

2019-09-05 01:19:44 (8.46 MB/s) - ‘mleap_spark_assembly.jar’ saved [17319576/17319576]
```
```python
python_dep_location = sess.upload_data(path='python.zip', bucket=default_bucket, key_prefix='inference-pipeline/dependencies/python')
jar_dep_location = sess.upload_data(path='mleap_spark_assembly.jar', bucket=default_bucket, key_prefix='inference-pipeline/dependencies/jar')
```

#### Designate input/output locations for SparkML model

```python
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Raw input: the gzip-compressed headlines CSV
s3_input_bucket = default_bucket
s3_input_key_prefix = 'inference-pipeline/input'
s3_input_fn = 'abcnews-date-text.csv.gz'

# Processed features are written under a timestamped output prefix
s3_output_bucket = default_bucket
s3_output_key_prefix = 'inference-pipeline/output/' + timestamp_prefix

# The serialized MLeap bundle goes under <output prefix>/mleap
s3_model_bucket = default_bucket
s3_model_key_prefix = s3_output_key_prefix + '/mleap'
```


**Upload ABC News Headlines to S3 bucket**

Before uploading the .gz version of the headlines, make sure the .zip version (`abcnews-date-text.zip`) is in the present working directory of your SageMaker notebook instance. The cell below extracts the CSV from the archive, gzip-compresses it, and uploads it to the input prefix.

```python
import gzip
from zipfile import ZipFile

sr_fn = 'abcnews-date-text'

# Extract the CSV from the downloaded .zip and re-compress it as .csv.gz
with ZipFile(sr_fn + '.zip', 'r') as zf:
    news_data = zf.read(sr_fn + '.csv')
with gzip.open(sr_fn + '.csv.gz', 'wb') as gz_news:
    gz_news.write(news_data)

sess.upload_data(path=sr_fn + '.csv.gz', bucket=default_bucket, key_prefix=s3_input_key_prefix)
```

#### Invoke Glue API

```python
boto_session = sess.boto_session

glue_client = boto_session.client('glue')
job_name = 'sparkml-abcnews-' + timestamp_prefix

response = glue_client.create_job(
    Name=job_name,
    Description='PySpark job to featurize the ABC Headlines dataset',
    Role=role,  # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location
    },
    DefaultArguments={
        '--job-language': 'python',
        '--extra-jars': jar_dep_location,
        '--extra-py-files': python_dep_location
    },
    AllocatedCapacity=10,  # number of Glue data processing units (DPUs)
    Timeout=60,            # minutes
)
glue_job_name = response['Name']

print(glue_job_name)
```

```
sparkml-abcnews-2019-09-05-11-42-09
```


```python
# Start a run, passing the S3 locations the processing script reads and writes
job_run_id = glue_client.start_job_run(JobName=job_name,
                                       Arguments={
                                           '--S3_INPUT_BUCKET': s3_input_bucket,
                                           '--S3_INPUT_KEY_PREFIX': s3_input_key_prefix,
                                           '--S3_INPUT_FILENAME': s3_input_fn,
                                           '--S3_OUTPUT_BUCKET': s3_output_bucket,
                                           '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,
                                           '--S3_MODEL_BUCKET': s3_model_bucket,
                                           '--S3_MODEL_KEY_PREFIX': s3_model_key_prefix
                                       })['JobRunId']
print(job_run_id)
```

```
jr_7a04ae15fef0095eb1ff0ed2f180a0491ab41677c9500e400bcf31c794db6cc1
```


#### Check Glue Job Status

```python
# Poll every 30 seconds until the run reaches a terminal state.
# TIMEOUT is included because the job was created with Timeout=60.
job_run_status = glue_client.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']['JobRunState']
while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED', 'TIMEOUT'):
    time.sleep(30)
    job_run_status = glue_client.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']['JobRunState']
    print(job_run_status)
```

```
RUNNING
RUNNING
RUNNING
RUNNING
SUCCEEDED
```
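The status loop has no client-side time limit. A slightly more defensive polling helper (hypothetical name) is sketched below: it takes a zero-argument function returning the current state, stops on any terminal state, and raises `TimeoutError` if the run is still going after `timeout_seconds`. The `sleep` parameter exists only so the helper can be exercised without real waiting.

```python
import time


def wait_for_terminal_state(get_state,
                            terminal_states=("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"),
                            poll_seconds=30, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or the client-side timeout expires."""
    waited = 0
    while True:
        state = get_state()
        print(state)
        if state in terminal_states:
            return state
        if waited >= timeout_seconds:
            raise TimeoutError(f"still {state} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Usage with the Glue client from above:
# final_state = wait_for_terminal_state(
#     lambda: glue_client.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']['JobRunState'])
```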


#### Read the ABC News Headlines dataset (processed) from S3 bucket

The cells below use the v1 API of the SageMaker Python SDK (for example `get_image_uri`, `s3_input`, `RealTimePredictor`, and the `train_instance_*` estimator arguments); SDK v2 renamed these.

```python
import io
import os
import warnings

import boto3
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix

import sagemaker
import sagemaker.amazon.common as smamzc
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.predictor import RealTimePredictor, json_serializer
from sagemaker.session import s3_input
from sagemaker.sparkml.model import SparkMLModel

# Silence all warnings (including FutureWarning) to keep the notebook output readable
warnings.simplefilter(action='ignore')
```

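```python
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(default_bucket)

# Spark may write the processed data as several part files; read every CSV under the output prefix
csv_keys = [f.key for f in my_bucket.objects.filter(Prefix=s3_output_key_prefix) if '.csv' in f.key]
abcnews_df = pd.concat(
    (pd.read_csv('s3://{}/{}'.format(s3_output_bucket, key)) for key in csv_keys),
    ignore_index=True)
```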

```python
abcnews_df.shape
```

```
(110365, 200)
```

```python
# Convert the mostly-zero feature table to a sparse CSR matrix
abcnews_csr = csr_matrix(abcnews_df.values, dtype=np.float32)
print(abcnews_csr[:16].toarray())
```

```
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
```

```python
abcnews_csr.shape[0]
```

```
110365
```

## Create Training Validation and Test Datasets

```python
# 80% of rows for training; the remaining 20% split evenly into validation and test
vol_train = int(0.8 * abcnews_csr.shape[0])

train_data = abcnews_csr[:vol_train, :]
test_data = abcnews_csr[vol_train:, :]

vol_test = test_data.shape[0]
val_data = test_data[:vol_test // 2, :]
test_data = test_data[vol_test // 2:, :]
```

```python
print(train_data.shape, test_data.shape, val_data.shape)
```

```
(88292, 200) (11037, 200) (11036, 200)
```
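The printed shapes follow directly from the slicing arithmetic: `int(0.8 * n)` rows go to training, and the remaining rows are halved with floor division, the first half becoming validation and the second half test (so test gets the extra row when the remainder is odd). A small helper (hypothetical name) reproduces the numbers:

```python
def split_sizes(n_rows, train_frac=0.8):
    """Return (train, validation, test) row counts using the notebook's slicing rules."""
    n_train = int(train_frac * n_rows)
    n_rest = n_rows - n_train
    n_val = n_rest // 2        # first half of the held-out rows
    n_test = n_rest - n_val    # second half, including any odd leftover row
    return n_train, n_val, n_test


print(split_sizes(110365))  # (88292, 11036, 11037)
```

Because the split is positional, any ordering present in the processed CSV carries over into the three sets; shuffling the rows first is a common alternative.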


### Store Headlines on S3 in Protobuf format
The NTM algorithm, like other built-in SageMaker algorithms, accepts data in CSV or RecordIO-wrapped Protobuf format. SageMaker algorithms generally perform best when input is provided as RecordIO-wrapped Protobuf, an efficient format for encoding and serializing structured data; it also lets us store the sparse bag-of-words matrix without expanding it into dense rows.
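To make "RecordIO-wrapped" concrete: each serialized record is framed with a 4-byte magic number, a 4-byte length, and zero padding up to a 4-byte boundary (the same framing MXNet's RecordIO uses). The sketch below shows only that framing layer applied to arbitrary bytes. It does not build the Protobuf `Record` messages that `write_spmatrix_to_sparse_tensor` produces, and packing the integers little-endian is an assumption of this sketch.

```python
import io
import struct

RECORDIO_MAGIC = 0xCED7230A  # magic number that starts every RecordIO record


def write_record(f, payload):
    """Frame one payload: magic, length, payload bytes, then zero padding to a 4-byte boundary."""
    f.write(struct.pack("<II", RECORDIO_MAGIC, len(payload)))
    f.write(payload)
    f.write(b"\x00" * ((4 - len(payload) % 4) % 4))


def read_records(f):
    """Yield payloads back out of a stream written by write_record."""
    while True:
        header = f.read(8)
        if len(header) < 8:
            return
        magic, length = struct.unpack("<II", header)
        if magic != RECORDIO_MAGIC:
            raise ValueError("not a RecordIO stream")
        length &= (1 << 29) - 1  # lower 29 bits hold the length; upper bits are continuation flags
        payload = f.read(length)
        f.read((4 - length % 4) % 4)  # skip the padding
        yield payload


buf = io.BytesIO()
for rec in (b"abc", b"headline"):
    write_record(buf, rec)
print(len(buf.getvalue()))  # 28: (8 + 3 + 1 padding) + (8 + 8)
buf.seek(0)
print(list(read_records(buf)))  # [b'abc', b'headline']
```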

```python
# S3 key prefixes for the training, validation, and auxiliary (vocabulary) channels
train_prefix = os.path.join(s3_input_key_prefix, 'train')
val_prefix = os.path.join(s3_input_key_prefix, 'val')
output_prefix = 'inference-pipeline/output'
aux_prefix = os.path.join(s3_input_key_prefix, 'aux')

s3loc_train_data = os.path.join('s3://', default_bucket, train_prefix)
s3loc_val_data = os.path.join('s3://', default_bucket, val_prefix)
s3loc_aux_data = os.path.join('s3://', default_bucket, aux_prefix)
output_path = os.path.join('s3://', default_bucket, output_prefix)
```
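`os.path.join` happens to produce valid S3 URIs on Linux-based notebook instances, but it uses the local operating system's separator and would emit backslashes on Windows. A minimal, OS-independent alternative (hypothetical helper name) joins the parts with `/` explicitly:

```python
def s3_uri(bucket, *parts):
    """Build an s3:// URI from a bucket and key parts, always using '/' as the separator."""
    key = "/".join(p.strip("/") for p in parts if p)
    return f"s3://{bucket}/{key}" if key else f"s3://{bucket}"


print(s3_uri("ai-in-aws", "inference-pipeline/input", "train"))
# s3://ai-in-aws/inference-pipeline/input/train
```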

```python
def convert_to_pbr(sparse_matrix, bucket, prefix, fname):
    """Serialize a SciPy sparse matrix to RecordIO-wrapped Protobuf and upload it to s3://bucket/prefix/fname."""
    data_bytes = io.BytesIO()
    smamzc.write_spmatrix_to_sparse_tensor(array=sparse_matrix, file=data_bytes, labels=None)
    data_bytes.seek(0)

    file_name = os.path.join(prefix, fname)
    boto3.resource('s3').Bucket(bucket).Object(file_name).upload_fileobj(data_bytes)
```

```python
convert_to_pbr(train_data, default_bucket, train_prefix, 'train.pbr')
convert_to_pbr(val_data, default_bucket, val_prefix, 'val.pbr')
```

```python
# Download the vocabulary file written by the Glue job.
# NTM expects the auxiliary channel to contain it under the name vocab.txt.
s3 = boto3.resource('s3')

files = my_bucket.objects.filter(Prefix=s3_output_key_prefix)

for f in files:
    if '.txt' in f.key:
        s3.Bucket(default_bucket).download_file(f.key, 'vocab.txt')
```

```python
vocabFN_location = sess.upload_data(path='vocab.txt', bucket=default_bucket, key_prefix=aux_prefix)
```

## Model Training

To train NTM in SageMaker, we first obtain the registry path of the NTM training Docker image. We then create an Estimator object from the SageMaker Python SDK to specify the training infrastructure. Finally, we set the hyperparameters and call the estimator's `fit()` method to start training.

```python
container = get_image_uri(boto3.Session().region_name, 'ntm')
```

```python
sess = sagemaker.Session()
ntm_estmtr_abc = sagemaker.estimator.Estimator(container,
                                               role,
                                               train_instance_count=1,
                                               train_instance_type='ml.c4.xlarge',
                                               output_path=output_path,
                                               sagemaker_session=sess)
```

Next, we set the NTM hyperparameters. Note that `feature_dim` must equal the vocabulary size, which is 200 here (the number of feature columns). We explore NTM in depth in the next chapter, including each of these hyperparameters.

```python
num_topics = 5
vocab_size = 200
ntm_estmtr_abc.set_hyperparameters(num_topics=num_topics,
                                   feature_dim=vocab_size,
                                   mini_batch_size=30,
                                   epochs=150,
                                   num_patience_epochs=3,
                                   tolerance=.001)
```
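The `num_patience_epochs` and `tolerance` hyperparameters together control early stopping: per the NTM documentation, training stops early when the relative change in the loss stays below `tolerance` over the last `num_patience_epochs` epochs, with `epochs=150` as the upper bound. The sketch below encodes one reading of that rule (every one of the last three epoch-to-epoch relative changes under 0.001). It is an illustration with a hypothetical function name, not the algorithm's actual implementation.

```python
def should_stop_early(losses, num_patience_epochs=3, tolerance=0.001):
    """Return True if each of the last num_patience_epochs epoch-to-epoch
    relative changes in the loss is below tolerance (one reading of the NTM docs)."""
    if len(losses) < num_patience_epochs + 1:
        return False  # not enough history yet
    recent = losses[-(num_patience_epochs + 1):]
    changes = [abs(cur - prev) / abs(prev) for prev, cur in zip(recent, recent[1:])]
    return all(c < tolerance for c in changes)


print(should_stop_early([10.0, 9.0, 8.999, 8.9985, 8.998]))  # True: the loss has plateaued
print(should_stop_early([10.0, 9.0, 8.0, 7.0]))              # False: still improving
```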

```python
# Training and validation data are RecordIO-wrapped Protobuf; the auxiliary channel holds vocab.txt
s3_train = s3_input(s3loc_train_data, content_type='application/x-recordio-protobuf')
s3_val = s3_input(s3loc_val_data, content_type='application/x-recordio-protobuf')
s3_aux = s3_input(s3loc_aux_data, content_type='text/plain')
```

```python
ntm_estmtr_abc.fit({'train': s3_train, 'validation': s3_val, 'auxiliary': s3_aux})
```

```
2019-09-05 17:11:06 Starting - Starting the training job...............
2019-09-05 17:13:15 Starting - Launching requested ML instances......
2019-09-05 17:14:18 Starting - Preparing the instances for training......
2019-09-05 17:15:24 Downloading - Downloading input data
2019-09-05 17:15:24 Training - Downloading the training image...
2019-09-05 17:15:54 Training - Training image download completed. Training in progress.
Docker entrypoint called with argument(s): train
  from numpy.testing import nosetester
[09/05/2019 17:15:57 INFO 140644550915904] Reading default configuration from /opt/amazon/lib/python2.7/site-packages/algorithm/default-input.json: {u'num_patience_epochs': u'3', u'clip_gradient': u'Inf', u'encoder_layers': u'auto', u'optimizer': u'adadelta', u'_kvstore': u'auto_gpu', u'rescale_gradient': u'1.0', u'_tuning_objective_metric': u'', u'_num_gpus': u'auto', u'learning_rate': u'0.01', u'_data_format': u'record', u'sub_sample': u'1.0', u'epochs': u'50', 
```

#### Topics extracted, with the confidence range

[0.40, 0.94] defends decision denies war anti pm warns un bush report iraq calls public australia minister backs wins tas plans chief

[0.52, 0.77] clash top win world tour test pakistan back record cup killed title final talks england set australia us still pm

[0.45, 0.90] urged indigenous water power take call lead boost final residents get wa act funds england centre fire help plan funding

[0.51, 0.72] new record says found strike set win cup south police fire us go pay court plan rise australia bid deal

[0.54, 0.93] charged dies murder man charges crash death dead car two woman accident face charge found attack police injured court sydney

```python
print('Training job name: {}'.format(ntm_estmtr_abc.latest_training_job.job_name))
```

```
Training job name: ntm-2019-09-05-17-11-06-777
```


## SageMaker Inference Pipeline

The SageMaker Python SDK provides classes such as Model, SparkMLModel, and PipelineModel for creating an inference pipeline that first performs feature processing and then applies the trained model (here, NTM) to the processed data. The resulting PipelineModel can be deployed as an endpoint for real-time inference, or run in batch mode (Batch Transform) to get inferences for a large volume of data points.
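Conceptually, a serial inference pipeline is function composition: the endpoint sends the request to the first container (here, the SparkML featurizer), and each container's response becomes the next container's request. The toy sketch below mimics that flow in-process with two hypothetical stand-in functions; it is not how SageMaker invokes containers, which happens over HTTP inside the endpoint, and `score` is a placeholder rather than a topic model.

```python
from functools import reduce


def run_serial_pipeline(stages, request):
    """Feed the request through each stage in order; each stage's output is the next stage's input."""
    return reduce(lambda payload, stage: stage(payload), stages, request)


# Hypothetical stand-ins for the two containers.
VOCAB = ["police", "crash", "court"]


def featurize(headline):
    # Like the SparkML stage: text in, fixed-length count vector out.
    tokens = headline.lower().split()
    return [tokens.count(term) for term in VOCAB]


def score(vector):
    # Like the NTM stage in shape only: vector in, weights summing to 1 out.
    total = sum(vector) or 1
    return [v / total for v in vector]


print(run_serial_pipeline([featurize, score], "Police probe crash"))  # [0.5, 0.5, 0.0]
```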

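The SparkMLModel requires a schema describing the input and output columns of the MLeap pipeline. We pass it to the SparkML serving container as a JSON string in the `SAGEMAKER_SPARKML_SCHEMA` environment variable.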

```python
import json

schema = {
    "input": [
        {
            "name": "headline_text",
            "type": "string"
        }
    ],
    "output": {
        "name": "features",
        "type": "double",
        "struct": "vector"
    }
}
schema_json = json.dumps(schema)
print(schema_json)
```

```
{"input": [{"name": "headline_text", "type": "string"}], "output": {"name": "features", "type": "double", "struct": "vector"}}
```


### Real Time Predictions

```python
# S3 locations of the two model artifacts: the NTM model from training, and the MLeap bundle from Glue
modeldataurl = 's3://{}/{}/{}/{}'.format(default_bucket, output_prefix, ntm_estmtr_abc.latest_training_job.job_name, 'output/model.tar.gz')
sparkml_data = 's3://{}/{}/{}'.format(default_bucket, s3_model_key_prefix, 'model.tar.gz')

ntm_model = Model(model_data=modeldataurl, image=container)

# The SparkML serving container reads the input/output schema from this environment variable
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA': schema_json})

# Containers run in the order listed: SparkML featurization first, then NTM
model_name = 'inference-pipeline-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, ntm_model])
```

##### Deploy the PipelineModel 

```python
endpoint_name = 'inference-pipeline-ep-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)
```


##### Pass a JSON payload
Because the SparkML model outputs a dense vector, we pass input to the pipeline model in JSON format (instead of CSV). The JSON request carries the schema alongside the data rows.

```python
# The request body carries the same schema as above plus the headline(s) to score
payload = {
    "schema": schema,
    "data": [
        ["lisa scaffidi public hearing possible over expenses scandal"]
    ]
}

predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer,
                              content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))
```

```
b'{"predictions":[{"topic_weights":[0.5172129869,0.0405323133,0.2246916145,0.1741439849,0.0434190407]}]}'
```
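The endpoint returns one `topic_weights` vector per headline; in this response, the five weights sum to approximately 1. A small helper (hypothetical name) that picks the highest-weighted topic from a response body like the one above is sketched below. Which of the topics printed during training corresponds to index 0 is not something this output alone tells us.

```python
import json


def dominant_topic(response_body):
    """Return (topic_index, weight) of the largest topic weight for each prediction in an NTM JSON response."""
    predictions = json.loads(response_body)["predictions"]
    results = []
    for pred in predictions:
        weights = pred["topic_weights"]
        best = max(range(len(weights)), key=weights.__getitem__)
        results.append((best, weights[best]))
    return results


body = b'{"predictions":[{"topic_weights":[0.5172129869,0.0405323133,0.2246916145,0.1741439849,0.0434190407]}]}'
print(dominant_topic(body))  # [(0, 0.5172129869)]
```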


### Batch Predictions

```python
model_name
```

```
'inference-pipeline-2019-09-06-00-52-01'
```

Get a sample of headlines for batch scoring, i.e., retrieve the topic mixture for each headline.

```python
batch_ip_fn = 'abcnews-batch-input.csv'
abchl = pd.read_csv(s3_input_fn)  # the local .csv.gz created earlier
# Write 27 headlines, one per line; header=False keeps the column name from being scored as a headline
abchl['headline_text'].iloc[200000:200027].to_csv(batch_ip_fn, index=False, header=False)
```

```python
sess.upload_data(path=batch_ip_fn, bucket=default_bucket, key_prefix='inference-pipeline/batch')
```

```python
input_data_path = 's3://{}/{}/{}'.format(default_bucket, 'inference-pipeline/batch', batch_ip_fn)
output_data_path = 's3://{}/{}/{}'.format(default_bucket, 'inference-pipeline/batch/abcnews_output', timestamp_prefix)

job_name = 'serial-inference-batch-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Reuse the pipeline model created above (deploy() registered it in SageMaker under model_name).
# If the kernel was restarted, set model_name to the existing model's name instead, e.g.:
# model_name = 'inference-pipeline-2019-09-06-00-52-01'

transformer = sagemaker.transformer.Transformer(
    model_name=model_name,
    instance_count=1,
    instance_type='ml.m4.xlarge',
    strategy='SingleRecord',   # send one record per request
    assemble_with='Line',      # write one output line per record
    output_path=output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sess,
    accept=CONTENT_TYPE_CSV
)

transformer.transform(data=input_data_path,
                      job_name=job_name,
                      content_type=CONTENT_TYPE_CSV,
                      split_type='Line')  # split the input file on newlines
transformer.wait()
```
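With `split_type='Line'`, `strategy='SingleRecord'`, and `assemble_with='Line'`, Batch Transform splits the input file on newlines, sends each line to the pipeline as its own request, and writes the responses back one per line to an output file named after the input with a `.out` suffix under `output_data_path`. This is also why the input file should not contain a header row: it would be scored as if it were a headline. The in-process simulation below is a conceptual sketch of that contract only; `fake_predict` is a hypothetical stand-in for the pipeline.

```python
def simulate_batch_transform(input_text, predict):
    """Mimic SplitType=Line / SingleRecord / AssembleWith=Line: one request per line, one output line per response."""
    records = input_text.splitlines()
    return "\n".join(predict(record) for record in records)


def fake_predict(record):
    # Hypothetical stand-in for the pipeline: reports the word count of each headline.
    return str(len(record.split()))


sample_input = "council approves budget\nstorm hits coast today\n"
print(simulate_batch_transform(sample_input, fake_predict))
# 3
# 4
```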

