You'll need to set up an S3 client object and define the S3 locations used in this notebook. The SageMaker session object provides a default write bucket named sagemaker-<your-Region>-<your-account-id>, created automatically if it doesn't already exist; sess.default_bucket() returns its name, and this notebook stores the endpoint's captured data there. The pre-trained model artifact and the test dataset are located in a publicly accessible S3 bucket named sagemaker-sample-files. This bucket is referred to as the read bucket, and the location of the data within it is given by the read prefix, defined on line 29 of the code cell below.

In [None]:
import pandas as pd  # Importing the pandas library for data manipulation and analysis.
import numpy as np  # Importing the numpy library for numerical operations.
import boto3  # Importing the boto3 library to interact with AWS services.
import sagemaker  # Importing the sagemaker library for building, training, and deploying machine learning models on AWS SageMaker.
import time  # Importing the time library for time-related tasks.
import json  # Importing the json library for JSON parsing.
import io  # Importing the io library for handling I/O stream operations.
from io import StringIO  # Importing StringIO from io for in-memory text streams.
import base64  # Importing the base64 library for encoding and decoding Base64 data.
import pprint  # Importing the pprint library for pretty-printing Python data structures.
import re  # Importing the re library for regular expression operations.

from sagemaker.image_uris import retrieve  # Importing the retrieve function from sagemaker.image_uris for retrieving container image URIs.

sess = sagemaker.Session()  # Creating a SageMaker session object for managing interactions with the SageMaker service.
write_bucket = sess.default_bucket()  # Retrieving the default S3 bucket for the SageMaker session.
write_prefix = "fraud-detect-demo"  # Defining the S3 prefix for writing data related to the fraud detection demo.

region = sess.boto_region_name  # Retrieving the AWS region name associated with the SageMaker session.
s3_client = boto3.client("s3", region_name=region)  # Creating an S3 client for interacting with Amazon S3.
sm_client = boto3.client("sagemaker", region_name=region)  # Creating a SageMaker client for interacting with the SageMaker service.
sm_runtime_client = boto3.client("sagemaker-runtime", region_name=region)  # SageMaker Runtime client for invoking deployed models; pinned to the session region like the clients above so it reaches the endpoint created there.
sm_autoscaling_client = boto3.client("application-autoscaling", region_name=region)  # Application Auto Scaling client for managing auto-scaling policies; pinned to the session region where the endpoint lives.

sagemaker_role = sagemaker.get_execution_role()  # Retrieving the IAM role for SageMaker execution.

# S3 locations used for parameterizing the notebook run
read_bucket = "sagemaker-sample-files"  # Defining the S3 bucket for reading sample files.
read_prefix = "datasets/tabular/synthetic_automobile_claims"  # Defining the S3 prefix for reading synthetic automobile claims data.
model_prefix = "models/xgb-fraud"  # Defining the S3 prefix for the XGBoost fraud detection model.

data_capture_key = f"{write_prefix}/data-capture"  # Defining the S3 key for data capture during model inference.

# S3 location of trained model artifact
model_uri = f"s3://{read_bucket}/{model_prefix}/fraud-det-xgb-model.tar.gz"  # Defining the S3 URI of the trained XGBoost model artifact.

# S3 path where data captured at endpoint will be stored
data_capture_uri = f"s3://{write_bucket}/{data_capture_key}"  # Defining the S3 URI for storing data captured at the inference endpoint.

# S3 location of test data
test_data_uri = f"s3://{read_bucket}/{read_prefix}/test.csv"  # Defining the S3 URI for the test data.

1. Create a SageMaker model from the model artifact

In [None]:
# Retrieve the SageMaker managed XGBoost image for the session's region
training_image = retrieve(framework="xgboost", region=region, version="1.3-1")

# Specify a unique model name that does not exist
model_name = "fraud-detect-xgb"
# The managed XGBoost image is used as the serving container, loading the
# pre-trained artifact from model_uri.
primary_container = {
    "Image": training_image,
    "ModelDataUrl": model_uri,
}

# list_models(NameContains=...) is a paginated *substring* filter, so walk every
# page and compare names exactly. Otherwise an unrelated model such as
# "fraud-detect-xgb-v2" would be mistaken for this one and creation skipped.
model_pages = sm_client.get_paginator("list_models").paginate(NameContains=model_name)
model_exists = any(
    existing["ModelName"] == model_name
    for page in model_pages
    for existing in page["Models"]
)
if not model_exists:
    model = sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer=primary_container,
        ExecutionRoleArn=sagemaker_role,
    )
else:
    print(f"Model with name {model_name} already exists! Change model name to create new")

2. Create an endpoint configuration to specify properties, including instance type and count

In [None]:
# Name the endpoint configuration after the model so related resources are easy to find.
# Step 3 passes this name to create_endpoint.
endpoint_config_name = f"{model_name}-endpoint-config"

# A single production variant serving the model created in step 1 on one instance.
production_variant_dict = {
    "VariantName": "AllTraffic",
    "ModelName": model_name,
    "InitialInstanceCount": 1,
    "InstanceType": "ml.m5.xlarge",
    "InitialVariantWeight": 1,
}

