diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index c8f7b721cce6a..451f93067b2ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -67,6 +68,7 @@ public void setup() throws Exception { isTcpLookup = true; // enabled transaction, to test pattern consumers not subscribe to transaction system topic. conf.setTransactionCoordinatorEnabled(true); + conf.setSubscriptionPatternMaxLength(10000); super.internalSetup(); super.producerBaseSetup(); } @@ -210,6 +212,12 @@ public void testBinaryProtoToGetTopicsOfNamespacePersistent() throws Exception { .subscribe(); assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX)); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + // 4. verify consumer get methods, to get right number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitions(); @@ -287,6 +295,12 @@ public void testBinaryProtoSubscribeAllTopicOfNamespace() throws Exception { .subscribe(); assertTrue(consumer.getTopic().startsWith(PatternMultiTopicsConsumerImpl.DUMMY_TOPIC_NAME_PREFIX)); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + // 4. verify consumer get methods, to get right number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitions(); @@ -364,6 +378,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceNonPersistent() throws Exceptio .subscriptionTopicsMode(RegexSubscriptionMode.NonPersistentOnly) .subscribe(); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + // 4. verify consumer get methods, to get right number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitions(); @@ -455,6 +475,12 @@ public void testBinaryProtoToGetTopicsOfNamespaceAll() throws Exception { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .subscribe(); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + // 4. verify consumer get methods, to get right number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); List topics = ((PatternMultiTopicsConsumerImpl) consumer).getPartitions(); @@ -525,6 +551,11 @@ public void testStartEmptyPatternConsumer() throws Exception { .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) .receiverQueueSize(4) .subscribe(); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); // 3. verify consumer get methods, to get 5 number of partitions and topics. assertSame(pattern, ((PatternMultiTopicsConsumerImpl) consumer).getPattern()); @@ -605,6 +636,12 @@ public void testAutoSubscribePatterConsumerFromBrokerWatcher() throws Exception .receiverQueueSize(4) .subscribe(); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + // 1. create partition String topicName = "persistent://my-property/my-ns/pattern-topic-1-" + key; TenantInfoImpl tenantInfo = createDefaultTenantInfo(); @@ -665,6 +702,12 @@ public void testAutoSubscribePatternConsumer() throws Exception { .receiverQueueSize(4) .subscribe(); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3 @@ -775,6 +818,12 @@ public void testAutoUnsubscribePatternConsumer() throws Exception { .receiverQueueSize(4) .subscribe(); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3 @@ -861,6 +910,12 @@ public void testTopicDeletion() throws Exception { .subscriptionName("sub") .subscribe(); + // Wait topic list watcher creation. + Awaitility.await().untilAsserted(() -> { + CompletableFuture completableFuture = WhiteboxImpl.getInternalState(consumer, "watcherFuture"); + assertTrue(completableFuture.isDone() && !completableFuture.isCompletedExceptionally()); + }); + assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl); PatternMultiTopicsConsumerImpl consumerImpl = (PatternMultiTopicsConsumerImpl) consumer;