From 32f0fac078783be3dc648a10af51433c915fe73c Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Fri, 7 Jun 2019 02:07:15 +0200 Subject: [PATCH] Add Dask Array._meta attribute (#4543) This adds a `._meta` attribute to Dask Array that records a small example of the chunk type. This should help to maintain metadata about types, such as is useful for sparse and GPU arrays. Fixes #2977 --- dask/array/blockwise.py | 36 +- dask/array/core.py | 88 ++++- dask/array/creation.py | 32 +- dask/array/gufunc.py | 4 +- dask/array/linalg.py | 50 ++- dask/array/overlap.py | 16 +- dask/array/random.py | 10 +- dask/array/rechunk.py | 2 +- dask/array/reductions.py | 129 +++++-- dask/array/reshape.py | 7 +- dask/array/routines.py | 16 +- dask/array/tests/test_array_core.py | 13 +- dask/array/tests/test_cupy.py | 550 +++++++++++++++++++++++++++- dask/array/tests/test_masked.py | 4 +- dask/array/tests/test_sparse.py | 23 +- dask/array/ufunc.py | 20 +- dask/array/utils.py | 111 +++++- dask/array/wrap.py | 93 ++++- dask/dataframe/core.py | 4 +- 19 files changed, 1065 insertions(+), 143 deletions(-) diff --git a/dask/array/blockwise.py b/dask/array/blockwise.py index 5fc23c23131..932f1e966ab 100644 --- a/dask/array/blockwise.py +++ b/dask/array/blockwise.py @@ -3,12 +3,42 @@ import toolz +import numpy as np + from .. import base, utils from ..delayed import unpack_collections from ..highlevelgraph import HighLevelGraph from ..blockwise import blockwise as core_blockwise +def blockwise_meta(func, dtype, *args, **kwargs): + arrays = args[::2] + ndims = [a.ndim if hasattr(a, 'ndim') else 0 for a in arrays] + args_meta = [arg._meta if hasattr(arg, '_meta') else + arg[tuple(slice(0, 0, None) for _ in range(nd))] if nd > 0 else arg + for arg, nd in zip(arrays, ndims)] + kwargs_meta = {k: v._meta if hasattr(v, '_meta') else v for k, v in kwargs.items()} + + # TODO: look for alternative to this, causes issues when using map_blocks() + # with np.vectorize, such as dask.array.routines._isnonzero_vec(). 
+ if isinstance(func, np.vectorize): + meta = func(*args_meta) + return meta.astype(dtype) + + try: + meta = func(*args_meta, **kwargs_meta) + except TypeError: + # The concatenate argument is an argument introduced by this + # function and may not be support by some external functions, + # such as in NumPy + kwargs_meta.pop('concatenate', None) + meta = func(*args_meta, **kwargs_meta) + except ValueError: + return None + + return meta.astype(dtype) + + def blockwise(func, out_ind, *args, **kwargs): """ Tensor operation: Generalized inner and outer products @@ -203,7 +233,11 @@ def blockwise(func, out_ind, *args, **kwargs): "adjust_chunks values must be callable, int, or tuple") chunks = tuple(chunks) - return Array(graph, out, chunks, dtype=dtype) + try: + meta = blockwise_meta(func, dtype, *args, **kwargs) + return Array(graph, out, chunks, meta=meta) + except Exception: + return Array(graph, out, chunks, dtype=dtype) def atop(*args, **kwargs): diff --git a/dask/array/core.py b/dask/array/core.py index 576d3c9a98f..ee8736a471a 100644 --- a/dask/array/core.py +++ b/dask/array/core.py @@ -731,7 +731,7 @@ def store(sources, targets, lock=True, regions=None, compute=True, sources_dsk, list(core.flatten([e.__dask_keys__() for e in sources])) ) - sources2 = [Array(sources_dsk, e.name, e.chunks, e.dtype) for e in sources] + sources2 = [Array(sources_dsk, e.name, e.chunks, meta=e) for e in sources] # Optimize all targets together targets2 = [] @@ -774,7 +774,7 @@ def store(sources, targets, lock=True, regions=None, compute=True, ) result = tuple( - Array(load_store_dsk, 'load-store-%s' % t, s.chunks, s.dtype) + Array(load_store_dsk, 'load-store-%s' % t, s.chunks, meta=s) for s, t in zip(sources, toks) ) @@ -857,28 +857,40 @@ class Array(DaskMethodsMixin): Shape of the entire array chunks: iterable of tuples block sizes along each dimension + dtype : str or dtype + Typecode or data-type for the new Dask Array + meta : empty ndarray + empty ndarray created with same NumPy backend, ndim and dtype as the + Dask Array being created (overrides dtype) See Also -------- dask.array.from_array """ - __slots__ = 'dask', '_name', '_cached_keys', '_chunks', 'dtype' + __slots__ = 'dask', '_name', '_cached_keys', '_chunks', '_meta' - def __new__(cls, dask, name, chunks, dtype, shape=None): + def __new__(cls, dask, name, chunks, dtype=None, meta=None, shape=None): self = super(Array, cls).__new__(cls) assert isinstance(dask, Mapping) if not isinstance(dask, HighLevelGraph): dask = HighLevelGraph.from_collections(name, dask, dependencies=()) self.dask = dask self.name = name - if dtype is None: - raise ValueError("You must specify the dtype of the array") - self.dtype = np.dtype(dtype) + if dtype is not None and meta is not None: + raise TypeError("You must not specify both meta and dtype") + if dtype is None and meta is None: + raise ValueError("You must specify the meta or dtype of the array") - self._chunks = normalize_chunks(chunks, shape, dtype=self.dtype) + self._chunks = normalize_chunks(chunks, shape, dtype=dtype or meta.dtype) if self._chunks is None: raise ValueError(CHUNKS_NONE_ERROR_MESSAGE) + if dtype: + self._meta = np.empty((0,) * self.ndim, dtype=dtype) + else: + from .utils import meta_from_array + self._meta = meta_from_array(meta, meta.ndim) + for plugin in config.get('array_plugins', ()): result = plugin(self) if result is not None: @@ -944,8 +956,8 @@ def chunksize(self): return tuple(max(c) for c in self.chunks) @property - def _meta(self): - return np.empty(shape=(), dtype=self.dtype) + 
def dtype(self): + return self._meta.dtype def _get_chunks(self): return self._chunks @@ -1217,7 +1229,7 @@ def __setitem__(self, key, value): if isinstance(value, Array) and value.ndim > 1: raise ValueError('boolean index array should have 1 dimension') y = where(key, value, self) - self.dtype = y.dtype + self._meta = y._meta self.dask = y.dask self.name = y.name self._chunks = y.chunks @@ -1267,7 +1279,37 @@ def __getitem__(self, index): dsk, chunks = slice_array(out, self.name, self.chunks, index2) graph = HighLevelGraph.from_collections(out, dsk, dependencies=[self]) - return Array(graph, out, chunks, dtype=self.dtype) + + if isinstance(index2, tuple): + new_index = [] + for i in range(len(index2)): + if not isinstance(index2[i], tuple): + types = [Integral, list, np.ndarray] + cond = any([isinstance(index2[i], t) for t in types]) + new_index.append(slice(0, 0) if cond else index2[i]) + else: + new_index.append(tuple([Ellipsis if j is not None else + None for j in index2[i]])) + new_index = tuple(new_index) + meta = self._meta[new_index].astype(self.dtype) + else: + meta = self._meta[index2].astype(self.dtype) + + # Exception for object dtype and ndim == 1, which results in primitive types + if not (meta.dtype == object and meta.ndim == 1): + + # If meta still has more dimensions than actual data + if meta.ndim > len(chunks): + meta = np.sum(meta, axis=tuple([i for i in range(meta.ndim - len(chunks))])) + + # Ensure all dimensions are 0 + if not np.isscalar(meta): + meta = meta[tuple([slice(0, 0) for i in range(meta.ndim)])] + # If return array is 0-D, ensure _meta is 0-D + if len(chunks) == 0: + meta = meta.sum() + + return Array(graph, out, chunks, meta=meta) def _vindex(self, key): if not isinstance(key, tuple): @@ -1336,7 +1378,7 @@ def _blocks(self, index): layer = {(name,) + key: tuple(new_keys[key].tolist()) for key in keys} graph = HighLevelGraph.from_collections(name, layer, dependencies=[self]) - return Array(graph, name, chunks, self.dtype) + return Array(graph, name, chunks, meta=self) @property def blocks(self): @@ -1887,7 +1929,7 @@ def copy(self): if self.npartitions == 1: return self.map_blocks(M.copy) else: - return Array(self.dask, self.name, self.chunks, self.dtype) + return Array(self.dask, self.name, self.chunks, meta=self) def __deepcopy__(self, memo): c = self.copy() @@ -2331,7 +2373,12 @@ def from_array(x, chunks='auto', name=None, lock=False, asarray=True, fancy=True dtype=x.dtype) dsk[original_name] = x - return Array(dsk, name, chunks, dtype=x.dtype) + # Workaround for TileDB, its indexing is 1-based, + # and doesn't seems to support 0-length slicing + if x.__class__.__module__.split('.')[0] == 'tiledb' and hasattr(x, '_ctx_'): + return Array(dsk, name, chunks, dtype=x.dtype) + + return Array(dsk, name, chunks, meta=x) def from_zarr(url, component=None, storage_options=None, chunks=None,name=None, **kwargs): @@ -2947,6 +2994,11 @@ def concatenate(seq, axis=0, allow_unknown_chunksizes=False): for i, ind in enumerate(inds): ind[axis] = -(i + 1) + from .utils import meta_from_array + metas = [getattr(s, '_meta', s) for s in seq] + metas = [meta_from_array(m, getattr(m, 'ndim', 1)) for m in metas] + meta = np.concatenate(metas) + uc_args = list(concat(zip(seq, inds))) _, seq = unify_chunks(*uc_args, warn=False) @@ -2961,8 +3013,6 @@ def concatenate(seq, axis=0, allow_unknown_chunksizes=False): if len(set(seq_dtypes)) > 1: dt = reduce(np.promote_types, seq_dtypes) seq = [x.astype(dt) for x in seq] - else: - dt = seq_dtypes[0] names = [a.name for a in seq] @@ 
-2976,7 +3026,7 @@ def concatenate(seq, axis=0, allow_unknown_chunksizes=False): dsk = dict(zip(keys, values)) graph = HighLevelGraph.from_collections(name, dsk, dependencies=seq) - return Array(graph, name, chunks, dtype=dt) + return Array(graph, name, chunks, meta=meta) def load_store_chunk(x, out, index, lock, return_stored, load_stored): @@ -3357,7 +3407,7 @@ def handle_out(out, result): "out=%s, result=%s" % (str(out.shape), str(result.shape))) out._chunks = result.chunks out.dask = result.dask - out.dtype = result.dtype + out._meta = result._meta out.name = result.name elif out is not None: msg = ("The out parameter is not fully supported." diff --git a/dask/array/creation.py b/dask/array/creation.py index 4b5d85f0b8b..8169513cec4 100644 --- a/dask/array/creation.py +++ b/dask/array/creation.py @@ -11,13 +11,13 @@ from ..highlevelgraph import HighLevelGraph from ..base import tokenize from ..compatibility import Sequence +from ..utils import derived_from from . import chunk from .core import (Array, asarray, normalize_chunks, stack, concatenate, block, broadcast_to, broadcast_arrays) from .wrap import empty, ones, zeros, full -from .utils import AxisError -from ..utils import derived_from +from .utils import AxisError, meta_from_array, zeros_like_safe def empty_like(a, dtype=None, chunks=None): @@ -473,7 +473,11 @@ def eye(N, chunks='auto', M=None, k=0, dtype=float): @derived_from(np) def diag(v): name = 'diag-' + tokenize(v) - if isinstance(v, np.ndarray): + + meta = meta_from_array(v, 2 if v.ndim == 1 else 1) + + if (isinstance(v, np.ndarray) or + (hasattr(v, '__array_function__') and not isinstance(v, Array))): if v.ndim == 1: chunks = ((v.shape[0],), (v.shape[0],)) dsk = {(name, 0, 0): (np.diag, v)} @@ -482,7 +486,7 @@ def diag(v): dsk = {(name, 0): (np.diag, v)} else: raise ValueError("Array must be 1d or 2d only") - return Array(dsk, name, chunks, dtype=v.dtype) + return Array(dsk, name, chunks, meta=meta) if not isinstance(v, Array): raise TypeError("v must be a dask array or numpy array, " "got {0}".format(type(v))) @@ -491,7 +495,7 @@ def diag(v): dsk = {(name, i): (np.diag, row[i]) for i, row in enumerate(v.__dask_keys__())} graph = HighLevelGraph.from_collections(name, dsk, dependencies=[v]) - return Array(graph, name, (v.chunks[0],), dtype=v.dtype) + return Array(graph, name, (v.chunks[0],), meta=meta) else: raise NotImplementedError("Extracting diagonals from non-square " "chunked arrays") @@ -505,9 +509,10 @@ def diag(v): dsk[key] = (np.diag, blocks[i]) else: dsk[key] = (np.zeros, (m, n)) + dsk[key] = (partial(zeros_like_safe, shape=(m, n)), meta) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[v]) - return Array(graph, name, (chunks_1d, chunks_1d), dtype=v.dtype) + return Array(graph, name, (chunks_1d, chunks_1d), meta=meta) @derived_from(np) @@ -574,7 +579,8 @@ def _diag_len(dim1, dim2, offset): chunks = left_chunks + right_shape graph = HighLevelGraph.from_collections(name, dsk, dependencies=[a]) - return Array(graph, name, shape=shape, chunks=chunks, dtype=a.dtype) + meta = meta_from_array(a, len(shape)) + return Array(graph, name, shape=shape, chunks=chunks, meta=meta) def triu(m, k=0): @@ -616,13 +622,15 @@ def triu(m, k=0): for i in range(rdim): for j in range(hdim): if chunk * (j - i + 1) < k: - dsk[(name, i, j)] = (np.zeros, (m.chunks[0][i], m.chunks[1][j])) + dsk[(name, i, j)] = (partial(zeros_like_safe, + shape=(m.chunks[0][i], m.chunks[1][j])), + m._meta) elif chunk * (j - i - 1) < k <= chunk * (j - i + 1): dsk[(name, i, j)] = (np.triu, 
(m.name, i, j), k - (chunk * (j - i))) else: dsk[(name, i, j)] = (m.name, i, j) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[m]) - return Array(graph, name, shape=m.shape, chunks=m.chunks, dtype=m.dtype) + return Array(graph, name, shape=m.shape, chunks=m.chunks, meta=m) def tril(m, k=0): @@ -668,9 +676,11 @@ def tril(m, k=0): elif chunk * (j - i - 1) < k <= chunk * (j - i + 1): dsk[(name, i, j)] = (np.tril, (m.name, i, j), k - (chunk * (j - i))) else: - dsk[(name, i, j)] = (np.zeros, (m.chunks[0][i], m.chunks[1][j])) + dsk[(name, i, j)] = (partial(zeros_like_safe, + shape=(m.chunks[0][i], m.chunks[1][j])), + m._meta) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[m]) - return Array(graph, name, shape=m.shape, chunks=m.chunks, dtype=m.dtype) + return Array(graph, name, shape=m.shape, chunks=m.chunks, meta=m) def _np_fromfunction(func, shape, dtype, offset, func_kwargs): diff --git a/dask/array/gufunc.py b/dask/array/gufunc.py index 1128efc0e40..92913f44c05 100755 --- a/dask/array/gufunc.py +++ b/dask/array/gufunc.py @@ -10,6 +10,7 @@ from toolz import concat, merge, unique from .core import Array, asarray, blockwise, getitem, apply_infer_dtype +from .utils import normalize_meta from ..highlevelgraph import HighLevelGraph from ..core import flatten @@ -397,11 +398,12 @@ def apply_gufunc(func, signature, *args, **kwargs): leaf_name = "%s_%d-%s" % (name, i, token) leaf_dsk = {(leaf_name,) + key[1:] + core_chunkinds: ((getitem, key, i) if nout else key) for key in keys} graph = HighLevelGraph.from_collections(leaf_name, leaf_dsk, dependencies=[tmp]) + meta = normalize_meta(tmp._meta, len(output_shape), dtype=odt) leaf_arr = Array(graph, leaf_name, chunks=output_chunks, shape=output_shape, - dtype=odt) + meta=meta) ### Axes: if keepdims: diff --git a/dask/array/linalg.py b/dask/array/linalg.py index 070ade97900..509db514f13 100644 --- a/dask/array/linalg.py +++ b/dask/array/linalg.py @@ -10,10 +10,11 @@ from ..blockwise import blockwise from ..compatibility import apply from ..highlevelgraph import HighLevelGraph +from ..utils import derived_from from .core import dotmany, Array, concatenate from .creation import eye from .random import RandomState -from ..utils import derived_from +from .utils import meta_from_array def _cumsum_blocks(it): @@ -194,8 +195,9 @@ def tsqr(data, compute_svd=False, _max_vchunk_size=None): vchunks_rstacked = tuple([sum(map(lambda x: x[1], sub_block_info)) for sub_block_info in all_blocks]) graph = HighLevelGraph(layers, dependencies) # dsk.dependencies[name_r_stacked] = {data.name} + r_stacked_meta = meta_from_array(data, len((sum(vchunks_rstacked), n)), dtype=rr.dtype) r_stacked = Array(graph, name_r_stacked, - shape=(sum(vchunks_rstacked), n), chunks=(vchunks_rstacked, (n)), dtype=rr.dtype) + shape=(sum(vchunks_rstacked), n), chunks=(vchunks_rstacked, (n)), meta=r_stacked_meta) # recurse q_inner, r_inner = tsqr(r_stacked, _max_vchunk_size=cr_max) @@ -349,10 +351,12 @@ def tsqr(data, compute_svd=False, _max_vchunk_size=None): # dsk.dependencies[name_q_st3] = {data.name} # dsk.dependencies[name_r_st2] = {data.name} graph = HighLevelGraph(layers, dependencies) + q_meta = meta_from_array(data, len(q_shape), qq.dtype) + r_meta = meta_from_array(data, len(r_shape), rr.dtype) q = Array(graph, name_q_st3, - shape=q_shape, chunks=q_chunks, dtype=qq.dtype) + shape=q_shape, chunks=q_chunks, meta=q_meta) r = Array(graph, name_r_st2, - shape=r_shape, chunks=r_chunks, dtype=rr.dtype) + shape=r_shape, chunks=r_chunks, meta=r_meta) return q, r 
else: # In-core SVD computation @@ -399,9 +403,12 @@ def tsqr(data, compute_svd=False, _max_vchunk_size=None): n_vh = n d_vh = max(m_vh, n_vh) # full matrix returned: but basically n graph = HighLevelGraph(layers, dependencies) - u = Array(graph, name_u_st4, shape=(m_u, n_u), chunks=(data.chunks[0], (n_u,)), dtype=uu.dtype) - s = Array(graph, name_s_st2, shape=(n_s,), chunks=((n_s,),), dtype=ss.dtype) - vh = Array(graph, name_v_st2, shape=(d_vh, d_vh), chunks=((n,), (n,)), dtype=vvh.dtype) + u_meta = meta_from_array(data, len((m_u, n_u)), uu.dtype) + s_meta = meta_from_array(data, len((n_s,)), ss.dtype) + vh_meta = meta_from_array(data, len((d_vh, d_vh)), vvh.dtype) + u = Array(graph, name_u_st4, shape=(m_u, n_u), chunks=(data.chunks[0], (n_u,)), meta=u_meta) + s = Array(graph, name_s_st2, shape=(n_s,), chunks=((n_s,),), meta=s_meta) + vh = Array(graph, name_v_st2, shape=(d_vh, d_vh), chunks=((n,), (n,)), meta=vh_meta) return u, s, vh @@ -479,17 +486,20 @@ def sfqr(data, name=None): graph = HighLevelGraph(layers, dependencies) + Q_meta = meta_from_array(data, len((m, min(m, n))), dtype=qq.dtype) + R_1_meta = meta_from_array(data, len((min(m, n), cc)), dtype=rr.dtype) Q = Array(graph, name_Q, - shape=(m, min(m, n)), chunks=(m, min(m, n)), dtype=qq.dtype) + shape=(m, min(m, n)), chunks=(m, min(m, n)), meta=Q_meta) R_1 = Array(graph, name_R_1, - shape=(min(m, n), cc), chunks=(cr, cc), dtype=rr.dtype) + shape=(min(m, n), cc), chunks=(cr, cc), meta=R_1_meta) # R = [R_1 Q'A_rest] Rs = [R_1] if nc > 1: + A_rest_meta = meta_from_array(data, len((min(m, n), n - cc)), dtype=rr.dtype) A_rest = Array(graph, name_A_rest, - shape=(min(m, n), n - cc), chunks=((cr), data.chunks[1][1:]), dtype=rr.dtype) + shape=(min(m, n), n - cc), chunks=((cr), data.chunks[1][1:]), meta=A_rest_meta) Rs.append(Q.T.dot(A_rest)) R = concatenate(Rs, axis=1) @@ -788,15 +798,18 @@ def lu(a): # l_permuted is not referred in upper triangulars pp, ll, uu = scipy.linalg.lu(np.ones(shape=(1, 1), dtype=a.dtype)) + pp_meta = meta_from_array(a, a.ndim, dtype=pp.dtype) + ll_meta = meta_from_array(a, a.ndim, dtype=ll.dtype) + uu_meta = meta_from_array(a, a.ndim, dtype=uu.dtype) graph = HighLevelGraph.from_collections(name_p, dsk, dependencies=[a]) - p = Array(graph, name_p, shape=a.shape, chunks=a.chunks, dtype=pp.dtype) + p = Array(graph, name_p, shape=a.shape, chunks=a.chunks, meta=pp_meta) graph = HighLevelGraph.from_collections(name_l, dsk, dependencies=[a]) - l = Array(graph, name_l, shape=a.shape, chunks=a.chunks, dtype=ll.dtype) + l = Array(graph, name_l, shape=a.shape, chunks=a.chunks, meta=ll_meta) graph = HighLevelGraph.from_collections(name_u, dsk, dependencies=[a]) - u = Array(graph, name_u, shape=a.shape, chunks=a.chunks, dtype=uu.dtype) + u = Array(graph, name_u, shape=a.shape, chunks=a.chunks, meta=uu_meta) return p, l, u @@ -885,7 +898,8 @@ def _key(i, j): graph = HighLevelGraph.from_collections(name, dsk, dependencies=[a, b]) res = _solve_triangular_lower(np.array([[1, 0], [1, 2]], dtype=a.dtype), np.array([0, 1], dtype=b.dtype)) - return Array(graph, name, shape=b.shape, chunks=b.chunks, dtype=res.dtype) + meta = meta_from_array(a, b.ndim, dtype=res.dtype) + return Array(graph, name, shape=b.shape, chunks=b.chunks, meta=meta) def solve(a, b, sym_pos=False): @@ -1034,10 +1048,11 @@ def _cholesky(a): graph_upper = HighLevelGraph.from_collections(name_upper, dsk, dependencies=[a]) graph_lower = HighLevelGraph.from_collections(name, dsk, dependencies=[a]) cho = scipy.linalg.cholesky(np.array([[1, 2], [2, 5]], 
dtype=a.dtype)) + meta = meta_from_array(a, a.ndim, dtype=cho.dtype) - lower = Array(graph_lower, name, shape=a.shape, chunks=a.chunks, dtype=cho.dtype) + lower = Array(graph_lower, name, shape=a.shape, chunks=a.chunks, meta=meta) # do not use .T, because part of transposed blocks are already calculated - upper = Array(graph_upper, name_upper, shape=a.shape, chunks=a.chunks, dtype=cho.dtype) + upper = Array(graph_upper, name_upper, shape=a.shape, chunks=a.chunks, meta=meta) return lower, upper @@ -1106,8 +1121,9 @@ def lstsq(a, b): _, _, _, ss = np.linalg.lstsq(np.array([[1, 0], [1, 2]], dtype=a.dtype), np.array([0, 1], dtype=b.dtype), rcond=-1) + meta = meta_from_array(r, 1, dtype=ss.dtype) s = Array(graph, sname, shape=(r.shape[0], ), - chunks=r.shape[0], dtype=ss.dtype) + chunks=r.shape[0], meta=meta) return x, residuals, rank, s diff --git a/dask/array/overlap.py b/dask/array/overlap.py index e9a8d5a29e8..03a1501762c 100644 --- a/dask/array/overlap.py +++ b/dask/array/overlap.py @@ -150,7 +150,7 @@ def overlap_internal(x, axes): dsk = merge(interior_slices, overlap_blocks) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x]) - return Array(graph, name, chunks, dtype=x.dtype) + return Array(graph, name, chunks, meta=x) def trim_overlap(x, depth, boundary=None): @@ -300,8 +300,12 @@ def constant(x, axis, depth, value): chunks = list(x.chunks) chunks[axis] = (depth,) - c = wrap.full(tuple(map(sum, chunks)), value, - chunks=tuple(chunks), dtype=x.dtype) + try: + c = wrap.full_like(x._meta, value, shape=tuple(map(sum, chunks)), + chunks=tuple(chunks), dtype=x.dtype) + except TypeError: + c = wrap.full(tuple(map(sum, chunks)), value, + chunks=tuple(chunks), dtype=x.dtype) return concatenate([c, x, c], axis=axis) @@ -441,7 +445,11 @@ def add_dummy_padding(x, depth, boundary): empty_chunks = list(x.chunks) empty_chunks[k] = (d,) - empty = wrap.empty(empty_shape, chunks=empty_chunks, dtype=x.dtype) + try: + empty = wrap.empty_like(x, shape=empty_shape, + chunks=empty_chunks, dtype=x.dtype) + except TypeError: + empty = wrap.empty(empty_shape, chunks=empty_chunks, dtype=x.dtype) out_chunks = list(x.chunks) ax_chunks = list(out_chunks[k]) diff --git a/dask/array/random.py b/dask/array/random.py index 763d226f068..240e2f56425 100644 --- a/dask/array/random.py +++ b/dask/array/random.py @@ -131,11 +131,6 @@ def _broadcast_any(ar, shape, chunks): else: small_kwargs[key] = ar - # Get dtype - small_kwargs['size'] = (0,) - func = getattr(np.random.RandomState(), funcname) - dtype = func(*small_args, **small_kwargs).dtype - sizes = list(product(*chunks)) seeds = random_state_data(len(sizes), self._numpy_state) token = tokenize(seeds, size, chunks, args, kwargs) @@ -169,12 +164,15 @@ def _broadcast_any(ar, shape, chunks): kwrg[k] = (getitem, lookup[k], slc) vals.append((_apply_random, self._RandomState, funcname, seed, size, arg, kwrg)) + meta = _apply_random(self._RandomState, funcname, seed, + (0,) * len(size), small_args, small_kwargs) + dsk.update(dict(zip(keys, vals))) graph = HighLevelGraph.from_collections( name, dsk, dependencies=dependencies, ) - return Array(graph, name, chunks + extra_chunks, dtype=dtype) + return Array(graph, name, chunks + extra_chunks, meta=meta) @doc_wraps(np.random.RandomState.beta) def beta(self, a, b, size=None, chunks="auto"): diff --git a/dask/array/rechunk.py b/dask/array/rechunk.py index e8becbd242f..5a1a76e5e25 100644 --- a/dask/array/rechunk.py +++ b/dask/array/rechunk.py @@ -569,7 +569,7 @@ def _compute_rechunk(x, chunks): layer = toolz.merge(x2, 
intermediates) graph = HighLevelGraph.from_collections(merge_name, layer, dependencies=[x]) - return Array(graph, merge_name, chunks, dtype=x.dtype) + return Array(graph, merge_name, chunks, meta=x) class _PrettyBlocks(object): diff --git a/dask/array/reductions.py b/dask/array/reductions.py index e22489a1d3b..3d91244442b 100644 --- a/dask/array/reductions.py +++ b/dask/array/reductions.py @@ -12,11 +12,11 @@ from . import chunk from .core import _concatenate2, Array, handle_out -from .blockwise import blockwise +from .blockwise import blockwise, blockwise_meta from ..blockwise import lol_tuples from .creation import arange, diagonal from .ufunc import sqrt -from .utils import validate_axis +from .utils import full_like_safe, validate_axis from .wrap import zeros, ones from .numpy_compat import ma_divide, divide as np_divide from ..compatibility import getargspec, builtins @@ -145,8 +145,22 @@ def reduction(x, chunk, aggregate, axis=None, keepdims=False, dtype=None, tmp = blockwise(chunk, inds, x, inds, axis=axis, keepdims=True, dtype=x.dtype) tmp._chunks = tuple((output_size, ) * len(c) if i in axis else c for i, c in enumerate(tmp.chunks)) + + if hasattr(x, '_meta'): + try: + reduced_meta = blockwise_meta(chunk, x.dtype, x._meta, axis=axis, + keepdims=True, meta=True) + except TypeError: + reduced_meta = blockwise_meta(chunk, x.dtype, x._meta, axis=axis, + keepdims=True) + except ValueError: + pass + else: + reduced_meta = None + result = _tree_reduce(tmp, aggregate, axis, keepdims, dtype, split_every, - combine, name=name, concatenate=concatenate) + combine, name=name, concatenate=concatenate, + reduced_meta=reduced_meta) if keepdims and output_size != 1: result._chunks = tuple((output_size, ) if i in axis else c for i, c in enumerate(tmp.chunks)) @@ -154,7 +168,7 @@ def reduction(x, chunk, aggregate, axis=None, keepdims=False, dtype=None, def _tree_reduce(x, aggregate, axis, keepdims, dtype, split_every=None, - combine=None, name=None, concatenate=True): + combine=None, name=None, concatenate=True, reduced_meta=None): """ Perform the tree reduction step of a reduction. Lower level, users should use ``reduction`` or ``arg_reduction`` directly. @@ -179,15 +193,18 @@ def _tree_reduce(x, aggregate, axis, keepdims, dtype, split_every=None, func = compose(func, partial(_concatenate2, axes=axis)) for i in range(depth - 1): x = partial_reduce(func, x, split_every, True, dtype=dtype, - name=(name or funcname(combine or aggregate)) + '-partial') + name=(name or funcname(combine or aggregate)) + '-partial', + reduced_meta=reduced_meta) func = partial(aggregate, axis=axis, keepdims=keepdims) if concatenate: func = compose(func, partial(_concatenate2, axes=axis)) return partial_reduce(func, x, split_every, keepdims=keepdims, dtype=dtype, - name=(name or funcname(aggregate)) + '-aggregate') + name=(name or funcname(aggregate)) + '-aggregate', + reduced_meta=reduced_meta) -def partial_reduce(func, x, split_every, keepdims=False, dtype=None, name=None): +def partial_reduce(func, x, split_every, keepdims=False, dtype=None, name=None, + reduced_meta=None): """ Partial reduction across multiple axes. 
Parameters @@ -223,7 +240,31 @@ def partial_reduce(func, x, split_every, keepdims=False, dtype=None, name=None): g = lol_tuples((x.name,), range(x.ndim), decided, dummy) dsk[(name,) + k] = (func, g) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x]) - return Array(graph, name, out_chunks, dtype=dtype) + + meta = x._meta + if reduced_meta is not None: + try: + meta = func(reduced_meta, meta=True) + # no meta keyword argument exists for func, and it isn't required + except TypeError: + meta = func(reduced_meta) + # when no work can be computed on the empty array (e.g., func is a ufunc) + except ValueError: + pass + + # some functions can't compute empty arrays (those for which reduced_meta + # fall into the ValueError exception) and we have to rely on reshaping + # the array according to len(out_chunks) + if not np.isscalar(meta) and meta.ndim != len(out_chunks): + if len(out_chunks) == 0: + meta = meta.sum() + else: + meta = meta.reshape((0, ) * len(out_chunks)) + + if np.isscalar(meta): + return Array(graph, name, out_chunks, dtype=dtype) + else: + return Array(graph, name, out_chunks, meta=meta.astype(dtype)) @wraps(chunk.sum) @@ -331,7 +372,9 @@ def numel(x, **kwargs): if axis is None: prod = np.prod(shape, dtype=dtype) - return np.full((1,) * len(shape), prod, dtype=dtype) if keepdims is True else prod + return full_like_safe( + x, prod, shape=(1, ) * len(shape), dtype=dtype + ) if keepdims is True else prod if not isinstance(axis, tuple or list): axis = [axis] @@ -341,7 +384,7 @@ def numel(x, **kwargs): new_shape = tuple(shape[dim] if dim not in axis else 1 for dim in range(len(shape))) else: new_shape = tuple(shape[dim] for dim in range(len(shape)) if dim not in axis) - return np.full(new_shape, prod, dtype=dtype) + return full_like_safe(x, prod, shape=new_shape, dtype=dtype) def nannumel(x, **kwargs): @@ -349,26 +392,40 @@ def nannumel(x, **kwargs): return chunk.sum(~np.isnan(x), **kwargs) -def mean_chunk(x, sum=chunk.sum, numel=numel, dtype='f8', **kwargs): +def mean_chunk(x, sum=chunk.sum, numel=numel, dtype='f8', meta=False, **kwargs): + if meta: + return x n = numel(x, dtype=dtype, **kwargs) + total = sum(x, dtype=dtype, **kwargs) + return {'n': n, 'total': total} -def mean_combine(pairs, sum=chunk.sum, numel=numel, dtype='f8', axis=None, **kwargs): +def mean_combine(pairs, sum=chunk.sum, numel=numel, dtype='f8', axis=None, meta=False, **kwargs): if not isinstance(pairs, list): pairs = [pairs] - ns = deepmap(lambda pair: pair['n'], pairs) - totals = deepmap(lambda pair: pair['total'], pairs) + + ns = deepmap(lambda pair: pair['n'], pairs) if not meta else pairs n = _concatenate2(ns, axes=axis).sum(axis=axis, **kwargs) + + if meta: + return n + + totals = deepmap(lambda pair: pair['total'], pairs) total = _concatenate2(totals, axes=axis).sum(axis=axis, **kwargs) + return {'n': n, 'total': total} -def mean_agg(pairs, dtype='f8', axis=None, **kwargs): - ns = deepmap(lambda pair: pair['n'], pairs) - totals = deepmap(lambda pair: pair['total'], pairs) +def mean_agg(pairs, dtype='f8', axis=None, meta=False, **kwargs): + ns = deepmap(lambda pair: pair['n'], pairs) if not meta else pairs n = _concatenate2(ns, axes=axis).sum(axis=axis, dtype=dtype, **kwargs) + + if meta: + return n + + totals = deepmap(lambda pair: pair['total'], pairs) total = _concatenate2(totals, axes=axis).sum(axis=axis, dtype=dtype, **kwargs) return divide(total, n, dtype=dtype) @@ -402,9 +459,13 @@ def nanmean(a, axis=None, dtype=None, keepdims=False, split_every=None, nanmean = 
wraps(chunk.nanmean)(nanmean) -def moment_chunk(A, order=2, sum=chunk.sum, numel=numel, dtype='f8', **kwargs): +def moment_chunk(A, order=2, sum=chunk.sum, numel=numel, dtype='f8', meta=False, **kwargs): + if meta: + return A + n = numel(A, **kwargs) + + n = n.astype(np.int64) total = sum(A, dtype=dtype, **kwargs) - n = numel(A, **kwargs).astype(np.int64) u = total / n xs = [sum((A - u)**i, dtype=dtype, **kwargs) for i in range(2, order + 1)] M = np.stack(xs, axis=-1) @@ -419,18 +480,24 @@ def _moment_helper(Ms, ns, inner_term, order, sum, axis, kwargs): return M -def moment_combine(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, **kwargs): +def moment_combine(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, meta=False, **kwargs): if not isinstance(pairs, list): pairs = [pairs] - totals = _concatenate2(deepmap(lambda pair: pair['total'], pairs), axes=axis) - ns = _concatenate2(deepmap(lambda pair: pair['n'], pairs), axes=axis) - Ms = _concatenate2(deepmap(lambda pair: pair['M'], pairs), axes=axis) kwargs['dtype'] = dtype kwargs['keepdims'] = True - total = totals.sum(axis=axis, **kwargs) + ns = deepmap(lambda pair: pair['n'], pairs) if not meta else pairs + ns = _concatenate2(ns, axes=axis) n = ns.sum(axis=axis, **kwargs) + + if meta: + return n + + totals = _concatenate2(deepmap(lambda pair: pair['total'], pairs), axes=axis) + Ms = _concatenate2(deepmap(lambda pair: pair['M'], pairs), axes=axis) + + total = totals.sum(axis=axis, **kwargs) mu = divide(total, n, dtype=dtype) inner_term = divide(totals, ns, dtype=dtype) - mu @@ -439,12 +506,9 @@ def moment_combine(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, ** return {'total': total, 'n': n, 'M': M} -def moment_agg(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, **kwargs): +def moment_agg(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, meta=False, **kwargs): if not isinstance(pairs, list): pairs = [pairs] - totals = _concatenate2(deepmap(lambda pair: pair['total'], pairs), axes=axis) - ns = _concatenate2(deepmap(lambda pair: pair['n'], pairs), axes=axis) - Ms = _concatenate2(deepmap(lambda pair: pair['M'], pairs), axes=axis) kwargs['dtype'] = dtype # To properly handle ndarrays, the original dimensions need to be kept for @@ -452,7 +516,16 @@ def moment_agg(pairs, order=2, ddof=0, dtype='f8', sum=np.sum, axis=None, **kwar keepdim_kw = kwargs.copy() keepdim_kw['keepdims'] = True + ns = deepmap(lambda pair: pair['n'], pairs) if not meta else pairs + ns = _concatenate2(ns, axes=axis) n = ns.sum(axis=axis, **keepdim_kw) + + if meta: + return n + + totals = _concatenate2(deepmap(lambda pair: pair['total'], pairs), axes=axis) + Ms = _concatenate2(deepmap(lambda pair: pair['M'], pairs), axes=axis) + mu = divide(totals.sum(axis=axis, **keepdim_kw), n, dtype=dtype) inner_term = divide(totals, ns, dtype=dtype) - mu diff --git a/dask/array/reshape.py b/dask/array/reshape.py index 1d3b6c68d4d..ae354fb3a71 100644 --- a/dask/array/reshape.py +++ b/dask/array/reshape.py @@ -6,6 +6,7 @@ import numpy as np from .core import Array +from .utils import meta_from_array from ..base import tokenize from ..core import flatten from ..compatibility import reduce @@ -174,6 +175,8 @@ def reshape(x, shape): if x.shape == shape: return x + meta = meta_from_array(x, len(shape)) + name = 'reshape-' + tokenize(x, shape) if x.npartitions == 1: @@ -181,7 +184,7 @@ def reshape(x, shape): dsk = {(name,) + (0,) * len(shape): (M.reshape, key, shape)} chunks = tuple((d,) for d in shape) graph = 
HighLevelGraph.from_collections(name, dsk, dependencies=[x]) - return Array(graph, name, chunks, dtype=x.dtype) + return Array(graph, name, chunks, meta=meta) # Logic for how to rechunk inchunks, outchunks = reshape_rechunk(x.shape, shape, x.chunks) @@ -194,4 +197,4 @@ def reshape(x, shape): dsk = {a: (M.reshape, b, shape) for a, b, shape in zip(out_keys, in_keys, shapes)} graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x2]) - return Array(graph, name, outchunks, dtype=x.dtype) + return Array(graph, name, outchunks, meta=meta) diff --git a/dask/array/routines.py b/dask/array/routines.py index c0423093656..ee796ed1059 100644 --- a/dask/array/routines.py +++ b/dask/array/routines.py @@ -5,6 +5,7 @@ import warnings from functools import wraps, partial from numbers import Real, Integral +from distutils.version import LooseVersion import numpy as np from toolz import concat, sliding_window, interleave @@ -16,7 +17,7 @@ from ..utils import funcname, derived_from from . import chunk from .creation import arange, diag, empty, indices -from .utils import safe_wraps, validate_axis +from .utils import safe_wraps, validate_axis, meta_from_array, zeros_like_safe from .wrap import ones from .ufunc import multiply, sqrt @@ -193,7 +194,8 @@ def _tensordot(a, b, axes): # workaround may be removed when numpy version (currently 1.13.0) is bumped a_dims = np.array([a.shape[i] for i in axes[0]]) b_dims = np.array([b.shape[i] for i in axes[1]]) - if len(a_dims) > 0 and (a_dims == b_dims).all() and a_dims.min() == 0: + if (len(a_dims) > 0 and (a_dims == b_dims).all() and a_dims.min() == 0 and + LooseVersion(np.__version__) < LooseVersion("1.14")): x = np.zeros(tuple([s for i, s in enumerate(a.shape) if i not in axes[0]] + [s for i, s in enumerate(b.shape) if i not in axes[1]])) else: @@ -518,7 +520,7 @@ def gradient(f, *varargs, **kwargs): def _bincount_sum(bincounts, dtype=int): n = max(map(len, bincounts)) - out = np.zeros(n, dtype=dtype) + out = zeros_like_safe(bincounts[0], shape=n, dtype=dtype) for b in bincounts: out[:len(b)] += b return out @@ -554,7 +556,9 @@ def bincount(x, weights=None, minlength=0): else: chunks = ((minlength,),) - return Array(graph, final_name, chunks, dtype) + meta = meta_from_array(x, 1, dtype=dtype) + + return Array(graph, final_name, chunks, meta=meta) @derived_from(np) @@ -991,7 +995,9 @@ def squeeze(a, axis=None): sl = tuple(0 if i in axis else slice(None) for i, s in enumerate(a.shape)) - return a[sl] + a = a[sl] + + return a @derived_from(np) diff --git a/dask/array/tests/test_array_core.py b/dask/array/tests/test_array_core.py index e2a68d12cf1..9d2fefb1b74 100644 --- a/dask/array/tests/test_array_core.py +++ b/dask/array/tests/test_array_core.py @@ -172,6 +172,11 @@ def test_Array(): assert len(a) == shape[0] + with pytest.raises(ValueError): + Array(dsk, name, chunks, shape=shape) + with pytest.raises(TypeError): + Array(dsk, name, chunks, shape=shape, dtype='f8', meta=np.empty(0, 0)) + def test_uneven_chunks(): a = Array({}, 'x', chunks=(3, 3), shape=(10, 10), dtype='f8') @@ -355,8 +360,8 @@ def test_concatenate(): pytest.raises(ValueError, lambda: concatenate([a, b, c], axis=2)) -@pytest.mark.parametrize('dtypes', [(('>f8', '>f8'), '>f8'), - (('f8', '>f8'), 'float64'), + ((' 0.5, lambda x: x.rechunk((4, 4, 4)), @@ -43,16 +60,32 @@ pytest.param(lambda x: da.einsum("ijk,ijk", x, x), marks=pytest.mark.xfail( reason='depends on resolution of https://github.com/numpy/numpy/issues/12974')), - lambda x: np.isreal(x), - lambda x: np.iscomplex(x), lambda x: 
np.isneginf(x), lambda x: np.isposinf(x), - lambda x: np.real(x), - lambda x: np.imag(x), - lambda x: np.fix(x), - lambda x: np.i0(x.reshape((24,))), - lambda x: np.sinc(x), - lambda x: np.nan_to_num(x), + pytest.param(lambda x: np.isreal(x), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), + pytest.param(lambda x: np.iscomplex(x), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), + pytest.param(lambda x: np.real(x), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), + pytest.param(lambda x: np.imag(x), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), + pytest.param(lambda x: np.fix(x), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), + pytest.param(lambda x: np.i0(x.reshape((24,))), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), + pytest.param(lambda x: np.sinc(x), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), + pytest.param(lambda x: np.nan_to_num(x), + marks=pytest.mark.skipif(not IS_NEP18_ACTIVE, + reason="NEP-18 support is not available in NumPy")), ] @@ -66,11 +99,10 @@ def test_basic(func): ddc = func(dc) ddn = func(dn) - assert_eq(ddc, ddn) + assert type(ddc._meta) == cupy.core.core.ndarray + assert_eq(ddc, ddc) # Check that _meta and computed arrays match types - if ddc.shape: - result = ddc.compute(scheduler='single-threaded') - assert isinstance(result, cupy.ndarray) + assert_eq(ddc, ddn) @pytest.mark.parametrize('dtype', ['f4', 'f8']) @@ -78,3 +110,495 @@ def test_sizeof(dtype): c = cupy.random.random((2, 3, 4), dtype=dtype) assert sizeof(c) == c.nbytes + + +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_diag(): + v = cupy.arange(11) + dv = da.from_array(v, chunks=(4,), asarray=False) + assert type(dv._meta) == cupy.core.core.ndarray + assert_eq(dv, dv) # Check that _meta and computed arrays match types + assert_eq(da.diag(dv), cupy.diag(v)) + + v = v + v + 3 + dv = dv + dv + 3 + darr = da.diag(dv) + cupyarr = cupy.diag(v) + assert type(darr._meta) == cupy.core.core.ndarray + assert_eq(darr, darr) # Check that _meta and computed arrays match types + assert_eq(darr, cupyarr) + + x = cupy.arange(64).reshape((8, 8)) + dx = da.from_array(x, chunks=(4, 4), asarray=False) + assert type(dx._meta) == cupy.core.core.ndarray + assert_eq(dx, dx) # Check that _meta and computed arrays match types + assert_eq(da.diag(dx), cupy.diag(x)) + + +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_diagonal(): + v = cupy.arange(11) + with pytest.raises(ValueError): + da.diagonal(v) + + v = cupy.arange(4).reshape((2, 2)) + with pytest.raises(ValueError): + da.diagonal(v, axis1=0, axis2=0) + + with pytest.raises(AxisError): + da.diagonal(v, axis1=-4) + + with pytest.raises(AxisError): + da.diagonal(v, axis2=-4) + + v = cupy.arange(4 * 5 * 6).reshape((4, 5, 6)) + v = da.from_array(v, chunks=2, asarray=False) + assert_eq(da.diagonal(v), np.diagonal(v)) + # Empty diagonal. 
+ assert_eq(da.diagonal(v, offset=10), np.diagonal(v, offset=10)) + assert_eq(da.diagonal(v, offset=-10), np.diagonal(v, offset=-10)) + assert isinstance(da.diagonal(v).compute(), cupy.core.core.ndarray) + + with pytest.raises(ValueError): + da.diagonal(v, axis1=-2) + + # Negative axis. + assert_eq(da.diagonal(v, axis1=-1), np.diagonal(v, axis1=-1)) + assert_eq(da.diagonal(v, offset=1, axis1=-1), np.diagonal(v, offset=1, axis1=-1)) + + # Heterogenous chunks. + v = cupy.arange(2 * 3 * 4 * 5 * 6).reshape((2, 3, 4, 5, 6)) + v = da.from_array(v, chunks=(1, (1, 2), (1, 2, 1), (2, 1, 2), (5, 1)), asarray=False) + + assert_eq(da.diagonal(v), np.diagonal(v)) + assert_eq(da.diagonal(v, offset=2, axis1=3, axis2=1), + np.diagonal(v, offset=2, axis1=3, axis2=1)) + + assert_eq(da.diagonal(v, offset=-2, axis1=3, axis2=1), + np.diagonal(v, offset=-2, axis1=3, axis2=1)) + + assert_eq(da.diagonal(v, offset=-2, axis1=3, axis2=4), + np.diagonal(v, offset=-2, axis1=3, axis2=4)) + + assert_eq(da.diagonal(v, 1), np.diagonal(v, 1)) + assert_eq(da.diagonal(v, -1), np.diagonal(v, -1)) + # Positional arguments + assert_eq(da.diagonal(v, 1, 2, 1), np.diagonal(v, 1, 2, 1)) + + +@pytest.mark.xfail(reason="no shape argument support *_like functions on CuPy yet") +@pytest.mark.skipif(np.__version__ < '1.17', reason='no shape argument for *_like functions') +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_tril_triu(): + A = cupy.random.randn(20, 20) + for chk in [5, 4]: + dA = da.from_array(A, (chk, chk), asarray=False) + + assert_eq(da.triu(dA), np.triu(A)) + assert_eq(da.tril(dA), np.tril(A)) + + for k in [-25, -20, -9, -1, 1, 8, 19, 21]: + assert_eq(da.triu(dA, k), np.triu(A, k)) + assert_eq(da.tril(dA, k), np.tril(A, k)) + + +@pytest.mark.xfail(reason="no shape argument support *_like functions on CuPy yet") +@pytest.mark.skipif(np.__version__ < '1.17', reason='no shape argument for *_like functions') +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_tril_triu_non_square_arrays(): + A = cupy.random.randint(0, 11, (30, 35)) + dA = da.from_array(A, chunks=(5, 5), asarray=False) + assert_eq(da.triu(dA), np.triu(A)) + assert_eq(da.tril(dA), np.tril(A)) + + +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_apply_gufunc_axis(): + def mydiff(x): + return np.diff(x) + + a = cupy.random.randn(3, 6, 4) + da_ = da.from_array(a, chunks=2, asarray=False) + + m = np.diff(a, axis=1) + dm = apply_gufunc(mydiff, "(i)->(i)", da_, axis=1, output_sizes={'i': 5}, + allow_rechunk=True) + assert_eq(m, dm) + + +def test_overlap_internal(): + x = cupy.arange(64).reshape((8, 8)) + d = da.from_array(x, chunks=(4, 4), asarray=False) + + g = da.overlap.overlap_internal(d, {0: 2, 1: 1}) + assert g.chunks == ((6, 6), (5, 5)) + + expected = np.array([ + [ 0, 1, 2, 3, 4, 3, 4, 5, 6, 7], + [ 8, 9, 10, 11, 12, 11, 12, 13, 14, 15], + [16, 17, 18, 19, 20, 19, 20, 21, 22, 23], + [24, 25, 26, 27, 28, 27, 28, 29, 30, 31], + [32, 33, 34, 35, 36, 35, 36, 37, 38, 39], + [40, 41, 42, 43, 44, 43, 44, 45, 46, 47], + + [16, 17, 18, 19, 20, 19, 20, 21, 22, 23], + [24, 25, 26, 27, 28, 27, 28, 29, 30, 31], + [32, 33, 34, 35, 36, 35, 36, 37, 38, 39], + [40, 41, 42, 43, 44, 43, 44, 45, 46, 47], + [48, 49, 50, 51, 52, 51, 52, 53, 54, 55], + [56, 57, 58, 59, 60, 59, 60, 61, 62, 63]]) + + assert_eq(g, expected) + assert same_keys(da.overlap.overlap_internal(d, {0: 2, 1: 1}), g) + + +def test_trim_internal(): + x = 
cupy.ones((40, 60)) + d = da.from_array(x, chunks=(10, 10), asarray=False) + e = da.overlap.trim_internal(d, axes={0: 1, 1: 2}) + + assert e.chunks == ((8, 8, 8, 8), (6, 6, 6, 6, 6, 6)) + + +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_periodic(): + x = cupy.arange(64).reshape((8, 8)) + d = da.from_array(x, chunks=(4, 4), asarray=False) + + e = da.overlap.periodic(d, axis=0, depth=2) + assert e.shape[0] == d.shape[0] + 4 + assert e.shape[1] == d.shape[1] + + assert_eq(e[1, :], d[-1, :]) + assert_eq(e[0, :], d[-2, :]) + + +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_reflect(): + x = cupy.arange(10) + d = da.from_array(x, chunks=(5, 5), asarray=False) + + e = da.overlap.reflect(d, axis=0, depth=2) + expected = np.array([1, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 8]) + assert_eq(e, expected) + + e = da.overlap.reflect(d, axis=0, depth=1) + expected = np.array([0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9]) + assert_eq(e, expected) + + +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_nearest(): + x = cupy.arange(10) + d = da.from_array(x, chunks=(5, 5), asarray=False) + + e = da.overlap.nearest(d, axis=0, depth=2) + expected = np.array([0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9]) + assert_eq(e, expected) + + e = da.overlap.nearest(d, axis=0, depth=1) + expected = np.array([0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9]) + assert_eq(e, expected) + + +@pytest.mark.xfail(reason="no shape argument support *_like functions on CuPy yet") +@pytest.mark.skipif(np.__version__ < '1.17', reason='no shape argument for *_like functions') +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_constant(): + x = cupy.arange(64).reshape((8, 8)) + d = da.from_array(x, chunks=(4, 4), asarray=False) + + e = da.overlap.constant(d, axis=0, depth=2, value=10) + assert e.shape[0] == d.shape[0] + 4 + assert e.shape[1] == d.shape[1] + + assert_eq(e[1, :], np.ones(8, dtype=x.dtype) * 10) + assert_eq(e[-1, :], np.ones(8, dtype=x.dtype) * 10) + + +@pytest.mark.xfail(reason="no shape argument support *_like functions on CuPy yet") +@pytest.mark.skipif(np.__version__ < '1.17', reason='no shape argument for *_like functions') +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_boundaries(): + x = cupy.arange(64).reshape((8, 8)) + d = da.from_array(x, chunks=(4, 4), asarray=False) + + e = da.overlap.boundaries(d, {0: 2, 1: 1}, {0: 0, 1: 'periodic'}) + + expected = np.array( + [[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [ 7, 0, 1, 2, 3, 4, 5, 6, 7, 0], + [15, 8, 9,10,11,12,13,14,15, 8], + [23,16,17,18,19,20,21,22,23,16], + [31,24,25,26,27,28,29,30,31,24], + [39,32,33,34,35,36,37,38,39,32], + [47,40,41,42,43,44,45,46,47,40], + [55,48,49,50,51,52,53,54,55,48], + [63,56,57,58,59,60,61,62,63,56], + [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]) + assert_eq(e, expected) + + +def test_random_all(): + def rnd_test(func, *args, **kwargs): + a = func(*args, **kwargs) + assert type(a._meta) == cupy.core.core.ndarray + assert_eq(a, a) # Check that _meta and computed arrays match types + + rs = da.random.RandomState(RandomState=cupy.random.RandomState) + + rnd_test(rs.beta, 1, 2, size=5, chunks=3) + rnd_test(rs.binomial, 10, 0.5, size=5, chunks=3) + rnd_test(rs.chisquare, 1, size=5, chunks=3) + rnd_test(rs.exponential, 1, size=5, chunks=3) + 
rnd_test(rs.f, 1, 2, size=5, chunks=3) + rnd_test(rs.gamma, 5, 1, size=5, chunks=3) + rnd_test(rs.geometric, 1, size=5, chunks=3) + rnd_test(rs.gumbel, 1, size=5, chunks=3) + rnd_test(rs.hypergeometric, 1, 2, 3, size=5, chunks=3) + rnd_test(rs.laplace, size=5, chunks=3) + rnd_test(rs.logistic, size=5, chunks=3) + rnd_test(rs.lognormal, size=5, chunks=3) + rnd_test(rs.logseries, 0.5, size=5, chunks=3) + # No RandomState for multinomial in CuPy + # rnd_test(rs.multinomial, 20, [1 / 6.] * 6, size=5, chunks=3) + rnd_test(rs.negative_binomial, 5, 0.5, size=5, chunks=3) + rnd_test(rs.noncentral_chisquare, 2, 2, size=5, chunks=3) + + rnd_test(rs.noncentral_f, 2, 2, 3, size=5, chunks=3) + rnd_test(rs.normal, 2, 2, size=5, chunks=3) + rnd_test(rs.pareto, 1, size=5, chunks=3) + rnd_test(rs.poisson, size=5, chunks=3) + + rnd_test(rs.power, 1, size=5, chunks=3) + rnd_test(rs.rayleigh, size=5, chunks=3) + rnd_test(rs.random_sample, size=5, chunks=3) + + rnd_test(rs.triangular, 1, 2, 3, size=5, chunks=3) + rnd_test(rs.uniform, size=5, chunks=3) + rnd_test(rs.vonmises, 2, 3, size=5, chunks=3) + rnd_test(rs.wald, 1, 2, size=5, chunks=3) + + rnd_test(rs.weibull, 2, size=5, chunks=3) + rnd_test(rs.zipf, 2, size=5, chunks=3) + + rnd_test(rs.standard_cauchy, size=5, chunks=3) + rnd_test(rs.standard_exponential, size=5, chunks=3) + rnd_test(rs.standard_gamma, 2, size=5, chunks=3) + rnd_test(rs.standard_normal, size=5, chunks=3) + rnd_test(rs.standard_t, 2, size=5, chunks=3) + + +@pytest.mark.parametrize('shape', [(2, 3), (2, 3, 4), (2, 3, 4, 2)]) +def test_random_shapes(shape): + rs = da.random.RandomState(RandomState=cupy.random.RandomState) + + x = rs.poisson(size=shape, chunks=3) + assert type(x._meta) == cupy.core.core.ndarray + assert_eq(x, x) # Check that _meta and computed arrays match types + assert x._meta.shape == (0,) * len(shape) + assert x.shape == shape + + +@pytest.mark.xfail(reason="CuPy division by zero on tensordot(), https://github.com/cupy/cupy/pull/2209") +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +@pytest.mark.parametrize('m,n,chunks,error_type', [ + (20, 10, 10, None), # tall-skinny regular blocks + (20, 10, (3, 10), None), # tall-skinny regular fat layers + (20, 10, ((8, 4, 8), 10), None), # tall-skinny irregular fat layers + (40, 10, ((15, 5, 5, 8, 7), (10)), None), # tall-skinny non-uniform chunks (why?) + (128, 2, (16, 2), None), # tall-skinny regular thin layers; recursion_depth=1 + (129, 2, (16, 2), None), # tall-skinny regular thin layers; recursion_depth=2 --> 17x2 + (130, 2, (16, 2), None), # tall-skinny regular thin layers; recursion_depth=2 --> 18x2 next + (131, 2, (16, 2), None), # tall-skinny regular thin layers; recursion_depth=2 --> 18x2 next + (300, 10, (40, 10), None), # tall-skinny regular thin layers; recursion_depth=2 + (300, 10, (30, 10), None), # tall-skinny regular thin layers; recursion_depth=3 + (300, 10, (20, 10), None), # tall-skinny regular thin layers; recursion_depth=4 + (10, 5, 10, None), # single block tall + (5, 10, 10, None), # single block short + (10, 10, 10, None), # single block square + (10, 40, (10, 10), ValueError), # short-fat regular blocks + (10, 40, (10, 15), ValueError), # short-fat irregular blocks + (10, 40, ((10), (15, 5, 5, 8, 7)), ValueError), # short-fat non-uniform chunks (why?) 
+ (20, 20, 10, ValueError), # 2x2 regular blocks +]) +def test_tsqr(m, n, chunks, error_type): + mat = cupy.random.rand(m, n) + data = da.from_array(mat, chunks=chunks, name='A', asarray=False) + + # qr + m_q = m + n_q = min(m, n) + m_r = n_q + n_r = n + + # svd + m_u = m + n_u = min(m, n) + n_s = n_q + m_vh = n_q + n_vh = n + d_vh = max(m_vh, n_vh) # full matrix returned + + if error_type is None: + # test QR + q, r = da.linalg.tsqr(data) + assert_eq((m_q, n_q), q.shape) # shape check + assert_eq((m_r, n_r), r.shape) # shape check + assert_eq(mat, da.dot(q, r)) # accuracy check + assert_eq(cupy.eye(n_q, n_q), da.dot(q.T, q)) # q must be orthonormal + assert_eq(r, np.triu(r.rechunk(r.shape[0]))) # r must be upper triangular + + # test SVD + u, s, vh = da.linalg.tsqr(data, compute_svd=True) + s_exact = np.linalg.svd(mat)[1] + assert_eq(s, s_exact) # s must contain the singular values + assert_eq((m_u, n_u), u.shape) # shape check + assert_eq((n_s,), s.shape) # shape check + assert_eq((d_vh, d_vh), vh.shape) # shape check + assert_eq(np.eye(n_u, n_u), da.dot(u.T, u)) # u must be orthonormal + assert_eq(np.eye(d_vh, d_vh), da.dot(vh, vh.T)) # vh must be orthonormal + assert_eq(mat, da.dot(da.dot(u, da.diag(s)), vh[:n_q])) # accuracy check + else: + with pytest.raises(error_type): + q, r = da.linalg.tsqr(data) + with pytest.raises(error_type): + u, s, vh = da.linalg.tsqr(data, compute_svd=True) + + +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +@pytest.mark.parametrize('m_min,n_max,chunks,vary_rows,vary_cols,error_type', [ + (10, 5, (10, 5), True, False, None), # single block tall + (10, 5, (10, 5), False, True, None), # single block tall + (10, 5, (10, 5), True, True, None), # single block tall + (40, 5, (10, 5), True, False, None), # multiple blocks tall + (40, 5, (10, 5), False, True, None), # multiple blocks tall + (40, 5, (10, 5), True, True, None), # multiple blocks tall + (300, 10, (40, 10), True, False, None), # tall-skinny regular thin layers; recursion_depth=2 + (300, 10, (30, 10), True, False, None), # tall-skinny regular thin layers; recursion_depth=3 + (300, 10, (20, 10), True, False, None), # tall-skinny regular thin layers; recursion_depth=4 + (300, 10, (40, 10), False, True, None), # tall-skinny regular thin layers; recursion_depth=2 + (300, 10, (30, 10), False, True, None), # tall-skinny regular thin layers; recursion_depth=3 + (300, 10, (20, 10), False, True, None), # tall-skinny regular thin layers; recursion_depth=4 + (300, 10, (40, 10), True, True, None), # tall-skinny regular thin layers; recursion_depth=2 + (300, 10, (30, 10), True, True, None), # tall-skinny regular thin layers; recursion_depth=3 + (300, 10, (20, 10), True, True, None), # tall-skinny regular thin layers; recursion_depth=4 +]) +def test_tsqr_uncertain(m_min, n_max, chunks, vary_rows, vary_cols, error_type): + mat = cupy.random.rand(m_min * 2, n_max) + m, n = m_min * 2, n_max + mat[0:m_min, 0] += 1 + _c0 = mat[:, 0] + _r0 = mat[0, :] + c0 = da.from_array(_c0, chunks=m_min, name='c', asarray=False) + r0 = da.from_array(_r0, chunks=n_max, name='r', asarray=False) + data = da.from_array(mat, chunks=chunks, name='A', asarray=False) + if vary_rows: + data = data[c0 > 0.5, :] + mat = mat[_c0 > 0.5, :] + m = mat.shape[0] + if vary_cols: + data = data[:, r0 > 0.5] + mat = mat[:, _r0 > 0.5] + n = mat.shape[1] + + # qr + m_q = m + n_q = min(m, n) + m_r = n_q + n_r = n + + # svd + m_u = m + n_u = min(m, n) + n_s = n_q + m_vh = n_q + n_vh = n + d_vh = max(m_vh, n_vh) # 
full matrix returned + + if error_type is None: + # test QR + q, r = da.linalg.tsqr(data) + q = q.compute() # because uncertainty + r = r.compute() + assert_eq((m_q, n_q), q.shape) # shape check + assert_eq((m_r, n_r), r.shape) # shape check + assert_eq(mat, np.dot(q, r)) # accuracy check + assert_eq(np.eye(n_q, n_q), np.dot(q.T, q)) # q must be orthonormal + assert_eq(r, np.triu(r)) # r must be upper triangular + + # test SVD + u, s, vh = da.linalg.tsqr(data, compute_svd=True) + u = u.compute() # because uncertainty + s = s.compute() + vh = vh.compute() + s_exact = np.linalg.svd(mat)[1] + assert_eq(s, s_exact) # s must contain the singular values + assert_eq((m_u, n_u), u.shape) # shape check + assert_eq((n_s,), s.shape) # shape check + assert_eq((d_vh, d_vh), vh.shape) # shape check + assert_eq(np.eye(n_u, n_u), np.dot(u.T, u)) # u must be orthonormal + assert_eq(np.eye(d_vh, d_vh), np.dot(vh, vh.T)) # vh must be orthonormal + assert_eq(mat, np.dot(np.dot(u, np.diag(s)), vh[:n_q])) # accuracy check + else: + with pytest.raises(error_type): + q, r = da.linalg.tsqr(data) + with pytest.raises(error_type): + u, s, vh = da.linalg.tsqr(data, compute_svd=True) + + +@pytest.mark.parametrize('m,n,chunks,error_type', [ + (20, 10, 10, ValueError), # tall-skinny regular blocks + (20, 10, (3, 10), ValueError), # tall-skinny regular fat layers + (20, 10, ((8, 4, 8), 10), ValueError), # tall-skinny irregular fat layers + (40, 10, ((15, 5, 5, 8, 7), (10)), ValueError), # tall-skinny non-uniform chunks (why?) + (128, 2, (16, 2), ValueError), # tall-skinny regular thin layers; recursion_depth=1 + (129, 2, (16, 2), ValueError), # tall-skinny regular thin layers; recursion_depth=2 --> 17x2 + (130, 2, (16, 2), ValueError), # tall-skinny regular thin layers; recursion_depth=2 --> 18x2 next + (131, 2, (16, 2), ValueError), # tall-skinny regular thin layers; recursion_depth=2 --> 18x2 next + (300, 10, (40, 10), ValueError), # tall-skinny regular thin layers; recursion_depth=2 + (300, 10, (30, 10), ValueError), # tall-skinny regular thin layers; recursion_depth=3 + (300, 10, (20, 10), ValueError), # tall-skinny regular thin layers; recursion_depth=4 + (10, 5, 10, None), # single block tall + (5, 10, 10, None), # single block short + (10, 10, 10, None), # single block square + (10, 40, (10, 10), None), # short-fat regular blocks + (10, 40, (10, 15), None), # short-fat irregular blocks + (10, 40, ((10), (15, 5, 5, 8, 7)), None), # short-fat non-uniform chunks (why?) 
+ (20, 20, 10, ValueError), # 2x2 regular blocks +]) +def test_sfqr(m, n, chunks, error_type): + mat = np.random.rand(m, n) + data = da.from_array(mat, chunks=chunks, name='A') + m_q = m + n_q = min(m, n) + m_r = n_q + n_r = n + m_qtq = n_q + + if error_type is None: + q, r = da.linalg.sfqr(data) + assert_eq((m_q, n_q), q.shape) # shape check + assert_eq((m_r, n_r), r.shape) # shape check + assert_eq(mat, da.dot(q, r)) # accuracy check + assert_eq(np.eye(m_qtq, m_qtq), da.dot(q.T, q)) # q must be orthonormal + assert_eq(r, da.triu(r.rechunk(r.shape[0]))) # r must be upper triangular + else: + with pytest.raises(error_type): + q, r = da.linalg.sfqr(data) + + +@pytest.mark.xfail(reason="no shape argument support *_like functions on CuPy yet") +@pytest.mark.skipif(np.__version__ < '1.17', reason='no shape argument for *_like functions') +@pytest.mark.skipif(not IS_NEP18_ACTIVE, reason="NEP-18 support is not available in NumPy") +def test_bincount(): + x = cupy.array([2, 1, 5, 2, 1]) + d = da.from_array(x, chunks=2, asarray=False) + e = da.bincount(d, minlength=6) + assert_eq(e, np.bincount(x, minlength=6)) + assert same_keys(da.bincount(d, minlength=6), e) + + assert da.bincount(d, minlength=6).name != da.bincount(d, minlength=7).name + assert da.bincount(d, minlength=6).name == da.bincount(d, minlength=6).name diff --git a/dask/array/tests/test_masked.py b/dask/array/tests/test_masked.py index 222e46c5cbd..e7531e3e6b9 100644 --- a/dask/array/tests/test_masked.py +++ b/dask/array/tests/test_masked.py @@ -124,7 +124,7 @@ def test_mixed_concatenate(func): dd = func(d) ss = func(s) - assert_eq(dd, ss) + assert_eq(dd, ss, check_meta=False) @pytest.mark.parametrize('func', functions) @@ -138,7 +138,7 @@ def test_mixed_random(func): dd = func(d) ss = func(s) - assert_eq(dd, ss) + assert_eq(dd, ss, check_meta=False) def test_mixed_output_type(): diff --git a/dask/array/tests/test_sparse.py b/dask/array/tests/test_sparse.py index c23fa1df80a..76c3f3b6429 100644 --- a/dask/array/tests/test_sparse.py +++ b/dask/array/tests/test_sparse.py @@ -4,7 +4,7 @@ import pytest import dask.array as da -from dask.array.utils import assert_eq +from dask.array.utils import assert_eq, IS_NEP18_ACTIVE sparse = pytest.importorskip('sparse') if sparse: @@ -71,6 +71,10 @@ def test_basic(func): assert (zz != 1).sum() > np.prod(zz.shape) / 2 # mostly dense +@pytest.mark.skipif( + sparse.__version__ < '0.7.0+10', + reason='fixed in https://github.com/pydata/sparse/pull/256' +) def test_tensordot(): x = da.random.random((2, 3, 4), chunks=(1, 2, 2)) x[x < 0.8] = 0 @@ -136,3 +140,20 @@ def test_mixed_output_type(): zz = z.compute() assert isinstance(zz, sparse.COO) assert zz.nnz == y.compute().nnz + + +def test_metadata(): + y = da.random.random((10, 10), chunks=(5, 5)) + y[y < 0.8] = 0 + z = sparse.COO.from_numpy(y.compute()) + y = y.map_blocks(sparse.COO.from_numpy) + + assert isinstance(y._meta, sparse.COO) + assert isinstance((y + 1)._meta, sparse.COO) + assert isinstance(y.sum(axis=0)._meta, sparse.COO) + assert isinstance(y.var(axis=0)._meta, sparse.COO) + assert isinstance(y[:5, ::2]._meta, sparse.COO) + assert isinstance(y.rechunk((2, 2))._meta, sparse.COO) + assert isinstance((y - z)._meta, sparse.COO) + if IS_NEP18_ACTIVE: + assert isinstance(np.concatenate([y, y])._meta, sparse.COO) diff --git a/dask/array/ufunc.py b/dask/array/ufunc.py index 763dc4315fb..2e02cd3472f 100644 --- a/dask/array/ufunc.py +++ b/dask/array/ufunc.py @@ -6,7 +6,7 @@ from toolz import curry from .core import Array, elemwise, blockwise, 
-from .utils import IS_NEP18_ACTIVE
+from .utils import empty_like_safe, IS_NEP18_ACTIVE
 from ..base import is_dask_collection, normalize_function
 from .. import core
 from ..highlevelgraph import HighLevelGraph
@@ -297,15 +297,14 @@ def frexp(x):
     rdsk = {(right,) + key[1:]: (getitem, key, 1)
             for key in core.flatten(tmp.__dask_keys__())}
 
-    a = np.empty((1, ), dtype=x.dtype)
+    a = empty_like_safe(x._meta if hasattr(x, '_meta') else x,
+                        shape=(1, ) * x.ndim, dtype=x.dtype)
     l, r = np.frexp(a)
-    ldt = l.dtype
-    rdt = r.dtype
 
     graph = HighLevelGraph.from_collections(left, ldsk, dependencies=[tmp])
-    L = Array(graph, left, chunks=tmp.chunks, dtype=ldt)
+    L = Array(graph, left, chunks=tmp.chunks, meta=l)
 
     graph = HighLevelGraph.from_collections(right, rdsk, dependencies=[tmp])
-    R = Array(graph, right, chunks=tmp.chunks, dtype=rdt)
+    R = Array(graph, right, chunks=tmp.chunks, meta=r)
     return L, R
 
 
@@ -320,13 +319,12 @@ def modf(x):
     rdsk = {(right,) + key[1:]: (getitem, key, 1)
             for key in core.flatten(tmp.__dask_keys__())}
 
-    a = np.empty((1,), dtype=x.dtype)
+    a = empty_like_safe(x._meta if hasattr(x, '_meta') else x,
+                        shape=(1, ) * x.ndim, dtype=x.dtype)
     l, r = np.modf(a)
-    ldt = l.dtype
-    rdt = r.dtype
 
     graph = HighLevelGraph.from_collections(left, ldsk, dependencies=[tmp])
-    L = Array(graph, left, chunks=tmp.chunks, dtype=ldt)
+    L = Array(graph, left, chunks=tmp.chunks, meta=l)
 
     graph = HighLevelGraph.from_collections(right, rdsk, dependencies=[tmp])
-    R = Array(graph, right, chunks=tmp.chunks, dtype=rdt)
+    R = Array(graph, right, chunks=tmp.chunks, meta=r)
     return L, R
diff --git a/dask/array/utils.py b/dask/array/utils.py
index 2366d6ad8f0..3e8ba2f219a 100644
--- a/dask/array/utils.py
+++ b/dask/array/utils.py
@@ -28,6 +28,38 @@ def normalize_to_array(x):
         return x
 
 
+def normalize_meta(x, ndim, dtype=None):
+    if ndim > x.ndim:
+        meta = x[(Ellipsis, ) + tuple(None for _ in range(ndim - x.ndim))]
+        meta = meta[tuple(slice(0, 0, None) for _ in range(meta.ndim))]
+    elif ndim < x.ndim:
+        meta = np.sum(x, axis=tuple(d for d in range((x.ndim - ndim))))
+    else:
+        meta = x
+
+    if dtype:
+        meta = meta.astype(dtype)
+
+    return meta
+
+
+def meta_from_array(x, ndim, dtype=None):
+    if isinstance(x, list) or isinstance(x, tuple):
+        ndims = [0 if isinstance(a, numbers.Number)
+                 else a.ndim if hasattr(a, 'ndim') else len(a) for a in x]
+        a = [a if nd == 0 else meta_from_array(a, nd) for a, nd in zip(x, ndims)]
+        return a if isinstance(x, list) else tuple(a)
+
+    # Only use x._meta when x is a Dask Array: some libraries (e.g. zarr)
+    # implement a _meta attribute that is incompatible with Dask Array._meta
+    if hasattr(x, '_meta') and isinstance(x, Array):
+        meta = x._meta
+    else:
+        meta = x[tuple(slice(0, 0, None) for _ in range(x.ndim))]
+
+    return normalize_meta(meta, ndim, dtype)
+
+
 def allclose(a, b, equal_nan=False, **kwargs):
     a = normalize_to_array(a)
     b = normalize_to_array(b)
@@ -73,7 +105,7 @@ def assert_eq_shape(a, b, check_nan=True):
             assert aa == bb
 
 
-def assert_eq(a, b, check_shape=True, check_graph=True, **kwargs):
+def assert_eq(a, b, check_shape=True, check_graph=True, check_meta=True, **kwargs):
     a_original = a
     b_original = b
     if isinstance(a, Array):
@@ -81,7 +113,9 @@
         adt = a.dtype
         if check_graph:
             _check_dsk(a.dask)
+        a_meta = getattr(a, '_meta', None)
         a = a.compute(scheduler='sync')
+        a_computed = a
         if hasattr(a, 'todense'):
             a = a.todense()
         if not hasattr(a, 'dtype'):
@@ -100,7 +134,9 @@
         bdt = b.dtype
         if check_graph:
             _check_dsk(b.dask)
+        b_meta = getattr(b, '_meta', None)
         b = b.compute(scheduler='sync')
+        b_computed = b
         if not hasattr(b, 'dtype'):
             b = np.array(b, dtype='O')
         if hasattr(b, 'todense'):
@@ -115,12 +151,30 @@
         bdt = getattr(b, 'dtype', None)
 
     if str(adt) != str(bdt):
-        diff = difflib.ndiff(str(adt).splitlines(), str(bdt).splitlines())
-        raise AssertionError('string repr are different' + os.linesep +
-                             os.linesep.join(diff))
+        # Ignore check for matching length of flexible dtypes, since Array._meta
+        # can't encode that information
+        if adt.type == bdt.type and not (adt.type == np.bytes_ or adt.type == np.str_):
+            diff = difflib.ndiff(str(adt).splitlines(), str(bdt).splitlines())
+            raise AssertionError('string repr are different' + os.linesep +
+                                 os.linesep.join(diff))
 
     try:
         assert a.shape == b.shape
+        if check_meta:
+            if hasattr(a, '_meta') and hasattr(b, '_meta'):
+                assert_eq(a._meta, b._meta)
+            if hasattr(a_original, '_meta'):
+                assert a_original._meta.ndim == a.ndim
+                if a_meta is not None:
+                    assert type(a_original._meta) == type(a_meta)
+                    if not (np.isscalar(a_meta) or np.isscalar(a_computed)):
+                        assert type(a_meta) == type(a_computed)
+            if hasattr(b_original, '_meta'):
+                assert b_original._meta.ndim == b.ndim
+                if b_meta is not None:
+                    assert type(b_original._meta) == type(b_meta)
+                    if not (np.isscalar(b_meta) or np.isscalar(b_computed)):
+                        assert type(b_meta) == type(b_computed)
         assert allclose(a, b, **kwargs)
         return True
     except TypeError:
@@ -147,6 +201,55 @@
     return lambda x: x
 
 
+def empty_like_safe(a, shape, **kwargs):
+    """
+    Return np.empty_like(a, shape=shape, **kwargs) if the shape argument
+    is supported (requires NumPy >= 1.17), otherwise falls back to
+    using the old behavior, returning np.empty(shape, **kwargs).
+    """
+    try:
+        return np.empty_like(a, shape=shape, **kwargs)
+    except TypeError:
+        return np.empty(shape, **kwargs)
+
+
+def full_like_safe(a, fill_value, shape, **kwargs):
+    """
+    Return np.full_like(a, fill_value, shape=shape, **kwargs) if the
+    shape argument is supported (requires NumPy >= 1.17), otherwise
+    falls back to using the old behavior, returning
+    np.full(shape, fill_value, **kwargs).
+ """ + try: + return np.full_like(a, fill_value, shape=shape, **kwargs) + except TypeError: + return np.full(shape, fill_value, **kwargs) + + +def ones_like_safe(a, shape, **kwargs): + """ + Return np.ones_like(a, shape=shape, **kwargs) if the shape argument + is supported (requires NumPy >= 1.17), otherwise falls back to + using the old behavior, returning np.ones(shape, **kwargs). + """ + try: + return np.ones_like(a, shape=shape, **kwargs) + except TypeError: + return np.ones(shape, **kwargs) + + +def zeros_like_safe(a, shape, **kwargs): + """ + Return np.zeros_like(a, shape=shape, **kwargs) if the shape argument + is supported (requires NumPy >= 1.17), otherwise falls back to + using the old behavior, returning np.zeros(shape, **kwargs). + """ + try: + return np.zeros_like(a, shape=shape, **kwargs) + except TypeError: + return np.zeros(shape, **kwargs) + + def validate_axis(axis, ndim): """ Validate an input to axis= keywords """ if isinstance(axis, (tuple, list)): diff --git a/dask/array/wrap.py b/dask/array/wrap.py index 0e05d00c1b0..0890378e159 100644 --- a/dask/array/wrap.py +++ b/dask/array/wrap.py @@ -13,21 +13,10 @@ from ..base import tokenize from ..utils import funcname from .core import Array, normalize_chunks +from .utils import meta_from_array -def wrap_func_shape_as_first_arg(func, *args, **kwargs): - """ - Transform np creation function into blocked version - """ - if 'shape' not in kwargs: - shape, args = args[0], args[1:] - else: - shape = kwargs.pop('shape') - - if isinstance(shape, Array): - raise TypeError('Dask array input not supported. ' - 'Please use tuple, list, or a 1D numpy array instead.') - +def _parse_wrap_args(func, args, kwargs, shape): if isinstance(shape, np.ndarray): shape = shape.tolist() @@ -46,6 +35,30 @@ def wrap_func_shape_as_first_arg(func, *args, **kwargs): name = name or funcname(func) + '-' + tokenize(func, shape, chunks, dtype, args, kwargs) + return {'shape': shape, 'dtype': dtype, 'kwargs': kwargs, + 'chunks': chunks, 'name': name} + + +def wrap_func_shape_as_first_arg(func, *args, **kwargs): + """ + Transform np creation function into blocked version + """ + if 'shape' not in kwargs: + shape, args = args[0], args[1:] + else: + shape = kwargs.pop('shape') + + if isinstance(shape, Array): + raise TypeError('Dask array input not supported. 
'
+                        'Please use tuple, list, or a 1D numpy array instead.')
+
+    parsed = _parse_wrap_args(func, args, kwargs, shape)
+    shape = parsed['shape']
+    dtype = parsed['dtype']
+    chunks = parsed['chunks']
+    name = parsed['name']
+    kwargs = parsed['kwargs']
+
     keys = product([name], *[range(len(bd)) for bd in chunks])
     shapes = product(*chunks)
     func = partial(func, dtype=dtype, **kwargs)
@@ -55,9 +68,52 @@
     return Array(dsk, name, chunks, dtype=dtype)
 
 
+def wrap_func_like(func, *args, **kwargs):
+    """
+    Transform np creation function into blocked version
+    """
+    x = args[0]
+    meta = meta_from_array(x, x.ndim)
+    shape = kwargs.get('shape', x.shape)
+
+    parsed = _parse_wrap_args(func, args, kwargs, shape)
+    shape = parsed['shape']
+    dtype = parsed['dtype']
+    chunks = parsed['chunks']
+    name = parsed['name']
+    kwargs = parsed['kwargs']
+
+    keys = product([name], *[range(len(bd)) for bd in chunks])
+    shapes = product(*chunks)
+    shapes = list(shapes)
+    kw = [kwargs for _ in shapes]
+    for i, s in enumerate(list(shapes)):
+        kw[i]['shape'] = s
+    vals = ((partial(func, dtype=dtype, **k),) + args for (k, s) in zip(kw, shapes))
+
+    dsk = dict(zip(keys, vals))
+
+    return Array(dsk, name, chunks, meta=meta.astype(dtype))
+
+
+def wrap_func_like_safe(func, func_like, *args, **kwargs):
+    """
+    Safe implementation for wrap_func_like(): attempts to use func_like(),
+    and if the shape keyword argument is not supported, falls back to func().
+    """
+    try:
+        return func_like(*args, **kwargs)
+    except TypeError:
+        return func(*args, **kwargs)
+
+
 @curry
 def wrap(wrap_func, func, **kwargs):
-    f = partial(wrap_func, func, **kwargs)
+    func_like = kwargs.pop('func_like', None)
+    if func_like is None:
+        f = partial(wrap_func, func, **kwargs)
+    else:
+        f = partial(wrap_func, func_like, **kwargs)
     template = """
     Blocked variant of %(name)s
@@ -78,3 +134,12 @@
 zeros = w(np.zeros, dtype='f8')
 empty = w(np.empty, dtype='f8')
 full = w(np.full)
+
+
+w_like = wrap(wrap_func_like_safe)
+
+
+ones_like = w_like(np.ones, func_like=np.ones_like)
+zeros_like = w_like(np.zeros, func_like=np.zeros_like)
+empty_like = w_like(np.empty, func_like=np.empty_like)
+full_like = w_like(np.full, func_like=np.full_like)
diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index 793476e8954..84aa60fff49 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -33,6 +33,7 @@
                     memory_repr, put_lines, M, key_split, OperatorMethodMixin,
                     is_arraylike, typename, skip_doctest)
 from ..array.core import Array, normalize_arg
+from ..array.utils import empty_like_safe
 from ..blockwise import blockwise, Blockwise
 from ..base import DaskMethodsMixin, tokenize, dont_optimize, is_dask_collection
 from ..delayed import delayed, Delayed, unpack_collections
@@ -3595,7 +3596,8 @@ def elemwise(op, *args, **kwargs):
             msg = 'elemwise with 2 or more DataFrames and Scalar is not supported'
             raise NotImplementedError(msg)
         # For broadcastable series, use no rows.
-        parts = [d._meta if _is_broadcastable(d) or isinstance(d, Array)
+        parts = [d._meta if _is_broadcastable(d)
+                 else empty_like_safe(d, (), dtype=d.dtype) if isinstance(d, Array)
                 else d._meta_nonempty for d in dasks]
         with raise_on_meta_error(funcname(op)):
             meta = partial_by_order(*parts, function=op, other=other)
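
Usage sketch (not part of the patch; assumes the sparse package is installed),
mirroring the test_metadata test above: once the chunks of a Dask Array are
sparse.COO, the new Array._meta attribute records that chunk type and keeps
reporting it through subsequent operations.

    import dask.array as da
    import sparse

    y = da.random.random((10, 10), chunks=(5, 5))
    y[y < 0.8] = 0
    y = y.map_blocks(sparse.COO.from_numpy)  # chunks are now sparse.COO

    assert isinstance(y._meta, sparse.COO)               # meta records the chunk type
    assert isinstance(y.sum(axis=0)._meta, sparse.COO)   # and it survives reductions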
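
The four *_like_safe helpers added to dask/array/utils.py share the same
try/except pattern; a minimal sketch of the intended behaviour (not part of
the patch), shown here with zeros_like_safe:

    import numpy as np
    from dask.array.utils import zeros_like_safe

    a = np.empty((4,), dtype='f4')
    # On NumPy >= 1.17 the shape= keyword is forwarded to np.zeros_like, so the
    # result keeps the backend type and dtype of `a`; on older NumPy the
    # TypeError is caught and np.zeros(shape) is returned instead.
    z = zeros_like_safe(a, shape=(2, 2))
    assert z.shape == (2, 2)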