
Commit

Merge b6c6574 into db2610f
rainwoodman committed Dec 11, 2018
2 parents db2610f + b6c6574 commit 4e32c33
Showing 1 changed file with 49 additions and 21 deletions.
nbodykit/base/catalog.py: 70 changes (49 additions & 21 deletions)
@@ -564,9 +564,10 @@ def save(self, output, columns, dataset=None, datasets=None, header='Header'):
         datasets : list of str, optional
             names for the data set where each column is stored; defaults to
             the name of the column (deprecated)
-        header : str, optional
+        header : str, optional, or None
             the name of the data set holding the header information, where
             :attr:`attrs` is stored
+            if header is None, do not save the header.
         """
         import bigfile
         import json
@@ -590,29 +591,56 @@ def save(self, output, columns, dataset=None, datasets=None, header='Header'):
             raise ValueError("`datasets` must have the same length as `columns`")

         with bigfile.FileMPI(comm=self.comm, filename=output, create=True) as ff:
-            try:
-                bb = ff.open(header)
-            except:
-                bb = ff.create(header)
-            with bb :
-                for key in self.attrs:
-                    try:
-                        bb.attrs[key] = self.attrs[key]
-                    except ValueError:
-                        try:
-                            json_str = 'json://'+json.dumps(self.attrs[key], cls=JSONEncoder)
-                            bb.attrs[key] = json_str
-                        except:
-                            raise ValueError("cannot save '%s' key in attrs dictionary" % key)

             for column, dataset in zip(columns, datasets):
-                c = self[column]
+                array = self[column]
+                # ensure data is only chunked in the first dimension
+                size = self.comm.allreduce(len(array))
+                offset = numpy.sum(self.comm.allgather(len(array))[:self.comm.rank], dtype='i8')
+
+                # sane value -- 32 million items per physical file
+                sizeperfile = 32 * 1024 * 1024
+
+                Nfile = (size + sizeperfile - 1) // sizeperfile
+
+                dtype = numpy.dtype((array.dtype, array.shape[1:]))
+
-                # save column attrs too
-                with ff.create_from_array(dataset, c) as bb:
-                    if hasattr(c, 'attrs'):
-                        for key in c.attrs:
-                            bb.attrs[key] = c.attrs[key]
+                with ff.create(dataset, dtype, size, Nfile) as bb:
+                    def work(block, block_info=None):
+                        block_info = block_info[0] # first arg
+                        # chunked in the first dimension, thus the start
+                        # of first dim is the offset of write
+                        loffset = block_info['array-location'][0][0]
+                        print(offset + loffset, size)
+                        bb.write(offset + loffset, block)
+                        return 0
+
+                    array1 = array.rechunk(chunks=_global_options['dask_chunk_size'])
+                    # do the writing in parallel
+                    array1.map_blocks(work, dtype='i4').compute()
+
+                    if hasattr(array, 'attrs'):
+                        for key in array.attrs:
+                            bb.attrs[key] = array.attrs[key]
+
+            # write the header afterwards, such that header can be a block that saves
+            # data.
+            if header is not None:
+                try:
+                    bb = ff.open(header)
+                except:
+                    bb = ff.create(header)
+                with bb :
+                    for key in self.attrs:
+                        try:
+                            bb.attrs[key] = self.attrs[key]
+                        except ValueError:
+                            try:
+                                json_str = 'json://'+json.dumps(self.attrs[key], cls=JSONEncoder)
+                                bb.attrs[key] = json_str
+                            except:
+                                raise ValueError("cannot save '%s' key in attrs dictionary" % key)


     def read(self, columns):
         """
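
For reference, a minimal usage sketch of the new header=None behaviour documented in the docstring above. The catalog construction is illustrative only (UniformCatalog from nbodykit.lab and the chosen parameters are assumptions, not part of this commit); only CatalogSource.save is changed here.

    # Hypothetical usage sketch (not part of the commit): save two columns of a
    # toy catalog without writing any 'Header' dataset.
    from nbodykit.lab import UniformCatalog

    cat = UniformCatalog(nbar=100, BoxSize=1.0, seed=42)

    # header=None skips the header dataset entirely; with a string value
    # (the default 'Header'), attrs are still written, but only after all
    # columns, per the reordering in this commit.
    cat.save('toy-catalog.bigfile', ['Position', 'Velocity'], header=None)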

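In the new code each rank's write offset is derived from the column lengths gathered from all ranks: a rank starts writing after the rows held by the ranks below it. A standalone sketch of that arithmetic, assuming mpi4py and execution under mpirun; the variable names mirror the diff, but the fake row count is an assumption for illustration.

    # Sketch of the size/offset computation done before the chunked write.
    import numpy
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    local_rows = 10 * (comm.rank + 1)   # stand-in for len(array) on this rank

    # total rows across all ranks -> the size of the bigfile dataset
    size = comm.allreduce(local_rows)

    # rows held by lower ranks -> where this rank starts writing
    offset = numpy.sum(comm.allgather(local_rows)[:comm.rank], dtype='i8')

    print("rank %d writes rows [%d, %d) of %d" % (comm.rank, offset, offset + local_rows, size))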

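The writing itself is done block by block with dask's map_blocks: each block learns its location along the first axis from block_info and writes itself at the corresponding offset. A simplified single-process sketch of that pattern; a numpy array stands in for the bigfile dataset, and the per-rank MPI offset is omitted.

    # Single-process sketch of the block-wise write used in the diff.
    import numpy
    import dask.array as da

    source = da.arange(100, chunks=25)            # data chunked along the first axis
    out = numpy.zeros(100, dtype=source.dtype)    # stands in for the ff.create(...) dataset

    def work(block, block_info=None):
        info = block_info[0]                      # metadata of the first (only) input array
        start = info['array-location'][0][0]      # global start of this block along axis 0
        # in the commit this line is bb.write(offset + start, block)
        out[start:start + block.shape[0]] = block
        return block                              # pass the data through unchanged

    source.map_blocks(work, dtype=source.dtype).compute()
    assert (out == numpy.arange(100)).all()       # every block landed at its offset

In the committed code the same pattern runs on every MPI rank, with the rank's global offset added to each block's local start before calling bb.write.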