Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5427: Transactional producer should allow FindCoordinator in error state #3297

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -97,17 +97,15 @@ private boolean isTransitionValid(State source, State target) {
case INITIALIZING:
return source == UNINITIALIZED;
case READY:
return source == INITIALIZING || source == COMMITTING_TRANSACTION
|| source == ABORTING_TRANSACTION || source == ABORTABLE_ERROR;
return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
case IN_TRANSACTION:
return source == READY;
case COMMITTING_TRANSACTION:
return source == IN_TRANSACTION;
case ABORTING_TRANSACTION:
return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
case ABORTABLE_ERROR:
return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION
|| source == ABORTABLE_ERROR;
return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
case FATAL_ERROR:
default:
// We can transition to FATAL_ERROR unconditionally.
Expand Down Expand Up @@ -179,7 +177,7 @@ public synchronized TransactionalRequestResult beginCommittingTransaction() {
ensureTransactional();
maybeFailWithError();
transitionTo(State.COMMITTING_TRANSACTION);
return beginCompletingTransaction(true);
return beginCompletingTransaction(TransactionResult.COMMIT);
}

public synchronized TransactionalRequestResult beginAbortingTransaction() {
Expand All @@ -190,14 +188,12 @@ public synchronized TransactionalRequestResult beginAbortingTransaction() {

// We're aborting the transaction, so there should be no need to add new partitions
newPartitionsInTransaction.clear();
return beginCompletingTransaction(false);
return beginCompletingTransaction(TransactionResult.ABORT);
}

private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
if (!newPartitionsInTransaction.isEmpty())
enqueueRequest(addPartitionsToTransactionHandler());

TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, transactionResult);
EndTxnHandler handler = new EndTxnHandler(builder);
Expand Down Expand Up @@ -225,7 +221,7 @@ public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPart
if (currentState != State.IN_TRANSACTION)
throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);

if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition))
if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
return;

log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
Expand Down Expand Up @@ -286,6 +282,11 @@ synchronized boolean isAborting() {
}

synchronized void transitionToAbortableError(RuntimeException exception) {
if (currentState == State.ABORTING_TRANSACTION) {
log.debug("Skipping transition to abortable error state since the transaction is already being " +
"aborted. Underlying exception: ", exception);
return;
}
transitionTo(State.ABORTABLE_ERROR, exception);
}

Expand Down Expand Up @@ -504,13 +505,10 @@ private void maybeFailWithError() {

private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
if (hasError()) {
if (requestHandler instanceof EndTxnHandler) {
// we allow abort requests to break out of the error state. The state and the last error
// will be cleared when the request returns
EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
if (endTxnHandler.builder.result() == TransactionResult.ABORT)
return false;
}
if (hasAbortableError() && requestHandler instanceof FindCoordinatorHandler)
// No harm letting the FindCoordinator request go through if we're expecting to abort
return false;

requestHandler.fail(lastError);
return true;
}
Expand Down
Expand Up @@ -80,6 +80,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -1058,6 +1059,71 @@ public void testAllowAbortOnProduceFailure() throws InterruptedException {
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}

@Test
public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;

doInitTransactions(pid, epoch);

transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);

Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;

assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);

sender.run(time.milliseconds()); // Send AddPartitionsRequest
sender.run(time.milliseconds()); // Send Produce Request

TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
assertTrue(transactionManager.isAborting());
assertFalse(transactionManager.hasError());

sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
sender.run(time.milliseconds()); // receive the produce response

// we do not transition to ABORTABLE_ERROR since we were already aborting
assertTrue(transactionManager.isAborting());
assertFalse(transactionManager.hasError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps better to assert that transactionManager.hasAbortableError is false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hasError check is strictly stronger. It covers both hasAbortableError and hasFatalError.


sender.run(time.milliseconds()); // handle the abort
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}

@Test
public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;

doInitTransactions(pid, epoch);

transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);

Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;

assertFalse(responseFuture.isDone());
sender.run(time.milliseconds()); // Send AddPartitionsRequest

transactionManager.transitionToAbortableError(new KafkaException());
sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid);
sender.run(time.milliseconds()); // AddPartitions returns
assertTrue(transactionManager.hasAbortableError());

assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
sender.run(time.milliseconds()); // FindCoordinator handled
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
assertTrue(transactionManager.hasAbortableError());
}

@Test
public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
final long pid = 13131L;
Expand Down Expand Up @@ -1279,15 +1345,42 @@ public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws

TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();

prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this changed?

Copy link
Contributor Author

@hachikuji hachikuji Jun 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous error is not expected for AddOffsets, which means it is a fatal error. The intent of the test case was apparently to trigger an abortable error. Note that I've added a test case for the fatal error case below.

sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
assertFalse(abortResult.isCompleted());

sender.run(time.milliseconds());
assertTrue(transactionManager.isReady());
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
}

@Test
public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception {
final long pid = 13131L;
final short epoch = 1;

doInitTransactions(pid, epoch);

transactionManager.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(tp1, new OffsetAndMetadata(1));
final String consumerGroupId = "myconsumergroup";

transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);

TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();

prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch);
sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
assertFalse(abortResult.isCompleted());

sender.run(time.milliseconds());
assertTrue(abortResult.isCompleted());
assertFalse(abortResult.isSuccessful());
assertTrue(transactionManager.hasFatalError());
}

@Test
public void testNoDrainWhenPartitionsPending() throws InterruptedException {
final long pid = 13131L;
Expand Down Expand Up @@ -1623,8 +1716,15 @@ public boolean matches(AbstractRequest body) {
}, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
}

private void sendProduceResponse(Errors error, final long pid, final short epoch) {
client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
}

private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
client.prepareResponse(new MockClient.RequestMatcher() {
client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
}
private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
ProduceRequest produceRequest = (ProduceRequest) body;
Expand All @@ -1640,12 +1740,24 @@ public boolean matches(AbstractRequest body) {
assertEquals(transactionalId, produceRequest.transactionalId());
return true;
}
}, produceResponse(tp0, 0, error, 0));
};
}

private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
final short epoch, final long pid) {
client.prepareResponse(addPartitionsRequestMatcher(topicPartition, epoch, pid),
new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
}

private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition, final short epoch, final long pid) {
client.prepareResponse(new MockClient.RequestMatcher() {
private void sendAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
final short epoch, final long pid) {
client.respond(addPartitionsRequestMatcher(topicPartition, epoch, pid),
new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
}

private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition,
final short epoch, final long pid) {
return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body;
Expand All @@ -1655,7 +1767,7 @@ public boolean matches(AbstractRequest body) {
assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
return true;
}
}, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
};
}

private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
Expand Down