Skip to content

Commit

Permalink
Commit for #1413
Browse files Browse the repository at this point in the history
  • Loading branch information
sifex committed May 18, 2024
1 parent ed270ad commit 0d760ce
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ Please delete options that are not relevant.
- [ ] My changes do not generate any new warnings
- [ ] I have added tests to validate the effectiveness of my fix or the functionality of my new feature
- [ ] Both new and existing unit tests pass successfully on my local environment by running `scripts/test-cov.sh`
- [ ] I have ensured that static analysis tests are passing by running `scripts/static-anaylysis.sh`
- [ ] I have ensured that static analysis tests are passing by running `scripts/static-analysis.sh`
- [ ] I have included code examples to illustrate the modifications
23 changes: 18 additions & 5 deletions faststream/confluent/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from faststream.broker.message import encode_message, gen_cor_id
from faststream.confluent.broker import KafkaBroker
from faststream.confluent.client import TopicPartition
from faststream.confluent.publisher.asyncapi import AsyncAPIBatchPublisher
from faststream.confluent.publisher.producer import AsyncConfluentFastProducer
from faststream.confluent.subscriber.asyncapi import AsyncAPIBatchSubscriber
Expand All @@ -30,10 +31,17 @@ def create_publisher_fake_subscriber(
broker: KafkaBroker,
publisher: "AsyncAPIPublisher[Any]",
) -> "HandlerCallWrapper[Any, Any, Any]":
sub = broker.subscriber( # type: ignore[call-overload,misc]
publisher.topic,
batch=isinstance(publisher, AsyncAPIBatchPublisher),
)
if publisher.partition:
tp = TopicPartition(topic=publisher.topic, partition=publisher.partition)
sub = broker.subscriber(
partitions=[tp],
batch=isinstance(publisher, AsyncAPIBatchPublisher),
)
else:
sub = broker.subscriber(
publisher.topic,
batch=isinstance(publisher, AsyncAPIBatchPublisher),
)

if not sub.calls:

Expand Down Expand Up @@ -93,7 +101,12 @@ async def publish( # type: ignore[override]
)

for handler in self.broker._subscribers.values(): # pragma: no branch
if topic in handler.topics:
call: bool = False

if not call and topic in handler.topics:
call = True

if call:
return await call_handler(
handler=handler,
message=[incoming]
Expand Down
28 changes: 25 additions & 3 deletions faststream/kafka/testing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional
from unittest.mock import AsyncMock, MagicMock

from aiokafka import ConsumerRecord
from typing_extensions import override
Expand All @@ -23,8 +24,13 @@ class TestKafkaBroker(TestBroker[KafkaBroker]):
"""A class to test Kafka brokers."""

@staticmethod
async def _fake_connect(broker: KafkaBroker, *args: Any, **kwargs: Any) -> None:
async def _fake_connect( # type: ignore[override]
broker: KafkaBroker,
*args: Any,
**kwargs: Any,
) -> Callable[..., AsyncMock]:
broker._producer = FakeProducer(broker)
return _fake_connection

@staticmethod
def create_publisher_fake_subscriber(
Expand Down Expand Up @@ -80,6 +86,8 @@ async def publish( # type: ignore[override]
raise_timeout: bool = False,
) -> Optional[Any]:
"""Publish a message to the Kafka broker."""
correlation_id = correlation_id or gen_cor_id()

incoming = build_message(
message=message,
topic=topic,
Expand All @@ -92,7 +100,12 @@ async def publish( # type: ignore[override]
)

for handler in self.broker._subscribers.values(): # pragma: no branch
if topic in handler.topics:
call: bool = False

if not call and topic in handler.topics:
call = True

if call:
return await call_handler(
handler=handler,
message=[incoming]
Expand All @@ -116,6 +129,8 @@ async def publish_batch(
correlation_id: Optional[str] = None,
) -> None:
"""Publish a batch of messages to the Kafka broker."""
correlation_id = correlation_id or gen_cor_id()

for handler in self.broker._subscribers.values(): # pragma: no branch
if topic in handler.topics:
messages = (
Expand Down Expand Up @@ -184,3 +199,10 @@ def build_message(
offset=0,
headers=[(i, j.encode()) for i, j in headers.items()],
)


def _fake_connection(*args: Any, **kwargs: Any) -> AsyncMock:
mock = AsyncMock()
mock.subscribe = MagicMock
mock.assign = MagicMock
return mock
2 changes: 1 addition & 1 deletion scripts/set_variables.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ echo AIRT_PROJECT variable set to $AIRT_PROJECT
export UID=$(id -u)
export GID=$(id -g)

export DOCKER_COMPOSE_PROJECT="${USER}-faststream"
export DOCKER_COMPOSE_PROJECT="${USER//./_}-faststream"
echo DOCKER_COMPOSE_PROJECT variable set to $DOCKER_COMPOSE_PROJECT
export KAFKA_HOSTNAME="${DOCKER_COMPOSE_PROJECT}-kafka-1"
echo KAFKA_HOSTNAME variable set to $KAFKA_HOSTNAME
Expand Down
62 changes: 49 additions & 13 deletions tests/brokers/confluent/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ class TestTestclient(BrokerTestclientTestcase):

@pytest.mark.confluent()
async def test_with_real_testclient(
self,
broker: KafkaBroker,
queue: str,
event: asyncio.Event,
self,
broker: KafkaBroker,
queue: str,
event: asyncio.Event,
):
@broker.subscriber(queue, auto_offset_reset="earliest")
def subscriber(m):
Expand All @@ -34,9 +34,9 @@ def subscriber(m):
assert event.is_set()

async def test_batch_pub_by_default_pub(
self,
test_broker: KafkaBroker,
queue: str,
self,
test_broker: KafkaBroker,
queue: str,
):
@test_broker.subscriber(queue, batch=True, auto_offset_reset="earliest")
async def m():
Expand All @@ -47,9 +47,9 @@ async def m():
m.mock.assert_called_once_with(["hello"])

async def test_batch_pub_by_pub_batch(
self,
test_broker: KafkaBroker,
queue: str,
self,
test_broker: KafkaBroker,
queue: str,
):
@test_broker.subscriber(queue, batch=True, auto_offset_reset="earliest")
async def m():
Expand All @@ -60,9 +60,9 @@ async def m():
m.mock.assert_called_once_with(["hello"])

async def test_batch_publisher_mock(
self,
test_broker: KafkaBroker,
queue: str,
self,
test_broker: KafkaBroker,
queue: str,
):
publisher = test_broker.publisher(queue + "1", batch=True)

Expand Down Expand Up @@ -122,3 +122,39 @@ async def h2(): ...
await h2.wait_call(10)

assert len(routes) == 2

@pytest.mark.confluent()
async def test_multiple_subscribers_different_groups(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

@pytest.mark.confluent()
async def test_multiple_subscribers_same_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0
37 changes: 37 additions & 0 deletions tests/brokers/kafka/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,40 @@ async def h2(): ...
await h2.wait_call(3)

assert len(routes) == 2

@pytest.mark.kafka()
async def test_multiple_subscribers_different_groups(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

@pytest.mark.kafka()
async def test_multiple_subscribers_same_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0

0 comments on commit 0d760ce

Please sign in to comment.