In this tutorial, we will use the Real-Time Inference option to deploy a binary classification XGBoost model, pre-trained on a synthetic auto insurance claims dataset. 
The dataset consists of details and extracted features from claims and customer tables along with a fraud column indicating whether a claim was fraudulent or not. 
The model predicts the probability of a claim being fraudulent. 

## Step 1: Set Up a SageMaker Studio Notebook

In [3]:
%pip install --upgrade -q aiobotocore

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.27.154 requires botocore==1.29.154, but you have botocore 1.29.76 which is incompatible.
awscli 1.27.154 requires PyYAML<5.5,>=3.10, but you have pyyaml 6.0 which is incompatible.
awscli 1.27.154 requires rsa<4.8,>=3.1.2, but you have rsa 4.9 which is incompatible.
boto3 1.26.154 requires botocore<1.30.0,>=1.29.154, but you have botocore 1.29.76 which is incompatible.
sagemaker 2.165.0 requires importlib-metadata<5.0,>=1.4.0, but you have importlib-metadata 6.6.0 which is incompatible.
sparkmagic 0.20.4 requires nest-asyncio==1.5.5, but you have nest-asyncio 1.5.6 which is incompatible.
Note: you may need to restart the kernel to use updated packages.


In [3]:
import pandas as pd
import numpy as np
import boto3
import sagemaker
import time
import json
import io
from io import StringIO
import base64
import pprint
import re

from sagemaker.image_uris import retrieve

In [4]:
%pip install --upgrade botocore
# Restart your kernel if you run into any errors

Note: you may need to restart the kernel to use updated packages.


In [6]:
# Instantiate the SageMaker session and the Boto3 clients, and define the locations
# inside the default S3 bucket that this notebook writes to

sess = sagemaker.Session()

write_bucket = sess.default_bucket()
write_prefix = "fraud-detect-demo"

region = sess.boto_region_name
s3_client = boto3.client("s3", region_name=region)
sm_client = boto3.client("sagemaker", region_name=region)
sm_runtime_client = boto3.client("sagemaker-runtime")
sm_autoscaling_client = boto3.client("application-autoscaling")

sagemaker_role = sagemaker.get_execution_role()


# S3 locations used for parameterizing the notebook run
read_bucket = "sagemaker-sample-files"
read_prefix = "datasets/tabular/synthetic_automobile_claims" 
model_prefix = "models/xgb-fraud"

data_capture_key = f"{write_prefix}/data-capture"

# S3 location of trained model artifact
model_uri = f"s3://{read_bucket}/{model_prefix}/fraud-det-xgb-model.tar.gz"

# S3 path where data captured at endpoint will be stored
data_capture_uri = f"s3://{write_bucket}/{data_capture_key}"

# S3 location of test data
test_data_uri = f"s3://{read_bucket}/{read_prefix}/test.csv" 

In [21]:
# Let's briefly check the data structure:

import pandas as pd
data = pd.read_csv(test_data_uri)
data[:10]

Unnamed: 0,fraud,num_vehicles_involved,num_injuries,num_witnesses,police_report_available,injury_claim,vehicle_claim,total_claim_amount,incident_month,incident_day,...,authorities_contacted_ambulance,policy_state_ca,policy_state_az,policy_state_nv,policy_state_id,policy_state_wa,policy_state_or,customer_gender_other,customer_gender_male,customer_gender_female
0,0,2,0,0,1,58400,14247.766867,72647.766867,7,29,...,0,1,0,0,0,0,0,0,0,1
1,0,2,0,2,0,11500,10675.671347,22175.671347,11,28,...,0,1,0,0,0,0,0,0,0,1
2,0,2,0,0,1,18500,10202.266354,28702.266354,1,6,...,0,1,0,0,0,0,0,0,1,0
3,0,1,0,0,0,16300,9338.348066,25638.348066,9,1,...,0,1,0,0,0,0,0,0,1,0
4,0,2,1,0,0,14700,28145.924994,42845.924994,10,23,...,1,0,0,0,0,0,1,0,1,0
5,0,2,0,1,1,5100,13375.679744,18475.679744,11,24,...,0,0,0,0,0,0,1,0,0,1
6,0,2,0,1,0,7700,17182.473642,24882.473642,12,1,...,0,0,1,0,0,0,0,0,0,1
7,0,2,0,0,0,20300,15425.363265,35725.363265,6,30,...,0,1,0,0,0,0,0,0,0,0
8,0,1,0,0,0,14900,1475.167607,16375.167607,7,10,...,0,1,0,0,0,0,0,0,0,1
9,0,2,1,0,1,16600,35715.897799,52315.897799,10,3,...,0,1,0,0,0,0,0,0,0,1


## Step 2: Create a Real-Time Inference Endpoint

We have to implement three steps to deploy a model:

* Create a SageMaker model from the model artifact
* Create an endpoint configuration to specify properties, including instance type and count
* Create the endpoint using the endpoint configuration

In [7]:
# Let's create a SageMaker model using the trained model artifacts stored in Amazon S3.

# The create_model method takes the Docker image used to serve the model
# (for this model, the SageMaker-managed XGBoost image, which is also the image used for training)
# and the Amazon S3 location of the model artifacts as parameters.

# Retrieve the SageMaker managed XGBoost image
training_image = retrieve(framework="xgboost", region=region, version="1.3-1")

# Specify a unique model name that does not already exist
model_name = "fraud-detect-xgb"
primary_container = {
                     "Image": training_image,
                     "ModelDataUrl": model_uri
                    }

model_matches = sm_client.list_models(NameContains=model_name)["Models"]
if not model_matches:
    model = sm_client.create_model(ModelName=model_name,
                                   PrimaryContainer=primary_container,
                                   ExecutionRoleArn=sagemaker_role)
else:
    print(f"Model with name {model_name} already exists! Change model name to create new")
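The existence check above relies on `NameContains`, which matches substrings, so a model named `fraud-detect-xgb-v2` would also count as a match for `fraud-detect-xgb`. A minimal sketch of an exact-name check over the returned summaries follows; the helper `name_exists` and the sample list are hypothetical and used only for illustration.

```python
def name_exists(summaries, name_key, name):
    """Return True only if some summary dict carries exactly this name."""
    return any(item.get(name_key) == name for item in summaries)


# Hypothetical stand-in for sm_client.list_models(NameContains=...)["Models"]
models = [{"ModelName": "fraud-detect-xgb-v2"}]

print(name_exists(models, "ModelName", "fraud-detect-xgb"))     # substring only, not an exact match
print(name_exists(models, "ModelName", "fraud-detect-xgb-v2"))  # exact match
```

The same helper could be applied to the `EndpointConfigName` and `EndpointName` keys in the two cells that follow.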

In [9]:
# Let's create the endpoint configuration using the Boto3 create_endpoint_config method.
# The main inputs to the create_endpoint_config method are the endpoint
# configuration name and variant information, such as inference instance type and count,
# the name of the model to be deployed, and the traffic share the endpoint should handle.

# Endpoint Config name
endpoint_config_name = f"{model_name}-endpoint-config"

# Endpoint config parameters
production_variant_dict = {
                           "VariantName": "Alltraffic",
                           "ModelName": model_name,
                           "InitialInstanceCount": 1,
                           "InstanceType": "ml.m5.xlarge",
                           "InitialVariantWeight": 1
                          }

# Data capture config parameters
data_capture_config_dict = {
                            "EnableCapture": True,
                            "InitialSamplingPercentage": 100,
                            "DestinationS3Uri": data_capture_uri,
                            "CaptureOptions": [{"CaptureMode" : "Input"}, {"CaptureMode" : "Output"}]
                           }


# Create endpoint config if one with the same name does not exist
endpoint_config_matches = sm_client.list_endpoint_configs(NameContains=endpoint_config_name)["EndpointConfigs"]
if not endpoint_config_matches:
    endpoint_config_response = sm_client.create_endpoint_config(
                                                                EndpointConfigName=endpoint_config_name,
                                                                ProductionVariants=[production_variant_dict],
                                                                DataCaptureConfig=data_capture_config_dict
                                                               )
else:
    print(f"Endpoint config with name {endpoint_config_name} already exists! Change endpoint config name to create new")


In [10]:
# Let's create the endpoint.
# The create_endpoint method takes the endpoint configuration as a parameter, 
# and deploys the model specified in the endpoint configuration to a compute instance. 

endpoint_name = f"{model_name}-endpoint"

endpoint_matches = sm_client.list_endpoints(NameContains=endpoint_name)["Endpoints"]
if not endpoint_matches:
    endpoint_response = sm_client.create_endpoint(
                                                  EndpointName=endpoint_name,
                                                  EndpointConfigName=endpoint_config_name
                                                 )
else:
    print(f"Endpoint with name {endpoint_name} already exists! Change endpoint name to create new")

resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
while status == "Creating":
    print(f"Endpoint Status: {status}...")
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
print(f"Endpoint Status: {status}")

Endpoint Status: Creating...
Endpoint Status: Creating...
Endpoint Status: Creating...
Endpoint Status: InService
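The polling loop above runs for as long as the endpoint reports `Creating`. A minimal sketch of the same loop with an upper bound on the wait is shown below; the helper name `wait_for_endpoint` and its default timeout are assumptions, and the `describe` argument is expected to behave like `sm_client.describe_endpoint`.

```python
import time


def wait_for_endpoint(describe, endpoint_name, timeout_s=1800, poll_s=60, sleep=time.sleep):
    """Poll until the endpoint leaves the 'Creating' state or the timeout elapses.

    `describe` is any callable that accepts EndpointName=... and returns a dict
    containing "EndpointStatus" (for example, sm_client.describe_endpoint).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = describe(EndpointName=endpoint_name)["EndpointStatus"]
        if status != "Creating":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{endpoint_name} is still Creating after {timeout_s}s")
        sleep(poll_s)
```

In the notebook this could be called as `wait_for_endpoint(sm_client.describe_endpoint, endpoint_name)`. The returned status may also be `Failed`, so it is worth checking before invoking the endpoint.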


## Step 3: Invoke The Inference Endpoint

In [11]:
# Fetch test data to run predictions with the endpoint
test_df = pd.read_csv(test_data_uri)

# For content type text/csv, payload should be a string with commas separating the values for each feature
# This is the inference request serialization step
# CSV serialization
csv_file = io.StringIO()
test_sample = test_df.drop(["fraud"], axis=1).iloc[:5]
test_sample.to_csv(csv_file, sep=",", header=False, index=False)
payload = csv_file.getvalue()
response = sm_runtime_client.invoke_endpoint(
                                             EndpointName=endpoint_name,
                                             Body=payload,
                                             ContentType="text/csv",
                                             Accept="text/csv"
                                            )

# This is the inference response deserialization step
# This is a bytes object
result = response["Body"].read()
# Decoding bytes to a string
result = result.decode("utf-8")
# Converting to list of predictions
result = re.split(",|\n",result)

prediction_df = pd.DataFrame()
prediction_df["Prediction"] = result[:5]
prediction_df["Label"] = test_df["fraud"].iloc[:5].values
prediction_df 

Unnamed: 0,Prediction,Label
0,0.0224366653710603,0
1,0.0224366653710603,0
2,0.0799826979637146,0
3,0.1393152326345443,0
4,0.0311235189437866,0
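The serialization and deserialization steps in the cell above can be pulled into two small helpers. This is a sketch under the assumption that the endpoint returns scores separated by commas or newlines, as in the cell above; the names `rows_to_csv` and `parse_predictions` are hypothetical.

```python
import csv
import io


def rows_to_csv(rows):
    """Serialize an iterable of feature rows into a header-less CSV payload."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerows(rows)
    return buf.getvalue()


def parse_predictions(body):
    """Decode a text/csv response body (bytes) into a list of float scores."""
    text = body.decode("utf-8")
    # Scores may be separated by commas or newlines; empty tokens are skipped.
    return [float(tok) for tok in text.replace("\n", ",").split(",") if tok.strip()]


payload = rows_to_csv([[2, 0, 1.5], [1, 0, 2.5]])
scores = parse_predictions(b"0.1,0.2\n0.3\n")
```

Unlike the `re.split` call above, `parse_predictions` drops the empty trailing token and returns floats rather than strings, so the result can be compared against a threshold directly.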


In [12]:
# Because data capture was set up in the endpoint configuration, you have a way to inspect 
# what payload was sent to the endpoint alongside its response. The captured data takes some 
# time to get fully uploaded to S3.

from sagemaker.s3 import S3Downloader
print("Waiting for captures to show up", end="")
for _ in range(90):
    capture_files = sorted(S3Downloader.list(f"{data_capture_uri}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
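        # Note: this key is only expected when invoke_endpoint was called with an
        # InferenceId; without one, the loop runs for the full 90 seconds.
        if "inferenceId" in capture_record["eventMetadata"]: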
            break
    print(".", end="", flush=True)
    time.sleep(1)
print()
print(f"Found {len(capture_files)} Data Capture Files:")

Waiting for captures to show up..........................................................................................
Found 1 Data Capture Files:
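The wait loop above looks for an `inferenceId` key in the capture metadata. A minimal sketch of how a request could be tied to its capture record follows, assuming the request is sent with an `InferenceId` argument to `invoke_endpoint` and that the same value appears under `eventMetadata`; the helper names and the sample JSON Lines text are hypothetical.

```python
import json
import uuid


def new_inference_id():
    """Generate a unique ID to pass as InferenceId when invoking the endpoint."""
    return str(uuid.uuid4())


def find_capture_record(jsonl_text, inference_id):
    """Return the first JSON Lines record whose eventMetadata carries the given ID."""
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        if record.get("eventMetadata", {}).get("inferenceId") == inference_id:
            return record
    return None


# Hypothetical capture file content, for illustration only
sample = "\n".join([
    json.dumps({"eventMetadata": {"eventId": "a"}}),
    json.dumps({"eventMetadata": {"eventId": "b", "inferenceId": "req-123"}}),
])
match = find_capture_record(sample, "req-123")
```

With this approach the loop can stop as soon as the record for a specific request shows up, instead of waiting for the full polling window.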


In [13]:
# The captured data for each endpoint invocation is stored in S3 as JSON Lines,
# a newline-delimited format for structured data in which each line is a JSON value.
# Let's retrieve the data capture files:

capture_files = sorted(S3Downloader.list(f"{data_capture_uri}/{endpoint_name}"))
capture_file = S3Downloader.read_file(capture_files[0]).split("\n")
capture_record = json.loads(capture_file[0])
capture_record

{'captureData': {'endpointInput': {'observedContentType': 'text/csv',
   'mode': 'INPUT',
   'data': 'MiwwLDAsMSw1ODQwMCwxNDI0Ny43NjY4NjY2NzA4NSw3MjY0Ny43NjY4NjY2NzA4NSw3LDI5LDAsMTEsNDEsMTA4LDAsMSw3NTAsMzAwMCwwLDIsMjAxNCwwLDAsMSwwLDAsMSwwLDAsMCwxLDAsMCwwLDAsMSwwLDAsMSwwLDEsMCwwLDAsMCwwLDAsMCwxCjIsMCwyLDAsMTE1MDAsMTA2NzUuNjcxMzQ2NzM4MTU4LDIyMTc1LjY3MTM0NjczODE2LDExLDI4LDMsMTYsNTQsMTcxLDAsMSw3NTAsMjc1MCwyLDMsMjAxNSwwLDAsMCwwLDEsMSwwLDAsMCwxLDAsMCwwLDAsMSwwLDAsMSwwLDEsMCwwLDAsMCwwLDAsMCwxCjIsMCwwLDEsMTg1MDAsMTAyMDIuMjY2MzUzOTk5Nzc2LDI4NzAyLjI2NjM1Mzk5OTc3LDEsNiw2LDgsNTEsMTk2LDAsMSw3NTAsMzAwMCwwLDEsMjAxMSwwLDAsMCwwLDEsMSwwLDAsMSwwLDAsMCwwLDEsMCwwLDAsMSwwLDEsMCwwLDAsMCwwLDAsMSwwCjEsMCwwLDAsMTYzMDAsOTMzOC4zNDgwNjY0MzU3MTgsMjU2MzguMzQ4MDY2NDM1NzIsOSwxLDYsMyw1NywxMjIsMCwxLDc1MCwzMDAwLDEsMSwyMDE1LDAsMCwwLDEsMCwwLDEsMCwwLDAsMSwwLDAsMCwxLDAsMCwxLDAsMSwwLDAsMCwwLDAsMCwxLDAKMiwxLDAsMCwxNDcwMCwyODE0NS45MjQ5OTQxODIyNSw0Mjg0NS45MjQ5OTQxODIyNSwxMCwyMywyLDE1LDU5LDE0MiwwLDEsNzUwLDMwMDAsMiwzLDIwMTksMCwwLD

In [14]:
# Let's decode the data in the captured files using base64.
# The code retrieves the five test samples that were sent in as payload, and their predictions.
# This feature is useful for inspecting endpoint payloads alongside model responses
# and for monitoring model performance.

input_data = capture_record["captureData"]["endpointInput"]["data"]
output_data = capture_record["captureData"]["endpointOutput"]["data"]
input_data_list = base64.b64decode(input_data).decode("utf-8").split("\n")
print(input_data_list)
output_data_list = base64.b64decode(output_data).decode("utf-8").split("\n")
print(output_data_list)


['2,0,0,1,58400,14247.76686667085,72647.76686667085,7,29,0,11,41,108,0,1,750,3000,0,2,2014,0,0,1,0,0,1,0,0,0,1,0,0,0,0,1,0,0,1,0,1,0,0,0,0,0,0,0,1', '2,0,2,0,11500,10675.671346738158,22175.67134673816,11,28,3,16,54,171,0,1,750,2750,2,3,2015,0,0,0,0,1,1,0,0,0,1,0,0,0,0,1,0,0,1,0,1,0,0,0,0,0,0,0,1', '2,0,0,1,18500,10202.266353999776,28702.26635399977,1,6,6,8,51,196,0,1,750,3000,0,1,2011,0,0,0,0,1,1,0,0,1,0,0,0,0,1,0,0,0,1,0,1,0,0,0,0,0,0,1,0', '1,0,0,0,16300,9338.348066435718,25638.34806643572,9,1,6,3,57,122,0,1,750,3000,1,1,2015,0,0,0,1,0,0,1,0,0,0,1,0,0,0,1,0,0,1,0,1,0,0,0,0,0,0,1,0', '2,1,0,0,14700,28145.92499418225,42845.92499418225,10,23,2,15,59,142,0,1,750,3000,2,3,2019,0,0,1,0,0,1,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,0,0,0,1,0,1,0', '']
['0.02243666537106037', '0.02243666537106037', '0.0799826979637146', '0.13931523263454437', '0.03112351894378662', '']
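The decoded lists above end with an empty string because the payload ends with a newline. A minimal sketch that pairs each input row with its prediction is shown below, assuming both sections are base64-encoded and hold one row or one score per line, as in the output above; `decode_capture` and the sample record are hypothetical.

```python
import base64


def decode_capture(record):
    """Return (input_row, prediction) pairs from one data capture record."""
    data = record["captureData"]

    def _lines(section):
        text = base64.b64decode(data[section]["data"]).decode("utf-8")
        return [line for line in text.split("\n") if line]  # drop the empty trailing entry

    inputs = _lines("endpointInput")
    outputs = [float(value) for value in _lines("endpointOutput")]
    if len(inputs) != len(outputs):
        raise ValueError("input and output row counts differ")
    return list(zip(inputs, outputs))


# Hypothetical record built locally, for illustration only
sample_record = {
    "captureData": {
        "endpointInput": {"data": base64.b64encode(b"2,0,0,1\n1,0,0,0\n").decode("ascii")},
        "endpointOutput": {"data": base64.b64encode(b"0.02\n0.13\n").decode("ascii")},
    }
}
pairs = decode_capture(sample_record)
```

Applied to `capture_record` from the cell above, this would yield five pairs, one per test sample.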


## Step 4: Configure Auto Scaling For Endpoint

Workloads that use Real-Time Inference endpoints usually have low latency requirements. When traffic spikes, these endpoints can experience CPU overload, high latency, or timeouts, so it is important to scale capacity to handle traffic changes efficiently. SageMaker inference auto scaling monitors your workloads and dynamically adjusts the instance count to maintain steady and predictable endpoint performance at a low cost. When the workload increases, auto scaling brings more instances online, and when the workload decreases, it removes unnecessary instances, helping you reduce your compute cost.

In this tutorial, we use the AWS SDK for Python (Boto3) to set up auto scaling for the endpoint. SageMaker provides multiple types of auto scaling: target tracking scaling, step scaling, on-demand scaling, and scheduled scaling. We will use a target tracking scaling policy, which is triggered when a chosen scaling metric rises above a chosen target threshold.


Auto scaling can be set up in two steps. First, we register the endpoint variant as a scalable target and specify the minimum and maximum number of instances it may run. Second, we attach a scaling policy that defines the scaling metric and its target threshold. When traffic pushes the metric over that threshold, instances are added up to the specified maximum.

In [15]:
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)

# SageMaker expects resource id to be provided with the following structure
resource_id = f"endpoint/{endpoint_name}/variant/{resp['ProductionVariants'][0]['VariantName']}"

# Scaling configuration
scaling_config_response = sm_autoscaling_client.register_scalable_target(
                                                          ServiceNamespace="sagemaker",
                                                          ResourceId=resource_id,
                                                          ScalableDimension="sagemaker:variant:DesiredInstanceCount", 
                                                          MinCapacity=1,
                                                          MaxCapacity=2
                                                        )

In [16]:
# Let's create the scaling policy.
# The chosen scaling metric is SageMakerVariantInvocationsPerInstance,
# which is the average number of times per minute that each inference instance
# for a model variant is invoked. When this number crosses the chosen threshold of 5,
# auto scaling is triggered.

# Create Scaling Policy
policy_name = f"scaling-policy-{endpoint_name}"
scaling_policy_response = sm_autoscaling_client.put_scaling_policy(
                                                PolicyName=policy_name,
                                                ServiceNamespace="sagemaker",
                                                ResourceId=resource_id,
                                                ScalableDimension="sagemaker:variant:DesiredInstanceCount",
                                                PolicyType="TargetTrackingScaling",
                                                TargetTrackingScalingPolicyConfiguration={
                                                    "TargetValue": 5.0, # Target for average invocations per minute per instance
                                                    "PredefinedMetricSpecification": {
                                                        "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
                                                    },
                                                    "ScaleInCooldown": 600, # Seconds to wait after a scale-in before another scale-in can start
                                                    "ScaleOutCooldown": 60 # Seconds to wait after a scale-out before another scale-out can start
                                                }
                                            )

In [17]:
# Let's retrieve the scaling policy details:

response = sm_autoscaling_client.describe_scaling_policies(ServiceNamespace="sagemaker")

pp = pprint.PrettyPrinter(indent=4, depth=4)
for i in response["ScalingPolicies"]:
    pp.pprint(i["PolicyName"])
    print("")
    if("TargetTrackingScalingPolicyConfiguration" in i):
        pp.pprint(i["TargetTrackingScalingPolicyConfiguration"])

'scaling-policy-fraud-detect-xgb-endpoint'

{   'PredefinedMetricSpecification': {   'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'},
    'ScaleInCooldown': 600,
    'ScaleOutCooldown': 60,
    'TargetValue': 5.0}
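To build intuition for the target value of 5, the sketch below estimates how many instances would keep the per-instance invocation rate at or below the target. It is a simplified approximation and not the service's actual algorithm, which also depends on CloudWatch alarms and the cooldown periods; the function name `estimate_desired_instances` is hypothetical.

```python
import math


def estimate_desired_instances(invocations_per_minute, target_per_instance,
                               min_capacity, max_capacity):
    """Estimate the instance count that keeps per-instance load at or below the target.

    Simplified: divide total load by the target and clamp the result to the
    registered minimum and maximum capacity.
    """
    needed = math.ceil(invocations_per_minute / target_per_instance)
    return max(min_capacity, min(max_capacity, needed))


low_traffic = estimate_desired_instances(4, 5.0, 1, 2)    # below target: stays at the minimum
high_traffic = estimate_desired_instances(12, 5.0, 1, 2)  # above target: capped at the maximum
```

With `MinCapacity=1` and `MaxCapacity=2` as registered above, any sustained load over 5 invocations per minute can only ever add one extra instance.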


In [18]:
# Let's stress-test the endpoint. The code runs for 250 seconds and invokes
# the endpoint repeatedly by sending randomly selected samples from the test dataset.

request_duration = 250
end_time = time.time() + request_duration
print(f"Endpoint will be tested for {request_duration} seconds")
while time.time() < end_time:
    csv_file = io.StringIO()
    test_sample = test_df.drop(["fraud"], axis=1).iloc[[np.random.randint(0, test_df.shape[0])]]
    test_sample.to_csv(csv_file, sep=",", header=False, index=False)
    payload = csv_file.getvalue()
    response = sm_runtime_client.invoke_endpoint(
                                                 EndpointName=endpoint_name,
                                                 Body=payload,
                                                 ContentType="text/csv"
                                                )

Endpoint will be tested for 250 seconds


When the endpoint receives the increased load, we can check its status by running the code below. The code polls the endpoint, watches for the status to change from InService to Updating, and keeps track of the instance count. After a few minutes, the status changes from InService to Updating and back to InService, this time with a higher instance count.

In [20]:
# Check the instance counts after the endpoint gets more load
response = sm_client.describe_endpoint(EndpointName=endpoint_name)
endpoint_status = response["EndpointStatus"]
request_duration = 250
end_time = time.time() + request_duration
print(f"Waiting for Instance count increase for a max of {request_duration} seconds. Please re run this cell in case the count does not change")
while time.time() < end_time:
    response = sm_client.describe_endpoint(EndpointName=endpoint_name)
    endpoint_status = response["EndpointStatus"]
    instance_count = response["ProductionVariants"][0]["CurrentInstanceCount"]
    print(f"Status: {endpoint_status}")
    print(f"Current Instance count: {instance_count}")
    if (endpoint_status=="InService") and (instance_count>1):
        break
    else:
        time.sleep(15)

Waiting for Instance count increase for a max of 250 seconds. Please re run this cell in case the count does not change
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: Updating
Current Instance count: 1
Status: InService
Current Instance count: 2


## Step 5: Clean Up The Resources

In [None]:
# Delete model
sm_client.delete_model(ModelName=model_name)

# Delete endpoint configuration
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

# Delete endpoint
sm_client.delete_endpoint(EndpointName=endpoint_name)
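The cell above does not touch the scalable target registered in Step 4. A minimal sketch of a cleanup routine that deregisters it before deleting the endpoint is shown below. The function name `cleanup` is hypothetical, the ordering is an assumption rather than a documented requirement, and the two client arguments are expected to behave like `sm_client` and `sm_autoscaling_client`.

```python
def cleanup(sm_client, autoscaling_client, endpoint_name,
            endpoint_config_name, model_name, resource_id):
    """Remove the auto scaling registration, then the endpoint, its config, and the model."""
    # Deregister the variant from Application Auto Scaling first
    autoscaling_client.deregister_scalable_target(
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    )
    # Delete the endpoint before the resources it refers to
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    sm_client.delete_model(ModelName=model_name)
```

In the notebook this could be called as `cleanup(sm_client, sm_autoscaling_client, endpoint_name, endpoint_config_name, model_name, resource_id)`.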


#### To delete the S3 bucket, do the following:

* Open the Amazon S3 console. On the navigation bar, choose Buckets, then `sagemaker-<your-Region>-<your-account-id>`, and then select the checkbox next to fraud-detect-demo. Then, choose Delete.

* On the Delete objects dialog box, verify that you have selected the proper object to delete and enter "permanently delete" into the Permanently delete objects confirmation box.

* Once this is complete and the bucket is empty, you can delete the `sagemaker-<your-Region>-<your-account-id>` bucket by following the same procedure again.

#### To delete the domain, do the following:

* Open the CloudFormation console. In the CloudFormation pane, choose Stacks. From the status dropdown list, select Active. Under Stack name, choose CFN-SM-IM-Lambda-catalog to open the stack details page.

* On the CFN-SM-IM-Lambda-catalog stack details page, choose Delete to delete the stack along with the resources it created at the beginning.