Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websockets v2 iterator #3067

Merged
merged 8 commits into from
Aug 1, 2023
55 changes: 46 additions & 9 deletions docs/providers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,24 @@ WebsocketProviderV2 (beta)
* ``call_timeout`` is the timeout in seconds, used when receiving or sending data
over the connection. Defaults to ``None`` (no timeout).

.. code-block:: python
Under the hood, the ``WebsocketProviderV2`` uses the python websockets library for
making requests. If you would like to modify how requests are made, you can
use the ``websocket_kwargs`` to do so. See the `websockets documentation`_ for
available arguments.

The timeout for each call to send or receive is controlled by a ``call_timeout``
argument. This is set to ``None`` by default, which means no timeout.

Usage
~~~~~

The ``AsyncWeb3`` class may be used as a context manager, utilizing the ``async with``
syntax, when connecting via ``persistent_connection()`` using the
``WebsocketProviderV2``. This will automatically close the connection when the context
manager exits. A similar example, using the ``websockets`` connection as an
asynchronous context manager, can be found in the `websockets connection`_ docs.

.. code-block:: python

>>> import asyncio
>>> from web3 import AsyncWeb3
Expand All @@ -246,7 +263,7 @@ WebsocketProviderV2 (beta)
... logger.setLevel(logging.DEBUG)
... logger.addHandler(logging.StreamHandler())

>>> async def ws_v2_subscription_example():
>>> async def ws_v2_subscription_context_manager_example():
... async with AsyncWeb3.persistent_websocket(
... WebsocketProviderV2(f"ws://127.0.0.1:8546")
... ) as w3:
Expand All @@ -272,16 +289,36 @@ WebsocketProviderV2 (beta)
... # the connection closes automatically when exiting the context
... # manager (the `async with` block)

>>> asyncio.run(ws_v2_subscription_example())
>>> asyncio.run(ws_v2_subscription_context_manager_example())


Under the hood, the ``WebsocketProviderV2`` uses the python websockets library for
making requests. If you would like to modify how requests are made, you can
use the ``websocket_kwargs`` to do so. See the `websockets documentation`_ for
available arguments.
The ``AsyncWeb3`` class may also be used as an asynchronous iterator, utilizing the
``async for`` syntax, when connecting via ``persistent_connection()`` using the
``WebsocketProviderV2``. This may be used to set up an indefinite websocket connection
and reconnect automatically if the connection is lost. A similar example, using the
``websockets`` connection as an asynchronous iterator, can be found in the
`websockets connection`_ docs.

.. _`websockets connection`: https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#websockets.client.connect

.. code-block:: python

>>> import asyncio
>>> from web3 import AsyncWeb3
>>> from web3.providers import WebsocketProviderV2
>>> import websockets

>>> async def ws_v2_subscription_iterator_example():
... async for w3 in AsyncWeb3.persistent_websocket(
... WebsocketProviderV2(f"ws://127.0.0.1:8546")
... ):
... try:
... ...
... except websockets.ConnectionClosed:
... continue

>>> asyncio.run(ws_v2_subscription_iterator_example())

The timeout for each call to send or receive is controlled by a ``call_timeout``
argument. This is set to ``None`` by default, which means no timeout.


AutoProvider
Expand Down
1 change: 1 addition & 0 deletions newsfragments/3067.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the type for the optional param asking for "full transactions" when subscribing to ``newPendingTransactions`` via ``eth_subscribe`` to ``bool``.
1 change: 1 addition & 0 deletions newsfragments/3067.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Asynchronous iterator support for ``AsyncWeb3`` with ``WebsocketProviderV2`` using ``async for`` syntax.
Empty file.
49 changes: 49 additions & 0 deletions tests/integration/go_ethereum/test_goethereum_ws_v2/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import pytest

from tests.integration.common import (
COINBASE,
)
from tests.utils import (
get_open_port,
)


@pytest.fixture(scope="module")
def ws_port():
return get_open_port()


@pytest.fixture(scope="module")
def endpoint_uri(ws_port):
return f"ws://localhost:{ws_port}"


