
da.map_blocks with drop_axis stopped working in dask 1.1.4 #4584

Closed
pantaray opened this issue Mar 12, 2019 · 23 comments · Fixed by #4831

@pantaray

Hi!

First and foremost a big Thank You for the amazing work that you're doing with dask! It's a fantastic piece of software!

We're using dask as the parallelization engine for a large-scale data analysis package (still in early development). Our computational routines work on independent chunks of one big source data set and then write to a single file (accessible to all workers). We used map_blocks for this in dask 1.0.0 and things worked great, but unfortunately the same code no longer works in dask 1.1.4. Here's an MCVE:

import dask
import dask.array as da
import numpy as np

# in reality used to write `chk` to a (pre-formatted) file
def da_dummy(chk, block_info=None):
    idx = block_info[0]["chunk-location"][-1]
    #
    # use `idx` to store `chk` at correct location
    # 
    return idx

np_a = np.ones((10,50*20))
a = da.from_array(np_a, chunks=(10,20))

# works in dask 1.0.0, raises an IndexError in dask 1.1.4
result = a.map_blocks(da_dummy, dtype="int", chunks=(1,), drop_axis=[0])
res = result.compute()

In dask 1.0.0 this snippet produces an array res that looks like this:

array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49])

In dask 1.1.4 the same code triggers an IndexError: index 2 is out of bounds for axis 0 with size 2. Here is the full traceback:

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
~/Documents/job/dask_fail.py in <module>
     15 
     16 # works in dask 1.0.0, raises an IndexError in dask 1.1.4
---> 17 result = a.map_blocks(da_dummy, dtype="int", chunks=(1,), drop_axis=[0])
     18 res = result.compute()
     19 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in map_blocks(self, func, *args, **kwargs)
   1569     @wraps(map_blocks)
   1570     def map_blocks(self, func, *args, **kwargs):
-> 1571         return map_blocks(func, self, *args, **kwargs)
   1572 
   1573     def map_overlap(self, func, depth, boundary=None, trim=True, **kwargs):

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in map_blocks(func, *args, **kwargs)
    534                                            for ij, j in enumerate(old_k[1:])],
    535                         'chunk-location': old_k[1:]}
--> 536                     for i in shapes}
    537 
    538             v = copy.copy(v)  # Need to copy and unpack subgraph callable

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in <dictcomp>(.0)
    534                                            for ij, j in enumerate(old_k[1:])],
    535                         'chunk-location': old_k[1:]}
--> 536                     for i in shapes}
    537 
    538             v = copy.copy(v)  # Need to copy and unpack subgraph callable

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in <listcomp>(.0)
    532                         'num-chunks': num_chunks[i],
    533                         'array-location': [(starts[i][ij][j], starts[i][ij][j + 1])
--> 534                                            for ij, j in enumerate(old_k[1:])],
    535                         'chunk-location': old_k[1:]}
    536                     for i in shapes}

IndexError: index 2 is out of bounds for axis 0 with size 2
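
For reference, block_info[0] received by the mapped function describes the current input chunk. Based on the keys used in the code shown in the traceback above, it should look roughly like this for, say, the third chunk of a (exact contents assumed from the chunking above):

# rough sketch of block_info[0] for the chunk at grid position (0, 2)
# (1 chunk along axis 0, 50 chunks along axis 1)
{'num-chunks': (1, 50),
 'array-location': [(0, 10), (40, 60)],
 'chunk-location': (0, 2)}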

A (desperate) attempt at working around this was to not use drop_axis, leave the chunk size as is, and return a bogus tuple instead, but no luck either:

import dask
import dask.array as da
import numpy as np

np_a = np.ones((10,50*20))
a = da.from_array(np_a, chunks=(10,20))

# desperate workaround for dask 1.1.4 - still not working
def da_dummy2(chk, block_info=None):
    print(block_info) # show what's going on in here...
    idx = block_info[0]["chunk-location"][-1]
    return (idx, 1)
result = a.map_blocks(da_dummy2, dtype="int", chunks=(1, 1))
res = result.compute()

This gives an IndexError: tuple index out of range, full traceback here:

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
~/Documents/job/dask_fail.py in <module>
     20     return (idx, 1)
     21 result = a.map_blocks(da_dummy2, dtype="int", chunks=(1, 1))
---> 22 res = result.compute()
     23 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
    398     results = schedule(dsk, keys, **kwargs)
--> 399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 
    401 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/base.py in <listcomp>(.0)
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
    398     results = schedule(dsk, keys, **kwargs)
--> 399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 
    401 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in finalize(results)
    777     while isinstance(results2, (tuple, list)):
    778         if len(results2) > 1:
--> 779             return concatenate3(results)
    780         else:
    781             results2 = results2[0]

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in concatenate3(arrays)
   3495     if not ndim:
   3496         return arrays
-> 3497     chunks = chunks_from_arrays(arrays)
   3498     shape = tuple(map(sum, chunks))
   3499 

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in chunks_from_arrays(arrays)
   3325 
   3326     while isinstance(arrays, (list, tuple)):
-> 3327         result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
   3328         arrays = arrays[0]
   3329         dim += 1

/usr/local/software/anaconda/envs/p3/lib/python3.6/site-packages/dask/array/core.py in <listcomp>(.0)
   3325 
   3326     while isinstance(arrays, (list, tuple)):
-> 3327         result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
   3328         arrays = arrays[0]
   3329         dim += 1

IndexError: tuple index out of range

Any help with this is greatly appreciated. Thank you!

@TomAugspurger
Member

Thanks for the report, confirmed locally.

These are a tad tricky to debug. I didn't come up with anything concrete in 15 minutes or so. Will give it another shot later.

@pantaray
Author

pantaray commented Apr 3, 2019

Hi Tom!

I was wondering if you already had a chance to further investigate the root cause of the problem? Thank you!

@mrocklin
Member

mrocklin commented Apr 3, 2019 via email

@jakirkham
Member

Sure, the next step would be to run git bisect and see what commit that identifies. If you are able to try that @pantaray, that would be very helpful. If not, I can give it a go using what you have provided.

@mrocklin mrocklin added this to Proposed in Core maintenance Apr 17, 2019
@mrocklin
Member

I took a brief look at this. This is one of the unfortunate collisions in the complex logic of all of the keyword arguments within map_blocks. In particular, here we're using both drop_axis and block_info. It would be good to fix, but this is exotic enough and that code is hairy enough that, for me at least, it's something of a low priority.

@pantaray I have a couple of suggestions:
1.

@mrocklin
Member

Whoops, misfire. A couple suggestions:

  1. Find a way to not use map_blocks with both of these keyword arguments at the same time. Maybe you return an output array that is of the same number of dimensions as your input?

  2. Dive in and fix this in the codebase.

    The relevant code (I think) is here

dask/dask/array/core.py

Lines 588 to 589 in d8a4f3b

arr_k = old_k[-len(shape):]
arr_k = tuple(j if num_chunks[i][ij] > 1 else 0 for ij, j in enumerate(arr_k))

To be clear, I'm definitely acknowledging that this is a bug and that it should be fixed. I'm also being somewhat honest here that, given the complexity and edge-case-ness of this, it's unlikely to be at the top of any of the maintainers' queues any time soon. Honestly, we probably shouldn't be supporting all of the keyword arguments in map_blocks and should instead be diverting people towards suggestions like 1 above, to find some other, perhaps less magical way.

Hope this helps somewhat. Sorry for the lack of support.

@pantaray
Author

pantaray commented May 7, 2019

@mrocklin Thank you for the explanation!

Just to clarify: our base problem is that we have a dask array holding about 100GB of data scattered across multiple workers in distributed memory. How can we save the array to disk?

Hence, the only reason we're using this particular map_blocks construct is to collect and save results computed by multiple workers to a single file. Your mentioning the "edge-case-ness" of this approach makes me wonder what the best practices are for achieving this. We're definitely not hell-bent on using map_blocks for this and are absolutely open to employing third-party storage formats like HDF5 containers to save our results. However, neither store nor to_hdf works with multiprocessing schedulers (as discussed in #2488 or #3074). What would you recommend using instead?

Thank you again for your time and effort!

@jakirkham
Member

Zarr works reasonably well for parallel writes. I'm not unbiased though as I do work on it some. That said, I came to it as a Dask user interested in addressing this same problem.
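
For instance, something roughly like this (a minimal sketch; the output path is made up):

import dask.array as da

a = da.ones((10, 1000), chunks=(10, 20))
# each worker writes only its own chunks; Zarr copes with concurrent chunk writes
da.to_zarr(a, "results.zarr")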

@pantaray
Author

Sorry for the delay, we've spent the last few days testing various options for tackling this.

@jakirkham: thank you for the suggestion!

We ran a bunch of performance tests on our cluster using:

  1. Sequential HDF5 writing: workers pump their chunks of the distributed array a into a single container via map_blocks followed by persist() (not compute(), to avoid the above errors). Writing is protected (and hence serialized) using a distributed lock; a rough sketch follows below the list.
  2. Single-worker HDF5 writing: a single worker fetches pieces of the array using compute() and stores these pieces in an HDF5 dataset.
  3. zarr: create a zarr array z and use da.to_zarr(a, z) to store the distributed array a.
  4. Virtual HDF5 datasets: every worker saves its chunk(s) of a into separate HDF5 containers via map_blocks/persist(). The generated HDF5 files are subsequently used to create a single virtual dataset in our target HDF5 container.
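
A rough sketch of approach 1, assuming a pre-created dataset "data" in a shared file results.h5 and the distributed scheduler (file and dataset names are made up):

import h5py
from dask.distributed import Lock

def write_chunk(chk, lock=None, block_info=None):
    # locate this chunk within the full array
    (r0, r1), (c0, c1) = block_info[0]["array-location"]
    with lock:  # serialize access to the shared HDF5 file
        with h5py.File("results.h5", "r+") as f:
            f["data"][r0:r1, c0:c1] = chk
    return chk  # same shape as the input, so no drop_axis needed

lock = Lock("results-h5")
written = a.map_blocks(write_chunk, lock=lock, dtype=a.dtype).persist()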

In our tests, zarr arrays and generating multiple HDF5 containers showed the best performance (by a mile), with zarr being by far the most convenient solution from a coding point of view (a two-liner). We ended up going with virtual HDF5 datasets since most of our source files are HDF5 containers already. I'm happy to provide an MCVE in case anyone's interested.

Thank you again for your help!

@jakirkham
Member

Certainly virtual HDF5 datasets are an interesting solution to explore here. I wonder if we should include an example in the docs. Thoughts from others here?
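
Something along these lines, perhaps (a rough sketch with h5py >= 2.9; file names, dataset names, and the chunk layout are made up):

import h5py

n_chunks, chunk_shape = 50, (10, 20)
layout = h5py.VirtualLayout(shape=(10, 50 * 20), dtype="f8")
for i in range(n_chunks):
    # one HDF5 file per chunk, written earlier by the workers
    source = h5py.VirtualSource("chunk_{}.h5".format(i), "chunk", shape=chunk_shape)
    layout[:, i * 20:(i + 1) * 20] = source

with h5py.File("combined.h5", "w") as f:
    f.create_virtual_dataset("data", layout)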

@jakirkham
Member

Have run git bisect on this and narrowed the issue down to commit ( 9f870d1 ). Right before that commit everything works correctly and with that commit this error emerges.

@jakirkham
Member

Should add that I don't see this error on master. However, it no longer gives the original result (on 1.0.0) either, so there is still a bug (albeit a different one). Using git bisect narrowed down the behavior change to commit ( a675507 ). For clarity, this is what res looks like with this change:

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0])

@jakirkham
Member

Yep, looks like this also came up in issue ( #4712 ), where the last change was added to address this issue. Let's see if @bmerry has thoughts on this particular case.

@bmerry
Contributor

bmerry commented May 21, 2019

When working on #4712 I do recall looking at the code and thinking that it didn't look like it would handle drop_axis. If I have time I'll take a look later this week to see if I can see how to fix it. It does get fairly hairy once you're combining new_axis, drop_axis and block_info, so I don't want to promise anything.

bmerry added a commit to bmerry/dask that referenced this issue May 21, 2019
This should address dask#4584, but it will still need unit tests. The amount
of code is actually reduced, because I deleted some code that should
never be used: it was trying to cater for arrays in kwargs, but the API
says the kwargs must not be arrays.

Instead of trying to track axes that appear and disappear, the axis
labels that are already computed (for passing to blockwise) are
leveraged to simplify the indexing logic. The current block location is
represented as a dictionary from axis label to block index, and then the
location for the current input array is looked up through that
dictionary. This may have a minor performance impact because there is
now an extra per-chunk dictionary being thrown around.
@bmerry
Contributor

bmerry commented May 21, 2019

@pantaray Give the commit above a try. It seems to work on your minimal example, but that's no guarantee that it'll work in all cases since the example is only accessing one element from block_info. I still need to write unit tests for the interaction between various features (drop_axis with concatenation, new_axis and broadcasting) before I submit a pull request.

bmerry added a commit to bmerry/dask that referenced this issue May 21, 2019
@jrbourbeau jrbourbeau moved this from Backlog to In progress in Core maintenance May 21, 2019
bmerry added a commit to bmerry/dask that referenced this issue May 22, 2019

Closes dask#4584.
TomAugspurger pushed a commit that referenced this issue May 22, 2019

Closes #4584.
@jrbourbeau jrbourbeau moved this from In progress to Done in Core maintenance May 23, 2019
@jakirkham
Member

@pantaray, if you haven't already, could you please retry with master and let us know if that fixes the issue or not? Thanks 😄

@pantaray
Author

@bmerry @jakirkham Thanks for looking into this!

With the current master (1.2.2+28.g41e5026b) the first example (da_dummy with drop_axis) works now. The "workaround" da_dummy2 (no drop_axis but returning (idx, 1)) still gives me the same IndexError: tuple index out of range; full traceback here:

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
~/Downloads/dask_fail.py in <module>
     24     return (idx, 1)
     25 result = a.map_blocks(da_dummy2, dtype="int", chunks=(1, 1))
---> 26 res = result.compute()
     27 

~/Downloads/dask/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158 

~/Downloads/dask/dask/base.py in compute(*args, **kwargs)
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
    398     results = schedule(dsk, keys, **kwargs)
--> 399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 
    401 

~/Downloads/dask/dask/base.py in <listcomp>(.0)
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
    398     results = schedule(dsk, keys, **kwargs)
--> 399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400 
    401 

~/Downloads/dask/dask/array/core.py in finalize(results)
    824     while isinstance(results2, (tuple, list)):
    825         if len(results2) > 1:
--> 826             return concatenate3(results)
    827         else:
    828             results2 = results2[0]

~/Downloads/dask/dask/array/core.py in concatenate3(arrays)
   3673     if not ndim:
   3674         return arrays
-> 3675     chunks = chunks_from_arrays(arrays)
   3676     shape = tuple(map(sum, chunks))
   3677 

~/Downloads/dask/dask/array/core.py in chunks_from_arrays(arrays)
   3503 
   3504     while isinstance(arrays, (list, tuple)):
-> 3505         result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
   3506         arrays = arrays[0]
   3507         dim += 1

~/Downloads/dask/dask/array/core.py in <listcomp>(.0)
   3503 
   3504     while isinstance(arrays, (list, tuple)):
-> 3505         result.append(tuple([shape(deepfirst(a))[dim] for a in arrays]))
   3506         arrays = arrays[0]
   3507         dim += 1

IndexError: tuple index out of range

@bmerry
Contributor

bmerry commented May 24, 2019

As far as I know, the function mapped by map_blocks is supposed to return a numpy array, not a tuple, so I don't really expect da_dummy2 to work, and it's probably coincidental that da_dummy works.

@jakirkham
Member

Glad to hear this fixed the issue! 🎉 Thanks for checking @pantaray and thanks for the fix @bmerry. 😄

It's interesting that map_blocks worked with multiple return results at one point. As Bruce says, this is not behavior that was intentional. That said, I can certainly appreciate the value of having multiple results returned. If that is of interest to you, would you be willing to open a new issue for this feature request? 😉

@pantaray
Author

@bmerry @jakirkham Thanks for the quick feedback!

My bad, you're of course correct: map_blocks isn't really intended to return anything but arrays. Unfortunately, even with return np.array([idx, 1]) in da_dummy2 I still get the same error.

@bmerry
Contributor

bmerry commented May 24, 2019

That's probably because you're returning an array with shape (2,) whereas it should return an array with shape (1, 1) to match the chunks argument.

@pantaray
Author

@bmerry good catch. With return np.array([[idx]]) (to enforce shape (1, 1)) da_dummy2 works as well now. Thanks again!
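
For completeness, the variant that works now looks roughly like this:

def da_dummy2(chk, block_info=None):
    idx = block_info[0]["chunk-location"][-1]
    return np.array([[idx]])  # shape (1, 1), matching chunks=(1, 1)

result = a.map_blocks(da_dummy2, dtype="int", chunks=(1, 1))
res = result.compute()  # shape (1, 50): [[0, 1, ..., 49]]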

@jakirkham
Member

Glad to see we got this issue sorted 😄
