In [None]:
# import pynq and numpy libraries
from pynq import Overlay
from pynq import allocate
import numpy as np

In [None]:
class AxiStreamIP:
    """Thin driver for a streaming IP reached through the ``axi_dma_0`` DMA.

    Every call to :meth:`run` sends one int32 packet laid out as
    ``[n_samples, window, sample_0, ..., sample_{n-1}]`` and reads back
    ``n_samples - window + 1`` float32 values.
    """

    def __init__(self, hierarchy):
        """Keep a handle on the ``axi_dma_0`` attribute of ``hierarchy``."""
        self.__dma = hierarchy.axi_dma_0

    def run(self, in_data, window):
        """Stream ``in_data`` through the IP and return its float32 output.

        Parameters
        ----------
        in_data : numpy.ndarray
            Input samples; they are copied into an int32 DMA buffer.
        window : int
            Window length written into the packet header. Must satisfy
            ``1 <= window <= in_data.shape[0]``.

        Returns
        -------
        array
            Copy of the output buffer (``in_data.shape[0] - window + 1``
            float32 values), taken before the DMA buffers are released.

        Raises
        ------
        ValueError
            If ``window`` is smaller than 1 or larger than the input length.
        """
        n_samples = in_data.shape[0]
        out_dim = n_samples - window + 1
        # Reject impossible requests before touching DMA memory or hardware.
        if window < 1 or out_dim < 1:
            raise ValueError(
                "window must be between 1 and the input length (%d), got %r"
                % (n_samples, window)
            )

        # try/finally guarantees both buffers are handed back even when a
        # transfer or wait raises; otherwise every failed run leaks memory.
        buf_in = allocate(n_samples + 2, np.int32, cacheable=True)
        try:
            buf_out = allocate(out_dim, np.float32, cacheable=True)
            try:
                # Packet header: sample count, then window length.
                buf_in[0] = n_samples
                buf_in[1] = window
                buf_in[2:] = in_data[:]
                # Buffers are cacheable, so flush explicitly before the send.
                buf_in.flush()

                # The receive channel is armed before the send channel.
                self.__dma.recvchannel.transfer(buf_out)
                self.__dma.sendchannel.transfer(buf_in)

                self.__dma.sendchannel.wait()
                self.__dma.recvchannel.wait()

                # Cacheable buffer: invalidate explicitly before reading back.
                buf_out.invalidate()
                # Copy out of the DMA buffer; the buffer itself is freed below.
                return buf_out.copy()
            finally:
                buf_out.freebuffer()
        finally:
            buf_in.freebuffer()

In [None]:
# Load the overlay from the bitstream file stored next to this notebook.
# NOTE(review): with PYNQ's default arguments constructing an Overlay also
# downloads the bitstream to the FPGA - confirm for the installed version.
overlay = Overlay("./design_1_wrapper.bit")

In [None]:
# Grab the `moving_average_hier` hierarchy from the overlay and the
# `axi_dma_0` DMA inside it. `hier` is later passed to AxiStreamIP, while
# `dma` is used directly by the hand-written transfer cells below.
hier = overlay.moving_average_hier
dma = hier.axi_dma_0

In [None]:
# moving average software function
def moving_average(data, window):
    """Software reference model: mean of every ``window`` consecutive samples.

    Parameters
    ----------
    data : array-like
        Input samples; windows slide along the first axis.
    window : int
        Number of consecutive samples per average. Must be at least 1.

    Returns
    -------
    numpy.ndarray
        float32 array with ``len(data) - window + 1`` entries, where entry
        ``i`` is ``sum(data[i:i + window]) / window``.

    Raises
    ------
    ValueError
        If ``window`` is smaller than 1, or so large that the output length
        would be negative.
    """
    # window == 0 would silently yield NaNs (0/0) and a negative window
    # would average garbage slices, so refuse both up front.
    if window < 1:
        raise ValueError("window must be >= 1, got %r" % (window,))
    data = np.asarray(data)  # also accept plain Python sequences
    data_out_size = data.shape[0] - window + 1
    if data_out_size < 0:
        raise ValueError(
            "window (%r) is too large for %d samples" % (window, data.shape[0])
        )
    data_out = np.empty(data_out_size, np.float32)
    for i in range(data_out_size):
        data_out[i] = np.sum(data[i:i + window]) / window
    return data_out

In [None]:
# Parameters of the single manual run below:
#   dim    - number of input samples generated and sent to the hardware
#   window - number of consecutive samples averaged per output value
dim = 1000
window = 3

In [None]:
# test stimulus: `dim` random int32 samples (upper bound 1000 is exclusive)
data_in = np.random.randint(low=-1000, high=1000, size=dim, dtype=np.int32)

In [None]:
# Expected result: run the software reference model on the same samples
# and window that are sent to the hardware below.
data_out = moving_average(data_in, window)

In [None]:
# Allocate the buffers used for the manual DMA run:
#   buf_in  - int32, two header words (filled in below) plus `dim` samples
#   buf_out - float32, one value per full window: dim - window + 1 entries
buf_in = allocate(dim + 2, np.int32)
buf_out = allocate(dim - window + 1, np.float32)

In [None]:
# packet layout: [sample count, window length, samples...]
buf_in[:2] = (dim, window)
buf_in[2:] = data_in

In [None]:
# Start both DMA transfers: the receive channel is handed buf_out first,
# then the send channel is handed buf_in.
dma.recvchannel.transfer(buf_out)
dma.sendchannel.transfer(buf_in)
# Wait on the send channel, then on the receive channel, before buf_out
# is read in the next cell.
dma.sendchannel.wait()
dma.recvchannel.wait()

In [None]:
# Compare the hardware output with the software reference element by
# element; prints True only when every value matches exactly.
print(np.all(buf_out == data_out))

In [None]:
# Sweep every window length from 1 through max_window: push fresh random
# samples through the AxiStreamIP wrapper and compare the hardware output
# with the software reference model.
max_window = 16
print("Starting tests w/ class", flush=True)
myIP = AxiStreamIP(hier)
for i in range(1, max_window + 1):
    print("Test with window = %d ..." % i, end="")
    data_in = np.random.randint(-1000, 1000, dim, np.int32)
    fpga_out = myIP.run(data_in, i)
    data_out = moving_average(data_in, i)
    # exact, element-wise match required for a pass
    print("Passed!" if np.all(fpga_out == data_out) else "Failed!", flush=True)