This tutorial is a modified version of a SageMaker tutorial, adapted to show how to integrate Arize into SageMaker.

# Tutorial Overview
This tutorial goes through the following steps:
1. Create a real-time ETL pipeline with SparkML, loaded as a SageMaker model
2. Serialize the model with MLeap
3. Create an XGBoost model
4. Combine the ETL model and the XGBoost model into a pipeline model
5. Deploy the pipeline model to an endpoint and test it
6. Create a Lambda function that integrates Arize AI and invokes SageMaker
7. Test the Lambda function
8. Create an API Gateway endpoint that exposes the Lambda function to the internet
9. Call the prediction service

# Feature processing with Spark, training with XGBoost and deploying as Inference Pipeline

Typically, a Machine Learning (ML) process consists of a few steps: gathering data with various ETL jobs, pre-processing the data, featurizing the dataset by incorporating standard techniques or prior knowledge, and finally training an ML model using an algorithm.

In many cases, when the trained model is used for real-time or batch prediction requests, the model receives data in a format that needs to be pre-processed (e.g. featurized) before it can be passed to the algorithm. In this notebook, we demonstrate how to build an ML pipeline with Spark feature transformers and the SageMaker XGBoost algorithm and, after the model is trained, deploy the pipeline (feature transformer and XGBoost) as an Inference Pipeline behind a single endpoint for real-time inference, and for batch inference using Amazon SageMaker Batch Transform.

The original version of this notebook used AWS Glue to run serverless Spark; this version runs PySpark directly in the notebook instance. Although the notebook demonstrates the end-to-end flow on a small dataset, the same setup can be scaled to larger datasets.

## Objective: predict the age of an abalone from its physical measurements

The dataset is available from [UCI Machine Learning](https://archive.ics.uci.edu/ml/datasets/abalone). The aim is to determine the age of an abalone (a kind of shellfish) from its physical measurements; at its core, this is a regression problem. The dataset contains several features: `sex` (categorical), `length` (continuous), `diameter` (continuous), `height` (continuous), `whole_weight` (continuous), `shucked_weight` (continuous), `viscera_weight` (continuous), `shell_weight` (continuous) and `rings` (integer). Our goal is to predict `rings`, which is a good approximation for age (age is `rings` + 1.5).
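Because the model predicts `rings`, an age estimate is a single addition away. A minimal sketch; the helper `rings_to_age` is hypothetical and not part of the original notebook:

```python
def rings_to_age(rings: float) -> float:
    """Convert a (predicted) ring count to an approximate age in years: age = rings + 1.5."""
    return rings + 1.5

print(rings_to_age(15.0))  # 16.5
```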

We'll use SparkML to process the dataset (apply one or many feature transformers) and upload the transformed dataset to S3 so that it can be used for training with XGBoost.

## Methodologies
The Notebook consists of a few high-level steps:

* Using SageMaker XGBoost to train on the processed dataset produced by the SparkML job.
* Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint.
* Building an Inference Pipeline consisting of SparkML & XGBoost models for a single Batch Transform job.
* Integrating the Model with Arize

# AWS Glue

* The original tutorial ran the SparkML job in AWS Glue. This version runs the entire tutorial inside this notebook, so AWS Glue is not required.

## Architecture
This diagram shows the setup in this notebook and the accompanying Lambda function. The notebook walks through setting up a real-time endpoint with Arize, as well as a Spark-based feature-transform model in a SageMaker inference pipeline.

We recommend integrating Arize in front of the feature transformation, so the captured data is human-readable rather than the one-hot encoded features the model sees.

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/spark_arch.png">

### Finding out the current execution role of the Notebook
We use the SageMaker Python SDK to retrieve the notebook's current execution role. This role may need additional permissions (for example, S3 access) for the steps below.

In [615]:
import sagemaker
import os
from sagemaker import get_execution_role
sess = sagemaker.Session()
role = get_execution_role()
print(role[role.rfind('/') + 1:])

AmazonSageMaker-ExecutionRole-20200409T140444


In [616]:
!pip install pyspark==2.2.3



### Creating an S3 bucket and uploading this dataset
Next we create an S3 bucket with the string `aws-glue` in its name and upload the data to it. If you want to use an existing bucket instead, you can, provided the notebook's `Role` has permission to upload to and download from that bucket.

Once the bucket is created, the following cell also uploads the local `abalone.csv` file to this bucket under the `input/abalone` prefix. If `abalone.csv` is not yet present locally, run the download cell in the next section first.

In [617]:
import boto3
import botocore
from botocore.exceptions import ClientError

boto_session = sess.boto_session
s3 = boto_session.resource('s3')
account = boto_session.client('sts').get_caller_identity()['Account']
region = boto_session.region_name
default_bucket = 'aws-glue-{}-{}'.format(account, region)

try:
    if region == 'us-east-1':
        s3.create_bucket(Bucket=default_bucket)
    else:
        s3.create_bucket(Bucket=default_bucket, CreateBucketConfiguration={'LocationConstraint': region})
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'BucketAlreadyOwnedByYou':
        print('A bucket with the same name already exists in your account - using the same bucket.')
    else:
        # Any other error (e.g. the name is taken by another account, missing permissions) is fatal
        raise

# Uploading the training data to S3
sess.upload_data(path='abalone.csv', bucket=default_bucket, key_prefix='input/abalone')

A bucket with the same name already exists in your account - using the same bucket.


's3://aws-glue-783112096916-us-east-2/input/abalone/abalone.csv'

In [618]:
default_bucket

'aws-glue-783112096916-us-east-2'
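The region check in the bucket-creation cell exists because S3 rejects an explicit `LocationConstraint` of `us-east-1`. A minimal sketch factoring that logic into helpers; `create_bucket_kwargs` and `glue_bucket_name` are illustrative names, not part of the original notebook:

```python
def glue_bucket_name(account: str, region: str) -> str:
    """Bucket naming used above; the name contains 'aws-glue' for the default Glue permissions."""
    return "aws-glue-{}-{}".format(account, region)

def create_bucket_kwargs(bucket: str, region: str) -> dict:
    """Build keyword arguments for S3 create_bucket.

    us-east-1 is the default region and must not be passed as a LocationConstraint,
    so the configuration is only added for other regions.
    """
    kwargs = {"Bucket": bucket}
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs
```

The result can be unpacked into the same call used above: `s3.create_bucket(**create_bucket_kwargs(default_bucket, region))`.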

## Downloading dataset and uploading to S3
The SageMaker team has downloaded the dataset from UCI and uploaded it to one of their S3 buckets. In this notebook, we download it from that bucket and upload it to your bucket. The default AWS Glue permissions used by the original tutorial expect the data to be in a bucket whose name contains `aws-glue`, which is why the bucket created above follows that naming convention.

In [619]:
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

--2020-07-30 20:58:30--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.235.64
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.235.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 191873 (187K) [binary/octet-stream]
Saving to: ‘abalone.csv.39’


2020-07-30 20:58:31 (1.24 MB/s) - ‘abalone.csv.39’ saved [191873/191873]



## Writing the feature processing script using SparkML

The feature transformation code below uses standard SparkML constructs to define the pipeline that featurizes the data.

Once the Spark ML pipeline `fit` and `transform` steps are done, we split the dataset 80/20 into training and validation sets and upload both to S3 so they can be used to train XGBoost.
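Spark's `randomSplit` assigns rows to splits randomly, so the 80/20 ratio is approximate: the training log later in this notebook reports 3326 training rows and 851 validation rows (about 79.6% / 20.4%). The sketch below is a simplified, hypothetical pure-Python illustration of per-row random assignment, not Spark's actual implementation (which samples within each partition). Passing a `seed`, which `randomSplit([0.8, 0.2], seed=...)` also accepts, makes the split reproducible.

```python
import random

def random_split(rows, weights=(0.8, 0.2), seed=None):
    """Assign each row independently to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. (0.8, 0.2) -> [0.8, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding in the last bound
            splits[-1].append(row)
    return splits

train, validation = random_split(range(4177), seed=42)
print(len(train), len(validation))  # roughly 80% / 20% of 4177 rows
```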

### Upload MLeap dependencies to S3

We also need the MLeap dependencies. MLeap is an additional library that does not come bundled with Spark by default.

Like most packages in the Spark ecosystem, MLeap is implemented as a Scala package with a Python front-end wrapper so that it can be used from PySpark. Both the MLeap Python library and the JAR must be available to the Spark job (in the original tutorial, the Glue job environment). The following cell downloads the MLeap Python dependency and JAR from a SageMaker-hosted bucket; they are then uploaded to the S3 bucket created above.

If your Glue job uses other Python libraries such as `nltk`, download the wheel file from PyPI and upload it to S3 in the same way. At the time the original tutorial was written, Glue only supported passing pure Python libraries this way (e.g. you cannot pass `Pandas` or `OpenCV`). However, `NumPy` and `SciPy` are pre-installed in the Glue environment, so you can use them without passing them as packages.

In [620]:
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar

--2020-07-30 20:58:31--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.235.64
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.235.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 36872 (36K) [application/zip]
Saving to: ‘python.zip.18’


2020-07-30 20:58:31 (742 KB/s) - ‘python.zip.18’ saved [36872/36872]

--2020-07-30 20:58:31--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.235.64
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.235.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17319576 (17M) [application/java-archive]
Saving to: ‘mleap_spark_assembly.jar.18’


2020-07-30 20:58:33 (14.6 MB/s) - ‘mleap_spark_assembly.jar.18’ saved [17319576/17319576]



## Defining output locations for the data and model
Next we define the output location where the transformed dataset will be uploaded, and a model location where the MLeap-serialized model will be uploaded.

Defining these as variables lets us re-use them in other SageMaker operations later in this notebook.

In [621]:
python_dep_location = sess.upload_data(path='python.zip', bucket=default_bucket, key_prefix='dependencies/python')
jar_dep_location = sess.upload_data(path='mleap_spark_assembly.jar', bucket=default_bucket, key_prefix='dependencies/jar')

In [622]:
out_arg = '--jars ' + jar_dep_location + ' pyspark-shell'
#os.environ['PYSPARK_SUBMIT_ARGS'] = out_arg

In [623]:
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Input location of the data: abalone.csv was uploaded to the input/abalone prefix earlier
s3_input_bucket = default_bucket
s3_input_key_prefix = 'input/abalone'

# Output location of the data. The input data will be split, transformed, and
# uploaded under <timestamp>/abalone (new_train.csv and new_validate.csv)
s3_output_bucket = default_bucket
s3_output_key_prefix = timestamp_prefix + '/abalone'

# The MLeap-serialized SparkML model will be uploaded under <timestamp>_model/mleap
s3_model_bucket = default_bucket
s3_model_key_prefix = timestamp_prefix + '_model' + '/mleap'
# The Lambda function must use this same schema prefix
s3_schema_key_prefix = 'input_schema/abalone'
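These prefixes determine where each artifact lands. A small hypothetical helper (`s3_layout`, not part of the original notebook) that spells out the resulting URIs can make it easier to check that the Lambda function points at the same schema key. It mirrors the naming used in this notebook:

```python
def s3_layout(bucket, timestamp_prefix, schema_key_prefix="input_schema/abalone"):
    """Return the S3 URIs this notebook reads from or writes to, following the prefixes above."""
    data_prefix = timestamp_prefix + "/abalone"
    model_prefix = timestamp_prefix + "_model/mleap"
    return {
        "input": "s3://{}/input/abalone/abalone.csv".format(bucket),
        "train": "s3://{}/{}/new_train.csv".format(bucket, data_prefix),
        "validation": "s3://{}/{}/new_validate.csv".format(bucket, data_prefix),
        "sparkml_model": "s3://{}/{}/modelnew.tar.gz".format(bucket, model_prefix),
        "xgboost_output": "s3://{}/{}/xgboost_model".format(bucket, data_prefix),
        "schema": "s3://{}/{}/schema.txt".format(bucket, schema_key_prefix),
    }

for name, uri in s3_layout("my-bucket", "2020-07-30-14-22-00").items():
    print(name, uri)
```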

In [624]:
args = {
    'S3_INPUT_BUCKET': s3_input_bucket,
    'S3_INPUT_KEY_PREFIX': s3_input_key_prefix,
    'S3_OUTPUT_BUCKET': s3_output_bucket,
    'S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,
    'S3_MODEL_BUCKET': s3_model_bucket,
    'S3_MODEL_KEY_PREFIX': s3_model_key_prefix,
}

In [625]:
from __future__ import print_function
from __future__ import unicode_literals

In [626]:
import time
import sys
import os
import shutil
import csv
import boto3

In [411]:
!pip3.6 install mleap



## This is the SparkML pipeline portion of the script
We start a Spark session, define the schema, build the pipeline, and process the training data.

Finally, the fitted pipeline is serialized with MLeap so that it can be served as the first stage of the SageMaker inference pipeline.

In [412]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from mleap.pyspark.spark_support import SimpleSparkSerializer

In [413]:
def csv_line(data):
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r
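SageMaker's built-in XGBoost expects CSV training data with the label in the first column and no header row, which is the layout `csv_line` produces. A quick check with a hand-written `(rings, features)` pair modeled on the first row of the dataset (an `M` row, whose one-hot `sex` vector is `[1.0, 0.0]`); the function is repeated here so the snippet runs on its own:

```python
def csv_line(data):
    # data is a (label, features) pair; emit "label,f1,f2,..." with the label first
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r

row = (15.0, [1.0, 0.0, 0.455, 0.365, 0.095, 0.514, 0.2245, 0.101, 0.15])
print(csv_line(row))
# 15.0,1.0,0.0,0.455,0.365,0.095,0.514,0.2245,0.101,0.15
```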

In [414]:
spark = SparkSession.builder.config("spark.jars", "mleap_spark_assembly.jar").appName("PySparkAbalone").getOrCreate()


In [416]:
spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")

### Schema From File
Define the schema of the CSV file, which has no header row: eight feature columns followed by the `rings` label.



In [417]:
columns = ["sex", "length", "diameter", "height","whole_weight","shucked_weight","viscera_weight","shell_weight"]
pred_column = "rings"

In [418]:
schema = StructType([StructField(columns[0], StringType(), True),
                         StructField(columns[1], DoubleType(), True),
                         StructField(columns[2], DoubleType(), True),
                         StructField(columns[3], DoubleType(), True),
                         StructField(columns[4], DoubleType(), True),
                         StructField(columns[5], DoubleType(), True),
                         StructField(columns[6], DoubleType(), True),
                         StructField(columns[7], DoubleType(), True),
                         StructField(pred_column, DoubleType(), True)])

In [419]:
total_df = spark.read.csv('./abalone.csv', header=False, schema=schema)

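### Data Pipeline Transforms
`StringIndexer` converts the categorical `sex` column (M, F, or I for infant) into a numeric index, and `OneHotEncoder` turns that index into a one-hot vector. `VectorAssembler` then combines the encoded `sex` vector and the seven numeric measurements into a single `features` vector.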
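A plain-Python sketch of what the indexing and encoding stages do to the `sex` column. `StringIndexer` orders labels by descending frequency by default, and in Spark 2.2 `OneHotEncoder` drops the last category by default (`dropLast=True`), which is why `sex_vec` has size 2 and `F` (index `2.0`) becomes the empty sparse vector `(2,[],[])` in the transformed output below. The toy sample and the alphabetical tie-breaking rule are assumptions of this illustration, not Spark's code:

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index by descending frequency (StringIndexer's default order).

    Ties are broken alphabetically here; that rule is an assumption of this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_labels, drop_last=True):
    """One-hot encode an index; with drop_last the last category becomes all zeros."""
    size = num_labels - 1 if drop_last else num_labels
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

# Hypothetical toy sample: M is most frequent, then I, then F
sample = ["M", "M", "M", "I", "I", "F"]
mapping = fit_string_indexer(sample)
print(mapping)                   # {'M': 0.0, 'I': 1.0, 'F': 2.0}
print(one_hot(mapping["M"], 3))  # [1.0, 0.0]
print(one_hot(mapping["F"], 3))  # [0.0, 0.0]
```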


In [420]:
sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

In [421]:
sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

In [422]:
assembler = VectorAssembler(inputCols=["sex_vec",
                                           "length",
                                           "diameter",
                                           "height",
                                           "whole_weight",
                                           "shucked_weight",
                                           "viscera_weight",
                                           "shell_weight"],
                                outputCol="features")

In [423]:
pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

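Calling `fit` learns the transformation parameters (for example, the mapping from `sex` labels to indices) and returns a fitted `PipelineModel`. No ML model is trained yet; this step only defines the data transformation.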

In [424]:
model = pipeline.fit(total_df)

In [425]:
transformed_total_df = model.transform(total_df)

In [426]:
total_df.show()

+---+------+--------+------+------------+--------------+--------------+------------+-----+
|sex|length|diameter|height|whole_weight|shucked_weight|viscera_weight|shell_weight|rings|
+---+------+--------+------+------------+--------------+--------------+------------+-----+
|  M| 0.455|   0.365| 0.095|       0.514|        0.2245|         0.101|        0.15| 15.0|
|  M|  0.35|   0.265|  0.09|      0.2255|        0.0995|        0.0485|        0.07|  7.0|
|  F|  0.53|    0.42| 0.135|       0.677|        0.2565|        0.1415|        0.21|  9.0|
|  M|  0.44|   0.365| 0.125|       0.516|        0.2155|         0.114|       0.155| 10.0|
|  I|  0.33|   0.255|  0.08|       0.205|        0.0895|        0.0395|       0.055|  7.0|
|  I| 0.425|     0.3| 0.095|      0.3515|         0.141|        0.0775|        0.12|  8.0|
|  F|  0.53|   0.415|  0.15|      0.7775|         0.237|        0.1415|        0.33| 20.0|
|  F| 0.545|   0.425| 0.125|       0.768|         0.294|        0.1495|        0.26| 16.0|

In [427]:
transformed_total_df.show()

+---+------+--------+------+------------+--------------+--------------+------------+-----+-----------+-------------+--------------------+
|sex|length|diameter|height|whole_weight|shucked_weight|viscera_weight|shell_weight|rings|indexed_sex|      sex_vec|            features|
+---+------+--------+------+------------+--------------+--------------+------------+-----+-----------+-------------+--------------------+
|  M| 0.455|   0.365| 0.095|       0.514|        0.2245|         0.101|        0.15| 15.0|        0.0|(2,[0],[1.0])|[1.0,0.0,0.455,0....|
|  M|  0.35|   0.265|  0.09|      0.2255|        0.0995|        0.0485|        0.07|  7.0|        0.0|(2,[0],[1.0])|[1.0,0.0,0.35,0.2...|
|  F|  0.53|    0.42| 0.135|       0.677|        0.2565|        0.1415|        0.21|  9.0|        2.0|    (2,[],[])|[0.0,0.0,0.53,0.4...|
|  M|  0.44|   0.365| 0.125|       0.516|        0.2155|         0.114|       0.155| 10.0|        0.0|(2,[0],[1.0])|[1.0,0.0,0.44,0.3...|
|  I|  0.33|   0.255|  0.08|      

In [428]:
(train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

In [429]:
train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))

In [430]:
train_lines = train_rdd.map(csv_line)

The training and validation sets are written to local CSV files (label first, no header) so they can be uploaded to S3 for the SageMaker XGBoost training job.

In [431]:
# coalesce(1) writes a single part-00000 file, so the copy below captures every row
train_lines.coalesce(1).saveAsTextFile('./' + args['S3_OUTPUT_KEY_PREFIX'] + '/new_train')

In [432]:
import shutil
shutil.copy2('./' + args['S3_OUTPUT_KEY_PREFIX'] + '/new_train/part-00000', './new_train.csv')

'./new_train.csv'

In [433]:
validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))

In [434]:
validation_lines = validation_rdd.map(csv_line)

In [435]:
# coalesce(1) writes a single part-00000 file, so the copy below captures every row
validation_lines.coalesce(1).saveAsTextFile('./' + args['S3_OUTPUT_KEY_PREFIX'] + '/new_validate')

In [436]:
shutil.copy2('./' + args['S3_OUTPUT_KEY_PREFIX'] + '/new_validate/part-00000', './new_validate.csv')

'./new_validate.csv'

In [437]:
# Remove any bundle left over from a previous run before serializing again
!rm -rf /home/ec2-user/SageMaker/modelnew.zip

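### MLeap
This serializes the fitted Spark pipeline into an MLeap bundle that the SageMaker SparkML Serving container can load. The data transformation is now encapsulated in the serialized model, which becomes the first stage of the SageMaker inference pipeline. The bundle is then repackaged as a `tar.gz` archive with `bundle.json` and `root` at its top level.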

In [438]:
SimpleSparkSerializer().serializeToBundle(model, "jar:file:/home/ec2-user/SageMaker/modelnew.zip", validation_df)

In [439]:
import zipfile
with zipfile.ZipFile("/home/ec2-user/SageMaker/modelnew.zip") as zf:
    zf.extractall("/home/ec2-user/SageMaker/modelnew")

import tarfile
with tarfile.open("/home/ec2-user/SageMaker/modelnew.tar.gz", "w:gz") as tar:
    tar.add("/home/ec2-user/SageMaker/modelnew/bundle.json", arcname='bundle.json')
    tar.add("/home/ec2-user/SageMaker/modelnew/root", arcname='root')
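Before uploading, it can help to confirm that the archive has `bundle.json` and `root` at its top level, matching the `arcname` values used above. A minimal sketch using the standard `tarfile` module; the helper name `check_mleap_archive` is hypothetical, and treating this layout as what the SparkML Serving container requires is an assumption drawn from this notebook:

```python
import tarfile

def check_mleap_archive(path):
    """Return True if the tar.gz contains bundle.json and root/ at its top level."""
    with tarfile.open(path, "r:gz") as tar:
        # First path component of every member, e.g. "root/..." -> "root"
        top_level = {name.split("/")[0] for name in tar.getnames()}
    return {"bundle.json", "root"} <= top_level
```

For example, `check_mleap_archive("/home/ec2-user/SageMaker/modelnew.tar.gz")` should return `True` for the archive built above; an archive whose members are nested under an extra directory (such as `modelnew/bundle.json`) returns `False`.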

In [440]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel

In [441]:
import json
schema = {
    "input": [
        {
            "name": "sex",
            "type": "string"
        },
        {
            "name": "length",
            "type": "double"
        },
        {
            "name": "diameter",
            "type": "double"
        },
        {
            "name": "height",
            "type": "double"
        },
        {
            "name": "whole_weight",
            "type": "double"
        },
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        },
        {
            "name": "shell_weight",
            "type": "double"
        },
    ],
    "output":
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)

In [442]:
sparkml_data= sess.upload_data(path='modelnew.tar.gz', bucket=args['S3_MODEL_BUCKET'], key_prefix=args['S3_MODEL_KEY_PREFIX'])

In [443]:
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


In [444]:
sparkml_model.model_data

's3://aws-glue-783112096916-us-east-2/2020-07-30-14-22-00_model/mleap/modelnew.tar.gz'

## Using SageMaker XGBoost to train on the processed dataset produced by SparkML job

Now we will use the SageMaker XGBoost algorithm to train on this dataset. The preprocessed training and validation files were written locally by the SparkML step above; the next cells upload them to S3.

### We need to retrieve the XGBoost algorithm image
We retrieve the XGBoost built-in algorithm image so that it can be used for the training job.

In [445]:
from sagemaker.amazon.amazon_estimator import get_image_uri

training_image = get_image_uri(sess.boto_region_name, 'xgboost', repo_version="latest")
print (training_image)

'get_image_uri' method will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.
There is a more up to date SageMaker XGBoost image. To use the newer image, please set 'repo_version'='1.0-1'. For example:
	get_image_uri(region, 'xgboost', '1.0-1').


825641698319.dkr.ecr.us-east-2.amazonaws.com/xgboost:latest


In [446]:
os.path.join(args['S3_OUTPUT_BUCKET'])

'aws-glue-783112096916-us-east-2'

### Next XGBoost model parameters and dataset details will be set properly
We parameterized this notebook so that the same data locations used by the PySpark step can be passed to the XGBoost estimator as well.

In [447]:
s3_train_data = sess.upload_data(path='./new_train.csv', bucket=args['S3_OUTPUT_BUCKET'], key_prefix=args['S3_OUTPUT_KEY_PREFIX'])
s3_validation_data = sess.upload_data(path='./new_validate.csv', bucket=args['S3_OUTPUT_BUCKET'], key_prefix=args['S3_OUTPUT_KEY_PREFIX'])

In [448]:
s3_validation_data

's3://aws-glue-783112096916-us-east-2/2020-07-30-14-22-00/abalone/new_validate.csv'

In [449]:
#'./' + args['S3_OUTPUT_KEY_PREFIX'] + 'train'
##s3_train_data = sess.upload_data(path='./' + args['S3_OUTPUT_KEY_PREFIX'] + '/new_train', bucket=args['S3_OUTPUT_BUCKET'], key_prefix=args['S3_OUTPUT_KEY_PREFIX'])
#s3_validation_data = sess.upload_data(path='./' + args['S3_OUTPUT_KEY_PREFIX'] + '/new_validate', bucket=args['S3_OUTPUT_BUCKET'], key_prefix=args['S3_OUTPUT_KEY_PREFIX'])

In [450]:
s3_train_data

's3://aws-glue-783112096916-us-east-2/2020-07-30-14-22-00/abalone/new_train.csv'

In [451]:
s3_validation_data

's3://aws-glue-783112096916-us-east-2/2020-07-30-14-22-00/abalone/new_validate.csv'

In [452]:
s3_output_location = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'xgboost_model')

In [453]:
xgb_model_train = sagemaker.estimator.Estimator(training_image,
                                         role,
                                         train_instance_count=1,
                                         train_instance_type='ml.m4.xlarge',
                                         train_volume_size = 20,
                                         train_max_run = 3600,
                                         input_mode= 'File',
                                         output_path=s3_output_location,
                                         sagemaker_session=sess)

Parameter image_name will be renamed to image_uri in SageMaker Python SDK v2.


In [454]:
xgb_model_train.set_hyperparameters(objective = "reg:linear",
                              eta = .2,
                              gamma = 4,
                              max_depth = 5,
                              num_round = 10,
                              subsample = 0.7,
                              silent = 0,
                              min_child_weight = 6)

In [455]:
train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated',
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated',
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


In [456]:
s3_validation_data

's3://aws-glue-783112096916-us-east-2/2020-07-30-14-22-00/abalone/new_validate.csv'

In [457]:
from urllib.parse import urlparse
def get_csv_output_from_s3(s3uri, file_name):
    parsed_url = urlparse(s3uri)
    bucket_name = parsed_url.netloc
    prefix = parsed_url.path[1:]
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket_name, '{}/{}'.format(prefix, file_name))
    return obj.get()["Body"].read().decode('utf-8')

In [458]:
import pandas as pd
import io
# Read back the training file uploaded above. new_train.csv has no header row:
# column 0 is the `rings` label, followed by the 9 feature values
str_file = get_csv_output_from_s3('s3://{}/{}'.format(s3_output_bucket, s3_output_key_prefix), 'new_train.csv')
output_df = pd.read_csv(io.StringIO(str_file), sep=",", header=None)
output_df[20:30]


### Finally XGBoost training will be performed.

In [459]:
xgb_model_train.fit(inputs=data_channels, logs=True)

2020-07-30 14:22:12 Starting - Starting the training job...
2020-07-30 14:22:14 Starting - Launching requested ML instances......
2020-07-30 14:23:18 Starting - Preparing the instances for training......
2020-07-30 14:24:35 Downloading - Downloading input data
2020-07-30 14:24:35 Training - Downloading the training image..
Arguments: train
[2020-07-30:14:24:54:INFO] Running standalone xgboost training.
[2020-07-30:14:24:54:INFO] File size need to be processed in the node: 0.21mb. Available memory size in the node: 8494.55mb
[2020-07-30:14:24:54:INFO] Determined delimiter of CSV input is ','
[14:24:54] S3DistributionType set as FullyReplicated
[14:24:55] 3326x9 matrix with 29934 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,
[2020-07-30:14:24:55:INFO] Determined delimiter of CSV input is ','
[14:24:55] S3DistributionType set as FullyReplicated
[14:24:55] 851x9 matrix with 7659

# Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint

Next we deploy the models in SageMaker to create an Inference Pipeline. You can create an Inference Pipeline with up to five containers.

Deploying a model in SageMaker requires two components:

* Docker image residing in ECR.
* Model artifacts residing in S3.

**SparkML**

For SparkML, the Docker image for MLeap-based SparkML serving is provided by the SageMaker team. For more information, see [SageMaker SparkML Serving](https://github.com/aws/sagemaker-sparkml-serving-container). The MLeap-serialized SparkML model was uploaded to S3 earlier in this notebook.

**XGBoost**

For XGBoost, we use the same Docker image we used for training. The XGBoost model artifacts were uploaded to S3 by the training job we just ran.

### Passing the schema of the payload via environment variable
The SparkML serving container needs to know the schema of the request passed to it when the `predict` method is called. To avoid passing the schema with every request, `sagemaker-sparkml-serving` lets you provide it through an environment variable (`SAGEMAKER_SPARKML_SCHEMA`) when creating the model definition. This is the same schema passed to `SparkMLModel` earlier; it is also uploaded to S3 below so the Lambda function can read it.

We will see later that you can override this schema on a per-request basis by including it in the individual request payload.
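Rather than writing the schema by hand, it can be generated from the column list. A minimal sketch; the helper `build_sparkml_schema` is hypothetical. It produces the same JSON as the hand-written schema in the cell below, as long as the input order matches the column order the Spark pipeline was fitted on:

```python
import json

def build_sparkml_schema(columns, output_name="features"):
    """Build the SAGEMAKER_SPARKML_SCHEMA document from (name, type) pairs.

    Keep the input order identical to the training columns: CSV and
    schema-less JSON requests are matched to these columns by position.
    """
    return {
        "input": [{"name": name, "type": col_type} for name, col_type in columns],
        "output": {"name": output_name, "type": "double", "struct": "vector"},
    }

abalone_columns = [("sex", "string")] + [
    (name, "double")
    for name in ["length", "diameter", "height", "whole_weight",
                 "shucked_weight", "viscera_weight", "shell_weight"]
]
schema_json = json.dumps(build_sparkml_schema(abalone_columns))
print(schema_json)
```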

In [460]:
import json
schema = {
    "input": [
        {
            "name": "sex",
            "type": "string"
        },
        {
            "name": "length",
            "type": "double"
        },
        {
            "name": "diameter",
            "type": "double"
        },
        {
            "name": "height",
            "type": "double"
        },
        {
            "name": "whole_weight",
            "type": "double"
        },
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        },
        {
            "name": "shell_weight",
            "type": "double"
        },
    ],
    "output":
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "sex", "type": "string"}, {"name": "length", "type": "double"}, {"name": "diameter", "type": "double"}, {"name": "height", "type": "double"}, {"name": "whole_weight", "type": "double"}, {"name": "shucked_weight", "type": "double"}, {"name": "viscera_weight", "type": "double"}, {"name": "shell_weight", "type": "double"}], "output": {"name": "features", "type": "double", "struct": "vector"}}


In [503]:
with open('schema.txt', 'w') as outfile:
    json.dump(schema, outfile)

sess.upload_data(path='schema.txt', bucket=default_bucket, key_prefix=s3_schema_key_prefix)

's3://aws-glue-783112096916-us-east-2/input_schema/abalone/schema.txt'

In [514]:
json_data = {
  "body": {
    "data": {
      "sex": "F",
      "length": 0.515,
      "diameter" : 0.425,
      "height":0.14,
      "whole_weight":0.766,
      "shucked_weight":0.304,
      "viscera_weight":0.1725,
      "shell_weight":0.255
    }
  }
}

In [515]:
# Test the schema load function for the Lambda: read the schema from S3 and
# order the request's fields to match the schema's input columns
import boto3
import json

s3 = boto3.resource('s3')

content_object = s3.Object(default_bucket, s3_schema_key_prefix + '/schema.txt')
file_content = content_object.get()['Body'].read().decode('utf-8')
json_content = json.loads(file_content)
query_list = [json_data['body']['data'][item['name']] for item in json_content['input']]
payload = {"data": query_list}

In [516]:
print(payload)

{'data': ['F', 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255]}
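The Lambda function itself is not shown in this section. Below is a minimal sketch of the flow the cell above tests: read the schema, order the request's fields to match it, and call the pipeline endpoint with the `sagemaker-runtime` client's `invoke_endpoint`. Arize logging is left as a placeholder comment because its client API is not shown here. The environment variable names (`SCHEMA_BUCKET`, `SCHEMA_KEY`, `ENDPOINT_NAME`), the response shape, the `Accept="text/csv"` choice, and the injectable `runtime_client`/`schema` parameters (added for offline testing) are all assumptions:

```python
import json
import os

def load_schema(bucket, key):
    """Read the schema JSON uploaded by the notebook (requires boto3 in the Lambda runtime)."""
    import boto3  # imported lazily so the helpers below can be tested without AWS
    obj = boto3.resource("s3").Object(bucket, key)
    return json.loads(obj.get()["Body"].read().decode("utf-8"))

def build_payload(record, schema):
    """Order the request's named fields to match the schema's input columns."""
    return {"data": [record[col["name"]] for col in schema["input"]]}

def lambda_handler(event, context, runtime_client=None, schema=None):
    body = event["body"]
    # With API Gateway's Lambda proxy integration, the body arrives as a JSON string
    if isinstance(body, str):
        body = json.loads(body)
    if schema is None:
        # SCHEMA_BUCKET / SCHEMA_KEY are hypothetical environment variable names
        schema = load_schema(os.environ["SCHEMA_BUCKET"], os.environ["SCHEMA_KEY"])
    if runtime_client is None:
        import boto3
        runtime_client = boto3.client("sagemaker-runtime")
    payload = build_payload(body["data"], schema)
    # Placeholder (hypothetical): log body["data"], the raw human-readable features, to Arize here
    response = runtime_client.invoke_endpoint(
        EndpointName=os.environ.get("ENDPOINT_NAME", "inference-pipeline-lambda-realtime-1-0"),
        ContentType="application/json",
        Accept="text/csv",
        Body=json.dumps(payload),
    )
    prediction = response["Body"].read().decode("utf-8")
    return {"statusCode": 200, "body": json.dumps({"prediction": prediction})}
```

Logging the raw `body["data"]` rather than the encoded features follows the recommendation above to capture human-readable data.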


### Creating a `PipelineModel` that combines the SparkML and XGBoost models in the right order

Next we create a SageMaker `PipelineModel` with SparkML and XGBoost. The `PipelineModel` ensures that both containers are deployed behind a single API endpoint in the correct order. The same model can later be used for Batch Transform as well, so a single job is sufficient to run predictions through the whole pipeline.

The SparkML `Model` was created earlier with the schema definition passed via the `SAGEMAKER_SPARKML_SCHEMA` environment variable.

In [461]:
xgb_model = Model(model_data=xgb_model_train.model_data, image=training_image,role=role)

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


In [462]:
# Commented-out example: deploy the XGBoost model on its own endpoint
#endpoint_name = 'xgb-test-debug' + timestamp_prefix + '11'
#xgb_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)

In [463]:
model_name = 'inference-pipeline-' + timestamp_prefix

In [464]:
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])
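Conceptually, the `PipelineModel` created above chains its containers: the SparkML container's output becomes the XGBoost container's input. The pure-Python sketch below only illustrates that ordering. `run_inference_pipeline` and `toy_featurizer` are hypothetical stand-ins, and the one-hot layout is an assumption, not the actual SparkML transform.

```python
from typing import Any, Callable, Sequence

# Abalone `sex` takes the values M (male), F (female) and I (infant)
SEX_CATEGORIES = ("M", "F", "I")


def run_inference_pipeline(stages: Sequence[Callable[[Any], Any]], request: Any) -> Any:
    """Feed each stage's output into the next stage, in order."""
    result = request
    for stage in stages:
        result = stage(result)
    return result


def toy_featurizer(row):
    """Hypothetical stand-in for the SparkML stage: one-hot `sex`, keep numerics as floats."""
    sex, *numeric = row
    one_hot = [1.0 if sex == category else 0.0 for category in SEX_CATEGORIES]
    return one_hot + [float(value) for value in numeric]


# In the real endpoint the second stage is the XGBoost container; any callable works here
row = ["F", 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255]
features = run_inference_pipeline([toy_featurizer], row)
```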

### Deploying the `PipelineModel` to an endpoint for realtime inference
Next we will deploy the model we just created with the `deploy()` method to start an inference endpoint and we will send some requests to the endpoint to verify that it works as expected.

In [568]:
# Delete the existing endpoint before rerunning this cell: the name is fixed because the Lambda function references it
endpoint_name = 'inference-pipeline-lambda-realtime-1-0'
sm_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)

Using already existing model: inference-pipeline-2020-07-30-14-22-00


---------------!
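The comment in the deploy cell says the endpoint has to be deleted before it is recreated under the same name. A minimal sketch with the boto3 SageMaker client, assuming `sm_client = boto3.client("sagemaker")`; `delete_endpoint_if_exists` is a hypothetical helper, and it checks only the first page of `list_endpoints` results.

```python
def delete_endpoint_if_exists(sm_client, endpoint_name):
    """Delete `endpoint_name` if it exists and block until the deletion finishes.

    Assumes `sm_client` is boto3.client("sagemaker"). Returns True if an endpoint
    was deleted and False if no endpoint with that exact name was found.
    """
    # NameContains is a substring filter, so confirm an exact name match
    response = sm_client.list_endpoints(NameContains=endpoint_name)
    names = {endpoint["EndpointName"] for endpoint in response["Endpoints"]}
    if endpoint_name not in names:
        return False
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    # Deletion is asynchronous; wait so that deploy() does not race with it
    sm_client.get_waiter("endpoint_deleted").wait(EndpointName=endpoint_name)
    return True
```

Usage: `delete_endpoint_if_exists(boto3.client("sagemaker"), endpoint_name)` before calling `sm_model.deploy(...)`.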

### Invoking the newly created inference endpoint with a payload to transform the data
Now we will invoke the endpoint with a valid payload that SageMaker SparkML Serving can recognize. There are three ways to pass the input payload in the request:

* Pass it as a valid CSV string. In this case, the schema passed via the environment variable is used to determine the schema. For CSV format, every column in the input has to be a basic datatype (e.g. int, double, string); it cannot be a Spark `Array` or `Vector`.

* Pass it as a valid JSON string. In this case as well, the schema passed via the environment variable is used to infer the schema. With JSON format, every column in the input can be a basic datatype or a Spark `Vector` or `Array`, provided that the corresponding entry in the schema declares it correctly.

* Pass the request in JSON format along with the schema and the data. In this case, the schema passed in the payload will take precedence over the one passed via the environment variable (if any).
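The three payload options above can be sketched as request bodies built from the same sample row. The `{"schema": ..., "data": ...}` layout follows the SparkML Serving convention described above; `example_schema` is a hypothetical minimal schema (the `schema` dict written to schema.txt earlier would play the same role). CSV bodies would be sent with content type `text/csv`, the JSON bodies with `application/json`.

```python
import json

# Feature values in schema order (the sample used throughout this notebook)
example_row = ["F", 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255]

# Hypothetical minimal schema mirroring the abalone columns
example_schema = {
    "input": [{"name": "sex", "type": "string"}]
    + [{"name": name, "type": "double"}
       for name in ["length", "diameter", "height", "whole_weight",
                    "shucked_weight", "viscera_weight", "shell_weight"]],
    "output": {"name": "features", "type": "double", "struct": "vector"},
}


def as_csv(values):
    """Option 1: CSV body; the schema comes from the environment variable."""
    return ",".join(str(value) for value in values)


def as_json(values):
    """Option 2: JSON body with data only; the schema still comes from the environment variable."""
    return json.dumps({"data": values})


def as_json_with_schema(schema, values):
    """Option 3: JSON body carrying its own schema, which takes precedence."""
    return json.dumps({"schema": schema, "data": values})
```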

#### Passing the payload in JSON format
We will first see how the payload can be passed to the endpoint as a JSON string (the second option above).

In [518]:
endpoint_name

'inference-pipeline-ep-2020-07-30-14-22-0011'

In [519]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

In [520]:
payload = {"data": ["F", 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255]}

predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess,
                              serializer=json_serializer, deserializer=json_deserializer,
                              content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_JSON)
# Invoke the endpoint once and keep the response for logging to Arize below
response_data = predictor.predict(payload)
print(response_data)

10.5312843323


The endpoint returns a real-time prediction of the number of rings for this abalone.
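Since the objective defines age as `rings` + 1.5, the prediction above (about 10.53 rings) corresponds to an age of about 12.03 years. A minimal sketch of that conversion; `rings_to_age` is a hypothetical helper name.

```python
RINGS_TO_AGE_OFFSET = 1.5  # age in years = rings + 1.5, per the objective


def rings_to_age(predicted_rings):
    """Convert a predicted ring count (number or numeric string) to age in years."""
    return float(predicted_rings) + RINGS_TO_AGE_OFFSET


# Usage with the prediction above: rings_to_age(response_data)
```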

### Now let's install Arize AI


In [468]:
!pip install arize

Collecting arize
  Downloading arize-0.0.20-py2.py3-none-any.whl (16 kB)
Collecting protobuf==3.11.3
  Downloading protobuf-3.11.3-cp36-cp36m-manylinux1_x86_64.whl (1.3 MB)
     |████████████████████████████████| 1.3 MB 4.3 MB/s eta 0:00:01
Collecting numpy==1.18.4
  Downloading numpy-1.18.4-cp36-cp36m-manylinux1_x86_64.whl (20.2 MB)
     |████████████████████████████████| 20.2 MB 17.3 MB/s eta 0:00:01
Collecting requests-futures==1.0.0
  Downloading requests-futures-1.0.0.tar.gz (10 kB)
Collecting googleapis-common-protos==1.51.0
  Downloading googleapis-common-protos-1.51.0.tar.gz (35 kB)
Building wheels for collected packages: requests-futures, googleapis-common-protos
  Building wheel for requests-futures (setup.py) ... done
  Created wheel for requests-futures: filename=requests_futures-1.0.0-py3-none-any.whl size=7013 sha256=a453f3c685c7b5964110b99f60f41ecaa030c35d757f6ee61c4becbce915dfda
  Stored in directory: /home/ec2-user/.cache/pip/wheels/15/03/c1/78b

## Testing Arize in the Jupyter Notebook Environment

This section shows a very simple test of logging data to Arize from the Jupyter notebook. The following section shows how to deploy Arize in a Lambda function that represents a production environment.

In [472]:
payload['data']

['F', 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255]

In [477]:
from arize.api import Client
from arize.utils.types import ModelTypes
import datetime

#### ARIZE CLIENT SETUP ####
# SPACE KEY - SUPPLIED BY ARIZE
space_key = 'SPACE KEY'
# API KEY - GENERATED IN YOUR ARIZE ACCOUNT OR SUPPLIED BY ARIZE
api_key = 'API KEY'
# Use a separate name so the SageMaker `model_name` defined earlier is not overwritten
arize_model_id = 'pipeline_plus_lambda'
model_version_id_now = 'model_ver_1.0'

# Put the features together: `columns` (defined earlier) holds the feature names in payload['data'] order
labels = {}
for index, col in enumerate(columns):
    labels[col] = str(payload['data'][index])

# ARIZE client
arize_client = Client(space_key=space_key, api_key=api_key)

# The prediction ID in this test is a timestamp; IN PRODUCTION IT SHOULD BE SOMETHING MATCHABLE TO ACTUALS
prediction_id = datetime.datetime.today().strftime('%m_%d_%Y__%H_%M_%S')
tfuture = arize_client.log_prediction(model_id=arize_model_id, model_version=model_version_id_now,
                                      model_type=ModelTypes.NUMERIC,
                                      features=labels, prediction_ids=prediction_id,
                                      prediction_labels=response_data)

The cell above logs a single prediction to Arize from the notebook. `log_prediction` returns a future; the next cell waits for it and checks the response status.

In [None]:
res = tfuture.result()
if res.status_code == 200:
    print('Success sending Prediction!')
else:
    print(f'Log failed with response code {res.status_code}, {res.text}')
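The timestamp ID used above has one-second resolution, so two predictions logged in the same second would share an ID. A minimal sketch of an alternative, assuming a random UUID is acceptable as a prediction ID: it is unique per call, but it must be stored (for example, returned to the caller) so that actuals can be matched to it later. `build_arize_features` and `new_prediction_id` are hypothetical helper names, and nothing here calls the Arize API.

```python
import uuid


def build_arize_features(columns, values):
    """Pair each feature name with its value as a string, like the `labels` dict above."""
    if len(columns) != len(values):
        raise ValueError(f"{len(columns)} column names but {len(values)} values")
    return {name: str(value) for name, value in zip(columns, values)}


def new_prediction_id():
    """Random, collision-resistant ID; persist it so actuals can be joined later."""
    return str(uuid.uuid4())
```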

## Testing Arize in a Production Environment

To test Arize in a production environment, we create a Lambda function, exposed through an API, that calls the endpoint. The file lambda_function.py implements the Lambda function. Open the local file and replace API_KEY_ARIZE in lambda_function.py with your Arize API key.


The lambda_function.py file needs to be uploaded to S3. Because it imports the Arize library, the library must be packaged together with lambda_function.py in a ZIP file. Lambda references this ZIP as the code to execute when the function is called.
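The tutorial's lambda_function.py is not reproduced here. For orientation, the sketch below is a hypothetical handler showing the general shape of such a function: parse the request, order the features, and call the endpoint through the SageMaker runtime API. It assumes API Gateway's Lambda proxy integration (where `body` arrives as a JSON string, while the console test event passes it as a dict), hard-codes the feature order instead of loading schema.txt, and omits the Arize logging the real function performs.

```python
import json

# Hypothetical values: the real function may load the order from schema.txt in S3
FEATURE_ORDER = ["sex", "length", "diameter", "height", "whole_weight",
                 "shucked_weight", "viscera_weight", "shell_weight"]
ENDPOINT_NAME = "inference-pipeline-lambda-realtime-1-0"


def parse_request(event):
    """Return the {"data": {...}} request from a Lambda event.

    Console test events carry "body" as a dict; API Gateway proxy events carry it
    as a JSON string. An event without "body" is treated as the request itself.
    """
    body = event.get("body", event)
    if isinstance(body, str):
        body = json.loads(body)
    return body


def lambda_handler(event, context, runtime_client=None):
    # Create the SageMaker runtime client lazily so tests can inject a fake one
    if runtime_client is None:
        import boto3
        runtime_client = boto3.client("sagemaker-runtime")

    features = parse_request(event)["data"]
    values = [features[name] for name in FEATURE_ORDER]

    response = runtime_client.invoke_endpoint(
        EndpointName=ENDPOINT_NAME,
        ContentType="application/json",
        Accept="application/json",
        Body=json.dumps({"data": values}),
    )
    prediction = json.loads(response["Body"].read().decode("utf-8"))

    # The real function would also log `features` and `prediction` to Arize here
    return {"statusCode": 200, "body": json.dumps({"prediction": prediction})}
```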

CONFIRM THAT YOU HAVE EDITED THE API_KEY_ARIZE KEY IN THE FILE lambda_function.py BEFORE UPLOADING

The commands below take the local file, create a ZIP, and then upload the ZIP to S3.

In [521]:
# Remove any previous build; -f avoids an error when nothing exists yet
!rm -rf lambda_pkg
!rm -f myArizeDeploymentPackage.zip

In [522]:
!mkdir lambda_pkg
!pip install arize -t ./lambda_pkg/

Collecting arize
  Using cached arize-0.0.20-py2.py3-none-any.whl (16 kB)
Processing /home/ec2-user/.cache/pip/wheels/35/8d/af/a922cb18800b31fadac3523cadf6c1efdf233b788fe7a4da70/googleapis_common_protos-1.51.0-py3-none-any.whl
Processing /home/ec2-user/.cache/pip/wheels/15/03/c1/78bc17e91a6f740565af018749431a5c35e62ee93a32824344/requests_futures-1.0.0-py3-none-any.whl
Collecting pandas==1.0.3
  Using cached pandas-1.0.3-cp36-cp36m-manylinux1_x86_64.whl (10.0 MB)
Collecting numpy==1.18.4
  Using cached numpy-1.18.4-cp36-cp36m-manylinux1_x86_64.whl (20.2 MB)
Collecting protobuf==3.11.3
  Using cached protobuf-3.11.3-cp36-cp36m-manylinux1_x86_64.whl (1.3 MB)
Collecting requests>=1.2.0
  Using cached requests-2.24.0-py2.py3-none-any.whl (61 kB)
Collecting pytz>=2017.2
  Using cached pytz-2020.1-py2.py3-none-any.whl (510 kB)
Collecting python-dateutil>=2.6.1
  Using cached python_dateutil-2.8.1-py2.py3-none-any.whl (227 kB)
Collecting six>=1.9
  Using cached six-1.15.0-py2.py3-none-any.whl 

### Rebuild the package (rerun the cell below whenever lambda_function.py changes)

In [642]:
!rm -f myArizeDeploymentPackage.zip
!cp lambda_function.py ./lambda_pkg/lambda_function.py
%cd lambda_pkg
!zip -r ../myArizeDeploymentPackage.zip *
%cd ..

/home/ec2-user/SageMaker/lambda_pkg
  adding: arize/ (stored 0%)
  adding: arize/public_pb2.py (deflated 88%)
  adding: arize/api.py (deflated 78%)
  adding: arize/input_transformer.py (deflated 71%)
  adding: arize/protocol_pb2.py (deflated 91%)
  adding: arize/bounded_executor.py (deflated 60%)
  adding: arize/__init__.py (stored 0%)
  adding: arize/__pycache__/ (stored 0%)
  adding: arize/__pycache__/__init__.cpython-36.pyc (deflated 12%)
  adding: arize/__pycache__/validation_helper.cpython-36.pyc (deflated 57%)
  adding: arize/__pycache__/bounded_executor.cpython-36.pyc (deflated 43%)
  adding: arize/__pycache__/public_pb2.cpython-36.pyc (deflated 64%)
  adding: arize/__pycache__/api.cpython-36.pyc (deflated 55%)
  adding: arize/__pycache__/input_transformer.cpython-36.pyc (deflated 50%)
  adding: arize/__pycache__/protocol_pb2.cpython-36.pyc (deflated 66%)
  adding: arize/validation_helper.py (deflated 82%)
  adding: arize-0.0.20.dist-info/ (stored 0%)
  adding: arize-0.0.20.dist
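The `!cp` / `!zip` cell above can also be expressed in plain Python, which adds a check that lambda_function.py ends up at the root of the archive, where Lambda looks for the handler module. A minimal sketch; `build_deployment_package` is a hypothetical helper, and unlike `zip -r *` it also includes hidden files.

```python
import os
import shutil
import zipfile


def build_deployment_package(pkg_dir, handler_path, zip_path):
    """Zip `pkg_dir` (dependencies plus handler) with lambda_function.py at the archive root.

    `zip_path` should be outside `pkg_dir`, as with ../myArizeDeploymentPackage.zip
    above, so the archive does not try to include itself.
    """
    # Same effect as `!cp lambda_function.py ./lambda_pkg/lambda_function.py`
    shutil.copy(handler_path, os.path.join(pkg_dir, "lambda_function.py"))
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for dirpath, _dirnames, filenames in os.walk(pkg_dir):
            for filename in filenames:
                full_path = os.path.join(dirpath, filename)
                # Store paths relative to pkg_dir, like running `zip -r` from inside it
                zf.write(full_path, os.path.relpath(full_path, pkg_dir))
    # Fail early if the handler is not at the archive root
    with zipfile.ZipFile(zip_path) as zf:
        if "lambda_function.py" not in zf.namelist():
            raise RuntimeError("lambda_function.py is not at the root of the package")
    return zip_path


# Usage: build_deployment_package("lambda_pkg", "lambda_function.py", "myArizeDeploymentPackage.zip")
```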

In [643]:
batch_input_loc = sess.upload_data(path='myArizeDeploymentPackage.zip', bucket=default_bucket, key_prefix='lambda_dir_3')

In [644]:
print(batch_input_loc)

s3://aws-glue-783112096916-us-east-2/lambda_dir_3/myArizeDeploymentPackage.zip


The JSON below can be used as a test event in the Lambda console.

In [None]:
{
  "body": {
    "data": {
      "sex": "F",
      "length": 0.515,
      "diameter" : 0.425,
      "height":0.14,
      "whole_weight":0.766,
      "shucked_weight":0.304,
      "viscera_weight":0.1725,
      "shell_weight":0.255
    }
  }
}

## Setting Up a Lambda Function
This section goes through how to set up a Lambda function to handle the requests to the endpoint.

In the AWS Lambda console, click "Create function".

The function's runtime must use the same Python version that you built the ZIP package with; the cell below prints the notebook's Python version.



In [523]:
#Print out Python Version of local server
import sys
sys.version

'3.6.10 |Anaconda, Inc.| (default, Mar 25 2020, 23:51:54) \n[GCC 7.3.0]'
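Lambda runtimes are identified as `python<major>.<minor>`, so the version printed above (3.6.10) corresponds to the `python3.6` runtime. A minimal sketch of that mapping; `lambda_runtime_for` is a hypothetical helper. Note that AWS has since deprecated the `python3.6` runtime, so on a current account the package may need to be rebuilt under a supported Python version.

```python
import sys


def lambda_runtime_for(version_info=sys.version_info):
    """Map a Python version tuple to a Lambda runtime identifier, e.g. (3, 6, 10) -> 'python3.6'."""
    major, minor = version_info[0], version_info[1]
    return f"python{major}.{minor}"


print(lambda_runtime_for())
```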

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step1.png">

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step2.png?">

### Select Author from scratch

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step3.png?">

### Set the code entry type to a file from S3
Enter the S3 URL of the deployment package, printed below:

In [524]:
print(batch_input_loc)

s3://aws-glue-783112096916-us-east-2/lambda_dir_3/myArizeDeploymentPackage.zip
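Once the function exists, later rebuilds of the package can also be pushed from the notebook instead of through the console, using the Lambda API's `update_function_code` with the bucket and key of the uploaded ZIP. A minimal sketch, assuming `lambda_client = boto3.client("lambda")`; `split_s3_uri` and `update_lambda_code_from_s3` are hypothetical helpers, and the function name is whatever you chose when creating it.

```python
def split_s3_uri(uri):
    """Split 's3://bucket/key/path' into ('bucket', 'key/path')."""
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {uri}")
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket or not key:
        raise ValueError(f"S3 URI needs both a bucket and a key: {uri}")
    return bucket, key


def update_lambda_code_from_s3(lambda_client, function_name, s3_uri):
    """Point an existing Lambda function at the ZIP stored at `s3_uri`."""
    bucket, key = split_s3_uri(s3_uri)
    return lambda_client.update_function_code(
        FunctionName=function_name, S3Bucket=bucket, S3Key=key
    )


# Usage (function name is a placeholder):
# update_lambda_code_from_s3(boto3.client("lambda"), "YOUR FUNCTION NAME", batch_input_loc)
```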


### Then set the permissions
This Lambda function needs permission to invoke the SageMaker endpoint, so its execution role must allow that.

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step4.png">

### Click "Add inline policy" on the right

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step5.png">

### Select SageMaker as the service, InvokeEndpoint as the action, and All resources as the resources

The permissions will then appear as shown below:

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step6.png">
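The console steps above correspond to an inline IAM policy allowing `sagemaker:InvokeEndpoint`. A minimal sketch that builds that policy document; choosing All resources yields `"*"`, while passing a specific endpoint ARN narrows the grant to that one endpoint. The account ID in the example ARN is a placeholder; substitute your own region, account, and endpoint name.

```python
import json


def invoke_endpoint_policy(endpoint_arn="*"):
    """Inline IAM policy document allowing sagemaker:InvokeEndpoint on `endpoint_arn`."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sagemaker:InvokeEndpoint",
                "Resource": endpoint_arn,
            }
        ],
    }


# Placeholder account ID; the endpoint name matches the one deployed above
arn = "arn:aws:sagemaker:us-east-2:123456789012:endpoint/inference-pipeline-lambda-realtime-1-0"
print(json.dumps(invoke_endpoint_policy(arn), indent=2))
```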

## After Setting Up the Permissions, Test the Lambda Function
Enter the JSON below as a test event (using the "Test" button) to test the Lambda function.

In [590]:
{
  "body": {
    "data": {
      "sex": "F",
      "length": 0.515,
      "diameter" : 0.425,
      "height":0.14,
      "whole_weight":0.766,
      "shucked_weight":0.304,
      "viscera_weight":0.1725,
      "shell_weight":0.255
    }
  }
}

{'body': {'data': {'sex': 'F',
   'length': 0.515,
   'diameter': 0.425,
   'height': 0.14,
   'whole_weight': 0.766,
   'shucked_weight': 0.304,
   'viscera_weight': 0.1725,
   'shell_weight': 0.255}}}

<img  src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step7.png">

### Connect to the Public Internet by Setting Up an API Gateway
In the Lambda function, click "+ Add trigger" and select API Gateway as the trigger.

<img  src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step8.png">

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step9.png">

<img src="https://storage.googleapis.com/arize-assets/tutorials/sagemaker/pipeline_plus_lambda/step10.png">

## Test the Endpoint by Calling the URL
Copy the API endpoint URL shown above in place of 'YOUR URL ENDPOINT HERE' below:



curl -H "Content-Type: application/json" --location --request POST 'YOUR URL ENDPOINT HERE' --data-raw '{"data": {"sex": "F","length": 0.515,"diameter": 0.425,"height": 0.14,"whole_weight": 0.766,"shucked_weight": 0.304,"viscera_weight": 0.1725, "shell_weight": 0.255}}'
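The same call can be made from Python with only the standard library. A minimal sketch equivalent to the curl command above; `build_prediction_request` and `call_prediction_service` are hypothetical helpers, and the response is assumed to be JSON, since its exact shape depends on what lambda_function.py returns.

```python
import json
import urllib.request


def build_prediction_request(url, features):
    """Build a POST request equivalent to the curl command above."""
    body = json.dumps({"data": features}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def call_prediction_service(url, features, timeout=30):
    """Send the request and decode the (assumed JSON) response body."""
    with urllib.request.urlopen(build_prediction_request(url, features), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


sample = {"sex": "F", "length": 0.515, "diameter": 0.425, "height": 0.14,
          "whole_weight": 0.766, "shucked_weight": 0.304,
          "viscera_weight": 0.1725, "shell_weight": 0.255}
# call_prediction_service("YOUR URL ENDPOINT HERE", sample)
```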



### Overview
Arize is an end-to-end ML observability and model monitoring platform. The platform is designed to help ML engineers and data science practitioners surface and fix issues with ML models in production faster with:
- Automated ML monitoring and model monitoring
- Workflows to troubleshoot model performance
- Real-time visualizations for model performance monitoring, data quality monitoring, and drift monitoring
- Model prediction cohort analysis
- Pre-deployment model validation
- Integrated model explainability

### Website
Visit Us At: https://arize.com/model-monitoring/

### Additional Resources
- [What is ML observability?](https://arize.com/what-is-ml-observability/)
- [Playbook to model monitoring in production](https://arize.com/the-playbook-to-monitor-your-models-performance-in-production/)
- [Using statistical distance metrics for ML monitoring and observability](https://arize.com/using-statistical-distance-metrics-for-machine-learning-observability/)
- [ML infrastructure tools for data preparation](https://arize.com/ml-infrastructure-tools-for-data-preparation/)
- [ML infrastructure tools for model building](https://arize.com/ml-infrastructure-tools-for-model-building/)
- [ML infrastructure tools for production](https://arize.com/ml-infrastructure-tools-for-production-part-1/)
- [ML infrastructure tools for model deployment and model serving](https://arize.com/ml-infrastructure-tools-for-production-part-2-model-deployment-and-serving/)
- [ML infrastructure tools for ML monitoring and observability](https://arize.com/ml-infrastructure-tools-ml-observability/)

Visit the [Arize Blog](https://arize.com/blog) and [Resource Center](https://arize.com/resource-hub/) for more resources on ML observability and model monitoring.
