Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@
from unittest import mock

import distributed
from distributed import Event
from distributed.core import CommClosedError
from distributed.utils_test import _LockedCommPool, gen_cluster, inc, slowinc
from distributed.utils_test import (
_LockedCommPool,
assert_worker_story,
gen_cluster,
inc,
slowinc,
)


async def wait_for_state(key, state, dask_worker):
Expand Down Expand Up @@ -51,7 +58,13 @@ async def test_abort_execution_add_as_dependency(c, s, a):

@gen_cluster(client=True)
async def test_abort_execution_to_fetch(c, s, a, b):
fut = c.submit(slowinc, 1, delay=2, key="f1", workers=[a.worker_address])
ev = Event()

def f(ev):
ev.wait()
return 123

fut = c.submit(f, ev, key="f1", workers=[a.worker_address])
await wait_for_state(fut.key, "executing", a)
fut.release()
await wait_for_cancelled(fut.key, a)
Expand All @@ -62,7 +75,34 @@ async def test_abort_execution_to_fetch(c, s, a, b):
# then, a must switch the execute to fetch. Instead of doing so, it will
# simply re-use the currently computing result.
fut = c.submit(inc, fut, workers=[a.worker_address], key="f2")
await fut
await wait_for_state("f2", "waiting", a)
await ev.set()
assert await fut == 124 # It would be 3 if the result was copied from b
del fut
while "f1" in a.tasks:
await asyncio.sleep(0.01)

assert_worker_story(
a.story("f1"),
[
("f1", "compute-task"),
("f1", "released", "waiting", "waiting", {"f1": "ready"}),
("f1", "waiting", "ready", "ready", {}),
("f1", "ready", "executing", "executing", {}),
("free-keys", ("f1",)),
("f1", "executing", "released", "cancelled", {}),
("f1", "ensure-task-exists", "cancelled"),
("f1", "cancelled", "fetch", "cancelled", {"f1": ("resumed", "fetch")}),
("f1", "cancelled", "resumed", "resumed", {}),
("f1", "put-in-memory"),
("f1", "resumed", "memory", "memory", {"f2": "ready"}),
("free-keys", ("f1",)),
("f1", "release-key"),
("f1", "memory", "released", "released", {}),
("f1", "released", "forgotten", "forgotten", {}),
],
strict=True,
)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect that you and @fjetter are aligned here, so I'm happy to step back here.

However, my experience is that tests like these will become expensive and annoying if you ever want to change the state machine system. Every time a dev changes the system, they will need to look at this test, understand its intent, and change it accordingly. If this output is at all likely to change over the next few years then this seems like it might add a lot of inertia.

Dask tests used to look a lot like this in the early days. There was a painful process to get rid of them and replace them with tests that use the user API.

Again, I'm happy if this is the direction that you all want to go (please do not block progress on this comment).

Copy link
Copy Markdown
Collaborator Author

@crusaderky crusaderky Mar 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During the worker state machine refactoring, I stepped on a few tests that failed because they were testing a story and it changed in the refactoring. Yes, it cost me an extra 10 minutes to go through them and update the story, but it was a good thing to see and validate what changed in the story exactly.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this makes sense if it's one or two, and if it's the same person writing the tests as writing the system.

However, if you make dozens of these tests, then if someone follows you in a year they will either not be able to make any changes, or more likely, they will delete all of these tests.

Anyway, I think that you and Florian have this covered. I'm raising this as a general concern. It may be that you aren't planning to repeat this many times, or it may be that you are planning to repeat it, but are making a reasoned decision to proceed down this path because the benefits outweigh the costs. Again, not blocking anything here.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, if you make dozens of these tests, then if someone follows you in a year they will either not be able to make any changes, or more likely, they will delete all of these tests.

If somebody wants to change the state machine without understanding this, they shouldn't change the state machine at all.
Deleting these tests would be rather reckless. This has been incredibly helpful in debugging deadlocks. If these stories change, this should be done absolutely intentionally and not by chance.
We should not introduce these all over the place, but particularly for tests that exercise sophisticated race conditions, this has proven to be quite useful.

