Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implement MultiManager and low level H5Dread_multi call #2351

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions h5py/_proxy.pxd
Expand Up @@ -12,7 +12,7 @@ from .defs cimport *

cdef herr_t attr_rw(hid_t attr, hid_t mtype, void *progbuf, int read) except -1

# NOTE(review): diff rendering without +/- markers — both the OLD scalar
# dset_rw declaration and the NEW array-based one appear below; only the
# array-based one survives the patch.
cdef herr_t dset_rw(hid_t dset, hid_t mtype, hid_t mspace, hid_t fspace,
hid_t dxpl, void* progbuf, int read) except -1
# NEW: operates on `count` parallel arrays of dataset/type/space ids so one
# entry point can service H5Dread_multi / H5Dwrite_multi as well as the
# single-dataset case.
# Minor: the parameter names here (_dset, _mtype, _mspace) differ from the
# .pyx definition (dset, mtype, mspace) — legal in Cython, but worth unifying.
cdef herr_t dset_rw(size_t count, hid_t* _dset, hid_t* _mtype, hid_t* _mspace, hid_t* _fspace,
hid_t dxpl, void **progbuf, int read) except -1

cdef htri_t needs_bkg_buffer(hid_t src, hid_t dst) except -1
206 changes: 149 additions & 57 deletions h5py/_proxy.pyx
Expand Up @@ -81,85 +81,177 @@ cdef herr_t attr_rw(hid_t attr, hid_t mtype, void *progbuf, int read) except -1:
# =============================================================================
# Proxy for vlen buf workaround

# -----------------------------------------------------------------------------
# NOTE(review): this span is a GitHub diff rendered WITHOUT +/- markers and
# with indentation stripped, so the OLD single-dataset dset_rw and the NEW
# multi-dataset dset_rw are interleaved line-by-line below.  The NOTE(review)
# comments apply to the NEW (count/array-based) implementation.
# -----------------------------------------------------------------------------
cdef herr_t dset_rw(size_t count, hid_t* dset, hid_t* mtype, hid_t* mspace, hid_t* _fspace,
hid_t dxpl, void **progbuf, int read) except -1:

# (OLD signature, removed by the patch:)
cdef herr_t dset_rw(hid_t dset, hid_t mtype, hid_t mspace, hid_t fspace,
hid_t dxpl, void* progbuf, int read) except -1:

# (OLD scalar locals, removed:)
cdef htri_t need_bkg
cdef hid_t dstype = -1 # Dataset datatype
cdef hid_t plist_id = -1
cdef hid_t rawdstype = -1
cdef hid_t dspace = -1 # Dataset dataspace
cdef hid_t cspace = -1 # Temporary contiguous dataspaces

cdef void* back_buf = NULL
cdef void* conv_buf = NULL
cdef hsize_t npoints
# NEW per-dataset arrays; entry i describes the i-th dataset transfer.
cdef hid_t* dstype = NULL # Dataset datatype
cdef hid_t* cspace = NULL # Temporary contiguous dataspaces
cdef hid_t* mspace_tmp = NULL
cdef hid_t* fspace_tmp = NULL

cdef htri_t* need_bkg = NULL

cdef void** back_buf = NULL
cdef void** conv_buf = NULL

cdef hsize_t* npoints = NULL

# True if any of the `count` transfers involves a type that needs the
# gather/convert/scatter proxy path (vlen etc., per needs_proxy below).
cdef bint rw_needs_proxy = False

try:
# Make local list of mem/file spaces which may be freely modified
# NOTE(review): sizeof(hid_t*) should be sizeof(hid_t) in these three
# mallocs — it happens to work where sizeof(void*) >= sizeof(hid_t), but it
# is the wrong element size and under-allocates on any platform with 4-byte
# pointers if hid_t is 8 bytes (verify against supported HDF5 versions).
# Also: none of the malloc returns in this function are checked for NULL.
mspace_tmp = <hid_t*>malloc(sizeof(hid_t*) * count)
fspace_tmp = <hid_t*>malloc(sizeof(hid_t*) * count)
dstype = <hid_t*> malloc(sizeof(hid_t*) * count)

