diff --git a/nbodykit/algorithms/fftpower.py b/nbodykit/algorithms/fftpower.py index 2a061f5a9..a48c31505 100644 --- a/nbodykit/algorithms/fftpower.py +++ b/nbodykit/algorithms/fftpower.py @@ -54,13 +54,13 @@ def __init__(self, first, mode, Nmesh=None, BoxSize=None, second=None, los=[0, 0 ---------- comm : the MPI communicator - first : ParticleSource, MeshSource - the source for the first field. ParticleSource is automatically converted + first : CatalogSource, MeshSource + the source for the first field. CatalogSource is automatically converted to MeshSource mode : {'1d', '2d'} compute either 1d or 2d power spectra Nmesh : int the number of cells per side in the particle mesh used to paint the source - second : ParticleSource, MeshSource; optional + second : CatalogSource, MeshSource; optional the second source for cross-correlations los : array_like ; optional the direction to use as the line-of-sight @@ -86,7 +86,7 @@ def __init__(self, first, mode, Nmesh=None, BoxSize=None, second=None, los=[0, 0 # grab comm from first source self.comm = first.comm - # if input is ParticleSource, use defaults to make it into a mesh + # if input is CatalogSource, use defaults to make it into a mesh if not hasattr(first, 'paint'): first = first.to_mesh(BoxSize=BoxSize, Nmesh=Nmesh, dtype='f8', compensated=True) @@ -297,7 +297,7 @@ def _compute_3d_power(self): Parameters ---------- - sources : list of ParticleSource or MeshSource + sources : list of CatalogSource or MeshSource the list of sources which the 3D power will be computed pm : ParticleMesh the particle mesh object that handles the painting and FFTs diff --git a/nbodykit/algorithms/fibercollisions.py b/nbodykit/algorithms/fibercollisions.py index ca404201e..0d8d19763 100644 --- a/nbodykit/algorithms/fibercollisions.py +++ b/nbodykit/algorithms/fibercollisions.py @@ -1,6 +1,6 @@ from nbodykit import CurrentMPIComm -from nbodykit.base.particles import ParticleSource -from nbodykit.source import Array +from nbodykit.base.catalog import CatalogSource +from nbodykit.source import ArrayCatalog from nbodykit.transform import SkyToUnitSphere import numpy @@ -41,14 +41,14 @@ def __init__(self, ra, dec, collision_radius=62/60./60., seed=None, degrees=True the random seed """ # compute the pos - ra = ParticleSource.make_column(ra) - dec = ParticleSource.make_column(dec) + ra = CatalogSource.make_column(ra) + dec = CatalogSource.make_column(dec) pos = SkyToUnitSphere(ra, dec, degrees=degrees).compute() # make the source dt = numpy.dtype([('Position', (pos.dtype.str, 3))]) pos = numpy.squeeze(pos.view(dtype=dt)) - source = Array(pos, BoxSize=numpy.array([2., 2., 2.])) + source = ArrayCatalog(pos, BoxSize=numpy.array([2., 2., 2.])) self.source = source self.comm = source.comm @@ -116,7 +116,7 @@ def run(self): for col, x in d: result[col] = x # make a particle source - self.labels = Array(result, comm=self.comm, **self.source.attrs) + self.labels = ArrayCatalog(result, comm=self.comm, **self.source.attrs) def _assign_fibers(self, Label): """ diff --git a/nbodykit/algorithms/fof.py b/nbodykit/algorithms/fof.py index 753676a78..0ae4fbc25 100644 --- a/nbodykit/algorithms/fof.py +++ b/nbodykit/algorithms/fof.py @@ -3,7 +3,7 @@ import numpy import logging from mpi4py import MPI -from nbodykit.source import Array +from nbodykit.source import ArrayCatalog class FOF(object): """ @@ -25,7 +25,7 @@ def __init__(self, source, linking_length, nmin, absolute=False): """ Parameters ---------- - source : ParticleSource + source : CatalogSource the source to run the FOF 
algorithm on; must support 'Position' linking_length : float the linking length, either in absolute units, or relative @@ -84,7 +84,7 @@ def find_features(self): Returns ------- - ParticleSource : + CatalogSource : a source holding the ('Position', 'Velocity', 'Length') of each halo """ @@ -94,7 +94,7 @@ def find_features(self): # return a Source attrs = self._source.attrs.copy() attrs.update(self.attrs) - return Array(halos, comm=self.comm, **attrs) + return ArrayCatalog(halos, comm=self.comm, **attrs) def to_halos(self, particle_mass, cosmo, redshift, mdef='vir'): """ @@ -107,7 +107,7 @@ def to_halos(self, particle_mass, cosmo, redshift, mdef='vir'): Parameters ---------- - source : ParticleSource + source : CatalogSource the source containing info about the particles in each halo particle_mass : float the particle mass, used to compute the number of particles in @@ -136,7 +136,7 @@ def to_halos(self, particle_mass, cosmo, redshift, mdef='vir'): # the center-of-mass (Position, Velocity, Length) for each halo data = fof_catalog(self._source, self.labels, self.comm) data = data[data['Length'] > 0] - halos = Array(data, **attrs) + halos = ArrayCatalog(data, **attrs) # add the halo mass column halos['Mass'] = particle_mass * halos['Length'] @@ -291,7 +291,7 @@ def fof(source, linking_length, comm): Parameters ---------- - source: ParticleSource + source: CatalogSource the input source of particles; must support 'Position' column; ``source.attrs['BoxSize']`` is also used linking_length: float @@ -345,7 +345,7 @@ def fof_catalog(source, label, comm, Parameters ---------- - source: ParticleSource + source: CatalogSource the parent source of particles from which the center-of-mass position and velocity are computed for each halo label : array_like diff --git a/nbodykit/algorithms/tests/__init__.py b/nbodykit/algorithms/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nbodykit/tests/test_conv_power.py b/nbodykit/algorithms/tests/test_conv_power.py similarity index 90% rename from nbodykit/tests/test_conv_power.py rename to nbodykit/algorithms/tests/test_conv_power.py index 1d3f12371..fdc9d6d23 100644 --- a/nbodykit/tests/test_conv_power.py +++ b/nbodykit/algorithms/tests/test_conv_power.py @@ -17,8 +17,8 @@ def test_selection(comm): CurrentMPIComm.set(comm) cosmo = cosmology.Planck15 - data = Source.RandomParticles(NDATA, seed=42) - randoms = Source.RandomParticles(NDATA*10, seed=84) + data = RandomCatalog(NDATA, seed=42) + randoms = RandomCatalog(NDATA*10, seed=84) # add the random columns for s in [data, randoms]: @@ -38,7 +38,7 @@ def test_selection(comm): s['Selection'] = (s['z'] > 0.4)&(s['z'] < 0.6) # the FKP source - fkp = Source.FKPCatalog(data, randoms) + fkp = FKPCatalog(data, randoms) fkp = fkp.to_mesh(Nmesh=128, dtype='f8', nbar='NZ', selection='Selection') # compute the multipoles @@ -64,8 +64,8 @@ def test_run(comm): CurrentMPIComm.set(comm) cosmo = cosmology.Planck15 - data = Source.RandomParticles(NDATA, seed=42) - randoms = Source.RandomParticles(NDATA*10, seed=84) + data = RandomCatalog(NDATA, seed=42) + randoms = RandomCatalog(NDATA*10, seed=84) # add the random columns for s in [data, randoms]: @@ -86,7 +86,7 @@ def test_run(comm): s['Weight'] = (1 + P0*s['NZ'])**2 # the FKP source - fkp = Source.FKPCatalog(data, randoms) + fkp = FKPCatalog(data, randoms) fkp = fkp.to_mesh(Nmesh=128, dtype='f8', nbar='NZ', fkp_weight='FKPWeight', comp_weight='Weight', selection='Selection') # compute the multipoles @@ -117,8 +117,8 @@ def test_with_zhist(comm): 
CurrentMPIComm.set(comm) cosmo = cosmology.Planck15 - data = Source.RandomParticles(NDATA, seed=42, use_cache=True) - randoms = Source.RandomParticles(NDATA*10, seed=84, use_cache=True) + data = RandomCatalog(NDATA, seed=42, use_cache=True) + randoms = RandomCatalog(NDATA*10, seed=84, use_cache=True) # add the random columns for s in [data, randoms]: @@ -132,7 +132,7 @@ def test_with_zhist(comm): s['Position'] = transform.SkyToCartesion(s['ra'], s['dec'], s['z'], cosmo=cosmo) # initialize the FKP source - fkp = Source.FKPCatalog(data, randoms) + fkp = FKPCatalog(data, randoms) # compute NZ from randoms zhist = RedshiftHistogram(fkp.randoms, FSKY, cosmo, redshift='z') diff --git a/nbodykit/tests/test_fftpower.py b/nbodykit/algorithms/tests/test_fftpower.py similarity index 78% rename from nbodykit/tests/test_fftpower.py rename to nbodykit/algorithms/tests/test_fftpower.py index 072ba2cca..3b863cbcf 100644 --- a/nbodykit/tests/test_fftpower.py +++ b/nbodykit/algorithms/tests/test_fftpower.py @@ -10,7 +10,7 @@ def test_fftpower_poles(comm): CurrentMPIComm.set(comm) - source = Source.UniformParticles(nbar=3e-3, BoxSize=512., seed=42) + source = UniformCatalog(nbar=3e-3, BoxSize=512., seed=42) r = FFTPower(source, mode='2d', BoxSize=1024, Nmesh=32, poles=[0,2,4]) pkmu = r.power['power'].real @@ -27,7 +27,7 @@ def test_fftpower_poles(comm): def test_fftpower_padding(comm): CurrentMPIComm.set(comm) - source = Source.UniformParticles(nbar=3e-3, BoxSize=512., seed=42) + source = UniformCatalog(nbar=3e-3, BoxSize=512., seed=42) r = FFTPower(source, mode='1d', BoxSize=1024, Nmesh=32) assert r.attrs['N1'] != 0 @@ -37,7 +37,7 @@ def test_fftpower_padding(comm): def test_fftpower_padding(comm): CurrentMPIComm.set(comm) - source = Source.UniformParticles(nbar=3e-3, BoxSize=512., seed=42) + source = UniformCatalog(nbar=3e-3, BoxSize=512., seed=42) r = FFTPower(source, mode='1d', BoxSize=1024, Nmesh=32) assert r.attrs['N1'] != 0 @@ -47,7 +47,7 @@ def test_fftpower_padding(comm): def test_fftpower_save(comm): CurrentMPIComm.set(comm) - source = Source.UniformParticles(nbar=3e-3, BoxSize=512., seed=42) + source = UniformCatalog(nbar=3e-3, BoxSize=512., seed=42) r = FFTPower(source, mode='2d', Nmesh=32) r.save('fftpower-test.json') @@ -63,7 +63,7 @@ def test_fftpower_save(comm): def test_fftpower(comm): CurrentMPIComm.set(comm) - source = Source.UniformParticles(nbar=3e-3, BoxSize=512., seed=42) + source = UniformCatalog(nbar=3e-3, BoxSize=512., seed=42) r = FFTPower(source, mode='1d', Nmesh=32) # the zero mode is cleared @@ -76,7 +76,7 @@ def test_fftpower_mismatch_boxsize(comm): CurrentMPIComm.set(comm) # input sources - source1 = Source.UniformParticles(nbar=3e-3, BoxSize=512., seed=42) - source2 = Source.LinearMesh(cosmology.NoWiggleEHPower(cosmo, 0.55), BoxSize=1024, Nmesh=32, seed=33) + source1 = UniformCatalog(nbar=3e-3, BoxSize=512., seed=42) + source2 = LinearMesh(cosmology.NoWiggleEHPower(cosmo, 0.55), BoxSize=1024, Nmesh=32, seed=33) r = FFTPower(source1, second=source2, mode='1d', BoxSize=1024, Nmesh=32) diff --git a/nbodykit/tests/test_fiber_colls.py b/nbodykit/algorithms/tests/test_fiber_colls.py similarity index 100% rename from nbodykit/tests/test_fiber_colls.py rename to nbodykit/algorithms/tests/test_fiber_colls.py diff --git a/nbodykit/tests/test_fof.py b/nbodykit/algorithms/tests/test_fof.py similarity index 89% rename from nbodykit/tests/test_fof.py rename to nbodykit/algorithms/tests/test_fof.py index ed1ebbd8a..bd436486f 100644 --- a/nbodykit/tests/test_fof.py +++ 
b/nbodykit/algorithms/tests/test_fof.py @@ -12,7 +12,7 @@ def test_fof(comm): CurrentMPIComm.set(comm) # lognormal particles - source = Source.LogNormal(Plin=cosmology.EHPower(cosmo, 0.55), + source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, 0.55), nbar=3e-3, BoxSize=512., Nmesh=128, seed=42) # compute P(k,mu) and multipoles diff --git a/nbodykit/tests/test_zhist.py b/nbodykit/algorithms/tests/test_zhist.py similarity index 92% rename from nbodykit/tests/test_zhist.py rename to nbodykit/algorithms/tests/test_zhist.py index 32a4b8e8e..c1dd6b88b 100644 --- a/nbodykit/tests/test_zhist.py +++ b/nbodykit/algorithms/tests/test_zhist.py @@ -16,7 +16,7 @@ def test_save(comm): cosmo = cosmology.Planck15 # create the source - source = Source.RandomParticles(N, seed=42) + source = RandomCatalog(N, seed=42) source['z'] = source.rng.normal(loc=0.5, scale=0.1, size=source.size) # compute the histogram @@ -43,7 +43,7 @@ def test_unweighted(comm): cosmo = cosmology.Planck15 # create the source - source = Source.RandomParticles(N, seed=42) + source = RandomCatalog(N, seed=42) source['z'] = source.rng.normal(loc=0.5, scale=0.1, size=source.size) # compute the histogram @@ -62,7 +62,7 @@ def test_weighted(comm): cosmo = cosmology.Planck15 # create the source - source = Source.RandomParticles(N, seed=42) + source = RandomCatalog(N, seed=42) source['z'] = source.rng.normal(loc=0.5, scale=0.1, size=source.size) source['weight'] = source.rng.uniform(0, high=1., size=source.size) diff --git a/nbodykit/algorithms/threeptcf.py b/nbodykit/algorithms/threeptcf.py index 88d37b8e5..32250b88c 100644 --- a/nbodykit/algorithms/threeptcf.py +++ b/nbodykit/algorithms/threeptcf.py @@ -19,7 +19,7 @@ def __init__(self, source, poles, edges, BoxSize=None, periodic=True, weight='We """ Parameters ---------- - source : ParticleSource + source : CatalogSource the input source of particles providing the 'Position' column poles : list of int the list of multipole numbers to compute diff --git a/nbodykit/algorithms/zhist.py b/nbodykit/algorithms/zhist.py index 19da72fb0..ca05964ae 100644 --- a/nbodykit/algorithms/zhist.py +++ b/nbodykit/algorithms/zhist.py @@ -61,7 +61,7 @@ def __init__(self, source, fsky, cosmo, bins=None, redshift='Redshift', weight=N """ Parameters ---------- - source : ParticleSource + source : CatalogSource the source of particles holding the redshift column to histogram fsky : float the sky area fraction, which is used in the volume calculation when diff --git a/nbodykit/base/catalog.py b/nbodykit/base/catalog.py new file mode 100644 index 000000000..f1598c4e3 --- /dev/null +++ b/nbodykit/base/catalog.py @@ -0,0 +1,576 @@ +from nbodykit.extern.six import add_metaclass, string_types +from nbodykit.transform import ConstantArray + +import abc +import numpy +import logging +import warnings +import dask.array as da + +# default size of Cache for CatalogSource arrays +CACHE_SIZE = 1e9 + + +def column(name=None): + """ + Decorator that defines a function as a column in a CatalogSource + """ + def decorator(getter): + getter.column_name = name + return getter + + if hasattr(name, '__call__'): + # a single callable is provided + getter = name + name = getter.__name__ + return decorator(getter) + else: + return decorator + + +def find_columns(cls): + """ + Find all hard-coded column names associated with the input class + + Returns + ------- + hardcolumns : list + a sorted list of the names of all hard-coded columns for the + input class ``cls`` + """ + hardcolumns = [] + + # search through the class dict for columns + for
key, value in cls.__dict__.items(): + if hasattr(value, 'column_name'): + hardcolumns.append(value.column_name) + + # recursively search the base classes, too + for base in cls.__bases__: + hardcolumns += find_columns(base) + + return list(sorted(set(hardcolumns))) + + +def find_column(cls, name): + """ + Find a specific column ``name`` of an input class, or raise + an exception if it does not exist + + Returns + ------- + column : callable + the callable that returns the column data + """ + # first search through class + for key, value in cls.__dict__.items(): + if not hasattr(value, 'column_name'): continue + if value.column_name == name: return value + + for base in cls.__bases__: + try: return find_column(base, name) + except: pass + + args = (name, str(cls)) + raise AttributeError("unable to find column '%s' for '%s'" % args) + + +@add_metaclass(abc.ABCMeta) +class CatalogSource(object): + """ + An abstract base class for a source of particles. This object behaves + like a structured numpy array -- it must have a well-defined size + when initialized. The ``size`` here represents the number of particles + in the source. + + Subclasses of this class must define a ``size`` property. + + The class stores a series of columns as dask arrays, which can be accessed + in a dict-like fashion. + + The class handles the generation of particle data, either reading + from disk or generating at runtime, and provides the interface for + creating a ``ParticleMesh``. The mesh handles the gridding of discrete + particle data onto a continuous mesh of cells. + """ + logger = logging.getLogger('CatalogSource') + + @staticmethod + def make_column(array): + """ + Utility function to convert a numpy array to a column object + (dask.array.Array) + """ + if isinstance(array, da.Array): + return array + else: + return da.from_array(array, chunks=100000) + + def __init__(self, comm, use_cache=False): + + # ensure self.comm is set, though usually already set by the child. + self.comm = comm + + # initialize a cache + self.use_cache = use_cache + + # user-provided overrides for columns + self._overrides = {} + + # if size is already computed update csize + # otherwise the subclass shall call update_csize explicitly. + if self.size is not NotImplemented: + self.update_csize() + + def __len__(self): + """ + The length of CatalogSource is equal to :attr:`size`; this is the + local size of the source on a given rank + """ + return self.size + + def __iter__(self): + return iter(self.columns) + + def __contains__(self, col): + return col in self.columns + + def __getitem__(self, sel): + """ + The following types of indexing are supported + for CatalogSource: + + 1. strings specifying a column in the CatalogSource; returns + a dask array holding the column data + 2. boolean arrays specifying a slice of the CatalogSource; + returns a CatalogSubset holding only the relevant slice + 3.
slice object specifying which particles to select + """ + # handle boolean array slices + if not isinstance(sel, string_types): + + # convert slices to boolean arrays + if isinstance(sel, (slice, list)): + + # list must be all integers + if isinstance(sel, list) and not numpy.issubdtype(numpy.array(sel).dtype, numpy.integer): + raise KeyError("array-like indexing via a list should be a list of integers") + + index = numpy.zeros(self.size, dtype='?') + index[sel] = True; sel = index + + # do the slicing + if not numpy.isscalar(sel): + return get_catalog_subset(self, sel) + else: + raise KeyError("strings and boolean arrays are the only supported indexing methods") + + # get the right column + if sel in self._overrides: + r = self._overrides[sel] + elif sel in self.hardcolumns: + r = self.get_hardcolumn(sel) + else: + raise KeyError("column `%s` is not defined in this source; " %sel + \ + "try adding column via `source[column] = data`") + + # evaluate callables if we need to + if callable(r): r = r() + + # add a column attrs dict + if not hasattr(r, 'attrs'): r.attrs = {} + + return r + + def __setitem__(self, col, value): + """ + Add columns to the CatalogSource, overriding any existing columns + with the name ``col`` + """ + # handle scalar values + if numpy.isscalar(value): + assert self.size is not NotImplemented, "size is not implemented! cannot set scalar array" + value = ConstantArray(value, self.size, chunks=100000) + + # check the correct size, if we know the size + if self.size is not NotImplemented: + assert len(value) == self.size, "error setting column, data must be array of size %d" %self.size + + # add the column to the overrides dict + self._overrides[col] = self.make_column(value) + + @abc.abstractproperty + def size(self): + """ + The number of particles in the source on the local rank. This + property must be defined for all subclasses + """ + return NotImplemented + + @property + def use_cache(self): + """ + If set to ``True``, use the builtin cache features of ``dask`` + to cache data in memory + """ + return self._use_cache + + @use_cache.setter + def use_cache(self, val): + """ + Initialize a Cache object of size set by ``CACHE_SIZE``, which + is 1 GB by default + """ + if val: + try: + from dask.cache import Cache + if not hasattr(self, '_cache'): + self._cache = Cache(CACHE_SIZE) + except ImportError: + warnings.warn("caching of CatalogSource requires ``cachey`` module; turning cache off") + else: + if hasattr(self, '_cache'): delattr(self, '_cache') + self._use_cache = val + + @property + def hardcolumns(self): + """ + A list of the hard-coded columns in the Source. These are usually + member functions marked by the @column decorator. + + Subclasses may override this method and :func:`get_hardcolumn` to bypass + the decorator logic. + """ + try: + self._hardcolumns + except AttributeError: + self._hardcolumns = find_columns(self.__class__) + return self._hardcolumns + + @property + def columns(self): + """ + All columns in this Source, which include: + + 1. the columns hard-coded into the class's definition + 2. override columns provided by the user + """ + hcolumns = list(self.hardcolumns) + overrides = list(self._overrides) + return sorted(set(hcolumns + overrides))
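For reference, a minimal sketch of the dict-like column access the new class provides, using the ArrayCatalog subclass added later in this patch (the column values, sizes, and box size below are illustrative only):

```python
import numpy
from nbodykit.source import ArrayCatalog

# a structured array; its fields become the catalog's columns
data = numpy.empty(128, dtype=[('Position', ('f8', 3))])
data['Position'] = numpy.random.uniform(0, 512., size=(128, 3))
cat = ArrayCatalog(data, BoxSize=512.)

pos = cat['Position']   # string indexing returns a dask array
cat['Weight'] = 2.0     # scalars are broadcast via ConstantArray

# boolean array indexing returns a CatalogSubset of the selected rows
mask = cat.compute(cat['Position'][:, 0] < 256.)
subset = cat[mask]
```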
+ """ + return self._csize + + @property + def attrs(self): + """ + Dictionary storing relevant meta-data + """ + try: + return self._attrs + except AttributeError: + self._attrs = {} + return self._attrs + + @column + def Selection(self): + """ + Override the ``Selection`` column to select a subset slice of a + CatalogSource. + + By default, this column is set to ``True`` for all particles. + """ + return ConstantArray(True, self.size, chunks=100000) + + @column + def Weight(self): + """ + When interpolating a CatalogSource on to a mesh, the value of this + array is used as the Weight that each particle contributes to a given + mesh cell. + + By default, this array is set to unity for all particles + """ + return ConstantArray(1.0, self.size, chunks=100000) + + def get_hardcolumn(self, col): + """ + Construct and return a hard-coded column. These are usually produced by calling + member functions marked by the @column decorator. + + Subclasses may override this method and the hardcolumns attribute to bypass + the decorator logic. + """ + return find_column(self.__class__, col)(self) + + def update_csize(self): + """ + Set the collective size, :attr:`csize` + + This function should be called in :func:`__init__` of a subclass, + after :attr:`size` has been set to a valid value (not ``NotImplemented``) + """ + if self.size is NotImplemented: + raise ValueError(("``size`` cannot be NotImplemented when trying " + "to compute collective size `csize`")) + + # sum size across all ranks + self._csize = self.comm.allreduce(self.size) + + # log some info + if self.comm.rank == 0: + self.logger.debug("rank 0, local number of particles = %d" %self.size) + self.logger.info("total number of particles in %s = %d" %(str(self), self.csize)) + + def compute(self, *args, **kwargs): + """ + Our version of :func:`dask.compute` that computes + multiple delayed dask collections at once + + This should be called on the return value of :func:`read` + to converts any dask arrays to numpy arrays + + Parameters + ----------- + args : object + Any number of objects. If the object is a dask + collection, it's computed and the result is returned. + Otherwise it's passed through unchanged. + + Notes + ----- + The dask default optimizer induces too many (unnecesarry) + IO calls -- we turn this off feature off by default. + + Eventually we want our own optimizer probably. + """ + import dask + + # do not optimize graph (can lead to slower optimizations) + kwargs.setdefault('optimize_graph', False) + + # use a cache? + if self.use_cache and hasattr(self, '_cache'): + with self._cache: + toret = dask.compute(*args, **kwargs) + else: + toret = dask.compute(*args, **kwargs) + + # do not return tuples of length one + if len(toret) == 1: toret = toret[0] + return toret + + def save(self, output, columns, datasets=None, header='Header'): + """ + Save the CatalogSource to a :class:`bigfile.BigFile` + + Only the selected columns are saved and :attr:`attrs` are saved in + ``header``. The attrs of columns are stored in the datasets. 
+ + Parameters + ---------- + output : str + the name of the file to write to + columns : list of str + the names of the columns to save in the file + datasets : list of str, optional + names for the data set where each column is stored; defaults to + the name of the column + header : str, optional + the name of the data set holding the header information, where + :attr:`attrs` is stored + """ + import bigfile + + if datasets is None: datasets = columns + if len(datasets) != len(columns): + raise ValueError("`datasets` must have the same length as `columns`") + + with bigfile.BigFileMPI(comm=self.comm, filename=output, create=True) as ff: + try: + bb = ff.open(header) + except: + bb = ff.create(header) + with bb : + for key in self.attrs: + bb.attrs[key] = self.attrs[key] + + for column, dataset in zip(columns, datasets): + c = self[column] + + with ff.create_from_array(dataset, c) as bb: + if hasattr(c, 'attrs'): + for key in c.attrs: + bb.attrs[key] = c.attrs[key] + + def read(self, columns): + """ + Return the requested columns as dask arrays + + Parameters + ---------- + columns : list of str + the names of the requested columns + + Returns + ------- + list of dask.array.Array : + the list of column data, in the form of dask arrays + """ + missing = set(columns) - set(self.columns) + if len(missing) > 0: + raise ValueError("source does not contain columns: %s; " %str(missing) + \ + "try adding columns via `source[column] = data`") + + return [self[col] for col in columns] + + def to_mesh(self, Nmesh=None, BoxSize=None, dtype='f4', + interlaced=False, compensated=False, window='cic', + weight='Weight', selection='Selection'): + """ + Convert the CatalogSource to a MeshSource + + Parameters + ---------- + Nmesh : int, optional + the number of cells per side on the mesh; must be provided if + not stored in :attr:`attrs` + BoxSize : scalar, 3-vector, optional + the size of the box; must be provided if + not stored in :attr:`attrs` + dtype : string, optional + the data type of the mesh array + interlaced : bool, optional + use the interlacing technique of Sefusatti et al. 
2015 to reduce + the effects of aliasing on Fourier space quantities computed + from the mesh + compensated : bool, optional + whether to correct for the window introduced by the grid + interpolation scheme + window : str, optional + the string specifying which window interpolation scheme to use; + see `pmesh.window.methods` + weight : str, optional + the name of the column specifying the weight for each particle + selection : str, optional + the name of the column that specifies which (if any) slice + of the CatalogSource to take + + Returns + ------- + mesh : CatalogMeshSource + a mesh object that provides an interface for gridding particle + data onto a specified mesh + """ + from nbodykit.base.catalogmesh import CatalogMeshSource + from pmesh.window import methods + + if window not in methods: + raise ValueError("valid window methods: %s" %str(methods)) + + if BoxSize is None: + try: + BoxSize = self.attrs['BoxSize'] + except KeyError: + raise ValueError(("cannot convert particle source to a mesh; " + "'BoxSize' keyword is not supplied and the CatalogSource " + "does not define one in 'attrs'.")) + if Nmesh is None: + try: + Nmesh = self.attrs['Nmesh'] + except KeyError: + raise ValueError(("cannot convert particle source to a mesh; " + "'Nmesh' keyword is not supplied and the CatalogSource " + "does not define one in 'attrs'.")) + + r = CatalogMeshSource(self, Nmesh=Nmesh, BoxSize=BoxSize, + dtype=dtype, weight=weight, selection=selection) + r.interlaced = interlaced + r.compensated = compensated + r.window = window + return r + + +class CatalogSubset(CatalogSource): + """ + A subset of a CatalogSource holding only a portion + of the original source + """ + def __init__(self, size, comm, use_cache=False, **columns): + """ + Parameters + ---------- + size : int + the size of the new source; this was likely determined by + the number of particles passing the selection criterion + comm : MPI communicator + the MPI communicator; this should be the same as the + comm of the object that we are selecting from + use_cache : bool; optional + whether to cache results + **columns : + the data arrays that will be added to this source; keys + represent the column names + """ + self._size = size + CatalogSource.__init__(self, comm=comm, use_cache=use_cache) + + # store the column arrays + for name in columns: + self[name] = columns[name] + + @property + def size(self): + return self._size + +def get_catalog_subset(parent, index): + """ + Select a subset of a CatalogSource according to a boolean index array, + returning a CatalogSubset holding only the data that satisfies + the slice criterion + + Parameters + ---------- + parent : CatalogSource + the parent source that will be sliced + index : array_like + either a dask or numpy boolean array; this determines which + rows are included in the returned object + + Returns + ------- + subset : CatalogSubset + the particle source with the same meta-data as `parent`, and + with the sliced data arrays + """ + # compute the index slice if needed and get the size + if isinstance(index, da.Array): + index = parent.compute(index) + elif isinstance(index, list): + index = numpy.array(index) + + # verify the index is a boolean array + if len(index) != len(parent): + raise ValueError("slice index has length %d; should be %d" %(len(index), len(parent))) + if getattr(index, 'dtype', None) != '?': + raise ValueError("index used to slice CatalogSource must be boolean and array-like") + + # new size is just number of True entries + size = index.sum() + + # initialize 
subset Source of right size + subset_data = {col:parent[col][index] for col in parent} + toret = CatalogSubset(size, parent.comm, use_cache=parent.use_cache, **subset_data) + + # and the meta-data + toret.attrs.update(parent.attrs) + + return toret \ No newline at end of file diff --git a/nbodykit/base/particlemesh.py b/nbodykit/base/catalogmesh.py similarity index 95% rename from nbodykit/base/particlemesh.py rename to nbodykit/base/catalogmesh.py index d85aa0b9b..dc680e434 100644 --- a/nbodykit/base/particlemesh.py +++ b/nbodykit/base/catalogmesh.py @@ -4,18 +4,18 @@ import numpy import logging from nbodykit.base.mesh import MeshSource -from nbodykit.base.particles import ParticleSource +from nbodykit.base.catalog import CatalogSource # for converting from particle to mesh from pmesh import window from pmesh.pm import RealField, ComplexField -class ParticleMeshSource(MeshSource, ParticleSource): - logger = logging.getLogger('ParticleMeshSource') +class CatalogMeshSource(MeshSource, CatalogSource): + logger = logging.getLogger('CatalogMeshSource') def __repr__(self): - return "(%s as ParticleMeshSource)" % repr(self.source) + return "(%s as CatalogMeshSource)" % repr(self.source) - # intended to be used by ParticleSource internally + # intended to be used by CatalogSource internally def __init__(self, source, BoxSize, Nmesh, dtype, weight, selection, position='Position'): # ensure self.comm is set, though usually already set by the child. self.comm = source.comm @@ -29,7 +29,7 @@ def __init__(self, source, BoxSize, Nmesh, dtype, weight, selection, position='P # this will override BoxSize and Nmesh carried from the source, if there is any! MeshSource.__init__(self, BoxSize=BoxSize, Nmesh=Nmesh, dtype=dtype, comm=source.comm) - ParticleSource.__init__(self, comm=source.comm) + CatalogSource.__init__(self, comm=source.comm) # copy over the overrides self._overrides.update(self.source._overrides) @@ -90,7 +90,7 @@ def to_real_field(self): """ # check for 'Position' column if self.position not in self: - raise ValueError("in order to paint a ParticleSource to a RealField, add a " + \ + raise ValueError("in order to paint a CatalogSource to a RealField, add a " + \ "column named '%s', representing the particle positions" %self.position) pm = self.pm diff --git a/nbodykit/base/mesh.py b/nbodykit/base/mesh.py index 53e882807..4020b4114 100644 --- a/nbodykit/base/mesh.py +++ b/nbodykit/base/mesh.py @@ -2,7 +2,6 @@ import abc import numpy import logging - from pmesh.pm import ParticleMesh, RealField, ComplexField @add_metaclass(abc.ABCMeta) diff --git a/nbodykit/base/particles.py b/nbodykit/base/particles.py deleted file mode 100644 index 07a155957..000000000 --- a/nbodykit/base/particles.py +++ /dev/null @@ -1,321 +0,0 @@ -from nbodykit.extern.six import add_metaclass -from nbodykit.transform import ConstantArray - -import abc -import numpy -import logging -import warnings - -CACHE_SIZE = 1e9 - -def column(name=None): - def decorator(getter): - getter.column_name = name - return getter - - if hasattr(name, '__call__'): - # a single callable is provided - getter = name - name = getter.__name__ - return decorator(getter) - else: - return decorator - -def find_columns(cls): - hardcolumns = [] - - for key, value in cls.__dict__.items(): - if hasattr(value, 'column_name'): - hardcolumns.append(value.column_name) - - # recursively search the base classes, too - for base in cls.__bases__: - hardcolumns += find_columns(base) - - return list(sorted(set(hardcolumns))) - -def find_column(cls, name): - for
key, value in cls.__dict__.items(): - if not hasattr(value, 'column_name'): continue - if value.column_name == name: return value - - for base in cls.__bases__: - try: return find_column(base, name) - except: pass - - raise AttributeError("Column %s not found in class %s." % (name, str(cls))) - -@add_metaclass(abc.ABCMeta) -class ParticleSource(object): - """ - Base class for a source of input particles - - This combines the process of reading and painting - """ - logger = logging.getLogger('ParticleSource') - - @staticmethod - def make_column(array): - """ convert a numpy array to a column object (dask.array.Array) """ - import dask.array as da - if isinstance(array, da.Array): - return array - else: - return da.from_array(array, chunks=100000) - - # called by the subclasses - def __init__(self, comm, use_cache=False): - - # ensure self.comm is set, though usually already set by the child. - self.comm = comm - - # initialize a cache - self.use_cache = use_cache - - # initial dicts of overrided and fallback columns - self._overrides = {} - self._fallbacks = {} - - # if size is already computed update csize - # otherwise the subclass shall call update_csize explicitly. - if self.size is not NotImplemented: - self.update_csize() - - @property - def use_cache(self): - return self._use_cache - - @use_cache.setter - def use_cache(self, val): - if val: - try: - # try to add a Cache if we don't have one yet - from dask.cache import Cache - if not hasattr(self, '_cache'): - self._cache = Cache(CACHE_SIZE) - except ImportError: - warnings.warn("caching of ParticleSource requires ``cachey`` module; turning cache off") - else: - if hasattr(self, '_cache'): - delattr(self, '_cache') - self._use_cache = val - - def to_mesh(self, Nmesh=None, BoxSize=None, dtype='f4', - interlaced=False, compensated=False, window='cic', - weight='Weight', selection='Selection' - ): - """ - Convert the ParticleSource to a MeshSource - - FIXME: probably add Position, Weight and Selection column names - - """ - from nbodykit.base.particlemesh import ParticleMeshSource - - if BoxSize is None: - try: - BoxSize = self.attrs['BoxSize'] - except KeyError: - raise ValueError("cannot convert particle source to a mesh; " - "'BoxSize' keyword is not supplied and the particle source does not define one in 'attrs'.") - - if Nmesh is None: - try: - Nmesh = self.attrs['Nmesh'] - except KeyError: - raise ValueError("cannot convert particle source to a mesh; " - "'Nmesh' keyword is not supplied and the particle source does not define one in 'attrs'.") - - r = ParticleMeshSource(self, Nmesh=Nmesh, BoxSize=BoxSize, dtype=dtype, weight=weight, selection=selection) - - r.interlaced = interlaced - r.compensated = compensated - r.window = window - return r - - def update_csize(self): - """ set the collective size - - Call this function in __init__ of subclass, - after .size is a valid value (not NotImplemented) - """ - self._csize = self.comm.allreduce(self.size) - - if self.comm.rank == 0: - self.logger.debug("rank 0, local number of particles = %d" % self.size) - self.logger.info("total number of particles = %d" % self.csize) - - # defaults (these save memory by using ConstantArray) - self._fallbacks['Selection'] = ConstantArray(True, self.size, chunks=100000) - self._fallbacks['Weight'] = ConstantArray(1.0, self.size, chunks=100000) - - @property - def attrs(self): - """ - Dictionary storing relevant meta-data - """ - try: - return self._attrs - except AttributeError: - self._attrs = {} - return self._attrs - - def compute(self, *args, 
**kwargs): - """ - Our version of :func:`dask.compute` that computes - multiple delayed dask collections at once - - This should be called on the return value of :func:`read` - to converts any dask arrays to numpy arrays - - Parameters - ----------- - args : object - Any number of objects. If the object is a dask - collection, it's computed and the result is returned. - Otherwise it's passed through unchanged. - - Notes - ----- - The dask default optimizer induces too many (unnecesarry) - IO calls -- we turn this off feature off by default. - - Eventually we want our own optimizer probably. - """ - import dask - - # XXX find a better place for this function - kwargs.setdefault('optimize_graph', False) - - if self.use_cache and hasattr(self, '_cache'): - with self._cache: - toret = dask.compute(*args, **kwargs) - else: - toret = dask.compute(*args, **kwargs) - - # do not return tuples of length one - if len(toret) == 1: toret = toret[0] - return toret - - def __len__(self): - """ - The length of ParticleSource is equal to :attr:`size`; this is the - local size of the source on a given rank - """ - return self.size - - def __contains__(self, col): - return col in self.columns - - @property - def hardcolumns(self): - """ a list of hard coded columns. - These are usually member functions marked by @column decorator. - - Subclasses may override this method and get_hardcolumn to bypass - the decorator logic. - """ - try: - self._hardcolumns - except AttributeError: - - self._hardcolumns = find_columns(self.__class__) - return self._hardcolumns - - def get_hardcolumn(self, col): - """ construct and return a hard coded column. - These are usually produced by calling member functions marked by @column decorator. - - Subclasses may override this method and the hardcolumns attribute to bypass - the decorator logic. - """ - return find_column(self.__class__, col)(self) - - - @property - def columns(self): - """ - The names of the data fields defined for each particle, including overriden columns and fallback columns - """ - return sorted(set(list(self.hardcolumns) + list(self._overrides) + list(self._fallbacks))) - - @property - def csize(self): - """ - The collective size of the source, i.e., summed across all ranks - """ - return self._csize - - @abc.abstractproperty - def size(self): - """ - The number of particles in the source on the local rank. - """ - return NotImplemented - - def __getitem__(self, col): - if col in self._overrides: - r = self._overrides[col] - elif col in self.hardcolumns: - r = self.get_hardcolumn(col) - elif col in self._fallbacks: - r = self._fallbacks[col] - else: - raise KeyError("column `%s` is not defined in this source; try adding column via `source[column] = data`" %col) - - if callable(r): r = r() - if not hasattr(r, 'attrs'): - r.attrs = {} - return r - - def save(self, output, columns, datasets=None, header='Header'): - """ Save the data source to a bigfile. - - selected columns are saved. attrs are saved in header. - attrs of columns are stored in the datasets. 
- """ - import bigfile - if datasets is None: - datasets = columns - - with bigfile.BigFileMPI(comm=self.comm, filename=output, create=True) as ff: - try: - bb = ff.open(header) - except: - bb = ff.create(header) - with bb : - for key in self.attrs: - bb.attrs[key] = self.attrs[key] - - for column, dataset in zip(columns, datasets): - c = self[column] - - with ff.create_from_array(dataset, c) as bb: - if hasattr(c, 'attrs'): - for key in c.attrs: - bb.attrs[key] = c.attrs[key] - - def __setitem__(self, col, value): - - # handle scalar values - if numpy.isscalar(value): - assert self.size is not NotImplemented, "size is not implemented! cannot set scalar array" - value = ConstantArray(value, self.size, chunks=100000) - - # check the correct size, if we know the size - if self.size is not NotImplemented: - assert len(value) == self.size, "error setting column, data must be array of size %d" %self.size - self._overrides[col] = self.make_column(value) - - def read(self, columns): - """ - Return the requested columns as dask arrays - - Currently, this returns a dask array holding the total amount - of data for each rank, divided equally amongst the available ranks - """ - missing = set(columns) - set(self.columns) - if len(missing) > 0: - raise ValueError("source does not contain columns: %s; try adding columns via `source[column] = data`" %str(missing)) - - return [self[col] for col in columns] diff --git a/nbodykit/base/tests/__init__.py b/nbodykit/base/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nbodykit/cosmology/tests/__init__.py b/nbodykit/cosmology/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nbodykit/tests/test_cosmology.py b/nbodykit/cosmology/tests/test_cosmology.py similarity index 100% rename from nbodykit/tests/test_cosmology.py rename to nbodykit/cosmology/tests/test_cosmology.py diff --git a/nbodykit/io/bigfile.py b/nbodykit/io/bigfile.py index b16c0e26a..450700319 100644 --- a/nbodykit/io/bigfile.py +++ b/nbodykit/io/bigfile.py @@ -17,6 +17,19 @@ class BigFile(FileType): """ def __init__(self, path, exclude=None, header='.', dataset='./'): + """ + Parameters + ---------- + path : str + the name of the directory holding the bigfile data + exclude : list of str; optional + the data sets to exlude from loading within bigfile; default + is the header + header : str; optional + the path to the header + dataset : str + load a specific dataset from the bigfile + """ if not dataset.endswith('/'): dataset = dataset + '/' import bigfile diff --git a/nbodykit/io/stack.py b/nbodykit/io/stack.py index 49852d970..cf4bbcc4d 100644 --- a/nbodykit/io/stack.py +++ b/nbodykit/io/stack.py @@ -14,18 +14,21 @@ class FileStack(FileType): This allows data to be accessed across multiple files from a single file object """ - def __init__(self, path, filetype, **kwargs): + def __init__(self, filetype, path, *args, **kwargs): """ Parameters ---------- + filetype : FileType subclass + the type of file class to initialize path : str list of file names, or string specifying single file or containing a glob-like '*' pattern - filetype : FileType subclass - the type of file class + *args : + additional arguments to pass to the ``filetype`` instance + during initialization **kwargs : - arguments passed to ``filetype`` when initializing each file in - the stack + additional keyword arguments passed to the ``filetype`` instance + during initialization """ # check that filetype is subclass of FileType if not inspect.isclass(filetype) or not 
issubclass(filetype, FileType): diff --git a/nbodykit/io/tests/test_stack.py b/nbodykit/io/tests/test_stack.py index 21944d35e..4a258239c 100644 --- a/nbodykit/io/tests/test_stack.py +++ b/nbodykit/io/tests/test_stack.py @@ -37,7 +37,7 @@ def test_data(comm): # initialize the stack path = os.path.join(tmpdir, 'tpm.00*') - f = FileStack(path, TPMBinaryFile, precision='f4') + f = FileStack(TPMBinaryFile, path, precision='f4') # check size assert f.size == 2048 @@ -49,7 +49,7 @@ def test_data(comm): # pass a list paths = [os.path.join(tmpdir, f) for f in ['tpm.000', 'tpm.001']] - f = FileStack(paths, TPMBinaryFile, precision='f4') + f = FileStack(TPMBinaryFile, paths, precision='f4') # check size assert f.size == 2048 @@ -79,7 +79,7 @@ def test_single_path(comm): pos[sl].tofile(ff); vel[sl].tofile(ff); uid[sl].tofile(ff) # single path - f = FileStack(os.path.join(tmpdir, 'tpm.000'), TPMBinaryFile, precision='f4') + f = FileStack(TPMBinaryFile, os.path.join(tmpdir, 'tpm.000'), precision='f4') assert f.size == 1024 assert f.nfiles == 1 @@ -104,7 +104,7 @@ def test_bad_path(comm): pos[sl].tofile(ff); vel[sl].tofile(ff); uid[sl].tofile(ff) # bad path name - try: f = FileStack(ff, TPMBinaryFile, precision='f4') + try: f = FileStack(TPMBinaryFile, ff, precision='f4') except: pass diff --git a/nbodykit/lab.py b/nbodykit/lab.py index ee4e3ee8d..2314a7a71 100644 --- a/nbodykit/lab.py +++ b/nbodykit/lab.py @@ -1,11 +1,15 @@ from mpi4py import MPI - import numpy -from nbodykit.batch import TaskManager -from nbodykit import source as Source, cosmology +# sources +from nbodykit.source.catalog import * +from nbodykit.source.mesh import * + +# algorithms from nbodykit.algorithms import * + +from nbodykit.batch import TaskManager +from nbodykit import cosmology from nbodykit import CurrentMPIComm from nbodykit import transform - from nbodykit import io as IO \ No newline at end of file diff --git a/nbodykit/source/__init__.py b/nbodykit/source/__init__.py index 1f91c2d57..f8069d7f9 100644 --- a/nbodykit/source/__init__.py +++ b/nbodykit/source/__init__.py @@ -1,2 +1,2 @@ -from .particle import * +from .catalog import * from .mesh import * diff --git a/nbodykit/source/catalog/__init__.py b/nbodykit/source/catalog/__init__.py new file mode 100644 index 000000000..14fcea09f --- /dev/null +++ b/nbodykit/source/catalog/__init__.py @@ -0,0 +1,14 @@ +# catalogs for different file types +from .file import CSVCatalog +from .file import BinaryCatalog +from .file import BigFileCatalog +from .file import HDFCatalog +from .file import TPMBinaryCatalog + +from .array import ArrayCatalog +from .lognormal import LogNormalCatalog +from .uniform import UniformCatalog, RandomCatalog +from .fkp import FKPCatalog +from .halos import HaloCatalog +from .hod import HODCatalog + diff --git a/nbodykit/source/particle/from_numpy.py b/nbodykit/source/catalog/array.py similarity index 62% rename from nbodykit/source/particle/from_numpy.py rename to nbodykit/source/catalog/array.py index 6570a9ba1..cd481664e 100644 --- a/nbodykit/source/particle/from_numpy.py +++ b/nbodykit/source/catalog/array.py @@ -1,21 +1,26 @@ -from nbodykit.base.particles import ParticleSource +from nbodykit.base.catalog import CatalogSource from nbodykit import CurrentMPIComm import numpy -class Array(ParticleSource): +class ArrayCatalog(CatalogSource): """ - A source of particles from numpy array + A catalog source initialized from a :mod:`numpy` array """ @CurrentMPIComm.enable def __init__(self, data, comm=None, use_cache=False, **kwargs): """ Parameters 
---------- - comm : MPI.Communicator - the MPI communicator data : numpy.array - a structured array holding the - + a structured numpy array; fields of the array are interpreted + as the columns of the catalog + comm : MPI Communicator, optional + the MPI communicator instance; default (``None``) sets to the + current communicator + use_cache : bool, optional + whether to cache data read from disk; default is ``False`` + **kwargs : + additional keywords to store as meta-data in :attr:`attrs` """ self.comm = comm self._source = data @@ -28,11 +33,11 @@ def __init__(self, data, comm=None, use_cache=False, **kwargs): if any(dt != dtypes[0] for dt in dtypes): raise ValueError("mismatch between dtypes across ranks in Array") + CatalogSource.__init__(self, comm=comm, use_cache=use_cache) + # update the meta-data self.attrs.update(kwargs) - ParticleSource.__init__(self, comm=comm, use_cache=use_cache) - @property def size(self): return len(self._source) @@ -42,7 +47,7 @@ def hardcolumns(self): """ The union of the columns in the file and any transformed columns """ - defaults = ParticleSource.hardcolumns.fget(self) + defaults = CatalogSource.hardcolumns.fget(self) return list(self._source.dtype.names) + defaults def get_hardcolumn(self, col): @@ -54,5 +59,5 @@ def get_hardcolumn(self, col): if col in self._source.dtype.names: return self.make_column(self._source[col]) else: - return ParticleSource.get_hardcolumn(self, col) + return CatalogSource.get_hardcolumn(self, col) diff --git a/nbodykit/source/catalog/file.py b/nbodykit/source/catalog/file.py new file mode 100644 index 000000000..20f9a34d0 --- /dev/null +++ b/nbodykit/source/catalog/file.py @@ -0,0 +1,120 @@ +from nbodykit.base.catalog import CatalogSource +from nbodykit.io.stack import FileStack +from nbodykit import CurrentMPIComm +from nbodykit import io + +class FileCatalogBase(CatalogSource): + """ + Base class to create a source of particles from a + single file, or multiple files, on disk + + Files of a specific type should be subclasses of this class. 
+ """ + @CurrentMPIComm.enable + def __init__(self, filetype, args=(), kwargs={}, comm=None, use_cache=False): + """ + Parameters + ---------- + filetype : subclass of :class:`nbodykit.io.FileType` + the file-like class used to load the data from file; should be a + subclass of :class:`nbodykit.io.FileType` + path : str, list of str + the path to the file(s) to load; can be a list of files to load, or + contain a glob-like asterisk pattern + args : dict, optional + the arguments to pass to the ``filetype`` class when constructing + each file object + """ + self.comm = comm + self.filetype = filetype + + # bcast the FileStack + if self.comm.rank == 0: + self._source = FileStack(filetype, *args, **kwargs) + else: + self._source = None + self._source = self.comm.bcast(self._source) + + # update the meta-data + self.attrs.update(self._source.attrs) + + if self.comm.rank == 0: + self.logger.info("Extra arguments to FileType: %s" %args) + + CatalogSource.__init__(self, comm=comm, use_cache=use_cache) + + @property + def size(self): + """ + The local size + """ + start = self.comm.rank * self._source.size // self.comm.size + end = (self.comm.rank + 1) * self._source.size // self.comm.size + return end - start + + @property + def hardcolumns(self): + """ + The union of the columns in the file and any transformed columns + """ + defaults = CatalogSource.hardcolumns.fget(self) + return list(self._source.dtype.names) + defaults + + def get_hardcolumn(self, col): + """ + Return a column from the underlying file source + + Columns are returned as dask arrays + """ + if col in self._source.dtype.names: + start = self.comm.rank * self._source.size // self.comm.size + end = (self.comm.rank + 1) * self._source.size // self.comm.size + return self._source.get_dask(col)[start:end] + else: + return CatalogSource.get_hardcolumn(self, col) + + +def FileCatalogFactory(name, filetype): + """ + Factory method to create subclasses of :class:`FileCatalogBase` + that use specific classes from :mod:`nbodykit.io` to read + different types of data from disk + """ + def wrapdoc(cls): + """Add the docstring of the IO class""" + def dec(f): + io_doc = cls.__init__.__doc__ + if io_doc is None: io_doc = "" + f.__doc__ = io_doc + f.__doc__ + return f + return dec + + @wrapdoc(filetype) + def __init__(self, *args, **kwargs): + """ + Additional Keyword Parameters + ----------------------------- + comm : MPI Communicator, optional + the MPI communicator instance; default (``None``) sets to the + current communicator + use_cache : bool, optional + whether to cache data read from disk; default is ``False`` + attrs : dict; optional + dictionary of meta-data to store in :attr:`attrs` + """ + comm = kwargs.pop('comm', None) + use_cache = kwargs.pop('use_cache', False) + attrs = kwargs.pop('attrs', {}) + FileCatalogBase.__init__(self, filetype=filetype, args=args, kwargs=kwargs) + self.attrs.update(attrs) + + + __doc__ = "A catalog source created using :class:`io.%s` to read data from disk" % filetype.__name__ + newclass = type(name, (FileCatalogBase,),{"__init__": __init__, "__doc__":__doc__}) + return newclass + +CSVCatalog = FileCatalogFactory("CSVCatalog", io.CSVFile) +BinaryCatalog = FileCatalogFactory("BinaryCatalog", io.BinaryFile) +BigFileCatalog = FileCatalogFactory("BigFileCatalog", io.BigFile) +HDFCatalog = FileCatalogFactory("HDFCatalog", io.HDFFile) +TPMBinaryCatalog = FileCatalogFactory("TPMBinaryCatalog", io.BinaryFile) diff --git a/nbodykit/source/particle/fkp.py b/nbodykit/source/catalog/fkp.py similarity index 93% 
rename from nbodykit/source/particle/fkp.py rename to nbodykit/source/catalog/fkp.py index e03e8deb9..ff50b7f07 100644 --- a/nbodykit/source/particle/fkp.py +++ b/nbodykit/source/catalog/fkp.py @@ -1,18 +1,17 @@ +from nbodykit.utils import attrs_to_dict +from nbodykit.transform import ConstantArray +from nbodykit.base.catalog import CatalogSource, column +from nbodykit.base.catalogmesh import CatalogMeshSource + import numpy import logging import functools import contextlib - -from nbodykit.utils import attrs_to_dict -from nbodykit.transform import ConstantArray -from nbodykit.base.particles import ParticleSource, column -from nbodykit.base.particlemesh import ParticleMeshSource - from pmesh.pm import RealField, ComplexField -class FKPMeshSource(ParticleMeshSource): +class FKPMeshSource(CatalogMeshSource): """ - A subclass of :class:`~nbodykit.base.particlemesh.ParticleMeshSource` + A subclass of :class:`~nbodykit.base.catalogmesh.CatalogMeshSource` designed to paint the FKP density field to a mesh, given two catalogs, one for `data` and one for `randoms` """ @@ -25,7 +24,7 @@ def __init__(self, source, BoxSize, Nmesh, dtype, selection, weight, comp_weight self.fkp_weight = fkp_weight self.nbar = nbar - ParticleMeshSource.__init__(self, source, BoxSize, Nmesh, dtype, weight, selection) + CatalogMeshSource.__init__(self, source, BoxSize, Nmesh, dtype, weight, selection) @contextlib.contextmanager def _set_mesh(self, prefix): @@ -68,7 +67,7 @@ def to_real_field(self): """ # paint -1.0*alpha*N_randoms with self._set_mesh('randoms'): - real = ParticleMeshSource.to_real_field(self) + real = CatalogMeshSource.to_real_field(self) # from N/Nbar = 1+delta to un-normalized number real[:] *= real.attrs['N'] / numpy.prod(self.pm.Nmesh) @@ -102,7 +101,7 @@ def to_real_field(self): # paint the data with self._set_mesh('data'): - real2 = ParticleMeshSource.to_real_field(self) + real2 = CatalogMeshSource.to_real_field(self) # from N/Nbar = 1+delta to un-normalized number real2[:] *= real2.attrs['N'] / numpy.prod(self.pm.Nmesh) @@ -199,15 +198,15 @@ def FKPColumn(self, which, col): source = getattr(self, which) return source[col] -class FKPCatalog(ParticleSource): +class FKPCatalog(CatalogSource): """ - Combine a ``data`` ParticleSource and a ``randoms`` ParticleSource + Combine a ``data`` CatalogSource and a ``randoms`` CatalogSource into a single unified Source This main functionality of this class is: * provide a uniform interface to accessing columns from the - `data` ParticleSource and `randoms` ParticleSource, using + `data` CatalogSource and `randoms` CatalogSource, using column names prefixed with "data." or "randoms." * compute the shared :attr:`BoxSize` of the Source, by finding the maximum Cartesian extent of the `randoms` @@ -218,9 +217,9 @@ def __init__(self, data, randoms, BoxSize=None, BoxPad=0.02, use_cache=True): """ Parameters ---------- - data : ParticleSource + data : CatalogSource the Source of particles representing the `data` catalog - randoms : ParticleSource + randoms : CatalogSource the Source of particles representing the `randoms` catalog BoxSize : float, 3-vector; optional the size of the Cartesian box to use for the unified `data` and @@ -244,18 +243,18 @@ def __init__(self, data, randoms, BoxSize=None, BoxPad=0.02, use_cache=True): self.attrs.update(attrs_to_dict(randoms, 'randoms.')) # init the base class - ParticleSource.__init__(self, comm=self.comm, use_cache=use_cache) + CatalogSource.__init__(self, comm=self.comm, use_cache=use_cache) # turn on cache? 
if self.use_cache: self.data.use_cache = True self.randoms.use_cache = True - # define the fallbacks new weight columns: FKPWeight and TotalWeight + # add new weight columns: FKPWeight and TotalWeight import dask.array as da for source in [self.data, self.randoms]: - source._fallbacks['FKPWeight'] = ConstantArray(1.0, source.size, chunks=100000) - source._fallbacks['TotalWeight'] = ConstantArray(1.0, source.size, chunks=100000) + source['FKPWeight'] = ConstantArray(1.0, source.size, chunks=100000) + source['TotalWeight'] = ConstantArray(1.0, source.size, chunks=100000) # prefixed columns in this source return on-demand from "data" or "randoms" for name in ['data', 'randoms']: @@ -283,7 +282,7 @@ def size(self): def _define_cartesian_box(self): """ - Internal function to put the :attr:`randoms` ParticleSource in a Cartesian box + Internal function to put the :attr:`randoms` CatalogSource in a Cartesian box This function add two necessary attribues: diff --git a/nbodykit/source/particle/halos.py b/nbodykit/source/catalog/halos.py similarity index 96% rename from nbodykit/source/particle/halos.py rename to nbodykit/source/catalog/halos.py index 0c322d5cc..2a4aaef27 100644 --- a/nbodykit/source/particle/halos.py +++ b/nbodykit/source/catalog/halos.py @@ -1,8 +1,8 @@ -from nbodykit.base.particles import ParticleSource, column +from nbodykit.base.catalog import CatalogSource, column from nbodykit import transform import numpy -class HaloCatalog(ParticleSource): +class HaloCatalog(CatalogSource): """ A wrapper Source class to interface nicely with :class:`halotools.sim_manager.UserSuppliedHaloCatalog` @@ -13,7 +13,7 @@ def __init__(self, source, cosmo, redshift, mdef='vir', """ Parameters ---------- - source : ParticleSource + source : CatalogSource the source holding the particles to be interpreted as halos particle_mass : float the @@ -49,7 +49,7 @@ def __init__(self, source, cosmo, redshift, mdef='vir', self.attrs['mdef'] = mdef # init the base class - ParticleSource.__init__(self, comm=comm, use_cache=use_cache) + CatalogSource.__init__(self, comm=comm, use_cache=use_cache) @property def size(self): diff --git a/nbodykit/source/particle/hod.py b/nbodykit/source/catalog/hod.py similarity index 96% rename from nbodykit/source/particle/hod.py rename to nbodykit/source/catalog/hod.py index d0dfd1f9a..b7733b4a3 100644 --- a/nbodykit/source/particle/hod.py +++ b/nbodykit/source/catalog/hod.py @@ -1,6 +1,6 @@ +from nbodykit.base.catalog import column +from nbodykit.source.catalog.array import ArrayCatalog from nbodykit import CurrentMPIComm -from nbodykit.base.particles import column -from nbodykit.source.particle.from_numpy import Array from nbodykit.utils import GatherArray, ScatterArray from nbodykit.extern.six import add_metaclass @@ -18,7 +18,7 @@ def remove_object_dtypes(data): return data @add_metaclass(abc.ABCMeta) -class HODBase(Array): +class HODBase(ArrayCatalog): """ A base class to be used for HOD population of a halo catalog. @@ -97,7 +97,7 @@ def __init__(self, halos, seed=None, use_cache=False, comm=None, **params): self._model.param_dict[param] = self.attrs[param] # make the actual source - Array.__init__(self, self._makesource(), comm=comm, use_cache=use_cache) + ArrayCatalog.__init__(self, self._makesource(), comm=comm, use_cache=use_cache) # crash with no particles! 
        if self.csize == 0:
@@ -194,7 +194,7 @@ def repopulate(self, seed=None, **params):
        self._log_populated_stats(data)

        # re-initialize with new source
-        Array.__init__(self, ScatterArray(data, self.comm), comm=self.comm, use_cache=self.use_cache)
+        ArrayCatalog.__init__(self, ScatterArray(data, self.comm), comm=self.comm, use_cache=self.use_cache)

    def _log_populated_stats(self, data):
        """
@@ -237,9 +237,9 @@ def VelocityOffset(self):
        rsd_factor = (1+z) / (100*self.cosmo.efunc(z))
        return self['Velocity'] * rsd_factor

-class HOD(HODBase):
+class HODCatalog(HODBase):
     """
-    A `ParticleSource` that uses the HOD prescription of
+    A `CatalogSource` that uses the HOD prescription of
     Zheng et al. 2007 to populate an input halo catalog
     with galaxies, and returns the (Position, Velocity)
     of those galaxies
diff --git a/nbodykit/source/particle/lognormal.py b/nbodykit/source/catalog/lognormal.py
similarity index 89%
rename from nbodykit/source/particle/lognormal.py
rename to nbodykit/source/catalog/lognormal.py
index 46a7076a6..a6f1670d6 100644
--- a/nbodykit/source/particle/lognormal.py
+++ b/nbodykit/source/catalog/lognormal.py
@@ -1,20 +1,21 @@
-from nbodykit.base.particles import ParticleSource, column
+from nbodykit.base.catalog import CatalogSource, column
 from nbodykit import cosmology
 from nbodykit.utils import attrs_to_dict
 from nbodykit import CurrentMPIComm

 import numpy

-class LogNormal(ParticleSource):
+class LogNormalCatalog(CatalogSource):
     """
-    A source of (biased) particles Poisson-sampled from a log-normal density field
+    A catalog source containing (biased) particles that have
+    been Poisson-sampled from a log-normal density field
     """
     def __repr__(self):
-        return "LogNormal(seed=%(seed)d, bias=%(bias)g)" %self.attrs
+        return "LogNormalCatalog(seed=%(seed)d, bias=%(bias)g)" %self.attrs

     @CurrentMPIComm.enable
     def __init__(self, Plin, nbar, BoxSize, Nmesh, bias=2., seed=None,
-                    comm=None, cosmo=None, redshift=None, use_cache=False):
+                    cosmo=None, redshift=None, comm=None, use_cache=False):
        """
        Parameters
        ----------
@@ -33,12 +34,15 @@ def __init__(self, Plin, nbar, BoxSize, Nmesh, bias=2., seed=None,
            to the density field
        seed : int, optional
            the global random seed; if set to ``None``, the seed will be set randomly
-        comm : MPI communicator, optional
-            the MPI communicator
        cosmo : nbodykit.cosmology.Cosmology, optional
            this must be supplied if `Plin` does not carry a ``cosmo`` attribute
        redshift : float, optional
            this must be supplied if `Plin` does not carry a ``redshift`` attribute
+        comm : MPI Communicator, optional
+            the MPI communicator instance; default (``None``) sets to the
+            current communicator
+        use_cache : bool, optional
+            whether to cache data read from disk; default is ``False``
        """
        self.comm = comm
        self.Plin = Plin
@@ -73,7 +77,7 @@ def __init__(self, Plin, nbar, BoxSize, Nmesh, bias=2., seed=None,
        self.attrs['seed'] = seed

        # init the base class
-        ParticleSource.__init__(self, comm=comm, use_cache=use_cache)
+        CatalogSource.__init__(self, comm=comm, use_cache=use_cache)

        # make the actual source
        self._source, pm = self._makesource(BoxSize=BoxSize, Nmesh=Nmesh)
diff --git a/nbodykit/source/catalog/tests/__init__.py b/nbodykit/source/catalog/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/nbodykit/tests/test_particles.py b/nbodykit/source/catalog/tests/test_catalog.py
similarity index 88%
rename from nbodykit/tests/test_particles.py
rename to nbodykit/source/catalog/tests/test_catalog.py
index 4167655f8..fb0b963d2 100644
--- a/nbodykit/tests/test_particles.py
+++ b/nbodykit/source/catalog/tests/test_catalog.py
@@ -14,7 +14,7 @@ def test_lognormal_sparse(comm):
     CurrentMPIComm.set(comm)

     # this should generate 15 particles
-    source = Source.LogNormal(Plin=cosmology.EHPower(cosmo, 0.55),
+    source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, 0.55),
                 nbar=1e-5, BoxSize=128., Nmesh=8, seed=42)

     mesh = source.to_mesh(compensated=False)
@@ -27,7 +27,7 @@ def test_lognormal_dense(comm):
     cosmo = cosmology.Planck15
     CurrentMPIComm.set(comm)

-    source = Source.LogNormal(Plin=cosmology.EHPower(cosmo, 0.55),
+    source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, 0.55),
                 nbar=0.2e-2, BoxSize=128., Nmesh=8, seed=42)

     mesh = source.to_mesh(compensated=False)
@@ -39,7 +39,7 @@ def test_lognormal_velocity(comm):
     cosmo = cosmology.Planck15
     CurrentMPIComm.set(comm)

-    source = Source.LogNormal(Plin=cosmology.EHPower(cosmo, 0.55),
+    source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, 0.55),
                 nbar=0.5e-2, BoxSize=1024., Nmesh=32, seed=42)

     source['Weight'] = source['Velocity'][:, 0] ** 2
@@ -61,7 +61,7 @@ def test_transform(comm):
             ('Velocity', ('f4', 3))]
             )

-    source = Source.Array(data, BoxSize=100, Nmesh=32)
+    source = ArrayCatalog(data, BoxSize=100, Nmesh=32)

     source['Velocity'] = source['Position'] + source['Velocity']

@@ -78,7 +78,7 @@ def test_transform(comm):
 def test_tomesh(comm):
     CurrentMPIComm.set(comm)

-    source = Source.UniformParticles(nbar=0.2e-2, BoxSize=1024., seed=42)
+    source = UniformCatalog(nbar=0.2e-2, BoxSize=1024., seed=42)

     source['Weight0'] = source['Velocity'][:, 0]
     source['Weight1'] = source['Velocity'][:, 1]
     source['Weight2'] = source['Velocity'][:, 2]
@@ -107,11 +107,11 @@ def test_save(comm):

     tmpfile = comm.bcast(tmpfile)

-    source = Source.UniformParticles(nbar=0.2e-2, BoxSize=1024., seed=42)
+    source = UniformCatalog(nbar=0.2e-2, BoxSize=1024., seed=42)

     source.save(tmpfile, ['Position', 'Velocity'])

-    source2 = Source.File(BigFile, tmpfile, Nmesh=32, args=dict(header='Header'))
+    source2 = BigFileCatalog(tmpfile, header='Header', attrs={"Nmesh":32})

     for k in source.attrs:
         assert_array_equal(source2.attrs[k], source.attrs[k])
@@ -151,7 +151,7 @@ def test_file(comm):
     cosmo = cosmology.Planck15
     CurrentMPIComm.set(comm)

-    source = Source.File(HDFFile, tmpfile, Nmesh=32, args={'root': 'X'})
+    source = HDFCatalog(tmpfile, root='X', attrs={"Nmesh":32})

     assert_allclose(source['Position'], dset['Position'])
diff --git a/nbodykit/tests/test_hod.py b/nbodykit/source/catalog/tests/test_hod.py
similarity index 87%
rename from nbodykit/tests/test_hod.py
rename to nbodykit/source/catalog/tests/test_hod.py
index 2052a23b1..28e245a23 100644
--- a/nbodykit/tests/test_hod.py
+++ b/nbodykit/source/catalog/tests/test_hod.py
@@ -14,7 +14,7 @@ def test_hod(comm):
     BoxSize = 512

     # lognormal particles
-    source = Source.LogNormal(Plin=cosmology.EHPower(cosmo, redshift),
+    source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, redshift),
                 nbar=3e-3, BoxSize=BoxSize, Nmesh=128, seed=42)

     # run FOF
@@ -22,7 +22,7 @@ def test_hod(comm):
     halos = r.to_halos(cosmo=cosmo, redshift=redshift, particle_mass=1e12, mdef='vir')

     # make the HOD catalog from halotools catalog
-    hod = Source.HOD(halos.to_halotools(), seed=42)
+    hod = HODCatalog(halos.to_halotools(), seed=42)

     # RSD offset in 'z' direction
     hod['Position'] += hod['VelocityOffset'] * [0, 0, 1]
diff --git a/nbodykit/source/particle/uniform.py b/nbodykit/source/catalog/uniform.py
similarity index 91%
rename from nbodykit/source/particle/uniform.py
rename to nbodykit/source/catalog/uniform.py
index a6f4abd02..911cb6134 100644
--- a/nbodykit/source/particle/uniform.py
+++ b/nbodykit/source/catalog/uniform.py
@@ -1,10 +1,10 @@
-from nbodykit.base.particles import ParticleSource, column
+from nbodykit.base.catalog import CatalogSource, column
 from nbodykit import CurrentMPIComm
 from numpy.random import RandomState

 import numpy

-from contextlib import contextmanager
-from functools import wraps
+import functools
+import contextlib

 N_PER_SEED = 100000

@@ -15,7 +15,7 @@ def _mpi_enabled_rng(func):

     Designed to be used with :class:`MPIRandomState`
     """
-    @wraps(func)
+    @functools.wraps(func)
     def func_wrapper(*args, **kwargs):
         self = func.__self__

@@ -112,7 +112,7 @@ def __getattribute__(self, name):
                 attr = _mpi_enabled_rng(attr)
         return attr

-    @contextmanager
+    @contextlib.contextmanager
     def seeded_context(self, seed):
         """
         A context manager to set and then restore the random seed
@@ -122,9 +122,9 @@
         """
         yield
         self.set_state(startstate)

-class RandomParticles(ParticleSource):
+class RandomCatalog(CatalogSource):
     """
-    A source of particles that can have columns added via a
+    A catalog source that can have columns added via a
     collective random number generator

     The random number generator stored as :attr:`rng` behaves
@@ -133,7 +133,7 @@ class RandomParticles(ParticleSource):
     the number of ranks
     """
     def __repr__(self):
-        return "RandomParticles(seed=%(seed)s)" % self.attrs
+        return "RandomCatalog(seed=%(seed)s)" % self.attrs

     @CurrentMPIComm.enable
     def __init__(self, csize, seed=None, comm=None, use_cache=False):
@@ -157,23 +157,23 @@ def __init__(self, csize, seed=None, comm=None, use_cache=False):
        self._size = self.rng.size

        # init the base class
-        ParticleSource.__init__(self, comm=comm, use_cache=use_cache)
+        CatalogSource.__init__(self, comm=comm, use_cache=use_cache)

    @property
    def size(self):
        return self._size

-class UniformParticles(RandomParticles):
+class UniformCatalog(RandomCatalog):
     """
-    A Source which has uniformly-distributed ``Position``
+    A catalog source that has uniformly-distributed ``Position``
     and ``Velocity`` columns

     The random numbers generated do not depend on the
     number of ranks, i.e., ``comm.size``.
""" def __repr__(self): - return "UniformParticles(seed=%(seed)s)" % self.attrs + return "UniformCatalog(seed=%(seed)s)" % self.attrs @CurrentMPIComm.enable def __init__(self, nbar, BoxSize, seed=None, comm=None, use_cache=False): @@ -199,7 +199,7 @@ def __init__(self, nbar, BoxSize, seed=None, comm=None, use_cache=False): N = rng.poisson(nbar * numpy.prod(self.attrs['BoxSize'])) if N == 0: raise ValueError("no uniform particles generated, try increasing `nbar` parameter") - RandomParticles.__init__(self, N, seed=seed, comm=comm, use_cache=use_cache) + RandomCatalog.__init__(self, N, seed=seed, comm=comm, use_cache=use_cache) self._pos = self.rng.uniform(size=(self._size, 3)) * self.attrs['BoxSize'] self._vel = self.rng.uniform(size=(self._size, 3)) * self.attrs['BoxSize'] * 0.01 diff --git a/nbodykit/source/mesh/tests/__init__.py b/nbodykit/source/mesh/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nbodykit/tests/test_grid.py b/nbodykit/source/mesh/tests/test_grid.py similarity index 89% rename from nbodykit/tests/test_grid.py rename to nbodykit/source/mesh/tests/test_grid.py index 7c8e5d586..fe95287ed 100644 --- a/nbodykit/tests/test_grid.py +++ b/nbodykit/source/mesh/tests/test_grid.py @@ -20,7 +20,7 @@ def test_preview(comm): # linear grid Plin = cosmology.EHPower(cosmo, redshift=0.55) - source = Source.LinearMesh(Plin, Nmesh=64, BoxSize=512, seed=42) + source = LinearMesh(Plin, Nmesh=64, BoxSize=512, seed=42) real = source.to_field('real') @@ -39,8 +39,8 @@ def test_memory_grid(comm): real = pm.generate_whitenoise(mode='real', seed=3333) complex = real.r2c() - realmesh = Source.MemoryMesh(real) - complexmesh = Source.MemoryMesh(complex) + realmesh = MemoryMesh(real) + complexmesh = MemoryMesh(complex) assert_array_equal(real, realmesh.to_field(mode='real')) assert_array_equal(complex, complexmesh.to_field(mode='complex')) @@ -59,7 +59,7 @@ def test_linear_grid(comm): # linear grid Plin = cosmology.EHPower(cosmo, redshift=0.55) - source = Source.LinearMesh(Plin, Nmesh=64, BoxSize=512, seed=42) + source = LinearMesh(Plin, Nmesh=64, BoxSize=512, seed=42) # compute P(k) from linear grid r = FFTPower(source, mode='1d', Nmesh=64, dk=0.01, kmin=0.005) @@ -93,7 +93,7 @@ def test_bigfile_grid(comm): # input linear mesh Plin = cosmology.EHPower(cosmo, redshift=0.55) - source = Source.LinearMesh(Plin, BoxSize=512, Nmesh=64, seed=42) + source = LinearMesh(Plin, BoxSize=512, Nmesh=64, seed=42) real = source.paint(mode='real') complex = source.paint(mode="complex") @@ -108,7 +108,7 @@ def test_bigfile_grid(comm): real.save(output, dataset='Field') # now load it and paint to the algorithm's ParticleMesh - source = Source.BigFileMesh(path=output, dataset='Field') + source = BigFileMesh(path=output, dataset='Field') loaded_real = source.paint() # compare to direct algorithm result @@ -117,7 +117,7 @@ def test_bigfile_grid(comm): complex.save(output, dataset='FieldC') # now load it and paint to the algorithm's ParticleMesh - source = Source.BigFileMesh(path=output, dataset='FieldC') + source = BigFileMesh(path=output, dataset='FieldC') loaded_real = source.paint(mode="complex") # compare to direct algorithm result diff --git a/nbodykit/source/particle/__init__.py b/nbodykit/source/particle/__init__.py deleted file mode 100644 index 12ddaef4a..000000000 --- a/nbodykit/source/particle/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from .from_file import File -from .from_numpy import Array -from .lognormal import LogNormal -from .uniform import UniformParticles, RandomParticles 
-from .fkp import FKPCatalog
-from .halos import HaloCatalog
-from .hod import HOD
-
diff --git a/nbodykit/source/particle/from_file.py b/nbodykit/source/particle/from_file.py
deleted file mode 100644
index 398f3d964..000000000
--- a/nbodykit/source/particle/from_file.py
+++ /dev/null
@@ -1,72 +0,0 @@
-from nbodykit.io.stack import FileStack
-from nbodykit.base.particles import ParticleSource
-from nbodykit import CurrentMPIComm
-import numpy
-
-class File(ParticleSource):
-    """
-    Read a source of particles from a single file, or multiple
-    files, on disk
-    """
-    @CurrentMPIComm.enable
-    def __init__(self, filetype, path, args={}, comm=None, use_cache=False, **kwargs):
-        """
-        Parameters
-        ----------
-        filetype : subclass of :class:`nbodykit.io.FileType`
-            the file-like class used to load the data from file; should be a
-            subclass of :class:`nbodykit.io.FileType`
-        path : str, list of str
-            the path to the file(s) to load; can be a list of files to load, or
-            contain a glob-like asterisk pattern
-        args : dict, optional
-            the arguments to pass to the ``filetype`` class when constructing
-            each file object
-        **kwargs :
-            additional keywords are stored as meta-data in the :attr:`attrs` dict
-        """
-        self.comm = comm
-        self.filetype = filetype
-
-        # bcast the FileStack
-        if self.comm.rank == 0:
-            self._source = FileStack(path, filetype, **args)
-        else:
-            self._source = None
-        self._source = self.comm.bcast(self._source)
-
-        # update the meta-data
-        self.attrs.update(self._source.attrs)
-        self.attrs.update(kwargs)
-
-        if self.comm.rank == 0:
-            self.logger.info("Extra arguments to FileType: %s" %args)
-
-        ParticleSource.__init__(self, comm=comm, use_cache=use_cache)
-
-    @property
-    def size(self):
-        """
-        The local size
-        """
-        start = self.comm.rank * self._source.size // self.comm.size
-        end = (self.comm.rank + 1) * self._source.size // self.comm.size
-        return end - start
-
-    @property
-    def hardcolumns(self):
-        """
-        The union of the columns in the file and any transformed columns
-        """
-        return list(self._source.dtype.names)
-
-    def get_hardcolumn(self, col):
-        """
-        Return a column from the underlying file source
-
-        Columns are returned as dask arrays
-        """
-        start = self.comm.rank * self._source.size // self.comm.size
-        end = (self.comm.rank + 1) * self._source.size // self.comm.size
-        return self._source.get_dask(col)[start:end]
-
diff --git a/nbodykit/tests/test_lab.py b/nbodykit/tests/test_lab.py
index 4ad1e167b..5a6edb67f 100644
--- a/nbodykit/tests/test_lab.py
+++ b/nbodykit/tests/test_lab.py
@@ -12,7 +12,7 @@ def test_fftpower(comm):
     CurrentMPIComm.set(comm)

     # lognormal particles
-    source = Source.LogNormal(Plin=cosmology.EHPower(cosmo, redshift=0.55),
+    source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, redshift=0.55),
                 nbar=3e-7, BoxSize=1380., Nmesh=8, seed=42)

     # apply RSD
@@ -32,7 +32,7 @@ def test_paint(comm):
     CurrentMPIComm.set(comm)

     # lognormal particles
-    source = Source.LogNormal(Plin=cosmology.EHPower(cosmo, redshift=0.55),
+    source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, redshift=0.55),
                 nbar=3e-7, BoxSize=1380., Nmesh=8, seed=42)

     # apply RSD
diff --git a/nbodykit/tests/test_taskmanager.py b/nbodykit/tests/test_taskmanager.py
index 72cb39d7c..3c7478f58 100644
--- a/nbodykit/tests/test_taskmanager.py
+++ b/nbodykit/tests/test_taskmanager.py
@@ -18,7 +18,7 @@ def test_iterate(comm):

     for seed in tm.iterate([0, 1, 2]):

         # uniform particles
-        source = Source.UniformParticles(nbar=3e-7, BoxSize=1380., seed=seed)
+        source = UniformCatalog(nbar=3e-7, BoxSize=1380., seed=seed)

         # compute P(k,mu) and multipoles
         r = FFTPower(source, mode='2d', Nmesh=8, poles=[0,2,4])
@@ -40,7 +40,7 @@ def test_map(comm):

     def fftpower(seed):

         # uniform particles
-        source = Source.UniformParticles(nbar=3e-7, BoxSize=1380., seed=seed)
+        source = UniformCatalog(nbar=3e-7, BoxSize=1380., seed=seed)

         # compute P(k,mu) and multipoles
         r = FFTPower(source, mode='2d', Nmesh=8, poles=[0,2,4])
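
Usage notes (editorial sketches; illustrative only, not part of the patch). The Python snippets below walk through the renamed API; they assume the new class names are re-exported from nbodykit.lab, as the bare names in the updated tests suggest, and any paths, column values, and parameters not shown in the diff are hypothetical.

With the fkp.py change above, FKPWeight and TotalWeight become ordinary unity-valued columns on both inputs rather than hidden `_fallbacks`, so they are visible through the "data."/"randoms." prefix interface and can be overwritten like any other column. A minimal sketch:

    from nbodykit.lab import RandomCatalog, FKPCatalog
    from nbodykit.transform import ConstantArray

    data = RandomCatalog(1000, seed=42)
    randoms = RandomCatalog(10000, seed=84)
    for s in [data, randoms]:
        # hypothetical setup: uniform positions in a unit box
        s['Position'] = s.rng.uniform(size=(s.size, 3))

    fkp = FKPCatalog(data, randoms)
    print(fkp['data.FKPWeight'])  # unity by default, via the "data." prefix
    # override with a dask array, mirroring the diff's own ConstantArray usage
    data['FKPWeight'] = ConstantArray(2.0, data.size, chunks=100000)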
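
RandomParticles/UniformParticles become RandomCatalog/UniformCatalog (uniform.py above). A sketch of the collective RNG, whose draws are invariant under comm.size; the Mass column name is illustrative, and assigning the resulting array to a column is assumed to work as column assignment does in the tests:

    from nbodykit.lab import RandomCatalog, UniformCatalog

    cat = RandomCatalog(csize=10000, seed=42)
    # the same global sample is produced regardless of the number of ranks
    cat['Mass'] = cat.rng.uniform(size=cat.size)

    uniform = UniformCatalog(nbar=0.2e-2, BoxSize=1024., seed=42)
    # Position and Velocity come pre-defined, as test_tomesh above exercises
    mesh = uniform.to_mesh(Nmesh=32, compensated=False)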
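
LogNormalCatalog keeps the old positional arguments but moves comm and use_cache to the end of the signature (lognormal.py above). A sketch mirroring the updated tests, where Plin carries its own cosmo and redshift so neither needs to be passed:

    from nbodykit.lab import cosmology, LogNormalCatalog

    cosmo = cosmology.Planck15
    source = LogNormalCatalog(Plin=cosmology.EHPower(cosmo, 0.55),
                              nbar=3e-3, BoxSize=512., Nmesh=64,
                              bias=2., seed=42)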
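
HOD becomes HODCatalog (hod.py above). A sketch of the population pipeline from test_hod, reusing source and cosmo from the previous sketch; the FOF linking_length and nmin values are illustrative, since the diff elides that call:

    from nbodykit.lab import FOF, HODCatalog

    r = FOF(source, linking_length=0.2, nmin=20)  # illustrative parameters
    halos = r.to_halos(cosmo=cosmo, redshift=0.55,
                       particle_mass=1e12, mdef='vir')

    hod = HODCatalog(halos.to_halotools(), seed=42)
    hod['Position'] += hod['VelocityOffset'] * [0, 0, 1]  # RSD offset along z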
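
The generic Source.File(filetype, path, args={...}) wrapper (from_file.py, deleted above) gives way to per-format catalog classes: reader options become plain constructor keywords and extra metadata travels in attrs, exactly as test_save and test_file now exercise. A sketch with hypothetical paths:

    from nbodykit.lab import BigFileCatalog, HDFCatalog

    cat = BigFileCatalog('halos.bigfile', header='Header', attrs={"Nmesh": 32})
    hdf = HDFCatalog('particles.hdf5', root='X', attrs={"Nmesh": 32})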
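
The mesh sources drop the Source namespace as well (test_grid above). A sketch of the save/load round trip from test_bigfile_grid, with a hypothetical output path, reusing cosmo from the earlier sketch:

    from nbodykit.lab import LinearMesh, BigFileMesh, FFTPower

    mesh = LinearMesh(cosmology.EHPower(cosmo, redshift=0.55),
                      Nmesh=64, BoxSize=512, seed=42)
    mesh.paint(mode='real').save('linear-mesh.bigfile', dataset='Field')

    loaded = BigFileMesh(path='linear-mesh.bigfile', dataset='Field')
    r = FFTPower(loaded, mode='1d', Nmesh=64, dk=0.01, kmin=0.005)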