Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5132: abort long running transactions #2957

Closed
wants to merge 13 commits into from
Closed

Conversation

dguy
Copy link
Contributor

@dguy dguy commented May 2, 2017

Abort any ongoing transactions that haven't been touched for longer than the transaction timeout

@dguy
Copy link
Contributor Author

dguy commented May 2, 2017

@guozhangwang @apurvam @hachikuji @mjsax should the expiry time be based on the transaction start time or the last updated time?

Also, in the Google doc it says:

If its status is PREPARE_COMMIT, then complete the committing process of the transaction.
If its status is PREPARE_ABORT, then complete the aborting process of the transaction.

Though i'm not sure why we'd need to do this. If the transaction has made it either of those statuses then it is going to complete anyway.

@apurvam
Copy link
Contributor

apurvam commented May 2, 2017

The expiry time is based on the start time. I think we need to add that field to the messages on the transaction log and set it on the first add partitions. From then on, we use that time to determine if the transaction needs to be timed out.

Regarding your second comment, you are correct. If the transaction is in PREPARE_XXX state, it should be rolled forward or rolled back. I think the point of that passage is that we should not force abort it if it is already rolling forward and it has hit the timeout.

@asfbot
Copy link

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3376/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3367/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 2, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3370/
Test FAILed (JDK 7 and Scala 2.10).

@apurvam
Copy link
Contributor

apurvam commented May 2, 2017

Hmm. there might be a resource leak somewhere, because all the tests are failing with an OutOfMemoryException.

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3405/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3408/
Test FAILed (JDK 7 and Scala 2.10).

@dguy
Copy link
Contributor Author

dguy commented May 3, 2017

I don't understand the build failure. I've ran the jenkins build locally and it is all good.

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3414/
Test FAILed (JDK 8 and Scala 2.11).

@dguy
Copy link
Contributor Author

dguy commented May 3, 2017

It is leaking threads. Looking into it.

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3422/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3419/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3428/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3443/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3437/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3434/
Test PASSed (JDK 8 and Scala 2.12).

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me overall. My biggest comment is about the 'two phase' operation of bumping the epoch and then transitioning to PREPARE_ABORT when a transaction needs to be expired. I think we can just initiate PREPARE_ABORT with the higher epoch. Please correct me if I am wrong!

Also, out of curiosity, all the shutdown and close are in response to the resource leaks that were exposed on Jenkins. But a lot of these resources seemed to be leaking anyway, independently of these changes (there was no networkClient.close, no transactionCoordinator.shutdown(), etc.). This suggests that the tests were already at tipping point, and this new code pushed it over. Is my assessment correct?

idAndMetadata.metadata.producerEpoch,
TransactionResult.ABORT,
(errors: Errors) => {
warn(s"rollback of transactionalId: ${idAndMetadata.transactionalId} failed during transaction expiry. errors: $errors")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoudn't this only be logged if there is an error?

idAndMetadata.metadata.prepareTransitionTo(Ongoing)
txnManager.appendTransactionToLog(idAndMetadata.transactionalId, idAndMetadata.metadata, (errors: Errors) => {
if (errors != Errors.NONE)
// TODO: Is this sufficient? It will be retried later if it failed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is sufficient. What else can you do on a background scheduled thread anyway?

if (!txnManager.isCoordinatorLoadingInProgress(idAndMetadata.transactionalId)) {
idAndMetadata.metadata.producerEpoch = (idAndMetadata.metadata.producerEpoch + 1).toShort
idAndMetadata.metadata.prepareTransitionTo(Ongoing)
txnManager.appendTransactionToLog(idAndMetadata.transactionalId, idAndMetadata.metadata, (errors: Errors) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we initiate an ABORT directly with the bumped epoch? We have to write the PREPARE anyway. We might as well do it with the the higher epoch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this a bit more, I can see the motivation for two phase approach: you will categorically fence off any existing producer while maintaining the Ongoing state. You can then safely begin the abort.

However, even with the two phase approach, won't you still have a race condition where the existing producer could do a PREPARE_COMMIT which gets in before your epoch bump?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the idea is to fence off any existing producer.
As for the race condition - yes i should check that there is no pending state transaction before bumping the epoch etc. Thanks for pointing out

scheduler.startup()

if (!scheduler.isStarted)
scheduler.startup()
// TODO: add transaction and pid expiration logic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this comment still valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

half valid ;-)

@dguy
Copy link
Contributor Author

dguy commented May 4, 2017

@apurvam - yes the resources were leaking anyway, but this pushed it beyond the limit.

@asfbot
Copy link

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3481/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3475/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3472/
Test FAILed (JDK 8 and Scala 2.12).

@dguy
Copy link
Contributor Author

dguy commented May 4, 2017

retest this please

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Left one comment about my understanding of the assumptions around synchronization. If those are correct, I think this is pretty solid.

&& idAndMetadata.metadata.pendingState.isEmpty) {
idAndMetadata.metadata.producerEpoch = (idAndMetadata.metadata.producerEpoch + 1).toShort
idAndMetadata.metadata.prepareTransitionTo(Ongoing)
txnManager.appendTransactionToLog(idAndMetadata.transactionalId, idAndMetadata.metadata, (errors: Errors) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good. For my edification: this txnManager.appendTransactionToLog will happen under the idAndMetadata.metadata lock, which is a shared object. so any futher request from the producer with this transactional id will be blocked until after the epoch is bumped, and hence will get a ProducerFencedException, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apurvam correct they will get a ProducerFencedException.

@asfbot
Copy link

asfbot commented May 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3578/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3582/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link

asfbot commented May 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3588/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/3709/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3699/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/3703/
Test FAILed (JDK 7 and Scala 2.10).

if (!txnManager.isCoordinatorLoadingInProgress(idAndMetadata.transactionalId)
&& idAndMetadata.metadata.pendingState.isEmpty) {
idAndMetadata.metadata.producerEpoch = (idAndMetadata.metadata.producerEpoch + 1).toShort
idAndMetadata.metadata.prepareTransitionTo(Ongoing)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if this was raised before, but why don't we bump the epoch and transition to PREPARE_ABORT in the same write? If there's an edge case we're protecting by doing the epoch bump first, it might be useful to add a comment to document it.

@guozhangwang
Copy link
Contributor

retest this please.

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3781/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/3793/
Test FAILed (JDK 7 and Scala 2.11).

@guozhangwang
Copy link
Contributor

retest this please

@@ -611,6 +611,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
CoreUtils.swallow(zkUtils.close())
if (metrics != null)
CoreUtils.swallow(metrics.close())
if (transactionCoordinator != null)
CoreUtils.swallow(transactionCoordinator.shutdown())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be removed as we have already merged a PR that fixes this:

3085d4f

Note that the shutdown needs to happen before the LogManager is shutdown (we had an issue where the GroupCoordinator tried to write to an already closed LogManager in the past).

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/3798/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3786/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3794/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/3802/
Test PASSed (JDK 7 and Scala 2.11).

@dguy
Copy link
Contributor Author

dguy commented May 12, 2017

retest this please

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3790/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3799/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented May 12, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/3811/
Test PASSed (JDK 7 and Scala 2.11).

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged to trunk.

@asfgit asfgit closed this in 4951849 May 12, 2017
@dguy dguy deleted the kafka-5132 branch May 16, 2017 14:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
6 participants