In [1]:
import time

import keras
import numpy as np
from keras import backend as K
from keras.datasets import mnist
from keras.models import Model, load_model

import cv2
import matplotlib.pyplot as plt
import pynq.lib.dma
from pynq import MMIO, PL, DefaultHierarchy, Overlay, Xlnk

Using Theano backend.


In [2]:
class Convolutional_Neural_Network(DefaultHierarchy):
    """Driver for an overlay hierarchy that contains the ``axi_dma_0`` DMA engine.

    Every exchange with the hardware is one int16 packet sent through
    ``axi_dma_0.sendchannel`` and one int16 packet read back through
    ``axi_dma_0.recvchannel``.  Each packet starts with a 7-word header that
    is followed by the payload (weights or input samples).
    """

    # Number of int16 header words that precede the payload in every packet.
    _HEADER_WORDS = 7

    def __init__(self, description):
        super().__init__(description)

    @staticmethod
    def _free_buffer(buffer):
        """Release a contiguous buffer if it exposes a release method.

        Looks for ``freebuffer`` first and falls back to ``close``; a ``None``
        buffer or one without either method is silently ignored.
        """
        if buffer is None:
            return
        release = getattr(buffer, 'freebuffer', None) or getattr(buffer, 'close', None)
        if callable(release):
            release()

    def _dma_roundtrip(self, in_buffer, out_buffer):
        """Start the send and receive transfers, then block until both finish."""
        self.axi_dma_0.sendchannel.transfer(in_buffer)
        self.axi_dma_0.recvchannel.transfer(out_buffer)
        self.axi_dma_0.sendchannel.wait()
        self.axi_dma_0.recvchannel.wait()

    def loadweight(self, W, index, quant_scale):
        """Scale a weight array and send it to the hardware.

        Args:
            W: weight array with at least three axes; ``shape[0]`` is sent as
                OFMCH, ``shape[1]`` as IFMCH and ``shape[2]`` as KerDim.
            index: first header word (presumably the target layer slot --
                confirm against the hardware design).
            quant_scale: factor every weight is multiplied by before it is
                stored as int16.

        Raises:
            ValueError: if ``W`` has fewer than three axes.
        """
        W = np.asarray(W)
        if W.ndim < 3:
            raise ValueError(
                "W must have at least 3 axes (OFMCH, IFMCH, KerDim, ...), got shape %r"
                % (W.shape,))
        KerDim = W.shape[2]
        IFMCH = W.shape[1]
        OFMCH = W.shape[0]
        kernel_val = W.ravel() * quant_scale
        kernel = np.append([index, 0, KerDim, IFMCH, 0, OFMCH, 0], kernel_val)

        xlnk = Xlnk()
        in_buffer = out_buffer = None
        try:
            in_buffer = xlnk.cma_array(shape=(kernel.shape[0],), dtype=np.int16)
            out_buffer = xlnk.cma_array(shape=(kernel.shape[0],), dtype=np.int16)
            # Single bulk copy; the float -> int16 cast truncates toward zero,
            # exactly like assigning the elements one at a time.
            in_buffer[:] = kernel
            self._dma_roundtrip(in_buffer, out_buffer)
        finally:
            # Contiguous memory is scarce: hand it back even if the transfer failed.
            self._free_buffer(out_buffer)
            self._free_buffer(in_buffer)

    def execute(self, test_data, batch_size, input_ch, input_dim, output_ch, output_dim):
        """Run the first ``batch_size`` samples through the hardware.

        Args:
            test_data: array of samples; only ``test_data[0:batch_size]`` is sent.
            batch_size: number of samples to process (must be positive).
            input_ch, input_dim, output_ch, output_dim: geometry values copied
                into the packet header; the two output values also size the
                receive buffer.

        Returns:
            float32 ndarray of shape ``(batch_size, -1)`` in which every row is
            divided by its own sum.  Rows whose sum is zero are returned
            unnormalised instead of becoming NaN/inf.

        Raises:
            ValueError: if ``batch_size`` is not positive or ``test_data``
                holds fewer than ``batch_size`` samples.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be positive, got %r" % (batch_size,))
        input_mat = test_data[0:batch_size]
        if len(input_mat) != batch_size:
            # The header would announce more samples than the payload carries.
            raise ValueError(
                "test_data holds %d samples but batch_size is %d"
                % (len(input_mat), batch_size))

        input_val = np.append(
            [0, batch_size, 0, input_ch, input_dim, output_ch, output_dim], input_mat.ravel())
        out_words = self._HEADER_WORDS + output_ch * batch_size * output_dim * output_dim

        xlnk = Xlnk()
        in_buffer = out_buffer = None
        try:
            in_buffer = xlnk.cma_array(shape=(input_val.shape[0],), dtype=np.int16)
            out_buffer = xlnk.cma_array(shape=(out_words,), dtype=np.int16)
            in_buffer[:] = input_val

            # perf_counter measures wall-clock time, which is what "elapsed" means;
            # process_time would only count CPU time spent by this process.
            start_time = time.perf_counter()
            self._dma_roundtrip(in_buffer, out_buffer)
            end_time = time.perf_counter()
            print("Elapsed Test Time: ", end_time-start_time)

            # Copy the payload into ordinary memory before the buffer is freed.
            output_mat = np.array(
                out_buffer[self._HEADER_WORDS:], dtype=np.float32).reshape(batch_size, -1)
        finally:
            self._free_buffer(out_buffer)
            self._free_buffer(in_buffer)

        row_sums = output_mat.sum(axis=1, keepdims=True)
        np.divide(output_mat, row_sums, out=output_mat, where=row_sums != 0)
        return output_mat

    @staticmethod
    def checkhierarchy(description):
        """Return True when the hierarchy description lists an ``axi_dma_0`` IP."""
        return 'axi_dma_0' in description.get('ip', {})

In [3]:
# Run configuration shared by the cells below.
num_classes = 10
batch_size = 127
quant_scale = 116

# Image geometry: rows, columns and a single trailing channel.
img_rows, img_cols = 28, 28
input_shape = (img_rows, img_cols, 1)

# MNIST arrives already split into train and test sets.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Add the channel axis, switch to float32 and divide every pixel by 4.
x_train = x_train.reshape(-1, img_rows, img_cols, 1).astype('float32') / 4
x_test = x_test.reshape(-1, img_rows, img_cols, 1).astype('float32') / 4
# Disabled alternatives kept for reference:
# x_train = x_train.astype('int8')
# x_test = x_test.astype('int8')
# x_train = x_train.astype('int16')
# x_test = x_test.astype('int16')
print('x_train shape:', x_train.shape)
print(len(x_train), 'train samples')
print(len(x_test), 'test samples')

# One-hot encode the integer class labels.
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


In [4]:
# Load the saved Keras model from the HDF5 file next to the notebook.
model = load_model('mnist_cnn_model_int8.h5')

In [5]:
# Print the layer-by-layer summary (output shapes and parameter counts).
model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 26, 26, 16)        144       
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 24, 24, 32)        4608      
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 6, 6, 32)          0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 6, 6, 32)          0         
_________________________________________________________________
flatten_1 (Flatten)          (None, 1152)              0         
_________________________________________________________________
dense_1 (Dense)              (None, 128)               147456    
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0         
__________

In [6]:
# Create the PYNQ overlay from the 'nn.bit' bitstream; later cells reach the
# accelerator hierarchy through overlay.memory.
overlay = Overlay('nn.bit')

In [7]:
# Send the weights of each weighted layer to the accelerator, in layer order,
# numbering the slots from 1.
# get_weights() returns a list of arrays; np.transpose turns that list into a
# single array and reverses all of its axes before it is handed to loadweight.
for index, layer_name in enumerate(('conv2d_1', 'conv2d_2', 'dense_1', 'dense_2'), start=1):
    weight = np.transpose(model.get_layer(layer_name).get_weights())
    overlay.memory.loadweight(weight, index, quant_scale)

In [17]:
%%time
# Classify the first batch on the FPGA and compare the arg-max class of each
# output row with the arg-max of the matching one-hot label.
result = overlay.memory.execute(x_test, batch_size, 1, 28, 10, 1)
predicted = result.argmax(axis=1)
expected = y_test[:batch_size].argmax(axis=1)
count = int(np.count_nonzero(predicted == expected))
score = count/batch_size
print('FPGA accuracy:', score)

Elapsed Test Time:  0.4323413830000078
FPGA accuracy: 0.984251968503937
CPU times: user 760 ms, sys: 10 ms, total: 770 ms
Wall time: 770 ms


In [18]:
%%time
# Same accuracy measurement as the FPGA cell, with the Keras model doing the
# prediction on the CPU.
result = model.predict(x_test[0:batch_size])
predicted = result.argmax(axis=1)
expected = y_test[:batch_size].argmax(axis=1)
count = int(np.count_nonzero(predicted == expected))
score = count/batch_size
print('Arm accuracy:', score)

Arm accuracy: 1.0
CPU times: user 1.54 s, sys: 40 ms, total: 1.58 s
Wall time: 1.57 s


In [19]:
%%time
# evaluate() returns the loss followed by the compiled metrics; index 1 is
# printed as the accuracy.
score = model.evaluate(x_test[0:batch_size], y_test[0:batch_size])
# np.float32(...) accepts both NumPy scalars and plain Python floats, whereas
# calling .astype on the value fails when evaluate() returns Python floats.
print('Arm accuracy:', np.float32(score[1]))

Arm accuracy: 1.0
CPU times: user 1.6 s, sys: 20 ms, total: 1.62 s
Wall time: 1.62 s


In [None]:
# Set the weights of a single layer
# x=[1,2,3,4,5,6,7,8,9]
# x = np.transpose(x)
# x=np.reshape(x,(1,3,3,1,1))
# model.get_layer('conv2d_1').set_weights(x)

In [None]:
# Get the intermediate result of a layer
# intermediate_layer_model = Model(inputs=model.input,
#                                  outputs=model.get_layer('conv2d_1').output)
# pre = intermediate_layer_model.predict(x_test[0].reshape(1,28,28,1))

In [None]:
# Display an image
# result = np.reshape(x_test[0], (28,28))
# plt.figure(figsize=(4, 4))
# plt.imshow(result, 'gray')