From d092a00a60c7da8cecb0732ef7482c9c2101fd1b Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Thu, 6 Nov 2025 10:46:08 +0300 Subject: [PATCH 1/5] Add Pydantic model factory support to STOMP broker tests --- .../faststream-stomp/test_faststream_stomp/test_main.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/faststream-stomp/test_faststream_stomp/test_main.py b/packages/faststream-stomp/test_faststream_stomp/test_main.py index a4f7e173..d5b45633 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_main.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_main.py @@ -1,5 +1,6 @@ import faker import faststream_stomp +import pydantic import pytest import stompman from faststream import FastStream @@ -8,6 +9,7 @@ from faststream_stomp.prometheus import StompPrometheusMiddleware from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.trace import TracerProvider +from polyfactory.factories.pydantic_factory import ModelFactory from prometheus_client import CollectorRegistry from test_stompman.conftest import build_dataclass @@ -24,6 +26,10 @@ def broker(fake_connection_params: stompman.ConnectionParameters) -> faststream_ return faststream_stomp.StompBroker(stompman.Client([fake_connection_params])) +class SomePydanticModel(pydantic.BaseModel): + foo: str + + async def test_testing(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: expected_body, first_destination, second_destination, third_destination, correlation_id = ( faker.pystr(), @@ -55,6 +61,8 @@ def second_handle(body: str) -> None: assert third_publisher.mock third_publisher.mock.assert_called_once_with(expected_body) + await br.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr()) + class TestNotImplemented: async def test_broker_request(self, faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: From 31272791fd9cfb4bcd6335899203ddae9ba8cb0a Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Thu, 6 Nov 2025 10:47:07 +0300 Subject: [PATCH 2/5] Refactor test structure and add pydantic model publishing test --- .../test_faststream_stomp/test_main.py | 74 ++++++++++--------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/packages/faststream-stomp/test_faststream_stomp/test_main.py b/packages/faststream-stomp/test_faststream_stomp/test_main.py index d5b45633..f6c6cb8a 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_main.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_main.py @@ -26,42 +26,44 @@ def broker(fake_connection_params: stompman.ConnectionParameters) -> faststream_ return faststream_stomp.StompBroker(stompman.Client([fake_connection_params])) -class SomePydanticModel(pydantic.BaseModel): - foo: str - - -async def test_testing(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: - expected_body, first_destination, second_destination, third_destination, correlation_id = ( - faker.pystr(), - faker.pystr(), - faker.pystr(), - faker.pystr(), - gen_cor_id(), - ) - second_publisher = broker.publisher(second_destination) - third_publisher = broker.publisher(third_destination) - - @broker.subscriber(first_destination) - @second_publisher - @third_publisher - def first_handle(body: str) -> str: - assert body == expected_body - return body - - @broker.subscriber(second_destination) - def second_handle(body: str) -> None: - assert body == expected_body - - async with faststream_stomp.TestStompBroker(broker) as br: - await br.publish(expected_body, first_destination, correlation_id=correlation_id) - assert first_handle.mock - first_handle.mock.assert_called_once_with(expected_body) - assert second_publisher.mock - second_publisher.mock.assert_called_once_with(expected_body) - assert third_publisher.mock - third_publisher.mock.assert_called_once_with(expected_body) - - await br.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr()) +class TestTesting: + async def test_integration(self, faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + expected_body, first_destination, second_destination, third_destination, correlation_id = ( + faker.pystr(), + faker.pystr(), + faker.pystr(), + faker.pystr(), + gen_cor_id(), + ) + second_publisher = broker.publisher(second_destination) + third_publisher = broker.publisher(third_destination) + + @broker.subscriber(first_destination) + @second_publisher + @third_publisher + def first_handle(body: str) -> str: + assert body == expected_body + return body + + @broker.subscriber(second_destination) + def second_handle(body: str) -> None: + assert body == expected_body + + async with faststream_stomp.TestStompBroker(broker) as br: + await br.publish(expected_body, first_destination, correlation_id=correlation_id) + assert first_handle.mock + first_handle.mock.assert_called_once_with(expected_body) + assert second_publisher.mock + second_publisher.mock.assert_called_once_with(expected_body) + assert third_publisher.mock + third_publisher.mock.assert_called_once_with(expected_body) + + async def test_publish_pydantic(self, faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: + class SomePydanticModel(pydantic.BaseModel): + foo: str + + async with faststream_stomp.TestStompBroker(broker) as br: + await br.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr()) class TestNotImplemented: From c3c98cbe01861744d7bb55c866a4a42187f476b1 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Thu, 6 Nov 2025 10:55:12 +0300 Subject: [PATCH 3/5] Add Pydantic model publishing test --- .../test_faststream_stomp/test_integration.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/packages/faststream-stomp/test_faststream_stomp/test_integration.py b/packages/faststream-stomp/test_faststream_stomp/test_integration.py index 2bda3a16..0a18307a 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_integration.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_integration.py @@ -8,6 +8,7 @@ import faker import faststream_stomp +import pydantic import pytest import stompman from asgi_lifespan import LifespanManager @@ -17,6 +18,7 @@ from faststream.message import gen_cor_id from faststream_stomp.models import StompStreamMessage from faststream_stomp.router import StompRoutePublisher +from polyfactory.factories.pydantic_factory import ModelFactory if TYPE_CHECKING: from faststream_stomp.broker import StompBroker @@ -235,3 +237,11 @@ async def test_broker_connect_twice(broker: faststream_stomp.StompBroker) -> Non app = AsgiFastStream(broker, on_startup=[broker.connect]) async with LifespanManager(app): pass + + +async def test_publish_pydantic(broker: faststream_stomp.StompBroker) -> None: + class SomePydanticModel(pydantic.BaseModel): + foo: str + + async with broker: + await broker.publish(ModelFactory.create_factory(SomePydanticModel).build(), faker.pystr()) From 8279f5da9bce93cb7e3c76626d8c2b161731e542 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Thu, 6 Nov 2025 10:55:24 +0300 Subject: [PATCH 4/5] Add faker fixture to pydantic publish test --- .../faststream-stomp/test_faststream_stomp/test_integration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/faststream-stomp/test_faststream_stomp/test_integration.py b/packages/faststream-stomp/test_faststream_stomp/test_integration.py index 0a18307a..8ebc36d9 100644 --- a/packages/faststream-stomp/test_faststream_stomp/test_integration.py +++ b/packages/faststream-stomp/test_faststream_stomp/test_integration.py @@ -239,7 +239,7 @@ async def test_broker_connect_twice(broker: faststream_stomp.StompBroker) -> Non pass -async def test_publish_pydantic(broker: faststream_stomp.StompBroker) -> None: +async def test_publish_pydantic(faker: faker.Faker, broker: faststream_stomp.StompBroker) -> None: class SomePydanticModel(pydantic.BaseModel): foo: str From 5302da9dacaf76b24fb0191f6df2787eb88e4b09 Mon Sep 17 00:00:00 2001 From: Lev Vereshchagin Date: Thu, 6 Nov 2025 11:04:50 +0300 Subject: [PATCH 5/5] Refactor STOMP broker to use shared FastDepends serializer configuration --- packages/faststream-stomp/faststream_stomp/broker.py | 5 +++-- packages/faststream-stomp/faststream_stomp/publisher.py | 6 ++++-- packages/faststream-stomp/faststream_stomp/testing.py | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/faststream-stomp/faststream_stomp/broker.py b/packages/faststream-stomp/faststream_stomp/broker.py index d43c5010..907d97b7 100644 --- a/packages/faststream-stomp/faststream_stomp/broker.py +++ b/packages/faststream-stomp/faststream_stomp/broker.py @@ -106,6 +106,7 @@ def __init__( description: str | None = None, tags: Iterable[Tag | TagDict] = (), ) -> None: + fd_config = FastDependsConfig(use_fastdepends=apply_types) broker_config = BrokerConfigWithStompClient( broker_middlewares=middlewares, # type: ignore[arg-type] broker_parser=parser, @@ -115,11 +116,11 @@ def __init__( log_level=log_level, default_storage_cls=StompParamsStorage, # type: ignore[type-abstract] ), - fd_config=FastDependsConfig(use_fastdepends=apply_types), + fd_config=fd_config, broker_dependencies=dependencies, graceful_timeout=graceful_timeout, extra_context={"broker": self}, - producer=StompProducer(client), + producer=StompProducer(client=client, serializer=fd_config._serializer), client=client, ) specification = BrokerSpec( diff --git a/packages/faststream-stomp/faststream_stomp/publisher.py b/packages/faststream-stomp/faststream_stomp/publisher.py index 7c8115f5..0269fe01 100644 --- a/packages/faststream-stomp/faststream_stomp/publisher.py +++ b/packages/faststream-stomp/faststream_stomp/publisher.py @@ -2,6 +2,7 @@ from typing import Any, NoReturn import stompman +from fast_depends.library.serializer import SerializerProto from faststream import PublishCommand, PublishType from faststream._internal.basic_types import SendableMessage from faststream._internal.configs import BrokerConfig @@ -23,11 +24,12 @@ class StompProducer(ProducerProto[StompPublishCommand]): _parser: AsyncCallable _decoder: AsyncCallable - def __init__(self, client: stompman.Client) -> None: + def __init__(self, *, client: stompman.Client, serializer: SerializerProto | None) -> None: self.client = client + self.serializer = serializer async def publish(self, cmd: StompPublishCommand) -> None: - body, content_type = encode_message(cmd.body, serializer=None) + body, content_type = encode_message(cmd.body, serializer=self.serializer) all_headers = cmd.headers.copy() if cmd.headers else {} if cmd.correlation_id: all_headers["correlation-id"] = cmd.correlation_id diff --git a/packages/faststream-stomp/faststream_stomp/testing.py b/packages/faststream-stomp/faststream_stomp/testing.py index efbe5403..f9da31ba 100644 --- a/packages/faststream-stomp/faststream_stomp/testing.py +++ b/packages/faststream-stomp/faststream_stomp/testing.py @@ -62,7 +62,7 @@ def __init__(self, broker: StompBroker) -> None: self.broker = broker async def publish(self, cmd: StompPublishCommand) -> None: - body, content_type = encode_message(cmd.body, serializer=None) + body, content_type = encode_message(cmd.body, serializer=self.broker.config.fd_config._serializer) all_headers: MessageHeaders = (cmd.headers.copy() if cmd.headers else {}) | { # type: ignore[assignment] "destination": cmd.destination, "message-id": str(uuid.uuid4()),