Skip to content

Commit

Permalink
Improve upon yielding control to the event loop
Browse files Browse the repository at this point in the history
- In order to guarantee that the event loop can run other tasks more efficiently, asyncio.sleep(0) seems to be the most efficient way to yield control back to the event loop in a way that many tasks can still run concurrently without quickly timing out.
- Increase the default timeout to look for a response to a request from 20 seconds to 50 seconds.
- Make the caching methods in the request processor synchronous since they don't need to be async.
  • Loading branch information
fselmo committed Oct 26, 2023
1 parent 2db5fee commit 2c78c12
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 23 deletions.
12 changes: 8 additions & 4 deletions tests/core/providers/test_wsv2_provider.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import pytest
import sys
Expand Down Expand Up @@ -87,7 +88,7 @@ async def test_async_make_request_returns_cached_response_with_no_recv_if_cached

# cache the response, so we should get it immediately & should never call `recv()`
desired_response = {"jsonrpc": "2.0", "id": 0, "result": "0x1337"}
await provider._request_processor.cache_raw_response(desired_response)
provider._request_processor.cache_raw_response(desired_response)

response = await method_under_test(RPCEndpoint("some_method"), ["desired_params"])
assert response == desired_response
Expand All @@ -104,15 +105,18 @@ async def test_async_make_request_returns_cached_response_with_no_recv_if_cached
reason="Uses AsyncMock, not supported by python 3.7",
)
async def test_async_make_request_times_out_of_while_loop_looking_for_response():
provider = WebsocketProviderV2("ws://mocked", request_timeout=0.1)
timeout = 0.001
provider = WebsocketProviderV2("ws://mocked", request_timeout=timeout)

method_under_test = provider.make_request

_mock_ws(provider)
provider._ws.recv.side_effect = lambda *args, **kwargs: b'{"jsonrpc": "2.0"}'
# mock the websocket to never receive a response & sleep longer than the timeout
provider._ws.recv = lambda *args, **kwargs: asyncio.sleep(1)

with pytest.raises(
TimeExhausted,
match=r"Timed out waiting for response with request id `0` after 0.1 second",
match=r"Timed out waiting for response with request id `0` after "
rf"{timeout} second\(s\)",
):
await method_under_test(RPCEndpoint("some_method"), ["desired_params"])
16 changes: 8 additions & 8 deletions web3/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,13 @@ async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]:
)

while True:
# sleep(0) here seems to be the most efficient way to yield control back to
# the event loop while waiting for the response to be cached or received on
# the websocket.
await asyncio.sleep(0)

# look in the cache for a response
response = await self._request_processor.pop_raw_response(subscription=True)
response = self._request_processor.pop_raw_response(subscription=True)
if response is not None:
break
else:
Expand All @@ -380,20 +385,15 @@ async def _ws_recv_stream(self) -> AsyncGenerator[RPCResponse, None]:
try:
# keep timeout low but reasonable to check both the cache
# and the websocket connection for new responses
response = await self._provider._ws_recv(timeout=2)
response = await self._provider._ws_recv(timeout=0.5)
except asyncio.TimeoutError:
# if no response received, continue to next iteration
continue

if response.get("method") == "eth_subscription":
break
else:
await self._provider._request_processor.cache_raw_response(
response
)

# this is important to let asyncio run other tasks
await asyncio.sleep(0.05)
self._provider._request_processor.cache_raw_response(response)

yield await self._process_ws_response(response)

Expand Down
2 changes: 1 addition & 1 deletion web3/providers/persistent.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
RPCResponse,
)

DEFAULT_PERSISTENT_CONNECTION_TIMEOUT = 20
DEFAULT_PERSISTENT_CONNECTION_TIMEOUT = 50


class PersistentConnectionProvider(AsyncJSONBaseProvider, ABC):
Expand Down
6 changes: 2 additions & 4 deletions web3/providers/websocket/request_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ def append_middleware_response_processor(

# raw response cache

async def cache_raw_response(
self, raw_response: Any, subscription: bool = False
) -> None:
def cache_raw_response(self, raw_response: Any, subscription: bool = False) -> None:
if subscription:
self._provider.logger.debug(
f"Caching subscription response:\n response={raw_response}"
Expand All @@ -208,7 +206,7 @@ async def cache_raw_response(
)
self._request_response_cache.cache(cache_key, raw_response)

async def pop_raw_response(
def pop_raw_response(
self, cache_key: str = None, subscription: bool = False
) -> Any:
if subscription:
Expand Down
14 changes: 8 additions & 6 deletions web3/providers/websocket/websocket_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,18 @@ async def _match_response_id_to_request_id() -> RPCResponse:
request_cache_key = generate_cache_key(request_id)

while True:
# sleep(0) here seems to be the most efficient way to yield control
# back to the event loop while waiting for the response to be cached
# or received on the websocket.
await asyncio.sleep(0)

if request_cache_key in self._request_processor._request_response_cache:
# if response is already cached, pop it from cache
self.logger.debug(
f"Response for id {request_id} is already cached, pop it "
"from the cache."
)
return await self._request_processor.pop_raw_response(
return self._request_processor.pop_raw_response(
cache_key=request_cache_key,
)

Expand All @@ -189,7 +194,7 @@ async def _match_response_id_to_request_id() -> RPCResponse:
try:
# keep timeout low but reasonable to check both the
# cache and the websocket connection for new responses
response = await self._ws_recv(timeout=2)
response = await self._ws_recv(timeout=0.5)
except asyncio.TimeoutError:
# keep the request timeout around the whole of this
# while loop in case the response sneaks into the cache
Expand All @@ -209,13 +214,10 @@ async def _match_response_id_to_request_id() -> RPCResponse:
is_subscription = (
response.get("method") == "eth_subscription"
)
await self._request_processor.cache_raw_response(
self._request_processor.cache_raw_response(
response, subscription=is_subscription
)

# this is important to let asyncio run other tasks
await asyncio.sleep(0.05)

try:
# Add the request timeout around the while loop that checks the request
# cache and tried to recv(). If the request is neither in the cache, nor
Expand Down

0 comments on commit 2c78c12

Please sign in to comment.