
# <span style="color:DarkSeaGreen">JumpStart Lab 3</span>

This lab does the following:

- Uses the endpoint created in Lab 1
  - If you have not deleted and deregistered the target tracking policy created in lab 2, it can cause conflicts with the step policy created in this lab
- Implements SageMaker application-autoscaling **step** policy
- Tests the functionality 



# <span style="color:DarkSeaGreen">Prepare Your Environment</span>
### Note if you want a venv, see Lab 1

# Lab 3 Starts Here!

# <span style="color:DarkSeaGreen">Setup</span>

In [None]:
# region
# us-east-1, us-west-2 and eu-west-1 have the broadest coverage of JumpStart models and instance types for this lab
# in other regions some models or instance types may be unavailable, or need a quota increase first
myRegion = "us-east-1"

# every resource name in this lab shares the same prefix
_namePrefix = "doit-jumpstart-sagemaker"

# parameter store - names of the parameters read later in this lab
myParameterStoreChosenModel = f"{_namePrefix}-chosen-model"
myParameterStoreEndpointName = f"{_namePrefix}-endpoint-name"
myParameterStoreIAMARN = f"{_namePrefix}-iam-arn"

# application auto scaling - step scaling policy names and the CloudWatch alarms that invoke them
myEndpointScalingOutPolicyName = f"{_namePrefix}-endpoint-step-scaling-out-policy"
myEndpointScalingInPolicyName = f"{_namePrefix}-endpoint-step-scaling-in-policy"
myAlarmScaleOutName = f"{_namePrefix}-endpoint-step-scaling-alarmhigh"
myAlarmScaleInName = f"{_namePrefix}-endpoint-step-scaling-alarmlow"

print("Done! Move to the next cell ->")

In [None]:
# import libraries
import boto3
from certifi import where

# session pinned to the lab region - used later when creating the SageMaker session
botoSession = boto3.Session(region_name=myRegion)

# Configure boto3 to use certifi's certificates - helps avoid SSL errors if your system's certificate store is out of date or missing root certs
sts_client = boto3.client('sts', verify=where())

# one STS call gives us both the account number and the caller ARN
callerIdentity = sts_client.get_caller_identity()
myAccountNumber = callerIdentity["Account"]
print(myAccountNumber)
print(callerIdentity["Arn"])

# create clients we can use later
# iam
iam = boto3.client('iam', region_name=myRegion, verify=where())
# ssm
ssm = boto3.client('ssm', region_name=myRegion, verify=where())
# cloudwatch
cw = boto3.client("cloudwatch", region_name=myRegion, verify=where())

print ('Done! Move to the next cell ->')

In [None]:
# tags added to all services we create, kept in two shapes:
#   myTagsDct - plain {key: value} mapping
#   myTags    - list of {"Key": ..., "Value": ...} pairs built from the mapping
myTagsDct = {
    "env": "non_prod",
    "owner": "doit-jumpstart",
    "project": "lab1",
    "author": "simon",
}
myTags = [{"Key": tagKey, "Value": tagValue} for tagKey, tagValue in myTagsDct.items()]

print("Done! Move to the next cell ->")

# <span style="color:DarkSeaGreen">IAM</span>

In [None]:
def getSageMakerExecutionRole():
    """
    Looks up the SageMaker execution role ARN saved in the parameter store
    Used as the fallback when running in a local IDE, where the SageMaker SDK cannot work out the role itself

    Reads the parameter named by myParameterStoreIAMARN through the notebook-level ssm client

    Args:
        None

    Returns:
        The IAM execution role ARN held in the parameter's value
    """

    # the ARN was written to the parameter store by the previous lab
    storedParameter = ssm.get_parameter(Name=myParameterStoreIAMARN)['Parameter']
    roleArn = storedParameter['Value']
    print(f"Retrieved role from parameter store: {roleArn}")    

    return roleArn

# <span style="color:DarkSeaGreen">Get Execution Role and Session</span>
- SageMaker requires an execution role to assume on your behalf

In [None]:
from sagemaker.session import get_execution_role
from sagemaker.session import Session

try:
    # if this is being run in a SageMaker AI JupyterLab Notebook the SDK can resolve the role itself
    myRoleSageMakerExecutionARN = get_execution_role()
except Exception as roleLookupError:
    # if this is being run in a local IDE - fall back to the role ARN stored in the parameter store
    # (Exception rather than a bare except, so Ctrl+C / kernel interrupts are not swallowed)
    print(f"get_execution_role() failed ({type(roleLookupError).__name__}), falling back to the parameter store")
    myRoleSageMakerExecutionARN = getSageMakerExecutionRole()

# make sure we get a session in the correct region (needed as it can use the aws configure region if running this locally)
sageMakerSession = Session(boto_session=botoSession)

print(myRoleSageMakerExecutionARN)
print(sageMakerSession)

print ('Done! Move to the next cell ->')

# <span style="color:DarkSeaGreen">Get the Endpoint from Lab 1</span>

In [None]:
# attach to the endpoint created in lab1
from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer

# the endpoint name is held in the parameter store
response = ssm.get_parameter(Name=myParameterStoreEndpointName)
endpointName = response["Parameter"]["Value"]
print(f"Using endpoint name: {endpointName}")  

# the predictor is our handle on the endpoint
# it is built from just the endpoint name here, so the JSON serializer and deserializer are given explicitly
jsonSerializer = JSONSerializer()
jsonDeserializer = JSONDeserializer()
predictor = Predictor(
    endpoint_name=endpointName,
    sagemaker_session=sageMakerSession,
    serializer=jsonSerializer,
    deserializer=jsonDeserializer,
)

print("Done! Move to the next cell ->")

In [None]:
# required if an image model is being used
def decode_and_show(model_response) -> None:
    """
    Decodes a base64 encoded image and displays it in the notebook

    Args:
        model_response: base64 encoded image data (it is passed straight to base64.b64decode)

    Returns:
        None
    """
    from PIL import Image
    import base64
    import io

    # the context manager closes the image even if display() raises
    with Image.open(io.BytesIO(base64.b64decode(model_response))) as image:
        display(image)

In [None]:
# test the endpoint
import random

# the model deployed in lab 1 is recorded in the parameter store as a "|" separated string:
# model type | model id | instance type (anything after the first space of the third field is dropped)
response = ssm.get_parameter(Name=myParameterStoreChosenModel)
chosenModel = response["Parameter"]["Value"]
modelFields = chosenModel.split("|")
modelType = modelFields[0]
modelID = modelFields[1]
instanceType = modelFields[2].split(" ")[0]
print(f"You selected: model type {modelType} {modelID} on {instanceType}")

