# KV260 UnitTest Core
env build 
`pip3 install -U pip opencv-python tqdm` 
`sudo dnf install packagegroup-kv260-aibox-reid.noarch`  
`sudo dnf install kv260-dpu-benchmark.k26_kv` 
`sudo xmutil unloadapp kv260-dpu-benchmark` 
`sudo xmutil loadapp kv260-dpu-benchmark` 

**Image Read & show**

In [None]:
#input_path = <image location>
input_path = ""
import os
import cv2
import numpy as np
from matplotlib import pyplot as plt
import xir
import vart

if os.path.isfile(input_path):
    print("file exist")
    image = cv2.imread(input_path)
    image_resize = cv2.resize(image, (224, 224), interpolation=cv2.INTER_AREA)
    im_plt = cv2.cvtColor(image_resize, cv2.COLOR_BGR2RGB)
    plt.imshow(im_plt)
    plt.show()
else:
  print("file not exist")

**Camera Read & show**

In [None]:
from IPython.display import clear_output, Image, display, HTML
import time
import cv2
import base64
import threading

current_time = 0
exit = False
# 图像处理函数
def processImg(img):
    # 画出一个框
    #cv2.rectangle(img, (500, 300), (800, 400), (0, 0, 255), 5, 1, 0)
    # 上下翻转
    # img= cv2.flip(img, 0)

    # 显示FPS
    global current_time
    if current_time == 0:
        current_time = time.time()
    else:
        last_time = current_time
        current_time = time.time()
        fps = 1. / (current_time - last_time)
        text = "FPS: %d" % int(fps)
        cv2.putText(img, text , (0,100), cv2.FONT_HERSHEY_TRIPLEX, 3.65, (255, 0, 0), 2)
    
    return img

def arrayShow(img):
    _,ret = cv2.imencode('.jpg', img)
    return Image(data=ret)


video = cv2.VideoCapture(0)

def startCam():
    while(True):
        try:
            clear_output(wait=True)
            ret, frame = video.read()
            if not ret:
                break
            lines, columns, _ = frame.shape
            frame = processImg(frame)
            #frame = cv2.resize(frame, (int(columns / 2), int(lines / 2)))

            img = arrayShow(frame)
            display(img)
            # 控制帧率
            time.sleep(0.01)
            if exit is True:
                video.release()
                clear_output(wait=True)
                print("stop")
                break
        except KeyboardInterrupt:
            video.release()

t = threading.Thread(target=startCam, args=()) # 
print("start")
t.start() # 開始
#t.join() # 等待結束

In [None]:
#leave code
exit = True

**Read XModel**

In [13]:
import numpy as np
from matplotlib import pyplot as plt
import xir
import vart
import time 

class coreDPU:
    def __init__(self,model_path):
        g = xir.Graph.deserialize(model_path)
        sub = []
        root = g.get_root_subgraph()

        child_subgraph = root.toposort_child_subgraph()
        sub = [s for s in child_subgraph
              if s.has_attr("device") and s.get_attr("device").upper() == "DPU"]
        self.model = sub
        self.__createDPU()
        
    def __createDPU(self):
        self.__dpu = vart.Runner.create_runner(self.model[0],"run")
        input_tensors = self.__dpu.get_input_tensors() #得模型DPU輸入層
        output_tensors = self.__dpu.get_output_tensors() #得模型運算DPU最終層
        self.__input_dims = tuple(input_tensors[0].dims) #(1,224,224,3)
        self.__output_dims = tuple(output_tensors[0].dims) #(1,6)
    
    def runDPU(self,img) ->int:
        input_data = []
        output_data = []
        input_data = [np.empty(self.__input_dims, dtype =np.float32, order = "C")]
        output_data = [np.empty(self.__output_dims, dtype = np.float32, order = "C")]
        dpu_image = input_data[0]
        dpu_image[0,...] =  img #圖進DPU的BUFFER,

        self.__dpu.execute_async(dpu_image,output_data)
        ans = np.argmax(output_data[0]) +1#分類
        #print(output_data[0])
        #print('AQI_LEVEL:' + str(ans+1))
        return ans

In [None]:
##run DPU Demo
import cv2

image_path = "/home/petalinux/dev/testimg/3_201410301130.jpg"
model_path = "/home/petalinux/dev/KV260_4096.xmodel"

try:
    image = cv2.imread(image_path)
    #print(image)
    image_resize = cv2.resize(image, (224, 224), interpolation=cv2.INTER_AREA)
    im_plt = cv2.cvtColor(image_resize, cv2.COLOR_BGR2RGB)
    plt.imshow(im_plt)
    plt.show()
except Exception as a:
    print(a)
time1 = time.time()
DPU_Class = coreDPU(model_path)
time2 = time.time()
level = DPU_Class.runDPU(image_resize)
time3 = time.time()
level = DPU_Class.runDPU(image_resize)
print(time2-time1)
print(time3-time2)
print('AQI_LEVEL:',level)

**運行模型測試集**
需要運行Read Xmodel Class
資料集須按照Label分類放置個資料夾
image_dir
    -1
    -2
    .
    .
    -n

In [16]:
import cv2
import os
import numpy as np
from IPython.display import clear_output
from tqdm.notebook import tqdm
#if use py,import the following line
#from tqdm import tqdm, trange
image_path = "/home/petalinux/dev/testimg/"
model_path = "/home/petalinux/dev/KV260_4096.xmodel"

