diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 64ee1cd79f6a9..52f47dcbcda78 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -687,15 +687,27 @@ synchronized void bumpIdempotentEpochAndResetIdIfNeeded() { } if (currentState != State.INITIALIZING && !hasProducerId()) { transitionTo(State.INITIALIZING); - InitProducerIdRequestData requestData = new InitProducerIdRequestData() - .setTransactionalId(null) - .setTransactionTimeoutMs(Integer.MAX_VALUE); - InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), false); - enqueueRequest(handler); + enqueueInitProducerIdRequest(); + } else if (currentState == State.INITIALIZING && !hasProducerId() + && pendingRequests.isEmpty() && !hasInFlightRequest()) { + // The previous InitProducerId request was dequeued but lost before it could + // be sent (e.g., due to an authentication error during connection). Re-enqueue + // to allow recovery without requiring a producer restart. + log.info("Re-enqueuing InitProducerId request after previous attempt was lost"); + enqueueInitProducerIdRequest(); } } } + private void enqueueInitProducerIdRequest() { + InitProducerIdRequestData requestData = new InitProducerIdRequestData() + .setTransactionalId(null) + .setTransactionTimeoutMs(Integer.MAX_VALUE); + InitProducerIdHandler handler = new InitProducerIdHandler( + new InitProducerIdRequest.Builder(requestData), false); + enqueueRequest(handler); + } + /** * Returns the next sequence number to be written to the given TopicPartition. */ diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index a13e44a5e16df..672e759e529ab 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -4613,6 +4613,34 @@ private void assertProduceFutureFailed(Future future) throws Int } } + @Test + public void testIdempotentProducerRecoversFromLostInitProducerIdRequest() { + // Simulate the scenario from KAFKA-20237: InitProducerId request is dequeued + // but lost due to authentication failure, leaving state stuck at INITIALIZING. + // Note: isInitializing() checks isTransactional(), so we verify state via + // behavioral assertions (request enqueue/dequeue) rather than state checks. + initializeTransactionManager(Optional.empty(), false); + + // First call transitions to INITIALIZING and enqueues request + transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); + + // Verify a request was enqueued by dequeuing it (as nextRequest() would in Sender) + TransactionManager.TxnRequestHandler handler = transactionManager.nextRequest(false); + assertNotNull(handler, "InitProducerIdHandler should have been enqueued"); + + // Simulate authentication failure: the request was dequeued but never sent. + // authenticationFailed() iterates pendingRequests (now empty) so it does nothing. + transactionManager.authenticationFailed(new org.apache.kafka.common.errors.AuthenticationException("SSL handshake failed")); + + // On the next Sender iteration, bumpIdempotentEpochAndResetIdIfNeeded should + // detect the lost request and re-enqueue + transactionManager.bumpIdempotentEpochAndResetIdIfNeeded(); + + // Verify a new request was re-enqueued + TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false); + assertNotNull(retryHandler, "A new InitProducerIdHandler should have been re-enqueued"); + } + private void runUntil(Supplier condition) { ProducerTestUtils.runUntil(sender, condition); }