In [2]:
import nbodykit
from nbodykit.lab import BigFileCatalog
from nbodykit.transform import ConcatenateSources, CartesianToEquatorial
from nbodykit.cosmology import Planck15
import numpy
import bigfile

from mpi4py import MPI
nbodykit.setup_logging()
nbodykit.set_options(dask_chunk_size=1024 * 1024)
nbodykit.set_options(global_cache_size=0)

from nbodykit.utils import DistributedArray, GatherArray
import dask.array as da

In [3]:
def inv_sigma(ds, dl, zl):
    ddls = 1 - numpy.multiply.outer(1 / ds, dl)
    ddls = ddls.clip(0)
    w = (100. / 3e5) ** 2 * (1 + zl)* dl
    inv_sigma_c = (ddls * w)
    return inv_sigma_c

In [4]:
def wlen(Om, dl, zl, ds, Nzs=1):
    """
        Parameters
        ----------
        dl, zl: distance and redshift of lensing objects
        
        ds: distance source plane bins. if a single scalar, do a delta function bin.
        
        Nzs : number of objects in each ds bin. len(ds) - 1 items
        
    """
    ds = numpy.atleast_1d(ds) # promote to 1d, sum will get rid of it
    integrand = 1.5 * Om * Nzs * inv_sigma(ds, dl, zl)
    Ntot = numpy.sum(Nzs)
    w_lensing = numpy.sum(integrand, axis=0) / Ntot
    
    return w_lensing

In [5]:
def weighted_map(ipix, npix, weights, localsize, comm):
    """ Make a map from particles, for quantities like
    
       W(t) = \int dx delta(t, x) w
       
       Parameters
       ----------
       ipix: array_like
     
       weights : array_like
    
       Returns
       -------
       Wmap, Nmap; distributed maps
       
       Wmap is the weighted map. Nmap is the number of objects
    """

    ipix, labels = numpy.unique(ipix, return_inverse=True)
    N = numpy.bincount(labels)
    weights = numpy.bincount(labels, weights)
    #print("shrink to %d from %d" % (len(ipix), len(labels)))

    del labels
 
    pairs = numpy.empty(len(ipix) + 1, dtype=[('ipix', 'i4'), ('N', 'i4'), ('weights', 'f8') ])
    pairs['ipix'][:-1] = ipix
    pairs['weights'][:-1] = weights
    pairs['N'][:-1] = N

    pairs['ipix'][-1] = npix - 1 # trick to make sure the final length is correct.
    pairs['weights'][-1] = 0
    pairs['N'][-1] = 0

    disa = DistributedArray(pairs, comm=comm)
    disa.sort('ipix')

    w = disa['ipix'].bincount(weights=disa['weights'].local, local=False, shared_edges=False)
    N = disa['ipix'].bincount(weights=disa['N'].local, local=False, shared_edges=False)

    if npix - w.cshape[0] != 0:
        if comm.rank == 0:
            print('padding -- this shouldnt have occured ', npix, w.cshape)
        # pad with zeros, since the last few bins can be empty.
        ipadding = DistributedArray.cempty((npix - w.cshape[0],), dtype='i4', comm=comm)
        fpadding = DistributedArray.cempty((npix - w.cshape[0],), dtype='f8', comm=comm)

        fpadding.local[:] = 0
        ipadding.local[:] = 0

        w = DistributedArray.concat(w, fpadding)
        N = DistributedArray.concat(N, ipadding)

    w = DistributedArray.concat(w, localsize=localsize)
    N = DistributedArray.concat(N, localsize=localsize)

    return w.local, N.local

In [33]:
def read_range(cat, amin, amax):
    """ Read a portion of the lightcone between two red shift ranges

        The lightcone from FastPM is sorted in Aemit and an index is built.
        So we make use of that.

        CrowCanyon is z > 0; We paste the mirror image to form a full sky.
    """
    edges = cat.attrs['aemitIndex.edges']
    offsets = cat.attrs['aemitIndex.offset']
    start, end = edges.searchsorted([amin, amax])
    if cat.comm.rank == 0:
        cat.logger.info("Range of index is %d to %d" %(( start + 1, end + 1)))
    start = offsets[start + 1][0]
    end = offsets[end + 1][0]
    cat =  cat.query_range(start, end)
    cat1 = cat.copy()
    cat1['Position'] = cat1['Position'] * [1, 1, -1.]
    cat3 = ConcatenateSources(cat, cat1)
    if cat1.csize > 0:
        cat3['RA'], cat3['DEC'] = CartesianToEquatorial(cat3['Position'], frame='galactic')
    else:
        cat3['RA'] = 0
        cat3['DEC'] = 0
    return cat3

In [38]:
import healpy as healpix
def make_kappa_maps(cat, nside, zs_list, ds_list, localsize, nbar):
    """ Make kappa maps at a list of ds

        Return kappa, Nm in shape of (n_ds, localsize), kappabar in shape of (n_ds,)

        The maps are distributed in memory, and localsize is the size of
        map on this rank.
    """

    dl = (abs(cat['Position'] **2).sum(axis=-1)) ** 0.5
    chunks = dl.chunks
    ra = cat['RA']
    dec = cat['DEC']
    zl = (1 / cat['Aemit'] - 1)
    
    ipix = da.apply_gufunc(lambda ra, dec, nside:
                           healpix.ang2pix(nside, numpy.radians(90 - dec), numpy.radians(ra)),
                        '(),()->()', ra, dec, nside=nside)

    npix = healpix.nside2npix(nside)

    ipix = ipix.compute()
    dl = dl.persist()
 
    cat.comm.barrier()

    if cat.comm.rank == 0:
        cat.logger.info("ipix and dl are persisted")

    area = (4 * numpy.pi / npix) * dl**2

    Om = cat.attrs['OmegaM'][0]
    
    kappa_list = []
    kappabar_list = []
    Nm_list = []
    for zs, ds in zip(zs_list, ds_list):
        LensKernel = da.apply_gufunc(lambda dl, zl, Om, ds: wlen(Om, dl, zl, ds), 
                                     "(), ()-> ()",
                                     dl, zl, Om=Om, ds=ds)

        weights = (LensKernel / (area * nbar))
        weights = weights.compute()

        cat.comm.barrier()

        if cat.comm.rank == 0:
            cat.logger.info("source plane %g weights are persisted" % zs)
        Wmap, Nmap = weighted_map(ipix, npix, weights, localsize, cat.comm)

        cat.comm.barrier()
        if cat.comm.rank == 0:
            cat.logger.info("source plane %g maps generated" % zs)

        # compute kappa bar
        # this is a simple integral, but we do not know dl, dz relation
        # so do it with values from a subsample of particles
        every = (cat.csize // 100000)
        
        kappa1 = Wmap
        if every == 0: every = 1

        # use GatherArray, because it is faster than comm.gather at this scale
        # (> 4000 ranks on CrayMPI)
        ssdl = GatherArray(dl[::every].compute(), cat.comm)
        ssLensKernel = GatherArray(LensKernel[::every].compute(), cat.comm)

        if cat.comm.rank == 0:
            arg = ssdl.argsort()
            ssdl = ssdl[arg]
            ssLensKernel = ssLensKernel[arg]
            
            kappa1bar = numpy.trapz(ssLensKernel, ssdl)
        else:
            kappa1bar = None
        kappa1bar = cat.comm.bcast(kappa1bar)

        cat.comm.barrier()
        if cat.comm.rank == 0:
            cat.logger.info("source plane %g bar computed " % zs)
        kappa_list.append(kappa1)
        kappabar_list.append(kappa1bar)
        Nm_list.append(Nmap)
    """
    # estimate nbar
    dlmin = dl.min()
    dlmax = dl.max()
        
    volume = (Nmap > 0).sum() / len(Nmap) * 4  / 3 * numpy.pi * (dlmax**3 - dlmin ** 3)
    """
    # returns number rather than delta, since we do not know fsky here.
    #Nmap = Nmap / cat.csize * cat.comm.allreduce((Nmap > 0).sum()) # to overdensity.
    return numpy.array(kappa_list), numpy.array(kappabar_list), numpy.array(Nm_list)

In [46]:
zlmin = 2.0
zlmax = 2.2
zstep = 0.1
zs_list = numpy.arange(zlmin, zlmax, zstep)

# no need to be accurate here
ds_list = Planck15.comoving_distance(zs_list)

basedir = '/lustre/work/akira.tokiwa/globus/fastpm/rfof'
tiled = 'rfof_proc4096_nc1024_size625_nsteps60lin_ldr0_rcvtrue_fstnone_pnf2_lnf2_s100_dhf1.0000_tiled0.20_fll_elllim_10000_npix_8192_rfofkdt_8_LCDM_10tiled'
path = f'{basedir}/{tiled}/usmesh'
testpath = '/lustre/work/akira.tokiwa/SRsample/HR/rfof_proc128_nc256_size625_nsteps60lin_ldr0_rcvnil_fstnone_pnf2_lnf2_s200_dhf1.0000_tiled0.20_fll_elllim_10000_npix_512_rfofkdt_8/usmesh/'
dataset = '1/'
#'/global/cscratch1/sd/yfeng1/m3127/desi/1536-9201-40eae2464/lightcone/usmesh/'

cat = BigFileCatalog(testpath, dataset=dataset)

kappa = 0
Nm = 0
kappabar = 0

nside = 8192
npix = 12 * nside **2#healpix.nside2npix(nside)
localsize = npix * (cat.comm.rank + 1) // cat.comm.size - npix * (cat.comm.rank) // cat.comm.size
nbar = (cat.attrs['NC'] ** 3  / cat.attrs['BoxSize'] ** 3 * cat.attrs['ParticleFraction'])[0]

Nsteps = int(numpy.round((zlmax - zlmin) / zstep))
if Nsteps < 2 : Nsteps = 2
z = numpy.linspace(zlmax, zlmin, Nsteps, endpoint=True)

if cat.comm.rank == 0:
    cat.logger.info("Splitting data redshift bins %s" % str(z))

  return pyxbigfile.Dataset.__init__(self, file, dtype=dtype, size=size)
[ 005587.60 ]   0: 02-01 22:46  CatalogSource   INFO     Extra arguments to FileType: () {'dataset': '1/'}
[ 005587.60 ]   0: 02-01 22:46  CatalogSource   INFO     Splitting data redshift bins [2.2 2. ]


In [48]:
z1 = z[:-1]
z2 = z[1:]
amin = 1 / (1 + z1)
amax = 1 / (1 + z2)

In [49]:
import gc
gc.collect()
if cat.comm.rank == 0:
    cat.logger.info("nbar = %g, zlmin = %g, zlmax = %g zs = %s" % (nbar, z2, z1, zs_list))

slice = read_range(cat, 1/(1 + z1), 1 / (1 + z2))

if cat.comm.rank == 0:
    cat.logger.info("read %d particles" % slice.csize)

[ 005746.78 ]   0: 02-01 22:49  CatalogSource   INFO     nbar = 0.0687195, zlmin = 2, zlmax = 2.2 zs = [2.  2.1 2.2]
[ 005746.78 ]   0: 02-01 22:49  CatalogSource   INFO     Range of index is 33 to 35
  return pyxbigfile.Dataset.__init__(self, file, dtype=dtype, size=size)
[ 005747.10 ]   0: 02-01 22:49  CatalogSource   INFO     read 0 particles


In [50]:
kappa1, kappa1bar, Nm1  = make_kappa_maps(slice, nside, zs_list, ds_list, localsize, nbar)

AxisError: axis 1 is out of bounds for array of dimension 1

In [None]:
kappa = kappa + kappa1
Nm = Nm + Nm1
kappabar = kappabar + kappa1bar