Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support numpy and float data types in MMIO read/write #1332

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 35 additions & 50 deletions pynq/mmio.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import warnings
import numpy as np
import pynq._3rdparty.tinynumpy as tnp
import struct

__author__ = "Yun Rock Qu"
__copyright__ = "Copyright 2016, Xilinx"
Expand All @@ -43,8 +44,9 @@ def __init__(self, baseaddress, device):
self.baseaddress = baseaddress
self.device = device

def read(self, offset, length):
return self.device.read_registers(self.baseaddress + offset, length)
def read(self, offset, length, **kwargs):
return self.device.read_registers(self.baseaddress + offset, length,
**kwargs)

def write(self, offset, data):
self.device.write_registers(self.baseaddress + offset, data)
Expand Down Expand Up @@ -95,19 +97,15 @@ def __init__(self, base_addr, length=4, device=None, **kwargs):
self.length = length

if self.device.has_capability('MEMORY_MAPPED'):
self.read = self.read
self.write = self.write_mm
self.array = self.device.mmap(base_addr, length)
elif self.device.has_capability('REGISTER_RW'):
self.read = self.read
self.write = self.write_reg
self._hook = _AccessHook(self.base_addr, self.device)
self.array = tnp.ndarray(shape=(length // 4,), dtype='u4',
hook=self._hook)
else:
raise ValueError("Device does not have capabilities for MMIO")

def read(self, offset=0, length=4, word_order='little'):
def read(self, offset=0, length=4, **kwargs):
"""The method to read data from MMIO.

For the `word_order` parameter, it is only effective when
Expand All @@ -126,13 +124,19 @@ def read(self, offset=0, length=4, word_order='little'):
The length of the data in bytes.
word_order : str
The word order of the 8-byte reads.
data_type : str
Interpret read value as data_type, either int or float.

Returns
-------
list
A list of data read out from MMIO

"""

word_order = kwargs.get('word_order', 'little')
data_type = kwargs.get('data_type', 'int')

if length not in [1, 2, 4, 8]:
raise ValueError("MMIO currently only supports "
"1, 2, 4 and 8-byte reads.")
Expand All @@ -143,25 +147,29 @@ def read(self, offset=0, length=4, word_order='little'):
idx = offset >> 2
if offset % 4:
raise MemoryError('Unaligned read: offset must be multiple of 4.')
if data_type == 'float' and length != 4:
raise ValueError("reading floating point is only valid when "
"length is equal to 4")

# Read data out
lsb = int(self.array[idx])
if length == 8:
if word_order == 'little':
return ((int(self.array[idx+1])) << 32) + lsb
else:
return (lsb << 32) + int(self.array[idx+1])
elif data_type == 'float':
return float(struct.unpack('!f', lsb.to_bytes(4, 'big'))[0])
else:
return lsb & ((2**(8*length)) - 1)

def write_mm(self, offset, data):
def write(self, offset, data):
"""The method to write data to MMIO.

Parameters
----------
offset : int
The write offset from the MMIO base address.
data : int / bytes
data : int, float, bytes
The integer(s) to be written into MMIO.

Returns
Expand All @@ -176,45 +184,22 @@ def write_mm(self, offset, data):
if offset % 4:
raise MemoryError('Unaligned write: offset must be multiple of 4.')

if type(data) is int:
if isinstance(data, (int, np.int32, np.uint32)):
self.array[idx] = np.uint32(data)
elif type(data) is bytes:
length = len(data)
num_words = length >> 2
if length % 4:
raise MemoryError(
'Unaligned write: data length must be multiple of 4.')
buf = np.frombuffer(data, np.uint32, num_words, 0)
for i in range(len(buf)):
self.array[idx + i] = buf[i]
else:
raise ValueError("Data type must be int or bytes.")

def write_reg(self, offset, data):
"""The method to write data to MMIO.

Parameters
----------
offset : int
The write offset from the MMIO base address.
data : int / bytes
The integer(s) to be written into MMIO.

Returns
-------
None

"""
if offset < 0:
raise ValueError("Offset cannot be negative.")

idx = offset >> 2
if offset % 4:
raise MemoryError('Unaligned write: offset must be multiple of 4.')

if type(data) is int:
self.array[idx] = data
elif type(data) is bytes:
self._hook.write(offset, data)
elif isinstance(data, (float, np.single, np.float32)):
data = int.from_bytes(struct.pack('f', np.single(data)), 'little')
self.array[idx] = np.uint32(data)
elif isinstance(data, bytes):
if self.device.has_capability('REGISTER_RW'):
self._hook.write(offset, data)
else:
length = len(data)
num_words = length >> 2
if length % 4:
raise MemoryError(
'Unaligned write: data length must be multiple of 4.')
buf = np.frombuffer(data, np.uint32, num_words, 0)
for i in range(len(buf)):
self.array[idx + i] = buf[i]
else:
raise ValueError("Data type must be int or bytes.")
raise ValueError("Data type must be int, uint, float or bytes.")
6 changes: 4 additions & 2 deletions pynq/overlay.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,16 +828,18 @@ def _start_ert(self, *args, waitfor=(), **kwargs):
else:
return self.device.execute_bo(bo)

def read(self, offset=0):
def read(self, offset=0, length=4, **kwargs):
"""Read from the MMIO device

Parameters
----------
offset : int
Address to read
length : int
The length of the data in bytes

"""
return self.mmio.read(offset)
return self.mmio.read(offset, length, **kwargs)

def write(self, offset, value):
"""Write to the MMIO device
Expand Down
106 changes: 93 additions & 13 deletions tests/test_mmio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,41 @@
import warnings
import struct
import pytest
# Copyright (c) 2022, Xilinx, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION). HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import numpy as np
import pynq
import pytest
import struct
import warnings

__author__ = "Peter Ogden, Mario Ruiz"
__copyright__ = "Copyright 2022, Xilinx"
__email__ = "pynq_support@xilinx.com"


from .mock_devices import MockDeviceBase, MockRegisterDevice
Expand All @@ -22,6 +56,15 @@
(BASE_ADDRESS + 8, struct.pack('I', 0xbeefcafe))
]

