In [1]:
import cv2
import numpy as np

import pycuda.driver as cuda
import pycuda.autoinit
import tensorrt as trt

import matplotlib.pyplot as plt
import matplotlib.patches as patches

from itertools import product

import tkinter as tk

# G-Streamer Pipelines

In [2]:
def gstr_in():
    return (
        "v4l2src device=/dev/video0 ! "
        "video/x-h264, width=640, height=480, framerate=20/1 ! "
        "h264parse ! "
        "nvv4l2decoder ! "
        "nvvidconv ! "
        "video/x-raw, format=(string)BGRx ! "
        "queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream drop=True ! "
        "appsink"
    )

pipe_in  = gstr_in()
pipe_in  = cv2.VideoCapture(pipe_in, cv2.CAP_GSTREAMER)

Opening in BLOCKING MODE 


NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 


In [3]:
width  = int(pipe_in.get(cv2.CAP_PROP_FRAME_HEIGHT))
height = int(pipe_in.get(cv2.CAP_PROP_FRAME_WIDTH))
fps    = int(pipe_in.get(cv2.CAP_PROP_FPS))

def gstr_out():
    return (
        "appsrc ! "
        "videoconvert ! "
        "video/x-raw, format=(string)BGRx ! "
        "nvvidconv ! "
        "nvv4l2h264enc ! "
        "h264parse ! "
        "rtph264pay config-interval=1 pt=96 ! "
        "udpsink host=192.168.1.144 port=5000"
    )

pipe_out = gstr_out()
pipe_out = cv2.VideoWriter(pipe_out, cv2.CAP_GSTREAMER, 0, fps, (height, width), True)

Opening in BLOCKING MODE 


# Cuda Needs

In [4]:
def get_engine(engine_file_path=None):
    with open(engine_file_path, "rb") as f, trt.Runtime(trt.Logger(trt.Logger.WARNING)) as runtime:
        return runtime.deserialize_cuda_engine(f.read())
    
def allocate_buffers_um_pinned(engine):
    
    inputs, outputs, bindings = [], [], []
    
    for binding in engine:

        size  = trt.volume(engine.get_binding_shape(binding)) #* engine.max_batch_size
        dtype = trt.nptype(engine.get_binding_dtype(binding))

        # Allocate SINGLE pinned memory buffer
        mem = cuda.pagelocked_empty(size, dtype)
        
        # Append the device buffer to device bindings.
        bindings.append(int(mem.base.get_device_pointer()))
        
        # Append to the appropriate list.
        if engine.binding_is_input(binding): inputs.append(mem)
        else:                                outputs.append(mem)
    
    return inputs, outputs, bindings


NvMMLiteOpen : Block : BlockType = 4 
===== NVMEDIA: NVENC =====
NvMMLiteBlockCreate : Block : BlockType = 4 


# Dynamic Patameters

In [5]:
# Initial value
lmks_b = 0
eyes_b, mouth_b, face_b, bbox_button, laser_lines = False, False, False, False, False

def set_lmks():
    global lmks_b
    lmks_b = int(entry.get())

def update_eyes():
    global eyes_b
    eyes_b = not eyes_b

def update_mouth():
    global mouth_b
    mouth_b = not mouth_b

def update_face():
    global face_b
    face_b = not face_b

def update_laser():
    global laser_lines
    laser_lines = not laser_lines

def update_bbox():
    global bbox_button
    bbox_button = not bbox_button

def update_var(val):
    # The value returned is a string, convert it to int
    bin_thr.set(int(val))

root = tk.Tk()

# Create an Entry widget
entry = tk.Entry(root)
entry.pack()

# Create a Button that will call set_param when clicked
lmks_button = tk.Button(root, text='lmks_idx', command=set_lmks)
lmks_button.pack()

eyes_check = tk.Checkbutton(root, text="eyes", command=update_eyes)
eyes_check.pack()

mouth_check = tk.Checkbutton(root, text="mouth", command=update_mouth)
mouth_check.pack()

face_check = tk.Checkbutton(root, text="face", command=update_face)
face_check.pack()

bbox_check = tk.Checkbutton(root, text="bbox", command=update_bbox)
bbox_check.pack()

laser_check = tk.Checkbutton(root, text="laser", command=update_laser)
laser_check.pack()

# create a variable to store the current value of the scroll bar
bin_thr = tk.IntVar()
# create a Scale widget
scale = tk.Scale(root, from_=0, to=255, orient=tk.HORIZONTAL, command=update_var)
scale.pack()

# Pre/Post-processing Functions

In [6]:
#DexiNed
#This implementation is based on the DexiNed from X. Soria and E. Riba and A. Sappa
#Source: https://github.com/xavysp/DexiNed

