# Feature processing with Spark, training with XGBoost and BlazingText algorithms and deploying as Inference Pipeline

Typically a Machine Learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.

In many cases, when the trained model is used for processing real-time or batch prediction requests, the model receives data in a format which needs to be pre-processed (e.g. featurized) before it can be passed to the algorithm. In the following notebook, we will demonstrate how you can build your ML Pipeline leveraging Spark Feature Transformers and the SageMaker XGBoost algorithm and, after the model is trained, deploy the Pipeline (Feature Transformer and XGBoost) as an Inference Pipeline behind a single Endpoint for real-time inference and for batch inference using Amazon SageMaker Batch Transform.

In this notebook, we use Amazon EMR to run Spark.

## Objective: predict sentiment

## Methodologies
The Notebook consists of a few high-level steps:

* Using Amazon EMR for executing the SparkML feature processing job.
* Using SageMaker XGBoost to train on the processed dataset produced by SparkML job.
* Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint.
* Building an Inference Pipeline consisting of SparkML & XGBoost models for a single Batch Transform job.

# Using EMR for executing the SparkML job

See the notebook `sparkml_serving_emr_mleap_sentiment_analysis` to set up the EMR cluster.

## Writing the feature processing script using SparkML

The code for feature transformation using SparkML is in the cells below. You can go through it to see how it uses standard SparkML constructs to define the Pipeline for featurizing the data.

We split the dataset 70-30 into train and validation sets, `fit` the Spark ML Pipeline on the train set, `transform` both sets, and upload the results to S3 so that they can be used with XGBoost for training.
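
To make the split step concrete, here is a minimal pure-Python sketch of a weighted random split. It only approximates what Spark's `randomSplit` does (Spark samples per partition), and `random_split` is a hypothetical helper name, not part of the notebook. The weights `[0.7, 0.3]` mirror `TRAIN_SIZE` and `TEST_SIZE`; the resulting sizes are approximate because each row is assigned independently.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one bucket with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative boundaries in [0, 1], e.g. [0.7, 1.0] for weights [0.7, 0.3].
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)

    rng = random.Random(seed)
    buckets = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, bound in enumerate(bounds):
            # The last bucket also catches any floating-point rounding gap.
            if x < bound or i == len(bounds) - 1:
                buckets[i].append(row)
                break
    return buckets


train_rows, validation_rows = random_split(range(1000), [0.7, 0.3], seed=2)
print(len(train_rows), len(validation_rows))
```

Every row lands in exactly one bucket, so the two sets are disjoint and together cover the input.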

In [None]:
%%info

In [None]:
from __future__ import print_function

import os
import shutil
import boto3

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *
from mleap.pyspark.spark_support import SimpleSparkSerializer

In [None]:
#defining schema of the dataset
def get_schema_structure():
    
    #creating schema for dataset
    data_schema = [
        StructField("label", IntegerType(), True),
        StructField("ids", LongType(), True),
        StructField("date", StringType(), True),
        StructField("flag", StringType(), True),
        StructField("user", StringType(), True),
        StructField("text", StringType(), True)
    ]
    return StructType(fields=data_schema)

In [None]:
DATASET_ENCODING = "ISO-8859-1"
TEST_SIZE = 0.3
TRAIN_SIZE = 0.7
VAL_SIZE = 0.2
DATA_DIR = 'data'
VOCABULARY_SIZE = 20000

In [None]:
df_total = spark.read\
.schema(get_schema_structure())\
.format('csv')\
.option('encoding', DATASET_ENCODING)\
.option('header','false')\
.csv('s3a://sagemaker-us-east-2-446439287457/sagemaker/twitter/data/train.csv')\
.select('label', 'text')

In [None]:
df_total_label = df_total.withColumn("label",
              when(df_total["label"] == 4, 1).otherwise(df_total["label"])).select('label', 'text')

In [None]:
df_total_label.show(5)

In [None]:
 #defining a manual stop list
def get_stop_words_list():
            stop_words_list = ['link','google','facebook','yahoo','rt','i', 'me', 'my', 'myself', 'tag',
                              'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll",
                              "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his',
                              'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its',
                              'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which',
                              'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are',
                              'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do',
                              'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because',
                              'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against',
                              'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below',
                              'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again',
                              'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all',
                              'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such',
                              'only', 'own', 'same', 'so', 'than', 'too', 's', 't', 'can', 'will',
                              'just', 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've',
                              'y', 'ain', 'ma', 'u', 'aren', 'ø', 'å', 'æ', 'b', 'c', 'd', 'e']

            return stop_words_list

In [None]:
from pyspark.ml.feature import CountVectorizer, NGram, StopWordsRemover, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
remover.setStopWords(get_stop_words_list())

ngram = NGram(n=2, inputCol="filtered", outputCol="ngrams")
count_vectorizer = CountVectorizer(inputCol="ngrams", outputCol="features").setVocabSize(VOCABULARY_SIZE)

In [None]:
pipeline = Pipeline(stages = [ 
    tokenizer,
    remover,
    ngram,
    count_vectorizer
])
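
As a rough illustration of what these four stages do to a single sentence, the sketch below mimics them in plain Python. It is an approximation under stated assumptions: `tokenize`, `remove_stop_words`, `bigrams` and `featurize` are hypothetical names, whitespace handling is simplified compared with Spark's `Tokenizer`, and a `Counter` stands in for `CountVectorizer` (which would map each bigram to a vocabulary index instead of keeping the string).

```python
from collections import Counter


def tokenize(text):
    # Approximates Tokenizer: lowercase, then split on whitespace.
    return text.lower().split()


def remove_stop_words(tokens, stop_words):
    # Approximates StopWordsRemover with a user-supplied stop list.
    stop = set(stop_words)
    return [t for t in tokens if t not in stop]


def bigrams(tokens):
    # Approximates NGram(n=2): adjacent token pairs joined by a space.
    return [" ".join(pair) for pair in zip(tokens, tokens[1:])]


def featurize(text, stop_words):
    # Counts each bigram, standing in for the CountVectorizer stage.
    return Counter(bigrams(remove_stop_words(tokenize(text), stop_words)))


example = featurize("Not good not good at all", ["at"])
print(example)
```

Note that removing stop words before building bigrams joins words that were not adjacent in the original text ("good all" above), which is also what happens in the Spark pipeline because `remover` runs before `ngram`.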

In [None]:
train, validation = df_total_label.randomSplit([TRAIN_SIZE, TEST_SIZE], seed=2)

In [None]:
model = pipeline.fit(train)

In [None]:
transformed_train_df = model.transform(train)

In [None]:
transformed_train_df.show(10)

In [None]:
transformed_validation_df = model.transform(validation)

In [None]:
def csv_line(data):
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r
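
The following sketch shows, without Spark, what `csv_line` effectively writes for one row: the label followed by every position of the feature vector, zeros included. `dense_csv_line` is a hypothetical helper that takes the sparse representation (size, indices, values) explicitly; it is only meant to illustrate why each output row has `VOCABULARY_SIZE + 1` fields.

```python
def dense_csv_line(label, size, indices, values):
    """Expand a sparse vector into 'label,v0,v1,...,v(size-1)'."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return str(label) + "," + ",".join(str(v) for v in dense)


# A toy vector of size 6 with two non-zero entries.
line = dense_csv_line(1, 6, [1, 4], [2.0, 1.0])
print(line)

# With a vocabulary of 20000 entries, every row carries 20001 fields.
wide_line = dense_csv_line(0, 20000, [], [])
print(len(wide_line.split(",")))
```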

In [None]:
s3_output_bucket = 'sagemaker-us-east-2-446439287457'
s3_output_key_prefix = 'emr/sentiment/xgboost'
s3_output_location = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'data')

In [None]:
transformed_train_rdd = transformed_train_df.rdd.map(lambda x: (x.label, x.features))
lines = transformed_train_rdd.map(csv_line)
lines.coalesce(1).saveAsTextFile(s3_output_location + '/train')

In [None]:
transformed_validation_rdd = transformed_validation_df.rdd.map(lambda x: (x.label, x.features))
lines = transformed_validation_rdd.map(csv_line)
lines.coalesce(1).saveAsTextFile(s3_output_location + '/validation')

### For BlazingText use this

In [None]:
from pyspark.ml.feature import CountVectorizer, NGram, StopWordsRemover, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")

remover = StopWordsRemover(inputCol="words", outputCol="features")
remover.setStopWords(get_stop_words_list())

In [None]:
pipeline = Pipeline(stages = [ 
    tokenizer,
    remover
])

In [None]:
train, validation = df_total_label.randomSplit([TRAIN_SIZE, TEST_SIZE], seed=2)
model = pipeline.fit(train)
transformed_train_df = model.transform(train)
transformed_validation_df = model.transform(validation)

In [None]:
transformed_train_df.show(10)

In [None]:
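def csv_line(data):
    # data[1] is the list of filtered tokens; join the tokens themselves
    # (iterating over str(data[1]) would split the text into single characters)
    r = ' '.join(str(d) for d in data[1])
    return ('__label__' + str(data[0])) + " " + r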
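
For reference, BlazingText in supervised mode reads one sentence per line, with the label given as a word prefixed by `__label__` followed by space-separated tokens. The sketch below builds such a line in plain Python; `blazingtext_line` is a hypothetical helper name used only for illustration.

```python
def blazingtext_line(label, tokens):
    """Format one training example as '__label__<label> tok1 tok2 ...'."""
    return "__label__{} {}".format(label, " ".join(tokens))


sample = blazingtext_line(1, ["love", "new", "phone"])
print(sample)
```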

In [None]:
s3_output_bucket = 'sagemaker-us-east-2-446439287457'
s3_output_key_prefix = 'emr/sentiment/blazing'
s3_output_location = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'data')

### Serializing the trained Spark ML Model with [MLeap](https://github.com/combust/mleap)
Apache Spark is best suited for batch processing workloads. In order to use the Spark ML model we trained for low-latency inference, we need to use the MLeap library to serialize it to an MLeap bundle and later use [SageMaker SparkML Serving](https://github.com/aws/sagemaker-sparkml-serving-container) to perform real-time and batch inference.

By using the `serializeToBundle()` method from MLeap, we serialize the ML Pipeline into an MLeap bundle and upload it to S3 in `tar.gz` format, as SageMaker expects.

In [None]:
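# Run this only if you need to clean up artifacts from a previous run
import os
import shutil

for path in ('/tmp/model.zip', '/tmp/model.tar.gz'):
    if os.path.exists(path):
        os.remove(path)
shutil.rmtree('/tmp/model', ignore_errors=True)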

In [None]:
SimpleSparkSerializer().serializeToBundle(model, "jar:file:/tmp/model.zip", transformed_validation_df)

In [None]:
import zipfile
with zipfile.ZipFile("/tmp/model.zip") as zf:
    zf.extractall("/tmp/model")
    
import tarfile
with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
    tar.add("/tmp/model/bundle.json", arcname='bundle.json')
    tar.add("/tmp/model/root", arcname='root')   
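
Before uploading, it can help to confirm that the archive has the layout the cell above is meant to produce, with `bundle.json` and `root` at the top level. The sketch below is an assumption-laden check, not part of MLeap or SageMaker: `top_level_entries` and `looks_like_mleap_bundle` are hypothetical helpers, and the demo builds a tiny stand-in archive in a temporary directory instead of using the real `/tmp/model.tar.gz`.

```python
import os
import tarfile
import tempfile


def top_level_entries(tar_path):
    """Return the sorted set of top-level names inside a .tar.gz archive."""
    with tarfile.open(tar_path, "r:gz") as tar:
        return sorted({name.split("/")[0] for name in tar.getnames()})


def looks_like_mleap_bundle(tar_path):
    entries = top_level_entries(tar_path)
    return "bundle.json" in entries and "root" in entries


# Build a small stand-in archive with the same layout for demonstration.
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, "bundle.json"), "w") as f:
    f.write("{}")
os.mkdir(os.path.join(workdir, "root"))
with open(os.path.join(workdir, "root", "model.json"), "w") as f:
    f.write("{}")

archive = os.path.join(workdir, "model.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
    tar.add(os.path.join(workdir, "bundle.json"), arcname="bundle.json")
    tar.add(os.path.join(workdir, "root"), arcname="root")

print(top_level_entries(archive))
print(looks_like_mleap_bundle(archive))
```

Pointing `looks_like_mleap_bundle` at `/tmp/model.tar.gz` would apply the same check to the real archive.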

In [None]:
# Please replace the bucket name with your bucket name where you want to upload the model
s3 = boto3.resource('s3') 
file_name = os.path.join("emr/sentiment/blazing/model", 'model.tar.gz')
s3.Bucket('sagemaker-us-east-2-446439287457').upload_file('/tmp/model.tar.gz', file_name)

## Using SageMaker XGBoost to train on the processed dataset produced by SparkML job

Now we will use the SageMaker XGBoost algorithm to train on this dataset. We already know the S3 location
where the preprocessed training data was uploaded as part of the Spark job on EMR.

In [2]:
# Import SageMaker Python SDK to get the Session and execution_role
import sagemaker
from sagemaker import get_execution_role
sess = sagemaker.Session()
role = get_execution_role()
print(role[role.rfind('/') + 1:])

from sagemaker.amazon.amazon_estimator import get_image_uri

AmazonSageMaker-ExecutionRole-20190904T205879


### Next XGBoost model parameters and dataset details will be set properly
We have parameterized this Notebook so that the same data location which was used in the PySpark script can now be passed to XGBoost Estimator as well.

In [None]:
training_image = get_image_uri(sess.boto_region_name, 'xgboost', repo_version="latest")
print (training_image)

In [None]:
s3_output_bucket = 'sagemaker-us-east-2-446439287457'
s3_output_key_prefix = 'emr/sentiment/xgboost'
s3_output_location = 's3://{}/{}/'.format(s3_output_bucket, s3_output_key_prefix)

xgb_model = sagemaker.estimator.Estimator(training_image,
                                         role, 
                                         train_instance_count=1, 
                                         train_instance_type='ml.m4.xlarge',
                                         train_volume_size = 20,
                                         train_max_run = 3600,
                                         input_mode= 'File',
                                         output_path=s3_output_location + 'xgboost_model',
                                         sagemaker_session=sess)

xgb_model.set_hyperparameters(objective = "binary:logistic",
                              eta = .2,
                              gamma = 4,
                              max_depth = 5,
                              num_round = 10,
                              subsample = 0.7,
                              silent = 0,
                              min_child_weight = 6)

train_data = sagemaker.session.s3_input(s3_output_location + 'data/train', distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_output_location + 'data/validation', distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}

In [None]:
xgb_model.fit(inputs=data_channels, logs=True)

# Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint

Next we will proceed with deploying the models in SageMaker to create an Inference Pipeline. You can create an Inference Pipeline with up to five containers.

Deploying a model in SageMaker requires two components:

* Docker image residing in ECR.
* Model artifacts residing in S3.

**SparkML**

For SparkML, the Docker image for MLeap-based SparkML serving is provided by the SageMaker team. For more information on this, please see [SageMaker SparkML Serving](https://github.com/aws/sagemaker-sparkml-serving-container). The MLeap-serialized SparkML model was uploaded to S3 as part of the SparkML job we executed on Amazon EMR.

**XGBoost**

For XGBoost, we will use the same Docker image we used for training. The model artifacts for XGBoost were uploaded as part of the training job we just ran.

### Passing the schema of the payload via environment variable
The SparkML serving container needs to know the schema of the request that'll be passed to it while calling the `predict` method. To avoid having to pass the schema with every request, `sagemaker-sparkml-serving` allows you to pass it via an environment variable while creating the model definitions. This schema definition will be required in our next step for creating a model.

We will see later that you can override this schema on a per-request basis by passing it as part of the individual request payload as well.

In [None]:
import json
schema = {
    "input": [
        {
            "name": "text",
            "type": "string"
        } 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)
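
If you end up defining several schemas (for example one per pipeline variant), a small builder can keep them consistent. This is only a sketch: `build_schema` is a hypothetical helper, and it simply reproduces the dictionary shape used in the cell above (an `input` list of name/type pairs and a single `output` entry with an optional `struct`).

```python
import json


def build_schema(input_columns, output_name, output_type, output_struct=None):
    """Return a schema JSON string in the shape used by the cell above."""
    schema = {
        "input": [{"name": name, "type": col_type} for name, col_type in input_columns],
        "output": {"name": output_name, "type": output_type},
    }
    if output_struct is not None:
        schema["output"]["struct"] = output_struct
    return json.dumps(schema)


vector_schema_json = build_schema([("text", "string")], "features", "double", "vector")
print(vector_schema_json)
```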

### Creating a `PipelineModel` which comprises the SparkML and XGBoost models in the right order

Next we'll create a SageMaker `PipelineModel` with SparkML and XGBoost. The `PipelineModel` will ensure that both the containers get deployed behind a single API endpoint in the correct order. The same model would later be used for Batch Transform as well to ensure that a single job is sufficient to do prediction against the Pipeline.

Here, during the `Model` creation for SparkML, we will pass the schema definition that we built in the previous cell.

In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

s3_model_bucket = s3_output_bucket
s3_model_key_prefix = 'emr/sentiment/xgboost/mleap'

sparkml_data = 's3://{}/{}/{}'.format(s3_model_bucket, s3_model_key_prefix, 'model.tar.gz')
# passing the schema defined above by using an environment variable that sagemaker-sparkml-serving understands
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
xgb_model = Model(model_data=xgb_model.model_data, image=training_image)

model_name = 'inference-pipeline-' + timestamp_prefix
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])

### Deploying the `PipelineModel` to an endpoint for realtime inference
Next we will deploy the model we just created with the `deploy()` method to start an inference endpoint and we will send some requests to the endpoint to verify that it works as expected.

In [None]:
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

### Invoking the newly created inference endpoint with a payload to transform the data
Now we will invoke the endpoint with a valid payload that SageMaker SparkML Serving can recognize. There are three ways in which input payload can be passed to the request:

* Pass it as a valid CSV string. In this case, the schema passed via the environment variable will be used to determine the schema. For CSV format, every column in the input has to be a basic datatype (e.g. int, double, string) and it can not be a Spark `Array` or `Vector`.

* Pass it as a valid JSON string. In this case as well, the schema passed via the environment variable will be used to infer the schema. With JSON format, every column in the input can be a basic datatype or a Spark `Vector` or `Array` provided that the corresponding entry in the schema mentions the correct value.

* Pass the request in JSON format along with the schema and the data. In this case, the schema passed in the payload will take precedence over the one passed via the environment variable (if any).

#### Passing the payload in CSV format
We will first see how the payload can be passed to the endpoint in CSV format.

In [None]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = "I love it"
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=csv_serializer,
                                content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_CSV)
print(predictor.predict(payload))

#### Passing the payload in JSON format
We will now pass a different payload in JSON format.

In [None]:
payload = {"data": ["I don't like it"]}
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

#### [Optional] Passing the payload with both schema and the data
Next we will pass an input payload comprising both the schema and the data. In this example the schema in the payload is the same as the one passed via the environment variable, but it could differ; the server parses the payload with the schema supplied in the request.

In [None]:
payload = {
    "schema": {
        "input": [
        {
            "name": "text",
            "type": "string"
        } 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
    },
    "data": ["I like it"]
}

predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

### [Optional] Deleting the Endpoint
If you do not plan to use this endpoint, then it is a good practice to delete the endpoint so that you do not incur the cost of running it.

In [None]:
boto_session = sess.boto_session
sm_client = boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)

# Building an Inference Pipeline consisting of SparkML & XGBoost models for a single Batch Transform job
SageMaker Batch Transform also supports chaining multiple containers together when deploying an Inference Pipeline and performing a single batch transform job to transform your data for a batch use case, similar to the real-time use case we have seen above.

### Preparing data for Batch Transform
Batch Transform requires data in the same format described above, with one CSV or JSON record per line. The input here is a CSV file similar to the training file, the only difference being that it does not contain the label field.

I've created a file with a few lines in S3 just to test.
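
A test file of this kind can be produced with the standard `csv` module. The sketch below is illustrative only: `to_batch_input` is a hypothetical helper and the sample rows are made up. It drops the label and writes one text record per line, quoting a record only when it contains a comma or a double quote.

```python
import csv
import io


def to_batch_input(rows):
    """rows: iterable of (label, text). Returns CSV text holding only the text column."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for _label, text in rows:
        writer.writerow([text])
    return buf.getvalue()


batch_csv = to_batch_input([(1, "I love it"), (0, "Well, I don't like it")])
print(batch_csv)
```

The returned string can then be written to a local file and uploaded to the S3 location used as `input_data_path`.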

### Invoking the Transform API to create a Batch Transform job
Next we will use the `Transformer` class from the Python SDK to create a Batch Transform job.

In [None]:
input_data_path = 's3://{}/{}/{}'.format(s3_output_bucket, 'emr/sentiment/xgboost/batch', 'batch_input_sentiment.csv')
output_data_path = 's3://{}/{}/{}'.format(s3_output_bucket, 'emr/sentiment/xgboost/batch_output', timestamp_prefix)

In [None]:
job_name = 'serial-inference-batch-' + timestamp_prefix
transformer = sagemaker.transformer.Transformer(
    # This was the model created using PipelineModel and it contains feature processing and XGBoost
    model_name = model_name,
    instance_count = 1,
    instance_type = 'ml.m4.xlarge',
    strategy = 'SingleRecord',
    assemble_with = 'Line',
    output_path = output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sess,
    accept = CONTENT_TYPE_CSV
)
transformer.transform(data = input_data_path,
                      job_name = job_name,
                      content_type = CONTENT_TYPE_CSV, 
                      split_type = 'Line')
transformer.wait()

### Creating a `PipelineModel` which comprises the SparkML and BlazingText models in the right order

Next we'll create a SageMaker `PipelineModel` with SparkML and BlazingText. We first train (and tune) the BlazingText model, then build the pipeline. The `PipelineModel` will ensure that both the containers get deployed behind a single API endpoint in the correct order. The same model would later be used for Batch Transform as well to ensure that a single job is sufficient to do prediction against the Pipeline.

Here, during the `Model` creation for SparkML, we will pass a schema definition whose output is an array of strings instead of a vector.

In [132]:
from sagemaker.amazon.amazon_estimator import get_image_uri

training_image = get_image_uri(sess.boto_region_name, 'blazingtext', repo_version="latest")
print (training_image)

825641698319.dkr.ecr.us-east-2.amazonaws.com/blazingtext:latest


In [133]:
s3_output_bucket = 'sagemaker-us-east-2-446439287457'
s3_output_key_prefix = 'emr/sentiment/blazing'
s3_output_location = 's3://{}/{}/'.format(s3_output_bucket, s3_output_key_prefix)

bt_model = sagemaker.estimator.Estimator(training_image,
                                         role, 
                                         train_instance_count=1, 
                                         train_instance_type='ml.c4.xlarge',
                                         train_volume_size = 20,
                                         train_max_run = 3600,
                                         input_mode= 'File',
                                         output_path=s3_output_location,
                                         sagemaker_session=sess)

bt_model.set_hyperparameters(mode="supervised",
                            epochs=50,
                            min_count=100,
                            learning_rate=0.005,
                            vector_dim=5,
                            early_stopping=True,
                            patience=4,
                            min_epochs=5,
                            word_ngrams=3)


train_data = sagemaker.session.s3_input(s3_output_location + 'data/train/part-00000', distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_output_location + 'data/validation/part-00000', distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}

In [134]:
bt_model.fit(inputs=data_channels, logs=True)

2019-11-13 14:16:36 Starting - Starting the training job...
2019-11-13 14:17:05 Starting - Launching requested ML instances......
2019-11-13 14:18:00 Starting - Preparing the instances for training......
2019-11-13 14:18:50 Downloading - Downloading input data...
2019-11-13 14:19:29 Training - Training image download completed. Training in progress.....
Arguments: train
[11/13/2019 14:19:30 INFO 140352715757376] nvidia-smi took: 0.025249004364 secs to identify 0 gpus
[11/13/2019 14:19:30 INFO 140352715757376] Running single machine CPU BlazingText training using supervised mode.
[11/13/2019 14:19:30 INFO 140352715757376] Processing /opt/ml/input/data/train/part-00000 . File size: 208 MB
[11/13/2019 14:19:30 INFO 140352715757376] Processing /opt/ml/input/data/validation/part-00000 . File size: 89 MB
Read 10M words
Read 20M words
Read 30M words
Read 40M words
Read 50M words
Read 60M words

## Create a hyperparameter tuning job

#### A good way to improve the hyperparameter tuning is to do it randomly with a script. See the example notebook hyperparameter_tuning_mxnet_gluon_cifar10_random_search for more details.
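
As a rough idea of what "doing it randomly with a script" could look like, the sketch below draws random candidates from integer ranges. It is not the referenced notebook's code: `sample_hyperparameters` and `RANGES` are hypothetical names, the bounds simply mirror the `IntegerParameter` ranges used in this section, and launching one training job per candidate is left out.

```python
import random

# Inclusive integer bounds, mirroring the ranges used for the tuner in this section.
RANGES = {
    "epochs": (20, 60),
    "min_count": (2, 100),
    "vector_dim": (1, 100),
    "word_ngrams": (2, 3),
}


def sample_hyperparameters(ranges, rng):
    """Draw one random configuration, uniformly within each inclusive range."""
    return {name: rng.randint(low, high) for name, (low, high) in ranges.items()}


rng = random.Random(0)  # fixed seed so the candidates are reproducible
candidates = [sample_hyperparameters(RANGES, rng) for _ in range(5)]
for candidate in candidates:
    print(candidate)
```

Each candidate dictionary could then be passed to `set_hyperparameters` on a fresh estimator before calling `fit`.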

In [120]:
from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner

hyperparameter_ranges = {
                        'epochs': IntegerParameter(20, 60) ,
                        'min_count': IntegerParameter(2, 100),
                        'vector_dim': IntegerParameter(1, 100),
                        'word_ngrams': IntegerParameter(2, 3)
                        }
objective_metric_name = 'validation:accuracy'

In [121]:
# First, make sure to import the relevant objects used to construct the tuner
tuner = HyperparameterTuner(bt_model,
                            objective_metric_name,
                            hyperparameter_ranges,
                            max_jobs=60,
                            max_parallel_jobs=3)

In [122]:
tuner.fit({'train': train_data, 'validation': validation_data})

In [123]:
tuner.wait()



In [128]:
tuning_job_info = sess.sagemaker_client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName='blazingtext-191113-1321')

# We begin by asking SageMaker to describe for us the results of the best training job. The data
# structure returned contains a lot more information than we currently need, try checking it out
# yourself in more detail.
best_training_job_name = tuning_job_info['BestTrainingJob']['TrainingJobName']
training_job_info = sess.sagemaker_client.describe_training_job(TrainingJobName=best_training_job_name)

In [131]:
training_job_info

{u'AlgorithmSpecification': {u'MetricDefinitions': [{u'Name': u'train:accuracy',
    u'Regex': u'#train_accuracy: (\\S+)'},
   {u'Name': u'train:mean_rho', u'Regex': u'#mean_rho: (\\S+)'},
   {u'Name': u'validation:accuracy',
    u'Regex': u'#validation_accuracy: (\\S+)'}],
  u'TrainingImage': u'825641698319.dkr.ecr.us-east-2.amazonaws.com/blazingtext:latest',
  u'TrainingInputMode': u'File'},
 u'BillableTimeInSeconds': 226,
 u'CreationTime': datetime.datetime(2019, 11, 13, 13, 57, 21, 273000, tzinfo=tzlocal()),
 u'EnableInterContainerTrafficEncryption': False,
 u'EnableManagedSpotTraining': False,
 u'EnableNetworkIsolation': False,
 u'FinalMetricDataList': [{u'MetricName': u'train:accuracy',
   u'Timestamp': datetime.datetime(1970, 1, 19, 5, 7, 33, 767000, tzinfo=tzlocal()),
   u'Value': 0.7631000280380249},
  {u'MetricName': u'validation:accuracy',
   u'Timestamp': datetime.datetime(1970, 1, 19, 5, 7, 33, 767000, tzinfo=tzlocal()),
   u'Value': 0.7613000273704529}],
 u'HyperParameter

## Create the endpoint

In [135]:
import json
schema = {
    "input": [
        {
            "name": "text",
            "type": "string"
        }
    ],
    "output": 
        {
            "name": "features",
            "type": "string",
            "struct": "array"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"type": "string", "name": "text"}], "output": {"type": "string", "name": "features", "struct": "array"}}


In [136]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

sparkml_data = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix + '/model', 'model.tar.gz')
# passing the schema defined above by using an environment variable that sagemaker-sparkml-serving understands
sparkml_model = SparkMLModel(model_data=sparkml_data,
                             env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json, 
                                  'SAGEMAKER_DEFAULT_INVOCATIONS_ACCEPT': "application/jsonlines;data=text"})
bt_model = Model(model_data=bt_model.model_data, image=training_image)

model_name = 'inference-pipeline-' + timestamp_prefix
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, bt_model])

In [137]:
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)


In [138]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = "I don't love it."
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=csv_serializer,
                                content_type=CONTENT_TYPE_CSV, accept='application/jsonlines')
print(predictor.predict(payload))

{"label": ["__label__0"], "prob": [0.9649649262428284]}
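
The response is a JSON document with a `label` list and a `prob` list, as shown in the output above. A small parser makes it easier to use downstream; this is a sketch assuming that shape, and `parse_prediction` is a hypothetical helper name.

```python
import json


def parse_prediction(raw):
    """Turn a response like the one above into (label_as_int, probability)."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    result = json.loads(raw)
    label = result["label"][0].replace("__label__", "")
    return int(label), result["prob"][0]


parsed_prediction = parse_prediction(b'{"label": ["__label__0"], "prob": [0.9649649262428284]}')
print(parsed_prediction)
```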



In [143]:
payload = {"data": ["He likes potatoes"]}
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON)

print(predictor.predict(payload))

{"label": ["__label__0"], "prob": [0.873008668422699]}



In [None]:
# Delete the endpoint
sm_client = sess.boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)

In [144]:
input_data_path = 's3://{}/{}/{}'.format(s3_output_bucket, 'emr/sentiment/xgboost/batch', 'batch_input_sentiment.csv')
output_data_path = 's3://{}/{}/{}'.format(s3_output_bucket, 'emr/sentiment/blazing/batch_output', timestamp_prefix)

In [145]:
transformer = sagemaker.transformer.Transformer(
    model_name = model_name,
    instance_count = 1,
    instance_type = 'ml.m4.xlarge',
    strategy = 'SingleRecord',
    assemble_with = 'Line',
    output_path = output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sess,
    accept = CONTENT_TYPE_CSV
)
transformer.transform(data = input_data_path, 
                        content_type = CONTENT_TYPE_CSV, 
                        split_type = 'Line')
transformer.wait()

.......................
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 :: Spring Boot ::                  (v2.2)

2019-11-13 14:35:07.681  INFO 8 --- [           main] com.amazonaws.sagemaker.App              : Starting App v2.2 on 785f6dad92c3 with PID 8 (/usr/local/lib/sparkml-serving-2.2.jar started by root in /sagemaker-sparkml-model-server)
2019-11-13 14:35:07.685  INFO 8 --- [           main] com.amazonaws.sagemaker.App              : No active profile set, falling back to default profiles: default
2019-11-13 14:35:0

### The inference pipeline didn't work very well with XGBoost

All results are the same for the XGBoost model, and it is annoying that we have to write the CSV ourselves for the inference pipeline. It would be much more practical if the pipeline managed it, as it does after training.

Converting a sparse vector to CSV columns makes the operation tricky since the file is huge. I tried writing only 10,000 results and tested with that, but that's not the best solution.

With BlazingText it at least works, but I don't trust the results at all.