# Distributed Data Processing using Apache Spark and SageMaker Processing 

Apache Spark is a unified analytics engine for large-scale data processing. The Spark framework is often used within the context of machine learning workflows to run data transformation or feature engineering workloads at scale. Amazon SageMaker provides a set of prebuilt Docker images that include Apache Spark and other dependencies needed to run distributed data processing jobs on Amazon SageMaker. This example notebook demonstrates how to:

1) Create a Spark preprocessing container, push it to your <a href='https://docs.aws.amazon.com/AmazonECR/latest/userguide/what-is-ecr.html'>Amazon ECR</a>, and use it to run a distributed SageMaker Processing job.

2) Use SageMaker's managed XGBoost container to train a regression model on the preprocessed data.

3) Use the SageMaker SDK to build an "inference only" pipeline that combines the assets from the preprocessing step and the model training. To serve the Spark MLlib preprocessing pipeline, we use Amazon SageMaker's managed SparkML Serving container.

## Preparation

First, we use the SageMaker Python SDK to obtain the execution role with the necessary permissions and the default S3 bucket associated with SageMaker in this account.


In [1]:
import sagemaker
import pandas as pd
from time import gmtime, strftime
import time
from datetime import datetime

print('Start', datetime.now().strftime("%H:%M:%S"))

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()


Start 19:34:55


In [2]:

# S3 prefixes
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = 'sagemaker/spark-preprocess-demo/' + timestamp_prefix
input_prefix = prefix + '/input/raw/abalone'
input_preprocessed_prefix = prefix + '/input/preprocessed/abalone'
model_prefix = prefix + '/model'
mleap_model_prefix = prefix + '/mleap-model'


Now we download the raw data from a public location and upload it to the default bucket in this account.

In [3]:
# Fetch the dataset from a public S3 location
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv

# Upload the training data to S3
sagemaker_session.upload_data(path='abalone.csv', bucket=bucket, key_prefix=input_prefix)

--2020-10-18 19:35:11--  https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.213.104
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.213.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 191873 (187K) [binary/octet-stream]
Saving to: ‘abalone.csv’


2020-10-18 19:35:12 (965 KB/s) - ‘abalone.csv’ saved [191873/191873]



's3://sagemaker-us-east-1-452432741922/sagemaker/spark-preprocess-demo/2020-10-18-19-34-58/input/raw/abalone/abalone.csv'

### Data Exploration
Now, let's do a quick exploration of the data set. More info on the meaning of the columns can be found in the <a href='http://archive.ics.uci.edu/ml/datasets/Abalone'>UCI Machine Learning repository</a>. 

In [4]:
df = pd.read_csv('abalone.csv',header=None)
df.head()

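|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
|---|---|---|---|---|---|---|---|---|---|
| 0 | M | 0.455 | 0.365 | 0.095 | 0.514 | 0.2245 | 0.101 | 0.15 | 15 |
| 1 | M | 0.35 | 0.265 | 0.09 | 0.2255 | 0.0995 | 0.0485 | 0.07 | 7 |
| 2 | F | 0.53 | 0.42 | 0.135 | 0.677 | 0.2565 | 0.1415 | 0.21 | 9 |
| 3 | M | 0.44 | 0.365 | 0.125 | 0.516 | 0.2155 | 0.114 | 0.155 | 10 |
| 4 | I | 0.33 | 0.255 | 0.08 | 0.205 | 0.0895 | 0.0395 | 0.055 | 7 |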


Each row contains the sex of the abalone (M, F, or I for infant) and seven physical measurements, and we want to predict the number of rings (a proxy for age). This is a regression problem, and in the preprocessing step we need to encode the categorical sex column as a set of dummy variables.
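To make the column roles concrete, here is a minimal pure-Python sketch (standard library only, not Spark) that splits one raw row from the output above into the categorical sex value, the seven numeric measurements, and the rings label. The helper name `split_row` is hypothetical; the column names are the ones used later in the `preprocess.py` schema.

```python
import csv
import io

# Column order of the raw abalone.csv file (it has no header row); the names
# match the schema defined later in preprocess.py.
COLUMNS = ["sex", "length", "diameter", "height", "whole_weight",
           "shucked_weight", "viscera_weight", "shell_weight", "rings"]


def split_row(row):
    """Split one raw row into (categorical sex, numeric measurements, rings label)."""
    record = dict(zip(COLUMNS, row))
    sex = record["sex"]
    measurements = [float(record[name]) for name in COLUMNS[1:-1]]
    rings = float(record["rings"])
    return sex, measurements, rings


sample = "M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15\n"
for row in csv.reader(io.StringIO(sample)):
    sex, measurements, rings = split_row(row)
    print(sex, len(measurements), rings)  # M 7 15.0
```

Only `sex` needs encoding; the measurements are already numeric and `rings` becomes the regression target.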

## Building a Spark preprocessing container

Now we need to build a Docker container that can run Spark in distributed mode.
We are going to design this image following SageMaker's <b>script mode</b> design pattern.

In the container folder we have the Dockerfile, the program folder, and the hadoop-config folder:

- hadoop-config folder: contains the cluster's configuration files
- program folder: contains the entry-point script (<code>submit</code>, invoked as <code>/opt/program/submit</code>) that runs the code passed to the job at run time
- Dockerfile: the blueprint for building the Docker image

Let's take a look at the Dockerfile:

In [5]:
!pwd

/home/ec2-user/SageMaker/ai-ml-collection/week2/day1/sagemaker-processing


In [6]:
!cat container/Dockerfile

FROM openjdk:8-jre-slim

RUN apt-get update
RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
# RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4 mleap==0.8.1 boto3
RUN pip3 install py4j psutil==5.6.5 mleap==0.8.1 boto3
RUN apt-get clean
RUN rm -rf /var/lib/apt/lists/*

# http://blog.stuart.axelbrooke.com/python-3-on-spark-return-of-the-pythonhashseed
ENV PYTHONHASHSEED 0
ENV PYTHONIOENCODING UTF-8
ENV PIP_DISABLE_PIP_VERSION_CHECK 1

# Install Hadoop
ENV HADOOP_VERSION 3.0.0
ENV HADOOP_HOME /usr/hadoop-$HADOOP_VERSION
ENV HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
ENV PATH $PATH:$HADOOP_HOME/bin
RUN curl -sL --retry 3 \
  "http://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz" \
  | gunzip \
  | tar -x -C /usr/ \
 && rm -rf $HADOOP_HOME/share/doc \
 && chown -R root:root $HADOOP_HOME

# Install Spark
# ENV SPARK_VERSION 2.4.4
ENV SPARK_VERSION 2.2.0
ENV SPARK_PACKAGE spark-${SPARK_VERS

### Building the docker image
Now we can build the image locally and tag it sagemaker-spark-example. This process can take a few minutes.

The Hadoop, YARN, and Spark installation steps produce a lot of log output that we can inspect here locally.

In [7]:
%cd container
!docker build -t sagemaker-spark-example .
%cd ../

/home/ec2-user/SageMaker/ai-ml-collection/week2/day1/sagemaker-processing/container
Sending build context to Docker daemon  43.52kB
Step 1/32 : FROM openjdk:8-jre-slim
 ---> 37613fe7d6dc
Step 2/32 : RUN apt-get update
 ---> Using cache
 ---> add7096b4cb3
Step 3/32 : RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
 ---> Using cache
 ---> 09712957d04d
Step 4/32 : RUN pip3 install py4j psutil==5.6.5 mleap==0.8.1 boto3
 ---> Using cache
 ---> 0195181b09dc
Step 5/32 : RUN apt-get clean
 ---> Using cache
 ---> 4037049ddbcf
Step 6/32 : RUN rm -rf /var/lib/apt/lists/*
 ---> Using cache
 ---> 0678323f6522
Step 7/32 : ENV PYTHONHASHSEED 0
 ---> Using cache
 ---> a19816326aa5
Step 8/32 : ENV PYTHONIOENCODING UTF-8
 ---> Using cache
 ---> 399aba086a47
Step 9/32 : ENV PIP_DISABLE_PIP_VERSION_CHECK 1
 ---> Using cache
 ---> 1a573b991ede
Step 10/32 : ENV HADOOP_VERSION 3.0.0
 ---> Using cache
 ---> d86388e3d7fa
Step 11/32 : ENV HADOOP_HOME

### Pushing the resulting image to Amazon ECR

Now that we have the image built and named locally, we can push it to our own Amazon ECR. 

In [8]:
import boto3
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

ecr_repository = 'sagemaker-spark-example'
tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
spark_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)

# Create ECR repository and push docker image
!aws ecr get-login-password --region $region | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.{uri_suffix}
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $spark_repository_uri
!docker push $spark_repository_uri

https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded

An error occurred (RepositoryAlreadyExistsException) when calling the CreateRepository operation: The repository with name 'sagemaker-spark-example' already exists in the registry with id '452432741922'
The push refers to repository [452432741922.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-example]

3e6d01f4: Preparing
20aef0f7: Preparing
bda25d8e: Preparing
d66d6b10: Preparing
f127d572: Preparing
eaae4b00: Preparing
be9b71a9: Preparing
092c8b7a: Preparing
ef647d03: Preparing
044138ab: Preparing
eaae4b00: Waiting
be9b71a9: Waiting
0f1b745d: Preparing
97fa8b8c: Layer already exists
latest: digest: sha256:3d406f1a87b96d8ad039d0ac1ea1e6f0e7d21d40be66527e4a75ab327fabc10b size: 3262


In [9]:
# The resulting Spark repository URI is in this variable. 
spark_repository_uri

'452432741922.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-example:latest'

## Preprocessing Script

Now that we have the right environment to run Spark, we can write the preprocessing script. It consists of a simple PySpark pipeline built with MLlib.

As we saw above, the only significant transformation we need is the one-hot encoding of the "sex" column, after its string values are converted to numeric indices with StringIndexer.
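The following pure-Python sketch illustrates what `StringIndexer` followed by `OneHotEncoder` does with their default settings: the most frequent label gets index 0, and `dropLast=True` drops the last category, which is then represented by an all-zeros vector. This is why `sex_vec` has two entries for three categories. The helpers `fit_string_indexer` and `one_hot` are hypothetical illustrations, not Spark APIs, and the alphabetical tie-breaking is an assumption made here for determinism.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Assign index 0 to the most frequent label, 1 to the next, and so on.

    Ties are broken alphabetically here for determinism; Spark's own
    tie-breaking rule may differ between versions.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


labels = ["M", "M", "F", "I", "M", "I"]
indexer = fit_string_indexer(labels)
print(indexer)  # {'M': 0, 'I': 1, 'F': 2}
for label in ["M", "I", "F"]:
    print(label, one_hot(indexer[label], len(indexer)))
# M [1.0, 0.0]
# I [0.0, 1.0]
# F [0.0, 0.0]
```

Dropping the last category avoids a redundant column: with three categories, two indicator values are enough to tell them apart.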

In [10]:
%%writefile preprocess.py
from __future__ import print_function
from __future__ import unicode_literals

import time
import sys
import os
import shutil
import csv

import boto3
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from mleap.pyspark.spark_support import SimpleSparkSerializer

def csv_line(data):
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    spark = SparkSession.builder.appName("PySparkAbalone").getOrCreate()
    
    # Convert command line args into a map of args
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")
    
    # Define the schema corresponding to the input data. The input data does not contain the headers
    schema = StructType([StructField("sex", StringType(), True), 
                         StructField("length", DoubleType(), True),
                         StructField("diameter", DoubleType(), True),
                         StructField("height", DoubleType(), True),
                         StructField("whole_weight", DoubleType(), True),
                         StructField("shucked_weight", DoubleType(), True),
                         StructField("viscera_weight", DoubleType(), True), 
                         StructField("shell_weight", DoubleType(), True), 
                         StructField("rings", DoubleType(), True)])

    # Download the data from S3 into a Dataframe
    total_df = spark.read.csv(('s3a://' + os.path.join(args['s3_input_bucket'], args['s3_input_key_prefix'],
                                                   'abalone.csv')), header=False, schema=schema)

    # StringIndexer maps the categorical sex column to numeric indices
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
    
    # One-hot encode the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # VectorAssembler combines all features into a single vector column that is easy to write out as CSV
    assembler = VectorAssembler(inputCols=["sex_vec", 
                                           "length", 
                                           "diameter", 
                                           "height", 
                                           "whole_weight", 
                                           "shucked_weight", 
                                           "viscera_weight", 
                                           "shell_weight"], 
                                outputCol="features")
    
    # The pipeline consists of the steps defined above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
    
    # This step trains the feature transformers
    model = pipeline.fit(total_df)
    
    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)
    
    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
    
    # Convert the train dataframe to RDD to save it in CSV format and upload it to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile('s3a://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'train'))
    
    # Convert the validation dataframe to RDD to save it in CSV format and upload it to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile('s3a://' + os.path.join(args['s3_output_bucket'], args['s3_output_key_prefix'], 'validation'))
    
    # Serialize and store the model via MLeap  
    SimpleSparkSerializer().serializeToBundle(model, "jar:file:/opt/ml/model.zip", validation_df)    
    # Unzip the model as SageMaker expects a .tar.gz file but MLeap produces a .zip file
    import zipfile
    with zipfile.ZipFile("/opt/ml/model.zip") as zf:
        zf.extractall("/opt/ml/model")

    # Write back the content as a .tar.gz file
    import tarfile
    with tarfile.open("/opt/ml/model.tar.gz", "w:gz") as tar:
        tar.add("/opt/ml/model/bundle.json", arcname='bundle.json')
        tar.add("/opt/ml/model/root", arcname='root')
    
    # Upload the model in tar.gz format to S3 so that it can be used with SageMaker for inference later
    s3 = boto3.resource('s3') 
    file_name = os.path.join(args['s3_mleap_model_prefix'], 'model.tar.gz')
    s3.Bucket(args['s3_model_bucket']).upload_file('/opt/ml/model.tar.gz', file_name)    

if __name__ == "__main__":
    main()

Overwriting preprocess.py


Note that MLeap's serializeToBundle method writes the fitted pipeline as a .zip bundle (a bundle.json file plus a root folder).
SageMaker expects model artifacts as .tar.gz archives, so the script repackages the bundle to make it usable with SageMaker's managed SparkML Serving container.
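As a sketch of that repackaging step outside Spark, the hypothetical helper `zip_to_targz` below uses only `zipfile` and `tarfile`. Unlike the script, which adds `bundle.json` and `root` by name, it adds every top-level entry it finds, so the archive has no extra parent directory. The demo archive is a stand-in, not a real MLeap bundle.

```python
import os
import tarfile
import tempfile
import zipfile


def zip_to_targz(zip_path, tar_path):
    """Repackage a .zip archive as a .tar.gz with the same top-level entries."""
    with tempfile.TemporaryDirectory() as workdir:
        with zipfile.ZipFile(zip_path) as zf:
            zf.extractall(workdir)
        with tarfile.open(tar_path, "w:gz") as tar:
            # Add each top-level entry (e.g. bundle.json and root/) under its own name.
            for name in sorted(os.listdir(workdir)):
                tar.add(os.path.join(workdir, name), arcname=name)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        zip_path = os.path.join(tmp, "model.zip")
        tar_path = os.path.join(tmp, "model.tar.gz")
        # Stand-in for an MLeap bundle: a bundle.json file and a root/ directory.
        with zipfile.ZipFile(zip_path, "w") as zf:
            zf.writestr("bundle.json", "{}")
            zf.writestr("root/model.json", "{}")
        zip_to_targz(zip_path, tar_path)
        with tarfile.open(tar_path) as tar:
            print(sorted(tar.getnames()))  # ['bundle.json', 'root', 'root/model.json']
```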

### Creating an Amazon SageMaker Preprocessing Job

Now we can use the SDK to create a processing job. The job starts the container image and runs the <code>preprocess.py</code> script. Note that the job runs distributed across two instances (<code>instance_count=2</code>).
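The `arguments` list passed to `run` is a flat sequence of alternating names and values, and the `main` function of `preprocess.py` rebuilds a dictionary from `sys.argv[1:]` with a `zip` over a single shared iterator. The sketch below isolates that idiom in a hypothetical helper, `pair_arguments`; the bucket and prefix values are placeholders.

```python
def pair_arguments(argv):
    """Turn ['k1', 'v1', 'k2', 'v2', ...] into {'k1': 'v1', 'k2': 'v2'}.

    zip() pulls from the same iterator twice per step, so consecutive elements
    become (key, value) pairs. A trailing key without a value is silently dropped.
    """
    args_iter = iter(argv)
    return dict(zip(args_iter, args_iter))


# Placeholder values, standing in for the bucket and prefixes used in the job.
arguments = ["s3_input_bucket", "my-bucket", "s3_input_key_prefix", "example/prefix"]
print(pair_arguments(arguments))
# {'s3_input_bucket': 'my-bucket', 's3_input_key_prefix': 'example/prefix'}
```

Because an odd-length list loses its last key without an error, it is worth keeping every name in `arguments` paired with a value.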

In [12]:

from sagemaker.processing import ScriptProcessor, ProcessingInput
spark_processor = ScriptProcessor(base_job_name='spark-preprocessor',
                                  image_uri=spark_repository_uri,
                                  command=['/opt/program/submit'],
                                  role=role,
                                  instance_count=2,
                                  instance_type='ml.r5.xlarge',
                                  max_runtime_in_seconds=800,
                                  env={'mode': 'python'})

spark_processor.run(code='preprocess.py',
                    arguments=['s3_input_bucket', bucket,
                               's3_input_key_prefix', input_prefix,
                               's3_output_bucket', bucket,
                               's3_output_key_prefix', input_preprocessed_prefix,
                               's3_model_bucket', bucket,
                               's3_mleap_model_prefix', mleap_model_prefix],
                    logs=True)

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.



Job Name:  spark-preprocessor-2020-10-18-19-35-17-987
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-452432741922/spark-preprocessor-2020-10-18-19-35-17-987/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []
2020-10-18 19:38:52,330 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = algo-1/10.0.214.65
STARTUP_MSG:   args = [-format, -force]
STARTUP_MSG:   version = 3.0.0
STARTUP_MSG:   classpath = /usr/hadoop-3.0.0/etc/hadoop:/usr/hadoop-3.0.0/share/hadoop/common/lib/avro-1.7.7.jar:/usr/hadoop-3.0.0/share/hadoop/common/lib/woodstox-core-5.0.3.jar:/usr/hadoop-3.0.0/share/hadoop/common/lib/kerby-asn1-1.0.1.jar:/usr/hadoop-3.0.0/share/hadoop/

In [13]:
print('Top 5 rows from s3://{}/{}/train/'.format(bucket, input_preprocessed_prefix))
!aws s3 cp --quiet s3://$bucket/$input_preprocessed_prefix/train/part-00000 - | head -n5

Top 5 rows from s3://sagemaker-us-east-1-452432741922/sagemaker/spark-preprocess-demo/2020-10-18-19-34-58/input/preprocessed/abalone/train/
5.0,0.0,0.0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025
6.0,0.0,0.0,0.29,0.21,0.075,0.275,0.113,0.0675,0.035
5.0,0.0,0.0,0.29,0.225,0.075,0.14,0.0515,0.0235,0.04
7.0,0.0,0.0,0.325,0.26,0.09,0.1915,0.085,0.036,0.062
9.0,0.0,0.0,0.33,0.26,0.08,0.2,0.0625,0.05,0.07
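Each line starts with the `rings` label, followed by nine feature values: two from `sex_vec` (three sex categories with the last one dropped) and the seven measurements. Putting the label first matches what SageMaker's built-in XGBoost expects for CSV input (target in the first column, no header row). The sketch below mirrors `csv_line` from `preprocess.py` with a plain list standing in for the Spark vector; `parse_line` is a hypothetical inverse added for illustration.

```python
def csv_line(data):
    """Same logic as csv_line in preprocess.py: label first, then the feature values."""
    label, features = data
    return str(label) + "," + ",".join(str(value) for value in features)


def parse_line(line):
    """Inverse operation: split a written line back into (label, features)."""
    values = [float(field) for field in line.split(",")]
    return values[0], values[1:]


row = (5.0, [0.0, 0.0, 0.275, 0.195, 0.07, 0.08, 0.031, 0.0215, 0.025])
line = csv_line(row)
print(line)  # 5.0,0.0,0.0,0.275,0.195,0.07,0.08,0.031,0.0215,0.025
label, features = parse_line(line)
print(label, len(features))  # 5.0 9
```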


## Training the XGBoost model

Now that preprocessing is done and the input is purely numeric, we can train an XGBoost model using Amazon SageMaker's managed XGBoost container.
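The hyperparameters below set `objective = "reg:squarederror"`, so the trees are fit by minimizing squared error on the rings label, and for this objective the validation metric is typically root mean squared error (RMSE). A minimal sketch of both quantities in plain Python, for intuition only (XGBoost itself works with gradients of this loss):

```python
import math


def squared_error(y_true, y_pred):
    """Per-example loss minimized by reg:squarederror (up to a constant factor)."""
    return (y_true - y_pred) ** 2


def rmse(y_true, y_pred):
    """Root mean squared error over paired labels and predictions."""
    errors = [squared_error(t, p) for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(errors) / len(errors))


# Hypothetical predictions for three rings labels from the sample rows above.
print(rmse([15.0, 7.0, 9.0], [13.0, 7.0, 10.0]))
```

Because RMSE is in the same units as the label, it can be read directly as a typical error in number of rings.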


In [14]:
from sagemaker.amazon.amazon_estimator import get_image_uri

training_image = get_image_uri(sagemaker_session.boto_region_name, 'xgboost', repo_version="0.90-1")
print(training_image)

'get_image_uri' method will be deprecated in favor of 'ImageURIProvider' class in SageMaker Python SDK v2.
There is a more up to date SageMaker XGBoost image. To use the newer image, please set 'repo_version'='1.0-1'. For example:
	get_image_uri(region, 'xgboost', '1.0-1').


683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3


In [15]:
training_image

'683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3'

In [16]:
s3_train_data = 's3://{}/{}/{}'.format(bucket, input_preprocessed_prefix, 'train/part')
s3_validation_data = 's3://{}/{}/{}'.format(bucket, input_preprocessed_prefix, 'validation/part')
s3_output_location = 's3://{}/{}/{}'.format(bucket, prefix, 'xgboost_model')

xgb_model = sagemaker.estimator.Estimator(training_image,
                                          role, 
                                          train_instance_count=1, 
                                          train_instance_type='ml.m4.xlarge',
                                          train_volume_size = 20,
                                          train_max_run = 3600,
                                          input_mode= 'File',
                                          output_path=s3_output_location,
                                          sagemaker_session=sagemaker_session)

xgb_model.set_hyperparameters(objective = "reg:squarederror",
                              eta = .2,
                              gamma = 4,
                              max_depth = 5,
                              num_round = 10,
                              subsample = 0.7,
                              silent = 0,
                              min_child_weight = 6)

train_data = sagemaker.session.s3_input(s3_train_data, distribution='FullyReplicated', 
                        content_type='text/csv', s3_data_type='S3Prefix')
validation_data = sagemaker.session.s3_input(s3_validation_data, distribution='FullyReplicated', 
                             content_type='text/csv', s3_data_type='S3Prefix')

data_channels = {'train': train_data, 'validation': validation_data}

Parameter image_name will be renamed to image_uri in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.
's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


In [17]:
xgb_model.fit(inputs=data_channels, logs=True)

2020-10-18 19:41:34 Starting - Starting the training job...
2020-10-18 19:41:37 Starting - Launching requested ML instances......
2020-10-18 19:42:59 Starting - Preparing the instances for training......
2020-10-18 19:43:50 Downloading - Downloading input data...
2020-10-18 19:44:11 Training - Downloading the training image..
INFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training
INFO:sagemaker-containers:Failed to parse hyperparameter objective value reg:squarederror to Json.
Returning the value itself
INFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)
INFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
[19:44:39] 3344x9 matrix with 30096 entries loaded from /opt/ml/input/data/t

After the training job completes, we can see related information in the <a href='https://console.aws.amazon.com/sagemaker/home?region=us-east-1#/jobs'> AWS console </a>, such as the logs, the training time, and the locations of the training data and model artifacts. 
Remember that we need the model artifacts to build an inference pipeline later.

## Building an inference Pipeline using SageMaker SDK

Finally, we build an inference pipeline that combines the serialized Spark MLlib pipeline and the supervised XGBoost model.
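In a SageMaker inference pipeline, the containers run in sequence and the output of each one becomes the input of the next. The toy sketch below shows only that data flow: `featurize` stands in for the SparkML Serving container and `toy_model` for the XGBoost container. The sex encoding and the linear score are hypothetical and unrelated to the real fitted pipeline or model.

```python
from functools import reduce


def run_pipeline(stages, request):
    """Feed each stage's output into the next one, as an inference pipeline does with its containers."""
    return reduce(lambda payload, stage: stage(payload), stages, request)


# Hypothetical encoding; the real index order comes from the fitted StringIndexer.
SEX_CATEGORIES = {"M": [1.0, 0.0], "I": [0.0, 1.0], "F": [0.0, 0.0]}


def featurize(record):
    """Toy stand-in for the Spark ML container: one-hot sex, append the measurements."""
    sex, *measurements = record
    return SEX_CATEGORIES[sex] + list(measurements)


def toy_model(features):
    """Toy stand-in for the XGBoost container: a hypothetical linear score."""
    return round(1.0 + 10.0 * sum(features), 3)


print(run_pipeline([featurize, toy_model], ["F", 0.5, 0.4]))
```

The practical consequence is that the client sends raw records and never has to reproduce the feature engineering itself.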

In [18]:
# XGBoost model artifacts 
xgb_model.model_data

's3://sagemaker-us-east-1-452432741922/sagemaker/spark-preprocess-demo/2020-10-18-19-34-58/xgboost_model/sagemaker-xgboost-2020-10-18-19-41-33-883/output/model.tar.gz'

In [19]:
# Serialized Pipeline
mleap_model_prefix

'sagemaker/spark-preprocess-demo/2020-10-18-19-34-58/mleap-model'

In [20]:
# We need to pass the schema as an environment variable to the Spark managed container.
import json
schema = {
    "input": [
        {
            "name": "sex",
            "type": "string"
        }, 
        {
            "name": "length",
            "type": "double"
        }, 
        {
            "name": "diameter",
            "type": "double"
        }, 
        {
            "name": "height",
            "type": "double"
        }, 
        {
            "name": "whole_weight",
            "type": "double"
        }, 
        {
            "name": "shucked_weight",
            "type": "double"
        },
        {
            "name": "viscera_weight",
            "type": "double"
        }, 
        {
            "name": "shell_weight",
            "type": "double"
        }, 
    ],
    "output": 
        {
            "name": "features",
            "type": "double",
            "struct": "vector"
        }
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "sex", "type": "string"}, {"name": "length", "type": "double"}, {"name": "diameter", "type": "double"}, {"name": "height", "type": "double"}, {"name": "whole_weight", "type": "double"}, {"name": "shucked_weight", "type": "double"}, {"name": "viscera_weight", "type": "double"}, {"name": "shell_weight", "type": "double"}], "output": {"name": "features", "type": "double", "struct": "vector"}}
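Writing the schema by hand repeats every column name from `preprocess.py`. A minimal alternative derives the same dictionary from a list of names, which reduces the chance of typos; `NUMERIC_COLUMNS` and `build_sparkml_schema` are hypothetical names. Serialized with `json.dumps`, it produces the same string as the cell above.

```python
import json

# Feature columns in the order used by preprocess.py (the rings label is not an input at inference time).
NUMERIC_COLUMNS = ["length", "diameter", "height", "whole_weight",
                   "shucked_weight", "viscera_weight", "shell_weight"]


def build_sparkml_schema():
    """Build the same schema dictionary as the cell above from a list of column names."""
    inputs = [{"name": "sex", "type": "string"}]
    inputs += [{"name": column, "type": "double"} for column in NUMERIC_COLUMNS]
    output = {"name": "features", "type": "double", "struct": "vector"}
    return {"input": inputs, "output": output}


schema_json = json.dumps(build_sparkml_schema())
print(schema_json)
```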


In [21]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sparkml.model import SparkMLModel

sparkml_data = 's3://{}/{}/{}'.format(bucket, mleap_model_prefix, 'model.tar.gz')
# Pass the schema defined above through an environment variable that sagemaker-sparkml-serving understands
sparkml_model = SparkMLModel(model_data=sparkml_data, env={'SAGEMAKER_SPARKML_SCHEMA' : schema_json})
# Serving model built from the trained XGBoost artifacts (named differently from the xgb_model estimator)
xgb_inference_model = Model(model_data=xgb_model.model_data, image=training_image)

model_name = 'inference-pipeline-' + timestamp_prefix
# The containers run in order: Spark ML feature transformation first, then XGBoost
sm_model = PipelineModel(name=model_name, role=role, models=[sparkml_model, xgb_inference_model])

Parameter image will be renamed to image_uri in SageMaker Python SDK v2.
Parameter image will be renamed to image_uri in SageMaker Python SDK v2.


## Using the Pipeline for real time inference

Now we can deploy the pipeline to an endpoint. Here we use a single instance, but a larger <code>initial_instance_count</code> would spread the load across several instances. Once endpoint creation starts, we can check its status in <a href='https://console.aws.amazon.com/sagemaker/home?region=us-east-1#/endpoints'>the console</a>.

In [22]:
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
sm_model.deploy(initial_instance_count=1, instance_type='ml.c4.xlarge', endpoint_name=endpoint_name)

-----------------!

In [23]:
from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

In [24]:
payload = {"data": ["F",0.515,0.425,0.14,0.766,0.304,0.1725,0.255]}
predictor = RealTimePredictor(endpoint=endpoint_name, sagemaker_session=sagemaker_session, serializer=json_serializer,
                                content_type=CONTENT_TYPE_JSON, accept=CONTENT_TYPE_CSV)

print(predictor.predict(payload))

b'10.707151412963867'
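The predictor sends a JSON body whose `data` list must follow the input order of the schema, and it returns the prediction (a rings count) as CSV bytes. The hypothetical helpers below build that payload from named fields and decode the response; they are plain-Python sketches and do not call the endpoint.

```python
import json

# Input column order expected by the endpoint, matching the schema defined earlier.
INPUT_COLUMNS = ["sex", "length", "diameter", "height", "whole_weight",
                 "shucked_weight", "viscera_weight", "shell_weight"]


def build_payload(record):
    """Order a dict of named inputs into the {"data": [...]} JSON body used above."""
    return json.dumps({"data": [record[column] for column in INPUT_COLUMNS]})


def parse_prediction(body):
    """Decode a CSV response body (bytes) into a float."""
    return float(body.decode("utf-8").strip())


record = {"sex": "F", "length": 0.515, "diameter": 0.425, "height": 0.14,
          "whole_weight": 0.766, "shucked_weight": 0.304,
          "viscera_weight": 0.1725, "shell_weight": 0.255}
print(build_payload(record))
print(parse_prediction(b"10.707151412963867"))
```

Building the payload from named fields guards against silently swapping two measurements, which the endpoint would accept without complaint.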


## Clean up

Delete the deployed endpoint to avoid incurring further costs.

In [25]:
predictor.delete_endpoint()

In [26]:
print('End', datetime.now().strftime("%H:%M:%S"))


End 19:53:51
