Skip to content

Commit

Permalink
tests: move to pytest-lazy-fixtures dep & mqtt update (#865)
Browse files Browse the repository at this point in the history
This PR addresses failing tests due to pytest-lazy-fixture incompatibility with pytest v8, and breaking changes in paho.mqtt v2.

Replaces pytest-lazy-fixture with pytest-lazy-fixtures, and adjusts use of MQTT client to conform to new API.
  • Loading branch information
peterschutt committed Feb 27, 2024
1 parent 8d26ee5 commit 275e803
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 46 deletions.
12 changes: 6 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Changelog = "https://apscheduler.readthedocs.io/en/master/versionhistory.html"
asyncpg = ["asyncpg >= 0.20"]
cbor = ["cbor2 >= 5.0"]
mongodb = ["pymongo >= 4"]
mqtt = ["paho-mqtt >= 1.5"]
mqtt = ["paho-mqtt >= 2.0"]
redis = ["redis >= 4.4.0"]
sqlalchemy = ["sqlalchemy >= 2.0.19"]
test = [
Expand All @@ -54,13 +54,13 @@ test = [
"anyio[trio]",
"asyncmy >= 0.2.5; python_implementation == 'CPython'",
"coverage >= 7",
"paho-mqtt >= 1.5",
"paho-mqtt >= 2.0",
"psycopg",
"pymongo >= 4",
"pymysql[rsa]",
"PySide6 >= 6.6; python_implementation == 'CPython'",
"pytest ~= 7.4",
"pytest-lazy-fixture",
"pytest >= 8.0",
"pytest-lazy-fixtures",
"pytest-mock",
"time-machine >= 2.13.0; python_implementation == 'CPython'",
"uwsgi; python_implementation == 'CPython' and platform_system == 'Linux'",
Expand Down Expand Up @@ -91,7 +91,7 @@ source = ["apscheduler"]
show_missing = true

[tool.ruff]
select = [
lint.select = [
"ASYNC", # flake8-async
"E", "F", "W", # default Flake8
"G", # flake8-logging-format
Expand All @@ -103,7 +103,7 @@ select = [
]
src = ["src"]

[tool.ruff.isort]
[tool.ruff.lint.isort]
"required-imports" = ["from __future__ import annotations"]

[tool.mypy]
Expand Down
34 changes: 10 additions & 24 deletions src/apscheduler/eventbrokers/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
from anyio import to_thread
from anyio.from_thread import BlockingPortal
from paho.mqtt.client import Client, MQTTMessage
from paho.mqtt.properties import Properties
from paho.mqtt.reasoncodes import ReasonCodes

from .._events import Event
from .base import BaseExternalEventBroker
Expand Down Expand Up @@ -58,41 +56,29 @@ async def start(self, exit_stack: AsyncExitStack, logger: Logger) -> None:
await to_thread.run_sync(self._ready_future.result, 10)
exit_stack.push_async_callback(to_thread.run_sync, self.client.disconnect)

def _on_connect(
self,
client: Client,
userdata: Any,
flags: dict[str, Any],
rc: ReasonCodes | int,
properties: Properties | None = None,
) -> None:
def _on_connect(self, client: Client, *_: Any) -> None:
self._logger.info("%s: Connected", self.__class__.__name__)
try:
client.subscribe(self.topic, qos=self.subscribe_qos)
except Exception as exc:
self._ready_future.set_exception(exc)
raise

def _on_connect_fail(self, client: Client, userdata: Any) -> None:
def _on_connect_fail(self, *_: Any) -> None:
exc = sys.exc_info()[1]
self._logger.error("%s: Connection failed (%s)", self.__class__.__name__, exc)

def _on_disconnect(
self,
client: Client,
userdata: Any,
rc: ReasonCodes | int,
properties: Properties | None = None,
) -> None:
self._logger.error("%s: Disconnected (code: %s)", self.__class__.__name__, rc)

def _on_subscribe(
self, client: Client, userdata: Any, mid: int, granted_qos: list[int]
) -> None:
def _on_disconnect(self, *args: Any) -> None:
reason_code = args[3] if len(args) == 5 else args[2]
self._logger.error(
"%s: Disconnected (code: %s)", self.__class__.__name__, reason_code
)

def _on_subscribe(self, *_: Any) -> None:
self._logger.info("%s: Subscribed", self.__class__.__name__)
self._ready_future.set_result(None)

def _on_message(self, client: Client, userdata: Any, msg: MQTTMessage) -> None:
def _on_message(self, _: Any, __: Any, msg: MQTTMessage) -> None:
event = self.reconstitute_event(msg.payload)
if event is not None:
self._portal.call(self.publish_local, event)
Expand Down
40 changes: 24 additions & 16 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import pytest
from _pytest.fixtures import SubRequest
from pytest_lazyfixture import lazy_fixture
from pytest_lazy_fixtures import lf

from apscheduler.abc import DataStore, EventBroker, Serializer
from apscheduler.datastores.memory import MemoryDataStore
Expand Down Expand Up @@ -66,13 +66,21 @@ async def redis_broker(serializer: Serializer) -> EventBroker:
return broker


@pytest.fixture
def mqtt_broker(serializer: Serializer) -> EventBroker:
@pytest.fixture(
params=[
pytest.param(1, id="callback_api_v1"),
pytest.param(2, id="callback_api_v2"),
]
)
def mqtt_broker(request: SubRequest, serializer: Serializer) -> EventBroker:
from paho.mqtt.client import Client
from paho.mqtt.enums import CallbackAPIVersion

from apscheduler.eventbrokers.mqtt import MQTTEventBroker

return MQTTEventBroker(Client(), serializer=serializer)
callback_api_version = CallbackAPIVersion(request.param)

return MQTTEventBroker(Client(callback_api_version), serializer=serializer)


@pytest.fixture
Expand All @@ -88,19 +96,19 @@ async def asyncpg_broker(serializer: Serializer) -> EventBroker:

@pytest.fixture(
params=[
pytest.param(lazy_fixture("local_broker"), id="local"),
pytest.param(lf("local_broker"), id="local"),
pytest.param(
lazy_fixture("asyncpg_broker"),
lf("asyncpg_broker"),
id="asyncpg",
marks=[pytest.mark.external_service],
),
pytest.param(
lazy_fixture("redis_broker"),
lf("redis_broker"),
id="redis",
marks=[pytest.mark.external_service],
),
pytest.param(
lazy_fixture("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service]
lf("mqtt_broker"), id="mqtt", marks=[pytest.mark.external_service]
),
]
)
Expand Down Expand Up @@ -255,40 +263,40 @@ async def asyncmy_store() -> AsyncGenerator[DataStore, None]:
@pytest.fixture(
params=[
pytest.param(
lazy_fixture("memory_store"),
lf("memory_store"),
id="memory",
),
pytest.param(
lazy_fixture("aiosqlite_store"),
lf("aiosqlite_store"),
id="aiosqlite",
),
pytest.param(
lazy_fixture("asyncpg_store"),
lf("asyncpg_store"),
id="asyncpg",
marks=[pytest.mark.external_service],
),
pytest.param(
lazy_fixture("asyncmy_store"),
lf("asyncmy_store"),
id="asyncmy",
marks=[pytest.mark.external_service],
),
pytest.param(
lazy_fixture("psycopg_async_store"),
lf("psycopg_async_store"),
id="psycopg_async",
marks=[pytest.mark.external_service],
),
pytest.param(
lazy_fixture("psycopg_sync_store"),
lf("psycopg_sync_store"),
id="psycopg_sync",
marks=[pytest.mark.external_service],
),
pytest.param(
lazy_fixture("pymysql_store"),
lf("pymysql_store"),
id="pymysql",
marks=[pytest.mark.external_service],
),
pytest.param(
lazy_fixture("mongodb_store"),
lf("mongodb_store"),
id="mongodb",
marks=[pytest.mark.external_service],
),
Expand Down

0 comments on commit 275e803

Please sign in to comment.