# DMA AXI Stream FIFO Test for PYNQ

In [None]:
# Open the overlay built from the "axi_stream.bit" bitstream.
from pynq import Overlay
ol = Overlay("axi_stream.bit")

# Download the bitstream; the bare expression below shows the bitstream
# timestamp as the cell result when run in a notebook.
ol.download()
ol.bitstream.timestamp

# Cross-check which bitfile the PL object reports.
from pynq import PL
PL.bitfile_name

# Ask the overlay whether it is the one currently loaded.
ol.is_loaded()

# Testing the DMA class provided in base PYNQ

In [32]:
from pynq.drivers.dma import DMA
# Element [0] of the ip_dict entry is parsed as a base-16 string to obtain
# the integer base address of the AXI DMA.
dma_base = int(PL.ip_dict["SEG_axi_dma_0_Reg2"][0],16)
print(hex(dma_base))
# NOTE(review): direction=3 presumably selects a bidirectional channel --
# confirm against the pynq.drivers.dma documentation.
dma = DMA(dma_base, direction=3)

In [109]:
import numpy as np
import cffi
# Sample count used to size the buffer and the transfer below
# (both use MAX_NUM_SAMPLES*8; presumably 8 bytes per sample -- TODO confirm).
MAX_NUM_SAMPLES=10
dma.create_buf(MAX_NUM_SAMPLES*8)#= bytearray(b'abcdefgh')
# NOTE(review): direction=1 overrides the direction=3 the DMA object was
# created with; confirm which channel value 1 selects.
dma.transfer(MAX_NUM_SAMPLES*8, direction=1)

In [112]:
#vin = {}
# Fetch the DMA buffer for inspection in the next cell.
# NOTE(review): the argument 32 is presumably the element width in bits -- TODO confirm.
vin=dma.get_buf(32)

In [113]:
import ctypes
import numpy as np
import pprint

# Render the first word of `vin` as 32 comma-separated bit characters,
# most-significant bit first, and pretty-print the resulting string.
list_val = []
temp2 = []
for i in range(1):
    for j in reversed(range(32)):
        # Shift bit j down to position 0, then isolate it.
        list_val.append(str((vin[i] >> j) & 1))
    temp = ','.join(list_val)
pprint.pprint(temp)

'0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0'


# Creating DMA code based on code available for Zybo/Parallella
# Trying without compiling drivers into the kernel
# At this point, I don't know whether interrupts can be accessed from user space

In [118]:
from pynq import MMIO
# Memory-mapped handle on the AXI DMA: base address parsed from the same
# ip_dict entry used above; second argument 0x00010000 is presumably the
# window length in bytes (confirm against the MMIO documentation).
# All register offsets defined below are passed to axis.read / axis.write.
axis = MMIO(int(PL.ip_dict["SEG_axi_dma_0_Reg2"][0],16),0x00010000)

#*********************************************************************#
#*                   define all register locations                   *#
#*            based on "one blog and one github code"                *#
#* FROM: https://lauri.xn--vsandi-pxa.com/hdl/zynq/xilinx-dma.html   *#
#*      & FROM: https://github.com/OpenDGPS/zynq-axi-dma-sg          *#
#*********************************************************************#
# Register offsets, used as the offset argument of axis.read / axis.write.
# MM2S (memory-mapped to stream) channel.
MM2S_CONTROL_REGISTER       =0x00
MM2S_STATUS_REGISTER        =0x04
MM2S_START_ADDRESS          =0x18
MM2S_LENGTH                 =0x28
MM2S_CURDESC                =0x08 # must be aligned to 0x40 addresses
MM2S_CURDESC_MSB            =0x0C # unused with 32bit addresses
MM2S_TAILDESC               =0x10 # must be aligned to 0x40 addresses
MM2S_TAILDESC_MSB           =0x14 # unused with 32bit addresses

# S2MM (stream to memory-mapped) channel: same layout, 0x30 higher.
S2MM_CONTROL_REGISTER       =0x30
S2MM_STATUS_REGISTER        =0x34
S2MM_DESTINATION_ADDRESS    =0x48
S2MM_LENGTH                 =0x58
S2MM_CURDESC                =0x38 # must be aligned to 0x40 addresses
S2MM_CURDESC_MSB            =0x3C # unused with 32bit addresses
S2MM_TAILDESC               =0x40 # must be aligned to 0x40 addresses
S2MM_TAILDESC_MSB           =0x44 # unused with 32bit addresses

