Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16217: Stop the abort transaction try loop when closing producers #15541

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,13 +270,14 @@ public void run() {
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");

try {
// It is possible for the transaction manager to throw errors when aborting. Catch these
// so as not to interfere with the rest of the shutdown logic.
transactionManager.beginAbort();
} catch (Exception e) {
log.error("Error in kafka producer I/O thread while aborting transaction: ", e);
log.error("Error in kafka producer I/O thread while aborting transaction when during closing: ", e);
// Force close in case the transactionManager is in error states.
forceClose = true;
}
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,11 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SenderTest {
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
Expand Down Expand Up @@ -3239,6 +3241,25 @@ public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exceptio
}
}

// This test is expected to run fast. If timeout, the sender is not able to close properly.
@Timeout(5)
@Test
public void testSenderShouldCloseWhenTransactionManagerInErrorState() throws Exception {
metrics.close();
Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
metrics = new Metrics(new MetricConfig().tags(clientTags));
TransactionManager transactionManager = mock(TransactionManager.class);
SenderMetricsRegistry metricsRegistry = new SenderMetricsRegistry(metrics);
Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
1, metricsRegistry, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);
when(transactionManager.hasOngoingTransaction()).thenReturn(true);
when(transactionManager.beginAbort()).thenThrow(new IllegalStateException());
sender.initiateClose();

// The sender should directly get closed.
sender.run();
}

/**
* Test the scenario that FetchResponse returns NOT_LEADER_OR_FOLLOWER, indicating change in leadership, but it
* does not contain new leader info(defined in KIP-951).
Expand Down