# NOTE(review): loop index `i` is untyped here (and in every loop below),
# so iteration goes through Python ints — declare `cdef size_t i`.
for i in range(count):
mspace_tmp[i] = mspace[i]
fspace_tmp[i] = _fspace[i]

# Issue 372: when a compound type is involved, using the dataset type
# may result in uninitialized data being sent to H5Tconvert for fields
# not present in the memory type. Limit the type used for the dataset
# to only those fields present in the memory type. We can't use the
# memory type directly because of course that triggers HDFFV-1063.
# (OLD scalar compound-type reduction, removed:)
if (H5Tget_class(mtype) == H5T_COMPOUND) and (not read):
rawdstype = H5Dget_type(dset)
dstype = make_reduced_type(mtype, rawdstype)
H5Tclose(rawdstype)
else:
dstype = H5Dget_type(dset)

# (OLD fast path, removed:)
if not (needs_proxy(dstype) or needs_proxy(mtype)):
if read:
H5Dread(dset, mtype, mspace, fspace, dxpl, progbuf)
# NEW: compute a per-dataset (possibly reduced) datatype and record whether
# any of the transfers needs the proxy machinery.
for i in range(count):
if (H5Tget_class(mtype[i]) == H5T_COMPOUND) and (not read):
rawdstype = H5Dget_type(dset[i])
dstype[i] = make_reduced_type(mtype[i], rawdstype)
H5Tclose(rawdstype)
# Reset so the shared cleanup at the bottom only closes a live id.
rawdstype = -1
else:
H5Dwrite(dset, mtype, mspace, fspace, dxpl, progbuf)
else:

# (OLD H5S_ALL normalization, removed — the NEW fast path relies on HDF5
# handling H5S_ALL natively; normalization now happens only in the proxy
# branch further down:)
if mspace == H5S_ALL and fspace != H5S_ALL:
mspace = fspace
elif mspace != H5S_ALL and fspace == H5S_ALL:
fspace = mspace
elif mspace == H5S_ALL and fspace == H5S_ALL:
fspace = mspace = dspace = H5Dget_space(dset)
dstype[i] = H5Dget_type(dset[i])

npoints = H5Sget_select_npoints(mspace)
cspace = H5Screate_simple(1, &npoints, NULL)
rw_needs_proxy = rw_needs_proxy or (needs_proxy(dstype[i]) or needs_proxy(mtype[i]))

conv_buf = create_buffer(H5Tget_size(dstype), H5Tget_size(mtype), npoints)

# Only create a (contiguous) backing buffer if absolutely
# necessary. Note this buffer always has memory type.
# NEW fast path: no dataset needs the proxy — hand user buffers to HDF5.
if not rw_needs_proxy:
if read:
# (OLD interleaved scalar need_bkg logic, removed:)
need_bkg = needs_bkg_buffer(dstype, mtype)
if count > 1:
IF HDF5_VERSION >= (1,14,0):
# NOTE(review): H5Dread/H5Dwrite are declared nogil, but the
# *_multi declarations in api_functions.txt are not — so these
# potentially blocking I/O calls hold the GIL.  Intentional?
H5Dread_multi(count, <hid_t*> dset, <hid_t*>mtype, <hid_t*> mspace_tmp, <hid_t*>fspace_tmp, dxpl, progbuf)
ELSE:
# NOTE(review): IF/ELSE is compile-time, so on older HDF5 any
# count>1 call raises.  Prefer a specific exception type over
# bare Exception (h5py convention).
raise Exception(
f"read_multi requires HDF5 >= 1.14.0 (got version "
f"{HDF5_VERSION} from environment variable or library)"
)
else:
H5Dread(dset[0], mtype[0], mspace_tmp[0], fspace_tmp[0], dxpl, <void*>progbuf[0])
else:
# (OLD interleaved scalar need_bkg/back_buf logic, removed:)
need_bkg = needs_bkg_buffer(mtype, dstype)
if need_bkg:
back_buf = create_buffer(H5Tget_size(dstype), H5Tget_size(mtype), npoints)
if count > 1:
IF HDF5_VERSION >= (1,14,0):
H5Dwrite_multi(count, <hid_t*> dset, <hid_t*>mtype, <hid_t*> mspace_tmp, <hid_t*>fspace_tmp, dxpl, <const void**> progbuf)
ELSE:
raise Exception(
f"write_multi requires HDF5 >= 1.14.0 (got version "
f"{HDF5_VERSION} from environment variable or library)"
)
else:
H5Dwrite(dset[0], mtype[0],mspace_tmp[0], fspace_tmp[0], dxpl, <void*>progbuf[0])
# NEW proxy path: gather -> convert -> scatter via contiguous buffers.
else:
# NOTE(review): sizeof(hid_t*) -> sizeof(hid_t) again; mallocs unchecked.
cspace = <hid_t*> malloc(sizeof(hid_t*) * count)
need_bkg = <htri_t*> malloc(sizeof(htri_t) * count)
back_buf = <void**> malloc(sizeof(void*) * count)
conv_buf = <void**> malloc(sizeof(void*) * count)
npoints = <hsize_t*> malloc(sizeof(hsize_t) * count)

