Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-18024: rework the last processed offset tracking #7518

Merged
merged 2 commits into from Apr 28, 2022

Conversation

orpiske
Copy link
Contributor

@orpiske orpiske commented Apr 28, 2022

No description provided.

@github-actions
Copy link
Contributor

⚠️ This PR changes Camel components and will be tested automatically.

@github-actions
Copy link
Contributor

❌ Finished component verification: 1 component(s) test failed out of 1 component(s) tested

@github-actions
Copy link
Contributor

❌ Finished component verification: 1 component(s) test failed out of 4 component(s) tested

2 similar comments
@github-actions
Copy link
Contributor

❌ Finished component verification: 1 component(s) test failed out of 4 component(s) tested

@github-actions
Copy link
Contributor

❌ Finished component verification: 1 component(s) test failed out of 4 component(s) tested

Move the last processed offset tracking code into the commit manager so
it can be handled accordingly to the commit management strategy (sync,
async, noop, etc)
This avoids duplication of commit code, simplifies the manual commit factories and remove the need for queueing the async commits
@github-actions
Copy link
Contributor

✔️ Finished component verification: 0 component(s) test failed out of 1 component(s) tested

@orpiske
Copy link
Contributor Author

orpiske commented Apr 28, 2022

The checkstyle issue is not on this PR code, so it's safe to merge.

@orpiske orpiske merged commit aa306b7 into apache:main Apr 28, 2022
@ludovic-boutros
Copy link
Contributor

Hi @orpiske , I just checked this PR. The async ConcurrentQueue is mandatory if you want to be able to commit offsets in an async process like an aggregation with completion timeout. this is because the commit function must be called by the consumer thread. If not you will get a ConcurrentModification exception.
The use case is for instance to index documents in a bulk manner using aggregate and commit once indexed.

@orpiske
Copy link
Contributor Author

orpiske commented May 17, 2022

Hi @orpiske , I just checked this PR. The async ConcurrentQueue is mandatory if you want to be able to commit offsets in an async process like an aggregation with completion timeout. this is because the commit function must be called by the consumer thread. If not you will get a ConcurrentModification exception.
The use case is for instance to index documents in a bulk manner using aggregate and commit once indexed.

Hi, do you have a reproducer I can quickly transform into an integration test so I can take a look at it and make sure it works/won't break again?

@orpiske
Copy link
Contributor Author

orpiske commented May 17, 2022

@ludovic-boutros doing a quick research regarding your comment I noticed our async test was disabled (it runs well on my own CI, but I re-enabled it again so we have regular checks on the Apache one).

If you have any suggestion about how to modify it to fit the use case/issue you mention, please, just let us know (or send a PR).

Thanks for reviewing it/suggesting/contributing!

@ludovic-boutros
Copy link
Contributor

@orpiske Thank you ! I will try to find some time to make some tests with this PR and let you know.

@ludovic-boutros
Copy link
Contributor

ludovic-boutros commented May 20, 2022

@orpiske the async commit process seems to be broken. The commit() function in DefaultKafkaManualAsyncCommit calls a forceCommit() function which is synchronous.
And when fixing it using the commit() function (async with the manager), the test throw a ConcurrentModificationException().
We need to reintroduce a ConcurrentQueue storing commits which should be process by the main kafka consumer polling loop and thread.

@orpiske
Copy link
Contributor Author

orpiske commented May 21, 2022

@orpiske the async commit process seems to be broken. The commit() function in DefaultKafkaManualAsyncCommit calls a forceCommit() function which is synchronous. And when fixing it using the commit() function (async with the manager), the test throw a ConcurrentModificationException(). We need to reintroduce a ConcurrentQueue storing commits which should be process by the main kafka consumer polling loop and thread.

Please, can you please open a ticket for this or send a PR with your proposed changes?

The important requirement is that the concurrent queue must be isolated to only the async-related code. It should not be part of the other code (ie.: the concerns of the async manual commit code should be separated) nor should be present in any way in the KafkaConsumer, KafkaFetchRecords or Kafka*Processor code. This is important so that we can continue cleaning up and reducing the maintenance effort for this component.

@orpiske
Copy link
Contributor Author

orpiske commented May 21, 2022

And once again: thanks for the review and testing this part of the code. It's not a scenario that is widely used/tested, so I appreciate additional eyes on it.

@orpiske
Copy link
Contributor Author

orpiske commented May 21, 2022

@orpiske the async commit process seems to be broken. The commit() function in DefaultKafkaManualAsyncCommit calls a forceCommit() function which is synchronous. And when fixing it using the commit() function (async with the manager), the test throw a ConcurrentModificationException(). We need to reintroduce a ConcurrentQueue storing commits which should be process by the main kafka consumer polling loop and thread.

Please, can you please open a ticket for this or send a PR with your proposed changes?

The important requirement is that the concurrent queue must be isolated to only the async-related code. It should not be part of the other code (ie.: the concerns of the async manual commit code should be separated) nor should be present in any way in the KafkaConsumer, KafkaFetchRecords or Kafka*Processor code. This is important so that we can continue cleaning up and reducing the maintenance effort for this component.

Alternatively, on your scenario, can you also please test if changing the call from forceCommit to recordOffset leads to the correct behavior?

I believe the offset will eventually be committed by the processor after successful processing.

@orpiske
Copy link
Contributor Author

orpiske commented May 21, 2022

@ludovic-boutros I've got some time to look at this: #7664. Changing to recordOffset seems to do the trick. I'd kindly appreciate if you could give the fix a try on your scenario.

Thanks.

@orpiske orpiske deleted the camel-18024 branch August 16, 2022 11:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants