Skip to content

Commit

Permalink
Merge pull request #193 from HERA-Team/parallel_safe_container
Browse files Browse the repository at this point in the history
Concurrent access to PSpecContainer
  • Loading branch information
philbull committed Jul 10, 2019
2 parents 4bb8d6e + 0e57b6a commit aca6586
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 66 deletions.
6 changes: 4 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
MOCK_MODULES = ['numpy', 'scipy', 'scipy.interpolate', 'scipy.integrate',
'pyuvdata', 'h5py', 'aipy', 'omnical', 'linsolve', 'hera_qm',
'uvtools', 'hera_cal', 'hera_cal.utils', 'healpy',
'scikit-learn', 'astropy', 'astropy.cosmology', 'matplotlib',
'matplotlib.pyplot', 'pylab', 'yaml']
'scikit-learn', 'astropy', 'astropy.cosmology', 'astropy.units',
'astropy.constants', 'matplotlib', 'matplotlib.pyplot',
'pylab', 'yaml', 'pyuvdata.utils', ]

for mod_name in MOCK_MODULES:
sys.modules[mod_name] = mock.Mock()

Expand Down
4 changes: 2 additions & 2 deletions docs/container.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

``PSpecContainer`` is a container for organizing collections of ``UVPSpec`` objects. It is based on HDF5.

.. .. autoclass:: hera_pspec.PSpecContainer
.. :members:
.. autoclass:: hera_pspec.PSpecContainer
:members:
211 changes: 170 additions & 41 deletions hera_pspec/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,47 @@
import h5py
from hera_pspec import uvpspec, version, utils
import argparse
import time


def transactional(fn):
"""
Handle 'transactional' operations on PSpecContainer, where the HDF5 file is
opened and then closed again for every operation. This is done when
keep_open = False.
"""
def wrapper(*args, **kwargs):
psc = args[0] # self object

# Open HDF5 file if needed
if not psc.keep_open:
psc._open()

# Run function
try:
f = fn(*args, **kwargs)
except Exception as err:
# Close file before raising error
if not psc.keep_open:
psc._close()
raise err

# Close HDF5 file if necessary
if not psc.keep_open:
psc._close()

# Return function result
return f

return wrapper


class PSpecContainer(object):
"""
Container class for managing multiple UVPSpec objects.
"""

def __init__(self, filename, mode='r'):
def __init__(self, filename, mode='r', keep_open=True, tsleep=0.5, maxiter=2):
"""
Manage a collection of UVPSpec objects that are stored in a structured
HDF5 file.
Expand All @@ -23,24 +56,100 @@ def __init__(self, filename, mode='r'):
Whether to load the HDF5 file as read/write ('rw') or read-only
('r'). If 'rw' is specified and the file doesn't exist, an empty
one will be created.
keep_open : bool, optional
Whether the HDF5 file should be kept open, or opened and then
closed again each time an operation is performed. Setting
`keep_open=False` is helpful for multi-process access patterns.
This feature uses the Single-Writer Multiple-Reader (SWMR) feature
of HDF5. Note that SWMR can only be used on POSIX-compliant
filesystems, and so may not work on some network filesystems.
Default: True (keep file open).
tsleep : float, optional
Time to wait in seconds after each attempt at opening the file.
maxiter : int, optional
Maximum number of attempts to open file (useful for concurrent
access when file may be locked temporarily by other processes).
"""
self.filename = filename
self.keep_open = keep_open
self.mode = mode
self.tsleep = tsleep
self.maxiter = maxiter
if mode not in ['r', 'rw']:
raise ValueError("Must set mode to either 'r' or 'rw'.")

# Open file ready for reading and/or writing
# Open file ready for reading and/or writing (if not in transactional mode)
self.data = None
self._open()

if keep_open:
self._open()

def _open(self):
"""
Open HDF5 file ready for reading/writing.
Open HDF5 file ready for reading/writing. Does nothing if the file is
already open.
This method uses HDF5's single writer, multiple reader (swmr) mode,
which allows multiple handles to exist for the same file at the same
time, as long as only one is in rw mode. The rw instance should be the
*first* one that is created; if a read-only instance is already open
when a rw instance is created, an error will be raised by h5py.
"""
if self.data is not None: return

# Convert user-specified mode to a mode that HDF5 recognizes. We only
# allow non-destructive operations!
mode = 'a' if self.mode == 'rw' else 'r'
self.data = h5py.File(self.filename, mode)
if self.mode == 'rw':
mode = 'a'
else:
mode = 'r'
if self.mode == 'r':
swmr = True
else:
swmr = False

