In [None]:
# Copyright (c) 2024，WuChao D-Robotics.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import cv2
import numpy as np
from scipy.special import softmax
# from scipy.special import expit as sigmoid
from time import time
from hobot_dnn import pyeasy_dnn as dnn 

In [None]:
# Class-name lookup table with 80 entries; the position in the list is the class id
# that draw_detection and the result printing use to fetch the label text.
coco_names = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light", 
    "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", 
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", 
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", 
    "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", 
    "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant", "bed", 
    "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", 
    "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush"
    ]

# Palette of 20 colour tuples passed as-is to the cv2 drawing calls in draw_detection,
# which selects one per class with class_id % 20.
# NOTE(review): presumably BGR order, since they are drawn on images loaded by cv2.imread.
rdk_colors = [
    (56, 56, 255), (151, 157, 255), (31, 112, 255), (29, 178, 255),(49, 210, 207), (10, 249, 72), (23, 204, 146), (134, 219, 61),
    (52, 147, 26), (187, 212, 0), (168, 153, 44), (255, 194, 0),(147, 69, 52), (255, 115, 100), (236, 24, 0), (255, 56, 132),
    (133, 0, 82), (255, 56, 203), (200, 149, 255), (199, 55, 255)]

def draw_detection(img, box, score, class_id):
    """Draw one detection (rectangle plus "<class name>: <score>" label) onto ``img`` in place.

    Args:
        img: image array that the cv2 drawing calls modify in place.
        box: ``(x1, y1, x2, y2)`` corner coordinates in pixels. Values are
            converted with ``int()`` because the cv2 drawing functions only
            accept integer coordinates.
        score: confidence value, rendered with two decimals in the label.
        class_id: index into ``coco_names``; also selects the colour.
    """
    x1, y1, x2, y2 = (int(v) for v in box)
    # Wrap around the palette using its actual length rather than a magic 20,
    # so editing rdk_colors cannot cause an IndexError or unused colours.
    color = rdk_colors[class_id % len(rdk_colors)]
    cv2.rectangle(img, (x1, y1), (x2, y2), color, 2)

    label = f"{coco_names[class_id]}: {score:.2f}"
    (label_width, label_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
    label_x = x1
    # Put the label above the box unless that would run past the top of the image.
    label_y = y1 - 10 if y1 - 10 > label_height else y1 + 10
    # Filled background so the black label text stays readable.
    cv2.rectangle(
        img, (label_x, label_y - label_height), (label_x + label_width, label_y + label_height), color, cv2.FILLED
    )
    cv2.putText(img, label, (label_x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)

# matplotlib 绘制图像到 Jupyter Web
from  matplotlib import pyplot as plt
%matplotlib inline
def cv2_img2plt_img(cv2_img):
    """Return ``cv2_img`` in the channel order matplotlib expects.

    A 3-dimensional (colour) array gets its last axis reversed (BGR -> RGB);
    any other array (e.g. grayscale) is returned unchanged.
    """
    if cv2_img.ndim != 3:
        # Grayscale image: nothing to reorder
        return cv2_img
    # Colour image: flip the channel axis
    return cv2_img[..., ::-1]

def jshow(cv2_img, size=4):
    """Display a cv2 image inline with matplotlib.

    Args:
        cv2_img: image array; converted with cv2_img2plt_img before display.
        size: value used for both entries of the figure's ``figsize``.
    """
    plt.figure(figsize=(size, size))
    displayable = cv2_img2plt_img(cv2_img)
    # The gray colormap is passed positionally, exactly as before.
    plt.imshow(displayable, plt.cm.gray)
    plt.show()

def bgr2nv12_opencv(image):
    """Convert a BGR image to a flat NV12 buffer.

    The image is converted to planar I420 with cv2, then the separate U and V
    planes are interleaved into a single UV plane that follows the Y plane.

    Args:
        image: BGR image array; its first two dimensions are height and width.

    Returns:
        1-D array of ``height * width * 3 // 2`` elements: Y plane, then
        interleaved U/V samples.
    """
    rows, cols = image.shape[:2]
    n_pixels = rows * cols

    i420 = cv2.cvtColor(image, cv2.COLOR_BGR2YUV_I420).reshape((n_pixels * 3 // 2,))
    luma = i420[:n_pixels]
    # Rows of this view are the U plane and the V plane respectively.
    chroma_planes = i420[n_pixels:].reshape((2, n_pixels // 4))
    # Transposing pairs each U sample with its V sample: U0 V0 U1 V1 ...
    chroma_interleaved = chroma_planes.T.reshape((n_pixels // 2,))

    return np.concatenate((luma, chroma_interleaved))

In [None]:
# Load the quantized *.bin model and report how long loading took
model_path = "ptq_models/yolov8x_detect_bayese_640x640_nv12_modified.bin"
begin_time = time()
quantize_model = dnn.load(model_path)
print("\033[1;31m" + f"Load D-Robotics Quantize model time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

# List every input/output tensor so the layout relied on by later cells can be checked.
print("-> input tensors")
for i, quantize_input in enumerate(quantize_model[0].inputs):
    print(f"input[{i}], name={quantize_input.name}, type={quantize_input.properties.dtype}, shape={quantize_input.properties.shape}")

print("-> output tensors")
for i, quantize_output in enumerate(quantize_model[0].outputs):
    print(f"output[{i}], name={quantize_output.name}, type={quantize_output.properties.dtype}, shape={quantize_output.properties.shape}")

In [None]:
# Prepare the dequantization scales up front.
# Outputs 1/3/5 are the ones later read as the s/m/l bounding-box branches.
s_bboxes_scale = quantize_model[0].outputs[1].properties.scale_data[np.newaxis, :]
m_bboxes_scale = quantize_model[0].outputs[3].properties.scale_data[np.newaxis, :]
l_bboxes_scale = quantize_model[0].outputs[5].properties.scale_data[np.newaxis, :]
print(f"{s_bboxes_scale.shape=}, {m_bboxes_scale.shape=}, {l_bboxes_scale.shape=}")

# Number of DFL bins per box side (each box row holds REG * 4 values)
REG = 16

# Coefficients for the DFL expectation (0 .. REG-1); only generated once.
# Built from REG so the two can never disagree.
weights_static = np.arange(REG, dtype=np.float32)[np.newaxis, np.newaxis, :]
print(f"{weights_static.shape = }")

# Input size and thresholds, computed ahead of time
print(f"{REG = }")

CLASSES_NUM = 80
print(f"{CLASSES_NUM = }")

SCORE_THRESHOLD = 0.25
NMS_THRESHOLD = 0.7
# Inverse sigmoid of SCORE_THRESHOLD, so raw scores can be thresholded before the sigmoid
CONF_THRES_RAW = -np.log(1/SCORE_THRESHOLD - 1)
print("SCORE_THRESHOLD  = %.2f, NMS_THRESHOLD = %.2f"%(SCORE_THRESHOLD, NMS_THRESHOLD))
print("CONF_THRES_RAW = %.2f"%CONF_THRES_RAW)

input_H, input_W = quantize_model[0].inputs[0].properties.shape[2:4]
print(f"{input_H = }, {input_W = }")

RESIZE_TYPE = 0
LETTERBOX_TYPE = 1
PREPROCESS_TYPE = LETTERBOX_TYPE
# Report the selected mode by name (the else branch used to print the wrong label)
print("LETTERBOX_TYPE" if PREPROCESS_TYPE == LETTERBOX_TYPE else "RESIZE_TYPE")


def make_anchor_grid(grid_h, grid_w):
    """Return a (grid_h * grid_w, 2) float array of (x, y) cell centres.

    x varies fastest, the same ordering as the previous hard-coded grids.
    """
    xs = np.tile(np.arange(grid_w) + 0.5, reps=grid_h)
    ys = np.repeat(np.arange(grid_h) + 0.5, grid_w)
    return np.stack([xs, ys], axis=0).transpose(1, 0)


# Grids only need to be generated once. They are derived from the model input
# size and the strides 8/16/32 used when decoding, so a 640x640 input yields
# exactly the former 80x80, 40x40 and 20x20 grids.
s_grid = make_anchor_grid(input_H // 8, input_W // 8)
m_grid = make_anchor_grid(input_H // 16, input_W // 16)
l_grid = make_anchor_grid(input_H // 32, input_W // 32)
print(f"{s_grid.shape = }  {m_grid.shape = }  {l_grid.shape = }")

In [None]:
# Read one BGR8 image and visualise it
img_path = "../../../../resource/datasets/COCO2017/assets/bus.jpg"
begin_time = time()
img = cv2.imread(img_path)
# cv2.imread reports failure by returning None instead of raising, which
# would otherwise surface as an unrelated AttributeError on img.shape.
if img is None:
    raise FileNotFoundError(f"cv2.imread could not load image: {img_path}")
print("\033[1;31m" + f"cv2.imread time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")
print(f"{img.shape = }")
jshow(img)

img_h, img_w = img.shape[0:2]
if PREPROCESS_TYPE == RESIZE_TYPE:
    # Pre-process by plain resize, then build the NV12 input data
    begin_time = time()
    # resize allocates the new buffer itself, which saves one extra copy
    input_tensor = cv2.resize(img, (input_W, input_H), interpolation=cv2.INTER_NEAREST)
    input_tensor = bgr2nv12_opencv(input_tensor)
    y_scale = 1.0 * input_H / img_h
    x_scale = 1.0 * input_W / img_w
    y_shift = 0
    x_shift = 0
    print("\033[1;31m" + f"pre process(resize) time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")
    print(f"{input_tensor.shape = }")
elif PREPROCESS_TYPE == LETTERBOX_TYPE:
    # Pre-process by letter box (aspect-preserving resize plus padding), then build the NV12 input data
    begin_time = time()
    x_scale = min(1.0 * input_H / img_h, 1.0 * input_W / img_w)
    y_scale = x_scale

    if x_scale <= 0 or y_scale <= 0:
        raise ValueError("Invalid scale factor.")

    # Split the horizontal padding between left (x_shift) and right (x_other)
    new_w = int(img_w * x_scale)
    x_shift = (input_W - new_w) // 2
    x_other = input_W - new_w - x_shift

    # Split the vertical padding between top (y_shift) and bottom (y_other)
    new_h = int(img_h * y_scale)
    y_shift = (input_H - new_h) // 2
    y_other = input_H - new_h - y_shift

    input_tensor = cv2.resize(img, (new_w, new_h))

    input_tensor = cv2.copyMakeBorder(input_tensor, y_shift, y_other, x_shift, x_other, cv2.BORDER_CONSTANT, value=[127, 127, 127])
    input_tensor = bgr2nv12_opencv(input_tensor)
    print("\033[1;31m" + f"pre process(letter box) time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")
    print(f"{input_tensor.shape = }")
else:
    # Raise instead of exit(-1): in a notebook, exit stops the kernel rather
    # than reporting the error in the cell output.
    raise ValueError(f"illegal PREPROCESS_TYPE = {PREPROCESS_TYPE}")

In [None]:
# Inference: run the loaded model on the NV12 input tensor built by the
# pre-processing cell; the timer brackets only the forward() call.
begin_time = time()
quantize_outputs = quantize_model[0].forward(input_tensor)
print("\033[1;31m" + f"forward time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

In [None]:
# C buffers -> numpy: take the .buffer of each of the six outputs.
# Even indices are the class branches, odd indices the bbox branches (s, m, l).
begin_time = time()
(s_clses, s_bboxes,
 m_clses, m_bboxes,
 l_clses, l_bboxes) = (quantize_outputs[k].buffer for k in range(6))
print("\033[1;31m" + f"c to numpy time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

# Shapes first, then dtypes, for both kinds of branch
print(f"{s_bboxes.shape = }  {m_bboxes.shape = }  {l_bboxes.shape = }")
print(f"{s_clses.shape = }   {m_clses.shape = }   {l_clses.shape = }")

print(f"{s_bboxes.dtype = }  {m_bboxes.dtype = }  {l_bboxes.dtype = }")
print(f"{s_clses.dtype = }   {m_clses.dtype = }   {l_clses.dtype = }")

In [None]:
# Reshape every branch to 2-D: one row per grid cell,
# REG * 4 columns for boxes and CLASSES_NUM columns for class scores.
begin_time = time()
s_bboxes, m_bboxes, l_bboxes = (
    raw.reshape(-1, REG * 4) for raw in (s_bboxes, m_bboxes, l_bboxes)
)
s_clses, m_clses, l_clses = (
    raw.reshape(-1, CLASSES_NUM) for raw in (s_clses, m_clses, l_clses)
)
print("\033[1;31m" + f"reshape time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

print(f"{s_bboxes.shape = }  {m_bboxes.shape = }  {l_bboxes.shape = }")
print(f"{s_clses.shape = }   {m_clses.shape = }   {l_clses.shape = }")

print(f"{s_bboxes.dtype = }  {m_bboxes.dtype = }  {l_bboxes.dtype = }")
print(f"{s_clses.dtype = }   {m_clses.dtype = }   {l_clses.dtype = }")

In [None]:
# classify: threshold filtering with vectorised numpy operations (optimised version 2.0)
def _threshold_branch(clses):
    """Filter one class branch against CONF_THRES_RAW.

    Returns (max_scores, valid_indices, ids, scores): the per-row maximum raw
    score, the row indices whose maximum passes the threshold, the arg-max
    class id of those rows, and their raw (pre-sigmoid) scores.
    """
    max_scores = np.max(clses, axis=1)
    # Indices of rows at or above the threshold; usually only a small subset
    valid_indices = np.flatnonzero(max_scores >= CONF_THRES_RAW)
    ids = np.argmax(clses[valid_indices, :], axis=1)
    return max_scores, valid_indices, ids, max_scores[valid_indices]


begin_time = time()
s_max_scores, s_valid_indices, s_ids, s_scores = _threshold_branch(s_clses)
m_max_scores, m_valid_indices, m_ids, m_scores = _threshold_branch(m_clses)
l_max_scores, l_valid_indices, l_ids, l_scores = _threshold_branch(l_clses)
print("\033[1;31m" + f"Small, Medium, Big Feature Map Conf Threshold time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

print(f"{s_scores.shape = }  {s_ids.shape = }  {s_valid_indices.shape = }")
print(f"{m_scores.shape = }  {m_ids.shape = }  {m_valid_indices.shape = }")
print(f"{l_scores.shape = }  {l_ids.shape = }  {l_valid_indices.shape = }")

In [None]:
# Sigmoid for the three classification branches.
# The inputs are re-read from the raw scores (*_max_scores[*_valid_indices])
# instead of from *_scores themselves: the result is the same on the first
# run, and re-running this cell can no longer apply the sigmoid twice.
begin_time = time()
s_scores = 1 / (1 + np.exp(-s_max_scores[s_valid_indices]))
m_scores = 1 / (1 + np.exp(-m_max_scores[m_valid_indices]))
l_scores = 1 / (1 + np.exp(-l_max_scores[l_valid_indices]))
print("\033[1;31m" + f"sigmoid time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

In [None]:
# Dequantize the three bounding-box branches: keep only the rows that passed
# the score threshold, cast to float32 and multiply by the branch's scale.
begin_time = time()
s_bboxes_float32, m_bboxes_float32, l_bboxes_float32 = (
    raw[keep, :].astype(np.float32) * scale
    for raw, keep, scale in (
        (s_bboxes, s_valid_indices, s_bboxes_scale),
        (m_bboxes, m_valid_indices, m_bboxes_scale),
        (l_bboxes, l_valid_indices, l_bboxes_scale),
    )
)
print("\033[1;31m" + f"Bounding Box Dequantized time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

print(f"{s_bboxes_float32.shape = }")
print(f"{m_bboxes_float32.shape = }")
print(f"{l_bboxes_float32.shape = }")

In [None]:
# Three bounding-box branches: dist2bbox (ltrb -> xyxy)
def ltrb_to_xyxy(bboxes_float32, grid, valid_indices, stride):
    """Decode dequantized DFL rows of one branch into xyxy boxes in input pixels.

    Args:
        bboxes_float32: (N, REG * 4) dequantized box rows of the kept cells.
        grid: (cells, 2) array of (x, y) cell centres for this branch.
        valid_indices: row indices into ``grid`` of the N kept cells.
        stride: factor that scales grid units to input-image pixels.

    Returns:
        (N, 4) array of x1, y1, x2, y2.
    """
    # Softmax over the REG bins of each side, then the expectation against
    # weights_static gives the left/top/right/bottom distances in grid units.
    ltrb = np.sum(softmax(bboxes_float32.reshape(-1, 4, REG), axis=2) * weights_static, axis=2)
    anchors = grid[valid_indices, :]
    x1y1 = anchors - ltrb[:, 0:2]
    x2y2 = anchors + ltrb[:, 2:4]
    return np.hstack([x1y1, x2y2]) * stride


begin_time = time()
s_dbboxes = ltrb_to_xyxy(s_bboxes_float32, s_grid, s_valid_indices, 8)
m_dbboxes = ltrb_to_xyxy(m_bboxes_float32, m_grid, m_valid_indices, 16)
l_dbboxes = ltrb_to_xyxy(l_bboxes_float32, l_grid, l_valid_indices, 32)
print("\033[1;31m" + f"dist2bbox（ltrb2xyxy） time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

In [None]:
# Concatenate the thresholded results of the small/medium/large feature maps
begin_time = time()
dbboxes = np.concatenate((s_dbboxes, m_dbboxes, l_dbboxes), axis=0)
scores = np.concatenate((s_scores, m_scores, l_scores), axis=0)
ids = np.concatenate((s_ids, m_ids, l_ids), axis=0)

## xyxy -> (x, y, w, h) boxes for cv2.dnn.NMSBoxes
# NMSBoxes interprets every box as a rectangle given by its top-left corner
# plus width and height, so x/y must be the x1/y1 corner. Passing the box
# centre instead shifts each box by half its own size and distorts the IoU
# between boxes of different sizes.
xy = dbboxes[:,0:2]
# Despite the name, the columns are (width, height) since they come from (x2 - x1, y2 - y1).
hw = (dbboxes[:,2:4] - dbboxes[:,0:2])
xyhw = np.hstack([xy, hw])
print("\033[1;31m" + f"concat time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")

print(f"{dbboxes.shape = }  {scores.shape = }  {ids.shape = }")

In [None]:
# Per-class NMS, then map the surviving boxes back to original-image pixels.
# Each result is (class_id, score, x1, y1, x2, y2).
results = []
for i in range(CLASSES_NUM):
    id_indices = ids==i
    # Nothing to suppress for classes without any candidate
    if not id_indices.any():
        continue
    # Select this class's rows once instead of re-indexing for every kept box
    class_dbboxes = dbboxes[id_indices,:]
    class_scores = scores[id_indices]
    indices = cv2.dnn.NMSBoxes(xyhw[id_indices,:], class_scores, SCORE_THRESHOLD, NMS_THRESHOLD)
    if len(indices) == 0:
        continue
    # Some OpenCV versions return an (N, 1) array; flatten so every index is a scalar
    for indic in np.asarray(indices).reshape(-1):
        x1, y1, x2, y2 = class_dbboxes[indic]
        # Undo the pre-processing shift and scale
        x1 = int((x1 - x_shift) / x_scale)
        y1 = int((y1 - y_shift) / y_scale)
        x2 = int((x2 - x_shift) / x_scale)
        y2 = int((y2 - y_shift) / y_scale)
        # Clip to [0, img_w] x [0, img_h]
        x1 = min(max(x1, 0), img_w)
        x2 = min(max(x2, 0), img_w)
        y1 = min(max(y1, 0), img_h)
        y2 = min(max(y2, 0), img_h)
        results.append((i, class_scores[indic], x1, y1, x2, y2))

In [None]:
# Draw: print every detection and render it on a copy of the input image
draw_img = img.copy()
begin_time = time()
for detection in results:
    class_id, score = detection[0], detection[1]
    x1, y1, x2, y2 = detection[2:6]
    print("(%d, %d, %d, %d) -> %s: %.2f" % (x1, y1, x2, y2, coco_names[class_id], score))
    draw_detection(draw_img, (x1, y1, x2, y2), score, class_id)
jshow(draw_img, 10)
print("\033[1;31m" + f"Draw Result time = {1000*(time() - begin_time):.2f} ms" + "\033[0m")