## Sentiment Inference Pipeline with Scikit-learn and XGBoost

Typically, a Machine Learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, featurizing the dataset by applying standard techniques or prior knowledge, and finally training an ML model with an algorithm. In many cases, when the trained model serves real-time or batch prediction requests, it receives data in a format that needs to be pre-processed (e.g. featurized) before it can be passed to the algorithm. In the following notebook, we demonstrate how to build an ML pipeline with the SageMaker Scikit-learn container and the SageMaker XGBoost algorithm and, once the model is trained, deploy the pipeline (data preprocessing and XGBoost) as an Inference Pipeline behind a single endpoint for real-time inference.

We'll use SageMaker's Scikit-learn container to featurize the dataset so that it can be used to train XGBoost.

In [1]:
import sagemaker
import boto3
from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

from io import StringIO
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import os 
np.random.seed(0)
pd.set_option('display.max_colwidth', -1)
%matplotlib inline
 
region = boto3.Session().region_name    
smclient = boto3.Session().client('sagemaker')
sagemaker_session = sagemaker.Session()

role = sagemaker.get_execution_role()

bucket = 'sagemaker-xgboost-sentiment-classification'                     
prefix = 'sklearn-xgboost-sentiment-classification/inference-pipeline'

## Preprocessing data and training the model 

### Downloading dataset 

The datasets were uploaded to an S3 bucket and also downloaded to the notebook instance with the AWS command line. Only the copies on the notebook instance can be opened and viewed in pandas, which helped confirm that the training data were correct. The inference pipeline itself trains on the same S3 sources, not on the data stored on the notebook instance.

In [2]:
# do not run if data is already loaded into sagemaker notebook instance directory
!aws s3 cp s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/data/test.csv .
!aws s3 cp s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/data/y_test.csv .
!aws s3 cp s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/train/full_train.csv .

download: s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/data/test.csv to ./test.csv
download: s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/data/y_test.csv to ./y_test.csv
download: s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/train/full_train.csv to ./full_train.csv


In [4]:
# use pandas to read and view data in sagemaker 
train = pd.read_csv('full_train.csv', encoding='utf-8', names=['text','rating', 
                       'aws_neg', 'aws_pos', 'aws_mix', 'target', 'weights'])
train.head()

|   | text | rating | aws_neg | aws_pos | aws_mix | target | weights |
|---|------|--------|---------|---------|---------|--------|---------|
| 0 | Magnificent experience with great service and excellent food | 5 | 0.000408 | 0.977658 | 0.009516 | 1 | 0.082 |
| 1 | Breakfast good as usual, great looking over the bay. | 4 | 0.000518 | 0.983629 | 0.010093 | 1 | 0.082 |
| 2 | Best hamburgers in town! | 5 | 0.001825 | 0.650549 | 0.002832 | 1 | 0.082 |
| 3 | Good service. Good view. Good food. | 4 | 0.000181 | 0.991041 | 0.005654 | 1 | 0.082 |
| 4 | Good service got my ribs within 10 minutes. Hurry on don't miss out | 5 | 0.001218 | 0.978226 | 0.014726 | 1 | 0.082 |


In [16]:
# download the batch training file from S3 to the notebook instance
!aws s3 cp s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/data/batch_train.csv .

download: s3://sagemaker-xgboost-sentiment-classification/sklearn-xgboost-sentiment-classification/inference-pipeline/data/batch_train.csv to ./batch_train.csv


In [9]:
# use pandas to read and view data in sagemaker 
batch_train = pd.read_csv('batch_train.csv', encoding='utf-8', names=['text','rating', 
                          'aws_neg', 'aws_pos', 'aws_mix'])
batch_train.head()

|   | text | rating | aws_neg | aws_pos | aws_mix |
|---|------|--------|---------|---------|---------|
| 0 | Magnificent experience with great service and excellent food | 5 | 0.000408 | 0.977658 | 0.009516 |
| 1 | Breakfast good as usual, great looking over the bay. | 4 | 0.000518 | 0.983629 | 0.010093 |
| 2 | Best hamburgers in town! | 5 | 0.001825 | 0.650549 | 0.002832 |
| 3 | Good service. Good view. Good food. | 4 | 0.000181 | 0.991041 | 0.005654 |
| 4 | Good service got my ribs within 10 minutes. Hurry on don't miss out | 5 | 0.001218 | 0.978226 | 0.014726 |


In [17]:
# run this to upload batch_train.csv from the notebook instance to S3
folder_name = 'train'
key = os.path.join(prefix, folder_name, 'batch_train.csv')
# the with-block closes the file once the upload has finished
with open('batch_train.csv', 'rb') as fObj:
    boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_fileobj(fObj)

## Create a Scikit-learn script to train with <a class="anchor" id="create_sklearn_script"></a>
To run Scikit-learn on SageMaker, we use the `SKLearn` Estimator with a script as its entry point. The training script is very similar to one you might run outside of SageMaker, but you can access useful properties of the training environment through various environment variables, such as:

* SM_MODEL_DIR: A string representing the path to the directory to write model artifacts to. These artifacts are uploaded to S3 for model hosting.
* SM_OUTPUT_DATA_DIR: A string representing the filesystem path to write output artifacts to. Output artifacts may include checkpoints, graphs, and other files to save, not including model artifacts. These artifacts are compressed and uploaded to S3 to the same S3 prefix as the model artifacts.

If two input channels, 'train' and 'test', are used in the call to the `SKLearn` estimator's `fit()` method, the following environment variables are set, following the format `SM_CHANNEL_[channel_name]`:

* SM_CHANNEL_TRAIN: A string representing the path to the directory containing data in the 'train' channel
* SM_CHANNEL_TEST: Same as above, but for the 'test' channel.

A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to model_dir so that it can be hosted later. Hyperparameters are passed to your script as arguments and can be retrieved with an argparse.ArgumentParser instance.
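A minimal sketch of the argument-parsing part of such a script is shown below. The `--max-features` hyperparameter and the local fallback paths are hypothetical, chosen only for illustration; `SM_MODEL_DIR` and `SM_CHANNEL_TRAIN` are the environment variables described above. Because the parser is built inside the function, the environment is read each time it is called.

```python
import argparse
import os


def parse_args(argv=None):
    """Read hyperparameters and SageMaker paths the way a training script typically does.

    Hyperparameters arrive as command-line arguments; paths come from SM_*
    environment variables. The fallbacks are hypothetical local defaults so the
    script can also be run outside SageMaker.
    """
    parser = argparse.ArgumentParser()
    # Hypothetical hyperparameter, for illustration only
    parser.add_argument("--max-features", type=int, default=1000)
    # Where to save the fitted model (uploaded to S3 after training)
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR", "./model"))
    # Directory holding the files of the 'train' channel
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "./data/train"))
    return parser.parse_args(argv)
```

In a real entry point you would call `parse_args()` with no arguments, load the CSV files from `args.train`, fit the model, and save it under `args.model_dir`.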

An additional `custom.py` script provides custom transformation helper functions for the Scikit-learn pipeline; it is shipped to the container through the estimator's `dependencies` argument.
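The contents of `custom.py` are not shown in this notebook; the commented-out import in the next cell only tells us that it defines `ItemSelector`, `TextFeatures`, `clean_text` and `expand_contractions`. Purely as an illustration, a hypothetical `expand_contractions` could look like the sketch below. The contraction map is an assumption, not the author's list.

```python
import re

# Hypothetical contraction map; the real custom.py may use a different list.
CONTRACTIONS = {
    "don't": "do not",
    "can't": "cannot",
    "won't": "will not",
    "it's": "it is",
    "i'm": "i am",
}

# One case-insensitive pattern that matches any of the contractions as whole words
_CONTRACTION_RE = re.compile(
    r"\b(" + "|".join(re.escape(c) for c in CONTRACTIONS) + r")\b",
    flags=re.IGNORECASE,
)


def expand_contractions(text):
    """Replace known contractions with their lowercase expanded form."""
    return _CONTRACTION_RE.sub(lambda m: CONTRACTIONS[m.group(0).lower()], text)
```

Normalizing contractions before tokenizing means "don't" and "do not" end up as the same tokens, which is the usual motivation for a helper like this.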


## Create SageMaker Scikit Estimator <a class="anchor" id="create_sklearn_estimator"></a>

To run our Scikit-learn training script on SageMaker, we construct a `sagemaker.sklearn.estimator.SKLearn` estimator, which accepts several constructor arguments:

* __entry_point__: The path to the Python script SageMaker runs for training and prediction.
* __role__: The ARN of the IAM role that SageMaker uses to run the job.
* __train_instance_type__ *(optional)*: The type of SageMaker instance used for training. __Note__: Because Scikit-learn does not natively support GPU training, SageMaker Scikit-learn does not currently support training on GPU instance types.
* __sagemaker_session__ *(optional)*: The session used to train on SageMaker.
* __dependencies__ *(optional)*: A list of paths to additional files or directories (here `custom.py`) that are copied into the container alongside the entry point.


In [2]:
from sagemaker.sklearn.estimator import SKLearn
#from custom import ItemSelector, TextFeatures, clean_text, expand_contractions

script_path = 'sklearn-featurizer.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    train_instance_type="ml.c5.xlarge",
    sagemaker_session=sagemaker_session,
    dependencies=['custom.py']) # add custom functions

train_prefix = 'sklearn-xgboost-sentiment-classification/inference-pipeline/train'
train_data = 's3://{}/{}/{}'.format(bucket, train_prefix, 'full_train.csv')

# define the 'train' input channel that points to the training data in S3
train_input = sagemaker.session.s3_input(train_data, distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')


In [5]:
sklearn_preprocessor.fit({'train': train_input})

2019-10-08 19:12:54 Starting - Starting the training job...
2019-10-08 19:13:14 Starting - Launching requested ML instances......
2019-10-08 19:14:15 Starting - Preparing the instances for training......
2019-10-08 19:15:15 Downloading - Downloading input data
2019-10-08 19:15:15 Training - Downloading the training image.
2019-10-08 19:15:27,417 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training
2019-10-08 19:15:27,419 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)
2019-10-08 19:15:27,430 sagemaker_sklearn_container.training INFO     Invoking user training script.
2019-10-08 19:15:27,651 sagemaker-containers INFO     Module sklearn-featurizer does not provide a setup.py. 
Generating setup.py
2019-10-08 19:15:27,651 sagemaker-containers INFO     Generating setup.cfg
2019-10-08 19:15:27,651 sagemaker-containers INFO     Generating MANIFEST.in
2019-10-08 19:1

## Batch transform our training data 
Now that our preprocessor is fitted, let's preprocess the training data. We use batch transform to preprocess the raw data directly and store the result back in S3.

In [6]:
# Define an SKLearn Transformer from the trained SKLearn Estimator.
# The model server timeout (in seconds) is raised because requests
# kept timing out with the default value.
transformer = sklearn_preprocessor.transformer(
    instance_count=1,
    instance_type='ml.c5.xlarge',
    assemble_with='Line',
    max_payload=35,          # maximum request size in MB
    accept='text/csv',
    env={"SAGEMAKER_MODEL_SERVER_TIMEOUT": "3600"})

In [7]:
# Preprocess training input
transformer.transform(train_data, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path

Waiting for transform job: sagemaker-scikit-learn-2019-10-08-19-17-07-224
................
Processing /opt/ml/code
Building wheels for collected packages: sklearn-featurizer
  Building wheel for sklearn-featurizer (setup.py): started
  Building wheel for sklearn-featurizer (setup.py): finished with status 'done'
  Created wheel for sklearn-featurizer: filename=sklearn_featurizer-1.0.0-py2.py3-none-any.whl size=7042 sha256=78894a6d10a24db32f1f0a65fae299fa3b5e6bf914ddf15ebe755baed1b2cfd5
  Stored in directory: /tmp/pip-ephem-wheel-cache-0l8b55ct/wheels/35/24/16/37574d11bf9bde50616c67372a334f94fa8356bc7164af8ca3
Successfully built sklearn-featurizer
Installing collected packages: sklearn-featurizer
Successfully installed sklearn-featurizer-1.0.0
[2019-10-08 19:19:30 +0000] [37] [INFO] Starting gunicorn 19.9.0
[2019-10-08 19:19:30 +0000] [37] [INFO] Listening at: unix:/tmp/gunicorn.sock (37)
[2019-10-08 19:19:30 +0000] [37]

In [8]:
preprocessed_train

's3://sagemaker-us-east-1-944828514909/sagemaker-scikit-learn-2019-10-08-19-17-07-224'

## Run XGBoost Model with Preprocessed Data

In [9]:
from sagemaker.amazon.amazon_estimator import get_image_uri

training_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost', repo_version="0.90-1")
print(training_image)


683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3


In [10]:
xgb_model = sagemaker.estimator.Estimator(training_image,
                                         role, 
                                         train_instance_count=1, 
                                         train_instance_type='ml.m4.xlarge',
                                         train_volume_size = 2,
                                         train_max_run = 3600,
                                         input_mode= 'File',
                                         sagemaker_session=sagemaker_session)

xgb_model.set_hyperparameters(objective = "multi:softprob",
                              base_score = 0,
                              eta = .1,
                              gamma = 0,
                              max_depth = 3,
                              num_round = 100,
                              num_class=3,
                              csv_weights=1,
                              subsample = 1,
                              silent = 0)

preprocessed_train_data = sagemaker.session.s3_input(
    preprocessed_train, 
    distribution='FullyReplicated',
    content_type='text/csv', 
    s3_data_type='S3Prefix')

data_channels = {'train': preprocessed_train_data}
xgb_model.fit(inputs=data_channels, logs=True)

2019-10-08 19:21:20 Starting - Starting the training job...
2019-10-08 19:21:21 Starting - Launching requested ML instances......
2019-10-08 19:22:23 Starting - Preparing the instances for training......
2019-10-08 19:23:43 Downloading - Downloading input data
2019-10-08 19:23:43 Training - Downloading the training image...
2019-10-08 19:24:05 Training - Training image download completed. Training in progress.
INFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training
INFO:sagemaker-containers:Failed to parse hyperparameter objective value multi:softprob to Json.
Returning the value itself
INFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)
INFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
[19:24:07] 146567x63 matrix with 9233721 entries loa

In [11]:
preprocessed_train_data

<sagemaker.inputs.s3_input at 0x7f2098767ef0>
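The `csv_weights=1` hyperparameter changes how SageMaker XGBoost reads CSV training data: the first column is the label, the second column is the per-row instance weight, and the remaining columns are features. The featurizer script is not shown, so the sketch below assumes it emits the `target` and `weights` columns of `full_train.csv` in that order; `to_xgboost_weighted_csv` is a hypothetical helper that only illustrates the expected headerless layout.

```python
import csv
import io


def to_xgboost_weighted_csv(rows):
    """Serialize (label, weight, features) tuples into headerless CSV text.

    With csv_weights=1, column 0 is read as the label and column 1 as the
    instance weight; every remaining column is treated as a feature.
    """
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for label, weight, features in rows:
        writer.writerow([label, weight, *features])
    return buf.getvalue()
```

For example, the first row of `full_train.csv` (target `1`, weight `0.082`) with its three `aws_*` scores as features serializes to `1,0.082,0.000408,0.977658,0.009516`.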

## Serial Inference Pipeline with Scikit preprocessor and XGBoost Model

### Set up the inference pipeline <a class="anchor" id="pipeline_setup"></a>
Setting up a Machine Learning pipeline can be done with `PipelineModel`, which hosts a sequence of models behind a single endpoint; in this example, we configure the pipeline model with the fitted Scikit-learn preprocessing step and the fitted XGBoost model. Deploying the pipeline follows the same `deploy` pattern as other models in the SDK.
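Conceptually, the endpoint passes the first container's response body to the second container as its request body, which is why the preprocessing container must answer in a format XGBoost accepts. The toy sketch below models only that data flow in plain Python; it is not the SageMaker API, and both stage functions are hypothetical stand-ins for the real containers.

```python
from typing import Callable, List

# A stage takes a request body and returns a response body (both strings here).
Stage = Callable[[str], str]


def invoke_serial_pipeline(payload: str, stages: List[Stage]) -> str:
    """Toy model of a serial inference pipeline: each stage's response is the next stage's request."""
    body = payload
    for stage in stages:
        body = stage(body)
    return body


def fake_preprocessor(csv_line: str) -> str:
    # Hypothetical featurizer: drop the review text and keep the numeric fields.
    # Naive split; assumes the review text contains no commas.
    return ",".join(field.strip() for field in csv_line.split(",")[1:])


def fake_classifier(features_csv: str) -> str:
    # Hypothetical model: report how many features it received.
    return str(len(features_csv.split(",")))
```

For example, `invoke_serial_pipeline('"Great food",5,0.01,0.95,0.02', [fake_preprocessor, fake_classifier])` returns `"4"`: the classifier only ever sees the preprocessor's output, never the original request.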

In [12]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

sklearn_preprocessing_model = sklearn_preprocessor.create_model()
xgb_sentiment_model = xgb_model.create_model()

# set environment variable in first container to ensure 
# the content coming out from the preprocessing model is in the right format
sklearn_preprocessing_model.env = {"SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT":"text/csv"}

model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-2019-10-07-23-33-51'  # fixed name reused across runs; use 'inference-pipeline-ep-' + timestamp_prefix for a fresh endpoint
sm_pipeline = PipelineModel(
    name=model_name, 
    role=role, 
    models=[
        sklearn_preprocessing_model, 
        xgb_sentiment_model])

sm_pipeline.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

--------------------------------------------------------------------------------------------------!

In [46]:
# The endpoint was updated after being created because 
# the entry script was changed. 
endpoint_name = 'inference-pipeline-ep-2019-10-07-23-33-51'

sm_pipeline.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', 
                   endpoint_name=endpoint_name, update_endpoint = True)

## Make a request to our pipeline endpoint <a class="anchor" id="pipeline_inference_request"></a>

Here we send a single review (you'll notice that the inference Python script is very particular about the order of the fields in the request: the review text, then `rating`, `aws_neg`, `aws_pos` and `aws_mix`, the same column order as `batch_train.csv`). The `ContentType` field configures the first container, while the `Accept` field configures the last container. You can also specify each container's `Accept` and `ContentType` values using environment variables.

We make our request with the payload in `text/csv` format, since that is what our script currently supports. If other formats need to be supported, they would have to be handled in the `input_fn()` (request formats) and `output_fn()` (response formats) functions of our entry point. Note that we set `Accept` to `text/csv`, since XGBoost does not support `application/json` as an `Accept` type. Because the model was trained with the `multi:softprob` objective and `num_class=3`, the prediction output contains one probability per class.
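The request payload in the next cell quotes the review text by hand so that commas inside it are not read as field separators. A small helper built on Python's `csv` module can do that quoting automatically. This is a sketch: `build_payload` is a hypothetical helper, and it assumes the featurizer parses requests as standard CSV in the same column order as `batch_train.csv`.

```python
import csv
import io


def build_payload(text, rating, aws_neg, aws_pos, aws_mix):
    """Build one CSV request line in the column order of batch_train.csv.

    csv.writer quotes the review text when it contains commas or quotes,
    so the line parses back into exactly five fields.
    """
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow([text, rating, aws_neg, aws_pos, aws_mix])
    # Drop the trailing newline: the request holds a single record
    return buf.getvalue().rstrip("\n")
```

Unlike the hand-written string, the helper emits no spaces after the commas; whether the featurizer tolerates either form depends on its parsing code, which is not shown here.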

In [None]:
from sagemaker.content_types import CONTENT_TYPE_CSV
from sagemaker.predictor import csv_serializer, RealTimePredictor

payload = '"The food was nice, element of improvement on the preparation of the ribs should be grilled. \
Please invest in a flat top griller. Use a different bbq sauce for the chicken wings and  ribs. \
Overall service from Andries was great. ", 4, 0.010453076, 0.754360855, 0.209270433'

#payload = '"Very good food and plenty parking ." ,4,0.002821277,0.9654154779999999,0.00835315'

predictor = RealTimePredictor(endpoint=endpoint_name,
                              sagemaker_session=sagemaker_session,
                              serializer=csv_serializer,
                              content_type=CONTENT_TYPE_CSV,
                              accept=CONTENT_TYPE_CSV)

# Without a deserializer, predict() returns the raw response bytes
print(predictor.predict(payload).decode('utf-8'))
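The response holds the per-class probabilities produced by `multi:softprob`. The sketch below turns the raw response into a list of floats plus the index of the most likely class. It assumes a single comma-separated line of probabilities (brackets are stripped defensively in case the container wraps the values), and it does not map indices to sentiment labels, since the encoding of `target` is not shown in this notebook; `parse_softprob_response` is a hypothetical helper.

```python
def parse_softprob_response(body):
    """Turn a multi:softprob CSV response into (class probabilities, predicted class index).

    Assumes one comma-separated line of num_class probabilities.
    """
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    # Strip whitespace and any surrounding brackets, then parse each value
    probs = [float(p) for p in body.strip().strip("[]").split(",")]
    # Index of the largest probability
    predicted = max(range(len(probs)), key=probs.__getitem__)
    return probs, predicted
```

For example, `probs, predicted = parse_softprob_response(predictor.predict(payload))`.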

### Delete Endpoint

Only delete the endpoint once it is no longer in use. To reactivate it, rerun `sm_pipeline.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)`.
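`delete_endpoint` removes only the endpoint; the endpoint configuration and the model created by `deploy` remain in the account. If you also want to remove those, a sketch like the one below looks their names up through the SageMaker API before deleting anything. `delete_endpoint_resources` is a hypothetical helper; run it instead of the cell below, not after it, because the endpoint can no longer be described once it is gone.

```python
def delete_endpoint_resources(client, endpoint_name):
    """Delete an endpoint plus its endpoint configuration and the model(s) it serves.

    `client` is a boto3 SageMaker client, e.g. boto3.client('sagemaker').
    Names are looked up first, because the endpoint description is no
    longer available after the endpoint is deleted.
    """
    config_name = client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]
    config = client.describe_endpoint_config(EndpointConfigName=config_name)
    model_names = [variant["ModelName"] for variant in config["ProductionVariants"]]

    client.delete_endpoint(EndpointName=endpoint_name)
    client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        client.delete_model(ModelName=model_name)
    return config_name, model_names
```

With the client created earlier in this notebook, the call would be `delete_endpoint_resources(smclient, endpoint_name)`.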

In [16]:
smclient.delete_endpoint(EndpointName=endpoint_name)

{'ResponseMetadata': {'RequestId': '80a5d025-a321-4d22-b80c-6215f0be5059',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '80a5d025-a321-4d22-b80c-6215f0be5059',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Tue, 08 Oct 2019 19:43:14 GMT'},
  'RetryAttempts': 0}}