# Pre-NULL element pointers so the cleanup loop can test them safely.
for i in range(count):
back_buf[i] = NULL
conv_buf[i] = NULL

for i in range(count):
if mspace_tmp[i] == H5S_ALL and fspace_tmp[i] != H5S_ALL:
mspace_tmp[i] = fspace_tmp[i]
elif mspace_tmp[i] != H5S_ALL and fspace_tmp[i] == H5S_ALL:
fspace_tmp[i] = mspace_tmp[i]
elif mspace_tmp[i] == H5S_ALL and fspace_tmp[i] == H5S_ALL:
# NOTE(review): the OLD code kept this H5Dget_space() id in `dspace`
# and closed it in the finally block; the NEW cleanup below never
# closes it — dataspace id leak.  Verify.
mspace_tmp[i] = fspace_tmp[i] = H5Dget_space(dset[i])

npoints[i] = H5Sget_select_npoints(mspace_tmp[i])
cspace[i] = H5Screate_simple(1, <hsize_t*> &npoints[i], NULL)

conv_buf[i] = create_buffer(H5Tget_size(dstype[i]), H5Tget_size(mtype[i]), npoints[i])

# Only create a (contiguous) backing buffer if absolutely
# necessary. Note this buffer always has memory type.
if read:
h5py_copy(mtype, mspace, back_buf, progbuf, H5PY_GATHER)
need_bkg[i] = needs_bkg_buffer(dstype[i], mtype[i])
else:
need_bkg[i] = needs_bkg_buffer(mtype[i], dstype[i])

if need_bkg[i]:
back_buf[i] = create_buffer(H5Tget_size(dstype[i]), H5Tget_size(mtype[i]), npoints[i])
if read:
h5py_copy(mtype[i], mspace_tmp[i], <void*> back_buf[i], <void*>progbuf[i], H5PY_GATHER)

if read:
# (OLD scalar proxy read, removed:)
H5Dread(dset, dstype, cspace, fspace, dxpl, conv_buf)
H5Tconvert(dstype, mtype, npoints, conv_buf, back_buf, dxpl)
h5py_copy(mtype, mspace, conv_buf, progbuf, H5PY_SCATTER)
if count > 1:
IF HDF5_VERSION >= (1,14,0):
# NOTE(review): this looks wrong — it reads with mtype/mspace_tmp/
# fspace_tmp, but both the count==1 branch below and the write path
# use dstype[i] and the contiguous cspace[i] to fill conv_buf
# before H5Tconvert.  Expected:
#   H5Dread_multi(count, dset, dstype, cspace, fspace_tmp, dxpl, conv_buf)
# Confirm with a multi-dataset read of vlen/compound data.
H5Dread_multi(count, <hid_t*> dset, <hid_t*>mtype, <hid_t*> mspace_tmp, <hid_t*>fspace_tmp, dxpl, conv_buf)
ELSE:
raise Exception(
f"read_multi requires HDF5 >= 1.14.0 (got version "
f"{HDF5_VERSION} from environment variable or library)"
)
else:
H5Dread(dset[0], dstype[0], cspace[0], fspace_tmp[0], dxpl, <void*> conv_buf[0])

