# Validate Your New ML Model's Performance in Production Using SageMaker Shadow Testing
Shadow testing in Amazon SageMaker lets you evaluate a new machine learning model or an infrastructure change by comparing its performance against the currently deployed production system, without impacting end users. It helps MLOps engineers and developers catch configuration errors before a model goes live. It also lets you measure operational metrics such as latency, throughput, and error rate against live production traffic, which gives you a realistic benchmark to work backward from.

Because SageMaker provides shadow testing as a built-in capability, you avoid spending weeks building your own shadow testing infrastructure and can release models to production faster.

## How It Works
The process involves two key components:

* A production variant that receives and responds to 100% of incoming inference requests
* A shadow variant that receives a copy of the requests but doesn't return responses to the caller. You can optionally turn on data capture to save requests and/or responses to an S3 bucket of your choice.

Here's an architecture diagram that depicts Shadow Testing on SageMaker:


<img src="img/shadow-test-diagram.png" width="800">
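
To make the request flow concrete, here is a minimal local sketch of the mirroring idea. It is only an illustration: the function `route_with_shadow` and the toy lambda "models" are hypothetical, and SageMaker performs the actual mirroring on the service side.

```python
from typing import Any, Callable, Dict, List


def route_with_shadow(
    request: Dict[str, Any],
    production: Callable[[Dict[str, Any]], Any],
    shadow: Callable[[Dict[str, Any]], Any],
    shadow_log: List[Any],
) -> Any:
    """Serve the request from production and mirror a copy to the shadow model."""
    prod_response = production(request)
    try:
        # The shadow model sees a copy of the request; its output is only logged.
        shadow_log.append(shadow(dict(request)))
    except Exception as exc:
        # A failing shadow model must never affect the caller.
        shadow_log.append(exc)
    return prod_response


# Toy stand-ins for the two model variants (hypothetical).
log: List[Any] = []
result = route_with_shadow(
    {"inputs": [1, 2, 3]},
    production=lambda r: sum(r["inputs"]),
    shadow=lambda r: max(r["inputs"]),
    shadow_log=log,
)
print(result, log)
```

The caller only ever sees the production response; the shadow output ends up in the log, which plays the role that data capture and CloudWatch metrics play in the real service.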

## Common Use Cases
* You’re considering promoting a new model that has been validated offline to production, but want to evaluate operational performance metrics, such as latency, error rate, and so on, before making this decision.
  
* You’re considering changes to your serving infrastructure container, such as patching vulnerabilities or upgrading to newer versions, and want to assess the impact of these changes prior to promotion to production.
  
* You’re considering changing your ML instance and want to evaluate how the new instance would perform with live inference requests.

<br>
<div class="alert alert-info"> 
<b>Note:</b> SageMaker AI supports a maximum of one shadow variant per endpoint. For an endpoint with a shadow variant, there can be a maximum of one production variant. 
</div>
</br>
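
The limits in the note above can be checked locally before calling the API. The helper below is a small sketch, not part of the SageMaker SDK; the name `check_shadow_limits` is made up for this example, and it only inspects the two keys used later in this lab.

```python
def check_shadow_limits(endpoint_config: dict) -> None:
    """Raise ValueError if the config violates the shadow variant limits."""
    production = endpoint_config.get("ProductionVariants", [])
    shadow = endpoint_config.get("ShadowProductionVariants", [])

    if len(shadow) > 1:
        raise ValueError("At most one shadow variant is allowed per endpoint.")
    if shadow and len(production) > 1:
        raise ValueError(
            "An endpoint with a shadow variant can have at most one production variant."
        )


# Example: one production variant plus one shadow variant passes the check.
check_shadow_limits(
    {
        "ProductionVariants": [{"VariantName": "production"}],
        "ShadowProductionVariants": [{"VariantName": "shadow"}],
    }
)
```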

In this lab, we'll walk you through setting up a shadow test for the Two Tower Retrieval model that we built in the previous labs. You can create a shadow test from the SageMaker AI console or through API calls. We'll use the API calls so that you understand how the end-to-end process works.

If you missed them, please go back and run these labs in order: [00-start-here.ipynb](00-start-here.ipynb), [01-sm-training.ipynb](01-sm-training.ipynb), [02-sm-inference.ipynb](02-sm-inference.ipynb)


Install additional dependencies

In [None]:
%pip install Pillow -q -U

Import python libraries

In [None]:
import sagemaker
import boto3
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.local import LocalSession
from sagemaker.model import Model
from sagemaker import get_execution_role
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
import numpy as np
import json
from PIL import Image
import io
from sagemaker.s3 import S3Uploader
from datetime import datetime
from sagemaker import image_uris

Retrieve the stored variables from previous labs

In [None]:
%store -r

Set up the IAM role, the SageMaker session, and the model URL for the shadow testing deployment.

In [None]:
# session = LocalSession() # uncomment for running in local mode.
# session.config = {'local': {'local_code': True} } # uncomment for running in local mode.
session = sagemaker.Session()
role = get_execution_role()
model_data_url = model_serving_data_s3_uri
bucket = session.default_bucket()
prefix = "models/shadow-test"

sm = boto3.client("sagemaker")
sm_runtime = boto3.client("sagemaker-runtime")

Download a copy of the current model artifacts.

In [None]:
!aws s3 cp {model_data_url} model/prod.tar.gz
!aws s3 cp {model_data_url} model/shadow.tar.gz

In the following cell, we introduce a small latency into the shadow model variant by adding a 0.5-second delay to the inference script.
After the change, we'll deploy both models as the production and shadow variants of a single endpoint and run a load test against it. Finally, we'll observe the performance metrics of each variant. We expect the latency metrics for the shadow variant to be higher than those of the production variant.

In [None]:
!rm -rf temp/ && mkdir -p temp && cd temp && tar -xvzf ../model/shadow.tar.gz >/dev/null 2>&1 && cp ../shadow/inference.py code/inference.py && tar -cvzf ../model/shadow.tar.gz . >/dev/null 2>&1 && cd .. && rm -rf temp
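
The contents of `shadow/inference.py` are not shown in this notebook. As a rough sketch of how such a delay could be added, the hypothetical wrapper below makes any prediction function sleep before it runs. The names `with_delay` and `predict` are assumptions made for this example and are not taken from the lab's script.

```python
import time
from functools import wraps


def with_delay(predict, delay_seconds=0.5):
    """Wrap a predict function so every call sleeps first (simulated latency)."""

    @wraps(predict)
    def delayed(*args, **kwargs):
        time.sleep(delay_seconds)
        return predict(*args, **kwargs)

    return delayed


# Hypothetical stand-in for the real prediction function.
def predict(inputs):
    return [x * 2 for x in inputs]


# Each call to slow_predict now takes at least 0.5 seconds longer than predict.
slow_predict = with_delay(predict, delay_seconds=0.5)
```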

Upload both artifacts to S3 under the same prefix. They stand in for the production and shadow model versions.

In [None]:
prod_model_url = S3Uploader.upload(
    local_path="model/prod.tar.gz",
    desired_s3_uri=f"s3://{bucket}/{prefix}",
)
shadow_model_url = S3Uploader.upload(
    local_path="model/shadow.tar.gz",
    desired_s3_uri=f"s3://{bucket}/{prefix}",
)

Give the production and shadow models unique names.

In [None]:
prod_model_name = f"two-tower-prod-{datetime.now():%Y-%m-%d-%H-%M-%S}"
shadow_model_name = f"two-tower-shadow-{datetime.now():%Y-%m-%d-%H-%M-%S}"
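
Timestamped names like these can be validated up front. The sketch below assumes the naming rule that the `CreateModel` API reference describes (alphanumeric characters and hyphens, at most 63 characters); treat the pattern and limit as assumptions and check the API reference for your resource type. The helper `timestamped_name` is hypothetical.

```python
import re
from datetime import datetime

# Assumed naming constraint; confirm against the SageMaker API reference.
NAME_PATTERN = re.compile(r"^[a-zA-Z0-9]([\-a-zA-Z0-9]*[a-zA-Z0-9])?$")
MAX_NAME_LENGTH = 63


def timestamped_name(prefix, now=None):
    """Build '<prefix>-YYYY-MM-DD-HH-MM-SS' and validate it."""
    now = now or datetime.now()
    name = f"{prefix}-{now:%Y-%m-%d-%H-%M-%S}"
    if len(name) > MAX_NAME_LENGTH or not NAME_PATTERN.match(name):
        raise ValueError(f"Invalid resource name: {name!r}")
    return name


print(timestamped_name("two-tower-prod"))
```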

Use the SageMaker SDK to retrieve the ECR container image URI for the deployment.

In [None]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework='pytorch',
    region=boto3.Session().region_name, 
    version='2.5.1',     # PyTorch version
    py_version='py311',  # Python version
    image_scope='inference',
    instance_type='ml.g4dn.2xlarge' 
)

Once the models are uploaded to S3, we'll use the boto3 client to create production and shadow SageMaker models.

In [None]:
prod_model_s3_data_url = f"s3://{bucket}/{prefix}/prod.tar.gz"
shadow_model_s3_data_url = f"s3://{bucket}/{prefix}/shadow.tar.gz"

resp = sm.create_model(
    ModelName=prod_model_name,
    ExecutionRoleArn=role,
    Containers=[{"Image": inference_image_uri, "ModelDataUrl": prod_model_s3_data_url, "Environment" : {
        "MASTER_ADDR" : "localhost", 
        "MASTER_PORT" : "12356", 
        "CUDA_VISIBLE_DEVICES" : "0",
        "LOCAL_RANK" : "0",
        "WORLD_SIZE" : "1"
    }}],
)

resp = sm.create_model(
    ModelName=shadow_model_name,
    ExecutionRoleArn=role,
    Containers=[{"Image": inference_image_uri, "ModelDataUrl": shadow_model_s3_data_url, "Environment" : {
        "MASTER_ADDR" : "localhost", 
        "MASTER_PORT" : "12356", 
        "CUDA_VISIBLE_DEVICES" : "0",
        "LOCAL_RANK" : "0",
        "WORLD_SIZE" : "1"
    }}],
    
)
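
The two `create_model` calls above differ only in the model name and the artifact URL. One way to avoid repeating the container definition is a small helper like the one sketched below; `container_definition` is a hypothetical name, and the environment values are copied from the cell above.

```python
def container_definition(image_uri, model_data_url, extra_env=None):
    """Return a container definition dict with the shared environment."""
    environment = {
        "MASTER_ADDR": "localhost",
        "MASTER_PORT": "12356",
        "CUDA_VISIBLE_DEVICES": "0",
        "LOCAL_RANK": "0",
        "WORLD_SIZE": "1",
    }
    # Optional per-model overrides are merged on top of the shared values.
    environment.update(extra_env or {})
    return {
        "Image": image_uri,
        "ModelDataUrl": model_data_url,
        "Environment": environment,
    }
```

It could then be used as `Containers=[container_definition(inference_image_uri, prod_model_s3_data_url)]` in each call.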

In the following step, we create an endpoint config with the production and shadow variants. The `ProductionVariants` and `ShadowProductionVariants` parameters are of particular interest. Both variants use ml.g4dn.2xlarge instances with an initial instance count of 1. We can provide all of this information in a single `create_endpoint_config` API call.

In [None]:
ep_config_name = f"Shadow-EpConfig-{datetime.now():%Y-%m-%d-%H-%M-%S}"
production_variant_name = "production"
shadow_variant_name = "shadow"
create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName=ep_config_name,
    ProductionVariants=[
    # Type: Array of ProductionVariant (https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ProductionVariant.html) objects
      {
            "VariantName": production_variant_name,
            "ModelName": prod_model_name,
            "InstanceType": "ml.g4dn.2xlarge",
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 1,
      }
    ],
     # Type: Array of ShadowProductionVariants 
    ShadowProductionVariants = [
      {
        "VariantName": shadow_variant_name,  
         "ModelName": shadow_model_name,
         "InitialInstanceCount": 1,
         "InitialVariantWeight": 1,
         "InstanceType": "ml.g4dn.2xlarge" 
      }
   ]
)
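
To confirm what was registered, you can read the config back with `describe_endpoint_config` and list the variants by role. The helper below is a sketch that works on the response dictionary; `summarize_variants` is a hypothetical name.

```python
def summarize_variants(config: dict) -> list:
    """Return (role, variant name, instance type, instance count) tuples."""
    rows = []
    for role, key in (
        ("production", "ProductionVariants"),
        ("shadow", "ShadowProductionVariants"),
    ):
        for variant in config.get(key, []):
            rows.append(
                (
                    role,
                    variant["VariantName"],
                    variant["InstanceType"],
                    variant["InitialInstanceCount"],
                )
            )
    return rows
```

For example: `summarize_variants(sm.describe_endpoint_config(EndpointConfigName=ep_config_name))`.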

After the endpoint configuration is created, we can deploy an endpoint with the production and shadow variants.

In [None]:
endpoint_name = f"two-tower-prod-shadow-{datetime.now():%Y-%m-%d-%H-%M-%S}"
create_endpoint_api_response = sm.create_endpoint(
                                    EndpointName=endpoint_name,
                                    EndpointConfigName=ep_config_name,
                                )

We'll wait for the endpoint to be ready. This step should take about 10 minutes.

In [None]:
import boto3
import time

def wait_for_endpoint(client, endpoint_name, timeout_seconds=1200):
    """
    Wait for SageMaker endpoint to be ready
    
    Args:
        client: Boto3 SageMaker client
        endpoint_name: Name of the endpoint
        timeout_seconds: Maximum time to wait in seconds
        
    Returns:
        True if endpoint is ready, False if timeout occurred
    """
    start_time = time.time()
    
    while time.time() - start_time < timeout_seconds:
        response = client.describe_endpoint(EndpointName=endpoint_name)
        status = response['EndpointStatus']
        
        if status == 'InService':
            return True
        
        if status == 'Failed':
            raise Exception(f"Endpoint creation failed: {response.get('FailureReason', 'unknown reason')}")
            
        time.sleep(30)
        
    return False

In [None]:
# Wait for endpoint to be ready
is_ready = wait_for_endpoint(sm, endpoint_name)

if is_ready:
    print("Endpoint is ready")
else:
    print("Endpoint creation timed out")
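
As an alternative to the hand-written polling loop, boto3 ships a built-in waiter for this state. The sketch below wraps it in a hypothetical helper, `wait_with_waiter`. The default of 40 attempts at 30-second intervals is chosen to match the 1200-second timeout used above.

```python
def wait_with_waiter(client, endpoint_name, delay_seconds=30, max_attempts=40):
    """Block until the endpoint is InService using the boto3 waiter.

    The waiter raises botocore.exceptions.WaiterError if the endpoint
    fails or the attempts run out.
    """
    waiter = client.get_waiter("endpoint_in_service")
    waiter.wait(
        EndpointName=endpoint_name,
        WaiterConfig={"Delay": delay_seconds, "MaxAttempts": max_attempts},
    )
```

Usage would be `wait_with_waiter(sm, endpoint_name)`.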


Now that the endpoint is ready, let's run a simple load test. The following code sends 1,000 requests to the endpoint sequentially. After the calls complete, we'll evaluate the performance metrics for each variant.

In [None]:
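def invoke_endpoint(endpoint_name, wait_interval_sec=0.01, should_raise_exp=False, iterations=1000):
    # Send `iterations` requests; print and return the body of the first response.
    # Note: `should_raise_exp` is kept for compatibility but is currently unused.
    payload = {"inputs": [1, 2, 3]}
    first_response_body = None
    for i in range(iterations):
        response = sm_runtime.invoke_endpoint(EndpointName=endpoint_name, ContentType="application/json", Body=json.dumps(payload))
        if i == 0:
            first_response_body = response['Body'].read().decode('utf-8')
            print(first_response_body)
        # Pause briefly between requests.
        time.sleep(wait_interval_sec)
    return first_response_body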



In [None]:
response = invoke_endpoint(endpoint_name)
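
CloudWatch gives the server-side view. If you also want a client-side view of the load test, you could time each call yourself. The sketch below is one way to do that, with hypothetical helper names (`time_calls`, `summarize`) and a nearest-rank 95th percentile. Because the shadow variant never returns a response to the caller, client-side timings reflect only the production path.

```python
import statistics
import time


def time_calls(call, iterations):
    """Invoke `call` repeatedly and return per-call latencies in milliseconds."""
    latencies_ms = []
    for _ in range(iterations):
        start = time.perf_counter()
        call()
        latencies_ms.append((time.perf_counter() - start) * 1000)
    return latencies_ms


def summarize(latencies_ms):
    """Return count, mean, median, nearest-rank p95, and max."""
    ordered = sorted(latencies_ms)
    n = len(ordered)
    # Nearest-rank percentile: rank = ceil(0.95 * n), computed with integers.
    p95_rank = (95 * n + 99) // 100
    return {
        "count": n,
        "mean_ms": statistics.mean(ordered),
        "p50_ms": statistics.median(ordered),
        "p95_ms": ordered[max(p95_rank, 1) - 1],
        "max_ms": ordered[-1],
    }
```

In this notebook, `call` could be a lambda that wraps `sm_runtime.invoke_endpoint(...)` with the same arguments as above.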

# SageMaker AI endpoint invocation metrics
By default, when you deploy a model with SageMaker, it automatically emits metrics to Amazon CloudWatch that help you monitor and understand the health of the deployed endpoint.
For a complete list of metrics, please refer to the [SageMaker CloudWatch monitoring documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/monitoring-cloudwatch.html).

**Note:** Metrics are available at a 1-minute frequency.

The following illustration shows how a SageMaker AI endpoint interacts with the Amazon SageMaker Runtime API. The overall time between sending a request to an endpoint and receiving a response depends on the following three components.

* Network latency – the time that it takes between making a request to and receiving a response back from the SageMaker Runtime API.

* Overhead latency – the time that it takes to transport a request from the SageMaker Runtime API to the model container and transport the response back.

* Model latency – the time that it takes the model container to process the request and return a response.

![sagemaker cloudwatch metrics](img/sm-metrics-cloudwatch.png)
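
The three components simply add up. As a small worked example, the sketch below combines them into one end-to-end figure. It assumes the CloudWatch `OverheadLatency` and `ModelLatency` metrics are reported in microseconds, while network latency is something you measure yourself on the client. The function name and the sample numbers are made up.

```python
def end_to_end_ms(network_ms, overhead_us, model_us):
    """Total request time in milliseconds.

    network_ms:  client-measured network latency, in milliseconds
    overhead_us: OverheadLatency, in microseconds (assumed unit)
    model_us:    ModelLatency, in microseconds (assumed unit)
    """
    return network_ms + (overhead_us + model_us) / 1000.0


# Illustrative values: 20 ms network + 5 ms overhead + 120 ms model time.
print(end_to_end_ms(20.0, 5_000, 120_000))
```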

In the following section, we'll explore and visualize some of the key metrics for both the production and shadow variants. You can also access all the metrics relevant to the endpoint in the Amazon CloudWatch console.


Create a CloudWatch client so that we can render metric charts in the notebook:

In [None]:
cw_client = boto3.client("cloudwatch")

In [None]:
def display_cw_metrics(endpoint_name):
    # (namespace, metric name, statistic) for each chart, in display order
    charts = [
        ("AWS/SageMaker", "ModelLatency", "Average"),                  # Container/model latency
        ("/aws/sagemaker/Endpoints", "CPUUtilization", "Average"),     # Container CPU utilization
        ("/aws/sagemaker/Endpoints", "MemoryUtilization", "Average"),  # Container memory utilization
        ("AWS/SageMaker", "Invocations", "Sum"),                       # Invocations
    ]

    images = []
    for namespace, metric_name, stat in charts:
        # Plot one line per variant on each chart
        metrics = [
            [namespace, metric_name, "EndpointName", endpoint_name, "VariantName", variant]
            for variant in ("production", "shadow")
        ]

        metric_widget = {
            "metrics": metrics,
            "view": "timeSeries",
            "stacked": False,
            "stat": stat,
            "period": 5,
            "width": 1000,
            "height": 200,
        }
        response = cw_client.get_metric_widget_image(
            MetricWidget=json.dumps(metric_widget)
        )

        images.append(Image.open(io.BytesIO(response["MetricWidgetImage"])))

    for image in images:
        image.show()

In [None]:
display_cw_metrics(endpoint_name)
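
Charts are convenient for a quick look, but a numeric comparison is easier to act on. The sketch below pulls `ModelLatency` for one variant with `get_metric_statistics` and averages the per-minute datapoints. The helper name is hypothetical, the conversion assumes the metric is reported in microseconds, and the result is an unweighted mean of the per-minute averages rather than a true per-request average.

```python
from datetime import datetime, timedelta, timezone


def average_model_latency_ms(cw, endpoint_name, variant_name, minutes=30):
    """Mean of the per-minute ModelLatency averages, in milliseconds."""
    end = datetime.now(timezone.utc)
    response = cw.get_metric_statistics(
        Namespace="AWS/SageMaker",
        MetricName="ModelLatency",
        Dimensions=[
            {"Name": "EndpointName", "Value": endpoint_name},
            {"Name": "VariantName", "Value": variant_name},
        ],
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
        Period=60,
        Statistics=["Average"],
    )
    points = response.get("Datapoints", [])
    if not points:
        return None  # no data yet for this window
    # Convert microseconds (assumed unit) to milliseconds.
    return sum(p["Average"] for p in points) / len(points) / 1000.0
```

Calling it once with `"production"` and once with `"shadow"` (passing `cw_client` as `cw`) gives two numbers you can compare directly.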

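# Update to make Shadow Variant primary
Once you are satisfied with the shadow variant's metrics, you can promote it: create a new endpoint config that lists the shadow model as the only production variant, then update the endpoint to use that config.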
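
In practice, promotion is usually gated on the metrics you just reviewed. The function below is a hypothetical sketch of such a gate; the name `should_promote` and the thresholds (a 10% latency increase, a 1% error rate) are illustrative assumptions, not recommendations. In this lab the shadow variant is deliberately about 0.5 seconds slower, so a gate like this would reject it, and the promotion is carried out only to demonstrate the API calls.

```python
def should_promote(
    prod_latency_ms,
    shadow_latency_ms,
    shadow_error_rate,
    max_latency_increase=0.10,
    max_error_rate=0.01,
):
    """Return True if the shadow variant stays within the allowed budgets."""
    latency_ok = shadow_latency_ms <= prod_latency_ms * (1 + max_latency_increase)
    errors_ok = shadow_error_rate <= max_error_rate
    return latency_ok and errors_ok


# A shadow variant 5% slower with no errors passes; one 500 ms slower does not.
print(should_promote(100.0, 105.0, 0.0))
print(should_promote(100.0, 600.0, 0.0))
```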

In [None]:
promote_ep_config_name = f"PromoteShadow-EpConfig-{datetime.now():%Y-%m-%d-%H-%M-%S}"

create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName=promote_ep_config_name,
    ProductionVariants=[
        {
            "VariantName": shadow_variant_name,
            "ModelName": shadow_model_name,
            "InstanceType": "ml.g5.xlarge",
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 1.0,
        }
    ],
)
print(f"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}")

update_endpoint_api_response = sm.update_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=promote_ep_config_name,
)

In [None]:
# Wait for endpoint to be ready
is_ready = wait_for_endpoint(sm, endpoint_name)

if is_ready:
    print("Endpoint is ready")
else:
    print("Endpoint update timed out")
    
sm.describe_endpoint(EndpointName=endpoint_name)

In [None]:
response = invoke_endpoint(endpoint_name, iterations=1)
response

### Clean Up

In [None]:
# Delete the SageMaker endpoint and the attached resources
sm.delete_endpoint(EndpointName=endpoint_name)
sm.delete_endpoint_config(EndpointConfigName=ep_config_name)
sm.delete_endpoint_config(EndpointConfigName=promote_ep_config_name)
sm.delete_model(ModelName=prod_model_name)
sm.delete_model(ModelName=shadow_model_name)

# Next Step
Congratulations! You've completed all the labs.