# DPU example: RefineDet
---

## Aim/s
* This notebook shows an example of a DPU application. The application, as well as the DPU IP, is pulled from the official [Vitis AI Github Repository](https://github.com/Xilinx/Vitis-AI).
* Description: RefineDet for object detection on VOC.
* Input size: 320x320
* Task: Object detection

## References
* [Vitis AI product page](https://www.xilinx.com/products/design-tools/vitis/vitis-ai.html).

## Last revised
* Dec 14, 2023
    * Initial revision. MM, Alpha Data Parallel Systems Ltd.

----

In [None]:
import cv2
import time
import glob
import random
import colorsys
import numpy as np
import IPython
from PIL import Image

from pynq_dpu import DpuOverlay

# Import helper functions from refinedet_utils.py
# Original file available as utils_tf.py in the Vitis AI Github repository.
from refinedet_utils import pboxes_vgg_voc, Encoder

## 1. Prepare the overlay
We will download the overlay onto the board and prepare the model.

In [None]:
ol = DpuOverlay('dpu.bit')
ol.load_model('refinedet_voc_tf.xmodel')

## 2. Utility functions

In this section, we will prepare a few functions and the input data for later use.

First we need to load the class labels for the VOC dataset and determine the colors for annotation.

In [None]:
def get_class(classes_path):
    '''
    Function to parse the VOC classes file to get the classification information
    for the model.
    
    Input:
        classes_path: string corresponding to the path to the class labels file.
        
    Returns:
        List of strings corresponding to the individual class labels.
    
    '''
    with open(classes_path) as f:
        class_names = f.readlines()
    class_names = [c.strip() for c in class_names]
    return class_names

# Parse the classes file
classes_path = "img/voc_classes.txt"
class_names = get_class(classes_path)

# Generate colors for each class
num_classes = len(class_names)
hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
colors = list(map(lambda x: 
                  (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), 
                  colors))
random.seed(0)
random.shuffle(colors)
random.seed(None)
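
The palette logic above can be tried on its own, without the board. The sketch below is a minimal standalone version, assuming a hypothetical helper name `make_palette` and the same fixed seed of 0. It spreads hues evenly around the HSV colour wheel, then shuffles them so that neighbouring class indices do not receive near-identical colours.

```python
import colorsys
import random


def make_palette(num_classes, seed=0):
    """Return one (R, G, B) tuple of 0-255 ints per class (hypothetical helper)."""
    # Evenly spaced hues at full saturation and value.
    hsv_tuples = [(i / num_classes, 1.0, 1.0) for i in range(num_classes)]
    palette = [
        tuple(int(channel * 255) for channel in colorsys.hsv_to_rgb(*hsv))
        for hsv in hsv_tuples
    ]
    # A private Random instance keeps the shuffle reproducible without
    # touching the global random state.
    random.Random(seed).shuffle(palette)
    return palette


palette = make_palette(20)
print(len(palette), palette[0])
```

Using a private `random.Random(seed)` instance is a design choice of this sketch; it avoids the seed, shuffle, re-seed sequence on the global generator.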

Next we need to define helper functions to prepare the input images for the model.
At this stage we will also define a post-processing function to decode the output
of the model and draw the bounding boxes.

The post-processing step for the RefineDet model is quite complex. To keep this
example notebook clear, the helper functions for post-processing are defined in
the [refinedet_utils.py](refinedet_utils.py) file.

In [None]:
def preprocess(image, size=(320, 320)):
    '''
    Function to prepare the input for the RefineDet model.
    The input images are first resized then mean subtraction is used to
    shift the colour channel values.
    
    Args:
        image: 3 dimensional array corresponding to the input BGR image.
        size (optional): (new_width, new_height) tuple corresponding to 
                         the size of the input the RefineDet model is
                         expecting. Set to 320x320 by default.
                         
    Returns:
        3 dimensional array corresponding to the input for RefineDet. The
        array has a shape of (new_height, new_width, 3).
    '''
    
    image = cv2.resize(image, size)
    image = image.astype('float32')
    R_MEAN = 123.68
    G_MEAN = 116.78 
    B_MEAN = 103.94
    mean = np.array([B_MEAN, G_MEAN, R_MEAN], dtype=np.float32)
    image = image - mean
    
    return image

def postprocess(image, output_tensors, display, score_threshold=0.5):
    '''
    Function to decode the output of the model and annotate the original
    image with the bounding boxes, predicted labels and confidence score.
    
    Args:
        image: 3 dimensional array corresponding to the original input image.
        output_tensors: buffers allocated for the DPU to store the output of
                        the model.
        display: boolean flag set to annotate and display the input image.
        score_threshold (optional): float corresponding to the minimum 
                        confidence in the prediction for annotation.
                        
    Returns:
        None, the annotated image is displayed when the display flag is set.
    
    '''
    
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    original_height, original_width, _ = image.shape
    # Keep the line thickness at 1 or more, even for small images
    bbox_thick = max(1, int(0.6 * (original_height + original_width) / 600))
    
    arm_loc = output_tensors[0] # (1, 8500, 4)
    arm_cls = output_tensors[1] # (1, 8500, 2)
    odm_loc = output_tensors[2] # (1, 8500, 4)
    odm_cls = output_tensors[3] # (1, 8500, 21)
    
    pboxes = pboxes_vgg_voc()
    encoder = Encoder(pboxes)
    
    loc, label, prob = encoder.decode_batch(arm_cls, arm_loc, odm_cls, odm_loc, 0.45, 200, device=0)[0]
    valid_mask = np.logical_and((loc[:, 2] - loc[:, 0] > 0), (loc[:, 3] - loc[:, 1] > 0))
    
    for i in range(prob.shape[0]-1, -1,-1):
        if not valid_mask[i]: continue
            
        score = prob[i]
        if score < score_threshold: break
            
        xmin = loc[i][0] * original_width
        ymin = loc[i][1] * original_height
        xmax = loc[i][2] * original_width
        ymax = loc[i][3] * original_height
        class_id = int(label[i]) - 1
        
        if not display: continue
        
        color = colors[class_id]
        text = f'{class_names[class_id]}: {score:.4f}'
        
        cv2.rectangle(image, (int(xmin), int(ymin)), (int(xmax), int(ymax)), color, bbox_thick)
        
        cv2.putText(image,
                    text,
                    (int((xmin + xmax)/2), int((ymin + ymax)/2)),
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=0.6,
                    color=color,
                    thickness=bbox_thick)
        
    if display:
        IPython.display.display(Image.fromarray(image))
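
The decoder returns box corners normalised to the range 0 to 1, which is why the loop above multiplies them by the original width and height. As a minimal sketch of that step in isolation, the hypothetical helper `to_pixel_box` below scales one box to integer pixel coordinates and also returns the box centre, one possible anchor point for a text label. Clamping to the image bounds is an added assumption, not something the notebook does.

```python
def to_pixel_box(box, width, height):
    """Scale a normalised (xmin, ymin, xmax, ymax) box to integer pixels.

    Hypothetical helper for illustration; the clamping is an assumption.
    """
    xmin, ymin, xmax, ymax = box
    # Scale to pixels, then clamp so drawing never leaves the image.
    x0 = min(max(int(xmin * width), 0), width - 1)
    y0 = min(max(int(ymin * height), 0), height - 1)
    x1 = min(max(int(xmax * width), 0), width - 1)
    y1 = min(max(int(ymax * height), 0), height - 1)
    centre = ((x0 + x1) // 2, (y0 + y1) // 2)
    return (x0, y0, x1, y1), centre


# A box that overshoots the bottom edge of a 640x480 image.
corners, centre = to_pixel_box((0.25, 0.5, 0.75, 1.2), width=640, height=480)
print(corners, centre)
```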

Finally, use a search pattern to collect all the `JPEG` files for evaluation.

In [None]:
search_pattern = 'img/*.JPEG'
original_images = glob.glob(search_pattern)
total_images = len(original_images)
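
Two details of the pattern above are worth keeping in mind: `glob.glob` does not guarantee any ordering of its results, and on case-sensitive file systems `*.JPEG` matches only that exact upper-case suffix. The sketch below shows one possible alternative, assuming a hypothetical helper `collect_images` that matches extensions case-insensitively and sorts the result so that a given image index always refers to the same file. It runs against a temporary directory rather than the notebook's `img` folder.

```python
import tempfile
from pathlib import Path


def collect_images(folder, extensions=(".jpeg", ".jpg")):
    """Return sorted image paths in `folder` (hypothetical helper).

    Extensions are compared case-insensitively.
    """
    wanted = {ext.lower() for ext in extensions}
    return sorted(
        str(path)
        for path in Path(folder).iterdir()
        if path.is_file() and path.suffix.lower() in wanted
    )


# Demonstrate on a throwaway directory with mixed-case file names.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("b.JPEG", "a.jpg", "notes.txt"):
        (Path(tmp) / name).touch()
    found = [Path(p).name for p in collect_images(tmp)]

print(found)
```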

## 3. Use VART

Now we should be able to use VART to do object detection.

In [None]:
dpu = ol.runner

input_tensors = dpu.get_input_tensors()
output_tensors = dpu.get_output_tensors()

shape_in = input_tensors[0].dims
shape_out0 = output_tensors[0].dims
shape_out1 = output_tensors[1].dims
shape_out2 = output_tensors[2].dims
shape_out3 = output_tensors[3].dims

We can define a few buffers to store input and output data. They will be reused during multiple runs.

In [None]:
input_data = [np.empty(shape_in, dtype=np.float32, order="C")]
output_data = [np.empty(shape_out0, dtype=np.float32, order="C"), 
               np.empty(shape_out1, dtype=np.float32, order="C"),
               np.empty(shape_out2, dtype=np.float32, order="C"),
               np.empty(shape_out3, dtype=np.float32, order="C")]

image = input_data[0]
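
Note that `image = input_data[0]` binds a second name to the same array rather than copying it, so a later in-place write to `image` fills the buffer that is handed to the DPU. The NumPy-only sketch below illustrates this aliasing; the shape `(1, 4, 4, 3)` is a made-up stand-in for the real input tensor dimensions.

```python
import numpy as np

# Stand-in for the DPU input shape; the real value comes from the tensor's dims.
shape_in = (1, 4, 4, 3)
input_data = [np.empty(shape_in, dtype=np.float32, order="C")]

# Same object, new name: no copy is made.
image = input_data[0]

# In-place assignment fills the existing buffer instead of rebinding the name.
frame = np.ones(shape_in[1:], dtype=np.float32)
image[0, ...] = frame

print(image is input_data[0], float(input_data[0].sum()))
```

Writing `image = frame` instead would only rebind the name and leave the buffer in `input_data` untouched, which is why the slice assignment matters.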

Remember that we have a list of `original_images`. We can now define a new function `run()` which takes the image index as the input, prepares the image for the model, executes the DPU for the given input and finally post-processes the output of the DPU. With the argument `display` set to `True`, the original image is rendered with the bounding boxes, class labels and confidence scores.

In [None]:
def run(image_index, display=False):
    '''
    Function to handle a single input for the DPU and process the output of the run.
    
    Args:
        image_index: int corresponding to the index of the collected images to be processed.
        display (optional): boolean flag set to annotate and display the input image.
        
    Returns:
        None, the annotated image is displayed when the display flag is set.
        
    Raises:
        AssertionError: when the provided image_index is not less than the number of
                        input images found.
    
    '''
    assert image_index < total_images, \
    f'Please specify an image index less than {total_images}. Index provided: {image_index}.'
    
    # Read input image
    input_image = cv2.imread(original_images[image_index])
    
    # Pre-process image
    image_data = preprocess(input_image)
    
    # Copy the image into the DPU input buffer and trigger execution
    image[0, ...] = image_data
    job_id = dpu.execute_async(input_data, output_data)
    dpu.wait(job_id)
    
    # Post-process result
    postprocess(input_image, output_data, display, score_threshold=0.5)

Let's run it for 1 image and display the annotated image.

In [None]:
run(0, display=True)

We can also run it for multiple images as shown below. In this example we have only used 1 thread; in principle, users should be able to boost the performance by employing more threads.

In [None]:
time1 = time.time()
for i in range(total_images):
    run(i)
time2 = time.time()
fps = total_images/(time2-time1)
print("Performance: {} FPS".format(fps))
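
As a rough sketch of how a multi-threaded version might be structured, the block below uses `concurrent.futures.ThreadPoolExecutor` with a stand-in `fake_inference` function that only sleeps; it is not DPU code and its timings say nothing about real hardware. On the board, each thread would presumably need its own input and output buffers, because `run()` above writes into shared arrays. That requirement is an assumption drawn from this notebook's structure, not a documented constraint.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_inference(index):
    """Stand-in for one pre-process / DPU / post-process cycle (hypothetical)."""
    time.sleep(0.01)  # pretend the accelerator is busy
    return index


def measure_fps(num_items, num_threads):
    """Process `num_items` work items on `num_threads` threads and return FPS."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        results = list(pool.map(fake_inference, range(num_items)))
    elapsed = time.perf_counter() - start
    assert results == list(range(num_items))  # map() preserves input order
    return num_items / elapsed


print(f"1 thread:  {measure_fps(20, 1):.1f} FPS")
print(f"4 threads: {measure_fps(20, 4):.1f} FPS")
```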

We will need to remove references to `vart.Runner` and let Python garbage-collect the unused graph objects. This will make sure we can run other notebooks without any issue.

In [None]:
del dpu
del ol

----

Copyright (C) 2021 Xilinx, Inc

SPDX-License-Identifier: Apache-2.0 License

----