Add support for NumPy 1.20.0 #7084
Conversation
There are 6 remaining failures, though they're essentially the same:

```
_________ test_array_cumreduction_axis[sequential-None-False-cumprod] __________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = False, axis = None, method = 'sequential'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
a = array([0.])
b = array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869912e+009,
1.33161136e+012, 3.58203457e+014, 9.... inf, inf, inf,
inf, inf, inf, inf])
def _cumprod_merge(a, b):
    if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
        values = np.ma.getdata(a) * np.ma.getdata(b)
        return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))
>   return a * b
E   RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1398: RuntimeWarning
________ test_array_cumreduction_axis[sequential-None-False-nancumprod] ________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = False, axis = None, method = 'sequential'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
arg = (<built-in function mul>, ('nancumprod-f9ff59bf79794516542461a63b8a4646', 'extra', 2), ('nancumprod-6ea9b00ccb738b2c003c7ca94a26a289', 2))
cache = {('nancumprod-6ea9b00ccb738b2c003c7ca94a26a289', 2): array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869... inf, inf, inf]), ('nancumprod-f9ff59bf79794516542461a63b8a4646', 'extra', 2): array([0.])}
dsk = None
def _execute_task(arg, cache, dsk=None):
    """Do the actual work of collecting data and executing a function

    Examples
    --------
    >>> cache = {'x': 1, 'y': 2}

    Compute tasks against a cache
    >>> _execute_task((add, 'x', 1), cache)  # Compute task in naive manner
    2
    >>> _execute_task((add, (inc, 'x'), 1), cache)  # Support nested computation
    3

    Also grab data from cache
    >>> _execute_task('x', cache)
    1

    Support nested lists
    >>> list(_execute_task(['x', 'y'], cache))
    [1, 2]
    >>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
    [[1, 2], [2, 1]]
    >>> _execute_task('foo', cache)  # Passes through on non-keys
    'foo'
    """
    if isinstance(arg, list):
        return [_execute_task(a, cache) for a in arg]
    elif istask(arg):
        func, args = arg[0], arg[1:]
        # Note: Don't assign the subtask results to a variable. numpy detects
        # temporaries by their reference count and can execute certain
        # operations in-place.
>       return func(*(_execute_task(a, cache) for a in args))
E       RuntimeWarning: invalid value encountered in multiply
dask/core.py:121: RuntimeWarning
________ test_array_cumreduction_axis[sequential-None-True-nancumprod] _________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = True, axis = None, method = 'sequential'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
arg = (<built-in function mul>, ('nancumprod-d63b9da7ed3d89f94324b8fe7b8ee65a', 'extra', 2), ('nancumprod-041d650acfcfb8560bb397de6fac230f', 2))
cache = {('nancumprod-041d650acfcfb8560bb397de6fac230f', 2): array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869... inf, inf, inf]), ('nancumprod-d63b9da7ed3d89f94324b8fe7b8ee65a', 'extra', 2): array([0.])}
dsk = None
def _execute_task(arg, cache, dsk=None):
    """Do the actual work of collecting data and executing a function

    Examples
    --------
    >>> cache = {'x': 1, 'y': 2}

    Compute tasks against a cache
    >>> _execute_task((add, 'x', 1), cache)  # Compute task in naive manner
    2
    >>> _execute_task((add, (inc, 'x'), 1), cache)  # Support nested computation
    3

    Also grab data from cache
    >>> _execute_task('x', cache)
    1

    Support nested lists
    >>> list(_execute_task(['x', 'y'], cache))
    [1, 2]
    >>> list(map(list, _execute_task([['x', 'y'], ['y', 'x']], cache)))
    [[1, 2], [2, 1]]
    >>> _execute_task('foo', cache)  # Passes through on non-keys
    'foo'
    """
    if isinstance(arg, list):
        return [_execute_task(a, cache) for a in arg]
    elif istask(arg):
        func, args = arg[0], arg[1:]
        # Note: Don't assign the subtask results to a variable. numpy detects
        # temporaries by their reference count and can execute certain
        # operations in-place.
>       return func(*(_execute_task(a, cache) for a in args))
E       RuntimeWarning: invalid value encountered in multiply
dask/core.py:121: RuntimeWarning
__________ test_array_cumreduction_axis[blelloch-None-False-cumprod] ___________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = False, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/array/reductions.py:1145: in _prefixscan_combine
return binop(pre, func(x, axis=axis, dtype=dtype))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
a = array([0.])
b = array([2.64000000e+002, 6.99600000e+004, 1.86093600e+007, 4.96869912e+009,
1.33161136e+012, 3.58203457e+014, 9.... inf, inf, inf,
inf, inf, inf, inf])
def _cumprod_merge(a, b):
    if isinstance(a, np.ma.masked_array) or isinstance(b, np.ma.masked_array):
        values = np.ma.getdata(a) * np.ma.getdata(b)
        return np.ma.masked_array(values, mask=np.ma.getmaskarray(b))
>   return a * b
E   RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1398: RuntimeWarning
_________ test_array_cumreduction_axis[blelloch-None-False-nancumprod] _________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = False, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
func = <function nancumprod at 0x7f22206bb040>, binop = <built-in function mul>
pre = array([0.])
x = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
axis = 0, dtype = dtype('float64')
def _prefixscan_combine(func, binop, pre, x, axis, dtype):
    """Combine results of a parallel prefix scan such as cumsum

    Parameters
    ----------
    func : callable
        Cumulative function (e.g. ``np.cumsum``)
    binop : callable
        Associative function (e.g. ``add``)
    pre : np.array
        The value calculated in parallel from ``preop``.
        For example, the sum of all the previous blocks.
    x : np.array
        Current block
    axis : int
    dtype : dtype

    Returns
    -------
    np.array
    """
    # We could compute this in two tasks.
    # This would allow us to do useful work (i.e., func), while waiting on `pre`.
    # Using one task may guide the scheduler to do better and reduce scheduling overhead.
>   return binop(pre, func(x, axis=axis, dtype=dtype))
E   RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1145: RuntimeWarning
___________ test_array_cumreduction_axis[blelloch-None-True-cumprod] ___________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'cumprod', use_nan = True, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/utils.py:31: in apply
return func(*args, **kwargs)
<__array_function__ internals>:5: in prod
???
/usr/lib64/python3.9/site-packages/numpy/core/fromnumeric.py:3030: in prod
return _wrapreduction(a, np.multiply, 'prod', axis, dtype, out,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
ufunc = <ufunc 'multiply'>, method = 'prod', axis = 0, dtype = None, out = None
kwargs = {'initial': <no value>, 'keepdims': True, 'where': <no value>}
passkwargs = {'keepdims': True}
def _wrapreduction(obj, ufunc, method, axis, dtype, out, **kwargs):
    passkwargs = {k: v for k, v in kwargs.items()
                  if v is not np._NoValue}

    if type(obj) is not mu.ndarray:
        try:
            reduction = getattr(obj, method)
        except AttributeError:
            pass
        else:
            # This branch is needed for reductions like any which don't
            # support a dtype.
            if dtype is not None:
                return reduction(axis=axis, dtype=dtype, out=out, **passkwargs)
            else:
                return reduction(axis=axis, out=out, **passkwargs)

>   return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
E   RuntimeWarning: overflow encountered in reduce
/usr/lib64/python3.9/site-packages/numpy/core/fromnumeric.py:87: RuntimeWarning
_________ test_array_cumreduction_axis[blelloch-None-True-nancumprod] __________
[gw2] linux -- Python 3.9.1 /usr/bin/python3
func = 'nancumprod', use_nan = True, axis = None, method = 'blelloch'
@pytest.mark.parametrize("func", ["cumsum", "cumprod", "nancumsum", "nancumprod"])
@pytest.mark.parametrize("use_nan", [False, True])
@pytest.mark.parametrize("axis", [None, 0, 1, -1])
@pytest.mark.parametrize("method", ["sequential", "blelloch"])
def test_array_cumreduction_axis(func, use_nan, axis, method):
np_func = getattr(np, func)
da_func = getattr(da, func)
s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
if use_nan:
a[1] = np.nan
d = da.from_array(a, chunks=(4, 5, 6))
a_r = np_func(a, axis=axis)
d_r = da_func(d, axis=axis, method=method)
> assert_eq(a_r, d_r)
dask/array/tests/test_reductions.py:553:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/array/tests/test_reductions.py:18: in assert_eq
_assert_eq(a, b, equal_nan=True)
dask/array/utils.py:259: in assert_eq
b, bdt, b_meta, b_computed = _get_dt_meta_computed(
dask/array/utils.py:234: in _get_dt_meta_computed
x = x.compute(scheduler="sync")
dask/base.py:279: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:561: in compute
results = schedule(dsk, keys, **kwargs)
dask/local.py:528: in get_sync
return get_async(apply_sync, 1, dsk, keys, **kwargs)
dask/local.py:495: in get_async
fire_task()
dask/local.py:457: in fire_task
apply_async(
dask/local.py:517: in apply_sync
res = func(*args, **kwds)
dask/local.py:227: in execute_task
result = pack_exception(e, dumps)
dask/local.py:222: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
func = <function nancumprod at 0x7f22206bb040>, binop = <built-in function mul>
pre = array([0.])
x = array([264., 265., 266., 267., 268., 269., 270., 271., 272., 273., 274.,
275., 276., 277., 278., 279., 280., 28...7., 378., 379., 380., 381., 382., 383., 384.,
385., 386., 387., 388., 389., 390., 391., 392., 393., 394., 395.])
axis = 0, dtype = dtype('float64')
def _prefixscan_combine(func, binop, pre, x, axis, dtype):
    """Combine results of a parallel prefix scan such as cumsum

    Parameters
    ----------
    func : callable
        Cumulative function (e.g. ``np.cumsum``)
    binop : callable
        Associative function (e.g. ``add``)
    pre : np.array
        The value calculated in parallel from ``preop``.
        For example, the sum of all the previous blocks.
    x : np.array
        Current block
    axis : int
    dtype : dtype

    Returns
    -------
    np.array
    """
    # We could compute this in two tasks.
    # This would allow us to do useful work (i.e., func), while waiting on `pre`.
    # Using one task may guide the scheduler to do better and reduce scheduling overhead.
>   return binop(pre, func(x, axis=axis, dtype=dtype))
E   RuntimeWarning: invalid value encountered in multiply
dask/array/reductions.py:1145: RuntimeWarning
```

Should this test simply set NumPy to ignore invalid values?
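For reference, a minimal sketch of what that silencing would look like (the operands here are illustrative, chosen to mirror the `0. * inf` from the failures above):

```python
import numpy as np

# np.errstate scopes NumPy's floating-point error handling to this block, so
# the 0 * inf below no longer emits "invalid value encountered in multiply".
with np.errstate(invalid="ignore", over="ignore"):
    result = np.array([0.0]) * np.array([np.inf])

print(result)  # [nan] -- the nan is still produced; only the warning is silenced
```

Note that this would only hide the symptom: the `nan` values are still computed.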
Ignoring the invalid/overflow warnings, I see that Dask is very wrong somehow.
I think the issue is that there are overflows. Consider:

```
In [1]: s = (10, 11, 12)
   ...: a = np.arange(1, np.prod(s)+1, dtype=float).reshape(s)

In [2]: np.cumprod(a)
Out[2]: array([ 1.,  2.,  6., ..., inf, inf, inf])

In [3]: s = (10, 11, 12)
   ...: a = np.arange(1, np.prod(s)+1).reshape(s)

In [4]: np.cumprod(a)
Out[4]: array([1, 2, 6, ..., 0, 0, 0])
```

The float array overflows to `inf`, while the integer array wraps around to 0. In the case of this test, I think it would probably be resolved by having smaller partitions or by having a smaller array.
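As a quick sanity check that the mismatch is purely an overflow artifact, here is a hedged sketch (the sizes are made up, not taken from the test suite) showing that dask and NumPy agree once the running product stays finite:

```python
import numpy as np
import dask.array as da

# 20! is about 2.4e18, well within float64 range, so no overflow occurs.
a = np.arange(1, 21, dtype=float).reshape(4, 5)
d = da.from_array(a, chunks=(2, 3))

for method in ["sequential", "blelloch"]:
    np.testing.assert_allclose(
        np.cumprod(a),
        da.cumprod(d, axis=None, method=method).compute(),
    )
```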
Oh, and then this is exacerbated because we try to multiply the result of the previous partitions (here `np.array([0.])`, since the flattened input starts at 0) by a block whose cumulative product has already overflowed to `inf`:

```
In [5]: np.cumprod(a) * np.array([0.])
<ipython-input-32-aa8604efc829>:1: RuntimeWarning: invalid value encountered in multiply
  np.cumprod(a) * np.array([0.])
Out[5]: array([ 0.,  0.,  0., ..., nan, nan, nan])
```
In dask we are taking the last value of each block's cumulative product and carrying it into the next block.

But then I guess you are committed to computing linearly...
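To make the linear dependency concrete, here is a conceptual sketch (my own illustration, not dask's actual implementation) of the sequential block-wise scan being described; the carried prefix plays the role of `pre` in the tracebacks above:

```python
import numpy as np

def blockwise_cumprod(blocks):
    """Sequential cumprod over a list of 1-D blocks, carrying a running prefix."""
    out, prefix = [], np.float64(1.0)
    for block in blocks:
        c = np.cumprod(block)
        out.append(prefix * c)   # a 0 or inf in the prefix poisons all later blocks
        prefix = prefix * c[-1]  # the carry forces block i to wait on block i-1
    return np.concatenate(out)

blocks = [np.arange(4, dtype=float), np.arange(4.0, 8.0)]
assert np.allclose(blockwise_cumprod(blocks), np.cumprod(np.concatenate(blocks)))
```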
Ok, I put up a fix for the tests in #7089. I didn't want to just push it to your branch, but here's the diff in case that's easier:

```diff
diff --git a/dask/array/reductions.py b/dask/array/reductions.py
index dd2b9f04..908e6873 100644
--- a/dask/array/reductions.py
+++ b/dask/array/reductions.py
@@ -1194,7 +1194,7 @@ def prefixscan_blelloch(func, preop, binop, x, axis=None, dtype=None, out=None)
         dask array
     """
     if axis is None:
-        x = x.flatten()
+        x = x.flatten().rechunk(chunks=x.npartitions)
         axis = 0
     if dtype is None:
         dtype = getattr(func(np.empty((0,), dtype=x.dtype)), "dtype", object)
@@ -1340,7 +1340,7 @@ def cumreduction(
     )

     if axis is None:
-        x = x.flatten()
+        x = x.flatten().rechunk(chunks=x.npartitions)
         axis = 0
     if dtype is None:
         dtype = getattr(func(np.empty((0,), dtype=x.dtype)), "dtype", object)
diff --git a/dask/array/tests/test_reductions.py b/dask/array/tests/test_reductions.py
index a773a203..c166b863 100644
--- a/dask/array/tests/test_reductions.py
+++ b/dask/array/tests/test_reductions.py
@@ -546,6 +546,10 @@ def test_array_cumreduction_axis(func, use_nan, axis, method):
     if use_nan:
         a[1] = np.nan
     d = da.from_array(a, chunks=(4, 5, 6))
+    if func in ["cumprod", "nancumprod"] and method == "blelloch" and axis is None:
+        with pytest.warns(RuntimeWarning):
+            da_func(d, axis=axis, method=method).compute()
+        return
     a_r = np_func(a, axis=axis)
     d_r = da_func(d, axis=axis, method=method)
```
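As a standalone check of the test-side change, the following reproduces the warning that the `pytest.warns` guard now expects (same shapes and chunks as the test above):

```python
import numpy as np
import dask.array as da
import pytest

s = (10, 11, 12)
a = np.arange(np.prod(s), dtype=float).reshape(s)
d = da.from_array(a, chunks=(4, 5, 6))

# The flattened cumulative product overflows float64, and combining the
# 0.0 prefix with inf then triggers the RuntimeWarning seen in the failures.
with pytest.warns(RuntimeWarning):
    da.cumprod(d, axis=None, method="blelloch").compute()
```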
OK, so there are two bugs here; that test was relying on the chunking that `flatten` happened to produce.
Or on the other hand, maybe that means the test was broken and not really checking what it intended to.

I think that's the correct interpretation. My preference is to merge this PR with the partial fix from #7089 and write up a separate issue to fix cumprod.

Superseded by #7089.
The NumPy 1.20.0 release candidate is already in Fedora Rawhide for testing, and 34 tests fail. This fixes most of them.
Passes `black dask` / `flake8 dask`