# Models inference speed test

Scripts for measuring model inference speed on a 480x480 px tile.



In [None]:
from curses import wrapper
%load_ext autoreload
%autoreload 2

In [None]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = 'all'

import warnings
# Suppress all warnings for the rest of the session.
warnings.simplefilter('ignore')

from os import path
import sys
# Add the parent directory to the module search path.
sys.path.append(path.abspath('..'))

In [None]:
import torch
import onnxruntime as ort

import typing as tp
from typing import Any, Tuple
import numpy as np
import time
import cv2

from mmdet.apis import init_detector, inference_detector

In [None]:
def benchmark(
    model: Any,
    input_shape: Tuple[int, ...] = (1, 1, 224, 224),
    nwarmup: int = 50,
    nruns: int = 10000,  # 2000
    print_step: int = 1000,  # 500
    prefiks: str = '',
):
    """Time ``model.predict`` on random float32 inputs and print statistics.

    Args:
        model: Any object exposing ``predict(input_data)``.
        input_shape: Shape of the random input; the first dimension is used
            as the batch size when computing throughput.
        nwarmup: Number of untimed calls made before measuring.
        nruns: Number of timed calls.
        print_step: Print running statistics every ``print_step`` timed
            calls; a value <= 0 disables the progress output.
        prefiks: Label printed in front of the final throughput line.

    Returns:
        None. Results are printed.
    """
    # Only reported as-is when both loops are empty (nwarmup == nruns == 0).
    input_data = np.random.rand(*input_shape).astype(np.float32)

    print("Warm up ...")
    for _ in range(nwarmup):
        input_data = np.random.rand(*input_shape).astype(np.float32)
        model.predict(input_data)

    print("Start timing ...")
    timings = []
    for i in range(1, nruns + 1):
        # Input generation stays outside the timed region.
        input_data = np.random.rand(*input_shape).astype(np.float32)
        # perf_counter is monotonic and high-resolution; time.time() can jump
        # with wall-clock adjustments.
        # NOTE(review): if predict() returns before an accelerator has
        # finished its work, the interval is under-measured - confirm the
        # wrappers block until results are available.
        start_time = time.perf_counter()
        model.predict(input_data)
        timings.append(time.perf_counter() - start_time)
        if print_step > 0 and i % print_step == 0:
            print(
                f'Iteration {i}/{nruns}, avg batch time {np.mean(timings) * 1000:.2f} ± {np.std(timings) * 1000:.2f} ms.'
            )

    print(f'Input shape: {input_data.shape}')
    print(f'{prefiks} Average throughput: {input_shape[0] / np.mean(timings):.2f} images/second')

In [None]:
class MMdetWrapper:
    """Adapter exposing an mmdet detector through a ``predict`` method."""

    def __init__(self, model: torch.nn.Module, device: Any = None):
        """Store the detector and the device it runs on.

        Args:
            model: Detector to wrap.
            device: Optional device identifier. When omitted it is taken
                from the model's first parameter (``None`` if the model has
                no parameters) instead of relying on a global variable.
        """
        self.model = model
        if device is None:
            first_param = next(iter(model.parameters()), None)
            device = first_param.device if first_param is not None else None
        self.device = device

    def predict(self, input_data: np.ndarray) -> np.ndarray:
        """Run the wrapped detector on the first item of ``input_data``.

        NOTE(review): the item is forwarded to ``inference_detector``
        unchanged - confirm that its layout/dtype is what mmdet expects.
        """
        # Use the detector owned by this wrapper, not a module-level `model`.
        return inference_detector(self.model, input_data[0])


class ONNXRuntimeWrapper:
    """Adapter exposing an ONNX Runtime session through a ``predict`` method."""

    def __init__(self, ort_session: Any):
        """Keep the session and remember the name of its first input."""
        self.session = ort_session
        session_inputs = ort_session.get_inputs()
        self.input_name = session_inputs[0].name

    def predict(self, input_data: np.ndarray) -> np.ndarray:
        """Feed ``input_data`` to the first input and return the first output."""
        feed = {self.input_name: input_data}
        outputs = self.session.run(None, feed)
        return outputs[0]