# build one example request in the shape the chosen model type expects
if modelType == "llm":
    # text generation: the request itself sits under "body"
    example_payloads = [
        {
            "body": {
                "inputs": "Describe what a llm model can do for someone who is sceptical about them",
                "parameters": {
                    "max_new_tokens": 128,
                    "temperature": 0.2,
                    "top_p": 0.9,
                },
            },
            "content_type": "application/json",
            "accept": "application/json",
        },
    ]
else:
    # image generation: the whole dict is the request, with a fresh random seed each run
    example_payloads = [
        {
            "text_prompts": [
                {
                    "text": "A cowboy standoff at sunset in a dusty desert town, cinematic wide shot, golden hour lighting, photorealistic"
                }
            ],
            "width": 512,
            "height": 512,
            "cfg_scale": 7.0,
            "steps": 150,
            "seed": random.randint(0, 4294967295),
        },
    ]

for payload in example_payloads:
    if modelType != "llm":
        # image model - send the payload as is (json.dumps(payload).encode("utf-8") may be needed if you change the image model)
        response = predictor.predict(payload)
        decode_and_show(response["generated_image"])
        continue

    # llm - send only the body and print the generated text
    body = payload["body"]
    response = predictor.predict(body)
    if isinstance(response, list):
        # some models wrap the result in a list - take the first entry
        response = response[0]
    print("Input:\n", body, end="\n\n")
    print("Output:\n", response["generated_text"].strip(), end="\n\n\n")

print("Done! Move to the next cell ->")

# <span style="color:DarkSeaGreen">Create Scalability Plan - Step Scaling</span>
- Uses SageMaker Application Auto Scaling - Step Scaling
- A step scaling policy scales your application's capacity in predefined increments based on CloudWatch alarms. You can define separate scaling policies to handle scaling out (increasing capacity) and scaling in (decreasing capacity) when an alarm threshold is breached.
- With step scaling policies, you create and manage the CloudWatch alarms that invoke the scaling process. When an alarm is breached, Application Auto Scaling initiates the scaling policy associated with that alarm.
- The step scaling policy scales capacity using a set of adjustments, known as step adjustments. The size of the adjustment varies based on the magnitude of the alarm breach.
  - If the breach exceeds the first threshold, Application Auto Scaling will apply the first step adjustment.
  - If the breach exceeds the second threshold, Application Auto Scaling will apply the second step adjustment, and so on.
- Using the new high-resolution metrics allows you to greatly decrease the time it takes to scale out an endpoint using Application Auto Scaling

In [None]:
# references
# https://github.com/aws/amazon-sagemaker-examples/blob/main/inference/generativeai/huggingfacetgi/meta-llama/llama3-8b/faster-autoscaling/realtime-endpoints/FasterAutoscaling-SME-Llama3-8B-AppAutoScaling.ipynb
# https://aws.amazon.com/blogs/machine-learning/amazon-sagemaker-inference-launches-faster-auto-scaling-for-generative-ai-models/
# https://docs.aws.amazon.com/autoscaling/application/userguide/what-is-application-auto-scaling.html

# Application Auto Scaling client - the step scaling policies in the next cells are attached through it
autoScaling = boto3.client("application-autoscaling", region_name=myRegion, verify=where())

# the scalable resource is one production variant of the endpoint
variantName = "AllTraffic"
ResourceId = f"endpoint/{endpointName}/variant/{variantName}"

# register the variant's instance count as the scalable target, allowed to move between 1 and 5 instances
scalableTarget = autoScaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=ResourceId,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=1,
    MaxCapacity=5,  # replace with your desired maximum instances
)

print("Done! Move to the next cell ->")

In [None]:
# create the step scaling policy for scaling OUT (the scale-in policy is created in the next cell)
# Step scaling policies are more complex to set up than target tracking, but give you more control over how your endpoint scales
# https://docs.aws.amazon.com/autoscaling/application/userguide/step-scaling-policy-overview.html

# the policy does nothing on its own - it is invoked by the CloudWatch alarm on ConcurrentRequestsPerModel created further down
# the step bounds are offsets from that alarm's threshold, not absolute metric values:
#   breach of 0 up to 20 above the threshold  -> add 1 instance
#   breach of 20 or more above the threshold  -> add 2 instances
stepScaleOutPolicyResponse = autoScaling.put_scaling_policy(
    PolicyName=myEndpointScalingOutPolicyName,
    ServiceNamespace="sagemaker",
    ResourceId=ResourceId,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="StepScaling",
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity",
        "Cooldown": 120,  # n seconds cooldown
        "MetricAggregationType": "Maximum",
        "StepAdjustments": [
            {
                "MetricIntervalLowerBound": 0,
                "MetricIntervalUpperBound": 20,
                "ScalingAdjustment": 1,  # Increase by one instance
            },
            {
                "MetricIntervalLowerBound": 20,
                "ScalingAdjustment": 2,  # Increase by 2 instances
            },
        ],
    },
)

# keep the ARN - the scale-out alarm needs it as its alarm action
scale_out_policy_arn = stepScaleOutPolicyResponse["PolicyARN"]
# plain text only: the builtin print does not render rich-style [i green] markup
print(f"Step scaling policy ARN: {scale_out_policy_arn}")

print ('Done! Move to the next cell ->')

In [None]:
# create the step scaling policy for scaling IN
# Step scaling policies are more complex to set up than target tracking, but give you more control over how your endpoint scales
# https://docs.aws.amazon.com/autoscaling/application/userguide/step-scaling-policy-overview.html

# the policy does nothing on its own - it is invoked by the CloudWatch alarm created further down,
# which fires when ConcurrentRequestsPerModel drops to or below the alarm threshold
# the step bounds are offsets from that alarm's threshold, not absolute metric values:
#   breach of 0 down to 20 below the threshold  -> remove 1 instance
#   breach of 20 or more below the threshold    -> remove 2 instances
# NOTE(review): if the scale-in alarm threshold is below 20, the second step needs a negative metric value,
# which a request count presumably never reaches - confirm whether that step is intended
stepScaleInPolicyResponse = autoScaling.put_scaling_policy(
    PolicyName=myEndpointScalingInPolicyName,
    ServiceNamespace="sagemaker",
    ResourceId=ResourceId,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="StepScaling",
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity",
        "Cooldown": 60,  # Cooldown period after scale-in activity
        "MetricAggregationType": "Maximum",
        "StepAdjustments": [
            {
                "MetricIntervalUpperBound": 0,
                "MetricIntervalLowerBound": -20,
                "ScalingAdjustment": -1,  # Decrease by 1 instance
            },
            {"MetricIntervalUpperBound": -20, "ScalingAdjustment": -2},  # Decrease by 2 instances
        ],
    },
)

# keep the ARN - the scale-in alarm needs it as its alarm action
scale_in_policy_arn = stepScaleInPolicyResponse["PolicyARN"]
# plain text only: the builtin print does not render rich-style [i green] markup
print(f"Step scaling policy ARN: {scale_in_policy_arn}")

print ('Done! Move to the next cell ->')

# <span style="color:DarkSeaGreen">Create Scalability Plan - Alarms</span>
- Unlike target tracking scaling, we have to manually create the alarms

In [None]:
# create CloudWatch alarm for scale-out
# goes into ALARM when the maximum ConcurrentRequestsPerModel is >= 20 for 3 consecutive 60 second periods,
# and then invokes the scale-out step policy
response = cw.put_metric_alarm(
    AlarmName=myAlarmScaleOutName,
    MetricName="ConcurrentRequestsPerModel",
    Namespace="AWS/SageMaker",
    Statistic="Maximum",
    Period=60,
    EvaluationPeriods=3,
    Threshold=20.0,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    Dimensions=[
        {"Name": "EndpointName", "Value": endpointName},
        # same variant the scalable target was registered for
        {"Name": "VariantName", "Value": variantName},
    ],
    AlarmActions=[scale_out_policy_arn],
    TreatMissingData="ignore",
)

# plain text only: the builtin print does not render rich-style [b blue] markup
print(f"CloudWatch alarm created for scale-out:\n{myAlarmScaleOutName}")
print ('Done! Move to the next cell ->')

In [None]:
# Create CloudWatch alarm for scale-in
# goes into ALARM when the maximum ConcurrentRequestsPerModel is <= 10 for 3 consecutive 60 second periods,
# and then invokes the scale-in step policy
response = cw.put_metric_alarm(
    AlarmName=myAlarmScaleInName,
    MetricName="ConcurrentRequestsPerModel",
    Namespace="AWS/SageMaker",
    Statistic="Maximum",
    Period=60,
    EvaluationPeriods=3,
    Threshold=10.0,
    ComparisonOperator="LessThanOrEqualToThreshold",
    Dimensions=[
        {"Name": "EndpointName", "Value": endpointName},
        # same variant the scalable target was registered for
        {"Name": "VariantName", "Value": variantName},
    ],
    AlarmActions=[scale_in_policy_arn],
    TreatMissingData="ignore",
)

# plain text only: the builtin print does not render rich-style [b blue] markup
print(f"CloudWatch alarm created for scale-in:\n{myAlarmScaleInName}")
print ('Done! Move to the next cell ->')

In [None]:
# get scaling policies for endpoint
policies = autoScaling.describe_scaling_policies(
    ServiceNamespace="sagemaker",
    ResourceId=ResourceId,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
)

# collect alarm details as we need them later when monitoring the locust run
alarms_info = []
for policy in policies.get("ScalingPolicies", []):
    # a policy with no alarm attached yet has an empty "Alarms" list - skip it rather than index into it
    policy_alarms = policy.get("Alarms") or []
    alarm_name = policy_alarms[0].get("AlarmName") if policy_alarms else None
    if not alarm_name:
        continue
    metric_alarms = cw.describe_alarms(AlarmNames=[alarm_name])['MetricAlarms']
    if not metric_alarms:
        # the policy references an alarm CloudWatch no longer returns
        continue
    alarm_desc = metric_alarms[0]
    alarms_info.append({
        "AlarmName": alarm_name,
        "Threshold": alarm_desc['Threshold'],
        "Period": alarm_desc['Period'],
        "EvaluationPeriods": alarm_desc['EvaluationPeriods'],
        # key name kept for compatibility; the value is the evaluation window (Period * EvaluationPeriods), not the policy cooldown
        "ScaleInCooldown": alarm_desc['Period'] * alarm_desc['EvaluationPeriods']
    })

# ensure high alarm is at [0], low alarm at [1]
# match on the exact names created in this lab, so alarms from a leftover target tracking policy
# (whose names also contain "AlarmHigh" / "AlarmLow") cannot be picked up by mistake
alarms = [None, None]
for alarm in alarms_info:
    if alarm['AlarmName'] == myAlarmScaleOutName:
        alarms[0] = alarm
    elif alarm['AlarmName'] == myAlarmScaleInName:
        alarms[1] = alarm

# fail with a clear message here instead of a TypeError further down
missing_alarms = [
    expected_name
    for expected_name, found in zip((myAlarmScaleOutName, myAlarmScaleInName), alarms)
    if found is None
]
if missing_alarms:
    raise RuntimeError(
        f"Alarm(s) not attached to a scaling policy on {ResourceId}: {', '.join(missing_alarms)} - "
        "re-run the policy and alarm cells above"
    )

# print in a readable way
for alarm in alarms:
    print(f"Alarm Name: {alarm['AlarmName']} | Evaluation window (seconds): {alarm['ScaleInCooldown']} | Threshold: {alarm['Threshold']} | EvaluationPeriods: {alarm['EvaluationPeriods']} periods of {alarm['Period']} seconds")


# <span style="color:DarkSeaGreen">Test Scalability Plan</span>
- Lets just test the endpoint first, make sure its all good
- Simulate load
- Make sure your venv is activated, eg
  - activate the virtual environment
  - source venv-jumpstart-lab1/bin/activate

In [None]:
# locust simulates the load on the endpoint
# https://docs.locust.io/en/stable/ 
# https://aws.amazon.com/blogs/machine-learning/best-practices-for-load-testing-amazon-sagemaker-real-time-inference-endpoints/
# see the locust_script_lab2.py file for details of the load test
# the script is given the endpoint name, etc via the os environment vars printed by this cell
# run this cell, then paste its output into a terminal window and run it there - use the virtual environment created in lab 1, or your own that has boto3 and locust installed

# settings that do not depend on the model type
exportSettings = [
    ("AWS_REGION", myRegion),
    ("ENDPOINT_NAME", endpointName),
    ("CONTENT_TYPE", "application/json"),
    ("MODEL_TYPE", modelType),
    ("HOST", "http://localhost"),
]
print("\n".join(f"export {settingName}={settingValue}" for settingName, settingValue in exportSettings))

# the payload differs per model type: a JSON document for an llm, a plain prompt otherwise
if modelType == "llm":
    locustPayload = '{"inputs": "Please explain what load testing is and why its important in reference to sagemaker endpoints"}'
