diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 00000000..98f5d134
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,6 @@
+[run]
+omit = */tests/*
+
+[report]
+omit = */tests/*
+
diff --git a/hera_pspec/container.py b/hera_pspec/container.py
index 685223e7..10b7e959 100644
--- a/hera_pspec/container.py
+++ b/hera_pspec/container.py
@@ -2,6 +2,10 @@ import h5py
 from hera_pspec.uvpspec import UVPSpec
 import hera_pspec.version as version
+from hera_pspec import utils
+import numpy as np
+import argparse
+
 
 class PSpecContainer(object):
     """
@@ -31,8 +34,7 @@ def __init__(self, filename, mode='r'):
         # Open file ready for reading and/or writing
         self.data = None
         self._open()
-
-
+
     def _open(self):
         """
         Open HDF5 file ready for reading/writing.
@@ -41,10 +43,16 @@ def _open(self):
         # allow non-destructive operations!
         mode = 'a' if self.mode == 'rw' else 'r'
         self.data = h5py.File(self.filename, mode)
+
+        # Update header info
         if self.mode == 'rw':
+            # Update header
             self._update_header()
-
-
+
+            # Denote as Container
+            if 'pspec_type' not in self.data.attrs.keys():
+                self.data.attrs['pspec_type'] = self.__class__.__name__
+
     def _store_pspec(self, pspec_group, uvp):
         """
         Store a UVPSpec object as group of datasets within the HDF5 file.
@@ -65,8 +73,7 @@ def _store_pspec(self, pspec_group, uvp):
 
         # Write UVPSpec to group
         uvp.write_to_group(pspec_group, run_check=True)
-
-
+
     def _load_pspec(self, pspec_group):
         """
         Load a new UVPSpec object from a HDF5 group.
@@ -94,7 +101,6 @@ def _load_pspec(self, pspec_group):
         uvp.read_from_group(pspec_group)
         return uvp
 
-
     def _update_header(self):
         """
         Update the header in the HDF5 file with useful metadata, including the
@@ -110,9 +116,9 @@ def _update_header(self):
             if hdr.attrs['hera_pspec.git_hash'] != version.git_hash:
                 print("WARNING: HDF5 file was created by a different version "
                       "of hera_pspec.")
-        hdr.attrs['hera_pspec.git_hash'] = version.git_hash
-
-
+        else:
+            hdr.attrs['hera_pspec.git_hash'] = version.git_hash
+
     def set_pspec(self, group, psname, pspec, overwrite=False):
         """
         Store a delay power spectrum in the container.
@@ -309,3 +315,93 @@ def __del__(self):
             self.data.close()
         except:
             pass
+
+
+def merge_spectra(psc, dset_split_str='_x_', ext_split_str='_', verbose=True):
+    """
+    Iterate through a PSpecContainer and, within each group,
+    merge spectra of similar name but different psname extension.
+
+    Power spectra to-be-merged are assumed to follow the naming convention
+
+    dset1_x_dset2_ext1, dset1_x_dset2_ext2, ...
+
+    where _x_ is the default dset_split_str, and _ is the default ext_split_str.
+    The spectra names are first split by dset_split_str, and then by ext_split_str.
+    In this case, all instances of dset1_x_dset2* will be merged together.
+
+    To merge spectra names with no dset distinction and only an extension,
+    feed dset_split_str as '' or None. For example, to merge together uvp_1,
+    uvp_2 and uvp_3, feed dset_split_str=None and ext_split_str='_'.
+
+    Parameters:
+    -----------
+    psc : PSpecContainer object or str
+        A PSpecContainer object, or a filepath to one, to merge spectra within.
+
+    dset_split_str : str
+        The pattern used to split dset1 from dset2 in the psname.
+
+    ext_split_str : str
+        The pattern used to split the dset name from its extension in the psname.
+
+    verbose : bool
+        If True, report feedback to stdout.
+    """
+    from hera_pspec import uvpspec
+    # load container
+    if isinstance(psc, (str, np.str)):
+        psc = PSpecContainer(psc, mode='rw')
+    else:
+        assert isinstance(psc, PSpecContainer)
+
+    # get groups
+    groups = psc.groups()
+    assert len(groups) > 0, "no groups exist in this Container object"
+
+    # Iterate over groups
+    for grp in groups:
+        # Get spectra in this group
+        spectra = psc.data[grp].keys()
+
+        # Get unique spectra by splitting and then re-joining
+        unique_spectra = []
+        for spc in spectra:
+            if dset_split_str == '' or dset_split_str is None:
+                sp = spc.split(ext_split_str)[0]
+            else:
+                sp = utils.flatten([s.split(ext_split_str) for s in spc.split(dset_split_str)])[:2]
+                sp = dset_split_str.join(sp)
+            if sp not in unique_spectra:
+                unique_spectra.append(sp)
+
+        # Iterate over each unique spectrum, and merge all of its extensions
+        for spc in unique_spectra:
+            # get merge list
+            to_merge = [spectra[i] for i in np.where([spc in _sp for _sp in spectra])[0]]
+            try:
+                # merge
+                uvps = [psc.get_pspec(grp, uvp) for uvp in to_merge]
+                merged_uvp = uvpspec.combine_uvpspec(uvps, verbose=verbose)
+                # write to file
+                psc.set_pspec(grp, spc, merged_uvp, overwrite=True)
+                # if successful merge, remove uvps
+                for uvp in to_merge:
+                    if uvp != spc:
+                        del psc.data[grp][uvp]
+            except:
+                # merge failed, so continue
+                if verbose:
+                    print "uvp merge failed for spectrum {}/{}".format(grp, spc)
+
+
+def get_merge_spectra_argparser():
+    a = argparse.ArgumentParser(
+        description="argument parser for hera_pspec.container.merge_spectra")
+
+    # Add list of arguments
+    a.add_argument("filename", type=str,
+                   help="Filename of HDF5 container (PSpecContainer) containing "
+                        "groups / input power spectra.")
+    a.add_argument("--dset_split_str", default='_x_', type=str,
+                   help="The pattern used to split dset1 from dset2 in the psname.")
+    a.add_argument("--ext_split_str", default='_', type=str,
+                   help="The pattern used to split the dset names from their "
+                        "extension in the psname (if it exists).")
+    a.add_argument("--verbose", default=False, action='store_true',
+                   help="Report feedback to stdout.")
+
+    return a
diff --git a/hera_pspec/data/_test_utils.yaml b/hera_pspec/data/_test_utils.yaml
index 2f3de7e1..7341c2e0 100644
--- a/hera_pspec/data/_test_utils.yaml
+++ b/hera_pspec/data/_test_utils.yaml
@@ -3,6 +3,7 @@ data:
   root: ../hera_pspec/
   subdirs:
     - data
+  pairs: [['xx', 'xx'], ['yy', 'yy']]
   template: zen.*.xx.HH.uvXA
   beam: data/NF_HERA_Beams.beamfits
   flags: zen.2458098.66239.yy.HH.uv.vis.uvfits.flags.npz
@@ -13,9 +14,11 @@ pspec:
   weight: iC
   norm: I
   taper: none
-  groupname: test
+  groupname: None
   little_h: True
   avg_group: False
   exclude_auto_bls: False
   exclude_permutations: False
-
+  options:
+    foo: None
+    bar: [['foo', 'bar']]
\ No newline at end of file
diff --git a/hera_pspec/grouping.py b/hera_pspec/grouping.py
index cb5202f1..638fa659 100644
--- a/hera_pspec/grouping.py
+++ b/hera_pspec/grouping.py
@@ -1,9 +1,12 @@
 import numpy as np
 from collections import OrderedDict as odict
 from hera_pspec import uvpspec_utils as uvputils
-#from hera_pspec import UVPSpec
+from hera_pspec import utils, version
 import random
 import copy
+import argparse
+from astropy import stats as astats
+import os
 
 
 def group_baselines(bls, Ngroups, keep_remainder=False, randomize=False,
@@ -229,7 +232,8 @@ def select_common(uvp_list, spws=True, blpairs=True, times=True, pols=True,
 
 def average_spectra(uvp_in, blpair_groups=None, time_avg=False,
-                    blpair_weights=None, normalize_weights=True, inplace=True):
+                    blpair_weights=None, normalize_weights=True, inplace=True,
+                    add_to_history=''):
     """
     Average power spectra across the baseline-pair-time axis, weighted by
     each spectrum's integration time.
@@ -284,6 +288,9 @@ def average_spectra(uvp_in, blpair_groups=None, time_avg=False,
         If True, edit data in self, else make a copy and return. Default:
         True.
 
+    add_to_history : str, optional
+        Text to append to the output object's history.
+
     Notes
     -----
     Currently, every baseline-pair in a blpair group must have the same
@@ -494,13 +501,16 @@ def average_spectra(uvp_in, blpair_groups=None, time_avg=False,
     if hasattr(uvp_in, 'label1'): uvp.label1 = uvp_in.label1
     if hasattr(uvp_in, 'label2'): uvp.label2 = uvp_in.label2
 
+    # Add to history
+    uvp.history = "Spectra averaged with hera_pspec [{}]\n{}\n{}\n{}".format(version.git_hash[:15], add_to_history, '-'*40, uvp.history)
+
     # Validity check
     uvp.check()
 
     # Return
     if inplace == False:
         return uvp
-
+
 
 def fold_spectra(uvp):
     """
@@ -547,21 +557,20 @@ def fold_spectra(uvp):
     uvp.folded = True
 
 
-def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
-                              seed=None):
+def bootstrap_average_blpairs(uvp_list, blpair_groups=None, time_avg=False, seed=None):
     """
     Generate a bootstrap-sampled average over a set of power spectra. The
     sampling is done over user-defined groups of baseline pairs (with
     replacement).
-
+
     Multiple UVPSpec objects can be passed to this function. The bootstrap
     sampling is carried out over all of the available baseline-pairs (in a
     user-specified group) across all UVPSpec objects. (This means that each
     UVPSpec object can contribute a different number of baseline-pairs to the
     average each time.)
-
+
     Successive calls to the function will produce different random samples.
-
+
     Parameters
     ----------
     uvp_list : list of UVPSpec objects
@@ -603,15 +612,15 @@ def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
     """
     # Validate input data
     from hera_pspec import UVPSpec
-    if isinstance(uvp_list, UVPSpec): uvp_list = [uvp_list,]
+    single_uvp = False
+    if isinstance(uvp_list, UVPSpec):
+        single_uvp = True
+        uvp_list = [uvp_list,]
     assert isinstance(uvp_list, list), \
         "uvp_list must be a list of UVPSpec objects"
-
-    # Check that uvp_list contains UVPSpec objects with the correct dimensions
-    for uvp in uvp_list:
-        assert isinstance(uvp, UVPSpec), \
-            "uvp_list must be a list of UVPSpec objects"
-
+    assert np.all([isinstance(uvp, UVPSpec) for uvp in uvp_list]), \
+        "uvp_list must be a list of UVPSpec objects"
+
     # Check for same length of time axis if no time averaging will be done
     if not time_avg:
         if np.unique([uvp.Ntimes for uvp in uvp_list]).size > 1:
@@ -624,13 +633,13 @@ def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
     for grp in blpair_groups:
         assert isinstance(grp, list), \
             "blpair_groups must be a list of lists of baseline-pair tuples/integers"
-
+
     # Convert blpair tuples into blpair integers if necessary
     if isinstance(blpair_groups[0][0], tuple):
         new_blp_grps = [map(lambda blp: uvputils._antnums_to_blpair(blp), blpg)
                         for blpg in blpair_groups]
         blpair_groups = new_blp_grps
-
+
     # Homogenise input UVPSpec objects in terms of available polarizations
     # and spectral windows
     if len(uvp_list) > 1:
@@ -640,7 +649,7 @@ def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
     avail_blpairs = [np.unique(uvp.blpair_array) for uvp in uvp_list]
     all_avail_blpairs = np.unique([blp for blplist in avail_blpairs
                                    for blp in blplist])
-
+
     # Check that all requested blpairs exist in at least one UVPSpec object
     all_req_blpairs = np.unique([blp for grp in blpair_groups for blp in grp])
     missing = []
@@ -650,10 +659,10 @@ def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
         raise KeyError("The following baseline-pairs were specified, but do "
                        "not exist in any of the input UVPSpec objects: %s" \
                        % str(missing))
-
+
     # Set random seed if specified
     if seed is not None: np.random.seed(seed)
-
+
     # Sample with replacement from the full list of available baseline-pairs
     # in each group, and create a blpair_group and blpair_weights entry for
     # each UVPSpec object
@@ -662,17 +671,17 @@ def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
 
     # Loop over each requested blpair group
     for grp in blpair_groups:
-
+
         # Find which blpairs in this group are available in each UVPSpec object
         avail = [np.intersect1d(grp, blp_list) for blp_list in avail_blpairs]
         avail_flat = [blp for lst in avail for blp in lst]
         num_avail = len(avail_flat)
-
+
         # Draw set of random integers (with replacement) and convert into
         # list of weights for each blpair in each UVPSpec
         draw = np.random.randint(low=0, high=num_avail, size=num_avail)
         wgt = np.array([(draw == i).sum() for i in range(num_avail)])
-
+
         # Extract the blpair weights for each UVPSpec
         j = 0
         for i in range(len(uvp_list)):
@@ -681,7 +690,7 @@ def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
             _wgts = wgt[np.arange(j, j+n_blps)].astype(float) #+ 1e-4
             blpair_wgts_list[i].append( list(_wgts) )
             j += n_blps
-
+
     # Loop over UVPSpec objects and calculate averages in each blpair group,
     # using the bootstrap-sampled blpair weights
     uvp_avg = []
@@ -690,7 +699,269 @@ def bootstrap_average_blpairs(uvp_list, blpair_groups, time_avg=False,
                                blpair_weights=blpair_wgts_list[i],
                                time_avg=time_avg, inplace=False)
         uvp_avg.append(_uvp)
-
+
     # Return list of averaged spectra for now
-    return uvp_avg, blpair_wgts_list
+    if single_uvp:
+        return uvp_avg[0], blpair_wgts_list[0]
+    else:
+        return uvp_avg, blpair_wgts_list
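A minimal sketch of the single-object calling pattern enabled by the new `single_uvp` branch above; `uvp` and the baseline-pair group are hypothetical:

    # Sketch: bootstrap-average a single UVPSpec over one redundant group.
    # blpairs are ((ant1, ant2), (ant3, ant4)) tuples.
    from hera_pspec import grouping

    blpair_groups = [[((24, 25), (37, 38)), ((24, 25), (52, 53))]]
    uvp_boot, blpair_wgts = grouping.bootstrap_average_blpairs(
        uvp, blpair_groups=blpair_groups, time_avg=False, seed=0)
    # A single UVPSpec in now yields a single UVPSpec out, not a 1-element list.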
+
+
+def bootstrap_resampled_error(uvp, blpair_groups=None, time_avg=False, Nsamples=1000, seed=None,
+                              normal_std=True, robust_std=True, conf_ints=None, bl_error_tol=1.0,
+                              add_to_history='', verbose=False):
+    """
+    Given a UVPSpec object, generate bootstrap resamples of its average
+    and calculate their spread as an estimate of the errorbar on the
+    uniformly averaged data.
+
+    Parameters:
+    -----------
+    uvp : UVPSpec object
+        A UVPSpec object from which to bootstrap resample & average.
+
+    blpair_groups : list
+        A list of baseline-pair groups to bootstrap resample over. Default
+        behavior is to calculate and use redundant baseline groups.
+
+    time_avg : boolean
+        If True, average spectra before bootstrap resampling.
+
+    Nsamples : int
+        Number of bootstrap resamples to draw when estimating the errorbar.
+
+    seed : int
+        Random seed to use in bootstrap resampling.
+
+    normal_std : bool
+        If True, calculate an error estimate from numpy.std.
+
+    robust_std : bool
+        If True, calculate an error estimate from astropy.stats.biweight_midvariance.
+
+    conf_ints : list
+        A list of confidence interval percentages (0 < ci < 100) to use
+        as an error estimate, using numpy.percentile.
+
+    bl_error_tol : float
+        Redundancy error tolerance of redundant groups if blpair_groups is None.
+
+    add_to_history : str
+        String to add to history of output uvp_avg object.
+
+    verbose : bool
+        If True, report feedback to stdout.
+
+    Returns:
+    --------
+    uvp_avg : UVPSpec object
+        A uvp holding the uniformly averaged data in data_array, and the various
+        error estimates calculated from the bootstrap resampling.
+    """
+    from hera_pspec import UVPSpec
+    # type check
+    assert isinstance(uvp, (UVPSpec, str, np.str)), "uvp must be fed as a UVPSpec object or filepath"
+    if isinstance(uvp, (str, np.str)):
+        _uvp = UVPSpec()
+        _uvp.read_hdf5(uvp)
+        uvp = _uvp
+
+    # Check for blpair_groups
+    if blpair_groups is None:
+        blpair_groups, _, _, _ = utils.get_blvec_reds(uvp, bl_error_tol=bl_error_tol)
+
+    # Uniform average
+    uvp_avg = average_spectra(uvp, blpair_groups=blpair_groups, time_avg=time_avg, inplace=False)
+
+    # initialize a seed
+    if seed is not None: np.random.seed(seed)
+
+    # Iterate over Nsamples and create bootstrap resamples
+    uvp_boots = []
+    uvp_wgts = []
+    for i in range(Nsamples):
+        # resample
+        boot, wgt = bootstrap_average_blpairs(uvp, blpair_groups=blpair_groups, time_avg=time_avg, seed=None)
+        uvp_boots.append(boot)
+        uvp_wgts.append(wgt)
+
+    # get all keys in uvp_avg and get data from each uvp_boot
+    keys = uvp_avg.get_all_keys()
+    uvp_boot_data = odict([(k, np.array(map(lambda u: u.get_data(k), uvp_boots))) for k in keys])
+
+    # calculate various error estimates
+    stats_array = odict()
+    if normal_std:
+        stats_array["normal_std"] = odict()
+        for k in keys:
+            stats_array["normal_std"][k] = np.std(uvp_boot_data[k].real, axis=0) \
+                                           + 1j*np.std(uvp_boot_data[k].imag, axis=0)
+    if robust_std:
+        stats_array["robust_std"] = odict()
+        for k in keys:
+            stats_array["robust_std"][k] = np.sqrt(astats.biweight_midvariance(uvp_boot_data[k].real, axis=0)) \
+                                           + 1j*np.sqrt(astats.biweight_midvariance(uvp_boot_data[k].imag, axis=0))
+    if conf_ints is not None:
+        for ci in conf_ints:
+            ci_tag = "conf_int_{:05.2f}".format(ci)
+            stats_array[ci_tag] = odict()
+            for k in keys:
+                stats_array[ci_tag][k] = np.percentile(uvp_boot_data[k].real, ci, axis=0) \
+                                         + 1j*np.percentile(uvp_boot_data[k].imag, ci, axis=0)
+
+    # Set stats array in uvp_avg
+    # NOTE: this assumes a UVPSpec.set_stats(stat, key, arr) setter is available
+    for stat in stats_array.keys():
+        for k in keys:
+            uvp_avg.set_stats(stat, k, stats_array[stat][k])
+
+    # Update history
+    uvp_avg.history = "Bootstrap errors estimated w/ hera_pspec [{}], {} samples, {} seed\n{}\n{}\n{}" \
+                      "".format(version.git_hash[:15], Nsamples, seed, add_to_history, '-'*40, uvp_avg.history)
+
+    return uvp_avg, uvp_boots, uvp_wgts
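For intuition on why both estimators are offered above, a small illustrative comparison (invented values) of the normal and robust standard deviations:

    # np.std is inflated by a single outlying bootstrap sample;
    # the biweight midvariance largely ignores it.
    import numpy as np
    from astropy import stats as astats

    samples = np.array([1.1, 0.9, 1.0, 1.2, 0.8, 25.0])
    print np.std(samples)                                # ~8.9, outlier-dominated
    print np.sqrt(astats.biweight_midvariance(samples))  # ~0.2, robust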
+
+
+def bootstrap_run(filename, spectra=None, blpair_groups=None, time_avg=False, Nsamples=1000, seed=0,
+                  normal_std=True, robust_std=True, conf_ints=None, keep_samples=False,
+                  bl_error_tol=1.0, overwrite=False, add_to_history='', verbose=True):
+    """
+    Run bootstrap resampling on a PSpecContainer object to estimate errorbars.
+    For each group/spectrum specified in the PSpecContainer, this function produces
+        1. a uniform average of the UVPSpec objects in the group
+        2. various error estimates from the bootstrap resamples
+       (3.) a series of bootstrap resamples of the UVPSpec average (optional)
+
+    The outputs of 1. and 2. are placed in a *_avg spectrum, while the output of 3.
+    is placed in *_bs0, *_bs1, *_bs2 etc. objects.
+
+    Parameters:
+    -----------
+    filename : str or PSpecContainer object
+        PSpecContainer object to run bootstrapping on.
+
+    spectra : list
+        A list of power spectra names (with group prefix) to run bootstrapping on.
+        Default is all spectra in object. Ex. ["group1/psname1", "group1/psname2", ...]
+
+    blpair_groups : list
+        A list of baseline-pair groups to bootstrap over. Default is to solve for and use
+        redundant baseline groups. Ex: [ [((1, 2), (2, 3)), ((1, 2), (3, 4))],
+                                         [((1, 3), (2, 4)), ((1, 3), (3, 5))],
+                                         ...
+                                       ]
+
+    time_avg : bool
+        If True, perform time-average of power spectra in averaging step.
+
+    Nsamples : int
+        The number of bootstrap resamples to generate.
+
+    seed : int
+        The random seed to initialize with before drawing bootstrap samples.
+
+    normal_std : bool
+        If True, use np.std to calculate a "normal" standard deviation of BS samples.
+
+    robust_std : bool
+        If True, use astropy.stats.biweight_midvariance(..., c=9.0) to get a "robust"
+        standard deviation of the BS samples.
+
+    conf_ints : list
+        A list of confidence interval percentages (0 < ci < 100) to calculate from
+        BS samples using np.percentile.
+
+    keep_samples : bool
+        If True, store each bootstrap resample in the PSpecContainer object with a *_bs# suffix.
+
+    bl_error_tol : float
+        If calculating redundant baseline groups, this is the redundancy tolerance in meters.
+
+    overwrite : bool
+        If True, overwrite output files if they already exist.
+
+    add_to_history : str
+        String to append to history in the bootstrap_resampled_error() call.
+
+    verbose : bool
+        If True, report feedback to stdout.
+    """
+    from hera_pspec import uvpspec
+    from hera_pspec import PSpecContainer
+    # type check
+    if isinstance(filename, (str, np.str)):
+        psc = PSpecContainer(filename)
+    elif isinstance(filename, PSpecContainer):
+        psc = filename
+    else:
+        raise AssertionError("filename must be a PSpecContainer or filepath to one")
+
+    # get groups in psc
+    groups = psc.groups()
+    assert len(groups) > 0, "No groups exist in PSpecContainer"
+
+    # get spectra if not fed
+    all_spectra = utils.flatten([map(lambda s: os.path.join(grp, s), psc.spectra(grp)) for grp in groups])
+    if spectra is None:
+        spectra = all_spectra
+    else:
+        spectra = [spc for spc in spectra if spc in all_spectra]
+        assert len(spectra) > 0, "no specified spectra exist in PSpecContainer"
+
+    # iterate over spectra
+    for spc_name in spectra:
+        # split group and spectra names
+        grp, spc = spc_name.split('/')
+
+        # run bootstrap_resampled_error
+        uvp = psc.get_pspec(grp, spc)
+        (uvp_avg, uvp_boots,
+         uvp_wgts) = bootstrap_resampled_error(uvp, blpair_groups=blpair_groups, time_avg=time_avg,
+                                               Nsamples=Nsamples, seed=seed, normal_std=normal_std,
+                                               robust_std=robust_std, conf_ints=conf_ints,
+                                               bl_error_tol=bl_error_tol, add_to_history=add_to_history,
+                                               verbose=verbose)
+
+        # set averaged uvp
+        psc.set_pspec(grp, spc+"_avg", uvp_avg, overwrite=overwrite)
+
+        # if keep_samples write uvp_boots
+        if keep_samples:
+            for i, uvpb in enumerate(uvp_boots):
+                psc.set_pspec(grp, spc+"_bs{}".format(i), uvpb, overwrite=overwrite)
+
+
+def get_bootstrap_run_argparser():
+    a = argparse.ArgumentParser(
+        description="argument parser for grouping.bootstrap_run()")
+
+    def list_of_lists_of_tuples(s):
+        s = map(lambda x: map(int, x.split()), s.split(','))
+        return s
+
+    # Add list of arguments
+    a.add_argument("filename", type=str,
+                   help="Filename of HDF5 container (PSpecContainer) containing "
+                        "input power spectra.")
+    a.add_argument("--spectra", default=None, type=str, nargs='+',
+                   help="List of power spectra names (with group prefix) to bootstrap over.")
+    a.add_argument("--blpair_groups", default=None, type=list_of_lists_of_tuples,
+                   help="List of baseline-pair groups (must be space-delimited blpair integers) "
+                        "wrapped in quotes to use in resampling. Default is to solve for and use "
+                        "redundant groups (recommended). "
+                        "Ex: --blpair_groups '101102103104 102103014015, 101013102104' --> "
+                        "[ [((1, 2), (3, 4)), ((2, 3), (4, 5))], [((1, 3), (2, 4))], ...]")
+    a.add_argument("--time_avg", default=False, type=bool, help="Perform time-average in averaging step.")
+    a.add_argument("--Nsamples", default=100, type=int, help="Number of bootstrap resamples to generate.")
+    a.add_argument("--seed", default=0, type=int, help="Random seed to initialize bootstrap resampling with.")
+    a.add_argument("--normal_std", default=True, type=bool,
+                   help="Whether to calculate a 'normal' standard deviation (np.std).")
+    a.add_argument("--robust_std", default=False, type=bool,
                   help="Whether to calculate a 'robust' standard deviation (astropy.stats.biweight_midvariance).")
+    a.add_argument("--conf_ints", default=None, type=float, nargs='+',
+                   help="Confidence intervals (percentage from 0 < ci < 100) to calculate.")
+    a.add_argument("--keep_samples", default=False, action='store_true',
+                   help="If True, store bootstrap resamples in the PSpecContainer object with a *_bs# extension.")
+    a.add_argument("--bl_error_tol", type=float, default=1.0,
+                   help="Baseline redundancy tolerance if calculating redundant groups.")
+    a.add_argument("--overwrite", default=False, action='store_true', help="Overwrite outputs if they exist.")
+    a.add_argument("--add_to_history", default='', type=str, help="String to add to history of power spectra.")
+    a.add_argument("--verbose", default=False, action='store_true', help="Report feedback to stdout.")
+
+    return a
diff --git a/hera_pspec/pspecdata.py b/hera_pspec/pspecdata.py
index 8246ace3..f0dbde49 100644
--- a/hera_pspec/pspecdata.py
+++ b/hera_pspec/pspecdata.py
@@ -1,7 +1,7 @@
 import numpy as np
 import aipy
 from pyuvdata import UVData
-import copy, operator, itertools
+import copy, operator, itertools, sys
 from collections import OrderedDict as odict
 import hera_cal as hc
 from hera_pspec import uvpspec, utils, version, pspecbeam, container
@@ -10,11 +10,13 @@
 import datetime
 import time
 import argparse
+import ast
+import glob
 
 
 class PSpecData(object):
 
-    def __init__(self, dsets=[], wgts=[],dsets_std=None, labels=None, beam=None):
+    def __init__(self, dsets=[], wgts=[], dsets_std=None, labels=None, beam=None):
         """
         Object to store multiple sets of UVData visibilities and perform
         operations such as power spectrum estimation on them.
@@ -25,10 +27,12 @@ def __init__(self, dsets=[], wgts=[], dsets_std=None, labels=None, beam=None):
             Set of UVData objects containing the data that will be used to
             compute the power spectrum. If specified as a dict, the key names
             will be used to tag each dataset. Default: Empty list.
+
         dsets_std: list or dict of UVData objects, optional
             Set of UVData objects containing the standard deviations of each
             data point in UVData objects in dsets. If specified as a dict,
             the key names will be used to tag each dataset. Default: Empty list.
+
         wgts : list or dict of UVData objects, optional
             Set of UVData objects containing weights for the input data.
             Default: Empty list.
@@ -55,13 +59,14 @@ def __init__(self, dsets=[], wgts=[], dsets_std=None, labels=None, beam=None):
         # and taper to none by default
         self.data_weighting = 'identity'
         self.taper = 'none'
-        #set dsets_std to None if any are None.
+
+        # set dsets_std to None if any are None.
         if not dsets_std is None and None in dsets_std:
-            dsets_std=None
+            dsets_std = None
 
         # Store the input UVData objects if specified
         if len(dsets) > 0:
-            self.add(dsets, wgts,dsets_std=dsets_std, labels=labels)
+            self.add(dsets, wgts, dsets_std=dsets_std, labels=labels)
 
         # Store a primary beam
         self.primary_beam = beam
@@ -91,7 +96,6 @@ def add(self, dsets, wgts, labels=None, dsets_std=None):
             standard deviations (real and imaginary) of data to add to the
             collection. If dsets is a dict, will assume dsets_std is a dict
             and if dsets is a list, will assume dsets_std is a list.
-
         """
         # Check for dicts and unpack into an ordered list if found
         if isinstance(dsets, dict):
@@ -104,9 +108,9 @@ def add(self, dsets, wgts, labels=None, dsets_std=None):
                 raise TypeError("If 'dsets' is a dict, 'wgts' must also be "
                                 "a dict")
 
-            if not isinstance(dsets_std,dict):
+            if not isinstance(dsets_std, dict):
                 if dsets_std is None:
-                    dsets_std=[None for m in range(len(dsets))]
+                    dsets_std = [None for m in range(len(dsets))]
                 else:
                     raise TypeError("If 'dsets' is a dict, 'dsets_std' must"
                                     "also be a dict")
@@ -114,7 +118,6 @@ def add(self, dsets, wgts, labels=None, dsets_std=None):
                 _dsets_std = [dsets_std[key] for key in labels]
                 dsets_std = _dsets_std
 
-
             # Unpack dsets and wgts dicts
             labels = dsets.keys()
             _dsets = [dsets[key] for key in labels]
@@ -122,25 +125,23 @@ def add(self, dsets, wgts, labels=None, dsets_std=None):
             dsets = _dsets
             wgts = _wgts
 
-
         # Convert input args to lists if possible
         if isinstance(dsets, UVData): dsets = [dsets,]
         if isinstance(wgts, UVData): wgts = [wgts,]
         if isinstance(labels, str): labels = [labels,]
-        if isinstance(dsets_std,UVData): dsets_std=[dsets_std,]
+        if isinstance(dsets_std, UVData): dsets_std = [dsets_std,]
         if wgts is None: wgts = [wgts,]
-        if dsets_std is None: dsets_std=[dsets_std for m in range(len(dsets))]
+        if dsets_std is None: dsets_std = [dsets_std for m in range(len(dsets))]
         if isinstance(dsets, tuple): dsets = list(dsets)
         if isinstance(wgts, tuple): wgts = list(wgts)
-        if isinstance(dsets_std,tuple): dsets_std=list(dsets_std)
+        if isinstance(dsets_std, tuple): dsets_std = list(dsets_std)
 
         # Only allow UVData or lists
         if not isinstance(dsets, list) or not isinstance(wgts, list)\
-        or not isinstance(dsets_std,list):
+        or not isinstance(dsets_std, list):
             raise TypeError("dsets, dsets_std, and wgts must be UVData"
                             "or lists of UVData")
 
-
         # Make sure enough weights were specified
         assert(len(dsets) == len(wgts))
         assert(len(dsets_std) == len(dsets))
@@ -153,20 +154,32 @@ def add(self, dsets, wgts, labels=None, dsets_std=None):
             if not isinstance(w, UVData) and w is not None:
                 raise TypeError("Only UVData objects (or None) can be used as "
                                 "weights.")
-            if not isinstance(s,UVData) and s is not None:
+            if not isinstance(s, UVData) and s is not None:
                 raise TypeError("Only UVData objects (or None) can be used as "
                                 "error sets")
 
+        # Store labels (if they were set)
+        if self.labels is None:
+            self.labels = []
+        if labels is None:
+            labels = ["dset{:d}".format(i) for i in range(len(self.dsets), len(dsets) + len(self.dsets))]
+        self.labels += labels
+
         # Append to list
         self.dsets += dsets
         self.wgts += wgts
-        self.dsets_std+=dsets_std
-
-        # Store labels (if they were set)
-        if labels is None:
-            self.labels = [None for d in dsets]
-        else:
-            self.labels += labels
+        self.dsets_std += dsets_std
+
+        # Check for repeated labels, and make them unique
+        for i, l in enumerate(self.labels):
+            ext = 1
+            while True:
+                if l in self.labels[:i]:
+                    l = self.labels[i] + ".{:d}".format(ext)
+                    ext += 1
+                else:
+                    self.labels[i] = l
+                    break
 
         # Store no. frequencies and no. times
         self.Nfreqs = self.dsets[0].Nfreqs
@@ -251,8 +264,6 @@ def validate_datasets(self, verbose=True):
             max_diff = np.sqrt(max_diff_ra**2 + max_diff_dec**2)
             if max_diff > 0.15:
                 raise_warning("Warning: maximum phase-center difference between datasets is > 10 arcmin", verbose=verbose)
-
-
     def check_key_in_dset(self, key, dset_ind):
         """
         Check 'key' exists in the UVData object self.dsets[dset_ind]
@@ -272,8 +283,6 @@ def check_key_in_dset(self, key, dset_ind):
             True if the key exists, False otherwise
         """
         #FIXME: Fix this to enable label keys
-
-
         # get iterable
         key = uvutils.get_iterable(key)
         if isinstance(key, str):
@@ -383,7 +392,6 @@ def parse_blkey(self, key):
 
         return dset_idx, bl
 
-
     def x(self, key):
         """
         Get data for a given dataset and baseline, as specified in a standard
@@ -405,9 +413,7 @@ def x(self, key):
         spw = slice(self.spw_range[0], self.spw_range[1])
         return self.dsets[dset].get_data(bl).T[spw]
 
-
-
-    def dx(self,key):
+    def dx(self, key):
         """
         Get standard deviation of data for a given dataset and baseline, as
         specified in standard key format.
@@ -427,8 +433,8 @@ def dx(self, key):
         assert isinstance(key, tuple)
         dset, bl = self.parse_blkey(key)
         spw = slice(self.spw_range[0], self.spw_range[1])
-        return self.dsets_std[dset].get_data(bl).T[spw]
 
+        return self.dsets_std[dset].get_data(bl).T[spw]
 
     def w(self, key):
         """
@@ -450,7 +456,6 @@ def w(self, key):
         dset, bl = self.parse_blkey(key)
         spw = slice(self.spw_range[0], self.spw_range[1])
 
-
         if self.wgts[dset] is not None:
             return self.wgts[dset].get_data(bl).T[spw]
         else:
@@ -459,8 +464,6 @@ def w(self, key):
             wgts = (~self.dsets[dset].get_flags(bl)).astype(float).T[spw]
             return wgts
 
-
-
     def set_C(self, cov):
         """
         Set the cached covariance matrix to a set of user-provided values.
@@ -515,7 +518,6 @@ def C_model(self, key, model='empirical'):
 
         return self._C[Ckey]
 
-
     def cross_covar_model(self, key1, key2, model='empirical', conj_1=False, conj_2=True):
         """
         Return a covariance model having specified a key and model type.
@@ -797,7 +799,7 @@ def set_Ndlys(self, ndlys=None):
                 raise ValueError("Cannot estimate more delays than there are frequency channels")
             self.spw_Ndlys = ndlys
 
-    def cov_q_hat(self,key1,key2,time_indices=None):
+    def cov_q_hat(self, key1, key2, time_indices=None):
         """
         Compute the un-normalized covariance matrix for q_hat for a given pair
         of visibility vectors. Returns the following matrix:
@@ -811,80 +813,75 @@ def cov_q_hat(self, key1, key2, time_indices=None):
 
         Parameters
         ----------
-        alpha,beta: integers for covariance index.
-
-        key1,key2: tuples or lists of tuples
+        key1, key2: tuples or lists of tuples
             Tuples containing the indices of the dataset and baselines for the
             two input datavectors. If a list of tuples is provided, the baselines
             in the list will be combined with inverse noise weights.
 
-        time_indices, list of indices of times to include or just a single time.
-        default, None -> compute covariance for all times.
-
+        time_indices: list of time indices to include, or a single time index.
+            default is None -> compute covariance for all times.
 
         Returns
         -------
-        cov_q_hat, matrix with covariances between un-normalized band powers
+        cov_q_hat: matrix with covariances between un-normalized band powers
         """
+        # type check
         if time_indices is None:
-            time_indices=[tind for tind in range(self.Ntimes)]
-        elif isinstance(time_indices,int):
-            time_indices=[time_indices]
-        if not isinstance(time_indices,list):
+            time_indices = [tind for tind in range(self.Ntimes)]
+        elif isinstance(time_indices, (int, np.integer)):
+            time_indices = [time_indices]
+        if not isinstance(time_indices, list):
             raise ValueError("time_indices must be an integer or list of integers.")
+
+        # check time_indices: a valid index must satisfy 0 <= tind < Ntimes
         for tind in time_indices:
-            if not (tind>=0 and tind<=self.Ntimes):
+            if not (tind >= 0 and tind < self.Ntimes):
                 raise ValueError("Invalid time index provided.")
 
-        qc=np.zeros((len(time_indices),self.spw_Ndlys,self.spw_Ndlys),dtype=complex)
-        R1,R2=0,0
-        n1a,n2a,n1b,n2b=0,0,0,0
-        #compute noise covariance matrices. Assume diagonal!
-        #compute E^alpha and E^beta
-        if isinstance(key1,list):
+        qc = np.zeros((len(time_indices), self.spw_Ndlys, self.spw_Ndlys), dtype=np.complex128)
+        R1, R2 = 0.0, 0.0
+        n1a, n2a, n1b, n2b = 0.0, 0.0, 0.0, 0.0
+        # compute noise covariance matrices. Assume diagonal!
+        # compute E^alpha and E^beta
+        if isinstance(key1, list):
             for _key in key1:
-                R1+=self.R(_key)
-                n1a+=np.real(self.dx(_key))**2.
-                n1b+=np.imag(self.dx(_key))**2.
+                R1 += self.R(_key)
+                n1a += np.real(self.dx(_key))**2.
+                n1b += np.imag(self.dx(_key))**2.
         else:
-            R1=self.R(key1)
-            n1a=np.real(self.dx(key1))**2.
-            n1b=np.imag(self.dx(key1))**2.
+            R1 = self.R(key1)
+            n1a = np.real(self.dx(key1))**2.
+            n1b = np.imag(self.dx(key1))**2.
 
-        if isinstance(key2,list):
+        if isinstance(key2, list):
             for _key in key2:
-                R2+=self.R(_key)
-                n2a+=np.real(self.dx(_key))**2.
-                n2b+=np.imag(self.dx(_key))**2.
+                R2 += self.R(_key)
+                n2a += np.real(self.dx(_key))**2.
+                n2b += np.imag(self.dx(_key))**2.
         else:
-            R2=self.R(key2)
-            n2a=np.real(self.dx(key2)**2.)
-            n2b=np.imag(self.dx(key2)**2.)
-
-        N1=np.zeros((self.spw_Nfreqs,self.spw_Nfreqs),dtype=complex)
-        N2=np.zeros_like(N1)
-        Qalphas=np.repeat(np.array([self.get_Q_alt(dly) for dly in range(self.spw_Ndlys)])[np.newaxis,:,:,:],self.spw_Ndlys,axis=0)
-        Qbetas=np.repeat(np.array([self.get_Q_alt(dly) for dly in range(self.spw_Ndlys)])[:,np.newaxis,:,:],self.spw_Ndlys,axis=1)
-        #Q_alpha/Q_beta are N_dlys x N_dlys x N_freq x N_freq
-        #taking advantage of broadcast rules!
-        #matmul only applies to the last two dimensions
-        #and stacks everything else!
-        Ealphas=np.matmul(R1.T.conj(),np.matmul(Qalphas,R2))
-        Ebetas=np.matmul(R1.T.conj(),np.matmul(Qbetas,R2))
-        #E_alpha/E_beta ar N_dlys x N_dlys x N_freq x N_freq
-        for indnum,tind in enumerate(time_indices):
-            N1[:,:]=np.diag(n1a[:,tind]+n1b[:,tind])
-            N2[:,:]=np.diag(n2a[:,tind]+n2b[:,tind])
-            #total covariance is sum of real and imaginary covariances
-            qc[indnum]=np.trace(np.matmul(Ealphas,np.matmul(N1,np.matmul(Ebetas,N2))),axis1=2,axis2=3)
+            R2 = self.R(key2)
+            # match the key1 branch: square the real/imag parts, not the complex data
+            n2a = np.real(self.dx(key2))**2.
+            n2b = np.imag(self.dx(key2))**2.
+
+        N1 = np.zeros((self.spw_Nfreqs, self.spw_Nfreqs), dtype=np.complex128)
+        N2 = np.zeros_like(N1)
+        Qalphas = np.repeat(np.array([self.get_Q_alt(dly) for dly in range(self.spw_Ndlys)])[np.newaxis, :, :, :], self.spw_Ndlys, axis=0)
+        Qbetas = np.repeat(np.array([self.get_Q_alt(dly) for dly in range(self.spw_Ndlys)])[:, np.newaxis, :, :], self.spw_Ndlys, axis=1)
+
+        # Q_alpha/Q_beta are N_dlys x N_dlys x N_freq x N_freq,
+        # taking advantage of broadcast rules!
+        # matmul only applies to the last two dimensions
+        # and stacks everything else!
+        Ealphas = np.matmul(R1.T.conj(), np.matmul(Qalphas, R2))
+        Ebetas = np.matmul(R1.T.conj(), np.matmul(Qbetas, R2))
+        # E_alpha/E_beta are N_dlys x N_dlys x N_freq x N_freq
+        for indnum, tind in enumerate(time_indices):
+            N1[:, :] = np.diag(n1a[:, tind] + n1b[:, tind])
+            N2[:, :] = np.diag(n2a[:, tind] + n2b[:, tind])
+            # total covariance is sum of real and imaginary covariances
+            qc[indnum] = np.trace(np.matmul(Ealphas, np.matmul(N1, np.matmul(Ebetas, N2))), axis1=2, axis2=3)
 
         return qc/4.
 
-
-
-
-
     def q_hat(self, key1, key2, allow_fft=False):
         """
         Construct an unnormalized bandpower, q_hat, from a given pair of
@@ -914,9 +911,7 @@ def q_hat(self, key1, key2, allow_fft=False):
             If a list of tuples is provided, the baselines
             in the list will be combined with inverse noise weights.
 
-
         allow_fft : bool, optional
-
             Whether to use a fast FFT summation trick to construct q_hat, or
             a simpler brute-force matrix multiplication. The FFT method assumes
             a delta-fn bin in delay space. It also only works if the number
@@ -927,7 +922,7 @@ def q_hat(self, key1, key2, allow_fft=False):
         q_hat : array_like
             Unnormalized bandpowers
         """
-        Rx1, Rx2 = 0, 0
+        Rx1, Rx2 = 0.0, 0.0
 
         # Calculate R x_1
         if isinstance(key1, list):
@@ -982,7 +977,6 @@ def get_G(self, key1, key2):
         G : array_like, complex
             Fisher matrix, with dimensions (Nfreqs, Nfreqs).
         """
-
         if self.spw_Ndlys == None:
             raise ValueError("Number of delay bins should have been set"
                              "by now! Cannot be equal to None")
@@ -1066,7 +1060,6 @@ def get_H(self, key1, key2, sampling=False):
         H : array_like, complex
             Dimensions (Nfreqs, Nfreqs).
         """
-
        if self.spw_Ndlys == None:
             raise ValueError("Number of delay bins should have been set"
                              "by now! Cannot be equal to None")
@@ -1137,7 +1130,6 @@ def get_unnormed_E(self,key1,key2):
             Set of E matrices, with dimensions (Ndlys, Nfreqs, Nfreqs).
 
         """
-
         if self.spw_Ndlys == None:
             raise ValueError("Number of delay bins should have been set"
                              "by now! Cannot be equal to None")
@@ -1210,7 +1202,6 @@ def get_unnormed_V(self, key1, key2, model='empirical'):
         V : array_like, complex
             Bandpower covariance matrix, with dimensions (Nfreqs, Nfreqs).
         """
-
         # Collect all the relevant pieces
         E_matrices = self.get_unnormed_E(key1, key2)
         C1 = self.C_model(key1)
@@ -1281,7 +1272,6 @@ def get_MW(self, G, H, mode='I', band_covar=None):
             Window function matrix, W. (If G was passed in as a dict, a dict
             of array_like will be returned.)
         """
-
         ### Next few lines commented out because (while elegant), the situation
         ### here where G is a dictionary is not supported by the rest of the code.
         # Recursive case, if many G's were specified at once
@@ -1437,21 +1427,22 @@ def p_hat(self, M, q):
         """
         return np.dot(M, q)
 
-    def cov_p_hat(self,M,q_cov):
+    def cov_p_hat(self, M, q_cov):
         """
         Covariance estimate between two different band powers p_alpha and p_beta
         given by M_{alpha i} M^*_{beta,j} C_q^{ij} where C_q^{ij} is the
         q-covariance
 
         Parameters
         ----------
-        M: array_like
+        M : array_like
             Normalization matrix, M.
-        q_cov: array_like
+
+        q_cov : array_like
             covariance between bandpowers in q_alpha and q_beta
         """
-        p_cov=np.zeros_like(q_cov)
+        p_cov = np.zeros_like(q_cov)
         for tnum in range(len(p_cov)):
-            p_cov[tnum]=np.einsum('ab,cd,bd->ac',M,M,q_cov[tnum])
+            p_cov[tnum] = np.einsum('ab,cd,bd->ac', M, M, q_cov[tnum])
         return p_cov
 
     def broadcast_dset_flags(self, spw_ranges=None, time_thresh=0.2, unflag=False):
@@ -1774,7 +1765,7 @@ def validate_pol(self, dsets, pol_pair):
     def pspec(self, bls1, bls2, dsets, pols, n_dlys=None,
               input_data_weight='identity', norm='I', taper='none',
               sampling=False, little_h=True, spw_ranges=None,
-              verbose=True, history='',store_cov=False):
+              verbose=True, history='', store_cov=False):
         """
         Estimate the delay power spectrum from a pair of datasets contained in
        this object, using the optimal quadratic estimator of arXiv:1502.06016.
@@ -1819,8 +1810,6 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
             to the order in spw_ranges.
             Default: None, which then sets n_dlys = number of frequencies.
 
-
-
         input_data_weight : str, optional
             String specifying which weighting matrix to apply to the input
             data. See the options in the set_R() method for details.
@@ -1850,7 +1839,9 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
             channel used to index the `freq_array` of each dataset. The
             default (None) is to use the entire band provided in each dataset.
 
-
+        store_cov : boolean, optional
+            If True, solve for covariance between bandpowers and store in
+            output UVPSpec object.
 
         verbose : bool, optional
             If True, print progress, warnings and debugging info to stdout.
@@ -1902,10 +1893,12 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
         """
         # set taper and data weighting
-        self.taper = taper
-        self.data_weighting = input_data_weight
+        self.set_taper(taper)
+        self.set_weighting(input_data_weight)
+
         # Validate the input data to make sure it's sensible
         self.validate_datasets(verbose=verbose)
+
         # Currently the "pspec normalization scalar" doesn't work if a
         # non-identity data weighting AND a non-trivial taper are used
         if taper != 'none' and input_data_weight != 'identity':
@@ -1921,6 +1914,7 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
         assert isinstance(dsets[0], (int, np.int)) and isinstance(dsets[1], (int, np.int)), "dsets must contain integer indices"
         dset1 = self.dsets[self.dset_idx(dsets[0])]
         dset2 = self.dsets[self.dset_idx(dsets[1])]
+
         # assert form of bls1 and bls2
         assert isinstance(bls1, list), "bls1 and bls2 must be fed as a list of antpair tuples"
         assert isinstance(bls2, list), "bls1 and bls2 must be fed as a list of antpair tuples"
@@ -1944,6 +1938,7 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
         # validate bl-pair redundancy
         validate_blpairs(bl_pairs, dset1, dset2, baseline_tol=1.0)
+
         # configure spectral window selections
         if spw_ranges is None:
             spw_ranges = [(0, self.Nfreqs)]
@@ -2014,7 +2009,6 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
             spw_pol = []
             spw_cov = []
 
-
             d = self.delays() * 1e-9
             dlys.extend(d)
             spws.extend(np.ones_like(d, np.int) * i)
@@ -2059,7 +2053,6 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
 
                 # Loop over baseline pairs
                 for k, blp in enumerate(bl_pairs):
-
                     # assign keys
                     if isinstance(blp, list):
                         # interpret blp as group of baseline-pairs
@@ -2076,7 +2069,6 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
                         key1 = (dsets[0],) + blp[0] + (p_str[0],)
                         key2 = (dsets[1],) + blp[1] + (p_str[1],)
 
-
                     if verbose:
                         print("\n(bl1, bl2) pair: {}\npol: {}".format(blp, tuple(p)))
 
@@ -2113,13 +2105,13 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
                     if norm == 'I':
                         pv *= self.scalar_delay_adjustment(Gv=Gv, Hv=Hv)
 
-                    #Generate the covariance matrix if error bars provided
+                    # Generate the covariance matrix if error bars provided
                     if store_cov:
                         if verbose: print(" Building q_hat covariance...")
-                        cov_qv=self.cov_q_hat(key1,key2)
-                        cov_pv=self.cov_p_hat(Mv,cov_qv)
+                        cov_qv = self.cov_q_hat(key1, key2)
+                        cov_pv = self.cov_p_hat(Mv, cov_qv)
                         if self.primary_beam != None:
-                            cov_pv*=\
+                            cov_pv *= \
                             (scalar * self.scalar_delay_adjustment(key1, key2, sampling=sampling))**2.
                         pol_cov.extend(cov_pv)
 
@@ -2171,26 +2163,26 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
                 spw_wgts.append(pol_wgts)
                 spw_ints.append(pol_ints)
                 spw_cov.append(pol_cov)
+
             # insert into data and integration dictionaries
             spw_data = np.moveaxis(np.array(spw_data), 0, -1)
             spw_wgts = np.moveaxis(np.array(spw_wgts), 0, -1)
             spw_ints = np.moveaxis(np.array(spw_ints), 0, -1)
-            spw_cov = np.moveaxis(np.array(spw_cov),0 , -1)
+            spw_cov = np.moveaxis(np.array(spw_cov), 0, -1)
             data_array[i] = spw_data
             cov_array[i] = spw_cov
             wgt_array[i] = spw_wgts
             integration_array[i] = spw_ints
             sclr_arr.append(spw_scalar)
 
-        # raise error if none of pols are consistent witht the UVData objects
-        if len(spw_pol)==0:
-            raise ValueError("None of the specified polarization pair match that of the UVData objects")
+        # raise error if none of pols are consistent with the UVData objects
+        if len(spw_pol)==0:
+            raise ValueError("None of the specified polarization pairs match those of the UVData objects")
 
         # fill uvp object
         uvp = uvpspec.UVPSpec()
 
         # fill meta-data
-        uvp.store_cov=store_cov
         uvp.time_1_array = np.array(time1)
         uvp.time_2_array = np.array(time2)
         uvp.time_avg_array = np.mean([uvp.time_1_array, uvp.time_2_array], axis=0)
@@ -2256,8 +2248,10 @@ def pspec(self, bls1, bls2, dsets, pols, n_dlys=None, input_data_weight='identit
         uvp.integration_array = integration_array
         uvp.wgt_array = wgt_array
         uvp.nsample_array = dict(map(lambda k: (k, np.ones_like(uvp.integration_array[k], np.float)), uvp.integration_array.keys()))
+
         # run check
         uvp.check()
+
         return uvp
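A minimal sketch of the covariance plumbing added above, assuming `uvd` and `uvd_std` (UVData data and matching standard deviations) are already loaded; all names and selections are placeholders:

    # Sketch: estimate bandpowers with bandpower covariance enabled.
    from hera_pspec.pspecdata import PSpecData

    ds = PSpecData(dsets=[uvd, uvd], dsets_std=[uvd_std, uvd_std],
                   wgts=[None, None], labels=['d1', 'd2'])
    uvp = ds.pspec([(24, 25)], [(37, 38)], (0, 1), [('xx', 'xx')],
                   spw_ranges=[(300, 400)], input_data_weight='identity',
                   norm='I', taper='none', store_cov=True)
    # store_cov=True triggers cov_q_hat / cov_p_hat above and propagates
    # the resulting bandpower covariance into the output UVPSpec.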
 
     def rephase_to_dset(self, dset_index=0, inplace=True):
@@ -2378,6 +2372,9 @@ def Jy_to_mK(self, beam=None):
             if self.primary_beam is not None:
                 print "Warning: feeding a beam model when self.primary_beam already exists..."
 
+        # Check beam is not None
+        assert beam is not None, "Beam object is None but attempting to convert Jy -> mK"
+
         # assert type of beam
         assert isinstance(beam, pspecbeam.PSpecBeamBase), "beam model must be a subclass of pspecbeam.PSpecBeamBase"
 
@@ -2436,18 +2433,16 @@ def trim_dset_lsts(self, lst_tol=6):
 
 def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None, dset_pairs=None,
-              spw_ranges=None, pol_pairs=None, blpairs=None,
+              psname_ext=None, spw_ranges=None, n_dlys=None, pol_pairs=None, blpairs=None,
               input_data_weight='identity', norm='I', taper='none',
               exclude_auto_bls=True, exclude_permutations=True,
-              Nblps_per_group=None, bl_len_range=(0, 1e10), bl_error_tol=1.0,
-              beam=None, cosmo=None, rephase_to_dset=None, Jy2mK=True,
-              overwrite=True, verbose=True, store_cov=False, history=''):
+              Nblps_per_group=None, bl_len_range=(0, 1e10), bl_deg_range=(0, 180), bl_error_tol=1.0,
+              beam=None, cosmo=None, rephase_to_dset=None, trim_dset_lsts=False, broadcast_dset_flags=True,
+              time_thresh=0.2, Jy2mK=False, overwrite=True, verbose=True, store_cov=False, history=''):
     """
-
     Create a PSpecData object, run OQE delay spectrum estimation and write
     results to a PSpecContainer object.
-
     Parameters
     ----------
     dsets : list
@@ -2468,15 +2463,25 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
         List of strings to label the input datasets. These labels form
         the psname of each UVPSpec object. Default is "dset0_x_dset1"
         where 0 and 1 are replaced with the dset index in dsets.
+        Note: it is not advised to put underscores in the dset label names,
+        as some downstream functions assume this to be the case.
 
     dset_pairs : list of len-2 integer tuples
         List of tuples specifying the dset pairs to use in OQE estimation.
         Default is to form all N_choose_2 pairs from input dsets.
 
+    psname_ext : string
+        A string extension for the psname in the PSpecContainer object.
+        Example: 'group/psname{}'.format(psname_ext)
+
     spw_ranges : list of len-2 integer tuples
         List of tuples specifying the spectral window range. See
         PSpecData.pspec() for details. Default is the entire band.
 
+    n_dlys : list
+        List of integers denoting number of delays to use per spectral window.
+        Same length as spw_ranges.
+
     pol_pairs : list of len-2 tuples
         List of string or integer tuples specifying the polarization
         pairs to use in OQE with each dataset pair in dset_pairs.
@@ -2525,6 +2530,10 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
         A tuple containing the minimum and maximum baseline length to use
         in utils.calc_reds call. Only used if blpairs is None.
 
+    bl_deg_range : len-2 float tuple
+        A tuple containing the min and max baseline angle (ENU frame in degrees)
+        to use in utils.calc_reds. Total range is between 0 and 180 degrees.
+
     bl_error_tol : float
         Baseline vector error tolerance when constructing redundant groups.
 
@@ -2543,11 +2552,26 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
         visibility data to the LST-grid of this dataset. Default behavior
         is no rephasing.
 
+    trim_dset_lsts : boolean
+        If True, look for a constant offset in LST between all dsets, and trim
+        non-overlapping LSTs.
+
+    broadcast_dset_flags : boolean
+        If True, broadcast dset flags across time using fractional time_thresh.
+
+    time_thresh : float
+        Fractional flagging threshold, above which a broadcast of flags across
+        time is triggered (if broadcast_dset_flags == True). This is done
+        independently for each baseline's visibility waterfall.
+
     Jy2mK : boolean
         If True, use the beam model provided to convert the units of each
         dataset from Jy to milli-Kelvin. If the visibility data are not in Jy,
         this correction is not applied.
 
+    store_cov : boolean, optional
+        If True, solve for covariance between bandpowers and store in
+        output UVPSpec object.
+
     overwrite : boolean
         If True, overwrite outputs if they exist on disk.
 
@@ -2566,10 +2590,13 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
     """
     # type check
     err_msg = "dsets must be fed as a list of dataset string paths or UVData objects."
-    #if isinstance(dsets, (str, np.str, UVData)):
-    #    dsets = [dsets] AEW: I've commented this out since the code below assumes >1 dataset.
     assert isinstance(dsets, (list, tuple, np.ndarray)), err_msg
-    Ndsets = len(dsets)
+
+    # parse psname
+    if psname_ext is not None:
+        assert isinstance(psname_ext, (str, np.str))
+    else:
+        psname_ext = ''
 
     # polarizations check
     if pol_pairs is not None:
@@ -2589,45 +2616,57 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
     # get redundant baseline groups
     bls = None
 
+    # Construct dataset pairs to operate on
+    Ndsets = len(dsets)
+    if dset_pairs is None:
+        dset_pairs = list(itertools.combinations(range(Ndsets), 2))
+
+    if dset_labels is None:
+        dset_labels = ["dset{}".format(i) for i in range(Ndsets)]
+    else:
+        assert not np.any(['_' in dl for dl in dset_labels]), "cannot accept underscores in input dset_labels: {}".format(dset_labels)
+
     # load data if fed as filepaths
     if isinstance(dsets[0], (str, np.str)):
-        # load data into UVData objects if fed as list of strings
-        t0 = time.time()
-        _dsets = []
-        for d in dsets:
-            uvd = UVData()
-            uvd.read_miriad(d, bls=bls, polarizations=pols)
-            _dsets.append(uvd)
-        dsets = _dsets
-        utils.log("Loaded data in %1.1f sec." % (time.time() - t0),
-                  lvl=1, verbose=verbose)
+        try:
+            # load data into UVData objects if fed as list of strings
+            t0 = time.time()
+            dsets = _load_dsets(dsets, bls=bls, pols=pols, verbose=verbose)
+            utils.log("Loaded data in %1.1f sec." % (time.time() - t0),
+                      lvl=1, verbose=verbose)
+        except ValueError:
+            # at least one of the dset loads failed due to no data being present
+            utils.log("One of the dset loads failed due to no data overlap given the bls and pols selection", verbose=verbose)
+            return
+
     err_msg = "dsets must be fed as a list of dataset string paths or UVData objects."
     assert np.all([isinstance(d, UVData) for d in dsets]), err_msg
 
-    #check dsets_std input
+    # check dsets_std input
     if dsets_std is not None:
-        #if isinstance(dsets_std,(str,np.str,UVData)):
-        #    dsets_std=[dsets_std]
-        assert isinstance(dsets_std,(list,tuple,np.ndarray)),err_msg
-        assert len(dsets_std) == Ndsets
-        #if path strings provided, read in UVData objects.
-        if isinstance(dsets_std[0],(str,np.str)):
-            t0=time.time()
-            _dsets_std=[]
-            for d in dsets_std:
-                uvd=UVData()
-                uvd.read_miriad(d,bls=bls,polarizations=pols)
-                _dsets_std.append(uvd)
-            dsets_std = _dsets_std
-            utils.log("Loaded std data in %1.1f sec."%(time.time()-t0),lvl=1,verbose=verbose)
-        err_msg="dsets_std must be fed as a list of dataset string paths or UVData objects."
-        assert np.all([isinstance(d,UVData) for d in dsets]), err_msg
+        err_msg = "input dsets_std must be a list of UVData objects or filepaths to miriad files"
+        assert isinstance(dsets_std, (list, tuple, np.ndarray)), err_msg
+        assert len(dsets_std) == Ndsets, "len(dsets_std) must equal len(dsets)"
+
+        # if path strings provided, read in UVData objects.
+        if isinstance(dsets_std[0], (str, np.str)):
+            try:
+                # load data into UVData objects if fed as list of strings
+                t0 = time.time()
+                dsets_std = _load_dsets(dsets_std, bls=bls, pols=pols, verbose=verbose)
+                utils.log("Loaded std data in %1.1f sec." % (time.time() - t0),
+                          lvl=1, verbose=verbose)
+            except ValueError:
+                # at least one of the dsets_std loads failed due to no data being present
+                utils.log("One of the dsets_std loads failed due to no data overlap given the bls and pols selection", verbose=verbose)
+                return
+
+        assert np.all([isinstance(d, UVData) for d in dsets]), err_msg
 
     # configure polarization
     if pol_pairs is None:
         unique_pols = reduce(operator.and_, [set(d.polarization_array) for d in dsets])
         pol_pairs = [(up, up) for up in unique_pols]
-
     assert len(pol_pairs) > 0, "no pol_pairs specified"
 
     # load beam
@@ -2641,21 +2680,25 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
             beam.cosmo = cosmo
 
     # package into PSpecData
-    ds = PSpecData(dsets=dsets, wgts=[None for d in dsets],dsets_std=dsets_std, beam=beam)
+    ds = PSpecData(dsets=dsets, wgts=[None for d in dsets], labels=dset_labels, dsets_std=dsets_std, beam=beam)
 
     # Rephase if desired
     if rephase_to_dset is not None:
         ds.rephase_to_dset(rephase_to_dset)
 
+    # trim dset LSTs
+    if trim_dset_lsts:
+        ds.trim_dset_lsts()
+
+    # broadcast flags
+    if broadcast_dset_flags:
+        ds.broadcast_dset_flags(time_thresh=time_thresh)
+
     # perform Jy to mK conversion if desired
     if Jy2mK:
         ds.Jy_to_mK()
 
-    # Construct dataset pairs to operate on
-    if dset_pairs is None:
-        dset_pairs = list(itertools.combinations(range(Ndsets), 2))
-    if dset_labels is None:
-        dset_labels = ["dset{}".format(i) for i in range(Ndsets)]
+
+    # check dset pair type
     err_msg = "dset_pairs must be fed as a list of len-2 integer tuples"
     assert isinstance(dset_pairs, list), err_msg
     assert np.all([isinstance(d, tuple) for d in dset_pairs]), err_msg
@@ -2671,7 +2714,8 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
                                                exclude_auto_bls=exclude_auto_bls,
                                                exclude_permutations=exclude_permutations,
                                                Nblps_per_group=Nblps_per_group,
-                                               bl_len_range=bl_len_range)
+                                               bl_len_range=bl_len_range,
+                                               bl_deg_range=bl_deg_range)
             bls1_list.append(bls1)
             bls2_list.append(bls2)
@@ -2699,17 +2743,23 @@ def pspec_run(dsets, filename, dsets_std=None, groupname=None, dset_labels=None,
 
     # Loop over dataset combinations
     for i, dset_idxs in enumerate(dset_pairs):
+        # check bls lists aren't empty
+        if len(bls1_list[i]) == 0 or len(bls2_list[i]) == 0:
+            continue
+
         # Run OQE
-        uvp = ds.pspec(bls1_list[i], bls2_list[i], dset_idxs, pol_pairs,
-                       spw_ranges=spw_ranges,
-                       input_data_weight=input_data_weight,
-                       norm=norm, taper=taper, history=history)
-
+        uvp = ds.pspec(bls1_list[i], bls2_list[i], dset_idxs, pol_pairs,
+                       spw_ranges=spw_ranges, n_dlys=n_dlys, store_cov=store_cov,
+                       input_data_weight=input_data_weight, norm=norm, taper=taper,
+                       history=history, verbose=verbose)
+
         # Store output
-        psname = '{}_x_{}'.format(dset_labels[dset_idxs[0]],
-                                  dset_labels[dset_idxs[1]])
-        psc.set_pspec(group=groupname, psname=psname, pspec=uvp,
-                      overwrite=overwrite)
+        psname = '{}_x_{}{}'.format(dset_labels[dset_idxs[0]], dset_labels[dset_idxs[1]], psname_ext)
+        psc.set_pspec(group=groupname, psname=psname, pspec=uvp, overwrite=overwrite)
 
     return psc
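An end-to-end sketch of the updated pspec_run() signature; file paths and selections below are placeholders:

    # Sketch: one-shot power spectrum run with the new keyword arguments.
    from hera_pspec import pspecdata

    psc = pspecdata.pspec_run(["zen.even.xx.HH.uv", "zen.odd.xx.HH.uv"], "pspec.h5",
                              dset_labels=["even", "odd"], spw_ranges=[(300, 400)],
                              n_dlys=[100], pol_pairs=[('xx', 'xx')],
                              bl_len_range=(10.0, 50.0), bl_deg_range=(0.0, 180.0),
                              broadcast_dset_flags=True, time_thresh=0.2,
                              overwrite=True, verbose=True)
    # The output PSpecContainer holds the spectrum under psname "even_x_odd".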
get_pspec_run_argparser(): a = argparse.ArgumentParser(description="argument parser for pspecdata.pspec_run()") + def list_of_int_tuples(v): + v = map(lambda x: tuple(map(int, x.split())), v.split(",")) + return v + + def list_of_str_tuples(v): + v = map(lambda x: tuple(map(str, x.split())), v.split(",")) + return v + + def list_of_tuple_tuples(v): + v = map(lambda x: tuple(map(int, x.split())), v.split(",")) + v = map(lambda x: (x[:2], x[2:]), v) + return v + a.add_argument("dsets", nargs='*', help="List of UVData objects or miriad filepaths.") a.add_argument("filename", type=str, help="Output filename of HDF5 container.") + a.add_argument("--dsets_std", nargs='*', default=None, type=str, help="List of miriad filepaths to visibility standard deviations.") a.add_argument("--groupname", default=None, type=str, help="Groupname for the UVPSpec objects in the HDF5 container.") - a.add_argument("--dset_pairs", default=None, type=tuple, nargs='*', help="List of len-2 integer tuples of dset pairings for OQE.") + a.add_argument("--dset_pairs", default=None, type=list_of_int_tuples, help="List of dset pairings for OQE wrapped in quotes. Ex: '0 0, 1 1' --> [(0, 0), (1, 1), ...]") a.add_argument("--dset_labels", default=None, type=str, nargs='*', help="List of string labels for each input dataset.") - a.add_argument("--spw_ranges", default=None, type=tuple, nargs='*', help="List of len-2 integer tuples of spectral window selections for OQE.") - a.add_argument("--pol_pairs", default=None, type=tuple, nargs='*', help="List of len-2 integer of string tuples of polarization pairs for OQE.") - a.add_argument("--blpairs", default=None, type=tuple, nargs='*', help="List of integer tuples containing baseline pair to run OQE on. Ex: [((1, 2), (3, 4)), ((1, 2), (5, 6)), ...]") + a.add_argument("--spw_ranges", default=None, type=list_of_int_tuples, help="List of spw channel selections wrapped in quotes. Ex: '200 300, 500 650' --> [(200, 300), (500, 650), ...]") + a.add_argument("--n_dlys", default=None, type=int, nargs='+', help="List of integers specifying number of delays to use per spectral window selection.") + a.add_argument("--pol_pairs", default=None, type=list_of_str_tuples, help="List of pol-string pairs to use in OQE wrapped in quotes. Ex: 'xx xx, yy yy' --> [('xx', 'xx'), ('yy', 'yy'), ...]") + a.add_argument("--blpairs", default=None, type=list_of_tuple_tuples, help="List of baseline-pair antenna integers to run OQE on. Ex: '1 2 3 4, 5 6 7 8' --> [((1 2), (3, 4)), ((5, 6), (7, 8)), ...]") a.add_argument("--input_data_weight", default='identity', type=str, help="Data weighting for OQE. See PSpecData.pspec for details.") a.add_argument("--norm", default='I', type=str, help='M-matrix normalization type for OQE. See PSpecData.pspec for details.') a.add_argument("--taper", default='none', type=str, help="Taper function to use in OQE delay transform. See PSpecData.pspec for details.") a.add_argument("--beam", default=None, type=str, help="Filepath to UVBeam healpix map of antenna beam.") - a.add_argument("--cosmo", default=None, nargs='*', type=float, help="List of float values for [Om_L, Om_b, Om_c, H0, Om_M, Om_k].") + a.add_argument("--cosmo", default=None, nargs='+', type=float, help="List of float values for [Om_L, Om_b, Om_c, H0, Om_M, Om_k].") a.add_argument("--rephase_to_dset", default=None, type=int, help="dset integer index to phase all other dsets to. 
Default is no rephasing.") + a.add_argument("--trim_dset_lsts", default=False, action='store_true', help="Trim non-overlapping dset LSTs.") + a.add_argument("--broadcast_dset_flags", default=False, action='store_true', help="Broadcast dataset flags across time according to time_thresh.") + a.add_argument("--time_thresh", default=0.2, type=float, help="Fractional flagging threshold across time to trigger flag broadcast if broadcast_dset_flags is True") a.add_argument("--Jy2mK", default=False, action='store_true', help="Convert datasets from Jy to mK if a beam model is provided.") a.add_argument("--exclude_auto_bls", default=False, action='store_true', help='If blpairs is not provided, exclude all baselines paired with itself.') a.add_argument("--exclude_permutations", default=False, action='store_true', help='If blpairs is not provided, exclude a basline-pair permutations. Ex: if (A, B) exists, exclude (B, A).') a.add_argument("--Nblps_per_group", default=None, type=int, help="If blpairs is not provided and group == True, set the number of blpairs in each group.") - a.add_argument("--bl_len_range", default=(0, 1e10), type=tuple, help="If blpairs is not provided, limit the baselines used based on their minimum and maximum length in meters.") + a.add_argument("--bl_len_range", default=(0, 1e10), nargs='+', type=float, help="If blpairs is not provided, limit the baselines used based on their minimum and maximum length in meters.") + a.add_argument("--bl_deg_range", default=(0, 180), nargs='+', type=float, help="If blpairs is not provided, limit the baseline used based on a min and max angle cut in ENU frame in degrees.") a.add_argument("--bl_error_tol", default=1.0, type=float, help="If blpairs is not provided, this is the error tolerance in forming redundant baseline groups in meters.") + a.add_argument("--store_cov", default=False, action='store_true', help="Compute and store covariance of bandpowers given dsets_std files.") a.add_argument("--overwrite", default=False, action='store_true', help="Overwrite output if it exists.") + a.add_argument("--psname_ext", default='', type=str, help="Extension for pspectra name in PSpecContainer.") a.add_argument("--verbose", default=False, action='store_true', help="Report feedback to standard output.") return a @@ -2789,7 +2860,22 @@ def validate_blpairs(blpairs, uvd1, uvd2, baseline_tol=1.0, verbose=True): if np.linalg.norm(bl1_vec - bl2_vec) >= baseline_tol: raise_warning("blpair {} exceeds redundancy tolerance of {} m".format(blp, baseline_tol), verbose=verbose) + def raise_warning(warning, verbose=True): '''warning function''' if verbose: print(warning) + + +def _load_dsets(fnames, bls=None, pols=None, logf=None, verbose=True): + """ helper function for loading Miriad datasets in pspec_run """ + dsets = [] + Ndsets = len(fnames) + for i, dset in enumerate(fnames): + utils.log("Reading {} / {} datasets...".format(i+1, Ndsets), f=logf, lvl=1, verbose=verbose) + # read data + uvd = UVData() + uvd.read_miriad(glob.glob(dset), bls=bls, polarizations=pols) + dsets.append(uvd) + return dsets + diff --git a/hera_pspec/testing.py b/hera_pspec/testing.py index 0998b686..93cc9e1b 100644 --- a/hera_pspec/testing.py +++ b/hera_pspec/testing.py @@ -111,7 +111,7 @@ def build_vanilla_uvpspec(beam=None): return uvp, cosmo -def uvpspec_from_data(data, bls, data_std=None, spw_ranges=None, beam=None, taper='none', cosmo=None, verbose=False): +def uvpspec_from_data(data, bl_grps, data_std=None, spw_ranges=None, beam=None, taper='none', cosmo=None, verbose=False): """ 
Build an example UVPSpec object from a visibility file and PSpecData. @@ -120,9 +120,9 @@ def uvpspec_from_data(data, bls, data_std=None, spw_ranges=None, beam=None, tape data : UVData object or str This can be a UVData object or a string filepath to a miriad file. - bls : list - This is a list of at least 2 baseline tuples. - Ex: [(24, 25), (37, 38), ...] + bl_grps : list + This is a list of baseline groups (e.g. redundant groups) to form blpairs from. + Ex: [[(24, 25), (37, 38), ...], [(24, 26), (37, 39), ...], ... ] data_std: UVData object or str or None Can be UVData object or a string filepath to a miriad file. @@ -176,8 +176,16 @@ def uvpspec_from_data(data, bls, data_std=None, spw_ranges=None, beam=None, tape # instantiate pspecdata ds = pspecdata.PSpecData(dsets=[uvd, uvd], dsets_std=[uvd_std,uvd_std], wgts=[None, None], labels=['d1', 'd2'], beam=beam) - # get red bls - bls1, bls2, _ = utils.construct_blpairs(bls, exclude_auto_bls=True) + # get blpair groups + assert isinstance(bl_grps, list), "bl_grps must be a list" + if not isinstance(bl_grps[0], list): bl_grps = [bl_grps] + assert np.all([isinstance(blgrp, list) for blgrp in bl_grps]), "bl_grps must be fed as a list of lists" + assert np.all([isinstance(blgrp[0], tuple) for blgrp in bl_grps]), "bl_grps must be fed as a list of lists of tuples" + bls1, bls2 = [], [] + for blgrp in bl_grps: + _bls1, _bls2, _ = utils.construct_blpairs(blgrp, exclude_auto_bls=True, exclude_permutations=True) + bls1.extend(_bls1) + bls2.extend(_bls2) # run pspec uvp = ds.pspec(bls1, bls2, (0, 1), (pol, pol), input_data_weight='identity', spw_ranges=spw_ranges, diff --git a/hera_pspec/tests/test_container.py b/hera_pspec/tests/test_container.py index d159b9bb..a5b0c4cc 100644 --- a/hera_pspec/tests/test_container.py +++ b/hera_pspec/tests/test_container.py @@ -1,9 +1,10 @@ import unittest from nose.tools import assert_raises +import nose.tools as nt import numpy as np import os, sys, copy from hera_pspec.data import DATA_PATH -from hera_pspec import PSpecContainer, UVPSpec, testing +from hera_pspec import container, PSpecContainer, UVPSpec, testing class Test_PSpecContainer(unittest.TestCase): @@ -124,3 +125,51 @@ def test_PSpecContainer(self): # Check that save() can be called ps_store.save() + +def test_merge_spectra(): + fname = os.path.join(DATA_PATH, "zen.2458042.17772.xx.HH.uvXA") + uvp1 = testing.uvpspec_from_data(fname, [(24, 25), (37, 38)], spw_ranges=[(10, 40)]) + uvp2 = testing.uvpspec_from_data(fname, [(38, 39), (52, 53)], spw_ranges=[(10, 40)]) + + # test basic execution + if os.path.exists('ex.h5'): + os.remove('ex.h5') + psc = PSpecContainer("ex.h5", mode='rw') + psc.set_pspec("grp1", "uvp_a", uvp1, overwrite=True) + psc.set_pspec("grp1", "uvp_b", uvp2, overwrite=True) + container.merge_spectra(psc, dset_split_str=None, ext_split_str='_') + nt.assert_equal(psc.spectra('grp1'), [u'uvp']) + + # test dset name handling + if os.path.exists('ex.h5'): + os.remove('ex.h5') + psc = PSpecContainer("ex.h5", mode='rw') + psc.set_pspec("grp1", "d1_x_d2_a", uvp1, overwrite=True) + psc.set_pspec("grp1", "d1_x_d2_b", uvp2, overwrite=True) + psc.set_pspec("grp1", "d2_x_d3_a", uvp1, overwrite=True) + psc.set_pspec("grp1", "d2_x_d3_b", uvp2, overwrite=True) + container.merge_spectra('ex.h5', dset_split_str='_x_', ext_split_str='_') + nt.assert_equal(psc.spectra('grp1'), [u'd1_x_d2', u'd2_x_d3']) + + # test exceptions + if os.path.exists('ex.h5'): + os.remove('ex.h5') + psc = PSpecContainer("ex.h5", mode='rw') + # test no group exception + 
nt.assert_raises(AssertionError, container.merge_spectra, psc) + # test failed combine_uvpspec + psc.set_pspec("grp1", "d1_x_d2_a", uvp1, overwrite=True) + psc.set_pspec("grp1", "d1_x_d2_b", uvp1, overwrite=True) + container.merge_spectra(psc, dset_split_str='_x_', ext_split_str='_') + nt.assert_equal(psc.spectra('grp1'), [u'd1_x_d2_a', u'd1_x_d2_b']) + + if os.path.exists("ex.h5"): + os.remove("ex.h5") + +def test_merge_spectra_argparser(): + args = container.get_merge_spectra_argparser() + a = args.parse_args(["filename", "--dset_split_str", "_x_", "--ext_split_str", "_"]) + nt.assert_equal(a.filename, "filename") + nt.assert_equal(a.dset_split_str, "_x_") + nt.assert_equal(a.ext_split_str, "_") + diff --git a/hera_pspec/tests/test_grouping.py b/hera_pspec/tests/test_grouping.py index febad6be..0106d551 100644 --- a/hera_pspec/tests/test_grouping.py +++ b/hera_pspec/tests/test_grouping.py @@ -5,7 +5,10 @@ from hera_pspec.data import DATA_PATH from hera_pspec import uvpspec, conversions, parameter, pspecbeam, pspecdata, testing from hera_pspec import uvpspec_utils as uvputils -from hera_pspec import grouping +from hera_pspec import grouping, container +from pyuvdata import UVData +from hera_cal import redcal + class Test_grouping(unittest.TestCase): @@ -213,5 +216,96 @@ def test_select_common(self): times=True, pols=True, inplace=True) self.assertEqual(uvp1, uvp2) +def test_bootstrap_resampled_error(): + # generate a UVPSpec + visfile = os.path.join(DATA_PATH, "zen.even.xx.LST.1.28828.uvOCRSA") + beamfile = os.path.join(DATA_PATH, "HERA_NF_dipole_power.beamfits") + cosmo = conversions.Cosmo_Conversions() + beam = pspecbeam.PSpecBeamUV(beamfile, cosmo=cosmo) + uvd = UVData() + uvd.read_miriad(visfile) + ap, a = uvd.get_ENU_antpos(pick_data_ants=True) + reds = redcal.get_pos_reds(dict(zip(a, ap)), low_hi=True, bl_error_tol=1.0)[:3] + uvp = testing.uvpspec_from_data(uvd, reds, spw_ranges=[(50, 100)], beam=beam, cosmo=cosmo) + + # Lots of this function is already tested by bootstrap_run + # so only test the stuff not already tested + if os.path.exists("uvp.h5"): + os.remove("uvp.h5") + uvp.write_hdf5("uvp.h5", overwrite=True) + ua, ub, uw = grouping.bootstrap_resampled_error("uvp.h5", blpair_groups=None, Nsamples=10, seed=0, verbose=False) + # check number of boots + nt.assert_equal(len(ub), 10) + # check seed has been used properly + nt.assert_equal(uw[0][0][:5], [1.0, 1.0, 0.0, 2.0, 1.0]) + nt.assert_equal(uw[0][1][:5], [2.0, 1.0, 1.0, 6.0, 1.0]) + nt.assert_equal(uw[1][0][:5], [2.0, 2.0, 1.0, 1.0, 2.0]) + nt.assert_equal(uw[1][1][:5], [1.0, 0.0, 1.0, 1.0, 4.0]) + + if os.path.exists("uvp.h5"): + os.remove("uvp.h5") + +def test_bootstrap_run(): + # generate a UVPSpec and container + visfile = os.path.join(DATA_PATH, "zen.even.xx.LST.1.28828.uvOCRSA") + beamfile = os.path.join(DATA_PATH, "HERA_NF_dipole_power.beamfits") + cosmo = conversions.Cosmo_Conversions() + beam = pspecbeam.PSpecBeamUV(beamfile, cosmo=cosmo) + uvd = UVData() + uvd.read_miriad(visfile) + ap, a = uvd.get_ENU_antpos(pick_data_ants=True) + reds = redcal.get_pos_reds(dict(zip(a, ap)), low_hi=True, bl_error_tol=1.0)[:3] + uvp = testing.uvpspec_from_data(uvd, reds, spw_ranges=[(50, 100)], beam=beam, cosmo=cosmo) + if os.path.exists("ex.h5"): + os.remove("ex.h5") + psc = container.PSpecContainer("ex.h5", mode='rw') + psc.set_pspec("grp1", "uvp", uvp) + + # Test basic bootstrap run + grouping.bootstrap_run(psc, time_avg=True, Nsamples=100, seed=0, + normal_std=True, robust_std=True, conf_ints=[16, 84], keep_samples=True, + 
bl_error_tol=1.0, overwrite=True, add_to_history='hello!', verbose=False) + spcs = psc.spectra("grp1") + # assert all bs samples were written + nt.assert_true(np.all(["uvp_bs{}".format(i) in spcs for i in range(100)])) + # assert average was written + nt.assert_true("uvp_avg" in spcs and "uvp" in spcs) + # assert average only has one time and 3 blpairs + uvp_avg = psc.get_pspec("grp1", "uvp_avg") + nt.assert_equal(uvp_avg.Ntimes, 1) + nt.assert_equal(uvp_avg.Nblpairs, 3) + # check avg file history + nt.assert_true("hello!" in uvp_avg.history) + # assert original uvp is unchanged + nt.assert_true(uvp == psc.get_pspec("grp1", 'uvp')) + # check stats array + ## TODO + + # test exceptions + del psc + if os.path.exists("ex.h5"): + os.remove("ex.h5") + psc = container.PSpecContainer("ex.h5", mode='rw') + # test empty groups + nt.assert_raises(AssertionError, grouping.bootstrap_run, "ex.h5") + # test bad filename + nt.assert_raises(AssertionError, grouping.bootstrap_run, 1) + # test fed spectra doesn't exist + psc.set_pspec("grp1", "uvp", uvp) + nt.assert_raises(AssertionError, grouping.bootstrap_run, psc, spectra=['grp1/foo']) + if os.path.exists("ex.h5"): + os.remove("ex.h5") + + +def test_get_bootstrap_run_argparser(): + args = grouping.get_bootstrap_run_argparser() + a = args.parse_args(['fname', '--spectra', 'grp1/uvp1', 'grp1/uvp2', 'grp2/uvp1', + '--blpair_groups', '101102103104 101102102103, 102103104105', + '--time_avg', 'True', '--Nsamples', '100', '--conf_ints', '16', '84']) + nt.assert_equal(a.spectra, ['grp1/uvp1', 'grp1/uvp2', 'grp2/uvp1']) + nt.assert_equal(a.blpair_groups, [[101102103104, 101102102103], [102103104105]]) + nt.assert_equal(a.conf_ints, [16.0, 84.0]) + + if __name__ == "__main__": unittest.main() diff --git a/hera_pspec/tests/test_pspecdata.py b/hera_pspec/tests/test_pspecdata.py index 8bcfc2ed..13d047b2 100644 --- a/hera_pspec/tests/test_pspecdata.py +++ b/hera_pspec/tests/test_pspecdata.py @@ -99,11 +99,12 @@ def setUp(self): _d = uv.UVData() _d.read_miriad(os.path.join(DATA_PATH, dfile)) self.d.append(_d) + # Load standard deviations - self.d_std=[] + self.d_std = [] for dfile in dfiles_std: _d = uv.UVData() - _d.read_miriad(os.path.join(DATA_PATH,dfile)) + _d.read_miriad(os.path.join(DATA_PATH, dfile)) self.d_std.append(_d) # Set trivial weights @@ -153,6 +154,28 @@ def test_init(self): # Test exception when not a UVData instance self.assertRaises(TypeError, ds.add, [1], [None]) + # Test get weights when fed a UVData for weights + ds = pspecdata.PSpecData(dsets=[self.uvd, self.uvd], wgts=[self.uvd, self.uvd]) + key = (0, (24, 25), 'xx') + nt.assert_true(np.all(np.isclose(ds.x(key), ds.w(key)))) + + # Test labels when adding dsets + uvd = self.uvd + ds = pspecdata.PSpecData() + nt.assert_equal(len(ds.labels), 0) + ds.add([uvd, uvd], [None, None]) + nt.assert_equal(len(ds.labels), 2) + ds.add(uvd, None, labels='foo') + nt.assert_equal(len(ds.dsets), len(ds.labels), 3) + nt.assert_equal(ds.labels, ['dset0', 'dset1', 'foo']) + ds.add(uvd, None) + nt.assert_equal(ds.labels, ['dset0', 'dset1', 'foo', 'dset3']) + + # Test some exceptions + ds = pspecdata.PSpecData() + nt.assert_raises(ValueError, ds.get_G, key, key) + nt.assert_raises(ValueError, ds.get_H, key, key) + def test_add_data(self): """ Test adding non UVData object. 
@@ -161,6 +184,7 @@ def test_add_data(self): #test TypeError if dsets is dict but dsets_std is not nt.assert_raises(TypeError,self.ds.add,{'d':0},{'w':0},None,[0]) nt.assert_raises(TypeError,self.ds.add,{'d':0},{'w':0},None,{'e':0}) + def test_labels(self): """ Test that dataset labels work. @@ -214,7 +238,6 @@ def test_get_Q_alt(self): """ Test the Q = dC/dp function. """ - vect_length = 50 x_vect = np.random.normal(size=vect_length) \ + 1.j * np.random.normal(size=vect_length) @@ -248,7 +271,6 @@ def test_get_Q_alt(self): self.assertAlmostEqual(xQx, np.abs(vect_length**2.)) # Sending in sinusoids for x and y should give delta functions - # Now do all the same tests from above but for a different number # of delay channels self.ds.set_Ndlys(vect_length-3) @@ -470,69 +492,67 @@ def test_get_MW(self): for norm in test_norm: self.assertAlmostEqual(norm, 1.) - - def test_cov_q(self,ndlys=13): + def test_cov_q(self, ndlys=13): """ Test that q_hat_cov has the right shape and accepts keys in correct format. Also validate with arbitrary number of delays. """ for d in self.d: - d.flag_array[:]=False #ensure that there are no flags! - d.select(times=np.unique(d.time_array)[:10],frequencies=d.freq_array[0,:16]) + d.flag_array[:] = False #ensure that there are no flags! + d.select(times=np.unique(d.time_array)[:10], frequencies=d.freq_array[0, :16]) for d_std in self.d_std: - d_std.flag_array[:]=False - d_std.select(times=np.unique(d_std.time_array)[:10],frequencies=d_std.freq_array[0,:16]) - self.ds=pspecdata.PSpecData(dsets=self.d,wgts=self.w,dsets_std=self.d_std) - self.ds=pspecdata.PSpecData(dsets=self.d,wgts=self.w,dsets_std=self.d_std) + d_std.flag_array[:] = False + d_std.select(times=np.unique(d_std.time_array)[:10], frequencies=d_std.freq_array[0, :16]) + self.ds = pspecdata.PSpecData(dsets=self.d, wgts=self.w, dsets_std=self.d_std) + self.ds = pspecdata.PSpecData(dsets=self.d, wgts=self.w, dsets_std=self.d_std) Ntime = self.ds.Ntimes self.ds.set_Ndlys(ndlys) - #Here is the analytic covariance matrix... - chan_x,chan_y=np.meshgrid(range(self.ds.Nfreqs),range(self.ds.Nfreqs)) - cov_analytic=np.zeros((self.ds.spw_Ndlys,self.ds.spw_Ndlys),dtype=np.complex128) + # Here is the analytic covariance matrix... 
+ chan_x, chan_y = np.meshgrid(range(self.ds.Nfreqs), range(self.ds.Nfreqs)) + cov_analytic = np.zeros((self.ds.spw_Ndlys, self.ds.spw_Ndlys), dtype=np.complex128) for alpha in range(self.ds.spw_Ndlys): for beta in range(self.ds.spw_Ndlys): - cov_analytic[alpha,beta]=np.exp(-2j*np.pi*(alpha-beta)*(chan_x-chan_y)/self.ds.spw_Ndlys).sum() - key1 = (0,24,38) - key2 = (1,25,38) + cov_analytic[alpha, beta] = np.exp(-2j*np.pi*(alpha-beta)*(chan_x-chan_y)/self.ds.spw_Ndlys).sum() + key1 = (0, 24, 38) + key2 = (1, 25, 38) print(cov_analytic) for input_data_weight in ['identity','iC']: self.ds.set_weighting(input_data_weight) - for taper in taper_selection: qc = self.ds.cov_q_hat(key1,key2) self.assertTrue(np.allclose(np.array(list(qc.shape)), - np.array([self.ds.Ntimes,self.ds.spw_Ndlys,self.ds.spw_Ndlys]),atol=1e-6)) + np.array([self.ds.Ntimes, self.ds.spw_Ndlys, self.ds.spw_Ndlys]), atol=1e-6)) """ Now test that analytic Error calculation gives Nchan^2 """ self.ds.set_weighting('identity') - qc=self.ds.cov_q_hat(key1,key2) + qc = self.ds.cov_q_hat(key1, key2) self.assertTrue(np.allclose(qc, - np.repeat(cov_analytic[np.newaxis,:,:],self.ds.Ntimes,axis=0),atol=1e-6)) + np.repeat(cov_analytic[np.newaxis, :, :], self.ds.Ntimes, axis=0), atol=1e-6)) """ Test lists of keys """ self.ds.set_weighting('identity') - qc=self.ds.cov_q_hat([key1],[key2],time_indices=[0]) + qc=self.ds.cov_q_hat([key1], [key2], time_indices=[0]) self.assertTrue(np.allclose(qc, - np.repeat(cov_analytic[np.newaxis,:,:],self.ds.Ntimes,axis=0),atol=1e-6)) - self.assertRaises(ValueError,self.ds.cov_q_hat,key1,key2,200) - self.assertRaises(ValueError,self.ds.cov_q_hat,key1,key2,"watch out!") + np.repeat(cov_analytic[np.newaxis, :, :], self.ds.Ntimes, axis=0), atol=1e-6)) + self.assertRaises(ValueError, self.ds.cov_q_hat, key1, key2, 200) + self.assertRaises(ValueError, self.ds.cov_q_hat, key1, key2, "watch out!") + def test_cov_p_hat(self): """ Test cov_p_hat, verify on identity. 
""" - self.ds=pspecdata.PSpecData(dsets=self.d,wgts=self.w,dsets_std=self.d_std) - cov_p=self.ds.cov_p_hat(np.sqrt(6.)*np.identity(10),np.array([5.*np.identity(10)])) + self.ds = pspecdata.PSpecData(dsets=self.d, wgts=self.w, dsets_std=self.d_std) + cov_p = self.ds.cov_p_hat(np.sqrt(6.)*np.identity(10),np.array([5.*np.identity(10)])) for p in range(10): for q in range(10): - if p==q: - self.assertTrue(np.isclose(30.,cov_p[0,p,q],atol=1e-6)) + if p == q: + self.assertTrue(np.isclose(30., cov_p[0, p, q], atol=1e-6)) else: - self.assertTrue(np.isclose(0.,cov_p[0,p,q],atol=1e-6)) - + self.assertTrue(np.isclose(0., cov_p[0, p, q], atol=1e-6)) def test_q_hat(self): """ @@ -1028,11 +1048,12 @@ def test_pspec(self): # test covariance calculation runs with small number of delays uvd = copy.deepcopy(self.uvd) - uvd_std=copy.deepcopy(self.uvd_std) + uvd_std = copy.deepcopy(self.uvd_std) ds = pspecdata.PSpecData(dsets=[uvd, uvd], wgts=[None, None], - dsets_std=[uvd_std,uvd_std], beam=self.bm) + dsets_std=[uvd_std, uvd_std], beam=self.bm) uvp = ds.pspec(bls1, bls2, (0, 1), ('xx','xx'), input_data_weight='identity', norm='I', taper='none', little_h=True, verbose=True, spw_ranges=[(10,14)], store_cov=True) + nt.assert_true(hasattr(uvp, 'cov_array')) def test_normalization(self): # Test Normalization of pspec() compared to PAPER legacy techniques @@ -1229,21 +1250,25 @@ def test_pspec_run(): fnames_std=[os.path.join(DATA_PATH,d) for d in ['zen.even.std.xx.LST.1.28828.uvOCRSA', 'zen.odd.std.xx.LST.1.28828.uvOCRSA']] # test basic execution - psc = pspecdata.pspec_run(fnames, "./out.hdf5", Jy2mK=False, verbose=False, overwrite=True) + if os.path.exists("./out.hdf5"): + os.remove("./out.hdf5") + psc = pspecdata.pspec_run(fnames, "./out.hdf5", Jy2mK=False, verbose=False, overwrite=True, + bl_len_range=(14, 15), bl_deg_range=(50, 70), psname_ext='_0') nt.assert_true(isinstance(psc, container.PSpecContainer)) nt.assert_equal(psc.groups(), ['dset0_dset1']) - nt.assert_equal(psc.spectra(psc.groups()[0]), ['dset0_x_dset1']) + nt.assert_equal(psc.spectra(psc.groups()[0]), ['dset0_x_dset1_0']) nt.assert_true(os.path.exists("./out.hdf5")) - if os.path.exists("./out.hdf5"): - os.remove("./out.hdf5") - # test Jy2mK, rephase_to_dset, blpairs and dset_labels + # test Jy2mK, rephase_to_dset, trim_dset_lsts, broadcast_dset_flags, blpairs and dset_labels cosmo = conversions.Cosmo_Conversions(Om_L=0.0) - psc = pspecdata.pspec_run(fnames, "./out.hdf5",dsets_std=fnames_std, Jy2mK=True, beam=beamfile, verbose=False, overwrite=True, + if os.path.exists("./out.hdf5"): + os.remove("./out.hdf5") + psc = pspecdata.pspec_run(fnames, "./out.hdf5", dsets_std=fnames_std, Jy2mK=True, beam=beamfile, verbose=False, overwrite=True, rephase_to_dset=0, blpairs=[((37, 38), (37, 38)), ((37, 38), (52, 53))], pol_pairs=[('xx', 'xx'), ('xx', 'xx')], dset_labels=["foo", "bar"], dset_pairs=[(0, 0), (0, 1)], spw_ranges=[(50, 75), (120, 140)], - cosmo=cosmo) + cosmo=cosmo, trim_dset_lsts=True, broadcast_dset_flags=True, time_thresh=0.1, + store_cov=True) nt.assert_true("foo_bar" in psc.groups()) nt.assert_equal(psc.spectra('foo_bar'), [u'foo_x_bar', u'foo_x_foo']) uvp = psc.get_pspec("foo_bar", "foo_x_bar") @@ -1251,9 +1276,48 @@ def test_pspec_run(): nt.assert_equal(uvp.bl_array.tolist(), [37038, 52053]) nt.assert_equal(uvp.pol_array.tolist(), [-5, -5]) nt.assert_equal(uvp.cosmo, cosmo) + nt.assert_true(hasattr(uvp, 'cov_array')) + nt.assert_equal(set(uvp.labels), set(['bar', 'foo'])) #nt.assert_equal(uvp.labels, []) 
#nt.assert_equal(uvp.get_spw_ranges, []) + # test when no data is loaded in dset + if os.path.exists("./out.hdf5"): + os.remove("./out.hdf5") + psc = pspecdata.pspec_run(fnames, "./out.hdf5", Jy2mK=False, verbose=False, overwrite=True, + blpairs=[((500, 501), (600, 601))]) + nt.assert_equal(psc, None) + nt.assert_false(os.path.exists("./out.h5")) + uvds = [] + for f in fnames: + uvd = UVData() + uvd.read_miriad(f) + uvds.append(uvd) + psc = pspecdata.pspec_run(uvds, "./out.hdf5", dsets_std=fnames_std, Jy2mK=False, verbose=False, overwrite=True, + blpairs=[((500, 501), (600, 601))]) + nt.assert_equal(psc, None) + nt.assert_false(os.path.exists("./out.h5")) + + # test when data is loaded, but no blpairs match + if os.path.exists("./out.hdf5"): + os.remove("./out.hdf5") + psc = pspecdata.pspec_run(fnames, "./out.hdf5", Jy2mK=False, verbose=False, overwrite=True, + blpairs=[((37, 38), (600, 601))]) + nt.assert_true(psc is not None) + nt.assert_equal(len(psc.groups()), 0) + + # test glob-parseable input dataset + dsets = [os.path.join(DATA_PATH, "zen.2458042.?????.xx.HH.uvXA"), + os.path.join(DATA_PATH, "zen.2458042.?????.xx.HH.uvXA")] + if os.path.exists("./out.hdf5"): + os.remove("./out.hdf5") + psc = pspecdata.pspec_run(dsets, "./out.hdf5", Jy2mK=False, verbose=True, overwrite=True, + blpairs=[((24, 25), (37, 38))]) + uvp = psc.get_pspec("dset0_dset1", "dset0_x_dset1") + nt.assert_equal(uvp.Ntimes, 120) + if os.path.exists("./out.hdf5"): + os.remove("./out.hdf5") + # test exceptions nt.assert_raises(AssertionError, pspecdata.pspec_run, (1, 2), "./out.hdf5") nt.assert_raises(AssertionError, pspecdata.pspec_run, [1, 2], "./out.hdf5") @@ -1267,8 +1331,12 @@ def test_pspec_run(): def test_get_argparser(): args = pspecdata.get_pspec_run_argparser() - a = args.parse_args([['foo'], 'bar']) - + a = args.parse_args([['foo'], 'bar', '--dset_pairs', '0 0, 1 1', '--pol_pairs', 'xx xx, yy yy', + '--spw_ranges', '300 400, 600 800', '--blpairs', '24 25 24 25, 37 38 37 38']) + nt.assert_equal(a.pol_pairs, [('xx', 'xx'), ('yy', 'yy')]) + nt.assert_equal(a.dset_pairs, [(0, 0), (1, 1)]) + nt.assert_equal(a.spw_ranges, [(300, 400), (600, 800)]) + nt.assert_equal(a.blpairs, [((24, 25), (24, 25)), ((37, 38), (37, 38))]) """ # LEGACY MONTE CARLO TESTS diff --git a/hera_pspec/tests/test_testing.py b/hera_pspec/tests/test_testing.py index 342aba0e..63dc9184 100644 --- a/hera_pspec/tests/test_testing.py +++ b/hera_pspec/tests/test_testing.py @@ -4,6 +4,7 @@ import os from pyuvdata import UVData import numpy as np +from hera_cal import redcal def test_build_vanilla_uvpspec(): @@ -18,22 +19,32 @@ def test_build_vanilla_uvpspec(): nt.assert_equal(beam_OP.tolist(), uvp.OmegaP.tolist()) def test_uvpspec_from_data(): + # get data fname = os.path.join(DATA_PATH, "zen.even.xx.LST.1.28828.uvOCRSA") - fname_std = os.path.join(DATA_PATH, "zen.even.std.xx.LST.1.28828.uvOCRSA") uvd = UVData() uvd.read_miriad(fname) beamfile = os.path.join(DATA_PATH, 'HERA_NF_dipole_power.beamfits') beam = pspecbeam.PSpecBeamUV(beamfile) - uvp = testing.uvpspec_from_data(fname, [(37, 38), (38, 39), (52, 53), (53, 54)], beam=beam) - nt.assert_equal(uvp.Nfreqs, 150) - nt.assert_equal(np.unique(uvp.blpair_array).tolist(), [37038038039, 37038052053, 37038053054, 38039037038, - 38039052053, 38039053054, 52053037038, 52053038039, - 52053053054, 53054037038, 53054038039, 53054052053]) - uvp2 = testing.uvpspec_from_data(uvd, [(37, 38), (38, 39), (52, 53), (53, 54)], beam=beamfile) + # test basic execution + uvp = testing.uvpspec_from_data(fname, [(37, 
38), (38, 39), (52, 53), (53, 54)], beam=beam, spw_ranges=[(50, 100)]) + nt.assert_equal(uvp.Nfreqs, 50) + nt.assert_equal(np.unique(uvp.blpair_array).tolist(), [37038038039, 37038052053, 37038053054, 38039052053, 38039053054, 52053053054]) + uvp2 = testing.uvpspec_from_data(uvd, [(37, 38), (38, 39), (52, 53), (53, 54)], beam=beamfile, spw_ranges=[(50, 100)]) uvp.history = '' uvp2.history = '' nt.assert_equal(uvp, uvp2) - #test std - uvp = testing.uvpspec_from_data(fname, [(37, 38), (38, 39), (52, 53), (53, 54)], - data_std=fname_std, beam=beam, spw_ranges=[(20,28)]) + + # test multiple bl groups + antpos, ants = uvd.get_ENU_antpos(pick_data_ants=True) + reds = redcal.get_pos_reds(dict(zip(ants, antpos)), low_hi=True) + uvp = testing.uvpspec_from_data(fname, reds[:3], beam=beam, spw_ranges=[(50, 100)]) + nt.assert_equal(len(set(uvp.bl_array) - set([37038, 37051, 37052, 38039, 38052, 38053, 39053, 39054, + 51052, 51067, 52053, 52067, 52068, 53054, 53068, 53069, + 54069, 67068, 68069])), 0) + nt.assert_equal(uvp.Nblpairs, 51) + + # test exceptions + nt.assert_raises(AssertionError, testing.uvpspec_from_data, fname, (37, 38)) + nt.assert_raises(AssertionError, testing.uvpspec_from_data, fname, [([37, 38], [38, 39])]) + nt.assert_raises(AssertionError, testing.uvpspec_from_data, fname, [[[37, 38], [38, 39]]]) diff --git a/hera_pspec/tests/test_utils.py b/hera_pspec/tests/test_utils.py index 56d53dad..2b81db0d 100644 --- a/hera_pspec/tests/test_utils.py +++ b/hera_pspec/tests/test_utils.py @@ -6,8 +6,7 @@ from hera_pspec import utils, testing from collections import OrderedDict as odict from pyuvdata import UVData - - +from hera_cal import redcal def test_cov(): # load another data file @@ -51,6 +50,13 @@ def test_load_config(): # Check that missing files cause an error nt.assert_raises(IOError, utils.load_config, "file_that_doesnt_exist") + # Check 'None' and list of lists become Nones and list of tuples + nt.assert_equal(cfg['data']['pairs'], [('xx', 'xx'), ('yy', 'yy')]) + nt.assert_equal(cfg['pspec']['taper'], 'none') + nt.assert_equal(cfg['pspec']['groupname'], None) + nt.assert_equal(cfg['pspec']['options']['bar'], [('foo', 'bar')]) + nt.assert_equal(cfg['pspec']['options']['foo'], None) + class Test_Utils(unittest.TestCase): @@ -184,6 +190,27 @@ def test_calc_reds(self): uvd2.antenna_positions[0] += 2 nt.assert_raises(AssertionError, utils.calc_reds, uvd, uvd2) + def test_config_pspec_blpairs(self): + # test basic execution + uv_template = os.path.join(DATA_PATH, "zen.{group}.{pol}.LST.1.28828.uvOCRSA") + groupings = utils.config_pspec_blpairs(uv_template, [('xx', 'xx')], [('even', 'odd')], verbose=False) + nt.assert_equal(len(groupings), 1) + nt.assert_equal(groupings.keys()[0], (('even', 'odd'), ('xx', 'xx'))) + nt.assert_equal(len(groupings.values()[0]), 11833) + + # test multiple, some non-existant pairs + groupings = utils.config_pspec_blpairs(uv_template, [('xx', 'xx'), ('yy', 'yy')], [('even', 'odd'), ('even', 'odd')], verbose=False) + nt.assert_equal(len(groupings), 1) + nt.assert_equal(groupings.keys()[0], (('even', 'odd'), ('xx', 'xx'))) + + # test xants + groupings = utils.config_pspec_blpairs(uv_template, [('xx', 'xx')], [('even', 'odd')], xants=[0, 1, 2], verbose=False) + nt.assert_equal(len(groupings.values()[0]), 9735) + + # test exceptions + nt.assert_raises(AssertionError, utils.config_pspec_blpairs, uv_template, [('xx', 'xx'), ('xx', 'xx')], [('even', 'odd')], verbose=False) + + def test_log(): """ Test that log() prints output. 
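The hunk below updates test_log() for the revised utils.log() signature in this diff, where tb now takes the full sys.exc_info() tuple rather than a bare traceback object. A minimal sketch of the new calling convention (the message text is made up):

    import sys
    from hera_pspec import utils

    try:
        raise NameError
    except NameError:
        # pass the whole (type, value, traceback) tuple; log() formats it
        # internally via traceback.format_exception(*tb)
        utils.log("raised an exception", tb=sys.exc_info(), verbose=True)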
@@ -204,8 +231,7 @@ def test_log(): try: raise NameError except NameError: - err, _, tb = sys.exc_info() - utils.log("raised an exception", f=logf, tb=tb, verbose=False) + utils.log("raised an exception", f=logf, tb=sys.exc_info(), verbose=False) logf.close() with open("logf.log", "r") as f: log = ''.join(f.readlines()) @@ -218,3 +244,31 @@ def test_hash(): Check that MD5 hashing works. """ hsh = utils.hash(np.ones((8,16))) + + +def test_get_blvec_reds(): + fname = os.path.join(DATA_PATH, "zen.2458042.17772.xx.HH.uvXA") + uvd = UVData() + uvd.read_miriad(fname) + antpos, ants = uvd.get_ENU_antpos(pick_data_ants=True) + reds = redcal.get_pos_reds(dict(zip(ants, antpos)), low_hi=True) + uvp = testing.uvpspec_from_data(fname, reds[:2], spw_ranges=[(10, 40)]) + + # test execution w/ dictionary + blvecs = dict(zip(uvp.bl_array, uvp.get_ENU_bl_vecs())) + (red_bl_grp, red_bl_len, red_bl_ang, + red_bl_tag) = utils.get_blvec_reds(blvecs, bl_error_tol=1.0) + nt.assert_equal(len(red_bl_grp), 2) + nt.assert_equal(red_bl_tag, ['015_060', '015_120']) + + # test w/ a UVPSpec + (red_bl_grp, red_bl_len, red_bl_ang, + red_bl_tag) = utils.get_blvec_reds(uvp, bl_error_tol=1.0) + nt.assert_equal(len(red_bl_grp), 2) + nt.assert_equal(red_bl_tag, ['015_060', '015_120']) + + # test w/ zero tolerance: each blpair is its own group + (red_bl_grp, red_bl_len, red_bl_ang, + red_bl_tag) = utils.get_blvec_reds(uvp, bl_error_tol=0.0) + nt.assert_equal(len(red_bl_grp), uvp.Nblpairs) + diff --git a/hera_pspec/tests/test_uvpspec.py b/hera_pspec/tests/test_uvpspec.py index a5615f97..0ecea9d4 100644 --- a/hera_pspec/tests/test_uvpspec.py +++ b/hera_pspec/tests/test_uvpspec.py @@ -365,7 +365,7 @@ def test_combine_uvpspec(self): # test concat across blpairts uvp2 = testing.uvpspec_from_data(uvd, [(53, 54), (67, 68)], spw_ranges=[(20, 30), (60, 90)], beam=beam) out = uvpspec.combine_uvpspec([uvp1, uvp2], verbose=False) - nt.assert_equal(out.Nblpairs, 8) + nt.assert_equal(out.Nblpairs, 4) nt.assert_equal(out.Nbls, 5) # test failure due to variable static metadata @@ -435,7 +435,7 @@ def test_combine_uvpspec_std(self): # test concat across blpairts uvp2 = testing.uvpspec_from_data(uvd, [(53, 54), (67, 68)], spw_ranges=[(20, 24), (64, 68)], data_std=uvd_std, beam=beam) out = uvpspec.combine_uvpspec([uvp1, uvp2], verbose=False) - nt.assert_equal(out.Nblpairs, 8) + nt.assert_equal(out.Nblpairs, 4) nt.assert_equal(out.Nbls, 5) # test failure due to variable static metadata diff --git a/hera_pspec/utils.py b/hera_pspec/utils.py index c13e766f..44c121d6 100644 --- a/hera_pspec/utils.py +++ b/hera_pspec/utils.py @@ -7,6 +7,14 @@ from hera_cal import redcal import itertools import argparse +import glob +import os +import aipy +from collections import OrderedDict as odict +from pyuvdata import utils as uvutils +from pyuvdata import UVData +from datetime import datetime + def hash(w): """ @@ -152,7 +160,7 @@ def construct_blpairs(bls, exclude_auto_bls=False, exclude_permutations=False, g def calc_reds(uvd1, uvd2, bl_tol=1.0, filter_blpairs=True, xant_flag_thresh=0.95, exclude_auto_bls=True, - exclude_permutations=True, Nblps_per_group=None, bl_len_range=(0, 1e10)): + exclude_permutations=True, Nblps_per_group=None, bl_len_range=(0, 1e10), bl_deg_range=(0, 180)): """ Use hera_cal.redcal to get matching redundant baselines groups from uvd1 and uvd2 within the specified baseline tolerance, not including flagged ants. 
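The hunks that follow add a bl_deg_range cut to calc_reds(). The angle convention, sketched standalone below with numpy, folds each baseline's ENU angle into [0, 180], so a baseline and its conjugate are treated identically:

    import numpy as np

    # East-North components [m] of a baseline and its conjugate
    for bl_vec in [np.array([10., -5.]), np.array([-10., 5.])]:
        bl_deg = np.arctan2(bl_vec[1], bl_vec[0]) * 180 / np.pi
        if bl_deg < 0:
            bl_deg = (bl_deg + 180) % 360
        print(bl_deg)  # both cases give 153.43...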
@@ -189,10 +197,14 @@ def calc_reds(uvd1, uvd2, bl_tol=1.0, filter_blpairs=True, xant_flag_thresh=0.95 Number of baseline-pairs to put into each sub-group. No grouping if None. Default: None - bl_len_range : tuple of float, optional - Tuple containing minimum baseline length and maximum baseline length [meters] + bl_len_range : tuple, optional + len-2 tuple containing minimum baseline length and maximum baseline length [meters] to keep in baseline type selection + bl_deg_range : tuple, optional + len-2 tuple containing (minimum, maximum) baseline angle in degrees to keep in + baseline selection + Returns ------- baselines1 : list of baseline tuples @@ -274,11 +286,25 @@ def calc_reds(uvd1, uvd2, bl_tol=1.0, filter_blpairs=True, xant_flag_thresh=0.95 # filter based on xants, existance in uvd1 and uvd2 and bl_len_range bls1, bls2 = [], [] for bl1, bl2 in _blps: + # get baseline length and angle bl1i = uvd1.antnums_to_baseline(*bl1) bl2i = uvd1.antnums_to_baseline(*bl2) - bl_len = np.mean(map(lambda bl: np.linalg.norm(antpos[bl[0]]-antpos[bl[1]]), [bl1, bl2])) + bl1v = (antpos[bl1[0]] - antpos[bl1[1]])[:2] + bl2v = (antpos[bl2[0]] - antpos[bl2[1]])[:2] + bl1_len, bl2_len = np.linalg.norm(bl1v), np.linalg.norm(bl2v) + bl1_deg = np.arctan2(*bl1v[::-1]) * 180 / np.pi + if bl1_deg < 0: bl1_deg = (bl1_deg + 180) % 360 + bl2_deg = np.arctan2(*bl2v[::-1]) * 180 / np.pi + if bl2_deg < 0: bl2_deg = (bl2_deg + 180) % 360 + bl_len = np.mean([bl1_len, bl2_len]) + bl_deg = np.mean([bl1_deg, bl2_deg]) + # filter based on length cut if bl_len < bl_len_range[0] or bl_len > bl_len_range[1]: continue + # filter based on angle cut + if bl_deg < bl_deg_range[0] or bl_deg > bl_deg_range[1]: + continue + # filter on other things if filter_blpairs: if (bl1i not in uvd1.baseline_array or bl1[0] in xants1 or bl1[1] in xants1) \ or (bl2i not in uvd2.baseline_array or bl2[0] in xants2 or bl2[1] in xants2): @@ -363,7 +389,7 @@ def spw_range_from_freqs(data, freq_range, bounds_error=True): spw_range : tuple or list of tuples Indices of the channels at the lower and upper bounds of the specified spectral window(s). - + Note: If the requested spectral window is outside the available frequency range, and bounds_error is False, '(None, None)' is returned. """ @@ -471,7 +497,7 @@ def spw_range_from_redshifts(data, z_range, bounds_error=True): def log(msg, f=None, lvl=0, tb=None, verbose=True): """ - Add a message to the log (just prints to the terminal for now). + Add a message to the log. Parameters ---------- @@ -485,16 +511,16 @@ def log(msg, f=None, lvl=0, tb=None, verbose=True): Indent level of the message. Each level adds two extra spaces. Default: 0. - tb : traceback object, optional - Traceback object to print with traceback.format_tb() + tb : traceback tuple, optional + Output of sys.exc_info() verbose : bool, optional if True, print msg. Even if False, still writes to file if f is provided. """ - # catch for traceback provided + # catch for traceback if provided if tb is not None: - msg += "\n{}\n".format(traceback.format_tb(tb)[0]) + msg += "\n{}".format('\n'.join(traceback.format_exception(*tb))) # print output = "%s%s" % (" "*lvl, msg) @@ -504,18 +530,36 @@ def log(msg, f=None, lvl=0, tb=None, verbose=True): # write if f is not None: f.write(output) + f.flush() def load_config(config_file): """ Load configuration details from a YAML file. + All entries of 'None' --> None and all lists + of lists become lists of tuples. 
""" + # define recursive replace function + def replace(d): + if isinstance(d, (dict, odict)): + for k in d.keys(): + # 'None' and '' turn into None + if d[k] == 'None': d[k] = None + # list of lists turn into lists of tuples + if isinstance(d[k], list) and np.all([isinstance(i, list) for i in d[k]]): + d[k] = [tuple(i) for i in d[k]] + elif isinstance(d[k], (dict, odict)): replace(d[k]) + # Open and read config file with open(config_file, 'r') as cfile: try: cfg = yaml.load(cfile) except yaml.YAMLError as exc: raise(exc) + + # Replace entries + replace(cfg) + return cfg @@ -525,3 +569,284 @@ def flatten(nested_list): """ return [item for sublist in nested_list for item in sublist] + +def config_pspec_blpairs(uv_templates, pol_pairs, group_pairs, exclude_auto_bls=True, + exclude_permutations=True, bl_len_range=(0, 1e10), + bl_deg_range=(0, 180), xants=None, verbose=True): + """ + Given a list of miriad file templates and selections for + polarization and group labels, construct a master list of + blpair-group-pol pairs using utils.construct_reds(). + + A group is a fieldname in the visibility files that denotes the + "type" of dataset. For example, the group field in the following files + zen.even.LST.1.01.xx.HH.uv + zen.odd.LST.1.01.xx.HH.uv + are the "even" and "odd" field, and specifies the two time binning groups. + To form cross spectra between these two files, one would feed a group_pair + of: group_pairs = [('even', 'odd'), ...]. + + A baseline-pair is formed by self-matching unique-files in the + glob-parsed master list, and then string-formatting-in appropriate + pol and group selections given pol_pairs and group_pairs. Those two + files are then passed to utils.calc_reds(..., **kwargs) to construct + the baseline-pairs for that particular file-matching. + + Parameters + ---------- + uv_templates : list + List of glob-parseable string templates, each of which must have + a {pol} and {group} field. + + pol_pairs : list + List of len-2 polarization tuples to use in forming cross spectra. + Ex: [('xx', 'xx'), ('yy', 'yy'), ...] + + group_pairs : list + List of len-2 group tuples to use in forming cross spectra. + See top of doc-string for an explanation of a "group" in this context. + Ex: [('grp1', 'grp1'), ('grp2', 'grp2'), ...] + + exclude_auto_bls : bool + If True, exclude all baselines paired with itself. + + exclude_permutations : bool + If True, exclude baseline2_cross_baseline1 if + baseline1_cross_baseline2 exists. + + bl_len_range : len-2 tuple + A len-2 integer tuple specifying the range of baseline lengths + (meters in ENU frame) to consider. + + bl_deg_range : len-2 tuple + A len-2 integer tuple specifying the range of baseline angles + (degrees in ENU frame) to consider. + + xants : list + A list of integer antenna numbers to exclude. + + verbose : bool + If True, print feedback to stdout. + + Returns + ------- + groupings : dict + A dictionary holding pol and group pair (tuple) as keys + and a list of baseline-pairs as values. 
+ """ + # type check + if isinstance(uv_templates, (str, np.str)): + uv_templates = [uv_templates] + assert len(pol_pairs) == len(group_pairs), "len(pol_pairs) must equal "\ + "len(group_pairs)" + + # get unique pols and groups + pols = sorted(set([item for sublist in pol_pairs for item in sublist])) + groups = sorted(set([item for sublist in group_pairs for item in sublist])) + + # parse wildcards in uv_templates to get wildcard-unique filenames + unique_files = [] + pol_grps = [] + for template in uv_templates: + for pol in pols: + for group in groups: + # parse wildcards with pol / group selection + files = glob.glob(template.format(pol=pol, group=group)) + # if any files were parsed, add to pol_grps + if len(files) > 0: + pol_grps.append((pol, group)) + # insert into unique_files with {pol} and {group} re-inserted + for _file in files: + _unique_file = _file.replace(".{pol}.".format(pol=pol), + ".{pol}.").replace(".{group}.".format(group=group), ".{group}.") + if _unique_file not in unique_files: + unique_files.append(_unique_file) + unique_files = sorted(unique_files) + + # use a single file from unique_files and a single pol-group combination to get antenna positions + _file = unique_files[0].format(pol=pol_grps[0][0], group=pol_grps[0][1]) + uvd = UVData() + uvd.read_miriad_metadata(_file) + + # get baseline pairs + (_bls1, _bls2, _, _, + _) = calc_reds(uvd, uvd, filter_blpairs=False, exclude_auto_bls=exclude_auto_bls, + exclude_permutations=exclude_permutations, bl_len_range=bl_len_range, + bl_deg_range=bl_deg_range) + + # take out xants if fed + if xants is not None: + bls1, bls2 = [], [] + for bl1, bl2 in zip(_bls1, _bls2): + if bl1[0] not in xants and bl1[1] not in xants and bl2[0] not in xants and bl2[1] not in xants: + bls1.append(bl1) + bls2.append(bl2) + else: + bls1, bls2 = _bls1, _bls2 + blps = zip(bls1, bls2) + + # iterate over pol-group pairs that exist + groupings = odict() + for pp, gp in zip(pol_pairs, group_pairs): + if (pp[0], gp[0]) not in pol_grps or (pp[1], gp[1]) not in pol_grps: + if verbose: + print "pol_pair {} and group_pair {} not found in data files".format(pp, gp) + continue + groupings[(tuple(gp), tuple(pp))] = blps + + return groupings + + +def job_monitor(run_func, iterator, action_name, M=map, lf=None, maxiter=1, verbose=True): + """ + Job monitoring function. + + Parameters + ---------- + run_func : function + A worker function to run on each element in iterator + + iterator : iterable + An iterable whose elements define an individual job launch + + action_name : str + The name of the block in the pipeline + + lf : file descriptor + Log-file descriptor to print message to. + + maxiter : int + Maximum number of job re-tries for failed jobs. + + verbose : bool + If True, print feedback to stdout and logfile. 
+
+
+def job_monitor(run_func, iterator, action_name, M=map, lf=None, maxiter=1, verbose=True):
+    """
+    Job monitoring function.
+
+    Parameters
+    ----------
+    run_func : function
+        A worker function to run on each element in iterator.
+        Should return 0 on success and a nonzero exit code on failure.
+
+    iterator : iterable
+        An iterable whose elements define an individual job launch
+
+    action_name : str
+        The name of the block in the pipeline
+
+    M : map function
+        Map function used to distribute run_func over iterator.
+        Default: the builtin (serial) map.
+
+    lf : file descriptor
+        Log-file descriptor to print message to.
+
+    maxiter : int
+        Maximum number of job re-tries for failed jobs.
+
+    verbose : bool
+        If True, print feedback to stdout and logfile.
+
+    Returns
+    -------
+    failures : list
+        A list of failed job indices from iterator
+    """
+    # run function over jobs
+    exit_codes = np.array(M(run_func, iterator))
+    time = datetime.utcnow()
+
+    # inspect for failures
+    if np.all(exit_codes != 0):
+        # everything failed, raise error
+        log("\n{}\nAll {} jobs failed w/ exit codes\n {}: {}\n".format("-"*60, action_name, exit_codes, time), f=lf, verbose=verbose)
+        raise ValueError("All {} jobs failed".format(action_name))
+
+    # if not all failed, try re-run
+    failures = np.where(exit_codes != 0)[0]
+    counter = 1
+    while True:
+        if not np.all(exit_codes == 0):
+            if counter >= maxiter:
+                # break after certain number of tries
+                break
+            # re-run function over jobs that failed (failure indices are fed
+            # back to run_func, so iterator is assumed to be range-like)
+            exit_codes = np.array(M(run_func, failures))
+            # update counter
+            counter += 1
+            # update failures
+            failures = failures[exit_codes != 0]
+        else:
+            # all passed
+            break
+
+    # print failures if they exist
+    if len(failures) > 0:
+        log("\nSome {} jobs failed after {} tries:\n{}".format(action_name, maxiter, failures), f=lf, verbose=verbose)
+    else:
+        log("\nAll {} jobs ran through".format(action_name), f=lf, verbose=verbose)
+
+    return failures
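A toy sketch of the retry semantics, assuming a range-like iterator so that failure indices can be re-fed to run_func; the worker and action name are made up:

    from hera_pspec import utils

    def run_job(i):
        # worker: return 0 on success, nonzero on failure
        try:
            assert i != 2  # job 2 always fails in this toy example
            return 0
        except AssertionError:
            return 1

    # serial map here; pass e.g. M=multiprocess.Pool(nproc).map to parallelize
    failures = utils.job_monitor(run_job, range(4), "TOY", maxiter=2, verbose=False)
    # failures -> array([2]): job 2 still fails after the one allowed retry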
+
+
+def get_blvec_reds(blvecs, bl_error_tol=1.0):
+    """
+    Given a blvecs dictionary, form groups of baseline-pair objects based on
+    redundancy in ENU coordinates. Note: this only uses the East-North components
+    of the baseline vectors to calculate redundancy.
+
+    Parameters:
+    -----------
+    blvecs : dictionary (or UVPSpec object)
+        A dictionary with baseline vectors as values. Alternatively, this
+        can be a UVPSpec object with baseline-pairs and baseline vectors.
+
+    bl_error_tol : float
+        Redundancy tolerance of baseline vector in meters.
+
+    Returns:
+    --------
+    red_bl_grp : list
+        A list of baseline groups, ordered by ascending baseline length.
+
+    red_bl_len : list
+        A list of baseline lengths in meters for each bl group
+
+    red_bl_ang : list
+        A list of baseline angles in degrees for each bl group
+
+    red_bl_tag : list
+        A list of baseline string tags denoting bl length and angle
+    """
+    from hera_pspec import UVPSpec
+    # type check
+    assert isinstance(blvecs, (dict, odict, UVPSpec)), "blvecs must be fed as a dict or UVPSpec"
+    if isinstance(blvecs, UVPSpec):
+        # get baseline vectors
+        uvp = blvecs
+        bls = uvp.bl_array
+        bl_vecs = uvp.get_ENU_bl_vecs()[:, :2]
+        blvecs = dict(zip(map(uvp.bl_to_antnums, bls), bl_vecs))
+        # get baseline-pairs
+        blpairs = uvp.get_blpairs()
+        # form dictionary
+        _blvecs = odict()
+        for blp in blpairs:
+            bl1 = blp[0]
+            bl2 = blp[1]
+            _blvecs[blp] = (blvecs[bl1] + blvecs[bl2]) / 2.
+        blvecs = _blvecs
+
+    # create empty lists
+    red_bl_grp = []
+    red_bl_vec = []
+    red_bl_len = []
+    red_bl_ang = []
+    red_bl_tag = []
+
+    # iterate over each baseline in blvecs
+    for bl in blvecs.keys():
+        # get bl vector and properties
+        bl_vec = blvecs[bl][:2]
+        bl_len = np.linalg.norm(bl_vec)
+        bl_ang = np.arctan2(*bl_vec[::-1]) * 180 / np.pi
+        if bl_ang < 0: bl_ang = (bl_ang + 180) % 360
+        bl_tag = "{:03.0f}_{:03.0f}".format(bl_len, bl_ang)
+
+        # append to list if unique within tolerance (absolute, in meters)
+        match = [np.all(np.isclose(blv, bl_vec, atol=bl_error_tol)) for blv in red_bl_vec]
+        if np.any(match):
+            match_id = np.where(match)[0][0]
+            red_bl_grp[match_id].append(bl)
+
+        # else create new list
+        else:
+            red_bl_grp.append([bl])
+            red_bl_vec.append(bl_vec)
+            red_bl_len.append(bl_len)
+            red_bl_ang.append(bl_ang)
+            red_bl_tag.append(bl_tag)
+
+    # order based on tag
+    order = np.argsort(red_bl_tag)
+    red_bl_grp = [red_bl_grp[i] for i in order]
+    red_bl_len = [red_bl_len[i] for i in order]
+    red_bl_ang = [red_bl_ang[i] for i in order]
+    red_bl_tag = [red_bl_tag[i] for i in order]
+
+    return red_bl_grp, red_bl_len, red_bl_ang, red_bl_tag
+
diff --git a/hera_pspec/uvpspec.py b/hera_pspec/uvpspec.py
index b50b517f..97594447 100644
--- a/hera_pspec/uvpspec.py
+++ b/hera_pspec/uvpspec.py
@@ -91,12 +91,12 @@ def __init__(self):
                            "bl_vecs", "bl_array", "channel_width", "telescope_location", "weighting",
                            "vis_units", "norm_units", "taper", "norm", "nsample_array", 'lst_avg_array',
                            'time_avg_array', 'folded', "scalar_array", "labels", "label_1_array",
-                           "label_2_array","store_cov"]
+                           "label_2_array"]
 
         # all parameters must fall into one and only one of the following groups, which are used in __eq__
         self._immutables = ["Ntimes", "Nblpairts", "Nblpairs", "Nspwdlys", "Nspws", "Ndlys", "Npols",
                             "Nfreqs", "history", "Nbls", "channel_width", "weighting",
-                            "vis_units", "norm", "norm_units", "taper", "cosmo", "beamfile" ,'folded',"store_cov"]
+                            "vis_units", "norm", "norm_units", "taper", "cosmo", "beamfile", 'folded']
         self._ndarrays = ["spw_array", "freq_array", "dly_array", "pol_array", "lst_1_array",
                           'lst_avg_array', 'time_avg_array', "lst_2_array", "time_1_array",
                           "time_2_array", "blpair_array", "OmegaP", "OmegaPP", "beam_freqs",
@@ -105,7 +105,6 @@ def __init__(self):
         self._dicts = ["data_array", "wgt_array", "integration_array",
                        "nsample_array", "cov_array"]
 
         # define which attributes are considered meta data. Large attrs should be constructed as datasets
-
         self._meta_dsets = ["lst_1_array", "lst_2_array", "time_1_array", "time_2_array",
                             "blpair_array", "bl_vecs", "bl_array", 'lst_avg_array',
                             'time_avg_array', 'OmegaP', 'OmegaPP', "label_1_array",
                             "label_2_array"]
@@ -768,7 +767,6 @@ def get_ENU_bl_vecs(self):
                 (self.bl_vecs + self.telescope_location).T, \
                 *uvutils.LatLonAlt_from_XYZ(self.telescope_location)).T
 
-
     def read_from_group(self, grp, just_meta=False, spws=None, bls=None,
                         blpairs=None, times=None, pols=None,
                         only_pairs_in_bls=False):
@@ -809,6 +807,10 @@ def read_from_group(self, grp, just_meta=False, spws=None, bls=None,
             If True, keep only baseline-pairs whose first _and_ second baseline
             are found in the 'bls' list. Default: False.
""" + # Make sure the group is a UVPSpec object + assert 'pspec_type' in grp.attrs, "This object is not a UVPSpec object" + assert grp.attrs['pspec_type'] == 'UVPSpec', "This object is not a UVPSpec object" + # Clear all data in the current object self._clear() @@ -837,7 +839,6 @@ def read_from_group(self, grp, just_meta=False, spws=None, bls=None, self.check(just_meta=just_meta) - def read_hdf5(self, filepath, just_meta=False, spws=None, bls=None, blpairs=None, times=None, pols=None, only_pairs_in_bls=False): @@ -916,23 +917,27 @@ def write_to_group(self, group, run_check=True): group.create_dataset(k, data=getattr(self, k)) # Iterate over spectral windows and create datasets + store_cov = hasattr(self, 'cov_array') for i in np.unique(self.spw_array): group.create_dataset("data_spw{}".format(i), data=self.data_array[i], - dtype=np.complex) + dtype=np.complex128) group.create_dataset("wgt_spw{}".format(i), data=self.wgt_array[i], - dtype=np.float) + dtype=np.float64) group.create_dataset("integration_spw{}".format(i), data=self.integration_array[i], - dtype=np.float) + dtype=np.float64) group.create_dataset("nsample_spw{}".format(i), data=self.nsample_array[i], - dtype=np.float) - if self.store_cov: + dtype=np.float64) + if store_cov: group.create_dataset("cov_spw{}".format(i), data=self.cov_array[i], - dtype=np.complex) + dtype=np.complex128) + + # denote as a uvpspec object + group.attrs['pspec_type'] = self.__class__.__name__ def write_hdf5(self, filepath, overwrite=False, run_check=True): """ @@ -960,7 +965,6 @@ def write_hdf5(self, filepath, overwrite=False, run_check=True): with h5py.File(filepath, 'w') as f: self.write_to_group(f, run_check=run_check) - def set_cosmology(self, new_cosmo, overwrite=False, new_beam=None, verbose=True): """ @@ -1502,11 +1506,10 @@ def combine_uvpspec(uvps, verbose=True): Nblpairts = len(new_blpts) Npols = len(new_pols) - # store covariance only if all uvps have stored covariance. store_cov = np.all([hasattr(uvp,'cov_array') for uvp in uvps]) - # Create new empty data arrays and fill spw arrays + # Create new empty data arrays and fill spw arrays u.data_array = odict() u.integration_array = odict() u.wgt_array = odict() @@ -1525,7 +1528,7 @@ def combine_uvpspec(uvps, verbose=True): u.wgt_array[i] = np.empty((Nblpairts, spw[2], 2, Npols), np.float64) u.nsample_array[i] = np.empty((Nblpairts, Npols), np.float64) if store_cov: - u.cov_array[i]=np.empty((Nblpairts, spw[2], spw[2], Npols), np.complex128) + u.cov_array[i] = np.empty((Nblpairts, spw[2], spw[2], Npols), np.complex128) # Set frequencies and delays spw_Nfreqs = spw[-1] @@ -1537,14 +1540,12 @@ def combine_uvpspec(uvps, verbose=True): u.freq_array.extend(spw_freqs) u.dly_array.extend(spw_dlys) - # Convert to numpy arrays u.spw_array = np.array(u.spw_array) u.freq_array = np.array(u.freq_array) u.dly_array = np.array(u.dly_array) u.pol_array = np.array(new_pols) - # Number of spectral windows, delays etc. 
     u.Nspws = Nspws
     u.Nblpairts = Nblpairts
@@ -1607,7 +1608,7 @@
                 u.label_1_array[i,j,k] = u_lbls[uvps[l].labels[lbl1]]
                 u.label_2_array[i,j,k] = u_lbls[uvps[l].labels[lbl2]]
                 if store_cov:
-                    u.cov_array[i][j, :, :, k]=uvps[l].cov_array[m][n, :, :, q]
+                    u.cov_array[i][j, :, :, k] = uvps[l].cov_array[m][n, :, :, q]
     # Populate new LST, time, and blpair arrays
     for j, blpt in enumerate(new_blpts):
         n = blpts_idxs0[j]
@@ -1646,7 +1647,7 @@
             u.data_array[i][j,:,k] = uvps[l].data_array[m][n,:,q]
             if store_cov:
-                u.cov_array[i][j, :, :, k]=uvps[l].cov_array[m][n, :, :, q]
+                u.cov_array[i][j, :, :, k] = uvps[l].cov_array[m][n, :, :, q]
             u.wgt_array[i][j,:,:,k] = uvps[l].wgt_array[m][n,:,:,q]
             u.integration_array[i][j,k] = uvps[l].integration_array[m][n,q]
             u.nsample_array[i][j,k] = uvps[l].integration_array[m][n,q]
diff --git a/hera_pspec/uvpspec_utils.py b/hera_pspec/uvpspec_utils.py
index adcc03db..70530abb 100644
--- a/hera_pspec/uvpspec_utils.py
+++ b/hera_pspec/uvpspec_utils.py
@@ -1,4 +1,3 @@
-
 import numpy as np
 import copy, operator
 from collections import OrderedDict as odict
@@ -44,7 +43,8 @@ def _get_blpairs_from_bls(uvp, bls, only_pairs_in_bls=False):
     return blp_select
 
 
-def _select(uvp, spws=None, bls=None, only_pairs_in_bls=False, blpairs=None, times=None, pols=None, h5file=None):
+def _select(uvp, spws=None, bls=None, only_pairs_in_bls=False, blpairs=None, times=None,
+            pols=None, h5file=None):
     """
     Select function for selecting out certain slices of the data, as well as
     loading in data from HDF5 file.
@@ -157,7 +157,10 @@ def _select(uvp, spws=None, bls=None, only_pairs_in_bls=False, blpairs=None, tim
     ints = odict()
     nsmp = odict()
     cov = odict()
-    store_cov = hasattr(uvp, 'cov_array')
+    if h5file is not None:
+        store_cov = 'cov_spw0' in h5file
+    else:
+        store_cov = hasattr(uvp, 'cov_array')
     for s in np.unique(uvp.spw_array):
         if h5file is not None:
             data[s] = h5file['data_spw{}'.format(s)][blp_select, :, pol_select]
diff --git a/pipelines/idr2_preprocessing/preprocess_data.py b/pipelines/idr2_preprocessing/preprocess_data.py
index 9b6e37a5..ba38966c 100755
--- a/pipelines/idr2_preprocessing/preprocess_data.py
+++ b/pipelines/idr2_preprocessing/preprocess_data.py
@@ -115,16 +115,12 @@ def bl_reformat(j, i=i, datapols=datapols, dfs=dfs, lens=lens, angs=angs, reds=r
             uvd.read_miriad(dfs, ant_pairs_nums=reds[j])
             uvd.write_miriad(outname, clobber=True)
         except:
-            err, _, tb = sys.exc_info()
-            hp.utils.log("\njob {} threw {} Exception with traceback:".format(j, err), f=ef, tb=tb, verbose=verbose)
+            hp.utils.log("\njob {} threw exception:".format(j), f=ef, tb=sys.exc_info(), verbose=verbose)
             return 1
         return 0
 
-    # distribute across baseline types
-    exit_codes = M(bl_reformat, range(len(reds)))
-
-    # print to log
-    hp.utils.log("\nbaseline reformatting exit codes for pol {}:\n {}".format(datapols[i][0], exit_codes), f=lf, verbose=verbose)
+    # launch jobs
+    failures = hp.utils.job_monitor(bl_reformat, range(len(reds)), "BL REFORMAT: pol {}".format(pol), lf=lf, maxiter=maxiter, verbose=verbose)
 
     # edit data template
     input_data_template = os.path.join(out_dir, new_data_template.format(pol='{pol}', suffix=data_suffix))
@@ -163,18 +159,15 @@ def run_xrfi(i, datafiles=datafiles, p=cf['algorithm']['xrfi']):
                               add_to_history='', clobber=overwrite)
         except:
-            err, _, tb = sys.exc_info()
-            hp.utils.log("\n{} threw {} Exception with traceback:".format(outname, err), f=ef, tb=tb, verbose=verbose)
+            hp.utils.log("\njob {} threw exception:".format(i), 
f=ef, tb=sys.exc_info(), verbose=verbose) return 1 return 0 - # run tavg sub on each datafile - exit_codes = M(run_xrfi, range(len(datafiles))) - - # print to log - hp.utils.log("\nRFI flag exit codes:\n {}".format(exit_codes), f=lf, verbose=verbose) + # launch jobs + failures = hp.utils.job_monitor(run_xrfi, range(len(datafiles)), "XRFI", lf=lf, maxiter=maxiter, verbose=verbose) + # update template input_data_template = os.path.join(out_dir, os.path.basename(input_data_template) + file_ext) data_suffix += file_ext @@ -194,7 +187,8 @@ def run_xrfi(i, datafiles=datafiles, p=cf['algorithm']['xrfi']): datafiles, datapols = uvt.utils.search_data(input_data_template.format(group=groupname, pol='{pol}'), pols, matched_pols=False, reverse_nesting=False, flatten=False) # load a datafile and get antenna numbers - _, _, uvd = uvutils.get_miriad_antpos(datafiles[0][0]) + uvd = UVData() + uvd.read_miriad_metadata(datafiles[0][0]) antpos, ants = uvd.get_ENU_antpos() antpos_dict = dict(zip(ants, antpos)) @@ -252,14 +246,13 @@ def full_tavg(j, pol=pol, lens=lens, angs=angs, reds=reds, dfs=dfs, data_suffix= tavg_file = os.path.join(out_dir, tavg_file) F.write_data(tavg_file, write_avg=True, overwrite=overwrite) except: - err, _, tb = sys.exc_info() - hp.utils.log("\n{} threw {} Exception with traceback:".format(j, err), f=ef, tb=tb, verbose=verbose) + hp.utils.log("\njob {} threw exception:".format(j), f=ef, tb=sys.exc_info(), verbose=verbose) return 1 return 0 - # run function - exit_codes = M(full_tavg, range(len(reds))) + # launch jobs + failures = hp.utils.job_monitor(full_tavg, range(len(reds)), "FULL TAVG: pol {}".format(pol), lf=lf, maxiter=maxiter, verbose=verbose) # collate tavg spectra into a single file tavgfiles = sorted(glob.glob(os.path.join(out_dir, "zen.{group}.{pol}.*.{tavg_tag}.{suffix}".format(group=groupname, pol=pol, tavg_tag=tavg_tag, suffix=data_suffix)))) @@ -296,22 +289,21 @@ def tavg_sub(j, dfs=dfs, pol=pol, tavg_file=tavg_out, p=cf['algorithm']['timeavg print "baseline {} not found in time-averaged spectrum".format(bl) uvd.flag_array[bl_inds, :, :, pol_ind] = True + # put uniq_bls in if it doesn't exist + if not uvd.extra_keywords.has_key('uniq_bls'): + uvd.extra_keywords['uniq_bls'] = json.dumps(np.unique(uvd.baseline_array).tolist()) # write tavg-subtracted data out_df = os.path.join(out_dir, os.path.basename(df) + p['file_ext']) uvd.history += "\nTime-Average subtracted." 
             uvd.write_miriad(out_df, clobber=overwrite)
         except:
-            err, _, tb = sys.exc_info()
-            hp.utils.log("\n{} threw {} Exception with traceback:".format(i, err), f=ef, tb=tb, verbose=verbose)
+            hp.utils.log("\njob {} threw exception:".format(j), f=ef, tb=sys.exc_info(), verbose=verbose)
             return 1
         return 0
 
-    # run function
-    exit_codes = M(tavg_sub, range(len(dfs)))
-
-    # print to log
-    hp.utils.log("\npol {} time-average subtraction exit codes:\n {}".format(datapols[i][0], exit_codes), f=lf, verbose=verbose)
+    # launch jobs
+    failures = hp.utils.job_monitor(tavg_sub, range(len(dfs)), "TAVG SUB: pol {}".format(pol), lf=lf, maxiter=maxiter, verbose=verbose)
 
     time = datetime.utcnow()
     hp.utils.log("\nfinished full time-average spectra and subtraction: {}\n{}".format(time, "-"*60), f=lf, verbose=verbose)
@@ -332,7 +324,8 @@ def tavg_sub(j, dfs=dfs, pol=pol, tavg_file=tavg_out, p=cf['algorithm']['timeavg
     datafiles, datapols = uvt.utils.search_data(input_data_template.format(group=groupname, pol='{pol}'), pols,
                                                 matched_pols=False, reverse_nesting=False, flatten=False)
 
     # load a datafile and get antenna numbers
-    _, _, uvd = uvutils.get_miriad_antpos(datafiles[0][0])
+    uvd = UVData()
+    uvd.read_miriad_metadata(datafiles[0][0])
     antpos, ants = uvd.get_ENU_antpos()
     antpos_dict = dict(zip(ants, antpos))
 
@@ -369,17 +362,13 @@ def time_average(j, dfs=dfs, pol=pol, lens=lens, angs=angs, reds=reds, data_suff
             tavg_file = os.path.join(out_dir, tavg_file + p['file_ext'])
             F.write_data(tavg_file, write_avg=True, overwrite=overwrite)
         except:
-            err, _, tb = sys.exc_info()
-            hp.utils.log("\n{} threw {} Exception with traceback:".format(i, err), f=ef, tb=tb, verbose=verbose)
+            hp.utils.log("\njob {} threw exception:".format(j), f=ef, tb=sys.exc_info(), verbose=verbose)
             return 1
         return 0
 
-    # distribute jobs across baselinetype
-    exit_codes = M(time_average, range(len(reds)))
-
-    # print to log
-    hp.utils.log("\npol {} time average exit codes:\n {}".format(pol, exit_codes), f=lf, verbose=verbose)
+    # launch jobs
+    failures = hp.utils.job_monitor(time_average, range(len(reds)), "TIME AVERAGE: pol {}".format(pol), lf=lf, maxiter=maxiter, verbose=verbose)
 
     # collate averaged data into time chunks
     tavg_files = os.path.join(out_dir, "zen.{group}.{pol}.*.{suffix}".format(group=groupname, pol=pol, suffix=data_suffix + file_ext))
@@ -403,22 +392,19 @@ def reformat_files(j, pol=pol, tavg_files=tavg_files, times=times, data_suffix=d
             outfile = os.path.join(out_dir, "zen.{group}.{pol}.LST.{LST:.5f}.{suffix}".format(group=groupname, pol=pol, LST=lst, suffix=data_suffix + p['file_ext']))
             uvd.write_miriad(outfile, clobber=overwrite)
         except:
-            err, _, tb = sys.exc_info()
-            hp.utils.log("\n{} threw {} Exception with traceback:".format(i, err), f=ef, tb=tb, verbose=verbose)
+            hp.utils.log("\njob {} threw exception:".format(j), f=ef, tb=sys.exc_info(), verbose=verbose)
             return 1
         return 0
 
-    exit_codes = M(reformat_files, range(len(times)))
+    # launch jobs
+    failures = hp.utils.job_monitor(reformat_files, range(len(times)), "TAVG REFORMAT: pol {}".format(pol), lf=lf, maxiter=maxiter, verbose=verbose)
 
     # clean up time averaged files
     for f in tavg_files:
         if os.path.exists(f):
             shutil.rmtree(f)
 
-    # print to log
-    hp.utils.log("\npol {} time reformat files exit codes:\n {}".format(pol, exit_codes), f=lf, verbose=verbose)
-
     input_data_template = os.path.join(out_dir, os.path.basename(input_data_template) + file_ext)
     data_suffix += file_ext
 
@@ -461,17 +447,13 @@ def make_pstokes(i, datafiles=datafiles, datapols=datapols, p=cf['algorithm']['p
             if verbose: 
print "failed to make pstokes {} for job {}".format(pstokes, i) except: - err, _, tb = sys.exc_info() - hp.utils.log("datafile {} threw {} Exception with traceback:".format(i, err), f=ef, tb=tb, verbose=verbose) + hp.utils.log("job {} threw exception:".format(i), f=ef, tb=sys.exc_info(), verbose=verbose) return 1 return 0 - # iterate over unique datafiles and construct pstokes - exit_codes = M(make_pstokes, range(len(datafiles))) - - # print to log - hp.utils.log("\npseudo stokes exit codes:\n {}".format(exit_codes), f=lf, verbose=verbose) + # launch jobs + failures = hp.utils.job_monitor(make_pstokes, range(len(datafiles)), "PSTOKES", lf=lf, maxiter=maxiter, verbose=verbose) # add pstokes pols to pol list for downstream calculations pols += outstokes @@ -508,16 +490,12 @@ def fg_filter(i, datafiles=datafiles, p=cf['algorithm']['fg_filt']): outfile = os.path.join(out_dir, os.path.basename(df) + p['inpaint_file_ext']) DF.write_filtered_data(outfile, filetype_out='miriad', clobber=overwrite, write_filled_data=True, add_to_history="FG model flag inpainted with: {}".format(json.dumps(p['filt_params']))) except: - err, _, tb = sys.exc_info() - hp.utils.log("datafile {} threw {} Exception with traceback:".format(i, err), f=ef, tb=tb, verbose=verbose) + hp.utils.log("job {} threw exception:".format(i), f=ef, tb=sys.exc_info(), verbose=verbose) return 1 return 0 - # iterate over datafiles and filter - exit_codes = M(fg_filter, range(len(datafiles))) - - # print to log - hp.utils.log("\nfg filtering exit codes:\n {}".format(exit_codes), f=lf, verbose=verbose) + # launch jobs + failures = hp.utils.job_monitor(fg_filter, range(len(datafiles)), "FG FILTER", lf=lf, maxiter=maxiter, verbose=verbose) time = datetime.utcnow() hp.utils.log("\nfinished foreground-filtering: {}\n{}".format(time, "-"*60), f=lf, verbose=verbose) diff --git a/pipelines/idr2_preprocessing/preprocess_params.yaml b/pipelines/idr2_preprocessing/preprocess_params.yaml index fd7932cc..d9275d38 100644 --- a/pipelines/idr2_preprocessing/preprocess_params.yaml +++ b/pipelines/idr2_preprocessing/preprocess_params.yaml @@ -39,6 +39,7 @@ data : groupname : "all" # a single groupname # full path to a data template with {pol} and {group} input_data_template : '../../hera_pspec/data/zen.{group}.{pol}.LST.1.06964.uvA' + maxiter : 1 # number of maximum job retries #--------------------------------------------------------------- # Algorithm Parameters diff --git a/pipelines/pspec_pipeline/pspec_batch.sh b/pipelines/pspec_pipeline/pspec_batch.sh new file mode 100644 index 00000000..dd70cba7 --- /dev/null +++ b/pipelines/pspec_pipeline/pspec_batch.sh @@ -0,0 +1,19 @@ +#!/bin/bash +#PBS -q hera +#PBS -V +#PBS -j oe +#PBS -o pspec_batch.out +#PBS -l nodes=1:ppn=8 +#PBS -l walltime=24:00:00 +#PBS -l vmem=128gb, mem=128gb + +# start script +echo "starting power spectrum pipeline: $(date)" + +# put a lock on parameter file, run pspec_pipe.py, unlock file +chmod 444 pspec_pipe.yaml +pspec_pipe.py pspec_pipe.yaml +chmod 775 pspec_pipe.yaml + +# end script +echo "ending power spectrum pipeline: $(date)" diff --git a/pipelines/pspec_pipeline/pspec_pipe.py b/pipelines/pspec_pipeline/pspec_pipe.py new file mode 100755 index 00000000..1b1a764b --- /dev/null +++ b/pipelines/pspec_pipeline/pspec_pipe.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python2 +""" +pspec_pipe.py +----------------------------------------- +Copyright (c) 2018 The HERA Collaboration + +This script is used as the IDR2.1 power +spectrum pipeline. 
+
+See pspec_pipe.yaml for relevant parameter selections.
+"""
+import multiprocess
+import numpy as np
+import hera_cal as hc
+import hera_pspec as hp
+import hera_qm as hq
+from pyuvdata import UVData
+import pyuvdata.utils as uvutils
+import os
+import sys
+import glob
+import yaml
+from datetime import datetime
+import uvtools as uvt
+import json
+import itertools
+import aipy
+import shutil
+from collections import OrderedDict as odict
+
+
+#-------------------------------------------------------------------------------
+# Parse YAML Configuration File
+#-------------------------------------------------------------------------------
+# get config and load dictionary
+config = sys.argv[1]
+cf = hp.utils.load_config(config)
+
+# update globals with IO params, data and analysis params
+globals().update(cf['io'])
+globals().update(cf['data'])
+globals().update(cf['analysis'])
+
+# get common suffix
+data_suffix = os.path.splitext(data_template)[1][1:]
+
+# open logfile
+logfile = os.path.join(out_dir, logfile)
+if os.path.exists(logfile) and not overwrite:
+    raise IOError("logfile {} exists and overwrite == False, quitting pipeline...".format(logfile))
+lf = open(logfile, "w")
+if joinlog:
+    ef = lf
+else:
+    ef = open(os.path.join(out_dir, errfile), "w")
+time = datetime.utcnow()
+hp.utils.log("Starting pspec pipeline on {}\n{}\n".format(time, '-'*60), f=lf, verbose=verbose)
+hp.utils.log(json.dumps(cf, indent=1) + '\n', f=lf, verbose=verbose)
+
+# Create multiprocesses
+if multiproc:
+    pool = multiprocess.Pool(nproc)
+    M = pool.map
+else:
+    M = map
+
+# change to working dir
+os.chdir(work_dir)
+
+
+#-------------------------------------------------------------------------------
+# Run Jackknife Data Difference
+#-------------------------------------------------------------------------------
+if run_diff:
+    # get algorithm parameters
+    globals().update(cf['algorithm']['diff'])
+    time = datetime.utcnow()
+    hp.utils.log("\n{}\nstarting {} visibility data difference: {}\n".format("-"*60, diff_type, time), f=lf, verbose=verbose)
+
+    raise NotImplementedError
+
+
+#-------------------------------------------------------------------------------
+# Run OQE Pipeline
+#-------------------------------------------------------------------------------
+if run_pspec:
+    # get algorithm parameters
+    globals().update(cf['algorithm']['pspec'])
+    time = datetime.utcnow()
+    hp.utils.log("\n{}\nstarting pspec OQE: {}\n".format("-"*60, time), f=lf, verbose=verbose)
+
+    # configure dataset groupings
+    groupings = hp.utils.config_pspec_blpairs(data_template, pol_pairs, group_pairs, exclude_auto_bls=exclude_auto_bls,
+                                              exclude_permutations=exclude_permutations, bl_len_range=bl_len_range,
+                                              bl_deg_range=bl_deg_range, xants=xants)
+
+    # create dictionary of individual jobs to launch
+    jobs = odict()
+    for pair in groupings:
+        # batch this pair's blpairs into jobs of at most Nblps_per_job
+        blps = groupings[pair]
+        Nblps = len(blps)
+        Njobs = int(np.ceil(float(Nblps) / Nblps_per_job))
+        job_blps = [blps[i*Nblps_per_job:(i+1)*Nblps_per_job] for i in range(Njobs)]
+        for i, _blps in enumerate(job_blps):
+            key = tuple(hp.utils.flatten(pair) + [i])
+            jobs[key] = _blps
+
+    # create pspec worker function
+    def pspec(i, outfile=outfname, dt=data_template, st=std_template, jobs=jobs, pol_pairs=pol_pairs, p=cf['algorithm']['pspec']):
+        try:
+            # get key
+            key = jobs.keys()[i]
+            # parse dsets
+            dsets = [dt.format(group=key[0], pol=key[2]), dt.format(group=key[1], pol=key[3])]
+            dset_labels = [os.path.basename(d) for d in dsets]
+            if st is not None and st != '' and st != 'None':
+                dsets_std = [st.format(group=key[0], pol=key[2]), st.format(group=key[1], pol=key[3])]
+            else:
+                dsets_std = None
+
+            # pspec_run
+            hp.pspecdata.pspec_run(dsets, outfile, dsets_std=dsets_std, dset_labels=dset_labels,
+                                   dset_pairs=[(0, 1)], spw_ranges=p['spw_ranges'], n_dlys=p['n_dlys'],
+                                   pol_pairs=pol_pairs, blpairs=jobs[key], input_data_weight=p['input_data_weight'],
+                                   norm=p['norm'], taper=p['taper'], beam=p['beam'], cosmo=p['cosmo'],
+                                   rephase_to_dset=p['rephase_to_dset'], trim_dset_lsts=p['trim_dset_lsts'],
+                                   broadcast_dset_flags=p['broadcast_dset_flags'], time_thresh=p['time_thresh'],
+                                   Jy2mK=p['Jy2mK'], overwrite=overwrite, psname_ext="_{}".format(key[4]),
+                                   verbose=verbose)
+        except:
+            hp.utils.log("\nPSPEC job {} errored with:".format(i), f=ef, tb=sys.exc_info(), verbose=verbose)
+            return 1
+
+        return 0
+
+    # launch pspec jobs
+    failures = hp.utils.job_monitor(pspec, range(len(jobs)), "PSPEC", lf=lf, maxiter=maxiter, verbose=verbose)
+
+    # print failures if they exist
+    if len(failures) > 0:
+        hp.utils.log("\nSome PSPEC jobs failed after {} tries:\n{}".format(maxiter, '\n'.join(["job {}: {}".format(i, str(jobs.keys()[i])) for i in failures])), f=lf, verbose=verbose)
+
+    # print to log
+    time = datetime.utcnow()
+    hp.utils.log("\nFinished PSPEC pipeline: {}\n{}".format(time, "-"*60), f=lf, verbose=verbose)
+
+    # Merge power spectrum files from separate jobs
+    hp.utils.log("\nStarting power spectrum file merge: {}\n{}".format(time, '-'*60), f=lf, verbose=verbose)
+
+    # Open container and get all groups
+    psc = hp.PSpecContainer(outfname, mode='r')
+    groups = psc.groups()
+    del psc
+
+    # Define merge function
+    def merge(i, filename=outfname, groups=groups):
+        try:
+            psc = hp.PSpecContainer(filename, mode='rw')
+            grp = groups[i]
+            spectra = [os.path.join(grp, sp) for sp in psc.spectra(grp)]
+            hp.utils.merge_pspec(psc, spectra=spectra)
+        except:
+            hp.utils.log("\nPSPEC MERGE job {} errored with:".format(i), f=ef, tb=sys.exc_info(), verbose=verbose)
+            return 1
+
+        return 0
+
+    # launch pspec merge jobs
+    failures = hp.utils.job_monitor(merge, range(len(groups)), "PSPEC MERGE", lf=lf, maxiter=maxiter, verbose=verbose)
+
+    # print failures if they exist
+    if len(failures) > 0:
+        hp.utils.log("\nSome PSPEC MERGE jobs failed after {} tries:\n{}".format(maxiter, '\n'.join(["group {}: {}".format(i, str(groups[i])) for i in failures])), f=lf, verbose=verbose)
+
+    # print to log
+    time = datetime.utcnow()
+    hp.utils.log("\nFinished PSPEC MERGE: {}\n{}".format(time, "-"*60), f=lf, verbose=verbose)
+
+
+#-------------------------------------------------------------------------------
+# Run Bootstrap Pipeline
+#-------------------------------------------------------------------------------
+if run_bootstrap:
+    # get algorithm parameters
+    globals().update(cf['algorithm']['bootstrap'])
+    time = datetime.utcnow()
+    hp.utils.log("\n{}\nStarting BOOTSTRAP resampling pipeline: {}\n".format("-"*60, time), f=lf, verbose=verbose)
+
+    # open container
+    psc = hp.PSpecContainer(outfname, mode='r')
+
+    # get groups
+    groups = psc.groups()
+    del psc
+
+    # define bootstrap function
+    def bootstrap(i, filename=outfname, groups=groups, p=cf['algorithm']['bootstrap']):
+        try:
+            # get container
+            psc = hp.PSpecContainer(filename, mode='rw')
+
+            # get spectra
+            group = groups[i]
+            spectra = psc.spectra(group)
+
+            # run bootstrap
+            hp.grouping.bootstrap_run(psc, spectra=spectra, time_avg=p['time_avg'], Nsamples=p['Nsamples'],
+                                      seed=p['seed'], normal_std=p['normal_std'], robust_std=p['robust_std'],
+                                      conf_ints=p['conf_ints'],
+                                      keep_samples=p['keep_samples'],
+                                      bl_error_tol=p['bl_error_tol'], overwrite=overwrite, verbose=verbose)
+
+        except:
+            hp.utils.log("\nBOOTSTRAP job {} errored with:".format(i), f=ef, tb=sys.exc_info(), verbose=verbose)
+            return 1
+
+        return 0
+
+    # launch bootstrap jobs
+    failures = hp.utils.job_monitor(bootstrap, range(len(groups)), "BOOTSTRAP", lf=lf, maxiter=maxiter, verbose=verbose)
+
+    # print failures if they exist
+    if len(failures) > 0:
+        hp.utils.log("\nSome BOOTSTRAP jobs failed after {} tries:\n{}".format(maxiter, '\n'.join(["group {}: {}".format(i, str(groups[i])) for i in failures])), f=lf, verbose=verbose)
+
+    # print to log
+    time = datetime.utcnow()
+    hp.utils.log("\nFinished BOOTSTRAP pipeline: {}\n{}".format(time, "-"*60), f=lf, verbose=verbose)
+
diff --git a/pipelines/pspec_pipeline/pspec_pipe.yaml b/pipelines/pspec_pipeline/pspec_pipe.yaml
new file mode 100644
index 00000000..4a10c7b7
--- /dev/null
+++ b/pipelines/pspec_pipeline/pspec_pipe.yaml
@@ -0,0 +1,99 @@
+# pspec_pipe.yaml
+#
+# hera_pspec IDR2.1 pipeline
+# configuration file
+#
+# Note: only strings, booleans, integers, floats,
+# lists, and lists of lists may be specified here.
+# When PSpecData parameters have required type None
+# use 'None' here, and for lists of tuples,
+# use lists of lists here.
+
+#---------------------------------------------------------------
+# IO Parameters
+#---------------------------------------------------------------
+io :
+  work_dir : './'  # directory to work in
+  out_dir : './'  # directory to dump all output in
+  logfile : 'pspipe_out.log'  # logfile
+  errfile : 'pspipe_err.log'  # error file
+  joinlog : False  # redirect error output into logfile
+  overwrite : True  # overwrite
+  verbose : True  # report feedback to standard output
+
+#---------------------------------------------------------------
+# Analysis Parameters
+#---------------------------------------------------------------
+analysis :
+  run_diff : False  # Run visibility data difference
+  run_pspec : True  # Run power spectrum estimation
+  run_bootstrap : True  # Run bootstrapping for errorbars
+  multiproc : False  # use multiprocess module
+  nproc : 8  # number of processes to spawn
+
+#---------------------------------------------------------------
+# Data Parameters
+#---------------------------------------------------------------
+data :
+  # Datafile group pairs to search for
+  group_pairs :
+    - ['2458042', '2458042']
+
+  # Datafile pol pairs to search for
+  pol_pairs :
+    - ['xx', 'xx']
+
+  # Define groupnaming convention
+  group_split_str : "_x_"
+
+  # full path to data template with {pol} and {group} fields
+  data_template : './zen.{group}.?????.{pol}.HH.uvXA'
+  std_template : None
+
+#---------------------------------------------------------------
+# Algorithm Parameters
+#---------------------------------------------------------------
+algorithm :
+
+  # jackknife data difference
+  diff :
+    split_type : "antenna"
+
+  # pspec pipeline
+  pspec :
+    outfname : "pspec.h5"
+    input_data_weight : "identity"
+    norm : "I"
+    taper : "none"
+    beam : "HERA_NF_dipole_power.beamfits"
+    cosmo : "None"
+    spw_ranges :
+      - [10, 20]
+    n_dlys :
+      - 5
+    Jy2mK : True
+    rephase_to_dset : "None"
+    trim_dset_lsts : False
+    broadcast_dset_flags : True
+    time_thresh : 0.2
+    Nblps_per_job : 100
+    exclude_auto_bls : True
+    exclude_permutations : True
+    bl_len_range : [0, 20]
+    bl_deg_range : [40, 80]
+    xants : []
+    maxiter : 1
+
+  # bootstrap pipeline
+  bootstrap :
+    time_avg : True
+    Nsamples : 100
+    seed : None
+    normal_std : True
+    robust_std : True
+    conf_ints : None
+    keep_samples : False
+    bl_error_tol : 1.0
+    maxiter : 1
+
diff --git a/scripts/bootstrap_run.py b/scripts/bootstrap_run.py
new file mode 100755
index 00000000..61f37b6c
--- /dev/null
+++ b/scripts/bootstrap_run.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+"""
+Pipeline script to load a PSpecContainer and bootstrap over redundant baseline-
+pair groups to produce errorbars.
+"""
+import sys
+import os
+from hera_pspec import pspecdata, grouping
+from hera_pspec import uvpspec_utils as uvputils
+
+# Parse commandline args
+args = grouping.get_bootstrap_run_argparser()
+a = args.parse_args()
+kwargs = vars(a)  # dict of args
+
+# Get arguments
+filename = kwargs.pop('filename')
+
+# Run bootstrap
+grouping.bootstrap_run(filename, **kwargs)
+
diff --git a/scripts/psc_merge_spectra.py b/scripts/psc_merge_spectra.py
new file mode 100755
index 00000000..55183d52
--- /dev/null
+++ b/scripts/psc_merge_spectra.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python
+"""
+Command-line script for merging UVPSpec power spectra
+within a PSpecContainer.
+"""
+from hera_pspec import container
+
+# Parse commandline args
+args = container.get_merge_spectra_argparser()
+a = args.parse_args()
+
+container.merge_spectra(a.filename, dset_split_str=a.dset_split_str,
+                        ext_split_str=a.ext_split_str, verbose=a.verbose)
diff --git a/setup.py b/setup.py
index 26de6a35..cc215dd5 100644
--- a/setup.py
+++ b/setup.py
@@ -33,7 +33,8 @@ def package_files(package_dir, subdirectory):
     'install_requires': ['numpy>=1.10', 'scipy>=0.19',],
     'include_package_data': True,
     'scripts': ['scripts/pspec_run.py', 'scripts/pspec_red.py',
-                'pipelines/idr2_preprocessing/preprocess_data.py'],
+                'pipelines/idr2_preprocessing/preprocess_data.py',
+                'pipelines/pspec_pipeline/pspec_pipe.py'],
     'zip_safe': False,
 }
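
A note on the recurring job_monitor refactor in this patch: every `exit_codes = M(func, range(N))` map call is replaced by `hp.utils.job_monitor(func, range(N), name, lf=lf, maxiter=maxiter, verbose=verbose)`, which retries failed jobs and returns the indices of jobs that still fail after maxiter rounds. The hera_pspec implementation is not part of this diff; the following is only a minimal sketch of the contract implied by the call sites (worker functions return 0 on success and 1 on failure), not the library's actual code.

import sys

def job_monitor(run_func, iterator, action_name, M=map, lf=None, maxiter=1, verbose=True):
    """Run run_func over iterator, retrying failed jobs up to maxiter rounds.
    Returns the job indices that never succeeded."""
    jobs = list(iterator)
    # first pass over all jobs; each job returns 0 (success) or 1 (failure)
    exit_codes = list(M(run_func, jobs))
    failures = [jobs[k] for k, code in enumerate(exit_codes) if code != 0]
    tries = 1
    # re-run only the failed jobs until they succeed or tries are exhausted
    while len(failures) > 0 and tries < maxiter:
        exit_codes = list(M(run_func, failures))
        failures = [failures[k] for k, code in enumerate(exit_codes) if code != 0]
        tries += 1
    # report the outcome to the logfile and/or stdout
    msg = "\n{}: {} job(s) failed after {} run(s)\n".format(action_name, len(failures), tries)
    if lf is not None:
        lf.write(msg)
    if verbose:
        sys.stdout.write(msg)
    return failures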
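
The pspec section of pspec_pipe.py batches each grouping's baseline-pairs into jobs of at most Nblps_per_job. A standalone check of that arithmetic with toy numbers (250 baseline-pairs, 100 per job; not pipeline values) shows the ceiling division yields full batches plus one remainder batch and no empty trailing job:

import numpy as np

blps = list(range(250))  # stand-in for one grouping's list of baseline-pairs
Nblps_per_job = 100
Njobs = int(np.ceil(float(len(blps)) / Nblps_per_job))
job_blps = [blps[i * Nblps_per_job:(i + 1) * Nblps_per_job] for i in range(Njobs)]
assert [len(b) for b in job_blps] == [100, 100, 50]  # no empty trailing batch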
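
After a successful run, the merged output can be inspected with the same PSpecContainer methods the merge and bootstrap steps use, groups() and spectra(). A short usage sketch, assuming the default outfname of pspec.h5 from pspec_pipe.yaml:

import hera_pspec as hp

psc = hp.PSpecContainer("pspec.h5", mode='r')
for grp in psc.groups():
    # each group holds the merged power spectra written by the merge step
    print(psc.spectra(grp))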