Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue #20179

Conversation

equanz
Copy link
Contributor

@equanz equanz commented Apr 25, 2023

Motivation

The Key_Shared subscription type has the following issues.

  1. Key_Shared subscription has out-of-order cases because of the race condition of the recently joined consumers feature.
    Consider the following flow.

    1. Assume that the current read position is 1:6 and the recently joined consumers is empty.
    2. Called OpReadEntry#internalReadEntriesComplete from thread-1.
      Then, the current read position is updated to 1:11 (Messages from 1:6 to 1:10 have yet to be dispatched to consumers).
    3. Called PersistentStickyKeyDispatcherMultipleConsumers#addConsumer from thread-2.
      Then, the new consumer is stored to recently joined consumers with read position 1:11.
    4. Called PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers from thread-5.
      Then, messages from 1:6 to 1:10 are dispatched to consumers. From the recently joined consumers feature, the new consumer can receive messages from 1:6 to 1:10. However, it is not expected.
      For example, if existing consumers have some unacked messages and disconnect, it causes out of order in some cases.
  2. Key_Shared subscription has a redundant process.
    The stuckConsumers feature was introduced from [pulsar-broker] Stop to dispatch when skip message temporally since Key_Shared consumer stuck on delivery #7553 .
    However, it can't fix the issue entirely because it doesn't consider the range changes.
    After this commit, [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription #10762 was introduced. It fixes the issue.

Modifications

  1. Store last sent position instead of read position to recently joined consumers.
    Updating read position, then dispatching messages, and adding new consumer are not exclusive.
    I used the last send position to get a position without any new exclusion features.

  2. Keep the last sent position per key.
    When I introduced the last sent position, I noticed a new concern. Consider the following flow.

    1. Assume that the entries has the following messages,
      • msg-1, key: key-a, position: 1:1
      • msg-2, key: key-a, position: 1:2
      • msg-3, key: key-a, position: 1:3
      • msg-4, key: key-b, position: 1:4
      • msg-5, key: key-b, position: 1:5
      • msg-6, key: key-b, position: 1:6
        the dispatcher has two consumers (c1 messagesForC is 1, c2 messageForC is 1000), and the selector will return c1 if key-a and c2 if key-b.
    2. Send msg-1 to c1 and msg-4 - msg-6 to c2.
      • So, the current last sent position is 1:6.
      • c1 never acknowledge msg-1.
    3. Add new consumer c3, and the selector will return c3 if key-a.
    4. Send msg-2 - msg-3 to c3 because 1:2 and 1:3 are less than the last sent position, 1:6.
    5. Disconnect c1.
    6. Send msg-1 to c3.
      Now, c3 receives messages without expected order about key-a.

    To avoid this issue, I introduce the last sent positions feature.

  3. Remove redundant features due to the addition of the last sent positions feature.

    1. Remove this condition because the last sent positions feature restricts part of messages.
      New consumers can receive any new messages with the message key that is not in the last sent position.

      if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
      // This means, that all the messages we've just read cannot be dispatched right now.
      // This condition can only happen when:
      // 1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
      // 2. All keys in the current set of messages are routing to consumers that are currently busy
      // and stuck is not caused by stuckConsumers
      //
      // The solution here is to move on and read next batch of messages which might hopefully contain
      // also keys meant for other consumers.
      //
      // We do it unless that are "recently joined consumers". In that case, we would be looking
      // ahead in the stream while the new consumers are not ready to accept the new messages,
      // therefore would be most likely only increase the distance between read-position and mark-delete
      // position.

    2. Remove this behavior because the last sent positions has positions per key.
      If some key is stuck at a certain point in time, from that point, new consumers store the same information about the key to the recentlyJoinedConsumers.

      PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
      // At this point, all the old messages were already consumed and this consumer
      // is now ready to receive any message
      if (maxReadPosition == null) {
      // stop to dispatch by stuckConsumers
      if (stuckConsumers.contains(consumer)) {
      if (log.isDebugEnabled()) {
      log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
      }
      return 0;
      }
      // The consumer has not recently joined, so we can send all messages
      return maxMessages;
      }
      // If the read type is Replay, we should avoid send messages that hold by other consumer to the new consumers,
      // For example, we have 10 messages [0,1,2,3,4,5,6,7,8,9]
      // If the consumer0 get message 0 and 1, and does not acked message 0, then consumer1 joined,
      // when consumer1 get message 2,3, the broker will not dispatch messages to consumer1
      // because of the mark delete position did not move forward.
      // So message 2,3 will stored in the redeliver tracker.
      // Now, consumer2 joined, it will read new messages from the cursor,
      // so the recentJoinedPosition is 4 for consumer2
      // Because of there are messages need to redeliver, so the broker will read the redelivery message first [2,3]
      // message [2,3] is lower than the recentJoinedPosition 4,
      // so the message [2,3] will dispatched to the consumer2
      // But the message [2,3] should not dispatch to consumer2.
      if (readType == ReadType.Replay) {
      PositionImpl minReadPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next();
      if (minReadPositionForRecentJoinedConsumer != null
      && minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
      maxReadPosition = minReadPositionForRecentJoinedConsumer;
      }
      }

    3. Remove this behavior because this calculation is moved to LastSentPositions#compareToLastSentPosition.

      // Here, the consumer is one that has recently joined, so we can only send messages that were
      // published before it has joined.
      for (int i = 0; i < maxMessages; i++) {
      if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
      // We have already crossed the divider line. All messages in the list are now
      // newer than what we can currently dispatch to this consumer
      return i;
      }
      }

  4. Remove the stuckConsumers feature.

  5. Reconstruct some consumer stats fields related to Key_Shared.
    This is a breaking change. However, if this PR is merged, the existing field which is removed in this PR is no longer needed.

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Added unit test to ensure the behavior of last sent positions feature works properly

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: equanz#1