else:
    locustPayload = 'A cowboy standoff at sunset in a dusty desert town, cinematic wide shot, golden hour lighting, photorealistic'
# single quotes keep the payload as one shell word
print(f"export PAYLOAD='{locustPayload}'")



In [None]:
# these are picked up by the locust file
# Paste the following in a terminal window; make sure it is run in your virtual environment created in lab 1, or in your own that has boto3 and locust installed
# LOCUST_USERS is the number of simulated users
# LOCUST_SPAWN_RATE is the rate per second to spawn (add new) users - so 20 users at a rate of 2 means 2 users are added every second, taking 10 seconds to get to 20 users
#   with the values below (140 users at 0.5 per second) the ramp-up takes 280 seconds
# LOCUST_RUN_TIME is how long to run the test for
export LOCUST_USERS=140
export LOCUST_SPAWN_RATE=0.5
export LOCUST_RUN_TIME=25m

# <span style="color:DarkSeaGreen">Before Locust is Run</span>
- Go to the CloudWatch console
- Monitor the two step scaling alarms created above on ConcurrentRequestsPerModel, eg 
  - doit-jumpstart-sagemaker-endpoint-step-scaling-alarm**high**
  - doit-jumpstart-sagemaker-endpoint-step-scaling-alarm**low**
- Run the cell below to monitor the instance count



In [None]:
import time
from datetime import datetime, timedelta, timezone

# --- Configuration ---
endpoint_name = endpointName
region = myRegion
poll_interval = 10     # seconds between checks
metric_lookback = 60   # seconds of metric history requested per poll - wide enough that a slightly late datapoint is still returned

# --- Clients ---
# use certifi's certificates like the other clients in this notebook
sm_client = boto3.client("sagemaker", region_name=region, verify=where())
cw_client = boto3.client("cloudwatch", region_name=region, verify=where())

# alarm settings were collected when the alarms were looked up: high alarm at [0], low alarm at [1]
high_alarm, low_alarm = alarms[0], alarms[1]
monitored_alarm_names = [high_alarm['AlarmName'], low_alarm['AlarmName']]


def print_alarm_reference():
    """Prints the name, evaluation window and threshold of the high and low scaling alarms."""
    for label, direction, alarm in (("High", "above", high_alarm), ("Low", "below", low_alarm)):
        print(
            f"{label} Scaling Alarm: {alarm['AlarmName']} | "
            f"Evaluation window (seconds): {alarm['Period'] * alarm['EvaluationPeriods']} | "
            f"when {direction} {alarm['Threshold']} for {alarm['EvaluationPeriods']} periods of {alarm['Period']} seconds"
        )


print(f"Monitoring endpoint '{endpoint_name}' variants (press Ctrl+C to stop)...\n")
print("Alarms for reference:")
print_alarm_reference()

