In [1]:
from __future__ import print_function

In [2]:
import uio
import numpy as np

reload(uio)


def to_bin(x):
    vv = [ (x>>(32-(i+1)*4))&0xF for i in range(8)]
    return '{0:04b}_{1:04b}|{2:04b}_{3:04b}|{4:04b}_{5:04b}|{6:04b}_{7:04b}'.format( *vv )


axi_dma_direct_dtype = np.dtype([
        ('cr'        , '<u4'),
        ('st'        , '<u4'),
        ('reserved1' , '<u4',4),
        ('addr_lo'   , '<u4'),
        ('addr_hi'   , '<u4'),
        ('reserved2' , '<u4',2),
        ('length'    , '<u4'),
        ('pad'       , '<u4'),])

def dma_reset(dma):
    dma.cr = 4
    dma.cr = 0
    
def dma_idle(dma):
    return (dma.st&2) == 2

def dma_halted(dma):
    return (dma.st&1) == 1


class AxiDMA:
    def __init__(self, name_or_idx='dma'):
        dma_uio = uio.UIO(name_or_idx)
        mm2s, s2mm = dma_uio.as_recarray(2, dtype=axi_dma_direct_dtype)
        
        self._dma_uio = dma_uio
        self.mm2s = mm2s
        self.s2mm = s2mm
        self._streams = (mm2s, s2mm)

        if self.halted() == False:
            self.reset()

    def __repr__(self):
        #TODO: dump state instead
        return self._dma_uio.__repr__()
    
    def reset(self):
        map(dma_reset, self._streams)

    def idle(self):
        return dma_idle(self.s2mm) and dma_idle(self.mm2s)

    def halted(self):
        return dma_halted(self.s2mm) and dma_halted(self.mm2s)

    def _launch_stream(self, ss, buf, irq_flags = 0x1):
        phy_addr, sz = buf
        ss.addr_lo = phy_addr
        ss.cr = (irq_flags<<12)|1
        ss.length = sz

    def launch(self, src_buf, dst_buf, enable_interrupts = True):
        irq_flags = 0x1 if enable_interrupts else 0
        
        if enable_interrupts:
            self._dma_uio.enable_interrupts()
            
        self._launch_stream(self.s2mm, dst_buf, irq_flags)
        self._launch_stream(self.mm2s, src_buf, irq_flags)
        
        
    def wait(self, n_iter=2):
        x = self._dma_uio.wait_for_interrupt(reenable=True)
        if x < n_iter: #wait for second interrupt
            x = self._dma_uio.wait_for_interrupt()
            
        result = self.idle()
        self.reset()
        
        return result

        
        

dma = AxiDMA('dma')

dma

UIO: dma (uio2) sz:0x10000 @0x40400000

In [3]:
print('\n'.join([to_bin(x) for x in [dma.mm2s.cr, dma.s2mm.cr, dma.mm2s.st,dma.s2mm.st]]))

0000_0000|0000_0001|0000_0000|0000_0010
0000_0000|0000_0001|0000_0000|0000_0010
0000_0000|0000_0000|0000_0000|0000_0001
0000_0000|0000_0000|0000_0000|0000_0001


In [7]:
reload(uio)
mem_uio = uio.UIO('scratch_mem')

SZ=(1<<19)

data = mem_uio.as_ndarray()

src_data = data[:SZ]
dst_data = data[SZ:SZ*2]

src_data[:] = np.random.randint(0,256,SZ)
dst_data[:] = 0xFF

print("SRC:",src_data[:4])
print("DST:",dst_data[:4])

SRC: [191 166 174 183]
DST: [255 255 255 255]


In [10]:
dma.launch(mem_uio.phy_buf(src_data), mem_uio.phy_buf(dst_data), enable_interrupts=True)
if dma.wait():
    print('DMA Completed Succesfully')
else:
    print('DMA FAILED')



DMA Completed Succesfully


In [12]:
if (src_data == dst_data).all():
    print('SUCCESS: Data copied as expected')
else:
    print('FAILED: dst and src do not match up')

print("SRC:",src_data[:4],'...')
print("DST:",dst_data[:4],'...')

SUCCESS: Data copied as expected
SRC: [191 166 174 183] ...
DST: [191 166 174 183] ...


In [None]:
%%sh

date
whoami
ls -l /dev/uio* /dev/mem
cat /sys/class/uio/uio?/maps/map0/name
cat /sys/class/uio/uio?/name
ls /sys/class/uio/uio0/maps/map0

In [None]:
#!dtc -I fs -O dts /proc/device-tree/ 