In [1]:
import os
os.environ['CUDA_MODULE_LOADING'] = 'LAZY'
import time
import contextlib
from collections import namedtuple, OrderedDict

import cv2
import torch
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda

In [2]:
class TRTInference:
    """Run a serialized TensorRT engine using PyCUDA-managed buffers.

    Constructing an instance creates (and leaves current) a CUDA context on
    device 0, deserializes the engine, and allocates one device buffer per
    engine tensor. Calling the instance uploads the inputs, executes the
    engine on a private stream, and returns the outputs as NumPy arrays.

    Args:
        engine_path: Path of the serialized engine file.
        output_names_mapping: Optional ``{engine_output_name: returned_key}``
            mapping; outputs not listed keep their engine name.
        verbose: Use the VERBOSE TensorRT logger level instead of INFO.
    """

    def __init__(self, engine_path, output_names_mapping: dict = None, verbose=False):
        cuda.init()
        self.device_ctx = cuda.Device(0).make_context()
        self._closed = False
        try:
            self.engine_path = engine_path
            self.output_names_mapping = output_names_mapping or {}
            self.logger = trt.Logger(trt.Logger.VERBOSE) if verbose else trt.Logger(trt.Logger.INFO)
            self.engine = None
            self.load_engine()
            assert self.engine is not None, 'Failed to load TensorRT engine.'

            self.context = self.engine.create_execution_context()
            self.stream = cuda.Stream()

            self.bindings = self.get_bindings()
            self.bindings_addr = OrderedDict((n, v.ptr) for n, v in self.bindings.items())

            self.input_names = self.get_input_names()
            self.output_names = self.get_output_names()
        except BaseException:
            # make_context() already pushed a context; pop it again so a failed
            # construction (missing file, bad engine, ...) does not leak it.
            self.close()
            raise

    def load_engine(self):
        """Deserialize the engine file at ``self.engine_path`` into ``self.engine``."""
        with open(self.engine_path, 'rb') as f, trt.Runtime(self.logger) as runtime:
            self.engine = runtime.deserialize_cuda_engine(f.read())

    def _names_with_mode(self, mode):
        """Return the engine tensor names whose I/O mode equals ``mode``, in engine order."""
        return [name for name in self.engine if self.engine.get_tensor_mode(name) == mode]

    def get_input_names(self):
        """Return the names of the engine's input tensors."""
        return self._names_with_mode(trt.TensorIOMode.INPUT)

    def get_output_names(self):
        """Return the names of the engine's output tensors."""
        return self._names_with_mode(trt.TensorIOMode.OUTPUT)

    def get_bindings(self):
        """Allocate host and device buffers for every engine tensor.

        Returns:
            OrderedDict mapping tensor name to a ``Binding`` namedtuple
            ``(name, dtype, shape, data, ptr)``. ``data`` is a host array
            (page-locked and flat for outputs, a zero-filled placeholder for
            inputs) and ``ptr`` is the matching device allocation.

        Raises:
            ValueError: If a tensor has a dynamic (negative) dimension, since
                the buffers here are sized once from the engine's static shape.
        """
        Binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr'))
        bindings = OrderedDict()

        for name in self.engine:
            shape = self.engine.get_tensor_shape(name)
            dtype = trt.nptype(self.engine.get_tensor_dtype(name))
            if any(dim < 0 for dim in shape):
                raise ValueError(
                    f'Tensor {name!r} has dynamic shape {tuple(shape)}; '
                    'only static shapes are supported.'
                )
            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                # Only used to size the device buffer and to validate inputs later.
                data = np.zeros(tuple(shape), dtype=dtype)
            else:
                data = cuda.pagelocked_empty(trt.volume(shape), dtype)
            ptr = cuda.mem_alloc(data.nbytes)
            bindings[name] = Binding(name, dtype, shape, data, ptr)

        return bindings

    def __call__(self, blob):
        """Run one inference pass.

        Args:
            blob: Mapping from input tensor name to array-like data. Each
                input is converted to a contiguous array of the binding's
                dtype before upload.

        Returns:
            dict mapping output name (after ``output_names_mapping``) to a
            NumPy array with the engine's output shape. The arrays are copies,
            so they stay valid after later calls.

        Raises:
            KeyError: If an engine input is missing from ``blob``.
            ValueError: If an input's byte size differs from its device buffer.
            RuntimeError: If the engine reports that execution failed.
        """
        missing = [n for n in self.input_names if n not in blob]
        if missing:
            raise KeyError(f'Missing input tensor(s): {missing}')

        staged = []  # keep host arrays referenced until the stream is synchronized
        for n in self.input_names:
            binding = self.bindings[n]
            host = np.ascontiguousarray(blob[n], dtype=binding.dtype)
            # The device buffer was sized from the engine shape; a larger host
            # array would write past the end of the allocation.
            if host.nbytes != binding.data.nbytes:
                raise ValueError(
                    f'Input {n!r} has shape {host.shape} ({host.nbytes} bytes) but the '
                    f'engine expects shape {tuple(binding.shape)} ({binding.data.nbytes} bytes).'
                )
            staged.append(host)
            cuda.memcpy_htod_async(binding.ptr, host, self.stream)

        if hasattr(self.context, 'execute_async_v2'):
            addresses = [int(ptr) for ptr in self.bindings_addr.values()]
            ok = self.context.execute_async_v2(bindings=addresses, stream_handle=self.stream.handle)
        else:
            # Fallback for TensorRT builds that no longer expose the v2 entry point.
            for n, ptr in self.bindings_addr.items():
                self.context.set_tensor_address(n, int(ptr))
            ok = self.context.execute_async_v3(stream_handle=self.stream.handle)
        if ok is False:
            raise RuntimeError('TensorRT execution failed.')

        for n in self.output_names:
            cuda.memcpy_dtoh_async(self.bindings[n].data, self.bindings[n].ptr, self.stream)

        # Host buffers are only valid to read once the stream has drained.
        self.stream.synchronize()

        outputs = {}
        for n in self.output_names:
            binding = self.bindings[n]
            # Reshape the flat buffer to the engine's shape and copy it so the
            # next call cannot overwrite what the caller is holding.
            result = binding.data.reshape(tuple(binding.shape)).copy()
            outputs[self.output_names_mapping.get(n, n)] = result

        return outputs

    def warmup(self, blob, n):
        """Run ``n`` inference passes on ``blob`` and discard the results."""
        for _ in range(n):
            self(blob)

    def close(self):
        """Drop the engine, buffers and stream, then pop the CUDA context.

        Safe to call more than once and on partially constructed instances.
        """
        if getattr(self, '_closed', True):
            return
        self._closed = True
        # Drop CUDA/TensorRT objects before popping the context they were created in.
        for attr in ('bindings_addr', 'bindings', 'stream', 'context', 'engine'):
            setattr(self, attr, None)
        try:
            self.device_ctx.pop()
        except cuda.LogicError:
            # Same best-effort behavior as before: the context may already be gone.
            pass

    def __del__(self):
        self.close()