One of the outcomes of the current refactoring will be the possibility to log state machine events that do not go down to this granular level. This should make tests more readable while maintaining similar expressiveness.



@gen_cluster(client=True)
Expand Down
18 changes: 7 additions & 11 deletions distributed/tests/test_cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,15 @@ async def test_cluster_dump_state(c, s, a, b, tmp_path):
@gen_cluster(client=True)
async def test_cluster_dump_story(c, s, a, b, tmp_path):
filename = tmp_path / "dump"
futs = c.map(inc, range(2))
fut_keys = {f.key for f in futs}
await c.gather(futs)
f1 = c.submit(inc, 0, key="f1")
f2 = c.submit(inc, 1, key="f2")
await c.gather([f1, f2])
await c.dump_cluster_state(filename, format="msgpack")

dump = DumpArtefact.from_url(f"{filename}.msgpack.gz")
task_key = next(iter(fut_keys))

def _expected_story(task_key):
return

story = dump.scheduler_story(*fut_keys)
assert len(story) == len(fut_keys)
story = dump.scheduler_story("f1", "f2")
assert story.keys() == {"f1", "f2"}

for k, task_story in story.items():
expected = [
Expand All @@ -140,8 +136,8 @@ def _expected_story(task_key):
for e1, e2 in zip(event, expected_event):
assert e1 == e2

story = dump.worker_story(*fut_keys)
assert len(story) == len(fut_keys)
story = dump.worker_story("f1", "f2")
assert story.keys() == {"f1", "f2"}

for k, task_story in story.items():
assert_worker_story(
Expand Down
11 changes: 8 additions & 3 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import dask

from distributed import Lock, Nanny, Worker, wait, worker_client
from distributed import Event, Lock, Nanny, Worker, wait, worker_client
from distributed.compatibility import LINUX
from distributed.config import config
from distributed.core import Status
Expand Down Expand Up @@ -1148,9 +1148,11 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):
# https://github.com/dask/distributed/issues/5370
steal = s.extensions["stealing"]
w0 = workers[0]
ev = Event()
futs1 = c.map(
slowinc,
lambda _, ev: ev.wait(),
range(10),
ev=ev,
key=[f"f1-{ix}" for ix in range(10)],
workers=[w0.address],
allow_other_workers=True,
Expand All @@ -1175,6 +1177,9 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):
assert wsB == victim_ts.processing_on
# move_task_request is not responsible for respecting worker restrictions
steal.move_task_request(victim_ts, wsB, wsC)

# Let tasks finish
await ev.set()
await c.gather(futs1)

# If this turns out to be overly flaky, the following may be relaxed or
Expand All @@ -1183,7 +1188,7 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers):