SG_CTL                      =0x2C # CACHE CONTROL


#*********************************************************************#
#*                 define MEMORY MAP locations                       *#
#*          consult the README for the exact memory layout           *#
#*         FROM: https://github.com/OpenDGPS/zynq-axi-dma-sg         *#
#*********************************************************************#

# NOTE(review): none of these memory-map constants is referenced by the cells
# visible in this notebook; they appear to be carried over from the
# scatter-gather example cited in the banner above.
HP0_DMA_BUFFER_MEM_ADDRESS         = 0x20000000
AXI_DMA_REGISTER_LOCATION          = 0x40400000 #AXI DMA Register Address Map
DESCRIPTOR_REGISTERS_SIZE          = 0xFFFF
SG_DMA_DESCRIPTORS_WIDTH           = 0xFFFF
MEMBLOCK_WIDTH                     = 0x3FFFFFF#size of mem used by s2mm and mm2s
BUFFER_BLOCK_WIDTH                 = 0x7D0000 #size of memory block per descriptor in bytes
NUM_OF_DESCRIPTORS                 = 0x7 #number of descriptors for each direction
# NOTE(review): 0x30000000 lies outside the 0x00000000-0x1FFFFFFF range the
# original author reports for S_AXI_HP2 -- confirm before using this address.
HP_DMA_BUFFER_MEM_ADDRESS         = 0x30000000  # Using S_AXI_HP2 -- (My Address Editor shows it is connected to 0x00 to 0x1FFFFFFF)
HP_MM2S_DMA_BASE_MEM_ADDRESS       = HP_DMA_BUFFER_MEM_ADDRESS  # 0x30000000
HP_S2MM_DMA_BASE_MEM_ADDRESS       = HP_DMA_BUFFER_MEM_ADDRESS + MEMBLOCK_WIDTH + 1  # 0x34000000
# Descriptors sit at the start of each direction's region.
HP_MM2S_DMA_DESCRIPTORS_ADDRESS    = HP_MM2S_DMA_BASE_MEM_ADDRESS  # 0x30000000
HP_S2MM_DMA_DESCRIPTORS_ADDRESS    = HP_S2MM_DMA_BASE_MEM_ADDRESS  # 0x34000000
# Data buffers start 0x10000 past the corresponding descriptor base.
HP_MM2S_SOURCE_MEM_ADDRESS         = HP_MM2S_DMA_BASE_MEM_ADDRESS + SG_DMA_DESCRIPTORS_WIDTH + 1  # 0x30010000
HP_S2MM_TARGET_MEM_ADDRESS         = HP_S2MM_DMA_BASE_MEM_ADDRESS + SG_DMA_DESCRIPTORS_WIDTH + 1  # 0x34010000

In [119]:
# Dump the control, status, address and length registers of both DMA
# channels, followed by SG_CTL, one hex value per line.
# MM2S channel
print(hex(axis.read(MM2S_CONTROL_REGISTER)))
print(hex(axis.read(MM2S_STATUS_REGISTER)))
print(hex(axis.read(MM2S_START_ADDRESS)))
print(hex(axis.read(MM2S_LENGTH)))

# S2MM channel
print(hex(axis.read(S2MM_CONTROL_REGISTER)))
print(hex(axis.read(S2MM_STATUS_REGISTER)))
print(hex(axis.read(S2MM_DESTINATION_ADDRESS)))
# The fourth line of this group previously re-read the destination address,
# so the S2MM length register was never shown; read S2MM_LENGTH to mirror
# the MM2S group above.
print(hex(axis.read(S2MM_LENGTH)))

print(hex(axis.read(SG_CTL)))

0x10003
0x10008
0x0
0x0
0x10003
0x10008
0x0
0x0
0x3


