New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12870; Flush in progress not cleared after transaction completion #10880
Conversation
Note I have a few additional tests which I will post shortly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Took a quick look at non-testing part and looks reasonable to me. Will make a thorough pass including the test code soon.
); | ||
} | ||
|
||
private void runUntil(Sender sender, Supplier<Boolean> condition) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember we have similar utils in TransactionManagerTest
, could we consolidate them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be reasonable to add a runUntil
helper to Sender
for testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, I think that's also fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decided to pull out a test util instead. Hopefully that is ok.
…on (#10880) We had been using `RecordAccumulator.beginFlush` in order to force the `RecordAccumulator` to flush pending batches when a transaction was being completed. Internally, `RecordAccumulator` has a simple counter for the number of flushes in progress. The count gets incremented in `beginFlush` and it is expected to be decremented by `awaitFlushCompletion`. The second call to decrement the counter never happened in the transactional path, so the counter could get stuck at a positive value, which means that the linger time would effectively be ignored. This patch fixes the problem by removing the use of `beginFlush` in `Sender`. Instead, we now add an additional condition in `RecordAccumulator` to explicitly check when a transaction is being completed. Reviewers: Guozhang Wang <wangguoz@gmail.com>
…on (apache#10880) We had been using `RecordAccumulator.beginFlush` in order to force the `RecordAccumulator` to flush pending batches when a transaction was being completed. Internally, `RecordAccumulator` has a simple counter for the number of flushes in progress. The count gets incremented in `beginFlush` and it is expected to be decremented by `awaitFlushCompletion`. The second call to decrement the counter never happened in the transactional path, so the counter could get stuck at a positive value, which means that the linger time would effectively be ignored. This patch fixes the problem by removing the use of `beginFlush` in `Sender`. Instead, we now add an additional condition in `RecordAccumulator` to explicitly check when a transaction is being completed. Reviewers: Guozhang Wang <wangguoz@gmail.com>
We had been using
RecordAccumulator.beginFlush
in order to force theRecordAccumulator
to flush pending batches when a transaction was being completed. Internally,RecordAccumulator
has a simple counter for the number of flushes in progress. The count gets incremented inbeginFlush
and it is expected to be decremented byawaitFlushCompletion
. The second call to decrement the counter never happened in the transactional path, so the counter could get stuck at a positive value, which means that the linger time would effectively be ignored.The patch here fixes the problem by removing the use of
beginFlush
inSender
. Instead, we now add an additional condition inRecordAccumulator
to explicitly check when a transaction is being completed.Committer Checklist (excluded from commit message)