In [None]:
# Report how many CUDA devices torch sees, then the name of each one.
# Iterating over the actual count avoids a crash on hosts with fewer than
# two GPUs.
print(torch.cuda.device_count())
for cuda_i in range(torch.cuda.device_count()):
    print(torch.cuda.get_device_name(cuda_i))

In [None]:
# Select the torch devices and ONNX Runtime execution providers to benchmark
# according to the number of CUDA devices torch reports.
n_cuda = torch.cuda.device_count()

if n_cuda > 1:
    # Several GPUs: CPU first, then one entry per GPU.
    DEVICES = ['cpu'] + [f'cuda:{cuda_i}' for cuda_i in range(n_cuda)]
    ONNX_PROVIDERS = ['CPUExecutionProvider'] + [
        ('CUDAExecutionProvider', {'device_id': cuda_i}) for cuda_i in range(n_cuda)
    ]
elif n_cuda == 1:
    DEVICES = ['cuda:0', 'cpu']
    ONNX_PROVIDERS = [
        'CUDAExecutionProvider',
        'CPUExecutionProvider',
    ]
else:
    # No CUDA device: benchmarking on 'cuda:0' would fail, so use CPU only.
    DEVICES = ['cpu']
    ONNX_PROVIDERS = ['CPUExecutionProvider']

# mmdet config files and their checkpoints, paired by position.
config_files = ['../src/mmdet_maskrcnn.py', '../src/mmdet_mask2former.py', ]
checkpoint_files = ['../models/MaskRCNN-ResNet50.pth', '../models/Mask2Former-ResNet50.pth', ]

# ONNX models benchmarked with raw session calls.
ONNX_MODEL_NAMES = [
    '../models/YOLOv11m-seg.onnx',
    '../models/UNet-MobileNetV3-large-075.onnx',
    '../models/UNet-ResNet50.onnx',
    '../models/FPN-ResNet50.onnx',
    '../models/MAnet-ResNet50.onnx',
    '../models/MaskRCNN_MFD.onnx',
]

NRUNS = 2000     # timed iterations per model/device pair
STEP = 500       # progress is printed every STEP iterations
BATCH_SIZE = 1

In [None]:
# Echo the selected ONNX Runtime execution providers.
ONNX_PROVIDERS

## Evaluate models

In [None]:
! lspci | grep -i nvidia
! lscpu | grep -i Model

In [None]:
# Benchmark every mmdet (config, checkpoint) pair on every selected device.
for config_file, checkpoint_file in zip(config_files, checkpoint_files):
    for device in DEVICES:
        run_label = f"{device}: {checkpoint_file}"
        print(run_label)
        model = init_detector(config_file, checkpoint_file, device=device)
        mmdet_wrapper = MMdetWrapper(model)
        # Label the throughput line so results stay attributable, as the ONNX
        # benchmark loop does.
        benchmark(
            mmdet_wrapper,
            (BATCH_SIZE, 3, 480, 480),
            nruns=NRUNS,
            print_step=STEP,
            prefiks=run_label,
        )
        print()

In [None]:
# Benchmark every ONNX model with every selected execution provider.
for onnx_model_path in ONNX_MODEL_NAMES:
    for provider in ONNX_PROVIDERS:
        print(f"{provider}: {onnx_model_path}")

        # One session per (model, provider) pair, restricted to that provider.
        ort_session = ort.InferenceSession(onnx_model_path, providers=[provider])
        ort_wrapper = ONNXRuntimeWrapper(ort_session)
        benchmark(
            ort_wrapper,
            (BATCH_SIZE, 3, 480, 480),
            nruns=NRUNS,
            print_step=STEP,
            prefiks=f"{provider}: {onnx_model_path}",
        )
        print()

## Evaluate full pipeline

In [None]:
def benchmark_all(
    model: Any,
    input_data: Any,
    nwarmup: int = 50,
    nruns: int = 10000,  # 2000
    print_step: int = 1000,  # 500
    batch_size: tp.Optional[int] = None,
):
    """Time ``model.predict_results`` on a fixed input and print statistics.

    Args:
        model: Any object exposing ``predict_results(input_data)`` that
            returns a ``(boxes, masks)`` pair.
        input_data: Input passed unchanged to every call; must have a
            ``shape`` attribute (it is printed in the summary).
        nwarmup: Number of untimed calls made before measuring.
        nruns: Number of timed calls.
        print_step: Print running statistics every ``print_step`` timed
            calls; a value <= 0 disables the progress output.
        batch_size: Images per call, used for the throughput figure.
            Defaults to the module-level ``BATCH_SIZE``.

    Returns:
        None. Results are printed.
    """
    if batch_size is None:
        batch_size = BATCH_SIZE

    print("Warm up ...")
    for _ in range(nwarmup):
        model.predict_results(input_data)

    print("Start timing ...")
    timings = []
    boxes_num = []
    for i in range(1, nruns + 1):
        # perf_counter is monotonic and high-resolution, unlike time.time().
        start_time = time.perf_counter()
        boxes, _ = model.predict_results(input_data)
        timings.append(time.perf_counter() - start_time)
        boxes_num.append(len(boxes))
        if print_step > 0 and i % print_step == 0:
            print(
                f'Iteration {i}/{nruns}, avg batch time {np.mean(timings) * 1000:.2f} ± {np.std(timings) * 1000:.2f} ms.'
            )

    print(f'Input shape: {input_data.shape}')
    print(f'Predicted {np.mean(boxes_num):.2f} ± {np.std(boxes_num):.2f} boxes')
    print(f'Average throughput: {batch_size / np.mean(timings):.2f} images/second')


def nms_pytorch(P: torch.Tensor, thresh_iou: float):
    """Greedy non-maximum suppression over scored boxes.

    Repeatedly keeps the highest-scoring remaining box and discards every
    other remaining box whose IoU with it is not below ``thresh_iou``.

    Args:
        P: Tensor of shape [num_boxes, 5]; columns are x1, y1, x2, y2, score.
        thresh_iou: Overlap threshold; boxes with IoU >= this value against
            a kept box are suppressed.

    Returns:
        A list of the kept rows of ``P`` (each a tensor of 5 values), in
        order of decreasing score.
    """
    x1, y1, x2, y2 = P[:, 0], P[:, 1], P[:, 2], P[:, 3]
    scores = P[:, 4]
    areas = (x2 - x1) * (y2 - y1)

    # Ascending sort: the best remaining candidate is always the last index.
    order = scores.argsort()

    keep = []
    while order.numel() > 0:
        idx = order[-1]
        keep.append(P[idx])

        order = order[:-1]
        if order.numel() == 0:
            break

        # Intersection rectangle of the kept box with every remaining box.
        left = torch.max(x1[order], x1[idx])
        top = torch.max(y1[order], y1[idx])
        right = torch.min(x2[order], x2[idx])
        bottom = torch.min(y2[order], y2[idx])

        # Clamp at zero: non-overlapping boxes yield negative extents.
        inter = torch.clamp(right - left, min=0.0) * torch.clamp(bottom - top, min=0.0)

        union_area = (areas[order] - inter) + areas[idx]
        overlap = inter / union_area

        # Only boxes that overlap the kept one less than the threshold survive.
        order = order[overlap < thresh_iou]

    return keep