try:
    while True:
        # --- Alarm states: one call per poll covers both alarms ---
        alarm_states = {
            described['AlarmName']: described['StateValue']
            for described in cw_client.describe_alarms(AlarmNames=monitored_alarm_names)['MetricAlarms']
        }

        # --- Describe endpoint variants ---
        response = sm_client.describe_endpoint(EndpointName=endpoint_name)
        for variant in response["ProductionVariants"]:
            variant_name = variant["VariantName"]
            current_instances = variant["CurrentInstanceCount"]
            desired_instances = variant["DesiredInstanceCount"]

            # --- Fetch latest ConcurrentRequestsPerModel metric ---
            end_time = datetime.now(timezone.utc)
            start_time = end_time - timedelta(seconds=metric_lookback)

            # get the ConcurrentRequestsPerModel metric for this variant
            metric_resp = cw_client.get_metric_statistics(
                Namespace="AWS/SageMaker",
                MetricName="ConcurrentRequestsPerModel",
                Dimensions=[
                    {"Name": "EndpointName", "Value": endpoint_name},
                    {"Name": "VariantName", "Value": variant_name},
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=poll_interval,
                Statistics=["Average"],
            )

            # datapoints are not returned in time order, so pick the newest by timestamp
            datapoints = metric_resp.get("Datapoints", [])
            if datapoints:
                latest_datapoint = max(datapoints, key=lambda dp: dp["Timestamp"])
                concurrent_requests = round(latest_datapoint["Average"], 2)
            else:
                concurrent_requests = 0

            print(
                f"[{time.strftime('%H:%M:%S')}] Variant: {variant_name} | "
                f"Current instances: {current_instances} | Desired instances: {desired_instances} | "
                f"ConcurrentRequestsPerModel: {concurrent_requests} | "
                f"High Scaling Alarm State: {alarm_states.get(high_alarm['AlarmName'], 'UNKNOWN')} | "
                f"Low Scaling Alarm State: {alarm_states.get(low_alarm['AlarmName'], 'UNKNOWN')}"
            )

        print("-" * 80)
        time.sleep(poll_interval)

except KeyboardInterrupt:
    print("Monitoring stopped.")
    # print each alarm name and description on a new line
    print("Alarms for reference:")
    print_alarm_reference()


# <span style="color:DarkSeaGreen">Start Locust</span>
- Paste the command into your terminal window in your venv
- If you are testing load with a txt2img model on a mac, and you see assertion errors for threading
  - this is a harmless warning related to gevent (the async library Locust uses) and Python's threading system and can be ignored

## # fails
- if you see a failure rate more than 0%, or it climbs as user load increases its likely you do not have instance sizes that can handle the load
- this may mean you need to scale earlier, or use step scaling rather than target scaling
- check the cpu, memory and gpu utilization metrics of your endpoint, likely 100%
- the defaults used in this lab may show an example of failure, which after scaling occurs reduces
  - however endpoints can take minutes to provision once a scale out alarm happens, once it does, those failures go away
  - monitor the metrics to see the % increase, then decrease after scaling
- failures can also be caused by api call throttling
  - you can help this by increasing the retries maxattempts value in the locust script, or implementing exponential backoff as you would in your production code

In [None]:
# now we're going to use locust to simulate load on the endpoint
# https://docs.locust.io/en/stable/ 
# https://aws.amazon.com/blogs/machine-learning/best-practices-for-load-testing-amazon-sagemaker-real-time-inference-endpoints/
# see the locust_script_lab2.py file for details of the load test
# it gathers the endpoint name, etc from the environment variables exported earlier (AWS_REGION, ENDPOINT_NAME, CONTENT_TYPE, MODEL_TYPE, HOST, PAYLOAD)

# Paste ONE of the following commands in a terminal window; make sure it is run in your virtual environment created in lab 1, or in your own that has boto3 and locust installed
# this will run in headless mode, use the LOCUST_* values just exported, and write csv logs (results_*.csv) for use in graph cells below
locust -f locust_script_lab2.py --headless -u $LOCUST_USERS -r $LOCUST_SPAWN_RATE --run-time $LOCUST_RUN_TIME --host http://localhost --csv results --csv-full-history

# for quick testing you can run with just 1 user, spawning at rate of 1 user per second, for 2 minutes
locust -f locust_script_lab2.py --headless -u 1 -r 1 --run-time 2m --host http://localhost --csv results --csv-full-history

# OR
# if you want to see the locust web UI, then run without --headless and point your browser at http://localhost:8089
# it will still write the csv files for use in graph cells below
locust -f locust_script_lab2.py -u $LOCUST_USERS -r $LOCUST_SPAWN_RATE --run-time $LOCUST_RUN_TIME --host http://localhost --csv results --csv-full-history

# <span style="color:DarkSeaGreen">Review Stats</span>
- After Locust has finished, we can review the stats it wrote as csv files
- NOTE the endpoint will not scale back in for 15 minutes (value may change depending on the scaling policy) after locust has finished its run - see above for a description of this
  - therefore when viewing the charts below, it will not reflect a scale in event for 15 minutes 

#### SageMaker Endpoint Latency Under Load
  - See how your SageMaker endpoint responded as load increased, and correlate that with autoscaling events
  - NOTE this uses the results csv file written by locust, so wait until locust finishes before running this and subsequent cells

In [None]:
# Plot response times over time
import pandas as pd
import matplotlib.pyplot as plt

# Load the full history file (df is reused by the next chart)
df = pd.read_csv("results_stats_history.csv")

# with --csv-full-history each timestamp has one row per request name plus an "Aggregated" row
# plot only the aggregated rows so every timestamp contributes a single point
latency_df = df[df["Name"] == "Aggregated"] if "Name" in df.columns else df

# Timestamp holds unix epoch seconds - plot seconds since the start of the run so the axis label is accurate
elapsed_seconds = latency_df["Timestamp"] - latency_df["Timestamp"].min()

# Plot response times over time
plt.plot(elapsed_seconds, latency_df["95%"], label="p95 latency (ms)")
plt.plot(elapsed_seconds, latency_df["99%"], label="p99 latency (ms)")
plt.xlabel("Time (s)")
plt.ylabel("Latency (ms)")
plt.title("SageMaker Endpoint Latency Under Load")
plt.legend()
plt.grid(True)
plt.show()

#### Concurrent Users vs. Request Rate
- Total RPS (Requests per second) → how busy your endpoint is at each interval
- Concurrent Users → how many users were active at that moment
- Compare these to your endpoint instance count and scaling policy in SageMaker to see if the scaling is working as expected

In [None]:
# Concurrent Users and Requests per Second on one chart, sharing the time axis
fig, ax1 = plt.subplots(figsize=(12, 6))
timeAxis = df["Timestamp"]
rpsColour = "tab:blue"
usersColour = "tab:orange"

# left Y-axis - how many requests per second were being made
ax1.plot(timeAxis, df["Requests/s"], color=rpsColour, label="Total RPS")
ax1.set_xlabel("Time (s)")
ax1.set_ylabel("Requests per second", color=rpsColour)
ax1.tick_params(axis="y", labelcolor=rpsColour)

# right Y-axis - how many simulated users were active, drawn on a twin of the first axis
ax2 = ax1.twinx()
ax2.plot(timeAxis, df["User Count"], color=usersColour, label="Concurrent Users")
ax2.set_ylabel("Concurrent Users", color=usersColour)
ax2.tick_params(axis="y", labelcolor=usersColour)

# title, one legend per axis, then draw
plt.title("Concurrent Users vs. Request Rate")
fig.tight_layout()
ax1.legend(loc="upper left")
ax2.legend(loc="upper right")
plt.grid(True)
plt.show()



#### Locust Failures
- Any failures plotted against GPU metrics
- Look in the results_failures.csv for details
- If you see throttling errors, it's because you have exceeded the number of requests per second that your endpoint API can handle
  - In the locust script, you can increase retries (example below), or implement exponential backoff as you would in your own production code
    - region_name=region, retries={"max_attempts": 3, "mode": "standard"}
- NOTE there is a lookback in minutes defined below, so the cell output is only relevant if its run shortly after locust has finished, any longer and it may not get results from the metrics

In [None]:
from datetime import datetime, timedelta, timezone

lookback_minutes = 45  # how far back to fetch metrics

# use the lab-level settings (myRegion / endpointName / variantName) so this cell does not depend on
# variables that only exist after the monitoring loop has run, and certifi's certificates like the other clients
cw = boto3.client("cloudwatch", region_name=myRegion, verify=where())
aas = boto3.client("application-autoscaling", region_name=myRegion, verify=where())

end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=lookback_minutes)

# --- 1. Get CloudWatch metric: GPUUtilization (1 minute averages) ---
metrics = cw.get_metric_statistics(
    Namespace="/aws/sagemaker/Endpoints",
    MetricName="GPUUtilization",
    Dimensions=[{"Name": "EndpointName", "Value": endpointName},
                {"Name": "VariantName", "Value": variantName}],
    StartTime=start_time,
    EndTime=end_time,
    Period=60,
    Statistics=["Average"],
)

gpu_datapoints = metrics.get("Datapoints", [])
if not gpu_datapoints:
    print(f"⚠️ No CloudWatch datapoints found for GPUUtilization in the last {lookback_minutes} minutes")

# Convert CloudWatch metric to DataFrame
gpu_df = pd.DataFrame({
    "timestamp": [dp['Timestamp'] for dp in gpu_datapoints],
    "GPU_Utilization": [dp['Average'] for dp in gpu_datapoints]
})
# merge_asof needs both timestamp columns to have exactly the same dtype, so normalise to UTC nanoseconds
# (this also gives an empty result a proper datetime dtype instead of object)
gpu_df["timestamp"] = pd.to_datetime(gpu_df["timestamp"], utc=True).astype("datetime64[ns, UTC]")
gpu_df["GPU_Utilization"] = gpu_df["GPU_Utilization"].astype("float64")
gpu_df = gpu_df.sort_values("timestamp")

# --- 2. Load the Locust stats history CSV (it carries the running failure count) ---
locust_df = pd.read_csv("results_stats_history.csv")
# with --csv-full-history each timestamp has one row per request name plus an "Aggregated" row - keep the totals only
if "Name" in locust_df.columns:
    locust_df = locust_df[locust_df["Name"] == "Aggregated"]
