In [3]:
import numpy as np
import pynq
import time
import sys
# Print NumPy arrays with 4 decimals, without scientific notation, and with
# lines up to 110 characters wide.
np.set_printoptions(precision = 4, suppress=True, linewidth=110)

# Load the overlay from the bitstream "bd.bit", as we did for GPIO.
overlay = pynq.Overlay("bd.bit")
# print(overlay.ip_dict)

# DMA channels used below: dma2pl is the send channel of the
# axi_dma_ps2pl IP, dma2ps is the receive channel of the axi_dma_pl2ps IP.
dma2pl = overlay.axi_dma_ps2pl.sendchannel
dma2ps = overlay.axi_dma_pl2ps.recvchannel

# Problem dimensions. These size the DMA buffers allocated further down.
# frame_size * fft_point_full is the length of the raw-input buffer.
frame_size = 16
# NOTE(review): presumably the number of FFT bins kept per frame — confirm.
fft_point = 128
fft_point_full = 256
# Layer sizes: w1 is sent as hidden_feature * in_feature words,
# w2 as out_feature * hidden_feature words.
in_feature = 16
hidden_feature = 32
out_feature = 3


# Number of fraction bits;
Q_bit = 8
# Number type;
dtype = np.int32

def np_relu(a, dtype_=dtype):
    """Apply ReLU element-wise and return the result as a new array.

    Args:
        a: Array-like input of any shape (the input itself is not modified).
        dtype_: NumPy dtype of the returned array. Defaults to the file-wide
            ``dtype``; values are cast to it after the comparison.

    Returns:
        A new array with the same shape as ``a`` in which every element that
        is not strictly greater than zero has been replaced by 0.
    """
    a = np.asarray(a)
    # Select with `a > 0` rather than np.maximum so that NaN maps to 0
    # (NaN > 0 is False) instead of propagating into the cast.
    return np.where(a > 0, a, 0).astype(dtype_)

def np_crossentropy(a):
    """Return the softmax of ``a``, normalised over all of its elements.

    Despite the name, this does not compute a cross-entropy loss: it maps
    raw scores to values that sum to 1.

    Args:
        a: Array-like of scores.

    Returns:
        Array with the same shape as ``a`` holding exp(a) / sum(exp(a)).
    """
    a = np.asarray(a)
    if a.size == 0:
        # Nothing to normalise; np.max would raise on an empty array.
        return np.exp(a)
    # Subtracting the maximum leaves the ratio unchanged but keeps np.exp
    # from overflowing to inf (and the result from becoming NaN) for large
    # scores.
    exps = np.exp(a - np.max(a))
    return exps / np.sum(exps)

# Return the converted fixed-point integer and the conversion error;
def f2int(a, Q_bit_=Q_bit, dtype_=dtype):
    """Convert ``a`` to fixed point and report what the conversion lost.

    Args:
        a: Value or array to convert.
        Q_bit_: Number of fraction bits; ``a`` is scaled by 2**Q_bit_.
        dtype_: Callable NumPy type used to convert the scaled value.

    Returns:
        A tuple ``(fixed, error)``: ``fixed`` is ``dtype_(a * 2**Q_bit_)`` and
        ``error`` is ``a`` minus the float32 value recovered from ``fixed``.
    """
    scale = 2**Q_bit_
    fixed = dtype_(a * scale)
    recovered = np.float32(fixed / scale)
    return fixed, a - recovered

def model(a):
    """Run the floating-point two-layer network on input ``a``.

    Uses the module-level parameters w1_f, b1_f, w2_f and b2_f.

    Returns:
        A tuple ``(output, hidden, pre_activation)``: the second-layer output,
        the ReLU output of the first layer (as float32), and the first-layer
        result before the ReLU.
    """
    pre_activation = w1_f @ a + b1_f
    hidden = np_relu(pre_activation, dtype_=np.float32)
    output = w2_f @ hidden + b2_f
    return output, hidden, pre_activation

