# Tensorflow Model Zoo SageMaker Serving - Benchmark

This notebook illustrates how to serve models from [Tensorflow Model Zoo](https://github.com/tensorflow/models/) and runs a benchmark on some [Object Detection](https://github.com/tensorflow/models/tree/master/research/object_detection) models. We'll walk you through:

- Which libraries are needed to process and serve the models
- How to retrieve, repackage and make the models available to SageMaker for serving
- Benchmark illustrating how to create and invoke SageMaker inference endpoints.

## Libraries to be imported

In [1]:
import os
import sys
import pathlib
import tensorflow as tf
import numpy as np
from PIL import Image
from shutil import copytree, rmtree
from glob import glob
from time import sleep
import boto3
import botocore
import tarfile
import logging




In [2]:
ZOO_BUCKET = 'FILL IN YOUR OWN BUCKET NAME'
ZOO_DIR = 'tf-model-zoo'  # Change as desired
assert(ZOO_BUCKET != 'FILL IN YOUR OWN BUCKET NAME', "Please provide a bucket name to store repackaged zoo models before proceding.")

## Helper Function to copy [Tensorflow Model Zoo](https://github.com/tensorflow/models) object detection models to S3 and creating SageMaker Model from them.

SageMaker uses Tensorflow Serving, which requires the model to be in a folder structure like this:

```
model_dir/
  |
  +-version/
       |
       +-variables/
       |
       +-saved_model.pb
```

The models available in model zoo object detection are in a tar file, containing a folder structure in the following format:
```
<model_dir>/
  |
  +-<several files>
  |
  +-saved_model/
      |
      +-variables/
      |
      +-saved_model.pb
```

The `load_model_in_s3` function downloads the tar file from the zoo, unpacks it and takes the `saved_model` directory. It then restructures its content to the structure above, packs it into a `model.tar.gz` file which gets uploaded to the specified bucket and path, under a folder named after the model. For simplicity, `<model_dir>` defaults to "model". 

The `download_and_create_model` function continues the process, calling `load_model_in_s3` and then creating a [`sagemker.tensorflow.serving.Model`](https://sagemaker.readthedocs.io/en/stable/sagemaker.tensorflow.html#tensorflow-serving-model) object from the `model.tar.gz` file uploaded to s3. It can take additional parameters, which will be passed on to the `Model` initializer.

In [3]:
def wait_for_dir(path, timeout=100):
    """
    Waits for path to not exist up to timeout * 6 seconds. This is needed if several executions are running in parallel, potentially trying to repackage the same model.
    """
    logging.debug(f'Waiting for {path}')
    i = 0
    while os.path.exists(path) and i < timeout:
        sleep(6)
        i += 1
    if os.path.exists(path):
        raise TimeoutError(f'Path {path} still exists after {6*timeout} seconds.')
        
def s3_obj_exists(bucket, key):
    """
    Checks if a key already exists in a bucket. Useful to avoid rebuilding previously existing models.
    """
    logging.debug(f'Checking existence of s3://{bucket}/{key}')
    s3 = boto3.resource('s3')
    try:
        s3.Object(bucket, key).load()
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            return(False)
        else:
            raise
    else:
        return(True)

def load_model_in_s3(model_name, s3_bucket=ZOO_BUCKET, s3_path=ZOO_DIR, base_url = 'http://download.tensorflow.org/models/object_detection/'):
    """
    Downloads an object detection model from Tensorflow Model Zoo, reorganizes the directory structure to be compatible with SageMaker and Tensorflow Serving
    and copies it to the specified S3 location.
    params:
        model_name: exact name of the model as specified at https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/detection_model_zoo.md
        s3_bucket:  name of an S3 bucket where to store the repackaged model.tar.gz
        s3_path:    path where to store the model.tar.gz inside the bucket. The model name will be appended to it as a folder, and the model.tar.gz will be uploaded to that destination.
        
    returns: the full S3 object path of the model.tar.gz file, with the format 's3://<s3_bucket>/<s3_path>/<model_name>/model.tar.gz'
    """
    # Waiting for the cache directory to be free for creating and using. Fail if it doesn't get free by 10 minutes. That gives time for other processes to build the model for us to reuse.
    wait_for_dir(f'/tmp/{model_name}')

    # Check if the model file is already on S3 - another process may have created and uploaded it already while we were waiting. If it is there, we only reuse it.
    model_s3_full_path = f's3://{s3_bucket}/{s3_path}/{model_name}/model.tar.gz'
    if not s3_obj_exists(s3_bucket, f'{s3_path}/{model_name}/model.tar.gz'):
        os.makedirs(f'/tmp/{model_name}')
        # Download and untar the model in cahce
        model_dir = tf.keras.utils.get_file(
            fname=model_name, 
            origin=base_url + model_name + '.tar.gz',
            cache_dir=f'/tmp/{model_name}',
            untar=True)

        #Clean up the location for the repackaged model tar
        rmtree(f'/tmp/sm-models/{model_name}', ignore_errors=True)
        #Focus on the saved model directory
        model_dir = pathlib.Path(model_dir)/"saved_model"
        copytree(model_dir.absolute().as_posix(), f'/tmp/sm-models/{model_name}/1')
        
        #Create the repackaged tar
        with tarfile.open(f"/tmp/sm-models/{model_name}.tar.gz", "w:gz") as tar:
            for name in glob(f'/tmp/sm-models/{model_name}/*'):
                tar.add(name, arcname=name.split('/')[-1])

        #Upload the repackaged tar to S3
        s3 = boto3.client('s3')
        s3.upload_file(f"/tmp/sm-models/{model_name}.tar.gz", Bucket=s3_bucket, Key=f'{s3_path}/{model_name}/model.tar.gz')
        #Clean up the downloaded cache
        rmtree(f'/tmp/{model_name}', ignore_errors=True)
    return(model_s3_full_path)

In [4]:
from sagemaker.tensorflow.serving import Model
import sagemaker
import urllib

def download_and_create_model(model_name, bucket=None, bucket_path='tf-model-zoo', role=None, **kwargs):
    """
    Downloads a model from Tensorflow Model Zoo, repackages it and returns a SageMaker Model instance of it.
    """
    if bucket is None:
        bucket = sagemaker.session.Session().default_bucket()
    if role is None:
        role=sagemaker.get_execution_role()
    try:
        model_tar = load_model_in_s3(model_name, bucket, bucket_path)
    except urllib.error.HTTPError:
        raise ValueError(f'Model {model_name} not found on Tensorflow Model Zoo.')
    adj_model_name = model_name.replace("_", "-").replace(".", "-")[:45]
    model = Model(name=adj_model_name, model_data=model_tar, role=role, **kwargs)
    return(model)

## Helper Functions to Benchmark models on a directory of JPG images

These functions simply execute inference and return the prediction from one or several images. For a more elaborate version that loads the categories and displays the image with bounding boxes and probabilities, please refer to the [Object detection API demo notebook](https://github.com/tensorflow/models/blob/master/research/object_detection/object_detection_tutorial.ipynb) in the Tensorflow Model Zoo repo.

In [5]:
# These images are known to consistently cause some models to fail. If you are using a dataset other than the coco 2017 test, you may have to run a few tests and change this list.

bad_images = ['../data/000000001688.jpg',
              '../data/000000002240.jpg',
              '../data/000000000913.jpg',
              '../data/000000004208.jpg',
              '../data/000000000078.jpg',
              '../data/000000000073.jpg',
              '../data/000000002758.jpg',
              '../data/000000003947.jpg',
              '../data/000000003517.jpg',
              '../data/000000003242.jpg',
              '../data/000000000263.jpg',
              '../data/000000003293.jpg'
             ]

In [6]:
def load_images(image_path, max_images=10, skip_images=None):
    """
    Preloads an array of images from a path for faster inference
    """
    if skip_images is None:
        skip_images = []
    logging.debug(f'Starting to load images from {image_path} (cwd: {os.getcwd()})')
    images = [(im_file, np.array(Image.open(im_file))) 
              for im_file in glob(f'{image_path}/*.jpg')[:max_images] 
                  if im_file not in skip_images]
    return(images)

def predict_images(predictor, image_path, max_images=10, skip_images=None):
    """
    does inference for a number of images from a path
    """
    images = load_images(image_path, max_images, skip_images)
    predictions = [(imfile, predict_image(predictor, image)) for imfile, image in images]
    logging.warning([f"{imfile} had no prediction" for imfile, output in predictions if len(output) == 0])
    return(predictions)

def predict_image(predictor, image):
    """
    Does inference for one preloaded image as a numpy array
    """
    input_dict = {'instances': [image.tolist()]}
    try:
        output_dict = predictor.predict(input_dict)    
    except Exception as e:
        output_dict = {}
        logging.error(e)
    return output_dict

## Deploying a Model to SageMaker Tensorflow Serving

So, as the code above shows, a tar file containing the model parameters is loaded into a [Model](https://sagemaker.readthedocs.io/en/stable/model.html) object (in this specific case a [sagemaker.tensorflow.TensorflowModel](https://sagemaker.readthedocs.io/en/stable/sagemaker.tensorflow.html#tensorflow-model)). That object has a `deploy` method that returns a [Predictor](https://sagemaker.readthedocs.io/en/stable/predictors.html) object. That object has the `endpoint` property, containing the url of the prediction endpoint. It can also be used programmatically to generate inferences, through the `predict` method. In the case of [Tensorflow Serving](https://www.tensorflow.org/tfx/guide/serving), the endpoint (and the `predict` method) expect a request with the following body structure:

```
input = {
  'instances': [nested json list with all dimensions]
}
```

This nested structured can be obtained by converting an array to a list object, for instance using the `numpy.ndarray.tolist()` method. The response is a JSON structure like this:
```
{
  'predictions': [{'<prediction one>': [nested json array]}, {...},...]
}
```

For examples on how we create Model and Predictor instances, please check the `download_and_create_model` (above) and `gen_model_instance_profile` (below) functions.

## Benchmark helper functions

The functions below set up an inference endpoint, warm it up to reduce the impact of initial calls and profile the execution of a batch of images. They will be used for benchmarking models on several instance types.

These functions assume that the notebook is running in a subdirectory, and another directory called `data`, at its parent level, contains the test images from the [coco 2017 dataset](http://cocodataset.org/#download). it particularly requires a `000000000009.jpg` file. If you are using another dataset, please adjust the `warmup_predictor` function or its call below.

### Warmup

Warming up the model is an important step right after its deployment. All tests clearly shows it takes a number of calls for the model to reach a consistent inference speed. Tensorflow serving does provide its own warmup processes, but in our case it was straightforward to just make a number of calls before running the benchmark.

In [7]:
import re
import pandas as pd
import cProfile
import io
import pstats

In [19]:
def logged_predict(predictor, body):
    """ Generates a prediction with some stdout output. Optional (can be replaced by predictor.predict(body))"""
    predictor.predict(body)
    sys.stdout.write('.')
    

def warmup_predictor(predictor, n_times=50, startup_image_path='../data/000000000009.jpg'):
    """Warms up the predictor with a fixed image, to reduce the risk of erroneous performance measurements due to initializations and lazy loading."""
    x = np.array(Image.open(startup_image_path))
    body={'instances':[x.tolist()]}
    _ = [logged_predict(predictor, body) for _ in range(n_times)]
    sleep(90)  # To give some space in the graph after warmup
    logging.debug(f'Warmup finished for {predictor.endpoint}.')


def profile_predictor(predictor, images, executions=1):
    """Profiles the execution of a predictor with a batch of preloaded images."""
    pr = cProfile.Profile()
    bodies = [(impath, {'instances':[x.tolist()]}) for impath, x in images]
    pr.enable()
    for _ in range(executions):
        for impath, body in bodies:
            try:
                logged_predict(predictor, body)
            except botocore.exceptions.ConnectionClosedError as e:
                print(f'Failed to predict for {impath}: {e}')
            except:
                print(f'Failed to predict for {impath}')
    pr.disable()
    logging.debug(f'Profiling finished for {predictor.endpoint}.')
    return(pr)


def get_stats(profile, sort=['cumtime'], pct=.1):
    """Retrieves the profile statistics for a previous run, and returns the information in an object
    returns:
        Profile: a namedtuple containing:
            - calls: integer total number of function calls made
            - total_seconds: float measurement of the total execution time in seconds
            - data: a Pandas DataFrame containing the details of individual calls, cumulative time, executions, etc.
    """
    s = io.StringIO()
    ps = pstats.Stats(profile, stream=s).sort_stats(*sort)
    ps.print_stats(pct)
    
    headers = []
    data = []
    calls = None
    total_seconds = None
    for i, line in enumerate(s.getvalue().split("\n")):
        if i == 0:
            try:
                calls = int(re.match(r'\s*(\d+) function calls', line).groups(1)[0])
            except TypeError:
                calls = 0
            try:
                total_seconds = float(re.match(r"in (\d+\.?\d*) seconds", line))
            except TypeError:
                total_seconds = 0
        if i < 5:
            continue
        reduced_line, _ = re.subn("\s+", " ", line)
        if len(reduced_line):
            if reduced_line[0] == ' ':
                reduced_line = reduced_line[1:]
            cols = reduced_line.split(' ')
            if headers and cols:
                data.append({h: v for h, v in zip(headers, cols)})
            if i == 5:
                headers = cols
    stats_data = pd.DataFrame(data)
    return(calls, total_seconds, stats_data)


def gen_model_instance_profile(model, instance, batch_size=100, executions=1):
    """Creates and profiles a predictor for the model and instance type requested.
    params:
        - model: name of the Tensorflow Model Zoo model to use
        - instance: a string defining an acceptable instance type for hosting a SageMaker endpoint ('ml.<family>.<size>')
        - images: a batch of images to run the benchmark on.
        - executions: number of times the whole batch should be processed for profiling
    returns ProfileResults: a namedtuple containing:
            - model: the parameter described above
            - instance_type: the `instance` parameter described above
            - predictor: the sagemaker.Predictor created from the model and instance type passed. Its name is a combination of both.
            - executions: the parameter described above
            - calls: integer total number of function calls made
            - total_seconds: float measurement of the total execution time in seconds
            - data: a Pandas DataFrame containing the details of individual calls, cumulative time, executions, etc.
    """
    model_instance = download_and_create_model(model, framework_version='2.1.0',  container_log_level=10) # DEBUG
    endpoint_name = f'{model_instance.name}-{instance.replace(".", "-")}'
    logging.info(f"Starting profile for endpoint {endpoint_name} on model {model_instance.name}, instance {instance} with {batch_size} images...")
    predictor = model_instance.deploy(
        initial_instance_count=1, instance_type=instance,
        endpoint_name=endpoint_name,
        update_endpoint=False)
    logging.debug(f"Endpoint {predictor.endpoint} created...")
    warmup_predictor(predictor)
    
    images = load_images(image_path='../data', max_images=batch_size, skip_images=bad_images)
    logging.info(f'Loaded {len(images)} images for profiling.')
    profile = profile_predictor(predictor, images, executions=executions)
    
    stats = get_stats(profile)
    results = ProfileResults(
        model=model_instance.name,
        instance_type=instance,
        predictor=predictor.endpoint,
        executions=executions,
        calls=stats[0],
        total_seconds=stats[1]
    )
    stats[2].to_csv(f'stats-{predictor.endpoint}.csv')
    return(model_instance.name, instance, predictor.endpoint, executions, stats[0], stats[1])
    

## Benchmark on the Prediction time for 100 images

The benchmark will be run for several models and several instance types, as listed below. The benchmarks run over the 100 images 10 times, for a total of 100 inference calls.

In [9]:
models = ['faster_rcnn_resnet50_coco_2018_01_28', 'ssd_resnet50_v1_fpn_shared_box_predictor_640x640_coco14_sync_2018_07_03', 'faster_rcnn_inception_resnet_v2_atrous_coco_2018_01_28']
instance_types = ['ml.p3.2xlarge', 'ml.p2.xlarge', 'ml.c5.2xlarge', 'ml.c5.4xlarge', 'ml.m5.4xlarge']

The cells below runs a test of each model on all the instance types defined for benchmarking.

**Note**: running all these instances incurs in costs and can reach service quota limits. Plan your own tests carefully.

**<font color='red'>The next cell will clear ALL endpoints in this account and region. Run it with care!</font>**

In [15]:
smclient = boto3.client('sagemaker')
for ep in smclient.list_endpoints(MaxResults=100)['Endpoints']:
    deleted = False
    while not deleted:
        try:
            smclient.delete_endpoint(EndpointName=ep['EndpointName'])
            deleted = True
        except botocore.exceptions.ClientError:
            print(f'Endpoint {ep["EndpointName"]} not in a deletable state. Waiting 30 seconds to try again...')
            sleep(30)

In [11]:
for model in models:
    try:
        rmtree(f'/tmp/{model}')
    except:
        pass
rmtree(f'/tmp/sm-models', ignore_errors=True)

The cells below will run all models for each specified instance type in parallel. Notice that service quotas may limit the number of inference endpoints you may have for a given instance type and fail the benchmark if such quota is not enough to run all models in parallel. An easy way to control that is to limit the pool size to the quota you have for the most limited instance type.

In [16]:
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool

results = []
def call_gen(params):
    logging.debug(f'Calling with {params}')
    return(gen_model_instance_profile(*params, executions=10))
    

In [17]:
instance = 'ml.p3.2xlarge'
#     for model in models:
#         result = gen_model_instance_profile(model, instance, executions=10)
with Pool(3) as executor:
    result = executor.map(call_gen, [(model, instance) for model in models])
    results.append(result)



-



-



--------------------------------------------------------------------!---!--!

In [18]:
results

[[('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.p3.2xlarge',
   'faster-rcnn-resnet50-coco-2018-01-28-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('ssd-resnet50-v1-fpn-shared-box-predictor-640x',
   'ml.p3.2xlarge',
   'ssd-resnet50-v1-fpn-shared-box-predictor-640x-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('faster-rcnn-inception-resnet-v2-atrous-coco-2',
   'ml.p3.2xlarge',
   'faster-rcnn-inception-resnet-v2-atrous-coco-2-ml-p3-2xlarge',
   10,
   3610303,
   0)]]

In [20]:
instance = 'ml.p2.xlarge'
#     for model in models:
#         result = gen_model_instance_profile(model, instance, executions=10)
with Pool(3) as executor:
    result = executor.map(call_gen, [(model, instance) for model in models])
    results.append(result)



-



-



-------------------------------------------!!!

In [21]:
results

[[('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.p3.2xlarge',
   'faster-rcnn-resnet50-coco-2018-01-28-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('ssd-resnet50-v1-fpn-shared-box-predictor-640x',
   'ml.p3.2xlarge',
   'ssd-resnet50-v1-fpn-shared-box-predictor-640x-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('faster-rcnn-inception-resnet-v2-atrous-coco-2',
   'ml.p3.2xlarge',
   'faster-rcnn-inception-resnet-v2-atrous-coco-2-ml-p3-2xlarge',
   10,
   3610303,
   0)],
 [('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.p2.xlarge',
   'faster-rcnn-resnet50-coco-2018-01-28-ml-p2-xlarge',
   10,
   3610303,
   0),
  ('ssd-resnet50-v1-fpn-shared-box-predictor-640x',
   'ml.p2.xlarge',
   'ssd-resnet50-v1-fpn-shared-box-predictor-640x-ml-p2-xlarge',
   10,
   3610050,
   0),
  ('faster-rcnn-inception-resnet-v2-atrous-coco-2',
   'ml.p2.xlarge',
   'faster-rcnn-inception-resnet-v2-atrous-coco-2-ml-p2-xlarge',
   10,
   3614948,
   0)]]

In [22]:
instance = 'ml.c5.2xlarge'
#     for model in models:
#         result = gen_model_instance_profile(model, instance, executions=10)
with Pool(3) as executor:
    result = executor.map(call_gen, [(model, instance) for model in models])
    results.append(result)



---------------------------------!-!-!

In [23]:
results

[[('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.p3.2xlarge',
   'faster-rcnn-resnet50-coco-2018-01-28-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('ssd-resnet50-v1-fpn-shared-box-predictor-640x',
   'ml.p3.2xlarge',
   'ssd-resnet50-v1-fpn-shared-box-predictor-640x-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('faster-rcnn-inception-resnet-v2-atrous-coco-2',
   'ml.p3.2xlarge',
   'faster-rcnn-inception-resnet-v2-atrous-coco-2-ml-p3-2xlarge',
   10,
   3610303,
   0)],
 [('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.p2.xlarge',
   'faster-rcnn-resnet50-coco-2018-01-28-ml-p2-xlarge',
   10,
   3610303,
   0),
  ('ssd-resnet50-v1-fpn-shared-box-predictor-640x',
   'ml.p2.xlarge',
   'ssd-resnet50-v1-fpn-shared-box-predictor-640x-ml-p2-xlarge',
   10,
   3610050,
   0),
  ('faster-rcnn-inception-resnet-v2-atrous-coco-2',
   'ml.p2.xlarge',
   'faster-rcnn-inception-resnet-v2-atrous-coco-2-ml-p2-xlarge',
   10,
   3614948,
   0)],
 [('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.c5.2xl

In [24]:
instance = 'ml.c5.4xlarge'
#     for model in models:
#         result = gen_model_instance_profile(model, instance, executions=10)
with Pool(3) as executor:
    result = executor.map(call_gen, [(model, instance) for model in models])
    results.append(result)



---------------------------------------!!!

In [25]:
results

[[('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.p3.2xlarge',
   'faster-rcnn-resnet50-coco-2018-01-28-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('ssd-resnet50-v1-fpn-shared-box-predictor-640x',
   'ml.p3.2xlarge',
   'ssd-resnet50-v1-fpn-shared-box-predictor-640x-ml-p3-2xlarge',
   10,
   3610050,
   0),
  ('faster-rcnn-inception-resnet-v2-atrous-coco-2',
   'ml.p3.2xlarge',
   'faster-rcnn-inception-resnet-v2-atrous-coco-2-ml-p3-2xlarge',
   10,
   3610303,
   0)],
 [('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.p2.xlarge',
   'faster-rcnn-resnet50-coco-2018-01-28-ml-p2-xlarge',
   10,
   3610303,
   0),
  ('ssd-resnet50-v1-fpn-shared-box-predictor-640x',
   'ml.p2.xlarge',
   'ssd-resnet50-v1-fpn-shared-box-predictor-640x-ml-p2-xlarge',
   10,
   3610050,
   0),
  ('faster-rcnn-inception-resnet-v2-atrous-coco-2',
   'ml.p2.xlarge',
   'faster-rcnn-inception-resnet-v2-atrous-coco-2-ml-p2-xlarge',
   10,
   3614948,
   0)],
 [('faster-rcnn-resnet50-coco-2018-01-28',
   'ml.c5.2xl

In [None]:
instance = 'ml.m5.4xlarge'
#     for model in models:
#         result = gen_model_instance_profile(model, instance, executions=10)
with Pool(3) as executor:
    result = executor.map(call_gen, [(model, instance) for model in models])
    results.append(result)



--



-------------------------------------!!!

In [None]:
results

## Deleting the Endpoints to save resources

**<font color='red'>The next cell will clear ALL endpoints in this account and region. Run it with care!</font>**

In [15]:
smclient = boto3.client('sagemaker')
for ep in smclient.list_endpoints(MaxResults=100)['Endpoints']:
    deleted = False
    while not deleted:
        try:
            smclient.delete_endpoint(EndpointName=ep['EndpointName'])
            deleted = True
        except botocore.exceptions.ClientError:
            print(f'Endpoint {ep["EndpointName"]} not in a deletable state. Waiting 30 seconds to try again...')
            sleep(30)