Implement MultiManager and low level H5Dread_multi call #2351

Open · wants to merge 17 commits into master
49 changes: 49 additions & 0 deletions docs/high/dataset.rst
@@ -436,6 +436,55 @@ The dtype of the dataset can be accessed via ``<dset>.dtype`` as per normal.
As empty datasets cannot be sliced, some methods of datasets such as
``read_direct`` will raise a ``TypeError`` exception if used on an empty dataset.

Reading and Writing to Multiple Datasets
------------------------------------------------------------
The MultiManager interface enables reading from and writing to multiple datasets
through HDF5's ``H5Dread_multi``/``H5Dwrite_multi`` API using NumPy-style operations.
A MultiManager is constructed from a list of datasets to operate on, and then accepts
slice arguments for reading and writing like a typical Dataset.

Performing operations through a MultiManager gives the library information about
the entire I/O operation, which it can pass to the active file driver and, in some
cases, use to improve performance.
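
For example, given an open file ``f``, a few small one-dimensional datasets might be
created for the examples below (the file handle, dataset names, and values here are
purely illustrative):

>>> dset1 = f.create_dataset("dset1", data=[0, 1, 2, 3])
>>> dset2 = f.create_dataset("dset2", data=[0, 2, 3, 4])
>>> dset3 = f.create_dataset("dset3", data=[0, 3, 4, 5])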

Reading datasets through a MultiManager returns a list where each entry is an array
containing the values read from the corresponding dataset.

>>> mm = MultiManager(datasets=[dset1, dset2, dset3])
>>> data = mm[...] # read all elements from each dataset
>>> data[0] # data read from dset1
[0, 1, 2, 3]
>>> data[1] # data read from dset2
[0, 2, 3, 4]

Writing to datasets through a MultiManager requires a list where each entry is an
array containing the values to be written to the corresponding dataset.

>>> mm[0] = [[1], [2], [3]] # write a different value to index 0 of each dataset
>>> data = mm[...]
>>> data[0]
[1, 1, 2, 3]
>>> data[1]
[2, 2, 3, 4]

The slicing arguments must select the same number of regions on each dataset
in the MultiManager.

Multiple selections can be provided to read or write to a different region on each dataset in the MultiManager.

>>> selections = [np.s_[0:2], np.s_[1:4], np.s_[2:4]]
>>> data = mm[selections]
>>> data[0]
[1, 1]
>>> data[1]
[2, 3, 4]
>>> mm[selections] = [[0, 1], [4, 5, 6], [7, 8]] # write to the selected region of each dataset
>>> data = mm[...]
>>> data[0]
[0, 1, 2, 3]
>>> data[1]
[2, 4, 5, 6]
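
The high-level interface is built on a new low-level binding, ``h5py.h5d.rw_multi``,
which wraps ``H5Dread_multi``/``H5Dwrite_multi``. The sketch below mirrors how the
MultiManager itself issues a whole-dataset read (the argument order and read/write
flag follow the calls the MultiManager makes in this PR); the low-level signature is
an implementation detail and may change, so treat it as illustrative only:

>>> from h5py import h5d, h5t
>>> dsets = [dset1, dset2]
>>> bufs = [np.zeros(d.shape, dtype=d.dtype) for d in dsets]  # output buffers
>>> spaces = [d.id.get_space() for d in dsets]  # full-extent dataspaces
>>> types = [h5t.py_create(d.dtype) for d in dsets]  # in-memory datatypes
>>> h5d.rw_multi([d.id.id for d in dsets],
...              [s.id for s in spaces],  # memory dataspace ids
...              [s.id for s in spaces],  # file dataspace ids
...              [t.id for t in types],
...              bufs, 1, dxpl=None)  # final positional flag: 1 reads, 0 writes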

Reference
---------

2 changes: 1 addition & 1 deletion h5py/__init__.py
@@ -64,7 +64,7 @@
registered_drivers,
)
from ._hl.group import Group, SoftLink, ExternalLink, HardLink
from ._hl.dataset import Dataset
from ._hl.dataset import Dataset, MultiManager
from ._hl.datatype import Datatype
from ._hl.attrs import AttributeManager
from ._hl.vds import VirtualSource, VirtualLayout
281 changes: 243 additions & 38 deletions h5py/_hl/dataset.py
@@ -760,6 +760,40 @@
and isinstance(self.id.get_type(), (h5t.TypeIntegerID, h5t.TypeFloatID))
)

def process_getitem_params(self, args, new_dtype=None):
"""Parse and construct arguments for a low-level Dataset read

This function returns a tuple of the form (mspace, fspace,
arr, mtype, selection), where
- mspace: A SpaceID describing the memory selection
- fspace: A SpaceID describing the file dataspace
- arr: A numpy array that will be populated with the read data
- mtype: A TypeID describing the memory datatype
- selection: A selection object describing the selection
"""
out_dtype = self.dtype if new_dtype is None else new_dtype

if self.shape == ():
# Initialize output buffer for scalar dataspace
fspace = self.id.get_space()
selection = sel2.select_read(fspace, args)
mspace = selection.mspace

if selection.mshape is None:
arr = numpy.zeros((), dtype=out_dtype)
else:
arr = numpy.zeros(selection.mshape, dtype=out_dtype)

else:
# Initialize output buffer for non-scalar dataspace
selection = sel.select(self.shape, args, dataset=self)
fspace = selection.id
mspace = h5s.create_simple(selection.mshape)
arr = numpy.zeros(selection.array_shape, out_dtype, order='C')

mtype = h5t.py_create(out_dtype)
return (mspace, fspace, arr, mtype, selection)

@with_phil
def __getitem__(self, args, new_dtype=None):
""" Read a slice from the HDF5 dataset.
@@ -831,54 +865,44 @@
if args == () or (len(args) == 1 and args[0] is Ellipsis):
return numpy.zeros(self.shape, dtype=new_dtype)

# === Scalar dataspaces =================

if self.shape == ():
fspace = self.id.get_space()
selection = sel2.select_read(fspace, args)
if selection.mshape is None:
arr = numpy.zeros((), dtype=new_dtype)
else:
arr = numpy.zeros(selection.mshape, dtype=new_dtype)
for mspace, fspace in selection:
self.id.read(mspace, fspace, arr, mtype)
if selection.mshape is None:
return arr[()]
return arr

# === Everything else ===================

# Perform the dataspace selection.
selection = sel.select(self.shape, args, dataset=self)

if selection.nselect == 0:
return numpy.zeros(selection.array_shape, dtype=new_dtype)
# Perform the dataspace selection
mspace, fspace, arr, mtype, selection =\
self.process_getitem_params(args, new_dtype)

arr = numpy.zeros(selection.array_shape, new_dtype, order='C')
if self.shape != ():
if selection.nselect == 0:
return numpy.zeros(selection.array_shape, dtype=new_dtype)

# Perform the actual read
mspace = h5s.create_simple(selection.mshape)
fspace = selection.id
self.id.read(mspace, fspace, arr, mtype, dxpl=self._dxpl)

# Patch up the output for NumPy
if arr.shape == ():
return arr[()] # 0 dim array -> numpy scalar
if (self.shape == () and selection.mshape is None)\
or (self.shape != () and arr.shape == ()):
return arr[()]

return arr

@with_phil
def __setitem__(self, args, val):
""" Write to the HDF5 dataset from a Numpy array.
def process_setitem_params(self, args, val, names=None):
"""Parse and construct arguments for a low-level Dataset write

NumPy's broadcasting rules are honored, for "simple" indexing
(slices and integers). For advanced indexing, the shapes must
match.
Performs setup operations for a Dataset write, potentially
including string and array type conversion, construction of
memory types, selection creation, and scalar broadcasting.
Returns a tuple of the form (val, mshape, mspace, mtype, selection).

If 'names' is not provided as a tuple of strings, then
names of target compound fields (if any) are extracted
from the slicing arguments.
"""
args = args if isinstance(args, tuple) else (args,)

# Sort field indices from the slicing
names = tuple(x for x in args if isinstance(x, str))
args = tuple(x for x in args if not isinstance(x, str))
if names is None:
names = tuple(x for x in args if isinstance(x, str))
args = tuple(x for x in args if not isinstance(x, str))
dtype = None

# Generally we try to avoid converting the arrays on the Python
# side. However, for compound literals this is unavoidable.
@@ -985,9 +1009,6 @@
# Perform the dataspace selection
selection = sel.select(self.shape, args, dataset=self)

if selection.nselect == 0:
return

# Broadcast scalars if necessary.
# In order to avoid slow broadcasting filling the destination by
# the scalar value, we create an intermediate array of the same
@@ -1014,8 +1035,22 @@
val = val2
mshape = val.shape

# Perform the write, with broadcasting
mspace = h5s.create_simple(selection.expand_shape(mshape))

return (val, mshape, mspace, mtype, selection)

@with_phil
def __setitem__(self, args, val):
""" Write to the HDF5 dataset from a Numpy array.

NumPy's broadcasting rules are honored, for "simple" indexing
(slices and integers). For advanced indexing, the shapes must
match.
"""
val, mshape, mspace, mtype, selection = self.process_setitem_params(args, val)
if selection.nselect == 0:
return

# Perform the write, with broadcasting
for fspace in selection.broadcast(mshape):
self.id.write(mspace, fspace, val, mtype, dxpl=self._dxpl)

@@ -1166,3 +1201,173 @@
Return ``False`` otherwise.
"""
return h5ds.is_scale(self._id)


class MultiManager():

"""
High-level object to support slicing operations
that map to H5Dread_multi/H5Dwrite_multi
"""
@with_phil
def __init__(self, datasets=None, dtypes=None, dxpl=None):
if (datasets is None) or (len(datasets) == 0):
raise ValueError("MultiManager requires non-empty list of datasets")

self.datasets = datasets
self.dxpl = dxpl

@with_phil
def __getitem__(self, args):
""" Read the same slice from each of the datasets
managed by this MultiManager.

Takes slices. Obeys basic NumPy rules, including broadcasting.

Will throw an error if any of the managed datasets are
unreadable due to being empty or zero-sized.
"""
count = len(self.datasets)

# Get slice arguments from the input
if (isinstance(args, tuple)):
slices = [args] * len(self.datasets)
elif (isinstance(args, list) and len(args) == 1):
# Use this single slice for all dsets
slices = [args[0]] * len(self.datasets)
elif isinstance(args, list):
if len(args) != len(self.datasets):
raise ValueError("Multi read requires a slice for each dataset")

slices = args.copy()
else:
slices = [args] * len(self.datasets)

# Wrap any Ellipsis selections as tuples
for i in range(len(slices)):
if slices[i] == Ellipsis:
slices[i] = (Ellipsis,)

args = args if isinstance(args, tuple) else (args,)

dtypes = [d.dtype for d in self.datasets]

out = [None] * count

for i in range(count):
if self.datasets[i]._is_empty:
raise ValueError("Multi read requires non-empty datasets")

for i in range(count):
if self.datasets[i].size == 0:
raise ValueError("Multi read requires non-zero-sized datasets")

# Sort field names from the rest of the args.
names = tuple(x for x in args if isinstance(x, str))

if names:
raise ValueError("Field subsetting not supported with multi read")

for slice in slices:
if isinstance(slice, h5r.RegionReference):
raise ValueError("Region references not supported with multi read")

fspaces = [None] * count
mspaces = [None] * count
selections = [None] * count
mtypes = [h5t.py_create(new_dtype) for new_dtype in dtypes]

# Get selections on each dataset
for i in range(count):
mspaces[i], fspaces[i], out[i], mtypes[i], selections[i] =\
self.datasets[i].process_getitem_params(slices[i])

# Perform the actual read_multi
fspace_ids = [f.id for f in fspaces]
mspace_ids = [m.id for m in mspaces]
type_ids = [t.id for t in mtypes]
dset_ids = [d.id.id for d in self.datasets]

h5d.rw_multi(dset_ids, mspace_ids, fspace_ids, type_ids, out, 1,
dxpl=self.dxpl)

# Patch up the output for NumPy
for i in range(count):
if (selections[i].mshape is None) or (out[i].shape == ()):
out[i] = out[i][()] # 0 dim array -> numpy scalar

return out

@with_phil
def __setitem__(self, args, vals):
""" Write to the HDF5 datasets from a list of Numpy arrays.

NumPy's broadcasting rules are honored, for "simple" indexing
(slices and integers). For advanced indexing, the shapes must
match.
"""
args = args if isinstance(args, tuple) else (args,)

# Sort field indices from the slicing
names = tuple(x for x in args if isinstance(x, str))
args = tuple(x for x in args if not isinstance(x, str))
count = len(self.datasets)
mtypes = [None] * count
mshapes = [None] * count
mspaces = [None] * count
selections = [None] * count
dtypes = [d.dtype for d in self.datasets]

# Get slice arguments from the input
if (len(args) == 0):
# Use empty tuple for all selections
slices = [args] * len(self.datasets)
elif isinstance(args[0], list):
if len(args[0]) == 1:
# Use this single slice for all dsets
slices = [args[0][0]] * len(self.datasets)
elif len(args[0]) != len(self.datasets):
raise ValueError("Multi read requires a slice for each dataset")

else:
slices = args[0].copy()
else:
# Single selection given as non-list; broadcast to all dsets
slices = [args] * len(self.datasets)

# Wrap any Ellipsis selections as tuples
for i in range(len(slices)):
if slices[i] == Ellipsis:
slices[i] = (Ellipsis,)

# Generate selections, cast arrays, construct mtypes, and get dataspaces
for i in range(count):
vals[i], mshapes[i], mspaces[i], mtypes[i], selections[i] =\
self.datasets[i].process_setitem_params(slices[i], vals[i], names)

if any((selection.nselect == 0) for selection in selections):
raise ValueError("All writes in write multi must be non-zero")

# Set up broadcast selection iterators
fspace_gens = [None] * count

for i in range(count):
fspace_gens[i] = selections[i].broadcast(mshapes[i])
try:
fspace_ids = [next(f).id for f in fspace_gens]
except StopIteration:
raise ValueError("each dset needs at least 1 region selected")

dset_ids = [d.id.id for d in self.datasets]
if None in mtypes:
mtype_ids = [h5t.py_create(t) for t in dtypes]
else:
mtype_ids = [h5t.py_create(t) for t in mtypes]
mtype_hids = [t.id for t in mtype_ids]
mspace_ids = [ms.id for ms in mspaces]

while (fspace_ids[0] is not None):
h5d.rw_multi(dset_ids, mspace_ids, fspace_ids, mtype_hids, vals, 0, dxpl=self.dxpl)

try:
fspace_ids = [next(f).id for f in fspace_gens]
except StopIteration:
break