Skip to content

Commit

Permalink
Catch case where only one run_spec is non-deterministic
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Feb 16, 2024
1 parent 2f42db3 commit 07974fd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
11 changes: 7 additions & 4 deletions distributed/scheduler.py
Expand Up @@ -4774,12 +4774,15 @@ def _generate_taskstates(
# run_spec in the submitted graph may be None. This happens
# when an already persisted future is part of the graph
elif k in dsk:
# If both tokens are non-deterministic, skip comparison
try:
tok_lhs: Any = tokenize(ts.run_spec, ensure_deterministic=True)
tok_rhs: Any = tokenize(dsk[k], ensure_deterministic=True)
tok_lhs = tokenize(ts.run_spec, ensure_deterministic=True)
except TokenizationError:
# Non-deterministic tokens; skip comparison
tok_lhs = tok_rhs = None
tok_lhs = ""

Check warning on line 4781 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L4780-L4781

Added lines #L4780 - L4781 were not covered by tests
try:
tok_rhs = tokenize(dsk[k], ensure_deterministic=True)
except TokenizationError:
tok_rhs = ""

Check warning on line 4785 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L4784-L4785

Added lines #L4784 - L4785 were not covered by tests

# Additionally check dependency names. This should only be necessary
# if run_specs can't be tokenized deterministically.
Expand Down
34 changes: 24 additions & 10 deletions distributed/tests/test_scheduler.py
Expand Up @@ -4757,21 +4757,35 @@ async def test_resubmit_different_task_same_key_many_clients(c, s):
assert await x2 == 2 # kept old run_spec


@pytest.mark.parametrize(
"before,after,expect_msg",
[
(object(), 123, True),
(123, object(), True),
(o := object(), o, False),
],
)
@gen_cluster(client=True, nthreads=[])
async def test_resubmit_nondeterministic_task_same_deps(c, s):
async def test_resubmit_nondeterministic_task_same_deps(
c, s, before, after, expect_msg
):
"""Some run_specs can't be tokenized deterministically. Silently skip comparison on
the run_spec in those cases. Dependencies must be the same.
the run_spec when both lhs and rhs are nondeterministic.
Dependencies must be the same.
"""
o = object()
# Round-tripping `o` through two separate cloudpickle.dumps() calls generates two
# different object instances, which yield different tokens.
x1 = c.submit(lambda x: x, o, key="x")
x2 = delayed(lambda x: x)(o, dask_key_name="x")
x1 = c.submit(lambda x: x, before, key="x")
x2 = delayed(lambda x: x)(after, dask_key_name="x")
y = delayed(lambda x: x)(x2, dask_key_name="y")
fut = c.compute(y)
await async_poll_for(lambda: "y" in s.tasks, timeout=5)

with captured_logger("distributed.scheduler", level=logging.WARNING) as log:
fut = c.compute(y)
await async_poll_for(lambda: "y" in s.tasks, timeout=5)

has_msg = "Detected different `run_spec` for key 'x'" in log.getvalue()
assert has_msg == expect_msg

async with Worker(s.address):
assert type(await fut) is object
assert type(await fut) is type(before)


@pytest.mark.parametrize("add_deps", [False, True])
Expand Down

0 comments on commit 07974fd

Please sign in to comment.