-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5364: Don't fail producer if drained partition is not yet in transaction #3202
KAFKA-5364: Don't fail producer if drained partition is not yet in transaction #3202
Conversation
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@@ -428,6 +433,12 @@ synchronized boolean isReady() { | |||
return isTransactional() && currentState == State.READY; | |||
} | |||
|
|||
private synchronized boolean wouldTransactionContainPartition(TopicPartition tp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this name is a bit awkward. Could we split this into two checks isPartitionAdded
and isPartitionPending
or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit comment, otherwise LGTM.
@@ -428,6 +433,12 @@ synchronized boolean isReady() { | |||
return isTransactional() && currentState == State.READY; | |||
} | |||
|
|||
private synchronized boolean wouldTransactionContainPartition(TopicPartition tp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to currentTransactionContainsPartition
?
…ons which would be added to the transaction.
7dd2f7f
to
a538915
Compare
@guozhangwang @hachikuji addressed comments on names. Went with Jason's suggestion to split the function into two -- we actually had one half of it anyway. |
transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " + | ||
"for partition " + tp + ", which hasn't been added to the transaction yet")); | ||
return false; | ||
if (isInTransaction() || hasError()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One quick question: what is the expected behavior when we are in a fatal error state? It seems we'd want to ensure that this function always returns false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point. In general, I guess there is no point retrying produce requests if you are fenced or have lost your permissions. And in a fatal error, we have to close and restart the producer, resetting all the sequence numbers. So we should always return false if the error is fatal.
Will update.
…ucer is in a fatal error or abortable error state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fix LGTM. Left a couple test suggestions. Hopefully shouldn't be too different from the tests you already have.
// We shouldn't drain batches which haven't been added to the transaction yet. | ||
assertTrue(drainedBatches.containsKey(node1.id())); | ||
assertTrue(drainedBatches.get(node1.id()).isEmpty()); | ||
assertTrue(transactionManager.hasError()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guess we could make this a little stronger with a hasFatalError
?
@@ -234,10 +234,17 @@ public RuntimeException lastError() { | |||
} | |||
|
|||
public synchronized boolean ensurePartitionAdded(TopicPartition tp) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be too much trouble to add a couple basic unit tests just for this function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added special tests for this is overkill since it depends so much on the transaction manager state. I have instead added assertions about the expected return values of this function in various existing tests, which achieves the same purpose.
} | ||
|
||
@Test | ||
public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should also cover the case where the transaction is in an abortable error state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this case now.
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. LGTM
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
…ansaction Due to the async nature of the producer, it is possible to attempt to drain a messages whose partition hasn't been added to the transaction yet. Before this patch, we considered this a fatal error. However, it is only in error if the partition isn't in the queue to be sent to the coordinator. This patch updates the logic so that we only fail the producer if the partition would never be added to the transaction. If the partition of the batch is yet to be added, we will simply wait for the partition to be added to the transaction before sending the batch to the broker. Author: Apurva Mehta <apurva@confluent.io> Reviewers: Guozhang Wang <wangguoz@gmail.com>, Jason Gustafson <jason@confluent.io> Closes #3202 from apurvam/KAFKA-5364-ensure-partitions-added-to-txn-before-send (cherry picked from commit 673ab67) Signed-off-by: Jason Gustafson <jason@confluent.io>
Due to the async nature of the producer, it is possible to attempt to drain a messages whose partition hasn't been added to the transaction yet. Before this patch, we considered this a fatal error. However, it is only in error if the partition isn't in the queue to be sent to the coordinator.
This patch updates the logic so that we only fail the producer if the partition would never be added to the transaction. If the partition of the batch is yet to be added, we will simply wait for the partition to be added to the transaction before sending the batch to the broker.