Skip to content

Commit

Permalink
Merge pull request #686 from mrocklin/to-npy-stack
Browse files Browse the repository at this point in the history
Write dask array to a stack of .npy files
  • Loading branch information
mrocklin committed Sep 13, 2015
2 parents b8df7d6 + 9d6b9f0 commit 5b40bbd
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 1 deletion.
2 changes: 1 addition & 1 deletion dask/array/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .core import (Array, stack, concatenate, take, tensordot, transpose,
from_array, choose, where, coarsen, insert, broadcast_to, fromfunction,
unique, store, squeeze, topk, bincount, histogram, map_blocks, atop,
to_hdf5, dot, cov, array)
to_hdf5, dot, cov, array, to_npy_stack, from_npy_stack)
from .core import (logaddexp, logaddexp2, conj, exp, log, log2, log10, log1p,
expm1, sqrt, square, sin, cos, tan, arcsin, arccos, arctan, arctan2,
hypot, sinh, cosh, tanh, arcsinh, arccosh, arctanh, deg2rad, rad2deg,
Expand Down
81 changes: 81 additions & 0 deletions dask/array/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
memoize, map, groupby, valmap, accumulate, merge,
curry, reduce, interleave, sliding_window, partial)
import numpy as np
import os
import pickle
import uuid

from threading import Lock
from . import chunk
Expand Down Expand Up @@ -1121,6 +1124,7 @@ def squeeze(self):
return squeeze(self)

def rechunk(self, chunks):
""" See da.rechunk for docstring """
from .rechunk import rechunk
return rechunk(self, chunks)

Expand Down Expand Up @@ -2614,3 +2618,80 @@ def cov(m, y=None, rowvar=1, bias=0, ddof=None):
return (dot(X.T, X.conj()) / fact).squeeze()
else:
return (dot(X, X.T.conj()) / fact).squeeze()


def to_npy_stack(dirname, x, axis=0):
    """ Write dask array to a stack of .npy files

    This partitions the dask.array along one axis and stores each block along
    that axis as a single ``.npy`` file in the specified directory.

    Parameters
    ----------
    dirname : string
        Directory in which to place the ``.npy`` files and ``info`` metadata
        file (created if it does not exist)
    x : dask.array.Array
        Array to store
    axis : int, optional
        Axis along which to partition the array, one file per block
        (default 0)

    Examples
    --------
    >>> x = da.ones((5, 10, 10), chunks=(2, 4, 4))  # doctest: +SKIP
    >>> da.to_npy_stack('data/', x, axis=0)  # doctest: +SKIP

    $ tree data/
    data/
    |-- 0.npy
    |-- 1.npy
    |-- 2.npy
    |-- info

    The ``.npy`` files store numpy arrays for ``x[0:2], x[2:4], and x[4:5]``
    respectively, as is specified by the chunk size along the zeroth axis.  The
    info file stores the dtype, chunks, and axis information of the array.

    You can load these stacks with the ``da.from_npy_stack`` function.

    >>> y = da.from_npy_stack('data/')  # doctest: +SKIP

    See Also
    --------
    from_npy_stack
    """
    # Rechunk so that only ``axis`` keeps multiple blocks; every other axis
    # collapses to a single block, giving one full slab per .npy file.
    chunks = tuple((c if i == axis else (sum(c),))
                   for i, c in enumerate(x.chunks))
    xx = x.rechunk(chunks)

    if not os.path.exists(dirname):
        # BUG FIX: was ``os.path.mkdir`` which does not exist and raised
        # AttributeError whenever the target directory was missing.
        os.mkdir(dirname)

    # Metadata needed by from_npy_stack to reconstruct the array.
    meta = {'chunks': chunks, 'dtype': x.dtype, 'axis': axis}

    with open(os.path.join(dirname, 'info'), 'wb') as f:
        pickle.dump(meta, f)

    # One np.save task per block along ``axis``; uuid keeps graph keys unique.
    name = 'to-npy-stack-' + str(uuid.uuid1())
    dsk = dict(((name, i), (np.save, os.path.join(dirname, '%d.npy' % i), key))
               for i, key in enumerate(core.flatten(xx._keys())))

    # Execute eagerly: writing files is a side effect, not a lazy result.
    Array._get(merge(dsk, xx.dask), list(dsk))

def from_npy_stack(dirname, mmap_mode='r'):
    """ Load dask array from stack of npy files

    See ``da.to_npy_stack`` for docstring.

    Parameters
    ----------
    dirname: string
        Directory of .npy files
    mmap_mode: (None or 'r')
        Read data in memory map mode
    """
    # The 'info' file, written by to_npy_stack, holds the metadata needed
    # to reassemble the array: chunks, dtype, and the partitioned axis.
    with open(os.path.join(dirname, 'info'), 'rb') as f:
        meta = pickle.load(f)

    axis = meta['axis']
    chunks = meta['chunks']

    name = 'from-npy-stack-%s' % dirname
    # Only the partitioned axis has more than one block, so there is exactly
    # one key (and one .npy file) per block index along that axis.
    keys = list(product([name], *[range(len(bd)) for bd in chunks]))
    tasks = [(np.load, os.path.join(dirname, '%d.npy' % i), mmap_mode)
             for i in range(len(chunks[axis]))]

    return Array(dict(zip(keys, tasks)), name, chunks, meta['dtype'])
34 changes: 34 additions & 0 deletions dask/array/tests/test_array_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
pytest.importorskip('numpy')

from operator import add
from tempfile import mkdtemp
import shutil
import os

from toolz import merge
from toolz.curried import identity
Expand Down Expand Up @@ -1419,3 +1422,34 @@ def test_cov():
assert eq(da.cov(e, d), np.cov(y, x))

assert raises(ValueError, lambda: da.cov(d, ddof=1.5))


def test_memmap():
    # Round-trip a dask array through a numpy memmap store target, then
    # through np.save / np.load in memory-map mode.
    with tmpfile('npy') as store_fn:
        with tmpfile('npy') as save_fn:
            x = da.arange(100, chunks=15)

            out = np.memmap(store_fn, shape=x.shape, mode='w+', dtype=x.dtype)
            x.store(out)
            assert eq(out, x)

            np.save(save_fn, out)
            assert eq(np.load(save_fn, mmap_mode='r'), x)


def test_to_npy_stack():
    x = np.arange(5 * 10 * 10).reshape((5, 10, 10))
    d = da.from_array(x, chunks=(2, 4, 4))

    dirname = mkdtemp()
    try:
        da.to_npy_stack(dirname, d, axis=0)

        # One file per block along axis 0; file 1 holds rows 2:4.
        assert os.path.exists(os.path.join(dirname, '0.npy'))
        assert (np.load(os.path.join(dirname, '1.npy')) == x[2:4]).all()

        # Round-trip back into a dask array.
        assert eq(d, da.from_npy_stack(dirname))
    finally:
        shutil.rmtree(dirname)

0 comments on commit 5b40bbd

Please sign in to comment.