Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update inlining Futures in task graph in Client._graph_to_futures #3303

Merged
merged 5 commits into from Dec 10, 2019

Conversation

jrbourbeau
Copy link
Member

This PR updates Client._graph_to_futures to use distributed.utils_comm.pack_data to inline Futures instead of dask.optimization.inline here

values = {
k for k, v in dsk.items() if isinstance(v, Future) and k not in keyset
}
if values:
dsk = dask.optimization.inline(dsk, keys=values)

E.g. transforming a task graph like:

{'inc-18a8806d490eca53bbf2c39a744bc349': <Future: finished, type: builtins.int, key: inc-18a8806d490eca53bbf2c39a744bc349>,
 'inc-2541182851e5e0087c5c5387feefcf99': <Future: finished, type: builtins.int, key: inc-2541182851e5e0087c5c5387feefcf99>,
 'inc-74961de59174673d7c8aaf11df6fd815': <Future: finished, type: builtins.int, key: inc-74961de59174673d7c8aaf11df6fd815>,
 'sum-of-future-results': (<built-in function sum>,
                           ['inc-2541182851e5e0087c5c5387feefcf99',
                            'inc-18a8806d490eca53bbf2c39a744bc349',
                            'inc-74961de59174673d7c8aaf11df6fd815'])}

into:

{'inc-18a8806d490eca53bbf2c39a744bc349': <Future: finished, type: builtins.int, key: inc-18a8806d490eca53bbf2c39a744bc349>,
 'inc-2541182851e5e0087c5c5387feefcf99': <Future: finished, type: builtins.int, key: inc-2541182851e5e0087c5c5387feefcf99>,
 'inc-74961de59174673d7c8aaf11df6fd815': <Future: finished, type: builtins.int, key: inc-74961de59174673d7c8aaf11df6fd815>,
 'sum-of-future-results': (<built-in function sum>,
                           [<Future: finished, type: builtins.int, key: inc-2541182851e5e0087c5c5387feefcf99>,
                            <Future: finished, type: builtins.int, key: inc-18a8806d490eca53bbf2c39a744bc349>,
                            <Future: finished, type: builtins.int, key: inc-74961de59174673d7c8aaf11df6fd815>])}

This is motivated by pack_data performing better than inline for large graphs (see benchmark below) and should, I think, perform the same substitutions.

Example benchmark:
import time

from dask.optimization import inline
from distributed import Client, wait, Future
from distributed.utils_comm import pack_data


def inc(x):
    return x + 1


if __name__ == "__main__":

    with Client() as client:
        # Create a lots of Futures and wait for them to finish
        futures = client.map(inc, range(5_000))
        wait(futures)

        # Create task graph from Futures
        dsk = {f.key: f for f in futures}

        # Add new task which depends on Futures
        sum_key = "sum-of-future-results"
        dsk[sum_key] = (sum, [f.key for f in futures])

        # Use two different methods to inline the Futures in dsk:
        #   1. distributed.utils_comm.pack_data
        #   2. dask.optimization.inline

        # Using pack_data
        values = {k: v for k, v in dsk.items() if isinstance(v, Future)}
        t_start = time.time()
        pack_result = pack_data(dsk, values)
        dt = time.time() - t_start
        print(f"pack_data took {dt} seconds")

        # Using inline
        values = {k for k, v in dsk.items() if isinstance(v, Future)}
        t_start = time.time()
        inline_result = inline(dsk, values)
        dt = time.time() - t_start
        print(f"inline took {dt} seconds")

        assert pack_result == inline_result

outputs (on my laptop)

pack_data took 0.005619049072265625 seconds
inline took 11.879703044891357 seconds

cc @jcrist if you get a moment to look at this

xref dask/dask#5299

@jcrist
Copy link
Member

jcrist commented Dec 5, 2019

pack_data isn't 100% correct for applying to dask graphs. We want to only traverse containers that are part of the spec (distributed also traverses dicts, which isn't part of the spec :/). We also should only traverse tuples that are tasks. I feel like I've written this code before, but can't find it. Here's a quick implementation:

def fast_subs(o, d):
    typ = type(o)
    if typ is tuple and o and callable(o[0]):
        return (o[0],) + tuple(fast_subs(i, d) for i in o[1:])
    elif typ is list:
        return [fast_subs(i, d) for i in o]
    elif typ is dict:
        return {k: fast_subs(v, d) for (k, v) in o.items()}
    elif typ is str:
        # I *believe* when you get to the point where you call this
        # all keys will have already been converted to strings.
        return d.get(o, o)
    return o

o:
Core data structures containing literals and keys
d: dict
Mapping of keys to values
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the current implementation these have to be str keys, may be worth noting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrmm that's a really good point. I think we'll want to cover generic, non-string keys too (thinking of, for example, a persisted dask array which has keys like {("chunk", 0): <Future>}).

Since d is a mapping which contains keys in the task graph to substitute, we could first check whether or not o is itself a key in d. That would let us know to make a substitution when we come across a key like, for example, ("chunk", 0). I pushed a commit with what I mean in code

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at this point all keys are strings already, but I may be wrong. I don't remember when keys are converted to strings (if it's on the client or the scheduler side), but at some point everything's a string, so the previous code may be fine. I was mostly commenting to update the docstring.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client converts keys to strings before sending to the scheduler. The scheduler only understands string-valued keys.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out, I hadn't realized this string conversion took place. It looks like the the conversion happens here:

dsk2 = str_graph({k: v[0] for k, v in d.items()}, extra_keys)

a few lines after Futures have been inlined in the graph.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updated subs_multiple with the improvements from @jcrist in #3303 (comment)

Alternatively, we could try moving the str_graph call before substitutions take place. However, one advantage to the current subs_multiple implementation is we could use it elsewhere (on generic keys) should the need arise

distributed/utils_comm.py Outdated Show resolved Hide resolved
@jcrist
Copy link
Member

jcrist commented Dec 10, 2019

LGTM, merging. Thanks @jrbourbeau.

@jcrist jcrist merged commit b92782d into dask:master Dec 10, 2019
@jrbourbeau jrbourbeau deleted the client-inline-futures branch December 10, 2019 18:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants