
KAFKA-7763: commitTransaction and abortTransaction should not wait endless when brokers are down #6066

Merged: 10 commits into apache:trunk from huxihx:KAFKA-7763 on Feb 21, 2019

Conversation

huxihx (Contributor) commented Dec 26, 2018

Currently, commitTransaction and abortTransaction invoke Latch#await to wait for request completion. When brokers are not available, these calls wait forever and the producer never returns. This fix replaces the call with the timed equivalent await(timeout, unit); when the timeout expires, the producer throws a TimeoutException.
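The shape of the fix can be sketched in isolation. The class below is a hypothetical stand-in (a plain CountDownLatch replaces TransactionalRequestResult) and is not the actual Kafka patch:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: replace an unbounded latch.await() with the timed
// variant so the caller can observe a timeout instead of blocking forever.
public class TimedAwaitSketch {
    // Returns true if the operation completed within maxBlockTimeMs,
    // false if the wait timed out (e.g. because brokers are unreachable).
    public static boolean awaitCompletion(CountDownLatch latch, long maxBlockTimeMs)
            throws InterruptedException {
        return latch.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch neverCompleted = new CountDownLatch(1);
        // Nothing ever counts the latch down, so this returns after ~50 ms.
        boolean completed = awaitCompletion(neverCompleted, 50);
        System.out.println(completed ? "completed" : "timed out"); // prints "timed out"
    }
}
```

In the real patch the false return is converted into a TimeoutException so that callers of commitTransaction and abortTransaction fail fast instead of hanging.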

huxihx (Contributor, Author) commented Dec 26, 2018

retest this please

hachikuji (Contributor) left a comment

Thanks @huxihx. Left a couple comments.

    public void commitTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        TransactionalRequestResult result = transactionManager.beginCommit();
        sender.wakeup();
        result.await();
        try {

hachikuji (Contributor) commented Dec 27, 2018

nit: we could probably turn this into a helper since there are only slight differences between commit and abort.
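A helper along those lines might look like the following. This is an illustrative, self-contained sketch (a CountDownLatch stands in for TransactionalRequestResult, and all names are hypothetical), not the code that was actually committed:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical shared helper: commit and abort differ only in the operation
// name used in the error message, so both paths can delegate here.
public class EndTxnHelperSketch {
    static void awaitEndTxn(CountDownLatch result, String operation, long maxBlockTimeMs)
            throws InterruptedException, TimeoutException {
        if (!result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS))
            throw new TimeoutException("Timeout expired while " + operation
                    + " transaction in " + maxBlockTimeMs + "ms.");
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch alreadyCompleted = new CountDownLatch(0);
        awaitEndTxn(alreadyCompleted, "committing", 10); // returns immediately
        awaitEndTxn(alreadyCompleted, "aborting", 10);   // same path for abort
        System.out.println("both paths completed");
    }
}
```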

        result.await();
        try {
            if (!result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Timeout expired while committing transaction in " + maxBlockTimeMs + "ms.");

hachikuji (Contributor) commented Dec 27, 2018

This will probably take a little more work. If there is a timeout, the user should be able to retry. At the moment, I think this would just cause an illegal state error. We probably need to cache the TransactionalRequestResult somewhere so that we can continue on the next retry. We do something like this in initTransactions() already. It might be possible to factor out some common logic so that we don't just have a bunch of internal fields. Maybe we could move the caching into TransactionManager where we would be able to validate the state.

huxihx (Contributor, Author) commented Dec 29, 2018

retest this please

@huxihx huxihx force-pushed the huxihx:KAFKA-7763 branch to 1c6e0d2 Jan 2, 2019
hachikuji (Contributor) left a comment

Thanks for the updates. Left some small suggestions.

@@ -99,6 +103,7 @@
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
private final Map<State, TransactionalRequestResult> stateToTransactionRequestResult;

hachikuji (Contributor) commented Jan 3, 2019

We should only have one active TransactionalRequestResult at any time, so I don't think this needs to be a map.

@@ -289,6 +314,32 @@ public synchronized void failIfNotReadyForSend() {
}
}

public void awaitResultOrThrowTimeoutException(long maxBlockTimeMs,

hachikuji (Contributor) commented Jan 3, 2019

As an alternative, we could move this logic into TransactionalRequestResult. We can change TransactionalRequestResult.await(timeout) to raise a suitable timeout and handle the InterruptedException. We can also pass the operation as a string when constructing TransactionalRequestResult. For example:

    public boolean await(long timeout, TimeUnit unit) {
        try {
            boolean completed = latch.await(timeout, unit);
            if (!completed)
                throw new TimeoutException("Timeout expired after " + timeout + "ms while awaiting " + operation);

            if (!isSuccessful())
                throw error();

            return true;
        } catch (InterruptedException e) {
            throw new InterruptException("Received interrupt while awaiting " + operation, e);
        }
    }
enqueueRequest(handler);
return handler.result;

TransactionalRequestResult initTransactionResult = this.stateToTransactionRequestResult.get(State.INITIALIZING);

hachikuji (Contributor) commented Jan 3, 2019

If we get rid of the stateToTransactionRequestResult map and just use a single field to track the current pending operation, then this would become a simple check on the current state.

huxihx (Author, Contributor) commented Jan 3, 2019

Thanks for the comments. If a single field is employed, nulling out that field seems to need to happen in TransactionManager.await**, which is not called by TransactionManagerTest at all. That means we would have to create a clear function transactionManager.clearCache() and call it in every method in TransactionManagerTest. Does that make sense?

hachikuji (Contributor) commented Jan 5, 2019

@huxihx I'm not 100% sure I follow. Here is a quick sketch of what I had in mind: hachikuji@b336403. I'm not sure we need any changes to reset the pending operation, at least not for the purpose of the test case. Probably it makes sense to clear it after a state change though.

@huxihx huxihx force-pushed the huxihx:KAFKA-7763 branch from 1c6e0d2 Jan 7, 2019
hachikuji (Contributor) left a comment

Thanks, left a few more comments. I think we're getting close.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated
@@ -681,6 +669,11 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
* errors, this method will throw the last received exception immediately and the transaction will not be committed.
* So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
*
* Note that this method will raise {@link TimeoutException} if the transaction cannot be committed before expiration
* of {@code max.block.ms}. Additionally, it will raise {@link InterruptException} if interrupted.
* It is safe to retry in either case, but once the transaction has been successfully committed,

hachikuji (Contributor) commented Jan 8, 2019

Minor suggestion:

It is safe to retry in either case, but it is not possible to attempt a different operation (such as abortTransaction) since the commit may already be in the process of completing. If not retrying, the only option is to close the producer.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
@@ -1149,6 +1176,7 @@ public void handleResponse(AbstractResponse response) {
private final EndTxnRequest.Builder builder;

private EndTxnHandler(EndTxnRequest.Builder builder) {
super("EndTxn");

hachikuji (Contributor) commented Jan 8, 2019

Perhaps we can use "EndTxn(" + builder.result() + ")"?

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
@@ -1139,7 +1166,7 @@ public void handleResponse(AbstractResponse response) {
abortableError(new GroupAuthorizationException(builder.coordinatorKey()));
} else {
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to" +
"unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),
"unexpected error: %s", builder.coordinatorType(), builder.coordinatorKey(),

hachikuji (Contributor) commented Jan 8, 2019

nit: this is misaligned with the line below.

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
ensureTransactional();

// null it out if the cached result was already completed
if (transactionalRequestResult != null && transactionalRequestResult.isCompleted()) {

hachikuji (Contributor) commented Jan 8, 2019

We need to make sure that the user observes the completion. If the state is correct, we should return the current TransactionalRequestResult and set the cached value to null. I would suggest we get rid of this check and do something like this:

if (transactionalRequestResult != null) {
    if (currentState != targetState)
        throw new IllegalStateException();

    TransactionalRequestResult result = transactionalRequestResult;
    if (result.isCompleted())
        transactionalRequestResult = null;
    return result;
}

Does that make sense?

huxihx (Author, Contributor) commented Jan 8, 2019

@hachikuji Thanks for all the comments. If the code does not retry, the previously failed operation leaves the cached result completed, yet illegal for the next transaction operation; in that case an IllegalStateException is thrown. So I think it's a good idea to clear the result at the very beginning of the method.

hachikuji (Contributor) commented Jan 8, 2019

If the user does not observe the completion, then how will they know it completed successfully? Let me give a specific scenario. Let's say the user calls commitTransaction() and it times out. Just after timing out, the operation returns successfully. Now what happens when the user calls commitTransaction()? What should happen is that we notice that the pending operation has completed and we return immediately.
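That scenario can be modeled with a toy example. Everything below is a hypothetical stand-in (a CountDownLatch caches the pending result; the real patch caches a TransactionalRequestResult inside TransactionManager):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical retry sketch: the first commit attempt times out, the
// operation then completes in the background, and the retried call
// observes the cached pending result and returns immediately.
public class RetrySketch {
    private CountDownLatch pendingResult; // stands in for the cached TransactionalRequestResult

    boolean commit(long maxBlockTimeMs) throws InterruptedException {
        if (pendingResult == null)
            pendingResult = new CountDownLatch(1); // begin a new commit
        boolean completed = pendingResult.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
        if (completed)
            pendingResult = null; // completion observed; clear the cache
        return completed;
    }

    void brokerCompletesCommit() {
        pendingResult.countDown(); // simulates the broker finishing the commit
    }

    public static void main(String[] args) throws InterruptedException {
        RetrySketch producer = new RetrySketch();
        System.out.println(producer.commit(10));   // times out: prints false
        producer.brokerCompletesCommit();          // completes after the timeout
        System.out.println(producer.commit(10));   // retry observes completion: prints true
    }
}
```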

clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java Outdated
@@ -99,6 +100,7 @@
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
private TransactionalRequestResult transactionalRequestResult;

hachikuji (Contributor) commented Jan 8, 2019

Maybe pendingResult would be a better name?

hachikuji (Contributor) left a comment

The updated logic looks good. Left a couple small comments. It would be helpful to have unit tests (i.e. in TransactionManagerTest) which cover the retry logic. Basically the following cases:

  1. Retry commit is safe
  2. Retry abort is safe
  3. Retry a different operation is not permitted
core/src/test/scala/integration/kafka/api/TransactionsTest.scala Outdated
} finally {
producer.close()
restartDeadBrokers() // bring them back

hachikuji (Contributor) commented Jan 11, 2019

Is this needed?

huxihx (Contributor, Author) commented Jan 14, 2019

retest this please

hachikuji (Contributor) commented Jan 15, 2019

@huxihx Thanks, the updates look good. I think we may also need to update the Streams code to catch TimeoutException. cc @guozhangwang

huxihx (Contributor, Author) commented Jan 21, 2019

@guozhangwang please review this patch. Thanks.

huxihx (Contributor, Author) commented Jan 28, 2019

@guozhangwang Could you please take some time to review the change? Thanks:)

guozhangwang (Contributor) left a comment

@huxihx sorry for the late review. The changes lgtm.

Streams needs corresponding changes to handle TimeoutException from the commit/abort transaction calls, but that will be done in a separate PR.

hachikuji (Contributor) commented Feb 17, 2019

retest this please

hachikuji (Contributor) commented Feb 17, 2019

@guozhangwang shall we file a JIRA to make sure we follow up?

hachikuji (Contributor) commented Feb 17, 2019

retest this please

huxihx (Contributor, Author) commented Feb 18, 2019

retest this please

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java Outdated
@@ -466,7 +467,7 @@ void commit(final boolean startNewTransaction) {
} else {
consumer.commitSync(consumedOffsetsAndMetadata);
}
} catch (final CommitFailedException | ProducerFencedException error) {
} catch (final CommitFailedException | ProducerFencedException | TimeoutException error) {

mjsax (Member) commented Feb 19, 2019

Not sure if it makes sense to convert a TimeoutException into a TaskMigrated exception... Also note that we created https://issues.apache.org/jira/browse/KAFKA-7932, so it might be best to tackle all Streams related changes there? \cc @vvcephei

huxihx (Author, Contributor) commented Feb 20, 2019

@mjsax Do you mean we should let the TimeoutException propagate all the way up the stack and have KAFKA-7932 handle it?

hachikuji (Contributor) commented Feb 20, 2019

Yeah, I agree that would be better.

guozhangwang (Contributor) commented Feb 20, 2019

@huxihx I'd suggest we remove any Streams-related changes from this PR, since we've already created a separate JIRA (above) to track follow-up work on the Streams side, and @vvcephei is already working on it.

vvcephei (Contributor) commented Feb 20, 2019

Yes, FWIW, I agree. It would be better to just retry the operation that timed out than to kick off a rebalance.

I'll make sure there's a reference to this discussion in the ticket.

huxihx added 10 commits Dec 26, 2018
…dless when brokers are down.

Currently, commitTransaction and abortTransaction invoke Latch#await to wait for the request completion. When brokers are not available, these calls await forever and producer never ends. This fix replaces it with the timed equivalent await(timeout, unit). When timeout occurred, producer throws a TimeoutException.
@huxihx huxihx force-pushed the huxihx:KAFKA-7763 branch to 0fc8d51 Feb 20, 2019
hachikuji (Contributor) commented Feb 21, 2019

Thanks @huxihx. Merging to trunk.

@hachikuji hachikuji merged commit 201da05 into apache:trunk Feb 21, 2019
1 of 2 checks passed:
JDK 8 and Scala 2.11: FAILURE (10577 tests run, 69 skipped, 1 failed)
JDK 11 and Scala 2.12: SUCCESS (10577 tests run, 69 skipped, 0 failed)
Pengxiaolong added a commit to Pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…t block indefinitely (apache#6066)

Currently, commitTransaction and abortTransaction wait indefinitely for the respective operation to be completed. This patch uses the producer's max block time to limit the time that we will wait. If the timeout elapses, we raise a TimeoutException, which allows the user to either close the producer or retry the operation.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io>