In [None]:
# Import the required modules
import cv2
import time
import PIL.Image
from io import BytesIO
import IPython.display
import numpy as np

# Import the Inference Engine
from openvino.inference_engine import IECore, IENetwork

In [None]:
# Paths of the model files (.xml and .bin) that are later passed to `read_network`.
# Both files live in the same directory and share the same base name.
# NOTE(review): MODEL_DIR is an absolute path on one specific machine - adjust it for your environment.
MODEL_DIR = '/Users/demidovs/Documents/Work/repositories/workbench/wb/data/models/14/original'
MODEL_NAME = 'face-detection-adas-0001'

MODEL_XML = f'{MODEL_DIR}/{MODEL_NAME}.xml'
MODEL_BIN = f'{MODEL_DIR}/{MODEL_NAME}.bin'

In [None]:
# Prepare IE
# Create the Inference Engine core object used to read and load the network.
ie_core = IECore()

# Read the network from the model files given by MODEL_XML and MODEL_BIN.
network = ie_core.read_network(MODEL_XML, MODEL_BIN)
# Take the first key of `network.input_info` as the input name; only this one input is fed later.
network_input_name = next(iter(network.input_info))
network_input_blob = network.input_info[network_input_name].input_data
# The shape is unpacked into exactly four values, so a 4-D input is required here.
# The names reflect the (batch, channels, height, width) order the pre-processing reshapes to.
batch, channels, input_layer_h, input_layer_w = network_input_blob.shape

print(f'Input shape of the network is [{batch}, {channels}, {input_layer_h}, {input_layer_w}]')

# Despite its name, this holds the first key yielded by `network.outputs`;
# it is later used as the key into the inference result dictionary.
network_output_blob = next(iter(network.outputs))
print(f'Network outputs: {network_output_blob}')

# Device name passed to `load_network`; the returned object is what inference is run on.
DEVICE = 'CPU'
network_loaded_to_device = ie_core.load_network(network, DEVICE)

In [None]:
# Pre-processing
def pre_process_frame_to_network_input(input_frame: np.ndarray, batch: int, channels: int, input_layer_height: int, input_layer_width: int) -> np.ndarray:
    """Convert a frame into the array layout that is fed to the network.

    The frame is resized to the input layer's width and height, its axes are
    permuted from (H, W, C) to (C, H, W), and the result is reshaped to
    (batch, channels, input_layer_height, input_layer_width).

    Args:
        input_frame: Image array in (height, width, channels) order.
        batch: First dimension of the network input shape.
        channels: Second dimension of the network input shape.
        input_layer_height: Height of the network input.
        input_layer_width: Width of the network input.

    Returns:
        The reshaped array.
    """
    # cv2.resize expects the target size as (width, height)
    target_size = (input_layer_width, input_layer_height)
    network_shape = (batch, channels, input_layer_height, input_layer_width)

    # Resize, then move the channel axis to the front (HWC -> CHW)
    chw_frame = cv2.resize(input_frame, target_size).transpose((2, 0, 1))

    return chw_frame.reshape(network_shape)

In [None]:
def face_detection_inference(input_frame: np.ndarray) -> np.ndarray:
    """Run one inference on the loaded network and return its output data.

    The network has already been read and loaded to the device, so all that is
    left is to call `infer` on `network_loaded_to_device` with the input data.

    Args:
        input_frame: Pre-processed frame, passed to `infer` under the key
            `network_input_name`.

    Returns:
        The entry of the inference result stored under `network_output_blob`.
    """
    # `infer` takes a dictionary {input name: input data} and returns a dictionary
    # whose keys are output names and whose values are the data from the output blobs.
    outputs_by_name = network_loaded_to_device.infer({network_input_name: input_frame})

    return outputs_by_name[network_output_blob]

In [None]:
def _add_confidence_label(original_frame: np.ndarray, confidence: float, coordinates):
    """Write the confidence, as a percentage, onto `original_frame` in place.

    Args:
        original_frame: Image the text is drawn on.
        confidence: Value that is multiplied by 100 and rounded to two decimals
            to form the label text.
        coordinates: Passed to `cv2.putText` as the origin of the text.
    """
    label_color = (0, 255, 0)
    font_scale = 2
    line_thickness = 2

    # Label text, e.g. a confidence of 0.5 yields "50.0%"
    label = f'{round(confidence * 100, 2)}%'

    cv2.putText(original_frame, label, coordinates, cv2.FONT_HERSHEY_COMPLEX, font_scale, label_color, line_thickness)

    
def _visualize_detection(original_frame: np.ndarray, bbox):
    xmin, ymin, xmax, ymax = bbox
    face_region = original_frame[ymin:ymax, xmin:xmax]
    
    if face_region.size == 0:
        return
    
    # original_frame[ymin:ymax, xmin:xmax] = _blur_region(face_region)
    original_frame[ymin:ymax, xmin:xmax] = _pixelize_region(face_region)


def _blur_region(region: np.ndarray) -> np.ndarray:
    """Return the result of `cv2.GaussianBlur` on `region` with a 23x23 kernel and sigma 50."""
    kernel_size = (23, 23)
    sigma = 50
    return cv2.GaussianBlur(region, kernel_size, sigma)