# Convert Unix timestamp to datetime, same dtype as the GPU frame
locust_df = locust_df.assign(
    timestamp=pd.to_datetime(locust_df['Timestamp'], unit='s', utc=True).astype("datetime64[ns, UTC]")
)

# Use total failures per timestamp
locust_df = locust_df[['timestamp', 'Total Failure Count']]

# --- 3. Merge: each Locust sample gets the most recent GPU datapoint at or before it ---
merged = pd.merge_asof(
    locust_df.sort_values('timestamp'),
    gpu_df,
    on='timestamp'
)

# --- Plot ---
fig, ax1 = plt.subplots(figsize=(12,6))

ax1.plot(merged['timestamp'], merged['GPU_Utilization'], 'b-', label='GPU Utilization (%)')
ax1.set_ylabel('GPU Utilization (%)', color='b')
ax1.tick_params(axis='y', labelcolor='b')

ax2 = ax1.twinx()
ax2.plot(merged['timestamp'], merged['Total Failure Count'], 'r-', label='Locust Fails')
ax2.set_ylabel('Locust Fails', color='r')
ax2.tick_params(axis='y', labelcolor='r')

plt.title(f"GPU vs Locust Failures for {endpointName}")
fig.tight_layout()
plt.show()

#### SageMaker Endpoint Scaling
- Concurrent Requests vs. Instance Count
- Show when the instance scales
- Pulls SageMaker endpoint metrics from CloudWatch
- Handles empty or missing data gracefully
- Converts them into a DataFrame for analysis
- Plots the metric values over time
- If the instance has not scaled in yet, its probably still within the 15 minutes (may be different depending on policy) of the scale in threshold, check the alarm in the console for details
- NOTE there is a lookback in minutes defined below, so the cell output is only relevant if its run shortly after locust has finished, any longer and it may not get results from the metrics

In [None]:
from datetime import datetime, timedelta, timezone

lookback_minutes = 45  # width of the metrics window, in minutes

# One client for the metric itself, one for the scaling configuration/history
cw, aas = (
    boto3.client(service_name, region_name=region)
    for service_name in ("cloudwatch", "application-autoscaling")
)

# Query window: the last `lookback_minutes` minutes, expressed in UTC
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=lookback_minutes)

# --- 1. Get CloudWatch metric: ConcurrentRequestsPerModel ---
# One-minute averages for the endpoint variant over the query window
variant_dimensions = [
    {"Name": "EndpointName", "Value": endpoint_name},
    {"Name": "VariantName", "Value": variant_name},
]
metrics = cw.get_metric_statistics(
    Namespace="AWS/SageMaker",
    MetricName="ConcurrentRequestsPerModel",
    Dimensions=variant_dimensions,
    StartTime=start_time,
    EndTime=end_time,
    Period=60,
    Statistics=["Average"],
)

datapoints = metrics.get("Datapoints", [])
cw_df = pd.DataFrame(datapoints)

if not cw_df.empty:
    # Put the datapoints in chronological order for plotting
    cw_df["Timestamp"] = pd.to_datetime(cw_df["Timestamp"])
    cw_df = cw_df.sort_values("Timestamp")
else:
    print("⚠️ No CloudWatch datapoints found for ConcurrentRequestsPerModel")

# --- 2. Get the scalable target's capacity limits ---
# MinCapacity / MaxCapacity are read from the scalable target registered with
# Application Auto Scaling for this endpoint variant.
resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
scalable_target = aas.describe_scalable_targets(
    ServiceNamespace="sagemaker",
    ResourceIds=[resource_id],
    ScalableDimension="sagemaker:variant:DesiredInstanceCount"
)
registered_targets = scalable_target["ScalableTargets"]
if not registered_targets:
    # Indexing [0] on an empty list would only report "list index out of
    # range"; say what is actually missing instead.
    raise RuntimeError(
        f"No scalable target is registered for {resource_id}. "
        "Register the endpoint variant with Application Auto Scaling before running this cell."
    )
desired_min = registered_targets[0]["MinCapacity"]
desired_max = registered_targets[0]["MaxCapacity"]

# --- 3. Get actual scaling activity history (DesiredInstanceCount changes) ---
scaling_history = aas.describe_scaling_activities(
    ServiceNamespace="sagemaker",
    ResourceId=f"endpoint/{endpoint_name}/variant/{variant_name}",
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MaxResults=20
)

scaling_df = pd.DataFrame(scaling_history.get("ScalingActivities", []))

if not scaling_df.empty:
    # Normalise the activity start times to timezone-aware UTC so they can be
    # compared with the lookback boundary without mixing naive, local and UTC
    # values.
    scaling_df["Timestamp"] = pd.to_datetime(scaling_df["StartTime"], utc=True)

    # Use a single boundary - the start of the window already used for the
    # CloudWatch query - both as the filter cutoff and as the position of the
    # synthetic first row.
    lookback_start = pd.Timestamp(start_time).tz_convert("UTC")
    cutoff_time = lookback_start

    # Restrict to only rows newer than cutoff
    scaling_df = scaling_df[scaling_df["Timestamp"] >= cutoff_time].copy()
    # Extract the instance count: the last run of digits in the Description text
    scaling_df["NewCapacity"] = (
        scaling_df["Description"].str.extract(r"(\d+)(?=\D*$)", expand=False).astype(float)
    )
    # Insert a synthetic row at the lookback start - this is to get the graph
    # to show 1 instance at the start of the lookback period
    synthetic_row = pd.DataFrame({
        "Timestamp": [lookback_start],
        "NewCapacity": [1.0],  # default instance count
        "StatusCode": ["Simulated"],
    })
    scaling_df = pd.concat([synthetic_row, scaling_df], ignore_index=True)

    # Sort for plotting
    scaling_df.sort_values("Timestamp", inplace=True)

