Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unack-message threshold to restrict consumer for receiving messages without acknowledging-msg up to the threshold #48

Merged
merged 1 commit into from Oct 11, 2016

Conversation

rdhabalia
Copy link
Contributor

Motivation

If consumer keep tries to receive messages without acknowledging to broker then broker won't know last-deleted/received message entry. and if broker/consumer crashes/restarts in between then next time broker needs to send all the messages again as client didn't ack those messages.

Modifications

  • introduced configurable threshold maxUnackedMessagesPerConsumer limit till which consumer can consume messages without sending ack
  • once consumer reaches this maxUnackedMessagesPerConsumer limit, broker stops sending further messages until consumer sends acknowledgement for maxUnackedMessagesPerConsumer/2 messages.

Result

  • broker can block consumer if consumer doesn't send ack for maxUnackedMessagesPerConsumer number of messages.

@rdhabalia rdhabalia added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Sep 29, 2016
@rdhabalia rdhabalia added this to the 1.15 milestone Sep 29, 2016
@rdhabalia rdhabalia self-assigned this Sep 29, 2016
@yahoocla
Copy link

CLA is valid!

2 similar comments
@yahoocla
Copy link

CLA is valid!

@yahoocla
Copy link

CLA is valid!

oldPermits);
if (!blockedConsumerOnUnackedMsgs) {
if (log.isDebugEnabled()) {
log.debug("[{}] Added more flow control message permits {} (old was: {})", this,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are updating the messagePermits regardless, we should keep this log outside the if condition.

additionalNumberOfMessages, oldPermits);
}
subscription.consumerFlow(this, additionalNumberOfMessages);
unackedMessages.addAndGet(additionalNumberOfMessages);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we update the unackedMessages while dispatching messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as we talked: we are not moving increment of unackedMessages at sendMessages(..) because this method is being called asynchronously and before it updates unackedMessages counter: if flowPermits(...) get triggered then it will add more permits and call dispatcher to dispatch more messages.

@@ -66,14 +66,19 @@
private final ConcurrentOpenHashSet<PositionImpl> pendingAcks;

private final ConsumerStats stats;

private final int maxUnackedMessages;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to disable this feature easily. What happens if we set maxUnackedMessages to 0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To disable feature we can configure maxUnackedMessages =Integer.MAX_VALUE. Right now, configured value of maxUnackedMessages is 50000. Should we increase it to higher number.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use 0 for "unlimited"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright. I have added method shouldBlockConsumerOnUnackMsgs() which will enable this feature on sharedSubscription if maxUnackedMessages is > 0.

@rdhabalia rdhabalia force-pushed the block_con branch 4 times, most recently from 58ca6d8 to 923636e Compare October 4, 2016 00:58
@@ -97,6 +97,10 @@ tlsTrustCertsFilePath=
# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can mention here that setting the value to 0 will make it unlimited

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.. added the documentation..

@rdhabalia rdhabalia force-pushed the block_con branch 8 times, most recently from 0ec780f to f7c99ff Compare October 5, 2016 06:57
@@ -66,14 +66,19 @@
private final ConcurrentOpenHashSet<PositionImpl> pendingAcks;
Copy link
Contributor Author

@rdhabalia rdhabalia Oct 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added change: to consider number of messages in a batch-msg while blocking consumer.

@rdhabalia rdhabalia force-pushed the block_con branch 3 times, most recently from 4cfa842 to 910fcf5 Compare October 6, 2016 20:45
…es without acknowledging-msg up to the threshold
@merlimat
Copy link
Contributor

merlimat commented Oct 8, 2016

@sboobna can you give a final pass?

Copy link
Contributor

@sboobna sboobna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@merlimat merlimat merged commit 95b8239 into apache:master Oct 11, 2016
@rdhabalia rdhabalia deleted the block_con branch November 11, 2016 23:04
sijie added a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
massakam pushed a commit to massakam/pulsar that referenced this pull request Aug 6, 2020
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
Fix the map of pendingReqs concurrent issue.
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
In pulsar cpp client, the default event time is set as minus value and caused error when use kafka consumer to consume message produced by pulsar cpp/cgo client. 
In this change, if we checked the event time is not set, using publish time to set it.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants