Stage to make source maps from binned shear catalog instead of doing calibration internally #320

Merged
merged 17 commits, Dec 11, 2023
276 changes: 134 additions & 142 deletions txpipe/maps.py
@@ -11,6 +11,8 @@
SHEAR_POS = 1
POS_POS = 2

NON_TOMO_BIN = 999999

# These generic mapping options are used by multiple different
# map types.
# TODO: consider dropping support for gnomonic maps.
@@ -140,167 +142,158 @@ def save_maps(self, pixel_scheme, maps):
output_files[tag].write_map(map_name, pix, map_data, self.pixel_metadata)


class TXSourceMaps(TXBaseMaps):
"""
Make tomographic shear maps

Make g1, g2, var(g1), var(g2), and lensing weight maps
from shear catalogs and tomography.
class TXSourceMaps(PipelineStage):
"""Generate source maps directly from binned, calibrated shear catalogs.

Should be replaced to use the binned_shear_catalog since that's
calibrated already.
This implementation uses DASK, which offers a numpy-like syntax and
hides the complicated parallelization details.

"""

name = "TXSourceMaps"
dask_parallel = True

inputs = [
("shear_catalog", ShearCatalog),
("shear_tomography_catalog", TomographyCatalog),
("binned_shear_catalog", HDFFile),
]

outputs = [
("source_maps", MapsFile),
]

# Generic mapping options + one option
# to use the truth shear columns
config_options = {"true_shear": False, **map_config_options}

def prepare_mappers(self, pixel_scheme):
# read shear cols and calibration info
nbin_source, cal = self.get_calibrators()

# store in config so it is saved later
self.config["nbin_source"] = nbin_source
# create basic mapper object
source_bins = list(range(nbin_source))
mapper = ShearMapper(
pixel_scheme,
source_bins,
sparse=self.config["sparse"],
)
return [mapper, cal]
config_options = {
"block_size": 0,
**map_config_options
}

def get_calibrators(self):
shear_catalog_type = read_shear_catalog_type(self)
cal, cal2D = Calibrator.load(self.get_input("shear_tomography_catalog"))
nbin_source = len(cal)
return nbin_source, [cal, cal2D]

def data_iterator(self):

# can optionally read truth values
with self.open_input("shear_catalog", wrapper=True) as f:
shear_cols, renames = f.get_primary_catalog_names(self.config["true_shear"])

# use utility function that combines data chunks
# from different files. Reading from n file sections
# takes 3n+1 arguments
it = self.combined_iterators(
self.config["chunk_rows"], # number of rows to iterate at once
# first file info
"shear_catalog", # tag of input file to iterate through
"shear", # data group within file to look at
shear_cols, # column(s) to read
# next file
"shear_tomography_catalog", # tag of input file to iterate through
"tomography", # data group within file to look at
["bin"], # column(s) to read
)
return rename_iterated(it, renames)

def accumulate_maps(self, pixel_scheme, data, mappers):
mapper = mappers[0]
mapper.add_data(data)

def calibrate_maps(self, g1, g2, var_g1, var_g2, cals):
def run(self):
import dask
import dask.array as da
import healpy

# We will return lists of calibrated maps
g1_out = {}
g2_out = {}
var_g1_out = {}
var_g2_out = {}

cals, cal_2D = cals

# We calibrate the 2D case separately
for i in g1.keys():
# we want to avoid accidentally calibrating any pixels
# that should be masked.
mask = (
(g1[i] == healpy.UNSEEN)
| (g2[i] == healpy.UNSEEN)
| (var_g1[i] == healpy.UNSEEN)
| (var_g2[i] == healpy.UNSEEN)
)
if i == "2D":
cal = cal_2D
else:
cal = cals[i]
g1_out[i], g2_out[i] = cal.apply(g1[i], g2[i])
std1 = np.sqrt(var_g1[i])
std2 = np.sqrt(var_g2[i])
std1, std2 = cal.apply(std1, std2, subtract_mean=False)
var_g1_out[i] = std1**2
var_g2_out[i] = std2**2

# re-apply the masking, just to make sure
for x in [g1_out[i], g2_out[i], var_g1_out[i], var_g2_out[i]]:
x[mask] = healpy.UNSEEN

return g1_out, g2_out, var_g1_out, var_g2_out

def finalize_mappers(self, pixel_scheme, mappers):
# only one mapper here - we call its finalize method
# to collect everything
mapper, cal = mappers
pix, counts, g1, g2, var_g1, var_g2, weights_g, esq = mapper.finalize(self.comm)
# Configuration options
pixel_scheme = choose_pixelization(**self.config)
nside = self.config["nside"]
npix = healpy.nside2npix(nside)
block_size = self.config["block_size"]
if block_size == 0:
block_size = "auto"

# We have to keep this open throughout the process, because
# dask will internally load chunks of the input hdf5 data.
f = self.open_input("binned_shear_catalog")
nbin = f['shear'].attrs['nbin_source']

# The "all" bin is the non-tomographic case.
bins = list(range(nbin)) + ["all"]
output = {}

for i in bins:
# These don't actually load all the data - everything is lazy
ra = da.from_array(f[f"shear/bin_{i}/ra"], block_size)
dec = da.from_array(f[f"shear/bin_{i}/dec"], block_size)
g1 = da.from_array(f[f"shear/bin_{i}/g1"], block_size)
g2 = da.from_array(f[f"shear/bin_{i}/g2"], block_size)
weight = da.from_array(f[f"shear/bin_{i}/weight"], block_size)

# This seems to work directly on dask arrays, but we should check performance.
# (A plain healpy equivalent is sketched after this class.)
pix = pixel_scheme.ang2pix(ra, dec)

# count map is just the number of galaxies per pixel
count_map = da.bincount(pix, minlength=npix)

# For the other maps we use bincount with weights - these are the
# various maps by pixel. bincount counts the objects with each
# value of the first argument, weighted by the weights keyword,
# so effectively it gives us
# p_i = sum_{j} x[j] * delta_{pix[j], i}
# which is our map (a small numpy check of this arithmetic appears
# after this class).
weight_map = da.bincount(pix, weights=weight, minlength=npix)
g1_map = da.bincount(pix, weights=weight * g1, minlength=npix)
g2_map = da.bincount(pix, weights=weight * g2, minlength=npix)
esq_map = da.bincount(pix, weights=weight**2 * 0.5 * (g1**2 + g2**2), minlength=npix)

# normalize by weights to get the mean map value in each pixel
g1_map /= weight_map
g2_map /= weight_map

# Generate a catalog-like vector of the means so we can
# subtract it from the full catalog. Not sure if this ever actually gets
# created, or if dask just keeps a conceptual reference to it.
g1_mean = g1_map[pix]
g2_mean = g2_map[pix]

# Also generate variance maps
var1_map = da.bincount(pix, weights=weight * (g1 - g1_mean)**2, minlength=npix)
var2_map = da.bincount(pix, weights=weight * (g2 - g2_mean)**2, minlength=npix)

# We want the variance on the mean, so we divide by both the weight
# (to go from the weighted sum to the variance) and the count (to get
# the variance on the mean). We have verified that this matches using
# var() on the original arrays.
var1_map /= (weight_map * count_map)
var2_map /= (weight_map * count_map)

# slight change in output name
if i == "all":
i = "2D"

# replace nans with UNSEEN. The NaNs can occur if there are no objects
# in a pixel, so the value is undefined.
g1_map[da.isnan(g1_map)] = healpy.UNSEEN
g2_map[da.isnan(g2_map)] = healpy.UNSEEN
var1_map[da.isnan(var1_map)] = healpy.UNSEEN
var2_map[da.isnan(var2_map)] = healpy.UNSEEN
esq_map[da.isnan(esq_map)] = healpy.UNSEEN

# Save all the stuff we want here.
output[f"count_{i}"] = count_map
output[f"g1_{i}"] = g1_map
output[f"g2_{i}"] = g2_map
output[f"lensing_weight_{i}"] = weight_map
output[f"count_{i}"] = count_map
output[f"var_e_{i}"] = esq_map
output[f"var_g1_{i}"] = var1_map
output[f"var_g2_{i}"] = var2_map

# mask is where a pixel is hit in any of the tomo bins
mask = da.zeros(npix, dtype=bool)
for i in bins:
if i == "all":
i = "2D"
mask |= output[f"lensing_weight_{i}"] > 0

output["mask"] = mask

# Everything above is lazy - this forces the computation.
# It works out an efficient (we hope) way of doing everything in parallel
output, = dask.compute(output)
f.close()

# collate metadata
metadata = {
key: self.config[key]
for key in map_config_options
}
metadata['nbin'] = nbin
metadata['nbin_source'] = nbin

# build up output
maps = {}
pix = np.where(output["mask"])[0]

# only master gets full stuff
if self.rank != 0:
return maps
# write the output maps
with self.open_output("source_maps", wrapper=True) as out:
for i in bins:
# again rename "all" to "2D"
if i == "all":
i = "2D"

# Calibrate the maps
g1, g2, var_g1, var_g2 = self.calibrate_maps(g1, g2, var_g1, var_g2, cal)
# We save the pixels in the mask - i.e. any pixel that is hit in any
# tomographic bin is included. Some will be UNSEEN.
for key in "g1", "g2", "count", "var_e", "var_g1", "var_g2", "lensing_weight":
out.write_map(f"{key}_{i}", pix, output[f"{key}_{i}"][pix], metadata)

for b in mapper.bins:
# keys are the output tag and the map name
maps["source_maps", f"g1_{b}"] = (pix, g1[b])
maps["source_maps", f"g2_{b}"] = (pix, g2[b])
maps["source_maps", f"var_g1_{b}"] = (pix, var_g1[b])
maps["source_maps", f"var_g2_{b}"] = (pix, var_g2[b])
maps["source_maps", f"lensing_weight_{b}"] = (pix, weights_g[b])
maps["source_maps", f"count_{b}"] = (pix, counts[b])
# added from HSC branch, to get analytic noise in twopoint_fourier
out_e = np.zeros_like(esq[b])
out_e[esq[b] > 0] = esq[b][esq[b] > 0]


# calibrate the esq value - this is hacky for now! Since the shear is
# calibrated by (roughly) the inverse response Rinv, var(e) picks up a
# factor of Rinv**2, so we scale by the mean diagonal response squared.
shear_catalog_type = read_shear_catalog_type(self)
if (shear_catalog_type == "metadetect") or (shear_catalog_type == "metacal"):
print("DOING HACKY CAL OF VAR(e)")
cal_1D, cal_2D = cal
if b == "2D":
c = cal_2D
else:
c = cal_1D[b]
Rinv_approx = c.Rinv.diagonal().mean()
esq[b] *= Rinv_approx**2

# replace saved quantity
out_e = np.zeros_like(esq[b])
out_e[esq[b] > 0] = esq[b][esq[b] > 0]
else:
print("VAR(e) IS WRONG!")

maps["source_maps", f"var_e_{b}"] = (pix, out_e)
out.file['maps'].attrs.update(metadata)

return maps
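For reference, the per-pixel averaging and variance arithmetic used in run() above can be checked with a small numpy sketch; the dask calls are the same, just lazy until dask.compute(output) triggers them. The toy array names and values below are illustrative, not part of the pipeline.

import numpy as np
import healpy

# 10 toy galaxies on a 12-pixel (nside=1) map
npix = 12
rng = np.random.default_rng(42)
pix = rng.integers(0, npix, size=10)      # pixel index per galaxy
g1 = rng.normal(0.0, 0.3, size=10)        # one shear component per galaxy
weight = rng.uniform(0.5, 1.5, size=10)   # lensing weight per galaxy

# Per-pixel sums via bincount: p_i = sum_j x[j] * delta(pix[j], i)
count_map = np.bincount(pix, minlength=npix)
weight_map = np.bincount(pix, weights=weight, minlength=npix)
g1_map = np.bincount(pix, weights=weight * g1, minlength=npix) / weight_map

# Variance on the mean: subtract the per-pixel mean from each galaxy,
# sum the weighted squared residuals, then divide by weight and count.
g1_mean = g1_map[pix]
var1_map = np.bincount(pix, weights=weight * (g1 - g1_mean) ** 2, minlength=npix)
var1_map /= weight_map * count_map

# Empty pixels give 0/0 = NaN above; flag them as UNSEEN as the stage does.
g1_map[np.isnan(g1_map)] = healpy.UNSEEN
var1_map[np.isnan(var1_map)] = healpy.UNSEEN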

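The pixel_scheme.ang2pix call is TXPipe's own wrapper around its pixelization choice; assuming a HEALPix scheme in RING ordering with RA/Dec in degrees, the equivalent plain-healpy call would look roughly like this (values illustrative):

import numpy as np
import healpy

nside = 64
ra = np.array([10.0, 10.2, 200.5])    # degrees
dec = np.array([-30.0, -30.1, 5.0])   # degrees
pix = healpy.ang2pix(nside, ra, dec, lonlat=True)  # pixel index per object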

class TXLensMaps(TXBaseMaps):
@@ -342,7 +335,6 @@ def prepare_mappers(self, pixel_scheme):
return [mapper]

def data_iterator(self):
print("TODO: add use of lens weights here")
# see TXSourceMaps above for info on this
return self.combined_iterators(
self.config["chunk_rows"],