In [3]:
# Build the inference wrapper around the serialized engine. The mapping gives
# the engine's raw output tensor names the keys used when reading results.
model = TRTInference(
    'rtdetr_hgnetv2_x_6x_coco.trt',
    output_names_mapping={
        'tile_3.tmp_0': 'bbox_num',
        'reshape2_83.tmp_0': 'bbox',
    },
    verbose=True,
)

In [8]:
# Load the image; cv2.imread returns None instead of raising when the file
# cannot be read, so fail early with a clear message.
img = cv2.imread("soccer.jpg")
if img is None:
    raise FileNotFoundError("Could not read image 'soccer.jpg'")
org_img = img  # original-resolution BGR image, used later for drawing

# Original (height, width) of the image, as a 1x2 float32 array.
im_shape = np.array([[img.shape[0], img.shape[1]]], dtype='float32')

resized = cv2.resize(img, (640, 640))
# OpenCV decodes to BGR; convert to RGB here (the transpose below only
# reorders axes and does not touch the channel order).
rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)

# The previous expression divided 640 by the dimensions of the *already
# resized* image, so it always evaluated to [[1.0, 1.0]]. The value is kept
# and made explicit.
# NOTE(review): confirm the (im_shape, scale_factor) convention this exported
# model expects before changing either value.
scale_factor = np.ones((1, 2), dtype='float32')

img = rgb.astype(np.float32) / 255.0  # scale pixel values to [0, 1]
img = np.transpose(img, [2, 0, 1])    # HWC -> CHW
img = np.expand_dims(img, axis=0)     # add batch dimension
inputs_dict = {
    'im_shape': im_shape,
    'image': img,
    'scale_factor': scale_factor
}
result = model(inputs_dict)['bbox']

In [9]:
confidence_threshold = 0.5
# Column 1 of each row is treated as the score; columns 2-5 are used as the
# (x1, y1, x2, y2) corners of the rectangle.
boxes = result[result[:, 1] > confidence_threshold]

# Draw on a copy so re-running this cell does not stack rectangles on org_img.
annotated_img = org_img.copy()
for box in boxes:
    x_min, y_min, x_max, y_max = (int(v) for v in box[2:6])
    cv2.rectangle(annotated_img, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

# cv2.imwrite does not create missing directories, and reports failure only
# through its return value.
os.makedirs("output", exist_ok=True)
if not cv2.imwrite("output/soccer_trt.jpg", annotated_img):
    raise IOError("Failed to write 'output/soccer_trt.jpg'")

True

In [7]:
# One record per kept detection. Values are converted to built-in floats
# because NumPy float32 scalars are rejected by json.dumps.
result_dicts = [
    {
        'img_name': 'soccer.jpg',
        'class': 'suspect',  # example
        'confidence': float(box[1]),
        'ymin': float(box[3]),
        'xmin': float(box[2]),
        'ymax': float(box[5]),
        'xmax': float(box[4]),
    }
    for box in boxes
]