-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make Consumer thread safe and lock-free #10352
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This way we are limiting the number of concurrent reads on the client, by the size of the executor.
The performance test results are in two folders 2.6.1 and 2.8.0 Test command: bin/benchmark --drivers driver-pulsar/pulsar.yaml workloads/max-rate-1-topic-20-partitions-20p-20c-1kb.yaml 5 IO Thread |
Wow, all unit tests are done at once |
I will re-trigger CI several times to see if there will be some occasional problems |
In the case of adding lock, there are still the following flaky tests In the case of adding thread pool Therefore, the performance of adding a thread pool and adding a lock is the same. I haven't seen other flaky errors when using the thread pool. |
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@merlimat @sijie @rdhabalia @codelipenghui @eolivelli PTAL, thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall is good to me.
I left a comment.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
nice |
} | ||
singleMessagePayload.release(); | ||
singleMessagePayload.release(); | ||
tryTriggerListener(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall is good, just left a comment.
I think we don't need move tryTriggerListener
into cycle, because we cannot use Listener
while calling receive
or receiveAsync
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The behavior here is consistent, the previous logic is
if(){
// do some thing
}else{
// do some thing
}
triggerListener
I just put the triggerListener into the if and else respectively.
If I don’t do this, triggerListener will execute in parallel with the if-else logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You put triggerListener
into the cycle
Maybe have a bad influence when the batchSize is big
we enqueue all message and trigger once before, now we enqueue one message and trigger one time.
What about just
if(){
// do some thing
}else{
// do some thing
}
internalPinnedExecutor.execute(() -> {
triggerListener();
});
waiting for @linlinnn approval before merging |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks.
thank you @linlinnn for your comment. let's wait for CI to complete |
### Motivation Lock-free solution for apache#10240
Lock-free solution for apache#10240 (cherry picked from commit def1932)
Lock-free solution for apache#10240 (cherry picked from commit def1932)
…ving pending batch receives requests ### Motivation The consumer will apply the default batch receive policy even if the user will not use the batch receive API. https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61 This will consume lots of CPU if the client have many consumers (100k consumers) The Pulsar perf tool can also reproduce the problem if run the test with many consumers ### Modification If there is no pending batch receive operation for a consumer, no need to trigger the batch timeout task periodically. We can only start the timeout check after adding batch receive request to pending request queue. Remove the lock in MultiTopicsConsumerImpl as apache#10352 does ### Verification Added new test to verify the batch receive timeout task will not start if no batch receive request
…ving pending batch receives requests (#16160) ### Motivation The consumer will apply the default batch receive policy even if the user will not use the batch receive API. https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61 This will consume lots of CPU if the client have many consumers (100k consumers) The Pulsar perf tool can also reproduce the problem if run the test with many consumers ### Modification If there is no pending batch receive operation for a consumer, no need to trigger the batch timeout task periodically. We can only start the timeout check after adding batch receive request to pending request queue. Remove the lock in MultiTopicsConsumerImpl as #10352 does ### Verification Added new test to verify the batch receive timeout task will not start if no batch receive request
…ving pending batch receives requests (#16160) The consumer will apply the default batch receive policy even if the user will not use the batch receive API. https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61 This will consume lots of CPU if the client have many consumers (100k consumers) The Pulsar perf tool can also reproduce the problem if run the test with many consumers If there is no pending batch receive operation for a consumer, no need to trigger the batch timeout task periodically. We can only start the timeout check after adding batch receive request to pending request queue. Remove the lock in MultiTopicsConsumerImpl as #10352 does Added new test to verify the batch receive timeout task will not start if no batch receive request (cherry picked from commit a0ccdc9)
…ving pending batch receives requests (apache#16160) The consumer will apply the default batch receive policy even if the user will not use the batch receive API. https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61 This will consume lots of CPU if the client have many consumers (100k consumers) The Pulsar perf tool can also reproduce the problem if run the test with many consumers If there is no pending batch receive operation for a consumer, no need to trigger the batch timeout task periodically. We can only start the timeout check after adding batch receive request to pending request queue. Remove the lock in MultiTopicsConsumerImpl as apache#10352 does Added new test to verify the batch receive timeout task will not start if no batch receive request (cherry picked from commit a0ccdc9) (cherry picked from commit 6ed4ed0)
…ving pending batch receives requests (#16160) The consumer will apply the default batch receive policy even if the user will not use the batch receive API. https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61 This will consume lots of CPU if the client have many consumers (100k consumers) The Pulsar perf tool can also reproduce the problem if run the test with many consumers If there is no pending batch receive operation for a consumer, no need to trigger the batch timeout task periodically. We can only start the timeout check after adding batch receive request to pending request queue. Remove the lock in MultiTopicsConsumerImpl as #10352 does Added new test to verify the batch receive timeout task will not start if no batch receive request (cherry picked from commit a0ccdc9)
…ving pending batch receives requests (#16160) The consumer will apply the default batch receive policy even if the user will not use the batch receive API. https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java#L60-L61 This will consume lots of CPU if the client have many consumers (100k consumers) The Pulsar perf tool can also reproduce the problem if run the test with many consumers If there is no pending batch receive operation for a consumer, no need to trigger the batch timeout task periodically. We can only start the timeout check after adding batch receive request to pending request queue. Remove the lock in MultiTopicsConsumerImpl as #10352 does Added new test to verify the batch receive timeout task will not start if no batch receive request (cherry picked from commit a0ccdc9)
Motivation
Lock-free solution for #10240