def onnx_preprocessing(
    image: np.ndarray,
    image_size: tp.Tuple[int, int] = (224, 224),
) -> np.ndarray:
    """Resize and normalise an image, returning a batched channel-first array.

    The image is resized to ``image_size`` with bilinear interpolation,
    converted to float32, normalised per channel with fixed mean/std
    constants scaled to the 0-255 range, then transposed from (H, W, C) to
    (1, C, H, W).
    """
    resized = cv2.resize(image.copy(), image_size, interpolation=cv2.INTER_LINEAR)

    # Per-channel statistics, expressed on the 0-255 pixel scale.
    channel_mean = np.array((0.485, 0.456, 0.406), dtype=np.float32) * 255.0
    channel_std = np.array((0.229, 0.224, 0.225), dtype=np.float32) * 255.0
    inv_std = np.reciprocal(channel_std, dtype=np.float32)

    normalized = (resized.astype(np.float32) - channel_mean) * inv_std

    # HWC -> CHW, plus a leading batch axis.
    return normalized.transpose((2, 0, 1))[None]


def intersection(box1, box2):
    """Return the overlap area of two boxes given as (x1, y1, x2, y2, ...).

    Width and height of the overlap are clamped at zero, so boxes that do
    not overlap give 0 rather than a negative area (or a spurious positive
    one when they are disjoint on both axes).
    """
    box1_x1, box1_y1, box1_x2, box1_y2 = box1[:4]
    box2_x1, box2_y1, box2_x2, box2_y2 = box2[:4]
    overlap_w = max(0, min(box1_x2, box2_x2) - max(box1_x1, box2_x1))
    overlap_h = max(0, min(box1_y2, box2_y2) - max(box1_y1, box2_y1))
    return overlap_w * overlap_h


def union(box1, box2):
    """Return the area covered by either of two (x1, y1, x2, y2, ...) boxes."""
    box1_x1, box1_y1, box1_x2, box1_y2 = box1[:4]
    box2_x1, box2_y1, box2_x2, box2_y2 = box2[:4]
    box1_area = (box1_x2 - box1_x1) * (box1_y2 - box1_y1)
    box2_area = (box2_x2 - box2_x1) * (box2_y2 - box2_y1)
    return box1_area + box2_area - intersection(box1, box2)


def iou(box1, box2):
    """Return intersection-over-union of two boxes; 0.0 if the union is empty."""
    total = union(box1, box2)
    if total <= 0:
        # Two zero-area boxes: define IoU as 0 instead of dividing by zero.
        return 0.0
    return intersection(box1, box2) / total


def sigmoid(z):
    """Return the logistic function 1 / (1 + exp(-z)) of ``z``."""
    decay = np.exp(-z)
    return 1 / (1 + decay)

In [None]:
class MMdetWrapper:
    """Full-pipeline adapter around an mmdet detector.

    NOTE: this definition rebinds the name ``MMdetWrapper`` that an earlier
    cell of the notebook also defines.
    """

    def __init__(self):
        # Set by set_session(); None until then.
        self.model = None

    def set_session(self, model: torch.nn.Module):
        """Attach the detector used by ``predict_results``."""
        self.model = model

    def predict_results(self, input_data: np.ndarray) -> tuple:
        """Run the detector on one image and keep detections scoring >= 0.7.

        Returns:
            ``(boxes, masks)`` as NumPy arrays, filtered by score.
        """
        # Use the detector attached to this wrapper. Reading a module-level
        # `model` here would benchmark whichever detector was created last
        # elsewhere, not the one passed to set_session().
        result = inference_detector(self.model, [input_data])[0]
        instances = result.pred_instances
        pred_scores = instances.scores.detach().cpu().numpy()
        confident = pred_scores >= 0.7

        pred_boxes = instances.bboxes.detach().cpu().numpy()[confident]
        pred_masks = instances.masks.detach().cpu().numpy()[confident]
        return pred_boxes, pred_masks