def _geth_command_arguments(ws_port, base_geth_command_arguments, geth_version):
yield from base_geth_command_arguments
if geth_version.major == 1:
yield from (
"--miner.etherbase",
COINBASE[2:],
"--ws",
"--ws.port",
ws_port,
"--ws.api",
"admin,eth,net,web3,personal,miner",
"--ws.origins",
"*",
"--ipcdisable",
"--allow-insecure-unlock",
)
if geth_version.minor not in [10, 11]:
raise AssertionError("Unsupported Geth version")
else:
raise AssertionError("Unsupported Geth version")


@pytest.fixture(scope="module")
def geth_command_arguments(
geth_binary, get_geth_version, datadir, ws_port, base_geth_command_arguments
):
return _geth_command_arguments(
ws_port, base_geth_command_arguments, get_geth_version
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,6 @@

import pytest_asyncio

from tests.integration.common import (
COINBASE,
)
from tests.utils import (
get_open_port,
)
from web3 import (
AsyncWeb3,
WebsocketProviderV2,
Expand All @@ -19,56 +13,15 @@
GoEthereumAsyncPersonalModuleTest,
)

from .common import (
from ..common import (
GoEthereumAsyncEthModuleTest,
GoEthereumAsyncNetModuleTest,
)
from .utils import (
from ..utils import (
wait_for_aiohttp,
)


@pytest.fixture(scope="module")
def ws_port():
return get_open_port()


@pytest.fixture(scope="module")
def endpoint_uri(ws_port):
return f"ws://localhost:{ws_port}"


def _geth_command_arguments(ws_port, base_geth_command_arguments, geth_version):
yield from base_geth_command_arguments
if geth_version.major == 1:
yield from (
"--miner.etherbase",
COINBASE[2:],
"--ws",
"--ws.port",
ws_port,
"--ws.api",
"admin,eth,net,web3,personal,miner",
"--ws.origins",
"*",
"--ipcdisable",
"--allow-insecure-unlock",
)
if geth_version.minor not in [10, 11]:
raise AssertionError("Unsupported Geth version")
else:
raise AssertionError("Unsupported Geth version")


@pytest.fixture(scope="module")
def geth_command_arguments(
geth_binary, get_geth_version, datadir, ws_port, base_geth_command_arguments
):
return _geth_command_arguments(
ws_port, base_geth_command_arguments, get_geth_version
)


@pytest_asyncio.fixture(scope="module")
async def async_w3(geth_process, endpoint_uri):
await wait_for_aiohttp(endpoint_uri)
Expand All @@ -79,19 +32,22 @@ async def async_w3(geth_process, endpoint_uri):


class TestGoEthereumAsyncAdminModuleTest(GoEthereumAsyncAdminModuleTest):
@pytest.mark.asyncio
@pytest.mark.xfail(
reason="running geth with the --nodiscover flag doesn't allow peer addition"
)
async def test_admin_peers(self, async_w3: "AsyncWeb3") -> None:
await super().test_admin_peers(async_w3)

@pytest.mark.asyncio
async def test_admin_start_stop_http(self, async_w3: "AsyncWeb3") -> None:
# This test causes all tests after it to fail on CI if it's allowed to run
pytest.xfail(
reason="Only one HTTP endpoint is allowed to be active at any time"
)
await super().test_admin_start_stop_http(async_w3)

@pytest.mark.asyncio
async def test_admin_start_stop_ws(self, async_w3: "AsyncWeb3") -> None:
# This test inconsistently causes all tests after it to
# fail on CI if it's allowed to run
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest

import pytest_asyncio

from web3 import (
AsyncWeb3,
WebsocketProviderV2,
)
from web3._utils.module_testing.go_ethereum_admin_module import (
GoEthereumAsyncAdminModuleTest,
)
from web3._utils.module_testing.go_ethereum_personal_module import (
GoEthereumAsyncPersonalModuleTest,
)

from ..common import (
GoEthereumAsyncEthModuleTest,
GoEthereumAsyncNetModuleTest,
)
from ..utils import (
wait_for_aiohttp,
)


@pytest_asyncio.fixture(scope="module")
async def async_w3(geth_process, endpoint_uri):
await wait_for_aiohttp(endpoint_uri)
async for w3 in AsyncWeb3.persistent_websocket(
WebsocketProviderV2(endpoint_uri, call_timeout=30)
):
return w3


class TestGoEthereumAsyncAdminModuleTest(GoEthereumAsyncAdminModuleTest):
@pytest.mark.asyncio
@pytest.mark.xfail(
reason="running geth with the --nodiscover flag doesn't allow peer addition"
)
async def test_admin_peers(self, async_w3: "AsyncWeb3") -> None:
await super().test_admin_peers(async_w3)

@pytest.mark.asyncio
async def test_admin_start_stop_http(self, async_w3: "AsyncWeb3") -> None:
# This test causes all tests after it to fail on CI if it's allowed to run
pytest.xfail(
reason="Only one HTTP endpoint is allowed to be active at any time"
)
await super().test_admin_start_stop_http(async_w3)

@pytest.mark.asyncio
async def test_admin_start_stop_ws(self, async_w3: "AsyncWeb3") -> None:
# This test inconsistently causes all tests after it to
# fail on CI if it's allowed to run
pytest.xfail(
reason="Only one WebSocket endpoint is allowed to be active at any time"
)
await super().test_admin_start_stop_ws(async_w3)


class TestGoEthereumAsyncEthModuleTest(GoEthereumAsyncEthModuleTest):
pass


class TestGoEthereumAsyncNetModuleTest(GoEthereumAsyncNetModuleTest):
pass


class TestGoEthereumAsyncPersonalModuleTest(GoEthereumAsyncPersonalModuleTest):
pass
4 changes: 2 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ commands=
integration-goethereum-http_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_http.py --flaky}
integration-goethereum-ws: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws.py}
integration-goethereum-ws_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws.py --flaky}
integration-goethereum-ws-v2: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2.py}
integration-goethereum-ws-v2_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2.py --flaky}
integration-goethereum-ws-v2: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2}
integration-goethereum-ws-v2_flaky: pytest {posargs:tests/integration/go_ethereum/test_goethereum_ws_v2 --flaky}
integration-ethtester: pytest {posargs:tests/integration/test_ethereum_tester.py}
docs: make -C {toxinidir} validate-docs
deps =
Expand Down
8 changes: 5 additions & 3 deletions web3/eth/async_eth.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
TxData,
TxParams,
TxReceipt,
TxTypeSubscriptionArg,
Wei,
_Hash32,
)
Expand Down Expand Up @@ -669,7 +668,7 @@ async def uninstall_filter(self, filter_id: HexStr) -> bool:
Callable[
[
SubscriptionType,
Optional[Union[LogsSubscriptionArg, TxTypeSubscriptionArg]],
Optional[Union[LogsSubscriptionArg, bool]],
],
Awaitable[HexStr],
]
Expand All @@ -682,7 +681,10 @@ async def subscribe(
self,
subscription_type: SubscriptionType,
subscription_arg: Optional[
Union[LogsSubscriptionArg, TxTypeSubscriptionArg]
Union[
LogsSubscriptionArg, # logs, optional filter params
bool, # newPendingTransactions, full_transactions
]
] = None,
) -> HexStr:
if not isinstance(self.w3.provider, PersistentConnectionProvider):
Expand Down
11 changes: 11 additions & 0 deletions web3/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)
from typing import (
Any,
AsyncIterator,
Dict,
List,
Optional,
Expand Down Expand Up @@ -538,6 +539,16 @@ def __init__(
)
AsyncWeb3.__init__(self, provider, middlewares, modules, external_modules, ens)

# async for w3 in w3.persistent_websocket(provider)
async def __aiter__(self) -> AsyncIterator["_PersistentConnectionWeb3"]:
while True:
try:
yield self
except Exception:
# provider should handle connection / reconnection
continue

# async with w3.persistent_websocket(provider) as w3
async def __aenter__(self) -> "_PersistentConnectionWeb3":
await self.provider.connect()
return self
Expand Down