Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1412 with TestKafkaBroker behaviour where Consumer Groups weren't being respected #1413

Open
wants to merge 15 commits into
base: main
Choose a base branch
from

Conversation

sifex
Copy link

@sifex sifex commented May 1, 2024

Description

This fixes bug #1412 where distribution of messages on a topic does not get distributed to each consumer group when running the TestKafkaBroker.

Fixes #1412

Example:
This example should have 2 messages received to 2 subscribers.

@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

Where as if we have the same group_id, we only expect to receive it once.

@test_broker.subscriber(queue, group_id="same_group")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="same_group")
async def subscriber2(): ...

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0

Other fixes

  • It also fixes a small bug in DOCKER_COMPOSE_PROJECT project name generation where the users' username field contains a . character (which docker believes to be invalid naming).
  • Typo fix in .github/PULL_REQUEST_TEMPLATE.md

Type of change

  • Bug fix (a non-breaking change that resolves an issue)

Checklist

  • My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh (No. Currently Segfaults on Local M1 MBP, will rely on GHA)
  • I have ensured that static analysis tests are passing by running scripts/static-anaylysis.sh
  • I have included code examples to illustrate the modifications

@sifex
Copy link
Author

sifex commented May 1, 2024

Currently facing an issue where the RPC test assumes you can get a response back from a publisher? Not sure why this assumption was made.

@pytest.mark.asyncio()
    async def test_rpc(self, queue: str, rpc_broker: BrokerUsecase):
        @rpc_broker.subscriber(queue)
        async def m(m):  # pragma: no cover
            return "1"
    
        async with rpc_broker:
            await rpc_broker.start()
            r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True)
    
>       assert r == "1"
E       AssertionError

imo I think publishers by definition can't return anything other than a ReceiveAck / similar response from the message queue – also not sure if I misunderstand the use case for RPC in this context.

@Lancetnik
Copy link
Collaborator

Currently facing an issue where the RPC test assumes you can get a response back from a publisher? Not sure why this assumption was made.

@pytest.mark.asyncio()
    async def test_rpc(self, queue: str, rpc_broker: BrokerUsecase):
        @rpc_broker.subscriber(queue)
        async def m(m):  # pragma: no cover
            return "1"
    
        async with rpc_broker:
            await rpc_broker.start()
            r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True)
    
>       assert r == "1"
E       AssertionError

imo I think publishers by definition can't return anything other than a ReceiveAck / similar response from the message queue – also not sure if I misunderstand the use case for RPC in this context.

RPC in TestClient allows you to validate your handler result without special publisher creation. I think, we should save this behavior. In you case, if user has the only last result - it should be OK for most cases: (the following pseudocode represents assuming bahavior)

result = None
for sub in subscribers:
    result = call_subscriber(...)
return result

raise_timeout=raise_timeout,
)
for consumer_group in set(
{sub.group_id for sub in self.broker._subscribers.values()}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like O(n**2) in this case. Can we find any decision without iteration throw all subscribers twice?

Copy link
Collaborator

@Lancetnik Lancetnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please fix the comments and upload main changes to make it mergeable?

sifex added a commit to sifex/faststream that referenced this pull request May 18, 2024
sifex added a commit to sifex/faststream that referenced this pull request May 18, 2024
@sifex
Copy link
Author

sifex commented May 18, 2024

Should be good to go, applied ruff formatting + linting.

@sifex sifex requested a review from Lancetnik May 18, 2024 22:05
@Lancetnik
Copy link
Collaborator

I'll check it and solve conflicts with main soon. Thank you for the work!

Lancetnik
Lancetnik previously approved these changes May 20, 2024
Copy link
Collaborator

@Lancetnik Lancetnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sifex looks really great, but now I can't understand, why it respects consumer group😆

Anyway, can you, please add test for this behavior for publish_batch method? Then we can merge it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Kafka Test Subscriber doesn't match Consumer Group behaviour
2 participants