diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f3af7cdfa400d..2801be71e6257 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -917,7 +917,7 @@ protected void internalResetCursor(String subName, long timestamp, boolean autho } } - protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative) { + protected void internalCreateSubscription(String subscriptionName, MessageIdImpl messageId, boolean authoritative, boolean replicated) { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } @@ -968,7 +968,7 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl } PersistentSubscription subscription = (PersistentSubscription) topic - .createSubscription(subscriptionName, InitialPosition.Latest).get(); + .createSubscription(subscriptionName, InitialPosition.Latest, replicated).get(); // Mark the cursor as "inactive" as it was created without a real consumer connected subscription.deactivateCursor(); subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 09f719a7c979e..9766ec6d29eec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -292,7 +292,7 @@ public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("pro validateTopicName(property, cluster, namespace, encodedTopic); return internalGetPartitionedStatsInternal(authoritative); } - + @DELETE @Path("/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}") @ApiOperation(hidden = true, value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.") @@ -397,9 +397,9 @@ public void resetCursorOnPosition(@PathParam("property") String property, @PathP public void createSubscription(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) { validateTopicName(property, cluster, namespace, topic); - internalCreateSubscription(decode(encodedSubName), messageId, authoritative); + internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index ca9ff88e6d7e5..d62e0c6a3ded2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -150,7 +150,7 @@ public void createPartitionedTopic(@PathParam("tenant") String tenant, @PathPara @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") }) public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, + @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateGlobalNamespaceOwnership(tenant,namespace); validateTopicName(tenant, namespace, encodedTopic); @@ -316,7 +316,7 @@ public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("ten validateTopicName(tenant, namespace, encodedTopic); return internalGetPartitionedStatsInternal(authoritative); } - + @DELETE @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}") @ApiOperation(value = "Delete a subscription.", notes = "There should not be any active consumers on the subscription.") @@ -390,9 +390,9 @@ public void expireMessagesForAllSubscriptions(@PathParam("tenant") String tenant @ApiResponse(code = 405, message = "Not supported for partitioned topics") }) public void createSubscription(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String topic, @PathParam("subscriptionName") String encodedSubName, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, MessageIdImpl messageId, @QueryParam("replicated") boolean replicated) { validateTopicName(tenant, namespace, topic); - internalCreateSubscription(decode(encodedSubName), messageId, authoritative); + internalCreateSubscription(decode(encodedSubName), messageId, authoritative, replicated); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b5b6fe97b2d97..44886136d34e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -611,6 +611,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final Map metadata = CommandUtils.metadataFromCommand(subscribe); final InitialPosition initialPosition = subscribe.getInitialPosition(); final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; + final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState(); CompletableFuture isProxyAuthorizedFuture; if (service.isAuthorizationEnabled() && originalPrincipal != null) { @@ -685,7 +686,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, - readCompacted, initialPosition); + readCompacted, initialPosition, isReplicated); } else { return FutureUtil.failedFuture( new IncompatibleSchemaException( @@ -696,7 +697,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } else { return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, - startMessageId, metadata, readCompacted, initialPosition); + startMessageId, metadata, readCompacted, initialPosition, + isReplicated); } }) .thenAccept(consumer -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 8076bd30e6ea5..cf4c8627de22c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -44,6 +44,8 @@ public interface Subscription { String getTopicName(); + boolean isReplicated(); + Dispatcher getDispatcher(); long getNumberOfEntriesInBacklog(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 12c9bdcca3d4e..5c844ad7005a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -92,9 +92,11 @@ default long getOriginalSequenceId() { CompletableFuture subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, - Map metadata, boolean readCompacted, InitialPosition initialPosition); + Map metadata, boolean readCompacted, InitialPosition initialPosition, + boolean replicateSubscriptionState); - CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition); + CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, + boolean replicateSubscriptionState); CompletableFuture unsubscribe(String subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 0139d1449ea26..5c060bae9fa19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -75,6 +75,11 @@ public Topic getTopic() { return topic; } + @Override + public boolean isReplicated() { + return false; + } + @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { if (IS_FENCED_UPDATER.get(this) == TRUE) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 10c83245c50dc..4c3323c3dca20 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -233,7 +233,7 @@ public void addProducer(Producer producer) throws BrokerServiceException { lock.readLock().lock(); try { brokerService.checkTopicNsOwnership(getName()); - + if (isFenced) { log.warn("[{}] Attempting to add producer to a fenced topic", topic); throw new TopicFencedException("Topic is temporarily unavailable"); @@ -315,10 +315,11 @@ public void removeProducer(Producer producer) { @Override public CompletableFuture subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, - Map metadata, boolean readCompacted, InitialPosition initialPosition) { + Map metadata, boolean readCompacted, InitialPosition initialPosition, + boolean replicateSubscriptionState) { final CompletableFuture future = new CompletableFuture<>(); - + try { brokerService.checkTopicNsOwnership(getName()); } catch (Exception e) { @@ -396,7 +397,7 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri } @Override - public CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition) { + public CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) { return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java index 316ebc593c523..0c3feb31bfd86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java @@ -41,7 +41,7 @@ public class CompactorSubscription extends PersistentSubscription { public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic, String subscriptionName, ManagedCursor cursor) { - super(topic, subscriptionName, cursor); + super(topic, subscriptionName, cursor, false); checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)); this.compactedTopic = compactedTopic; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index ce9d4aa3e16c8..a98af617d25b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -20,14 +20,15 @@ import com.google.common.base.MoreObjects; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; -import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; @@ -40,7 +41,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException; @@ -76,12 +76,36 @@ public class PersistentSubscription implements Subscription { // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; - public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) { + private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription"; + + // Map of properties that is used to mark this subscription as "replicated". + // Since this is the only field at this point, we can just keep a static + // instance of the map. + private static final Map REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = new TreeMap<>(); + private static final Map NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap(); + + private volatile boolean isReplicated; + + static { + REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); + } + + static Map getBaseCursorProperties(boolean isReplicated) { + return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; + } + + static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { + return cursor.getProperties().containsKey(REPLICATED_SUBSCRIPTION_PROPERTY); + } + + public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, + boolean replicated) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); this.subName = subscriptionName; this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor); + this.isReplicated = replicated; IS_FENCED_UPDATER.set(this, FALSE); } @@ -95,6 +119,15 @@ public Topic getTopic() { return topic; } + @Override + public boolean isReplicated() { + return isReplicated; + } + + void setReplicated(boolean replicated) { + this.isReplicated = replicated; + } + @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { cursor.updateLastActive(); @@ -194,7 +227,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map mergeCursorProperties(Map userProperties) { + Map baseProperties = isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES + : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; + + if (userProperties.isEmpty()) { + // Use only the static instance in the common case + return baseProperties; + } else { + Map merged = new TreeMap<>(); + merged.putAll(userProperties); + merged.putAll(baseProperties); + return merged; + } + + } + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ed59c37c32880..e21e359213ba9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -250,7 +250,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS // to take care of it } else { final String subscriptionName = Codec.decode(cursor.getName()); - subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor)); + subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, + PersistentSubscription.isCursorFromReplicatedSubscription(cursor))); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now subscriptions.get(subscriptionName).deactivateCursor(); @@ -298,12 +299,13 @@ private void initializeDispatchRateLimiterIfNeeded(Optional policies) } } - private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor) { + private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, + boolean replicated) { checkNotNull(compactedTopic); if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) { return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor); } else { - return new PersistentSubscription(this, subscriptionName, cursor); + return new PersistentSubscription(this, subscriptionName, cursor, replicated); } } @@ -496,7 +498,8 @@ public void removeProducer(Producer producer) { @Override public CompletableFuture subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, - Map metadata, boolean readCompacted, InitialPosition initialPosition) { + Map metadata, boolean readCompacted, InitialPosition initialPosition, + boolean replicatedSubscriptionState) { final CompletableFuture future = new CompletableFuture<>(); @@ -571,7 +574,7 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri } CompletableFuture subscriptionFuture = isDurable ? // - getDurableSubscription(subscriptionName, initialPosition) // + getDurableSubscription(subscriptionName, initialPosition, replicatedSubscriptionState) // : getNonDurableSubscription(subscriptionName, startMessageId); int maxUnackedMessages = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0; @@ -617,17 +620,27 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri return future; } - private CompletableFuture getDurableSubscription(String subscriptionName, InitialPosition initialPosition) { + private CompletableFuture getDurableSubscription(String subscriptionName, + InitialPosition initialPosition, boolean replicated) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); - ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, new OpenCursorCallback() { + + Map properties = PersistentSubscription.getBaseCursorProperties(replicated); + + ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Opened cursor", topic, subscriptionName); } - subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName, - name -> createPersistentSubscription(subscriptionName, cursor))); + PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, + name -> createPersistentSubscription(subscriptionName, cursor, replicated)); + + if (replicated && !subscription.isReplicated()) { + // Flip the subscription state + subscription.setReplicated(replicated); + } + subscriptionFuture.complete(subscription); } @Override @@ -671,7 +684,7 @@ private CompletableFuture getNonDurableSubscription(Stri subscriptionFuture.completeExceptionally(e); } - return new PersistentSubscription(this, subscriptionName, cursor); + return new PersistentSubscription(this, subscriptionName, cursor, false); }); if (!subscriptionFuture.isDone()) { @@ -686,8 +699,8 @@ private CompletableFuture getNonDurableSubscription(Stri @SuppressWarnings("unchecked") @Override - public CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition) { - return getDurableSubscription(subscriptionName, initialPosition); + public CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) { + return getDurableSubscription(subscriptionName, initialPosition, replicateSubscriptionState); } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index e75078f65d806..6efe6011b3020 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -110,7 +110,7 @@ public void testGetSubscriptions() { } catch (Exception e) { Assert.assertEquals("Topic partitions were not yet created", e.getMessage()); } - persistentTopics.createSubscription(testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest); + persistentTopics.createSubscription(testTenant, testNamespace, testLocalTopicName, "test", true, (MessageIdImpl)MessageId.earliest, false); List subscriptions = persistentTopics.getSubscriptions(testTenant, testNamespace, testLocalTopicName + "-partition-0", true); Assert.assertTrue(subscriptions.contains("test")); persistentTopics.deleteSubscription(testTenant, testNamespace, testLocalTopicName, "test", true); @@ -123,7 +123,7 @@ public void testGetSubscriptions() { public void testNonPartitionedTopics() { pulsar.getConfiguration().setAllowAutoTopicCreation(false); final String nonPartitionTopic = "non-partitioned-topic"; - persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest); + persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest, false); try { persistentTopics.getSubscriptions(testTenant, testNamespace, nonPartitionTopic + "-partition-0", true); } catch (RestException exc) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 24cb8120d91b1..cb5a2ef307f12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -263,7 +263,7 @@ private void verifyActiveConsumerChange(CommandActiveConsumerChange change, @Test public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); int partitionIndex = 0; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, @@ -302,7 +302,7 @@ public void testAddRemoveConsumer() throws Exception { log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---"); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); int partitionIndex = 0; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 7c5ca2077da5e..01895a8e45366 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -123,7 +123,8 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -181,7 +182,8 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -243,7 +245,8 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -301,7 +304,8 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 8b3c721d21f11..93664ff651eac 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -433,7 +433,8 @@ public void testSubscribeFail() throws Exception { .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false); try { f1.get(); fail("should fail with exception"); @@ -452,12 +453,14 @@ public void testSubscribeUnsubscribe() throws Exception { // 1. simple subscribe Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false); f1.get(); // 2. duplicate subscribe Future f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false); try { f2.get(); @@ -477,7 +480,7 @@ public void testSubscribeUnsubscribe() throws Exception { @Test public void testAddRemoveConsumer() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); // 1. simple add consumer Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, @@ -508,8 +511,8 @@ public void testAddRemoveConsumer() throws Exception { public void testMaxConsumersShared() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); - PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); + PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false); // for count consumers on topic ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap<>(16, 1); @@ -599,8 +602,8 @@ public void testMaxConsumersSharedForNamespace() throws Exception { public void testMaxConsumersFailover() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); - PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); + PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, false); // for count consumers on topic ConcurrentOpenHashMap subscriptions = new ConcurrentOpenHashMap<>(16, 1); @@ -690,7 +693,7 @@ public void testMaxConsumersFailoverForNamespace() throws Exception { @Test public void testUbsubscribeRaceConditions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest); sub.addConsumer(consumer1); @@ -744,7 +747,8 @@ public void testDeleteTopic() throws Exception { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, + false /* replicated */); f1.get(); assertTrue(topic.delete().isCompletedExceptionally()); @@ -759,7 +763,8 @@ public void testDeleteAndUnsubscribeTopic() throws Exception { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -813,7 +818,8 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -900,7 +906,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest); + 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); try { f.get(); @@ -911,6 +918,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } } + @SuppressWarnings("unchecked") void setupMLAsyncCallbackMocks() { ledgerMock = mock(ManagedLedger.class); cursorMock = mock(ManagedCursor.class); @@ -984,6 +992,15 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); + return null; + } + }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), + any(OpenCursorCallback.class), anyObject()); + // call deleteLedgerComplete on ledger asyncDelete doAnswer(new Answer() { @Override @@ -1018,7 +1035,8 @@ public void testFailoverSubscription() throws Exception { // 1. Subscribe with non partition topic Future f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(), cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(), - cmd1.getReadCompacted(), InitialPosition.Latest); + cmd1.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f1.get(); // 2. Subscribe with partition topic @@ -1030,7 +1048,8 @@ public void testFailoverSubscription() throws Exception { Future f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(), cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(), - cmd2.getReadCompacted(), InitialPosition.Latest); + cmd2.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f2.get(); // 3. Subscribe and create second consumer @@ -1040,7 +1059,8 @@ public void testFailoverSubscription() throws Exception { Future f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(), cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(), - cmd3.getReadCompacted(), InitialPosition.Latest); + cmd3.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f3.get(); assertEquals( @@ -1061,7 +1081,8 @@ public void testFailoverSubscription() throws Exception { Future f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(), cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(), - cmd4.getReadCompacted(), InitialPosition.Latest); + cmd4.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f4.get(); assertEquals( @@ -1087,7 +1108,8 @@ public void testFailoverSubscription() throws Exception { Future f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(), cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(), - cmd5.getReadCompacted(), InitialPosition.Latest); + cmd5.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); try { f5.get(); @@ -1104,7 +1126,8 @@ public void testFailoverSubscription() throws Exception { Future f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(), cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(), - cmd6.getReadCompacted(), InitialPosition.Latest); + cmd6.getReadCompacted(), InitialPosition.Latest, + false /* replicated */); f6.get(); // 7. unsubscribe exclusive sub diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 82f61c22eeff1..a43057cbb2c29 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -46,6 +46,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -1479,6 +1480,16 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Thread.sleep(300); + ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); + return null; + } + }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), + any(OpenCursorCallback.class), anyObject()); + doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { @@ -1489,6 +1500,17 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject()); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Thread.sleep(300); + ((OpenCursorCallback) invocationOnMock.getArguments()[3]) + .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); + return null; + } + }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), + any(OpenCursorCallback.class), anyObject()); + doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java new file mode 100644 index 0000000000000..8ec937dfcc592 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import lombok.Cleanup; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ReplicatedSubscriptionConfigTest extends ProducerConsumerBase { + @Override + @BeforeClass + public void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterClass + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void createReplicatedSubscription() throws Exception { + String topic = "createReplicatedSubscription-" + System.nanoTime(); + + @Cleanup + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub1") + .replicateSubscriptionState(true) + .subscribe(); + + TopicStats stats = admin.topics().getStats(topic); + assertTrue(stats.subscriptions.get("sub1").isReplicated); + + admin.topics().unload(topic); + + // Check that subscription is still marked replicated after reloading + stats = admin.topics().getStats(topic); + assertTrue(stats.subscriptions.get("sub1").isReplicated); + } + + @Test + public void upgradeToReplicatedSubscription() throws Exception { + String topic = "upgradeToReplicatedSubscription-" + System.nanoTime(); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .replicateSubscriptionState(false) + .subscribe(); + + TopicStats stats = admin.topics().getStats(topic); + assertFalse(stats.subscriptions.get("sub").isReplicated); + consumer.close(); + + consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .replicateSubscriptionState(true) + .subscribe(); + + stats = admin.topics().getStats(topic); + assertTrue(stats.subscriptions.get("sub").isReplicated); + consumer.close(); + } + + @Test + public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception { + String topic = "upgradeToReplicatedSubscriptionAfterRestart-" + System.nanoTime(); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .replicateSubscriptionState(false) + .subscribe(); + + TopicStats stats = admin.topics().getStats(topic); + assertFalse(stats.subscriptions.get("sub").isReplicated); + consumer.close(); + + admin.topics().unload(topic); + + consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .replicateSubscriptionState(true) + .subscribe(); + + stats = admin.topics().getStats(topic); + assertTrue(stats.subscriptions.get("sub").isReplicated); + consumer.close(); + } +} diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 8cb50f903f707..f8c5e0a5ff800 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -296,6 +296,12 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder acknowledgmentGroupTime(long delay, TimeUnit unit); + /** + * + * @param replicateSubscriptionState + */ + ConsumerBuilder replicateSubscriptionState(boolean replicateSubscriptionState); + /** * Set the max total receiver queue size across partitons. *

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 6f8d9c9316906..19f5c6ebaf78a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -278,6 +278,12 @@ public ConsumerBuilder subscriptionTopicsMode(@NonNull RegexSubscriptionMode return this; } + @Override + public ConsumerBuilder replicateSubscriptionState(boolean replicateSubscriptionState) { + conf.setReplicateSubscriptionState(replicateSubscriptionState); + return this; + } + @Override public ConsumerBuilder intercept(ConsumerInterceptor... interceptors) { if (interceptorList == null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b2421f246335c..8d91ca12ad3f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -526,7 +526,9 @@ public void connectionOpened(final ClientCnx cnx) { si = null; } ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, - consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), si); + consumerName, isDurable, startMessageIdData, metadata, readCompacted, + conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()), + si); if (startMessageIdData != null) { startMessageIdData.recycle(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index fd4b719994991..3352ebb287bb0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -100,6 +100,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private boolean autoUpdatePartitions = true; + private boolean replicateSubscriptionState = false; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 04018f6eed0d1..2d76410dd86e8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -418,12 +418,14 @@ public static ByteBufPair newSend(long producerId, long sequenceId, int numMessa public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, - true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest, null); + true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, + false /* isReplicated */, InitialPosition.Earliest, null); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) { + Map metadata, boolean readCompacted, boolean isReplicated, + InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) { CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder(); subscribeBuilder.setTopic(topic); subscribeBuilder.setSubscription(subscription); @@ -435,6 +437,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu subscribeBuilder.setDurable(isDurable); subscribeBuilder.setReadCompacted(readCompacted); subscribeBuilder.setInitialPosition(subscriptionInitialPosition); + subscribeBuilder.setReplicateSubscriptionState(isReplicated); if (startMessageId != null) { subscribeBuilder.setStartMessageId(startMessageId); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 319302db9f976..625752b4b99ad 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -8471,6 +8471,10 @@ public interface CommandSubscribeOrBuilder // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest]; boolean hasInitialPosition(); org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition(); + + // optional bool replicate_subscription_state = 14; + boolean hasReplicateSubscriptionState(); + boolean getReplicateSubscriptionState(); } public static final class CommandSubscribe extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -8802,6 +8806,16 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosi return initialPosition_; } + // optional bool replicate_subscription_state = 14; + public static final int REPLICATE_SUBSCRIPTION_STATE_FIELD_NUMBER = 14; + private boolean replicateSubscriptionState_; + public boolean hasReplicateSubscriptionState() { + return ((bitField0_ & 0x00001000) == 0x00001000); + } + public boolean getReplicateSubscriptionState() { + return replicateSubscriptionState_; + } + private void initFields() { topic_ = ""; subscription_ = ""; @@ -8816,6 +8830,7 @@ private void initFields() { readCompacted_ = false; schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance(); initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest; + replicateSubscriptionState_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -8911,6 +8926,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000800) == 0x00000800)) { output.writeEnum(13, initialPosition_.getNumber()); } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + output.writeBool(14, replicateSubscriptionState_); + } } private int memoizedSerializedSize = -1; @@ -8971,6 +8989,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeEnumSize(13, initialPosition_.getNumber()); } + if (((bitField0_ & 0x00001000) == 0x00001000)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeBoolSize(14, replicateSubscriptionState_); + } memoizedSerializedSize = size; return size; } @@ -9110,6 +9132,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000800); initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest; bitField0_ = (bitField0_ & ~0x00001000); + replicateSubscriptionState_ = false; + bitField0_ = (bitField0_ & ~0x00002000); return this; } @@ -9196,6 +9220,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe buildPartia to_bitField0_ |= 0x00000800; } result.initialPosition_ = initialPosition_; + if (((from_bitField0_ & 0x00002000) == 0x00002000)) { + to_bitField0_ |= 0x00001000; + } + result.replicateSubscriptionState_ = replicateSubscriptionState_; result.bitField0_ = to_bitField0_; return result; } @@ -9248,6 +9276,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSub if (other.hasInitialPosition()) { setInitialPosition(other.getInitialPosition()); } + if (other.hasReplicateSubscriptionState()) { + setReplicateSubscriptionState(other.getReplicateSubscriptionState()); + } return this; } @@ -9399,6 +9430,11 @@ public Builder mergeFrom( } break; } + case 112: { + bitField0_ |= 0x00002000; + replicateSubscriptionState_ = input.readBool(); + break; + } } } } @@ -9841,6 +9877,27 @@ public Builder clearInitialPosition() { return this; } + // optional bool replicate_subscription_state = 14; + private boolean replicateSubscriptionState_ ; + public boolean hasReplicateSubscriptionState() { + return ((bitField0_ & 0x00002000) == 0x00002000); + } + public boolean getReplicateSubscriptionState() { + return replicateSubscriptionState_; + } + public Builder setReplicateSubscriptionState(boolean value) { + bitField0_ |= 0x00002000; + replicateSubscriptionState_ = value; + + return this; + } + public Builder clearReplicateSubscriptionState() { + bitField0_ = (bitField0_ & ~0x00002000); + replicateSubscriptionState_ = false; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index f141d4a6d2c1a..40e3dadc2d7c2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -43,7 +43,7 @@ public class SubscriptionStats { /** Flag to verify if subscription is blocked due to reaching threshold of unacked messages */ public boolean blockedSubscriptionOnUnackedMsgs; - + /** Number of unacknowledged messages for the subscription */ public long unackedMessages; @@ -59,6 +59,9 @@ public class SubscriptionStats { /** List of connected consumers on this subscription w/ their stats */ public List consumers; + /** Mark that the subscription state is kept in sync across different regions */ + public boolean isReplicated; + public SubscriptionStats() { this.consumers = Lists.newArrayList(); } @@ -83,6 +86,7 @@ public SubscriptionStats add(SubscriptionStats stats) { this.msgBacklog += stats.msgBacklog; this.unackedMessages += stats.unackedMessages; this.msgRateExpired += stats.msgRateExpired; + this.isReplicated |= stats.isReplicated; if (this.consumers.size() != stats.consumers.size()) { for (int i = 0; i < stats.consumers.size(); i++) { ConsumerStats consumerStats = new ConsumerStats(); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 11d69bee21c48..58238d7405656 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -272,6 +272,11 @@ message CommandSubscribe { // Signal wthether the subscription will initialize on latest // or not -- earliest optional InitialPosition initialPosition = 13 [default = Latest]; + + // Mark the subscription as "replicated". Pulsar will make sure + // to periodically sync the state of replicated subscriptions + // across different clusters (when using geo-replication). + optional bool replicate_subscription_state = 14; } message CommandPartitionedTopicMetadata { diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 98b3735e96134..800fc803cbdde 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -95,6 +95,9 @@ static class Arguments { @Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue") public int receiverQueueSize = 1000; + @Parameter(names = { "--replicated" }, description = "Whether the subscription status should be replicated") + public boolean replicatedSubscription = false; + @Parameter(names = { "--acks-delay-millis" }, description = "Acknowlegments grouping delay in millis") public int acknowledgmentsGroupingDelayMillis = 100; @@ -253,7 +256,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe .messageListener(listener) // .receiverQueueSize(arguments.receiverQueueSize) // .acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, TimeUnit.MILLISECONDS) // - .subscriptionType(arguments.subscriptionType); + .subscriptionType(arguments.subscriptionType) + .replicateSubscriptionState(arguments.replicatedSubscription); if (arguments.encKeyName != null) { byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile));