In [1]:
from pynq import Overlay
from pynq import DefaultIP
import pynq.lib.dma
from pynq import Xlnk
import numpy as np

class AddDriver(DefaultIP):
    """Driver for the IP core identified as ``xilinx.com:hls:add:1.0``.

    Exposes a single ``add`` operation implemented with three register
    accesses on the core.
    """

    # IP identifier(s) this driver class is associated with.
    bindto = ['xilinx.com:hls:add:1.0']

    def __init__(self, description):
        super().__init__(description=description)

    def add(self, a, b):
        """Write ``a`` and ``b`` to the core and return its result register.

        ``a`` goes to offset 0x10, ``b`` to offset 0x18, and the value read
        back from offset 0x20 is returned.
        """
        for offset, operand in ((0x10, a), (0x18, b)):
            self.write(offset, operand)
        return self.read(0x20)
    
class ConstantMultiplyDriver(DefaultIP):
    """Driver for the IP core ``xilinx.com:hls:vector_multiplier:1.0``.

    The ``constant`` property reads and writes the register at offset 0x10.
    ``constant_list`` writes a sequence of values to consecutive 32-bit
    registers.
    """

    # IP identifier(s) this driver class is associated with.
    bindto = ['xilinx.com:hls:vector_multiplier:1.0']

    def __init__(self, description):
        super().__init__(description=description)

    @property
    def constant(self):
        """Value currently held in the register at offset 0x10."""
        return self.read(0x10)

    @constant.setter
    def constant(self, value):
        self.write(0x10, value)

    def constant_list(self, value):
        """Write every element of ``value`` to consecutive 32-bit registers.

        Element ``i`` is written to ``base + 4 * i``. An empty sequence
        results in no writes.
        """
        # NOTE(review): the base offset used to be 0x3f, which is not a
        # multiple of 4, so every write was unaligned and rejected by the
        # register interface. 0x40 is the nearest word-aligned address;
        # confirm it against the register map generated for this IP.
        base = 0x40
        for index, element in enumerate(value):
            self.write(base + 4 * index, element)
            

In [4]:
# Load the bitstream, then pick the DMA engine and the multiplier core out of
# the `custom_multiplier` hierarchy so later cells can drive them directly.
ol = Overlay('vector_multiplier.bit')
multiplier_hierarchy = ol.custom_multiplier
dma = multiplier_hierarchy.axi_dma_0
multiply = multiplier_hierarchy.vector_multiplier



In [8]:


# Allocate one five-element uint32 buffer per DMA direction.
xlnk = Xlnk()
in_buffer = xlnk.cma_array(shape=(5,), dtype=np.uint32)
out_buffer = xlnk.cma_array(shape=(5,), dtype=np.uint32)

# Input samples: 0, 5, 10, 15, 20.
in_buffer[:] = np.arange(5, dtype=np.uint32) * 5

# Program the multiplier's constant register before streaming data through it.
multiply.constant = 5

# Start both transfers first, then wait for each channel to finish.
dma.sendchannel.transfer(in_buffer)
dma.recvchannel.transfer(out_buffer)
dma.sendchannel.wait()
dma.recvchannel.wait()

print(out_buffer)

[  0  25  50  75 100]


  This is separate from the ipykernel package so we can avoid doing imports until


In [6]:
from pynq import DefaultHierarchy

class StreamMultiplyDriver(DefaultHierarchy):
    """Hierarchy driver pairing a ``multiply`` core with a ``multiply_dma`` engine.

    ``stream_multiply`` relies on the notebook-level ``xlnk`` allocator being
    defined before it is called.
    """

    def __init__(self, description):
        super().__init__(description)

    @staticmethod
    def checkhierarchy(description):
        """Return True when ``description['ip']`` has both required entries.

        The required entries are ``'multiply_dma'`` and ``'multiply'``.
        """
        required = ('multiply_dma', 'multiply')
        return all(name in description['ip'] for name in required)

    def stream_multiply(self, stream, constant):
        """Send ``stream`` through the multiplier and return the received data.

        Sets the multiplier's ``constant``, copies ``stream`` into a temporary
        uint32 buffer, runs one send and one receive DMA transfer, and returns
        a copy of the receive buffer taken before both buffers are released.
        """
        self.multiply.constant = constant
        count = len(stream)
        with xlnk.cma_array(shape=(count,), dtype=np.uint32) as in_buffer, \
             xlnk.cma_array(shape=(count,), dtype=np.uint32) as out_buffer:
            for position, sample in enumerate(stream):
                in_buffer[position] = sample
            dma_engine = self.multiply_dma
            # Start both directions before blocking on either one.
            dma_engine.sendchannel.transfer(in_buffer)
            dma_engine.recvchannel.transfer(out_buffer)
            dma_engine.sendchannel.wait()
            dma_engine.recvchannel.wait()
            # Copy while the buffer is still allocated; it is released on exit.
            return out_buffer.copy()