Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1412 with TestKafkaBroker behaviour where Consumer Groups weren't being respected #1413

Merged
merged 15 commits into from
May 25, 2024
Merged
8 changes: 6 additions & 2 deletions faststream/confluent/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ async def publish( # type: ignore[override]
reply_to=reply_to,
)

return_value = None

for handler in self.broker._subscribers.values(): # pragma: no branch
if topic in handler.topics:
return await call_handler(
handle_value = await call_handler(
handler=handler,
message=[incoming]
if isinstance(handler, AsyncAPIBatchSubscriber)
Expand All @@ -110,7 +112,9 @@ async def publish( # type: ignore[override]
raise_timeout=raise_timeout,
)

return None
return_value = return_value or handle_value

return return_value

async def publish_batch(
self,
Expand Down
25 changes: 13 additions & 12 deletions faststream/kafka/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,17 @@ async def publish( # type: ignore[override]
reply_to=reply_to,
)

for handler in self.broker._subscribers.values(): # pragma: no branch
call: bool = False

for p in handler.partitions:
if p.topic == topic and (partition is None or p.partition == partition):
call = True
return_value = None

if not call and topic in handler.topics:
call = True

if call:
return await call_handler(
for handler in self.broker._subscribers.values(): # pragma: no branch
if (
any(
p.topic == topic and (partition is None or p.partition == partition)
for p in handler.partitions
)
or topic in handler.topics
):
handle_value = await call_handler(
handler=handler,
message=[incoming]
if isinstance(handler, AsyncAPIBatchSubscriber)
Expand All @@ -126,7 +125,9 @@ async def publish( # type: ignore[override]
raise_timeout=raise_timeout,
)

return None
return_value = return_value or handle_value

return return_value

async def publish_batch(
self,
Expand Down
2 changes: 1 addition & 1 deletion scripts/set_variables.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ echo AIRT_PROJECT variable set to $AIRT_PROJECT
export UID=$(id -u)
export GID=$(id -g)

export DOCKER_COMPOSE_PROJECT="${USER}-faststream"
export DOCKER_COMPOSE_PROJECT="${USER//./_}-faststream"
echo DOCKER_COMPOSE_PROJECT variable set to $DOCKER_COMPOSE_PROJECT
export KAFKA_HOSTNAME="${DOCKER_COMPOSE_PROJECT}-kafka-1"
echo KAFKA_HOSTNAME variable set to $KAFKA_HOSTNAME
Expand Down
68 changes: 68 additions & 0 deletions tests/brokers/confluent/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,71 @@ async def h2(): ...
await h2.wait_call(10)

assert len(routes) == 2

async def test_multiple_subscribers_different_groups(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

async def test_multiple_subscribers_same_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0

async def test_multiple_batch_subscriber_with_different_group(
self,
test_broker: KafkaBroker,
queue: str,
):
@test_broker.subscriber(queue, batch=True, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, batch=True, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

async def test_multiple_batch_subscriber_with_same_group(
self,
test_broker: KafkaBroker,
queue: str,
):
@test_broker.subscriber(queue, batch=True, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, batch=True, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0
68 changes: 68 additions & 0 deletions tests/brokers/kafka/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,71 @@ async def h2(): ...
await h2.wait_call(3)

assert len(routes) == 2

async def test_multiple_subscribers_different_groups(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

async def test_multiple_subscribers_same_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0

async def test_multiple_batch_subscriber_with_different_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, batch=True, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, batch=True, group_id="group2")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

async def test_multiple_batch_subscriber_with_same_group(
self,
queue: str,
test_broker: KafkaBroker,
):
@test_broker.subscriber(queue, batch=True, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, batch=True, group_id="group1")
async def subscriber2(): ...

await test_broker.start()
await test_broker.publish("", queue)

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0