CAMEL-20227: Fix Pausable EIP losing messages due to Kafka offset advancement#23805
Merged
Conversation
…fset advancement The Pausable EIP with Kafka consumer was losing messages because: 1. afterConsume() only evaluated the resume predicate when already paused, but never called consumer.pause() or seeked back to committed offsets 2. The poll loop continued immediately without pausing, causing auto-commit to advance offsets for unprocessed records Fix: Rewrite afterConsume() to always evaluate the predicate, call consumer.pause() and seek to the correct offset when pausing, and consumer.resume() when resuming. This ensures poll() returns empty records during pause and offsets are not advanced for unprocessed messages. Co-Authored-By: Claude <noreply@anthropic.com> Signed-off-by: Claus Ibsen <claus.ibsen@gmail.com>
Contributor
|
🌟 Thank you for your contribution to the Apache Camel project! 🌟 🐫 Apache Camel Committers, please review the following items:
|
…le consumers Add a note recommending autoCommitEnable=false with allowManualCommit=true when using pausable consumers, to avoid message loss from Kafka auto-committing offsets for unprocessed records during pause/resume cycles. Co-Authored-By: Claude <noreply@anthropic.com> Signed-off-by: Claus Ibsen <claus.ibsen@gmail.com>
Contributor
|
🧪 CI tested the following changed modules:
All tested modules (8 modules)
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Claude Code on behalf of Claus Ibsen
Fixes the Pausable EIP with Kafka consumer losing messages when the consumer is paused and later resumed. The root cause was that
KafkaConsumerListener.afterConsume()never calledconsumer.pause()or seeked back to committed offsets, soconsumer.poll()kept advancing offsets for records that were never processed.afterConsume()to always evaluate the resume predicate, callconsumer.pause()+seekConsumer()when pausing, andconsumer.resume()when resumingpausedboolean flag that prevented proper predicate evaluationseekConsumer()helper shared betweenafterConsumeandafterProcessconsumer.pause()sopoll()returns empty records during pause (noThread.sleepneeded)Prior partial fix (
869e870952c2) only added apausedflag without addressing the offset advancement. A full fix existed on the unmergedorigin/kafka-pausebranch — this PR adapts that fix to currentmainwith the cleanerconsumer.pause()approach.Test plan
KafkaPausableConsumerITpasses — verifies all messages are received after pause/resumeKafkaPausableConsumerCircuitBreakerITpassesautoCommitEnable=trueduring pause/resume cycle🤖 Generated with Claude Code