# Convert in place (dataset type -> memory type) and scatter the
# converted data into the caller's buffers.
for i in range(count):
H5Tconvert(dstype[i], mtype[i], npoints[i], <void*> conv_buf[i], <void*> back_buf[i], dxpl)
h5py_copy(mtype[i], mspace_tmp[i], <void*> conv_buf[i], <void*>progbuf[i], H5PY_SCATTER)
else:
# (OLD scalar proxy write, removed:)
h5py_copy(mtype, mspace, conv_buf, progbuf, H5PY_GATHER)
H5Tconvert(mtype, dstype, npoints, conv_buf, back_buf, dxpl)
H5Dwrite(dset, dstype, cspace, fspace, dxpl, conv_buf)
H5Dvlen_reclaim(dstype, cspace, H5P_DEFAULT, conv_buf)
# NEW: gather from user buffers, convert memory type -> dataset type.
for i in range(count):
h5py_copy(mtype[i], mspace_tmp[i], <void*> conv_buf[i], <void*>progbuf[i], H5PY_GATHER)
H5Tconvert(mtype[i], dstype[i], npoints[i], <void*> conv_buf[i], <void*> back_buf[i], dxpl)

if count > 1:
IF HDF5_VERSION >= (1,14,0):
H5Dwrite_multi(count, <hid_t*>dset, <hid_t*>dstype, <hid_t*>cspace, <hid_t*>fspace_tmp, dxpl, <const void**> conv_buf)
ELSE:
raise Exception(
f"write_multi requires HDF5 >= 1.14.0 (got version "
f"{HDF5_VERSION} from environment variable or library)"
)
else:
H5Dwrite(dset[0], dstype[0], cspace[0], fspace_tmp[0], dxpl, <void*> conv_buf[0])

# Reclaim vlen allocations HDF5 made inside conv_buf during the write.
for i in range(count):
H5Dvlen_reclaim(dstype[i], cspace[i], H5P_DEFAULT, <void*> conv_buf[i])

finally:
# (OLD scalar cleanup, removed:)
free(back_buf)
free(conv_buf)
if dstype > 0:
H5Tclose(dstype)
if dspace > 0:
H5Sclose(dspace)
if cspace > 0:
H5Sclose(cspace)

return 0

# NEW cleanup.  The guards rely on `and` short-circuiting: in the fast path
# back_buf/conv_buf/cspace stay NULL so only dstype[i] gets closed.
# NOTE(review): the diff rendering makes it ambiguous whether this loop sits
# inside the finally: block and where `return 0` survives — confirm in the
# actual patch.
for i in range(count):
if (back_buf != NULL) and (need_bkg[i]) and (back_buf[i] != NULL):
free(back_buf[i])

if (conv_buf != NULL) and (conv_buf[i] != NULL):
free(conv_buf[i])

if cspace and (cspace[i] > 0):
H5Sclose(cspace[i])
if dstype and (dstype[i] > 0):
H5Tclose(dstype[i])

if mspace_tmp != NULL:
free(mspace_tmp)
if fspace_tmp != NULL:
free(fspace_tmp)

# NOTE(review): free(NULL) is a no-op, so these NULL guards are redundant.
if npoints != NULL:
free(npoints)
if need_bkg != NULL:
free(need_bkg)
if back_buf != NULL:
free(back_buf)
if conv_buf != NULL:
free(conv_buf)
if cspace != NULL:
free(cspace)

if rawdstype > 0:
H5Tclose(rawdstype)

cdef hid_t make_reduced_type(hid_t mtype, hid_t dstype):
# Go through dstype, pick out the fields which also appear in mtype, and
Expand Down
3 changes: 3 additions & 0 deletions h5py/api_functions.txt
Expand Up @@ -99,6 +99,9 @@ hdf5:
herr_t H5Dread(hid_t dset_id, hid_t mem_type_id, hid_t mem_space_id, hid_t file_space_id, hid_t plist_id, void *buf) nogil
herr_t H5Dwrite(hid_t dset_id, hid_t mem_type, hid_t mem_space, hid_t file_space, hid_t xfer_plist, void* buf) nogil

