# Benchmark autoscaling RoBERTa base model using Amazon SageMaker Multi-model endpoints (MME) with GPU support

Amazon SageMaker multi-model endpoints (MME) with GPU support use NVIDIA Triton Inference Server. NVIDIA Triton Inference Server is open-source inference serving software that simplifies the inference serving process and provides high inference performance. Triton supports all major training and inference frameworks, such as TensorFlow, NVIDIA TensorRT, PyTorch, MXNet, Python, ONNX, XGBoost, scikit-learn, RandomForest, OpenVINO, custom C++, and more. It offers dynamic batching, concurrent execution, post-training quantization, and optimal model configuration to achieve high-performance inference.

In this notebook, we run benchmark tests for the most popular NLP models using MME on GPU. We evaluate model performance metrics such as inference latency, throughput, and the optimum number of models per instance. We will also compile these models using NVIDIA TensorRT to compare performance against TorchScript models.

This notebook was tested with the `PyTorch 1.12 Python 3.8 CPU Optimized` kernel in SageMaker Studio. An instance with at least 8 vCPU cores, such as `ml.c5.2xlarge`, is recommended to run the load test. You can use a smaller instance by reducing the scale of the load test. The configuration provided here simulates up to 200 concurrent workers.
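As a rough way to relate the number of simulated workers to the throughput you can expect, Little's law says that a closed-loop load test (each worker waits for its response before sending the next request) tops out at about workers / (latency + wait time) requests per second. The sketch below is illustrative only; `estimated_throughput` is a hypothetical helper and the latency values are made up, not measured results from this benchmark.

```python
def estimated_throughput(concurrent_workers: int, latency_s: float, wait_time_s: float = 0.0) -> float:
    """Upper-bound requests/second for a closed-loop load test (Little's law).

    Each worker sends a request, waits latency_s for the response, then pauses
    wait_time_s before sending the next one.
    """
    cycle_time = latency_s + wait_time_s
    if cycle_time <= 0:
        raise ValueError("latency plus wait time must be positive")
    return concurrent_workers / cycle_time


# Hypothetical numbers: 200 workers and 250 ms end-to-end latency.
print(estimated_throughput(200, 0.25))        # 800.0
# Same workers, but each one also waits 750 ms between requests.
print(estimated_throughput(200, 0.25, 0.75))  # 200.0
```

In practice, latency typically rises as concurrency grows, so measured throughput flattens out below this bound once an instance saturates.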

## Set up the environment

Install the dependencies required to package the model and run inference using Triton server.

Also define the IAM role that gives SageMaker access to the model artifacts and the NVIDIA Triton ECR image.

In [None]:
%pip install timm -Uqq
%pip install transformers -Uqq
%pip install locust -Uqq
%pip install boto3 -Uqq
%pip install sagemaker -Uqq
%pip install matplotlib -Uqq
%pip install Jinja2 -Uqq
%pip install ipywidgets -Uqq

In [None]:
%%capture
import IPython

IPython.Application.instance().kernel.do_shutdown(True)  # has to restart kernel so changes are used

In [None]:
%env TOKENIZERS_PARALLELISM=False

In [None]:
import datetime as dt
import json
import time
import warnings
from pathlib import Path

import boto3
import sagemaker
import torch
from sagemaker import get_execution_role

from utils import model_utils

role = get_execution_role()
sess = sagemaker.Session()

account = sess.account_id()
bucket = sess.default_bucket()  # or use your own custom bucket name
prefix = "mme-roberta-base-benchmark"

use_case = "nlp"

sm_client = boto3.client(service_name="sagemaker")
runtime_sm_client = boto3.client("sagemaker-runtime")

tested_models = ["roberta-base"]

model_name = "roberta-base"  # change the model name to benchmark different NLP models

max_seq_len = 128

Account ID mapping for the Triton inference container images in each AWS Region

In [None]:
account_id_map = {
    'us-east-1': '785573368785',
    'us-east-2': '007439368137',
    'us-west-1': '710691900526',
    'us-west-2': '301217895009',
    'eu-west-1': '802834080501',
    'eu-west-2': '205493899709',
    'eu-west-3': '254080097072',
    'eu-north-1': '601324751636',
    'eu-south-1': '966458181534',
    'eu-central-1': '746233611703',
    'ap-east-1': '110948597952',
    'ap-south-1': '763008648453',
    'ap-northeast-1': '941853720454',
    'ap-northeast-2': '151534178276',
    'ap-southeast-1': '324986816169',
    'ap-southeast-2': '355873309152',
    'cn-northwest-1': '474822919863',
    'cn-north-1': '472730292857',
    'sa-east-1': '756306329178',
    'ca-central-1': '464438896020',
    'me-south-1': '836785723513',
    'af-south-1': '774647643957'
}

In [None]:
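region = boto3.Session().region_name
if region not in account_id_map:
    # Raising a bare string is a TypeError in Python 3; raise a proper exception instead
    raise ValueError(f"Unsupported region: {region}")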

## Generate Pretrained Models

We use the following steps to generate our pretrained model:

1. Load a pretrained model from the Hugging Face Hub

2. Trace the model with TorchScript (JIT) and save the TorchScript file

3. Create a model artifact that consists of the TorchScript file and a model configuration (config.pbtxt) for Triton serving

Helper functions for each of these steps are imported from the local `utils.model_utils` module.

In [None]:
if model_name not in tested_models:
    warnings.warn(f"{model_name} has not been tested and may not work")
tokenizer, model = model_utils.get_model_from_hf_hub(model_name)
model.eval()

print(f"loaded model {model_name} with {model_utils.count_parameters(model)} parameters")

example_input = tokenizer("This is a sample", padding="max_length", max_length=max_seq_len, return_tensors="pt")

## Packaging PyTorch model for Triton server on SageMaker

**Note**: SageMaker expects the model tarball file to have a top level directory with the same name as the model defined in the `config.pbtxt`.

```
model_name
├── 1
│   └── model.pt
└── config.pbtxt
```
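A minimal sketch of producing that layout with Python's standard `tarfile` module, assuming the model directory already exists on disk. `package_model_dir` is a hypothetical helper and is not necessarily how `model_utils.package_triton_model` works; the placeholder files stand in for the real TorchScript model and config.

```python
import tarfile
import tempfile
from pathlib import Path
from typing import List


def package_model_dir(model_dir: Path, output_tar: Path) -> List[str]:
    """Tar `model_dir` so its name becomes the top-level directory in the archive."""
    with tarfile.open(str(output_tar), "w:gz") as tar:
        # arcname keeps only the model directory name, e.g. "roberta-base/..."
        tar.add(str(model_dir), arcname=model_dir.name)
    with tarfile.open(str(output_tar), "r:gz") as tar:
        return sorted(tar.getnames())


with tempfile.TemporaryDirectory() as tmp:
    model_dir = Path(tmp) / "roberta-base"
    (model_dir / "1").mkdir(parents=True)
    (model_dir / "1" / "model.pt").write_bytes(b"placeholder")  # stand-in for the TorchScript file
    (model_dir / "config.pbtxt").write_text('name: "roberta-base"\n')
    members = package_model_dir(model_dir, Path(tmp) / "roberta-base.tar.gz")
    print(members)
```

The archive name itself does not matter to Triton; what matters is that the top-level directory inside it matches the `name` in `config.pbtxt`.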

We trace an existing RoBERTa base model to convert it from a PyTorch module to TorchScript, which PyTorch uses for high-performance deployment.

[This tutorial](https://pytorch.org/tutorials/beginner/Intro_to_TorchScript_tutorial.html) is an introduction to TorchScript, an intermediate representation of a PyTorch model (subclass of nn.Module) that can then be run in a high-performance environment such as C++.

In [None]:
pytorch_model_path = Path(f"triton-serve-pt/{model_name}/1")
pytorch_model_path.mkdir(parents=True, exist_ok=True)
pt_model_path = model_utils.export_pt_jit(model, list(example_input.values()), pytorch_model_path)  # export the JIT-compiled model to the specified directory

<div class="alert alert-info"> <strong> Note: </strong>
Based on the model architecture, we generate a Triton configuration (config.pbtxt) file. This approach should work for most models, but you may need to adjust the generated config. Additionally, the model is assumed to be a base model that returns the last hidden state. If you use a different output head, such as sequence classification, adjust the <code>triton_outputs</code> variable below.
</div>

In [None]:
# Build the Triton input specs from the tokenizer's input names
triton_inputs = [
    {"name": input_name, "data_type": "TYPE_INT32", "dims": f"[{max_seq_len}]"}
    for input_name in example_input
]
triton_outputs = [
    {
        "name": "last_hidden_state",
        "data_type": "TYPE_FP32",
        "dims": f"[{max_seq_len}, {model.config.hidden_size}]",
    }
]

In [None]:
triton_config_path = model_utils.generate_triton_config(platform="pt", triton_inputs=triton_inputs, triton_outputs=triton_outputs, save_path=pytorch_model_path)
triton_config_path
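For reference, a minimal sketch of what such a config.pbtxt can look like, rendered from input/output dicts shaped like `triton_inputs` and `triton_outputs` above. This is an assumption about the kind of file `model_utils.generate_triton_config` writes, not its actual implementation; `render_triton_config` is a hypothetical helper. The optional `dynamic_batching { }` block shows how Triton's dynamic batching is enabled, and `max_batch_size: 32` mirrors the batch size mentioned in the packaging cell. Check tensor naming against the Triton PyTorch backend documentation for your container version.

```python
from typing import Dict, List


def render_triton_config(
    model_name: str,
    inputs: List[Dict[str, str]],
    outputs: List[Dict[str, str]],
    platform: str = "pytorch_libtorch",
    max_batch_size: int = 32,
    dynamic_batching: bool = True,
) -> str:
    """Render a Triton model configuration (protobuf text format) as a string."""

    def tensor_block(tensors: List[Dict[str, str]]) -> str:
        entries = [
            "  {\n"
            f'    name: "{t["name"]}"\n'
            f"    data_type: {t['data_type']}\n"
            f"    dims: {t['dims']}\n"
            "  }"
            for t in tensors
        ]
        return ",\n".join(entries)

    lines = [
        f'name: "{model_name}"',
        f'platform: "{platform}"',
        f"max_batch_size: {max_batch_size}",
        f"input [\n{tensor_block(inputs)}\n]",
        f"output [\n{tensor_block(outputs)}\n]",
    ]
    if dynamic_batching:
        # Let Triton group individual requests into server-side batches
        lines.append("dynamic_batching { }")
    return "\n".join(lines) + "\n"


# RoBERTa base: tokenizer returns input_ids and attention_mask; hidden size is 768
example_inputs = [
    {"name": "input_ids", "data_type": "TYPE_INT32", "dims": "[128]"},
    {"name": "attention_mask", "data_type": "TYPE_INT32", "dims": "[128]"},
]
example_outputs = [
    {"name": "last_hidden_state", "data_type": "TYPE_FP32", "dims": "[128, 768]"},
]
config_text = render_triton_config("roberta-base", example_inputs, example_outputs)
print(config_text)
```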

In [None]:
# We'll package a model config template along with the compiled model into a model.tar.gz artifact. 
# The config templates assume batch size of 32 and sequence length of 128
# You may need to adjust the template if not using one of the tested models
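model_artifact_path = model_utils.package_triton_model(model_name, pt_model_path, triton_config_path)

In [None]:
mme_path = f"s3://{bucket}/{prefix}/{model_name}/"
# Note: this key prefix has no "/" between prefix and model name, so the original artifact
# lands outside mme_path and is not removed by the `aws s3 rm --recursive {mme_path}` cleanup.
initial_model_path = sess.upload_data(model_artifact_path.as_posix(), bucket=bucket, key_prefix=f"{prefix}{model_name}")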

In [None]:
initial_model_path

In [None]:
mme_path

Make sure there are no models in the multi-model endpoint path.

In [None]:
! aws s3 rm --recursive {mme_path}

In [None]:
!aws s3 ls {mme_path}

## Create a SageMaker Multi-Model Endpoint for PyTorch Model

In [None]:
from utils.endpoint_utils import create_endpoint, delete_endpoint, get_instance_utilization, run_autoscaling_load_test

base = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
mme_triton_image_uri = f"{account_id_map[region]}.dkr.ecr.{region}.{base}" + \
            "/sagemaker-tritonserver:22.10-py3"
print(mme_triton_image_uri)
instance_type = 'ml.g4dn.xlarge'

In [None]:
container = {
    "Image": mme_triton_image_uri,
    "ModelDataUrl": mme_path,
    "Mode": "MultiModel"
}

Deploy the endpoint using a helper function.

In [None]:
sm_model_name, endpoint_config_name, endpoint_name = create_endpoint(sm_client, model_name, role, container, instance_type, "pt")
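The source of the `create_endpoint` helper is not shown here. A plausible minimal sketch, assuming it issues the three standard SageMaker control-plane calls (`create_model`, `create_endpoint_config`, `create_endpoint`), is to build the request arguments first. `build_mme_endpoint_requests` and its naming scheme are hypothetical, and the role ARN, image URI, and S3 path are placeholders.

```python
import time
from typing import Any, Dict


def build_mme_endpoint_requests(
    model_name: str, role_arn: str, container: Dict[str, str], instance_type: str, engine: str
) -> Dict[str, Dict[str, Any]]:
    """Build kwargs for boto3 SageMaker create_model / create_endpoint_config / create_endpoint."""
    suffix = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
    base = f"{model_name}-mme-{engine}-{suffix}"  # hypothetical naming scheme
    return {
        "create_model": {
            "ModelName": base,
            "ExecutionRoleArn": role_arn,
            "PrimaryContainer": container,  # Mode="MultiModel" makes this an MME
        },
        "create_endpoint_config": {
            "EndpointConfigName": f"{base}-config",
            "ProductionVariants": [
                {
                    "VariantName": "AllTraffic",  # the variant name the autoscaling policy targets
                    "ModelName": base,
                    "InstanceType": instance_type,
                    "InitialInstanceCount": 1,
                }
            ],
        },
        "create_endpoint": {
            "EndpointName": f"{base}-ep",
            "EndpointConfigName": f"{base}-config",
        },
    }


endpoint_requests = build_mme_endpoint_requests(
    "roberta-base",
    "arn:aws:iam::111122223333:role/ExampleSageMakerRole",  # placeholder role ARN
    {"Image": "<triton-image-uri>", "ModelDataUrl": "s3://<bucket>/<prefix>/", "Mode": "MultiModel"},
    "ml.g4dn.xlarge",
    "pt",
)
# With a real boto3 SageMaker client, the calls would then be made in order:
# sm_client.create_model(**endpoint_requests["create_model"])
# sm_client.create_endpoint_config(**endpoint_requests["create_endpoint_config"])
# sm_client.create_endpoint(**endpoint_requests["create_endpoint"])
```

Building the dictionaries separately makes it easy to inspect the configuration before anything is created in your account.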

Next, upload a Python model that we can use to query instance utilization in real time.

In [None]:
!tar czvf metrics.tar.gz server_metrics/
!aws s3 cp metrics.tar.gz {mme_path}

In [None]:
!aws s3 ls {mme_path}

In [None]:
get_instance_utilization(runtime_sm_client, endpoint_name)  # invoke once to load the Python model into memory

## Load PyTorch Models into Endpoint

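In this section, we invoke the endpoint to make sure it is working and returning predictions, and then load 100 models into the endpoint by copying 100 model artifacts to its S3 path. MME loads each model into memory the first time it is invoked.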

In [None]:
payload = {
    "inputs":
        [{"name": name, "shape": list(data.size()), "datatype": "INT32", "data": data.tolist()} for name, data in example_input.items()]
}
payload['inputs'][0]['shape']

In [None]:
!aws s3 cp roberta-base.tar.gz {mme_path}

#### Invoke the Endpoint to make sure it is working and returning predictions 

In [None]:
targetModel = "roberta-base.tar.gz"
print(f"invoking endpoint with target model: {targetModel}")

response = runtime_sm_client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/octet-stream",
    Body=json.dumps(payload),
    TargetModel=targetModel,
)
response
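The response body follows Triton's KServe v2 JSON inference protocol: an `outputs` list in which each entry carries a `name`, `shape`, `datatype`, and a flat `data` array. A minimal sketch for turning that into nested lists, assuming the JSON (not binary) response format; `decode_triton_response` and `reshape` are hypothetical helpers, and the fake response stands in for a real `invoke_endpoint` result.

```python
import io
import json
from typing import Any, Dict, List


def reshape(flat: List[Any], shape: List[int]) -> Any:
    """Turn a flat row-major list into nested lists with the given shape."""
    if len(shape) == 1:
        return list(flat)
    step = len(flat) // shape[0]
    return [reshape(flat[i * step:(i + 1) * step], shape[1:]) for i in range(shape[0])]


def decode_triton_response(response: Dict[str, Any]) -> Dict[str, Any]:
    """Parse the JSON body of a Triton (KServe v2) response into nested lists per output."""
    body = json.loads(response["Body"].read())
    return {out["name"]: reshape(out["data"], out["shape"]) for out in body["outputs"]}


# Stand-in for a real invoke_endpoint response: a tiny [1, 2, 3] output tensor
fake_response = {
    "Body": io.BytesIO(
        json.dumps(
            {"outputs": [{"name": "last_hidden_state", "datatype": "FP32",
                          "shape": [1, 2, 3], "data": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]}]}
        ).encode()
    )
}
decoded = decode_triton_response(fake_response)
print(decoded["last_hidden_state"])  # [[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]]
```

On the real endpoint, the same function could be applied to the `response` returned by `invoke_endpoint`; note that the body stream can only be read once.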

#### Load 100 models into the Endpoint

In [None]:
max_models_to_load = 100
models_loaded = 0
while models_loaded < max_models_to_load:
    !aws s3 cp {initial_model_path} {mme_path}{model_name}-v{models_loaded}.tar.gz
    models_loaded += 1

#### Invoke the Endpoint to make sure it is working with a copied model and returning predictions 

In [None]:
targetModel = f"{model_name}-v0.tar.gz"
print(f"invoking endpoint with target model: {targetModel}")

response = runtime_sm_client.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/octet-stream",
    Body=json.dumps(payload),
    TargetModel=targetModel,
)
response

In [None]:
!aws s3 ls {mme_path}

In [None]:
models_loaded

## Add Auto Scaling policy

We set a very low target of 1 invocation per instance per minute, so scale-out starts almost immediately. For production workloads, tune this target to your traffic.
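To see why such a low target scales out almost at once, here is a simplified proportional estimate of how target tracking responds to `SageMakerVariantInvocationsPerInstance`. It assumes capacity scales linearly with traffic and is not the exact Application Auto Scaling algorithm, which also applies cooldown periods and is subject to CloudWatch metric delay. `approximate_desired_instances` is a hypothetical helper and the traffic numbers are illustrative.

```python
import math


def approximate_desired_instances(
    invocations_per_minute: float, target_per_instance: float, min_capacity: int, max_capacity: int
) -> int:
    """Instances needed to bring invocations per instance per minute down to the target,
    clamped to [min_capacity, max_capacity]. A simplified proportional approximation."""
    needed = math.ceil(invocations_per_minute / target_per_instance)
    return max(min_capacity, min(max_capacity, needed))


# With TargetValue=1.0, almost any traffic asks for the maximum of 4 instances.
print(approximate_desired_instances(300, 1.0, 1, 4))     # 4
# A higher (hypothetical) target of 1000 invocations per instance per minute stays at 1.
print(approximate_desired_instances(300, 1000.0, 1, 4))  # 1
```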

In [None]:
scaling_client = boto3.client(
    "application-autoscaling"
)  # Common class representing Application Auto Scaling for SageMaker amongst other services

resource_id = (
    "endpoint/" + endpoint_name + "/variant/" + "AllTraffic"
)  # This is the format in which Application Auto Scaling references the endpoint variant
print(resource_id)

# Register the real-time endpoint variant as a scalable target with 1 to 4 instances
response = scaling_client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=1,
    MaxCapacity=4,
)

response = scaling_client.put_scaling_policy(
    PolicyName="Invocations-ScalingPolicy",
    ServiceNamespace="sagemaker",  # The namespace of the AWS service that provides the resource
    ResourceId=resource_id,  # Endpoint variant resource ID
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",  # SageMaker supports only instance count
    PolicyType="TargetTrackingScaling",  # 'StepScaling' | 'TargetTrackingScaling'
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 1.0,  # Deliberately very low for this demo; tune based on your workload
        "PredefinedMetricSpecification": {
            # Average number of times per minute that each instance for a variant is invoked
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
        },
        # Cooldown periods keep the policy from adding or removing instances before the
        # effects of previous scaling activities are visible. Set them based on your
        # instance startup time or other application needs.
        # ScaleInCooldown: seconds after a scale-in activity completes before another scale-in can start.
        "ScaleInCooldown": 600,
        # ScaleOutCooldown: seconds after a scale-out activity completes before another scale-out can start.
        "ScaleOutCooldown": 5,
        # 'DisableScaleIn': True | False indicates whether scale-in by the target tracking policy is disabled.
        # If True, the policy won't remove capacity from the scalable resource.
    },
)
response

## Benchmark PyTorch Model using Locust with autoscaling

`locust_benchmark_sm.py` is provided in the `locust` folder.

<div class="alert alert-info"> <strong> Note: </strong>
The load test is run with up to 20 simulated workers. This may not be suitable for larger models with long response times. You can modify the <code>StagesShape</code> class in the <code>locust/locust_benchmark_sm.py</code> file to adjust the traffic pattern and the number of concurrent workers.
</div>
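The `StagesShape` class itself is not shown in this notebook. As an assumption about how a stage-based shape typically works, here is the stage-lookup logic on its own, with made-up stage values. It follows the contract of Locust's `LoadTestShape.tick()`, which returns `(user_count, spawn_rate)` or `None` to stop the test; `stage_for` and `STAGES` are hypothetical names.

```python
from typing import List, Optional, Tuple

# Hypothetical traffic pattern: (end_time_seconds, users, spawn_rate)
STAGES: List[Tuple[int, int, float]] = [
    (120, 5, 1.0),
    (300, 10, 1.0),
    (600, 20, 2.0),
]


def stage_for(run_time: float, stages: List[Tuple[int, int, float]]) -> Optional[Tuple[int, float]]:
    """Return (user_count, spawn_rate) for the current run time, or None to stop the test."""
    for end_time, users, spawn_rate in stages:
        if run_time < end_time:
            return users, spawn_rate
    return None  # past the last stage: returning None from tick() ends the Locust run


print(stage_for(30, STAGES))   # (5, 1.0)
print(stage_for(400, STAGES))  # (20, 2.0)
print(stage_for(900, STAGES))  # None
```

Inside a `LoadTestShape` subclass, `tick()` could simply return `stage_for(self.get_run_time(), STAGES)`.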

In [None]:
locust_result_path = Path("results") / model_name / "autoscaling"
locust_result_path.mkdir(parents=True, exist_ok=True)
locust_result_path

In [None]:
%%time
output_path = (locust_result_path / f"{instance_type}*pt*{models_loaded}") # capture the instance type, engine, and models loaded in file name
run_autoscaling_load_test(endpoint_name, use_case, model_name, models_loaded, output_path, print_stdout=True, n_procs=6, sample_payload=json.dumps(payload))

## Plot Metrics

In [None]:
from datetime import datetime, timedelta
import pandas as pd
import numpy as np

cw = boto3.Session().client("cloudwatch")

def get_sagemaker_utilization_metrics(
    endpoint_name,
    endpoint_config_name,
    variant_name,
    metric_name,
    statistic,
    start_time,
    end_time,
    period=1
):
    dimensions = [
        {"Name": "EndpointName", "Value": endpoint_name},
        {"Name": "VariantName", "Value": variant_name},
    ]
    if endpoint_config_name is not None:
        dimensions.append({"Name": "EndpointConfigName", "Value": endpoint_config_name})
        
    if metric_name in ["CPUUtilization", "MemoryUtilization", "DiskUtilization", "LoadedModelCount",]:
        namespace = "/aws/sagemaker/Endpoints"
    else:
        namespace = "AWS/SageMaker"
    metrics = cw.get_metric_statistics(
        Namespace=namespace,
        MetricName=metric_name,
        StartTime=start_time,
        EndTime=end_time,
        Period=period,
        Statistics=[statistic],
        Dimensions=dimensions,
    )
    rename = endpoint_config_name if endpoint_config_name is not None else "ALL"
    df = pd.DataFrame(metrics["Datapoints"])
    if df.empty:
        print(f"EmptyDF:UTIL:: metric_name={metric_name}::statistic={statistic}::period={period}::namespace={namespace}::")
        return df

    return (
        df.sort_values("Timestamp")
        .set_index("Timestamp")
        .drop(["Unit"], axis=1)
        .rename(columns={statistic: rename})
    )


def get_sagemaker_metrics(
    endpoint_name,
    endpoint_config_name,
    variant_name,
    metric_name,
    statistic,
    start_time,
    end_time,
):
    dimensions = [
        {"Name": "EndpointName", "Value": endpoint_name},
        {"Name": "VariantName", "Value": variant_name},
    ]
    if endpoint_config_name is not None:
        dimensions.append({"Name": "EndpointConfigName", "Value": endpoint_config_name})
        
    if metric_name in ["CPUUtilization", "MemoryUtilization", "DiskUtilization"]:
        namespace = "/aws/sagemaker/Endpoints"
    else:
        namespace = "AWS/SageMaker"

    print(f"Metrics:namespace={namespace}::metric_name={metric_name}::statistic={statistic}:dimensions={dimensions}:endpoint_name={endpoint_name}:")
    metrics = cw.get_metric_statistics(
        Namespace=namespace,
        MetricName=metric_name,
        StartTime=start_time,
        EndTime=end_time,
        Period=60,
        Statistics=[statistic],
        Dimensions=dimensions,
    )
    rename = endpoint_config_name if endpoint_config_name is not None else "ALL"
    df = pd.DataFrame(metrics["Datapoints"])
    if df.empty:
        print(f"EmptyDF:CUST: metric_name={metric_name}::statistic={statistic}::")
        return df

    return (
        df.sort_values("Timestamp")
        .set_index("Timestamp")
        .drop(["Unit"], axis=1)
        .rename(columns={statistic: rename})
    )


def plot_endpoint_model_latency_metrics(
    endpoint_name,
    endpoint_config_name,
    variant_name,
    start_time=None,
    end_time=None,
    metric_name="ModelLatency",
    statistic="Average",
):
    # Resolve defaults at call time; a datetime.now() default argument would be
    # frozen at the moment the function is defined
    end_time = end_time or datetime.now()
    start_time = start_time or datetime.now() - timedelta(minutes=60)
    metrics_variants = get_sagemaker_metrics(
        endpoint_name,
        endpoint_config_name,
        variant_name,
        metric_name,
        statistic,
        start_time,
        end_time,
    )
    if metrics_variants.empty:
        print(
            f"NO RESULTS for metric_name={metric_name}::statistic={statistic}::start_time={start_time}:: end_time={end_time}:: endpoint_name={endpoint_name}: "
        )
        return
    metrics_variants.plot(title=f"{metric_name}-{statistic}")
    return metrics_variants
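The metric helpers above pass `Period` straight to CloudWatch `GetMetricStatistics`, which has two constraints worth checking up front: standard-resolution metrics need a period that is a multiple of 60 seconds (1, 5, 10, and 30 are also allowed for high-resolution metrics), and a single call returns at most 1,440 datapoints. The sketch below encodes only those two rules and ignores the extra retention rules for data older than three hours; whether a particular SageMaker metric is published at high resolution is something to verify rather than assume. `check_metric_window` is a hypothetical helper.

```python
from datetime import datetime, timedelta

HIGH_RES_PERIODS = {1, 5, 10, 30}
MAX_DATAPOINTS_PER_CALL = 1440  # limit for a single GetMetricStatistics call


def check_metric_window(start: datetime, end: datetime, period: int, high_resolution: bool) -> int:
    """Validate a GetMetricStatistics period and return the number of requested datapoints."""
    if period % 60 != 0 and not (high_resolution and period in HIGH_RES_PERIODS):
        raise ValueError(f"period={period} must be a multiple of 60 (or 1/5/10/30 for high-resolution metrics)")
    datapoints = int((end - start).total_seconds() // period)
    if datapoints > MAX_DATAPOINTS_PER_CALL:
        raise ValueError(f"{datapoints} datapoints exceeds the {MAX_DATAPOINTS_PER_CALL} per-call limit")
    return datapoints


end = datetime(2023, 1, 1, 12, 0, 0)
print(check_metric_window(end - timedelta(minutes=22), end, period=1, high_resolution=True))    # 1320
print(check_metric_window(end - timedelta(minutes=22), end, period=60, high_resolution=False))  # 22
```

With `period=1`, a 22-minute window stays under the 1,440-datapoint limit (1,320 points), while a 60-minute window at the same period would not.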

### Get total invocations per minute

In [None]:
start_time = datetime.now() - timedelta(minutes=22)
end_time = datetime.now()
metric_name = "Invocations"
statistics = "Sum"  # Options: SampleCount, Average, Sum, Minimum, Maximum
invocations_metrics = plot_endpoint_model_latency_metrics(
    endpoint_name, None, "AllTraffic", start_time, end_time, metric_name, statistics
)

### Get invocations per instance, per minute

In [None]:
start_time = datetime.now() - timedelta(minutes=22)
end_time = datetime.now()
metric_name = "InvocationsPerInstance"
statistics = "Sum"  # Options: SampleCount, Average, Sum, Minimum, Maximum
invocations_per_instance_metrics = plot_endpoint_model_latency_metrics(
    endpoint_name, None, "AllTraffic", start_time, end_time, metric_name, statistics
)

### Get number of instances at any point in time

In [None]:
start_time = datetime.now() - timedelta(minutes=22)
end_time = datetime.now()
# The SampleCount of the per-instance CPUUtilization metric is used as a proxy for the instance count
metric_name = "CPUUtilization"
statistic = "SampleCount"  # Options: SampleCount, Average, Sum, Minimum, Maximum
instances_count = get_sagemaker_utilization_metrics(
    endpoint_name=endpoint_name,
    endpoint_config_name=None,
    variant_name="AllTraffic",
    metric_name=metric_name,
    statistic=statistic,
    start_time=start_time,
    end_time=end_time,
    period=1,
)
instances_count[:-1]  # drop the last datapoint, which may be incomplete

### Get loaded model count

In [None]:
start_time = datetime.now() - timedelta(minutes=22)
end_time = datetime.now()
metric_name = "LoadedModelCount"
statistic = "Sum"  # Options: SampleCount, Average, Sum, Minimum, Maximum
loaded_model_count = get_sagemaker_utilization_metrics(
        endpoint_name=endpoint_name,
        endpoint_config_name=None,
        variant_name="AllTraffic",
        metric_name=metric_name,
        statistic=statistic,
        start_time=start_time,
        end_time=end_time,
        period=1
    )
loaded_model_count

### Clean up auto scaling

In [None]:
scaling_client = boto3.client(
    "application-autoscaling"
)  # Common class representing Application Auto Scaling for SageMaker amongst other services

response = scaling_client.deregister_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
)
response

## Clean Up PyTorch Endpoint

In [None]:
delete_endpoint(sm_client, sm_model_name, endpoint_config_name, endpoint_name)

In [None]:
!aws s3 rm --recursive {mme_path}