Overhaul test_priorities.py #6077
Conversation
distributed/tests/test_priorities.py
Outdated
```python
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_submit(c, s, a):
    low = c.submit(inc, 1, priority=-1)
    futures = c.map(slowinc, range(10), delay=0.1)
    ev = Event()
    clog = c.submit(lambda ev: ev.wait(), ev)
    high = c.submit(inc, 2, priority=1)
    async with Worker(s.address, nthreads=1):
        await wait(high)
        assert all(s.processing.values())
        assert s.tasks[low.key].state == "processing"
        await ev.set()
        await clog
        await wait(low)
```
I think the logic of this test is now slightly different. The original test submits a bunch of futures; the scheduler queues them up but doesn't get a chance to assign anything yet. That way we rely on the scheduler to properly reorder the tasks based on their priority.
I consider starting the Worker as part of the test important, since all tasks will be in state no-worker, and once the worker connects they are worked off in priority order.
Same argument for the tests below.
The reason this works is that the worker itself also prioritises the work again. Since the tasks themselves do not have any dependents, the scheduler can assign them all to the same worker. The worker will receive all compute-task messages in the same tick, queue them up in its ready heap, and only after everything is queued will it call ensure_computing, picking off the highest-priority item (see the sketch below).
Your test shifts this responsibility from the scheduler to the worker. It's great that this works, but the test may actually fail if we change either of the following:
- How our BatchedSend batches messages. If we were to send all compute-task messages individually, you'd get different behaviour.
- When ensure_computing is called: not at the end of the entire message batch but in between messages. I'm reasonably certain this test will fail with Migrate ensure_computing transitions to new WorkerState event mechanism - part 2 #6062
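To make the batching point concrete, here is a minimal sketch of the mechanism described above. It is not the actual Worker code: the keys, priority values, and the `handle_batch`/`ensure_computing` functions are illustrative stand-ins, with lower priority values sorting first on the heap (mirroring how negated user priorities end up there).

```python
import heapq

# Minimal sketch (not the real Worker implementation) of why batching matters:
# every compute-task message in a batch is pushed onto a ready heap first, and
# only afterwards does an ensure_computing-like step pop work off it.
ready = []  # min-heap of (priority, key); lower values sort first


def ensure_computing():
    # Single-threaded worker: pick exactly one task, the highest-priority one.
    if ready:
        priority, key = heapq.heappop(ready)
        print(f"executing {key} (priority {priority})")


def handle_batch(messages):
    # messages: (key, priority) pairs that arrived in the same tick
    for key, priority in messages:
        heapq.heappush(ready, (priority, key))
    ensure_computing()


# One batch: "high" wins even though it was submitted last.
handle_batch([("low", 1), ("clog", 0), ("high", -1)])

# If each message were delivered individually instead of batched,
# "low" would start executing first:
# handle_batch([("low", 1)]); handle_batch([("clog", 0)]); handle_batch([("high", -1)])
```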
> When ensure_computing is called: not at the end of the entire message batch but in between messages. I'm reasonably certain this test will fail with #6062

However, the original one should succeed with the changes proposed in #6062.
I've now reinstated starting the Worker after all tasks have reached the scheduler.
Is the stuff in here still flaky? Do you need another review?
@fjetter this is good to be merged if you're happy with it
Unit Test Results
18 files ±0   18 suites ±0   9h 23m 36s ⏱️ +19m 1s
For more details on these failures, see this check.
Results for commit e6f8eda. ± Comparison against base commit 6e30766.
♻️ This comment has been updated with latest results.
distributed/tests/test_priorities.py
Outdated
```python
async def test_submit(c, s, a):
    low = c.submit(inc, 1, key="low", priority=-1)
    ev = Event()
    clog = c.submit(lambda ev: ev.wait(), ev, key="clog")
```
+1 on adding keys. I too often find myself not knowing what is what.
distributed/tests/test_priorities.py
Outdated
```python
for fut in xs + ys + zs:
    assert s.tasks[fut.key].priority[:2] == (0, 1)
```
This actually encodes a somewhat different signal than the previous test. If we look at the actual priorities (see screenshot below), we can see that, after sorting, x, y and z interleave.
Your condition just asserts that the first two digits are aligned but doesn't tell us much about what comes after. If we ever change how priorities work (not that unlikely; adding another field, for instance), this test might break.
The previous test waited until a bunch of tasks finished (a number smaller than or equal to the number of futures per array, i.e. 5 <= 10) and ensured that some interleaving happens by asserting that some of y and z are already done. This is not exact, but it is closer to the intention of interleaving than asserting the leading digits of the priorities. A sketch of that counting-style check follows below.
I rewrote the last two tests to verify the interleaving
Force-pushed from ba783b6 to 2771ae4.
The one concern I have now is that every test uses a Worker that spins up only after the tasks have been submitted.
This takes a different code path than when workers are already available during submission.
For instance, this entire file is now insensitive to whatever we do in update_graph: if I were to remove the sorting in distributed/scheduler.py (line 4942 in a610f73), these tests would still pass. A sketch of a variant that keeps a worker around during submission follows the quoted line.

```python
for ts in sorted(runnables, key=operator.attrgetter("priority"), reverse=True):
```
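As a purely illustrative, hedged sketch (not part of this PR), a variant along these lines would submit work while a worker is already connected. The test name, keys, polling loop, and the assumption that this makes the file sensitive to the update_graph sorting are all unverified; it reuses the clog/Event pattern from the diffs above.

```python
import asyncio

from distributed import Event, wait
from distributed.utils_test import gen_cluster, slowinc


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_priorities_with_worker_present(c, s, a):
    # Clog the only worker first, so it is known to the scheduler but busy
    # while the prioritised work is submitted (same Event pattern as above).
    ev = Event()
    clog = c.submit(lambda ev: ev.wait(), ev, key="clog")
    while "clog" not in s.tasks or s.tasks["clog"].state != "processing":
        await asyncio.sleep(0.01)

    # A single map call pushes many tasks through one update_graph call while
    # a worker is already available, unlike the spin-up-later tests above.
    low = c.map(slowinc, range(10), priority=-1, key="low")
    high = c.map(slowinc, range(10), priority=1, key="high")

    await ev.set()
    await clog
    await wait(high + low)
```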
distributed/tests/test_priorities.py
Outdated
```python
async with Worker(s.address, nthreads=1) as a:  # See comment in test_submit
    await wait(ys + zs)

    story = a.story("x0", "x1", "x2", "y0", "y1", "y2", "z0", "z1", "z2")
    order = [ev[0] for ev in story if ev[1:3] == ("ready", "executing")]
    # e.g. order = ['x0', 'z0', 'z1', 'z2', 'x1', 'x2', 'y0', 'y1', 'y2']
    # There's an element of randomness, but there must be *some* interleaving of x and z
    assert order[-3:] == ["y0", "y1", "y2"]
    assert order.index("x0") < order.index("x1") < order.index("x2")
    assert order.index("z0") < order.index("z1") < order.index("z2")
    assert order.index("x0") < order.index("z2")
```
I honestly considered the previous version easier to read and maintain. Changing the 5 to a 9 in the `sum(t.state ...)` condition made the test pass for me 1000 times, while the 5 indeed was flaky.
This version even fails for me locally.
Locally I get:

```
>       assert order[-3:] == ["y0", "y1", "y2"]
E       AssertionError: assert ['x2', 'y1', 'y2'] == ['y0', 'y1', 'y2']
E         At index 0 diff: 'x2' != 'y0'
E         Full diff:
E         - ['y0', 'y1', 'y2']
E         ?    ^^
E         + ['x2', 'y1', 'y2']
E         ?    ^^
```
Force-pushed from f5c3233 to 8159aa5.
The test setup is much more complicated than I'd like it to be. However, the test cases themselves are easily understandable and the parametrization provides genuine value, so I'm OK with it.