KAFKA-20237: Re-enqueue InitProducerId request after authentication failure#21715
Open
mvanhorn wants to merge 2 commits intoapache:trunkfrom
Open
KAFKA-20237: Re-enqueue InitProducerId request after authentication failure#21715mvanhorn wants to merge 2 commits intoapache:trunkfrom
mvanhorn wants to merge 2 commits intoapache:trunkfrom
Conversation
…ailure When an idempotent (non-transactional) KafkaProducer encounters an SSL handshake failure during the initial connection, the InitProducerId request is dequeued from the pending requests queue but never sent. The AuthenticationException is caught in Sender.runOnce(), which calls transactionManager.authenticationFailed(). However, since the request was already dequeued, authenticationFailed() iterates over an empty queue and does nothing. The TransactionManager remains stuck in INITIALIZING state with no pending request to complete initialization. On subsequent Sender iterations, bumpIdempotentEpochAndResetIdIfNeeded() skips re-enqueueing because currentState == INITIALIZING, and nextRequest() returns null because the queue is empty. The producer becomes permanently unable to send messages. Fix: Add a recovery path in bumpIdempotentEpochAndResetIdIfNeeded() that detects when the state is INITIALIZING but the pending queue is empty and no request is in-flight. In this case, re-enqueue the InitProducerId request to allow recovery after the connection is restored. Also extract the InitProducerId request creation into a helper method to avoid duplication.
…roducer isInitializing() returns isTransactional() && currentState == INITIALIZING, which is always false for idempotent (non-transactional) producers. Replace state-based assertions with behavioral assertions that verify the request enqueue/dequeue lifecycle directly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
JIRA:
KAFKA-20237
When an idempotent (non-transactional) KafkaProducer encounters an SSL
handshake failure during the initial connection, the
InitProducerIdrequest is dequeued from the pending requests queue but never sent. The
AuthenticationExceptionis caught inSender.runOnce(), which callstransactionManager.authenticationFailed(). However, since the requestwas already dequeued,
authenticationFailed()iterates over an emptyqueue and does nothing.
The
TransactionManagerremains stuck inINITIALIZINGstate:bumpIdempotentEpochAndResetIdIfNeeded()skips re-enqueueing becausecurrentState == INITIALIZINGnextRequest()returns null because the queue is emptythe SSL configuration is corrected
Changes
Added a recovery path in
bumpIdempotentEpochAndResetIdIfNeeded()thatdetects when the state is
INITIALIZINGbut the pending queue is emptyand no request is in-flight. In this case, the
InitProducerIdrequestis re-enqueued, allowing the producer to recover without requiring a
restart.
Also extracted the
InitProducerIdrequest creation into a helpermethod (
enqueueInitProducerIdRequest()) to avoid duplication.Testing
Added
testIdempotentProducerRecoversFromLostInitProducerIdRequest()that simulates:
Sender.maybeSendAndPollTransactionalRequest()would)authenticationFailed()(which does nothing since queueis empty)
bumpIdempotentEpochAndResetIdIfNeeded()re-enqueues the requestImpact
This enables self-recovery for idempotent producers in cloud-native
environments where certificate rotation or temporary auth server
unavailability can cause transient SSL failures. Previously, the only
workaround was to close and recreate the KafkaProducer.
This contribution was developed with AI assistance (Claude Code).