Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-5364: ensurePartitionAdded does not handle pending partitions in abortable error state #3231

Closed
wants to merge 8 commits into from

Conversation

hachikuji
Copy link

No description provided.

@@ -278,7 +282,7 @@ public synchronized boolean isAborting() {
}

synchronized boolean isInTransaction() {
return currentState == State.IN_TRANSACTION || isCompletingTransaction();
return currentState == State.IN_TRANSACTION || isCompletingTransaction() || hasAbortableError();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bug is a bit insidious. We use isInTransaction when dequeuing records from the RecordAccumulator to determine if the batch should be transactional. So if we continued trying to send after an abortable error, we could lose the transactional flag.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a comment somewhere?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add one.

@asfbot
Copy link

asfbot commented Jun 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4874/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4858/
Test PASSed (JDK 8 and Scala 2.12).

@hachikuji
Copy link
Author

cc @apurvam

@ijuma Note that this only fixes part of the problem from KAFKA-5364. I'd recommend merging it for the fix and improved test coverage, but we should leave the JIRA open.

@hachikuji
Copy link
Author

@apurvam I noticed one other race condition. When we abort a transaction, we clear the partitions in newPartitionsInTransaction and we abort unsent produce requests. The problem is that the latter happens asynchronously, so we could very well see a batch dequeued for a partition which we had already cleared. I've therefore removed the fatal transition in ensurePartitionAdded in the latest patch.

@asfbot
Copy link

asfbot commented Jun 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4896/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4898/
Test FAILed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4882/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link

asfbot commented Jun 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4900/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 5, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4884/
Test PASSed (JDK 8 and Scala 2.12).

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch. The cleanup in code and logic is good. The tests are great. LGTM barring some small comments.

}

@Test(expected = IllegalStateException.class)
public void testMaybeAddPartitionTransactionBeforeBeginTransaction() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere in these tests:
nit: typo, missing 'To' in testMaybeAddPartitionToTransactionBeforeBeginTransaction


transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
sender.run(time.milliseconds());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sender.run here should be dropped since the intent of this test is to ensure that the send to partition should fail when the partition is pending add.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is testing the case where the AddPartitionsToTxn request is in-flight.. I will add a comment to clarify.


transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
sender.run(time.milliseconds());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, this sender.run serves no purpose and is slightly confusing. AT least maybe add a comment to the effect that 'the response to the actual AddPartitionToTxn request is never received even though the sender runs'.

@@ -289,6 +295,14 @@ synchronized void transitionToFatalError(RuntimeException exception) {
transitionTo(State.FATAL_ERROR, exception);
}

synchronized boolean isPartitionAdded(TopicPartition partition) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a '// visible for testing' comment to this and the next method? They really only should be called from tests.

Copy link
Contributor

@apurvam apurvam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, LGTM!

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4937/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4921/
Test PASSed (JDK 8 and Scala 2.12).

return currentState == State.IN_TRANSACTION || isCompletingTransaction();
synchronized boolean hasOngoingTransaction() {
// transactions are considered ongoing once started until completion or a fatal error
return currentState == State.IN_TRANSACTION || isCompletingTransaction() || hasAbortableError();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides unit test this function is only used in RecordAccumulator#drain, and I'm wondering in that case would we really need to consider hasAbortableError as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we can simplify this since, for the time being, transactional producers can only send data inside of a transaction. However, I think we should keep hasAbortableError. We consider the transaction ongoing as long as the client still has the capability to complete it.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Just one question for my own knowledge.

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/4953/
Test PASSed (JDK 7 and Scala 2.11).

@asfbot
Copy link

asfbot commented Jun 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/4937/
Test PASSed (JDK 8 and Scala 2.12).

Copy link
Contributor

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, great to see the exhaustive tests. I didn't do a full review of the impact of the changes on the TransactionManager since @apurvam and @guozhangwang had already done that.

I left some suggestions and comments. If applicable, they can be addressed in a subsequent PR. Merging this to trunk and 0.11.0.

return lastError;
}

public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
public synchronized void failIfUnreadyForSend() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: why not failIfNotReadyForSend? One character longer, but reads a bit better. :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funny. I used that name initially and changed it because I thought the current one read nicer. Now I'm reading it again and I'm inclined to change it back.

}
}

synchronized boolean sendToPartitionAllowed(TopicPartition tp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be clearer if called isSendToPartitionAllowed. At first I thought this method was sending to a partition. :)

}

@Test
public void testsendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there's a capitalisation error here and in subsequent test names. Will fix it before merging.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks. Thought I fixed this, but I guess I missed a few.

@@ -993,7 +1314,6 @@ public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedExc
// We shouldn't drain batches which haven't been added to the transaction yet.
assertTrue(drainedBatches.containsKey(node1.id()));
assertTrue(drainedBatches.get(node1.id()).isEmpty());
assertTrue(transactionManager.hasFatalError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be asserting something else about the TransactionManager? Possibly assertFalse(transactionManager.hasFatalError()), but there may be others.

Also, the method name says testRaiseErrorWhen... and there's a comment about result in an error on drain, but I am not seeing any errors. Do they need to be updated?

}

@Test
public void testsendToPartitionAllowedWithUnaddedPartition() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd say NotAdded instead of Unadded, one more character but reads better. :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. I think I prefer the current name. Maybe ...WithPartitionNotAdded would be better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, WithPartitionNotAdded sounds good. This is definitely a nit, but whenever I see something like unadded or unready, it sounds like it was _un_done instead of _not_done. :)

@asfgit asfgit closed this in 3eed194 Jun 6, 2017
asfgit pushed a commit that referenced this pull request Jun 6, 2017
…n abortable error state

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3231 from hachikuji/KAFKA-5364
asfgit pushed a commit that referenced this pull request Jun 6, 2017
…n abortable error state

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3231 from hachikuji/KAFKA-5364
@ijuma
Copy link
Contributor

ijuma commented Jun 6, 2017

I left the JIRA open as per @hachikuji's earlier comment in the PR. It may be that subsequent commits addressed the remaining issues. In that case, please close the JIRA.

hachikuji pushed a commit to hachikuji/kafka that referenced this pull request Jun 7, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants