In [1]:
import time
import sys
import numpy as np
import platform
import tqdm
from typing import Tuple, List, Union, Any
import pynq_dpu
import pynq



### Jest to ostatnia część, w której przetestujemy nasz nauczony model po kwantyzacji na docelowej platformie.

Definiujemy na początku klasę `TimeMeasurement`. Jest to dokładnie ta sama klasa, którą wykorzystywaliśmy w dwóch poprzednich etapach. Pozwoli ona nam na sprawdzenie czasu przetwarzania danych.

Dodatkowo, tworzymy klasę `EvalLoader`. Pozwoli nam ona na odczytanie danych zapisanych w formacie `.npz`, które przygotowaliśmy w poprzedniej części. Standardowo, ustalony jest rozmiar batcha na 1, a nazwa pliku to `eval_MNIST.npz`. Dostosuj jedynie nazwę, jeżeli się nie zgadza.

In [2]:
class EvalLoader:
    def __init__(self, 
                 batch_size: int = 1, 
                 npz_path: str = 'eval_MNIST.npz') -> None:
        data = np.load(npz_path)
        self.data = data['data'].astype(np.float32)
        self.targets = data['targets']
        self.batch_size = batch_size
    
    def __getitem__(self, i):
        if i >= len(self):
            raise StopIteration

        beg = min(i * self.batch_size, self.data.shape[0])
        end = min(beg + self.batch_size, self.data.shape[0])

        return self.data[beg:end, ...], self.targets[beg:end]
    
    def __len__(self):
        return self.data.shape[0] // self.batch_size


class TimeMeasurement:
    def __init__(self, context_name: str, frames: int) -> None:
        self.context_name: str = context_name
        self.frames: int = frames
        self.begin: float = None
        self.end: float = None

    def __enter__(self):
        self.begin = time.time()
        return self

    def __exit__(self, *args):
        self.end = time.time()

    @property
    def time(self) -> float:
        if self.begin is None or self.end is None:
            raise RuntimeError()
        return int(self.end - self.begin)

    @property
    def fps(self):
        return self.frames / self.time

    def __str__(self) -> str:
        t = self.time
        h = t // 60
        min = (t - h*60) // 60
        s = int(t - h*60 - min*60)
        ms = int((t - np.floor(t))*1000)

        return f"Execution time: {h}:{min}:{s}:{ms}, processed {self.frames} frames, throughput: {self.fps} fps."

    def __repr__(self) -> str:
        t = self.time
        h = t // 60
        min = (t - h*60) // 60
        s = np.floor(t - h*60 - min*60)
        ms = np.floor((t - np.floor(t))*1000)

        return f'TimeMeasurement(context="{self.context_name}","{h}:{min}:{s}:{ms}", frames={self.frames}, throughput={self.fps})'

Definiujemy metrykę `Accuracy`. Jest ona taka sama jak w poprzednich częściach, ale możesz zdefiniować ją sam. 

Z wartości `y_pred` wyznacz wartości maksymalne funkcją `np.argmax`. Zrób to względem `axis=1`. Następnie porównaj uzyskany wektor z `y_ref`. Wynik porównania wpisz do zmiennej `cmp`. Na koniec wyznacz wartość `score`, która jest równa zsumowanej wartości wektora `cmp` (.sum()) podzielona przez długość wektora `cmp`. 

In [3]:
class AccuracyMetric:
    
    def __init__(self) -> None:
        pass

    def __call__(self, y_pred: np.ndarray, y_ref: np.ndarray) -> float:
        y_pred = ... #TODO
        cmp = ... #TODO
        score  = ... #TODO

        return score

Tworzymy klasę `CrossEntropyLoss`. Nie jest ona wymagana i może ona standardowo zwracać wartość 0. Jeżeli jednak ktoś byłby zainteresowany `dodatkowym zadaniem`, można ją zaimplementować na podstawie dokumentacji PyTorch lub internetu :).

In [4]:
class CrossEntropyLoss:
    def __init__(self) -> None:
        pass
        
    def __call__(self, 
                 y_pred: np.ndarray, 
                 y_ref: np.ndarray
                 ) -> Any:
        
        return 0.0

Inicjalizujemy generator danych, metrykę, funkcję straty.

In [5]:
loader = ... #TODO
metric = ... #TODO
criterion = ... #TODO
tm = TimeMeasurement("Evaluation on KV260", loader.batch_size * len(loader))

Zdefiniuj funkcję `softmax`. Zobacz jak ona działa w dokumentacji PyTorch lub w internecie.

In [6]:
def softmax(x: np.ndarray, axis=1):
    #TODO
    return ...

Tworzymy klasę NetworkDPU. Podczas inicjalizacji przyjmuje skompilowany model `MiniResNet_qu.xmodel` oraz ścieżkę do pliku `dpu.bit`. Pozostałe pliki `dpu` muszą znajdować się w tym samym folderze i muszą się tak samo nazywać!

Funkcja `input_float_to_int8` konwertuje dane z przestrzeni `float` do `int8`.

Funkcja `output_int8_to_float` wykonuje odwrotną operację.