def model_int(a):
    """Run the fixed-point two-layer network on input ``a``.

    Uses the module-level parameters w1_int, b1_int, w2_int and b2_int. After
    each matrix product the result is divided by 2**Q_bit and converted with
    ``dtype`` before the bias is added.

    Returns:
        A tuple ``(output, hidden, pre_activation)``: the second-layer output
        scaled back by 2**Q_bit as float32, the ReLU output of the first
        layer, and the first-layer result before the ReLU.
    """
    scale = 2**Q_bit
    layer1_acc = w1_int @ a
    pre_activation = dtype(layer1_acc / scale) + b1_int
    hidden = np_relu(pre_activation)
    layer2_acc = w2_int @ hidden
    output_fixed = dtype(layer2_acc / scale) + b2_int
    return np.float32(output_fixed/scale), hidden, pre_activation

def cutframe(audiodata, frame_length=256, frame_overlap=0):
    """Split a 1-D signal into fixed-length, optionally overlapping frames.

    Consecutive frames start ``frame_length - frame_overlap`` samples apart.
    The last frame is zero-padded when the signal does not fill it.

    Args:
        audiodata: 1-D sequence of samples.
        frame_length: Number of samples per frame.
        frame_overlap: Number of samples shared by consecutive frames; must be
            smaller than ``frame_length``.

    Returns:
        A float64 array of shape ``(num_frame, frame_length)``. It has zero
        rows when the signal is too short to start a frame.

    Raises:
        ValueError: If ``frame_overlap >= frame_length``, which would make the
            step between frames zero or negative.
    """
    frame_move = frame_length - frame_overlap
    if frame_move <= 0:
        raise ValueError(
            "frame_overlap ({}) must be smaller than frame_length ({})".format(
                frame_overlap, frame_length))

    audiolength = len(audiodata)
    # Clamp at zero: a signal shorter than the overlap would otherwise yield
    # a negative frame count.
    num_frame = max(int(np.ceil((audiolength - frame_overlap) / frame_move)), 0)
    if num_frame == 0:
        return np.zeros([0, frame_length])

    # Zero-filled buffer long enough for the last frame; copying the signal
    # into its head pads the tail with zeros.
    padded = np.zeros((num_frame - 1) * frame_move + frame_length)
    padded[:audiolength] = audiodata

    # Row r of the index grid is the sample positions of frame r, so a single
    # indexed read gathers every frame.
    frame_starts = np.arange(num_frame) * frame_move
    return padded[frame_starts[:, None] + np.arange(frame_length)]

def label(a):
    """Map a class index to its name.

    Index 0 is 'cat', index 1 is 'apple'; every other value maps to 'box'.
    """
    if a == 0:
        return 'cat'
    if a == 1:
        return 'apple'
    return 'box'


# Load the input data and the floating-point layer parameters from ./data.
# NOTE(review): judging by the file names, raw_data.npy holds the raw samples
# and stftdata.npy the corresponding precomputed features — confirm.
rawdata_f = np.load('./data/raw_data.npy')
data_f = np.load('./data/stftdata.npy')
w1_f = np.load('./data/w1.npy')
b1_f = np.load('./data/b1.npy')
w2_f = np.load('./data/w2.npy')
b2_f = np.load('./data/b2.npy')

# Convert to desired type;
# f2int also returns the conversion error, which is discarded here.
rawdata_int, _ = f2int(rawdata_f)
data_int, _ = f2int(data_f)
w1_int, _ = f2int(w1_f)
b1_int, _ = f2int(b1_f)
w2_int, _ = f2int(w2_f)
b2_int, _ = f2int(b2_f)

# Flatten the weight matrices so they can be copied into the 1-D DMA buffers.
w1 = w1_int.reshape(-1)
w2 = w2_int.reshape(-1)

# Pick the test sample; both data sets are indexed as [i][j].
# NOTE(review): the meaning of the two indices is not visible here — confirm.
i = 0
j = 4

test_raw = rawdata_int[i][j]
# Split the raw sample into frames using cutframe's defaults
# (256 samples per frame, no overlap).
test_raw_framed = cutframe(test_raw)
test_feature_f = data_f[i][j]
test_feature_int = data_int[i][j]


# Debug prints of the fixed-point parameters (disabled):
# print('w1 is:', w1_int)
# print('b1 is:', b1_int)
# print('w2 is:', w2_int)
# print('b2 is:', b2_int)