class ONNXRuntimeWrapperMaskRCNN:
    """Full-pipeline adapter for a Mask R-CNN ONNX model."""

    def __init__(self):
        # Both are set by set_session(); None until then.
        self.session = None
        self.input_name = None

    def set_session(self, ort_session: Any):
        """Attach an ONNX Runtime session and remember its first input name."""
        self.session = ort_session
        self.input_name = ort_session.get_inputs()[0].name

    def predict_results(self, input_data: np.ndarray) -> tuple:
        """Run the session and return ``(boxes, masks)`` for the first image.

        The session must produce three outputs: boxes with scores, labels,
        and masks. Boxes are the first four values of each row of the first
        output; the first two axes of the masks output are swapped.

        NOTE(review): no score threshold is applied here, so every returned
        detection is counted - confirm this is intended.
        """
        boxes_scores, _labels, masks = self.session.run(None, {self.input_name: input_data})
        boxes = boxes_scores[0, :, :4]  # drop the trailing score column
        masks = np.transpose(masks, [1, 0, 2, 3])
        return boxes, masks


class ONNXRuntimeWrapperYOLO:
    """Full-pipeline adapter for a single-class YOLO segmentation ONNX model."""

    SCORE_THRESHOLD = 0.7  # minimum class score for a candidate to be kept
    IOU_THRESHOLD = 0.7    # candidates overlapping a kept box this much are dropped

    def __init__(self):
        # Both are set by set_session(); None until then.
        self.session = None
        self.input_name = None

    def set_session(self, ort_session: Any):
        """Attach an ONNX Runtime session and remember its first input name."""
        self.session = ort_session
        self.input_name = ort_session.get_inputs()[0].name

    def predict_results(self, input_data: np.ndarray) -> tuple:
        """Run the session, decode boxes, apply NMS and build per-box masks.

        Returns:
            ``(boxes, masks)``: ``boxes`` has one row ``x1, y1, x2, y2, score``
            per kept detection; ``masks`` has one flattened mask row per kept
            detection. Both have zero rows when nothing passes the score
            threshold.
        """
        prediction, mask_info = self.session.run(None, {self.input_name: input_data})
        number_of_classes = 1
        mask_index = 4 + number_of_classes
        # After the transpose each row is one candidate:
        # xc, yc, w, h, score, mask coefficients...
        prediction = prediction[0].transpose()

        candidates = prediction[prediction[:, 4] >= self.SCORE_THRESHOLD]

        # Centre/size -> corner coordinates, for all candidates at once.
        half_w = candidates[:, 2] / 2
        half_h = candidates[:, 3] / 2
        corners = np.stack(
            [
                candidates[:, 0] - half_w,
                candidates[:, 1] - half_h,
                candidates[:, 0] + half_w,
                candidates[:, 1] + half_h,
            ],
            axis=1,
        )
        decoded = np.concatenate([corners, candidates[:, 4:]], axis=1)

        # Highest score first; a stable sort keeps ties in input order.
        order = np.argsort(-decoded[:, 4], kind='stable')
        remaining = list(decoded[order])

        # Greedy NMS: keep the best box, drop the rest that overlap it.
        kept = []
        while remaining:
            best, *rest = remaining
            kept.append(best)
            remaining = [
                box for box in rest
                if iou(box[:4], best[:4]) < self.IOU_THRESHOLD
            ]

        # decoded[:0] keeps the column count and dtype when nothing was kept,
        # so the slicing below also works for the "no detections" case.
        pass_results = np.stack(kept) if kept else decoded[:0]
        mask_coeffs = pass_results[:, mask_index:]
        boxes = pass_results[:, :mask_index]

        # Flatten the prototype masks and combine them per detection.
        mask_len, mask_h, mask_w = mask_info.shape[1:]
        prototypes = mask_info[0].reshape(mask_len, mask_h * mask_w)
        masks = mask_coeffs @ prototypes
        return boxes, masks


