Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use queued tasks in adaptive target #8037

Merged
merged 5 commits into from Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 24 additions & 1 deletion distributed/deploy/tests/test_adaptive.py
Expand Up @@ -19,7 +19,7 @@
)
from distributed.compatibility import LINUX, MACOS, WINDOWS
from distributed.metrics import time
from distributed.utils_test import async_poll_for, gen_test, slowinc
from distributed.utils_test import async_poll_for, gen_cluster, gen_test, slowinc


def test_adaptive_local_cluster(loop):
Expand Down Expand Up @@ -484,3 +484,26 @@ async def test_adaptive_stopped():
pc = instance.periodic_callback

await async_poll_for(lambda: not pc.is_running(), timeout=5)


@pytest.mark.parametrize("saturation", [1, float("inf")])
@gen_cluster(
client=True,
nthreads=[],
config={
"distributed.scheduler.default-task-durations": {"slowinc": 1000},
},
)
async def test_scale_up_large_tasks(c, s, saturation):
s.WORKER_SATURATION = saturation
futures = c.map(slowinc, range(10))
while not s.tasks:
await asyncio.sleep(0.001)
Comment on lines +500 to +501
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while not s.tasks:
await asyncio.sleep(0.001)
await async_poll_for(lambda: s.tasks, timeout=5)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not to use these. I find that this removes one line but at the cost of adding a new abstraction (async_poll_for). The tradeoff here doesn't seem positive to me.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using these ubiquitously. I think this is not a design choice that should be left to the whim and taste of the individual developers; if you don't like them we should have a team talk about them which should result in either using them everywhere or removing them completely.


assert s.adaptive_target() == 10

more_futures = c.map(slowinc, range(200))
while len(s.tasks) != 200:
await asyncio.sleep(0.001)
Comment on lines +506 to +507
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while len(s.tasks) != 200:
await asyncio.sleep(0.001)
await async_poll_for(lambda: len(s.tasks) == 200, timeout=5)


assert s.adaptive_target() == 200
16 changes: 13 additions & 3 deletions distributed/scheduler.py
Expand Up @@ -37,6 +37,7 @@
import psutil
from sortedcontainers import SortedDict, SortedSet
from tlz import (
concat,
first,
groupby,
merge,
Expand All @@ -45,6 +46,7 @@
partition,
pluck,
second,
take,
valmap,
)
from tornado.ioloop import IOLoop
Expand Down Expand Up @@ -8061,15 +8063,23 @@
target_duration = parse_timedelta(target_duration)

# CPU
queued = take(100, concat([self.queued, self.unrunnable]))
queued_occupancy = 0
for ts in queued:
if ts.prefix.duration_average == -1:
queued_occupancy += self.UNKNOWN_TASK_DURATION
else:
queued_occupancy += ts.prefix.duration_average

if len(self.queued) + len(self.unrunnable) > 100:
queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100

Check warning on line 8075 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L8075

Added line #L8075 was not covered by tests

# TODO consider any user-specified default task durations for queued tasks
queued_occupancy = len(self.queued) * self.UNKNOWN_TASK_DURATION
cpu = math.ceil(
(self.total_occupancy + queued_occupancy) / target_duration
) # TODO: threads per worker

# Avoid a few long tasks from asking for many cores
tasks_ready = len(self.queued)
tasks_ready = len(self.queued) + len(self.unrunnable)
for ws in self.workers.values():
tasks_ready += len(ws.processing)

Expand Down