
[Geo-replication] Subscription replication is not working across clusters #10054

Open
yabinmeng opened this issue Mar 26, 2021 · 25 comments · May be fixed by #16651
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@yabinmeng

yabinmeng commented Mar 26, 2021

Describe the bug
I'm testing geo-replication with subscription replication enabled. In my testing environment, there are 2 clusters (ClusterA and ClusterB) with a global ZooKeeper on a dedicated server host.

Each cluster also has its own producer and consumer client application, based on the Pulsar (Java) producer API and the synchronous consumer API.

Below is a summary of my testing steps:

  1. ProducerA publishes some messages (e.g. 10 messages 0 ~ 9) to a topic that has geo-replication enabled.
  2. ConsumerA receives the first half of the messages (e.g. msg 0 ~ 4) from the same topic and gets killed.
  3. After a short while (> 10s), start ConsumerB in ClusterB to receive messages from the same topic.

I'm expecting ConsumerB to receive the second half of the messages (e.g. msg 5 ~ 9) if subscription replication is working properly. But the results show that most of the time ConsumerB receives all 10 messages (e.g. msg 0 ~ 9). There are also several cases where ConsumerB receives 6 or 7 messages (e.g. msg 3 ~ 9 or msg 4 ~ 9).

My testing is based on Pulsar 2.6.3 with the default configuration, with the key geo-replication/subscription replication settings shown below:

broker.conf

enableReplicatedSubscriptions=true
replicatedSubscriptionsSnapshotFrequencyMillis=1000

The consumer application also has the following code added:

   .replicateSubscriptionState(true)
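
For reference, a minimal sketch of where that call goes in the consumer setup (not the exact application code; it assumes an existing PulsarClient named client, uses the topic and subscription names from the output below, and the classes come from org.apache.pulsar.client.api):

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://georep/default/t1")
        .subscriptionName("mysub")
        .replicateSubscriptionState(true)   // enable replicated subscription state
        .subscribe();

// synchronous receive/acknowledge loop
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);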

To Reproduce
See "Description" section

Expected behavior
See "Description" section

Screenshots
Below is one example of the testing results;

  1. ProducerA publishes 6 messages to a geo-replication enabled topic "georep/default/t1"
$ ./producerA.sh -n 6 -t georep/default/t1
2021-03-24 06:51:54.965
---------------------------------------------------------
Cluster Name: ClusterA
Producer    : [P]0090276c-8cd2-11eb-b297-d555334bf2d0
Topic       : persistent://georep/default/t1
Number of messages to publish: 6
---------------------------------------------------------
  message published: msg-key=[0] 20210324-065155-016, msg-payload=mN8etPUo9l0PDTW2cpa0
  message published: msg-key=[1] 20210324-065155-089, msg-payload=z4tGKNSz4vmnqWIwIKO6kiclyhhso
  message published: msg-key=[2] 20210324-065155-144, msg-payload=8T0O88q9upN1bqvEtIiEko208fHM4R0HMP1it1NycVkavxaR5b41OR7NX
  message published: msg-key=[3] 20210324-065155-199, msg-payload=AlMs3I3C2xGa0BJu8MIv1o
  message published: msg-key=[4] 20210324-065155-253, msg-payload=1CcO0W4qInSLYhOJrjvVMw04Ti32x8sk8xoyt4wwHGHL6GfctcvFKcf
  message published: msg-key=[5] 20210324-065155-308, msg-payload=GtzTIPuBtsBG21sSiHv2CBEv0gdP4k
  1. ConsumerA receives 3 messages from this topic
$ ./consumerA.sh -sn mysub -t georep/default/t1 -n 3
2021-03-24 06:52:17.993
---------------------------------------------------------
Cluster Name: ClusterA
Consumer    : [C]0e4d002b-8cd2-11eb-8164-69da960415c9
Topic       : persistent://georep/default/t1
Subscription: mysub
---------------------------------------------------------
  message received: msg-key=[0] 20210324-065155-016, msg-properties={cluster=ClusterA}, msg-payload=mN8etPUo9l0PDTW2cpa0, publish-time: 2021-03-24 06:51:55.021, receive-time: 2021-03-24 06:52:18.956
  message received: msg-key=[1] 20210324-065155-089, msg-properties={cluster=ClusterA}, msg-payload=z4tGKNSz4vmnqWIwIKO6kiclyhhso, publish-time: 2021-03-24 06:51:55.089, receive-time: 2021-03-24 06:52:18.957
  message received: msg-key=[2] 20210324-065155-144, msg-properties={cluster=ClusterA}, msg-payload=8T0O88q9upN1bqvEtIiEko208fHM4R0HMP1it1NycVkavxaR5b41OR7NX, publish-time: 2021-03-24 06:51:55.144, receive-time: 2021-03-24 06:52:18.957
  1. Wait for a short while and starts ConsumerB in ClusterB and tries to receive all remaining messages. But actually it receives all 6 messages, the first 3 of which have already been consumed and acknowledged by ClusterA
./consumerB.sh -sn mysub -t georep/default/t1
2021-03-24 06:52:54.635
---------------------------------------------------------
Cluster Name: ClusterB
Consumer    : [C]24216084-8cd2-11eb-b01c-bff91b5df0bd
Topic       : persistent://georep/default/t1
Subscription: mysub
---------------------------------------------------------
  message received: msg-key=[0] 20210324-065155-016, msg-properties={cluster=ClusterA}, msg-payload=mN8etPUo9l0PDTW2cpa0, publish-time: 2021-03-24 06:51:55.021, receive-time: 2021-03-24 06:52:55.028
  message received: msg-key=[1] 20210324-065155-089, msg-properties={cluster=ClusterA}, msg-payload=z4tGKNSz4vmnqWIwIKO6kiclyhhso, publish-time: 2021-03-24 06:51:55.089, receive-time: 2021-03-24 06:52:55.028
  message received: msg-key=[2] 20210324-065155-144, msg-properties={cluster=ClusterA}, msg-payload=8T0O88q9upN1bqvEtIiEko208fHM4R0HMP1it1NycVkavxaR5b41OR7NX, publish-time: 2021-03-24 06:51:55.144, receive-time: 2021-03-24 06:52:55.029
  message received: msg-key=[3] 20210324-065155-199, msg-properties={cluster=ClusterA}, msg-payload=AlMs3I3C2xGa0BJu8MIv1o, publish-time: 2021-03-24 06:51:55.199, receive-time: 2021-03-24 06:52:55.029
  message received: msg-key=[4] 20210324-065155-253, msg-properties={cluster=ClusterA}, msg-payload=1CcO0W4qInSLYhOJrjvVMw04Ti32x8sk8xoyt4wwHGHL6GfctcvFKcf, publish-time: 2021-03-24 06:51:55.254, receive-time: 2021-03-24 06:52:55.030
  message received: msg-key=[5] 20210324-065155-308, msg-properties={cluster=ClusterA}, msg-payload=GtzTIPuBtsBG21sSiHv2CBEv0gdP4k, publish-time: 2021-03-24 06:51:55.308, receive-time: 2021-03-24 06:52:55.030

Desktop (please complete the following information):

  • OS: Ubuntu 16


@codelipenghui
Contributor

@yabinmeng Did consumerA acknowledge the received messages before it was killed? You can try to get the topic internal stats (bin/pulsar-admin topics stats-internal) before running consumerB to make sure the mark-delete position is moving forward in the local cluster.
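
For example, with the topic from the report above:

bin/pulsar-admin topics stats-internal persistent://georep/default/t1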

@yabinmeng
Author

@codelipenghui yes, consumerA acknowledged each message after receiving it.

@lhotari
Member

lhotari commented Mar 31, 2021

I did some investigation on this issue and found a pattern.
The problem occurs if there are pending messages in the subscription.

As described above, I first produced 6 messages in the topic in cluster A.
If I consume all 6 messages with the consumer running in cluster A, the consumer started in cluster B won't get any messages, as expected.
However, if only 3 messages are consumed and 3 remain in the subscription, it doesn't work.

@codelipenghui Thanks for the hint about using stats-internal.

After producing 6 messages:

from cluster A:

{
  "entriesAddedCounter" : 30934,
  "numberOfEntries" : 30934,
  "totalSize" : 3074239,
  "currentLedgerEntries" : 30934,
  "currentLedgerSize" : 3074239,
  "lastLedgerCreatedTimestamp" : "2021-03-31T07:34:57.665Z",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "4:30933",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 4,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "mysub" : {
      "markDeletePosition" : "4:30732",
      "readPosition" : "4:30733",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 30733,
      "cursorLedger" : 6,
      "cursorLedgerLastEntry" : 75,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.676Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : {
        "pulsar.replicated.subscription" : 1
      }
    },
    "pulsar.repl.cluster-b" : {
      "markDeletePosition" : "4:30933",
      "readPosition" : "4:30934",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 30934,
      "cursorLedger" : 5,
      "cursorLedgerLastEntry" : 5233,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.674Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  }
}

from cluster B:

{
  "entriesAddedCounter" : 31144,
  "numberOfEntries" : 31144,
  "totalSize" : 3150803,
  "currentLedgerEntries" : 31144,
  "currentLedgerSize" : 3150803,
  "lastLedgerCreatedTimestamp" : "2021-03-31T07:34:58.097Z",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "0:31143",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 0,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "mysub" : {
      "markDeletePosition" : "0:30727",
      "readPosition" : "0:30728",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 30728,
      "cursorLedger" : 2,
      "cursorLedgerLastEntry" : 81,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.833Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : {
        "pulsar.replicated.subscription" : 1
      }
    },
    "pulsar.repl.cluster-a" : {
      "markDeletePosition" : "0:31143",
      "readPosition" : "0:31144",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 31144,
      "cursorLedger" : 1,
      "cursorLedgerLastEntry" : 5274,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.219Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  }
}

After consuming 3 messages in cluster A:

from cluster A:

{
  "entriesAddedCounter" : 31510,
  "numberOfEntries" : 31510,
  "totalSize" : 3134935,
  "currentLedgerEntries" : 31510,
  "currentLedgerSize" : 3134935,
  "lastLedgerCreatedTimestamp" : "2021-03-31T07:34:57.665Z",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "4:31509",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 4,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "mysub" : {
      "markDeletePosition" : "4:30765",
      "readPosition" : "4:31410",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 31407,
      "cursorLedger" : 6,
      "cursorLedgerLastEntry" : 77,
      "individuallyDeletedMessages" : "[(4:30768..4:31409]]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.676Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 645,
      "totalNonContiguousDeletedMessagesRange" : 1,
      "properties" : {
        "pulsar.replicated.subscription" : 1
      }
    },
    "pulsar.repl.cluster-b" : {
      "markDeletePosition" : "4:31509",
      "readPosition" : "4:31510",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 31510,
      "cursorLedger" : 5,
      "cursorLedgerLastEntry" : 5347,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:57.674Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  }
}

from cluster B:

{
  "entriesAddedCounter" : 31615,
  "numberOfEntries" : 31615,
  "totalSize" : 3200474,
  "currentLedgerEntries" : 31615,
  "currentLedgerSize" : 3200474,
  "lastLedgerCreatedTimestamp" : "2021-03-31T07:34:58.097Z",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "0:31614",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 0,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "mysub" : {
      "markDeletePosition" : "0:30758",
      "readPosition" : "0:30759",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 30759,
      "cursorLedger" : 2,
      "cursorLedgerLastEntry" : 83,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.833Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : {
        "pulsar.replicated.subscription" : 1
      }
    },
    "pulsar.repl.cluster-a" : {
      "markDeletePosition" : "0:31614",
      "readPosition" : "0:31615",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 31615,
      "cursorLedger" : 1,
      "cursorLedgerLastEntry" : 5367,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.219Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  }
}

It seems that the problem might be related to the individuallyDeletedMessages state.
PIP-33 mentions "we're only targeting to sync the "mark-delete" position (eg: offset), without considering the messages deleted out of order after that point."

@codelipenghui Is it expected that individuallyDeletedMessages should be used? The messages are consumed in order. Why doesn't the markDeletePosition move instead? It seems to move only when all pending messages are consumed.

@lhotari
Member

lhotari commented Mar 31, 2021

This is another example where there are multiple individually deleted message ranges:

{
  "entriesAddedCounter" : 38551,
  "numberOfEntries" : 38551,
  "totalSize" : 3907922,
  "currentLedgerEntries" : 38551,
  "currentLedgerSize" : 3907922,
  "lastLedgerCreatedTimestamp" : "2021-03-31T07:34:58.097Z",
  "waitingCursorsCount" : 1,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "0:38550",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 0,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "mysub" : {
      "markDeletePosition" : "0:38447",
      "readPosition" : "0:38506",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 38503,
      "cursorLedger" : 2,
      "cursorLedgerLastEntry" : 97,
      "individuallyDeletedMessages" : "[(0:38448..0:38451],(0:38452..0:38454],(0:38455..0:38505]]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.833Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 59,
      "totalNonContiguousDeletedMessagesRange" : 3,
      "properties" : {
        "pulsar.replicated.subscription" : 1
      }
    },
    "pulsar.repl.cluster-a" : {
      "markDeletePosition" : "0:38550",
      "readPosition" : "0:38551",
      "waitingReadOp" : true,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 38551,
      "cursorLedger" : 1,
      "cursorLedgerLastEntry" : 6556,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T07:34:58.219Z",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  }
}

In this case, the first consumer was run on cluster-B (to consume 3 messages). The other steps performed were the same as described in the previous comment.

@lhotari
Member

lhotari commented Mar 31, 2021

I also wonder about the messagesConsumedCounter. I've only produced and consumed <50 messages in this cluster since its creation. The readPosition and markDeletePosition seem to increase a lot too. I wonder if this is because of the subscription replication marker messages that are part of the messages in the topic (as described in PIP-33).

It seems that the readPosition, markDeletePosition and individuallyDeletedMessages don't take into account that the counter is also increased by the replicated subscription marker messages. IIRC, some of the logic makes assumptions based on gaps in the counter value. This is just a hunch about what the problem could be.

@lhotari
Member

lhotari commented Mar 31, 2021

Another observation: after leaving it running, there are negative values in the messagesConsumedCounter:

{
  "entriesAddedCounter" : 0,
  "numberOfEntries" : 48221,
  "totalSize" : 4931008,
  "currentLedgerEntries" : 0,
  "currentLedgerSize" : 0,
  "lastLedgerCreatedTimestamp" : "2021-03-31T10:42:19.984Z",
  "waitingCursorsCount" : 0,
  "pendingAddEntriesCount" : 0,
  "lastConfirmedEntry" : "0:48220",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 0,
    "entries" : 48221,
    "size" : 4931008,
    "offloaded" : false
  }, {
    "ledgerId" : 3,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false
  } ],
  "cursors" : {
    "mysub" : {
      "markDeletePosition" : "0:38396",
      "readPosition" : "0:38397",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : -9824,
      "cursorLedger" : 2,
      "cursorLedgerLastEntry" : 97,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T10:42:19.999Z",
      "state" : "NoLedger",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : {
        "pulsar.replicated.subscription" : 1
      }
    },
    "pulsar.repl.cluster-a" : {
      "markDeletePosition" : "0:48216",
      "readPosition" : "0:48217",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : -4,
      "cursorLedger" : 1,
      "cursorLedgerLastEntry" : 8490,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-03-31T10:42:20.002Z",
      "state" : "NoLedger",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }
  }
}

It seems odd to have negative values. I wonder if this causes any issues.

@lhotari
Member

lhotari commented Mar 31, 2021

It seems that org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl#getPreviousPosition doesn't take the "marker messages" for replicated subscriptions into account and that is part of the problem.

public PositionImpl getPreviousPosition(PositionImpl position) {
    if (position.getEntryId() > 0) {
        return PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1);
    }

    // The previous position will be the last position of an earlier ledgers
    NavigableMap<Long, LedgerInfo> headMap = ledgers.headMap(position.getLedgerId(), false);

    if (headMap.isEmpty()) {
        // There is no previous ledger, return an invalid position in the current ledger
        return PositionImpl.get(position.getLedgerId(), -1);
    }

    // We need to find the most recent non-empty ledger
    for (long ledgerId : headMap.descendingKeySet()) {
        LedgerInfo li = headMap.get(ledgerId);
        if (li.getEntries() > 0) {
            return PositionImpl.get(li.getLedgerId(), li.getEntries() - 1);
        }
    }

    // in case there are only empty ledgers, we return a position in the first one
    return PositionImpl.get(headMap.firstEntry().getKey(), -1);
}

@lhotari
Member

lhotari commented Mar 31, 2021

In PIP-33 there is this documented limitation:

For this proposal, we're only targeting to sync the "mark-delete" position (eg: offset), without considering the messages deleted out of order after that point. These will appear as duplicates after a cluster failover. In future, it might be possible to combine different techniques to track individually deleted messages as well.

It seems that because of the "marker messages" being acknowledged automatically, the messages get deleted out of order in all cases where the consumer doesn't keep up with the producer. This explains the individuallyDeletedMessages ranges seen when the problem occurs. When there are individuallyDeletedMessages in the subscription, the markDeletePosition doesn't get updated, and therefore the subscription state doesn't get replicated.

@merlimat Is this also a known limitation in the design? Do you have plans for addressing this issue where the "marker message" acknowledgements prevent subscription replication by causing out-of-order deletions? I might have misunderstood how the solution works and what is causing the behavior reported in the description of this issue.

@lhotari
Member

lhotari commented Apr 1, 2021

This test (28faa8e) reproduces the issue when allowDuplicates is set to false. The test is part of PR #10098.

@lhotari
Member

lhotari commented Apr 16, 2021

@merlimat do you have a chance to take a look at this issue and answer the question above? Thanks.

The test case to reproduce the issue has been merged to the master branch. You can reproduce the issue by setting allowDuplicates = false in the org.apache.pulsar.broker.service.ReplicatorSubscriptionTest#testReplicatedSubscriptionAcrossTwoRegions test case:

/**
 * Tests replicated subscriptions across two regions
 */
@Test
public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
    String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
    String topicName = "persistent://" + namespace + "/mytopic";
    String subscriptionName = "cluster-subscription";
    // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
    // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
    boolean allowDuplicates = true;
    // this setting can be used to manually run the test with subscription replication disabled
    // it shows that subscription replication has no impact in behavior for this test case
    boolean replicateSubscriptionState = true;

    admin1.namespaces().createNamespace(namespace);
    admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

    @Cleanup
    PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();
    // create subscription in r1
    createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);

    @Cleanup
    PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();
    // create subscription in r2
    createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);

    Set<String> sentMessages = new LinkedHashSet<>();
    // send messages in r1
    {
        @Cleanup
        Producer<byte[]> producer = client1.newProducer().topic(topicName)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create();
        int numMessages = 6;
        for (int i = 0; i < numMessages; i++) {
            String body = "message" + i;
            producer.send(body.getBytes(StandardCharsets.UTF_8));
            sentMessages.add(body);
        }
        producer.close();
    }

    Set<String> receivedMessages = new LinkedHashSet<>();
    // consume 3 messages in r1
    try (Consumer<byte[]> consumer1 = client1.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .replicateSubscriptionState(replicateSubscriptionState)
            .subscribe()) {
        readMessages(consumer1, receivedMessages, 3, allowDuplicates);
    }

    // wait for subscription to be replicated
    Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

    // consume remaining messages in r2
    try (Consumer<byte[]> consumer2 = client2.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .replicateSubscriptionState(replicateSubscriptionState)
            .subscribe()) {
        readMessages(consumer2, receivedMessages, -1, allowDuplicates);
    }

    // assert that all messages have been received
    assertEquals(new ArrayList<>(sentMessages), new ArrayList<>(receivedMessages), "Sent and received " +
            "messages don't match.");
}

Is the expectation that the test case makes valid (when allowDuplicates = false)?

(Btw, it seems that replicated subscriptions are broken in the master branch. The fix in #10247 is required, but that doesn't fix this issue.)

@merlimat
Contributor

I took a look at the test, and the issue is that by the time the consumer has acknowledged all the messages there is still no valid snapshot, therefore we cannot advance the position of the replicated subscription.

After the topic is created, the brokers will start sending markers and creating snapshots of the cursors. This happens with a default cadence of 1 sec.

If the consumer acks everything before that time, there will be no snapshot created yet. If traffic continues, after a few seconds the subscription will be replicated correctly.

To "fix" the test I just put a Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()) before the producer gets started.

One other minor thing in the test is that we don't need to create the subscription in R2 (createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);). Creating the replicated subscription in R1 will automatically trigger the creation of the subscription in all the clusters (where the topic is being replicated).

@lhotari
Member

lhotari commented Apr 19, 2021

To "fix" the test I just put a Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()) before the producer gets started.

One other minor thing in the test is that we don't need to create the subscription in R2 (

Thanks for checking this @merlimat .

I tried these changes in this experiment: lhotari@2892e25

However, if the subscription isn't created before the producer is created, no messages are received on R2. I wonder if the test case is missing some necessary configuration?

@lhotari lhotari changed the title [Geo-replicaiton] Subscription replication is not working across clusters [Geo-replication] Subscription replication is not working across clusters May 18, 2021
@liguangcheng

@codelipenghui yes, consumerA acknowledged each message after receiving it.

@yabinmeng Has the problem of not being able to continue consumption on another cluster been resolved?

@liguangcheng

I took a look at the test, and the issue is that by the time the consumer has acknowledged all the messages there is still no valid snapshot, therefore we cannot advance the position of the replicated subscription.

After the topic is created, the brokers will start sending markers and creating snapshots of the cursors. This happens with a default cadence of 1 sec.

If the consumer acks everything before that time, there will be no snapshot created yet. If traffic continues, after a few seconds the subscription will be replicated correctly.

To "fix" the test I just put a Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()) before the producer gets started.

One other minor thing in the test is that we don't need to create the subscription in R2 (createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);). Creating the replicated subscription in R1 will automatically trigger the creation of the subscription in all the clusters (where the topic is being replicated).

Thank you @merlimat. The cause of the problem is that continuously produced messages are required; a one-time burst of messages that is too short indeed cannot trigger the replication of the subscription state.

@liguangcheng

To sum up: there are two conditions for subscription replication between multiple sites in Pulsar 2.8:

  1. Continuous production of data
  2. Message deduplication turned off

@codelipenghui
Contributor

The issue had no activity for 30 days, mark with Stale label.

@liangjianwen

I had the same problem with version 2.8.1 of Pulsar. What is the final solution to this issue?

@liguangcheng

I had the same problem with version 2.8.1 of Pulsar. What is the final solution to this issue?

To sum up: there are two conditions for subscription replication between multiple sites in Pulsar 2.8:

  1. Continuous message traffic between the two sites
  2. Message deduplication turned off in broker.conf
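
For reference, a sketch of the corresponding broker.conf settings (these are the standard broker-level keys; deduplication can also be configured per namespace):

enableReplicatedSubscriptions=true
# turn off broker-side message deduplication
brokerDeduplicationEnabled=false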

@liangjianwen

@liguangcheng Got it, thanks.

@lhotari lhotari linked a pull request Jul 18, 2022 that will close this issue
@lhotari
Member

lhotari commented Jul 18, 2022

@liangjianwen @liguangcheng There's a possible mitigation for the "continuous traffic data between two sites" requirement in #16651. It doesn't remove the requirement completely, since subscription replication can only happen for positions which have a valid subscription snapshot. Please share more details about your observations or use cases.

@SpicyChickenFLY

@liguangcheng Got it, thanks.

@liangjianwen I still get the same problem with 2.8.1 even though I met the two conditions mentioned by @liguangcheng. Does it work for you?

@lhotari
Member

lhotari commented Nov 24, 2023

For anyone dealing with subscription replication issues, one thing to check is whether replication snapshots are timing out. Increasing the timeout threshold (replicatedSubscriptionsSnapshotTimeoutSeconds=30 -> replicatedSubscriptionsSnapshotTimeoutSeconds=60) could help solve issues in that case.
See #21612 (reply in thread) for more details.
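
For example, in broker.conf (30 seconds is the default):

replicatedSubscriptionsSnapshotTimeoutSeconds=60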

There's also a pending PR #16651 that mitigates some issues in snapshot creation and reduces unnecessary snapshots.

@YJDoc2

YJDoc2 commented Mar 27, 2024

Hey, was there any update on this? I'm using Pulsar 2.10.x and facing the same issue. I read through the related issues on this repo, but even with continuous message production (100 messages with a 1 sec sleep between each, as well as 1000 messages with a 0.1 sec sleep between each) and dedup enabled, I'm not getting the expected behavior of consuming x messages on one cluster, closing the client, and starting from the (x+1)th message on the second cluster (or anywhere near x for that matter; the second consumer always starts from the 0th message). I set the snapshot timeout to 60, but I'm not sure where to see if the snapshot is failing; there are no obvious logs related to replication failure in the broker.

@lhotari
Member

lhotari commented Mar 27, 2024

Hey, was there any update on this? I'm using Pulsar 2.10.x and facing the same issue. I read through the related issues on this repo, but even with continuous message production (100 messages with a 1 sec sleep between each, as well as 1000 messages with a 0.1 sec sleep between each) and dedup enabled, I'm not getting the expected behavior of consuming x messages on one cluster, closing the client, and starting from the (x+1)th message on the second cluster (or anywhere near x for that matter; the second consumer always starts from the 0th message). I set the snapshot timeout to 60, but I'm not sure where to see if the snapshot is failing; there are no obvious logs related to replication failure in the broker.

@YJDoc2 there are discussions #22315 and #21612 which might contain some useful information.

Since replicated subscriptions will only replicate the mark-delete position, it is worth running pulsar-admin topics stats-internal for the topic in each cluster to check the subscription state.

@YJDoc2

YJDoc2 commented Apr 1, 2024

Hey @lhotari, thanks for the resources, they helped a lot. So to be clear, from what I have understood, is the following correct:
For two clusters A and B, where cluster B is replicating cluster A (so cluster A is the "main" one):

  • When subscription state replication is NOT enabled, the messages will be replicated from A -> B, and the connected consumers in both clusters, irrespective of their type (exclusive or shared), will consume all the messages from the point of enabling the replication, and act as if they are connected to two independent clusters.
  • When subscription state replication IS enabled, only the delete marker is replicated. So even if one consumer consumes and acknowledges a message in one cluster (say A), another consumer (of the same subscription) in the other cluster (B) does not get any notification/information of this. When Pulsar (in A or B) decides that some of the messages in the queue should be deleted (for TTL or storage reasons, or by some internal algorithm), that info is shared across clusters, and now newly attached consumers in both clusters can consume messages only from the new deletion point onward and not before.

Is my understanding correct? Thanks!
