Installation dependencies

In [None]:
# Lets install some image utilities
! pip install ipympl

# Core imports for the notebook
import numpy as np 
import tensorflow as tf
import time
import os
import json
from IPython.display import clear_output

# We will use the ResNet50 Keras model, pre-trained with imagenet weights...
from tensorflow.keras.applications import ResNet50

# ResNet50 helpers used specifically with our selected pre-trained model
from tensorflow.keras.applications.resnet50 import preprocess_input

! pip install pelion_sagemaker_controller
from pelion_sagemaker_controller import pelion_sagemaker_controller

Configuration settings...

In [None]:
# AWS Tunables
aws_region = 'us-east-1'
aws_s3_folder = "demo-sagemaker-edge"     # Root directory to be used in our S3 bucket

# We will use the ResNet50 image recognition model via Keras framework
model_framework = 'keras'
model_basename = 'resnet50'
model_name = model_framework + "-" + model_basename + "-image-recognizer"
packaged_model_name = model_framework + "-" + model_basename + "-model"
packaged_model_version = "1.0"
model_package = '{}-{}.tar.gz'.format(packaged_model_name, packaged_model_version)
model_max_input_key = packaged_model_name + "-max-num-inputs"
model_max_input_key = model_max_input_key.replace('-','_')

# These typically dont need to be changed... 
endpoint_api = 'api.' + aws_region + '.mbedcloud.com'              # This is optional and the default
async_response_sec = 0.25                                          # Pelion long polling tunable. Typically no need to change
busy_loop_wait_time_sec = 0.5                                      # This is our busy loop polling interval. Typically no need to change

# Decoder classification json specific to ResNet50...
resnet50_class_list_path = './model/imagenet_class_index.json'

# Number of images to capture in a single iteration
max_num_images       = 1
vp_capture_fps_ms    = 3000

# Input images WxH is 224x224 and are color images (so 3 channels)
image_size_w         = 224
image_size_h         = 224
image_num_channels   = 3

# Expected shape
vp_shape = {"width":image_size_w, "height":image_size_h, "depth":image_num_channels}

# capture loop sleep times (in seconds)
vp_initial_sleep_sec = 5
vp_loop_sleep_sec    = 3
vp_clean_sleep_sec   = 3

# Post-display capture clearing
vp_clear_s3          = True    # Clear out the captures from S3 bucket
vp_clear_nb          = True    # Clear out the captures from our Notebook

# Our Notebok directory where captures are stored
vp_np_capture_dir    = "capture"

# Our Pelion Edge Gateway is a Nvidia Jetson Xavier
target_device        = 'jetson_xavier'

# Set our Pelion API Configuration Here
api_key   = 'INSERT_YOUR_PELION_APPLICATION_KEY_HERE'
device_id = 'INSERT_YOUR_PELION_XAVIER_EDGE_SAGE_PT_DEVICEID_HERE'                            # Pelion Device ID of our Sagemaker Edge Agent PT device under our Pelion Edge gateway
video_processor_device_id = 'INSERT_YOUR_PELION_XAVIER_EDGE_VIDEO_PROCESSOR_PT_DEVICEID_HERE' # Pelion Device ID of our Video Processor PT

# Dougs API Key and Sagemaker Edge Agent Manager PT DeviceID + Video Processor PT DeviceID
api_key                   = 'ak_2MDE2ZmZjZmQwOTgxNGE2OGYxNmFlODQ0MDAwMDAwMDA0176d3f035509609f897638900000000Negxfm3p2PwYYEkqN1VKjqxMlM07HOzA'
device_id                 = '017a415e4fb83230f97c93d800300000'
video_processor_device_id = '017cddbbdd92d611df6b7d3700300000'

Allocate our Notebook client API and set its configuration - including the Video Processor extensions...

In [None]:
print("")
print("Allocating Notebook Client API and setting configuration...")
my_notebook = pelion_sagemaker_controller.MyNotebook(api_key, device_id, endpoint_api, aws_s3_folder, async_response_sec)

#
# IMPORTANT: set the compiled # input images from our first input data set...
#
# NOTE:  While it appears that you can feed your compiled model more or less than this value,
#        the FIRST execution of your model MUST contain exactly this many inputs... otherwise
#        Sagemaker Edge Agent Manager will crash and you will have to reboot your gateway.
#       
#        Subsequent invocations will work fine with fewer inputs... invoacations with larger
#        inputs will also complete, however you will only receive predictions for the first 
#        "max_compiled_model_num_input_images" inputs - the others will not have predictions. 
#
#        In future notebooks/versions, we'll be using this configuration entry.
#
my_notebook.pelion_api.pelion_set_config(model_max_input_key,max_num_images)
max_compiled_model_num_input_images = my_notebook.pelion_api.pelion_get_config(model_max_input_key)
print("")
print("Setting max number of input images for our compiled ResNet50 model (" + model_max_input_key + ") to: " + str(max_compiled_model_num_input_images))

# Compile up for our target Pelion Edge Gateway platform type
print("")
print("Initiating Sagemaker Neo compile of " + model_name + "...")
# job_name = my_notebook.compile_model(resnet50_model, target_device, model_basename, model_framework, neo_input_layer_shape)

# Package up and store the compiled model onto S3
print("")
print("Packaging up Neo-compiled model and placing in S3...")
# model_package = my_notebook.package_model(packaged_model_name, packaged_model_version, job_name)
print("")
print("Model Package: " + model_package)

#
# Add the deviceID for our VideoProcessor PT
#
print("")
print("Setting video processor PT device ID: " + video_processor_device_id)
my_notebook.pelion_api.setVideoProcessorDeviceID(video_processor_device_id);

#
# Establish configuration for the VideoProcessor
#
print("")
print("Setting video processor extension configuration...")
my_notebook.vp_set_captures_per_iteration(max_num_images)
my_notebook.vp_set_capture_directory(vp_np_capture_dir)
my_notebook.pelion_api.pelion_set_vp_config("captureFPS",str(vp_capture_fps_ms))
my_notebook.pelion_api.pelion_set_vp_config("model",model_name)
my_notebook.pelion_api.pelion_set_vp_config("shape",json.dumps(vp_shape))

# Get the VP configuration
vp_config = my_notebook.pelion_api.pelion_get_vp_config()
print("")
print("Video Processor Extension Configuration:")
print(vp_config)

Create our Keras50 model with the default ImageNet weights...

In [None]:
print("")
print("Allocating ResNet50 model")
resnet50_model = ResNet50(weights="imagenet", pooling="avg")
    
# Lets dump the model details... 
print(resnet50_model.summary())

# Next, lets record the input layer for the Neo-compatible input_shape for Keras-based models...
input_layer = resnet50_model.get_layer(index=0)

# Neo wants me to build out a specific input_shape (in NCHW per above...) for the compile() task...
neo_input_layer_shape = {}
neo_input_layer_shape[input_layer.name] = [max_compiled_model_num_input_images, image_num_channels, image_size_h, image_size_w]
neo_input_layer_shape = json.dumps(neo_input_layer_shape)
print("")
print("Neo expects this input_shape if compiling a Keras model: " + neo_input_layer_shape)

Compile and package up the model using Neo (long-winded...)

In [None]:
print("")
print("Initiating Sagemaker Neo compile of " + model_name + " for platfrorm: " + target_device + "...")
job_name = my_notebook.compile_model(resnet50_model, target_device, model_basename, model_framework, neo_input_layer_shape)

# Package up and store the compiled model onto S3
print("")
print("Packaging up Neo-compiled model and placing in S3...")
model_package = my_notebook.package_model(packaged_model_name, packaged_model_version, job_name)
print("")
print("Model Package: " + model_package)

Reload the newly compiled model...

In [None]:
print("")
print('Reloading Model: ' + model_name + " in package: " + model_package + '...')
print("")
my_notebook.pelion_api.pelion_reload_model(model_name,model_package)

# Poll every 5 sec to look for the reload() completion....
while True:
    print("Reloading " + model_name + "...")
    is_running = my_notebook.pelion_api.pelion_cmd_is_running("reloadModel");
    if is_running == False:
        print("")
        print('Reload Completed!')
        print("")
        break
    time.sleep(5)
    
# Get the loaded model info via Pelion...
time.sleep(5)
reload_result = my_notebook.pelion_api.pelion_list_models();
if 'response' in reload_result and len(reload_result['response']) > 0:
    if 'name' in reload_result['response'][0]:
        print("")
        print("Currently Loaded Model(s):")
        print(reload_result)
else:
    print("Model: " + model_name + " did NOT load properly. Check sagemaker edge agent logs on GW.")

    
# Get the loaded model info via Pelion...
reload_result = my_notebook.pelion_api.pelion_list_models();
print(reload_result)
if 'response' in reload_result and len(reload_result['response']) > 0:
    if 'name' in reload_result['response'][0]:
        print("")
        print("Currently Loaded Model(s):")
        print(reload_result)
else:
    print("Model: " + model_name + " did NOT load properly. Check sagemaker edge agent logs on GW.")

In [None]:
# Get the loaded model info via Pelion...
reload_result = my_notebook.pelion_api.pelion_list_models();
print(reload_result)
if 'response' in reload_result and len(reload_result['response']) > 0:
    if 'name' in reload_result['response'][0]:
        print("")
        print("Currently Loaded Model(s):")
        print(reload_result)
else:
    print("Model: " + model_name + " did NOT load properly. Check sagemaker edge agent logs on GW.")

Lets do a manual invocation first to "prime" the compiled model - first run is always slow...

In [None]:
# Input images directory in our notebook...
image_paths = './images'

# collect and prepare our images to analyze
print("")
print("Collecting images from: " + str(image_paths))
my_image_list = [os.path.join(image_paths,filename) for filename in os.listdir(image_paths) if os.path.isfile(image_paths + '/' + filename)]
my_image_list_length = len(my_image_list)

# Display the input image set
print("")
print("Input Image Files: " + json.dumps(my_image_list))

# read in the batch of images... then, preprocess_input() is ResNet50 specific
print("")
print("Pre-processing input image files...")
image_data = my_notebook.read_image_batch(my_image_list, image_size_h, image_size_w)
preprocessed_images_list = preprocess_input(image_data['array'])

# Display our initial images
print("")
print("Displaying Input Images:")
my_notebook.display_images(image_data['img'])

#
# Neo Sagemaker likes NCHW shapes for Keras models. 
# Our image set is in NHWC...
#
print("")
print("Input shape: " + str(preprocessed_images_list.shape))
print("Input dtype: " + str(preprocessed_images_list.dtype))
print("Input data length number of images: " + str(len(preprocessed_images_list)))

# ...so transpose() to NCHW format...  we have to do this because Neo requires a NCHW layer...
preprocessed_images_list = tf.transpose(preprocessed_images_list,[0, 3, 1, 2])

# Neo-compatible input tensor shape now!
print("")
print("Neo transposed Input shape: " + str(preprocessed_images_list.shape))
print("Neo transposed Input dtype: " + str(preprocessed_images_list.dtype))
print("Neo transposed Input data length number of images: " + str(len(preprocessed_images_list)))

# Save the preprocessed images, as the input tensor, to a single input file in S3
input_data_filename = 'preprocessed_images_' + str(time.time()) + '.input'
my_notebook.save_input_tensor_to_s3(preprocessed_images_list,input_data_filename)

# Invoke the prediction with our input image data via Pelion using the S3 URLs for input and outputs...
input_tensor_s3 = 's3:///' + input_data_filename
output_tensor_s3 = 's3:///' + model_basename + '-predicted.data'
print("Invoking Prediction on Pelion Edge with Sagemaker. Model: " + model_name + " Input Tensor: " + input_tensor_s3 + " Output Tensor: " + output_tensor_s3)
print("")
my_notebook.pelion_api.pelion_predict(model_name, input_tensor_s3, output_tensor_s3)

# Poll every busy_loop_wait_time_sec seconds to look for the predict() completion....
while True:
    print('Predicting...')
    is_running = my_notebook.pelion_api.pelion_cmd_is_running("predict");
    if is_running == False:
        print("")
        print('Prediction Completed!')
        print("")
        break
    time.sleep(busy_loop_wait_time_sec)
    
