diff --git a/AegeanTools/BANE.py b/AegeanTools/BANE.py index d48593e5..668c0118 100755 --- a/AegeanTools/BANE.py +++ b/AegeanTools/BANE.py @@ -1,14 +1,8 @@ #! /usr/bin/env python - """ This module contains all of the BANE specific code The function filter_image should be imported from elsewhere and run as is. """ - -__author__ = 'Paul Hancock' -__version__ = 'v1.6.5' -__date__ = '2018-07-05' - # standard imports from astropy.io import fits import copy @@ -21,12 +15,17 @@ import sys from tempfile import NamedTemporaryFile from time import gmtime, strftime - # Aegean tools from .fits_interp import compress +__author__ = 'Paul Hancock' +__version__ = 'v1.7' +__date__ = '2018-07-28' + # global variables for multiprocessing ibkg = irms = None +events = [] + def sigmaclip(arr, lo, hi, reps=3): """ @@ -54,6 +53,10 @@ def sigmaclip(arr, lo, hi, reps=3): clipped : numpy.array The clipped array. The clipped array may be empty! + mean : float + The mean of the array, possibly nan + std : float + The std of the array, possibly nan Notes ----- @@ -62,7 +65,7 @@ def sigmaclip(arr, lo, hi, reps=3): clipped = np.array(arr)[np.isfinite(arr)] if len(clipped) < 1: - return clipped + return clipped, np.nan, np.nan std = np.std(clipped) mean = np.mean(clipped) @@ -76,7 +79,7 @@ def sigmaclip(arr, lo, hi, reps=3): mean = np.mean(clipped) if 2*abs(pstd-std)/(pstd+std) < 0.2: break - return clipped + return clipped, mean, std def _sf2(args): @@ -101,7 +104,7 @@ def _sf2(args): raise Exception("".join(traceback.format_exception(*sys.exc_info()))) -def sigma_filter(filename, region, step_size, box_size, shape, dobkg=True): +def sigma_filter(filename, region, step_size, box_size, shape, sid): """ Calculate the background and rms for a sub region of an image. The results are written to shared memory - irms and ibkg. @@ -123,8 +126,8 @@ def sigma_filter(filename, region, step_size, box_size, shape, dobkg=True): shape : tuple The shape of the fits image - dobkg : bool - Do a background calculation. If false then only the rms is calculated. Default = True. + sid : int + The slice number Returns ------- @@ -138,6 +141,9 @@ def sigma_filter(filename, region, step_size, box_size, shape, dobkg=True): data_row_min = max(0, ymin - box_size[0]//2) data_row_max = min(shape[0], ymax + box_size[0]//2) + # the index into data of the row ymin + data_ymin = ymin - data_row_min + # Figure out how many axes are in the datafile NAXIS = fits.getheader(filename)["NAXIS"] @@ -191,26 +197,64 @@ def box(r, c): rms_points = [] rms_values = [] + # indices of the shape we want to write to (not the shape of data) + gx, gy = np.mgrid[ymin:ymax, 0:shape[1]] + for row, col in locations(step_size, ymin-data_row_min, ymax-data_row_min, 0, shape[1]): r_min, r_max, c_min, c_max = box(row, col) new = data[r_min:r_max, c_min:c_max] new = np.ravel(new) - new = sigmaclip(new, 3, 3) + new, bkg, rms = sigmaclip(new, 3, 3) + # If we are left with (or started with) no data, then just move on + if len(new) < 1: + continue + + bkg_points.append((row + data_row_min, col)) # these coords need to be indices into the larger array + bkg_values.append(bkg) + + if len(bkg_points) > 1: + logging.debug("Interpolating bkg") + ifunc = LinearNDInterpolator(bkg_points, bkg_values) + interpolated_bkg = np.array(ifunc((gx, gy)), dtype=np.float32) + del ifunc + else: + logging.debug("bkg is all nans") + interpolated_bkg = np.empty(gx.shape, dtype=np.float32) * np.nan + with ibkg.get_lock(): + logging.debug("Writing bkg to sharemem") + for i, row in enumerate(interpolated_bkg): + ibkg[i + ymin] = np.ctypeslib.as_ctypes(row) + logging.debug(" .. done writing") + + # signal that this worker is at the barrier + events[sid].set() + + + # wait for neighbour workers to reach the barrier + if sid > 0: + logging.debug("{0} is waiting for {1}".format(sid, sid-1)) + events[sid-1].wait() + if sid < len(events)-1: + logging.debug("{0} is waiting for {1}".format(sid, sid + 1)) + events[sid+1].wait() + logging.debug("{0} background subtraction".format(sid)) + + for i in range(data_row_max-data_row_min): + data[i, :] = data[i, :] - ibkg[data_row_min + i] + + + for row, col in locations(step_size, ymin-data_row_min, ymax-data_row_min, 0, shape[1]): + r_min, r_max, c_min, c_max = box(row, col) + new = data[r_min:r_max, c_min:c_max] + new = np.ravel(new) + new, bkg, rms = sigmaclip(new, 3, 3) # If we are left with (or started with) no data, then just move on if len(new) < 1: continue - if dobkg: - bkg = np.median(new) - bkg_points.append((row + data_row_min, col)) # these coords need to be indices into the larger array - bkg_values.append(bkg) - rms = np.std(new) rms_points.append((row + data_row_min, col)) rms_values.append(rms) - # indices of the shape we want to write to (not the shape of data) - gx, gy = np.mgrid[ymin:ymax, 0:shape[1]] - # If the bkg/rms calculation above didn't yield any points, then our interpolated values are all nans if len(rms_points) > 1: logging.debug("Interpolating rms") ifunc = LinearNDInterpolator(rms_points, rms_values) @@ -227,20 +271,16 @@ def box(r, c): irms[i + ymin] = np.ctypeslib.as_ctypes(row) logging.debug(" .. done writing rms") - if dobkg: - if len(bkg_points)>1: - logging.debug("Interpolating bkg") - ifunc = LinearNDInterpolator(bkg_points, bkg_values) - interpolated_bkg = np.array(ifunc((gx, gy)), dtype=np.float32) - del ifunc - else: - logging.debug("bkg is all nans") - interpolated_bkg = np.empty(gx.shape, dtype=np.float32)*np.nan - with ibkg.get_lock(): - logging.debug("Writing bkg to sharemem") - for i, row in enumerate(interpolated_bkg): - ibkg[i + ymin] = np.ctypeslib.as_ctypes(row) - logging.debug(" .. done writing bkg") + ## + # Apply mask + ## + + # with ibkg.get_lock(): + # pass + # + # with irms.get_lock(): + # pass + logging.debug('rows {0}-{1} finished at {2}'.format(ymin, ymax, strftime("%Y-%m-%d %H:%M:%S", gmtime()))) del bkg_points, bkg_values del rms_points, rms_values @@ -276,7 +316,7 @@ def mask_img(data, mask_data): logging.info("failed to mask file, not a critical failure") -def filter_mc_sharemem(filename, step_size, box_size, cores, shape, dobkg=True, nslice=8): +def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None): """ Calculate the background and noise images corresponding to the input file. The calculation is done via a box-car approach and uses multiple cores and shared memory. @@ -302,9 +342,6 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, dobkg=True, shape : (int, int) The shape of the image in the given file. - dobkg : bool - If True then calculate the background, otherwise assume it is zero. - Returns ------- bkg, rms : numpy.ndarray @@ -313,19 +350,16 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, dobkg=True, if cores is None: cores = multiprocessing.cpu_count() - if nslice is None: + # update nslice if not set or if cores is 1 + if (nslice is None) or (cores==1): nslice = cores img_y, img_x = shape # initialise some shared memory - if dobkg: - global ibkg - bkg = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32)) - ibkg = multiprocessing.sharedctypes.Array(bkg._type_, bkg, lock=True) - else: - bkg = None - ibkg = None - global irms + global ibkg, irms + bkg = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32)) + ibkg = multiprocessing.sharedctypes.Array(bkg._type_, bkg, lock=True) + rms = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32)) irms = multiprocessing.sharedctypes.Array(rms._type_, rms, lock=True) @@ -352,9 +386,13 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, dobkg=True, logging.debug("ymaxs {0}".format(ymaxs)) args = [] - for ymin, ymax in zip(ymins, ymaxs): + for i, (ymin, ymax) in enumerate(zip(ymins, ymaxs)): region = (ymin, ymax) - args.append((filename, region, step_size, box_size, shape, dobkg)) + args.append((filename, region, step_size, box_size, shape, i)) + + # set up a list of events to synchronise the workers, one event per strip + global events + events = [multiprocessing.Event() for _ in range(len(ymaxs))] # start a new process for each task, hopefully to reduce residual memory use pool = multiprocessing.Pool(processes=cores, maxtasksperchild=1) @@ -368,13 +406,10 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, dobkg=True, pool.close() pool.join() - rms = np.array(irms) - if dobkg: - bkg = np.array(ibkg) return bkg, rms -def filter_image(im_name, out_base, step_size=None, box_size=None, twopass=False, cores=None, mask=True, compressed=False, nslice=None): +def filter_image(im_name, out_base, step_size=None, box_size=None, cores=None, mask=True, compressed=False, nslice=None): """ Create a background and noise image from an input image. Resulting images are written to `outbase_bkg.fits` and `outbase_rms.fits` @@ -389,9 +424,6 @@ def filter_image(im_name, out_base, step_size=None, box_size=None, twopass=False Tuple of the x,y step size in pixels box_size : (int,int) The size of the box in piexls - twopass : bool - Perform a second pass calculation to ensure that the noise is not contaminated by the background. - Default = False cores : int Number of CPU corse to use. Default = all available @@ -447,6 +479,7 @@ def filter_image(im_name, out_base, step_size=None, box_size=None, twopass=False logging.info("using grid_size {0}, box_size {1}".format(step_size,box_size)) logging.info("on data shape {0}".format(shape)) + bkg, rms = filter_mc_sharemem(im_name, step_size=step_size, box_size=box_size, cores=cores, shape=shape, nslice=nslice) logging.info("done") @@ -454,22 +487,6 @@ def filter_image(im_name, out_base, step_size=None, box_size=None, twopass=False bkg = np.array(bkg, dtype=np.float32) rms = np.array(rms, dtype=np.float32) - if twopass: - # TODO: check what this does for our memory usage - # Answer: The interpolation step peaks at about 5x the normal value. - tempfile = NamedTemporaryFile(delete=False) - data = fits.getdata(im_name) - bkg - # write 32bit floats to reduce memory overhead - write_fits(np.array(data, dtype=np.float32), header, tempfile) - tempfile.close() - temp_name = tempfile.name - del data, tempfile, rms - logging.info("running second pass to get a better rms") - junk, rms = filter_mc_sharemem(temp_name, step_size=step_size, box_size=box_size, cores=cores, shape=shape, dobkg=False, nslice=nslice) - del junk - rms = np.array(rms, dtype=np.float32) - os.remove(temp_name) - bkg_out = '_'.join([os.path.expanduser(out_base), 'bkg.fits']) rms_out = '_'.join([os.path.expanduser(out_base), 'rms.fits']) diff --git a/AegeanTools/regions.py b/AegeanTools/regions.py old mode 100644 new mode 100755 index 1875e0a7..41220e47 --- a/AegeanTools/regions.py +++ b/AegeanTools/regions.py @@ -3,7 +3,7 @@ Describe sky areas as a collection of HEALPix pixels """ -from __future__ import print_function +from __future__ import print_function, division __author__ = "Paul Hancock" @@ -181,7 +181,7 @@ def _renorm(self): # remove the four pixels from this level self.pixeldict[d].difference_update(nset) # add a new pixel to the next level up - self.pixeldict[d-1].add(p/4) + self.pixeldict[d-1].add(p//4) self.demoted = set() return @@ -243,7 +243,7 @@ def union(self, other, renorm=True): for d in range(self.maxdepth+1, other.maxdepth+1): for p in other.pixeldict[d]: # promote this pixel to self.maxdepth - pp = p/4**(d-self.maxdepth) + pp = p//4**(d-self.maxdepth) self.pixeldict[self.maxdepth].add(pp) if renorm: self._renorm() diff --git a/CHANGELOG.md b/CHANGELOG.md index dce336ca..8a8f8a4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +2018-07-28 +========== +BANE +- version is now v1.7.0 +- address a segfault problem that occurs on large images when `--onepass` is not used (#75) +- making use of `--stripes` greater than 1 requires at least `--cores=2` +- further reduce memory footprint: + - reduce the amount of interpolation stages that are required by 1 per stripe + - do background subtraction in the sub-processes to avoid loading the entire image in the main process +- deprecate options `--onepass`/`--twopass` and set a warning when used + +MIMAS +- fix a bug where `save_region` didn't work in python3 + 2018-07-19 ========== Aegean diff --git a/scripts/BANE b/scripts/BANE index 869739f2..ad619137 100755 --- a/scripts/BANE +++ b/scripts/BANE @@ -25,9 +25,8 @@ if __name__=="__main__": help='The [x,y] size of the box over which the rms/bkg is calculated. Default = 5*grid.') parser.add_option('--cores', dest='cores', type='int', help='Number of cores to use. Default = all available.') - parser.add_option('--onepass', dest='twopass', action='store_false', help='the opposite of twopass. default=False') - parser.add_option('--twopass', dest='twopass', action='store_true', - help='Calculate the bkg and rms in a two passes instead of one. (when the bkg changes rapidly)') + parser.add_option('--onepass', dest='twopass', action='store_true', help='DEPRECATED') + parser.add_option('--twopass', dest='twopass', action='store_true', help='DEPRECATED') parser.add_option('--nomask', dest='mask', action='store_false', default=True, help="Don't mask the output array [default = mask]") parser.add_option('--noclobber', dest='clobber', action='store_false', default=True, @@ -40,7 +39,7 @@ if __name__=="__main__": parser.add_option('--stripes', dest='stripes', type='int', nargs=1, default=None, help='Number of slices.') - parser.set_defaults(out_base=None, step_size=None, box_size=None, twopass=True, cores=None, usescipy=False, debug=False) + parser.set_defaults(out_base=None, step_size=None, box_size=None, twopass=False, cores=None, usescipy=False, debug=False) (options, args) = parser.parse_args() @@ -65,6 +64,9 @@ if __name__=="__main__": if options.out_base is None: options.out_base = os.path.splitext(filename)[0] + if options.twopass: + logging.warn("--onepass and --twopass have been deprecated and are ignored, and will be removed in a future release.") + if not options.clobber: bkgout, rmsout = options.out_base+'_bkg.fits', options.out_base+'_rms.fits' if os.path.exists(bkgout) and os.path.exists(rmsout): @@ -73,6 +75,6 @@ if __name__=="__main__": sys.exit(1) BANE.filter_image(im_name=filename, out_base=options.out_base, step_size=options.step_size, - box_size=options.box_size, twopass=options.twopass, cores=options.cores, + box_size=options.box_size, cores=options.cores, mask=options.mask, compressed=options.compress, nslice=options.stripes) diff --git a/tests/test_AerRes.py b/tests/test_AeRes.py old mode 100644 new mode 100755 similarity index 100% rename from tests/test_AerRes.py rename to tests/test_AeRes.py diff --git a/tests/test_BANE.py b/tests/test_BANE.py old mode 100644 new mode 100755 index 0d6b7d2c..2dd7e101 --- a/tests/test_BANE.py +++ b/tests/test_BANE.py @@ -21,15 +21,15 @@ def test_sigmaclip(): # normal usage case data = np.random.random(100) data[13] = np.nan - if not len(BANE.sigmaclip(data, 3, 4, reps=4)) > 0: + if not len(BANE.sigmaclip(data, 3, 4, reps=4)[0]) > 0: raise AssertionError() # test list where all elements get clipped - if not len(BANE.sigmaclip([-10, 10], 1, 2, reps=2)) == 0: + if not len(BANE.sigmaclip([-10, 10], 1, 2, reps=2)[0]) == 0: raise AssertionError() # test empty list - if not len(BANE.sigmaclip([], 0, 3)) == 0: + if not len(BANE.sigmaclip([], 0, 3)[0]) == 0: raise AssertionError() @@ -62,7 +62,7 @@ def test_filter_image(): raise AssertionError() os.remove(bkg) - BANE.filter_image(fname, cores=2, out_base=outbase, twopass=True, compressed=True) + BANE.filter_image(fname, cores=2, out_base=outbase, compressed=True) if not os.path.exists(rms): raise AssertionError() @@ -92,7 +92,7 @@ def test_ND_images(): pass else: raise AssertionError() - + if __name__ == "__main__": # introspect and run all the functions starting with 'test' diff --git a/tests/test_MIMAS.py b/tests/test_MIMAS.py new file mode 100755 index 00000000..2f1503d4 --- /dev/null +++ b/tests/test_MIMAS.py @@ -0,0 +1,150 @@ +#! /usr/bin/env python +""" +Test MIMAS.py +""" + +from __future__ import print_function +from AegeanTools import MIMAS +from AegeanTools.regions import Region +import numpy as np +import os + +__author__ = 'Paul Hancock' + + +def test_Dummy(): + """test the Dummy class""" + a = MIMAS.Dummy() + return a + + +def test_galactic2fk5(): + """test function galactic2fk5""" + l, b = 12, 0 + ra, dec = MIMAS.galactic2fk5(l, b) + return ra, dec + + +def test_mask_plane(): + """TODO""" + return + + +def test_mask_file(): + """TODO""" + return + + +def test_mask_table(): + """TODO""" + return + + +def test_mask_catalog(): + """TODO""" + return + + +def test_mim2reg(): + """TODO""" + return + + +def test_mim2fits(): + """TODO""" + return + + +def test_box2poly(): + """TODO""" + return + + +def test_circle2circle(): + """TODO""" + return + + +def test_poly2poly(): + """TODO""" + return + + +def test_reg2mim(): + """TODO""" + return + + +def test_combine_regions(): + """TODO""" + return + + +def test_intersect_regions(): + """test the intersection of multiple regions""" + cap = Region() + cap.add_circles(0, np.radians(-90), np.radians(10)) + cfile = 'cap.mim' + MIMAS.save_region(cap, cfile) + + # MIMAS should complain about just one file to load + try: + MIMAS.intersect_regions([cfile]) + except Exception as e: + pass + else: + raise AssertionError() + + # intersect a region with itself should produce same area/pixels + cap2 = MIMAS.intersect_regions([cfile, cfile]) + if cap2.get_area() != cap.get_area(): + raise AssertionError("intersect broken on reflexive test") + if not np.all(cap2.demoted == cap.demoted): + raise AssertionError("intersect broken on reflexive test") + + # a new region near the equator with no overlap + cap2 = Region() + cap2.add_circles(0, 0, np.radians(3)) + c2file = 'cap2.mim' + MIMAS.save_region(cap2, c2file) + + # the intersection should have no area + i = MIMAS.intersect_regions([cfile, c2file]) + if not (i.get_area() == 0.): + raise AssertionError("intersection doesn't give null result") + + os.remove(cfile) + os.remove(c2file) + return + + +def test_save_region(): + """test save_region""" + cap = Region() + cap.add_circles(0, np.radians(-90), np.radians(10)) + cfile = 'cap.mim' + MIMAS.save_region(cap, cfile) + if not os.path.exists(cfile): + raise AssertionError("Failed to write file") + os.remove(cfile) + return + + +def test_save_as_image(): + """test save_as_image""" + cap = Region() + cap.add_circles(0, np.radians(30), np.radians(10)) + cfile = 'circle.png' + MIMAS.save_as_image(cap, cfile) + if not os.path.exists(cfile): + raise AssertionError("Failed to write file") + os.remove(cfile) + return + + +if __name__ == "__main__": + # introspect and run all the functions starting with 'test' + for f in dir(): + if f.startswith('test'): + print(f) + globals()[f]() diff --git a/tests/test_angle_tools.py b/tests/test_angle_tools.py old mode 100644 new mode 100755 diff --git a/tests/test_cluster.py b/tests/test_cluster.py old mode 100644 new mode 100755 diff --git a/tests/test_fits_image.py b/tests/test_fits_image.py old mode 100644 new mode 100755 diff --git a/tests/test_fits_interp.py b/tests/test_fits_interp.py old mode 100644 new mode 100755 diff --git a/tests/test_fitting.py b/tests/test_fitting.py old mode 100644 new mode 100755 diff --git a/tests/test_misc.py b/tests/test_misc.py old mode 100644 new mode 100755 diff --git a/tests/test_models.py b/tests/test_models.py old mode 100644 new mode 100755 diff --git a/tests/test_msq2.py b/tests/test_msq2.py old mode 100644 new mode 100755 diff --git a/tests/test_regions.py b/tests/test_regions.py old mode 100644 new mode 100755 index 62136299..433b5218 --- a/tests/test_regions.py +++ b/tests/test_regions.py @@ -4,13 +4,12 @@ """ from __future__ import print_function - -__author__ = 'Paul Hancock' - from AegeanTools.regions import Region import numpy as np import os +__author__ = 'Paul Hancock' + def test_radec2sky(): """Test function: Region.radec2sky""" @@ -202,7 +201,16 @@ def test_intersect(): b = Region(maxdepth=7) b.add_circles(0, np.radians(-90), np.radians(0.5)) a.intersect(b) - if not (a.get_area() == b.get_area()): raise AssertionError("test_intersect FAILED") + if not (a.get_area() == b.get_area()): + raise AssertionError("test_intersect FAILED") + + a = Region(maxdepth=8) + a.add_circles(0, np.radians(75), np.radians(3)) + c = Region(maxdepth=8) + c.add_circles(0, np.radians(90), np.radians(10)) + a.intersect(c) + if not (a.get_area() == 0.): + raise AssertionError("test_intersect FAILED") def test_demote(): diff --git a/tests/test_wcs_helpers.py b/tests/test_wcs_helpers.py old mode 100644 new mode 100755