In [None]:
def dma_status(REGISTER_OFFSET):
    """Read one DMA status register and print its decoded flags on one line.

    The register is read through the module-level ``axis`` MMIO handle.
    Output has the form
    ``<channel> status: <status hex>@<offset hex>: <flag> <flag> ...``
    followed by a blank line.

    Args:
        REGISTER_OFFSET: integer offset of the status register to read.
            Offset 0x34 is labelled "Stream to memory-mapped"; every other
            offset is labelled "Memory-mapped to stream".

    Returns:
        None. The decoded status is written to stdout.
    """
    status = axis.read(REGISTER_OFFSET)

    if REGISTER_OFFSET == 0x34:
        channel = "Stream to memory-mapped"
    else:
        channel = "Memory-mapped to stream"

    # Bit 0 distinguishes halted from running; it always yields one entry.
    flags = ["halted" if status & 0x00000001 else "running"]

    # Remaining bits, in ascending order; each contributes its name when set.
    flag_bits = (
        (0x00000002, "idle"),
        (0x00000008, "SGIncld"),
        (0x00000010, "DMAIntErr"),
        (0x00000020, "DMASlvErr"),
        (0x00000040, "DMADecErr"),
        (0x00000100, "SGIntErr"),
        (0x00000200, "SGSlvErr"),
        (0x00000400, "SGDecErr"),
        (0x00001000, "IOC_Irq"),
        (0x00002000, "Dly_Irq"),
        (0x00004000, "Err_Irq"),
    )
    flags.extend(name for mask, name in flag_bits if status & mask)

    # Flags are space-separated and kept on a single line (previously they
    # ran together and "halted" forced an early line break). The offset is
    # shown in hex to match the status value and the register definitions.
    line = "{} status: {}@{}: {}".format(
        channel, hex(status), hex(REGISTER_OFFSET), " ".join(flags)
    )
    print(line, end="\n\n", flush=True)

def dma_set(offset, value):
    """Write ``value`` to the DMA register at ``offset``.

    The write goes through the module-level ``axis`` MMIO handle.
    """
    axis.write(offset, value)

def dma_get(offset):
    """Return the value read from the DMA register at ``offset``.

    The read goes through the module-level ``axis`` MMIO handle.
    """
    return axis.read(offset)

def memdump(virt_addr, byteCount):
    """Print ``byteCount`` consecutive values read from ``virt_addr``.

    Args:
        virt_addr: object exposing ``read(offset)``.
        byteCount: number of reads to perform. Despite the name, each
            iteration reads at ``index * 4``, so ``byteCount`` values spaced
            four apart are printed.

    Each value is printed as ``LOC  <index> <hex value>``; a line holding a
    single space follows every fourth value, and a blank line ends the dump.
    """
    for word_index in range(byteCount):
        word = virt_addr.read(word_index * 4)
        print("LOC ", str(word_index), hex(word))
        if word_index % 4 == 3:
            print(" ")
    print("\n")

def memset(virt_addr, data, byteCount):
    """Write ``data`` to ``byteCount`` consecutive locations of ``virt_addr``.

    Args:
        virt_addr: object exposing ``write(offset, value)``.
        data: value written at every location.
        byteCount: number of writes to perform. Despite the name, writes are
            issued at offsets 0, 4, 8, ... so ``byteCount`` locations spaced
            four apart are filled.
    """
    for word_offset in range(0, byteCount * 4, 4):
        virt_addr.write(word_offset, data)
    
def dma_mm2s_sync():
    """Busy-wait until the MM2S status register reports completion.

    Polls MM2S_STATUS_REGISTER through ``dma_get`` and returns once both
    bit 12 (printed as "IOC_Irq" by dma_status) and bit 1 (printed as
    "idle") are set.

    The previous condition applied ``~`` to the masked value. In Python
    ``~x`` is ``-x - 1``, which is non-zero (truthy) for every non-negative
    ``x``, so the loop could never exit. The bits are now tested logically.

    NOTE: there is no timeout; if the channel never sets both bits
    this call does not return.
    """
    done_mask = (1 << 12) | (1 << 1)
    mm2s_status = dma_get(MM2S_STATUS_REGISTER)
    # Keep polling while either required bit is still clear.
    while (mm2s_status & done_mask) != done_mask:
        mm2s_status = dma_get(MM2S_STATUS_REGISTER)

