-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WebsocketProviderV2 recv()
lock + other multi-tasking support
#3125
Conversation
4507da5
to
a0752bc
Compare
9e34643
to
0cb3778
Compare
0cb3778
to
b2dcbc5
Compare
b2dcbc5
to
69647ba
Compare
69647ba
to
e185952
Compare
e185952
to
f677194
Compare
- Bonus: Add API for retrieving active subscriptions
- If we are listening to a websocket persistently, we shouldn't worry about timeouts in the same sense as a request that is waiting for a response. Instead, keep looping through the cycle of checking the ubscriptions deque / cache and trying to recv() on the websocket connection until something comes up.
- define an ``__await__()`` method that allows for awaiting a persistent connection to the websocket; e.g. ``w3 = await AsyncWeb3.persistent_websocket(WebsocketProviderV2)``
f677194
to
d9418dd
Compare
- Add the beginnings of the documentation for why and how ``PersistentConnectionProvider`` instances cache request information and responses in order to process them appropriately. - Link to the ``RequestProcessor`` documentation from the ``WebsocketProviderV2`` docs.
- One-to-many response logic doesn't use the request_timeout for the ``recv()`` since it indefinitely listens to the open websocket and with each turn of the while loop it needs to check the cache again. Use a similar logic for one-to-one requests except add the request timeout around the entirety of the while loop. This makes it so that we look for a response throughout the defined request_timeout time but we make sure to constantly check both the cache and call ``recv()`` on the websocket, in case the response has snuck into the cache from another request made somewhere else or from a persistent listener set up to receive messages.
d9418dd
to
1ddacf5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing major, looks good overall!
@@ -524,6 +527,8 @@ def persistent_websocket( | |||
class _PersistentConnectionWeb3(AsyncWeb3): | |||
provider: PersistentConnectionProvider | |||
|
|||
# w3 = AsyncWeb3.persistent_websocket(provider) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments that follow kind of look like code that was commented out. Recommend creating comment blocks that call these examples out more explicitly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really only meant these to be for organization / reference. The websockets library does a similar thing with their connect()
method here. I feel like we could let the documentation do the explaining. Thoughts?
@@ -524,6 +527,8 @@ def persistent_websocket( | |||
class _PersistentConnectionWeb3(AsyncWeb3): | |||
provider: PersistentConnectionProvider | |||
|
|||
# w3 = AsyncWeb3.persistent_websocket(provider) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like these examples of usage. Would a label, something like # Example usage:
or # Access this method with pattern:
make it more clear for users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to leave that up to the documentation. This is more so we separate the logic for each of those design patterns into code blocks and so contributors know where to look for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing came up in messing with it. Once existing notes are handled, lgtm!
What was wrong?
Some use cases for
WebsocketProviderV2
don't allowasyncio.gather(tasks...)
, for example, as it will halt when two coroutines try to callrecv()
on the websocket connection at once. This PR attempts to ameliorate these scenarios.Add an
asyncio.Lock
around the websocketrecv()
method.Change
listen_to_websocket()
so that it only listens to subscription (or one-to-many request / response) messages. This is because everymake_request()
expects a 1-to-1 response-to-request and expects it to be returned on the same line e.g.variable_assignment = await w3.eth.block_number
. If we are persistently listening to the socket, we clearly expect many messages. So, we only listen to messages from one-to-many calls, triggered by aws.send()
.Add an API for retrieving active subscriptions:
w3.ws.subscriptions
(WebsocketConnection.subscriptions)Because of the separation of 1-to-1 request / response and one-to-many send() to recv() messages, separate the
RequestProcessor
caches into two types of "caches". The subscription cache is really a deque that always usespopleft()
, so make it one. The response cache is looking for a particular cache key so this still makes sense to keep as anOrderedDict
.Defines an
__await__()
on the_PersistentConnectionWeb3
class so theAsyncWeb3.persistent_websocket()
instantiation can be awaited(i.e.
w3 = await AsyncWeb3.persistent_websocket(WebsocketProviderV2(...))
). New integration test suite added with this pattern of instantiation.Todo:
Cute Animal Picture
snake socks