diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 9061dd76b47fd..1e463fad1c954 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -167,6 +167,10 @@ protected boolean exists(String path) throws MetadataStoreException { } } + protected CompletableFuture existsAsync(String path) { + return cache.exists(path); + } + public int getOperationTimeoutSec() { return operationTimeoutSec; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index dcba2ac854c3c..2beeab8c9510b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -18,14 +18,11 @@ */ package org.apache.pulsar.broker.resources; -import static org.apache.pulsar.common.policies.path.PolicyPath.path; import com.fasterxml.jackson.core.type.TypeReference; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; import lombok.Getter; @@ -39,7 +36,6 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.Codec; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataCache; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -122,7 +118,7 @@ public CompletableFuture setPoliciesAsync(NamespaceName ns, Function partitionedTopicExistsAsync(TopicName tn) { + return existsAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), + tn.getEncodedLocalName())); + } + public CompletableFuture deletePartitionedTopicAsync(TopicName tn) { return deleteAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(), tn.getEncodedLocalName())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 625f4191d6c04..f3a94d222fdca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -632,10 +632,7 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n return; } - List> createFutureList = new ArrayList<>(); - CompletableFuture createLocalFuture = new CompletableFuture<>(); - createFutureList.add(createLocalFuture); checkTopicExistsAsync(topicName).thenAccept(exists -> { if (exists) { log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); @@ -658,7 +655,13 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n return null; }); - FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> { + List replicatedClusters = new ArrayList<>(); + if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) { + getNamespaceReplicatedClusters(namespaceName) + .stream().filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName())) + .forEach(replicatedClusters::add); + } + createLocalFuture.whenComplete((ignored, ex) -> { if (ex != null) { log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause()); if (ex.getCause() instanceof RestException) { @@ -669,14 +672,20 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n return; } - if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) { - getNamespaceReplicatedClusters(namespaceName) - .stream() - .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName())) - .forEach(cluster -> createFutureList.add( - ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()) + if (!replicatedClusters.isEmpty()) { + replicatedClusters.forEach(cluster -> { + pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster) + .thenAccept(clusterDataOp -> { + ((TopicsImpl) pulsar().getBrokerService() + .getClusterPulsarAdmin(cluster, clusterDataOp).topics()) .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), numPartitions, true))); + topicName.getPartitionedTopicName(), numPartitions, true); + }) + .exceptionally(throwable -> { + log.error("Failed to create partition topic in cluster {}.", cluster, throwable); + return null; + }); + }); } log.info("[{}] Successfully created partitions for topic {} in cluster {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index fb466bf77cc0a..ef4510cd6ef51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -509,9 +509,14 @@ private CompletableFuture updatePartitionInOtherCluster(int numPartitions, if (cluster.equals(pulsar().getConfig().getClusterName())) { return; } - results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics() - .updatePartitionedTopicAsync(topicName.toString(), - numPartitions, true, false)); + CompletableFuture updatePartitionTopicFuture = + pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster) + .thenApply(clusterDataOp -> + pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp)) + .thenCompose(pulsarAdmin -> + pulsarAdmin.topics().updatePartitionedTopicAsync( + topicName.toString(), numPartitions, true, false)); + results.add(updatePartitionTopicFuture); }); return FutureUtil.waitForAll(results); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index b7e85dadb2464..59ec74fee29a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,14 +65,14 @@ protected enum State { } public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, - BrokerService brokerService) throws NamingException, PulsarServerException { - validatePartitionedTopic(topicName, brokerService); + BrokerService brokerService, PulsarClientImpl replicationClient) + throws PulsarServerException { this.brokerService = brokerService; this.topicName = topicName; this.replicatorPrefix = replicatorPrefix; this.localCluster = localCluster.intern(); this.remoteCluster = remoteCluster.intern(); - this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); + this.replicationClient = replicationClient; this.client = (PulsarClientImpl) brokerService.pulsar().getClient(); this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); @@ -242,20 +243,18 @@ public static String getReplicatorName(String replicatorPrefix, String cluster) * @param topic * @param brokerService */ - private void validatePartitionedTopic(String topic, BrokerService brokerService) throws NamingException { + public static CompletableFuture validatePartitionedTopicAsync(String topic, BrokerService brokerService) { TopicName topicName = TopicName.get(topic); - boolean isPartitionedTopic = false; - try { - isPartitionedTopic = - brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .partitionedTopicExists(topicName); - } catch (Exception e) { - log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage()); - } - if (isPartitionedTopic) { - throw new NamingException( - topicName + " is a partitioned-topic and replication can't be started for partitioned-producer "); - } + return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExistsAsync(topicName).thenCompose(isPartitionedTopic -> { + if (isPartitionedTopic) { + String s = topicName + + " is a partitioned-topic and replication can't be started for partitioned-producer "; + log.error(s); + return FutureUtil.failedFuture(new NamingException(s)); + } + return CompletableFuture.completedFuture(null); + }); } private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 14c211baee094..51843bbf56c21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1052,6 +1052,7 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture> createNonPersistentTopic(String topic) { + CompletableFuture> topicFuture = new CompletableFuture<>(); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); @@ -1061,27 +1062,40 @@ private CompletableFuture> createNonPersistentTopic(String topic } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this); - - CompletableFuture> future = nonPersistentTopic.initialize() - .thenCompose(__ -> nonPersistentTopic.checkReplication()) - .thenApply(__ -> { - log.info("Created topic {}", nonPersistentTopic); - long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; - pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); - addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic); - return Optional.of(nonPersistentTopic); - }); - - future.exceptionally((ex) -> { - log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex); - nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, future)); + CompletableFuture isOwner = checkTopicNsOwnership(topic); + isOwner.thenRun(() -> { + nonPersistentTopic.initialize() + .thenCompose(__ -> nonPersistentTopic.checkReplication()) + .thenRun(() -> { + log.info("Created topic {}", nonPersistentTopic); + long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; + pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); + addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic); + topicFuture.complete(Optional.of(nonPersistentTopic)); + }).exceptionally(ex -> { + log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause()); + nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(ex); + }); + return null; }); - + }).exceptionally(e -> { + log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", topic, e.getCause()); + // CheckTopicNsOwnership fail dont create nonPersistentTopic, when topic do lookup will find the correct + // broker. When client get non-persistent-partitioned topic + // metadata will the non-persistent-topic will be created. + // so we should add checkTopicNsOwnership logic otherwise the topic will be created + // if it dont own by this broker,we should return success + // otherwise it will keep retrying getPartitionedTopicMetadata + topicFuture.complete(Optional.of(nonPersistentTopic)); + // after get metadata return success, we should delete this topic from this broker, because this topic not + // owner by this broker and it don't initialize and checkReplication + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); return null; }); - return future; + return topicFuture; } private CompletableFuture futureWithDeadline() { @@ -1089,7 +1103,7 @@ private CompletableFuture futureWithDeadline() { () -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION); } - public PulsarClient getReplicationClient(String cluster) { + public PulsarClient getReplicationClient(String cluster, Optional clusterDataOp) { PulsarClient client = replicationClients.get(cluster); if (client != null) { return client; @@ -1097,7 +1111,7 @@ public PulsarClient getReplicationClient(String cluster) { return replicationClients.computeIfAbsent(cluster, key -> { try { - ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster) + ClusterData data = clusterDataOp .orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster)); ClientBuilder clientBuilder = PulsarClient.builder() .enableTcpNoDelay(false) @@ -1164,14 +1178,14 @@ private void configTlsSettings(ClientBuilder clientBuilder, String serviceUrl, } } - public PulsarAdmin getClusterPulsarAdmin(String cluster) { + public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional clusterDataOp) { PulsarAdmin admin = clusterAdmins.get(cluster); if (admin != null) { return admin; } return clusterAdmins.computeIfAbsent(cluster, key -> { try { - ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster) + ClusterData data = clusterDataOp .orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster)); ServiceConfiguration conf = pulsar.getConfig(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index dd57fd91337cc..ce1fe3443f5f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -28,13 +28,13 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl; import org.apache.pulsar.common.stats.Rate; @@ -49,8 +49,9 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl(); public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster, - BrokerService brokerService) throws NamingException, PulsarServerException { - super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); + BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { + super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, + replicationClient); producerBuilder.blockIfQueueFull(false); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 981840bed80f7..43245b9e672e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -32,7 +32,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.Entry; @@ -40,6 +39,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.NamespaceResources; +import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; @@ -529,14 +530,7 @@ public CompletableFuture checkReplication() { } if (!replicators.containsKey(cluster)) { - if (!startReplicator(cluster)) { - // it happens when global topic is a partitioned topic and replicator can't start on - // original - // non partitioned-topic (topic without partition prefix) - return FutureUtil - .failedFuture(new NamingException( - topic + " failed to start replicator for " + cluster)); - } + futures.add(startReplicator(cluster)); } } @@ -554,29 +548,35 @@ public CompletableFuture checkReplication() { } - boolean startReplicator(String remoteCluster) { + CompletableFuture startReplicator(String remoteCluster) { log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster); String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster); } - protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, + protected CompletableFuture addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) { - AtomicBoolean isReplicatorStarted = new AtomicBoolean(true); - replicators.computeIfAbsent(remoteCluster, r -> { - try { - return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService); - } catch (NamingException | PulsarServerException e) { - isReplicatorStarted.set(false); - log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster); - } - return null; - }); - // clean up replicator if startup is failed - if (!isReplicatorStarted.get()) { - replicators.remove(remoteCluster); - } - return isReplicatorStarted.get(); + return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService) + .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() + .getClusterAsync(remoteCluster) + .thenApply(clusterData -> + brokerService.getReplicationClient(remoteCluster, clusterData))) + .thenAccept(replicationClient -> { + replicators.computeIfAbsent(remoteCluster, r -> { + try { + return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, + remoteCluster, brokerService, (PulsarClientImpl) replicationClient); + } catch (PulsarServerException e) { + log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); + } + return null; + }); + + // clean up replicator if startup is failed + if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) { + replicators.remove(remoteCluster); + } + }); } CompletableFuture removeReplicator(String remoteCluster) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index aa53e150672ea..4d79c9a37cf72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -45,7 +45,6 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; @@ -54,6 +53,7 @@ import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.SendCallback; import org.apache.pulsar.common.api.proto.MarkerType; import org.apache.pulsar.common.policies.data.Policies; @@ -105,8 +105,10 @@ public class PersistentReplicator extends AbstractReplicator private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl(); public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, - BrokerService brokerService) throws NamingException, PulsarServerException { - super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService); + BrokerService brokerService, PulsarClientImpl replicationClient) + throws PulsarServerException { + super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, + replicationClient); this.topic = topic; this.cursor = cursor; this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 71b94715089bf..5992677d4292c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -76,6 +76,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; +import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -116,6 +117,7 @@ import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; @@ -264,20 +266,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient()); for (ManagedCursor cursor : ledger.getCursors()) { - if (cursor.getName().startsWith(replicatorPrefix)) { - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); - boolean isReplicatorStarted = false; - try { - isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster); - } catch (Exception e) { - log.warn("[{}] failed to start replication", topic, e); - } - if (!isReplicatorStarted) { - throw new NamingException( - PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster); - } - } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) { + if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME) + || cursor.getName().startsWith(replicatorPrefix)) { // This is not a regular subscription, we are going to // ignore it for now and let the message dedup logic to take care of it } else { @@ -308,7 +298,16 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS @Override public CompletableFuture initialize() { - return brokerService.pulsar().getPulsarResources().getNamespaceResources() + List> futures = new ArrayList<>(); + for (ManagedCursor cursor : ledger.getCursors()) { + if (cursor.getName().startsWith(replicatorPrefix)) { + String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); + String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName()); + futures.add(addReplicationCluster(remoteCluster, cursor, localCluster)); + } + } + return FutureUtil.waitForAll(futures).thenCompose(__ -> + brokerService.pulsar().getPulsarResources().getNamespaceResources() .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()) .thenAccept(optPolicies -> { if (!optPolicies.isPresent()) { @@ -337,7 +336,7 @@ public CompletableFuture initialize() { updateUnackedMessagesAppliedOnSubscription(null); updateUnackedMessagesExceededOnConsumer(null); return null; - }); + })); } // for testing purposes @@ -1534,13 +1533,13 @@ CompletableFuture startReplicator(String remoteCluster) { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - boolean isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster); - if (isReplicatorStarted) { - future.complete(null); - } else { - future.completeExceptionally(new NamingException( - PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster)); - } + addReplicationCluster(remoteCluster, cursor, localCluster).whenComplete((__, ex) -> { + if (ex == null) { + future.complete(null); + } else { + future.completeExceptionally(ex); + } + }); } @Override @@ -1553,23 +1552,29 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { return future; } - protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) { - AtomicBoolean isReplicatorStarted = new AtomicBoolean(true); - replicators.computeIfAbsent(remoteCluster, r -> { - try { - return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster, - brokerService); - } catch (NamingException | PulsarServerException e) { - isReplicatorStarted.set(false); - log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster); - } - return null; - }); - // clean up replicator if startup is failed - if (!isReplicatorStarted.get()) { - replicators.remove(remoteCluster); - } - return isReplicatorStarted.get(); + protected CompletableFuture addReplicationCluster(String remoteCluster, ManagedCursor cursor, + String localCluster) { + return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService) + .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources() + .getClusterAsync(remoteCluster) + .thenApply(clusterData -> + brokerService.getReplicationClient(remoteCluster, clusterData))) + .thenAccept(replicationClient -> { + replicators.computeIfAbsent(remoteCluster, r -> { + try { + return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, + remoteCluster, brokerService, (PulsarClientImpl) replicationClient); + } catch (PulsarServerException e) { + log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); + } + return null; + }); + + // clean up replicator if startup is failed + if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) { + replicators.remove(remoteCluster); + } + }); } CompletableFuture removeReplicator(String remoteCluster) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 71fbac8ff2dd0..5fe651be704ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1770,7 +1770,10 @@ public void testAtomicReplicationRemoval() throws Exception { doReturn(remoteCluster).when(cursor).getName(); brokerService.getReplicationClients().put(remoteCluster, client); PersistentReplicator replicator = spy( - new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService)); + new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService, + (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, + brokerService.pulsar().getPulsarResources().getClusterResources() + .getCluster(remoteCluster)))); replicatorMap.put(remoteReplicatorName, replicator); // step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed @@ -1815,7 +1818,10 @@ public void testClosingReplicationProducerTwice() throws Exception { ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); brokerService.getReplicationClients().put(remoteCluster, client); - PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService); + PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, + brokerService, (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, + brokerService.pulsar().getPulsarResources().getClusterResources() + .getCluster(remoteCluster))); // PersistentReplicator constructor calls startProducer() verify(clientImpl) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java index 701ab47fa7f69..65e90966ea541 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java @@ -75,7 +75,9 @@ public void testRemoveClusterFromNamespace() throws Exception { admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r2", "r3")); - PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient("r3"); + PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient("r3", + pulsar1.getBrokerService().pulsar().getPulsarResources().getClusterResources() + .getCluster("r3")); Assert.assertNotNull(repClient1); Assert.assertFalse(repClient1.isClosed()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 9b4fcc87aa186..11c8e197f8df9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -262,7 +262,9 @@ public void testConcurrentReplicator() throws Exception { .getOrCreateTopic(topicName.toString()).get(); PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService() - .getReplicationClient("r3")); + .getReplicationClient("r3", + pulsar1.getBrokerService().pulsar().getPulsarResources().getClusterResources() + .getCluster("r3"))); final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class); startRepl.setAccessible(true); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java new file mode 100644 index 0000000000000..75ce3630b80e7 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.messaging; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.tests.integration.topologies.PulsarGeoClusterTestBase; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * Geo replication test. + */ +@Slf4j +public class GeoReplicationTest extends PulsarGeoClusterTestBase { + + @BeforeClass(alwaysRun = true) + public final void setupBeforeClass() throws Exception { + setup(); + } + + @AfterClass(alwaysRun = true) + public final void tearDownAfterClass() throws Exception { + cleanup(); + } + + @Test(timeOut = 1000 * 30, dataProvider = "TopicDomain") + public void testTopicReplication(String domain) throws Exception { + String cluster1 = getGeoCluster().getClusters()[0].getClusterName(); + String cluster2 = getGeoCluster().getClusters()[1].getClusterName(); + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(getGeoCluster().getClusters()[0].getHttpServiceUrl()) + .requestTimeout(30, TimeUnit.SECONDS) + .build(); + + String topic = domain + "://public/default/testTopicReplication-" + UUID.randomUUID(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + try { + admin.topics().createPartitionedTopic(topic, 10); + } catch (Exception e) { + log.error("Failed to create partitioned topic {}.", topic, e); + Assert.fail("Failed to create partitioned topic " + topic); + } + Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 10); + }); + log.info("Test geo-replication produce and consume for topic {}.", topic); + + @Cleanup + PulsarClient client1 = PulsarClient.builder() + .serviceUrl(getGeoCluster().getClusters()[0].getPlainTextServiceUrl()) + .build(); + + @Cleanup + PulsarClient client2 = PulsarClient.builder() + .serviceUrl(getGeoCluster().getClusters()[1].getPlainTextServiceUrl()) + .build(); + + @Cleanup + Producer p = client1.newProducer() + .topic(topic) + .create(); + log.info("Successfully create producer in cluster {} for topic {}.", cluster1, topic); + + @Cleanup + Consumer c = client2.newConsumer() + .topic(topic) + .subscriptionName("geo-sub") + .subscribe(); + log.info("Successfully create consumer in cluster {} for topic {}.", cluster2, topic); + + for (int i = 0; i < 10; i++) { + p.send(String.format("Message [%d]", i).getBytes(StandardCharsets.UTF_8)); + } + log.info("Successfully produce message to cluster {} for topic {}.", cluster1, topic); + + for (int i = 0; i < 10; i++) { + Message message = c.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message); + } + log.info("Successfully consume message from cluster {} for topic {}.", cluster2, topic); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index e566df1b84f62..7117cbb1f9e36 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -70,7 +70,14 @@ public class PulsarCluster { * @return the built pulsar cluster */ public static PulsarCluster forSpec(PulsarClusterSpec spec) { - return new PulsarCluster(spec); + CSContainer csContainer = new CSContainer(spec.clusterName) + .withNetwork(Network.newNetwork()) + .withNetworkAliases(CSContainer.NAME); + return new PulsarCluster(spec, csContainer, false); + } + + public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) { + return new PulsarCluster(spec, csContainer, true); } @Getter @@ -81,6 +88,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) { private final Network network; private final ZKContainer zkContainer; private final CSContainer csContainer; + private final boolean sharedCsContainer; private final Map bookieContainers; private final Map brokerContainers; private final Map workerContainers; @@ -91,11 +99,12 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) { private Map> externalServices = Collections.emptyMap(); private final boolean enablePrestoWorker; - private PulsarCluster(PulsarClusterSpec spec) { + private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) { this.spec = spec; + this.sharedCsContainer = sharedCsContainer; this.clusterName = spec.clusterName(); - this.network = Network.newNetwork(); + this.network = csContainer.getNetwork(); this.enablePrestoWorker = spec.enablePrestoWorker(); this.sqlFollowWorkerContainers = Maps.newTreeMap(); @@ -110,26 +119,24 @@ private PulsarCluster(PulsarClusterSpec spec) { this.zkContainer = new ZKContainer(clusterName); this.zkContainer .withNetwork(network) - .withNetworkAliases(ZKContainer.NAME) + .withNetworkAliases(appendClusterName(ZKContainer.NAME)) .withEnv("clusterName", clusterName) - .withEnv("zkServers", ZKContainer.NAME) + .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT) .withEnv("forceSync", "no") - .withEnv("pulsarNode", "pulsar-broker-0"); + .withEnv("pulsarNode", appendClusterName("pulsar-broker-0")); - this.csContainer = new CSContainer(clusterName) - .withNetwork(network) - .withNetworkAliases(CSContainer.NAME); + this.csContainer = csContainer; this.bookieContainers = Maps.newTreeMap(); this.brokerContainers = Maps.newTreeMap(); this.workerContainers = Maps.newTreeMap(); - this.proxyContainer = new ProxyContainer(clusterName, ProxyContainer.NAME) + this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME) .withNetwork(network) - .withNetworkAliases("pulsar-proxy") - .withEnv("zkServers", ZKContainer.NAME) - .withEnv("zookeeperServers", ZKContainer.NAME) + .withNetworkAliases(appendClusterName("pulsar-proxy")) + .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) + .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME)) .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT) .withEnv("clusterName", clusterName); if (spec.proxyEnvs != null) { @@ -143,8 +150,8 @@ private PulsarCluster(PulsarClusterSpec spec) { bookieContainers.putAll( runNumContainers("bookie", spec.numBookies(), (name) -> new BKContainer(clusterName, name) .withNetwork(network) - .withNetworkAliases(name) - .withEnv("zkServers", ZKContainer.NAME) + .withNetworkAliases(appendClusterName(name)) + .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) .withEnv("useHostNameAsBookieID", "true") // Disable fsyncs for tests since they're slow within the containers .withEnv("journalSyncData", "false") @@ -158,11 +165,11 @@ private PulsarCluster(PulsarClusterSpec spec) { // create brokers brokerContainers.putAll( runNumContainers("broker", spec.numBrokers(), (name) -> { - BrokerContainer brokerContainer = new BrokerContainer(clusterName, name) + BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name)) .withNetwork(network) - .withNetworkAliases(name) - .withEnv("zkServers", ZKContainer.NAME) - .withEnv("zookeeperServers", ZKContainer.NAME) + .withNetworkAliases(appendClusterName(name)) + .withEnv("zkServers", appendClusterName(ZKContainer.NAME)) + .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME)) .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT) .withEnv("clusterName", clusterName) .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1") @@ -238,8 +245,10 @@ public void start() throws Exception { log.info("Successfully started local zookeeper container."); // start the configuration store - csContainer.start(); - log.info("Successfully started configuration store container."); + if (!sharedCsContainer) { + csContainer.start(); + log.info("Successfully started configuration store container."); + } // init the cluster zkContainer.execCmd( @@ -338,9 +347,11 @@ public synchronized void stop() { if (null != proxyContainer) { containers.add(proxyContainer); } - if (null != csContainer) { + + if (!sharedCsContainer && null != csContainer) { containers.add(csContainer); } + if (null != zkContainer) { containers.add(zkContainer); } @@ -672,4 +683,8 @@ public void dumpFunctionLogs(String name) { } } } + + private String appendClusterName(String name) { + return sharedCsContainer ? clusterName + "-" + name : name; + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java new file mode 100644 index 0000000000000..9be3c382b7035 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.topologies; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.containers.CSContainer; +import org.testcontainers.containers.Network; + +@Slf4j +public class PulsarGeoCluster { + + @Getter + private final PulsarClusterSpec[] clusterSpecs; + + @Getter + private final CSContainer csContainer; + + @Getter + private final PulsarCluster[] clusters; + + /** + * Pulsar Cluster Spec + * + * @param specs each pulsar cluster spec. + * @return the built a pulsar cluster with geo replication + */ + public static PulsarGeoCluster forSpec(PulsarClusterSpec... specs) { + return new PulsarGeoCluster(specs); + } + + public PulsarGeoCluster(PulsarClusterSpec... clusterSpecs) { + this.clusterSpecs = clusterSpecs; + this.clusters = new PulsarCluster[clusterSpecs.length]; + + this.csContainer = new CSContainer("geo-cluster") + .withNetwork(Network.newNetwork()) + .withNetworkAliases(CSContainer.NAME); + + for (int i = 0; i < this.clusters.length; i++) { + clusters[i] = PulsarCluster.forSpec(this.clusterSpecs[i], this.csContainer); + } + } + + public void start() throws Exception { + // start the configuration store + this.csContainer.start(); + log.info("Successfully started configuration store container."); + + for (PulsarCluster cluster : clusters) { + cluster.start(); + log.info("Successfully started all components for cluster {}.", cluster.getClusterName()); + } + } + + public void stop() throws Exception { + for (PulsarCluster cluster : clusters) { + cluster.stop(); + log.info("Successfully stopped all components for cluster {}.", cluster.getClusterName()); + } + // stop the configuration store + this.csContainer.stop(); + log.info("Successfully stopped configuration store container."); + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java new file mode 100644 index 0000000000000..51c74eee50b18 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.topologies; + +import static java.util.stream.Collectors.joining; +import java.util.stream.Stream; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class PulsarGeoClusterTestBase extends PulsarTestBase { + + @Override + protected final void setup() throws Exception { + setupCluster(); + } + + @Override + protected final void cleanup() throws Exception { + tearDownCluster(); + } + + protected void setupCluster() throws Exception { + this.setupCluster(""); + } + + @Getter + private PulsarGeoCluster geoCluster; + + public void setupCluster(String namePrefix) throws Exception { + PulsarClusterSpec.PulsarClusterSpecBuilder[] specBuilders = new PulsarClusterSpec.PulsarClusterSpecBuilder[2]; + for (int i = 0; i < 2; i++) { + String clusterName = Stream.of(this.getClass().getSimpleName(), namePrefix, String.valueOf(i), + randomName(5)) + .filter(s -> s != null && !s.isEmpty()) + .collect(joining("-")); + specBuilders[i] = PulsarClusterSpec.builder().clusterName(clusterName); + } + specBuilders = beforeSetupCluster(specBuilders); + PulsarClusterSpec[] specs = new PulsarClusterSpec[2]; + for (int i = 0; i < specBuilders.length; i++) { + specs[i] = specBuilders[i].build(); + } + setupCluster0(specs); + } + + protected PulsarClusterSpec.PulsarClusterSpecBuilder[] beforeSetupCluster ( + PulsarClusterSpec.PulsarClusterSpecBuilder... specBuilder) { + return specBuilder; + } + + protected void setupCluster0(PulsarClusterSpec... specs) throws Exception { + incrementSetupNumber(); + log.info("Setting up geo cluster with {} local clusters}", specs.length); + + this.geoCluster = PulsarGeoCluster.forSpec(specs); + + beforeStartCluster(); + + this.geoCluster.start(); + + log.info("Geo Cluster is setup!"); + } + + protected void beforeStartCluster() throws Exception { + // no-op + } + + public void tearDownCluster() throws Exception { + markCurrentSetupNumberCleaned(); + if (null != this.geoCluster) { + this.geoCluster.stop(); + } + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java index 9989b15faa95e..ebdfbe84de401 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java @@ -34,9 +34,18 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.tests.TestRetrySupport; import org.testng.Assert; +import org.testng.annotations.DataProvider; public abstract class PulsarTestBase extends TestRetrySupport { + @DataProvider(name = "TopicDomain") + public Object[][] topicDomain() { + return new Object[][] { + {"persistent"}, + {"non-persistent"} + }; + } + public static String randomName(int numChars) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < numChars; i++) { diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml index 631e961cbfb20..cfbdb22587034 100644 --- a/tests/integration/src/test/resources/pulsar-messaging.xml +++ b/tests/integration/src/test/resources/pulsar-messaging.xml @@ -24,6 +24,7 @@ +