In [2]:
import time
from io import BytesIO
from pynq import Overlay, DefaultIP


XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL = 0x00
XAD_FIFO_ALL_CONTROL_ADDR_GIE = 0x04
XAD_FIFO_ALL_CONTROL_ADDR_IER = 0x08
XAD_FIFO_ALL_CONTROL_ADDR_ISR = 0x0c
XAD_FIFO_ALL_CONTROL_ADDR_DIV_DATA = 0x10
XAD_FIFO_ALL_CONTROL_BITS_DIV_DATA = 32
XAD_FIFO_ALL_CONTROL_ADDR_THRESHOLD_DATA = 0x18
XAD_FIFO_ALL_CONTROL_BITS_THRESHOLD_DATA = 32
XAD_FIFO_ALL_CONTROL_ADDR_START_R_DATA = 0x20
XAD_FIFO_ALL_CONTROL_BITS_START_R_DATA = 32
XAD_FIFO_ALL_CONTROL_ADDR_START_R_CTRL = 0x24
XAD_FIFO_ALL_CONTROL_ADDR_END_R_DATA = 0x30
XAD_FIFO_ALL_CONTROL_BITS_END_R_DATA = 32
XAD_FIFO_ALL_CONTROL_ADDR_END_R_CTRL = 0x34
XAD_FIFO_ALL_CONTROL_ADDR_COUNT_DATA = 0x40
XAD_FIFO_ALL_CONTROL_BITS_COUNT_DATA = 32
XAD_FIFO_ALL_CONTROL_ADDR_COUNT_CTRL = 0x44
XAD_FIFO_ALL_CONTROL_ADDR_SUCCESS_DATA = 0x50
XAD_FIFO_ALL_CONTROL_BITS_SUCCESS_DATA = 32
XAD_FIFO_ALL_CONTROL_ADDR_SUCCESS_CTRL = 0x54


class AdAll(DefaultIP):
    def __init__(self, description):
        super().__init__(description=description)

    bindto = ['chiro.work:ChiIP:ad_fifo_all:0.32']

    def start(self):
        data = self.read(XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL) & 0x80
        self.write(XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL, data | 0x01)

    def is_done(self):
        data = self.read(XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL);
        return ((data >> 1) & 0x1 != 0);
    
    def is_idle(self):
        data = self.read(XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL);
        return ((data >> 2) & 0x1 != 0);

    def is_ready(self):
        data = self.read(XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL);
        # check ap_start to see if the pcore is ready for next input
        return ((data & 0x1) == 0);

    def set_values(self, div=0, bit=12):
        threshold = ((1 << bit) - 1) // 2
        self.write(XAD_FIFO_ALL_CONTROL_ADDR_DIV_DATA, div)
#         print('threshold', threshold)
        self.write(XAD_FIFO_ALL_CONTROL_ADDR_THRESHOLD_DATA, threshold)
    
    def get_values(self):
        start = self.read(XAD_FIFO_ALL_CONTROL_ADDR_START_R_DATA)
        end = self.read(XAD_FIFO_ALL_CONTROL_ADDR_END_R_DATA)
        count = self.read(XAD_FIFO_ALL_CONTROL_ADDR_COUNT_DATA)
        success = self.read(XAD_FIFO_ALL_CONTROL_ADDR_SUCCESS_DATA)
        return {
            'start': start,
            'end': end,
            'count': count,
            'success': success
        }


def test(overlay):
    wait = 0
    while not overlay.ad_fifo_all_0.is_ready():
        time.sleep(1)
        print('waiting ready', wait, bin(overlay.ad_fifo_all_0.read(XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL)))
        wait += 1
    overlay.ad_fifo_all_0.set_values(div=0, bit=10)
    overlay.ad_fifo_all_0.start()
    wait = 1
    while not overlay.ad_fifo_all_0.is_done():
        time.sleep(1)
        print('waiting done', wait, bin(overlay.ad_fifo_all_0.read(XAD_FIFO_ALL_CONTROL_ADDR_AP_CTRL)))
        wait += 1
    values = overlay.ad_fifo_all_0.get_values()
    return values


def download():
    overlay = Overlay("bits/ad_all/ad_all.bit")
    overlay.download()
    return overlay

def reverse_bits(n, bit=32):
    return int((((f"%{bit}s") % (bin(n)[2:])).replace(' ', '0'))[::-1], base=2)

def show_wave(data: BytesIO, values: dict, depth: int=128):
    data.seek(0)
    for i in range(depth):
        d = data.read(2)
        if (len(d) != 2):
            break
        val = int().from_bytes(d, byteorder='big', signed=False)
        if i == values["start"]:
            print("/" + "=" * 18 + '\\')
        if i == values["end"]:
            print("\\" + "=" * 18 + '/')
        print("%-20s" % ("#" * int(20 * (val / 2048))), "%6s" % hex(val), ("%13s" % bin(val)[2:]).replace(' ', '0'))


def run_test(overlay, depth: int=0xFF):
    values = test(overlay)
    print(values)
#     for i in range(0xFF):
#         src = overlay.axi_bram_ctrl_out_r.read(i * 4)
#         val = reverse_bits(src, 12)
#         # print("%20s" % bin(val)[2:])
#         if i == values["start"]:
#             print("/" + "=" * 18 + '\\')
#         if i == values["end"]:
#             print("\\" + "=" * 18 + '/')
#         print("%-20s" % ("#" * int(20 * (val / 2048))), "%6s" % hex(val), "%6s" % hex(src), ("%13s" % bin(val)[2:]).replace(' ', '0'), ("%13s" % bin(src)[2:]).replace(' ', '0'))
    data = BytesIO()
    for i in range(0xFF):
        src = overlay.axi_bram_ctrl_out_r.read(i * 4)
        val = reverse_bits(src, 12)
        data.write(int(val).to_bytes(length=2, byteorder='big', signed=False))
        if (i % 0xFF == 0):
            print('.', end='')
    print('Read done.')
    data.seek(0)
    values = find_wave(data)
    print('values', values)
    show_wave(data, values)
    

def find_wave(data: BytesIO, depth: int=0xFF, threshold: int=-1) -> dict:
    max_val, min_val = 0, 0xFFFFFFFF
    
    values = {
        "start": 0,
        "end": 0,
        "count": 0
    }
    
    data.seek(0)
    dataset = []
    for _ in range(depth):
        d = data.read(2)
        if (len(d) != 2):
            break
        val = int().from_bytes(d, byteorder='big', signed=False)
        dataset.append(val)
    
    max_val, min_val = max(dataset), min(dataset)
    if (threshold < 0):
        threshold = (max_val + min_val) / 2
    
    start, middle, end = 0, 0, 0
    found = False
    for i in range(depth - 1):
        if dataset[i] <= threshold and dataset[i + 1] >= threshold:
            found = True
            start = i
            break
    if not found:
        return values
    values.update({'start': start})
    found = False
    for i in range(start - 1, depth - 1, 1):
        if dataset[i] >= threshold and dataset[i + 1] <= threshold:
            found = True
            middle = i
            break
    if not found:
        return values
    found = False
    for i in range(middle - 1, depth - 1, 1):
        if dataset[i] <= threshold and dataset[i + 1] >= threshold:
            found = True
            end = i
            break
    if not found:
        return values
    values.update({'end': end, 'count': end - start})
    return values
    
    

def main():
    overlay = download()
    print("Download done!")
    run_test(overlay)
    # help(overlay.axi_bram_ctrl_out_r.mmio)


main()


Download done!
waiting done 1 0b1110
waiting done 2 0b100
waiting done 3 0b100
waiting done 4 0b100
waiting done 5 0b100
waiting done 6 0b100


KeyboardInterrupt: 