In [1]:
from ctypes import *
from typing import List
import cv2
import numpy as np
import vart
import os
import pathlib
import xir
import threading
import time
import sys
import argparse
from PIL import Image

In [2]:
def postprocess_and_save(output_tensor, save_path='output_image.png'):
    """Min-max scale ``output_tensor`` to the 0..255 range and save it as an image.

    Args:
        output_tensor: Numeric array (anything ``np.asarray`` accepts). Its
            shape must be one that ``Image.fromarray`` can interpret as 8-bit
            image data.
        save_path: Destination path passed to the PIL image's ``save`` method.
    """
    data = np.asarray(output_tensor)
    lowest = data.min()
    value_range = data.max() - lowest

    if value_range == 0:
        # A constant tensor would otherwise compute 0/0 and cast NaN to uint8,
        # which is undefined; a flat input is rendered as an all-zero image.
        scaled = np.zeros(data.shape, dtype=np.uint8)
    else:
        # Stretch [min, max] onto [0, 255]; the uint8 cast truncates fractions.
        scaled = ((data - lowest) / value_range * 255).astype(np.uint8)

    Image.fromarray(scaled).save(save_path)

# Example usage:
# Assuming `output_tensor` is your output tensor from the neural network
# postprocess_and_save(output_tensor, 'output_image.png')


In [3]:
def preprocess_fn(image):
    """Load an image and return it as a normalized 256x256 channels-last array.

    Args:
        image: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        Floating-point array of shape ``(256, 256, 3)`` (height, width, RGB).
        Each pixel is scaled to [0, 1] and then standardized per channel with
        the mean/std constants below.
    """
    print(image)

    # Force three channels: RGBA, grayscale or palette files would otherwise
    # not match the 3-channel normalization constants. The context manager
    # closes the underlying file as soon as the pixels have been copied out.
    with Image.open(image) as source:
        pixels = np.array(source.convert("RGB").resize((256, 256)))

    # Per-channel statistics (the values commonly used for ImageNet-trained
    # models); they broadcast over the trailing channel axis, so no transpose
    # to channels-first and back is needed.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    return (pixels / 255.0 - mean) / std

In [4]:
# Separator line printed between sections of the console report.
divider = '---------------------------'

def get_child_subgraph_dpu(graph: "Graph") -> List["Subgraph"]:
    """Return the child subgraphs of ``graph`` whose ``device`` attribute is DPU.

    Args:
        graph: Object exposing ``get_root_subgraph()``; must not be ``None``.

    Returns:
        The root's topologically sorted child subgraphs that carry a
        ``device`` attribute equal to ``"DPU"`` (compared case-insensitively),
        in that order. An empty list when the root subgraph is a leaf.

    Raises:
        AssertionError: If ``graph`` is ``None``, the root subgraph is
            ``None``, or a non-leaf root yields no child subgraphs.
    """
    assert graph is not None, "'graph' should not be None."

    root = graph.get_root_subgraph()
    assert root is not None, "Failed to get root subgraph of input Graph object."

    # A leaf root has no children to inspect.
    if root.is_leaf:
        return []

    children = root.toposort_child_subgraph()
    assert children is not None and len(children) > 0

    dpu_subgraphs = []
    for child in children:
        if not child.has_attr("device"):
            continue
        if child.get_attr("device").upper() == "DPU":
            dpu_subgraphs.append(child)
    return dpu_subgraphs

In [5]:
def runDPU(id,start,dpu,img):
    """Run ``img`` through ``dpu`` in batches, storing results in global ``out_q``.

    Args:
        id: Not used by this function.
        start: Index in ``out_q`` where the first result is written; later
            results go to the following consecutive indices.
        dpu: Runner object providing ``get_input_tensors()``,
            ``get_output_tensors()``, ``execute_async()`` and ``wait()``.
        img: Indexable sequence of arrays; each one is reshaped to the input
            tensor's non-batch dimensions before being fed to the runner.
    """
    # Shapes come from the runner's first input/output tensor; the leading
    # input dimension is used as the batch size.
    in_shape = tuple(dpu.get_input_tensors()[0].dims)
    out_shape = tuple(dpu.get_output_tensors()[0].dims)

    batch = in_shape[0]
    total = len(img)
    slot = start
    done = 0
    while done < total:
        # The last batch may be only partially filled.
        if total - done >= batch:
            chunk = batch
        else:
            chunk = total - done

        # Fresh buffers for every batch: the rows stored in out_q below are
        # views into out_buf, so the buffer must not be reused.
        in_buf = [np.empty(in_shape, dtype=np.float32, order="C")]
        out_buf = [np.empty(out_shape, dtype=np.float32, order="C")]

        # Copy this batch's images into the input buffer.
        for row in range(chunk):
            in_buf[0][row, ...] = img[done + row].reshape(in_shape[1:])

        # Submit the batch and block until it has finished.
        dpu.wait(dpu.execute_async(in_buf, out_buf))

        # Keep one output row per processed image.
        for row in range(chunk):
            out_q[slot + row] = out_buf[0][row]
        slot += chunk
        done += chunk


