Write dask array to a stack of .npy files #686

Merged: 2 commits from the to-npy-stack branch into dask:master on Sep 13, 2015

Conversation

@mrocklin (Member) commented Sep 9, 2015

Fixes #684

This partitions the dask array along one axis and stores each block along
that axis as a single .npy file in the specified directory.

Example

>>> x = da.ones((5, 10, 10), chunks=(2, 4, 4)) 
>>> da.to_npy_stack('data/', x, axis=0)

$ tree data/
data/
|-- 0.npy
|-- 1.npy
|-- 2.npy
|-- info

The .npy files store NumPy arrays for x[0:2], x[2:4], and x[4:5]
respectively, as determined by the chunk size along the zeroth axis. The
info file stores the dtype, chunks, and axis information of the array.

You can load these stacks with the da.from_npy_stack function.

>>> y = da.from_npy_stack('data/') 
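
A quick round-trip sketch (not from the PR itself; it simply exercises the two functions described above, assuming a writable data/ directory and that the chunks fit in memory):

import numpy as np
import dask.array as da

x = da.ones((5, 10, 10), chunks=(2, 4, 4))
da.to_npy_stack('data/', x, axis=0)       # writes 0.npy, 1.npy, 2.npy and the info file
y = da.from_npy_stack('data/')            # rebuilds a dask array from the stack
assert np.allclose(x.compute(), y.compute())   # contents survive the round trip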

cc @rossant

The following inline review refers to this excerpt from the from_npy_stack implementation:

name = 'from-npy-stack-%s' % dirname
keys = list(product([name], *[range(len(c)) for c in chunks]))
values = [(np.load, os.path.join(dirname, '%d.npy' % i))

A dask team member commented:
You're not using memmap mode here, but you probably should if you want this to be performant for partial reads.

@mrocklin (Member Author) replied:
Added as a kwarg with default set to use memmap mode.
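
For reference, a hedged sketch of how that kwarg would be used when reading the stack back; the keyword name mmap_mode is assumed here, mirroring numpy.load:

import dask.array as da

y_lazy  = da.from_npy_stack('data/', mmap_mode='r')   # memory-mapped: partial reads stay cheap
y_eager = da.from_npy_stack('data/', mmap_mode=None)  # load each block fully into memory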

@rossant commented Sep 10, 2015

looks great, thanks!

@rossant commented Sep 10, 2015

How could I call to_npy_stack() somewhere in a graph? i.e., something like:

x -> (f(x1), ..., f(xn)) -> to_npy_stack() -> (g(f(x1)), ..., g(f(xn))) -> to_npy_stack()

The idea is to cache some intermediate steps during the computation. Ideally each subtask would save its result as soon as it finishes.

@mrocklin (Member Author)

Usually dask.array functions aren't part of graphs; they create them. What are your thoughts on using it as a side effect? What are you trying to accomplish?

def cache_npy(x, dirname):
    da.to_npy_stack(dirname, x)
    return da.from_npy_stack(dirname)

@rossant commented Sep 11, 2015

I think I'll wrap tasks with a @cache decorator such that each task saves its result in a .npy file. Then I'll just have to create the info file manually, and I'll be able to recreate the dask array with from_npy_stack. In other words, I'll use a graph to build a big dask array chunk by chunk.
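
A hypothetical sketch of such a decorator (the names and file layout here are invented for illustration; it only shows the save-one-result-per-task idea, not dask's API):

import os
import functools
import numpy as np

def npy_cache(dirname):
    """Cache each task's result as <dirname>/<index>.npy and reuse it on reruns."""
    os.makedirs(dirname, exist_ok=True)
    def decorator(func):
        @functools.wraps(func)
        def wrapper(index, *args, **kwargs):
            path = os.path.join(dirname, '%d.npy' % index)
            if os.path.exists(path):
                return np.load(path, mmap_mode='r')   # reuse the cached chunk
            result = func(index, *args, **kwargs)
            np.save(path, result)                     # cache the freshly computed chunk
            return result
        return wrapper
    return decorator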

@rossant commented Sep 11, 2015

So to be clear I may need the following extra functions:

to_npy(dirname, chunk, chunk_array)  # save a single chunk to a .npy file
to_npy_info(dirname, dtype=None, shape=None, chunks=None)  # save the pickle metadata file

to_npy_stack() could call them. This would allow me to create the stacked array in parallel.

@mrocklin (Member Author)

import os
import pickle
import numpy as np

def to_npy_info(dirname, dtype, chunks, axis):
    with open(os.path.join(dirname, 'info'), 'wb') as f:
        pickle.dump({'chunks': chunks, 'dtype': dtype, 'axis': axis}, f)

def to_npy(dirname, chunk, chunk_array):
    np.save(os.path.join(dirname, '%d.npy' % chunk), chunk_array)

These are simple enough that I'd rather keep them inlined for code clarity.
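
To connect this back to the parallel-writing idea above, a hypothetical end-to-end use of those two helpers (directory name, shapes, and chunk layout are made up for illustration):

import os
import numpy as np
import dask.array as da

dirname = 'out/'
os.makedirs(dirname, exist_ok=True)

# Each of these writes could run in a separate task or process.
blocks = [np.ones((2, 10, 10)), np.ones((2, 10, 10)), np.ones((1, 10, 10))]
for i, block in enumerate(blocks):
    to_npy(dirname, i, block)

# Write the metadata once the chunk layout is known.
to_npy_info(dirname, dtype=blocks[0].dtype, chunks=((2, 2, 1), (10,), (10,)), axis=0)

y = da.from_npy_stack(dirname)    # reassemble the array lazily from the stack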

@rossant commented Sep 11, 2015

Thanks! That's actually what I intended to do initially.

Is this format expected to change in the future? My users are going to save a lot of processed data in this format (in an .internal subfolder). There will be a function to export from this format to a more common format (like a single big .npy file, HDF5, etc.). If the format or API changes in a future version of dask, this export function won't work.

The alternative could be to use this stacked format only temporarily while the tasks run (to avoid multiprocessing concurrency issues), and immediately concatenate all of these chunk arrays into a single big .npy file (tens of GB or more). This would take more processing time, but I guess it would be more robust with respect to potential future format changes in dask.
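
One way that export step could look, as a sketch rather than anything in this PR: stream blocks from the stack into a pre-allocated .npy file via numpy's open_memmap, so the full array never has to sit in memory at once.

import numpy as np
import dask.array as da

x = da.from_npy_stack('data/', mmap_mode='r')
out = np.lib.format.open_memmap('big.npy', mode='w+', dtype=x.dtype, shape=x.shape)
da.store(x, out)    # writes block by block into the memory-mapped file
out.flush()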

@mrocklin (Member Author)

This format is custom; so far there is only one source of input (yourself), so I hesitate to say that it's stable. It is possible that other use cases will arise that necessitate changing it slightly.

However, if a community depends on it then we can probably keep it stable or at least communicate well with you about when it would change.

Before promising stability I would like to see how it gets used in practice. Have you had a chance to try this out? How does it perform? Does it satisfy the needs of your users?

Alternatively, you could always create these functions in your own code. The to_npy_stack and from_npy_stack functions could live anywhere, not just the dask.array codebase.

@rossant commented Sep 11, 2015

I haven't tried this extensively yet. To give you some context, I'm currently starting a significant refactoring of my code, which was previously based on HDF5 for the file format. Following many problems with HDF5, we've decided to move to an internal undocumented store of npy files with export functions to various formats.

At this point I'm still investigating whether dask could be part of our processing software. I'm a bit cautious regarding new dependencies, as we've had many problems in the past with Python libraries making sudden breaking changes. dask does look really promising, though; it's quite lightweight and uses simple data structures.

I'm mainly looking at two things right now in dask: cache/persistence, and parallel processing (mainly IPython.parallel). The documentation mentions that arrays are serialized with multiprocessing, which might be a problem in our case where we'd exchange huge arrays within a network. So far for communication between tasks I've been saving temporary arrays on an SSD-based network file system, which seems efficient enough. Using dask would simplify the code and make it more easily extendable. Therefore I was trying to see if I could use the same trick.

Finally, I wouldn't mind if this PR lived in my codebase for now. If it ends up working very well, we could always merge it upstream later on.

@mrocklin (Member Author)

To be clear, dask.array uses a threaded single-node scheduler by default. How much data do you have, and what is your application? What are your computing needs? Is a single node with multiple threads insufficient?

From my perspective the ideal route is to merge the to_npy_stack and from_npy_stack functions in close to their current form, then have you work with them a bit and provide feedback. I think that it's a useful and simple format. I'm just wary of calling it stable before it has been exercised a bit.

@rossant commented Sep 11, 2015

Typically I have tens of GB of raw data and hundreds of GB of processed data. When using a single CPU and thread, processing time is measured in days. Raw data consists of multielectrode neural recordings: 16-bit multichannel data sampled at 20 kHz (a bit like a .wav file, but with hundreds to thousands of channels). A single experiment can last hours or days.

The processing step I'd consider dask for is spike detection: applying various filters and specialized graph-theoretic algorithms to detect spiking events in continuous data.

The problem is embarrassingly parallel: I can split the raw data into, say, 5-second overlapping chunks (there can be thousands of them), and every chunk can be processed in parallel. However, there are several synchronization points where relatively small amounts of data are exchanged.

Computations are CPU-bound, written in pure Python/NumPy. AFAIK I cannot use a single process and several CPUs for this kind of computation. I need several processes, and I thought about using the multiprocessing scheduler.

FYI the code is here: https://github.com/kwikteam/phy/blob/master/phy/detect/spikedetekt.py
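
A hedged sketch of that route, using the dask API as it looked around this release (the get= keyword and dask.multiprocessing.get); detect_chunk is a placeholder for the real per-chunk detection code and 'raw/' for an existing stack:

import dask.array as da
import dask.multiprocessing

def detect_chunk(block):
    # placeholder for the real spike-detection code applied to one chunk
    return block

raw = da.from_npy_stack('raw/', mmap_mode='r')
detected = raw.map_blocks(detect_chunk)                  # embarrassingly parallel over chunks
result = detected.compute(get=dask.multiprocessing.get)  # roughly one worker process per core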

@mrocklin (Member Author)

Is it possible that your computations could be accelerated? Processing hundreds of GBs on a smaller machine is quite doable if you're able to ensure that bottleneck code is somewhat fast. NumPy has no problem running in multiple threads. I usually switch to numba with @jit(nopython=True, nogil=True) for more custom algorithms and have had pretty good success with it (though perhaps that's to be expected given where I work).
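
For illustration, the numba pattern mentioned here looks roughly like this; the kernel is a made-up example, and nogil=True is what lets several threads run the compiled code concurrently:

import numpy as np
from numba import jit

@jit(nopython=True, nogil=True)
def count_threshold_crossings(signal, threshold):
    # count upward crossings of `threshold`; stands in for a custom bottleneck kernel
    count = 0
    for i in range(1, signal.shape[0]):
        if signal[i - 1] <= threshold and signal[i] > threshold:
            count += 1
    return count

crossings = count_threshold_crossings(np.random.randn(20000), 1.0)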

What part of your computation is your bottleneck?

@rossant commented Sep 11, 2015

There is no one bottleneck: all the different parts of the algorithm take a similar amount of time. Accelerating the implementation might be possible, but it would require a lot of work and testing. An important part of the algorithm has been optimized with Python structures like lists and dictionaries, it has been extensively tested, and it would take a lot of effort to rewrite it with something like Numba. It is just simpler for us at this point to parallelize the computations. We have cheap and easy access to many CPUs and we don't make use of them at all at the moment.

@mrocklin (Member Author)

OK, makes sense.

@mrocklin (Member Author)

I plan to merge this soon if there are no further comments.

@rossant commented Sep 13, 2015

fine by me!

mrocklin added a commit that referenced this pull request Sep 13, 2015
Write dask array to a stack of .npy files
@mrocklin merged commit 5b40bbd into dask:master on Sep 13, 2015
@mrocklin deleted the to-npy-stack branch on Sep 13, 2015
@sinhrks added this to the 0.7.2 milestone on Sep 25, 2015