In [8]:
import mxnet as mx
import cv2 as cv
import numpy as np
import os
from PIL import Image
import math
from collections import namedtuple
from mxnet.contrib.onnx import import_model
import cityscapes_labels


In [9]:
def preprocess(im):
    """Turn an H x W x 3 image array into the batch fed to the network.

    The image is cast to float32, padded on the bottom and right so that both
    spatial sizes are multiples of 8, reordered to channels-first, centred by
    subtracting its own per-channel mean, and wrapped as a 1-image MXNet
    NDArray.

    Args:
        im: image array indexed as (height, width, channel) with 3 channels.

    Returns:
        ``mx.ndarray`` of shape (1, 3, padded_height, padded_width).
    """
    # Per-channel mean of this very image; cv.mean returns a 4-tuple, only the
    # first three entries are used below.
    channel_means = cv.mean(im)
    height, width = im.shape[0], im.shape[1]

    # Round each spatial size UP to the next multiple of 8 and pad only on the
    # bottom/right edges. The padding is filled with the channel means, so it
    # becomes zero after the mean subtraction further down.
    pad_bottom = max(0, int(math.ceil(height / 8) * 8) - height)
    pad_right = max(0, int(math.ceil(width / 8) * 8) - width)
    padded = cv.copyMakeBorder(im.astype(np.float32), 0, pad_bottom, 0, pad_right, cv.BORDER_CONSTANT, value=channel_means)

    # (height, width, channel) -> (channel, height, width)
    planes = np.transpose(padded, (2, 0, 1))

    # Centre every channel plane in place; zip stops after the 3 planes.
    for plane, mean in zip(planes, channel_means):
        plane -= mean

    # Add the leading batch axis and hand the data over to MXNet.
    return mx.ndarray.array(np.expand_dims(planes, axis=0))

In [10]:

def get_palette():
    """Build the flat 256-entry RGB palette used to colorize label maps.

    Colors are taken from ``cityscapes_labels.labels`` (each label provides
    ``trainId`` and ``color``). When several labels share a trainId, the last
    one listed wins.

    Returns:
        list of 768 ints laid out as ``[r0, g0, b0, r1, g1, b1, ...]``.
        Entries for train ids that no label maps to, and entry 255, are
        black.
    """
    # Get train id to color mappings from file
    trainId2colors = {label.trainId: label.color for label in cityscapes_labels.labels}
    # Prepare and return palette
    palette = [0] * 256 * 3
    for trainId, colors in trainId2colors.items():
        # Only ids 0..254 get a color. 255 stays black (the list is already
        # zero-filled). Negative ids must be skipped explicitly: Python would
        # index from the end of the list, so trainId -1 would silently
        # overwrite the black entry 255. Ids above 255 do not fit at all.
        if not 0 <= trainId < 255:
            continue
        base = trainId * 3
        for i in range(3):
            palette[base + i] = colors[i]
    return palette

def colorize(labels):
    """Map a label array to an RGB image using the palette from get_palette().

    Args:
        labels: array of class indices accepted by ``Image.fromarray``
            (predict() passes a 2-D uint8 map).

    Returns:
        numpy array holding the RGB rendering of ``labels``.
    """
    # Go through PIL's palette mode: each pixel value becomes a palette index.
    indexed = Image.fromarray(labels)
    indexed = indexed.convert('P')
    indexed.putpalette(get_palette())
    # Expand the palette indices into explicit RGB triples.
    rgb = indexed.convert('RGB')
    return np.array(rgb)