@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Apr 25, 2023
@equanz equanz force-pushed the change_definition_of_recently_joined_consumers_position branch from 97f7621 to 27880e4 Compare April 25, 2023 05:26
@equanz equanz changed the title [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order bug. [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order bug Apr 25, 2023
@equanz equanz changed the title [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order bug [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue Apr 25, 2023
Copy link
Contributor

@poorbarcode poorbarcode left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @equanz

Scenario-1

time process: add consumer process: delivery messages to client
1 Read entries 1:6 ~ 1:10 complete
2 Add new consumer into the selector
3 Choose consumer by the selector
4 Add the new consumer into recently joined consumers
5 Delivery entries 1:6 ~ 1:10 to all consumers(includes old consumers and the new consumer)

You want to explain the issue as the above processes, right[1]?

If yes[1], this PR relies on the change below to solve the above problem, right[2]?

final LastSentPositions lastSentPositions = recentlyJoinedConsumers == null
? null : recentlyJoinedConsumers.get(c);
if (c != null && (lastSentPositions == null
|| lastSentPositions
.compareToLastSentPosition(stickyKey, entry.getLedgerId(), entry.getEntryId()) >= 0)) {
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
} else {
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
}

If yes[2], I think the original data structure Map<Consumer, PositionImpl> can also complete this fix, right[3]?

If yes[3], the constructor LastSentPositions is an improved change to trace the messages more precisely, right[4]?

If yes[4], could you split this PR into two PR: one typed "fix" and another one typed "improve"?

By the way, if there are a huge number of keys, will the constructor LastSentPositions cost too much memory?

@equanz
Copy link
Contributor Author

equanz commented Apr 26, 2023

@poorbarcode
Thank you for your clarification.

  • [1] Yes.

  • [2] Partially yes. However, I think the main point of this PR is the change in the timing of getting the position.

    if (lastSentPositions != null) {
    entriesWithinSameRange.forEach(e -> {
    if (e == null) {
    return;
    }
    final ByteBuffer stickyKey = ByteBuffer.wrap(peekStickyKey(e.getDataBuffer()));
    lastSentPositions.updateOrPutPosition(stickyKey, e.getLedgerId(), e.getEntryId());
    });
    }

    Currently, the position is the readPosition. The readPosition is updated when the read is completed, even if messages are not dispatched. In this PR, I will change the position to the last sent position.

@poorbarcode
Copy link
Contributor

@equanz

Currently, the position is the readPosition. The readPosition is updated when the read is completed, even if messages are not dispatched. In this PR, I will change the position to the last sent position.

Scenario-2

time process: add consumer process: delivery messages to client
1 read position is 1:6
2 Add new consumer into the selector
3 Read entries 1:6 ~ 1:10 complete
4 Set read position 1:11
5 Add the new consumer into recently joined consumers
6 (Highlight)The max read-position of the new consumer is 1:11, but the exact correct value is 1:6
7 Choose consumer by the selector
8 Delivery entries 1:6 ~ 1:10 to all consumers(includes old consumers and the new consumer)

This scenario is what you are saying, right?[Q-1]

If yes[Q-1], we have two solutions to fix it:

  • Use last send position instead of last read position(what this PR does)
  • Find a way to make the lock works for Scenario-1[scenario-1], and add a lock for set read position for Scenario-2.

I also think solution-1 is better, but declaring the data structure of lastSentPosition as a Map is an improvement, right[Q-2]?

If yes[Q-2], could you split this PR into two PR: one typed "fix" and another one typed "improve"? And I think the PR which typed "improve" should be submitted with a PIP because the mechanism of Key_Shared dispatcher is so complicated now we need PIPs to trace the changes of it.

[scenario-1]: #20179 (review)

@equanz
Copy link
Contributor Author

equanz commented May 9, 2023

@poorbarcode

  • [Q-1] Yes.

  • [Q-2] When I created this PR, I introduced Map structure because it was necessary. For more details, please see the description("2. Keep the last sent position per key.").
    I just noticed we could avoid introducing a Map structure if we implemented the last sent position feature, like the mark delete position and the individually deleted messages feature. In other words,

    • The position is already scheduled to be sent.
    • All positions less than or equal to it are already scheduled to be sent.
    • Manage individually sent positions to update the position as expected.

    Consider the following flow.

    1. Assume that the entries has the following messages,
      • msg-1, key: key-a, position: 1:1
      • msg-2, key: key-a, position: 1:2
      • msg-3, key: key-a, position: 1:3
      • msg-4, key: key-b, position: 1:4
      • msg-5, key: key-b, position: 1:5
      • msg-6, key: key-b, position: 1:6
        the dispatcher has two consumers (c1 messagesForC is 1, c2 messageForC is 1000), and the selector will return c1 if key-a and c2 if key-b.
    2. Send msg-1 to c1 and msg-4 - msg-6 to c2.
      • So, the current last sent position is 1:1 and the individually sent positions is [[1:3, 1:6]] (list of closed intervals. not list of list).
      • c1 never acknowledge msg-1.

    scenario A

    1. Add new consumer c3, and the selector will return c3 if key-a.
    2. Can't send msg-2 - msg-3 to c3 because 1:2, and 1:3 are greater than the last sent position, 1:1.
    3. Disconnect c1.
    4. Send msg-1 - msg-3 to c3.
      Now, c3 receives messages with expected order about key-a.

    scenario B

    1. c1 messagesForC is back to 999.
    2. Send msg-2 - msg-3 to c1
      • So, the current last sent position is 1:6, and the individually sent positions is [].

    My primary motivation is not to improve Key_Shared but fix Key_Shared.
    Therefore, if the current approach is not accepted as a fix, I introduce it and remove the feature by Map per key.

@lhotari lhotari requested a review from eolivelli May 10, 2023 10:00
Copy link
Contributor

@eolivelli eolivelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work

I have pointed out some breaking changes in the public API, please take a look

@@ -1186,11 +1186,19 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.allowOutOfOrderDelivery = keySharedDispatcher.isAllowOutOfOrderDelivery();
subStats.keySharedMode = keySharedDispatcher.getKeySharedMode().toString();

LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers = keySharedDispatcher
LinkedHashMap<Consumer, PersistentStickyKeyDispatcherMultipleConsumers.LastSentPositions>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a code smell to refer to this implementation class PersistentStickyKeyDispatcherMultipleConsumers, maybe we should move the LastSentPositions to a top level public class

.getRecentlyJoinedConsumers();
if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
recentlyJoinedConsumers.forEach((k, v) -> {
subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
// Dispatchers allows same name consumers
final StringBuilder stringBuilder = new StringBuilder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we use a struct instead of a string ? (you can use a 'record' now, but as this fix should be cherrypick a plain old java object works better)
strings are error prone in code and also less efficient

@@ -69,8 +69,8 @@ public interface ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
boolean isBlockedConsumerOnUnackedMsgs();

/** The read position of the cursor when the consumer joining. */
String getReadPositionWhenJoining();
/** Last sent positions per sticky key of the cursor when the consumer joining. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking API change, please do not do it

you can add the new method but we should keep the old value as well, otherwise we cannot cherry-pick to old branches

@@ -115,8 +115,8 @@ public interface SubscriptionStats {
/** Whether the Key_Shared subscription mode is AUTO_SPLIT or STICKY. */
String getKeySharedMode();

/** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */
Map<String, String> getConsumersAfterMarkDeletePosition();
/** This is for Key_Shared subscription to get the recentlyJoinedConsumers in the Key_Shared subscription. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same here

@equanz
Copy link
Contributor Author

equanz commented May 11, 2023

@eolivelli

Thank you for your comments. I'll check it.
By the way, do you have any comments about the following concern from @poorbarcode ?

By the way, if there are a huge number of keys, will the constructor LastSentPositions cost too much memory?

If the current approach is not approved, I create the next PR as in #20179 (comment) instead of addressing your comments in this PR.


@poorbarcode

What do you think about the following comment?
#20179 (comment)

  • I think we can't split easily between fix and improvement in the current approach.
    • For more details, please see the description("2. Keep the last sent position per key.").
  • I think we need to implement a new feature like [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue #20179 (comment) to fix without introducing Map structure.
  • My primary motivation is not to improve Key_Shared but fix Key_Shared.
    Therefore, if the current approach is not approved as a fix, I introduce it and remove the feature by Map per key.

private final Set<Consumer> nextStuckConsumers;
private final LinkedHashMap<Consumer, LastSentPositions> recentlyJoinedConsumers;
// The lastSentPosition is not thread-safe
private final LastSentPositions lastSentPositions;

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Copy link
Contributor

@poorbarcode poorbarcode May 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @equanz

Sorry, I've been so busy lately.

Menu

  • Explain the issue.
  • Suggestion a simpler solution instead of the current one

Explain the issue

There have two scenarios that will make consumption out of order, which this PR tries to fix.

Scenario-1

time process: add consumer process: delivery messages to client
1 Read entries 1:6 ~ 1:10 complete
2 Add new consumer into the selector
3 Choose consumer by the selector
4 Add the new consumer into recently joined consumers
5 Delivery entries 1:6 ~ 1:10 to all consumers(includes old consumers and the new consumer)

Scenario-2

time process: add consumer process: delivery messages to client
1 read position is 1:6
2 Add new consumer into the selector
3 Read entries 1:6 ~ 1:10 complete
4 Set read position 1:11
5 Add the new consumer into recently joined consumers
6 (Highlight)The max read-position of the new consumer is 1:11, but the exact correct value is 1:6
7 Choose consumer by the selector
8 Delivery entries 1:6 ~ 1:10 to all consumers(includes old consumers and the new consumer)

A simpler solution

First of all, this is a great catch, and the current patch can solve the issues above.

But this patch makes an already complicated mechanism even more complicated, and I'd like to suggest a simple solution: Change the constructor of recentlyJoinedConsumers richer, let me show a simple code:

 /**
 * There have two positions in the value:
 * The first position:  the read position of the cursor when the consumer joins.
 * The second position: the first entry in normal reading after the consumer joined.
 */
protected Map<Consumer, Pair<PositionImpl,PositionImpl> recentlyJoinedConsumers;

/**
 * Get max read the position of the consumer.
 */
protected PositionImpl getMaxReadPosition(Consumer consumer) {
    Pair<PositionImpl,PositionImpl> pair = recentlyJoinedConsumers.get(consumer);
    return Math.min(pair.getKey(), pair.getValue());
}

/**
 * Register consumer.
 */
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
    addConsumerIntoRecentJoinQueue();
    setReadPositionWhenConsumerJoined();
    // do other things.
}

/**
 * Calculate how many entries the consumer would consume.
 */
private synchronized int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, ReadType readType){
   setFirstReadPositionOfRecentJoinedConsumers(entries.get(0));

   PositionImpl maxReadPosition = getMaxReadPosition(consumer);
    ...
}

What do you think of this solution?

Copy link
Contributor Author

@equanz equanz May 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your deep insight!
I have a question about your proposed solution.

The second position: the first entry in normal reading after the consumer joined.

Could you explain this position in more detail, please? When and how to get (or calculate?) the position?
(Perhaps, this solution is similar to #20179 (comment)?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@equanz

Could you explain this position in more detail, please? When and how to get (or calculate?) the position?

It will be set in method getRestrictedMaxEntriesForConsumer, let me show a simpler code

/**
 * Calculate how many entries the consumer would consume.
 * @param theFirstEntryInCurrentReading This is not the first message the consumer will receive, but the first message read in this batch.
 */
private synchronized int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, ReadType readType, PositionImpl theFirstEntryInCurrentReading){

   if (readType.equals(Normal) ){
       recentlyJoinedConsumers.get(consumer).getRight() == null;
       recentlyJoinedConsumers.get(consumer).getRight() = theFirstEntryInCurrentReading;
   }

   PositionImpl maxReadPosition = getMaxReadPosition(consumer);
    ...
}

The progress runs like this:

time process: add consumer process: in-flight reading
1 read position is 1:6
2 an in-flight reading which tries to read 1:6~1:10
3 Add the new consumer into recently joined consumers
4 Add new consumer into the selector
5 Read entries 1:6 ~ 1:10 complete
6 Set read position 1:11
7 Set recentlyJoinedConsumers.consumer.first-position to 1:11
8 Try to send messages to consumers
9 Set recentlyJoinedConsumers.consumer.second-position to 1:6

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@poorbarcode

I understand.
However, I considered your approach, and then I found another issue. This issue is not specific to the approach you have proposed. It just means the read position is not suitable for recently joined consumers.

Details of the issue

Consider the following flow.

  1. Assume that,
    readPosition: 1:1
    messagesToRedeliver: []
    recentlyJoinedConsumers []
    c1: messagesForC: 1, pending: []
    c2: messagesForC: 1000, pending: []
    selector: key-a: c1, key-b: c2
  2. Dispatch m1 (key: key-a, position: 1:1, type: Normal)
    readPosition: 1:2
    messagesToRedeliver: []
    recentlyJoinedConsumers []
    c1: messagesForC: 0, pending: [m1]
    c2: messagesForC: 1000, pending: []
    selector: key-a: c1, key-b: c2
  3. Dispatch m2 (key: key-b, position: 1:2, type: Normal)
    readPosition: 1:3
    messagesToRedeliver: []
    recentlyJoinedConsumers []
    c1: messagesForC: 0, pending: [m1]
    c2: messagesForC: 999, pending: [m2]
    selector: key-a: c1, key-b: c2
  4. Dispatch m3 (key: key-a, position: 1:3, type: Normal)
    readPosition: 1:4
    messagesToRedeliver: [m3]
    recentlyJoinedConsumers []
    c1: messagesForC: 0, pending: [m1]
    c2: messagesForC: 999, pending: [m2]
    selector: key-a: c1, key-b: c2
  5. Dispatch m4 (key: key-b, position: 1:4, type: Normal) because PersistentStickyKeyDispatcherMultipleConsumers has the isDispatcherStuckOnReplays feature.
    readPosition: 1:5
    markDeletePosition: 1:0 (mock)
    messagesToRedeliver: [m3]
    recentlyJoinedConsumers []
    c1: messagesForC: 0, pending: [m1]
    c2: messagesForC: 998, pending: [m2, m4]
    selector: key-a: c1, key-b: c2
  6. Add consumer c3
    readPosition: 1:5
    messagesToRedeliver: [m3]
    recentlyJoinedConsumers [c3: 1:5]
    c1: messagesForC: 0, pending: [m1]
    c2: messagesForC: 998, pending: [m2, m4]
    c3: messagesForC: 1000, pending: []
    selector: key-a: c1, key-b: c2
  7. Dispatch m3 (key: key-a, position: 1:3, type: Replay)
    readPosition: 1:5
    messagesToRedeliver: []
    recentlyJoinedConsumers [c3: 1:5]
    c1: messagesForC: 0, pending: [m1]
    c2: messagesForC: 998, pending: [m2, m4]
    c3: messagesForC: 999, pending: [m3]
    selector: key-a: c3, key-b: c2
  8. Disconnect c1 and redelivery m1
    readPosition: 1:5
    messagesToRedeliver: []
    recentlyJoinedConsumers [c3: 1:5]
    c2: messagesForC: 998, pending: [m2, m4]
    c3: messagesForC: 998, pending: [m3, m1] // out of order
    selector: key-a: c3, key-b: c2

I tested this issue by following the test case.
Base commit hash: b7f0004

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 18fb141be31..66652eec73e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -20,12 +20,15 @@ package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -35,6 +38,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
@@ -52,13 +57,17 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
+import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
@@ -1191,6 +1200,86 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         l.await();
     }
 
+    @Test(timeOut = 30_000)
+    public void testCheckBetweenSkippingAndRecentlyJoinedConsumers() throws Exception {
+        conf.setSubscriptionKeySharedUseConsistentHashing(true);
+        conf.setSubscriptionKeySharedConsistentHashingReplicaPoints(100);
+
+        final String topicName = "persistent://public/default/recently-joined-consumers-" + UUID.randomUUID();
+        final String subName = "my-sub";
+
+        final ConsumerBuilder<String> cb = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Key_Shared);
+
+        // create 2 consumers
+        @Cleanup
+        final Consumer<String> c1 = cb.consumerName("c1").receiverQueueSize(1).subscribe();
+        @Cleanup
+        final Consumer<String> c2 = cb.consumerName("c2").receiverQueueSize(1000).subscribe();
+
+        @Cleanup
+        final Producer<String> p = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+
+        final Set<String> c1Keys = Set.of("1", "3", "4", "5", "9");
+        final Set<String> c2Keys = Set.of("0", "2", "6", "7", "8");
+
+        // send and receive
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1.getConsumerName())).findFirst().get().getAvailablePermits(), 1));
+        p.newMessage().key(Integer.toString(1)).value("msg-0").send();
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c1.getConsumerName())).findFirst().get().getAvailablePermits(), 0));
+
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 1000));
+        p.newMessage().key(Integer.toString(0)).value("msg-1").send();
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 999));
+
+        final MessageIdImpl msg2Id = (MessageIdImpl) p.newMessage().key(Integer.toString(1)).value("msg-2").send();
+        p.newMessage().key(Integer.toString(0)).value("msg-3").send();
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumers().stream().filter(c -> c.getConsumerName().equals(c2.getConsumerName())).findFirst().get().getAvailablePermits(), 998));
+        Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 4));
+        log.error("DEBUG: readPosition: {}", admin.topics().getInternalStats(topicName).cursors.get(Codec.encode(subName)).readPosition);
+
+        final PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) pulsar.getBrokerService().getTopicIfExists(topicName).get().get().getSubscription(subName).getDispatcher();
+        final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class
+                .getDeclaredField("redeliveryMessages");
+        redeliveryMessagesField.setAccessible(true);
+        final MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField.get(dispatcher);
+
+        final NavigableSet<PositionImpl> replayMsgSet = redeliveryMessages.getMessagesToReplayNow(4);
+        assertEquals(replayMsgSet.size(), 1);
+        final PositionImpl replayMsg = replayMsgSet.first();
+        assertEquals(replayMsg.compareTo(msg2Id.getLedgerId(), msg2Id.getEntryId()), 0);
+        log.error("DEBUG: replayMsgId: {}", replayMsg);
+
+        // add c3
+        @Cleanup
+        final Consumer<String> c3 = cb.consumerName("c3").subscribe();
+        final List<Message<String>> c3Msgs = new ArrayList<>();
+        log.error("DEBUG: recentlyJoinedConsumers: {}", admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumersAfterMarkDeletePosition());
+
+        final Set<String> c3Keys = Set.of("1", "2", "6", "7");
+
+        Message<String> msg3 = c3.receive(100, TimeUnit.MILLISECONDS);
+        assertNotNull(msg3);
+        c3Msgs.add(msg3);
+
+        // disconnect
+        c1.close();
+
+        msg3 = c3.receive(100, TimeUnit.MILLISECONDS);
+        assertNotNull(msg3);
+        c3Msgs.add(msg3);
+
+        // check out-of-order
+        assertTrue(c3Msgs.get(0).getMessageId().compareTo(c3Msgs.get(1).getMessageId()) > 0);
+        log.error("DEBUG: c3Msgs messageId: {}", c3Msgs.stream().map(Message::getMessageId).toList());
+        log.error("DEBUG: readPosition: {}", admin.topics().getInternalStats(topicName).cursors.get(Codec.encode(subName)).readPosition);
+        log.error("DEBUG: recentlyJoinedConsumers: {}", admin.topics().getStats(topicName).getSubscriptions().get(subName).getConsumersAfterMarkDeletePosition());
+    }
 
     private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
         if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {

Here is the result.

% mvn test -pl pulsar-broker -Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testCheckBetweenSkippingAndRecentlyJoinedConsumers
...
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] Running org.apache.pulsar.client.api.KeySharedSubscriptionTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.545 s - in org.apache.pulsar.client.api.KeySharedSubscriptionTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  18.220 s
[INFO] Finished at: 2023-05-22T15:22:03+09:00
[INFO] ------------------------------------------------------------------------

