Worker.log_event is not threadsafe / BatchedSend.send not threadsafe #5552

Closed
gjoseph92 opened this issue Dec 1, 2021 · 1 comment · Fixed by #5946
Labels: bug (Something is broken), diagnostics

Comments

@gjoseph92 (Collaborator)

What happened:

Calling get_worker().log_event(...) from multiple worker threads can fail (or probably corrupt the BatchedSend), since BatchedSend.send is not thread-safe.
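
For context, a minimal sketch of the kind of usage that hits this (the topic name and task body here are made up): every task logs from whichever worker thread it happens to run in, so several threads can call BatchedSend.send at once.

    # Hypothetical usage sketch; topic name and task body are made up.
    from distributed import Client, get_worker

    def noisy_task(i):
        # log_event funnels into the worker's BatchedSend from a task thread
        get_worker().log_event("my-topic", {"task": i, "status": "start"})
        return i * 2

    if __name__ == "__main__":
        client = Client()  # local cluster; workers run multiple threads by default
        results = client.gather(client.map(noisy_task, range(1000)))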

I saw a traceback like

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/worker.py in log_event()

/opt/conda/envs/coiled/lib/python3.8/site-packages/distributed/batched.py in send()

/opt/conda/envs/coiled/lib/python3.8/site-packages/tornado/locks.py in set()

RuntimeError: Set changed size during iteration

This is probably coming from the _background_send coroutine calling wait on the tornado Event (adding to its _waiters set) while the send call in a worker thread calls set (iterating over the _waiters set).
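
To make the suspected interleaving concrete, here is a minimal sketch at the tornado level only (no distributed involved): one thread hammers Event.set() the way a worker thread calling send would, while the event loop keeps adding and removing waiters the way _background_send's wait does. Being a race, it may take a few runs (or more iterations) to trigger, and unsynchronized set_result calls can surface other errors instead.

    # Standalone race sketch against tornado.locks.Event; not distributed code.
    import threading

    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.locks import Event

    event = Event()
    stop = False

    def hammer_set():
        # Stands in for a worker thread doing log_event -> send -> waker.set();
        # Event.set() iterates over Event._waiters.
        while not stop:
            event.set()

    async def main():
        global stop
        threading.Thread(target=hammer_set, daemon=True).start()
        loop = IOLoop.current()
        for _ in range(20_000):
            # Stands in for _background_send: clear the waker, then wait on it
            # with a deadline, which adds/removes futures in Event._waiters on
            # the event-loop thread.
            event.clear()
            try:
                await event.wait(loop.time() + 0.0001)
            except gen.TimeoutError:
                pass
        stop = True

    IOLoop.current().run_sync(main)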

What you expected to happen:

Users don't have to manage thread-safety of this API; it locks automatically.

Anything else we need to know?:

This is certainly a rarely-used API, but it is advertised in the docs: http://distributed.dask.org/en/stable/logging.html#structured-logs.

This should be easy to fix (sync?), though it's not a high priority; just wanted to note it for posterity.
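
For the record, one possible shape for such a fix (hypothetical; the attribute name and the approach are assumptions, not what #5946 actually does) would be to hand messages over to the event loop with IOLoop.add_callback, one of the few IOLoop methods documented as safe to call from other threads.

    # Hypothetical sketch only; assumes the BatchedSend-like object keeps a
    # reference to its IOLoop as `.loop`. Not the actual change in #5946.
    from tornado.ioloop import IOLoop

    def send_threadsafe(batched, msg):
        if IOLoop.current(instance=False) is batched.loop:
            # Already on the event-loop thread; safe to call directly.
            batched.send(msg)
        else:
            # add_callback is thread-safe, so the buffer append and waker.set()
            # both end up running on the loop thread.
            batched.loop.add_callback(batched.send, msg)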

Environment:

  • Dask version: 2021.11.2
  • Python version: 3.8.7
  • Operating System: linux
  • Install method (conda, pip, source): pip
gjoseph92 added the bug and diagnostics labels on Dec 1, 2021
gjoseph92 added a commit to gjoseph92/stackstac that referenced this issue Dec 1, 2021
This would occasionally cause errors because of dask/distributed#5552, and wasn't very useful anyway.
gjoseph92 added a commit to gjoseph92/stackstac that referenced this issue Dec 1, 2021
It seems to be something broken with BatchedSend (what a surprise!).
_Maybe_ something to do with visualizing something that's been persisted
(Futures that depend on other Futures)? Locally at least, commenting out
the `persist()` seems to make things work smoothly.

But the main problem we see is messages getting stuck in the BatchedSend
and not getting sent. If you look at `client.scheduler_comm` when things
should be computing but aren't, you'll usually see that there are
messages in the buffer.

In these cases, `scheduler_comm.waker` is not set. This seems wrong;
calling send should set the event. Unless the deadline is set.

But also, `waker._waiters` is empty, as though nothing is waiting on the
event either. I suppose this would be the case while `background_send`
is awaiting the `comm.write`. Or there's some broken thread-safety
(dask/distributed#5552).

Another thing I've noticed is that `scheduler_comm.next_deadline -
scheduler_comm.loop.time()` goes very, very negative (up to hundreds of
seconds) while there are things in the buffer. As in it's hundreds of
seconds behind when it should have sent.

One question is what happens when the deadline for `waker.wait` has already
passed. The "avoid spurious wakeups" logic is assuming that so long as
any deadline is set, the coroutine is going to wake up soon anyway. This
would only be the case if that timeout actually works. It's possible
that a `comm.write` takes longer than the interval, and by the time we
come back around to `waker.wait`, the deadline has already passed. If
the wait returns immediately in that case, then we're fine. But if
tornado schedules the callback into the ether, then we'll never wake up.
Though this still doesn't line up with the empty `_waiters` on the
event.
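
One way to answer that question empirically is a standalone experiment against tornado only (assuming tornado >= 5): wait on a never-set Event with a deadline that has already passed and see whether the coroutine wakes up.

    # Does Event.wait return promptly when its deadline is already in the past,
    # or does the wakeup get lost?
    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.locks import Event

    async def main():
        loop = IOLoop.current()
        waker = Event()                    # never set, like an idle waker
        stale_deadline = loop.time() - 10  # a deadline ten seconds in the past
        try:
            await waker.wait(stale_deadline)
            print("wait returned because the event was set")
        except gen.TimeoutError:
            # Landing here quickly would mean a stale deadline still wakes the
            # waiter, so _background_send would not sleep forever in that case.
            print("wait timed out promptly despite the stale deadline")

    IOLoop.current().run_sync(main)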

I guess the last explanation is just that `await comm.write` is really,
really, really slow. If it takes hundreds of seconds to do the write
(including serialization), then of course the next batch wouldn't be
sent yet. Websocket comms do have some ridiculous copying. And maybe
because the binder machines and network are so wimpy, it's just extra
slow there? Feel like this doesn't quite add up though.
@fjetter (Member) commented Mar 10, 2022

I'm wondering what the worst case of a non-thread-safe interaction would be. As the post points out, this can raise a RuntimeError whenever send is called. That typically happens after a message has been queued up in the buffer, so the worst case for a single message submission would be that it is delayed until the next wake. No big deal.

However, any pattern that sends many messages in quick succession would be at risk of dropping a message:

for msg in msgs:
    batched.send(msg)

I'm further wondering whether this race condition could corrupt the tornado Event itself, such that BatchedSend._background_send might deadlock: actions on futures (under the hood) are also not thread-safe and might leave the critical future in a corrupt state.


Today, we investigated this topic briefly around a spurious connection failure but I don't think any of this could actually corrupt the connection itself.

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Mar 15, 2022