In [None]:
import time
import os
import ipywidgets as widgets
from enum import Enum
from pynq import Overlay, allocate
from IPython.display import display, clear_output

# Load bitstream to overlay
BITSTREAM_PATH = '../overlay/demo.bit'

if not os.path.exists(BITSTREAM_PATH):
    raise FileNotFoundError(f'Bitstream not found at {BITSTREAM_PATH}. Run \'make bitstream\'.')

print(f'Loading overlay: {BITSTREAM_PATH}...')
ol = Overlay(BITSTREAM_PATH)

# Assign GPIO
host_wdata = ol.gpio_wdata.channel1
host_rdata = ol.gpio_rdata.channel1
host_addr = ol.gpio_addr.channel1
host_ctrl = ol.gpio_ctrl.channel1
host_status = ol.gpio_ctrl.channel2

# Allocate 1024 * 4B buffer
buffer = allocate(shape=(1024,), dtype='u4')
buffer_addr = buffer.physical_address

print(f'DDR buffer allocated at {hex(buffer_addr)}')

# Helper enums
class MemOp(int, Enum):
    IDLE = 0
    WRITE = 1
    READ = 2

class MemSize(int, Enum):
    BYTE = 0
    HALF = 1
    WORD = 2
    DWORD = 3

    @staticmethod
    def from_bit_count(bit_count: int) -> 'MemSize':
        bits_to_size = {
            8: MemSize.BYTE,
            16: MemSize.HALF,
            32: MemSize.WORD,
            64: MemSize.DWORD,
        }
        return bits_to_size[bit_count]

class MemOpStatus(Enum):
    NONE = 'none'
    DONE = 'done'
    WAIT = 'wait'
    ERROR = 'error'
    INVALID = 'invalid'

    @staticmethod
    def from_reg(status_reg: int) -> 'MemOpStatus':
        if (status_reg >> 3) & 0x1:
            return MemOpStatus.INVALID
        elif (status_reg >> 2) & 0x1:
            return MemOpStatus.ERROR
        elif (status_reg >> 1) & 0x1:
            return MemOpStatus.DONE
        elif status_reg & 0x1:
            return MemOpStatus.WAIT
        else:
            return MemOpStatus.NONE

class MemOpResult:
    def __init__(self, status: MemOpStatus, data: int) -> None:
        self.status = status
        self.data = data

# Helper functions
def _make_ctrl(clear: bool = False, cmd: MemOp = MemOp.IDLE, size: MemSize = MemSize.BYTE) -> int:
    # [5:5]=Clear, [4:3]=RW, [2:0]=Size, start transaction
    return (int(clear) << 5) | ((cmd & 0x3) << 3) | (size & 0x7)

def _execute(cmd: MemOp, size: MemSize, addr: int, data: int = 0) -> MemOpResult:
    """Execute raw transfer"""
    host_addr.write(addr)

    if cmd == MemOp.WRITE:
        host_wdata.write(data & 0xFFFFFFFF)

    ctrl_val = _make_ctrl(False, cmd, size)
    host_ctrl.write(ctrl_val)

    status = 0
    for _ in range(100):
        status = host_status.read()
        done = (status >> 1) & 0x1
        if done:
            break
        time.sleep(0.001)

    # Return to idle
    host_ctrl.write(_make_ctrl())

    # Read result
    result_data = 0
    if cmd == MemOp.READ:
        result_data = host_rdata.read()

    return MemOpResult(MemOpStatus.from_reg(status), result_data)

def read(size: MemSize, addr: int) -> MemOpResult:
    return _execute(MemOp.READ, size, addr)

def write(size: MemSize, addr: int, data: int) -> MemOpStatus:
    return _execute(MemOp.WRITE, size, addr, data).status

def clear() -> None:
    host_ctrl.write(_make_ctrl(clear=True))
    time.sleep(0.01)
    host_ctrl.write(0)

# Widgets
w_addr = widgets.Text(value=hex(buffer_addr), description='Addr (Hex):')
w_data = widgets.Text(value='0x12345678', description='Data (Hex):')
w_size = widgets.Dropdown(
    options=[
        ('Byte (8b)', MemSize.BYTE),
        ('Half (16b)', MemSize.HALF),
        ('Word (32b)', MemSize.WORD),
        ('Dword (64b)', MemSize.DWORD),
    ],
    value=MemSize.WORD,
    description="Size:"
)
w_log = widgets.Output(layout={
    'border': '1px solid black',
    'height': '150px',
    'overflow_y': 'scroll',
})

btn_write = widgets.Button(description='WRITE', button_style='warning')
btn_read = widgets.Button(description='READ', button_style='success')
btn_check = widgets.Button(description='Check', button_style='info')

# Callbacks
def on_write_click(_):
    with w_log:
        try:
            addr = int(w_addr.value, 16)
            data = int(w_data.value, 16)
            size = w_size.value

            print(f'> WRITE: Addr={hex(addr)} Data={hex(data)} Size={size}')

            status = write(size, addr, data)

            if status != MemOpStatus.DONE:
                print('  RESULT: ERROR')
            else:
                print('  RESULT: DONE')

        except Exception as e:
            print(f'  ERROR: {e}')

def on_read_click(_):
    with w_log:
        try:
            addr = int(w_addr.value, 16)
            size = w_size.value

            print(f'> READ: Addr={hex(addr)} Size={size}')

            buffer.invalidate()

            result = read(size, addr)

            if result.status != MemOpStatus.DONE:
                print('  RESULT: ERROR')
            else:
                print(f'  RESULT: Data = {hex(result.data)}')

        except Exception as e:
            print(f'  ERROR: {e}')

def on_check(b):
    with w_log:
        print(f'[Debug] Buffer[0] = {hex(buffer[0])}')
        print(f'[Debug] Buffer[1] = {hex(buffer[1])}')

btn_write.on_click(on_write_click)
btn_read.on_click(on_read_click)
btn_check.on_click(on_check)

# Display
ui = widgets.VBox(
    [
        widgets.HTML('<h3>AXI Master Demo</h3>'),
        w_addr,
        w_data,
        w_size,
        widgets.HBox(
            [btn_write, btn_read, btn_check]
        ),
        w_log
    ]
)

clear()
display(ui)