# NOTE(review): unlike H5Dread/H5Dwrite above, the *_multi declarations below
# are not marked nogil — these calls can block on I/O; confirm whether the GIL
# should be released (nogil here + `with nogil` at the call sites) for parity.
1.14.0-1.16.99 herr_t H5Dread_multi(size_t count, hid_t *dset_id, hid_t *mem_type_id, hid_t *mem_space_id, hid_t *file_space_id, hid_t dxpl_id, void **buf)
1.14.0-1.16.99 herr_t H5Dwrite_multi(size_t count, hid_t *dset_id, hid_t *mem_type_id, hid_t *mem_space_id, hid_t *file_space_id, hid_t dxpl_id, const void **buf)

herr_t H5Dextend(hid_t dataset_id, hsize_t *size) nogil

herr_t H5Dfill(void *fill, hid_t fill_type_id, void *buf, hid_t buf_type_id, hid_t space_id) nogil
Expand Down
95 changes: 79 additions & 16 deletions h5py/h5d.pyx
Expand Up @@ -151,6 +151,55 @@ def open(ObjectID loc not None, char* name, PropID dapl=None):
"""
return DatasetID(H5Dopen(loc.id, name, pdefault(dapl)))

IF HDF5_VERSION >= (1, 14, 0):
@with_phil
def read_multi(count, list dataset_ids, list mspace_ids, list fspace_ids,
list type_ids, list bufs not None, PropID dxpl=None):
# NOTE(review): the lines between the signature and the docstring below are
# GitHub PR discussion captured by the page scrape, not part of the source.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters are lists instead of numpy arrays to avoid an issue with the destination buffers.

Specifically, the individual destination buffer for each dset read should be a numpy array. If bufs were itself also a numpy array containing these arrays, then it would copy those arrays instead of pointing to the same memory, and the original destination arrays the user created wouldn't get populated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that makes sense.

The Python function probably shouldn't take count as the first parameter - it's necessary in C to tell you how many items you can read after a pointer, but lists have a length.

But if that's the case, I'd probably do a check up front that the lists are all the same length, and fail with a clear error if not.

""" (int count, list dataset_ids, list mspace_ids, list fspace_ids,
list type_ids, list bufs not None, PropID dxpl=None)

Read raw data from a set of datasets into the provided buffers.

For each dataset that will be read, its id, the id of a corresponding memory
and file space, a type id, and a buffer should be provided. The dataset
transfer property list applies to all transfers.
"""

cdef hid_t* type_hids
cdef hid_t* mspace_hids
cdef hid_t* fspace_hids
cdef hid_t* dataset_hids
cdef void** buffer_ptrs

cdef hid_t plist_id

try:
# NOTE(review): (1) sizeof(hid_t*) should be sizeof(hid_t) in the four
# id-array mallocs; (2) no malloc return is NULL-checked, so dset_rw could
# receive NULL arrays; (3) `count` is trusted as-is — validate it against
# len(dataset_ids)/len(mspace_ids)/len(fspace_ids)/len(type_ids)/len(bufs)
# (or drop `count` and use the list length, per the review thread above).
buffer_ptrs = <void**>malloc(count * sizeof(void*))
type_hids = <hid_t*>malloc(count * sizeof(hid_t*))
mspace_hids = <hid_t*>malloc(count * sizeof(hid_t*))
fspace_hids = <hid_t*>malloc(count * sizeof(hid_t*))
dataset_hids = <hid_t*>malloc(count * sizeof(hid_t*))

plist_id = pdefault(dxpl)

for i in range(count):
# NOTE(review): bufs[i] is not validated — DatasetID.read calls
# check_numpy_write(arr_obj, -1) before PyArray_DATA; an equivalent
# per-buffer check (writable, C-contiguous ndarray) is needed here to
# avoid undefined behaviour on bad input.
buffer_ptrs[i] = <void*> PyArray_DATA(bufs[i])
type_hids[i] = <hid_t> type_ids[i]
mspace_hids[i] = <hid_t> mspace_ids[i]
fspace_hids[i] = <hid_t> fspace_ids[i]
dataset_hids[i] = <hid_t> dataset_ids[i]

# Final argument 1 selects the read direction in dset_rw.
dset_rw(count, dataset_hids, type_hids, mspace_hids, fspace_hids, plist_id, buffer_ptrs, 1)

finally:
free(type_hids)
free(mspace_hids)
free(fspace_hids)
free(dataset_hids)
free(buffer_ptrs)



# --- Proxy functions for safe(r) threading -----------------------------------


Expand Down Expand Up @@ -224,22 +273,29 @@ cdef class DatasetID(ObjectID):
this is not the case, ValueError will be raised and the read will
fail. Keyword dxpl may be a dataset transfer property list.
"""
# NOTE(review): diff rendering — the first two cdef lines are the OLD
# scalar locals; the pointer locals below them are the NEW version.
cdef hid_t self_id, mtype_id, mspace_id, fspace_id, plist_id
cdef void* data
cdef hid_t *self_id
cdef hid_t *mtype_id
cdef hid_t *mspace_id
cdef hid_t *fspace_id
cdef hid_t plist_id