% grep 'DEBUG' pulsar-broker/target/surefire-reports/org.apache.pulsar.client.api.KeySharedSubscriptionTest-output.txt
2023-05-22T16:20:00,748 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1243] - DEBUG: readPosition: 3:4
2023-05-22T16:20:00,748 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1256] - DEBUG: replayMsgId: 3:2
2023-05-22T16:20:00,766 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1262] - DEBUG: recentlyJoinedConsumers: {c3=3:4}
2023-05-22T16:20:00,770 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1279] - DEBUG: c3Msgs messageId: [3:2:-1, 3:0:-1]
2023-05-22T16:20:00,774 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1280] - DEBUG: readPosition: 3:4
2023-05-22T16:20:00,782 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1281] - DEBUG: recentlyJoinedConsumers: {c3=3:4}

How to fix

I considered whether this issue is fixed in the proposed approaches.

  • your proposed approach [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue #20179 (comment)
    • Conclusion: It doesn't fix this issue.
    • Reason:
      The 1st position is the read position. If this position is used as recently joined consumers, it causes an out of order.
      The 2nd position is the head of the normal reading entries. As we checked in before, we can send replay messages (e.g. m3) immediately when the consumer is connected. Therefore, this position could be null in this case.
  • last sent position and individually sent positions approach [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue #20179 (comment)
    • Conclusion: It will fix this issue.
    • Reason:
      This feature guarantees all messages less than or equal to the last sent position are already scheduled to be sent. Therefore, skipped messages (e.g. m3) are greater than the last sent position.
  • last sent positions approach [fix][broker] Fix recentlyJoinedConsumers to address the out-of-order issue #20179
    • Conclusion: It will fix this issue.
    • Reason:
      Same as above. I tested as follows.
      % mvn test -pl pulsar-common,pulsar-client-admin-api,managed-ledger,pulsar-broker -Dtest=org.apache.pulsar.client.api.KeySharedSubscriptionTest#testCheckBetweenSkippingAndRecentlyJoinedConsumers -DfailIfNoTests=false
      ...
      [INFO] -------------------------------------------------------
      [INFO]  T E S T S
      [INFO] -------------------------------------------------------
      [INFO] Running org.apache.pulsar.client.api.KeySharedSubscriptionTest
      [ERROR] Tests run: 2, Failures: 1, Errors: 0, Skipped: 1, Time elapsed: 14.301 s <<< FAILURE! - in org.apache.pulsar.client.api.KeySharedSubscriptionTest
      [ERROR] testCheckBetweenSkippingAndRecentlyJoinedConsumers(org.apache.pulsar.client.api.KeySharedSubscriptionTest)  Time elapsed: 0.914 s  <<< FAILURE!
      java.lang.AssertionError: expected object to not be null
              at org.testng.Assert.fail(Assert.java:110)
              at org.testng.Assert.assertNotNull(Assert.java:1319)
              at org.testng.Assert.assertNotNull(Assert.java:1303)
              at org.apache.pulsar.client.api.KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers(KeySharedSubscriptionTest.java:1443)
              at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
              at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
              at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.base/java.lang.reflect.Method.invoke(Method.java:568)
              at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
              at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
              at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
              at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
              at java.base/java.lang.Thread.run(Thread.java:833)
      
      [INFO]
      [INFO] Results:
      [INFO]
      [ERROR] Failures:
      [ERROR] org.apache.pulsar.client.api.KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers(org.apache.pulsar.client.api.KeySharedSubscriptionTest)
      [INFO]   Run 1: PASS
      [ERROR]   Run 2: KeySharedSubscriptionTest.testCheckBetweenSkippingAndRecentlyJoinedConsumers:1443 expected object to not be null
      [INFO]
      [INFO]
      [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0
      [INFO]
      [INFO] ------------------------------------------------------------------------
      [INFO] Reactor Summary for Pulsar Client Admin :: API 3.0.0-SNAPSHOT:
      [INFO]
      [INFO] Pulsar Client Admin :: API ......................... SUCCESS [  1.076 s]
      [INFO] Pulsar Common ...................................... SUCCESS [  3.927 s]
      [INFO] Managed Ledger ..................................... SUCCESS [  1.756 s]
      [INFO] Pulsar Broker ...................................... FAILURE [ 23.586 s]
      [INFO] ------------------------------------------------------------------------
      [INFO] BUILD FAILURE
      [INFO] ------------------------------------------------------------------------
      [INFO] Total time:  31.095 s
      [INFO] Finished at: 2023-05-22T17:45:15+09:00
      [INFO] ------------------------------------------------------------------------
      
      % grep 'DEBUG' pulsar-broker/target/surefire-reports/org.apache.pulsar.client.api.KeySharedSubscriptionTest-output.txt
      2023-05-22T17:45:06,279 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1419] - DEBUG: readPosition: 4:4
      2023-05-22T17:45:06,280 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1432] - DEBUG: replayMsgId: 4:2
      2023-05-22T17:45:06,298 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1438] - DEBUG: recentlyJoinedConsumers: {consumerName=c3, consumerId=2, address=/127.0.0.1:65342=[4:0, 4:3]}
      2023-05-22T17:45:11,361 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1419] - DEBUG: readPosition: 4:4
      2023-05-22T17:45:11,361 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1432] - DEBUG: replayMsgId: 4:2
      2023-05-22T17:45:11,377 - ERROR - [TestNG-method=testCheckBetweenSkippingAndRecentlyJoinedConsumers-1:KeySharedSubscriptionTest@1438] - DEBUG: recentlyJoinedConsumers: {consumerName=c3, consumerId=2, address=/127.0.0.1:65353=[4:0, 4:3]}
      

