Skip to content

Commit

Permalink
Handle paho-mqtt v1/v2 compat.
Browse files Browse the repository at this point in the history
Relaxes the signatures of methods on `MQTTEventBroker` that are registered as callbacks on the mqtt client so that they will work as either v1 or v2 client style callbacks.

Adds a tox env to explicitly test against paho-mqtt v1 (`$ tox -e paho-mqtt-v1`).
  • Loading branch information
peterschutt committed Feb 17, 2024
1 parent 0cc3131 commit 97c767e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 26 deletions.
7 changes: 5 additions & 2 deletions pyproject.toml
Expand Up @@ -54,12 +54,12 @@ test = [
"anyio[trio]",
"asyncmy >= 0.2.5; python_implementation == 'CPython'",
"coverage >= 7",
"paho-mqtt >= 2.0",
"paho-mqtt >= 1.5",
"psycopg",
"pymongo >= 4",
"pymysql[rsa]",
"PySide6 >= 6.6; python_implementation == 'CPython'",
"pytest >= 8.0",
"pytest >= 7.4",
"pytest-lazy-fixtures",
"pytest-mock",
"time-machine >= 2.13.0; python_implementation == 'CPython'",
Expand Down Expand Up @@ -132,4 +132,7 @@ commands = pyright --verifytypes apscheduler
[testenv:docs]
extras = doc
commands = sphinx-build -n docs build/sphinx
[testenv:paho-mqtt-v1]
deps = paho-mqtt~=1.5
"""
28 changes: 7 additions & 21 deletions src/apscheduler/eventbrokers/mqtt.py
Expand Up @@ -10,8 +10,6 @@
from anyio import to_thread
from anyio.from_thread import BlockingPortal
from paho.mqtt.client import Client, MQTTMessage
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCodes

from .._events import Event
from .base import BaseExternalEventBroker
Expand Down Expand Up @@ -58,37 +56,25 @@ async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await to_thread.run_sync(self._ready_future.result, 10)
exit_stack.push_async_callback(to_thread.run_sync, self.client.disconnect)

def _on_connect(
self,
client: Client,
userdata: Any,
flags: dict[str, Any],
rc: ReasonCodes | int,
properties: Properties | None = None,
) -> None:
def _on_connect(self, client: Client, *_: Any) -> None:
self._logger.info("%s: Connected", self.__class__.__name__)
try:
client.subscribe(self.topic, qos=self.subscribe_qos)
except Exception as exc:
self._ready_future.set_exception(exc)
raise

def _on_connect_fail(self, client: Client, userdata: Any) -> None:
def _on_connect_fail(self, *_: Any) -> None:
exc = sys.exc_info()[1]
self._logger.error("%s: Connection failed (%s)", self.__class__.__name__, exc)

def _on_disconnect(
self,
client: Client,
userdata: Any,
rc: ReasonCodes | int,
properties: Properties | None = None,
) -> None:
def _on_disconnect(self, *args: Any) -> None:
# NOTE: paho-mqtt compat: 1.x callbacks receive either 3 (MQTTv3) or 4 (MQTTv5) pos args,
# 2.x style always receive 5.
rc = args[3] if len(args) == 5 else args[2]
self._logger.error("%s: Disconnected (code: %s)", self.__class__.__name__, rc)

def _on_subscribe(
self, client: Client, userdata: Any, mid: int, granted_qos: list[int]
) -> None:
def _on_subscribe(self, *_: Any) -> None:
self._logger.info("%s: Subscribed", self.__class__.__name__)
self._ready_future.set_result(None)

Expand Down
12 changes: 9 additions & 3 deletions tests/conftest.py
Expand Up @@ -4,6 +4,7 @@
import sys
from collections.abc import Generator
from contextlib import AsyncExitStack
from importlib.metadata import version
from logging import Logger
from pathlib import Path
from typing import Any, AsyncGenerator, cast
Expand Down Expand Up @@ -69,11 +70,16 @@ async def redis_broker(serializer: Serializer) -> EventBroker:
@pytest.fixture
def mqtt_broker(serializer: Serializer) -> EventBroker:
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion

from apscheduler.eventbrokers.mqtt import MQTTEventBroker

return MQTTEventBroker(Client(CallbackAPIVersion.VERSION1), serializer=serializer)
if version("paho-mqtt").startswith("1"):
client = Client()
else:
from paho.mqtt.enums import CallbackAPIVersion

client = Client(CallbackAPIVersion.VERSION2)

return MQTTEventBroker(client, serializer=serializer)


@pytest.fixture
Expand Down

0 comments on commit 97c767e

Please sign in to comment.