Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -687,15 +687,27 @@ synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
}
if (currentState != State.INITIALIZING && !hasProducerId()) {
transitionTo(State.INITIALIZING);
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(null)
.setTransactionTimeoutMs(Integer.MAX_VALUE);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), false);
enqueueRequest(handler);
enqueueInitProducerIdRequest();
} else if (currentState == State.INITIALIZING && !hasProducerId()
&& pendingRequests.isEmpty() && !hasInFlightRequest()) {
// The previous InitProducerId request was dequeued but lost before it could
// be sent (e.g., due to an authentication error during connection). Re-enqueue
// to allow recovery without requiring a producer restart.
log.info("Re-enqueuing InitProducerId request after previous attempt was lost");
enqueueInitProducerIdRequest();
}
}
}

private void enqueueInitProducerIdRequest() {
InitProducerIdRequestData requestData = new InitProducerIdRequestData()
.setTransactionalId(null)
.setTransactionTimeoutMs(Integer.MAX_VALUE);
InitProducerIdHandler handler = new InitProducerIdHandler(
new InitProducerIdRequest.Builder(requestData), false);
enqueueRequest(handler);
}

/**
* Returns the next sequence number to be written to the given TopicPartition.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4613,6 +4613,34 @@ private void assertProduceFutureFailed(Future<RecordMetadata> future) throws Int
}
}

@Test
public void testIdempotentProducerRecoversFromLostInitProducerIdRequest() {
// Simulate the scenario from KAFKA-20237: InitProducerId request is dequeued
// but lost due to authentication failure, leaving state stuck at INITIALIZING.
// Note: isInitializing() checks isTransactional(), so we verify state via
// behavioral assertions (request enqueue/dequeue) rather than state checks.
initializeTransactionManager(Optional.empty(), false);

// First call transitions to INITIALIZING and enqueues request
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

// Verify a request was enqueued by dequeuing it (as nextRequest() would in Sender)
TransactionManager.TxnRequestHandler handler = transactionManager.nextRequest(false);
assertNotNull(handler, "InitProducerIdHandler should have been enqueued");

// Simulate authentication failure: the request was dequeued but never sent.
// authenticationFailed() iterates pendingRequests (now empty) so it does nothing.
transactionManager.authenticationFailed(new org.apache.kafka.common.errors.AuthenticationException("SSL handshake failed"));

// On the next Sender iteration, bumpIdempotentEpochAndResetIdIfNeeded should
// detect the lost request and re-enqueue
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();

// Verify a new request was re-enqueued
TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequest(false);
assertNotNull(retryHandler, "A new InitProducerIdHandler should have been re-enqueued");
}

private void runUntil(Supplier<Boolean> condition) {
ProducerTestUtils.runUntil(sender, condition);
}
Expand Down