def _pixelize_region(region: np.ndarray) -> np.ndarray:
    """Return a pixelated version of `region` with the same height and width.

    The region is first resized to a 16x16 grid with linear interpolation and
    then resized back to its original size with nearest-neighbour
    interpolation.
    """
    grid_size = (16, 16)
    # cv2.resize expects the target size as (width, height)
    original_size = (region.shape[1], region.shape[0])

    coarse = cv2.resize(region, grid_size, interpolation=cv2.INTER_LINEAR)
    return cv2.resize(coarse, original_size, interpolation=cv2.INTER_NEAREST)


def process_detection(original_frame: np.ndarray, detection: np.ndarray, confidence_threshold: float = 0.3):
    """Label and anonymize one detection on `original_frame`, in place.

    Args:
        original_frame: Image the detection refers to; it is modified in place.
        detection: Indexable record of one detection. Element 2 is read as the
            confidence; elements 3, 4, 5 and 6 are read as xmin, ymin, xmax and
            ymax and are multiplied by the frame width/height, i.e. they are
            treated as fractions of the frame size.
        confidence_threshold: Detections whose confidence is below this value
            are ignored. Defaults to 0.3.

    Returns:
        None.
    """
    confidence = detection[2]
    if confidence < confidence_threshold:
        return

    # Scale the relative box coordinates to pixel coordinates of this frame
    frame_h, frame_w = original_frame.shape[:2]
    xmin = int(detection[3] * frame_w)
    ymin = int(detection[4] * frame_h)
    xmax = int(detection[5] * frame_w)
    ymax = int(detection[6] * frame_h)

    # The label is placed 7 pixels above the top-left corner of the box
    _add_confidence_label(original_frame, confidence, coordinates=(xmin, ymin - 7))

    _visualize_detection(original_frame, bbox=(xmin, ymin, xmax, ymax))

In [None]:
# Utility functions

# Use 'jpeg' instead of 'png' (~5 times faster)
def array_to_image(array: np.ndarray, format: str = 'jpeg'):
    """Encode `array` as an image and wrap it for display in the notebook.

    Args:
        array: Image data handed to `PIL.Image.fromarray`.
        format: Image format passed to PIL's `save`; defaults to 'jpeg'.

    Returns:
        An `IPython.display.Image` built from the encoded bytes.
    """
    with BytesIO() as buffer:
        PIL.Image.fromarray(array).save(buffer, format)
        encoded_bytes = buffer.getvalue()
    return IPython.display.Image(data=encoded_bytes)


def get_frame(video_capture: cv2.VideoCapture) -> np.ndarray:
    """Read one frame from `video_capture`, mirrored and converted to RGB.

    Args:
        video_capture: Capture object whose `read` method supplies the frame.

    Returns:
        The frame flipped around the vertical axis and converted from BGR to RGB.

    Raises:
        RuntimeError: If `read` reports failure or yields no frame.
    """
    success, frame = video_capture.read()

    # `read` signals failure through its first return value; fail with a clear
    # message instead of passing an empty frame on to OpenCV.
    if not success or frame is None:
        raise RuntimeError('Failed to read a frame from the video capture device')

    # Flip image for natural viewing
    mirrored_frame = cv2.flip(frame, 1)

    return cv2.cvtColor(mirrored_frame, cv2.COLOR_BGR2RGB)

In [None]:
# Open capture device 0 and show anonymized frames until the kernel is interrupted.
video_capture = cv2.VideoCapture(0)
try:
    # Fail early with a clear message when the device could not be opened
    if not video_capture.isOpened():
        raise RuntimeError('Could not open video capture device 0')

    width  = video_capture.get(cv2.CAP_PROP_FRAME_WIDTH)
    height = video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT)

    # Two display handles that are updated in place: one for the frame, one for the FPS label
    video_display = IPython.display.display('', display_id=1)
    fps_label_display = IPython.display.display('', display_id=2)

    while True:
        try:
            # perf_counter is a monotonic clock intended for measuring short durations
            t1 = time.perf_counter()
            frame = get_frame(video_capture)

            pre_processed_frame = pre_process_frame_to_network_input(frame, batch, channels, input_layer_h, input_layer_w)

            inference_result = face_detection_inference(pre_processed_frame)

            # Each item of inference_result[0][0] is handled as one detection
            for detected_face in inference_result[0][0]:
                process_detection(frame, detected_face)

            image = array_to_image(frame)
            video_display.update(image)

            t2 = time.perf_counter()

            # Guard against a zero interval so the FPS label can never raise ZeroDivisionError
            elapsed = t2 - t1
            fps = int(1 / elapsed) if elapsed > 0 else 0
            s = f"""{fps} FPS"""
            fps_label_display.update( IPython.display.HTML(s) )
        except KeyboardInterrupt:
            # Interrupting the kernel is the intended way to stop the stream
            print()
            IPython.display.clear_output()
            print ("Stream stopped")
            break
finally:
    # Always free the capture device, also when an error occurred
    video_capture.release()