# check HDF5 version if swmr
if swmr:
hdf5_v = h5py.version.hdf5_version_tuple[0] \
+ h5py.version.hdf5_version_tuple[1]/100.
if hdf5_v < 1.1:
print("HDF5 version must be >= 1.10 for SWMR")

# Try to open the file
Ncount = 0
while True:
try:
self.data = h5py.File(self.filename, mode, libver='latest',
swmr=swmr)
if self.mode == 'rw':
try:
# Enable single writer, multiple reader mode on HDF5 file.
# This allows multiple handles to exist for the same file
# at the same time, as long as only one is in rw mode
self.data.swmr_mode = True
except ValueError:
pass
break
except (IOError, OSError):
# raise Exception if exceeded maxiter
if Ncount >= self.maxiter:
if self.mode == 'rw':
raise OSError(
"Failed to open HDF5 file. Another process may "
"be holding it open; use \nkeep_open=False to "
"help prevent this from happening (single "
"process), or use the\nlock kwarg (multiple "
"processes).")
else:
raise

# sleep and try again
Ncount += 1
time.sleep(self.tsleep)

# Update header info
if self.mode == 'rw':
Expand All @@ -50,7 +159,16 @@ def _open(self):
# Denote as Container
if 'pspec_type' not in list(self.data.attrs.keys()):
self.data.attrs['pspec_type'] = self.__class__.__name__


def _close(self):
"""
Close HDF5 file. DOes nothing if file is already closed.
"""
if self.data is None:
return
self.data.close()
self.data = None

def _store_pspec(self, pspec_group, uvp):
"""
Store a UVPSpec object as group of datasets within the HDF5 file.
Expand Down Expand Up @@ -120,7 +238,8 @@ def _update_header(self):
"of hera_pspec.")
else:
hdr.attrs['hera_pspec.git_hash'] = version.git_hash


@transactional
def set_pspec(self, group, psname, pspec, overwrite=False):
"""
Store a delay power spectrum in the container.
Expand Down Expand Up @@ -198,7 +317,7 @@ def set_pspec(self, group, psname, pspec, overwrite=False):
# Store info about what kind of power spectra are in the group
psgrp.attrs['pspec_type'] = pspec.__class__.__name__


@transactional
def get_pspec(self, group, psname=None):
"""
Get a UVPSpec power spectrum object from a given group.
Expand Down Expand Up @@ -234,7 +353,6 @@ def get_pspec(self, group, psname=None):
else:
raise KeyError("No pspec named '%s' in group '%s'" % (key2, key1))


# Otherwise, extract all available power spectra
uvp = []
def pspec_filter(n, obj):
Expand All @@ -244,8 +362,8 @@ def pspec_filter(n, obj):
# Traverse the entire set of groups/datasets looking for pspecs
grp.visititems(pspec_filter) # This adds power spectra to the uvp list
return uvp


@transactional
def spectra(self, group):
"""
Return list of available power spectra.
Expand Down Expand Up @@ -276,7 +394,8 @@ def pspec_filter(n, obj):
# Traverse the entire set of groups/datasets looking for pspecs
grp.visititems(pspec_filter)
return ps_list


@transactional
def groups(self):
"""
Return list of groups in the container.
Expand All @@ -287,9 +406,11 @@ def groups(self):
List of group names.
"""
groups = list(self.data.keys())
if u'header' in groups: groups.remove(u'header')
if u'header' in groups:
groups.remove(u'header')
return groups


