
Use BlockwiseDep for map_blocks with block_id or block_info #7686

Open
bmerry wants to merge 10 commits into main

Conversation

bmerry (Contributor) commented May 21, 2021

When the function passed to map_blocks takes a block_id or block_info argument, that argument is populated with information about the current block. Previously this was done by synthesizing all the information at the time map_blocks was called and storing it in a da.Array of objects. Now that Blockwise supports I/O deps, it's feasible to defer this work until the Blockwise layer is materialised.
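
For anyone unfamiliar with the feature, here is a small illustration (not part of this PR's diff) of the block_id and block_info keywords that map_blocks fills in for the mapped function; the keys shown follow the dask.array.map_blocks documentation.

import dask.array as da
import numpy as np


def show(x, block_id=None, block_info=None):
    # block_id is this chunk's index in the output grid, e.g. (0, 1).
    # block_info maps each input's argument position (and None for the output)
    # to a dict with keys such as 'shape', 'num-chunks', 'chunk-location' and
    # 'array-location'.
    if block_info:  # dask may probe the function with block_info unset for meta inference
        print(block_id, block_info[None]['array-location'])
    return x


arr = da.ones((4, 4), chunks=2)
arr.map_blocks(show).compute()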

I've made this a draft PR because I have no idea what I'm doing with the serialization and need some advice. I've made a distributed test pass, but after seeing some comments in other PRs I'm aware that there are some subtleties about what may be deserialized on the scheduler (in this case I think an np.ndarray is getting deserialized there).

Maybe a solution would be to have _BlockInfo.__getitem__ return a task that constructs the block_info, rather than constructing it itself? That would also have the advantage that building the block info would be left to the workers.
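
As a purely hypothetical sketch of that idea (the names below are invented and are not what this PR currently does): the dependency's __getitem__ would hand back a small task, so the dict itself is only built when the task runs on a worker.

def _make_block_info(chunk_location, shape, num_chunks):
    # Runs on the worker: build the per-block dict only when the task executes.
    return {
        "chunk-location": chunk_location,
        "shape": shape,
        "num-chunks": num_chunks,
    }


class _BlockInfoDep:
    def __init__(self, shape, num_chunks):
        self.shape = shape
        self.num_chunks = num_chunks

    def __getitem__(self, block_index):
        # Return a task (callable plus arguments) instead of the finished dict,
        # so materialisation only embeds a tiny tuple per block.
        return (_make_block_info, block_index, self.shape, self.num_chunks)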

I've extended da.blockwise to accept BlockwiseDep arguments (and documented it), so that map_blocks can pass them through (since map_blocks calls da.blockwise rather than dask.blockwise). I probably still need to add some direct tests, rather than relying on the coverage from map_blocks.

The performance looks good. Here's some benchmark code:

#!/usr/bin/env python3

from time import monotonic

import dask.array as da
import numpy as np


def func(a, b, block_info=None):
    return a + b


# a and b each consist of 200 single-element chunks, so the mapped result has a
# very large number of blocks and hence block_info dicts to construct.
a = da.from_array(np.arange(200)[np.newaxis, :], chunks=1)
b = da.from_array(np.arange(200)[:, np.newaxis], chunks=1)
t0 = monotonic()
c = da.map_blocks(func, a, b)
t1 = monotonic()
c.compute()
t2 = monotonic()
print(f'Prepare: {t1 - t0:.3f}')
print(f'Compute: {t2 - t1:.3f}')

Before:

Prepare: 0.446
Compute: 7.457

After:

Prepare: 0.001
Compute: 6.715
  • Tests added / passed
  • Passes black dask / flake8 dask / isort dask

bmerry (Contributor, Author) commented May 21, 2021

cc @rjzamora @dask/array

github-actions bot added the array label May 21, 2021
bmerry (Contributor, Author) commented May 25, 2021

@madsbk any tips on the serialization? If I understand correctly (which I'm not at all sure I do), to_serialize says "serialize how you like on the client, but don't deserialize on the scheduler, pass it to the worker to deserialize." However, I don't know how to get the worker to do the deserialization: I end up with a Serialize (or Serialized, I forget) object in the graph instead of the original object. This might be because it's nested inside a task.

madsbk (Contributor) commented May 25, 2021

to_serialize says "serialize how you like on the client, but don't deserialize on the scheduler, pass it to the worker to deserialize."

Yes, this is correct. The worker should deserialize the data automatically, but the serialization logic in Distributed is very messy, so it is hard to know exactly why it doesn't happen :/
One way to make the worker deserialize a to_serialize object is to give the object as an argument to the materialized tasks.

FYI: I am working on redesigning the serialization logic in Distributed, which will handle issues like this. For example, the to_serialize function will become an optional flag to avoid unnecessary deserializations; it will not be required for correctness.

bmerry (Contributor, Author) commented May 26, 2021

FYI: I am working on redesigning the serialization logic in Distributed, which will handle issues like this.

Sounds good. What's the timeframe, and is there a ticket I can watch so that I'll know when it's in place? If it's happening soon I might just leave this until then.

One way to make the worker deserialize a to_serialize object is to give the object as an argument to the materialized tasks.

To check that I understand you correctly: if X is a to_serialize object, and the layer materialization produces a task like this:

(func, X, 123)

it should work, whereas a task

(dict, [["key1", X], ["key2", 123]])

might not? (the latter is approximately what I'm currently doing, although there are two layers of dict involved).
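
For concreteness, a toy version of the two task shapes (with a plain object standing in for the to_serialize-wrapped value, so the snippet runs locally without a cluster):

import dask


def func(x, y):
    return (x, y)


X = {"payload": 42}  # stand-in for the to_serialize-wrapped object

# Shape 1: X appears directly as a task argument.
graph_direct = {"out": (func, X, 123)}

# Shape 2: X is nested inside a (dict, [...]) construction task, roughly what
# the current code does (with an extra level of dict in practice).
graph_nested = {"out": (dict, [["key1", X], ["key2", 123]])}

print(dask.get(graph_direct, "out"))
print(dask.get(graph_nested, "out"))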

bmerry added a commit to bmerry/dask that referenced this pull request May 26, 2021
This is an attempt to improve the serialization situation. Instead of
trying to put the data into a custom subclass of BlockwiseDep, insert it
as (constant) arguments to the wrapper function. This then relies on
SubgraphCallable to handle the serialization.

This may theoretically improve serialization costs when the graph is
materialized on the client (which I think is still the default for
arrays), because the raw data is inside the SubgraphCallable and hence
only serialised once, with production of the individual block_infos left
to the workers.

The benchmark code in dask#7686 is a bit slower than the previous version
(about 7s for compute), but still faster than main.
bmerry (Contributor, Author) commented May 26, 2021

I've updated the PR with a different approach; see the commit message of bc2b235 for details. @rjzamora, I feel like this new approach might not be in the "spirit" of BlockwiseDep, in that I'm just using an empty BlockwiseDepDict to access the block index, and all the logic for processing that block index into an input to the task is handled by a wrapper function and some constant (indices is None) arguments to the Blockwise. Let me know if you think I should revert bc2b235.
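
A rough sketch of the mechanism (invented names, not the PR's actual code): the constant data is baked into a SubgraphCallable, so it is serialised once with the callable, and only the block index arrives as a per-task argument.

import numpy as np
from dask.optimization import SubgraphCallable


def _expand_block_info(data, block_index):
    # Runs on the worker: combine the shared data with this block's index to
    # produce the per-block value.
    return {"block-index": block_index, "total": int(data.sum())}


shared = np.arange(1000)  # stand-in for the "raw data" mentioned above

dsk = {"out": (_expand_block_info, shared, "block_index")}
expand = SubgraphCallable(dsk, "out", ("block_index",))

print(expand((0, 1)))  # {'block-index': (0, 1), 'total': 499500}

The point is that pickling expand once carries the shared data with it, while each materialised task only needs to embed its own block index.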

bmerry added 10 commits July 4, 2021 15:21
This still assembles the block_info parameter when the Blockwise layer
is materialised, rather than as part of the task, so it is probably not
going to be any more scalable, but it avoids creating an additional
layer.

It still needs to be updated with serialization support to make it work
with distributed.
There seem to be some issues with it (causing the _BlockInfo itself to
be passed to the function, rather than its expansion). To be fixed in a
later commit.
This adds some tests and makes them pass, but the deserialization is
probably happening in the wrong place or with the wrong method.
This is an attempt to improve the serialization situation. Instead of
trying to put the data into a custom subclass of BlockwiseDep, insert it
as (constant) arguments to the wrapper function. This then relies on
SubgraphCallable to handle the serialization.

This may theoretically improve serialization costs when the graph is
materialized on the client (which I think is still the default for
arrays), because the raw data is inside the SubgraphCallable and hence
only serialised once, with production of the individual block_infos left
to the workers.

The benchmark code in dask#7686 is a bit slower than the previous version
(about 7s for compute), but still faster than main.
It treated a numblocks of `()` as if it were unspecified.
bmerry (Contributor, Author) commented Jul 4, 2021

I've added some extra tests, so I think this is ready for review. I'm still not sure whether I'm on the right path with the serialization; it seems a bit different from the related code.

bmerry marked this pull request as ready for review July 4, 2021 14:22
bmerry (Contributor, Author) commented Jul 4, 2021

The failing tests seem to be flaky and not related to this PR.