def predict(imgs, im):
    """Run the segmentation network on one pre-processed frame.

    The forward pass uses the module-level ``mod``.

    Args:
        imgs: 4-D input batch as returned by ``preprocess``; only its last two
            dimensions (height, width) are read here.
        im: the H x W x 3 image the batch was built from. It fixes the output
            resolution and is blended with the colorized result.

    Returns:
        Tuple ``(confidence, result_img, blended_img, raw_labels)``:
        ``confidence`` is the mean over all pixels of the highest class score,
        ``result_img`` and ``blended_img`` are PIL images, and ``raw_labels``
        is a uint8 array of per-pixel class indices at the resolution of
        ``im``.
    """
    # Get input and output dimensions
    result_height, result_width = im.shape[0], im.shape[1]
    _, _, img_height, img_width = imgs.shape
    # Set downsampling rate
    ds_rate = 8
    # Set cell width
    cell_width = 2
    # Each feature location expands into this many cells per side
    cells_per_side = ds_rate // cell_width
    # Number of output label classes
    label_num = 19

    # Perform forward pass
    batch = namedtuple('Batch', ['data'])
    mod.forward(batch([imgs]), is_train=False)
    labels = mod.get_outputs()[0].asnumpy().squeeze()

    # Re-arrange output. Floor division keeps the feature size and the
    # "test" size consistent with each other (test = feat * ds_rate) even if
    # the batch is not an exact multiple of ds_rate.
    feat_height = int(img_height) // ds_rate
    feat_width = int(img_width) // ds_rate
    test_height = feat_height * ds_rate
    test_width = feat_width * ds_rate
    labels = labels.reshape((label_num, cells_per_side, cells_per_side, feat_height, feat_width))
    labels = np.transpose(labels, (0, 3, 1, 4, 2))
    labels = labels.reshape((label_num, test_height // cell_width, test_width // cell_width))

    # Crop to the batch extent, then resize the score maps (channels-last for
    # cv.resize) to the resolution of the original image.
    labels = labels[:, :int(img_height) // cell_width, :int(img_width) // cell_width]
    labels = np.transpose(labels, [1, 2, 0])
    labels = cv.resize(labels, (result_width, result_height), interpolation=cv.INTER_LINEAR)
    labels = np.transpose(labels, [2, 0, 1])

    # Get classification labels
    raw_labels = np.argmax(labels, axis=0).astype(np.uint8)

    # Compute confidence score: per-pixel best class score, averaged.
    # Presumably the network output is already a softmax; no normalisation is
    # applied here.
    confidence = float(np.max(labels, axis=0).mean())

    # Generate segmented image
    result_img = Image.fromarray(colorize(raw_labels)).resize((result_width, result_height))

    # Generate blended image
    # NOTE(review): `im[:, :, ::-1]` reverses the channel axis of `im`, while
    # the label colors come straight from colorize(). If `im` is RGB, the
    # photo half of the blend is BGR and the overlay half is RGB. Left as-is
    # because callers may display this with OpenCV; confirm the intended order.
    blended_img = Image.fromarray(cv.addWeighted(im[:, :, ::-1], 0.5, np.array(result_img), 0.5, 0))

    return confidence, result_img, blended_img, raw_labels

In [11]:
def get_model(ctx, model_path, im):
    """Load an ONNX model into an inference-only MXNet module.

    Args:
        ctx: MXNet context the module runs on.
        model_path: path of the ONNX file to import.
        im: sample image; its first two dimensions give the height and width
            the module's ``data`` input is bound to (batch 1, 3 channels).

    Returns:
        The bound ``mx.mod.Module`` with the imported parameters set.
    """
    # ONNX -> MXNet symbol plus argument / auxiliary parameter dicts
    symbol, arg_params, aux_params = import_model(model_path)

    # Single-input network, no labels
    net = mx.mod.Module(symbol=symbol, data_names=['data'], context=ctx, label_names=None)

    # Bind for inference with the spatial size of the sample image
    height, width = im.shape[0], im.shape[1]
    net.bind(
        for_training=False,
        data_shapes=[('data', (1, 3, height, width))],
        label_shapes=net._label_shapes,
    )

    # Tolerate parameters that are missing from, or extra in, the import
    net.set_params(
        arg_params=arg_params,
        aux_params=aux_params,
        allow_missing=True,
        allow_extra=True,
    )
    return net

In [12]:
# Pick the compute context: the first GPU when MXNet lists any, otherwise the CPU
ctx = mx.gpu(0) if len(mx.test_utils.list_gpus()) > 0 else mx.cpu()

In [13]:
# Grab the first frame of the video as a sample image for the cells below
cap = cv.VideoCapture('video.mp4')
try:
    ret, frame = cap.read()
finally:
    # This handle is only needed for the one frame; release it so the file is
    # not left open when the video is opened again later.
    cap.release()
if not ret:
    # Without this check a missing/unreadable file surfaces as a confusing
    # "'NoneType' object is not subscriptable" on the next line.
    raise IOError("Could not read a frame from 'video.mp4'")
# Reverse the channel axis (OpenCV decodes frames as BGR)
im = frame[:, :, ::-1]

In [14]:
# Import the DUC ONNX model and bind it to the size of the sample frame
mod = get_model(ctx=ctx, model_path='ResNet101_DUC_HDC.onnx', im=im)

Calling mxnet.contrib.onnx.import_model...
Please be advised that importing ONNX models into MXNet is going to be deprecated in the upcoming MXNet v1.10 release. The following apis will be deleted: mxnet.contrib.onnx.import_model/get_model_metadata/import_to_gluon.


[00:44:25] ../src/executor/graph_executor.cc:1991: Subgraph backend MKLDNN is activated.


In [None]:
# Download test video
# mx.test_utils.download('https://example.com/video.mp4', fname='video.mp4')

# Read video and initialize variables
cap = cv.VideoCapture('video.mp4')
frame_count = 0
frame_skip = 5  # run the network on every 5th frame only

# try/finally so the capture is released and the windows are closed even when
# the loop is interrupted (e.g. kernel interrupt) or a frame raises.
try:
    # Iterate over video frames
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Apply frame skipping logic
        frame_count += 1
        if frame_count % frame_skip != 0:
            continue

        # Run semantic segmentation on the frame (BGR -> RGB first)
        im = frame[:, :, ::-1]
        pre = preprocess(im)
        conf, result_img, blended_img, raw = predict(pre, im)

        # cv.imshow interprets 3-channel arrays as BGR, while result_img is an
        # RGB PIL image, so convert before showing to get the right colors.
        cv.imshow('Result', cv.cvtColor(np.array(result_img), cv.COLOR_RGB2BGR))
        # NOTE(review): predict() blends `im[:, :, ::-1]` (the BGR frame) with
        # RGB label colors, so this is shown unchanged: the photo looks right
        # but the overlay tint is R/B-swapped relative to the 'Result' window.
        cv.imshow('Blended', np.array(blended_img))

        # Press 'q' to stop early
        if cv.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the video capture and close windows
    cap.release()
    cv.destroyAllWindows()