KAFKA-5251: Producer should cancel unsent AddPartitions and Produce requests on abort #3161
@apurvam Please review when you have a chance.
I don't think this is a blocker. If we receive an abortable error in AddPartitions, we don't add the partitions back to the pending set. And since they were already moved from the new partitions set, we won't add them back to the queue in …
return false;
}
String transactionalId = transactionManager.transactionalId();
if (transactionManager.isCompletingTransaction() && !transactionManager.hasPartitionsToAdd() && accumulator.hasUnflushedBatches()) {
Note that this is a bug fix. The call to nextRequestHandler may complete the transaction without sending an EndTxnRequest. In that case, there would still be queued produce requests which will probably fall into the next transaction.
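To make the guard easier to reason about in isolation, here is a minimal sketch of the condition from the diff above. `ToyTxnManager`, `ToyAccumulator`, and `mustDrainBeforeTxnRequest` are hypothetical stand-ins invented for illustration, not Kafka classes, and the sketch assumes the check runs before the next request handler is consulted.

```java
public class EndTxnGuardSketch {
    /** Hypothetical stand-in for the transaction manager state; not Kafka's class. */
    public static final class ToyTxnManager {
        public boolean completing;
        public boolean partitionsToAdd;
        public boolean isCompletingTransaction() { return completing; }
        public boolean hasPartitionsToAdd() { return partitionsToAdd; }
    }

    /** Hypothetical stand-in for the record accumulator. */
    public static final class ToyAccumulator {
        public int queuedBatches;
        public boolean hasUnflushedBatches() { return queuedBatches > 0; }
    }

    /**
     * Same boolean shape as the condition in the diff: true means queued batches
     * should be drained before any further transactional request is considered.
     */
    public static boolean mustDrainBeforeTxnRequest(ToyTxnManager txn, ToyAccumulator acc) {
        return txn.isCompletingTransaction() && !txn.hasPartitionsToAdd() && acc.hasUnflushedBatches();
    }

    public static void main(String[] args) {
        ToyTxnManager txn = new ToyTxnManager();
        ToyAccumulator acc = new ToyAccumulator();
        txn.completing = true;

        acc.queuedBatches = 2; // produce batches are still queued
        System.out.println("drain first: " + mustDrainBeforeTxnRequest(txn, acc));

        acc.queuedBatches = 0; // queue is empty, nothing can leak into the next transaction
        System.out.println("drain first: " + mustDrainBeforeTxnRequest(txn, acc));
    }
}
```

In this toy model the guard depends only on manager and accumulator state, so it can be evaluated without first asking for the next request handler.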
I do not fully understand your comment here. Is that actually fixing the original line 308?
EDIT: nvm, I think I understand it now.
LGTM overall. Ping @apurvam to take another look, as I'm not as familiar with the producer client end.
LGTM. Left a couple of minor comments.
if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
if (!accumulator.flushInProgress())
It would help to add a comment here to the effect of: 'There may be batches in the queue which were previously sent and are in retry. We have to flush those to keep the sequence numbers on the client and server in sync.'
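As a rough illustration of why in-retry batches cannot simply be dropped, here is a toy model of sequence tracking. `ToyBroker` and `Result` are hypothetical names, and the model is simplified on purpose: one sequence number per batch and a single producer and partition, which is not how the real broker accounts for records.

```java
public class SequenceSyncSketch {
    public enum Result { APPENDED, DUPLICATE, OUT_OF_SEQUENCE }

    /** Toy broker that tracks the next expected sequence for one producer and partition. */
    public static final class ToyBroker {
        private int nextExpected = 0;

        public Result append(int sequence) {
            if (sequence == nextExpected) {
                nextExpected++;
                return Result.APPENDED;
            }
            // Anything already seen is treated as a duplicate; a gap is an error.
            return sequence < nextExpected ? Result.DUPLICATE : Result.OUT_OF_SEQUENCE;
        }
    }

    public static void main(String[] args) {
        // Case 1: the first send of batch 0 never reached the broker, but the client
        // drops the in-retry batch and moves on to sequence 1.
        ToyBroker lostRequest = new ToyBroker();
        System.out.println("skip retry, send 1: " + lostRequest.append(1));

        // Case 2: the client keeps resending batch 0 until it has a definite answer.
        // Whether the broker reports an append or a duplicate, sequence 1 is then valid.
        ToyBroker retried = new ToyBroker();
        retried.append(0); // first attempt was written, but the response was lost
        System.out.println("retry 0: " + retried.append(0));
        System.out.println("then send 1: " + retried.append(1));
    }
}
```

The client cannot tell the two cases apart while a batch is in retry, which is why the sketch resends until the outcome is known instead of guessing the next sequence.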
sender.run(time.milliseconds()); // Resend ProduceRequest
sender.run(time.milliseconds()); // Send EndTxn

assertTrue(abortResult.isCompleted());
Probably worth checking that the responseFuture succeeded here as well.
Actually we do this below.
Tests passing locally. Merging to trunk and 0.11.0.
…equests on abort
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes #3161 from hachikuji/KAFKA-5251
(cherry picked from commit d41cf1b)
Signed-off-by: Jason Gustafson <jason@confluent.io>