diff --git a/nbodykit/tests/test_transform.py b/nbodykit/tests/test_transform.py
new file mode 100644
index 000000000..7ae0831a6
--- /dev/null
+++ b/nbodykit/tests/test_transform.py
@@ -0,0 +1,32 @@
+from runtests.mpi import MPITest
+from nbodykit.lab import *
+from nbodykit import setup_logging
+
+import pytest
+
+# debug logging
+setup_logging("debug")
+
+@MPITest([1, 4])
+def test_concatenate(comm):
+    CurrentMPIComm.set(comm)
+
+    # make two sources
+    s1 = UniformCatalog(3e-6, 2600)
+    s2 = UniformCatalog(3e-6, 2600)
+
+    # concatenate all columns
+    cat = transform.concatenate(s1, s2)
+
+    # check the size and columns
+    assert cat.size == s1.size + s2.size
+    assert set(cat.columns) == set(s1.columns)
+
+    # only one column
+    cat = transform.concatenate(s1, s2, columns='Position')
+    pos = numpy.concatenate([s1['Position'], s2['Position']], axis=0)
+    numpy.testing.assert_array_equal(pos, cat['Position'])
+
+    # fail on invalid column
+    with pytest.raises(ValueError):
+        cat = transform.concatenate(s1, s2, columns='InvalidColumn')
diff --git a/nbodykit/transform.py b/nbodykit/transform.py
index 3252231fd..2d04810ce 100644
--- a/nbodykit/transform.py
+++ b/nbodykit/transform.py
@@ -1,16 +1,19 @@
 import numpy
 import dask.array as da
 import dask
+from six import string_types
 
-def StackColumns(*cols):
+def vstack(*cols):
     """
     Stack the input dask arrays vertically, column by column
-    
+
+    This uses :func:`dask.array.vstack`
+
     Parameters
     ----------
     cols : dask arrays
         the dask arrays to stack vertically
-    
+
     Returns
     -------
     dask.array.Array
@@ -19,17 +22,64 @@ def StackColumns(*cols):
     """
     if not all(isinstance(col, da.Array) for col in cols):
-        raise TypeError("all input columns in `StackColumns` must be dask arrays")
-    
+        raise TypeError("all input columns in `vstack` must be dask arrays")
+
     return da.vstack(cols).T
-    
+
+def concatenate(*sources, **kwargs):
+    """
+    Concatenate CatalogSource objects together, optionally including only
+    certain columns in the returned source
+
+    Parameters
+    ----------
+    *sources : CatalogSource
+        the catalogs to concatenate
+    columns : str, list of str; optional
+        the columns to include in the concatenated catalog
+
+    Returns
+    -------
+    CatalogSource :
+        the concatenated catalog source
+    """
+    columns = kwargs.get('columns', None)  # keyword-only syntax is not valid in Python 2
+    # FIXME: new name for CatalogSubset?
+    from nbodykit.base.catalog import CatalogSubset
+
+    if isinstance(columns, string_types):
+        columns = [columns]
+
+    # concatenate all columns, if none provided
+    if columns is None or columns == []:
+        columns = sources[0].columns
+
+    # check comms
+    if not all(src.comm == sources[0].comm for src in sources):
+        raise ValueError("cannot concatenate sources: comm mismatch")
+
+    # check that all requested columns are present in every source
+    for source in sources:
+        if not all(col in source for col in columns):
+            raise ValueError(("cannot concatenate sources: columns are missing "
+                              "from some sources"))
+    # the total size
+    size = sum(src.size for src in sources)
+
+    data = {}
+    for col in columns:
+        data[col] = da.concatenate([src[col] for src in sources], axis=0)
+
+    return CatalogSubset(size, sources[0].comm, use_cache=sources[0].use_cache, **data)
+
+
 def ConstantArray(value, size, chunks=100000):
     """
-    Return a dask array of the specified ``size`` holding a 
+    Return a dask array of the specified ``size`` holding a
     single value
-    
-    This uses numpy's "stride tricks" to avoid replicating 
+
+    This uses numpy's "stride tricks" to avoid replicating
     the data in memory for each element of the array.
-    
+
     Parameters
     ----------
     value : float
@@ -39,47 +89,46 @@ def ConstantArray(value, size, chunks=100000):
     chunks : int; optional
         the size of the dask array chunks
     """
-    toret = numpy.array(value)        
+    toret = numpy.array(value)
     toret = numpy.lib.stride_tricks.as_strided(toret, (size, toret.size), (0, toret.itemsize))
     return da.from_array(toret.squeeze(), chunks=chunks)
-    
+
 def SkyToUnitSphere(ra, dec, degrees=True):
     """
-    Convert sky coordinates (ra, dec) coordinates to 
+    Convert sky coordinates (ra, dec) to
     cartesian coordinates on the unit sphere
-    
+
     Parameters
     -----------
     ra, dec : dask.array, (N,)
         the input sky coordinates giving (ra, dec)
     degrees : bool, optional
         specifies whether ``ra`` and ``dec`` are in degrees
-    
+
     Returns
     -------
     pos : dask.array, (N,3)
-        the cartesian position coordinates, where columns represent 
+        the cartesian position coordinates, where columns represent
        ``x``, ``y``, and ``z``
-    """ 
+    """
     # put into radians from degrees
     if degrees:
         ra = da.deg2rad(ra)
         dec = da.deg2rad(dec)
-    
+
     # cartesian coordinates
     x = da.cos( dec ) * da.cos( ra )
     y = da.cos( dec ) * da.sin( ra )
-    z = da.sin( dec ) 
+    z = da.sin( dec )
     return da.vstack([x,y,z]).T
-    
+
 def SkyToCartesion(ra, dec, redshift, cosmo, degrees=True, interpolate_cdist=True):
     """
-    Convert sky coordinates (ra, dec, redshift) coordinates to 
-    cartesian coordinates, scaled to the comoving distance if `unit_sphere = False`, 
-    else on the unit sphere
-    
-    
+    Convert sky coordinates (ra, dec, redshift) to cartesian
+    coordinates, scaled by the comoving distance
+
+
     Parameters
     -----------
     ra, dec, redshift : dask.array, (N,)
@@ -91,33 +140,33 @@ def SkyToCartesion(ra, dec, redshift, cosmo, degrees=True, interpolate_cdist=Tru
     interpolate_cdist : bool, optional
         if ``True``, interpolate the comoving distance as a function of redshift
         before evaluating the full results; can lead to significant speed improvements
-    
+
     Returns
     -------
     pos : dask.array, (N,3)
-        the cartesian position coordinates, where columns represent 
+        the cartesian position coordinates, where columns represent
         ``x``, ``y``, and ``z``
     """
     # pos on the unit sphere
     pos = SkyToUnitSphere(ra, dec, degrees=degrees)
-    
+
    # multiply by the comoving distance in Mpc/h
     if interpolate_cdist:
         comoving_distance = cosmo.comoving_distance.fit('z', bins=numpy.logspace(-5, 1, 1024))
     else:
         comoving_distance = cosmo.comoving_distance
     r = redshift.map_blocks(lambda z: comoving_distance(z).value * cosmo.h, dtype=redshift.dtype)
-    
+
     return r[:,None] * pos
-    
+
 def HaloConcentration(mass, cosmo, redshift, mdef='vir'):
     """
     Return halo concentration from halo mass, based on the analytic 
     fitting formulas presented in Dutton and Maccio 2014 (arxiv:1402.7073)
-    
-    .. note:: 
+
+    .. note::
        The units of the input mass are assumed to be :math:`M_{\odot}/h`
-    
+
     Parameters
     ----------
     mass : array_like
@@ -128,44 +177,44 @@ def HaloConcentration(mass, cosmo, redshift, mdef='vir'):
     redshift : float
         compute the c(M) relation at this redshift
     mdef : str; optional
-        string specifying the halo mass definition to use; should be 
-        'vir' or 'XXXc' or 'XXXm' where 'XXX' is an int specifying the 
+        string specifying the halo mass definition to use; should be
+        'vir' or 'XXXc' or 'XXXm' where 'XXX' is an int specifying the
         overdensity
-    
+
     Returns
     -------
     concen : dask.array.Array
         a dask array holding the analytic concentration
-    
+
     References
     ----------
-    Dutton and Maccio, "Cold dark matter haloes in the Planck era: evolution 
+    Dutton and Maccio, "Cold dark matter haloes in the Planck era: evolution
     of structural parameters for Einasto and NFW profiles", 2014, arxiv:1402.7073
     """
     from halotools.empirical_models import ConcMass
-    
+
     if not isinstance(mass, da.Array):
         mass = da.from_array(mass, chunks=100000)
-    
+
     # initialize the model
     kws = {'cosmology':cosmo.engine, 'conc_mass_model':'dutton_maccio14', 'mdef':mdef, 'redshift':redshift}
     model = ConcMass(**kws)
-    
+
     return mass.map_blocks(lambda mass: model.compute_concentration(prim_haloprop=mass), dtype=mass.dtype)
-    
+
 def HaloRadius(mass, cosmo, redshift, mdef='vir'):
     """
     Return halo radius from halo mass, based on the specified mass definition.
     This is independent of halo profile, and simply returns:
-    
+
     ``radius = (mass * 3.0 / 4.0 / pi / rho)**(1.0 / 3.0)``
-    
-    where ``rho`` is the density threshold, which depends on cosmology, 
+
+    where ``rho`` is the density threshold, which depends on cosmology,
     redshift, and mass definition
-    
-    .. note:: 
+
+    .. note::
        The units of the input mass are assumed to be :math:`M_{\odot}/h`
-    
+
     Parameters
     ----------
     mass : array_like
@@ -174,22 +223,22 @@ def HaloRadius(mass, cosmo, redshift, mdef='vir'):
     cosmo : nbodykit.cosmology.Cosmology
         the cosmology instance
     redshift : float
-        compute the density threshold which determines the R(M) relation 
+        compute the density threshold which determines the R(M) relation
         at this redshift
     mdef : str; optional
-        string specifying the halo mass definition to use; should be 
-        'vir' or 'XXXc' or 'XXXm' where 'XXX' is an int specifying the 
+        string specifying the halo mass definition to use; should be
+        'vir' or 'XXXc' or 'XXXm' where 'XXX' is an int specifying the
         overdensity
-    
+
     Returns
     -------
     radius : dask.array.Array
         a dask array holding the halo radius
     """
     from halotools.empirical_models import profile_helpers
-    
+
     if not isinstance(mass, da.Array):
         mass = da.from_array(mass, chunks=100000)
     kws = {'cosmology':cosmo.engine, 'mdef':mdef, 'redshift':redshift}
-    return mass.map_blocks(lambda mass: profile_helpers.halo_mass_to_halo_radius(mass=mass, **kws), dtype=mass.dtype)
\ No newline at end of file
+    return mass.map_blocks(lambda mass: profile_helpers.halo_mass_to_halo_radius(mass=mass, **kws), dtype=mass.dtype)
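A minimal usage sketch of the new `transform.concatenate` API, mirroring the test above; the `UniformCatalog(3e-6, 2600)` arguments are the test's number density and box size, and the MPI communicator is whatever `nbodykit.lab` has set as current:

    from nbodykit.lab import UniformCatalog, transform

    cat1 = UniformCatalog(3e-6, 2600)
    cat2 = UniformCatalog(3e-6, 2600)

    # concatenate every column from both catalogs
    combined = transform.concatenate(cat1, cat2)
    assert combined.size == cat1.size + cat2.size

    # keep only the Position column in the result
    pos_only = transform.concatenate(cat1, cat2, columns='Position')
    pos = pos_only['Position']  # a dask array; nothing is evaluated until .compute()

Because the columns are joined with dask.array.concatenate, the returned catalog stays lazy: the per-source column arrays are only read and stitched together when a downstream algorithm actually computes them.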