
Workers can fetch remote data when local clients are busy #5206

Open

wwoods wants to merge 1 commit into dask:main from wwoods:main-localfix

Conversation

@wwoods (Contributor) commented Aug 12, 2021

Prior to this commit, required task results would be fetched only from local workers whenever any were available. If all local workers were busy but the data was available on another machine, this resulted in an indefinite delay. This patch allows local workers to be temporarily rejected, letting remote workers provide the data while all local workers are busy.

Outstanding:

  • Is there a reason not to do this behavior?
  • Need to clear busy state on the transition from new to fetch, i.e. re-allow local workers as soon as is reasonable, while avoiding a time.monotonic() usage in distributed/worker.py (see the sketch below).
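
One hypothetical shape for that second item, assuming the busy_workers set from this PR and the worker-side TaskState's who_has; the hook name is illustrative, not the actual patch:

# Hypothetical sketch: when a task transitions from new to fetch,
# re-allow any of its peers previously flagged busy, avoiding any
# timer-based reset.
def transition_new_to_fetch(self, ts):
    self.busy_workers.difference_update(ts.who_has)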

wwoods added a commit to wwoods/distributed that referenced this pull request Aug 12, 2021

@fjetter (Member) left a comment

Is there a reason not to do this behavior?

I think it is very difficult to decide when this kind of behaviour is actually beneficial. Fetching from a remote worker is likely much more expensive than fetching from a local one, and the local one might be available again shortly, making waiting the better choice.
I can see scenarios where this might be very problematic, though, so I'm wondering how we might find common ground. In the end, what we likely want is to favour local workers over remote ones without excluding either from the selection. However, afaik, we currently don't have a good method to favour one worker over another.

If we want to extend this mechanism to be more sensible, we should consider somehow including/removing/extending the Worker.repetitively_busy counter. Right now, this counter is the only control mechanism for busyness, and it is neither very selective nor robust.
It currently serves one primary purpose, namely to protect the scheduler from being hammered by who_has calls. Every time we encounter a busy worker, we take this as a chance to ask the scheduler whether other workers also have the requested key. We try not to do this too often and implement a rather crude exponential backoff based on this counter. It is crude because whenever a gather_dep finishes successfully, the counter is reset, possibly still allowing the scheduler to be overloaded.
There is also a second effect of this backoff: the subsequent call to ensure_communicate is delayed, giving the remote workers a chance to work off their backlog and become available again. This is also not very reliable, since there are many places where we call ensure_communicate nowadays, and the backoff might not actually help here.
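
For illustration, the counter-based exponential backoff pattern described above looks roughly like this (a generic sketch, not the actual distributed/worker.py code):

import asyncio

# Sleep grows with the busy counter, capped; resetting the counter on a
# successful gather_dep is what makes the real variant crude.
async def backoff(counter: int, base: float = 0.05, cap: float = 5.0) -> None:
    await asyncio.sleep(min(cap, base * 2 ** counter))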

Bottom line: I think this is a good suggestion, but I'm still missing something. I would like to keep complexity as small as possible here, since this code area is already very complex and has been involved in some of our recent deadlocks.

Comment thread distributed/worker.py
Comment on lines +2181 to +2182
elif not workers_not_busy:
    # Every peer holding this task is flagged busy: clear their busy
    # state so the fetch can be retried instead of stalling.
    self.busy_workers.difference_update(workers)

Member

IIUC:

  1. As soon as a worker is flagged busy, it is added to busy_workers.
  2. If a worker is in busy_workers, it will always be ignored.
  3. The only way to remove a worker from this "busy" state is when all workers holding the inspected task are flagged busy; only then are they removed from busy_workers.

Condition 3 is very strict, since it hides a worker completely until we are lucky enough to hit a task whose who_has entries are all busy.

Contributor Author

You're following. I will point out that the 2nd unchecked TODO on this PR is resetting a peer's busy state when fetching a new key belonging to that peer, which will reduce condition 3's strictness. However, I didn't want to bother figuring out how to do that until the larger discussion about the best approach to dealing with this was resolved.

Comment thread distributed/worker.py Outdated
Comment on lines +2377 to +2379
# Record the busy peer so it is skipped for now.
self.log.append(("busy-gather", worker, to_gather_keys))
self.busy_workers.add(worker)
self.busy_workers_log.add(worker)

Member

I would prefer that you use Worker.log and test for the busy-gather event instead of introducing another set that exists purely for testing.
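
For illustration, a test in that style could scan the event log directly (a sketch; worker stands for the Worker instance under test):

# Look for the busy-gather entry in Worker.log instead of consulting a
# dedicated busy_workers_log set.
assert any(entry[0] == "busy-gather" for entry in worker.log)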

@wwoods (Contributor, Author) commented Aug 13, 2021

I think it is very difficult to decide when this kind behaviour is actually beneficial

Absolutely. The context you've detailed for a change like this is appreciated. Overall, I'm not sure I have the time to engage in a very deep discussion, but I'll fill in a couple of thoughts below.

However, afaik, we currently don't have a good method to favour one worker over the other.

I'd contend that's exactly what this code, even before this PR, was accomplishing.

It currently serves one primary purpose, namely to protect the scheduler from being hammered by who_has calls.

I haven't traced the code up this far, but I had assumed that once a list of peers with the required data was furnished to the worker, it stopped asking the scheduler. That is, as long as the peers are responding, the likelihood that those peers still have the data, and are the only ones with it, is relatively high. The code this PR touches should purely be a way of prioritizing the peers from which to fetch the data, in order of busyness/optimality. I would presume that querying the scheduler for a potentially new set of peers with the data should be an independent line of inquiry (potentially kicked off after this path fails for some amount of time).

Worker.repetitively_busy

That seems reasonable. repetitively_busy could become a mapping of {worker: busy count} rather than the global scalar it currently is. Perhaps the best way to juggle your reasonable concern about when we should do this would be to set a minimum busy count for all local workers, after which remote workers could be considered. If this were set fairly high, it would certainly cause a heavy preference for local data. If we have access to it, the size of the data being requested could and should be factored into that threshold.

EDIT: Thinking on this a bit more, one could basically implement a "wear leveling" analogue for peers with this approach too. E.g., rather than the current random.choice(workers), it could be random.choice(workers_with_lowest_count_busy). The busy count could then be a monotonic "cost spent trying to gather from this peer" integrator rather than strictly a count.
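
Something along these lines, where busy_count, LOCAL_BUSY_THRESHOLD, and is_local are hypothetical names for illustration, not existing distributed APIs:

import random

busy_count: dict = {}      # hypothetical per-peer cost integrator
LOCAL_BUSY_THRESHOLD = 3   # hypothetical cut-over point, TBD

def is_local(addr: str) -> bool:
    # Stub: in practice, compare the host portion of the peer address.
    return addr.startswith("tcp://127.")

def pick_peer(workers: list) -> str:
    local = [w for w in workers if is_local(w)]
    # Prefer local peers until each has been busy at least
    # LOCAL_BUSY_THRESHOLD times, then open up to remote peers.
    if local and any(busy_count.get(w, 0) < LOCAL_BUSY_THRESHOLD for w in local):
        workers = local
    # Wear leveling: pick randomly among the least-"worn" candidates.
    lowest = min(busy_count.get(w, 0) for w in workers)
    return random.choice([w for w in workers if busy_count.get(w, 0) == lowest])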

@fjetter (Member) commented Aug 17, 2021

However, afaik, we currently don't have a good method to favour one worker over the other.

I'd contend that's exactly what this code, even before this PR, was accomplishing.

I was referring to some kind of weighting, not a binary classifier (like include/exclude).

I would presume that the need to query the scheduler for a potentially new set of peers with the data should be an independent line of inquiry (potentially kicked off after this line fails for some amount of time).

Agreed. I don't think it has to be at this exact position; it could be offloaded to some other place or triggered by a slightly different condition. I just wanted to point out the status quo.

Thinking on this a bit more, one could basically implement a "wear leveling" analogue for peers with this approach too. E.g., rather than the current random.choice(workers), it could be random.choice(workers_with_lowest_count_busy). Count busy could then be some monotonic "cost spent trying to gather from this peer" integrator rather than strictly a count.

I was also thinking about something like that:

# pseudo
repetitively_busy = {worker: counter}  # per-worker busyness counter
# remote workers carry a fixed penalty over local ones; offset TBD
weights = {worker: count if is_local(worker) else count + 5
           for worker, count in repetitively_busy.items()}
worker_to_fetch_from = min(weights, key=weights.get)

This weighting might seem a bit of an overkill, but it would provide a few benefits:

  • Managing the busyness counter is rather trivial. There is a well-defined rule about when to increase and decrease the value; no arbitrary timer or callback is necessary (see the sketch after this list).
  • If, for some reason, we neglect to reset state, the counter/weight system is guaranteed not to deadlock or exclude any worker indefinitely. It might not operate at peak performance, but it would never lock up.
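
A minimal sketch of such an update rule, assuming a hypothetical per-worker counter (the exact increments are placeholders):

busy_counter: dict = {}

# Bump on a busy response, decay on a successful gather, so weights
# recover on their own without timers or callbacks.
def on_busy_response(worker: str) -> None:
    busy_counter[worker] = busy_counter.get(worker, 0) + 1

def on_gather_success(worker: str) -> None:
    busy_counter[worker] = max(0, busy_counter.get(worker, 0) - 1)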

Less important but potentially interesting would be dynamically adjusted weights that help us pick the "best" worker based on...

  • inter-worker latency/bandwidth (we don't measure these right now, but that would be easy to do)
  • data size (maybe for small/big data sizes the choice between local and remote matters less?)
  • current occupancy (maybe this worker doesn't desperately need the data, since it still has a lot of work to do and can afford to wait a little longer for a local peer, reducing cluster load)
  • ???

These are obviously experiments, since I don't have a solid handle on whether this would have a real-world impact. Still, we could easily run experiments using these weights; a speculative sketch follows.
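
A purely speculative combination of those factors into one weight, reusing the hypothetical busy_count and is_local names from the sketch above (distributed measures none of these inputs today):

latency_s: dict = {}       # hypothetical measured inter-worker latency, seconds
bandwidth_bps: dict = {}   # hypothetical measured bandwidth, bytes/s

def fetch_weight(worker: str, nbytes: int) -> float:
    # Lower is better: busyness, a remote offset, and an estimated
    # transfer time that folds in latency and data size.
    w = float(busy_count.get(worker, 0))
    if not is_local(worker):
        w += 5.0  # remote offset, TBD
    w += latency_s.get(worker, 0.0) + nbytes / bandwidth_bps.get(worker, 100e6)
    return w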

jacobtomlinson pushed a commit that referenced this pull request Aug 23, 2021 (#5197)

* Addressing #5159 (IPv6 dask-worker support) by propagating empty host correctly

* black / flake8 fixes

* Fixing IPv6 addresses being passed around; added tests

* Fixing rpartition differential; reverting test change

* Patching default address to match scheduler; fixing bug with fetching work from busy Worker

Prior to this commit, required task results would be fetched only from local workers if they were available. If all local workers were busy, but the data was available on another machine, this would result in an indefinite delay. This patch allows local workers to be temporarily rejected, allowing remote workers to provide the data when all local workers are busy.

* Splitting out busy_workers into #5206

* Adding CLI test for IPv6 support

* Black fix

* Isort fix...

* Changing assertion to ValueError