TEST_DATA_NUMPY = [
(300, np.uint32(53970361), int(53970361).to_bytes(4, 'little')),
(256, -14921033, int(np.uint32(-14921033)).to_bytes(4, 'little'))
]

TEST_DATA_FLOAT = [
(8, 98.576, int(0x42c526e9).to_bytes(4, 'little')),
(48, np.single(-32.587), int(0xc2025917).to_bytes(4, 'little'))
]

@pytest.fixture
def register_device():
Expand Down Expand Up @@ -140,18 +183,11 @@ def test_unsupported_length_read(device):
def test_bad_endianness_read(device):
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
with pytest.raises(ValueError) as excinfo:
mmio.read(4, 8, 'middle')
mmio.read(4, 8, word_order='middle')
assert str(excinfo.value) == \
"MMIO only supports big and little endian."


def test_float_write(device):
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
with pytest.raises(ValueError) as excinfo:
mmio.write(4, 8.5)
assert str(excinfo.value) == "Data type must be int or bytes."


def test_oob_write(device):
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
with pytest.raises(IndexError):
Expand Down Expand Up @@ -203,7 +239,7 @@ def test_8byte_littleendian_read(register_device):
pynq.Device.active_device = device
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE)
with device.check_transactions(TEST_READ_DATA, []):
read = mmio.read(4, 8, 'little')
read = mmio.read(4, 8, word_order='little')
assert read == 0xbeefcafe12345678
assert mmio.device == device
pynq.Device.active_device = None
Expand All @@ -214,7 +250,7 @@ def test_8byte_bigendian_read(register_device):
pynq.Device.active_device = device
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE)
with device.check_transactions(TEST_READ_DATA, []):
read = mmio.read(4, 8, 'big')
read = mmio.read(4, 8, word_order='big')
assert read == 0x12345678beefcafe
assert mmio.device == device
pynq.Device.active_device = None
Expand All @@ -240,3 +276,47 @@ def test_deprecated_debug_keyword(mmap_device):
'Warning is not of type DeprecationWarning'
assert "debug" in str(w[-1].message), \
'Warning not related to keyword debug'


@pytest.mark.parametrize('transaction', TEST_DATA_NUMPY)
def test_reg_write_numpy(transaction, register_device):
offset, pyobj, bytesobj = transaction
device = register_device
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
with device.check_transactions([], [(BASE_ADDRESS+offset, bytesobj)]):
mmio.write(offset, pyobj)


@pytest.mark.parametrize('transaction', TEST_DATA_FLOAT)
def test_reg_write_float(transaction, register_device):
offset, pyobj, bytesobj = transaction
device = register_device
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
with device.check_transactions([], [(BASE_ADDRESS+offset, bytesobj)]):
mmio.write(offset, pyobj)


def test_reg_read_float(mmap_device):
device = mmap_device
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
offset, testdata = (956, 102.687)
mmio.write(offset, np.single(testdata))
read = mmio.read(offset, data_type='float')
assert np.isclose(read, testdata, rtol=1e-05, atol=1e-08, equal_nan=False)


def test_reg_read_float_len8(mmap_device):
device = mmap_device
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
with pytest.raises(ValueError) as excinfo:
read = mmio.read(4, 8, data_type='float')
assert str(excinfo.value) == \
"reading floating point is only valid when length is equal to 4"


def test_reg_write_unsupported_type(mmap_device):
device = mmap_device
mmio = pynq.MMIO(BASE_ADDRESS, ADDR_RANGE, device=device)
with pytest.raises(ValueError) as excinfo:
mmio.write(4, np.half(20.22))
assert str(excinfo.value) == "Data type must be int, uint, float or bytes."