In [1]:
# !python3 -m pip install pynq-dpu --no-use-pep517



In [2]:
import time
import sys
# sys.path.append("/usr/include/vart")
# sys.path.append("/usr/include/xir")
import numpy as np
import platform
import tqdm
from typing import Tuple, List, Union, Any
import pynq_dpu
import pynq

# pynq.pl_server.xrt_device._xrt_version = (2, 8, 0)

class EvalLoader:
    def __init__(self, 
                 batch_size: int = 1, 
                 npz_path: str = 'eval_data.npz') -> None:
        data = np.load(npz_path)
        self.data = data['data'].astype(np.float32) / 255
        self.targets = data['targets']
        self.batch_size = batch_size
    
    def __getitem__(self, i):
        if i >= len(self):
            raise StopIteration

        beg = min(i * self.batch_size, self.data.shape[0])
        end = min(beg + self.batch_size, self.data.shape[0])

        return self.data[beg:end, ...], self.targets[beg:end]
    
    def __len__(self):
        return self.data.shape[0] // self.batch_size


class TimeMeasurement:
    def __init__(self, context_name: str, frames: int) -> None:
        self.context_name: str = context_name
        self.frames: int = frames
        self.begin: float = None
        self.end: float = None

    def __enter__(self):
        self.begin = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()

    @property
    def time(self) -> float:
        if self.begin is None or self.end is None:
            raise RuntimeError()
        return int(self.end - self.begin)

    @property
    def fps(self):
        return self.frames / self.time

    def __str__(self) -> str:
        t = self.time
        h = t // 60
        min = (t - h*60) // 60
        s = int(t - h*60 - min*60)
        ms = int((t - np.floor(t))*1000)

        return f"Execution time: {h}:{min}:{s}:{ms}, processed {self.frames} frames, throughput: {self.fps} fps."

    def __repr__(self) -> str:
        t = self.time
        h = t // 60
        min = (t - h*60) // 60
        s = np.floor(t - h*60 - min*60)
        ms = np.floor((t - np.floor(t))*1000)

        return f'TimeMeasurement(context="{self.context_name}","{h}:{min}:{s}:{ms}", frames={self.frames}, throughput={self.fps})'


class AccuracyMetic:
    
    def __init__(self) -> None:
        pass

    def __call__(self, y_pred: np.ndarray, y_ref: np.ndarray) -> float:
        """
        :param y_pred: array of shape (batch_size, num_of_classes) type float
        :param y_ref: array with shape (batch_size,) and type Long
        :return: scalar as accuracy metric for batch
        """
        y_pred = np.argmax(y_pred, axis=1)
        cmp = y_pred == y_ref
        # scalar value
        score  = cmp.sum() / cmp.shape[0]

        return score


class CrossEntropyLoss:
    def __init__(self) -> None:
        pass
        
    def __call__(self, 
                 y_pred: np.ndarray, 
                 y_ref: np.ndarray
                 ) -> Any:
        
        return 0.0


loader = EvalLoader()
metric = AccuracyMetic()
criterion = CrossEntropyLoss()
tm = TimeMeasurement("Evaluation on KV260", loader.batch_size * len(loader))

In [7]:
def softmax(x: np.ndarray, axis=1):
    x = np.exp(x)
    x = x / np.sum(x, axis=axis)
    return x


class NetworkDPU:
    
    def __init__(self, xmodel_path: str = 'MiniResnet_VAI.xmodel', dpu_path: str = 'dpu.bit'):
        self.ov: pynq_dpu.DpuOverlay = pynq_dpu.DpuOverlay(dpu_path, download=True)
        self.ov.load_model(xmodel_path)
        self.dpu = self.ov.runner
        print(self.ov.runner)
        inputTensors = self.dpu.get_input_tensors()
        outputTensors = self.dpu.get_output_tensors()
        # get list of shapes
        shapeIn = np.array([it.dims for it in inputTensors])
        shapeOut = np.array([ot.dims for ot in outputTensors])
        self.shapeIn = shapeIn
        self.shapeOut = shapeOut
        self.buff_in = [np.zeros(sh, np.int8, order='C') for sh in shapeIn]
        self.buff_out = [np.zeros(sh, np.int8, order='C') for sh in shapeOut]
        
        self.input_repr = [(it.get_attr('bit_width'), it.get_attr('fix_point')) for it in inputTensors]
        self.output_repr = [(ot.get_attr('bit_width'), ot.get_attr('fix_point')) for ot in outputTensors]
    
    def input_float_to_int8(self, x: np.ndarray) -> np.ndarray:
        BIT_WIDTH, PRECISION_BITS = self.input_repr[0]
        
        # int space 
        x = x * (2**PRECISION_BITS)
        x = np.floor(x)
        x = np.clip(x,-128, 127)
        x = x.astype(np.int8)
        
        return x
    
    def output_int8_to_float(self, y: np.ndarray):
        BIT_WIDTH, PRECISION_BITS = self.output_repr[0]
        PRECISION = 1 / 2**PRECISION_BITS
        y = y * PRECISION
        return y
    
    def process(self, x: np.ndarray):
        x = self.input_float_to_int8(x)
        
        # fill input buffer
        self.buff_in[0] = x
        # start DPU thread
        job_id = self.dpu.execute_async(self.buff_in, self.buff_out)
        # wait for thread end to join it
        self.dpu.wait(job_id)
        # read from output buffer
        y = self.buff_out[0]
        
        y = self.output_int8_to_float(y)
        return y
    
    def __call__(self, x: np.ndarray) -> Any:
        return self.process(x)
    
net = NetworkDPU(xmodel_path='MiniResnet_VAI.xmodel', 
                 dpu_path='dpu.bit')
        

vart::Runner@0x22d68b30


In [8]:
def evaluation(model: NetworkDPU,
               data_loader: EvalLoader,
               criterion: CrossEntropyLoss,
               metric: AccuracyMetic,
               ) -> Tuple[float, float]:
    """
    Eval pass generator data through the model.
    
    :param model: network
    :param data_generator: data loader
    :param criterion: criterion / loss two arg function
    :param metric: metric object - two arg function
    :return: loss_value, metric_value
    """
    print(f"Running on platform: {platform.platform()}, "
          f"machine: {platform.machine()}, "
          f"python_version: {platform.python_version()}, "
          f"processor: {platform.processor()}, "
          f"system: {platform.system()}, "
          )
    total_loss: float = 0.0
    total_accuracy: float = 0.0
    samples_num: int = 0
    
    for i, (X, y_ref) in tqdm.tqdm(enumerate(data_loader),):
        y_pred = model(X)
        
        # calculate loss
        loss = criterion(y_pred, y_ref)
        
        # calculate accuracy
        accuracy = metric(y_pred, y_ref)

        total_loss += loss * y_pred.shape[0]
        total_accuracy += accuracy * y_pred.shape[0]
        samples_num += y_pred.shape[0]

    if samples_num == 0:
        return 0.0, 0.0

    return total_loss / samples_num, total_accuracy / samples_num


In [9]:
with tm:
    loss, acc = evaluation(net, loader, criterion, metric)
    
print(str(tm))
print("Loss: ", loss)
print("Accuracy: ", acc)


Running on platform: Linux-5.4.0-1017-xilinx-zynqmp-aarch64-with-glibc2.29, machine: aarch64, python_version: 3.8.10, processor: aarch64, system: Linux, 


10000it [00:06, 1588.21it/s]

Execution time: 0:0:6:0, processed 10000 frames, throughput: 1666.6666666666667 fps.
Loss:  0.0
Accuracy:  0.9842



