
Fix Binance snapshot race condition #673

Closed
wants to merge 2 commits

Conversation

jinusean
Contributor

@jinusean jinusean commented Oct 3, 2021

Fixes #671

  • Tested
  • Changelog updated
  • Tests run and pass
  • Flake8 run and all errors/warnings resolved
  • Contributors file updated (optional)

@bmoscon
Owner

bmoscon commented Oct 3, 2021

@nirvana-msu does this fix your issue? Seems reasonable to me, but I haven't tested it

@nirvana-msu
Contributor

@jinusean no, it definitely does not fix it. It may make the exception go away, but the effect is even worse, as it would instead lead to silently generating an incorrect order book state. Actually, it doesn't even make the exception go away - the if self._book_buffer condition being True does not imply that std_pair is in the dict, so del self._book_buffer[std_pair] will still often raise.

But that's not important.

The fundamental issue here is that due to feed restarts, there may be multiple of these _concurrent_snapshot coroutines competing with each other to do the same thing. There will never be any benefit to having multiple _concurrent_snapshot coroutines executing at the same time, regardless of how we change the code inside. The only way to guarantee correct order book state is to ensure that a snapshot is overlaid with all subsequent order book updates in the correct order. When you have multiple of these coroutines running at the same time, they'll screw it up because the operations are non-atomic and their order is not guaranteed. For instance, coroutine A pops the first update from _book_buffer, and coroutine B pops the second update. It could be that B actually applies the second update before A applies the first (we cannot rely on the order in which coroutines will execute on the loop). Further, another one of these coroutines may at any point reset the snapshot - and if the snapshot it sets actually preceded some of the updates that have already been applied (which could easily happen, as the order of HTTP responses is not guaranteed to match the order of requests), there'd be no way to recover the correct state.

TL;DR is that there is no way for this to work correctly while there is a possibility of more than one _concurrent_snapshot executing concurrently. The correct way to fix it should be to cancel old tasks before starting a new one on a feed restart - i.e. to guarantee that there will only ever be one _concurrent_snapshot executing for a given pair at a time.
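
To illustrate the kind of guarantee I mean, here's a minimal sketch (made-up names, not actual cryptofeed code) of "at most one snapshot task per pair, cancel the old one before starting a new one":

    import asyncio

    class SnapshotCoordinator:
        def __init__(self):
            self._tasks = {}          # pair -> currently running snapshot task

        async def restart(self, pair, coro_factory):
            # cancel whatever snapshot task is left over from the previous connection
            old = self._tasks.pop(pair, None)
            if old is not None:
                old.cancel()
                try:
                    await old
                except asyncio.CancelledError:
                    pass
            # only then start the new snapshot task for this pair
            self._tasks[pair] = asyncio.create_task(coro_factory())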

@jinusean
Contributor Author

jinusean commented Oct 3, 2021

Ah understood. So a cache of the running tasks and a cancellation of those tasks should suffice?

@nirvana-msu
Contributor

nirvana-msu commented Oct 3, 2021

Something of this sort. You'll probably also need to await the cancelled tasks to avoid warnings from asyncio that an exception was never retrieved. So possibly something along the lines of:

    async def _reset(self):
        self._l2_book = {}
        self.last_update_id = {}

        if self.concurrent_http:
            # cancel any snapshot tasks still running from the previous connection
            for task in self._concurrent_snapshot_task_cache:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
            # buffer 'depthUpdate' book msgs until snapshot is fetched
            self._book_buffer: Dict[str, Deque[Tuple[dict, str, float]]] = {}

But even then, I think there'd still be a possibility that the previous stream (the one currently running, preceding the reset) just didn't get to start its _concurrent_snapshot task yet (so it's not yet in the cache) - but will do so soon, leading again to a race condition.

i.e. the issue here is that we have two connections running / processing updates at the same time. Perhaps a better way to deal with it would be to ensure a connection, along with all its pending tasks, is cancelled at a higher level (in connection_handler?) before resetting and initiating a new one. Clearly that doesn't seem to happen at the moment (otherwise we'd never run into this race condition). I don't understand the internals well enough yet to tell for sure - @bmoscon any advice?

@bmoscon
Owner

bmoscon commented Oct 3, 2021

maybe just use an asyncio mutex/semaphore so only one task can run for a symbol at a time?
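
e.g. something very rough like this (just a sketch, not actual cryptofeed code):

    import asyncio

    snapshot_locks = {}   # one lock per symbol

    async def snapshot_for(symbol):
        # the lock is created lazily, inside a running event loop
        lock = snapshot_locks.setdefault(symbol, asyncio.Lock())
        async with lock:
            ...  # fetch the snapshot and apply buffered updates for this symbol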

@nirvana-msu
Contributor

nirvana-msu commented Oct 3, 2021

But it’s not just a matter of only having one run at a time. We have to ensure a task from an older connection never runs after a task from a newer connection. If it does, it can screw up the order book state. We somehow need to ensure that an old connection is completely cancelled, with all corresponding tasks and pending coroutines, before we proceed with handling book state in a new connection.

@bmoscon
Owner

bmoscon commented Oct 3, 2021

perhaps this should just be removed then, it seems like it's adding a lot of unnecessary complexity

@nirvana-msu
Contributor

What should be removed?

@bmoscon
Owner

bmoscon commented Oct 3, 2021

the parallel snapshot code. It's kind of unnecessary. The user can get the same behavior by creating a feed per symbol

@nirvana-msu
Contributor

So some way of doing this concurrently is definitely needed - it's a great boost to performance. I'm not sure if starting a feed per symbol is feasible when e.g. subscribing to each symbol on Binance - are there rate limits on the number of websocket connections?

Maybe the fix isn't so hard. I'm not sure if what I described above (the possibility of a task not yet being in the cache when we reset, but added later) is actually possible - I'll need to check what exactly happens when a connection is reset. Let me dig into the internals a bit to understand it better.

@nirvana-msu
Contributor

Ok so looking at the code, I don't think the more complicated scenario I was describing is possible. By the time the new connection is created (and state is being reset), there should be nothing else running from the old connection, except for those dangling _concurrent_snapshot tasks.

So unless I'm mistaken, it should be sufficient to simply keep track of those running _concurrent_snapshot tasks in a cache, and cancel them (like I showed in #673 (comment)) when resetting.

@jinusean would you take a go at it?

@jinusean
Contributor Author

jinusean commented Oct 4, 2021

@nirvana-msu I've added the recommended changes but I'm still getting a KeyError on the initial cancellation. Could you take a look?

@villekuosmanen
Contributor

villekuosmanen commented Oct 4, 2021

Seems like a tough concurrency bug - I was trying to think of good solutions but couldn't come up with any.

It feels like the biggest problem here is how book_buffer is shared between coroutines, even though it's used as a local store for buffered messages. If each coroutine had its own book_buffer, I don't think having multiple ones running at the same time would really matter since the sequence numbers would help us filter out duplicate updates. Of course, if you made book_buffer local to each coroutine, we would still need a way to append new book messages to them and I don't know if there's a neat way to do that in Python.
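
If there is, it might look vaguely like this (purely a sketch with made-up names - the open question is how the message handler finds the right buffer to append to):

    from collections import deque

    async def concurrent_snapshot(self, pair):
        local_buffer = deque()                      # owned by this coroutine only
        self._active_buffers[pair] = local_buffer   # handler would append new msgs here
        snapshot = await self._fetch_snapshot(pair)
        self._apply_snapshot(pair, snapshot)
        while local_buffer:
            update = local_buffer.popleft()
            # sequence numbers decide whether the update is stale or should be applied
            self._apply_if_newer(pair, update)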

I'll definitely check if this bug exists in the refresh snapshot logic in #606 when I have time, but since these snapshots are refreshed at most once per minute I think we should be fine on that front.

@jinusean
Contributor Author

jinusean commented Oct 4, 2021 via email

@nirvana-msu
Contributor

nirvana-msu commented Oct 8, 2021

So I've done some debugging. I can explain why cancellation isn't working, and also I've found an even bigger flaw in the current concurrent_http logic. So here's exactly what's happening:

  1. At first all goes as planned: a few book updates get accumulated in self._book_buffer, and finally a snapshot is fetched and self._l2_book is populated.
  2. Some of the pending updates in self._book_buffer may be processed by _concurrent_snapshot as expected (to be exact, they'll be discarded as they preceded the snapshot).
    However, most likely before all of them are processed, another book update comes from the feed and it hits this logic:

        if std_pair in self._l2_book:
            return await self._handle_book_msg(msg, pair, timestamp)

    This is where the big flaw is. It only checks whether self._l2_book exists for our pair - and if so, it proceeds to handle this book update. It completely disregards the fact that we may still have unprocessed updates in the self._book_buffer queue that must be applied before this one. Instead, the logic should also check for any pending updates in self._book_buffer, and if there are any, it should just append this update to the end of the queue.

So let's continue with what happens:
That new feed update came and was processed, bypassing the queue in self._book_buffer (which still has some messages). So far it's OK - this is the update that actually follows the snapshot, so it's been applied properly. But note that self._l2_book[std_pair].delta is no longer None once it has been processed.
3. Now _concurrent_snapshot gets a chance to run. It pops the next update and checks the message id using _check_update_id. The first condition, which is supposed to skip updates preceding the snapshot, now fails because self._l2_book[std_pair].delta is not None anymore! But the update is clearly from the past, so the checks that follow do not match either, and it decides to reset the book state:

    if self._l2_book[std_pair].delta is None and msg['u'] <= self.last_update_id[std_pair]:
        return True
    elif msg['U'] <= self.last_update_id[std_pair] + 1 <= msg['u']:
        self.last_update_id[std_pair] = msg['u']
        return False
    elif self.last_update_id[std_pair] + 1 == msg['U']:
        self.last_update_id[std_pair] = msg['u']
        return False
    else:
        self._reset()

4. Recall that we are still within the task wrapping _concurrent_snapshot (that's important!). While doing the reset, we cancel the task and await its cancellation:
https://github.com/jinusean/cryptofeed/blob/fix-binance-buffer/cryptofeed/exchanges/binance.py#L168-L171
We then suppress CancelledError. The confusing part here is that the error we've just caught is not the one we wanted to catch when awaiting the task result. Instead, this is the CancelledError thrown inside the task itself - it's done that way to give a task the chance to clean up. So we're suppressing the cancellation error within the task, and hence the task is never actually cancelled.
And because the coroutine is never cancelled while self._book_buffer gets reset, a KeyError is raised.

So there are two takeaways:

  1. The logic bug that processes new updates while there are still pending updates in self._book_buffer should be fixed. That can be done by simply changing the code to something like:
        if std_pair in self._l2_book and std_pair not in self._book_buffer:
            return await self._handle_book_msg(msg, pair, timestamp)
  2. We should make the logic that awaits task cancellation more robust. Ideally, it would be best to avoid having this problem in the first place - i.e. the best scenario is to refactor the code such that task cancellation can never be invoked from within the task itself. Otherwise we need to deal with it somehow. A couple of related discussions (though not quite the same as our issue):
    https://stackoverflow.com/a/55424838/5540279
    https://bugs.python.org/issue35945

NB: Once you fix the first issue, the code will start working - because when the task is cancelled from outside, it behaves properly. But the second issue should still not be ignored - if in the future the code changes such that task cancellation happens from the inside, the issue will reappear.
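
To make that second point concrete, here is a tiny standalone example (nothing to do with cryptofeed) showing that a task which swallows the CancelledError delivered inside it is never actually cancelled:

    import asyncio

    async def stubborn():
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            pass                      # swallowing the error defeats the cancellation
        return 'finished normally'

    async def main():
        task = asyncio.create_task(stubborn())
        await asyncio.sleep(0)        # let the task start and reach its sleep
        task.cancel()
        print(await task)             # prints 'finished normally'
        print(task.cancelled())       # False - the task was never cancelled

    asyncio.run(main())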

@nirvana-msu
Contributor

I can suggest one way to tackle the second issue. We need to refactor the code to avoid ever having to cancel the task from within itself. That means we cannot simply call self._reset() within _check_update_id (because it could be called from inside the _concurrent_snapshot task). But what we could do instead is use exceptions for flow control.

So instead of calling self._reset() within _check_update_id, we raise a custom error like InconsistentBookStateError. Then we'd need to handle this error differently depending on whether we catch it inside _concurrent_snapshot or outside it. If caught outside _concurrent_snapshot, we can just await self._reset(); but if caught inside it, we need some special handling. One way I can think of would be to pop the task itself from the cache of tasks (as we want to avoid cancelling ourselves from inside), then await self._reset() to ensure all other tasks are cancelled, and finally end the task itself by just returning from it.
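
Very roughly, the shape could be something like this (made-up names, and assuming the cache of running tasks is a set):

    import asyncio

    class InconsistentBookStateError(Exception):
        """Raised when an update cannot be reconciled with the current book state."""

    def _check_update_id(self, std_pair, msg):
        # ... run the same checks as today; when none of them match (the current
        # else branch), signal the caller instead of calling self._reset():
        raise InconsistentBookStateError(std_pair)

    async def _concurrent_snapshot(self, pair, std_pair):
        try:
            ...  # fetch the snapshot and drain self._book_buffer as before
        except InconsistentBookStateError:
            # we're inside the task itself: remove ourselves from the cache first,
            # so that _reset() never tries to cancel/await the task it is running in
            self._concurrent_snapshot_task_cache.discard(asyncio.current_task())
            await self._reset()
            return   # end this task by returning instead of being cancelled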

@bmoscon
Owner

bmoscon commented Oct 9, 2021

@jinusean - will need to rebase with the latest changes, then I think the suggestion from @nirvana-msu can be implemented and we'll be good to go here

@bmoscon
Owner

bmoscon commented Oct 12, 2021

@jinusean - I reverted most of the changes for concurrent snapshot, so once you have it fixed in your codebase you'll need to re-integrate based off of what I have (and reapply the changes I removed). Too many bugs in the binance codebase, so I had to remove that large set of changes so I can push out a release while I wait for these issues to be resolved.

@bmoscon
Owner

bmoscon commented Oct 12, 2021

you can see what I reverted here: b90fa9b

@bmoscon
Owner

bmoscon commented Oct 25, 2021

It's been over 3 weeks with no response. I'm going to consider this matter closed. If you wish to revisit the parallel snapshots, please open a new PR

@bmoscon bmoscon closed this Oct 25, 2021
Development

Successfully merging this pull request may close these issues.

Race Condition in Binance L2_BOOK feed/snapshot logic when concurrent_http is True and feed is restarted
4 participants