diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index 9420ff3aa5..1a00e829c2 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -98,9 +98,11 @@ async def publish( # type: ignore[override] reply_to=reply_to, ) + return_value = None + for handler in self.broker._subscribers.values(): # pragma: no branch if topic in handler.topics: - return await call_handler( + handle_value = await call_handler( handler=handler, message=[incoming] if isinstance(handler, AsyncAPIBatchSubscriber) @@ -110,7 +112,9 @@ async def publish( # type: ignore[override] raise_timeout=raise_timeout, ) - return None + return_value = return_value or handle_value + + return return_value async def publish_batch( self, diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py index fd8b520332..5abe59cf97 100755 --- a/faststream/kafka/testing.py +++ b/faststream/kafka/testing.py @@ -105,18 +105,17 @@ async def publish( # type: ignore[override] reply_to=reply_to, ) - for handler in self.broker._subscribers.values(): # pragma: no branch - call: bool = False - - for p in handler.partitions: - if p.topic == topic and (partition is None or p.partition == partition): - call = True + return_value = None - if not call and topic in handler.topics: - call = True - - if call: - return await call_handler( + for handler in self.broker._subscribers.values(): # pragma: no branch + if ( + any( + p.topic == topic and (partition is None or p.partition == partition) + for p in handler.partitions + ) + or topic in handler.topics + ): + handle_value = await call_handler( handler=handler, message=[incoming] if isinstance(handler, AsyncAPIBatchSubscriber) @@ -126,7 +125,9 @@ async def publish( # type: ignore[override] raise_timeout=raise_timeout, ) - return None + return_value = return_value or handle_value + + return return_value async def publish_batch( self, diff --git a/scripts/set_variables.sh b/scripts/set_variables.sh index e2c0c9531c..ef662336bb 100755 --- a/scripts/set_variables.sh +++ b/scripts/set_variables.sh @@ -9,7 +9,7 @@ echo AIRT_PROJECT variable set to $AIRT_PROJECT export UID=$(id -u) export GID=$(id -g) -export DOCKER_COMPOSE_PROJECT="${USER}-faststream" +export DOCKER_COMPOSE_PROJECT="${USER//./_}-faststream" echo DOCKER_COMPOSE_PROJECT variable set to $DOCKER_COMPOSE_PROJECT export KAFKA_HOSTNAME="${DOCKER_COMPOSE_PROJECT}-kafka-1" echo KAFKA_HOSTNAME variable set to $KAFKA_HOSTNAME diff --git a/tests/brokers/confluent/test_test_client.py b/tests/brokers/confluent/test_test_client.py index b8e232802f..53c10b7d20 100644 --- a/tests/brokers/confluent/test_test_client.py +++ b/tests/brokers/confluent/test_test_client.py @@ -134,3 +134,71 @@ async def h2(): ... await h2.wait_call(10) assert len(routes) == 2 + + async def test_multiple_subscribers_different_groups( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group2") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 1 + + async def test_multiple_subscribers_same_group( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group1") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 0 + + async def test_multiple_batch_subscriber_with_different_group( + self, + test_broker: KafkaBroker, + queue: str, + ): + @test_broker.subscriber(queue, batch=True, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, batch=True, group_id="group2") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 1 + + async def test_multiple_batch_subscriber_with_same_group( + self, + test_broker: KafkaBroker, + queue: str, + ): + @test_broker.subscriber(queue, batch=True, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, batch=True, group_id="group1") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 0 diff --git a/tests/brokers/kafka/test_test_client.py b/tests/brokers/kafka/test_test_client.py index a89ecff707..b1c01b8ff0 100644 --- a/tests/brokers/kafka/test_test_client.py +++ b/tests/brokers/kafka/test_test_client.py @@ -182,3 +182,71 @@ async def h2(): ... await h2.wait_call(3) assert len(routes) == 2 + + async def test_multiple_subscribers_different_groups( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group2") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 1 + + async def test_multiple_subscribers_same_group( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group1") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 0 + + async def test_multiple_batch_subscriber_with_different_group( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, batch=True, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, batch=True, group_id="group2") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 1 + + async def test_multiple_batch_subscriber_with_same_group( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, batch=True, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, batch=True, group_id="group1") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 0