Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pspecdata input calibration and pspec_run updates #212

Merged
merged 14 commits into from
Jul 25, 2019
105 changes: 75 additions & 30 deletions hera_pspec/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,32 @@
from hera_pspec import uvpspec, version, utils
import argparse
import time
from functools import wraps


def transactional(fn):
"""
Handle 'transactional' operations on PSpecContainer, where the HDF5 file is
opened and then closed again for every operation. This is done when
keep_open = False.

For calling a @transactional function within another @transactional function,
feed the kwarg nested=True in the nested function call, and it will not close
the container upon exit even if keep_open = False, until the outer-most
@transactional function exits, in which case it will close the container
if keep_open = False.
"""
@wraps(fn)
nkern marked this conversation as resolved.
Show resolved Hide resolved
def wrapper(*args, **kwargs):
psc = args[0] # self object

# Open HDF5 file if needed
if not psc.keep_open:
if psc.data is None:
psc._open()


# if passed 'nested' kwarg get it
nested = kwargs.pop('nested', False)

# Run function
try:
f = fn(*args, **kwargs)
Expand All @@ -26,9 +37,9 @@ def wrapper(*args, **kwargs):
if not psc.keep_open:
psc._close()
raise err

# Close HDF5 file if necessary
if not psc.keep_open:
if not psc.keep_open and not nested:
psc._close()

# Return function result
Expand All @@ -41,12 +52,16 @@ class PSpecContainer(object):
"""
Container class for managing multiple UVPSpec objects.
"""

def __init__(self, filename, mode='r', keep_open=True, tsleep=0.5, maxiter=2):
def __init__(self, filename, mode='r', keep_open=True, swmr=False,
tsleep=0.5, maxiter=2):
"""
Manage a collection of UVPSpec objects that are stored in a structured
HDF5 file.

Note: one should not create new groups or datasets with SWMR. See page 6 of
https://support.hdfgroup.org/HDF5/docNewFeatures/SWMR/HDF5_SWMR_Users_Guide.pdf
for SWMR limitations.

Parameters
----------
filename : str
Expand All @@ -61,13 +76,14 @@ def __init__(self, filename, mode='r', keep_open=True, tsleep=0.5, maxiter=2):
Whether the HDF5 file should be kept open, or opened and then
closed again each time an operation is performed. Setting
`keep_open=False` is helpful for multi-process access patterns.

This feature uses the Single-Writer Multiple-Reader (SWMR) feature
of HDF5. Note that SWMR can only be used on POSIX-compliant
filesystems, and so may not work on some network filesystems.

Default: True (keep file open).

swmr : bool, optional
Enable Single-Writer Multiple-Reader (SWMR) feature of HDF5.
Note that SWMR can only be used on POSIX-compliant
filesystems, and so may not work on some network filesystems.
Default: False (do not use SWMR)

tsleep : float, optional
Time to wait in seconds after each attempt at opening the file.

Expand All @@ -80,14 +96,15 @@ def __init__(self, filename, mode='r', keep_open=True, tsleep=0.5, maxiter=2):
self.mode = mode
self.tsleep = tsleep
self.maxiter = maxiter
self.swmr = swmr
if mode not in ['r', 'rw']:
raise ValueError("Must set mode to either 'r' or 'rw'.")

# Open file ready for reading and/or writing (if not in transactional mode)
self.data = None
if keep_open:
self._open()

def _open(self):
"""
Open HDF5 file ready for reading/writing. Does nothing if the file is
Expand All @@ -107,17 +124,17 @@ def _open(self):
mode = 'a'
else:
mode = 'r'
if self.mode == 'r':
if self.swmr and self.mode == 'r':
swmr = True
else:
swmr = False

# check HDF5 version if swmr
if swmr:
hdf5_v = h5py.version.hdf5_version_tuple[0] \
+ h5py.version.hdf5_version_tuple[1]/100.
hdf5_v = float('.'.join(h5py.version.hdf5_version.split('.')[:2]))
if hdf5_v < 1.1:
print("HDF5 version must be >= 1.10 for SWMR")
raise NotImplementedError("HDF5 version is {}: must "
"be >= 1.10 for SWMR".format(hdf5_v))

# Try to open the file
Ncount = 0
Expand All @@ -130,7 +147,8 @@ def _open(self):
# Enable single writer, multiple reader mode on HDF5 file.
# This allows multiple handles to exist for the same file
# at the same time, as long as only one is in rw mode
self.data.swmr_mode = True
if self.swmr:
self.data.swmr_mode = True
except ValueError:
pass
break
Expand Down Expand Up @@ -159,7 +177,7 @@ def _open(self):
# Denote as Container
if 'pspec_type' not in list(self.data.attrs.keys()):
self.data.attrs['pspec_type'] = self.__class__.__name__

def _close(self):
"""
Close HDF5 file. DOes nothing if file is already closed.
Expand Down Expand Up @@ -227,7 +245,10 @@ def _update_header(self):
git version of hera_pspec.
"""
if 'header' not in list(self.data.keys()):
hdr = self.data.create_group('header')
if not self.swmr:
hdr = self.data.create_group('header')
else:
raise ValueError("Cannot create a header group with SWMR")
else:
hdr = self.data['header']