def dexi_pre(frame):

    frame = np.array(frame, dtype=np.float32)
    frame -= [103.939, 116.779, 123.68] #Mean subtraction
    frame = np.transpose(frame, (2, 0, 1)) #Move channels to the first index
    frame = np.expand_dims(frame, axis=0) 
    frame = frame.ravel()

    return frame

def sig(x): return 1 / (1 + np.exp(-x))

def dexi_post(frame):

    frame = 1 / (1 + np.exp(-frame))
    frame = frame * 255 #Normalization also possible
    frame = frame.reshape((480, 640))
    _, frame = cv2.threshold(frame, bin_thr.get(), 255, cv2.THRESH_BINARY)
    frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
    #frame = frame.astype(np.uint8)

    return frame


In [7]:
#YuNet
#This implementation is based on the YuNet from the OpenCV Zoo
#Source: https://github.com/opencv/opencv_zoo/blob/bfac311b2b30de4648307d9939d2f9e33e012007/models/face_detection_yunet/yunet.py

def facenet_pre(image):
    image = cv2.resize(image, (160, 120))
    image = np.asarray(image, dtype="float32")
    image = image.transpose(2, 0, 1)
    image = image.ravel()
    
    return image

min_sizes = [[10, 16, 24], [32, 48], [64, 96], [128, 192, 256]]
steps     = [8, 16, 32, 64]
variance  = [0.1, 0.2]

def priorGen(min_sizes, steps):
    
    w, h = (160, 120)
    feature_map_2th = [int(int((h + 1) / 2) / 2), int(int((w + 1) / 2) / 2)]
    feature_map_3th = [int(feature_map_2th[0] / 2), int(feature_map_2th[1] / 2)]
    feature_map_4th = [int(feature_map_3th[0] / 2), int(feature_map_3th[1] / 2)]
    feature_map_5th = [int(feature_map_4th[0] / 2), int(feature_map_4th[1] / 2)]
    feature_map_6th = [int(feature_map_5th[0] / 2), int(feature_map_5th[1] / 2)]

    feature_maps = [
        feature_map_3th,
        feature_map_4th,
        feature_map_5th,
        feature_map_6th,
    ]

    priors = []
    for k, f in enumerate(feature_maps):
        _min_sizes = min_sizes[k]
        
        for i, j in product(range(f[0]), range(f[1])):  # i->h, j->w
            for min_size in _min_sizes:
                s_kx = min_size / w
                s_ky = min_size / h

                cx = (j + 0.5) * steps[k] / w
                cy = (i + 0.5) * steps[k] / h

                priors.append([cx, cy, s_kx, s_ky])

    return np.array(priors, dtype=np.float32) #priors

def decode(outputBlob, priors, variance):

    loc  = np.array(outputBlob[0]).reshape([-1, 14])
    conf = np.array(outputBlob[1]).reshape([-1, 2])
    iou  = np.array(outputBlob[2]).reshape([-1, 1])

    # get score
    cls_scores = conf[:, 1]
    iou_scores = iou[:, 0]

    # clamp
    _idx = np.where(iou_scores < 0.0)
    iou_scores[_idx] = 0.0

    _idx = np.where(iou_scores > 1.0)
    iou_scores[_idx] = 1.0
    
    scores = np.sqrt(cls_scores * iou_scores)
    scores = scores[:, np.newaxis]
    
    scale = np.array((160, 120))

    # get bboxes
    bboxes = np.hstack(((priors[:, 0:2]+ loc[:, 0:2] * variance[0] * priors[:, 2:4]) * scale, (priors[:, 2:4] * np.exp(loc[:, 2:4] * variance)) * scale,))
    # (x_c, y_c, w, h) -> (x1, y1, w, h)
    bboxes[:, 0:2] -= bboxes[:, 2:4] / 2
    
    # get landmarks
    landmarks = np.hstack(
        (
            (priors[:, 0:2] + loc[:, 4:6] * variance[0] * priors[:, 2:4]) * scale,
            (priors[:, 0:2] + loc[:, 6:8] * variance[0] * priors[:, 2:4]) * scale,
            (priors[:, 0:2] + loc[:, 8:10] * variance[0] * priors[:, 2:4]) * scale,
            (priors[:, 0:2] + loc[:, 10:12] * variance[0] * priors[:, 2:4]) * scale,
            (priors[:, 0:2] + loc[:, 12:14] * variance[0] * priors[:, 2:4]) * scale,
        )
    )

    return np.hstack((bboxes, landmarks, scores))

priors = priorGen(min_sizes, steps)

