Skip to content

Commit

Permalink
Merged changes for #1413
Browse files Browse the repository at this point in the history
  • Loading branch information
sifex committed May 18, 2024
1 parent c555f0f commit a03f4ea
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ Please delete options that are not relevant.
- [ ] My changes do not generate any new warnings
- [ ] I have added tests to validate the effectiveness of my fix or the functionality of my new feature
- [ ] Both new and existing unit tests pass successfully on my local environment by running `scripts/test-cov.sh`
- [ ] I have ensured that static analysis tests are passing by running `scripts/static-anaylysis.sh`
- [ ] I have ensured that static analysis tests are passing by running `scripts/static-analysis.sh`
- [ ] I have included code examples to illustrate the modifications
26 changes: 15 additions & 11 deletions faststream/confluent/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,21 @@ async def publish( # type: ignore[override]
reply_to=reply_to,
)

for handler in self.broker._subscribers.values(): # pragma: no branch
if topic in handler.topics:
return await call_handler(
handler=handler,
message=[incoming]
if isinstance(handler, AsyncAPIBatchSubscriber)
else incoming,
rpc=rpc,
rpc_timeout=rpc_timeout,
raise_timeout=raise_timeout,
)
for consumer_group in set(
{sub.group_id for sub in self.broker._subscribers.values()}
):
for handler in self.broker._subscribers.values(): # pragma: no branch
if topic in handler.topics and consumer_group == handler.group_id:
await call_handler(
handler=handler,
message=[incoming]
if isinstance(handler, AsyncAPIBatchSubscriber)
else incoming,
rpc=rpc,
rpc_timeout=rpc_timeout,
raise_timeout=raise_timeout,
)
break

return None

Expand Down
3 changes: 1 addition & 2 deletions faststream/kafka/testing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from unittest.mock import AsyncMock, MagicMock
from typing import TYPE_CHECKING, Any, Dict, Optional

from aiokafka import ConsumerRecord
from typing_extensions import override
Expand Down
2 changes: 1 addition & 1 deletion scripts/set_variables.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ echo AIRT_PROJECT variable set to $AIRT_PROJECT
export UID=$(id -u)
export GID=$(id -g)

export DOCKER_COMPOSE_PROJECT="${USER}-faststream"
export DOCKER_COMPOSE_PROJECT="${USER//./_}-faststream"
echo DOCKER_COMPOSE_PROJECT variable set to $DOCKER_COMPOSE_PROJECT
export KAFKA_HOSTNAME="${DOCKER_COMPOSE_PROJECT}-kafka-1"
echo KAFKA_HOSTNAME variable set to $KAFKA_HOSTNAME
Expand Down
62 changes: 49 additions & 13 deletions tests/brokers/confluent/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ class TestTestclient(BrokerTestclientTestcase):

@pytest.mark.confluent()
async def test_with_real_testclient(
self,
broker: KafkaBroker,
queue: str,
event: asyncio.Event,
self,
broker: KafkaBroker,
queue: str,
event: asyncio.Event,
):
@broker.subscriber(queue, auto_offset_reset="earliest")
def subscriber(m):
Expand All @@ -34,9 +34,9 @@ def subscriber(m):
assert event.is_set()

async def test_batch_pub_by_default_pub(
self,
test_broker: KafkaBroker,
queue: str,
self,
test_broker: KafkaBroker,
queue: str,
):
@test_broker.subscriber(queue, batch=True, auto_offset_reset="earliest")
async def m():
Expand All @@ -47,9 +47,9 @@ async def m():
m.mock.assert_called_once_with(["hello"])

async def test_batch_pub_by_pub_batch(
self,
test_broker: KafkaBroker,
queue: str,
self,
test_broker: KafkaBroker,
queue: str,
):
@test_broker.subscriber(queue, batch=True, auto_offset_reset="earliest")
async def m():
Expand All @@ -60,9 +60,9 @@ async def m():
m.mock.assert_called_once_with(["hello"])

async def test_batch_publisher_mock(
self,
test_broker: KafkaBroker,
queue: str,
self,
test_broker: KafkaBroker,
queue: str,
):
publisher = test_broker.publisher(queue + "1", batch=True)

Expand Down Expand Up @@ -122,3 +122,39 @@ async def h2(): ...
await h2.wait_call(10)

assert len(routes) == 2

@pytest.mark.confluent()
async def test_multiple_subscribers_different_groups(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

@pytest.mark.confluent()
async def test_multiple_subscribers_same_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0
37 changes: 37 additions & 0 deletions tests/brokers/kafka/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,40 @@ async def h2(): ...
await h2.wait_call(3)

assert len(routes) == 2

@pytest.mark.kafka()
async def test_multiple_subscribers_different_groups(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

@pytest.mark.kafka()
async def test_multiple_subscribers_same_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0

0 comments on commit a03f4ea

Please sign in to comment.