def dma_s2mm_sync():
    """Busy-wait until the S2MM status register reports completion.

    Polls S2MM_STATUS_REGISTER through ``dma_get`` and returns once both
    bit 12 (printed as "IOC_Irq" by dma_status) and bit 1 (printed as
    "idle") are set.

    Two defects are fixed here: the loop condition referenced the
    undefined name ``mm2s_status`` (a NameError on first evaluation), and it
    applied ``~`` to the masked value, which is truthy for every
    non-negative int and therefore could never end the loop.

    NOTE: there is no timeout; if the channel never sets both bits
    this call does not return.
    """
    done_mask = (1 << 12) | (1 << 1)
    s2mm_status = dma_get(S2MM_STATUS_REGISTER)
    # Keep polling while either required bit is still clear.
    while (s2mm_status & done_mask) != done_mask:
        s2mm_status = dma_get(S2MM_STATUS_REGISTER)

In [None]:
# Based on https://lauri.xn--vsandi-pxa.com/hdl/zynq/xilinx-dma.html Code
# MMIO handles on the source (0x0e000000) and destination (0x0f000000)
# regions; these are the same addresses later written to the DMA
# start/destination address registers.
srcAddr = MMIO(0x0e000000,0x00010000)
dstAddr= MMIO(0x0f000000,0x00010000)
# Only the first source location is seeded with a recognizable pattern.
srcAddr.write(0,0x11223344)
# Clear 32 destination locations (memset writes at offsets 0, 4, ..., 124).
memset(dstAddr,0,32)
# Show both regions before the transfer; memdump likewise reads 32
# locations spaced four apart.
print("Source memory block:      ")
memdump(srcAddr,32);
print("Destination memory block:      ")
memdump(dstAddr,32);

In [None]:
# Register programming sequence for one transfer from srcAddr to dstAddr.
# The channel status is printed after every step.

# Step 1: write 4 (bit 2) to both control registers.
print("Resetting DMA")    
dma_set(S2MM_CONTROL_REGISTER, 4)
dma_set(MM2S_CONTROL_REGISTER, 4)
dma_status(S2MM_STATUS_REGISTER)
dma_status(MM2S_STATUS_REGISTER)

# Step 2: clear both control registers.
print("Halting DMA")   
dma_set(S2MM_CONTROL_REGISTER, 0);
dma_set(MM2S_CONTROL_REGISTER, 0);
dma_status(S2MM_STATUS_REGISTER);
dma_status(MM2S_STATUS_REGISTER);

# Step 3: program the buffer addresses (same values as dstAddr / srcAddr).
print("Writing destination address..");
dma_set(S2MM_DESTINATION_ADDRESS, 0x0f000000);
dma_status(S2MM_STATUS_REGISTER);

print("Writing source address...");
dma_set(MM2S_START_ADDRESS, 0x0e000000); 
dma_status(MM2S_STATUS_REGISTER);

# Step 4: write 0xf001 (bit 0 plus bits 12-15) to both control registers.
# NOTE(review): dma_status decodes status bits 12-14 as the IOC/Dly/Err IRQ
# flags; if the control register uses the same positions as enables, 0xf001
# enables rather than masks interrupts -- confirm against the AXI DMA guide.
print("Starting S2MM channel with all interrupts masked...");
dma_set(S2MM_CONTROL_REGISTER, 0xf001);
dma_status(S2MM_STATUS_REGISTER);

print("Starting MM2S channel with all interrupts masked...");
dma_set(MM2S_CONTROL_REGISTER, 0xf001);
dma_status(MM2S_STATUS_REGISTER);

# Step 5: write the transfer length (32) to each channel, S2MM first.
# NOTE(review): presumably the length write is what starts the transfer -- confirm.
print("Writing S2MM transfer length...");
dma_set(S2MM_LENGTH, 32);
dma_status(S2MM_STATUS_REGISTER);

print("Writing MM2S transfer length...");
dma_set(MM2S_LENGTH, 32);
dma_status(MM2S_STATUS_REGISTER);

# Step 6: both sync calls are left disabled (the author observed the MM2S one
# never returning), so the dumps below run without waiting for completion.
print("Waiting for MM2S synchronization..");
#dma_mm2s_sync(); #(GETS STUCK HERE IF SYNC IS ENABLED)

print("Waiting for S2MM sychronization...");
#dma_s2mm_sync(); 

# Step 7: show both regions and the final channel status.
print("Source memory block:      ")
memdump(srcAddr,32);
print("Destination memory block:      ")
memdump(dstAddr,32);
dma_status(S2MM_STATUS_REGISTER);
dma_status(MM2S_STATUS_REGISTER);
