
Use queued tasks in adaptive target #8037

Merged: 5 commits merged into dask:main from adaptive-queued on Jul 28, 2023

Conversation

mrocklin (Member)

Fixes #8035

github-actions bot (Contributor) commented Jul 26, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    19 files  −1        19 suites  −1       10h 16m 5s ⏱️ +1h 6m 50s
 3 744 tests  +2        3 633 ✔️ +2           106 💤 −2      5 ❌ +2
34 869 runs  +1 209    33 197 ✔️ +1 203     1 667 💤 +5     5 ❌ +1

For more details on these failures, see this check.

Results for commit 2da9df9. ± Comparison against base commit 145c13a.

This pull request removes 2 and adds 4 tests. Note that renamed tests count towards both.
distributed.cli.tests.test_dask_worker ‑ test_listen_address_ipv6[tcp://[::1]:---nanny]
distributed.cli.tests.test_dask_worker ‑ test_listen_address_ipv6[tcp://[::1]:---no-nanny]
distributed.deploy.tests.test_adaptive ‑ test_scale_up_large_tasks[1]
distributed.deploy.tests.test_adaptive ‑ test_scale_up_large_tasks[inf]
distributed.tests.test_client ‑ test_context_manager_used_from_different_tasks
distributed.tests.test_client ‑ test_context_manager_used_from_different_threads

♻️ This comment has been updated with latest results.

dchudz (Contributor) commented Jul 27, 2023

In case this helps Matt or reviewers: I tried out this PR, and it takes my go-to adaptive example from scaling smaller than I expect to scaling larger than I expect (because my workers have 4 threads each):

In [4]: c.adapt(minimum=1, maximum=200, target_duration="5 minutes")
2023-07-26 23:45:44,466 - distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=200
Out[4]: <coiled.cluster.CoiledAdaptive at 0x105725330>

In [5]: def sleep_5_secs(x):
   ...:     import time
   ...:     time.sleep(5)
   ...:     return x
   ...:

In [6]: results = client.map(sleep_5_secs, range(10_000))

My back-of-the-envelope says this should give me about 42 workers.

Before this PR, it gave me 17. After this PR, it gives me 167, which is roughly 42*4, because adaptive doesn't account for worker threads:

cpu = math.ceil(
    (self.total_occupancy + queued_occupancy) / target_duration
)  # TODO: threads per worker

But I realize that's a separate problem that shouldn't block this PR. Just adding the context/example in case it helps.
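For concreteness, the back-of-the-envelope arithmetic behind those numbers (a quick sketch using only the figures from this example; the heuristic returns a CPU count, and dividing by threads per worker recovers the expected worker count):

import math

n_tasks = 10_000        # tasks submitted via client.map
task_runtime = 5        # seconds each task sleeps
target_duration = 300   # "5 minutes"
threads_per_worker = 4  # specific to this cluster

total_work = n_tasks * task_runtime             # 50_000 s of queued work
cpus = math.ceil(total_work / target_duration)  # 167: what adaptive requests
workers = math.ceil(cpus / threads_per_worker)  # 42: the expected worker count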

hendrikmakait (Member)

It looks like test_adaptive_target_empty_cluster[True] fails consistently.

mrocklin (Member, Author)

> It looks like test_adaptive_target_empty_cluster[True] fails consistently.

Thanks. Fixed.

if len(self.queued) < 100:
    queued_occupancy = 0
    for ts in self.queued:
        if ts.prefix.duration_average == -1:
Collaborator

Out of scope for this PR: I think this is problematic. duration among the same TaskPrefix can vary wildly; I would much rather use a metric that is TaskGroup-specific.

Member Author

I don't disagree. I also suspect that this will be fine in most cases.

Comment on lines 8069 to 8071
            queued_occupancy += self.UNKNOWN_TASK_DURATION
        else:
            queued_occupancy += ts.prefix.duration_average
Collaborator

Out of scope nit: this screams for encapsulation in a smart duration_average property
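For illustration, something like this (a sketch only: effective_duration is a hypothetical name, and the module-level constant stands in for the scheduler's configured unknown-task-duration default):

UNKNOWN_TASK_DURATION = 0.5  # placeholder for the configured default


class TaskPrefix:
    def __init__(self) -> None:
        self.duration_average: float = -1  # sentinel: nothing measured yet

    @property
    def effective_duration(self) -> float:
        """Measured average duration, or the default when unknown."""
        if self.duration_average == -1:
            return UNKNOWN_TASK_DURATION
        return self.duration_average

The loop above would then collapse to queued_occupancy = sum(ts.prefix.effective_duration for ts in self.queued).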

Member Author

I don't disagree

Member

There is get_task_duration which also handles the case of user provided estimates.
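If that fits here, the whole loop could become a one-liner (a sketch that ignores the 100-task cap; assumes get_task_duration takes a TaskState and applies both user-provided estimates and the unknown-duration fallback):

queued_occupancy = sum(self.get_task_duration(ts) for ts in self.queued)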

crusaderky (Collaborator) left a comment

Both this PR and main fail (as the tests demonstrate) in the use case where

  1. there are 0 workers, and
  2. queuing is disabled (distributed.scheduler.worker-saturation: .inf)

In this use case, all tasks will end up in Scheduler.unrunnable instead of Scheduler.queued.

Even when queueing is enabled, this fails when

  1. there are no workers
  2. the tasks require resources

(I understand that the expectation in an adaptive cluster with resources is that all dynamically-started workers provide the resource, e.g. {"GPU": 1}).

Again, tasks will end up in Scheduler.unrunnable.
Please add a test for this use case.
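A rough sketch of what such a test might look like (hypothetical: assumes gen_cluster supports starting with zero workers and that Scheduler.adaptive_target is the method under test):

import asyncio

from distributed.utils_test import gen_cluster


@gen_cluster(
    client=True,
    nthreads=[],  # start with zero workers
    config={"distributed.scheduler.worker-saturation": float("inf")},  # queuing disabled
)
async def test_adaptive_target_counts_unrunnable(c, s):
    # With queuing disabled and no workers, tasks land in Scheduler.unrunnable
    futures = c.map(lambda x: x + 1, range(100))
    while not s.unrunnable:
        await asyncio.sleep(0.01)
    # The target should still request workers rather than report zero
    assert s.adaptive_target() > 0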

Comment on lines +496 to +497
while not s.tasks:
    await asyncio.sleep(0.001)
Collaborator

Suggested change:

- while not s.tasks:
-     await asyncio.sleep(0.001)
+ await async_poll_for(lambda: s.tasks, timeout=5)

Member Author

I prefer not to use these. I find that this removes one line but at the cost of adding a new abstraction (async_poll_for). The tradeoff here doesn't seem positive to me.

Collaborator

We are using these ubiquitously. I think this is not a design choice that should be left to the whim and taste of individual developers; if you don't like them, we should discuss it as a team and either use them everywhere or remove them completely.

Comment on lines +502 to +503
while len(s.tasks) != 200:
    await asyncio.sleep(0.001)
Collaborator

Suggested change:

- while len(s.tasks) != 200:
-     await asyncio.sleep(0.001)
+ await async_poll_for(lambda: len(s.tasks) == 200, timeout=5)

crusaderky (Collaborator) commented Jul 27, 2023

In short:

from itertools import chain
from toolz import peekn

# Note: this relies on HeapSet.__iter__ and set.__iter__ to yield elements in pseudo-random order
queued, _ = peekn(100, chain(self.queued, self.unrunnable))

queued_occupancy = 0
for ts in queued:
    if ts.prefix.duration_average == -1:
        queued_occupancy += self.UNKNOWN_TASK_DURATION
    else:
        queued_occupancy += ts.prefix.duration_average
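(The peekn call caps the scan at 100 tasks, so the estimate stays cheap no matter how long the queue gets.)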

mrocklin (Member, Author)

As a heads-up, I'm unlikely to spend a bunch of time on this. It's more likely that I'll ask folks like @fjetter to ask people around him (maybe even @crusaderky) to pick this up.

I'm hopeful that this can be a small fix. I would be mildly sad/surprised if it required a large effort (not that that's what you're saying).

mrocklin (Member, Author)

Never mind. The use of take removes much of the concern about cost here (thanks for the suggestion).

I do think that the request around supporting unrunnable is out of scope (this was broken before) but I've gone ahead and done it anyway. I'm hopeful that we can get this in soon (I'm finding that this is valuable for coiled run work and this is blocking me).

fjetter merged commit 9d9702e into dask:main on Jul 28, 2023 · 19 of 27 checks passed

mrocklin deleted the adaptive-queued branch on July 28, 2023 at 15:52
Successfully merging this pull request may close these issues:

Adaptive target incorrectly assigns unknown duration to queued tasks (#8035)