## AIM 406: re:Invent 2022 Notebook
### TensorFlow U-Net Optimization
 
<b>Credits</b>: This is an adaptation and extension of the following [example notebook](https://github.com/aws/amazon-sagemaker-examples/blob/main/aws_sagemaker_studio/sagemaker_neo_compilation_jobs/tensorflow_unet/sagemaker-neo-tf-unet.ipynb) from SageMaker Examples.

The notebook covers the following performance optimization techniques:
- TensorFlow Serving Container Environment Variable Tuning
- Neo Compilation
- Load Testing & AutoScaling

Note that a distributed Locust test will most likely overwhelm, and may crash, whichever Notebook Instance you are using. We recommend running the test from EC2 or Kubernetes so that the client side has enough workers and compute to drive the distributed test. For this demo we run the test on an EC2 instance of type c6i.32xlarge. For details on setting up the EC2 instance and the Locust load test, refer to this repository's [README](https://github.com/aws-samples/load-testing-sagemaker-endpoints).

- Notebook settings:
    - Kernel: conda_tensorflow2_p38
    - Notebook Instance: ml.g4dn.12xlarge

## Setup

In [None]:
!pip install --quiet --upgrade "sagemaker"
!pip install -U --quiet "tensorflow==1.15.3"

In [None]:
import json
import os
import tarfile
import time

import numpy as np
import sagemaker
from sagemaker.utils import name_from_base

In [None]:
from sagemaker import get_execution_role
from sagemaker.session import Session

role = get_execution_role()
sess = Session()
region = sess.boto_region_name
bucket = sess.default_bucket()

### Retrieve Model Data

In [None]:
model_name = "unet_medical"
export_path = "export"
model_archive_name = "unet-medical.tar.gz"
model_archive_url = "https://sagemaker-neo-artifacts.s3.us-east-2.amazonaws.com/{}".format(
    model_archive_name
)

In [None]:
!wget {model_archive_url}

In [None]:
!tar -xvzf unet-medical.tar.gz

Inspect the model's input/output signatures. The input tensor name and shape are needed for Neo compilation.

In [None]:
import os

model_path = os.path.join(export_path, "Servo/1")
!saved_model_cli show --all --dir {model_path}

In [None]:
model_data = sess.upload_data(path=model_archive_name, key_prefix="model")
print("model uploaded to: {}".format(model_data))

## Endpoint Creation and Inference

In [None]:
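# In SageMaker Python SDK v2, sagemaker.tensorflow.serving.Model was renamed to TensorFlowModel
from sagemaker.tensorflow import TensorFlowModel as Model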

instance_type = "ml.g4dn.16xlarge"
framework = "TENSORFLOW"
framework_version = "1.15.3"

### Environment Variable Tuning

Depending on the framework you are working with, there are a number of container-level variables that you can tune. For TensorFlow, refer to the following [blog](https://aws.amazon.com/blogs/machine-learning/maximize-tensorflow-performance-on-amazon-sagemaker-endpoints-for-real-time-inference/) and [serving code](https://github.com/aws/sagemaker-tensorflow-serving-container/blob/master/docker/build_artifacts/sagemaker/serve.py#L40-L70). In this example we tune the following environment variables to maximize the throughput our endpoint can achieve:

- `SAGEMAKER_GUNICORN_WORKERS`
- `SAGEMAKER_TFS_INTER_OP_PARALLELISM`
- `SAGEMAKER_TFS_INTRA_OP_PARALLELISM`
- `SAGEMAKER_TFS_INSTANCE_COUNT`
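As a starting point for tuning, the hypothetical helper below builds the `env` dictionary from an instance's vCPU count. The heuristic it encodes (one Gunicorn worker per vCPU, a fixed number of TFS instances, single-threaded TensorFlow op pools) is an assumption modeled on the values used in the next cell for ml.g4dn.16xlarge (64 vCPUs), not a rule from the serving container; benchmark other combinations for your own model.

```python
def build_tfs_env(vcpus: int, tfs_instances: int = 8,
                  inter_op: int = 1, intra_op: int = 1) -> dict:
    """Return SageMaker TFS container environment variables.

    Container environment variables must be strings, so every value is converted.
    The one-worker-per-vCPU default is an assumption, not a container rule.
    """
    if vcpus < 1 or tfs_instances < 1:
        raise ValueError("vcpus and tfs_instances must be positive")
    if inter_op < 0 or intra_op < 0:
        raise ValueError("thread pool sizes must be non-negative")
    return {
        "SAGEMAKER_GUNICORN_WORKERS": str(vcpus),
        "SAGEMAKER_TFS_INTER_OP_PARALLELISM": str(inter_op),
        "SAGEMAKER_TFS_INTRA_OP_PARALLELISM": str(intra_op),
        "SAGEMAKER_TFS_INSTANCE_COUNT": str(tfs_instances),
    }

# Reproduces the configuration used below for a 64-vCPU instance.
env = build_tfs_env(vcpus=64)
print(env)
```

The resulting dictionary could be passed as `Model(..., env=env)`; keeping the heuristic in one function makes it easier to sweep values during load testing.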

In [None]:
sm_model = Model(model_data=model_data, framework_version=framework_version, role=role,
                 env= {
                    'SAGEMAKER_GUNICORN_WORKERS': '64',
                    'SAGEMAKER_TFS_INTER_OP_PARALLELISM': '1',
                    'SAGEMAKER_TFS_INTRA_OP_PARALLELISM': '1',
                    'SAGEMAKER_TFS_INSTANCE_COUNT': '8'})

In [None]:
uncompiled_predictor = sm_model.deploy(initial_instance_count=1, instance_type=instance_type)

### Payload Retrieval

In [None]:
sample_img_fname = "cell-4.png"
sample_img_url = "https://sagemaker-neo-artifacts.s3.us-east-2.amazonaws.com/{}".format(
    sample_img_fname
)

In [None]:
!wget {sample_img_url}

In [None]:
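# Install OpenCV, used to read the image file into a NumPy array
!pip install --quiet opencv-python
# On Debian/Ubuntu-based images, if importing cv2 fails because of missing system libraries, uncomment the next line
#!apt-get update -q && apt-get install ffmpeg libsm6 libxext6  -y -q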

import cv2

image = cv2.imread(sample_img_fname)
original_shape = image.shape

In [None]:
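# Resize to 256x256 with interpolation (np.resize would repeat/truncate the raw pixel buffer instead)
image = cv2.resize(image, (256, 256))
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image = np.expand_dims(image, axis=0)  # add the batch dimension -> (1, 256, 256, 3)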
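For reference, the channel conversion and batch dimension can also be expressed in plain NumPy: for a 3-channel image, `cv2.COLOR_BGR2RGB` swaps the first and last channels, which is the same as reversing the channel axis. The sketch below uses a synthetic array in place of `cell-4.png` (an assumption, so it runs without the image or OpenCV); `to_batched_rgb` is a hypothetical name.

```python
import numpy as np

def to_batched_rgb(bgr: np.ndarray) -> np.ndarray:
    """Convert an HxWx3 BGR image to a 1xHxWx3 RGB batch."""
    if bgr.ndim != 3 or bgr.shape[-1] != 3:
        raise ValueError(f"expected an HxWx3 array, got shape {bgr.shape}")
    rgb = bgr[..., ::-1]                 # reverse channel order: BGR -> RGB
    return np.expand_dims(rgb, axis=0)   # add the batch dimension

# Synthetic 256x256 image: blue channel = 10, green = 20, red = 30.
fake_bgr = np.zeros((256, 256, 3), dtype=np.uint8)
fake_bgr[..., 0], fake_bgr[..., 1], fake_bgr[..., 2] = 10, 20, 30
batch = to_batched_rgb(fake_bgr)
print(batch.shape, batch[0, 0, 0])  # (1, 256, 256, 3) [30 20 10]
```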

In [None]:
import json

payload = json.dumps(image.tolist())  # serialize the payload to JSON

### Sample Inference with Boto3 and SageMaker SDK

For the differences between the two SDKs, refer to this [article](https://towardsdatascience.com/sagemaker-python-sdk-vs-boto3-sdk-45c424e8e250).
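To make the Boto3 side concrete, the sketch below wraps the `invoke_endpoint` call used in the following cells in a small function. The names `invoke_json` and `FakeRuntime` are hypothetical; the fake client exists only so the example runs without AWS credentials. With a real endpoint you would pass `boto3.client("sagemaker-runtime")` instead.

```python
import io
import json

import numpy as np

def invoke_json(runtime_client, endpoint_name: str, array: np.ndarray):
    """Send a NumPy array as JSON to a SageMaker endpoint and decode the JSON reply."""
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=json.dumps(array.tolist()),
    )
    # The response Body is a stream; read() returns the raw bytes.
    return json.loads(response["Body"].read().decode())

class FakeRuntime:
    """Hypothetical stand-in for boto3.client("sagemaker-runtime")."""

    def invoke_endpoint(self, EndpointName, ContentType, Body):
        n = len(json.loads(Body))  # one prediction per batch element
        reply = json.dumps({"predictions": [[0.0]] * n}).encode()
        return {"Body": io.BytesIO(reply)}

result = invoke_json(FakeRuntime(), "my-endpoint", np.zeros((1, 4, 4, 3)))
print(result)  # {'predictions': [[0.0]]}
```

The SageMaker Python SDK's `Predictor.predict` performs the same serialize, invoke, and deserialize steps for you, which is why the SDK cells below can pass `image` directly.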

In [None]:
import json
import time

import boto3

# Setup: low-level Boto3 clients for the SageMaker control plane and runtime
client = boto3.client(service_name="sagemaker")
runtime = boto3.client(service_name="sagemaker-runtime")
boto_session = boto3.session.Session()
s3 = boto_session.resource("s3")
region = boto_session.region_name
print(region)

In [None]:
endpoint_name = uncompiled_predictor.endpoint_name  # or replace with your endpoint name
response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType='application/json',
                                   Body=payload)
result = json.loads(response['Body'].read().decode())
#result

In [None]:
start_time = time.time()

# get a prediction from the endpoint
# the image input is automatically converted to a JSON request.
# the JSON response from the endpoint is returned as a python dict
result = uncompiled_predictor.predict(image)
print("Prediction took %.2f seconds" % (time.time() - start_time))

In [None]:
uncompiled_results = []
endpoint_name = uncompiled_predictor.endpoint_name

for _ in range(100):
    start = time.time()
    response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType='application/json',
                                   Body=payload)
    uncompiled_results.append((time.time() - start) * 1000)

print("\nPredictions for un-compiled model: \n")
print("\nP95: " + str(np.percentile(uncompiled_results, 95)) + " ms\n")
print("P90: " + str(np.percentile(uncompiled_results, 90)) + " ms\n")
print("P50: " + str(np.percentile(uncompiled_results, 50)) + " ms\n")
print("Average: " + str(np.average(uncompiled_results)) + " ms\n")
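The same percentile report is printed after every benchmark in this notebook, so it can be factored into a helper. `summarize_latencies` is a hypothetical name; it uses `np.percentile` with NumPy's default linear interpolation, as the cell above does.

```python
import numpy as np

def summarize_latencies(latencies_ms):
    """Return P50/P90/P95 and the mean of latency samples in milliseconds."""
    arr = np.asarray(latencies_ms, dtype=float)
    if arr.size == 0:
        raise ValueError("no latency samples")
    return {
        "p50": float(np.percentile(arr, 50)),
        "p90": float(np.percentile(arr, 90)),
        "p95": float(np.percentile(arr, 95)),
        "avg": float(arr.mean()),
    }

# Synthetic latencies of 1..100 ms, for illustration only.
stats = summarize_latencies(range(1, 101))
for key, value in stats.items():
    print(f"{key.upper()}: {value:.2f} ms")
```

In the notebook, `summarize_latencies(uncompiled_results)` would produce the same numbers as the print statements above.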

## Neo Compilation and Deployment

In [None]:
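# Specify the name and shape of the model's expected input in JSON.
# The input name must match the signature shown by saved_model_cli above, and the
# shape should match the payload sent at inference time (the image above is 256x256);
# verify both before compiling. A -1 batch dimension is replaced with 1.
data_shape = {"inputs": [1, 224, 224, 3]}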

instance_family = "ml_g4dn"

compilation_job_name = name_from_base("medical-tf-Neo")
# output path for compiled model artifact
compiled_model_path = "s3://{}/{}/output".format(bucket, compilation_job_name)
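Because Neo compiles for a fixed input shape, it can help to check that the array you will send matches `data_shape` before starting the compilation job. The helper below is a hypothetical sketch and assumes a single named input, as in this notebook.

```python
import numpy as np

def matches_input_shape(data_shape: dict, input_name: str, array: np.ndarray) -> bool:
    """Return True if array.shape equals the shape declared for input_name."""
    expected = tuple(data_shape[input_name])
    return tuple(array.shape) == expected

data_shape_example = {"inputs": [1, 224, 224, 3]}
print(matches_input_shape(data_shape_example, "inputs", np.zeros((1, 224, 224, 3))))  # True
print(matches_input_shape(data_shape_example, "inputs", np.zeros((1, 256, 256, 3))))  # False
```

In the notebook, `matches_input_shape(data_shape, "inputs", image)` returns False with the 256x256 preprocessing and the 224x224 shape declared above, which is worth resolving before compiling.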

In [None]:
optimized_estimator = sm_model.compile(
    target_instance_family=instance_family,
    input_shape=data_shape,
    job_name=compilation_job_name,
    role=role,
    framework=framework.lower(),
    framework_version=framework_version,
    output_path=compiled_model_path,
)

In [None]:
optimized_predictor = optimized_estimator.deploy(
    initial_instance_count=1, instance_type=instance_type
)

In [None]:
start_time = time.time()

# get a prediction from the endpoint
# the image input is automatically converted to a JSON request.
# the JSON response from the endpoint is returned as a python dict
result = optimized_predictor.predict(image)
print("Prediction took %.2f seconds" % (time.time() - start_time))

In [None]:
compiled_results = []
# Warm-up inference (not timed)
optimized_predictor.predict(image)
# Time 100 inferences
for _ in range(100):
    start = time.time()
    optimized_predictor.predict(image)
    compiled_results.append((time.time() - start) * 1000)

print("\nPredictions for compiled model: \n")
print("\nP95: " + str(np.percentile(compiled_results, 95)) + " ms\n")
print("P90: " + str(np.percentile(compiled_results, 90)) + " ms\n")
print("P50: " + str(np.percentile(compiled_results, 50)) + " ms\n")
print("Average: " + str(np.average(compiled_results)) + " ms\n")

In [None]:
endpoint_name = optimized_predictor.endpoint_name  # or replace with your compiled endpoint name
response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType='application/json',
                                   Body=payload)
result = json.loads(response['Body'].read().decode())
#result

In [None]:
endpoint_name = optimized_predictor.endpoint_name
compiled_results = []  # reset so the Boto3 timings are not mixed with the SDK timings above
for _ in range(100):
    start = time.time()
    response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                       ContentType='application/json',
                                       Body=payload)
    compiled_results.append((time.time() - start) * 1000)

print("\nPredictions for compiled model (Boto3): \n")
print("\nP95: " + str(np.percentile(compiled_results, 95)) + " ms\n")
print("P90: " + str(np.percentile(compiled_results, 90)) + " ms\n")
print("P50: " + str(np.percentile(compiled_results, 50)) + " ms\n")
print("Average: " + str(np.average(compiled_results)) + " ms\n")
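To compare the two endpoints, the percentiles can be turned into a relative latency reduction. The helper below is a hypothetical sketch, and the sample numbers are made up for illustration; they are not results from this notebook.

```python
import numpy as np

def latency_reduction(baseline_ms, optimized_ms, q=50):
    """Percent reduction of the q-th percentile latency (positive means optimized is faster)."""
    base = float(np.percentile(baseline_ms, q))
    opt = float(np.percentile(optimized_ms, q))
    return 100.0 * (base - opt) / base

# Illustrative (made-up) samples, not measurements.
baseline = [200.0, 210.0, 190.0]
optimized = [100.0, 105.0, 95.0]
print(f"P50 latency reduced by {latency_reduction(baseline, optimized):.1f}%")
```

In the notebook, `latency_reduction(uncompiled_results, compiled_results, q=95)` would compare tail latency of the uncompiled and compiled runs.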

## AutoScaling

For a full AutoScaling breakdown, refer to this [blog](https://towardsdatascience.com/autoscaling-sagemaker-real-time-endpoints-b1b6e6731c59) and these [code samples](https://github.com/RamVegiraju/SageMaker-Deployment/tree/master/AdvancedFunctionality/AutoScaling).

In [None]:
# AutoScaling client
asg = boto3.client('application-autoscaling')
endpoint_name = "<your-endpoint-name>"  # replace with the endpoint to autoscale

# Resource type is variant and the unique identifier is the resource ID.
resource_id = f"endpoint/{endpoint_name}/variant/AllTraffic"

# Scaling configuration: register the variant's instance count as a scalable target
response = asg.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=1,
    MaxCapacity=4
)

# Target tracking: add or remove instances (between 1 and 4) to keep the average number of
# invocations per instance per minute near 10. After a scale-out, wait at least 30 seconds
# before scaling out again; after a scale-in, wait at least 500 seconds before scaling in again.
response = asg.put_scaling_policy(
    PolicyName=f'Request-ScalingPolicy-{endpoint_name}',
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 10.0,  # target invocations per instance per minute
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance',
        },
        'ScaleInCooldown': 500,  # seconds between scale-in activities
        'ScaleOutCooldown': 30  # seconds between scale-out activities
    }
)
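Target tracking adjusts the instance count so that `SageMakerVariantInvocationsPerInstance` (average invocations per instance per minute) stays near `TargetValue`. As a rough mental model only, not the exact Application Auto Scaling algorithm (which also applies cooldowns and its own metric evaluation periods), the capacity it moves toward can be estimated as below. `estimate_capacity` is a hypothetical name.

```python
import math

def estimate_capacity(invocations_per_minute: float, target_per_instance: float,
                      min_capacity: int, max_capacity: int) -> int:
    """Rough estimate of the instance count target tracking moves toward.

    invocations_per_minute is the total load on the variant across all instances.
    """
    if target_per_instance <= 0:
        raise ValueError("target_per_instance must be positive")
    needed = math.ceil(invocations_per_minute / target_per_instance)
    # Clamp to the registered MinCapacity/MaxCapacity.
    return max(min_capacity, min(max_capacity, needed))

# With TargetValue=10, MinCapacity=1 and MaxCapacity=4 as configured above:
print(estimate_capacity(25, 10.0, 1, 4))   # 3
print(estimate_capacity(600, 10.0, 1, 4))  # 4 (capped at MaxCapacity)
print(estimate_capacity(0, 10.0, 1, 4))    # 1 (floored at MinCapacity)
```

This also shows why a single client looping at full speed quickly pushes the endpoint to `MaxCapacity`: a few hundred requests per minute is far above 10 per instance.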

Send requests for a fixed duration so that we can observe the endpoint scaling out.

In [None]:
request_duration = 400  # seconds
# endpoint_name is the endpoint registered for AutoScaling in the cell above
end_time = time.time() + request_duration
print(f"test will run for {request_duration} seconds")
while time.time() < end_time:
    response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                       ContentType='application/json',
                                       Body=payload)
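The loop above sends one request at a time, which limits how much load a single notebook kernel can generate. Below is a minimal sketch of a concurrent, time-bounded load generator built on `concurrent.futures`; it is a plain-Python alternative for small tests, not a replacement for the distributed Locust setup described at the top. `run_load` and `fake_invoke` are hypothetical names, and `fake_invoke` only simulates a 10 ms call.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def run_load(invoke, duration_s: float, workers: int = 8):
    """Call invoke() from `workers` threads until duration_s elapses.

    Returns (total_requests, error_count, latencies_ms).
    """
    deadline = time.monotonic() + duration_s

    def worker():
        latencies, errors = [], 0
        while time.monotonic() < deadline:
            start = time.perf_counter()
            try:
                invoke()
            except Exception:
                errors += 1
                continue
            latencies.append((time.perf_counter() - start) * 1000)
        return latencies, errors

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(worker) for _ in range(workers)]
        results = [f.result() for f in futures]

    latencies_ms = [ms for lat, _ in results for ms in lat]
    error_count = sum(err for _, err in results)
    return len(latencies_ms) + error_count, error_count, latencies_ms

def fake_invoke():
    time.sleep(0.01)  # stand-in for a ~10 ms endpoint call

total, errors, latencies = run_load(fake_invoke, duration_s=0.5, workers=4)
print(f"{total} requests, {errors} errors, ~{total / 0.5:.0f} req/s")
```

Against the real endpoint you might pass `lambda: runtime.invoke_endpoint(EndpointName=endpoint_name, ContentType="application/json", Body=payload)` as `invoke`. Boto3 clients are generally safe to share across threads, but a large `workers` value will still be bounded by the notebook instance's CPU and network.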

We can monitor the endpoint scaling out.

In [None]:
sm_client = boto3.client(service_name='sagemaker')
response = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = response['EndpointStatus']
print("Status: " + status)


while status=='Updating':
    time.sleep(1)
    response = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = response['EndpointStatus']
    instance_count = response['ProductionVariants'][0]['CurrentInstanceCount']
    print(f"Status: {status}")
    print(f"Current Instance count: {instance_count}")
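The polling loop above exits only when the status leaves `Updating` and calls `DescribeEndpoint` every second. A slightly more defensive variant adds a timeout and a longer interval. The sketch below is hypothetical and takes the describe function as a parameter so it can run against a stub here; in the notebook you would pass `lambda: sm_client.describe_endpoint(EndpointName=endpoint_name)`.

```python
import time

def wait_while_updating(describe, timeout_s=900, interval_s=15, sleep=time.sleep):
    """Poll describe() until EndpointStatus is no longer 'Updating'.

    Raises TimeoutError after timeout_s seconds; returns the last response dict.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        response = describe()
        status = response["EndpointStatus"]
        count = response["ProductionVariants"][0].get("CurrentInstanceCount")
        print(f"Status: {status}, current instance count: {count}")
        if status != "Updating":
            return response
        if time.monotonic() >= deadline:
            raise TimeoutError(f"endpoint still Updating after {timeout_s} s")
        sleep(interval_s)

# Stub that reports Updating twice, then InService with 3 instances.
_states = iter([("Updating", 1), ("Updating", 2), ("InService", 3)])

def fake_describe():
    status, count = next(_states)
    return {"EndpointStatus": status,
            "ProductionVariants": [{"CurrentInstanceCount": count}]}

final = wait_while_updating(fake_describe, interval_s=0, sleep=lambda s: None)
```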

### Write payload to S3 for Load Testing with Locust

In [None]:
# write the payload to a text file for our locust script to read
text_file = open("unet-payload.txt", "w")
n = text_file.write(payload)
text_file.close()

In [None]:
with open('unet-payload.txt', 'r') as input_file:
    text_payload = input_file.read()

In [None]:
# Sanity check: the payload read back from the file still works against the compiled endpoint
endpoint_name = optimized_predictor.endpoint_name
response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                   ContentType='application/json',
                                   Body=text_payload)
result = json.loads(response['Body'].read().decode())
#result

In [None]:
!aws s3 cp unet-payload.txt s3://{bucket}/unet-payload.txt