class ONNXRuntimeWrapperSemantic:
    """Full-pipeline adapter for a semantic-segmentation ONNX model."""

    def __init__(self):
        """Create an empty wrapper; call ``set_session`` before predicting."""

    def set_session(self, ort_session: Any):
        """Attach an ONNX Runtime session and remember its first input name."""
        self.session = ort_session
        session_inputs = ort_session.get_inputs()
        self.input_name = session_inputs[0].name

    def predict_results(self, input_data: np.ndarray) -> tuple:
        """Segment the input and return ``(boxes, mask)``.

        The first session output is squeezed, rounded, passed through a
        sigmoid and binarised at 0.7 into a uint8 mask of 0/255. One
        ``[x1, y1, x2, y2]`` box is produced per external contour.
        """
        raw_output = self.session.run(None, {self.input_name: input_data})[0]
        # NOTE(review): values are rounded before the sigmoid - confirm this
        # is intended.
        rounded = raw_output.squeeze().round()
        pr_mask = np.exp(-np.logaddexp(0, -rounded))  # sigmoid
        pr_mask[pr_mask >= 0.7] = 255
        pr_mask[pr_mask < 0.7] = 0
        pr_mask = pr_mask.astype(np.uint8)

        contours, _ = cv2.findContours(pr_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
        boxes = [
            [int(x), int(y), int(x + w), int(y + h)]
            for x, y, w, h in map(cv2.boundingRect, contours)
        ]
        return boxes, pr_mask

In [None]:
# Load the test image. cv2.imread returns None instead of raising when the
# file cannot be read, so fail early with a clear message.
test_image_path = '../IoU_test/images/50-NH4NO3_prill.jpg'
test_image = cv2.imread(test_image_path)
if test_image is None:
    raise FileNotFoundError(f'Could not read test image: {test_image_path}')
granules_number = 0
test_image = cv2.cvtColor(test_image, cv2.COLOR_BGR2RGB)

# Preprocessed input for the ONNX models, repeated BATCH_SIZE times along
# the batch axis.
onnx_input = onnx_preprocessing(
    test_image,
    image_size=(480, 480)
)
onnx_input = np.concatenate([onnx_input] * BATCH_SIZE)

# Registry of models for the full-pipeline benchmark:
# model path -> input to feed, wrapper instance, devices/providers to run on.
MODELS = {
    # Semantic-segmentation models share one wrapper type; each entry gets
    # its own wrapper instance.
    **{
        semantic_model_path: {
            'input': onnx_input,
            'wrapper': ONNXRuntimeWrapperSemantic(),
            'devices': ONNX_PROVIDERS
        }
        for semantic_model_path in (
            '../models/UNet-MobileNetV3-large-075.onnx',
            '../models/UNet-ResNet50.onnx',
            '../models/FPN-ResNet50.onnx',
            '../models/MAnet-ResNet50.onnx',
        )
    },
    '../models/MaskRCNN-ResNet50.onnx': {
        'input': onnx_input,
        'wrapper': ONNXRuntimeWrapperMaskRCNN(),
        'devices': ONNX_PROVIDERS
    },
    '../models/YOLOv11m-seg.onnx': {
        'input': onnx_input,
        'wrapper': ONNXRuntimeWrapperYOLO(),
        'devices': ONNX_PROVIDERS
    },
    # The mmdet model receives the RGB image directly and runs on torch devices.
    '../models/Mask2Former-ResNet50.pth': {
        'input': test_image,
        'wrapper': MMdetWrapper(),
        'devices': DEVICES
    },
}

In [None]:
# Run the full-pipeline benchmark for every registered model on each of its
# devices / execution providers.
for model_path, model_spec in MODELS.items():
    print(model_path)
    wrapper = model_spec['wrapper']
    for device in model_spec['devices']:
        print(f'\t {device}:')
        if 'Mask2Former' in model_path:
            print('\t not ONNX')
            # Keep the detector in a named variable so the model being
            # benchmarked is the one just initialised for this device.
            model = init_detector('../src/mmdet_mask2former.py', model_path, device=device)
            wrapper.set_session(model)
        else:
            ort_session = ort.InferenceSession(
                model_path,
                providers=[device]
            )
            wrapper.set_session(ort_session)
        benchmark_all(
            wrapper,
            model_spec['input'],
            nruns=NRUNS,
            print_step=STEP
        )
    print()