From ea7d8e08a111bb028be4902e4969277f24261471 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Wed, 26 Jul 2023 07:10:54 -0500
Subject: [PATCH 1/5] Use queued tasks in adaptive target

Fixes #8035
---
 distributed/deploy/tests/test_adaptive.py | 15 ++++++++++++++-
 distributed/scheduler.py                  | 20 ++++++++++++++++++--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py
index fcc15ee0e9..ba492cdb82 100644
--- a/distributed/deploy/tests/test_adaptive.py
+++ b/distributed/deploy/tests/test_adaptive.py
@@ -19,7 +19,7 @@
 )
 from distributed.compatibility import LINUX, MACOS, WINDOWS
 from distributed.metrics import time
-from distributed.utils_test import async_poll_for, gen_test, slowinc
+from distributed.utils_test import async_poll_for, gen_cluster, gen_test, slowinc
 
 
 def test_adaptive_local_cluster(loop):
@@ -484,3 +484,16 @@ async def test_adaptive_stopped():
         pc = instance.periodic_callback
 
     await async_poll_for(lambda: not pc.is_running(), timeout=5)
+
+
+@gen_cluster(
+    client=True,
+    nthreads=[],
+    config={"distributed.scheduler.default-task-durations": {"slowinc": 1000}},
+)
+async def test_scale_up_large_tasks(c, s):
+    futures = c.map(slowinc, range(10))
+    while not s.tasks:
+        await asyncio.sleep(0.001)
+
+    assert s.adaptive_target() == 10
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index ca612cd9ba..4f81632545 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -8062,8 +8062,24 @@ def adaptive_target(self, target_duration=None):
 
         # CPU
 
-        # TODO consider any user-specified default task durations for queued tasks
-        queued_occupancy = len(self.queued) * self.UNKNOWN_TASK_DURATION
+        if len(self.queued) < 100:
+            queued_occupancy = 0
+            for ts in self.queued:
+                if ts.prefix.duration_average == -1:
+                    queued_occupancy += self.UNKNOWN_TASK_DURATION
+                else:
+                    queued_occupancy += ts.prefix.duration_average
+        else:
+            queued_occupancy = 0
+            queued = random.sample(self.queued._heap, 100)
+            queued = [wr() for _, _, wr in queued]
+            for ts in random.sample(queued, 100):
+                if ts.prefix.duration_average == -1:
+                    queued_occupancy += self.UNKNOWN_TASK_DURATION
+                else:
+                    queued_occupancy += ts.prefix.duration_average
+            queued_occupancy *= len(self.queued) / 100
+
         cpu = math.ceil(
             (self.total_occupancy + queued_occupancy) / target_duration
         )  # TODO: threads per worker

From 43d26b38f6133cb2be3182ad0552ed07b792f31c Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Wed, 26 Jul 2023 07:13:05 -0500
Subject: [PATCH 2/5] add large test as well

---
 distributed/deploy/tests/test_adaptive.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py
index ba492cdb82..21ff135631 100644
--- a/distributed/deploy/tests/test_adaptive.py
+++ b/distributed/deploy/tests/test_adaptive.py
@@ -497,3 +497,9 @@ async def test_scale_up_large_tasks(c, s):
         await asyncio.sleep(0.001)
 
     assert s.adaptive_target() == 10
+
+    more_futures = c.map(slowinc, range(200))
+    while len(s.tasks) != 200:
+        await asyncio.sleep(0.001)
+
+    assert s.adaptive_target() == 200

From 1b6a2c57656ac5c2e7dbd6b3fbbd33d070be77ef Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Thu, 27 Jul 2023 07:03:21 -0500
Subject: [PATCH 3/5] handle None Tasks in queued

---
 distributed/scheduler.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 4f81632545..2718ae973f 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -8074,6 +8074,8 @@ def adaptive_target(self, target_duration=None):
             queued = random.sample(self.queued._heap, 100)
             queued = [wr() for _, _, wr in queued]
             for ts in random.sample(queued, 100):
+                if ts is None:
+                    continue
                 if ts.prefix.duration_average == -1:
                     queued_occupancy += self.UNKNOWN_TASK_DURATION
                 else:
                     queued_occupancy += ts.prefix.duration_average

From bd4069ad4cef49e4a99856d4bed18051fccd8d79 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Thu, 27 Jul 2023 09:57:51 -0500
Subject: [PATCH 4/5] Support unrunnable in adaptivity, also use take rather than sample

---
 distributed/deploy/tests/test_adaptive.py |  8 ++++++--
 distributed/scheduler.py                  | 16 +++++++---------
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py
index 21ff135631..3aae49f346 100644
--- a/distributed/deploy/tests/test_adaptive.py
+++ b/distributed/deploy/tests/test_adaptive.py
@@ -486,12 +486,16 @@ async def test_adaptive_stopped():
     await async_poll_for(lambda: not pc.is_running(), timeout=5)
 
 
+@pytest.mark.parametrize("saturation", [1, float("inf")])
 @gen_cluster(
     client=True,
     nthreads=[],
-    config={"distributed.scheduler.default-task-durations": {"slowinc": 1000}},
+    config={
+        "distributed.scheduler.default-task-durations": {"slowinc": 1000},
+    },
 )
-async def test_scale_up_large_tasks(c, s):
+async def test_scale_up_large_tasks(c, s, saturation):
+    s.WORKER_SATURATION = saturation
     futures = c.map(slowinc, range(10))
     while not s.tasks:
         await asyncio.sleep(0.001)
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 2718ae973f..5717efed2a 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -37,6 +37,7 @@
 import psutil
 from sortedcontainers import SortedDict, SortedSet
 from tlz import (
+    concat,
     first,
     groupby,
     merge,
@@ -45,6 +46,7 @@
     partition,
     pluck,
     second,
+    take,
     valmap,
 )
 from tornado.ioloop import IOLoop
@@ -8062,32 +8064,28 @@ def adaptive_target(self, target_duration=None):
 
         # CPU
 
-        if len(self.queued) < 100:
+        if len(self.queued) + len(self.unrunnable) < 100:
             queued_occupancy = 0
-            for ts in self.queued:
+            for ts in concat([self.queued, self.unrunnable]):
                 if ts.prefix.duration_average == -1:
                     queued_occupancy += self.UNKNOWN_TASK_DURATION
                 else:
                     queued_occupancy += ts.prefix.duration_average
         else:
             queued_occupancy = 0
-            queued = random.sample(self.queued._heap, 100)
-            queued = [wr() for _, _, wr in queued]
-            for ts in random.sample(queued, 100):
-                if ts is None:
-                    continue
+            for ts in take(100, concat([self.queued, self.unrunnable])):
                 if ts.prefix.duration_average == -1:
                     queued_occupancy += self.UNKNOWN_TASK_DURATION
                 else:
                     queued_occupancy += ts.prefix.duration_average
-            queued_occupancy *= len(self.queued) / 100
+            queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100
 
         cpu = math.ceil(
             (self.total_occupancy + queued_occupancy) / target_duration
         )  # TODO: threads per worker
 
         # Avoid a few long tasks from asking for many cores
-        tasks_ready = len(self.queued)
+        tasks_ready = len(self.queued) + len(self.unrunnable)
         for ws in self.workers.values():
             tasks_ready += len(ws.processing)
 

From 2da9df91ca0067d085f22dcb5778cafad6f67544 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Thu, 27 Jul 2023 10:16:18 -0500
Subject: [PATCH 5/5] Simplify

---
 distributed/scheduler.py | 22 ++++++++--------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 5717efed2a..4649dc205d 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -8063,21 +8063,15 @@ def adaptive_target(self, target_duration=None):
         target_duration = parse_timedelta(target_duration)
 
         # CPU
+        queued = take(100, concat([self.queued, self.unrunnable]))
+        queued_occupancy = 0
+        for ts in queued:
+            if ts.prefix.duration_average == -1:
+                queued_occupancy += self.UNKNOWN_TASK_DURATION
+            else:
+                queued_occupancy += ts.prefix.duration_average
 
-        if len(self.queued) + len(self.unrunnable) < 100:
-            queued_occupancy = 0
-            for ts in concat([self.queued, self.unrunnable]):
-                if ts.prefix.duration_average == -1:
-                    queued_occupancy += self.UNKNOWN_TASK_DURATION
-                else:
-                    queued_occupancy += ts.prefix.duration_average
-        else:
-            queued_occupancy = 0
-            for ts in take(100, concat([self.queued, self.unrunnable])):
-                if ts.prefix.duration_average == -1:
-                    queued_occupancy += self.UNKNOWN_TASK_DURATION
-                else:
-                    queued_occupancy += ts.prefix.duration_average
+        if len(self.queued) + len(self.unrunnable) > 100:
             queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100
 
         cpu = math.ceil(
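
Note: the series above replaces the flat `len(self.queued) * UNKNOWN_TASK_DURATION` estimate with a bounded scan of queued and unrunnable tasks, extrapolating linearly once more than 100 tasks are waiting. The standalone Python sketch below illustrates that sampling-and-extrapolation strategy only; the names QueuedTask, UNKNOWN_TASK_DURATION, estimate_queued_occupancy, and workers_for_cpu are illustrative stand-ins, not the scheduler's actual API.

# Minimal sketch of the queued-occupancy estimate PATCH 5/5 converges on.
# All names here are hypothetical stand-ins for scheduler state.
import math
from itertools import chain, islice

UNKNOWN_TASK_DURATION = 0.5  # seconds; placeholder for the scheduler's default


class QueuedTask:
    """Stand-in for a task whose prefix carries an average duration (-1 = unknown)."""

    def __init__(self, duration_average: float = -1):
        self.duration_average = duration_average


def estimate_queued_occupancy(queued, unrunnable, sample_size=100):
    # Look at no more than `sample_size` tasks so the estimate stays cheap
    # even when millions of tasks are queued.
    sample = islice(chain(queued, unrunnable), sample_size)
    occupancy = sum(
        UNKNOWN_TASK_DURATION if ts.duration_average == -1 else ts.duration_average
        for ts in sample
    )
    # Extrapolate linearly when the sample did not cover every waiting task.
    n = len(queued) + len(unrunnable)
    if n > sample_size:
        occupancy *= n / sample_size
    return occupancy


def workers_for_cpu(total_occupancy, queued_occupancy, target_duration=5.0):
    # Mirrors the `cpu = math.ceil(...)` step: enough workers to finish current
    # plus estimated queued work within the target duration.
    return math.ceil((total_occupancy + queued_occupancy) / target_duration)


queued = [QueuedTask(2.0) for _ in range(250)]   # tasks with a known ~2 s average
unrunnable = [QueuedTask() for _ in range(50)]   # tasks with unknown durations
occ = estimate_queued_occupancy(queued, unrunnable)
print(workers_for_cpu(total_occupancy=10.0, queued_occupancy=occ))

Capping the scan at 100 tasks keeps adaptive_target roughly constant-time on very large graphs, at the cost of a coarser estimate when task durations vary widely across the queue.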