# --- 4. Plot everything together ---
# Concurrent requests on the left axis, instance count (as a step line) on the
# right axis. Nothing is drawn or displayed when CloudWatch returned no data.
if not cw_df.empty:
    fig, ax1 = plt.subplots(figsize=(14, 6))

    # Plot concurrency (left axis)
    ax1.plot(cw_df["Timestamp"], cw_df["Average"], marker="o", color="tab:blue", label="ConcurrentRequestsPerModel")
    ax1.set_xlabel("Time")
    ax1.set_ylabel("Concurrent Requests", color="tab:blue")
    ax1.tick_params(axis="y", labelcolor="tab:blue")

    # Plot desired instance count (right axis)
    ax2 = ax1.twinx()
    if not scaling_df.empty:
        ax2.step(scaling_df["Timestamp"], scaling_df["NewCapacity"], where="post", color="tab:red", label="DesiredInstanceCount")
    ax2.set_ylabel("Instance Count", color="tab:red")
    ax2.tick_params(axis="y", labelcolor="tab:red")
    # Leave headroom above the maximum capacity of the scalable target
    ax2.set_ylim(0, desired_max + 1)

    # The two lines live on different axes, so their handles are gathered
    # from both and drawn as a single legend.
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")

    fig.suptitle(f"SageMaker Endpoint Scaling: {endpoint_name}/{variant_name}")
    fig.tight_layout()
    plt.show()

    # Show the underlying data below the chart
    display(cw_df.head(10))
    if not scaling_df.empty:
        display(scaling_df[["Timestamp", "NewCapacity", "StatusCode", "StatusMessage"]].head(10))
        display(scaling_df)



#### Concurrent Requests per Instance Scaling
- Calculates requests per instance over time
- Plots Locust users vs requests per instance, marking the scaling threshold

In [None]:
from datetime import datetime, timedelta, timezone
import pandas as pd
import matplotlib.pyplot as plt
import boto3

lookback_minutes = 50  # width of the metrics window, in minutes

# One client for the metric itself, one for the scaling configuration/history
cw, aas = (
    boto3.client(service_name, region_name=region)
    for service_name in ("cloudwatch", "application-autoscaling")
)

# Query window: the last `lookback_minutes` minutes, expressed in UTC
end_time = datetime.now(timezone.utc)
start_time = end_time - timedelta(minutes=lookback_minutes)

# --- 1. Get CloudWatch metric: ConcurrentRequestsPerModel ---
# One-minute averages for the endpoint variant over the query window
variant_dimensions = [
    {"Name": "EndpointName", "Value": endpoint_name},
    {"Name": "VariantName", "Value": variant_name},
]
metrics = cw.get_metric_statistics(
    Namespace="AWS/SageMaker",
    MetricName="ConcurrentRequestsPerModel",
    Dimensions=variant_dimensions,
    StartTime=start_time,
    EndTime=end_time,
    Period=60,
    Statistics=["Average"],
)

datapoints = metrics.get("Datapoints", [])
cw_df = pd.DataFrame(datapoints)

if not cw_df.empty:
    # Put the datapoints in chronological order for plotting
    cw_df["Timestamp"] = pd.to_datetime(cw_df["Timestamp"])
    cw_df = cw_df.sort_values("Timestamp")
else:
    print("⚠️ No CloudWatch datapoints found for ConcurrentRequestsPerModel")

# --- 2. Get the scalable target's capacity limits ---
# MinCapacity / MaxCapacity are read from the scalable target registered with
# Application Auto Scaling for this endpoint variant.
resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
scalable_target = aas.describe_scalable_targets(
    ServiceNamespace="sagemaker",
    ResourceIds=[resource_id],
    ScalableDimension="sagemaker:variant:DesiredInstanceCount"
)
registered_targets = scalable_target["ScalableTargets"]
if not registered_targets:
    # Indexing [0] on an empty list would only report "list index out of
    # range"; say what is actually missing instead.
    raise RuntimeError(
        f"No scalable target is registered for {resource_id}. "
        "Register the endpoint variant with Application Auto Scaling before running this cell."
    )
desired_min = registered_targets[0]["MinCapacity"]
desired_max = registered_targets[0]["MaxCapacity"]

# --- 3. Get actual scaling activity history (DesiredInstanceCount changes) ---
scaling_history = aas.describe_scaling_activities(
    ServiceNamespace="sagemaker",
    ResourceId=f"endpoint/{endpoint_name}/variant/{variant_name}",
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MaxResults=20
)

scaling_df = pd.DataFrame(scaling_history.get("ScalingActivities", []))

if not scaling_df.empty:
    # Normalise the activity start times to timezone-aware UTC so they can be
    # compared with the lookback boundary without mixing naive, local and UTC
    # values.
    scaling_df["Timestamp"] = pd.to_datetime(scaling_df["StartTime"], utc=True)

    # Use a single boundary - the start of the window already used for the
    # CloudWatch query - both as the filter cutoff and as the position of the
    # synthetic first row.
    lookback_start = pd.Timestamp(start_time).tz_convert("UTC")
    cutoff_time = lookback_start

    # Restrict to only rows newer than cutoff
    scaling_df = scaling_df[scaling_df["Timestamp"] >= cutoff_time].copy()
    # Extract the instance count: the last run of digits in the Description text
    scaling_df["NewCapacity"] = (
        scaling_df["Description"].str.extract(r"(\d+)(?=\D*$)", expand=False).astype(float)
    )
    # Insert a synthetic row at the lookback start - this is to get the graph
    # to show 1 instance at the start of the lookback period
    synthetic_row = pd.DataFrame({
        "Timestamp": [lookback_start],
        "NewCapacity": [1.0],  # default instance count
        "StatusCode": ["Simulated"],
    })
    scaling_df = pd.concat([synthetic_row, scaling_df], ignore_index=True)

    # Sort for plotting
    scaling_df.sort_values("Timestamp", inplace=True)

# --- 4. Calculate Requests Per Instance ---
# Only possible when both the metric and the scaling history are available
if not (cw_df.empty or scaling_df.empty):

    def _capacity_at(ts):
        """Return NewCapacity of the last scaling row at or before *ts*, or 1.0 if there is none."""
        earlier = scaling_df.loc[scaling_df["Timestamp"] <= ts, "NewCapacity"]
        return 1.0 if earlier.empty else earlier.iloc[-1]

    # One instance count per CloudWatch datapoint, sharing cw_df's index so
    # the division below lines up row for row
    instance_counts = cw_df[["Timestamp"]].copy()
    instance_counts["InstanceCount"] = instance_counts["Timestamp"].apply(_capacity_at)

    # Average concurrent requests divided by the instance count in effect
    cw_df["RequestsPerInstance"] = cw_df["Average"] / instance_counts["InstanceCount"]

    # Keep the instance count next to the metric for reference
    cw_df["InstanceCount"] = instance_counts["InstanceCount"]

