Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support grouping ACK requests by time and size #957

Merged
merged 4 commits into from
Feb 22, 2023

Conversation

BewareMyPower
Copy link
Contributor

Fixes #949

Motivation

Currently the Go client does not support grouping ACK requests, so each time Ack (or similar APIs) is called, a ACK request will be sent, which could downgrade the performance. We need to support configuring the time and size to cache MessageID before sending ACK requests.

Modifications

  • Add an AckGroupingOptions field to ConsumerOptions, when it's nil, use 100ms as the max time and 1000 as the max size.
  • Add an ackGroupingTracker interface to support grouping ACK requests.
  • When AckWithResponse is false, adding the MessageID instance to the tracker instead of sending the requests to eventsCh.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Added ack_grouping_tracker_test.go to verify ackGroupingTracker in various cases
  • The consumer side change can be covered by existing tests because the default AckGroupingOptions config is { MaxSize: 1000, MaxTime: 100*time.Millisecond }.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (GoDocs)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

Fixes apache#949

### Motivation

Currently the Go client does not support grouping ACK requests, so each
time `Ack` (or similar APIs) is called, a ACK request will be sent,
which could downgrade the performance. We need to support configuring
the time and size to cache `MessageID` before sending ACK requests.

### Modifications
- Add an `AckGroupingOptions` field to `ConsumerOptions`, when it's nil,
  use 100ms as the max time and 1000 as the max size.
- Add an `ackGroupingTracker` interface to support grouping ACK requests
- When `AckWithResponse` is false, adding the `MessageID` instance to
  the tracker instead of sending the requests to `eventsCh`.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:
  - Added `ack_grouping_tracker_test.go` to verify `ackGroupingTracker`
    in various cases
  - The consumer side change can be covered by existing tests because
    the default `AckGroupingOptions` config is
    `{ MaxSize: 1000, MaxTime: 100*time.Millisecond }`.
@BewareMyPower BewareMyPower marked this pull request as ready for review February 16, 2023 09:01
pulsar/ack_grouping_tracker.go Outdated Show resolved Hide resolved
pulsar/ack_grouping_tracker.go Show resolved Hide resolved
@shibd shibd merged commit 7d257b0 into apache:master Feb 22, 2023
@panszobe
Copy link
Contributor

Does Community has some Performance Test Data?

I used the latest version(v0.9.1-0.20230222003822-7d257b01b121) to test today, still found that which Test Case by EnableBatchIndexAcknowledgment=true consumed messages much more slower than EnableBatchIndexAcknowledgment=false, and could not catch up with the producer‘s production.
As consuming from the same topic, consumers of JAVA Client by EnableBatchIndexAcknowledgment=true consumed messages smoothly, and backlog was not enlarged.

I think that Support grouping ACK feature is OK, but maybe EnableBatchIndexAcknowledgment feature have much room for improvement.

@shibd
Copy link
Member

shibd commented Feb 22, 2023

Does Community has some Performance Test Data?

I used the latest version(v0.9.1-0.20230222003822-7d257b01b121) to test today, still found that which Test Case by EnableBatchIndexAcknowledgment=true consumed messages much more slower than EnableBatchIndexAcknowledgment=false, and could not catch up with the producer‘s production. As consuming from the same topic, consumers of JAVA Client by EnableBatchIndexAcknowledgment=true consumed messages smoothly, and backlog was not enlarged.

I think that Support grouping ACK feature is OK, but maybe EnableBatchIndexAcknowledgment feature have much room for improvement.

Maybe we should set ackIndividualCh and ackCumulativeCh size > 0. /cc @BewareMyPower

@BewareMyPower
Copy link
Contributor Author

BewareMyPower commented Feb 22, 2023

Maybe we should set ackIndividualCh and ackCumulativeCh size > 0

@shibd What's the benefit of setting this channel size greater than 0? I think we don't need to cache these MessageIDs again in the channel. They are already cached in cachedAcks.

Does Community has some Performance Test Data?