cdef void** data
# NOTE(review): oldflags appears unused in the visible span — remove?
cdef int oldflags

if mtype is None:
mtype = py_create(arr_obj.dtype)
check_numpy_write(arr_obj, -1)

# (OLD scalar id assignments, removed:)
self_id = self.id
mtype_id = mtype.id
mspace_id = mspace.id
fspace_id = fspace.id
# NEW: pass addresses of the single ids so the count==1 call reuses the
# array-based dset_rw.  The referenced objects remain in scope for the call.
self_id = &self.id
mtype_id = &mtype.id
mspace_id = &mspace.id
fspace_id = &fspace.id
plist_id = pdefault(dxpl)
data = PyArray_DATA(arr_obj)

dset_rw(self_id, mtype_id, mspace_id, fspace_id, plist_id, data, 1)
# NOTE(review): data_tmp has no cdef declaration — this relies on Cython
# inferring a C void* from PyArray_DATA; an explicit `cdef void* data_tmp`
# would be clearer.  Verify it compiles as a C pointer, not an object.
data_tmp = PyArray_DATA(arr_obj)
data = &data_tmp

dset_rw(1, self_id, mtype_id, mspace_id, fspace_id, plist_id, data, 1)


@with_phil
Expand All @@ -264,22 +320,29 @@ cdef class DatasetID(ObjectID):
The provided Numpy array must be C-contiguous. If this is not the
case, ValueError will be raised and the read will fail.
"""
# NOTE(review): diff rendering — the first two cdef lines are the OLD
# scalar locals; the pointer locals below them are the NEW version.
cdef hid_t self_id, mtype_id, mspace_id, fspace_id, plist_id
cdef void* data
cdef hid_t *self_id
cdef hid_t *mtype_id
cdef hid_t *mspace_id
cdef hid_t *fspace_id
cdef hid_t plist_id

cdef void** data
# NOTE(review): oldflags appears unused in the visible span — remove?
cdef int oldflags

if mtype is None:
mtype = py_create(arr_obj.dtype)
check_numpy_read(arr_obj, -1)

# (OLD scalar id assignments, removed:)
self_id = self.id
mtype_id = mtype.id
mspace_id = mspace.id
fspace_id = fspace.id
# NEW: addresses of the single ids for the count==1 dset_rw call.
# Minor: `&(self.id)` here vs `&self.id` in read() above — unify the style.
self_id = &(self.id)
mtype_id = &(mtype.id)
mspace_id = &(mspace.id)
fspace_id = &(fspace.id)
plist_id = pdefault(dxpl)
data = PyArray_DATA(arr_obj)

dset_rw(self_id, mtype_id, mspace_id, fspace_id, plist_id, data, 0)
# NOTE(review): data_tmp lacks a cdef declaration — see note in read().
data_tmp = PyArray_DATA(arr_obj)
data = &data_tmp

dset_rw(1, self_id, mtype_id, mspace_id, fspace_id, plist_id, data, 0)


@with_phil
Expand Down