def app(image_dir,threads,model,output_dir='.'):
    """Run every image in ``image_dir`` through the DPU model and save the outputs.

    The images are preprocessed with ``preprocess_fn``, split into contiguous
    slices (one per thread), executed by ``runDPU`` and finally written with
    ``postprocess_and_save``. Throughput for the inference phase is printed.

    Args:
        image_dir: Directory containing the input images. Only regular files
            are used; they are processed in sorted filename order.
        threads: Number of runner threads (one runner is created per thread).
        model: Path of the compiled model passed to ``xir.Graph.deserialize``.
        output_dir: Directory for the result images (created if missing).
            Each result is saved as ``output_<index>_<input stem>.png``.
    """
    # os.listdir returns entries in arbitrary order and includes directories
    # (e.g. a notebook checkpoint folder); keep regular files only and sort so
    # the index -> file mapping is reproducible.
    listimage = sorted(
        name for name in os.listdir(image_dir)
        if os.path.isfile(os.path.join(image_dir, name))
    )
    runTotal = len(listimage)

    # runDPU writes its results into this module-level list.
    global out_q
    out_q = [None] * runTotal

    g = xir.Graph.deserialize(model)
    subgraphs = get_child_subgraph_dpu(g)
    all_dpu_runners = [
        vart.Runner.create_runner(subgraphs[0], "run") for _ in range(threads)
    ]

    # Preprocess images.
    print('Pre-processing',runTotal,'images...')
    img = [preprocess_fn(os.path.join(image_dir, name)) for name in listimage]

    # Give each thread a contiguous slice; the last thread takes the remainder.
    print('Starting',threads,'threads...')
    threadAll = []
    start = 0
    for i in range(threads):
        if i == threads - 1:
            end = len(img)
        else:
            end = start + (len(img) // threads)
        in_q = img[start:end]
        threadAll.append(
            threading.Thread(target=runDPU, args=(i, start, all_dpu_runners[i], in_q))
        )
        start = end

    # perf_counter is monotonic and high-resolution, unlike wall-clock time.
    time1 = time.perf_counter()
    for x in threadAll:
        x.start()
    for x in threadAll:
        x.join()
    timetotal = time.perf_counter() - time1

    # Guard against a zero-length interval on coarse timers.
    fps = float(runTotal / timetotal) if timetotal > 0 else 0.0
    print(divider)
    print("Throughput=%.2f fps, total frames = %.0f, time=%.4f seconds" %(fps, runTotal, timetotal))

    # Save each result under its own name; a single fixed path would be
    # overwritten by every image and only the last output would survive.
    os.makedirs(output_dir, exist_ok=True)
    for i, name in enumerate(listimage):
        stem = os.path.splitext(name)[0]
        save_path = os.path.join(output_dir, 'output_%04d_%s.png' % (i, stem))
        postprocess_and_save(out_q[i], save_path=save_path)

    print(divider)

In [9]:
# Run configuration for this session.
image_dir = 'images'  # input directory, resolved against the current working directory
threads = 1
# NOTE(review): absolute, machine-specific path; adjust it when running elsewhere.
model = '/home/ubuntu/Downloads/target_kv260/fcn8/model/fcn8.xmodel'

print(divider)

app(image_dir=image_dir, threads=threads, model=model)

---------------------------
Pre-processing 4 images...
images/birr copy.png
images/birr.png
images/birr copy 2.png
images/birr copy 3.png
Starting 1 threads...
---------------------------
Throughput=38.53 fps, total frames = 4, time=0.1038 seconds
---------------------------
