# Load Testing using Locust

---

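Deploying a model is only the first step of model serving, and a production rollout raises many further considerations. For example, a particular event can cause a sudden surge in concurrent users and traffic. Because SageMaker is a managed service, an autoscaling policy is easy to configure, but choosing the instance type and count that best balance cost and capacity is not. It is therefore important to run load tests to find how many requests per second (RPS) an endpoint can handle, using either an in-house test toolkit or an open-source one. (SageMaker Inference Recommender, a service introduced at re:Invent 2021, can also run load tests conveniently through API calls.)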
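
As a rough illustration of why the measured RPS matters for sizing, the sketch below estimates how many instances a target peak load would need. The function name, the `headroom` parameter, and the example numbers are assumptions made for illustration; they are not values measured in this notebook.

```python
import math


def estimate_instance_count(peak_rps: float, rps_per_instance: float, headroom: float = 0.7) -> int:
    """Estimate the instance count so that each instance runs at `headroom` of its measured capacity.

    peak_rps: expected peak requests per second (assumed input).
    rps_per_instance: RPS one instance sustained in a load test (assumed input).
    headroom: fraction of the measured capacity to actually use (hypothetical default).
    """
    if peak_rps < 0 or rps_per_instance <= 0 or not 0 < headroom <= 1:
        raise ValueError("peak_rps must be >= 0, rps_per_instance > 0, and 0 < headroom <= 1")
    usable_rps = rps_per_instance * headroom
    # Always keep at least one instance, even with no traffic.
    return max(1, math.ceil(peak_rps / usable_rps))


print(estimate_instance_count(peak_rps=40, rps_per_instance=12))
```

The estimate is only as good as the per-instance RPS it is fed, which is what the load test in the rest of this notebook is meant to measure.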

This notebook runs a simple load test with Locust (https://docs.locust.io/en/stable/). Locust test scripts are written in Python and its parameters are intuitive, so a load-testing environment can be set up and run quickly.

This notebook takes about **10-20 minutes** to complete.


### Table of Contents
- [1. Create Locust Script](#1.-Create-Locust-Script)
- [2. Load Testing](#2.-Load-Testing)

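<div class="alert alert-warning"><h4>Caution</h4><p>
The code cell below installs the dependencies, sets the ngrok authtoken, and then shuts down the Jupyter notebook kernel. Please sign up at <a href='https://ngrok.com/'>https://ngrok.com/</a> and set your token.
    
Once the notebook kernel has shut down, change the setting in the code cell below to <b><font color='darkred'>install_needed = False</font></b> and run the cell again. This only needs to be done once. 
</p></div>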

After signing up and logging in, you will see a screen like the one below. Under **2. Connect your account**, copy the `[YOUR-TOKEN]` part of `ngrok authtoken [YOUR-TOKEN]` into the code cell below.
![ngrok_1](img/ngrok_1.png)

In [1]:
import sys, IPython

install_needed = True
#install_needed = False

if install_needed:
    print("===> Installing deps and restarting kernel. Please change 'install_needed = False' and run this code cell again.")
    !{sys.executable} -m pip install locust pyngrok
    
    
    from pyngrok import ngrok
    print("===> Setting the authtoken. Please change 'install_needed = False' and run this code cell again.")
    ngrok.set_auth_token("[YOUR-TOKEN]") ##<=== Enter your token here.
    
    IPython.Application.instance().kernel.do_shutdown(True)

===> Installing deps and restarting kernel. Please change 'install_needed = False' and run this code cell again.
Collecting locust
  Downloading locust-2.24.0-py3-none-any.whl.metadata (7.2 kB)
Collecting pyngrok
  Downloading pyngrok-7.1.3-py3-none-any.whl.metadata (7.6 kB)
Collecting geventhttpclient>=2.0.11 (from locust)
  Downloading geventhttpclient-2.0.11-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.3 kB)
Collecting ConfigArgParse>=1.5.5 (from locust)
  Downloading ConfigArgParse-1.7-py3-none-any.whl.metadata (23 kB)
Collecting Flask-Login>=0.6.3 (from locust)
  Downloading Flask_Login-0.6.3-py3-none-any.whl.metadata (5.8 kB)
Collecting roundrobin>=0.0.2 (from locust)
  Downloading roundrobin-0.0.4.tar.gz (3.4 kB)
  Preparing metadata (setup.py) ... done
Downloading locust-2.24.0-py3-none-any.whl (1.4 MB)

In [19]:
%load_ext autoreload
%autoreload 2
%store -r
%store

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Stored variables and their in-db values:
ecr_image_uri              -> '763104351884.dkr.ecr.us-west-2.amazonaws.com/pyto
endpoint_name              -> 'sam-endpoint-2024-03-04-02-05-03'
img_path                   -> '../images/옥택연_원본.jpg'
model_data_url             -> 's3://sagemaker-us-west-2-322537213286/sam/deploy/


# 0. Create the SageMaker Endpoint (re-creates it if it was deleted earlier)

In [25]:
import boto3
import sagemaker
import time 

sm_client = boto3.client("sagemaker")
smr_client = boto3.client("sagemaker-runtime")
    
sess = sagemaker.Session()
sm_session = sagemaker.session.Session()
bucket = sm_session.default_bucket()
role = sagemaker.get_execution_role()

ecr_image_uri = sagemaker.image_uris.retrieve(
    framework='pytorch', 
    version='2.1.0',
    instance_type='ml.g4dn.2xlarge',
    region='us-west-2', 
    image_scope='inference'
)
prefix = 'sam/deploy'
model_data_url = f's3://{bucket}/{prefix}/model.tar.gz'

In [26]:
import boto3
sm_client = boto3.client('sagemaker')
try:
    sm_client.describe_endpoint(EndpointName=endpoint_name)
except Exception:  # endpoint not found (or endpoint_name not restored): create a new one
    ts = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
    sm_model_name = f"sam-model-{ts}"
    endpoint_config_name = f"sam-endpoint-config-{ts}"
    endpoint_name = f"sam-endpoint-{ts}"
    
    instance_type='ml.g4dn.xlarge'
    
    container = {
        "Image": ecr_image_uri,
        "ModelDataSource": {
                "S3DataSource": {
                    "S3Uri": model_data_url,
                    "S3DataType": "S3Object",
                    "CompressionType": "Gzip", ## SageMaker Hosting
                },
            },
        "Environment": {}
    }
    create_model_response = sm_client.create_model(
        ModelName=sm_model_name, 
        ExecutionRoleArn=role, 
        PrimaryContainer=container
    )

    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": instance_type,
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": sm_model_name,
                "VariantName": "AllTraffic",
                # "ContainerStartupHealthCheckTimeoutInSeconds": 600
            }
        ]
    )
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name, 
        EndpointConfigName=endpoint_config_name
    )
sess.wait_for_endpoint(endpoint_name, poll=5)

-------------------------------------------------!

{'EndpointName': 'sam-endpoint-2024-03-04-02-42-07',
 'EndpointArn': 'arn:aws:sagemaker:us-west-2:322537213286:endpoint/sam-endpoint-2024-03-04-02-42-07',
 'EndpointConfigName': 'sam-endpoint-config-2024-03-04-02-42-07',
 'ProductionVariants': [{'VariantName': 'AllTraffic',
   'DeployedImages': [{'SpecifiedImage': '763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference:2.1.0-gpu-py310',
     'ResolvedImage': '763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference@sha256:d6344a1c254a71cae1f39de2d2ec72022e42209ee487ee83dca4a2bdd11bee02',
     'ResolutionTime': datetime.datetime(2024, 3, 4, 2, 42, 9, 8000, tzinfo=tzlocal())}],
   'CurrentWeight': 1.0,
   'DesiredWeight': 1.0,
   'CurrentInstanceCount': 1,
   'DesiredInstanceCount': 1}],
 'EndpointStatus': 'InService',
 'CreationTime': datetime.datetime(2024, 3, 4, 2, 42, 8, 279000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 3, 4, 2, 47, 9, 788000, tzinfo=tzlocal()),
 'ResponseMetadata': {'RequestId': '

## Configure SageMaker Autoscaling

In [27]:
import pprint
import boto3
from sagemaker import get_execution_role
import sagemaker
import json

pp = pprint.PrettyPrinter(indent=4, depth=4)
role = get_execution_role()
sagemaker_client = boto3.Session().client(service_name='sagemaker')
response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
pp.pprint(response)

{   'CreationTime': datetime.datetime(2024, 3, 4, 2, 42, 8, 279000, tzinfo=tzlocal()),
    'EndpointArn': 'arn:aws:sagemaker:us-west-2:322537213286:endpoint/sam-endpoint-2024-03-04-02-42-07',
    'EndpointConfigName': 'sam-endpoint-config-2024-03-04-02-42-07',
    'EndpointName': 'sam-endpoint-2024-03-04-02-42-07',
    'EndpointStatus': 'InService',
    'LastModifiedTime': datetime.datetime(2024, 3, 4, 2, 47, 9, 788000, tzinfo=tzlocal()),
    'ProductionVariants': [   {   'CurrentInstanceCount': 1,
                                  'CurrentWeight': 1.0,
                                  'DeployedImages': [{...}],
                                  'DesiredInstanceCount': 1,
                                  'DesiredWeight': 1.0,
                                  'VariantName': 'AllTraffic'}],
    'ResponseMetadata': {   'HTTPHeaders': {   'content-length': '758',
                                               'content-type': 'application/x-amz-json-1.1',
                                

In [28]:
#Let us define a client to play with autoscaling options
client = boto3.client('application-autoscaling') # Common class representing Application Auto Scaling for SageMaker amongst other services
resource_id='endpoint/' + endpoint_name + '/variant/' + 'AllTraffic' # This is the format in which application autoscaling references the endpoint
policy_name = 'Invocations-ScalingPolicy'

response = client.register_scalable_target(
    ServiceNamespace='sagemaker', #
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=5
)

#Example 1 - SageMakerVariantInvocationsPerInstance Metric
response = client.put_scaling_policy(
    PolicyName=policy_name,
    ServiceNamespace='sagemaker', # The namespace of the AWS service that provides the resource. 
    ResourceId=resource_id, # Endpoint name 
    ScalableDimension='sagemaker:variant:DesiredInstanceCount', # SageMaker supports only Instance Count
    PolicyType='TargetTrackingScaling', # 'StepScaling'|'TargetTrackingScaling'
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 10.0, # The target value for the metric. - here the metric is - SageMakerVariantInvocationsPerInstance
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance', # is the average number of times per minute that each instance for a variant is invoked. 
        },
        'ScaleInCooldown': 600, # The cooldown period helps you prevent your Auto Scaling group from launching or terminating 
                                # additional instances before the effects of previous activities are visible. 
                                # You can configure the length of time based on your instance startup time or other application needs.
                                # ScaleInCooldown - The amount of time, in seconds, after a scale in activity completes before another scale in activity can start. 
        'ScaleOutCooldown': 300, # ScaleOutCooldown - The amount of time, in seconds, after a scale out activity completes before another scale out activity can start.
        # 'DisableScaleIn': True|False - Indicates whether scale in by the target tracking policy is disabled. 
                            # If the value is true, scale in is disabled and the target tracking policy won't remove capacity from the scalable resource.
    }
)

# #Example 2 - CPUUtilization metric
# response = client.put_scaling_policy(
#     PolicyName='CPUUtil-ScalingPolicy',
#     ServiceNamespace='sagemaker',
#     ResourceId=resource_id,
#     ScalableDimension='sagemaker:variant:DesiredInstanceCount',
#     PolicyType='TargetTrackingScaling',
#     TargetTrackingScalingPolicyConfiguration={
#         'TargetValue': 90.0,
#         'CustomizedMetricSpecification':
#         {
#             'MetricName': 'CPUUtilization',
#             'Namespace': '/aws/sagemaker/Endpoints',
#             'Dimensions': [
#                 {'Name': 'EndpointName', 'Value': endpoint_name },
#                 {'Name': 'VariantName','Value': 'AllTraffic'}
#             ],
#             'Statistic': 'Average', # Possible - 'Statistic': 'Average'|'Minimum'|'Maximum'|'SampleCount'|'Sum'
#             'Unit': 'Percent'
#         },
#         'ScaleInCooldown': 600,
#         'ScaleOutCooldown': 300
#     }
# )
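
`SageMakerVariantInvocationsPerInstance` is a per-minute metric, so a per-second load-test result has to be converted before it is used as `TargetValue`. The sketch below follows the formula given in the AWS autoscaling guidance (maximum RPS per instance × safety factor × 60). The function names, the 0.5 safety factor default, and the example endpoint name are assumptions for illustration.

```python
def invocations_target_value(max_rps_per_instance: float, safety_factor: float = 0.5) -> float:
    """Convert a measured per-instance RPS into a per-minute TargetValue.

    max_rps_per_instance: peak RPS one instance handled in a load test (assumed input).
    safety_factor: fraction of that peak to target, leaving room for bursts (hypothetical default).
    """
    if max_rps_per_instance <= 0:
        raise ValueError("max_rps_per_instance must be positive")
    if not 0 < safety_factor <= 1:
        raise ValueError("safety_factor must be in (0, 1]")
    # The metric counts invocations per minute, hence the factor of 60.
    return max_rps_per_instance * safety_factor * 60


def autoscaling_resource_id(endpoint_name: str, variant_name: str = "AllTraffic") -> str:
    """Build the ResourceId string that Application Auto Scaling expects for an endpoint variant."""
    return f"endpoint/{endpoint_name}/variant/{variant_name}"


print(invocations_target_value(max_rps_per_instance=2.0))
print(autoscaling_resource_id("my-endpoint"))  # "my-endpoint" is a placeholder name
```

By this conversion, the `TargetValue` of 10.0 used in the cell above corresponds to 10 invocations per minute per instance.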

In [29]:
response = client.describe_scaling_policies(
    ServiceNamespace='sagemaker'
)

for i in response['ScalingPolicies']:
    print('')
    pp.pprint(i['PolicyName'])
    print('')
    if('TargetTrackingScalingPolicyConfiguration' in i):
        pp.pprint(i['TargetTrackingScalingPolicyConfiguration']) 
    else:
        pp.pprint(i['StepScalingPolicyConfiguration'])
    print('')


'Invocations-ScalingPolicy'

{   'PredefinedMetricSpecification': {   'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'},
    'ScaleInCooldown': 600,
    'ScaleOutCooldown': 300,
    'TargetValue': 10.0}



## [Optional] Remove the Scaling Configuration

In [31]:
# response = client.deregister_scalable_target(
#     ServiceNamespace='sagemaker', #
#     ResourceId=resource_id,
#     ScalableDimension='sagemaker:variant:DesiredInstanceCount'
# )

# response = client.describe_scaling_policies(
#     ServiceNamespace='sagemaker'
# )

# for i in response['ScalingPolicies']:
#     print('')
#     pp.pprint(i['PolicyName'])
#     print('')
#     if('TargetTrackingScalingPolicyConfiguration' in i):
#         pp.pprint(i['TargetTrackingScalingPolicyConfiguration']) 
#     else:
#         pp.pprint(i['StepScalingPolicyConfiguration'])
#     print('')

In [30]:
## Delete the scaling policy

# response = client.delete_scaling_policy(
#     ServiceNamespace='sagemaker', #
#     ResourceId=resource_id,
#     ScalableDimension='sagemaker:variant:DesiredInstanceCount',
#     PolicyName=policy_name,
# )

<br>

# 1. Create Locust Script
---

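The code cells below save the scripts needed for the Locust-based load test.
- `config.json`: stores the settings used by the load test.
- `stress.py`: the script that creates an object for each simulated user; its user class inherits from `HttpUser`, which gives each user a `client` attribute.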
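
Because a missing or mistyped setting only surfaces once the load test is already running, it can help to validate the settings file up front. The following is a minimal sketch, assuming the four keys used by this notebook's `config.json`; the `load_config` helper, the type checks, and the sample file path are illustrative and are not part of `stress.py`.

```python
import json
import tempfile
from pathlib import Path

# Keys this notebook's config.json is expected to contain, with their expected types.
REQUIRED_KEYS = {
    "contentType": str,
    "showEndpointResponse": int,
    "dataFile": str,
    "numTestSamples": int,
}


def load_config(path):
    """Load a JSON settings file and fail early if a key is missing or has the wrong type."""
    with open(path, "r", encoding="utf-8") as f:
        config = json.load(f)
    for key, expected_type in REQUIRED_KEYS.items():
        if key not in config:
            raise KeyError(f"missing config key: {key}")
        if not isinstance(config[key], expected_type):
            raise TypeError(f"{key} should be of type {expected_type.__name__}")
    return config


# Demo with a temporary file; "../images/sample.jpg" is a placeholder path.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "config.json"
    demo_path.write_text(
        json.dumps(
            {
                "contentType": "application/json",
                "showEndpointResponse": 0,
                "dataFile": "../images/sample.jpg",
                "numTestSamples": 100,
            }
        ),
        encoding="utf-8",
    )
    demo_config = load_config(demo_path)

print(demo_config["contentType"])
```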

In [49]:
%%writefile config.json
{
    "contentType": "application/json",
    "showEndpointResponse": 0,
    "dataFile": "../images/옥택연_원본.jpg",
    "numTestSamples": 100
}

Overwriting config.json


In [50]:
import os, json
config_file = "./config.json"
print(f"config_file : {config_file}")
with open(config_file, "r") as c:
    print(json.loads(c.read()))

config_file : ./config.json
{'contentType': 'application/json', 'showEndpointResponse': 0, 'dataFile': '../images/옥택연_원본.jpg', 'numTestSamples': 100}


In [51]:
%%writefile stress.py
import os
import json
import time
import boto3
import io
from io import StringIO
import pandas as pd
from locust import HttpUser, task, events, between

import base64
from PIL import Image
from io import BytesIO
import numpy as np

class SageMakerConfig:

    def __init__(self):
        self.__config__ = None

    @property
    def data_file(self):
        return self.config["dataFile"]

    @property
    def content_type(self):
        return self.config["contentType"]

    @property
    def show_endpoint_response(self):
        return self.config["showEndpointResponse"]
    
    @property
    def num_test_samples(self):
        return self.config["numTestSamples"]

    @property
    def config(self):
        self.__config__ = self.__config__ or self.load_config()
        return self.__config__

    def load_config(self):
        config_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), "config.json")
        print(f"config_file : {config_file}")
        with open(config_file, "r") as c:
            return json.loads(c.read())
    
class SageMakerEndpointTestSet(HttpUser):
    wait_time = between(5, 15)
    
    def __init__(self, parent):
        super().__init__(parent)
        self.config = SageMakerConfig()
        
    def encode_image(self, image):
        buffer = BytesIO()
        image.save(buffer, format="JPEG")
        img_str = base64.b64encode(buffer.getvalue())
        return img_str
     
    def decode_image(self, img):
        img = img.encode("utf8") if isinstance(img, str) else img
        buff = BytesIO(base64.b64decode(img))
        image = Image.open(buff)
        return image
        
    def on_start(self):
        data_file_full_path = os.path.join(os.path.dirname(__file__), self.config.data_file)
        print(f"data_file_full_path : {data_file_full_path}")
        face_image = Image.open(data_file_full_path)
        encode_face_image = self.encode_image(face_image).decode("utf-8")
        
        f_left, f_top, f_width, f_height = 185, 276, 200, 279
        self.payload = dict(
            encode_face_image = encode_face_image,
            input_box = [f_left, f_top, f_left+f_width, f_top+f_height]
        )

    @task
    def test_invoke(self):
        response = self._locust_wrapper(self._invoke_endpoint, self.payload)
        if self.config.show_endpoint_response:
            print(response["Body"].read().decode("utf-8"))

    
    def _invoke_endpoint(self, payload):
        region = self.client.base_url.split("://")[1].split(".")[2]
        endpoint_name = self.client.base_url.split("/")[-2]
        runtime_client = boto3.client('sagemaker-runtime', region_name=region)

        response = runtime_client.invoke_endpoint(
            EndpointName=endpoint_name,
            Body=json.dumps(payload),
            ContentType=self.config.content_type
        )

        return response
    

    def _locust_wrapper(self, func, *args, **kwargs):
        """
        Locust wrapper so that the func fires the success and failure events for custom boto3 client
        :param func: The function to invoke
        :param args: args to use
        :param kwargs:
        :return:
        """
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            total_time = int((time.time() - start_time) * 1000)
            events.request.fire(request_type="boto3", name="invoke_endpoint", response_time=total_time,
                                        response_length=0)
            
            print(f"result : {result}")
            return result
        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            events.request.fire(request_type="boto3", name="invoke_endpoint", response_time=total_time,
                                        response_length=0,
                                        exception=e)
            print(f"exception : {e}")
            raise e

Overwriting stress.py
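
The `_locust_wrapper` method in `stress.py` exists because the requests go through boto3 rather than Locust's own HTTP client, so latency and failures have to be reported by hand. The pattern can be sketched without Locust as follows. Here `report` is a hypothetical stand-in for `events.request.fire`, and `timed_call`, `record`, `ok`, and `boom` are illustrative names.

```python
import time


def timed_call(func, report, *args, **kwargs):
    """Call `func`, then report its latency in ms and any exception through `report`."""
    start = time.perf_counter()
    try:
        result = func(*args, **kwargs)
    except Exception as exc:
        elapsed_ms = int((time.perf_counter() - start) * 1000)
        # Failed calls are reported too, so they show up as failures in the statistics.
        report(name=func.__name__, response_time=elapsed_ms, exception=exc)
        raise
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    report(name=func.__name__, response_time=elapsed_ms, exception=None)
    return result


records = []


def record(**event):
    """Collect reported events in a list instead of sending them to Locust."""
    records.append(event)


def ok(x):
    return x * 2


def boom():
    raise RuntimeError("endpoint unavailable")


print(timed_call(ok, record, 21))
try:
    timed_call(boom, record)
except RuntimeError:
    pass
print([(r["name"], r["exception"] is None) for r in records])
```

Re-raising after reporting keeps the failure visible to the caller while still counting it in the collected events.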


<br>

# 2. Load Testing
---

With Locust, a load test can be run conveniently by setting just the parameters below.

- `num_users`: the total number of simulated users testing the application.
- `spawn_rate`: how many users are added per second. If an `on_start` method is defined, it is called as each user is spawned.

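For example, with `num_users=100, spawn_rate=10`, 10 users are added per second and the count reaches 100 users after 10 seconds. In recent Locust releases, statistics are reset when this user count is reached only if the `--reset-stats` option is passed.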
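
The ramp-up arithmetic can be sketched as below. This is an idealized linear model with hypothetical helper names; the actual spawning in Locust may deviate slightly from it.

```python
import math


def users_at(t_seconds: float, num_users: int, spawn_rate: float) -> int:
    """Approximate number of active users t_seconds after the test starts."""
    return min(num_users, int(spawn_rate * t_seconds))


def ramp_up_seconds(num_users: int, spawn_rate: float) -> int:
    """Approximate time in seconds until all users have been spawned."""
    return math.ceil(num_users / spawn_rate)


for t in (1, 5, 10, 60):
    print(t, users_at(t, num_users=100, spawn_rate=10))
print(ramp_up_seconds(num_users=100, spawn_rate=10))
```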

In [59]:
import boto3
region = boto3.Session().region_name

num_users = 100
spawn_rate = 10
endpoint_url = f'https://runtime.sagemaker.{region}.amazonaws.com/endpoints/{endpoint_name}/invocations'
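
`stress.py` recovers the region and the endpoint name from this URL by splitting strings at fixed positions. A slightly more defensive version of that parsing might look like the sketch below. The `parse_endpoint_url` helper and the `my-endpoint` name are hypothetical, and the URL layout is assumed to match the `endpoint_url` built above.

```python
from urllib.parse import urlparse


def parse_endpoint_url(url: str):
    """Return (region, endpoint_name) from a SageMaker runtime invocation URL."""
    parsed = urlparse(url)
    # Expected host: runtime.sagemaker.<region>.amazonaws.com
    host_parts = (parsed.hostname or "").split(".")
    # Expected path: /endpoints/<endpoint_name>/invocations
    path_parts = [p for p in parsed.path.split("/") if p]
    if len(host_parts) < 5 or host_parts[:2] != ["runtime", "sagemaker"]:
        raise ValueError(f"unexpected host in URL: {url}")
    if len(path_parts) != 3 or path_parts[0] != "endpoints" or path_parts[2] != "invocations":
        raise ValueError(f"unexpected path in URL: {url}")
    return host_parts[2], path_parts[1]


demo_url = "https://runtime.sagemaker.us-west-2.amazonaws.com/endpoints/my-endpoint/invocations"
print(parse_endpoint_url(demo_url))
```

Validating the layout makes a malformed `-H` value fail with a clear error instead of an index error or a silently wrong region.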

### Running a locustfile

To make this exercise work from within a Jupyter notebook, Locust is started in the background with nohup. By default, Locust uses port 8089 (http://localhost:8089).

In [60]:
# %%bash -s "$num_users" "$spawn_rate" "$endpoint_url"

# echo locust -f stress.py -u $1 -r $2 -H $3

In [61]:
%%bash -s "$num_users" "$spawn_rate" "$endpoint_url"

nohup locust -f stress.py -u $1 -r $2 -H $3 >/dev/null 2>&1 &

### Secure tunnels to localhost using ngrok

ngrok lets you reach localhost from outside. pyngrok is a Python wrapper that makes ngrok more convenient to use through API calls.

- ngrok: https://ngrok.com/
- pyngrok: https://pyngrok.readthedocs.io/en/latest

In [62]:
from pyngrok import ngrok
http_tunnel = ngrok.connect(8089, bind_tls=True)
http_url = http_tunnel.public_url

Click the URL printed by the code cell below, then click the `Start swarming` button.

In [63]:
from IPython.display import display, HTML
display(HTML(f'<b><a target="blank" href="{http_url}">Load test: {http_url}</a></b>'))


In [64]:
tunnels = ngrok.get_tunnels()
print(tunnels)

[<NgrokTunnel: "https://dac6-52-42-79-222.ngrok-free.app" -> "http://localhost:8089">]


### CloudWatch Monitoring
Clicking the link printed by the code cell below takes you to the CloudWatch dashboard.

In [44]:
cw_url = f"https://console.aws.amazon.com/cloudwatch/home?region={region}#metricsV2:graph=~(metrics~(~(~'AWS*2fSageMaker~'InvocationsPerInstance~'EndpointName~'{endpoint_name}~'VariantName~'AllTraffic))~view~'timeSeries~stacked~false~region~'{region}~start~'-PT15M~end~'P0D~stat~'SampleCount~period~60);query=~'*7bAWS*2fSageMaker*2cEndpointName*2cVariantName*7d*20{endpoint_name}"
display(HTML(f'<b><a target="blank" href="{cw_url}">Cloudwatch Monitoring</a></b>'))
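
The same metrics can also be read programmatically instead of through the console. The sketch below only builds the request parameters for CloudWatch's `get_metric_statistics` call. The `invocation_metric_query` helper, the choice of the `Invocations` metric with the `Sum` statistic, and the `my-endpoint` name are assumptions for illustration. The actual boto3 call is left commented out because it needs AWS credentials.

```python
from datetime import datetime, timedelta, timezone


def invocation_metric_query(endpoint_name, variant_name="AllTraffic", minutes=15, now=None):
    """Build keyword arguments for cloudwatch.get_metric_statistics over the last `minutes`."""
    end = now or datetime.now(timezone.utc)
    return {
        "Namespace": "AWS/SageMaker",
        "MetricName": "Invocations",
        "Dimensions": [
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": variant_name},
        ],
        "StartTime": end - timedelta(minutes=minutes),
        "EndTime": end,
        "Period": 60,  # one datapoint per minute
        "Statistics": ["Sum"],
    }


query = invocation_metric_query("my-endpoint")  # placeholder endpoint name
print(query["MetricName"], query["Period"])

# Usage (requires AWS credentials, so it is not executed here):
# import boto3
# cloudwatch = boto3.client("cloudwatch")
# datapoints = cloudwatch.get_metric_statistics(**query)["Datapoints"]
```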

### Stop Locust and Disconnect ngrok

In [58]:
!pkill -9 -ef locust
ngrok.disconnect(http_url)

locust killed (pid 12020)


### (Optional) More testing

Try changing `num_users, spawn_rate` in the section above and rerunning the test (e.g., `num_users=1000, spawn_rate=20`). Once the RPS exceeds a certain level, you can see the Failures count rise.
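
When picking values to try, it helps to estimate the RPS that a given user count will generate. Each simulated user in `stress.py` waits `between(5, 15)` seconds between tasks, so one request cycle takes roughly the mean wait plus the response time. The sketch below is a back-of-the-envelope estimate under that assumption; `expected_rps` is a hypothetical helper and the 1-second latency is an assumed figure, not a measurement.

```python
def expected_rps(num_users: int, min_wait: float, max_wait: float, avg_latency: float) -> float:
    """Estimate steady-state requests per second for users that wait between requests.

    The wait time is assumed to be uniformly distributed between min_wait and max_wait.
    """
    mean_wait = (min_wait + max_wait) / 2
    cycle_seconds = mean_wait + avg_latency
    if cycle_seconds <= 0:
        raise ValueError("wait time plus latency must be positive")
    return num_users / cycle_seconds


# 100 users, between(5, 15), assumed 1 s average latency
print(round(expected_rps(100, 5, 15, 1.0), 2))
# 1000 users under the same assumptions
print(round(expected_rps(1000, 5, 15, 1.0), 2))
```

Under this model, raising `num_users` or shortening `wait_time` are the two ways to push the generated RPS higher.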

<br>

# 3. Endpoint Clean-up
---

Delete the endpoint to avoid further charges.

In [None]:
# try:
#     sm_client.delete_endpoint(EndpointName=endpoint_name)
#     sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
#     sm_client.delete_model(ModelName=sm_model_name)
# except Exception as e:
#     print(f"Clean-up failed: {e}. Please retry this cell.")