Expand All @@ -236,7 +257,7 @@ def _update_header(self):
if hdr.attrs['hera_pspec.git_hash'] != version.git_hash:
print("WARNING: HDF5 file was created by a different version "
"of hera_pspec.")
else:
elif not self.swmr:
hdr.attrs['hera_pspec.git_hash'] = version.git_hash

@transactional
Expand All @@ -261,7 +282,7 @@ def set_pspec(self, group, psname, pspec, overwrite=False):
"""
if self.mode == 'r':
raise IOError("HDF5 file was opened read-only; cannot write to file.")

if isinstance(group, (tuple, list, dict)):
raise ValueError("Only one group can be specified at a time.")

Expand All @@ -283,6 +304,12 @@ def set_pspec(self, group, psname, pspec, overwrite=False):
raise ValueError("If pspec is a list, psname must also be a list.")
# No lists should pass beyond this point

# check for swmr write of new group or dataset
if self.swmr:
tree = self.tree(return_str=False, nested=True)
if group not in tree or psname not in tree[group]:
raise ValueError("Cannot write new group or dataset with SWMR")

# Check that input is of the correct type
if not isinstance(pspec, uvpspec.UVPSpec):
print("pspec:", type(pspec), pspec)
Expand Down Expand Up @@ -394,7 +421,7 @@ def pspec_filter(n, obj):
# Traverse the entire set of groups/datasets looking for pspecs
grp.visititems(pspec_filter)
return ps_list

@transactional
def groups(self):
"""
Expand All @@ -409,19 +436,37 @@ def groups(self):
if u'header' in groups:
groups.remove(u'header')
return groups

@transactional
def tree(self):
def tree(self, return_str=True):
"""
Output a string containing a tree diagram of groups and the power
spectra that they contain.

Parameters
----------
return_str : bool, optional
If True, return the tree as a string, otherwise
return as a dictionary

Returns
-------
str or dict
Tree structure of HDF5 file
"""
s = ""
for grp in self.groups():
s += "(%s)\n" % grp
for pspec in self.spectra(grp):
s += " |--%s\n" % pspec
return s
grps = self.groups(nested=True)
tree = dict()
for grp in grps:
tree[grp] = self.spectra(grp, nested=True)
if not return_str:
return tree
else:
s = ""
for grp in grps:
s += "(%s)\n" % grp
for pspec in tree[grp]:
s += " |--%s\n" % pspec
return s

@transactional
def save(self):
Expand Down
5,151 changes: 5,151 additions & 0 deletions hera_pspec/data/zen.2458116.30448.HH.flagged_abs.calfits

Large diffs are not rendered by default.

Binary file added hera_pspec/data/zen.2458116.30448.HH.uvh5
Binary file not shown.
5,043 changes: 5,043 additions & 0 deletions hera_pspec/data/zen.2458116.31193.HH.flagged_abs.calfits

Large diffs are not rendered by default.

Binary file added hera_pspec/data/zen.2458116.31193.HH.uvh5
Binary file not shown.
5,615 changes: 5,615 additions & 0 deletions hera_pspec/data/zen.2458116.31939.HH.flagged_abs.calfits

Large diffs are not rendered by default.

Binary file added hera_pspec/data/zen.2458116.31939.HH.uvh5
Binary file not shown.
17 changes: 13 additions & 4 deletions hera_pspec/grouping.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ def bootstrap_resampled_error(uvp, blpair_groups=None, time_avg=False, Nsamples=

def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Nsamples=1000, seed=0,
normal_std=True, robust_std=True, cintervals=None, keep_samples=False,
bl_error_tol=1.0, overwrite=False, add_to_history='', verbose=True):
bl_error_tol=1.0, overwrite=False, add_to_history='', verbose=True, maxiter=1):
"""
Run bootstrap resampling on a PSpecContainer object to estimate errorbars.
For each group/spectrum specified in the PSpecContainer, this function produces
Expand All @@ -828,7 +828,9 @@ def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Ns
(3.) series of bootstrap resamples of UVPSpec average (optional)

The output of 1. and 2. are placed in a *_avg spectrum, while the output of 3.
is placed in *_bs0, *_bs1, *_bs2 etc. objects.
is placed in *_bs0, *_bs1, *_bs2 etc. objects.

Note: PSpecContainers should not be opened in SWMR mode for this function.

Parameters:
-----------
Expand Down Expand Up @@ -882,14 +884,21 @@ def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Ns

verbose : bool
If True, report feedback to stdout.

maxiter : int, optional, default=1
Maximum number of attempts to open the PSpecContainer by a single process.
0.5 sec wait per attempt. Useful in the case of multiprocesses bootstrapping
different groups of the same container.
"""
from hera_pspec import uvpspec
from hera_pspec import PSpecContainer
# type check
if isinstance(filename, (str, np.str)):
psc = PSpecContainer(filename)
# open in transactional mode
psc = PSpecContainer(filename, mode='rw', keep_open=False, swmr=False, tsleep=0.5, maxiter=maxiter)
elif isinstance(filename, PSpecContainer):
psc = filename
assert not psc.swmr, "PSpecContainer should not be in SWMR mode"
else:
raise AssertionError("filename must be a PSpecContainer or filepath to one")

Expand All @@ -900,7 +909,7 @@ def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Ns
# get spectra if not fed
all_spectra = utils.flatten([ [os.path.join(grp, s)
for s in psc.spectra(grp)]
for grp in groups ])
for grp in groups])
if spectra is None:
spectra = all_spectra
else:
Expand Down
Loading