Skip to content

Commit

Permalink
If no clusters are specified, the namespace should by default use cur…
Browse files Browse the repository at this point in the history
…rent cluster (#3571)

* If no clusters are specified, the namespace should by default use current cluster

* Don't apply to v1 local namespaces

* It's better to use the isV2() check instead of isGlobal()
  • Loading branch information
merlimat committed Feb 14, 2019
1 parent 14a5962 commit 48e4a0b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Expand Up @@ -30,6 +30,7 @@

import java.net.URI;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -74,9 +75,9 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -404,7 +405,7 @@ protected void internalRevokePermissionsOnSubscription(String subscriptionName,
throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is not enabled");
}
}

protected Set<String> internalGetNamespaceReplicationClusters() {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
Expand Down Expand Up @@ -1489,6 +1490,11 @@ public static BundlesData getBundles(int numBundles) {
}

private void validatePolicies(NamespaceName ns, Policies policies) {
if (ns.isV2() && policies.replication_clusters.isEmpty()) {
// Default to local cluster
policies.replication_clusters = Collections.singleton(config().getClusterName());
}

// Validate cluster names and permissions
policies.replication_clusters.forEach(cluster -> validateClusterForTenant(ns.getTenant(), cluster));

Expand Down
Expand Up @@ -31,6 +31,7 @@

import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -903,4 +904,15 @@ public void testClusterIsReadyBeforeCreateTopic() throws PulsarAdminException {
} catch (PulsarAdminException.PreconditionFailedException e) {
}
}

@Test
public void testCreateNamespaceWithNoClusters() throws PulsarAdminException {
String localCluster = pulsar.getConfiguration().getClusterName();
String namespace = "prop-xyz/test-ns-with-no-clusters";
admin.namespaces().createNamespace(namespace);

// Global cluster, if there, should be omitted from the results
assertEquals(admin.namespaces().getNamespaceReplicationClusters(namespace),
Collections.singletonList(localCluster));
}
}

0 comments on commit 48e4a0b

Please sign in to comment.