Currently not. Could you provide more information to diagnosis? For example, the prometheues metrics ProcessingTime records the processing time between a message is received and acknowledged. You can also adjust the AckGroupingOption to see if there are some improvements, e.g. increasing the MaxTime or MaxSize. A flamegraph pprof is also good for analysis. @panszobe

@shibd
Copy link
Member

shibd commented Feb 22, 2023

@shibd What's the benefit of setting this channel size greater than 0? I think we don't need to cache these MessageIDs again in the channel. They are already cached in cachedAcks.

Just guessing. For example, if the channel has buffers while the thread is flushing data, then the add operation will not be blocked.

@panszobe
Copy link
Contributor

Does Community has some Performance Test Data?

Currently not. Could you provide more information to diagnosis? For example, the prometheues metrics ProcessingTime records the processing time between a message is received and acknowledged. You can also adjust the AckGroupingOption to see if there are some improvements, e.g. increasing the MaxTime or MaxSize. A flamegraph pprof is also good for analysis. @panszobe

There are some prometheues metrics in Grafana Graph:

image
image
image

Version label stands for
enable_batch_master: the latest version and EnableBatchIndexAcknowledgment set true
disable_batch_master: the latest version and EnableBatchIndexAcknowledgment set false
disable_batch_v0.6.0: version 0.6.0 and EnableBatchIndexAcknowledgment not provided

As the metrics shows, receiving and acking messages for enable_batch_master is slower than other two versions, processing time metrics looks like it does't have a very large difference.

There are three flamegraph screenshot for enable_batch_master:
image
image
image

For disable_batch_master*:
image
image

As above, compare to disable_batch_master, runEventsLoop and internalSendRequest takes much more time.

Next I will try to adjust AckGroupingOption to see if there are some improvements.
Please have a look, if you can diagnose what's wrong from something provided above is better.
@BewareMyPower @shibd

@BewareMyPower
Copy link
Contributor Author

@panszobe From the flamegraph, I think the root case is the inefficiency process of sending ACKs. The most time of internalAck method is internal.(*rpcClient).RequestOnCnxNoWait. In this method, SendRequestNoWait takes 3/4 of the total time, while the internal.baseCommand function also takes about 1/4 of the total time.

While I'm looking forward to your test results of other AckGroupingOption configs, I'm going to test the performance locally. Could you give the information of your test data? Including:

  • The batch configs of the producer
  • The message size
  • The ACK type (individual or cumulative?)

BTW, the flush method of the ACK grouping tracker only does the following things:

  1. Call partitionConsumer#sendIndividualAck or partitionConsumer#sendCumulativeAck, which just send the ACKs to the channel eventsCh
  2. Remove some elements from the map.

I didn't see the flushIndividualAcks method from the flamegraph so it looks like it's not the bottle neck. /cc @shibd

@panszobe
Copy link
Contributor

@panszobe From the flamegraph, I think the root case is the inefficiency process of sending ACKs. The most time of internalAck method is internal.(*rpcClient).RequestOnCnxNoWait. In this method, SendRequestNoWait takes 3/4 of the total time, while the internal.baseCommand function also takes about 1/4 of the total time.

While I'm looking forward to your test results of other AckGroupingOption configs, I'm going to test the performance locally. Could you give the information of your test data? Including:

  • The batch configs of the producer
  • The message size
  • The ACK type (individual or cumulative?)

The situations of producer: about 20MB/s, 200k messages/s, batch = 10. Through JAVA pressure test procedure.
ACK type is individual, we use AckID method to commit. SubscriptionType is Shared.

I adjusted the AckGroupingOption configs at 09:49, increasing MaxSize to 1000 and MaxTime to 1000, metrics monitor as below:
wecom-temp-21e4577459e75e0e33510de1559f2726
It seems that the adjustment does not have any improvement.

Looking forward to your local test. @BewareMyPower

@BewareMyPower
Copy link
Contributor Author

BewareMyPower commented Feb 23, 2023

increasing MaxSize to 1000 and MaxTime to 1000,

The default MaxSize is 1000, which means every time 1000 messages are acknowledged, the ACK request will be sent. Increasing MaxTime to 1000 won't help because the receiving speed is far more than 1000 messages per second.

BTW, I just remembered an important thing that this PR does not support the list ACK. See

// TODO: support ack a list of MessageIDs
. i.e. each time 1000 message IDs are flushed, 1000 ACK requests will be sent. So it does not make a difference with no grouping. The total number of the ACK requests does not change.

I'm going to implement the list ACK soon and see if it works.

@panszobe
Copy link
Contributor

BTW, I just remembered an important thing that this PR does not support the list ACK. See

// TODO: support ack a list of MessageIDs

Emmm, I made a mistake while adjusting the options. As your diagnosis, I adjusted the options by increasing the two settings to 10000, which still didn't make a difference.
Maybe the break point is your new PR for supporting the list ACK.

@BewareMyPower
Copy link
Contributor Author

Okay. When I open a new PR, I will also perform a simple test in my local env to see if it works.

@BewareMyPower BewareMyPower deleted the bewaremypower/ack-tracker branch February 24, 2023 11:11
@panszobe
Copy link
Contributor

panszobe commented Mar 2, 2023

@BewareMyPower hi~
Seeing the new PR, I used latest verision(v0.9.1-0.20230301160414-42ded0d59c46) to test today, but it seems that it has no improvements.
image
image
image
image

According to the flamegraphs as above, SendRequest and MessageReceive took much time.

image

As above, consuming rates and message receving rates also show the test result.

The subscription is very simple, just receiving and acking the Message, has no business processing logic, as below:
image

But it still could not catch up with the producer's production while the other one which disables BatchIndexAcknowledgment has no large backlog and keeps stable.

Do you have any other idea? Or there is any optimizations doing now?

@BewareMyPower
Copy link
Contributor Author

@panszobe I'm working on this issue currently. But since I'm busy on something else this week, the PR was not opened. If you mean #968, then it's not related to this issue. It's just a problem I found when I wrote the code.

@panszobe
Copy link
Contributor

panszobe commented Mar 2, 2023

@panszobe I'm working on this issue currently. But since I'm busy on something else this week, the PR was not opened. If you mean #968, then it's not related to this issue. It's just a problem I found when I wrote the code.

OK. Looking forward to the new PR. Thanks~

BewareMyPower added a commit to BewareMyPower/pulsar-client-go that referenced this pull request Mar 5, 2023
### Motivation

apache#957 adds the support for
grouping MessageID instances to acknowledge. However, when flushing N
cached MessageID instances, N CommandAck requests will be sent. It
downgrades the performance.

### Modifications

When more than one MessageID instances are acknowledged, use a single
CommandAck request to carry all of them.
BewareMyPower added a commit to BewareMyPower/pulsar-client-go that referenced this pull request Mar 6, 2023
### Motivation

apache#957 adds the support for
grouping MessageID instances to acknowledge. However, when flushing N
cached MessageID instances, N CommandAck requests will be sent. It
downgrades the performance.

### Modifications

When more than one MessageID instances are acknowledged, use a single
CommandAck request to carry all of them.
BewareMyPower added a commit to BewareMyPower/pulsar-client-go that referenced this pull request Mar 8, 2023
### Motivation

apache#957 adds the support for
grouping MessageID instances to acknowledge. However, when flushing N
cached MessageID instances, N CommandAck requests will be sent. It
downgrades the performance.

### Modifications

- When more than one MessageID instances are acknowledged, use a single
CommandAck request to carry all of them.
- Reduce number of the stored MessageIDs of the ACK grouping tracker
- Support configure AckGroupingOptions
- Add test to cover new logic of the ACK grouping tracker
@geniusjoe geniusjoe mentioned this pull request Mar 4, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Consume Performance drops when set EnableBatchIndexAcknowledgment = true
3 participants