Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per Message Unacknowledged Redelivery #51

Closed
wants to merge 1 commit into from

Conversation

sschepens
Copy link
Contributor

Motivation

Pulsar should be more robust in terms of which messages it redelivers when there are unacked messages, right now Pulsar redelivers ALL currently unacked messages, this increases A LOT the chance to get duplicate messages.

Modifications

Protobuf:

  • RedeliverUnacknowledgedMessages now carries an optional list of MessageIdData specifying the messages to be redelivered.

Broker:

  • Now supports receiving a MessageIds on redelivery and adds them to the replay list, only if the could be atomically removed from the unacked message list (maybe we're lucky and someone acked or redelivered the message before we try).

Client:

  • Extracted UnAckedMessageTracker to an interface and renamed current implementation to SimpleUnAckedMessageTracker. We probably want to also make it configurable so that a user could tweak the behavior and have full control (such as more precise time tracking).
  • ConsumerConfiguration now allows for enabling per message redelivery and sepcifying how big a batch can be before asking the Broker to redeliver all unacked messages.
  • SimpleUnAckedMessageTracker now always sends MessageIds to redelivery and Consumer determines whether it should use per message redelivery or full redelivery.
  • PartitionedConsumer calls per message redelivery on its internal clients by splitting MessageIds by partition.

Result

Change should theoretically be backwards-compatible because Pulsar uses Protobuf.
Users can chose to prevent message duplicates on Redelivery be incrementing network traffic to Broker (sending message ids).

This is a first try at this implementation, please provide feedback!
I would also need a little guidance with tests, if I run mvn test now it hangs on some strange BookKeeper tests on project managed-ledger. I would also like some opinion on where to write tests.

Cheers!

@yahoocla
Copy link

yahoocla commented Oct 6, 2016

Thank you for submitting this pull request, however I do not see a valid CLA on file for you. Before we can merge this request please visit https://yahoocla.herokuapp.com/ and agree to the terms. Thanks! 😄

@yahoocla
Copy link

yahoocla commented Oct 6, 2016

CLA is valid!

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change looks goood. Just a few comments.

Please few unit tests to verify the redelivery of just the intended messages. Take a look at tests like https://github.com/yahoo/pulsar/blob/master/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/SimpleProducerConsumerTest.java to see how to setup the mocked broker for the tests

.map(messageIdData -> PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId()))
.filter(position -> {
if (!pendingAcks.remove(position)) {
position.recycle();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to call position.recycle(). It's a no-op method that should be removed anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

* the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
* breaks, the messages are redelivered after reconnect.
*/
void redeliverUnacknowledgedMessages(List<MessageIdImpl> messageIds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the use case for exposing this in the public API? I'm not against it, but once it's out, we cannot remove it, so we should only add it if it's really needed.
Also, the method should be called redeliverMessages(messageIds)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And we should take MessageId and not MessageIdImpl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the use case for exposing this in the public API? I'm not against it, but once it's out, we cannot remove it, so we should only add it if it's really needed.
Also, the method should be called redeliverMessages(messageIds)ç

I believe there would be no need for exposing this, at least not now, but maybe we could make void redeliverUnacknowledgedMessages() have the default behavior of not redelivering all messages.
I think this should not be called redeliverMessages(messageIds) because the broker actually makes sure it is redelivering only unacknowledged messages.

And we should take MessageId and not MessageIdImpl

I've seen a lot of things using MessageIdImpl, that's why I went that way, it's a trivial change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

following comment by @rdhabalia should we switch method signature to receive MessageId? UnAckedMessageTracker keeps MessageIdImpls

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sboobna does it really make sense to receive MessageId? It's just an empty interface


import java.io.Closeable;

public interface UnAckedMessageTracker extends Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an internal interface and should not be in the public API java package

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would probably like to provide our own implementation of UnAckedMessageTracker sometime in the future, or have Pulsar provide a more precise tracking than the current one, that's why I made this public.
Is there any reason you're against this?

Copy link
Contributor

@merlimat merlimat Oct 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep that change separate from this PR for now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


private boolean perMessageRedeliverUnacknowledged = false;

private UnAckedMessageTracker unAckedMessageTracker = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

@@ -47,6 +47,12 @@

private long ackTimeoutMillis = 0;

private int maxRedeliverUnacknowledgedMessagesBatch = 1000;

private boolean perMessageRedeliverUnacknowledged = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need this flag, the per-message redelivery should always be enabled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
ClientCnx cnx = cnx();
if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getNumber()) {
MessageIdData.Builder builder = MessageIdData.newBuilder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, let's leave the protobuf operation to be done in Commands class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now Commands class cannot receive MessageId because pulsar-common doesn't and shouldn't have dependency on anything else.

How would you create a builder for this?

I've checked if I could find a similar case in Commands but couldn't find one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat any thought on this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. It's good as it is then.

MessageIdData.Builder builder = MessageIdData.newBuilder();
List<MessageIdData> messageIdDatas = messageIds.stream()
.map(messageId -> {
builder.setPartition(messageId.getPartitionIndex());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be a new builder instance each time the mapping function is called?

Also, the builder itself should be recycled after builder.build() is called

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this to avoid generating garbage or cpu time, since we're replacing the same fields on every iteration there should be no problems with reusing the same builder instance and should save calls to recycler.

Copy link
Contributor

@merlimat merlimat Oct 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, looks good

@@ -262,6 +263,12 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {

}

@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
// It doesn't seem to make sense to redeliver single messages to single consumers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct, we cannot redeliver individual messages in this case or we would break ordering.

* <i>(default: 1000)</i>
*/
public int getMaxRedeliverUnacknowledgedMessagesBatch() {
return maxRedeliverUnacknowledgedMessagesBatch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm missing something, but what is the reason for this config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sboobna, @merlimat suggested that we could add an upper bound to the messages sent in a redeliver unacknlowledged messages command, I thought we could make this configurable so that clients have a chance to tweak this.

Copy link
Contributor

@merlimat merlimat Oct 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a second thought, instead of having a user-configurable setting, we could have an internal limit on how many messages to ask for redelivery with a single protobuf command.

Eg.: each redelivery command can contain up to 1000 position. If there are more than 1000 timed out messages, the client will send multiple redelivery commands.

This would also solve the problem of an application setting that limit too big.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sschepens Please consider the change described above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, could you check if it's somewhat what you expected @merlimat ?

@merlimat merlimat assigned merlimat and unassigned merlimat Oct 10, 2016
.map(messageId -> {
builder.setPartition(messageId.getPartitionIndex());
builder.setLedgerId(messageId.getLedgerId());
builder.setEntryId(messageId.getEntryId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of batch-message usecase: messageId will be type of BatchMessageIdImpl which may create duplicate MessageId into the final list. instead should we use set or more-specific SortedSet to read entries in sequence at broker side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't BatchMessageIdImpl only used for acks? SimpleUnAckedMessageTracker stores MessageIdImpl so there should be no case where this method receives BatchMessageIdImpl.

That's also a reason why we the method signature should probably be redeliverUnacknowledgedMessages(List<MessageIdImpl> messageIds)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, in case of batched messages, the application will use ack multiple BatchMessageImpl. When all the messages within a single batch are acked, that will trigger the ack to the broker based on the whole batch MessageIdImpl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdhabalia @merlimat I'm not really sure how we should deal with BatchMessageIdImpl. I believe it doesn't really make sense te redeliver a single message in a batch, I don't really know if it's possible too. So, what should we do when UnAckedMessageTracker considers a BatchMessageIdImpl should be redelivered?

@rdhabalia
Copy link
Contributor

just a thought:
in this PR, we essentially have below change:

@sschepens
Copy link
Contributor Author

instead of delivering all pendingMessages, we try to redeliver SimpleUnAckedMessageTracker. oldOpenSet messages which are actually timed-out by sending only those specific messageIds.

Yes

As alternative can we just send head-messageId of SimpleUnAckedMessageTracker. oldOpenSet to broker and broker will redeliver all pending-messages older than that message-id. It may help to avoid overhead of data-transfer and serialization/deserialization.

I thought of this but, I don't really know if it's that easy.
Would it really work on a Shared subscription?
What if a message has been redelivered several times, everytime we receive a messageId that's newer we would redeliver that message as well.

Yes, this implementation has a lot of network overhead but it is precise.

Another way to provide a more efficient behavior would be to move UnAckedTracker to the Broker, traditionally Brokers track when messages should be redelivered, but this would mean a whole lot of changes because ackTimeout should be passed when creating a subscription or be configurable per topic.

@sschepens sschepens force-pushed the per_message_redelivery branch 2 times, most recently from 6ee3a33 to 8057ea8 Compare October 11, 2016 14:55
}
ClientCnx cnx = cnx();
if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getNumber()) {
MessageIdData.Builder builder = MessageIdData.newBuilder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. It's good as it is then.

.map(messageId -> {
builder.setPartition(messageId.getPartitionIndex());
builder.setLedgerId(messageId.getLedgerId());
builder.setEntryId(messageId.getEntryId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, in case of batched messages, the application will use ack multiple BatchMessageImpl. When all the messages within a single batch are acked, that will trigger the ack to the broker based on the whole batch MessageIdImpl

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.yahoo.pulsar.client.impl.ConsumerStats;
import com.yahoo.pulsar.client.impl.MessageIdImpl;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed unused imports

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* <i>(default: 1000)</i>
*/
public int getMaxRedeliverUnacknowledgedMessagesBatch() {
return maxRedeliverUnacknowledgedMessagesBatch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sschepens Please consider the change described above.

@merlimat
Copy link
Contributor

Change looks good to me, once you add some test to verify the new behavior we should be good to go.

There is one compilation failure in travis build :

[ERROR] /home/travis/build/yahoo/pulsar/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java:[433,24] error: incompatible types: bad return type in method reference

@sschepens
Copy link
Contributor Author

@merlimat I don't really know how that error happened, I ran the tests on my machine and they just worked.
What should I do about BatchMessageIdImpl in the end?
I'll add some tests as soon as a I can

@merlimat
Copy link
Contributor

@sschepens About BatchMessageIdImpl, after discussing with @rdhabalia and @sboobna, we concluded that:

  • Right now BatchMessageIdImpl are being added to the unacked message tracker (though there's no particular need for it
  • With this PR, we would end up requesting the same batch multiple times

After #55 gets merged, only the "whole batch" MessageId will be inserted into the unacked message tracker, thus this PR will be good as it is now (except for some probable minor conflicts with the other PR)

@sschepens
Copy link
Contributor Author

@merlimat I committed some tests, could you check them? They also raised a a few more tweaks to prevent using per message redelivery on subscriptions that are not shared.

@sschepens
Copy link
Contributor Author

sschepens commented Oct 14, 2016

@merlimat I don't really know why Travis keeps failing, It's complaining about the return type of method ConcurrentOpenHashSet#remove, it runs OK on my machine, could it be the jdk version?

Edit: Nevermind, I found the issue, could you give this a review now?

@sschepens sschepens force-pushed the per_message_redelivery branch 2 times, most recently from 9c921aa to 40c8543 Compare October 17, 2016 13:36
@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Oct 17, 2016
@merlimat merlimat added this to the 1.15 milestone Oct 17, 2016
Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice. Thank you

@merlimat
Copy link
Contributor

merlimat commented Oct 17, 2016

I've rebased to current master and fixed some minor conflicts in #70

@merlimat merlimat closed this Oct 17, 2016
@sschepens sschepens deleted the per_message_redelivery branch October 18, 2016 13:10
sijie added a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
massakam pushed a commit to massakam/pulsar that referenced this pull request Aug 6, 2020
hrsakai pushed a commit to hrsakai/pulsar that referenced this pull request Dec 10, 2020
Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>

Improve README.md and add CONTRIBUTING.md file.
dlg99 pushed a commit to dlg99/pulsar that referenced this pull request Mar 29, 2022
nicoloboschi referenced this pull request in nicoloboschi/pulsar Apr 12, 2022
dragonls pushed a commit to dragonls/pulsar that referenced this pull request Oct 21, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants