Skip to content

Commit

Permalink
Merge cf0f7ba into fd0f0e6
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulHancock committed Jul 30, 2018
2 parents fd0f0e6 + cf0f7ba commit dfea5f3
Show file tree
Hide file tree
Showing 17 changed files with 92 additions and 279 deletions.
161 changes: 74 additions & 87 deletions AegeanTools/BANE.py
@@ -1,8 +1,14 @@
#! /usr/bin/env python

"""
This module contains all of the BANE specific code
The function filter_image should be imported from elsewhere and run as is.
"""

__author__ = 'Paul Hancock'
__version__ = 'v1.6.5'
__date__ = '2018-07-05'

# standard imports
from astropy.io import fits
import copy
Expand All @@ -13,18 +19,14 @@
import os
from scipy.interpolate import LinearNDInterpolator
import sys
from tempfile import NamedTemporaryFile
from time import gmtime, strftime

# Aegean tools
from .fits_interp import compress

__author__ = 'Paul Hancock'
__version__ = 'v1.7'
__date__ = '2018-07-28'

# global variables for multiprocessing
ibkg = irms = None
events = []


def sigmaclip(arr, lo, hi, reps=3):
"""
Expand Down Expand Up @@ -52,10 +54,6 @@ def sigmaclip(arr, lo, hi, reps=3):
clipped : numpy.array
The clipped array.
The clipped array may be empty!
mean : float
The mean of the array, possibly nan
std : float
The std of the array, possibly nan
Notes
-----
Expand All @@ -64,7 +62,7 @@ def sigmaclip(arr, lo, hi, reps=3):
clipped = np.array(arr)[np.isfinite(arr)]

if len(clipped) < 1:
return clipped, np.nan, np.nan
return clipped

std = np.std(clipped)
mean = np.mean(clipped)
Expand All @@ -78,7 +76,7 @@ def sigmaclip(arr, lo, hi, reps=3):
mean = np.mean(clipped)
if 2*abs(pstd-std)/(pstd+std) < 0.2:
break
return clipped, mean, std
return clipped


def _sf2(args):
Expand All @@ -103,7 +101,7 @@ def _sf2(args):
raise Exception("".join(traceback.format_exception(*sys.exc_info())))


def sigma_filter(filename, region, step_size, box_size, shape, sid):
def sigma_filter(filename, region, step_size, box_size, shape, dobkg=True):
"""
Calculate the background and rms for a sub region of an image. The results are
written to shared memory - irms and ibkg.
Expand All @@ -125,8 +123,8 @@ def sigma_filter(filename, region, step_size, box_size, shape, sid):
shape : tuple
The shape of the fits image
sid : int
The slice number
dobkg : bool
Do a background calculation. If false then only the rms is calculated. Default = True.
Returns
-------
Expand Down Expand Up @@ -193,64 +191,26 @@ def box(r, c):
rms_points = []
rms_values = []

# indices of the shape we want to write to (not the shape of data)
gx, gy = np.mgrid[ymin:ymax, 0:shape[1]]

for row, col in locations(step_size, ymin-data_row_min, ymax-data_row_min, 0, shape[1]):
r_min, r_max, c_min, c_max = box(row, col)
new = data[r_min:r_max, c_min:c_max]
new = np.ravel(new)
new, bkg, rms = sigmaclip(new, 3, 3)
# If we are left with (or started with) no data, then just move on
if len(new) < 1:
continue

bkg_points.append((row + data_row_min, col)) # these coords need to be indices into the larger array
bkg_values.append(bkg)

if len(bkg_points) > 1:
logging.debug("Interpolating bkg")
ifunc = LinearNDInterpolator(bkg_points, bkg_values)
interpolated_bkg = np.array(ifunc((gx, gy)), dtype=np.float32)
del ifunc
else:
logging.debug("bkg is all nans")
interpolated_bkg = np.empty(gx.shape, dtype=np.float32) * np.nan
with ibkg.get_lock():
logging.debug("Writing bkg to sharemem")
for i, row in enumerate(interpolated_bkg):
ibkg[i + ymin] = np.ctypeslib.as_ctypes(row)
logging.debug(" .. done writing")

# signal that this worker is at the barrier
events[sid].set()


# wait for neighbour workers to reach the barrier
if sid > 0:
logging.debug("{0} is waiting for {1}".format(sid, sid-1))
events[sid-1].wait()
if sid < len(events)-1:
logging.debug("{0} is waiting for {1}".format(sid, sid + 1))
events[sid+1].wait()
logging.debug("{0} background subtraction".format(sid))

for i in range(data_row_max-data_row_min):
data[i, :] = data[i, :] - ibkg[data_row_min + i]


for row, col in locations(step_size, ymin-data_row_min, ymax-data_row_min, 0, shape[1]):
r_min, r_max, c_min, c_max = box(row, col)
new = data[r_min:r_max, c_min:c_max]
new = np.ravel(new)
new, bkg, rms = sigmaclip(new, 3, 3)
new = sigmaclip(new, 3, 3)
# If we are left with (or started with) no data, then just move on
if len(new) < 1:
continue

if dobkg:
bkg = np.median(new)
bkg_points.append((row + data_row_min, col)) # these coords need to be indices into the larger array
bkg_values.append(bkg)
rms = np.std(new)
rms_points.append((row + data_row_min, col))
rms_values.append(rms)

# indices of the shape we want to write to (not the shape of data)
gx, gy = np.mgrid[ymin:ymax, 0:shape[1]]
# If the bkg/rms calculation above didn't yield any points, then our interpolated values are all nans
if len(rms_points) > 1:
logging.debug("Interpolating rms")
ifunc = LinearNDInterpolator(rms_points, rms_values)
Expand All @@ -267,16 +227,20 @@ def box(r, c):
irms[i + ymin] = np.ctypeslib.as_ctypes(row)
logging.debug(" .. done writing rms")

##
# Apply mask
##

# with ibkg.get_lock():
# pass
#
# with irms.get_lock():
# pass

if dobkg:
if len(bkg_points)>1:
logging.debug("Interpolating bkg")
ifunc = LinearNDInterpolator(bkg_points, bkg_values)
interpolated_bkg = np.array(ifunc((gx, gy)), dtype=np.float32)
del ifunc
else:
logging.debug("bkg is all nans")
interpolated_bkg = np.empty(gx.shape, dtype=np.float32)*np.nan
with ibkg.get_lock():
logging.debug("Writing bkg to sharemem")
for i, row in enumerate(interpolated_bkg):
ibkg[i + ymin] = np.ctypeslib.as_ctypes(row)
logging.debug(" .. done writing bkg")
logging.debug('rows {0}-{1} finished at {2}'.format(ymin, ymax, strftime("%Y-%m-%d %H:%M:%S", gmtime())))
del bkg_points, bkg_values
del rms_points, rms_values
Expand Down Expand Up @@ -312,7 +276,7 @@ def mask_img(data, mask_data):
logging.info("failed to mask file, not a critical failure")


