KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock #11722
Conversation
Thanks for the PR. Do you have some results to share regarding the impact this optimization has?
This application has a single producer thread, a high
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
I think the expectation is that these two calls would be atomic (i.e. it would be bad if one thread executed line 615, then another thread executed 615 again and got the same sequence number, before the first thread got a chance to execute 616).
Also, I think the expectation is that batches ordered one after another in the queue would get sequence numbers in the same order (i.e. a batch that is later in the queue gets a higher sequence number).
Previously these expectations were protected by the queue lock, so "poll", "get sequence", "update sequence" would execute as an atomic block; with this change the operations could interleave.
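The interleaving risk described above can be sketched in isolation. This is a minimal illustration with hypothetical names (`SequenceAssignment`, `dequeLock`, `nextSequence`), not the real `TransactionManager` code: the read of the sequence number and its increment must happen under the same lock that orders the batches, or two draining threads could assign the same sequence to two different batches.

```java
// Minimal sketch (hypothetical names, not the Kafka client API):
// reading and advancing the sequence counter under one lock keeps
// "get sequence" and "update sequence" atomic with respect to other
// threads draining the same partition's queue.
public class SequenceAssignment {
    private final Object dequeLock = new Object();
    private int nextSequence = 0;

    // Safe: read-then-increment happens under the same lock that guards
    // the batch queue, so queue order matches sequence order.
    int assignSequence() {
        synchronized (dequeLock) {
            int seq = nextSequence;   // analogous to "line 615": read sequence
            nextSequence += 1;        // analogous to "line 616": advance counter
            return seq;
        }
    }

    public static void main(String[] args) {
        SequenceAssignment s = new SequenceAssignment();
        System.out.println(s.assignSequence()); // prints 0
        System.out.println(s.assignSequence()); // prints 1
    }
}
```

If the two statements inside `assignSequence` were executed under separate lock acquisitions, two threads could both read `0` before either increment ran, which is exactly the duplicate-sequence hazard raised here.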
OK, this makes sense. I agree it makes sense to push this back into the synchronized block; the more important part for reducing lock duration is to move the close() outside of the block. Would you agree with this approach?
I've addressed this in 34008bf.
Below is a flamegraph showing that getDeque() and close() consume the CPU in the drain() tree. I believe with both of these performed outside of the lock we are still in a good spot here.
Moving close() outside of the locked scope LGTM.
Can you please test with Java 11 or newer? Looks like you tested with Java 8 which uses the slower crc32c method.
Correct, this application is on JDK 8. I'll have to find a JDK 11 app to compare; it could take a while as this app is not ready for it. Is a JDK 11 test a blocker?
No, not a blocker.
Thanks for the PR, a couple of comments.
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@jasonk000 , thanks for the PR. Left a comment.
// the rest of the work by processing outside the lock
// close() is particularly expensive
batch.close();
If we move the batch.close() out of the synchronized scope, is it possible that another thread might take the batch as "alive" and do other operations before we close it completely?
Looks like it won't happen, since we only lock on the deque object, but I just want to confirm, to make sure it won't break anything.
Since we lock on the deque, the batch is either "in" the deque, and append/etc will work on it inside the synchronized block, or it has been removed from the deque and is no longer available for other threads to do any work on. From my reading, Sender only collects batches via the drain and expiry paths, both of which acquire the lock before removing the element from the deque. This implies that removal of the batch is atomic: whether it is drained or expired out, it can only be one. The Sender re-enqueue works on batches that were previously drained as well. Nothing stands out to me as a possible area for leakage, but I'm happy to be corrected, and more eyes are always better for this sort of thing.
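The "it can only be one" invariant above can be demonstrated in miniature. This is a hedged sketch with hypothetical names (`RemovalRace`, `tryRemove`), standing in for the real drain and expiry paths: because both paths remove a batch only while holding the deque lock, at most one of two racing threads can obtain a given batch, and the loser observes it as already gone.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the ownership-transfer invariant (hypothetical Batch type):
// both removal paths lock the deque, so a batch can be taken exactly once.
public class RemovalRace {
    static class Batch {}

    static Batch tryRemove(Deque<Batch> deque) {
        synchronized (deque) {
            return deque.pollFirst(); // null if the other path already took it
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Deque<Batch> deque = new ArrayDeque<>();
        deque.add(new Batch());

        Batch[] results = new Batch[2];
        Thread drain  = new Thread(() -> results[0] = tryRemove(deque)); // "drain" path
        Thread expiry = new Thread(() -> results[1] = tryRemove(deque)); // "expiry" path
        drain.start(); expiry.start();
        drain.join();  expiry.join();

        // Exactly one path got the batch; the other saw null.
        boolean exactlyOne = (results[0] == null) ^ (results[1] == null);
        System.out.println(exactlyOne); // prints true
    }
}
```

Once a thread holds the only reference to the removed batch, operating on it (including the expensive `close()`) outside the lock is safe, which is what makes the restructure in this PR sound.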
Makes sense, and that's also what I've seen. Thanks for the confirmation!
LGTM!
Hi @showuon, was there anything remaining here before we merge it? Thanks.
@ijuma @artemlivshits, do you want to have another look at this PR? Thanks.
LGTM, thanks!
…work thread holds batch queue lock (apache#11722)
…work thread holds batch queue lock (apache#11722) (#391)

TICKET = LIKAFKA-46436
LI_DESCRIPTION = This cherry picks from apache#11722. Hold the deque lock for only as long as is required to collect and make a decision in the ready() and drain() loops. Once this is done, remaining work can be done without the lock, so release it. This allows producers to continue appending.
EXIT_CRITERIA = N/A
Hold the deque lock for only as long as is required to collect and make a decision in the ready() and drain() loops. Once this is done, remaining work can be done without the lock, so release it. This allows producers to continue appending.

Committer Checklist (excluded from commit message)