# Predict the age of Abalone (regression problem) with Amazon SageMaker, Spark Pipeline, and AWS Glue 

This notebook shows how to build a prediction model that determines the age of an abalone (a kind of shellfish) from its physical measurements, using feature processing with Spark, training with XGBoost, and deployment as an Inference Pipeline. In this notebook, we use AWS Glue to run serverless Spark. Although the notebook demonstrates the end-to-end flow on a small dataset, the same setup can be used to scale to larger datasets.

We'll use SparkML to process the dataset (apply one or many feature transformers) and upload the transformed dataset to S3 so that it can be used for training with XGBoost.

##### The Notebook consists of a few high-level steps:

- Using AWS Glue for executing the SparkML feature processing job.
- Using SageMaker XGBoost to train on the processed dataset produced by SparkML job.
- Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint.
- Building an Inference Pipeline consisting of SparkML & XGBoost models for a single Batch Transform job.

##### Using AWS Glue for executing the SparkML job
We'll be running the SparkML job using AWS Glue. AWS Glue is a serverless ETL service which can be used to execute standard Spark/PySpark jobs. Glue currently only supports Python 2.7, hence we'll write the script in Python 2.7.

##### Important
##### Permission setup for invoking AWS Glue from this Notebook
In order to enable this Notebook to run AWS Glue jobs, we need to add one additional permission to the default IAM role of this notebook. We will be using SageMaker Python SDK to retrieve the default execution role and then you have to go to IAM Dashboard to edit the Role to add AWS Glue specific permission.

##### Adding AWS Glue as an additional trusted entity to this role
This step is needed if you want to pass the execution role of this Notebook while calling Glue APIs as well without creating an additional Role. If you have not used AWS Glue before, then this step is mandatory.

If you have used AWS Glue previously, then you should have an already existing role that can be used to invoke Glue APIs. In that case, you can pass that role while calling Glue (later in this notebook) and skip this next step.

On the IAM dashboard, please click on Roles on the left sidenav and search for this Role. Once the Role appears, click on the Role to go to its Summary page. Click on the Trust relationships tab on the Summary page to add AWS Glue as an additional trusted entity.

Click on Edit trust relationship and replace the existing JSON with the JSON below.

In [1]:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "sagemaker.amazonaws.com",
          "glue.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

{'Version': '2012-10-17',
 'Statement': [{'Effect': 'Allow',
   'Principal': {'Service': ['sagemaker.amazonaws.com', 'glue.amazonaws.com']},
   'Action': 'sts:AssumeRole'}]}
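
If you would rather script this step than edit the role in the console, the same trust policy can be built in Python and applied with boto3. This is a minimal sketch, assuming the caller is allowed to update the role's trust policy; `build_trust_policy` and `apply_trust_policy` are hypothetical helper names, not part of the original notebook.

```python
import json


def build_trust_policy(services):
    """Return a trust policy that lets the given AWS services assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": list(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def apply_trust_policy(role_name, policy):
    """Replace the role's trust policy (hypothetical helper; needs IAM permissions)."""
    import boto3  # imported lazily so the policy builder works without boto3

    iam = boto3.client("iam")
    iam.update_assume_role_policy(
        RoleName=role_name, PolicyDocument=json.dumps(policy)
    )


trust_policy = build_trust_policy(["sagemaker.amazonaws.com", "glue.amazonaws.com"])
print(json.dumps(trust_policy, indent=2))
```

Note that `apply_trust_policy` expects the role name (the part of the ARN after `role/`), not the full ARN.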

In [16]:
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import matplotlib.pyplot as plt                   # For charts and visualizations
from IPython.display import Image                 # For displaying images in the notebook
from IPython.display import display               # For displaying outputs in the notebook
from time import gmtime, strftime                 # For labeling SageMaker models, endpoints, etc.
import time
import sys                                        # For writing outputs to notebook
import math                                       # For ceiling function
import json                                       # For parsing hosting outputs
import os                                         # For manipulating filepath names
import boto3
import botocore
from botocore.exceptions import ClientError

import csv
import io
import re
import s3fs


import sagemaker                                 
from sagemaker.predictor import csv_serializer 
from sagemaker.predictor import json_deserializer
import sagemaker.amazon.common as smac
from sagemaker import get_execution_role

### 1. Preparation (Specifying SageMaker roles)

In [3]:
sess = sagemaker.Session()
boto_session = sess.boto_session
s3 = boto_session.resource('s3')
account = boto_session.client('sts').get_caller_identity()['Account']
region = boto3.Session().region_name
default_bucket = 'aws-glue-{}-{}'.format(account, region)                     
role = 'arn:aws:iam::570447867175:role/SageMakerNotebookRole' # replace with your own IAM role ARN, or call get_execution_role()

print('Sagemaker session :', sess)
print('BoTo3 session :', boto_session)
print('Account:', account)
print('S3 bucket :', default_bucket)
print('Region selected :', region)
print('IAM role :', role)

Sagemaker session : <sagemaker.session.Session object at 0x000001E8EA71C088>
BoTo3 session : Session(region_name='us-west-2')
Account: 570447867175
S3 bucket : aws-glue-570447867175-us-west-2
Region selected : us-west-2
IAM role : arn:aws:iam::570447867175:role/SageMakerNotebookRole


### 2. Load Data

The dataset can be downloaded directly from the UCI Machine Learning Repository: [Link](https://archive.ics.uci.edu/ml/datasets/abalone).

We will download the data from the link above and upload it to a bucket in your account so that AWS Glue can access it. The default AWS Glue permissions we just added expect the data to be in a bucket whose name contains the string aws-glue. Hence, after downloading the dataset, we will create an S3 bucket in your account with a valid name and then upload the data to S3. Note that the upload cell below reads the file from ./data/abalone.csv.

Alternatively, uncomment the cell below to download the file with wget from the SageMaker team's bucket.

In [4]:
# !wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

##### Creating an S3 bucket and uploading the dataset

In [6]:
try:
    if region == 'us-east-1':
        # us-east-1 is the default location and does not accept a LocationConstraint
        s3.create_bucket(Bucket=default_bucket)
    else:
        s3.create_bucket(Bucket=default_bucket, CreateBucketConfiguration={'LocationConstraint': region})
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'BucketAlreadyOwnedByYou':
        print ('A bucket with the same name already exists in your account - using the same bucket.')
    else:
        # Surface any other failure (e.g. the name is taken by another account) instead of hiding it
        raise

# Uploading the training data to S3
sess.upload_data(path='./data/abalone.csv', bucket=default_bucket, key_prefix='input/abalone')

A bucket with the same name already exists in your account - using the same bucket.


's3://aws-glue-570447867175-us-west-2/input/abalone/abalone.csv'
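
The region branch in the cell above exists because `us-east-1` is the default S3 location and must not be passed as a `LocationConstraint`. As an illustration only, the same rule can be isolated in a small helper; `create_bucket_kwargs` is a hypothetical name and the bucket name below is a placeholder.

```python
def create_bucket_kwargs(bucket, region):
    """Build keyword arguments for an S3 create_bucket call.

    us-east-1 is the default location and is requested by omitting
    CreateBucketConfiguration; every other region needs a LocationConstraint.
    """
    kwargs = {"Bucket": bucket}
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


# Placeholder bucket name, following the aws-glue-<account>-<region> pattern.
print(create_bucket_kwargs("aws-glue-123456789012-us-west-2", "us-west-2"))
print(create_bucket_kwargs("aws-glue-123456789012-us-east-1", "us-east-1"))
```

With the notebook's variables this would be called as `s3.create_bucket(**create_bucket_kwargs(default_bucket, region))`.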

### 3. Write the feature processing script using SparkML

The code for feature transformation using SparkML can be found in abalone_processing.py file written in the same directory. You can go through the code itself to see how it is using standard SparkML constructs to define the Pipeline for featurizing the data.

Once the Spark ML Pipeline fit and transform is done, we are splitting our dataset into 80-20 train & validation as part of the script and uploading to S3 so that it can be used with XGBoost for training.

##### [1] Serializing the trained Spark ML Model with MLeap
Apache Spark is best suited to batch processing workloads. To use the Spark ML model we trained for low-latency inference, we need to use the MLeap library to serialize it to an MLeap bundle and later use SageMaker SparkML Serving to perform realtime and batch inference.

By calling MLeap's serializeToBundle() method in the script, we serialize the ML Pipeline into an MLeap bundle and upload it to S3 in the tar.gz format that SageMaker expects.

##### [2] Uploading the code and other dependencies to S3 for AWS Glue
Unlike SageMaker, in order to run your code in AWS Glue, we do not need to prepare a Docker image. We can upload the code and dependencies directly to S3 and pass those locations while invoking the Glue job.

If your code has multiple files, you need to zip those files and upload to S3 instead of uploading a single file like it's being done here.
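
For the multi-file case, the bundling step might look like the sketch below. It uses only the standard library; the file names `helpers.py` and `transforms.py` and the helper `zip_sources` are hypothetical and stand in for your own modules.

```python
import os
import tempfile
import zipfile


def zip_sources(paths, zip_path):
    """Write the given files into a flat zip archive and return its path."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in paths:
            # Store each file at the archive root so it is importable by module name.
            archive.write(path, arcname=os.path.basename(path))
    return zip_path


# Demonstration with two throwaway modules (hypothetical file names).
workdir = tempfile.mkdtemp()
sources = []
for name in ("helpers.py", "transforms.py"):
    path = os.path.join(workdir, name)
    with open(path, "w") as handle:
        handle.write("# placeholder module\n")
    sources.append(path)

bundle = zip_sources(sources, os.path.join(workdir, "extra_py_files.zip"))
with zipfile.ZipFile(bundle) as archive:
    bundled_names = sorted(archive.namelist())
print(bundled_names)
```

The resulting archive can then be uploaded with `sess.upload_data` in the same way as the single script.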

In [7]:
# Upload the 'abalone_processing.py' script to S3 now so that Glue can use it to run the PySpark job
script_location = sess.upload_data(path='abalone_processing.py', bucket=default_bucket, key_prefix='codes')

In [8]:
print(script_location)

s3://aws-glue-570447867175-us-west-2/codes/abalone_processing.py


##### [3] Upload MLeap dependencies to S3
For our job, we will also have to pass MLeap dependencies to Glue. MLeap is an additional library we are using which does not come bundled with default Spark.

Similar to most of the packages in the Spark ecosystem, MLeap is also implemented as a Scala package with a front-end wrapper written in Python so that it can be used from PySpark. We need to make sure that the MLeap Python library as well as the JAR is available within the Glue job environment. In the following cell, we will download the MLeap Python dependency & JAR from a SageMaker hosted bucket and upload to the S3 bucket we created above in your account.

If you are using some other Python libraries like nltk in your code, you need to download the wheel file from PyPI and upload to S3 in the same way. At this point, Glue only supports passing pure Python libraries in this way (e.g. you can not pass Pandas or OpenCV). However you can use NumPy & SciPy without having to pass these as packages because these are pre-installed in the Glue environment.

In [9]:
# !wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/python/python.zip
# !wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/0.9.6/jar/mleap_spark_assembly.jar

In [10]:
python_dep_location = sess.upload_data(path='python.zip', bucket=default_bucket, key_prefix='dependencies/python')
jar_dep_location = sess.upload_data(path='mleap_spark_assembly.jar', bucket=default_bucket, key_prefix='dependencies/jar')

In [11]:
print('python Dependency Location :', python_dep_location)
print('Jar file Location', jar_dep_location)

python Dependency Location : s3://aws-glue-570447867175-us-west-2/dependencies/python/python.zip
Jar file Location s3://aws-glue-570447867175-us-west-2/dependencies/jar/mleap_spark_assembly.jar


In [12]:
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# Input location of the data. We uploaded abalone.csv under the input/abalone prefix previously
s3_input_bucket = default_bucket
s3_input_key_prefix = 'input/abalone'

# Output location of the data. The input data will be split, transformed, and 
# uploaded to output/train and output/validation
s3_output_bucket = default_bucket
s3_output_key_prefix = timestamp_prefix + '/abalone'

# the MLeap serialized SparkML model will be uploaded to output/mleap
s3_model_bucket = default_bucket
s3_model_key_prefix = s3_output_key_prefix + '/mleap'

##### [4] Calling Glue APIs
Next we'll be creating Glue client via Boto so that we can invoke the create_job API of Glue. create_job API will create a job definition which can be used to execute your jobs in Glue. The job definition created here is mutable. While creating the job, we are also passing the code location as well as the dependencies location to Glue.

In [13]:
glue_client = boto_session.client('glue')
job_name = 'sparkml-abalone-' + timestamp_prefix
response = glue_client.create_job(
    Name=job_name,
    Description='PySpark job to featurize the Abalone dataset',
    Role=role, # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location
    },
    DefaultArguments={
        '--job-language': 'python',
        '--extra-jars' : jar_dep_location,
        '--extra-py-files': python_dep_location
    },
    AllocatedCapacity=5,
    Timeout=60,
)
glue_job_name = response['Name']
print(glue_job_name)

sparkml-abalone-2020-07-11-06-43-37
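
If a job needs more than one JAR or more than one Python archive, Glue expects the `--extra-jars` and `--extra-py-files` values as comma-separated lists of full S3 paths. A minimal sketch of assembling such arguments follows; `build_default_arguments` is a hypothetical helper and the S3 URIs are placeholders.

```python
def build_default_arguments(jar_locations, py_file_locations):
    """Assemble Glue DefaultArguments from lists of S3 URIs."""
    for uri in list(jar_locations) + list(py_file_locations):
        if not uri.startswith("s3://"):
            raise ValueError("expected an S3 URI, got: {}".format(uri))
    return {
        "--job-language": "python",
        # Multiple values are passed as one comma-separated string.
        "--extra-jars": ",".join(jar_locations),
        "--extra-py-files": ",".join(py_file_locations),
    }


# Placeholder locations for illustration.
default_arguments = build_default_arguments(
    ["s3://example-bucket/dependencies/jar/mleap_spark_assembly.jar"],
    [
        "s3://example-bucket/dependencies/python/python.zip",
        "s3://example-bucket/dependencies/python/extra_py_files.zip",
    ],
)
print(default_arguments["--extra-py-files"])
```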


The aforementioned job will be executed now by calling start_job_run API. This API creates an immutable run/execution corresponding to the job definition created above. We will require the job_run_id for the particular job execution to check for status. We'll pass the data and model locations as part of the job execution parameters.

In [14]:
job_run_id = glue_client.start_job_run(JobName=job_name,
                                       Arguments = {
                                        '--S3_INPUT_BUCKET': s3_input_bucket,
                                        '--S3_INPUT_KEY_PREFIX': s3_input_key_prefix,
                                        '--S3_OUTPUT_BUCKET': s3_output_bucket,
                                        '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,
                                        '--S3_MODEL_BUCKET': s3_model_bucket,
                                        '--S3_MODEL_KEY_PREFIX': s3_model_key_prefix
                                       })['JobRunId']
print(job_run_id)

jr_8a58b87fa1b30c5e00dcb725a826dad152d799c5855c816c89e4b19c5fe2a0eb


##### [5] Check Glue job status

Now we will poll the job status until it has succeeded, failed, or stopped. Once the job has succeeded, the transformed data is available in S3 in CSV format, ready to be used for training with XGBoost. If the job fails, go to the AWS Glue console, click the Jobs tab on the left, and select this job; the link under Logs opens the CloudWatch logs, which show what went wrong during execution.

In [17]:
# TIMEOUT is also a terminal state; without it the loop would never exit if the job hits its Timeout
terminal_states = ('FAILED', 'SUCCEEDED', 'STOPPED', 'TIMEOUT')
job_run_status = glue_client.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
while job_run_status not in terminal_states:
    time.sleep(30)
    job_run_status = glue_client.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']
    print (job_run_status)

RUNNING
RUNNING
RUNNING
RUNNING
SUCCEEDED
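
The polling logic can also be wrapped in a reusable helper with an upper bound on the total wait. The sketch below is one possible shape, not the notebook's original code: `wait_for_job` is a hypothetical name, the status source is injected as a callable, and the demonstration uses canned statuses instead of a real Glue call.

```python
import time


def wait_for_job(get_status, poll_seconds=30, max_wait_seconds=3600, sleep=time.sleep):
    """Poll get_status() until a terminal state or until max_wait_seconds elapses."""
    terminal_states = ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT")
    waited = 0
    status = get_status()
    while status not in terminal_states:
        if waited >= max_wait_seconds:
            raise RuntimeError("gave up waiting; last status was " + status)
        sleep(poll_seconds)
        waited += poll_seconds
        status = get_status()
    return status


# Demonstration with canned statuses and a no-op sleep.
fake_statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
final_status = wait_for_job(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(final_status)
```

Against Glue, the callable would be `lambda: glue_client.get_job_run(JobName=job_name, RunId=job_run_id)['JobRun']['JobRunState']`.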


### 4. Start Training

In [18]:
from sagemaker.amazon.amazon_estimator import get_image_uri

training_image = get_image_uri(sess.boto_region_name, 'xgboost', repo_version="latest")
print (training_image)

'get_image_uri' method will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.
	get_image_uri(region, 'xgboost', '1.0-1').


433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:latest


In [22]:
s3_train_data = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'train')
s3_validation_data = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'validation')
s3_output_location = 's3://{}/{}/{}'.format(s3_output_bucket, s3_output_key_prefix, 'xgboost_model')

xgb_model = sagemaker.estimator.Estimator(training_image,
                                         role, 
                                         train_instance_count=1, 
                                         train_instance_type='ml.m4.xlarge',
                                         train_volume_size = 20,
                                         train_max_run = 1800,
                                         input_mode= 'File',
                                         output_path=s3_output_location,
                                         sagemaker_session=sess)



In [23]:
xgb_model.set_hyperparameters(objective = "reg:linear",
                              eta = 0.01,
                              gamma = 4,
                              max_depth = 10,
                              num_round = 10,
                              subsample = 0.7,
                              silent = 0,
                              min_child_weight = 6)

train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}



In [24]:
xgb_model.fit(inputs=data_channels, logs=True)

2020-07-11 06:48:04 Starting - Starting the training job...
2020-07-11 06:48:06 Starting - Launching requested ML instances......
2020-07-11 06:49:13 Starting - Preparing the instances for training...
2020-07-11 06:50:03 Downloading - Downloading input data...
2020-07-11 06:50:24 Training - Downloading the training image..
Arguments: train
[2020-07-11:06:50:48:INFO] Running standalone xgboost training.
[2020-07-11:06:50:48:INFO] File size need to be processed in the node: 0.21mb. Available memory size in the node: 8491.06mb
[2020-07-11:06:50:48:INFO] Determined delimiter of CSV input is ','
[06:50:48] S3DistributionType set as FullyReplicated
[06:50:48] 3376x9 matrix with 30384 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,
[2020-07-11:06:50:48:INFO] Determined delimiter of CSV input is ','
[06:50:48] S3DistributionType set as FullyReplicated
[06:50:48] 801x9 matrix with 7209

### 5. Inference Pipeline in Spark

##### [1] Building an Inference Pipeline consisting of SparkML & XGBoost models for a realtime inference endpoint
Next we will deploy the models in SageMaker to create an Inference Pipeline. You can create an Inference Pipeline with up to five containers.

Deploying a model in SageMaker requires two components:

- Docker image residing in ECR.
- Model artifacts residing in S3.

###### SparkML

- For SparkML, Docker image for MLeap based SparkML serving is provided by SageMaker team. For more information on this, please see SageMaker SparkML Serving. MLeap serialized SparkML model was uploaded to S3 as part of the SparkML job we executed in AWS Glue.

###### XGBoost

- For XGBoost, we will use the same Docker image we used for training. The model artifacts for XGBoost were uploaded as part of the training job we just ran.

##### [2] Passing the schema of the payload via environment variable
The SparkML serving container needs to know the schema of the request that will be passed to it when the predict method is called. To spare you from passing the schema with every request, sagemaker-sparkml-serving lets you pass it via an environment variable when creating the model definition. This schema definition will be required in the next step for creating a model.

We will see later that you can overwrite this schema on a per request basis by passing it as part of the individual request payload as well.

In [25]:
import json
schema = {
    "input": [
        {
            "name": "sex",
            "type": "string"
        }, 
        {
            "name": "length",
            "type": "double"
        }, 
        {
            "name": "diameter",
            "type": "double"
        }, 
        {
            "name": "height",
            "type": "double"
        }, 
        {
            "name": "whole_weight",
            "type": "double"
        }, 
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        }, 
        {
            "name": "shell_weight",
            "type": "double"
        }, 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "sex", "type": "string"}, {"name": "length", "type": "double"}, {"name": "diameter", "type": "double"}, {"name": "height", "type": "double"}, {"name": "whole_weight", "type": "double"}, {"name": "shucked_weight", "type": "double"}, {"name": "viscera_weight", "type": "double"}, {"name": "shell_weight", "type": "double"}], "output": {"name": "features", "type": "double", "struct": "vector"}}
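
Because a CSV payload is interpreted purely by column position, it can help to check a row against the schema on the client before sending it. The sketch below is only a local sanity check under that assumption; it does not reproduce how the serving container parses requests, and `parse_csv_row` is a hypothetical helper.

```python
import csv
import io

# Column names and types copied from the schema defined above.
INPUT_COLUMNS = [
    ("sex", "string"),
    ("length", "double"),
    ("diameter", "double"),
    ("height", "double"),
    ("whole_weight", "double"),
    ("shucked_weight", "double"),
    ("viscera_weight", "double"),
    ("shell_weight", "double"),
]


def parse_csv_row(line, columns=INPUT_COLUMNS):
    """Split one CSV line and coerce each field to the type the schema declares."""
    fields = next(csv.reader(io.StringIO(line)))
    if len(fields) != len(columns):
        raise ValueError(
            "expected {} columns, got {}".format(len(columns), len(fields))
        )
    row = {}
    for (name, type_name), raw in zip(columns, fields):
        # float() raises ValueError if a numeric column holds non-numeric text.
        row[name] = float(raw) if type_name == "double" else raw
    return row


row = parse_csv_row("F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255")
print(row)
```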


##### [3] Creating a PipelineModel which comprises the SparkML and XGBoost models in the right order
Next we'll create a SageMaker PipelineModel with SparkML and XGBoost. The PipelineModel will ensure that both containers get deployed behind a single API endpoint in the correct order. The same model will later be used for Batch Transform as well, so that a single job is sufficient to run predictions against the Pipeline.

Here, during the Model creation for SparkML, we will pass the schema definition that we built in the previous cell.

In [26]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel

sparkml_data = 's3://{}/{}/{}'.format(s3_model_bucket, s3_model_key_prefix, 'model.tar.gz')
# passing the schema defined above by using an environment variable that sagemaker-sparkml-serving understands
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
xgb_model = Model(model_data=xgb_model.model_data, image=training_image)

model_name = 'inference-pipeline-' + timestamp_prefix
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_model])



##### [4] Deploying the PipelineModel to SageMaker Endpoint for realtime inference

In [27]:
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model.deploy(initial_instance_count=1, instance_type='ml.p2.xlarge', endpoint_name=endpoint_name)

---------------!

##### [5] Invoking the newly created inference endpoint with a payload to transform the data

Now we will invoke the endpoint with a valid payload that SageMaker SparkML Serving can recognize. There are three ways in which input payload can be passed to the request:

- Pass it as a valid CSV string. In this case, the schema passed via the environment variable will be used to determine the schema. For CSV format, every column in the input has to be a basic datatype (e.g. int, double, string) and it can not be a Spark Array or Vector.

- Pass it as a valid JSON string. In this case as well, the schema passed via the environment variable will be used to infer the schema. With JSON format, every column in the input can be a basic datatype or a Spark Vector or Array provided that the corresponding entry in the schema mentions the correct value.

- Pass the request in JSON format along with the schema and the data. In this case, the schema passed in the payload will take precedence over the one passed via the environment variable (if any).

In [28]:
# Passing the payload as a CSV string
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON
payload = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255"
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=csv_serializer,
                                content_type=CONTENT_TYPE_CSV, accept=CONTENT_TYPE_CSV)
print(predictor.predict(payload))

b'1.60151803493'


In [29]:
# Passing the payload as a JSON string
payload = {"data": ["F",0.515,0.425,0.14,0.766,0.304,0.1725,0.255]}
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sess, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

b'1.60151803493'
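
The third option from the list above, sending the schema together with the data, is not exercised by the cells in this notebook. A request body for it could be assembled roughly as follows. This is a sketch under an assumption: the top-level `schema` and `data` keys reflect my understanding of the layout the sagemaker-sparkml-serving container accepts, so please verify them against that container's documentation before relying on them.

```python
import json

double_columns = [
    "length", "diameter", "height", "whole_weight",
    "shucked_weight", "viscera_weight", "shell_weight",
]

# Same schema as the one passed through the environment variable earlier.
request_schema = {
    "input": [{"name": "sex", "type": "string"}]
    + [{"name": name, "type": "double"} for name in double_columns],
    "output": {"name": "features", "type": "double", "struct": "vector"},
}

# Assumed layout: the schema travels next to the data in one JSON document.
payload = {
    "schema": request_schema,
    "data": ["F", 0.515, 0.425, 0.14, 0.766, 0.304, 0.1725, 0.255],
}

body = json.dumps(payload)
print(body)
```

With the JSON predictor created in the previous cell, the `payload` dictionary would be passed to `predictor.predict` in place of the data-only dictionary.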


### 6. Delete the SageMaker endpoint

In [30]:
sm_client = boto_session.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)

{'ResponseMetadata': {'RequestId': '1ad19489-4740-4023-ab81-d0b77f4a4fc7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1ad19489-4740-4023-ab81-d0b77f4a4fc7',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Sat, 11 Jul 2020 06:58:59 GMT'},
  'RetryAttempts': 0}}
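
Deleting the endpoint leaves its endpoint configuration (and the model) in place. If you want to remove those as well, a helper along the following lines could replace the single `delete_endpoint` call. It is a sketch: `delete_endpoint_resources` is a hypothetical name, and `RecordingClient` is only a stand-in that records calls so the example runs without AWS access.

```python
def delete_endpoint_resources(sm_client, endpoint_name, model_name=None):
    """Delete an endpoint, its endpoint configuration, and optionally a model."""
    # Look up the configuration name before the endpoint disappears.
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)[
        "EndpointConfigName"
    ]
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    if model_name is not None:
        sm_client.delete_model(ModelName=model_name)
    return config_name


class RecordingClient:
    """Stand-in for a boto3 SageMaker client that only records method names."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def call(**kwargs):
            self.calls.append(name)
            return {"EndpointConfigName": "example-config"}

        return call


dry_run = RecordingClient()
delete_endpoint_resources(dry_run, "example-endpoint", model_name="example-model")
print(dry_run.calls)
```

With the real client this would be `delete_endpoint_resources(sm_client, endpoint_name)`. Leave `model_name` unset if you plan to run the Batch Transform section, which reuses the model.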

### 7. (optional) Building an Inference Pipeline for a single Batch Transform job

SageMaker Batch Transform also supports chaining **multiple containers** together when deploying an Inference Pipeline, so a single batch transform job can **transform your data for a batch use-case**, similar to the real-time use-case we have seen above.

###### Preparing data for Batch Transform
Batch Transform requires data in the same format described above, with one CSV or JSON record per line. For this notebook, the SageMaker team has created a sample input in CSV format which Batch Transform can process. The input is essentially the same CSV file as the training file; the only difference is that it does not contain the label (rings) field.

Next we will download this sample (named batch_input_abalone.csv) from one of the SageMaker buckets and upload it to your S3 bucket. We will also inspect the first five rows of the data after downloading.
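
If you want to prepare a batch input from your own labelled file instead, the label column has to be removed first. The sketch below assumes the label (rings) is the last column of each row; `drop_label` is a hypothetical helper and the sample row, including its label value, is illustrative only.

```python
import csv
import io


def drop_label(csv_text, label_index=-1):
    """Return csv_text with the label column removed from every row."""
    output = io.StringIO()
    writer = csv.writer(output, lineterminator="\n")
    for row in csv.reader(io.StringIO(csv_text)):
        if not row:
            continue  # skip blank lines
        del row[label_index]
        writer.writerow(row)
    return output.getvalue()


# Illustrative row: eight features followed by a made-up label value.
sample = "F,0.515,0.425,0.14,0.766,0.304,0.1725,0.255,14\n"
print(drop_label(sample), end="")
```

Pass a different `label_index` if your file stores the label elsewhere, for example `0` when it is the first column.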

In [None]:
# !wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/batch_input_abalone.csv
# !printf "\n\nShowing first five lines\n\n"    
# !head -n 5 batch_input_abalone.csv 
# !printf "\n\nAs we can see, it is identical to the training file apart from the label being absent here.\n\n"

In [None]:
batch_input_loc = sess.upload_data(path='batch_input_abalone.csv', bucket=default_bucket, key_prefix='batch')

In [None]:
# Invoking the Transform API to create a Batch Transform job

input_data_path = 's3://{}/{}/{}'.format(default_bucket, 'batch', 'batch_input_abalone.csv')
output_data_path = 's3://{}/{}/{}'.format(default_bucket, 'batch_output/abalone', timestamp_prefix)
job_name = 'serial-inference-batch-' + timestamp_prefix
transformer = sagemaker.transformer.Transformer(
    # This was the model created using PipelineModel and it contains feature processing and XGBoost
    model_name = model_name,
    instance_count = 1,
    instance_type = 'ml.m5.xlarge',
    strategy = 'SingleRecord',
    assemble_with = 'Line',
    output_path = output_data_path,
    base_transform_job_name='serial-inference-batch',
    sagemaker_session=sess,
    accept = CONTENT_TYPE_CSV
)
transformer.transform(data = input_data_path,
                      job_name = job_name,
                      content_type = CONTENT_TYPE_CSV, 
                      split_type = 'Line')
transformer.wait()
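
Once the job has finished, Batch Transform writes one result object per input object under the output path, using the input file name with `.out` appended. A small sketch for deriving that location follows; `transform_output_uri` is a hypothetical helper and the bucket name is a placeholder.

```python
def transform_output_uri(output_path, input_uri):
    """Return the S3 URI where Batch Transform writes results for one input object."""
    input_name = input_uri.rstrip("/").rsplit("/", 1)[-1]
    return "{}/{}.out".format(output_path.rstrip("/"), input_name)


# Placeholder bucket; the prefixes mirror the ones used in the cell above.
result_uri = transform_output_uri(
    "s3://example-bucket/batch_output/abalone/2020-07-11-06-43-37",
    "s3://example-bucket/batch/batch_input_abalone.csv",
)
print(result_uri)
```

With the notebook's variables this would be `transform_output_uri(output_data_path, input_data_path)`.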