def facedet_post(output_blob, priors=priors, variance=variance):
    # Decode
    dets = decode(output_blob, priors, variance)

    # NMS
    keepIdx = cv2.dnn.NMSBoxes(
        bboxes          =dets[:, 0:4].tolist(),
        scores          =dets[:, -1].tolist(),
        score_threshold =0.6, #self._conf_threshold,
        nms_threshold   =0.3, #self._nms_threshold,
        top_k           =5000,#self._top_k,
    )  # box_num x class_num

    if len(keepIdx) > 0:
        dets = dets[keepIdx]
        #dets = np.squeeze(dets, axis=1)
        return dets[: 5000] #self._top_k
    
    else:
        return np.empty(shape=(0, 15))

#Calculate the scaling factors
old_size = (160, 120)
new_size = (640, 480)
scale_w = new_size[0] / old_size[0]
scale_h = new_size[1] / old_size[1]

# Additional Landmarks

In [8]:
lmks_face       = np.arange(0,60)
lmks_face_outer = np.append(np.arange(0,17), np.arange(17,27)[::-1])
lmks_eyes       = np.arange(36,48)
lmks_nose       = np.arange(27,36)
lmks_mouth      = np.arange(48,60)

lmks_left_eye  = np.arange(36,42)
lmks_right_eye = np.arange(42,48)

lmks = [[], lmks_face, lmks_eyes, lmks_nose, lmks_mouth]

# Drawing Functions

In [9]:
def enlarge_hull(hull, scale=2):
    # Calculate the center of the hull
    center = hull.mean(axis=0)

    # Initialize a new hull for the result
    new_hull = []

    # For each point in the hull
    for point in hull:
        # Calculate the vector from the center to the point
        vector = point - center

        # Scale the vector
        scaled_vector = vector * scale

        # Calculate the new point and add it to the new hull
        new_point = center + scaled_vector
        new_hull.append(new_point)

    return np.array(new_hull).astype(int) # Converting to int for cv2 compatibility


In [10]:
def draw_lines_between_coords(img, coords1, coords2, color=(0, 255, 0), thickness=2):
    """
    Draw lines on an image between all pairs of points from two lists of coordinates.

    Args:
        img (numpy.array): The image on which to draw the lines.
        coords1 (list of tuples): The first list of coordinates. Each coordinate is a tuple (x, y).
        coords2 (list of tuples): The second list of coordinates. Each coordinate is a tuple (x, y).
        color (tuple, optional): The color of the lines in BGR format. Defaults to green.
        thickness (int, optional): The thickness of the lines. Defaults to 2.

    Returns:
        numpy.array: The image with the lines drawn.
    """
    for pt1 in coords1:
        for pt2 in coords2:
            cv2.line(img, pt1, pt2, color, thickness)

    return img


In [11]:
def enlarge_circle(coords, scale=1.5):
    
    # Compute the center of the coordinates
    center = np.mean(coords, axis=0)
    
    # Shift coordinates so that center is at origin
    shifted_coords = coords - center
    
    # Compute the radius and angle of each point
    radii = np.sqrt(np.sum(shifted_coords**2, axis=1))
    angles = np.arctan2(shifted_coords[:,1], shifted_coords[:,0])
    
    # Scale the radius
    new_radii = radii * scale
    
    # Compute the new coordinates
    new_coords = np.zeros_like(coords)
    new_coords[:,0] = new_radii * np.cos(angles) + center[0]
    new_coords[:,1] = new_radii * np.sin(angles) + center[1]
    
    return new_coords

# Main Loop

In [12]:
dir_engine_dexi    = '/home/gzguevara/laser/engines/dexi_block4_640x480_fp16.trt'
dir_engine_facedet = '/home/gzguevara/laser/engines/face_detection_yunet_2022mar.trt'
dir_engine_lmk     = '/home/gzguevara/laser/engines/landmarks.trt'

#Build an engine
engine_dexi    = get_engine(dir_engine_dexi)
engine_facedet = get_engine(dir_engine_facedet)
engine_lmk     = get_engine(dir_engine_lmk)

#Allocate buffers and create a stream
stream_dexi    = cuda.Stream()
stream_facedet = cuda.Stream()
stream_lmk     = cuda.Stream()

#Get locations
in_dexi, out_dexi, bind_dexi          = allocate_buffers_um_pinned(engine_dexi)
in_facedet, out_facedet, bind_facedet = allocate_buffers_um_pinned(engine_facedet)
in_lmk, out_lmk, bind_lmk             = allocate_buffers_um_pinned(engine_lmk)

[05/25/2023-14:03:24] [TRT] [W] Using an engine plan file across different models of devices is not recommended and is likely to affect performance or even cause errors.
[05/25/2023-14:03:24] [TRT] [W] Using an engine plan file across different models of devices is not recommended and is likely to affect performance or even cause errors.