Copy link
Contributor

@poorbarcode poorbarcode May 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@equanz Sorry, my reply has been delayed again.

Thank you for your detailed description(^_^), let me summarize this scenario:

  1. a message(Msg_1) was put into the Replay Queue caused by the target consumer(C_1) having no permits.
  2. at this moment, the new Consumer(C_2) takes over the key of Msg_1 above.
  3. in the next round, Msg_1 was delivered to C_2.

Is this explain correct?[1]


If yes[1], I have some question: If we revert the PR #20026, the step 1 and step 2 can not be executed concurrently, right?[2]


If yes [2], there have 2 case:

Case 1: step 1 runs before step 2

In this case, the max read position of C_2 should be larger than Msg-1, then Msg_1 will not be sent to C_2 in the next round.

Case 1: step 2 runs before step 1

In this case, the key of Msg_1 above will still be taken over by C_1, then Msg_1 will not be sent to C_2 in the next round.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @poorbarcode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@equanz I think this is a good way to solve the above problems and I have several suggestions

  • Since the current mechanism of Key_Shared mode is complex enough. hopefully, a PIP can be proposed for this change
  • Add a mechanism to limit the memory cost of the mapping of the key and last sent position.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. I'll consider the next action. Thank you, @poorbarcode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for too late.
I opened a new PIP design PR.

#20776

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@equanz Thanks

@github-actions
Copy link

github-actions bot commented Jul 1, 2023

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Jul 1, 2023
@Technoboy- Technoboy- added this to the 3.2.0 milestone Jul 31, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@equanz
Copy link
Contributor Author

equanz commented Jan 26, 2024

I opened a new PR #21953.

@equanz
Copy link
Contributor Author

equanz commented Aug 1, 2024

#21953 was merged.

@equanz equanz closed this Aug 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-required Your PR changes impact docs and you will update later. Stale
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants