# Deploy model and run inference

In [1]:
%matplotlib inline
import glob
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

import visualization_utils as viz_utils

Since the models were trained on my personal account, I downloaded the trained models and run inference on my local machine.


In [2]:
# Locations of the exported SavedModel directories, one per fine-tuned
# detector. Each literal is split into "models root" + "model name/version"
# purely for readability; adjacent string literals are joined by Python.
efficientdet_model_path = (
    '../../models/'
    'efficientdet-d1-coco17-fine-tuned-model-1/1'
)
faster_rcnn_model_path = (
    '../../models/'
    'faster-rcnn-resnet152-v1-640x640-coco17-fine-tuned-model-1/1'
)
ssd_resnet_path = (
    '../../models/'
    'ssd-resnet50-v1-fpn-1024x1024-coco17-fine-tuned-model-1/1'
)

In [3]:
# Load the three exported SavedModels in a fixed order
# (EfficientDet, Faster R-CNN, SSD ResNet) and bind each to its own name.
efficientdet_model, faster_rcnn_model, ssd_resnet_model = (
    tf.saved_model.load(saved_model_dir)
    for saved_model_dir in (
        efficientdet_model_path,
        faster_rcnn_model_path,
        ssd_resnet_path,
    )
)

## Run inference

The models are now loaded and we can query them. We are going to use the images available in `data/test_video` to run inference and generate a video. To do so, we are going to need a few tools:
* we need to sort all the frames by index order (which corresponds to chronological order)
* we need a function to load images into numpy array
* we need a loop to run inference and display the results on the input image

We list the frame paths and sort them by index.

In [4]:
def _frame_index(frame_file):
    """Return the integer between the first '_' and the first '.' of the file name."""
    stem = os.path.basename(frame_file).split('.')[0]
    return int(stem.split('_')[1])


# Collect every PNG frame, then order the list numerically by frame index
# (a plain string sort would put e.g. "10" before "2").
frames_path = glob.glob('../data/test_video/*.png')
frames_path.sort(key=_frame_index)

We create a small function to load images.

In [5]:
def load_image(path: str) -> np.ndarray:
    """Read an image file and return it as an RGB ``uint8`` numpy array.

    Args:
        path: Location of the image file on disk.

    Returns:
        The decoded colour image, converted from OpenCV's BGR channel order
        to RGB.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file. ``cv2.imread``
            signals failure by returning ``None`` rather than raising, so
            the check is done here to give a clear error message.
    """
    bgr_img = cv2.imread(path, cv2.IMREAD_COLOR)  # IMREAD_COLOR == 1
    if bgr_img is None:
        raise FileNotFoundError(f'Could not read image file: {path!r}')
    return cv2.cvtColor(bgr_img.astype('uint8'), cv2.COLOR_BGR2RGB)

We create a mapping from id to name for visualization purposes.

In [6]:
# Class id -> label record, in the {'id': ..., 'name': ...} form passed to the
# visualization helper. Ids are listed explicitly because they are not
# contiguous (there is no class 3).
category_index = {
    class_id: {'id': class_id, 'name': class_name}
    for class_id, class_name in (
        (1, 'vehicle'),
        (2, 'pedestrian'),
        (4, 'cyclist'),
    )
}

This is the main loop:
* we load images to numpy
* we query the deployed model
* we display the inference results on the images

In [7]:
def image_file_to_tensor(path):
    """Load the image at ``path`` as an RGB ``uint8`` numpy array.

    This used to be a line-for-line copy of ``load_image``; it now simply
    delegates to it so the decoding logic lives in one place.

    Args:
        path: Location of the image file on disk.

    Returns:
        Whatever ``load_image(path)`` returns.
    """
    return load_image(path)
    
# Run EfficientDet on every frame and collect the annotated images.
images_efficientdet = []
for idx, path in enumerate(frames_path):
    if idx % 10 == 0:
        print(f'Processed {idx}/{len(frames_path)} images.')

    # load image
    img = image_file_to_tensor(path)
    # Prepend a batch axis. tf.expand_dims adapts to the frame's own height
    # and width, whereas a fixed reshape to (1, 640, 640, 3) fails for any
    # other frame size. (The old `img.tolist()` request payload was never
    # used and only cost time, so it is gone.)
    input_tensor = tf.expand_dims(img, axis=0)
    result = efficientdet_model(input_tensor)
    # Drop the batch dimension: one row of 4 coordinates per detection,
    # and flat vectors for the class ids and scores.
    detection_boxes = result['detection_boxes'].numpy().reshape(-1, 4)
    detection_classes = result['detection_classes'].numpy().reshape(-1).astype(int)
    detection_scores = result['detection_scores'].numpy().ravel()
    # display results on image
    image_np_with_detections = \
        viz_utils.visualize_boxes_and_labels_on_image_array(
            img,
            detection_boxes,
            detection_classes,
            detection_scores,
            category_index,
            use_normalized_coordinates=True,
            max_boxes_to_draw=100,
            min_score_thresh=0.6,
            agnostic_mode=False)
    images_efficientdet.append(image_np_with_detections)

Processed 0/100 images.
Processed 10/100 images.
Processed 20/100 images.
Processed 30/100 images.
Processed 40/100 images.
Processed 50/100 images.
Processed 60/100 images.
Processed 70/100 images.
Processed 80/100 images.
Processed 90/100 images.


In [16]:
# Run Faster R-CNN on every frame and collect the annotated images.
# Frames are read with `load_image`, defined earlier in this notebook, instead
# of re-declaring an identical loader in this cell.
images_faster_rcnn = []
for idx, path in enumerate(frames_path):
    if idx % 10 == 0:
        print(f'Processed {idx}/{len(frames_path)} images.')

    # load image
    img = load_image(path)
    # Prepend a batch axis. tf.expand_dims adapts to the frame's own height
    # and width, whereas a fixed reshape to (1, 640, 640, 3) fails for any
    # other frame size. (The old `img.tolist()` request payload was never
    # used and only cost time, so it is gone.)
    input_tensor = tf.expand_dims(img, axis=0)
    result = faster_rcnn_model(input_tensor)
    # Drop the batch dimension: one row of 4 coordinates per detection,
    # and flat vectors for the class ids and scores.
    detection_boxes = result['detection_boxes'].numpy().reshape(-1, 4)
    detection_classes = result['detection_classes'].numpy().reshape(-1).astype(int)
    detection_scores = result['detection_scores'].numpy().ravel()
    # display results on image
    image_np_with_detections = \
        viz_utils.visualize_boxes_and_labels_on_image_array(
            img,
            detection_boxes,
            detection_classes,
            detection_scores,
            category_index,
            use_normalized_coordinates=True,
            max_boxes_to_draw=100,
            min_score_thresh=0.6,
            agnostic_mode=False)
    images_faster_rcnn.append(image_np_with_detections)

Processed 0/100 images.
Processed 10/100 images.
Processed 20/100 images.
Processed 30/100 images.
Processed 40/100 images.
Processed 50/100 images.
Processed 60/100 images.
Processed 70/100 images.
Processed 80/100 images.
Processed 90/100 images.


In [9]:
# Run SSD ResNet on every frame and collect the annotated images.
# Frames are read with `load_image`, defined earlier in this notebook, instead
# of re-declaring an identical loader in this cell.
images_ssd_resnet = []
for idx, path in enumerate(frames_path):
    if idx % 10 == 0:
        print(f'Processed {idx}/{len(frames_path)} images.')

    # load image
    img = load_image(path)
    # Prepend a batch axis. tf.expand_dims adapts to the frame's own height
    # and width, whereas a fixed reshape to (1, 640, 640, 3) fails for any
    # other frame size. (The old `img.tolist()` request payload was never
    # used and only cost time, so it is gone.)
    input_tensor = tf.expand_dims(img, axis=0)
    result = ssd_resnet_model(input_tensor)
    # Drop the batch dimension: one row of 4 coordinates per detection,
    # and flat vectors for the class ids and scores.
    detection_boxes = result['detection_boxes'].numpy().reshape(-1, 4)
    detection_classes = result['detection_classes'].numpy().reshape(-1).astype(int)
    detection_scores = result['detection_scores'].numpy().ravel()
    # display results on image
    # NOTE: this model is drawn with a lower score threshold (0.5) than the
    # other two detectors (0.6).
    image_np_with_detections = \
        viz_utils.visualize_boxes_and_labels_on_image_array(
            img,
            detection_boxes,
            detection_classes,
            detection_scores,
            category_index,
            use_normalized_coordinates=True,
            max_boxes_to_draw=100,
            min_score_thresh=0.5,
            agnostic_mode=False)
    images_ssd_resnet.append(image_np_with_detections)

Processed 0/100 images.
Processed 10/100 images.
Processed 20/100 images.
Processed 30/100 images.
Processed 40/100 images.
Processed 50/100 images.
Processed 60/100 images.
Processed 70/100 images.
Processed 80/100 images.
Processed 90/100 images.


In [10]:
# Sanity-check the values left over from the last frame of whichever
# inference loop ran most recently: image shape, box array shape, then the
# raw scores and class ids.
for leftover in (img.shape, detection_boxes.shape, detection_scores, detection_classes):
    print(leftover)

