Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update inlining Futures in task graph in Client._graph_to_futures #3303

Merged
merged 5 commits into from Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions distributed/tests/test_utils_comm.py
Expand Up @@ -23,6 +23,11 @@ def test_subs_multiple():
dsk = {"a": (sum, ["x", "y"])}
assert subs_multiple(dsk, data) == {"a": (sum, [1, 2])}

# Tuple key
data = {"x": 1, ("y", 0): 2}
dsk = {"a": (sum, ["x", ("y", 0)])}
assert subs_multiple(dsk, data) == {"a": (sum, [1, 2])}


@gen_cluster(client=True)
def test_gather_from_workers_permissive(c, s, a, b):
Expand Down
23 changes: 15 additions & 8 deletions distributed/utils_comm.py
Expand Up @@ -15,6 +15,8 @@

logger = logging.getLogger(__name__)

__no_value__ = "__no_value__"


async def gather_from_workers(who_has, rpc, close=True, serializers=None, who=None):
""" Gather data directly from peers
Expand Down Expand Up @@ -299,15 +301,20 @@ def subs_multiple(o, d):

"""
typ = type(o)
if typ is tuple and o and callable(o[0]): # istask(o)
if not (typ is tuple and o and callable(o[0])): # not istask(o)
try: # Is o a key to substitute?
result = d.get(o, __no_value__)
if result is not __no_value__:
return result
except TypeError: # unhashable type
pass
jrbourbeau marked this conversation as resolved.
Show resolved Hide resolved
if typ is list:
return typ([subs_multiple(i, d) for i in o])
elif typ is dict:
return {k: subs_multiple(v, d) for (k, v) in o.items()}
return o
else:
return (o[0],) + tuple(subs_multiple(i, d) for i in o[1:])
elif typ is list:
return typ([subs_multiple(i, d) for i in o])
elif typ is dict:
return {k: subs_multiple(v, d) for (k, v) in o.items()}
elif typ is str:
return d.get(o, o)
return o


retry_count = dask.config.get("distributed.comm.retry.count")
Expand Down