Skip to content

ENH: Add subclass of Adaptive that works with unknown tasks#4973

Open
elaineejiang wants to merge 5 commits intodask:mainfrom
elaineejiang:elastic-adaptive
Open

ENH: Add subclass of Adaptive that works with unknown tasks#4973
elaineejiang wants to merge 5 commits intodask:mainfrom
elaineejiang:elastic-adaptive

Conversation

@elaineejiang
Copy link
Copy Markdown

Related to #4816

I've found that Adaptive doesn't work as well when the functions I'm submitting are labeled as "unknown tasks". At work, we use a lot of internal functions that have highly variable durations. I added a subclass of Adaptive to scale based on the number of unblocked tasks instead of estimated durations. I recently gave a short talk on this extension at Dask Summit: https://zoom.us/rec/play/3jXhd0X69egba6uXWXsYhrBlNTKef2-J3dTX0Hr0j15NOU-RteQcple[…]g.1623702769220.3b25454921a97baf96ba551741201890&_x_zm_rhtaid=511 (skip to 00:10:42).

Curious to hear what you all think!

Also, not set on the subclass name ElasticAdaptive (a bit redundant) -- happy to take suggestions.

@fjetter
Copy link
Copy Markdown
Member

fjetter commented Jun 29, 2021

Interesting. This somehow raises the question if it is worth measuring variance of task duration as well. At least theoretically we could then define the target using a confidence interval (smth like target = mean + 3 sigma)

@jrbourbeau I am not aware of other cases of subclasses to existing functionality in our code base. Any experience how we would want to proceed here? The functionality seems general enough that it may be useful for others (given enough docs)

Also, not set on the subclass name ElasticAdaptive (a bit redundant) -- happy to take suggestions.

Your subclass proposes to use Tasks instead of Occupancy. Maybe this is something we can work with.

we could do

class _Adaptive:
    pass


class OccupancyAdaptive(_Adaptive):
    pass

class TaskAdaptive(_Adaptive):
    pass

# Default / backwards compat
Adaptive = OccupancyAdaptive

Comment on lines +255 to +290
async def recommendations(self, target: int) -> dict:
"""
Make scale up/down recommendations based on current state and target
"""
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need another recommendation method? Can this be somehow merged (haven't investigated the diff, yet)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff is that this recommendation method doesn't close workers that haven't arrived yet.

It removes this logic (https://github.com/dask/distributed/blob/main/distributed/deploy/adaptive_core.py#L171):

not_yet_arrived = requested - observed
to_close = set()
if not_yet_arrived:
to_close.update(toolz.take(len(plan) - target, not_yet_arrived))

This might be specific to my use case since I'm using off-prem workers that take a while to start up.

@TomAugspurger
Copy link
Copy Markdown
Member

This somehow raises the question if it is worth measuring variance of task duration as well

#4023 / #4028 proposes to capture the variance at a task-prefix level.

@elaineejiang
Copy link
Copy Markdown
Author

@fjetter I like the Occupany vs. Task naming convention -- makes the distinction very clear. Will update the PR.

And thanks @TomAugspurger! Taking into account task variance would definitely be useful.

@elaineejiang elaineejiang requested a review from fjetter July 20, 2021 13:31
@GPUtester
Copy link
Copy Markdown
Collaborator

Can one of the admins verify this patch?

@elaineejiang
Copy link
Copy Markdown
Author

@fjetter or @TomAugspurger - would either of you mind giving this another review? The failed tests don't appear related. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants