# 脉动阵列 - 卷积测试
## 1. 加载Overlay

In [1]:
import time
import random
from pynq import Overlay
import numpy as np
from pynq import Xlnk

# 加载Overlay
overlay = Overlay("systolic_array.bit")
print("Overlay downloaded successfully!")



Overlay downloaded successfully!


## 2. 定义IP核驱动

In [2]:
systolic_array_ip = overlay.systolic_array_0
xlnk = Xlnk()

# 脉动阵列驱动函数
def RunSystolic(array, din_a, din_b, bias, out):
    array.write(0x10, din_a.shape[0])
    array.write(0x18, din_a.shape[1])
    array.write(0x20, din_b.shape[1])
    array.write(0x28, din_a.physical_address)
    array.write(0x30, din_b.physical_address)
    array.write(0x38, bias.physical_address)
    array.write(0x40, out.physical_address)
    array.write(0, (array.read(0) & 0x80) | 0x01)
    tp = array.read(0)
    while not ((tp >> 1) & 0x1):
        tp = array.read(0)

## 3. 生成测试数据

In [3]:
channel  = 3    # 输入特征图通道数
feat_row = 28   # 输入特征图高
feat_col = 28   # 输入特征图宽
core_num = 128  # 卷积核个数/输出特征图通道数
core_w   = 3    # 卷积核边长
stride   = 1    # 卷积步长

out_row = (int)((feat_row - core_w)/stride + 1)
out_col = (int)((feat_col - core_w)/stride + 1)

core_size = core_w*core_w
out_size  = out_row*out_col

feature = np.zeros((channel, feat_row, feat_col), dtype = np.float32)
core    = np.zeros((core_num, channel, core_w, core_w), dtype = np.float32)
ref     = np.zeros((core_num, out_row, out_col), dtype = np.float32)

# 在PS端的DRAM中为IP核的输入输出数据分配存储空间
buf_a = xlnk.cma_array(shape = (core_num, channel*core_size), cacheable = 0, dtype = np.float32)
buf_b = xlnk.cma_array(shape = (channel*core_size, out_size), cacheable = 0, dtype = np.float32)
buf_c = xlnk.cma_array(shape = (core_num, out_size), cacheable = 0, dtype = np.float32)
bias  = xlnk.cma_array(shape = (core_num), cacheable = 0, dtype = np.float32)

# 随机生成测试数据
for ch in range(channel):
    for i in range(feat_row):
        for j in range(feat_col):
            feature[ch][i][j] = random.uniform(-20, 20)

for cnum in range(core_num):
    for ch in range(channel):
        for m in range(core_w):
            for n in range(core_w):
                core[cnum][ch][m][n] = random.uniform(-1, 1)
    
    bias[cnum] = random.uniform(-2, 5)

print("ready for test")

ready for test


## 4. 运行测试
### 4.1 软件卷积

In [4]:
pt0 = time.clock()

for ch_o in range(core_num):
    for r_o in range(out_row):
        for c_o in range(out_col):
            for ch_i in range(channel):
                for r in range(core_w):
                    for c in range(core_w):
                        ref[ch_o][r_o][c_o] += feature[ch_i][r_o*stride + r][c_o*stride + c] * core[ch_o][ch_i][r][c]
            ref[ch_o][r_o][c_o] += bias[ch_o]

pt1 = time.clock()
time_sw = pt1 - pt0

print("pure software: %fs" % time_sw)

pure software: 77.330211s


### 4.2 硬件卷积

In [6]:
pt0 = time.clock()

# TODO: 调整卷积核与特征图，以适应脉动阵列
# buf_a = xlnk.cma_array(shape = (core_num, channel*core_size), cacheable = 0, dtype = np.float32)
# buf_b = xlnk.cma_array(shape = (channel*core_size, out_size), cacheable = 0, dtype = np.float32)

for ch_o in range(core_num):
    for ch_i in range(channel):
        for r in range(core_w):
            for c in range(core_w):
                buf_a[ch_o][ch_i*core_size+r*core_w+c]=core[ch_o][ch_i][r][c]

for ch_i in range(channel):
    for r in range(core_w):
        for c in range(core_w):
            for r_o in range(out_row):
                for c_o in range(out_col):
                    buf_b[ch_i*core_size+r*core_w+c][r_o*out_col+c_o] = feature[ch_i][r_o*stride + r][c_o*stride + c]

# 利用硬件矩阵乘法实现卷积加速
RunSystolic(systolic_array_ip, buf_a, buf_b, bias, buf_c)

# 调整输出特征图的形状
buf_c = buf_c.reshape(core_num, out_row, out_col)

pt1 = time.clock()
time_hw = pt1 - pt0

print("hardware-accelerated: %fs" % time_hw)
print("speedup: %.2f" % (time_sw/time_hw))

hardware-accelerated: 7.301778s
speedup: 10.59


### 4.3 校验结果，计算加速比

In [7]:
def relative_err(ref, val):
    err = val - ref if val > ref else ref - val
    return err/ref if ref != 0 else err

flag = True

for ch in range(core_num):
    if flag is False:
        break
    for r in range(out_row):
        if flag is False:
            break
        for c in range(out_col):
            if relative_err(ref[ch][r][c], buf_c[ch][r][c]) > 0.01:
                print("Test failed at (%d, %d, %d)" % (ch, r, c))
                flag = False
                break

if flag:
    print("Test Passed!")

print("\nreference result: ")
print(ref)
print("\narray output:")
print(buf_c)

Test Passed!

reference result: 
[[[-14.59444046 -11.12211227  -8.13620186 ..., -14.64593506  58.6404686
    49.19946289]
  [ -7.53663969 -45.35098267 -96.07517242 ...,  16.10256958 -22.26272202
   -23.00614166]
  [  9.68885994  12.84560966  33.2521553  ..., -57.01199341  28.35104752
    50.14870834]
  ..., 
  [ 10.91507912  39.08739471 -39.50110245 ..., -19.86312103   6.20232821
     6.09776068]
  [ -0.427136    54.65568542 -13.18335533 ...,  -4.2653861   30.88496208
    -7.62136126]
  [ 48.32965851  -8.37588882   4.382339   ...,  -6.00511026 -18.4988327
   -26.62737083]]

 [[ 34.82493591 -13.22342682 -31.79992676 ...,  12.9793005  -52.89299011
   -59.79290009]
  [ 54.56661224 -14.59963799  43.50239182 ..., -32.43840408   1.36939502
   -41.81139755]
  [  9.21208     43.48594666  -3.61485887 ...,  -0.63792181 -74.96201324
    30.87534332]
  ..., 
  [-26.09944725  42.20083237 -15.62377357 ...,  19.19475365  -6.09777355
    12.60785484]
  [-38.53272629 -28.89927673 -13.68227768 ...,  57.