# Capture every request and response (100% sampling) to data_capture_uri; the
# data-capture cells later in the notebook read these records back from S3.
data_capture_config_dict = {
    "EnableCapture": True,
    "InitialSamplingPercentage": 100,
    "DestinationS3Uri": data_capture_uri,
    "CaptureOptions": [{"CaptureMode": "Input"}, {"CaptureMode": "Output"}],
}

# NameContains is a paginated substring filter, so compare names exactly across all pages.
config_pages = sm_client.get_paginator("list_endpoint_configs").paginate(
    NameContains=endpoint_config_name
)
endpoint_config_exists = any(
    existing["EndpointConfigName"] == endpoint_config_name
    for page in config_pages
    for existing in page["EndpointConfigs"]
)
if not endpoint_config_exists:
    endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[production_variant_dict],
        DataCaptureConfig=data_capture_config_dict,
    )
else:
    print(f"Endpoint config with name {endpoint_config_name} already exists! Change endpoint config name to create new")

3. Create the endpoint using the endpoint configuration

In [None]:
endpoint_name = f"{model_name}-endpoint"

# NameContains is a paginated substring filter, so compare names exactly across all
# pages; otherwise e.g. "<name>-endpoint-old" would suppress creating this endpoint.
endpoint_pages = sm_client.get_paginator("list_endpoints").paginate(NameContains=endpoint_name)
endpoint_exists = any(
    existing["EndpointName"] == endpoint_name
    for page in endpoint_pages
    for existing in page["Endpoints"]
)
if not endpoint_exists:
    endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=endpoint_config_name,
    )
else:
    print(f"Endpoint with name {endpoint_name} already exists! Change endpoint name to create new")

# Poll once a minute until the endpoint leaves the "Creating" state.
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
while status == "Creating":
    print(f"Endpoint Status: {status}...")
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
print(f"Endpoint Status: {status}")

# Stop here with the service-reported reason instead of failing later on invoke_endpoint.
if status == "Failed":
    raise RuntimeError(
        f"Endpoint {endpoint_name} failed to deploy: {resp.get('FailureReason', 'no reason reported')}"
    )

4. Invoke the inference endpoint

In [None]:
# Fetch test data to run predictions with the endpoint
test_df = pd.read_csv(test_data_uri)

# For content type text/csv, payload should be a string with commas separating the values for each feature
# This is the inference request serialization step
# CSV serialization
csv_file = io.StringIO()
test_sample = test_df.drop(["fraud"], axis=1).iloc[:5]
test_sample.to_csv(csv_file, sep=",", header=False, index=False)
payload = csv_file.getvalue()
response = sm_runtime_client.invoke_endpoint(
                                             EndpointName=endpoint_name,
                                             Body=payload,
                                             ContentType="text/csv",
                                             Accept="text/csv"
                                            )

# This is the inference response deserialization step
# This is a bytes object
result = response["Body"].read()
# Decoding bytes to a string
result = result.decode("utf-8")
# Converting to list of predictions
result = re.split(",|\n",result)

prediction_df = pd.DataFrame()
prediction_df["Prediction"] = result[:5]
prediction_df["Label"] = test_df["fraud"].iloc[:5].values
prediction_df 

Because data capture was set up in the endpoint configuration, you have a way to inspect what payload was sent to the endpoint alongside its response. The captured data takes some time to get fully uploaded to S3. Check if data capture is complete.

In [None]:
from sagemaker.s3 import S3Downloader

