# Inference on a Spark ML Model

Following the previous notebook, we'll now deploy the model to a live endpoint. Note that this notebook no longer uses the EMR cluster; in fact, it can't connect to it. The "Python 3 (Data Science)" kernel is the best choice for running this notebook.

# Initial setup

First, we'll use the same code as in the training notebook to retrieve the cluster and bucket. We only need the bucket name, to locate the model tarball.

In [1]:
import boto3

# This works only if the account has a single stack in CREATE_COMPLETE status; otherwise it picks the first one returned.
cfn = boto3.client('cloudformation')
stack_name = cfn.list_stacks(StackStatusFilter=['CREATE_COMPLETE'])['StackSummaries'][0]['StackName']
emr_cluster_id = cfn.describe_stack_resource(
    StackName=stack_name,
    LogicalResourceId='EMRCluster'
)['StackResourceDetail']['PhysicalResourceId']
emr_bucket = cfn.describe_stack_resource(
    StackName=stack_name,
    LogicalResourceId='S3Bucket'
)['StackResourceDetail']['PhysicalResourceId']

print(f'The EMR Cluster Id is {emr_cluster_id}\nThe S3 bucket EMR has access to is {emr_bucket}')

The EMR Cluster Id is j-22XVHS53FXPU1
The S3 bucket EMR has access to is studio-emr-v5-s3bucket-t2wfrxqwzj0g
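The lookup above takes the first stack returned, which breaks as soon as the account has more than one. A minimal sketch of a safer selection filters the `StackSummaries` list by a name prefix. The helper `find_stack_name` and the prefix `"studio-emr"` are hypothetical: the prefix is only inferred from the bucket name printed above, since CloudFormation-generated bucket names usually start with the stack name.

```python
from typing import Optional


def find_stack_name(summaries: list, name_prefix: str) -> Optional[str]:
    """Return the first CREATE_COMPLETE stack whose name starts with name_prefix.

    `summaries` is assumed to have the shape of the 'StackSummaries' list
    returned by boto3's cloudformation list_stacks call.
    """
    for summary in summaries:
        if summary.get("StackStatus") == "CREATE_COMPLETE" and summary["StackName"].startswith(name_prefix):
            return summary["StackName"]
    return None  # no matching stack found


# Hand-written example; in the notebook you would pass
# cfn.list_stacks(StackStatusFilter=['CREATE_COMPLETE'])['StackSummaries'].
example = [
    {"StackName": "some-other-stack", "StackStatus": "CREATE_COMPLETE"},
    {"StackName": "studio-emr-v5", "StackStatus": "CREATE_COMPLETE"},
]
print(find_stack_name(example, "studio-emr"))  # studio-emr-v5
```

Note that `list_stacks` is paginated; in an account with many stacks, iterating over `cfn.get_paginator("list_stacks").paginate(...)` avoids missing stacks that fall on later pages.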


The cell below declares the input and output schema for the endpoint. MLeap needs it to work; more details can be found in the [Spark ML Serving Container readme](https://github.com/aws/sagemaker-sparkml-serving-container#procedure-to-pass-the-schema).

In [2]:
import json

schema = {
    "input":  [{"name": "review", "type": "string"}],
    "output": {"name": "prediction", "type": "double"}
}
schema_json = json.dumps(schema)
print(schema_json)

{"input": [{"name": "review", "type": "string"}], "output": {"name": "prediction", "type": "double"}}
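As we understand the container's readme, the request payload is positional: the values in `"data"` must follow the order of the schema's `"input"` list. A minimal sketch of that relationship, using the hypothetical helpers `make_schema` and `make_payload` (not part of the SageMaker SDK), rebuilds the same schema string printed above and derives a matching payload from a dict keyed by column name.

```python
import json


def make_schema(inputs, output_name, output_type):
    """Build the schema dict passed via the SAGEMAKER_SPARKML_SCHEMA variable.

    `inputs` is a list of (column_name, type_name) pairs, in the order the
    Spark ML pipeline expects them.
    """
    return {
        "input": [{"name": name, "type": type_name} for name, type_name in inputs],
        "output": {"name": output_name, "type": output_type},
    }


def make_payload(schema, row):
    """Order the values of `row` to match the schema's input list and wrap
    them in the {"data": [...]} request format."""
    return {"data": [row[column["name"]] for column in schema["input"]]}


schema = make_schema([("review", "string")], "prediction", "double")
schema_json = json.dumps(schema)
print(schema_json)
print(make_payload(schema, {"review": "this movie is amazing"}))
# {'data': ['this movie is amazing']}
```

With a single `review` column the ordering is trivial, but the same pattern keeps payloads consistent if the pipeline later takes more input columns.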


# Creating the Endpoint

Next, we'll set up the model location and the information SageMaker needs to create the endpoint.

In [4]:
import sagemaker
from sagemaker import get_execution_role

sess = sagemaker.Session()

# if get_execution_role fails, go to the SageMaker Studio Console
# (https://console.aws.amazon.com/sagemaker/home?region=us-east-1#/studio) and copy the execution role from there
role = get_execution_role()

model_bucket = emr_bucket
model_path = "emr/movie_reviews/mleap"
model_file = "model.tar.gz"

# S3 location where the trained and serialized Spark ML model was uploaded
sparkml_data = f"s3://{model_bucket}/{model_path}/{model_file}"
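Before deploying, it can help to confirm that the tarball actually exists at `sparkml_data`; otherwise a wrong path may only surface during endpoint creation, after several minutes of waiting. A minimal sketch, assuming an S3 client such as `boto3.client("s3")` is passed in; `split_s3_uri` and `model_artifact_exists` are hypothetical helpers.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")


def model_artifact_exists(s3_client, uri):
    """Return True if head_object succeeds for `uri`.

    Any error (missing key, missing permission, ...) is treated as
    "not usable" and returns False.
    """
    bucket, key = split_s3_uri(uri)
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except Exception:
        return False


# In the notebook: model_artifact_exists(boto3.client("s3"), sparkml_data)
```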

Now we can deploy the model itself. We'll first instantiate a `SparkMLModel` object, passing it the tarball, role, session and a name. The name must be unique, so we append a timestamp to allow rerunning the code. Note that `SparkMLModel` also receives an `env` parameter, which we use to pass the schema to the endpoint.

We also set the serializer to `JSONSerializer` so that requests are sent as JSON. By default, Spark ML endpoints expect CSV input.

*Note: model deployment takes a few minutes. You can follow it in the left sidebar by clicking the "SageMaker Components and registries" icon and selecting "Endpoints", or in the [console](https://console.aws.amazon.com/sagemaker/home?region=us-east-1#/endpoints).*

In [5]:
from time import gmtime, strftime
from sagemaker.sparkml.model import SparkMLModel
from sagemaker.serializers import JSONSerializer

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

model_name = "sparkml-mvreviews-" + timestamp_prefix
sparkml_model = SparkMLModel(
    model_data=sparkml_data,
    role=role,
    sagemaker_session=sess,
    name=model_name,
    # passing the schema defined above by using an environment
    # variable that sagemaker-sparkml-serving understands
    env={"SAGEMAKER_SPARKML_SCHEMA": schema_json},
)


endpoint_name = "sparkml-mvreviews-ep-" + timestamp_prefix
predictor = sparkml_model.deploy(
    initial_instance_count=1, instance_type="ml.m5.large", endpoint_name=endpoint_name, serializer=JSONSerializer()
)

-------------!
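The timestamp suffix keeps names unique, but SageMaker also restricts their form: to our knowledge, model and endpoint names are limited to 63 characters of letters, digits and hyphens, starting with a letter or digit. A minimal sketch of a hypothetical `unique_name` helper that mirrors the notebook's naming scheme and checks that assumed constraint up front, instead of failing at deploy time.

```python
import re
from time import gmtime, strftime

# Assumed constraint for SageMaker model and endpoint names: at most 63
# characters, alphanumerics optionally separated by hyphens.
_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9])*$")
_MAX_NAME_LENGTH = 63


def unique_name(prefix, timestamp=None):
    """Append a UTC timestamp to `prefix` and validate the resulting name."""
    if timestamp is None:
        timestamp = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
    name = f"{prefix}-{timestamp}"
    if len(name) > _MAX_NAME_LENGTH or not _NAME_PATTERN.match(name):
        raise ValueError(f"Invalid SageMaker resource name: {name!r}")
    return name


print(unique_name("sparkml-mvreviews-ep", "2021-01-01-12-00-00"))
# sparkml-mvreviews-ep-2021-01-01-12-00-00
```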

# Running inference

First, we'll try one simple string. Feel free to change the text, but keep the same format (all lowercase, no punctuation, a single space between words), which matches the preprocessing we did before. In a real production environment, we'd create an AWS Lambda function that applies the same transformations before calling the model, and expose that function as the endpoint.
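A minimal sketch of that normalization, assuming the training preprocessing amounted to lowercasing, removing punctuation and collapsing whitespace. The exact rules used during training may differ (for example, in how apostrophes or non-ASCII letters are handled), so `normalize_review` is a hypothetical helper, not the training code.

```python
import re


def normalize_review(text):
    """Lowercase, replace anything that is not an ASCII letter, digit or
    whitespace with a space, and collapse whitespace runs to single spaces."""
    text = text.lower()
    text = re.sub(r"[^a-z0-9\s]", " ", text)
    return " ".join(text.split())


print(normalize_review("This movie is AMAZING... the best 2 hours of my life!"))
# this movie is amazing the best 2 hours of my life
```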

Legend:
- Positive sentiment: 1
- Negative sentiment: 0

In [6]:
payload = {"data": ["this movie is amazing the best 2 hours of my life"]}

In [7]:
int(float(predictor.predict(payload).decode('utf8')))

1

The endpoint returns bytes containing the value '1.0' or '0.0'; decoding and casting them gives us 1 or 0. Next, let's run the endpoint on the validation data we saved earlier. First, we load the CSV. Since Spark writes its output as a directory of part files and pandas' `read_csv` can't expand wildcards on S3, we have to list the bucket to find the exact file name.

In [8]:
import pandas as pd
import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket(emr_bucket)
key = next(obj.key for obj in bucket.objects.all() if 'predictions.csv' in obj.key and obj.key.endswith('.csv'))
print(f"Loading s3://{emr_bucket}/{key}...")

# reading s3:// paths with pandas requires the s3fs package
df = pd.read_csv(
    f"s3://{emr_bucket}/{key}"
).drop('prediction', axis=1)

Loading s3://studio-emr-v5-s3bucket-t2wfrxqwzj0g/emr/movie_reviews/output/predictions.csv/part-00000-8835aa34-078a-4391-87b1-68f58683663b-c000.csv...
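The cell above lists the whole bucket and reads only the first matching file; if Spark wrote several partitions, each lands in its own `part-*.csv` file and the others are ignored. A minimal sketch of a hypothetical `select_part_files` helper that keeps every part file directly under the output prefix shown above, so they can be read and concatenated.

```python
def select_part_files(keys, prefix):
    """Return, sorted, the Spark part files (part-*.csv) directly under `prefix`.

    `keys` is any iterable of S3 object keys. Marker files such as _SUCCESS
    and files in nested folders are skipped.
    """
    if not prefix.endswith("/"):
        prefix += "/"
    return sorted(
        key for key in keys
        if key.startswith(prefix)
        and key[len(prefix):].startswith("part-")
        and key.endswith(".csv")
    )


# Possible notebook usage (listing only the prefix instead of the whole bucket):
# prefix = "emr/movie_reviews/output/predictions.csv"
# keys = select_part_files((obj.key for obj in bucket.objects.filter(Prefix=prefix)), prefix)
# df = pd.concat((pd.read_csv(f"s3://{emr_bucket}/{k}") for k in keys), ignore_index=True)
```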


We can now compute predictions for the entire dataset. Since this sends one request per review, it can take a while.

In [9]:
df['prediction'] = df.review.apply(lambda x: int(float(predictor.predict({"data": [x]}).decode('utf8'))))
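One way to shorten that wait is to issue requests concurrently. A minimal sketch, assuming `predictor.predict` can be called from several threads (the underlying boto3 low-level clients are documented as thread-safe, but check this for your SDK version). With a single `ml.m5.large` instance behind the endpoint, the speed-up is bounded. `predict_all` and `parse_prediction` are hypothetical helpers.

```python
from concurrent.futures import ThreadPoolExecutor


def parse_prediction(body):
    """Convert the endpoint's response bytes (b'1.0' or b'0.0') into an int label."""
    return int(float(body.decode("utf8")))


def predict_all(reviews, predict_bytes, max_workers=8):
    """Call `predict_bytes` on each review concurrently; results keep input order.

    `predict_bytes` is any callable taking a payload dict and returning the
    raw response bytes, e.g. predictor.predict from the notebook.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in the same order as `reviews`
        bodies = pool.map(lambda review: predict_bytes({"data": [review]}), reviews)
        return [parse_prediction(body) for body in bodies]


# Possible notebook usage:
# df['prediction'] = predict_all(df.review.tolist(), predictor.predict)
```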

Then we check the predictor's accuracy. We didn't tune the model, since the goal here is to show how to deploy a model rather than how to optimize it.

In [10]:
df['correct'] = df.prediction == df.label

df.correct.mean()

0.8603314621758889
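Accuracy alone can hide how errors split between the two classes. A minimal sketch of a hypothetical `binary_metrics` helper computing precision, recall and F1 for the positive class (1) in plain Python; scikit-learn's `sklearn.metrics` module offers the same metrics if it's installed.

```python
def binary_metrics(labels, predictions, positive=1):
    """Compute accuracy, precision, recall and F1 for binary labels."""
    pairs = list(zip(labels, predictions))
    if not pairs:
        raise ValueError("No labels to evaluate")
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    correct = sum(1 for y, p in pairs if y == p)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": correct / len(pairs), "precision": precision, "recall": recall, "f1": f1}


# Toy example: accuracy 0.5, precision, recall and F1 all 2/3
print(binary_metrics([1, 0, 1, 1], [1, 1, 0, 1]))

# Possible notebook usage:
# binary_metrics(df.label.tolist(), df.prediction.tolist())
```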

Consumers of the endpoint can also invoke it with the boto3 `sagemaker-runtime` client. This doesn't require the SageMaker SDK and adds very little overhead, which makes it a good fit for AWS Lambda functions and other clients that don't need the full SDK.

In [11]:
import boto3

smrun = boto3.client("sagemaker-runtime")
response = smrun.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=json.dumps(payload).encode('utf8'),
    ContentType="application/json",
)
# the body is a stream containing b'1.0' or b'0.0'
prediction = int(float(response['Body'].read().decode('utf8')))

In [12]:
prediction

1
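Putting the pieces together, a Lambda function in front of the endpoint could normalize the raw review and call `invoke_endpoint`. This is a minimal sketch under several assumptions: the event carries `{"review": "..."}` directly (an API Gateway proxy event would put it in `event["body"]` as a JSON string instead), the endpoint name comes from a hypothetical `ENDPOINT_NAME` environment variable, and the normalization rules are the same guess as before. The optional `runtime_client` parameter exists only so the handler can be exercised without AWS.

```python
import json
import os
import re

_client = None  # reused across invocations of a warm Lambda container


def _get_client():
    global _client
    if _client is None:
        import boto3  # bundled with the AWS Lambda Python runtime
        _client = boto3.client("sagemaker-runtime")
    return _client


def normalize_review(text):
    """Assumed preprocessing: lowercase, no punctuation, single spaces."""
    text = re.sub(r"[^a-z0-9\s]", " ", text.lower())
    return " ".join(text.split())


def lambda_handler(event, context, runtime_client=None):
    """Hypothetical handler: normalize the review, call the endpoint, return 0 or 1."""
    client = runtime_client if runtime_client is not None else _get_client()
    payload = {"data": [normalize_review(event["review"])]}
    response = client.invoke_endpoint(
        EndpointName=os.environ["ENDPOINT_NAME"],
        Body=json.dumps(payload).encode("utf8"),
        ContentType="application/json",
    )
    prediction = int(float(response["Body"].read().decode("utf8")))
    return {"statusCode": 200, "body": json.dumps({"prediction": prediction})}
```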

Finally, we delete the endpoint to stop incurring charges for the running instance.

In [13]:
predictor.delete_endpoint()
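In SageMaker Python SDK v2, `predictor.delete_endpoint()` also removes the endpoint configuration by default but leaves the model resource behind (the SDK offers `predictor.delete_model()` for that). Clients without the SDK can do the full cleanup with the boto3 `sagemaker` client. A minimal sketch, used instead of the cell above since it looks names up on the live endpoint; `cleanup_endpoint` is a hypothetical helper and `sm_client` is assumed to be `boto3.client("sagemaker")`.

```python
def cleanup_endpoint(sm_client, endpoint_name):
    """Delete an endpoint plus the endpoint config and model(s) it uses.

    Names are looked up with describe_endpoint / describe_endpoint_config
    before anything is deleted.
    """
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]
    config = sm_client.describe_endpoint_config(EndpointConfigName=config_name)
    model_names = [variant["ModelName"] for variant in config["ProductionVariants"]]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
    return config_name, model_names


# Possible usage: cleanup_endpoint(boto3.client("sagemaker"), endpoint_name)
```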