# SageMaker Serial Inference Pipeline 
## Introduction
An **inference pipeline** is an Amazon SageMaker model composed of a linear sequence of 2 to 15 containers that process inference requests. You use an inference pipeline to define and deploy any combination of pretrained SageMaker built-in algorithms and your own custom algorithms packaged in Docker containers. You can use an inference pipeline to combine **preprocessing**, **prediction**, and **post-processing** data science tasks. Inference pipelines are fully managed.

Here's an architecture diagram that depicts a standard serial inference pipeline in SageMaker:

![sm inference pipeline](images/sm-serial-inference-pipeline.jpg) 
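
Conceptually, a serial pipeline behaves like function composition: each container receives the previous container's response as its request. The following is a minimal, SageMaker-free sketch of that idea. The name `run_serial_pipeline` and the three stage functions are hypothetical and exist only for illustration.

```python
from typing import Any, Callable, Sequence

MIN_CONTAINERS = 2   # limits stated above for an inference pipeline
MAX_CONTAINERS = 15


def run_serial_pipeline(stages: Sequence[Callable[[Any], Any]], request: Any) -> Any:
    """Pass `request` through each stage in order, feeding each output to the next stage."""
    if not MIN_CONTAINERS <= len(stages) <= MAX_CONTAINERS:
        raise ValueError(f"expected 2 to 15 stages, got {len(stages)}")
    payload = request
    for stage in stages:
        payload = stage(payload)
    return payload


# Hypothetical stages standing in for containers.
def preprocess(text: str) -> list:
    return text.lower().split()


def predict(tokens: list) -> dict:
    return {"token_count": len(tokens)}


def postprocess(result: dict) -> str:
    return f"tokens={result['token_count']}"


print(run_serial_pipeline([preprocess, predict, postprocess], "Toy Story 1995"))
```

In a real pipeline the "stages" are containers and the hand-off happens over HTTP inside the endpoint, but the ordering guarantee is the same.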

## Solution Overview
The focus of this notebook is to build an ML solution using a SageMaker serial inference pipeline.

To demonstrate the capability, we chose a recommendation use case that uses Neural Collaborative Filtering (NCF), based on [this project](https://github.com/aws-samples/amazon-sagemaker-custom-recommender-system). The project is also available in SageMaker JumpStart (search for **Customized Recommender System**).

Additionally, we modify the original model architecture to form a serial inference pipeline, as depicted in the following architecture:

![serial_inference](images/serial-inference-pipeline.png)


In [None]:
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.local import LocalSession
from sagemaker import Session
import boto3
import sagemaker
from PIL import Image
import io
session = Session()

# Load the models
As shown in the previous architecture diagram, the serial inference pipeline is made of two ML models:

1. A KNN model that finds the top-k movies most similar to the given input.
2. A TensorFlow model that ranks those movies by predicted relevance score.

For the first model, we've provided a pretrained KNN model in this lab so that you don't need to go through the training process. The KNN model was trained on the following dataset:

* **Data source**: MovieLens (ml-latest-small) (http://movielens.org)

In the serial inference pipeline, this model fetches the top-k movies similar to the input title.

For the second model, we use a trained TensorFlow recommender model to rank the movies returned by the KNN model. The model weights are available in the S3 bucket given below, so the model can be deployed to SageMaker.
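
The two-stage "retrieve, then rank" flow described above can be sketched without any ML framework. This is a toy illustration under stated assumptions, not the lab's actual models: `retrieve_top_k` stands in for the KNN model using cosine similarity, `rank` stands in for the NCF model, and the embeddings and scores are made-up values.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve_top_k(query_id, embeddings, k):
    """Stage 1 (stand-in for the KNN model): the k items most similar to the query."""
    query = embeddings[query_id]
    scored = [(cosine(query, vec), item) for item, vec in embeddings.items() if item != query_id]
    scored.sort(reverse=True)
    return [item for _, item in scored[:k]]


def rank(user_id, candidates, score_fn):
    """Stage 2 (stand-in for the NCF model): order candidates by predicted relevance."""
    return sorted(candidates, key=lambda item: score_fn(user_id, item), reverse=True)


# Made-up toy data.
embeddings = {
    "A": [1.0, 0.0],
    "B": [0.9, 0.1],
    "C": [0.0, 1.0],
    "D": [0.7, 0.7],
}
toy_scores = {("u1", "B"): 0.2, ("u1", "D"): 0.9}

candidates = retrieve_top_k("A", embeddings, k=2)
ranked = rank("u1", candidates, lambda user, item: toy_scores.get((user, item), 0.0))
print(candidates, ranked)
```

Splitting the work this way keeps the expensive ranking model focused on a short candidate list rather than the full catalog.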

In [None]:
%%sh
cd knn_model/pkl 
tar -czvf ../knnmodel.tar.gz .
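
If a shell is not available, a similar archive can be produced with Python's standard `tarfile` module. This is a sketch, assuming the goal is simply to place the contents of a directory at the root of a gzip-compressed tar; `make_model_archive` is a hypothetical helper and the demo uses a temporary directory with a placeholder file rather than the real model.

```python
import tarfile
import tempfile
from pathlib import Path


def make_model_archive(source_dir: str, archive_path: str) -> list:
    """Create a gzip-compressed tar whose members sit at the archive root."""
    source = Path(source_dir)
    with tarfile.open(archive_path, "w:gz") as tar:
        for path in sorted(source.iterdir()):
            # arcname drops the parent directories, similar to running tar from inside the folder.
            tar.add(path, arcname=path.name)
    # Re-open the archive to list what was written.
    with tarfile.open(archive_path, "r:gz") as tar:
        return tar.getnames()


# Demo on a throwaway directory with a placeholder artifact.
with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "pkl"
    src.mkdir()
    (src / "model.pkl").write_bytes(b"placeholder")
    members = make_model_archive(str(src), str(Path(tmp) / "knnmodel.tar.gz"))

print(members)
```

For this lab the call would look like `make_model_archive("knn_model/pkl", "knn_model/knnmodel.tar.gz")`.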

In [None]:
s3_code_prefix = "models/knn"
bucket = session.default_bucket()  # bucket to house the model artifacts
knn_model_artifact = session.upload_data("knn_model/knnmodel.tar.gz", bucket, s3_code_prefix)

In [None]:
role = sagemaker.get_execution_role()
s3_model_artifact_tensorflow = "s3://sagemaker-solutions-prod-us-east-1/0.2.0/Customized-recommender-system/1.0.0/artifacts/model/model.tar.gz"
tensorflow_version = '2.1.0'
python_version = 'py3'

## Define Trained Models For Inference Pipeline 
Before we can deploy an inference pipeline, we need to define the models it contains. We define the model objects and then associate them with a SageMaker `PipelineModel`, in the following order:

1. Define the model objects (between 2 and 15).
2. Create a `PipelineModel` object and pass it the model objects created in the previous step, in the order they should run.

First, let's restore the SageMaker Feature Store feature group names that we created in the previous step.

In [None]:
%store -r

In [None]:
from sagemaker.pipeline import PipelineModel
from time import gmtime, strftime

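# Environment variables made available to the scikit-learn container at inference time.
model_envs = {
    "titles_feature_group_name": titles_feature_group_name,
    "titles_embedding_mapping_group_name": titles_embeddings_mappings_feature_group_name,
    "AWS_DEFAULT_REGION": session.boto_region_name  # public attribute instead of the private _region_name
}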
    
sklearn_model = SKLearnModel(model_data=knn_model_artifact,
                             role=role,
                             entry_point="custom_inference.py",
                             framework_version="1.0-1",
                             sagemaker_session=session,
                             source_dir='sklearn/src', 
                             env = model_envs)

tensorflow_model = TensorFlowModel(model_data=s3_model_artifact_tensorflow,
                        framework_version=tensorflow_version,
                        role=role,
                        sagemaker_session=session,
                        entry_point="inference.py",
                        source_dir="ncf/src")

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

endpoint_name = "inference-pipeline-ep-" + timestamp_prefix

sm_model = PipelineModel(
    name=f"recsys-inference-pipeline-{timestamp_prefix}", role=role, models=[sklearn_model, tensorflow_model], sagemaker_session=session)
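
For orientation, the SageMaker scikit-learn serving container looks for handler functions named `model_fn`, `input_fn`, `predict_fn`, and `output_fn` in the entry point script. The sketch below shows the general shape of such a script and how a value passed through `env=` can be read with `os.environ`. It is not the lab's `custom_inference.py`: the function bodies are placeholders, and the demo at the bottom sets the environment variable by hand.

```python
import json
import os


def model_fn(model_dir):
    """Load the model once per container start. A dict stands in for the real KNN artifact."""
    return {
        "model_dir": model_dir,
        # Values passed through `env=` on the model object appear as environment variables.
        "feature_group": os.environ.get("titles_feature_group_name", "<unset>"),
    }


def input_fn(request_body, request_content_type):
    """Decode the request body sent to this container."""
    if request_content_type != "application/json":
        raise ValueError(f"unsupported content type: {request_content_type}")
    return json.loads(request_body)


def predict_fn(input_data, model):
    """Placeholder for the top-k lookup: echoes the request plus the configured feature group."""
    return {
        "user_id": input_data["user_id"],
        "movie_id": input_data["movie_id"],
        "feature_group": model["feature_group"],
    }


def output_fn(prediction, accept):
    """Encode the result; in a pipeline this becomes the next container's request body."""
    return json.dumps(prediction)


# Local demo: simulate the container lifecycle by hand (values are made up).
os.environ["titles_feature_group_name"] = "demo-titles-group"
model = model_fn("/opt/ml/model")
body = json.dumps({"user_id": "123", "movie_id": "2139"})
response = output_fn(predict_fn(input_fn(body, "application/json"), model), "application/json")
print(response)
```

Exercising the handlers locally like this is a quick way to catch serialization mistakes before deploying.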


Deploy the serial inference pipeline by calling the `deploy` method on the `PipelineModel` and choosing an appropriate instance type. Optionally, provide an endpoint name.

In [None]:
sm_model.deploy(initial_instance_count=1, instance_type="ml.c5.4xlarge", endpoint_name=endpoint_name)

# Testing the Serial Inference Pipeline
Once the serial inference pipeline is deployed, we can test it to ensure it works as expected. The SageMaker Python SDK provides an easy way to invoke the inference endpoint through the `Predictor` class, as shown below.

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
import json

predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer()
)
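
Outside the SageMaker Python SDK, the same endpoint can be called through the boto3 `sagemaker-runtime` client's `invoke_endpoint` operation. The sketch below wraps that call in a hypothetical helper, `invoke_pipeline`, and runs it against a fake client so it works offline; `FakeRuntimeClient` and the echoed response shape are assumptions for the demo only.

```python
import io
import json


def invoke_pipeline(runtime_client, endpoint_name, request):
    """Send one JSON request to the endpoint and decode the JSON response."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Accept="application/json",
        Body=json.dumps(request),
    )
    # The response body is a stream; read it fully before decoding.
    return json.loads(response["Body"].read())


class FakeRuntimeClient:
    """Offline stand-in so the sketch runs without AWS credentials (hypothetical)."""

    def invoke_endpoint(self, EndpointName, ContentType, Accept, Body):
        echoed = {"endpoint": EndpointName, "received": json.loads(Body)}
        return {"Body": io.BytesIO(json.dumps(echoed).encode("utf-8"))}


result = invoke_pipeline(FakeRuntimeClient(), "demo-endpoint", {"user_id": "123", "movie_id": "2139"})
print(result)
```

Against a live endpoint, pass `boto3.client("sagemaker-runtime")` and the deployed endpoint name instead of the fake client.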

# Test Serial Inference Pipeline With Sample Data
Once the serial inference pipeline has been deployed successfully, we send some inference requests to the endpoint to confirm that the end-to-end orchestration works as expected.

We use the movies dataset provided in the `data` folder to build the requests and validate the deployed endpoint.

In [None]:
import pandas as pd
movies_df = pd.read_csv("data/movies.csv")

In [None]:
import time

In [None]:
sample_movie_id = 2139
sample_user_id = "123"
test_data = { "user_id" :  sample_user_id, "movie_id" : str(sample_movie_id) }
payload = json.dumps(test_data)
predictions = predictor.predict(payload)
print(predictions)

In [None]:
featurestore_runtime_client = boto3.client('sagemaker-featurestore-runtime')

# Transform the Prediction Results
After invoking the endpoint successfully, we have a list of item IDs returned from the serial inference pipeline. In the next cell, we transform these item IDs back into movie titles for better readability. We use the feature group that maps each item_id (index) to its movie ID to retrieve the `movieId`, then look up the title in the movies dataset:

In [None]:
recommended_movies = []
for prediction in predictions['predictions']:
    item_id = prediction['item_id']
    title_embedding_feature_record = featurestore_runtime_client.get_record(FeatureGroupName=titles_embeddings_mappings_feature_group_name, 
                                                        RecordIdentifierValueAsString=str(item_id))
    movie_id = [ x['ValueAsString'] for x in title_embedding_feature_record[ 'Record'] if x['FeatureName'] == 'movieId' ][0]
    matching_movie = movies_df[movies_df['movieId'] == int(movie_id)]
    recommended_movies.append(matching_movie['title'].values[0])
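
The list comprehension above raises an `IndexError` if a record or feature is missing. A slightly more defensive lookup can be sketched as follows. `feature_value` is a hypothetical helper, the sample response is made-up data in the `Record` / `FeatureName` / `ValueAsString` shape used above, and the sketch assumes a lookup that finds nothing may return a response without a `Record` key.

```python
def feature_value(record_response, feature_name):
    """Return the ValueAsString for `feature_name`, or None when it is absent."""
    for feature in record_response.get("Record", []):
        if feature["FeatureName"] == feature_name:
            return feature["ValueAsString"]
    return None


# Made-up response in the same shape as the Feature Store runtime result.
sample_response = {
    "Record": [
        {"FeatureName": "item_id", "ValueAsString": "42"},
        {"FeatureName": "movieId", "ValueAsString": "2139"},
    ]
}

print(feature_value(sample_response, "movieId"))
print(feature_value({}, "movieId"))
```

Returning `None` lets the caller decide whether to skip the item or report it, instead of failing midway through the loop.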

In [None]:
watched_movie = movies_df[movies_df['movieId'] == int(sample_movie_id)]['title'].values[0]

In [None]:
print(f"The top 5 recommended movies after watching '{watched_movie}' are as follows:\n")
for recommended_movie in recommended_movies:
    print(recommended_movie)

# Performance Test
Next, we run a simple performance test against the serial inference endpoint to measure the total round-trip latency as seen by the client. We send the same test request 100 times, as follows:

In [None]:
import numpy as np

In [None]:
sample_movie_id = 2139
sample_user_id = "123"
test_data = { "user_id" :  sample_user_id, "movie_id" : str(sample_movie_id) }
payload = json.dumps(test_data)
pred_time = []
for i in range(100):
    start = time.time()
    predictions = predictor.predict(payload)
    pred_time.append(time.time() - start)

We then print the p95, p90, and average latency computed from the timings captured above.

In [None]:
print("\nPredictions time statistic: \n")
print('\nP95: ' + str(np.percentile(pred_time, 95)*1000) + ' ms\n')    
print('P90: ' + str(np.percentile(pred_time, 90)*1000) + ' ms\n')
print('Average: ' + str(np.average(pred_time)*1000) + ' ms\n')
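
The same statistics can be computed with only the standard library, which is handy when the first few requests should be excluded as warm-up. This is a sketch on synthetic data, not measurements from the endpoint; `summarize_latencies` and its `warmup` parameter are hypothetical.

```python
import statistics


def summarize_latencies(samples_s, warmup=0):
    """Return p90, p95 and mean in milliseconds, skipping the first `warmup` samples."""
    samples_ms = [s * 1000 for s in samples_s[warmup:]]
    # n=100 yields 99 cut points; method="inclusive" interpolates linearly between data points.
    cuts = statistics.quantiles(samples_ms, n=100, method="inclusive")
    return {
        "p90_ms": cuts[89],
        "p95_ms": cuts[94],
        "mean_ms": statistics.fmean(samples_ms),
    }


demo = [i / 1000 for i in range(1, 101)]  # synthetic latencies: 1 ms to 100 ms
stats = summarize_latencies(demo)
print(stats)
```

With the measurements from this notebook, the call would be `summarize_latencies(pred_time, warmup=5)` or similar, where the warm-up count is a judgment call.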

## Amazon CloudWatch Metrics
In the following section, we visualize the relevant CloudWatch metrics for the inference pipeline.
We focus on the metrics around invocation counts, per-container latency, and model latency, along with container CPU and memory utilization.

### CloudWatch Metric namespace: AWS/SageMaker
* Metrics frequency: 1 minute.
* Overhead latency: The time that it takes to transport a request to the model container from the SageMaker Runtime API and transport the response back to the SageMaker Runtime API.
* Model latency: The time that it takes the model containers to process the request and return a response. For an inference pipeline, this covers all containers in the pipeline.
* Container latency: The time that it takes an individual container to process the request and return a response.
* Container CPU utilization: The CPU used by the container while processing incoming requests.
* Container memory utilization: The memory used by the container while processing incoming requests.

In [None]:
cw_client = boto3.client("cloudwatch")

In [None]:
def display_cw_metrics(endpoint_name):
    images = []
    stat = "Average"
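    # Container/Model Latency
    # NOTE: the ContainerName values here use hyphens ("container-1"), while the CPU and
    # memory widgets below use underscores ("container_1"). Confirm the exact dimension
    # values for your endpoint in the CloudWatch console if a graph comes back empty.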
    metrics = [
        [ "AWS/SageMaker", "ModelLatency", "EndpointName", endpoint_name, "VariantName", "AllTraffic" ],
        [ "AWS/SageMaker", "OverheadLatency", "EndpointName", endpoint_name, "VariantName", "AllTraffic" ],
        [ ".", "ContainerLatency", ".", ".", ".", ".", "ContainerName", "container-2" ],
        [ "...", "container-1" ]]
            
    metric_widget = {
        "metrics": metrics,
        "view": "timeSeries",
        "stacked": False,
        "stat": stat,
        "period": 60,
        "width": 1000,
        "height": 200,
    }
    response = cw_client.get_metric_widget_image(
        MetricWidget=json.dumps(metric_widget)
    )
    
    images.append(Image.open(io.BytesIO(response["MetricWidgetImage"])))

    # Container CPU Utilization
    metrics = [[ "/aws/sagemaker/Endpoints", "CPUUtilization", "EndpointName", endpoint_name, 
                "VariantName", "AllTraffic", "ContainerName", "container_1" ],
                [ "...", "container_2" ]]

    metric_widget = {
        "metrics": metrics,
        "view": "timeSeries",
        "stacked": False,
        "stat": stat,
        "period": 60,
        "width": 1000,
        "height": 200,
    }
    response = cw_client.get_metric_widget_image(
        MetricWidget=json.dumps(metric_widget)
    )

    images.append(Image.open(io.BytesIO(response["MetricWidgetImage"])))


    # Container Memory Utilization
    metrics = [
            [ "/aws/sagemaker/Endpoints", "MemoryUtilization", "EndpointName", endpoint_name, 
             "VariantName", "AllTraffic", "ContainerName", "container_1" ],
            [ "...", "container_2" ]]

    metric_widget = {
        "metrics": metrics,
        "view": "timeSeries",
        "stacked": False,
        "stat": stat,
        "period": 60,
        "width": 1000,
        "height": 200,
    }
    response = cw_client.get_metric_widget_image(
        MetricWidget=json.dumps(metric_widget)
    )

    images.append(Image.open(io.BytesIO(response["MetricWidgetImage"])))

    # Invocations
    metrics = [[ "AWS/SageMaker", "Invocations", "EndpointName", endpoint_name, "VariantName", "AllTraffic" ],
        [ ".", "InvocationsPerInstance", ".", ".", ".", "." ]]

    metric_widget = {
        "metrics": metrics,
        "view": "timeSeries",
        "stacked": False,
        "stat": stat,
        "period": 60,
        "width": 1000,
        "height": 200,
    }
    response = cw_client.get_metric_widget_image(
        MetricWidget=json.dumps(metric_widget)
    )

    images.append(Image.open(io.BytesIO(response["MetricWidgetImage"])))
    
    for image in images:
        image.show()

In [None]:
display_cw_metrics(endpoint_name)
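
The four widget definitions in `display_cw_metrics` differ only in their `metrics` list, so the shared settings can be factored into a small builder. The sketch below shows one way to do that; `build_widget` and `latency_metrics` are hypothetical helpers, and only the dictionary construction is exercised here (no CloudWatch call is made).

```python
import json


def build_widget(metrics, stat="Average", period=60, width=1000, height=200):
    """Return a metric-widget definition using the same settings as the cells above."""
    return {
        "metrics": metrics,
        "view": "timeSeries",
        "stacked": False,
        "stat": stat,
        "period": period,
        "width": width,
        "height": height,
    }


def latency_metrics(endpoint_name, variant="AllTraffic"):
    """Endpoint-level latency metrics in the AWS/SageMaker namespace."""
    return [
        ["AWS/SageMaker", "ModelLatency", "EndpointName", endpoint_name, "VariantName", variant],
        ["AWS/SageMaker", "OverheadLatency", "EndpointName", endpoint_name, "VariantName", variant],
    ]


widget_json = json.dumps(build_widget(latency_metrics("demo-endpoint")))
print(widget_json)
```

The resulting string is what would be passed as `MetricWidget` to `cw_client.get_metric_widget_image`.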

## Conclusion
In this notebook, we learned how to deploy a SageMaker serial inference pipeline that combines a scikit-learn KNN model and a TensorFlow model. To validate the latency, we ran performance tests against the deployed endpoint and reported the total latency for the test data.
We also used CloudWatch metrics to visualize model latency and compute resource utilization at the container level as well as the endpoint level.

## Clean up

In [None]:
predictor.delete_endpoint()
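
Deleting the endpoint does not remove the SageMaker model that backs it. A fuller teardown can be sketched as below, assuming the model object exposes a `delete_model()` method as the SDK's model classes do. `cleanup` is a hypothetical helper, and the two fake classes exist only so the sketch runs without AWS access.

```python
def cleanup(predictor, model):
    """Delete the endpoint first, then the model; return the steps that completed."""
    done = []
    predictor.delete_endpoint()
    done.append("endpoint")
    model.delete_model()
    done.append("model")
    return done


class FakePredictor:
    """Offline stand-in for a Predictor (hypothetical)."""

    def delete_endpoint(self):
        print("endpoint deleted")


class FakeModel:
    """Offline stand-in for a deployed model object (hypothetical)."""

    def delete_model(self):
        print("model deleted")


steps = cleanup(FakePredictor(), FakeModel())
print(steps)
```

In this notebook the call would be `cleanup(predictor, sm_model)`. The model archive uploaded to S3 earlier is left in place and can be removed separately if it is no longer needed.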