Fix reconstitute interleaving features #886

Open: wants to merge 7 commits into base: main

54 changes: 31 additions & 23 deletions hera_cal/tests/test_vis_clean.py
@@ -21,6 +21,7 @@
from hera_cal.data import DATA_PATH
from hera_cal import frf
import glob
import re
import copy


@@ -1130,46 +1131,53 @@ def test_time_chunk_from_baseline_chunks_argparser(self):
assert a.time_chunk_template == 'a'
assert a.outfilename == 'a.out'

def test_time_chunk_from_baseline_chunks(self, tmp_path):
@pytest.mark.parametrize("ninterleave", [1, 2, 4, 8])
def test_time_chunk_from_baseline_chunks(self, tmpdir, ninterleave):
# First, construct some cross-talk baseline files.
datafiles = [os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.first.uvh5"),
os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.second.uvh5")]

cals = [os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only.part1"),
os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only.part2")]
# make a cache directory
cdir = tmp_path / "cache_temp"
cdir.mkdir()
tmp_path = tmpdir.strpath
cdir = tmp_path + "/cache_temp"
os.mkdir(cdir)
[Contributor comment on lines +1143 to +1145] In new code, I'd heartily suggest using the pathlib module for path operations. You can use the pytest fixture tmp_path_factory, then do cdir = tmp_path_factory.mkdir('test_time_chunk_from_baseline_chunks').
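A minimal sketch of that suggestion as a free-standing test; note that in current pytest the TempPathFactory method is spelled mktemp rather than mkdir, and it returns a pathlib.Path, so joins use the / operator instead of string concatenation:

import pytest

@pytest.mark.parametrize("ninterleave", [1, 2, 4, 8])
def test_time_chunk_from_baseline_chunks(tmp_path_factory, ninterleave):
    # mktemp() creates and returns a fresh pathlib.Path directory.
    cdir = tmp_path_factory.mktemp("cache_temp")
    fragment_filename = cdir / "temp.fragment.part.0.h5"  # hypothetical name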

# cross-talk filter chunked baselines
for filenum, file in enumerate(datafiles):
baselines = io.baselines_from_filelist_position(file, datafiles)
fname = 'temp.fragment.part.%d.h5' % filenum
fragment_filename = tmp_path / fname
frf.load_tophat_frfilter_and_write(datafiles, baseline_list=baselines, calfile_list=cals,
spw_range=[0, 20], cache_dir=cdir, read_cache=True, write_cache=True,
res_outfilename=fragment_filename, clobber=True, case='sky')
for interleave in range(ninterleave):
baselines = io.baselines_from_filelist_position(file, datafiles)
if ninterleave > 1:
fname = f'temp.fragment.part.%d.interleave_{interleave}.h5' % filenum
else:
fname = f'temp.fragment.part.%d.h5' % filenum
fragment_filename = tmp_path + fname
frf.load_tophat_frfilter_and_write(datafiles, baseline_list=baselines, calfile_list=cals,
spw_range=[0, 20], cache_dir=cdir, read_cache=True, write_cache=True,
res_outfilename=fragment_filename, clobber=True, case='sky')
# load in fragment and make sure the number of baselines is equal to the length of the baseline list
hd_fragment = io.HERAData(str(fragment_filename))
assert len(hd_fragment.bls) == len(baselines)
assert hd_fragment.Ntimes == 60
assert hd_fragment.Nfreqs == 20

fragments = glob.glob(DATA_PATH + '/test_output/temp.fragment.h5.part*')
fragments = sorted(glob.glob(DATA_PATH + '/test_output/temp.fragment.h5.part*'))
# reconstitute the filtered data
for filenum, file in enumerate(datafiles):
# reconstitute
# AEW -- 5-10-2023 -- I AM HERE!
[Contributor comment] Good to know!

fname = 'temp.reconstituted.part.%d.h5' % filenum
vis_clean.time_chunk_from_baseline_chunks(time_chunk_template=file,
baseline_chunk_files=glob.glob(str(tmp_path / 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path / fname))
baseline_chunk_files=glob.glob(str(tmp_path + 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path + fname))
# load in the reconstituted files.
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path / 'temp.reconstituted.part.*.h5')))
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path + 'temp.reconstituted.part.*.h5')))
hd_reconstituted.read()
# compare to xtalk filtering the whole file.
frf.load_tophat_frfilter_and_write(datafile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"),
calfile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only"),
res_outfilename=str(tmp_path / 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path / 'temp.h5'))
res_outfilename=str(tmp_path + 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path + 'temp.h5'))
hd.read()
assert np.all(np.isclose(hd.data_array, hd_reconstituted.data_array))
assert np.all(np.isclose(hd.flag_array, hd_reconstituted.flag_array))
@@ -1179,23 +1187,25 @@ def test_time_chunk_from_baseline_chunks(self, tmp_path):
# reconstitute
fname = 'temp.reconstituted.part.%d.h5' % filenum
vis_clean.time_chunk_from_baseline_chunks(time_chunk_template=file,
baseline_chunk_files=glob.glob(str(tmp_path / 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path / fname), time_bounds=True)
baseline_chunk_files=glob.glob(str(tmp_path + 'temp.fragment.part.*.h5')), clobber=True,
outfilename=str(tmp_path + fname), time_bounds=True)
# load in the reconstituted files.
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path / 'temp.reconstituted.part.*.h5')))
hd_reconstituted = io.HERAData(glob.glob(str(tmp_path + 'temp.reconstituted.part.*.h5')))
hd_reconstituted.read()
# compare to xtalk filtering the whole file.
frf.load_tophat_frfilter_and_write(datafile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"),
calfile_list=os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.uv.abs.calfits_54x_only"),
res_outfilename=str(tmp_path / 'temp.h5'), clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path / 'temp.h5'))
res_outfilename=str(tmp_path + 'temp.h5'),
clobber=True, spw_range=[0, 20], case='sky')
hd = io.HERAData(str(tmp_path + 'temp.h5'))
hd.read()
assert np.all(np.isclose(hd.data_array, hd_reconstituted.data_array))
assert np.all(np.isclose(hd.flag_array, hd_reconstituted.flag_array))
assert np.all(np.isclose(hd.nsample_array, hd_reconstituted.nsample_array))
# check warning.
with pytest.warns(RuntimeWarning):
vis_clean.time_chunk_from_baseline_chunks(datafiles[0], baseline_chunk_files=datafiles[1:], clobber=True, outfilename=str(tmp_path / fname), time_bounds=True)
vis_clean.time_chunk_from_baseline_chunks(datafiles[0], baseline_chunk_files=datafiles[1:],
clobber=True, outfilename=str(tmp_path + fname), time_bounds=True)

def test_discard_autocorr_imag():
hd = io.HERAData(os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5"))
@@ -1217,5 +1227,3 @@ def test_discard_autocorr_imag():
assert data1[first_key].dtype == np.complex64
assert np.all(data1[first_key].imag == 0)
assert np.all(data1[first_key] == data0[first_key])


102 changes: 76 additions & 26 deletions hera_cal/vis_clean.py
@@ -19,7 +19,7 @@
from .datacontainer import DataContainer
from .utils import echo
from .flag_utils import factorize_flags

import re


def discard_autocorr_imag(data_container, keys=None):
@@ -794,8 +794,7 @@
# get filter properties
mfrate = max_frate[k] if max_frate is not None else None
filter_centers, filter_half_widths = gen_filter_properties(ax=ax, horizon=horizon,
standoff=standoff, min_dly=min_dly,
bl_len=self.bllens[k[:2]], max_frate=mfrate)
standoff=standoff, min_dly=min_dly, bl_len=self.bllens[k[:2]], max_frate=mfrate)
[Contributor comment] I can't tell what's happening on this line, but it seems weird.

if mode != 'clean':
suppression_factors = [[tol], [tol]] if ax == 'both' else [tol]
self.fourier_filter(filter_centers=filter_centers, filter_half_widths=filter_half_widths,
@@ -2023,7 +2022,9 @@
output will trim the extra frequencies in the time_chunk and write out trimmed freqs. The same is true
for polarizations.
baseline_chunk_files : list of strings
list of paths to baseline-chunk files to select time-chunk file from.
list of paths to baseline-chunk files to select time-chunk file from. If the files have "_interleave" in their name, then
the method will automatically identify the number of unique interleaves, chunk the file list up into interleaved sets, and
retrieve integrations based on the time in the first file.
[Member comment] Is there any concern that this will be triggered accidentally? Do we want to have some kind of boolean trigger (default False) for this behavior, or do you think that's too rare of an edge case to be worried about?
[Contributor comment] For me, just having read this docstring, I'm still not clear on exactly what's happening here. Can you maybe provide a clearer example in the docstring?
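As one possible answer to that request, a purely illustrative sketch of the grouping the docstring describes; the file names are invented:

baseline_chunk_files = [
    "chunk.0.interleave_0.uvh5", "chunk.1.interleave_0.uvh5",
    "chunk.0.interleave_1.uvh5", "chunk.1.interleave_1.uvh5",
]
# Two unique interleave indices (0 and 1) are detected, so the list is split
# into two interleaved sets of two files each; integrations are then retrieved
# based on the times in the first file of each set:
# {0: ["chunk.0.interleave_0.uvh5", "chunk.1.interleave_0.uvh5"],
#  1: ["chunk.0.interleave_1.uvh5", "chunk.1.interleave_1.uvh5"]}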
outfilename : string
name of the output file to write.
clobber : bool optional.
@@ -2040,11 +2041,38 @@
-------
Nothing
"""
# check whether "interleave" is in baseline chunk filenames. If so, make sure that they all have "interleave",
# split them into sets, and make sure that the sets all have the same number of files.
interleave_mode = "interleave" in baseline_chunk_files[0]
[Contributor comment] This is not consistent with the docstring, which states that if "_interleave" is in the filename, it'll do the interleave mode.

for fname in baseline_chunk_files:
if "interleave" not in fname and interleave_mode or \
"interleave" in fname and not interleave_mode:
raise ValueError("must not have a subset of files with 'interleave' in name.")

[Codecov / codecov/patch] Check warning on line 2051 in hera_cal/vis_clean.py: added line L2051 was not covered by tests.
[Contributor comment] I think a nicer error here would be "Cannot have some baseline_chunk_files with '_interleave' in their name, while others don't".

[Contributor comment] Furthermore, add this info to the docstring (i.e. say that all or none of the files must have "_interleave" in the name).
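A minimal sketch of that all-or-none check with the clearer message, assuming baseline_chunk_files is the input file list:

has_interleave = ["interleave" in fname for fname in baseline_chunk_files]
if any(has_interleave) and not all(has_interleave):
    raise ValueError("Cannot have some baseline_chunk_files with '_interleave' "
                     "in their name while others don't.")
interleave_mode = all(has_interleave)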

if interleave_mode:
interleave_indices = np.unique([int(re.findall("interleave_[0-9]{1,10}", fname)[0][11:]) for fname in baseline_chunk_files])
[Member comment] This feels rather hard-coded.
[Member comment] At a minimum, the precise expectation for the file name format should be documented.
[Contributor comment] Agreed. I actually think a much more flexible and future-proof way of doing this would be to have a new argument to the function, like interleave_regex, which, if provided, will do the above (it could be the above by default, but probably better to define it with a capture group instead).
[Member comment] This feels like a function that should probably be broken out for future use.
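A sketch of what those two suggestions could look like together; the helper name, the interleave_regex argument, and its default pattern are all hypothetical, not part of this PR:

import re

def interleave_index(fname, interleave_regex=r"interleave_(\d+)"):
    # Extract the interleave index from a filename via the regex capture group.
    match = re.search(interleave_regex, fname)
    if match is None:
        raise ValueError(f"No interleave index found in {fname}.")
    return int(match.group(1))

# Usage: interleave_index("temp.fragment.part.0.interleave_3.h5") returns 3.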

ninterleave = len(interleave_indices)
interleave_sets = {}
interleave_index_dict = {}
for inum, interleave in enumerate(interleave_indices):
interleave_sets[inum] = sorted([fname for fname in baseline_chunk_files if f'interleave_{interleave}' in fname])
interleave_index_dict[inum] = interleave
# check that all interleave sets have the same length.
if len(np.unique([len(interleave_sets[k]) for k in interleave_sets])) > 1:
raise ValueError("If you are providing interleaved files, each interleaved set must have the same number of files in it!")

[Codecov / codecov/patch] Check warning on line 2062 in hera_cal/vis_clean.py: added line L2062 was not covered by tests.
else:
ninterleave = 1
interleave_sets = {0: baseline_chunk_files}
interleave_index_dict = {0: 0}

hd_time_chunk = io.HERAData(time_chunk_template)
hd_baseline_chunk = io.HERAData(baseline_chunk_files[0])
times = hd_time_chunk.times
hd_baseline_chunk = io.HERAData(baseline_chunk_files[0])
freqs = hd_baseline_chunk.freqs
polarizations = hd_baseline_chunk.pols



# read in the template file, but only include polarizations, frequencies
# from the baseline_chunk_file files.
if not time_bounds:
@@ -2056,34 +2084,56 @@
# for each baseline_chunk_file, read in only the times relevant to the templatefile.
# and update the data, flags, nsamples array of the template file
# with the baseline_chunk_file data.
for baseline_chunk_file in baseline_chunk_files:
hd_baseline_chunk = io.HERAData(baseline_chunk_file)
# find times that are close.
tload = []
# use tolerance in times that is set by the time resolution of the dataset.
atol = np.median(np.diff(hd_baseline_chunk.times)) / 10.
all_times = np.unique(hd_baseline_chunk.times)
for t in all_times:
if np.any(np.isclose(t, hd_time_chunk.times, atol=atol, rtol=0)):
tload.append(t)
d, f, n = hd_baseline_chunk.read(times=tload, axis='blt')
hd_time_chunk.update(flags=f, data=d, nsamples=n)
# now that we've updated everything, we write the output file.
hd_time_chunk.write_uvh5(outfilename, clobber=clobber)
for inum in interleave_sets:
for baseline_chunk_file in interleave_sets[inum]:
hd_baseline_chunk = io.HERAData(baseline_chunk_file)
# find times that are close.
tload = []
# use tolerance in times that is set by the time resolution of the dataset.
atol = np.median(np.diff(hd_baseline_chunk.times)) / 10.
all_times = np.unique(hd_baseline_chunk.times)
for t in all_times:
if np.any(np.isclose(t, hd_time_chunk.times, atol=atol, rtol=0)):
tload.append(t)
d, f, n = hd_baseline_chunk.read(times=tload, axis='blt')
hd_time_chunk.update(flags=f, data=d, nsamples=n)
# now that we've updated everything, we write the output file.
if ninterleave > 1:
outfilename_i = outfilename.replace('.uvh5', f'.interleave_{interleave_index_dict[inum]}.uvh5')
else:
outfilename_i = outfilename

hd_time_chunk.write_uvh5(outfilename_i, clobber=clobber)
# reinstantiate writer file.
hd_time_chunk = io.HERAData(time_chunk_template)
hd_time_chunk.read(times=times, frequencies=freqs, polarizations=polarizations)

else:
dt_time_chunk = np.median(np.diff(hd_time_chunk.times)) / 2.
tmax = hd_time_chunk.times.max() + dt_time_chunk
tmin = hd_time_chunk.times.min() - dt_time_chunk
hd_combined = io.HERAData(baseline_chunk_files)
t_select0 = (hd_baseline_chunk.times >= tmin) & (hd_baseline_chunk.times < tmax)
# use same time selection for all interleaves even though the times don't line
# up. We need the indices too.
# if the interleaves have different Ntimes,
# we will only use the Ntimes from the first interleave.
t_select = t_select0[:hd_baseline_chunk.Ntimes]
# we only compare centers of baseline files to time limits of time-file.
# this is to prevent integrations that straddle file boundaries from being dropped.
# when we perform reconstitution.
t_select = (hd_baseline_chunk.times >= tmin) & (hd_baseline_chunk.times < tmax)
if np.any(t_select):
hd_combined.read(times=hd_baseline_chunk.times[t_select], axis='blt')
hd_combined.write_uvh5(outfilename, clobber=clobber)
else:
warnings.warn("no times selected. No outputfile produced.", RuntimeWarning)

for inum in interleave_sets:
hd_baseline_chunk_iset = io.HERAData(interleave_sets[inum][0])
hd_combined = io.HERAData(interleave_sets[inum])
if np.any(t_select):
hd_combined.read(times=hd_baseline_chunk_iset.times[t_select], axis='blt')
if ninterleave > 1:
outfilename_interleave = outfilename.replace('.uvh5', f'.interleave_{interleave_index_dict[inum]}.uvh5')
else:
outfilename_interleave = outfilename
hd_combined.write_uvh5(outfilename_interleave, clobber=clobber)
else:
warnings.warn("no times selected. No outputfile produced.", RuntimeWarning)


def time_chunk_from_baseline_chunks_argparser():
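To close, a standalone sketch summarizing the two time-selection strategies in time_chunk_from_baseline_chunks (the helper names are hypothetical, and chunk_times/template_times are assumed to be numpy arrays): the default branch keeps chunk times that match a template time to within a tenth of the chunk's median integration spacing, while the time_bounds branch keeps integrations whose centers fall inside the template's time range padded by half an integration, so that integrations straddling file boundaries are not dropped.

import numpy as np

def matching_times(chunk_times, template_times):
    # Default branch: per-time matching with a tolerance set by the
    # time resolution of the dataset.
    atol = np.median(np.diff(chunk_times)) / 10.
    return [t for t in np.unique(chunk_times)
            if np.any(np.isclose(t, template_times, atol=atol, rtol=0))]

def in_time_bounds(chunk_times, template_times):
    # time_bounds branch: boolean mask of integrations whose centers fall
    # within the template file's padded time limits.
    dt = np.median(np.diff(template_times)) / 2.
    return (chunk_times >= template_times.min() - dt) & (chunk_times < template_times.max() + dt)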