Skip to content

Commit

Permalink
Merge 8eff919 into aca6586
Browse files Browse the repository at this point in the history
  • Loading branch information
nkern committed Jul 23, 2019
2 parents aca6586 + 8eff919 commit c2fe812
Show file tree
Hide file tree
Showing 13 changed files with 16,421 additions and 259 deletions.
105 changes: 75 additions & 30 deletions hera_pspec/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,32 @@
from hera_pspec import uvpspec, version, utils
import argparse
import time
from functools import wraps


def transactional(fn):
"""
Handle 'transactional' operations on PSpecContainer, where the HDF5 file is
opened and then closed again for every operation. This is done when
keep_open = False.
For calling a @transactional function within another @transactional function,
feed the kwarg nested=True in the nested function call, and it will not close
the container upon exit even if keep_open = False, until the outer-most
@transactional function exits, in which case it will close the container
if keep_open = False.
"""
@wraps(fn)
def wrapper(*args, **kwargs):
psc = args[0] # self object

# Open HDF5 file if needed
if not psc.keep_open:
if psc.data is None:
psc._open()


# if passed 'nested' kwarg get it
nested = kwargs.pop('nested', False)

# Run function
try:
f = fn(*args, **kwargs)
Expand All @@ -26,9 +37,9 @@ def wrapper(*args, **kwargs):
if not psc.keep_open:
psc._close()
raise err

# Close HDF5 file if necessary
if not psc.keep_open:
if not psc.keep_open and not nested:
psc._close()

# Return function result
Expand All @@ -41,12 +52,16 @@ class PSpecContainer(object):
"""
Container class for managing multiple UVPSpec objects.
"""

def __init__(self, filename, mode='r', keep_open=True, tsleep=0.5, maxiter=2):
def __init__(self, filename, mode='r', keep_open=True, swmr=False,
tsleep=0.5, maxiter=2):
"""
Manage a collection of UVPSpec objects that are stored in a structured
HDF5 file.
Note: one should not create new groups or datasets with SWMR. See page 6 of
https://support.hdfgroup.org/HDF5/docNewFeatures/SWMR/HDF5_SWMR_Users_Guide.pdf
for SWMR limitations.
Parameters
----------
filename : str
Expand All @@ -61,13 +76,14 @@ def __init__(self, filename, mode='r', keep_open=True, tsleep=0.5, maxiter=2):
Whether the HDF5 file should be kept open, or opened and then
closed again each time an operation is performed. Setting
`keep_open=False` is helpful for multi-process access patterns.
This feature uses the Single-Writer Multiple-Reader (SWMR) feature
of HDF5. Note that SWMR can only be used on POSIX-compliant
filesystems, and so may not work on some network filesystems.
Default: True (keep file open).
swmr : bool, optional
Enable Single-Writer Multiple-Reader (SWMR) feature of HDF5.
Note that SWMR can only be used on POSIX-compliant
filesystems, and so may not work on some network filesystems.
Default: False (do not use SWMR)
tsleep : float, optional
Time to wait in seconds after each attempt at opening the file.
Expand All @@ -80,14 +96,15 @@ def __init__(self, filename, mode='r', keep_open=True, tsleep=0.5, maxiter=2):
self.mode = mode
self.tsleep = tsleep
self.maxiter = maxiter
self.swmr = swmr
if mode not in ['r', 'rw']:
raise ValueError("Must set mode to either 'r' or 'rw'.")

# Open file ready for reading and/or writing (if not in transactional mode)
self.data = None
if keep_open:
self._open()

def _open(self):
"""
Open HDF5 file ready for reading/writing. Does nothing if the file is
Expand All @@ -107,17 +124,17 @@ def _open(self):
mode = 'a'
else:
mode = 'r'
if self.mode == 'r':
if self.swmr and self.mode == 'r':
swmr = True
else:
swmr = False

# check HDF5 version if swmr
if swmr:
hdf5_v = h5py.version.hdf5_version_tuple[0] \
+ h5py.version.hdf5_version_tuple[1]/100.
hdf5_v = float('.'.join(h5py.version.hdf5_version.split('.')[:2]))
if hdf5_v < 1.1:
print("HDF5 version must be >= 1.10 for SWMR")
raise NotImplementedError("HDF5 version is {}: must "
"be >= 1.10 for SWMR".format(hdf5_v))

# Try to open the file
Ncount = 0
Expand All @@ -130,7 +147,8 @@ def _open(self):
# Enable single writer, multiple reader mode on HDF5 file.
# This allows multiple handles to exist for the same file
# at the same time, as long as only one is in rw mode
self.data.swmr_mode = True
if self.swmr:
self.data.swmr_mode = True
except ValueError:
pass
break
Expand Down Expand Up @@ -159,7 +177,7 @@ def _open(self):
# Denote as Container
if 'pspec_type' not in list(self.data.attrs.keys()):
self.data.attrs['pspec_type'] = self.__class__.__name__

def _close(self):
"""
Close HDF5 file. DOes nothing if file is already closed.
Expand Down Expand Up @@ -227,7 +245,10 @@ def _update_header(self):
git version of hera_pspec.
"""
if 'header' not in list(self.data.keys()):
hdr = self.data.create_group('header')
if not self.swmr:
hdr = self.data.create_group('header')
else:
raise ValueError("Cannot create a header group with SWMR")
else:
hdr = self.data['header']