@transactional
def tree(self):
"""
Output a string containing a tree diagram of groups and the power
Expand All @@ -301,13 +422,14 @@ def tree(self):
for pspec in self.spectra(grp):
s += " |--%s\n" % pspec
return s


@transactional
def save(self):
"""
Force HDF5 file to flush to disk.
"""
self.data.flush()

def __del__(self):
"""
Make sure that HDF5 file is closed on destruct.
Expand All @@ -322,23 +444,24 @@ def __del__(self):
def combine_psc_spectra(psc, groups=None, dset_split_str='_x_', ext_split_str='_',
verbose=True, overwrite=False):
"""
Iterate through a PSpecContainer and, within each specified group,
combine UVPSpec (i.e. spectra) of similar name but varying psname extension.
Iterate through a PSpecContainer and, within each specified group, combine
UVPSpec (i.e. spectra) of similar name but varying psname extension.
Power spectra to-be-merged are assumed to follow the naming convention
dset1_x_dset2_ext1, dset1_x_dset2_ext2, ...
where _x_ is the default dset_split_str, and _ is the default ext_split_str.
The spectra names are first split by dset_split_str, and then by ext_split_str. In
this particular case, all instances of dset1_x_dset2* will be merged together.
The spectra names are first split by dset_split_str, and then by
ext_split_str. In this particular case, all instances of dset1_x_dset2*
will be merged together.
In order to merge spectra names with no dset distinction and only an extension,
feed dset_split_str as '' or None. Example, to merge together: uvp_1, uvp_2, uvp_3
feed dset_split_str=None and ext_split_str='_'.
In order to merge spectra names with no dset distinction and only an
extension, feed dset_split_str as '' or None. Example, to merge together:
uvp_1, uvp_2, uvp_3, feed dset_split_str=None and ext_split_str='_'.
Note this is a destructive and inplace operation, all of the *_ext1 objects are
removed after merge.
Note this is a destructive and inplace operation, all of the *_ext1 objects
are removed after merge.
Parameters
----------
Expand All @@ -360,13 +483,13 @@ def combine_psc_spectra(psc, groups=None, dset_split_str='_x_', ext_split_str='_
overwrite : bool
If True, overwrite output spectra if they exist.
"""
# load container
# Load container
if isinstance(psc, (str, np.str)):
psc = PSpecContainer(psc, mode='rw')
else:
assert isinstance(psc, PSpecContainer)

# get groups
# Get groups
_groups = psc.groups()
if groups is None:
groups = _groups
Expand All @@ -385,7 +508,8 @@ def combine_psc_spectra(psc, groups=None, dset_split_str='_x_', ext_split_str='_
if dset_split_str == '' or dset_split_str is None:
sp = spc.split(ext_split_str)[0]
else:
sp = utils.flatten([s.split(ext_split_str) for s in spc.split(dset_split_str)])[:2]
sp = utils.flatten([s.split(ext_split_str)
for s in spc.split(dset_split_str)])[:2]
sp = dset_split_str.join(sp)
if sp not in unique_spectra:
unique_spectra.append(sp)
Expand All @@ -395,11 +519,13 @@ def combine_psc_spectra(psc, groups=None, dset_split_str='_x_', ext_split_str='_
# check for overwrite
if spc in spectra and overwrite == False:
if verbose:
print("spectra {}/{} already exists and overwrite == False, skipping...".format(grp, spc))
print("spectra {}/{} already exists and overwrite == False, "
"skipping...".format(grp, spc))
continue

# get merge list
to_merge = [spectra[i] for i in np.where([spc in _sp for _sp in spectra])[0]]
to_merge = [spectra[i] for i in \
np.where([spc in _sp for _sp in spectra])[0]]
try:
# merge
uvps = [psc.get_pspec(grp, uvp) for uvp in to_merge]
Expand All @@ -413,7 +539,9 @@ def combine_psc_spectra(psc, groups=None, dset_split_str='_x_', ext_split_str='_
except Exception as exc:
# merge failed, so continue
if verbose:
print("uvp merge failed for spectra {}/{}, exception: {}".format(grp, spc, exc))
print("uvp merge failed for spectra {}/{}, exception: " \
"{}".format(grp, spc, exc))



def get_combine_psc_spectra_argparser():
Expand All @@ -424,11 +552,12 @@ def get_combine_psc_spectra_argparser():
a.add_argument("filename", type=str,
help="Filename of HDF5 container (PSpecContainer) containing "
"groups / input power spectra.")

a.add_argument("--dset_split_str", default='_x_', type=str, help='The pattern used to split dset1 '
'from dset2 in the psname.')
a.add_argument("--ext_split_str", default='_', type=str, help='The pattern used to split the dset '
'names from their extension in the psname (if it exists).')
a.add_argument("--verbose", default=False, action='store_true', help='Report feedback to stdout.')

a.add_argument("--dset_split_str", default='_x_', type=str,
help='The pattern used to split dset1 from dset2 in the '
'psname.')
a.add_argument("--ext_split_str", default='_', type=str,
help='The pattern used to split the dset names from their '
'extension in the psname (if it exists).')
a.add_argument("--verbose", default=False, action='store_true',
help='Report feedback to stdout.')
return a
Loading

0 comments on commit aca6586

Please sign in to comment.