# Either the last request goes through or both have been rejected since the
# computation was already done by the time the request comes in. This is
# unfortunately not stable even if we increase the compute time
# unfortunately not stable.
if victim_ts.who_has != {wsC}:
msgs = steal.story(victim_ts)
assert len(msgs) == 2
Expand Down
69 changes: 34 additions & 35 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1677,41 +1677,40 @@ async def test_story_with_deps(c, s, a, b):
Assert that the structure of the story does not change unintentionally and
expected subfields are actually filled
"""
dep = c.submit(inc, 1, workers=[a.address])
res = c.submit(inc, dep, workers=[b.address])
dep = c.submit(inc, 1, workers=[a.address], key="dep")
res = c.submit(inc, dep, workers=[b.address], key="res")
await res
key = res.key

story = a.story(key)
story = a.story("res")
assert story == []
story = b.story(key)
story = b.story("res")

# Story now includes randomized stimulus_ids and timestamps.
stimulus_ids = {ev[-2] for ev in story}
assert len(stimulus_ids) == 3, stimulus_ids
# This is a simple transition log
expected = [
(key, "compute-task"),
(key, "released", "waiting", "waiting", {dep.key: "fetch"}),
(key, "waiting", "ready", "ready", {}),
(key, "ready", "executing", "executing", {}),
(key, "put-in-memory"),
(key, "executing", "memory", "memory", {}),
("res", "compute-task"),
("res", "released", "waiting", "waiting", {"dep": "fetch"}),
("res", "waiting", "ready", "ready", {}),
("res", "ready", "executing", "executing", {}),
("res", "put-in-memory"),
("res", "executing", "memory", "memory", {}),
]
assert_worker_story(story, expected, strict=True)

story = b.story(dep.key)
story = b.story("dep")
stimulus_ids = {ev[-2] for ev in story}
assert len(stimulus_ids) == 2, stimulus_ids
expected = [
(dep.key, "ensure-task-exists", "released"),
(dep.key, "released", "fetch", "fetch", {}),
("gather-dependencies", a.address, {dep.key}),
(dep.key, "fetch", "flight", "flight", {}),
("request-dep", a.address, {dep.key}),
("receive-dep", a.address, {dep.key}),
(dep.key, "put-in-memory"),
(dep.key, "flight", "memory", "memory", {res.key: "ready"}),
("dep", "ensure-task-exists", "released"),
("dep", "released", "fetch", "fetch", {}),
("gather-dependencies", a.address, {"dep"}),
("dep", "fetch", "flight", "flight", {}),
("request-dep", a.address, {"dep"}),
("receive-dep", a.address, {"dep"}),
("dep", "put-in-memory"),
("dep", "flight", "memory", "memory", {"res": "ready"}),
]
assert_worker_story(story, expected, strict=True)

Expand Down Expand Up @@ -3057,11 +3056,11 @@ async def test_task_flight_compute_oserror(c, s, a, b):
await wait(futs)
assert a.data
assert write_queue.empty()
f1 = c.submit(sum, futs, workers=[b.address])
f1 = c.submit(sum, futs, workers=[b.address], key="f1")
peer, msg = await write_queue.get()
assert peer == a.address
assert msg["op"] == "get_data"
in_flight_tasks = [ts for ts in b.tasks.values() if ts.key != f1.key]
in_flight_tasks = [ts for ts in b.tasks.values() if ts.key != "f1"]
assert all(ts.state == "flight" for ts in in_flight_tasks)
await a.close()
write_event.set()
Expand All @@ -3072,28 +3071,28 @@ async def test_task_flight_compute_oserror(c, s, a, b):
# asserting a few internals to make sure that if things change this is done
# deliberately

sum_story = b.story(f1.key)
sum_story = b.story("f1")
expected_sum_story = [
(f1.key, "compute-task"),
("f1", "compute-task"),
(
f1.key,
"f1",
"released",
"waiting",
"waiting",
{ts.key: "fetch" for ts in in_flight_tasks},
),
# inc is lost and needs to be recomputed. Therefore, sum is released
("free-keys", (f1.key,)),
(f1.key, "release-key"),
(f1.key, "waiting", "released", "released", {f1.key: "forgotten"}),
(f1.key, "released", "forgotten", "forgotten", {}),
("free-keys", ("f1",)),
("f1", "release-key"),
("f1", "waiting", "released", "released", {"f1": "forgotten"}),
("f1", "released", "forgotten", "forgotten", {}),
# Now, we actually compute the task *once*. This must not cycle back
(f1.key, "compute-task"),
(f1.key, "released", "waiting", "waiting", {f1.key: "ready"}),
(f1.key, "waiting", "ready", "ready", {}),
(f1.key, "ready", "executing", "executing", {}),
(f1.key, "put-in-memory"),
(f1.key, "executing", "memory", "memory", {}),
("f1", "compute-task"),
("f1", "released", "waiting", "waiting", {"f1": "ready"}),
("f1", "waiting", "ready", "ready", {}),
("f1", "ready", "executing", "executing", {}),
("f1", "put-in-memory"),
("f1", "executing", "memory", "memory", {}),
]
assert_worker_story(sum_story, expected_sum_story, strict=True)

Expand Down