In [13]:
with engine_dexi.create_execution_context() as context_dexi, \
     engine_facedet.create_execution_context() as context_facedet, \
     engine_lmk.create_execution_context() as context_lmk:

    while pipe_in.isOpened():

        ret, frame = pipe_in.read()
        frame = frame[:,:,:3]
        if not ret: break
        
        np.copyto(in_dexi[0], dexi_pre(frame))
        np.copyto(in_facedet[0], facenet_pre(frame))

        context_dexi.execute_async_v2(bindings=bind_dexi, stream_handle=stream_dexi.handle)
        context_facedet.execute_async_v2(bindings=bind_facedet, stream_handle=stream_facedet.handle)

        stream_dexi.synchronize()
        stream_facedet.synchronize()

        dexi_frame = dexi_post(out_dexi[0])
        bboxs      = facedet_post(out_facedet)

        for det in (bboxs if bboxs is not None else []):
            
            bbox = det[0:4].astype(np.int32)
            
            # Scale the bounding box to the new size
            #if bbox[0] < 0: bbox[0] = 0
            x_new = int(bbox[0] * scale_w) - 5
            y_new = int(bbox[1] * scale_h) - 7
            w_new = int(bbox[2] * scale_w) + 10
            h_new = int(bbox[3] * scale_h) + 14

            # Draw the bounding box on the larger image
            if bbox_button:
                cv2.rectangle(dexi_frame, (x_new, y_new), (x_new + w_new, y_new + h_new), (255, 0, 0), 2)

            #Crop out face
            face = frame[y_new:y_new+h_new, x_new:x_new+w_new]
            if 0 in face.shape: continue
            face = cv2.resize(face, (80,80))
            face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)

            np.copyto(in_lmk[0], face.ravel())
            context_lmk.execute_async_v2(bindings=bind_lmk, stream_handle=stream_lmk.handle)
            stream_lmk.synchronize()

            # Denormalize the landmarks to the 120x160 size (bbox space)
            landmarks_denorm = (out_lmk[1].reshape((80,2)) / 80 * [w_new, h_new] + [x_new, y_new]).astype(np.int32)

            #### Draw on edge map ###

            #Draw Hulls
            hulls = []

            #Draw Lines
            if laser_lines:
                large_l = enlarge_circle(landmarks_denorm[lmks_left_eye])[[0, 3, 4, 5]]
                large_r = enlarge_circle(landmarks_denorm[lmks_right_eye])[[0, 3, 4, 5]]
                eyes_line = np.append(large_l, large_r, axis=0)
                cv2.fillPoly(dexi_frame, [cv2.convexHull(landmarks_denorm[lmks_face_outer])], color=(0,0,0))
                dexi_frame = draw_lines_between_coords(dexi_frame, eyes_line, landmarks_denorm[[2,4,6,8,10,12,14]], color=(0, 255, 0), thickness=1)
            
            #Cover areas
            if eyes_b:
                hulls.append(enlarge_hull(cv2.convexHull(landmarks_denorm[lmks_left_eye])))
                hulls.append(enlarge_hull(cv2.convexHull(landmarks_denorm[lmks_right_eye])))
            if mouth_b:
                hulls.append(cv2.convexHull(landmarks_denorm[lmks_mouth]))
            if face_b:
                hulls.append(cv2.convexHull(landmarks_denorm[lmks_face_outer]))
            for hull in hulls:
                cv2.fillPoly(dexi_frame, [hull], color=(0,0,0))

            # Now, landmarks_rescaled_to_512_512 contains the coordinates of the landmarks in the 512x512 image.
            for (x, y) in landmarks_denorm[lmks[lmks_b]]: 
                cv2.circle(dexi_frame, (int(x), int(y)), 2, (0, 255, 0), -1)


        dexi_frame = dexi_frame.astype(np.uint8)
    
        pipe_out.write(dexi_frame)
        root.update() 
    
pipe_in.release()
pipe_out.release()

H264: Profile = 66, Level = 0 
NVMEDIA: Need to set EMC bandwidth : 84000 
NVMEDIA_ENC: bBlitMode is set to TRUE 


# DEV

In [None]:
def enlarge_circle(coords, scale=1.5):
    
    # Compute the center of the coordinates
    center = np.mean(coords, axis=0)
    
    # Shift coordinates so that center is at origin
    shifted_coords = coords - center
    
    # Compute the radius and angle of each point
    radii = np.sqrt(np.sum(shifted_coords**2, axis=1))
    angles = np.arctan2(shifted_coords[:,1], shifted_coords[:,0])
    
    # Scale the radius
    new_radii = radii * scale
    
    # Compute the new coordinates
    new_coords = np.zeros_like(coords)
    new_coords[:,0] = new_radii * np.cos(angles) + center[0]
    new_coords[:,1] = new_radii * np.sin(angles) + center[1]
    
    return new_coords
