Allow tasks with restrictions to be stolen #2740

Closed
calebho wants to merge 30 commits into dask:master from calebho:resource_restricted_stealing
Conversation

@calebho
Contributor

@calebho calebho commented May 31, 2019

Addresses stealing tasks with resource restrictions, as mentioned in #1851.
If a task has hard restrictions, do not just give up on stealing.
Instead, use the restrictions to determine which workers can steal it
before attempting to execute a steal operation.

A follow up PR will be needed to address the issue of long-running tasks
not being stolen because the scheduler has no information about their
runtime.

calebho and others added 2 commits May 31, 2019 15:46
@jakirkham
Member

@TomAugspurger, would you have a chance to look at this PR?

…dask#2733)

Convert resource key tuples to a string representation before they are
submitted to the scheduler. The commit is intended to fix dask#2716.

The test case persists the result of a tiny DataFrame operation
and checks the resource restrictions.
Member

@TomAugspurger TomAugspurger left a comment


Looks good, but I'm not very familiar with this. May want a second opinion from @mrocklin when he has time.

.pytest_cache/
dask-worker-space/
.vscode/
*.swp
Member


This seems fine since we include it for other files, but I typically put these files under a global gitignore.

``from_``.
"""
if what.loose_restrictions:
logger.debug("Task {} has loose restrictions".format(what.key))
Member


Typically logging uses old-style string formatting, and the values are passed to logger.debug, e.g.

logger.debug("Task %s has loose restrictions", what.key)

return out


def _can_steal(ws, what, from_):
Member


So this is going to be called relatively often. AFAICT, it'll be called once per (task, thief worker) pair for every task we're considering stealing. Given that, we should ensure that it's fast. How confident are you that there aren't any surprising performance issues here? Most things look OK, but may want to look into get_address_host.

Contributor Author


In general, get_address_host is linear in the length of the input address; however, I think it's safe to assume that len(ws.address) is small and bounded as a function of ws.

The four collections used here are

  1. what.host_restrictions,
  2. what.worker_restrictions,
  3. ws.resources, and
  4. from_.resources.

Key containment is checked using (1) and (2), and since they are sets these operations are constant with respect to the size of the containers. (3) and (4) are used in _has_resources. The work in the loop only uses (3). Because (3) is a dict, value retrieval is constant with respect to the size of (3). The loop iterates over (4) once, so the total work in this function is linear in the size of (4). So overall, _can_steal is O(len(from_.resources)).

Member


Yeah, I'm also concerned about the performance here.

This code gets called often. Previously we didn't iterate through idle, we just indexed into it (indeed we made it a SortedSet object just to get this constant-time access).

The work stealing code accounts for a non-trivial amount of the scheduler performance costs. I would not be surprised if adding this check significantly increases those costs. Given that resources aren't used in the common case I'm very hesitant to go this route without strong demonstration that it doesn't affect performance.

However, I would be much more comfortable with this if it was only engaged in the uncommon case that the task had resource constraints. It could be that a simple test like the following resolves the issue.

if ts.resources:
    thieves = [...]
    thief = ...
else:
    thief = self.idle[i % len(self.idle)]

Then this PR is essentially a no-op in the common case, and we can be a lot more relaxed about performance.

Member


def _can_steal(ws, what, from_):

I like the use of the pronoun ws for a worker state. I think that if what is a task state then we should probably use the ts pronoun instead. It's not easy to see what what means otherwise without reading the docstring carefully. If ws and from_ are both worker states then I recommend the names thief and victim instead.

Member


It seems like we're going to call this function a lot and it also seems like a lot of it might be pretty repetitive. Do you have any thoughts on how to reduce repetition here?

As an example, let's imagine that we have 1000 workers and 10,000,000 tasks. I think that we call balance every 100ms and we'd like stealing to take up less than 10% of our CPU scheduler overhead. My guess is that we don't currently stay under budget when there are restrictions, and my guess is that a lot of it has to do with this function. I think that it would be good to take a hard look at this function and reduce costs. Some of this might be through things like caching, but I'm not sure of the best approach here.

@jakirkham
Member

@calebho, any thoughts on Tom and Matt's suggestions above? 🙂

@calebho
Contributor Author

calebho commented Jun 4, 2019

So I was investigating another issue related to work stealing where all tasks were running on a single worker, leaving all other workers idle. Here's a demonstrative example:

import time
from distributed import Client
from dask_jobqueue import SLURMCluster
from tempfile import TemporaryDirectory
import os


def long_identity(i):
    time.sleep(15)
    return i


def main():
    world_size = 5
    ntasks = 5
    with TemporaryDirectory(dir=os.getcwd()) as local_dir:
        print(local_dir)
        cluster = SLURMCluster(
            name='testing',
            queue='dev',
            cores=10,  # set to at most `ntasks` to observe work stealing
            memory='4GB',
            local_directory=local_dir,
            log_directory=local_dir,
            extra=['--resources "TASKS=1"'],
            silence_logs=False,
        )
        cluster.adapt(minimum=world_size, maximum=world_size)
        cl = Client(cluster)
        futs = []
        for i in range(ntasks):
            futs.append(cl.submit(long_identity, i, resources={'TASKS': 1}))

        for f in futs:
            f.result()


if __name__ == '__main__':
    main()

The interesting thing about this example is if you use more cores than there are tasks, then no work stealing occurs. If you set cores to at most ntasks, then work stealing occurs. After some digging, I think this is because the scheduler doesn't believe the worker holding all the tasks is saturated, even though all of its resources are consumed by a single task. As a result, the tasks are processed serially. Looking into how Scheduler.saturated is computed, if the size of WorkerState.processing is less than WorkerState.ncores, then the worker is considered idle (and thus not saturated).

This is a problem because many ML workflows are expressed in this manner: few long-running tasks which use some number of GPUs. The number of CPUs allocated for these tasks is in general less important (just allocate enough to fully saturate the GPUs).
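The core-count idleness test described above can be sketched minimally (the real logic lives in the scheduler's occupancy accounting; this bare condition is a simplification for illustration):

```python
def looks_idle(n_processing, ncores):
    # A worker running fewer tasks than it has cores is treated as idle,
    # regardless of whether one task has already consumed every resource.
    return n_processing < ncores

# One long-running task on a 10-core worker: still "idle", so nothing is
# stolen and the resource-restricted tasks end up running serially.
serial_case = looks_idle(n_processing=1, ncores=10)

# With cores set to at most the number of tasks, the worker can saturate
# and work stealing kicks in.
stealing_case = looks_idle(n_processing=5, ncores=5)
```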

This gave me another idea for how to solve the current issue which would also handle the problem above: what if Scheduler.saturated and Scheduler.idle were changed to take into account the resources of the currently processing tasks? The downside is that determining whether a worker is saturated or not now becomes much more complicated. For example, say a worker has two resources {'foo': 2, 'bar': 4}. What does it mean for this worker to be saturated? All resources depleted seems reasonable, but I think it would be interesting to explore whether the question should instead be "Is this worker saturated with respect to these resources?". Before I get too far down this rabbit hole, I wanted to pause and sanity check with everyone here.
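One possible reading of "saturated with respect to resources" can be sketched under the assumption that we simply sum the resource requirements of the processing tasks. This is a hypothetical helper for discussion, not part of the PR:

```python
def resource_saturated(capacity, processing):
    """capacity: the worker's total resources, e.g. {'foo': 2, 'bar': 4}.
    processing: resource requirements of each currently-processing task."""
    used = {}
    for reqs in processing:
        for resource, amount in reqs.items():
            used[resource] = used.get(resource, 0) + amount
    # This takes the "saturated w.r.t. any one resource" reading: a single
    # depleted resource is enough, even if others remain free.
    return any(used.get(r, 0) >= cap for r, cap in capacity.items())
```

Under the "all resources depleted" reading the `any` would become `all`, which is exactly the design question raised above.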

mrocklin and others added 7 commits June 5, 2019 10:09
* bokeh -> dashboard in template

* Add doc to ProfileTimePlot

* Add test for bokeh worker routes

* Remove info route from worker

To do this we ...

1.  Remove the baked in "info" link in the base template
2.  Add that to the scheduler's list of links
3.  Add a redirect from "info" to the actual page
4.  Create a generic redirect route
5.  Move that and the RequestHandler to utils to avoid code duplication
    between scheduler and worker

Fixes dask#2722

* Add worker name

Fixes dask/dask#4878
* Add type name to LocalCluster.__repr__
* Add SpecCluster.new_worker_spec method

This is helpful for subclassing
* Move some of the adaptive logic into the scheduler
* don't close closed clusters
* require pytest >= 4 in CI
* use worker_spec if it exists
* Don't scale a closed cluster
* handle intermittent failures
This also fixes an ImportError in prometheus-client=0.7
Member

@mrocklin mrocklin left a comment


Ah, I had a review sitting here for a few days. I apologize for the delay in response.

if not _has_restrictions(what):
    logger.debug(
        "Task %s has loose restrictions or no restrictions at all", what.key
    )
Member


I think that this causes a logging event for every non-matching worker for every task at every stealing balance call, is that correct? If so, we can't do this, both because it will flood logs, and because the call to logger.debug can be non-trivially expensive.
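One common mitigation for debug logging on a hot path, shown here as an illustration rather than what this PR ultimately did: guard the call with `isEnabledFor`, so neither the message nor its arguments are processed when DEBUG is off.

```python
import logging

logger = logging.getLogger("distributed.stealing")

def log_loose_restrictions(key):
    # isEnabledFor is a cheap level check; when DEBUG logging is disabled
    # the function returns immediately and no log record is ever built.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Task %s has loose restrictions or no restrictions at all", key)
```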


TomAugspurger and others added 3 commits June 8, 2019 11:29
This allows for setting the config after importing distributed

xref dask/dask-examples#75 (comment)
Co-Authored-By: Matthew Rocklin <mrocklin@gmail.com>
@calebho
Contributor Author

calebho commented Jun 13, 2019

As an example, let's imagine that we have 1000 workers and 10,000,000 tasks. I think that we call balance every 100ms and we'd like stealing to take up less than 10% of our CPU scheduler overhead. My guess is that we don't currently stay under budget when there are restrictions, and my guess is that a lot of it has to do with this function. I think that it would be good to take a hard look at this function and reduce costs. Some of this might be through things like caching, but I'm not sure of the best approach here.

If this is the concern, then I don't think _can_steal is the issue here because it's linear in the number of victim resources, and this number is generally small (< 10 seems reasonable to assume). Even if it somehow runs in constant time w.r.t. the number of victim resources, it won't matter if it's being called 10 billion times.

Looking at this if-else block, one way to get around this cost might be to sample a constant number of workers from idle instead of using the entire population. The issue then becomes how to construct an appropriate distribution over the idle workers such that

  1. the expected number of workers in the sample which can steal the given task is high, and
  2. constructing such a sample scales well w.r.t. the number of workers.
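The sampling idea could look something like this sketch (uniform sampling for simplicity; `can_steal` and the sample size `k` are assumptions, and the open question above is precisely whether a smarter, non-uniform distribution is needed):

```python
import random

def sample_thieves(idle_workers, task, can_steal, k=8):
    # Draw at most k candidates instead of scanning the whole idle set,
    # then keep only the workers that can actually steal the task.
    idle_workers = list(idle_workers)
    if len(idle_workers) <= k:
        candidates = idle_workers
    else:
        candidates = random.sample(idle_workers, k)
    return [w for w in candidates if can_steal(w, task)]
```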

mrocklin and others added 5 commits June 18, 2019 14:55
@martindurant
Member

@mrocklin, @TomAugspurger, I am not following this discussion, but the OP seems to have replied to at least some concerns. Did you want more specific information or work?

@martindurant
Member

Ah, @calebho, there seems to be a conflict now, sorry about that. Can you please merge from master?

TomAugspurger and others added 7 commits June 24, 2019 15:11
Co-Authored-By: Matthew Rocklin <mrocklin@gmail.com>
@calebho
Contributor Author

calebho commented Jun 24, 2019

Oops, I might have inadvertently polluted this PR with a bunch of commits from upstream because I rebased. I guess I can remake the branch on my fork?

@martindurant
Member

Sure, start a new PR, if more convenient.

mrocklin pushed a commit that referenced this pull request Mar 3, 2020

Supersedes #2740
@jrbourbeau
Member

Closing as this PR was superseded by #3069. Thanks @calebho!
