KAFKA-16665: Allow to initialize newly assigned partition's positions without allowing fetching while callback runs #15856

Merged (3 commits) on May 7, 2024

@lianetm lianetm commented May 3, 2024

Fix to allow initializing positions for newly assigned partitions while the onPartitionsAssigned callback is running, even though the partitions remain non-fetchable until the callback completes.

Before this PR, we allowed neither initialization nor fetching while the callback was running. The fix here only allows initializing the position of newly assigned partitions, and keeps the existing logic that makes sure the partition remains non-fetchable until the callback completes.

The need for this fix came up in one of the connect system tests, which attempts to retrieve a newly assigned partition's position with a call to consumer.position from within the onPartitionsAssigned callback (WorkerSinkTask). With this PR we allow such calls (test added), which matches the behaviour of the legacy consumer.
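
To make the distinction between "position can be initialized" and "partition is fetchable" concrete, here is a minimal toy sketch. It is not Kafka's actual subscription state code: the class `PendingCallbackModel`, its methods, and the string partition names are all hypothetical, chosen only to illustrate the two separate conditions described above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Kafka's real implementation. All names are hypothetical.
class PendingCallbackModel {
    // Per-partition state: a position (null until initialized) and a flag
    // marking that onPartitionsAssigned has not completed yet.
    static final class PartitionState {
        Long position;                  // null means "not initialized yet"
        boolean pendingCallback = true; // set on assignment, cleared when the callback ends
    }

    private final Map<String, PartitionState> assigned = new HashMap<>();

    void assign(String partition) {
        assigned.put(partition, new PartitionState());
    }

    // Allowed even while the callback is pending (the behaviour described in this PR).
    void initializePosition(String partition, long offset) {
        PartitionState state = assigned.get(partition);
        if (state.position == null) {
            state.position = offset;
        }
    }

    Long position(String partition) {
        return assigned.get(partition).position;
    }

    // Fetching needs both an initialized position and a completed callback.
    boolean isFetchable(String partition) {
        PartitionState state = assigned.get(partition);
        return state != null && state.position != null && !state.pendingCallback;
    }

    void callbackCompleted(String partition) {
        assigned.get(partition).pendingCallback = false;
    }

    public static void main(String[] args) {
        PendingCallbackModel model = new PendingCallbackModel();
        model.assign("topic-0");
        model.initializePosition("topic-0", 42L); // e.g. triggered by position() in the callback
        System.out.println("position=" + model.position("topic-0")
                + " fetchable=" + model.isFetchable("topic-0"));
        model.callbackCompleted("topic-0");
        System.out.println("fetchable after callback=" + model.isFetchable("topic-0"));
    }
}
```

In this sketch the position becomes available as soon as it is initialized, while `isFetchable` stays false until `callbackCompleted` is called, mirroring the split this PR describes.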

lianetm commented May 3, 2024

Hey @lucasbru, could you take a look?

For more context, we initially decided not to allow initializing positions while the onPartitionsAssigned callback was running. The concern was that a user could seek in the callback while we initialized to a different position within the poll. That shouldn't happen, though: we execute the callbacks in the foreground thread, so poll is blocked until onPartitionsAssigned completes (the user's seek happens, the callback completes, then the next poll runs). In fact, the connect test made us realize that we do need to allow initializing positions, because a user may want to call .position() from within the callback, as WorkerSinkTask does (that system test passes locally now with this fix).
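
The ordering argument above can be sketched with a small single-threaded toy model. This is an illustration under assumptions, not the consumer's real code: `ForegroundCallbackModel`, its fields, and the "fall back to the committed offset, else 0" rule are hypothetical simplifications. The point it shows is that when the callback runs inline inside poll, a seek made in the callback is already in place before poll initializes anything.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only: NOT the real consumer. All names are hypothetical.
class ForegroundCallbackModel {
    final Map<String, Long> positions = new HashMap<>();
    final Map<String, Long> committed = new HashMap<>();

    // A user-driven seek always sets the position explicitly.
    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    // Initializes the position lazily (committed offset, else 0 in this sketch)
    // only if nothing has set it yet, then returns it.
    long position(String partition) {
        return positions.computeIfAbsent(partition, p -> committed.getOrDefault(p, 0L));
    }

    // The callback runs inline, so poll cannot initialize positions concurrently:
    // anything the callback does (seek or position) is complete before this returns.
    long poll(String newlyAssigned, Consumer<ForegroundCallbackModel> onPartitionsAssigned) {
        onPartitionsAssigned.accept(this); // poll is blocked here until the callback ends
        return position(newlyAssigned);    // initializes only if still missing
    }

    public static void main(String[] args) {
        ForegroundCallbackModel consumer = new ForegroundCallbackModel();
        consumer.committed.put("topic-0", 5L);
        long start = consumer.poll("topic-0", c -> c.seek("topic-0", 100L));
        System.out.println("poll starts from " + start);
    }
}
```

A callback that calls `position` instead of `seek` simply triggers the same lazy initialization earlier, which is the case the WorkerSinkTask example relies on.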

Then, an earlier PR went after a symptom of this same problem, but I would say we put the fix in the wrong place by not allowing reset, when in reality the issue was that we were not allowing the position to be initialized in the first place. This is why the current PR partly reverts that other PR's comments and part of its logic (not all of it, mainly to keep some refactoring it had).

I added an integration test. These are the existing unit tests we had; they got updated with the PR, but only indirectly, so they don't show up clearly in the diff. It's helpful to look at them too.

Thanks!

@lucasbru lucasbru left a comment

LGTM, thanks! We should remember to move this test over to PlaintextConsumerCallbackTest once we merge https://github.com/apache/kafka/pull/15408/files

lucasbru commented May 6, 2024

@lianetm some conflicts need to be resolved

lianetm commented May 6, 2024

Thanks @lucasbru, conflicts resolved. Good pointer about the new file for callback tests. I created https://issues.apache.org/jira/browse/KAFKA-16675, assigned to me, to make sure I move the test as soon as that one gets merged.

@lucasbru lucasbru merged commit ea485a7 into apache:trunk May 7, 2024
1 check failed