# Now get the prediction result
prediction_result = my_notebook.pelion_api.pelion_last_cmd_result();
if 'details' in prediction_result:
    if 'output' in prediction_result['details']:
        print("")
        print("Prediction Results:")
        print(prediction_result)
        
# Prediction results tensor filename in our notebook
output_tensor_local_nb_filename = model_basename + '-prediction-output.tensor'

# Read the output tensor from S3 and deposit a copy of the output into the notebook. our output tensor should be float32 for Resnet50...
output_tensor = my_notebook.get_output_tensor(prediction_result['details']['output'][0]['url'], output_tensor_local_nb_filename, np.float32)

# Decode the Resnet50 prediction results
print("")
print("Decoding ResNet50 Imagenet-trained Prediction results...")
most_likely_labels = my_notebook.decode_resnet50_predictions(output_tensor, top=3, class_list_path=resnet50_class_list_path)

# Display the images, annotated with the predictions
print("")
print("Displaying prediction results...")
my_notebook.display_images(image_data['img'],most_likely_labels)

Next, we startup our video capture loop

In [None]:
print("")
print("Starting video capture...")
my_notebook.pelion_api.pelion_start_videocapture(True)
print("Video capture started.")
time.sleep(vp_initial_sleep_sec)

# Loop and pull over the captured images and display our preditions
counter = 1
while True:
    try:
        # pull over the latest images
        print("")
        print("Getting latest capture timestamps...")
        timestamps = my_notebook.pull_video_captures()
        print("Retrieved " + str(len(timestamps)) + " timestamps...")
        if len(timestamps) > 0:
            for timestamp in timestamps:
                # Read in images for the ith timestamp
                print("")
                print("Calling read_captured_images...")
                image_list = my_notebook.read_captured_images(timestamp,image_size_w,image_size_h)
                if not image_list is None:
                    # Read in the output tensor and decode the predictions
                    print("")
                    print("Calling get_captured_output_tensor...")
                    output_tensor = my_notebook.get_captured_output_tensor(timestamp, np.float32)
                    if not output_tensor is None:
                        # Display the ith set of images and our predictions
                        print("")
                        print("Displaying captured predictions for Timestamp: " + timestamp)
                        my_notebook.display_captures(image_list, output_tensor, timestamp, resnet50_class_list_path)
                    else:
                        print("")
                        print("Ignoring timestamp: " + str(timestamp) + " (no output prediction)... OK.")
                else:
                    print("")
                    print("Ignoring timestamp: " + str(timestamp) + " (image read incomplete)... OK.")
        else:
            print("")
            print("No timestamps given")
    except Exception as e:
        print("")
        print("Exception occurred during capture parsing...")
        print(e)
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        print(exc_type, fname, exc_tb.tb_lineno)
                
    # Pause for a bit...
    print("")
    print("Sleeping for a few seconds... count: " + str(counter))
    time.sleep(vp_loop_sleep_sec)
    
    # Clean up if we have timestamps...
    if len(timestamps) > 0:
        for timestamp in timestamps:
            print("")
            print("Cleaning out captures for timestamp: " + timestamp)
            my_notebook.clean_captures(timestamp, vp_clear_s3, vp_clear_nb)

    # increment and clear the output section...
    counter = counter + 1
    time.sleep(vp_clean_sleep_sec)
    clear_output(wait=True)

At any time we can interrupt the video capture loop and tell our capture PT to stop collecting captures...

In [None]:
# Stop video capture...
print("")
print("Stopping video capture...")
my_notebook.pelion_api.pelion_stop_videocapture()

# Finalize cleanup if we have timestamps...
print("")
print("Cleaning up last video captures")
time.sleep(vp_clean_sleep_sec)
my_notebook.finalize_clean_captures(vp_clear_s3, vp_clear_nb)
time.sleep(vp_clean_sleep_sec)
my_notebook.finalize_clean_captures(vp_clear_s3, vp_clear_nb)
print("")
print("Video capture stopped! Captures cleaned up.")