def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None):
def filter_mc_sharemem(filename, step_size, box_size, cores, shape, dobkg=True, nslice=8):
"""
Calculate the background and noise images corresponding to the input file.
The calculation is done via a box-car approach and uses multiple cores and shared memory.
Expand All @@ -338,6 +302,9 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None)
shape : (int, int)
The shape of the image in the given file.
dobkg : bool
If True then calculate the background, otherwise assume it is zero.
Returns
-------
bkg, rms : numpy.ndarray
Expand All @@ -346,16 +313,19 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None)

if cores is None:
cores = multiprocessing.cpu_count()
# update nslice if not set or if cores is 1
if (nslice is None) or (cores==1):
if nslice is None:
nslice = cores

img_y, img_x = shape
# initialise some shared memory
global ibkg, irms
bkg = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32))
ibkg = multiprocessing.sharedctypes.Array(bkg._type_, bkg, lock=True)

if dobkg:
global ibkg
bkg = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32))
ibkg = multiprocessing.sharedctypes.Array(bkg._type_, bkg, lock=True)
else:
bkg = None
ibkg = None
global irms
rms = np.ctypeslib.as_ctypes(np.empty(shape, dtype=np.float32))
irms = multiprocessing.sharedctypes.Array(rms._type_, rms, lock=True)

Expand All @@ -382,13 +352,9 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None)
logging.debug("ymaxs {0}".format(ymaxs))

args = []
for i, (ymin, ymax) in enumerate(zip(ymins, ymaxs)):
for ymin, ymax in zip(ymins, ymaxs):
region = (ymin, ymax)
args.append((filename, region, step_size, box_size, shape, i))

# set up a list of events to synchronise the workers, one event per strip
global events
events = [multiprocessing.Event() for _ in range(len(ymaxs))]
args.append((filename, region, step_size, box_size, shape, dobkg))

# start a new process for each task, hopefully to reduce residual memory use
pool = multiprocessing.Pool(processes=cores, maxtasksperchild=1)
Expand All @@ -402,10 +368,13 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None)
pool.close()
pool.join()

rms = np.array(irms)
if dobkg:
bkg = np.array(ibkg)
return bkg, rms


def filter_image(im_name, out_base, step_size=None, box_size=None, cores=None, mask=True, compressed=False, nslice=None):
def filter_image(im_name, out_base, step_size=None, box_size=None, twopass=False, cores=None, mask=True, compressed=False, nslice=None):
"""
Create a background and noise image from an input image.
Resulting images are written to `outbase_bkg.fits` and `outbase_rms.fits`
Expand All @@ -420,6 +389,9 @@ def filter_image(im_name, out_base, step_size=None, box_size=None, cores=None, m
Tuple of the x,y step size in pixels
box_size : (int,int)
The size of the box in piexls
twopass : bool
Perform a second pass calculation to ensure that the noise is not contaminated by the background.
Default = False
cores : int
Number of CPU corse to use.
Default = all available
Expand Down Expand Up @@ -475,14 +447,29 @@ def filter_image(im_name, out_base, step_size=None, box_size=None, cores=None, m

logging.info("using grid_size {0}, box_size {1}".format(step_size,box_size))
logging.info("on data shape {0}".format(shape))

bkg, rms = filter_mc_sharemem(im_name, step_size=step_size, box_size=box_size, cores=cores, shape=shape, nslice=nslice)
logging.info("done")

# force float 32s to avoid bloated files
bkg = np.array(bkg, dtype=np.float32)
rms = np.array(rms, dtype=np.float32)

if twopass:
# TODO: check what this does for our memory usage
# Answer: The interpolation step peaks at about 5x the normal value.
tempfile = NamedTemporaryFile(delete=False)
data = fits.getdata(im_name) - bkg
# write 32bit floats to reduce memory overhead
write_fits(np.array(data, dtype=np.float32), header, tempfile)
tempfile.close()
temp_name = tempfile.name
del data, tempfile, rms
logging.info("running second pass to get a better rms")
junk, rms = filter_mc_sharemem(temp_name, step_size=step_size, box_size=box_size, cores=cores, shape=shape, dobkg=False, nslice=nslice)
del junk
rms = np.array(rms, dtype=np.float32)
os.remove(temp_name)

bkg_out = '_'.join([os.path.expanduser(out_base), 'bkg.fits'])
rms_out = '_'.join([os.path.expanduser(out_base), 'rms.fits'])

Expand Down
6 changes: 3 additions & 3 deletions AegeanTools/regions.py 100755 → 100644
Expand Up @@ -3,7 +3,7 @@
Describe sky areas as a collection of HEALPix pixels
"""

from __future__ import print_function, division
from __future__ import print_function

__author__ = "Paul Hancock"

Expand Down Expand Up @@ -181,7 +181,7 @@ def _renorm(self):
# remove the four pixels from this level
self.pixeldict[d].difference_update(nset)
# add a new pixel to the next level up
self.pixeldict[d-1].add(p//4)
self.pixeldict[d-1].add(p/4)
self.demoted = set()
return

Expand Down Expand Up @@ -243,7 +243,7 @@ def union(self, other, renorm=True):
for d in range(self.maxdepth+1, other.maxdepth+1):
for p in other.pixeldict[d]:
# promote this pixel to self.maxdepth
pp = p//4**(d-self.maxdepth)
pp = p/4**(d-self.maxdepth)
self.pixeldict[self.maxdepth].add(pp)
if renorm:
self._renorm()
Expand Down
14 changes: 0 additions & 14 deletions CHANGELOG.md
@@ -1,17 +1,3 @@
2018-07-28
==========
BANE
- version is now v1.7.0
- address a segfault problem that occurs on large images when `--onepass` is not used (#75)
- making use of `--stripes` greater than 1 requires at least `--cores=2`
- further reduce memory footprint:
- reduce the amount of interpolation stages that are required by 1 per stripe
- do background subtraction in the sub-processes to avoid loading the entire image in the main process
- deprecate options `--onepass`/`--twopass` and set a warning when used

MIMAS
- fix a bug where `save_region` didn't work in python3

2018-07-19
==========
Aegean
Expand Down
12 changes: 5 additions & 7 deletions scripts/BANE
Expand Up @@ -25,8 +25,9 @@ if __name__=="__main__":
help='The [x,y] size of the box over which the rms/bkg is calculated. Default = 5*grid.')
parser.add_option('--cores', dest='cores', type='int',
help='Number of cores to use. Default = all available.')
parser.add_option('--onepass', dest='twopass', action='store_true', help='DEPRECATED')
parser.add_option('--twopass', dest='twopass', action='store_true', help='DEPRECATED')
parser.add_option('--onepass', dest='twopass', action='store_false', help='the opposite of twopass. default=False')
parser.add_option('--twopass', dest='twopass', action='store_true',
help='Calculate the bkg and rms in a two passes instead of one. (when the bkg changes rapidly)')
parser.add_option('--nomask', dest='mask', action='store_false', default=True,
help="Don't mask the output array [default = mask]")
parser.add_option('--noclobber', dest='clobber', action='store_false', default=True,
Expand All @@ -39,7 +40,7 @@ if __name__=="__main__":
parser.add_option('--stripes', dest='stripes', type='int', nargs=1, default=None,
help='Number of slices.')

parser.set_defaults(out_base=None, step_size=None, box_size=None, twopass=False, cores=None, usescipy=False, debug=False)
parser.set_defaults(out_base=None, step_size=None, box_size=None, twopass=True, cores=None, usescipy=False, debug=False)

(options, args) = parser.parse_args()

Expand All @@ -64,9 +65,6 @@ if __name__=="__main__":
if options.out_base is None:
options.out_base = os.path.splitext(filename)[0]

if options.twopass:
logging.warn("--onepass and --twopass have been deprecated and are ignored, and will be removed in a future release.")

if not options.clobber:
bkgout, rmsout = options.out_base+'_bkg.fits', options.out_base+'_rms.fits'
if os.path.exists(bkgout) and os.path.exists(rmsout):
Expand All @@ -75,6 +73,6 @@ if __name__=="__main__":
sys.exit(1)

BANE.filter_image(im_name=filename, out_base=options.out_base, step_size=options.step_size,
box_size=options.box_size, cores=options.cores,
box_size=options.box_size, twopass=options.twopass, cores=options.cores,
mask=options.mask, compressed=options.compress, nslice=options.stripes)

File renamed without changes.

0 comments on commit dfea5f3

Please sign in to comment.