table = np.zeros((6,6),dtype=int)

DPU_Class = coreDPU(model_path)

for i in range(1,7):
    print()

    image_i_folder = image_path+"{num}/".format(num=i)
    for filename in tqdm(os.listdir(image_i_folder),desc="Image_Folder_{} ".format(i),ascii="#"):
        image_location = image_i_folder + filename
        image = cv2.imread(image_location)
        image_resize = cv2.resize(image, (224, 224), interpolation=cv2.INTER_AREA)
        level = DPU_Class.runDPU(image_resize) - 1
        table[level][i-1] +=1

print(table)

me_class = model_Evaluate(table)
print(me_class.getAccuracy())
print(me_class.getPrecision())
print(me_class.getRecall())






Image_Folder_1 :   0%|          | 0/246 [00:00<?, ?it/s]




Image_Folder_2 :   0%|          | 0/252 [00:00<?, ?it/s]




Image_Folder_3 :   0%|          | 0/252 [00:00<?, ?it/s]




Image_Folder_4 :   0%|          | 0/245 [00:00<?, ?it/s]




Image_Folder_5 :   0%|          | 0/30 [00:00<?, ?it/s]




Image_Folder_6 :   0%|          | 0/21 [00:00<?, ?it/s]

[[235  99   0   0   0   0]
 [ 11 105  43   9   1   7]
 [  0  19  62   4   3   0]
 [  0  27 147 231   7   0]
 [  0   2   0   1  16   1]
 [  0   0   0   0   3  13]]
0.6328871892925431
['70.36%', '59.66%', '70.45%', '56.07%', '80.00%', '81.25%']
['95.53%', '41.67%', '24.60%', '94.29%', '53.33%', '61.90%']


**記錄結果**
rol_sum = 機器辨識為該類別的結果
col_sum = 該類別資料集的總比數

In [14]:
import pandas as pd
class model_Evaluate:
    def __init__(self,table):
        df = pd.DataFrame(table)
        self.__length = len(df)
        df['row_sum']=df.apply(lambda x: x.sum(),axis=1)
        df.loc['col_sum']=df.apply(lambda x: x.sum(),axis=0)
        self.__df = df 
        
    def getAccuracy(self):
        Accuracy = 0
        TP_TN = 0 #所有類別正確數之合
        TP_TN_FP_FN = 0 #圖片總數

        for i in range(self.__length):
            TP_TN += self.__df[i][i]

        TP_TN_FP_FN = self.__df['row_sum'].values[-1]
        Accuracy = (TP_TN/TP_TN_FP_FN)
        return Accuracy
    def getPrecision(self):
        Precision = np.zeros((self.__length,),dtype=float)

        for i in range(self.__length):
            TP = self.__df[i][i]
            TP_FP = self.__df['row_sum'].values[i]
            Precision[i] = TP/TP_FP
        Precision_df = pd.DataFrame(Precision,columns = ['Precision'])
        Precision_df = Precision_df.applymap(lambda x: '%.2f%%' % (x*100))
        return Precision_df['Precision'].tolist()
    
    def getRecall(self):
        Recall = np.zeros((self.__length,),dtype=float)
        for i in range(self.__length):
            TP = self.__df[i][i]
            TP_FN = self.__df.iloc[-1].values[i]
            Recall[i] =  TP/TP_FN
        Recall_df = pd.DataFrame(Recall,columns = ['Recall'])
        Recall_df = Recall_df.applymap(lambda x: '%.2f%%' % (x*100))
        return Recall_df['Recall'].tolist()



In [None]:
df = pd.DataFrame(table)
df['row_sum']=df.apply(lambda x: x.sum(),axis=1)
df.loc['col_sum']=df.apply(lambda x: x.sum(),axis=0)
print(df)

**模型準確率**
所有類別分類正確之合 / 所有類別圖片總數

In [None]:
##模型準確率
Accuracy = 0
TP_TN = 0 #所有類別正確數之合
TP_TN_FP_FN = 0 #圖片總數

for i in range(6):
    TP_TN += df[i][i]
    
TP_TN_FP_FN = df['row_sum'].values[-1]
Accuracy = (TP_TN/TP_TN_FP_FN)
print("Accuracy: {:2.3%} ".format(Accuracy))

**模型精確率**
該類別分類正確 / 該類別模型辨識總數

In [None]:
#模型精確率
Precision = np.zeros((6,),dtype=float)

for i in range(6):
    TP = df[i][i]
    TP_FP = df['row_sum'].values[i]
    Precision[i] = TP/TP_FP
Precision_df = pd.DataFrame(Precision,columns = ['Precision'])
Precision_df = Precision_df.applymap(lambda x: '%.2f%%' % (x*100))
#print(Precision_df)


**模型召回率**
該類別分類正確 / 該類別圖片總數

In [None]:
#模型召回率
Recall = np.zeros((6,),dtype=float)
for i in range(6):
    TP = df[i][i]
    TP_FN = df.iloc[-1].values[i]
    Recall[i] =  TP/TP_FN
Recall_df = pd.DataFrame(Recall,columns = ['Recall'])
Recall_df = Recall_df.applymap(lambda x: '%.2f%%' % (x*100))
#print(Recall_df)

In [None]:
model_PR = pd.merge(Precision_df,Recall_df, right_index = True,left_index = True)
print(model_PR)