Write dask array to a stack of .npy files #686

Merged: 2 commits, Sep 13, 2015
Changes from 1 commit
2 changes: 1 addition & 1 deletion dask/array/__init__.py
@@ -4,7 +4,7 @@
from .core import (Array, stack, concatenate, take, tensordot, transpose,
from_array, choose, where, coarsen, insert, broadcast_to, fromfunction,
unique, store, squeeze, topk, bincount, histogram, map_blocks, atop,
-    to_hdf5, dot, cov, array)
+    to_hdf5, dot, cov, array, to_npy_stack, from_npy_stack)
from .core import (logaddexp, logaddexp2, conj, exp, log, log2, log10, log1p,
expm1, sqrt, square, sin, cos, tan, arcsin, arccos, arctan, arctan2,
hypot, sinh, cosh, tanh, arcsinh, arccosh, arctanh, deg2rad, rad2deg,
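With this one-line change, the two new functions are re-exported at the package level. A quick usage sketch, mirroring the docstring example added below:

import dask.array as da

x = da.ones((5, 10, 10), chunks=(2, 4, 4))
da.to_npy_stack('data/', x, axis=0)   # now reachable from the top-level namespace
y = da.from_npy_stack('data/')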
71 changes: 71 additions & 0 deletions dask/array/core.py
@@ -14,6 +14,9 @@
memoize, map, groupby, valmap, accumulate, merge,
curry, reduce, interleave, sliding_window, partial)
import numpy as np
import os
import pickle
import uuid

from threading import Lock
from . import chunk
@@ -1089,6 +1092,7 @@ def squeeze(self):
return squeeze(self)

def rechunk(self, chunks):
""" See da.rechunk for docstring """
from .rechunk import rechunk
return rechunk(self, chunks)

@@ -2580,3 +2584,70 @@ def cov(m, y=None, rowvar=1, bias=0, ddof=None):
return (dot(X.T, X.conj()) / fact).squeeze()
else:
return (dot(X, X.T.conj()) / fact).squeeze()


def to_npy_stack(dirname, x, axis=0):
""" Write dask array to a stack of .npy files

    This partitions the dask.array along one axis and stores each block along
    that axis as a single .npy file in the specified directory.

    Examples
    --------

>>> x = da.ones((5, 10, 10), chunks=(2, 4, 4)) # doctest: +SKIP
>>> da.to_npy_stack('data/', x, axis=0) # doctest: +SKIP

$ tree data/
data/
|-- 0.npy
|-- 1.npy
|-- 2.npy
|-- info

    The ``.npy`` files store numpy arrays for ``x[0:2]``, ``x[2:4]``, and
    ``x[4:5]`` respectively, as specified by the chunk size along the zeroth
    axis.  The info file stores the dtype, chunks, and axis information of the
    array.

You can load these stacks with the ``da.from_npy_stack`` function.

>>> y = da.from_npy_stack('data/') # doctest: +SKIP

    See Also
    --------
    from_npy_stack
"""

chunks = tuple((c if i == axis else (sum(c),))
for i, c in enumerate(x.chunks))
xx = x.rechunk(chunks)

if not os.path.exists(dirname):
        os.mkdir(dirname)

meta = {'chunks': chunks, 'dtype': x.dtype, 'axis': axis}

with open(os.path.join(dirname, 'info'), 'wb') as f:
pickle.dump(meta, f)

name = 'to-npy-stack-' + str(uuid.uuid1())
dsk = dict(((name, i), (np.save, os.path.join(dirname, '%d.npy' % i), key))
for i, key in enumerate(core.flatten(xx._keys())))

Array._get(merge(dsk, xx.dask), list(dsk))
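    # Note: Array._get runs the graph immediately with the default scheduler,
    # so the .npy files and the info file exist on disk by the time this
    # function returns.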

def from_npy_stack(dirname):
""" See da.to_npy_stack for docstring """
with open(os.path.join(dirname, 'info'), 'rb') as f:
info = pickle.load(f)

dtype = info['dtype']
chunks = info['chunks']
axis = info['axis']

name = 'from-npy-stack-%s' % dirname
keys = list(product([name], *[range(len(c)) for c in chunks]))
    values = [(np.load, os.path.join(dirname, '%d.npy' % i))
              for i in range(len(chunks[axis]))]
    dsk = dict(zip(keys, values))

    return Array(dsk, name, chunks, dtype)

Review comment on the np.load call above (Member):
You're not using memmap mode here, but you probably should if you want this to be performant for partial reads.

Reply (Member, Author):
Added as a kwarg with default set to use memmap mode.
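That follow-up lives in the second commit, which this "Changes from 1 commit" view does not show. A minimal sketch of what the discussed change could look like, assuming an mmap_mode keyword that mirrors numpy.load's parameter of the same name:

# Sketch only; assumes the same module context as above (np, os, pickle,
# product, Array). The mmap_mode name is an assumption mirroring np.load.
def from_npy_stack(dirname, mmap_mode='r'):
    """ See da.to_npy_stack for docstring """
    with open(os.path.join(dirname, 'info'), 'rb') as f:
        info = pickle.load(f)

    name = 'from-npy-stack-%s' % dirname
    keys = list(product([name], *[range(len(c)) for c in info['chunks']]))
    # mmap_mode rides along as a third element of each task tuple, so each
    # block is opened with np.load(path, mmap_mode) and memory-mapped rather
    # than read eagerly, which helps partial reads.
    values = [(np.load, os.path.join(dirname, '%d.npy' % i), mmap_mode)
              for i in range(len(info['chunks'][info['axis']]))]

    return Array(dict(zip(keys, values)), name, info['chunks'], info['dtype'])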
34 changes: 34 additions & 0 deletions dask/array/tests/test_array_core.py
@@ -4,6 +4,9 @@
pytest.importorskip('numpy')

from operator import add
from tempfile import mkdtemp
import shutil
import os

from toolz import merge
from toolz.curried import identity
@@ -1401,3 +1404,34 @@ def test_cov():
assert eq(da.cov(e, d), np.cov(y, x))

assert raises(ValueError, lambda: da.cov(d, ddof=1.5))


def test_memmap():
with tmpfile('npy') as fn_1:
with tmpfile('npy') as fn_2:
x = da.arange(100, chunks=15)
target = np.memmap(fn_1, shape=x.shape, mode='w+', dtype=x.dtype)

x.store(target)

assert eq(target, x)

np.save(fn_2, target)

assert eq(np.load(fn_2, mmap_mode='r'), x)


def test_to_npy_stack():
x = np.arange(5*10*10).reshape((5, 10, 10))
d = da.from_array(x, chunks=(2, 4, 4))

dirname = mkdtemp()
try:
da.to_npy_stack(dirname, d, axis=0)
assert os.path.exists(os.path.join(dirname, '0.npy'))
assert (np.load(os.path.join(dirname, '1.npy')) == x[2:4]).all()

e = da.from_npy_stack(dirname)
assert eq(d, e)
finally:
shutil.rmtree(dirname)
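For orientation, a small end-to-end sketch of the workflow these tests exercise: write a stack, inspect one shard with plain numpy, then read it back through dask. Paths and shapes are illustrative:

import os
import shutil
from tempfile import mkdtemp

import numpy as np
import dask.array as da

dirname = mkdtemp()
try:
    x = da.from_array(np.arange(500).reshape((5, 10, 10)), chunks=(2, 4, 4))
    da.to_npy_stack(dirname, x, axis=0)   # writes 0.npy, 1.npy, 2.npy and info
    # Each shard is an ordinary .npy file; the second one holds rows 2:4.
    block = np.load(os.path.join(dirname, '1.npy'))
    assert block.shape == (2, 10, 10)
    y = da.from_npy_stack(dirname)        # lazily reassembles the full array
    assert (y.compute() == np.arange(500).reshape((5, 10, 10))).all()
finally:
    shutil.rmtree(dirname)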