Optimize batch index ACK performance #988
### Performance test setup

Run a Pulsar 2.11.0 standalone locally. Build the `perf` tool:

```bash
cd perf
go build
```

Run the consumer in terminal 1:

```bash
./perf consume --profile \
  -s sub \
  --enable-batch-index-ack \
  my-topic
```

Run the producer in terminal 2; the throughput is about 20 MB/s:

```bash
./perf produce \
  --batching-num-messages=10 \
  --size 105 \
  --rate 200000 \
  my-topic
```

### Test result

Before this patch: the consumer could not catch up with the producer even though the produce rate was only 20 MB/s, so it fell back to catch-up reads and the broker read messages directly from BookKeeper.

After this patch: (screenshot omitted)

### Flame graph

Before this patch: (flame graph image omitted)

After this patch: (flame graph image omitted)

### Conclusion

From my observation, the backlog sometimes still increased slowly, and once consumption fell into catch-up reads the performance could degrade quickly in my local environment. It is still a big improvement, though, and can be improved further by adjusting the ACK grouping options.
### Motivation

Currently, when `EnableBatchIndexAck` is true, the ACK performance is very poor. There are two main reasons:

1. Acknowledgment by list is not supported. Even when N MessageIDs are grouped, N separate ACK requests are still sent.
2. The implementation of the ACK grouping tracker is wrong. Given a batch of N messages, when batch index ACK is enabled each MessageID is cached; however, after all N MessageIDs have arrived, the current implementation does not clear them.

### Modifications

- Add a `func(id []*pb.MessageIdData)` callback to the ACK grouping tracker. When flushing individual ACKs, construct the slice and wrap it into a `CommandAck` directly.
- Refactor the implementation of the ACK grouping tracker:
  - Do not save each MessageID instance; instead, use the ledger ID and the entry ID as the key of `pendingAcks`.
  - Release the mutex before calling the ACK functions.
- Add `TestTrackerPendingAcks` to verify the list of MessageIDs to ACK. Since after this change the ACK order can no longer be guaranteed, sort the acknowledged MessageIDs in `ack_grouping_tracker_test.go`.
### Motivation

Currently, when `EnableBatchIndexAck` is true, the ACK performance is very poor. There are two main reasons:

1. Acknowledgment by list is not supported, so even when N MessageIDs are grouped, N separate ACK requests are still sent.
2. The implementation of the ACK grouping tracker is wrong: the cached MessageIDs of a batch are not cleared after all of them have arrived.

### Modifications

- Add a `func(id []*pb.MessageIdData)` callback to the ACK grouping tracker. When flushing individual ACKs, construct the slice and wrap it into a `CommandAck` directly.
- Refactor the ACK grouping tracker:
  - `pendingAcks`: caches the non-batched MessageIDs.
  - `pendingBatchAcks`: caches the batched MessageIDs; once all messages in a batch have been added, they are removed from it and the non-batched MessageID is added to `pendingAcks`.
- Add `TestBatchIndexAckAllMessages` to verify the new behavior.

After this change, the ACK order cannot be guaranteed, so the acknowledged MessageIDs are sorted in `ack_grouping_tracker_test.go`.