Skip to content

Commit

Permalink
Update inlining Futures in task graph in Client._graph_to_futures (#3303
Browse files Browse the repository at this point in the history
)

* Use pack_data to inline Futures

* Add subs_mutliple

* Check key mapping for keys to substitue

* Avoid unnecessary hash attempts
  • Loading branch information
jrbourbeau authored and jcrist committed Dec 10, 2019
1 parent 3151f09 commit b92782d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
7 changes: 5 additions & 2 deletions distributed/client.py
Expand Up @@ -51,6 +51,7 @@
WrappedKey,
unpack_remotedata,
pack_data,
subs_multiple,
scatter_to_workers,
gather_from_workers,
retry_operation,
Expand Down Expand Up @@ -2435,10 +2436,12 @@ def _graph_to_futures(
futures = {key: Future(key, self, inform=False) for key in keyset}

values = {
k for k, v in dsk.items() if isinstance(v, Future) and k not in keyset
k: v
for k, v in dsk.items()
if isinstance(v, Future) and k not in keyset
}
if values:
dsk = dask.optimization.inline(dsk, keys=values)
dsk = subs_multiple(dsk, values)

d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
Expand Down
16 changes: 15 additions & 1 deletion distributed/tests/test_utils_comm.py
@@ -1,7 +1,7 @@
from distributed.core import ConnectionPool
from distributed.comm import Comm
from distributed.utils_test import gen_cluster, loop # noqa: F401
from distributed.utils_comm import pack_data, gather_from_workers, retry
from distributed.utils_comm import pack_data, subs_multiple, gather_from_workers, retry

from unittest import mock

Expand All @@ -15,6 +15,20 @@ def test_pack_data():
assert pack_data({"a": ["x"], "b": "y"}, data) == {"a": [1], "b": "y"}


def test_subs_multiple():
data = {"x": 1, "y": 2}
assert subs_multiple((sum, [0, "x", "y", "z"]), data) == (sum, [0, 1, 2, "z"])
assert subs_multiple((sum, [0, ["x", "y", "z"]]), data) == (sum, [0, [1, 2, "z"]])

dsk = {"a": (sum, ["x", "y"])}
assert subs_multiple(dsk, data) == {"a": (sum, [1, 2])}

# Tuple key
data = {"x": 1, ("y", 0): 2}
dsk = {"a": (sum, ["x", ("y", 0)])}
assert subs_multiple(dsk, data) == {"a": (sum, [1, 2])}


@gen_cluster(client=True)
def test_gather_from_workers_permissive(c, s, a, b):
rpc = ConnectionPool()
Expand Down
32 changes: 32 additions & 0 deletions distributed/utils_comm.py
Expand Up @@ -280,6 +280,38 @@ def pack_data(o, d, key_types=object):
return o


def subs_multiple(o, d):
""" Perform substitutions on a tasks
Parameters
----------
o:
Core data structures containing literals and keys
d: dict
Mapping of keys to values
Examples
--------
>>> dsk = {"a": (sum, ["x", 2])}
>>> data = {"x": 1}
>>> subs_multiple(dsk, data) # doctest: +SKIP
{'a': (sum, [1, 2])}
"""
typ = type(o)
if typ is tuple and o and callable(o[0]): # istask(o)
return (o[0],) + tuple(subs_multiple(i, d) for i in o[1:])
elif typ is list:
return [subs_multiple(i, d) for i in o]
elif typ is dict:
return {k: subs_multiple(v, d) for (k, v) in o.items()}
else:
try:
return d.get(o, o)
except TypeError:
return o


retry_count = dask.config.get("distributed.comm.retry.count")
retry_delay_min = parse_timedelta(
dask.config.get("distributed.comm.retry.delay.min"), default="s"
Expand Down

0 comments on commit b92782d

Please sign in to comment.