From 60bcc5c8baf29398030f39e6aeb0f79d60b3be24 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Wed, 11 Mar 2026 17:37:00 -0700 Subject: [PATCH 1/2] KAFKA-20237: Re-enqueue InitProducerId request after authentication failure When an idempotent (non-transactional) KafkaProducer encounters an SSL handshake failure during the initial connection, the InitProducerId request is dequeued from the pending requests queue but never sent. The AuthenticationException is caught in Sender.runOnce(), which calls transactionManager.authenticationFailed(). However, since the request was already dequeued, authenticationFailed() iterates over an empty queue and does nothing. The TransactionManager remains stuck in INITIALIZING state with no pending request to complete initialization. On subsequent Sender iterations, bumpIdempotentEpochAndResetIdIfNeeded() skips re-enqueueing because currentState == INITIALIZING, and nextRequest() returns null because the queue is empty. The producer becomes permanently unable to send messages. Fix: Add a recovery path in bumpIdempotentEpochAndResetIdIfNeeded() that detects when the state is INITIALIZING but the pending queue is empty and no request is in-flight. In this case, re-enqueue the InitProducerId request to allow recovery after the connection is restored. Also extract the InitProducerId request creation into a helper method to avoid duplication. 
--- .../internals/TransactionManager.java | 22 ++++++++++--- .../internals/TransactionManagerTest.java | 31 +++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 64ee1cd79f6a9..52f47dcbcda78 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -687,15 +687,27 @@ synchronized void bumpIdempotentEpochAndResetIdIfNeeded() { } if (currentState != State.INITIALIZING && !hasProducerId()) { transitionTo(State.INITIALIZING); - InitProducerIdRequestData requestData = new InitProducerIdRequestData() - .setTransactionalId(null) - .setTransactionTimeoutMs(Integer.MAX_VALUE); - InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), false); - enqueueRequest(handler); + enqueueInitProducerIdRequest(); + } else if (currentState == State.INITIALIZING && !hasProducerId() + && pendingRequests.isEmpty() && !hasInFlightRequest()) { + // The previous InitProducerId request was dequeued but lost before it could + // be sent (e.g., due to an authentication error during connection). Re-enqueue + // to allow recovery without requiring a producer restart. 
+ log.info("Re-enqueuing InitProducerId request after previous attempt was lost"); + enqueueInitProducerIdRequest(); } } } + private void enqueueInitProducerIdRequest() { + InitProducerIdRequestData requestData = new InitProducerIdRequestData() + .setTransactionalId(null) + .setTransactionTimeoutMs(Integer.MAX_VALUE); + InitProducerIdHandler handler = new InitProducerIdHandler( + new InitProducerIdRequest.Builder(requestData), false); + enqueueRequest(handler); + } + /** * Returns the next sequence number to be written to the given TopicPartition. */ diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a13e44a5e16df..c80ee9fb49a71 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -4613,6 +4613,37 @@ private void assertProduceFutureFailed(Future<RecordMetadata> future) throws Int } } + @Test + public void testIdempotentProducerRecoversFromLostInitProducerIdRequest() { + // Simulate the scenario from KAFKA-20237: InitProducerId request is dequeued + // but lost due to authentication failure, leaving state stuck at INITIALIZING. + initializeTransactionManager(Optional.empty(), false); + + // First call transitions to INITIALIZING and enqueues request + transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); + assertTrue(transactionManager.isInitializing()); + + // Simulate the request being dequeued (as nextRequest() would do in Sender) + TransactionManager.TxnRequestHandler handler = transactionManager.nextRequest(false); + assertNotNull(handler, "InitProducerIdHandler should have been enqueued"); + + // Simulate authentication failure: the request was dequeued but never sent. + // authenticationFailed() iterates pendingRequests (now empty) so it does nothing. 
+ transactionManager.authenticationFailed(new org.apache.kafka.common.errors.AuthenticationException("SSL handshake failed")); + + // State should still be INITIALIZING (the handler was already dequeued) + assertTrue(transactionManager.isInitializing()); + + // On the next Sender iteration, bumpIdempotentEpochAndResetIdIfNeeded should + // detect the lost request and re-enqueue + transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); + assertTrue(transactionManager.isInitializing()); + + // Verify a new request was enqueued + TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false); + assertNotNull(retryHandler, "A new InitProducerIdHandler should have been re-enqueued"); + } + private void runUntil(Supplier<Boolean> condition) { ProducerTestUtils.runUntil(sender, condition); } From 0c258f91c2a8def683a0ad719f5dace4685735b8 Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Thu, 12 Mar 2026 16:43:52 -0700 Subject: [PATCH 2/2] fix(test): remove isInitializing() assertions for non-transactional producer isInitializing() returns isTransactional() && currentState == INITIALIZING, which is always false for idempotent (non-transactional) producers. Replace state-based assertions with behavioral assertions that verify the request enqueue/dequeue lifecycle directly. 
Co-Authored-By: Claude Opus 4.6 --- .../producer/internals/TransactionManagerTest.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index c80ee9fb49a71..672e759e529ab 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -4617,13 +4617,14 @@ private void assertProduceFutureFailed(Future<RecordMetadata> future) throws Int public void testIdempotentProducerRecoversFromLostInitProducerIdRequest() { // Simulate the scenario from KAFKA-20237: InitProducerId request is dequeued // but lost due to authentication failure, leaving state stuck at INITIALIZING. + // Note: isInitializing() checks isTransactional(), so we verify state via + // behavioral assertions (request enqueue/dequeue) rather than state checks. initializeTransactionManager(Optional.empty(), false); // First call transitions to INITIALIZING and enqueues request transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); - assertTrue(transactionManager.isInitializing()); - // Simulate the request being dequeued (as nextRequest() would do in Sender) + // Verify a request was enqueued by dequeuing it (as nextRequest() would in Sender) TransactionManager.TxnRequestHandler handler = transactionManager.nextRequest(false); assertNotNull(handler, "InitProducerIdHandler should have been enqueued"); @@ -4631,15 +4632,11 @@ public void testIdempotentProducerRecoversFromLostInitProducerIdRequest() { // authenticationFailed() iterates pendingRequests (now empty) so it does nothing. 
transactionManager.authenticationFailed(new org.apache.kafka.common.errors.AuthenticationException("SSL handshake failed")); - // State should still be INITIALIZING (the handler was already dequeued) - assertTrue(transactionManager.isInitializing()); - // On the next Sender iteration, bumpIdempotentEpochAndResetIdIfNeeded should // detect the lost request and re-enqueue transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); - assertTrue(transactionManager.isInitializing()); - // Verify a new request was enqueued + // Verify a new request was re-enqueued TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false); assertNotNull(retryHandler, "A new InitProducerIdHandler should have been re-enqueued"); }