-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added pause and resume to Java client Consumer #2961
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change looks good. Just left one comment regarding pause and receive interactions.
Can you also add unit tests for this feature, for both partitioned vs non-partitioned topics. Take a look at https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java#L197 or https://github.com/apache/pulsar/blob/master/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java#L167
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
I added tests. Tx. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
run integration tests |
...nk/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
Outdated
Show resolved
Hide resolved
run integration tests |
Motivation
It is useful to be able to control message flow from the client but still use MessageListener to be notified when new messages arrive. Currently you have to use Consumer.receive() and receiverQueueSize which requires polling, which is inefficient for thousands of consumers. The C++ client has Consumer.pauseMessageListener() and resumeMessageListener().
Modifications
Added Consumer.pause() and Consumer.resume(). When paused the consumer does not send requests for more messages to the broker. This works when using MessageListener and Consumer.receive() which is why I didn't use the same names as the C++ client.
Result
Applications using the Java API can now do client side message flow control very easily based on their own internal state.