# --- 5. Plot everything together with aligned x-axis ---
# Top panel: total concurrent requests with the instance count on a twin axis.
# Bottom panel: requests per instance with the scaling events marked.
# Nothing is drawn or displayed when CloudWatch returned no data.
if not cw_df.empty:
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10), sharex=True)

    # Get common x-axis limits
    x_min = cw_df["Timestamp"].min()
    x_max = cw_df["Timestamp"].max()

    # Plot 1: Total concurrent requests and instance count
    ax1.plot(cw_df["Timestamp"], cw_df["Average"], marker="o", color="tab:blue",
             label="Total Concurrent Requests", markersize=4)
    ax1.set_ylabel("Total Concurrent Requests", color="tab:blue")
    ax1.tick_params(axis="y", labelcolor="tab:blue")
    ax1.grid(True, alpha=0.3)

    # Add instance count to first plot
    ax1_inst = ax1.twinx()
    if not scaling_df.empty:
        ax1_inst.step(scaling_df["Timestamp"], scaling_df["NewCapacity"], where="post",
                      color="tab:red", linewidth=2, label="Instance Count")
    ax1_inst.set_ylabel("Instance Count", color="tab:red")
    ax1_inst.tick_params(axis="y", labelcolor="tab:red")
    ax1_inst.set_ylim(0, desired_max + 1)

    # Combine legends for first plot
    lines1, labels1 = ax1.get_legend_handles_labels()
    lines2, labels2 = ax1_inst.get_legend_handles_labels()
    ax1.legend(lines1 + lines2, labels1 + labels2, loc='upper left')

    # Plot 2: Requests per instance
    if "RequestsPerInstance" in cw_df.columns:
        ax2.plot(cw_df["Timestamp"], cw_df["RequestsPerInstance"], marker="s",
                 color="tab:green", label="Requests Per Instance", linewidth=2, markersize=4)
        # NOTE(review): the two reference lines are drawn at the scalable
        # target's MaxCapacity / MinCapacity, which are instance-count limits,
        # while the labels call them estimated scaling thresholds - confirm
        # against the thresholds of the step-scaling alarms.
        ax2.axhline(y=desired_max, color="red", linestyle="--", alpha=0.7,
                    label=f"Scale-out threshold (est. ~{desired_max})")
        ax2.axhline(y=desired_min, color="orange", linestyle="--", alpha=0.7,
                    label=f"Scale-in threshold (est. ~{desired_min})")
        ax2.set_ylabel("Requests Per Instance", color="tab:green")
        ax2.set_xlabel("Time")  # Only bottom plot gets x-label
        ax2.tick_params(axis="y", labelcolor="tab:green")
        ax2.grid(True, alpha=0.3)
        ax2.legend()

        # Mark each real (non-synthetic) scaling event with a vertical line
        # and the instance count it set
        if not scaling_df.empty:
            for _, event in scaling_df[scaling_df["StatusCode"] != "Simulated"].iterrows():
                ax2.axvline(x=event["Timestamp"], color="gray", linestyle=":", alpha=0.5)
                ax2.text(event["Timestamp"], ax2.get_ylim()[1] * 0.9,
                         f"Inst: {int(event['NewCapacity'])}",
                         rotation=90, va='top', ha='right', fontsize=8)

    # Set common x-axis limits
    for ax in [ax1, ax2]:
        ax.set_xlim(x_min, x_max)

    # Rotate the tick labels of the bottom panel, the one whose x tick labels
    # are shown. plt.xticks() would act on the current axes instead, which at
    # this point is the twin axes created by ax1.twinx() above.
    ax2.tick_params(axis="x", labelrotation=45)
    plt.tight_layout()

    fig.suptitle(f"Concurrent Requests per Instance Scaling: {endpoint_name}/{variant_name}\n"
                 f"Min: {desired_min}, Max: {desired_max} instances", fontsize=14)
    plt.subplots_adjust(top=0.92)  # Make room for suptitle
    plt.show()

    # Display the data
    print("📊 CloudWatch Metrics (with per-instance calculations):")
    display_cols = ["Timestamp", "Average", "InstanceCount", "RequestsPerInstance"] if "RequestsPerInstance" in cw_df.columns else ["Timestamp", "Average"]
    display(cw_df[display_cols].head(10))

    if not scaling_df.empty:
        print("\n⚡ Scaling Activities:")
        display(scaling_df[["Timestamp", "NewCapacity", "StatusCode", "StatusMessage"]].head(10))

        # Show summary statistics
        if "RequestsPerInstance" in cw_df.columns:
            print("\n📈 Requests Per Instance Statistics:")
            print(f"   Average: {cw_df['RequestsPerInstance'].mean():.2f}")
            print(f"   Maximum: {cw_df['RequestsPerInstance'].max():.2f}")
            print(f"   Minimum: {cw_df['RequestsPerInstance'].min():.2f}")
            print(f"   Std Dev: {cw_df['RequestsPerInstance'].std():.2f}")

# <span style="color:DarkSeaGreen">SageMaker Inference Recommender</span>
- Helps you select the best instance type and configuration for your ML models and workloads
- Suitable for traditional ML and deep learning models - **no documented usage for LLMs** - added here for reference only
- https://docs.aws.amazon.com/sagemaker/latest/dg/inference-recommender.html
- NOTE NEEDS TO BE INVESTIGATED TO SEE IF IT WORKS WITH LLM REAL TIME ENDPOINTS

# <span style="color:DarkSeaGreen">Clean Up Architecture</span>
### <span style="color:Red">Only do this if you have finished with this lab and any labs that depend on it!</span>
##### It will delete all architecture created, make sure you no longer need any of it!!!

In [None]:
# delete the alarms - one delete_alarms request per entry in `alarms`
for alarm_name in (entry['AlarmName'] for entry in alarms):
    cw.delete_alarms(AlarmNames=[alarm_name])

print ('Done! Move to the next cell ->')

In [None]:
# delete the auto scaling policies if you want to clean up
# (the scalable target itself is deregistered in the next cell)
# the scale-out policy is removed first, then the scale-in policy
for policy_name in (myEndpointScalingOutPolicyName, myEndpointScalingInPolicyName):
    response = autoScaling.delete_scaling_policy(
        PolicyName=policy_name,
        ServiceNamespace="sagemaker",
        ResourceId=ResourceId,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount"
    )
    print(response)

print ('Done! Move to the next cell ->')


In [None]:
# then deregister the scalable target for the endpoint variant
target_selector = {
    "ServiceNamespace": "sagemaker",
    "ResourceId": ResourceId,
    "ScalableDimension": "sagemaker:variant:DesiredInstanceCount",
}
response = autoScaling.deregister_scalable_target(**target_selector)
print(response)
print ('Done! Move to the next cell ->')

In [None]:
# endpoint and all other architecture created in lab 1 can be deleted in lab 1 notebook