Expand All @@ -236,7 +257,7 @@ def _update_header(self):
if hdr.attrs['hera_pspec.git_hash'] != version.git_hash:
print("WARNING: HDF5 file was created by a different version "
"of hera_pspec.")
else:
elif not self.swmr:
hdr.attrs['hera_pspec.git_hash'] = version.git_hash

@transactional
Expand All @@ -261,7 +282,7 @@ def set_pspec(self, group, psname, pspec, overwrite=False):
"""
if self.mode == 'r':
raise IOError("HDF5 file was opened read-only; cannot write to file.")

if isinstance(group, (tuple, list, dict)):
raise ValueError("Only one group can be specified at a time.")

Expand All @@ -283,6 +304,12 @@ def set_pspec(self, group, psname, pspec, overwrite=False):
raise ValueError("If pspec is a list, psname must also be a list.")
# No lists should pass beyond this point

# check for swmr write of new group or dataset
if self.swmr:
tree = self.tree(return_str=False, nested=True)
if group not in tree or psname not in tree[group]:
raise ValueError("Cannot write new group or dataset with SWMR")

# Check that input is of the correct type
if not isinstance(pspec, uvpspec.UVPSpec):
print("pspec:", type(pspec), pspec)
Expand Down Expand Up @@ -394,7 +421,7 @@ def pspec_filter(n, obj):
# Traverse the entire set of groups/datasets looking for pspecs
grp.visititems(pspec_filter)
return ps_list

@transactional
def groups(self):
"""
Expand All @@ -409,19 +436,37 @@ def groups(self):
if u'header' in groups:
groups.remove(u'header')
return groups

@transactional
def tree(self):
def tree(self, return_str=True):
"""
Output a string containing a tree diagram of groups and the power
spectra that they contain.
Parameters
----------
return_str : bool, optional
If True, return the tree as a string, otherwise
return as a dictionary
Returns
-------
str or dict
Tree structure of HDF5 file
"""
s = ""
for grp in self.groups():
s += "(%s)\n" % grp
for pspec in self.spectra(grp):
s += " |--%s\n" % pspec
return s
grps = self.groups(nested=True)
tree = dict()
for grp in grps:
tree[grp] = self.spectra(grp, nested=True)
if not return_str:
return tree
else:
s = ""
for grp in grps:
s += "(%s)\n" % grp
for pspec in tree[grp]:
s += " |--%s\n" % pspec
return s

@transactional
def save(self):
Expand Down
5,151 changes: 5,151 additions & 0 deletions hera_pspec/data/zen.2458116.30448.HH.flagged_abs.calfits

Large diffs are not rendered by default.

Binary file added hera_pspec/data/zen.2458116.30448.HH.uvh5
Binary file not shown.
5,043 changes: 5,043 additions & 0 deletions hera_pspec/data/zen.2458116.31193.HH.flagged_abs.calfits

Large diffs are not rendered by default.

Binary file added hera_pspec/data/zen.2458116.31193.HH.uvh5
Binary file not shown.
5,615 changes: 5,615 additions & 0 deletions hera_pspec/data/zen.2458116.31939.HH.flagged_abs.calfits

Large diffs are not rendered by default.

Binary file added hera_pspec/data/zen.2458116.31939.HH.uvh5
Binary file not shown.
17 changes: 13 additions & 4 deletions hera_pspec/grouping.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ def bootstrap_resampled_error(uvp, blpair_groups=None, time_avg=False, Nsamples=

def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Nsamples=1000, seed=0,
normal_std=True, robust_std=True, cintervals=None, keep_samples=False,
bl_error_tol=1.0, overwrite=False, add_to_history='', verbose=True):
bl_error_tol=1.0, overwrite=False, add_to_history='', verbose=True, maxiter=1):
"""
Run bootstrap resampling on a PSpecContainer object to estimate errorbars.
For each group/spectrum specified in the PSpecContainer, this function produces
Expand All @@ -828,7 +828,9 @@ def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Ns
(3.) series of bootstrap resamples of UVPSpec average (optional)
The output of 1. and 2. are placed in a *_avg spectrum, while the output of 3.
is placed in *_bs0, *_bs1, *_bs2 etc. objects.
is placed in *_bs0, *_bs1, *_bs2 etc. objects.
Note: PSpecContainers should not be opened in SWMR mode for this function.
Parameters:
-----------
Expand Down Expand Up @@ -882,14 +884,21 @@ def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Ns
verbose : bool
If True, report feedback to stdout.
maxiter : int, optional, default=1
Maximum number of attempts to open the PSpecContainer by a single process.
0.5 sec wait per attempt. Useful in the case of multiprocesses bootstrapping
different groups of the same container.
"""
from hera_pspec import uvpspec
from hera_pspec import PSpecContainer
# type check
if isinstance(filename, (str, np.str)):
psc = PSpecContainer(filename)
# open in transactional mode
psc = PSpecContainer(filename, mode='rw', keep_open=False, swmr=False, tsleep=0.5, maxiter=maxiter)
elif isinstance(filename, PSpecContainer):
psc = filename
assert not psc.swmr, "PSpecContainer should not be in SWMR mode"
else:
raise AssertionError("filename must be a PSpecContainer or filepath to one")

Expand All @@ -900,7 +909,7 @@ def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Ns
# get spectra if not fed
all_spectra = utils.flatten([ [os.path.join(grp, s)
for s in psc.spectra(grp)]
for grp in groups ])
for grp in groups])
if spectra is None:
spectra = all_spectra
else:
Expand Down
Loading

0 comments on commit c2fe812

Please sign in to comment.