Funkcja `process` wykonuje przetwarzanie danych. Zaimplementuj ją, wykonując kolejne operacje:
1. Przekonwertuj daną wejściową `x` z przestrzeni `float` do przestrzeni `int`,
2. Wpisz przekonwertowaną daną do zerowego indeksu bufowa wejściowego `buff_in`,
3. Wywołaj funkcję `self.dpu.execute_async`, gdzie jako pierwszy parametr podajesz bufor wejściowy, a jako drugi parametr bufor wyjściowy. Funkcja zwróci indeks - zapisz go do zmiennej `job_id`,
4. Poczekaj aż wątek obliczenia się wykona - wykorzystaj funkcję `self.dpu.wait`, gdzie parametrem jest indeks `job_id`.
5. Odczytaj pierwszą wartość z bufora `buff_out`. Przypisz ją do zmiennej `y`,
6. Przekonwertuj zmienną `y` do typu `float`,
7. Wykonaj funkcję `softmax` na zmiennej `y` i ją zwróć.

In [7]:
class NetworkDPU:
    
    def __init__(self, xmodel_path: str = 'MiniResNet_qu.xmodel', dpu_path: str = 'dpu.bit'):
        self.ov: pynq_dpu.DpuOverlay = pynq_dpu.DpuOverlay(dpu_path, download=True)
        self.ov.load_model(xmodel_path)
        self.dpu = self.ov.runner
        print(self.ov.runner)
        inputTensors = self.dpu.get_input_tensors()
        outputTensors = self.dpu.get_output_tensors()
        # get list of shapes
        shapeIn = np.array([it.dims for it in inputTensors])
        shapeOut = np.array([ot.dims for ot in outputTensors])
        self.shapeIn = shapeIn
        self.shapeOut = shapeOut
        self.buff_in = [np.zeros(sh, np.int8, order='C') for sh in shapeIn]
        self.buff_out = [np.zeros(sh, np.int8, order='C') for sh in shapeOut]
        
        self.input_repr = [(it.get_attr('bit_width'), it.get_attr('fix_point')) for it in inputTensors]
        self.output_repr = [(ot.get_attr('bit_width'), ot.get_attr('fix_point')) for ot in outputTensors]
    
    def input_float_to_int8(self, x: np.ndarray) -> np.ndarray:
        BIT_WIDTH, PRECISION_BITS = self.input_repr[0]
        x = x * (2**PRECISION_BITS)
        x = np.floor(x)
        x = np.clip(x,-128, 127)
        return x.astype(np.int8)
    
    def output_int8_to_float(self, y: np.ndarray):
        BIT_WIDTH, PRECISION_BITS = self.output_repr[0]
        PRECISION = 1 / 2**PRECISION_BITS
        y = y * PRECISION
        return y.astype(np.float32)
    
    def process(self, x: np.ndarray):
        x = ...#TODO
        self.buff_in[0] = ...#TODO
        # start DPU thread
        job_id = ...#TODO
        self.dpu.wait(...)
        y = ... #TODO
        y = ... #TODO
        y = ... #TODO
        return y
    
    def __call__(self, x: np.ndarray) -> Any:
        return self.process(x)

Zainicjalizuj model sieci DPU, podając ścieżki do modelu i pliku `dpu.bit`.

In [8]:
net = NetworkDPU(xmodel_path='MiniResNet_qu.xmodel', 
                 dpu_path='dpu.bit')

vart::Runner@0x39a31f50


Tworzymy funkcję do ewaluacji modelu. Jeżeli ktoś zaimplementować Cross Entropy, to wartość loss będzie uwzględniania. W innym przypadku będzie zwracać 0 i nie będziemy na to zwracać uwagi.

In [9]:
def evaluation(model: NetworkDPU,
               data_loader: EvalLoader,
               criterion: CrossEntropyLoss,
               metric: AccuracyMetric,
               ) -> Tuple[float, float]:

    print(f"Running on platform: {platform.platform()}, "
          f"machine: {platform.machine()}, "
          f"python_version: {platform.python_version()}, "
          f"processor: {platform.processor()}, "
          f"system: {platform.system()}, "
          )
    total_loss: float = 0.0
    total_accuracy: float = 0.0
    samples_num: int = 0
    
    for i, (X, y_ref) in tqdm.tqdm(enumerate(data_loader),):
        y_pred = model(X)
        
        # calculate loss
        loss = criterion(y_pred, y_ref)
        
        # calculate accuracy
        accuracy = metric(y_pred, y_ref)

        total_loss += loss * y_pred.shape[0]
        total_accuracy += accuracy * y_pred.shape[0]
        samples_num += y_pred.shape[0]

    if samples_num == 0:
        return 0.0, 0.0

    return total_loss / samples_num, total_accuracy / samples_num


Uruchamiamy ewaluację. Porównaj wyniki uzyskane podczas ewaluacji modelu zmiennoprzecinkowego.

Oczekiwany jest przyrost ilości przetwarzanych danych na sekundę, przy minimalnej lub żadnej utracie dokładności.

In [10]:
with tm:
    loss, acc = evaluation(net, loader, criterion, metric)
    
print(str(tm))
print("Loss: ", loss)
print("Accuracy: ", acc)

Running on platform: Linux-5.4.0-1021-xilinx-zynqmp-aarch64-with-glibc2.29, machine: aarch64, python_version: 3.8.10, processor: aarch64, system: Linux, 


10000it [00:07, 1412.94it/s]

Execution time: 0:0:7:0, processed 10000 frames, throughput: 1428.5714285714287 fps.
Loss:  0.0
Accuracy:  0.982





## Udało Ci się wykonać wszystkie zadania! Gratulacje :)