Skip to content

Commit

Permalink
ensure tokens on futures are unique
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Mar 11, 2024
1 parent 0438768 commit 8d096fd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
39 changes: 21 additions & 18 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import atexit
import copy
import inspect
import itertools

Check warning on line 7 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L7

Added line #L7 was not covered by tests
import json
import logging
import os
Expand Down Expand Up @@ -31,7 +32,7 @@
from tlz import first, groupby, merge, partition_all, valmap

import dask
from dask.base import collections_to_dsk, normalize_token, tokenize
from dask.base import collections_to_dsk, tokenize

Check warning on line 35 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L35

Added line #L35 was not covered by tests
from dask.core import flatten, validate_key
from dask.highlevelgraph import HighLevelGraph
from dask.optimization import SubgraphCallable
Expand Down Expand Up @@ -210,11 +211,15 @@ class Future(WrappedKey):

_cb_executor = None
_cb_executor_pid = None
_counter = itertools.count()

Check warning on line 214 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L214

Added line #L214 was not covered by tests
# Make sure this stays unique even across multiple processes or hosts
_uid = uuid.uuid4().hex

Check warning on line 216 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L216

Added line #L216 was not covered by tests

def __init__(self, key, client=None, inform=True, state=None):
def __init__(self, key, client=None, inform=True, state=None, _id=None):

Check warning on line 218 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L218

Added line #L218 was not covered by tests
self.key = key
self._cleared = False
self._client = client
self._id = _id or (Future._uid, next(Future._counter))

Check warning on line 222 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L222

Added line #L222 was not covered by tests
self._input_state = state
self._inform = inform
self._state = None
Expand Down Expand Up @@ -499,8 +504,16 @@ def release(self):
except TypeError: # pragma: no cover
pass # Shutting down, add_callback may be None

@staticmethod
def make_future(key, id):

Check warning on line 508 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L507-L508

Added lines #L507 - L508 were not covered by tests
# Can't use kwargs in pickle __reduce__ methods
return Future(key=key, _id=id)

Check warning on line 510 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L510

Added line #L510 was not covered by tests

def __reduce__(self) -> str | tuple[Any, ...]:
return Future, (self.key,)
return Future.make_future, (self.key, self._id)

Check warning on line 513 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L513

Added line #L513 was not covered by tests

def __dask_tokenize__(self):
return (type(self).__name__, self.key, self._id)

Check warning on line 516 in distributed/client.py

View check run for this annotation

Codecov / codecov/patch

distributed/client.py#L515-L516

Added lines #L515 - L516 were not covered by tests

def __del__(self):
try:
Expand Down Expand Up @@ -643,18 +656,6 @@ async def done_callback(future, callback):
callback(future)


@partial(normalize_token.register, Future)
def normalize_future(f):
"""Returns the key and the type as a list
Parameters
----------
list
The key and the type
"""
return [f.key, type(f)]


class AllExit(Exception):
"""Custom exception class to exit All(...) early."""

Expand Down Expand Up @@ -3434,9 +3435,11 @@ def compute(

if traverse:
collections = tuple(
dask.delayed(a)
if isinstance(a, (list, set, tuple, dict, Iterator))
else a
(
dask.delayed(a)
if isinstance(a, (list, set, tuple, dict, Iterator))
else a
)
for a in collections
)

Expand Down
6 changes: 4 additions & 2 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,11 +863,13 @@ async def test_tokenize_on_futures(c, s, a, b):
y = c.submit(inc, 1)
tok = tokenize(x)
assert tokenize(x) == tokenize(x)
assert tokenize(x) == tokenize(y)
# Tokens must be unique per instance
# See https://github.com/dask/distributed/issues/8561
assert tokenize(x) != tokenize(y)

c.futures[x.key].finish()

assert tok == tokenize(y)
assert tok != tokenize(y)


@pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost")
Expand Down

0 comments on commit 8d096fd

Please sign in to comment.