Skip to content

Commit

Permalink
Throttle during painting.
Browse files Browse the repository at this point in the history
If some ranks receive too many particles, throttle the chunksize.
This fixed random OOM errors, which may be the true cause of #525.

The current solution is not ideal. It is possible that the number
of particles on some ranks will not decrease as we decrease chunksize.

We can look into fixing that when it becomes a real problem.
  • Loading branch information
rainwoodman committed Nov 2, 2018
1 parent 1ba046c commit 96b079e
Showing 1 changed file with 47 additions and 13 deletions.
60 changes: 47 additions & 13 deletions nbodykit/source/mesh/catalog.py
Expand Up @@ -231,10 +231,10 @@ def to_real_field(self, out=None, normalize=True):

# paint data in chunks on each rank;
# we do this by chunk 8 million is pretty big anyways.
chunksize = _global_options['paint_chunk_size']
for i in range(0, Nlocalmax, chunksize):
s = slice(i, i + chunksize)
max_chunksize = _global_options['paint_chunk_size']

# use a local scope to avoid having two copies of data in memory
def dochunk(s):
if len(Position) != 0:

# selection has to be computed many times when data is `large`.
Expand Down Expand Up @@ -268,37 +268,71 @@ def to_real_field(self, out=None, normalize=True):
value = numpy.ones(len(position))

# track total (selected) number and sum of weights
Nlocal += len(position)
Wlocal += weight.sum()
Nlocal = len(position)
Wlocal = weight.sum()

# no interlacing
if not self.interlaced:
lay = pm.decompose(position, smoothing=0.5 * resampler.support)
p = lay.exchange(position)
w = lay.exchange(weight)
v = lay.exchange(value)
else:
lay = pm.decompose(position, smoothing=1.0 * resampler.support)

# if we are receiving too many particles, abort and retry with a smaller chunksize
if any(pm.comm.allgather(lay.newlength > 2 * max_chunksize)):
if pm.comm.rank == 0:
self.logger.info("Throttling chunksize as some ranks will receive too many particles.")
raise StopIteration

p = lay.exchange(position)
w = lay.exchange(weight)
v = lay.exchange(value)

if not self.interlaced:
pm.paint(p, mass=w * v, resampler=resampler, hold=True, out=toret)

# interlacing: use 2 meshes separated by 1/2 cell size
else:
lay = pm.decompose(position, smoothing=1.0 * resampler.support)
p = lay.exchange(position)
w = lay.exchange(weight)
v = lay.exchange(value)

# in mesh units
shifted = pm.affine.shift(0.5)

# paint to two shifted meshes
pm.paint(p, mass=w * v, resampler=resampler, hold=True, out=real1)
pm.paint(p, mass=w * v, resampler=resampler, transform=shifted, hold=True, out=real2)

return Nlocal, Wlocal

import gc

# Walk over the local data in slices of `chunksize` rows. dochunk raises
# StopIteration when any rank would receive too many particles; in that
# case the same offset `i` is retried with half the chunk size.
i = 0
chunksize = max_chunksize
while i < Nlocalmax:

    s = slice(i, i + chunksize)

    if pm.comm.rank == 0:
        self.logger.info("Chunk %d ~ %d / %d " % (i, i + chunksize, Nlocalmax))

    try:
        Nlocal1, Wlocal1 = dochunk(s)
    except StopIteration:
        # Floor division keeps chunksize an integer; true division would
        # turn it into a float and produce non-integer slice bounds (and
        # a float offset `i`) on the retry.
        chunksize = chunksize // 2
        if chunksize < 1:
            raise RuntimeError("Cannot find a chunksize that fits into memory.")
        # retry the same offset with the smaller chunk
        continue
    finally:
        # collect unfreed items; runs on both the success and the retry path
        gc.collect()

    # accumulate the per-chunk count and weight sum returned by dochunk
    Nlocal += Nlocal1
    Wlocal += Wlocal1

    Nglobal = pm.comm.allreduce(Nlocal)

    if pm.comm.rank == 0:
        self.logger.info("painted %d out of %d objects to mesh"
            % (Nglobal, self.source.csize))

    # advance only after the chunk has been painted successfully
    i = i + chunksize

# now the loop over particles is done

if not self.interlaced:
Expand Down

0 comments on commit 96b079e

Please sign in to comment.