In [None]:
import os
import numpy as np

from threading import Thread, Event
from queue import Queue
import asyncio

import cv2
import opencv_jupyter_ui as jcv2

from datetime import datetime, timedelta
import time

import driver
from pynq.pl_server.device import Device

### Driver Base Modification

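Two methods were added to `driver_base.py` so the accelerator can run asynchronously. `async_exec` launches inference on a batch. `polling_out_dma_ready` is then polled; it returns a `(ready, yhat)` pair, and `yhat` holds the prediction once `ready` is true: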

```python
def async_exec(self, input_npy)
def polling_out_dma_ready(self)
```

# Config

In [None]:
# Camera capture resolution (pixels) and total number of frames to process.
FRAME_IN_W, FRAME_IN_H = 640, 480
N_FRAMES = 240

# Accelerator input resolution, batch size and PL clock (passed as fclk_mhz).
IMG_W_RSZ, IMG_H_RSZ = 224, 224
BATCH_SIZE = 20
FPGA_CLK = 5.0

# Requested camera FPS is tied to the batch size (presumably ~one batch per second).
CAP_FPS = BATCH_SIZE

# Number of frames shown before the main loop starts.
WARM_UP_FRAMES = 5
VERBOSE = False
# Sleep [s] when the camera queue has no new frames.
SLEEP_BATCH_FRAME = 0.01
# Sleep [s] between polls of the output DMA while inference is running.
SLEEP_DMA_POLL = 0.2

# Accelerator

In [None]:
# PYNQ device the overlay is loaded onto (the first device listed).
my_device = Device.devices[0]

# Load the FINN accelerator bitstream through the generated driver.
# batch_size and fclk_mhz come from the Config cell; `accel` is the object the
# accelerator and DMA-polling threads call (async_exec / polling_out_dma_ready).
accel = driver.FINNExampleOverlay(
    bitfile_name = '../bitfile/finn-accel.bit', 
    platform = "zynq-iodma",
    io_shape_dict = driver.io_shape_dict, 
    batch_size = BATCH_SIZE,
    fclk_mhz = FPGA_CLK,
    runtime_weight_dir = "runtime_weights/", 
    device=my_device
)

In [None]:
# Run the driver's built-in throughput benchmark and save its result as text.
METRICS_FILE = "nw_metrics_threads.txt"
res = accel.throughput_test()
# `with` guarantees the file is closed even if the write fails.
with open(METRICS_FILE, "w") as metrics_file:
    metrics_file.write(str(res))
# Report the file that was actually written (the old message named nw_metrics.txt).
print(f"Results written to {METRICS_FILE}")

# Camera Setup

In [None]:
def init_videocapture(width=1280, height=720, fps = 30, device_index=0):
    '''
    Open a camera through OpenCV's V4L2 backend and request a capture mode.

    Args:
        width, height: requested frame size in pixels.
        fps: requested capture rate.
        device_index: V4L2 camera index. Defaults to 0, the previously hard-coded camera.

    Returns:
        The cv2.VideoCapture object. The requested properties are only requests:
        the driver may not honour them. Read them back with capture.get(...), and
        check capture.isOpened() before reading frames.
    '''
    camera = cv2.VideoCapture(device_index, cv2.CAP_V4L2)
    camera.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    camera.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    # Ask the camera for MJPG-encoded frames.
    camera.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
    camera.set(cv2.CAP_PROP_FPS, fps)
    return camera

# Camera Thread

In [None]:
def get_frames(
    event_warm_up_ready,
    event_initial_plot_ready,
    queue_warm_up,
    queue_frames,
    frame_in_w, 
    frame_in_h, 
    n_frames,
    batch_size,
    warm_up_frames = 5,
    cap_fps = 30,
    verbose = False,
    max_read_failures = 30):
    
    '''
    Get frames continuously from the camera.

    1. Warm up: read `warm_up_frames` frames into `queue_warm_up`, set
       `event_warm_up_ready`, then block until `event_initial_plot_ready` is set.
    2. Loop: read `n_frames` frames into `queue_frames`, then put `None` as the
       end-of-stream sentinel consumed by do_batches.

    Failed reads (`capture.read()` returning ret == False) are retried instead of
    being forwarded. A `None` frame in `queue_frames` would be taken as the stop
    signal and end the pipeline early. After `max_read_failures` consecutive
    failed reads, a RuntimeError is raised.

    The camera is always released, even when an error occurs.
    '''

    def _read_frame(capture):
        '''Return the next good frame, retrying up to max_read_failures consecutive failed reads.'''
        for _ in range(max(1, max_read_failures)):
            ret, frame = capture.read()
            if ret and frame is not None:
                return frame
        raise RuntimeError(f'$ Camera Thread: {max_read_failures} consecutive failed camera reads.')

    capture = init_videocapture(width=frame_in_w, height=frame_in_h, fps=cap_fps)
    try:
        print(f'$ Camera Thread: Is webcam open: {capture.isOpened()}')
        img_w = capture.get(cv2.CAP_PROP_FRAME_WIDTH)
        img_h = capture.get(cv2.CAP_PROP_FRAME_HEIGHT)
        cv_fps = capture.get(cv2.CAP_PROP_FPS) 
        print(f'$ Camera Thread: Camera (width, height) = ({img_w}, {img_h}) - FPS = {cv_fps}')

        ################################
        #           Warm Up            #
        ################################
        for _ in range(warm_up_frames):
            queue_warm_up.put(_read_frame(capture))
        event_warm_up_ready.set()
        print(f'\n$ Camera Thread: Warm up of {warm_up_frames} frames finished\n')
        print(f'\n$ Camera Thread: Waiting for Initial Plot\n')
        # The plot coroutine shows the warm-up frames before the main loop starts.
        event_initial_plot_ready.wait()
        print(f'\n$ Camera Thread: Initial Plot finished\n')

        ################################
        #            Loop              #
        ################################
        start = datetime.now()
        for img_count in range(n_frames):
            # put() blocks when queue_frames is full, throttling the camera.
            queue_frames.put(_read_frame(capture))
            if verbose:
                img_in_batch = img_count % batch_size
                print(f'$ Camera Thread: got image = {img_count} - Id in batch: {img_in_batch} - Queue frames size = {queue_frames.qsize()}.')
        end = datetime.now()

        queue_frames.put(None) # Signal do_batches to stop

        delta_secs = (end - start).total_seconds()
        # Guard against a zero-length interval (e.g. n_frames == 0).
        fps = round(n_frames / delta_secs, 1) if delta_secs > 0 else 0.0
        print("\n$ *******************************")
        print(f'$ Camera Thread: FPS = {fps:.1f}')
        print("$ *******************************\n")
    finally:
        capture.release()

# Batches Thread

In [None]:
def do_batches(
    queue_frames, 
    queue_batches, 
    event_batch_ready, 
    batch_size,
    queue_frames_2_plot,
    sleep_time = 0.02,
    verbose = False,
    shutdown_delay = 5):
    
    '''
    Build accelerator batches from the frames in Queue Frames.

    Each frame is:
        - converted BGR -> RGB
        - resized to (IMG_W_RSZ, IMG_H_RSZ)
    and `batch_size` consecutive frames are stacked into one array of shape
    (batch_size, IMG_H_RSZ, IMG_W_RSZ, channels).
    Each full batch is put in Queue Batches as [batch_idx, batch]. If that queue
    is bounded (size 1 in this notebook), put() blocks until the accelerator has
    taken the previous batch.

    The first (original BGR) frame of each batch is put in Queue Frames to Plot
    as [batch_idx, frame], to be plotted afterwards.

    A `None` frame ends the loop. Frames of an incomplete last batch are
    dropped. The sentinels [batch_idx, None] are then put in Queue Batches and
    Queue Frames to Plot, and event_batch_ready is set after `shutdown_delay` seconds.

    Args:
        sleep_time: max seconds to wait for a frame before reporting an empty queue.
        shutdown_delay: seconds to wait before the final event_batch_ready.set().

    Raises:
        ValueError: if batch_size < 1.
    '''
    from queue import Empty

    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    batch_idx = 0
    batch_imgs = []  # preprocessed frames of the batch being built

    while True:
        try:
            frame = queue_frames.get(timeout=sleep_time)
        except Empty:
            if verbose:
                print(">> Batches Thread: empty queue frames from camera")
            continue
        img_id = len(batch_imgs)
        if verbose:
            print(f">> Batches Thread: image {img_id} for batch {batch_idx} - Queue batches size = {queue_batches.qsize()}.")
        if frame is None:
            queue_frames.task_done()
            break
        # cvtColor returns a new array, so `frame` stays untouched for plotting.
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (IMG_W_RSZ, IMG_H_RSZ), interpolation = cv2.INTER_LINEAR)
        assert img.dtype == "uint8", "Image datatype must be UINT8"
        if img_id == 0:
            # To plot images inside notebook
            queue_frames_2_plot.put([batch_idx, frame])
            if verbose:
                print(f">> Batches Thread: plot frame added to batch {batch_idx} - Queue plot size = {queue_frames_2_plot.qsize()}.")
        batch_imgs.append(img)
        if len(batch_imgs) == batch_size:
            # A single stack per batch instead of re-concatenating on every frame.
            batch = np.stack(batch_imgs, axis=0)
            # Put blocks if accel did not get previous batch
            queue_batches.put([batch_idx, batch])
            if verbose:
                print(f">> Batches Thread: Batch {batch_idx} for accel - Queue batches size = {queue_batches.qsize()}.")
            event_batch_ready.set()
            batch_idx += 1
            batch_imgs = []
        queue_frames.task_done()
    
    # Signal queues to stop
    print(">> Batches Thread: No more batches to do. Put None.")
    queue_batches.put([batch_idx, None])
    queue_frames_2_plot.put([batch_idx, None])
    # NOTE(review): with a size-1 Queue Batches, put() above already returns only after
    # the accelerator thread took the previous batch, so this delay may be unnecessary.
    time.sleep(shutdown_delay)
    event_batch_ready.set()

# Accel Thread

In [None]:
def accel_exec(
    queue_batches, 
    event_batch_ready, 
    event_accel_exec, 
    event_yhat_ready_accel,
    verbose = False):
    '''
    Feed batches from `queue_batches` to the accelerator (global `accel`).

    For every batch it waits for:
      - event_batch_ready: do_batches has put a new batch in the queue;
      - event_yhat_ready_accel: the previous prediction was read back from the
        output DMA, so the accelerator is free.
    It then starts `accel.async_exec` on the batch and sets `event_accel_exec` so
    the DMA polling thread starts polling. A `[idx, None]` item ends the loop.

    Raises:
        AssertionError: if the batch index from the queue is not the expected one.
    '''
    batch_idx = 0
    
    while True:
        event_batch_ready.wait()
        event_batch_ready.clear()
        # It avoids getting the new batch if previous one was not retrieved from DMA first
        # Therefore, this thread blocks until previous inference is performed
        # It causes that do_batches and get_frames gets blocked too
        event_yhat_ready_accel.wait()
        event_yhat_ready_accel.clear()
        do_batch_idx, batch = queue_batches.get()
        try:
            assert do_batch_idx == batch_idx, f"@ Accel Thread: do batch idx {do_batch_idx} does not match accel batch idx {batch_idx}."
            if batch is None:
                print("@@@ Accel Thread: No more Batches signal received.")
                break
            if verbose:
                print(f'@@@ Accel Thread: Batch idx {batch_idx}. Batch shape = {batch.shape}.')
                start_accel = datetime.now()
            # Execute accelerator
            accel.async_exec([batch])
            if verbose:
                end_accel = datetime.now()
                # total_seconds(): timedelta.microseconds is only the sub-second part.
                print(f'@@@ Accel Thread: time to exec accel {(end_accel-start_accel).total_seconds()*1e3:.2f} [ms] - {end_accel}')
            # Notify DMA Poll that it can start polling for prediction ready
            event_accel_exec.set()
            batch_idx += 1
        finally:
            # One task_done per get(), including the sentinel and error paths.
            queue_batches.task_done()

# Check DMA Result Thread

In [None]:
def check_dma_ready(
    event_accel_exec, 
    queue_yhat, 
    event_yhat_ready,
    event_yhat_ready_accel,
    n_frames, 
    batch_size,
    sleep_time = 0.01,
    verbose = False,
    shutdown_delay = 5):
    
    '''
    Polls the DMA to check if prediction was already completed by the accelerator.
    It signals the accelerator that prediction was completed, so it can load next batch: event_yhat_ready_accel.
    It signals the main thread that prediction is ready, so it can plot it: event_yhat_ready. 
    
    With this strategy, it blocks do_batches, as the accelerator cannot accept new batches until previous one is completed.

    Runs for n_frames // batch_size batches, then puts the sentinel [batch_idx, None]
    in queue_yhat and sets event_yhat_ready after `shutdown_delay` seconds.

    Timing printed per batch:
      - Accel elapsed time: from noticing event_accel_exec until the output DMA is
        ready. It is taken before queue_yhat.put(), which may block while the plot
        is still consuming the previous prediction.
      - Between predictions / mean: interval between consecutive ready times.
    '''
    
    total_batches = n_frames // batch_size
    batch_idx = 0
    prev_pred_time = None       # ready time of the previous prediction
    preds_interval_sum = None   # sum of intervals between consecutive predictions
    
    while batch_idx < total_batches:
        event_accel_exec.wait()
        start_accel = datetime.now()           
        event_accel_exec.clear()
        if verbose:
            print(f"**** DMA Thread: Accel started Execution - {start_accel}.")

        # Poll the output DMA until this batch's prediction is available.
        while True:
            if verbose:
                now_time = datetime.now()   
                print(f"**** DMA Thread: ... Polling for DMA result ... : {now_time}")
            out_ready, yhat = accel.polling_out_dma_ready()
            if out_ready:
                break
            if verbose:
                print(f"**** DMA Thread: yhat not ready. Sleep {sleep_time} [secs].")
            time.sleep(sleep_time)

        end_accel = datetime.now()
        queue_yhat.put([batch_idx, yhat])

        # total_seconds(): timedelta.microseconds is only the sub-second component.
        elapsed_ms = (end_accel - start_accel).total_seconds() * 1e3
        has_interval = prev_pred_time is not None
        if has_interval:
            time_between_preds = end_accel - prev_pred_time
            if preds_interval_sum is None:
                preds_interval_sum = time_between_preds
            else:
                preds_interval_sum += time_between_preds
            # batch_idx equals the number of intervals measured so far.
            accel_preds_mean_time = preds_interval_sum / batch_idx
        prev_pred_time = end_accel

        if verbose:
            print("\nOutput DMA ready")
            print(f'{yhat}')
            if has_interval:
                print(f'\n_____ Accel between predictions: {time_between_preds.total_seconds():.2f} [secs] _____')
                print(f'_____ Mean Accel between predictions: {accel_preds_mean_time.total_seconds():.2f} [secs] _____')
            print(f'_____ Accel elapsed time: {elapsed_ms:.2f} [ms] _____\n')
        else:
            # Print time between predictions and accel elapsed time close to plot window
            str_2_print = ""
            if has_interval:
                str_2_print = f'\nBatch idx = {batch_idx}\n'
                str_2_print += f'--- Accel between predictions: {time_between_preds.total_seconds():.2f} [secs]\n'
                str_2_print += f'*** Mean Accel between predictions: {accel_preds_mean_time.total_seconds():.2f} [secs]\n'
            str_2_print += f'... Accel elapsed time: {elapsed_ms:.2f} [ms]\n'
            print(str_2_print, end='\r')

        event_yhat_ready.set()
        # Finish blocking of accel if needed
        event_yhat_ready_accel.set()
        batch_idx += 1
            
    # Signal queues to stop
    print("**** DMA Thread: No more batches to do. Put None.")
    queue_yhat.put([batch_idx, None])
    # NOTE(review): with a size-1 queue_yhat, put() above already returns only after the
    # plot took the previous prediction, so this delay may be unnecessary.
    time.sleep(shutdown_delay)
    event_yhat_ready.set() 

# Plot Thread

#### Colors

In [None]:
# Colors as OpenCV BGR tuples (frames are BGR; they are converted to RGB only for the accelerator).
BLACK_COLOR, WHITE_COLOR, GRAY_COLOR = (0, 0, 0), (255, 255, 255), (50, 50, 50)
RED_COLOR = (0, 0, 255)
BLUE_COLOR = (255, 255, 0)   # BGR (255, 255, 0) renders as CYAN, not blue
GREEN_COLOR = (0, 255, 0)
YELLOW_COLOR = (0, 255, 255)

#### Draw Prediction in Frame Function

In [None]:
def draw_pred_box(yhat, img_to_plot, img_w, img_h, threshold=0.5):
    '''
    Draw the predicted class on a frame (in place) and return the frame.

    yhat[0] is read as the smoke score and yhat[1] as the fire score; a class
    counts as present when its score is strictly greater than `threshold`. The
    result selects one of four labels (Empty / Smoke / Fire / Smoke & Fire).
    The frame gets a 20 px border in the label's color, plus a filled label box
    at the top-left sized to the text.

    A score exactly equal to the threshold counts as absent. Previously no
    branch matched and nothing was drawn.

    Args:
        yhat: sequence with at least two scores (smoke, fire).
        img_to_plot: BGR image, modified in place.
        img_w, img_h: size used for the border rectangle.
        threshold: decision threshold for both scores.
    '''
    smoke = bool(yhat[0] > threshold)
    fire = bool(yhat[1] > threshold)
    label, box_color, text_color = {
        (False, False): ("Empty", GRAY_COLOR, WHITE_COLOR),
        (True, False): ("Smoke", BLUE_COLOR, BLACK_COLOR),
        (False, True): ("Fire", RED_COLOR, BLACK_COLOR),
        (True, True): ("Smoke & Fire", YELLOW_COLOR, BLACK_COLOR),
    }[(smoke, fire)]

    text = label + ": " + np.array2string(yhat)
    font, font_scale, thickness = cv2.FONT_HERSHEY_SIMPLEX, 1, 1
    # Size the label background to the rendered text instead of a fixed width.
    (text_w, _), _ = cv2.getTextSize(text, font, font_scale, thickness)

    cv2.rectangle(img_to_plot, (0, 0), (img_w, img_h), box_color, 20)
    cv2.rectangle(img_to_plot, (0, 0), (text_w + 16, 35), box_color, -1)
    cv2.putText(img_to_plot, text, (8, 25), font, font_scale, text_color, thickness)
    return img_to_plot

In [None]:
async def plot_frames(
    event_warm_up_ready,
    event_initial_plot_ready,
    queue_warm_up,
    event_yhat_ready,
    queue_yhat,
    queue_frames_2_plot):
    '''
    Show the camera warm-up frames, then one annotated frame per predicted batch.

    Warm up: waits for event_warm_up_ready, shows every frame in queue_warm_up
    and sets event_initial_plot_ready so the camera thread starts its main loop.

    Loop: on each event_yhat_ready, takes [batch_idx, yhat] from queue_yhat and
    [batch_idx, frame] from queue_frames_2_plot. It checks that both indices
    match the expected batch, averages yhat over the batch (axis 0) and draws
    the result on the frame. It stops on the `None` yhat sentinel or when 'q'
    is pressed.

    NOTE: there is no `await` here; the Event/Queue calls block, so this
    coroutine occupies its event loop's thread for its whole run.
    NOTE(review): after a 'q' exit, producer threads may stay blocked on their
    bounded queues.
    '''

    ################################
    #           Warm Up            #
    ################################
    event_warm_up_ready.wait()
    print(':):):) Plot Thread: start Initial Plot.')    
    for _ in range(queue_warm_up.qsize()):
        new_frame = queue_warm_up.get()
        jcv2.imshow('UAV', new_frame)
        queue_warm_up.task_done()
    event_initial_plot_ready.set()
    
    ################################
    #            Loop              #
    ################################
    plot_batch_idx = 0

    try:
        while True:
            event_yhat_ready.wait()
            event_yhat_ready.clear()
            yhat_batch_idx, yhat = queue_yhat.get()
            plot_frames_idx, frame_to_plot = queue_frames_2_plot.get()
            try:
                assert yhat_batch_idx == plot_batch_idx, f':) Plot Thread: plot batch idx {plot_batch_idx} does not match yhat batch idx {yhat_batch_idx}.'
                assert plot_frames_idx == plot_batch_idx, f':) Plot Thread: plot frames idx {plot_frames_idx} does not match batch idx {plot_batch_idx}.'
                if yhat is None:
                    print(':):):) Plot Thread: yhat is None received -> Execution Finished')
                    break
                yhat_mean = np.mean(yhat, axis=0)
                if VERBOSE:
                    print(f':):):) Plot Thread: yhat\n {yhat}')
                    print(f':):):) Plot Thread: smoke mean = {yhat_mean[0]}')
                    print(f':):):) Plot Thread: fire mean = {yhat_mean[1]}')
                # Use the real frame size: the camera may not honour the requested resolution.
                frame_h, frame_w = frame_to_plot.shape[:2]
                frame_to_plot = draw_pred_box(yhat_mean, frame_to_plot, frame_w, frame_h)
                jcv2.imshow('UAV', frame_to_plot)
                if jcv2.waitKey(1) == ord('q'):
                    break
                plot_batch_idx += 1
            finally:
                # One task_done per get(), on every exit path.
                queue_yhat.task_done()
                queue_frames_2_plot.task_done()
    finally:
        jcv2.destroyAllWindows()

# Threads

### Setup Queues

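The accelerator cannot start a new batch until the previous prediction has been read back from the output DMA, so it cannot take a new batch earlier.
Giving the batch queue a maximum size of 1 therefore blocks `do_batches` while the previous prediction is still pending.

This blocking has to be balanced against the size of the frame queue. While `do_batches` is blocked, the frame queue fills up. Once it is full, the camera thread blocks too, which lowers the effective capture rate.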

In [None]:
# One slot per warm-up frame shown before the main loop.
queue_warm_up = Queue(WARM_UP_FRAMES)

# Camera frames waiting to be batched. BATCH_SIZE*2 is very conservative; it could be reduced.
queue_frames = Queue(BATCH_SIZE * 2)
# Size 1: do_batches blocks until the accelerator thread takes the previous batch.
queue_batches = Queue(1)
# Size 1: the DMA thread blocks until the plot consumes the previous prediction.
queue_yhat = Queue(1)
# First frame of each batch, kept to be plotted with its prediction.
queue_frames_2_plot = Queue(8)

### Setup Events

In [None]:
# Start-up handshake between the camera thread and the plot coroutine.
event_warm_up_ready, event_initial_plot_ready = Event(), Event()

# Hand-offs along the pipeline: batches -> accelerator -> DMA polling -> plot.
event_batch_ready = Event()
event_accel_exec = Event()
event_yhat_ready = Event()
# Starts set so the accelerator thread can take the very first batch
# without waiting for a previous prediction.
event_yhat_ready_accel = Event()
event_yhat_ready_accel.set()

### Setup Threads

In [None]:
# Worker threads of the pipeline: camera -> batches -> accelerator -> DMA polling.
# Arguments are passed by keyword so each connection is explicit.
cap_thread = Thread(
    target=get_frames,
    kwargs=dict(
        event_warm_up_ready=event_warm_up_ready,
        event_initial_plot_ready=event_initial_plot_ready,
        queue_warm_up=queue_warm_up,
        queue_frames=queue_frames,
        frame_in_w=FRAME_IN_W,
        frame_in_h=FRAME_IN_H,
        n_frames=N_FRAMES,
        batch_size=BATCH_SIZE,
        warm_up_frames=WARM_UP_FRAMES,
        cap_fps=CAP_FPS,
        verbose=VERBOSE,
    ),
)

batches_thread = Thread(
    target=do_batches,
    kwargs=dict(
        queue_frames=queue_frames,
        queue_batches=queue_batches,
        event_batch_ready=event_batch_ready,
        batch_size=BATCH_SIZE,
        queue_frames_2_plot=queue_frames_2_plot,
        sleep_time=SLEEP_BATCH_FRAME,
        verbose=VERBOSE,
    ),
)

accel_thread = Thread(
    target=accel_exec,
    kwargs=dict(
        queue_batches=queue_batches,
        event_batch_ready=event_batch_ready,
        event_accel_exec=event_accel_exec,
        event_yhat_ready_accel=event_yhat_ready_accel,
        verbose=VERBOSE,
    ),
)

# NOTE(review): this rebinds the name `check_dma_ready` from the function to the Thread.
# Re-running this cell without first re-running the function's cell makes the Thread's
# target a Thread object, which fails when started. Renaming it (e.g. `dma_thread`)
# also requires updating the `.start()` call in the Start Threads cell.
check_dma_ready = Thread(
    target=check_dma_ready,
    kwargs=dict(
        event_accel_exec=event_accel_exec,
        queue_yhat=queue_yhat,
        event_yhat_ready=event_yhat_ready,
        event_yhat_ready_accel=event_yhat_ready_accel,
        n_frames=N_FRAMES,
        batch_size=BATCH_SIZE,
        sleep_time=SLEEP_DMA_POLL,
        verbose=VERBOSE,
    ),
)

### Asyncio Thread to Plot

In [None]:
def run_event_loop(loop):
    '''
    Thread target: install `loop` as this thread's current event loop and run it
    until loop.stop() is called.
    '''
    print('Asyncio event loop is running')
    asyncio.set_event_loop(loop)
    # Blocks this thread until loop.stop() is called.
    loop.run_forever()
    
# Dedicated event loop for the plot coroutine, run in its own daemon thread,
# so the notebook cell can block on the coroutine's future.
loop = asyncio.new_event_loop()
plot_thread = Thread(target=run_event_loop, kwargs={'loop': loop}, daemon=True)

### Start Threads

In [None]:
# Wall-clock start of the pipeline run; used for the elapsed time printed at the end.
start_running = datetime.now()

In [None]:
# Start the worker threads: camera -> batches -> accelerator -> DMA polling.
# `check_dma_ready` here is the Thread object bound in the Setup Threads cell,
# which shadows the function of the same name.
cap_thread.start()
batches_thread.start()
accel_thread.start()
check_dma_ready.start()

In [None]:
plot_thread.start()
print("\nWaiting for plot coroutine")

# plot_frames blocks on threading Events/Queues, so it runs on the dedicated
# loop thread while this cell waits for its result.
future = asyncio.run_coroutine_threadsafe(
    plot_frames(
        event_warm_up_ready, event_initial_plot_ready, queue_warm_up,
        event_yhat_ready,
        queue_yhat,
        queue_frames_2_plot),
    loop)
try:
    # wait for the task to finish
    value = future.result()
finally:
    # run_forever() never returns on its own: stop the loop so plot_thread can exit.
    loop.call_soon_threadsafe(loop.stop)
    plot_thread.join(timeout=5)
    if not plot_thread.is_alive():
        loop.close()
# report a message
print(f'Got Async Result: {value}')

### Wait for Threads

In [None]:
# cap_thread.join()
# batches_thread.join()
# accel_thread.join()

# Main Thread Loop

### Plot inside Asyncio Loop Thread

In [None]:
# plot_thread.join()
# NOTE(review): plot_thread runs loop.run_forever() and exits only when the loop is
# stopped, so joining it here blocks unless the loop was stopped first.
    
# Total wall-clock time since start_running was taken.
end_running = datetime.now()

print(f'\nElapsed time = {end_running - start_running}.')

### Plot in Main Thread

In [None]:
# '''
# When yhat is ready, it retrives a frame from Queue Frames to Plot and yhat from Queue Yhat.
# Calculates the mean of yhat for both classes and plot it.
# '''

# plot_batch_idx = 0

# while True:
#     event_yhat_ready.wait()
#     event_yhat_ready.clear()
#     yhat_batch_idx, yhat = queue_yhat.get()
#     plot_frames_idx, frame_to_plot = queue_frames_2_plot.get()
#     assert yhat_batch_idx == plot_batch_idx, f':) Main Thread: plot batch idx {plot_batch_idx} does not match yhat batch idx {yhat_batch_idx}.'
#     assert plot_frames_idx == plot_batch_idx, f':) Main Thread: plot frames idx {plot_frames_idx} does not match batch idx {yhat_batch_idx}.'
#     if yhat is None:
#         print(':):):) Main Thread: yhat is None received -> Execution Finished')
#         break
#     else:
#         smoke_mean = np.mean(yhat[:, 0])
#         fire_mean = np.mean(yhat[:, 1])
#         if VERBOSE:
#             print(f':):):) Main Thread: yhat\n {yhat}')
#             print(f':):):) Main Thread: smoke mean = {smoke_mean}')
#             print(f':):):) Main Thread: fire mean = {fire_mean}')
#             # print(f':):):) Main Thread: yhat\n {type(yhat)}')
#             # print(f':):):) Main Thread: yhat\n {yhat.shape}')
#         jcv2.imshow('UAV', frame_to_plot)
#         if jcv2.waitKey(1)==ord('q'):
#             break 
#         plot_batch_idx += 1
#         queue_yhat.task_done()
#         queue_frames_2_plot.task_done()
        
# queue_yhat.task_done()
# queue_frames_2_plot.task_done()
# jcv2.destroyAllWindows()

# end_running = datetime.now()

# print(f'\nElapsed time = {end_running - start_running}.')