Skip to content

Commit

Permalink
Support unrunnable in adaptivity, also use take rather than sample
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Jul 27, 2023
1 parent 1b6a2c5 commit bd4069a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
8 changes: 6 additions & 2 deletions distributed/deploy/tests/test_adaptive.py
Expand Up @@ -486,12 +486,16 @@ async def test_adaptive_stopped():
await async_poll_for(lambda: not pc.is_running(), timeout=5)


@pytest.mark.parametrize("saturation", [1, float("inf")])
@gen_cluster(
client=True,
nthreads=[],
config={"distributed.scheduler.default-task-durations": {"slowinc": 1000}},
config={
"distributed.scheduler.default-task-durations": {"slowinc": 1000},
},
)
async def test_scale_up_large_tasks(c, s):
async def test_scale_up_large_tasks(c, s, saturation):
s.WORKER_SATURATION = saturation
futures = c.map(slowinc, range(10))
while not s.tasks:
await asyncio.sleep(0.001)
Expand Down
16 changes: 7 additions & 9 deletions distributed/scheduler.py
Expand Up @@ -37,6 +37,7 @@
import psutil
from sortedcontainers import SortedDict, SortedSet
from tlz import (
concat,
first,
groupby,
merge,
Expand All @@ -45,6 +46,7 @@
partition,
pluck,
second,
take,
valmap,
)
from tornado.ioloop import IOLoop
Expand Down Expand Up @@ -8062,32 +8064,28 @@ def adaptive_target(self, target_duration=None):

# CPU

if len(self.queued) < 100:
if len(self.queued) + len(self.unrunnable) < 100:
queued_occupancy = 0
for ts in self.queued:
for ts in concat([self.queued, self.unrunnable]):
if ts.prefix.duration_average == -1:
queued_occupancy += self.UNKNOWN_TASK_DURATION
else:
queued_occupancy += ts.prefix.duration_average
else:
queued_occupancy = 0
queued = random.sample(self.queued._heap, 100)
queued = [wr() for _, _, wr in queued]
for ts in random.sample(queued, 100):
if ts is None:
continue
for ts in take(100, concat([self.queued, self.unrunnable])):
if ts.prefix.duration_average == -1:
queued_occupancy += self.UNKNOWN_TASK_DURATION
else:
queued_occupancy += ts.prefix.duration_average
queued_occupancy *= len(self.queued) / 100
queued_occupancy *= (len(self.queued) + len(self.unrunnable)) / 100

cpu = math.ceil(
(self.total_occupancy + queued_occupancy) / target_duration
) # TODO: threads per worker

# Avoid a few long tasks from asking for many cores
tasks_ready = len(self.queued)
tasks_ready = len(self.queued) + len(self.unrunnable)
for ws in self.workers.values():
tasks_ready += len(ws.processing)

Expand Down

0 comments on commit bd4069a

Please sign in to comment.