# Capture files reach S3 asynchronously, so poll until a record shows up.
# The invoke_endpoint calls in this notebook do not pass an InferenceId, so
# captured records should not carry an "inferenceId" key; waiting for that key
# would always exhaust the timeout. Wait for the "captureData" payload instead.
max_attempts = 90  # one attempt per second
capture_files = []
print("Waiting for captures to show up", end="")
for _ in range(max_attempts):
    capture_files = sorted(S3Downloader.list(f"{data_capture_uri}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "captureData" in capture_record:
            break
    print(".", end="", flush=True)
    time.sleep(1)
print()
print(f"Found {len(capture_files)} Data Capture Files:")

The captured data is stored in S3 as JSON Lines files. JSON Lines is a newline-delimited format for structured data in which each line is a JSON value; here each line is one captured request/response record. Copy and paste the following code to retrieve the data capture files and load the first captured record.

In [None]:
capture_files = sorted(S3Downloader.list(f"{data_capture_uri}/{endpoint_name}"))
# Fail with an actionable message instead of a bare IndexError when nothing has been captured yet.
if not capture_files:
    raise RuntimeError(
        f"No data capture files found under {data_capture_uri}/{endpoint_name}; "
        "wait a little longer and re-run this cell"
    )
# Files are JSON Lines: parse the first record of the oldest capture file.
capture_file = S3Downloader.read_file(capture_files[0]).split("\n")
capture_record = json.loads(capture_file[0])
capture_record

The following code decodes the data in the captured files using base64. It retrieves the five test samples that were sent in as the payload, together with their predictions. This feature is useful for inspecting endpoint loads alongside model responses and for monitoring model performance.

In [None]:
def _decode_capture(captured):
    """Return the captured payload of one endpointInput/endpointOutput entry as non-empty text lines.

    Each entry records its "encoding". Base64 is decoded; text encodings (e.g. CSV)
    are used as-is. When the field is absent, base64 is assumed.
    """
    data = captured["data"]
    if captured.get("encoding", "BASE64") == "BASE64":
        data = base64.b64decode(data).decode("utf-8")
    # Drop empty strings such as the one left by a trailing newline
    return [line for line in data.split("\n") if line]


input_data_list = _decode_capture(capture_record["captureData"]["endpointInput"])
print(input_data_list)
output_data_list = _decode_capture(capture_record["captureData"]["endpointOutput"])
print(output_data_list)

# Configure auto scaling for endpoint
Auto scaling is set up in two steps. First, you register the endpoint variant as a scalable target, specifying the minimum and maximum number of instances for the endpoint. Copy and paste the following code to register the scalable target. Instances are added, up to the specified maximum, when traffic exceeds the threshold that you choose in the next step.

In [None]:
# Application Auto Scaling identifies a SageMaker variant by the resource id
# "endpoint/<endpoint-name>/variant/<variant-name>"; use the endpoint's first variant.
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
variant_name = resp["ProductionVariants"][0]["VariantName"]
resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"

# Register the variant's desired instance count as scalable between 1 and 2 instances
scaling_config_response = sm_autoscaling_client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=1,
    MaxCapacity=2,
)

Create the scaling policy. The chosen scaling metric is SageMakerVariantInvocationsPerInstance, which is the average number of times per minute that each inference instance for a model variant is invoked. The policy's target value is 5: when this metric rises above 5, auto scaling adds instances, up to the maximum registered in the previous step.

In [None]:
# Create Scaling Policy
# Target tracking adjusts the variant's instance count to keep the chosen metric
# near TargetValue.
policy_name = f"scaling-policy-{endpoint_name}"
target_tracking_config = {
    "TargetValue": 5.0,  # Target average invocations per instance per minute
    "PredefinedMetricSpecification": {
        "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
    },
    "ScaleInCooldown": 600,  # Seconds after a scale-in activity before another scale-in may start
    "ScaleOutCooldown": 60,  # Seconds after a scale-out activity before another scale-out may start
}
scaling_policy_response = sm_autoscaling_client.put_scaling_policy(
    PolicyName=policy_name,
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration=target_tracking_config,
)

Retrieve the scaling policy details.

In [None]:
# Restrict the listing to this endpoint variant; without ResourceId the call returns
# every SageMaker scaling policy in the account and region.
response = sm_autoscaling_client.describe_scaling_policies(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
)

pp = pprint.PrettyPrinter(indent=4, depth=4)
for policy in response["ScalingPolicies"]:
    pp.pprint(policy["PolicyName"])
    print("")
    if "TargetTrackingScalingPolicyConfiguration" in policy:
        pp.pprint(policy["TargetTrackingScalingPolicyConfiguration"])

Stress-test the endpoint. The following code runs for 250 seconds and repeatedly invokes the endpoint with randomly selected samples from the test dataset.

In [None]:
request_duration = 250
# Use a monotonic clock so wall-clock adjustments cannot stretch or cut the test
end_time = time.monotonic() + request_duration
print(f"Endpoint will be tested for {request_duration} seconds")

# Drop the label column once, outside the loop, so each iteration only picks and sends a row
features_df = test_df.drop(["fraud"], axis=1)
n_rows = features_df.shape[0]
invocation_count = 0
while time.monotonic() < end_time:
    csv_file = io.StringIO()
    # The list index keeps a one-row DataFrame, so to_csv writes a single CSV line
    test_sample = features_df.iloc[[np.random.randint(0, n_rows)]]
    test_sample.to_csv(csv_file, sep=",", header=False, index=False)
    payload = csv_file.getvalue()
    response = sm_runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        Body=payload,
        ContentType="text/csv",
    )
    invocation_count += 1
print(f"Sent {invocation_count} requests")

When the endpoint receives the increased payload, you can check the status of the endpoint by running the code below. This code checks when the status of the endpoint changes from InService to Updating and keeps track of the instance counts. After a few minutes, you can see the status changing from InService to Updating and back to InService but with a higher instance count.

In [None]:
# Check the instance counts after the endpoint gets more load
response = sm_client.describe_endpoint(EndpointName=endpoint_name)
endpoint_status = response["EndpointStatus"]
request_duration = 250
end_time = time.time() + request_duration
print(f"Waiting for Instance count increase for a max of {request_duration} seconds. Please re run this cell in case the count does not change")
while time.time() < end_time:
    response = sm_client.describe_endpoint(EndpointName=endpoint_name)
    endpoint_status = response["EndpointStatus"]
    instance_count = response["ProductionVariants"][0]["CurrentInstanceCount"]
    print(f"Status: {endpoint_status}")
    print(f"Current Instance count: {instance_count}")
    if (endpoint_status=="InService") and (instance_count>1):
        break
    else:
        time.sleep(15)