# Set up PYNQ data arrays (can be used as numpy arrays, but include physical memory addresses for DMA access).
# Buffers sent to the PL: the two weight matrices (flattened), the two bias
# vectors and the raw input (frame_size * fft_point_full words).
data_w1 = pynq.allocate(shape=(hidden_feature * in_feature,), dtype=np.int32)
data_b1 = pynq.allocate(shape=(hidden_feature,), dtype=np.int32)
data_w2 = pynq.allocate(shape=(out_feature * hidden_feature,), dtype=np.int32)
data_b2 = pynq.allocate(shape=(out_feature,), dtype=np.int32)
data_input_raw = pynq.allocate(shape=(frame_size * fft_point_full,), dtype=np.int32)
# Buffers received from the PL: two intermediate vectors and the final output.
result_input_x1 = pynq.allocate(shape=(in_feature,), dtype=np.int32)
result_input_x2 = pynq.allocate(shape=(hidden_feature,), dtype=np.int32)
result_final = pynq.allocate(shape=(out_feature,), dtype=np.int32)


# Load all the model parameters into the DMA buffers;
np.copyto(data_w1, w1)
np.copyto(data_b1, b1_int)
np.copyto(data_w2, w2)
np.copyto(data_b2, b2_int)

# Send the parameters over the send channel one buffer at a time, in the
# order w1, b1, w2, b2, waiting for each transfer to complete before the
# next one is started. No timestamp is taken here: the timed section starts
# only once the parameters have been sent.
for param_buffer in (data_w1, data_b1, data_w2, data_b2):
    dma2pl.transfer(param_buffer)
    dma2pl.wait()

# We record the time to process the raw data;
start = time.time()

# Load raw input and set dma transfer;
np.copyto(data_input_raw, test_raw)
dma2pl.transfer(data_input_raw)
dma2pl.wait()


# Read input_x1;
dma2ps.transfer(result_input_x1)
dma2ps.wait()

# Ideally the result should be read at this time, result_input_x1 is used for debug only.
# NOTE(review): this timestamp is overwritten by the second `end` below before
# it is ever read, so the reported FPGA time also covers the two reads that
# follow — confirm this is the intended measurement.
end = time.time()

# Read input_x2;
dma2ps.transfer(result_input_x2)
dma2ps.wait()

# Read final results;
dma2ps.transfer(result_final)
dma2ps.wait()

# End of the timed FPGA section (this is the value used in the report).
end = time.time()

# Software reference: time the per-frame FFT feature extraction followed by
# the floating-point model. Sizes come from the shared constants
# (frame_size frames, fft_point bins kept per frame).
test_feature = np.zeros([frame_size, fft_point])
start2 = time.time()
# A dedicated loop variable keeps the sample index `i` chosen earlier intact.
for frame_idx in range(frame_size):
    test_feature[frame_idx] = np.abs(np.real(np.fft.fft(test_raw_framed[frame_idx])))[0:fft_point]
# NOTE(review): test_feature is not used after this loop; the model runs on
# the preloaded test_feature_f instead — confirm this is intended.
out_idealf, x2_idealf, c0_idealf = model(test_feature_f)
end2 = time.time()

# Normalise both outputs to scores; the FPGA result is first scaled back
# from fixed point by 2**Q_bit.
r_score = np_crossentropy(out_idealf)
t_score = np_crossentropy(np.float32(result_final/2**Q_bit))


# Report the classification from both paths, then the timing comparison.
print('Test case for cat.')

ideal_label = label(np.argmax(r_score))
actual_label = label(np.argmax(t_score))
print('Ideal score: {}, classified as {}'.format(r_score, ideal_label))
print('Actual Score: {}, classified as {}'.format(t_score, actual_label))

# Elapsed wall-clock time of the FPGA path and of the software reference.
fpga_time = end - start
python_time = end2 - start2
print('FPGA time: {:4f}s'.format(fpga_time))
print('Python time: {:4f}s'.format(python_time))
print('Speed up: {:4f}X'.format(python_time / fpga_time))


Test case for cat.
Ideal score: [ 0.9508  0.0168  0.0324], classified as cat
Actual Score: [ 0.9434  0.0191  0.0374], classified as cat
FPGA time: 0.003662s
Python time: 0.005715s
Speed up: 1.560380X
