diff --git a/docs/api.rst b/docs/api.rst index 66418542..39aaa5e9 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -39,6 +39,7 @@ Event brokers .. autoclass:: apscheduler.abc.Subscription .. autoclass:: apscheduler.eventbrokers.local.LocalEventBroker .. autoclass:: apscheduler.eventbrokers.asyncpg.AsyncpgEventBroker +.. autoclass:: apscheduler.eventbrokers.psycopg.PsycopgEventBroker .. autoclass:: apscheduler.eventbrokers.mqtt.MQTTEventBroker .. autoclass:: apscheduler.eventbrokers.redis.RedisEventBroker diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index b00afc86..b132c418 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -4,6 +4,10 @@ Version history To find out how to migrate your application from a previous version of APScheduler, see the :doc:`migration section `. +**UNRELEASED** + +- Added the ``psycopg`` event broker + **4.0.0a5** - **BREAKING** Added the ``cleanup()`` scheduler method and a configuration option diff --git a/src/apscheduler/eventbrokers/psycopg.py b/src/apscheduler/eventbrokers/psycopg.py new file mode 100644 index 00000000..9cfefc27 --- /dev/null +++ b/src/apscheduler/eventbrokers/psycopg.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +from collections.abc import AsyncGenerator, Mapping +from contextlib import AsyncExitStack, asynccontextmanager +from logging import Logger +from typing import TYPE_CHECKING, Any, NoReturn +from urllib.parse import urlunparse + +import attrs +from anyio import ( + EndOfStream, + create_memory_object_stream, + move_on_after, +) +from anyio.abc import TaskStatus +from anyio.streams.memory import MemoryObjectSendStream +from attr.validators import instance_of +from psycopg import AsyncConnection, InterfaceError + +from .._events import Event +from .._exceptions import SerializationError +from .._validators import positive_number +from .base import BaseExternalEventBroker + +if TYPE_CHECKING: + from sqlalchemy.ext.asyncio import AsyncEngine + + +def convert_options(value: Mapping[str, Any]) -> dict[str, Any]: + return dict(value, autocommit=True) + + +@attrs.define(eq=False) +class PsycopgEventBroker(BaseExternalEventBroker): + """ + An asynchronous, psycopg_ based event broker that uses a PostgreSQL server to + broadcast events using its ``NOTIFY`` mechanism. + + .. psycopg: https://pypi.org/project/psycopg/ + + :param conninfo: a libpq connection string (e.g. + ``postgres://user:pass@host:port/dbname``) + :param channel: the ``NOTIFY`` channel to use + :param max_idle_time: maximum time (in seconds) to let the connection go idle, + before sending a ``SELECT 1`` query to prevent a connection timeout + """ + + conninfo: str = attrs.field(validator=instance_of(str)) + options: Mapping[str, Any] = attrs.field( + factory=dict, converter=convert_options, validator=instance_of(Mapping) + ) + channel: str = attrs.field( + kw_only=True, default="apscheduler", validator=instance_of(str) + ) + max_idle_time: float = attrs.field( + kw_only=True, default=10, validator=[instance_of((int, float)), positive_number] + ) + + _send: MemoryObjectSendStream[str] = attrs.field(init=False) + + @classmethod + def from_async_sqla_engine( + cls, + engine: AsyncEngine, + options: Mapping[str, Any] | None = None, + **kwargs: Any, + ) -> PsycopgEventBroker: + """ + Create a new psycopg event broker from a SQLAlchemy engine. + + The engine will only be used to create the appropriate options for + :meth:`psycopg.AsyncConnection.connect`. + + :param engine: an asynchronous SQLAlchemy engine using asyncpg as the driver + :type engine: ~sqlalchemy.ext.asyncio.AsyncEngine + :param options: extra keyword arguments passed to :func:`asyncpg.connect` (will + override any automatically generated arguments based on the engine) + :param kwargs: keyword arguments to pass to the initializer of this class + :return: the newly created event broker + + """ + if engine.dialect.driver != "psycopg": + raise ValueError( + f'The driver in the engine must be "psycopg" (current: ' + f"{engine.dialect.driver})" + ) + + conninfo = urlunparse( + [ + "postgres", + engine.url.username, + engine.url.password, + engine.url.host, + engine.url.database, + ] + ) + opts = dict(options, autocommit=True) + return cls(conninfo, opts, **kwargs) + + @property + def _temporary_failure_exceptions(self) -> tuple[type[Exception], ...]: + return OSError, InterfaceError + + @asynccontextmanager + async def _connect(self) -> AsyncGenerator[AsyncConnection, None]: + async for attempt in self._retry(): + with attempt: + conn = await AsyncConnection.connect(self.conninfo, **self.options) + try: + yield conn + finally: + await conn.close() + + async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None: + await super().start(exit_stack, logger) + await self._task_group.start(self._listen_notifications) + exit_stack.callback(self._task_group.cancel_scope.cancel) + self._send = await self._task_group.start(self._publish_notifications) + await exit_stack.enter_async_context(self._send) + + async def _listen_notifications(self, *, task_status: TaskStatus[None]) -> None: + task_started_sent = False + while True: + async with self._connect() as conn: + try: + await conn.execute(f"LISTEN {self.channel}") + + if not task_started_sent: + task_status.started() + task_started_sent = True + + self._logger.debug("Listen connection established") + async for notify in conn.notifies(): + event = self.reconstitute_event_str(notify.payload) + await self.publish_local(event) + except InterfaceError as exc: + self._logger.error("Connection error: %s", exc) + + async def _publish_notifications( + self, *, task_status: TaskStatus[MemoryObjectSendStream[str]] + ) -> NoReturn: + send, receive = create_memory_object_stream[str](100) + task_started_sent = False + with receive: + while True: + async with self._connect() as conn: + if not task_started_sent: + task_status.started(send) + task_started_sent = True + + self._logger.debug("Publish connection established") + notification: str | None = None + while True: + with move_on_after(self.max_idle_time): + try: + notification = await receive.receive() + except EndOfStream: + return + + if notification: + await conn.execute( + "SELECT pg_notify(%t, %t)", [self.channel, notification] + ) + else: + await conn.execute("SELECT 1") + + async def publish(self, event: Event) -> None: + notification = self.generate_notification_str(event) + if len(notification) > 7999: + raise SerializationError( + "Serialized event object exceeds 7999 bytes in size" + ) + + await self._send.send(notification) diff --git a/tests/conftest.py b/tests/conftest.py index efcacd6a..5eb89de8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -86,6 +86,17 @@ async def asyncpg_broker(serializer: Serializer) -> EventBroker: return broker +@pytest.fixture +async def psycopg_broker(serializer: Serializer) -> EventBroker: + pytest.importorskip("psycopg", reason="psycopg is not installed") + from apscheduler.eventbrokers.psycopg import PsycopgEventBroker + + broker = PsycopgEventBroker( + "postgres://postgres:secret@localhost:5432/testdb", serializer=serializer + ) + return broker + + @pytest.fixture( params=[ pytest.param(lf("local_broker"), id="local"), @@ -94,6 +105,11 @@ async def asyncpg_broker(serializer: Serializer) -> EventBroker: id="asyncpg", marks=[pytest.mark.external_service], ), + pytest.param( + lf("psycopg_broker"), + id="psycopg", + marks=[pytest.mark.external_service], + ), pytest.param( lf("redis_broker"), id="redis",