(640, 640, 3)
(300, 4)
[9.95426595e-01 9.60321605e-01 9.02709424e-01 8.17579865e-01
 7.93292344e-01 6.92095757e-01 3.08514625e-01 2.27201074e-01
 2.14562401e-01 1.80795863e-01 1.72200412e-01 1.44587457e-01
 1.09890148e-01 7.37636015e-02 7.07768425e-02 6.80900961e-02
 4.57906611e-02 4.11713868e-02 3.54399942e-02 3.45046595e-02
 3.14030312e-02 2.83752847e-02 2.78261397e-02 2.50570662e-02
 2.44037919e-02 2.36758664e-02 2.33712178e-02 2.19492838e-02
 1.62432678e-02 1.60880871e-02 1.56127177e-02 1.33585315e-02
 1.29262051e-02 1.28587484e-02 1.16884876e-02 1.15012042e-02
 1.13214208e-02 1.13075068e-02 9.90782771e-03 9.76958685e-03
 9.35073663e-03 9.08306893e-03 8.48153699e-03 8.39796197e-03
 7.92154856e-03 7.39846937e-03 7.13239331e-03 7.12500559e-03
 6.86962856e-03 6.09234069e-03 5.99585427e-03 5.98348072e-03
 5.66827226e-03 5.43400552e-03 4.87852143e-03 4.51403763e-03
 4.18412127e-03 4.05209372e-03 3.95211298e-03 3.73267196e-03
 3.65063292e-03 3.58703500e-03 3.51928012e-03 3.44834267e-03
 

We can verify that the models worked correctly by displaying the first element of each annotated-frame list (`images_efficientdet`, `images_faster_rcnn`, `images_ssd_resnet`).

In [11]:
# Show the first annotated frame produced by each detector, one figure per
# model, in the order EfficientDet -> Faster R-CNN -> SSD ResNet.
first_frames = (
    ("EfficientDet", images_efficientdet[0]),
    ("Faster R-CNN", images_faster_rcnn[0]),
    ("SSD ResNet", images_ssd_resnet[0]),
)
for model_title, annotated_frame in first_frames:
    plt.figure()
    plt.imshow(annotated_frame)
    plt.title(model_title)
    plt.axis('off')  # Optionally hide axes
    plt.show()

  plt.show()
  plt.show()
  plt.show()


Finally, we can create one video per model (`efficientdet.avi`, `images_faster_rcnn.avi`, `ssd_resnet.avi`) with our detections by running the following cells.

In [13]:
# Image arrays are indexed (rows, columns, channels), so shape[0] is the
# HEIGHT and shape[1] is the WIDTH. (These were previously swapped, which only
# went unnoticed because the frames are square.)
frame_height, frame_width = images_efficientdet[0].shape[:2]

# cv2.VideoWriter takes the frame size as (width, height); MJPG codec, 10 fps.
out_efficientdet = cv2.VideoWriter(
    'efficientdet.avi',
    cv2.VideoWriter_fourcc('M','J','P','G'),
    10,
    (frame_width, frame_height))
if not out_efficientdet.isOpened():
    raise OSError("Could not open 'efficientdet.avi' for writing")

try:
    # Write every annotated frame to the video. No window is ever opened, so
    # the former cv2.waitKey / cv2.destroyAllWindows calls were removed.
    for image in images_efficientdet:
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # frames are RGB; OpenCV writes BGR
        out_efficientdet.write(image)
finally:
    # Always finalize the file, even if a write fails part-way through.
    out_efficientdet.release()

Each video is stored in the current working directory. If you ran this notebook on SageMaker, you can download the videos and play them locally.

In [17]:
# Image arrays are indexed (rows, columns, channels), so shape[0] is the
# HEIGHT and shape[1] is the WIDTH. (These were previously swapped, which only
# went unnoticed because the frames are square.)
frame_height, frame_width = images_faster_rcnn[0].shape[:2]

# cv2.VideoWriter takes the frame size as (width, height); MJPG codec, 10 fps.
out_faster_rcnn = cv2.VideoWriter(
    'images_faster_rcnn.avi',
    cv2.VideoWriter_fourcc('M','J','P','G'),
    10,
    (frame_width, frame_height))
if not out_faster_rcnn.isOpened():
    raise OSError("Could not open 'images_faster_rcnn.avi' for writing")

try:
    # Write every annotated frame to the video. No window is ever opened, so
    # the former cv2.waitKey / cv2.destroyAllWindows calls were removed.
    for image in images_faster_rcnn:
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # frames are RGB; OpenCV writes BGR
        out_faster_rcnn.write(image)
finally:
    # Always finalize the file, even if a write fails part-way through.
    out_faster_rcnn.release()

In [18]:
# Image arrays are indexed (rows, columns, channels), so shape[0] is the
# HEIGHT and shape[1] is the WIDTH. (These were previously swapped, which only
# went unnoticed because the frames are square.)
frame_height, frame_width = images_ssd_resnet[0].shape[:2]

# cv2.VideoWriter takes the frame size as (width, height); MJPG codec, 10 fps.
out_ssd_resnet = cv2.VideoWriter(
    'ssd_resnet.avi',
    cv2.VideoWriter_fourcc('M','J','P','G'),
    10,
    (frame_width, frame_height))
if not out_ssd_resnet.isOpened():
    raise OSError("Could not open 'ssd_resnet.avi' for writing")

try:
    # Write every annotated frame to the video. No window is ever opened, so
    # the former cv2.waitKey / cv2.destroyAllWindows calls were removed.
    for image in images_ssd_resnet:
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # frames are RGB; OpenCV writes BGR
        out_ssd_resnet.write(image)
finally:
    # Always finalize the file, even if a write fails part-way through.
    out_ssd_resnet.release()