Skip to content

Commit

Permalink
In Namespace.addClusters and Properties.addRoles changed dataType to…
Browse files Browse the repository at this point in the history
… Set (#1494)

* Removed Pulsar Functions from AdminTool

* In Namespace.addClusters and Properties.addRoles changed dataType to Set

* Revert "Removed Pulsar Functions from AdminTool"

This reverts commit e04f4e2.

* Fixed compilation for few Tests

* Converted replication_clusters into a Set
  • Loading branch information
Jai Asher authored and merlimat committed Apr 4, 2018
1 parent f6cc450 commit f52f4ef
Show file tree
Hide file tree
Showing 50 changed files with 131 additions and 145 deletions.
Expand Up @@ -230,7 +230,7 @@ void start() throws Exception {

if (!admin.properties().getProperties().contains(property)) {
admin.properties().createProperty(property,
new PropertyAdmin(Lists.newArrayList(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
new PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
}

if (!admin.namespaces().getNamespaces(property).contains(namespace)) {
Expand Down
Expand Up @@ -141,7 +141,7 @@ protected void internalDeleteNamespace(boolean authoritative) {
if (policies.replication_clusters.size() == 1
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = policies.replication_clusters.get(0);
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluster " + replCluster + " does not exist"));
Expand Down Expand Up @@ -242,7 +242,7 @@ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authori
if (policies.replication_clusters.size() == 1
&& !policies.replication_clusters.contains(config().getClusterName())) {
// the only replication cluster is other cluster, redirect
String replCluster = policies.replication_clusters.get(0);
String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Cluser " + replCluster + " does not exist"));
Expand Down Expand Up @@ -346,7 +346,7 @@ protected void internalRevokePermissionsOnNamespace(String role) {
}
}

protected List<String> internalGetNamespaceReplicationClusters() {
protected Set<String> internalGetNamespaceReplicationClusters() {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot get the replication clusters for a non-global namespace");
Expand Down Expand Up @@ -388,7 +388,7 @@ protected void internalSetNamespaceReplicationClusters(List<String> clusterIds)
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().replication_clusters = clusterIds;
policiesNode.getKey().replication_clusters = replicationClusterSet;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
Expand Down
Expand Up @@ -249,7 +249,7 @@ public void revokePermissionsOnNamespace(@PathParam("property") String property,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not global") })
public List<String> getNamespaceReplicationClusters(@PathParam("property") String property,
public Set<String> getNamespaceReplicationClusters(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateAdminAccessOnProperty(property);
validateNamespaceName(property, cluster, namespace);
Expand Down
Expand Up @@ -184,7 +184,7 @@ public void revokePermissionsOnNamespace(@PathParam("property") String property,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not global") })
public List<String> getNamespaceReplicationClusters(@PathParam("property") String property,
public Set<String> getNamespaceReplicationClusters(@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateAdminAccessOnProperty(property);
validateNamespaceName(property, namespace);
Expand Down
Expand Up @@ -532,7 +532,7 @@ public CompletableFuture<Void> checkReplication() {

Set<String> configuredClusters;
if (policies.replication_clusters != null) {
configuredClusters = Sets.newTreeSet(policies.replication_clusters);
configuredClusters = policies.replication_clusters;
} else {
configuredClusters = Collections.emptySet();
}
Expand Down
Expand Up @@ -607,7 +607,7 @@ protected static CompletableFuture<ClusterData> checkLocalOrGetPeerReplicationCl
return validationFuture;
}

private static ClusterData getOwnerFromPeerClusterList(PulsarService pulsar, List<String> replicationClusters) {
private static ClusterData getOwnerFromPeerClusterList(PulsarService pulsar, Set<String> replicationClusters) {
String currentCluster = pulsar.getConfiguration().getClusterName();
if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) {
return null;
Expand All @@ -619,9 +619,8 @@ private static ClusterData getOwnerFromPeerClusterList(PulsarService pulsar, Lis
if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) {
return null;
}
Set<String> replicationClusterSet = Sets.newHashSet(replicationClusters);
for (String peerCluster : cluster.get().getPeerClusterNames()) {
if (replicationClusterSet.contains(peerCluster)) {
if (replicationClusters.contains(peerCluster)) {
return pulsar.getConfigurationCache().clustersCache().get(path("clusters", peerCluster))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Peer cluster " + peerCluster + " data not found"));
Expand Down
Expand Up @@ -25,7 +25,6 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,7 +117,7 @@ private void createProperty(PulsarAdmin pulsarAdmin)
allowedClusters.add("my-cluster");
PropertyAdmin adminConfig = new PropertyAdmin();
adminConfig.setAllowedClusters(allowedClusters);
List<String> adminRoles = new ArrayList<>();
Set<String> adminRoles = new HashSet<>();
adminRoles.add("");
adminConfig.setAdminRoles(adminRoles);
pulsarAdmin.properties().createProperty("sla-monitor", adminConfig);
Expand Down
Expand Up @@ -35,7 +35,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -44,7 +43,6 @@
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -61,7 +59,6 @@
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.PersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.PropertiesImpl;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -111,6 +108,8 @@
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AdminApiTest extends MockedPulsarServiceBaseTest {

Expand Down Expand Up @@ -152,7 +151,7 @@ public void setup() throws Exception {

// Setup namespaces
admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), Sets.newHashSet("use"));
PropertyAdmin propertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use"));
admin.properties().createProperty("prop-xyz", propertyAdmin);
admin.namespaces().createNamespace("prop-xyz/use/ns1");
}
Expand Down Expand Up @@ -555,14 +554,14 @@ public void testGetDynamicLocalConfiguration() throws Exception {
@Test(enabled = true)
public void properties() throws PulsarAdminException {
Set<String> allowedClusters = Sets.newHashSet("use");
PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), allowedClusters);
PropertyAdmin propertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "role2"), allowedClusters);
admin.properties().updateProperty("prop-xyz", propertyAdmin);

assertEquals(admin.properties().getProperties(), Lists.newArrayList("prop-xyz"));

assertEquals(admin.properties().getPropertyAdmin("prop-xyz"), propertyAdmin);

PropertyAdmin newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role3", "role4"), allowedClusters);
PropertyAdmin newPropertyAdmin = new PropertyAdmin(Sets.newHashSet("role3", "role4"), allowedClusters);
admin.properties().updateProperty("prop-xyz", newPropertyAdmin);

assertEquals(admin.properties().getPropertyAdmin("prop-xyz"), newPropertyAdmin);
Expand All @@ -583,7 +582,7 @@ public void properties() throws PulsarAdminException {
@Test(invocationCount = 1)
public void namespaces() throws PulsarAdminException, PulsarServerException, Exception {
admin.clusters().createCluster("usw", new ClusterData());
PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"),
PropertyAdmin propertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "role2"),
Sets.newHashSet("use", "usw"));
admin.properties().updateProperty("prop-xyz", propertyAdmin);

Expand Down Expand Up @@ -1615,7 +1614,7 @@ class CustomPropertyAdmin extends PropertyAdmin {
public int newProperty;
}

PropertyAdmin pa = new PropertyAdmin(Lists.newArrayList("test_appid1", "test_appid2"), Sets.newHashSet("use"));
PropertyAdmin pa = new PropertyAdmin(Sets.newHashSet("test_appid1", "test_appid2"), Sets.newHashSet("use"));
CustomPropertyAdmin cpa = new CustomPropertyAdmin();
cpa.setAdminRoles(pa.getAdminRoles());
cpa.setAllowedClusters(pa.getAllowedClusters());
Expand Down Expand Up @@ -1877,7 +1876,7 @@ public PulsarAdmin getAdmin() {
@Test
public void testTopicBundleRangeLookup() throws PulsarAdminException, PulsarServerException, Exception {
admin.clusters().createCluster("usw", new ClusterData());
PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"),
PropertyAdmin propertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "role2"),
Sets.newHashSet("use", "usw"));
admin.properties().updateProperty("prop-xyz", propertyAdmin);
admin.namespaces().createNamespace("prop-xyz/use/getBundleNs", 100);
Expand Down
Expand Up @@ -88,7 +88,7 @@ public void setup() throws Exception {

// Setup namespaces
admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), Sets.newHashSet("use"));
PropertyAdmin propertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use"));
admin.properties().createProperty("prop-xyz", propertyAdmin);
admin.namespaces().createNamespace("prop-xyz/use/ns1");
}
Expand Down Expand Up @@ -599,7 +599,7 @@ public void testReplicationPeerCluster() throws Exception {
final String property = "peer-prop";
Set<String> allowedClusters = Sets.newHashSet("us-west1", "us-west2", "us-west3", "us-west4", "us-east1",
"us-east2");
PropertyAdmin propConfig = new PropertyAdmin(Lists.newArrayList("test"), allowedClusters);
PropertyAdmin propConfig = new PropertyAdmin(Sets.newHashSet("test"), allowedClusters);
admin.properties().createProperty(property, propConfig);

final String namespace = property + "/global/conflictPeer";
Expand All @@ -611,23 +611,23 @@ public void testReplicationPeerCluster() throws Exception {
Lists.newArrayList("us-west2", "us-west3"));

// (1) no conflicting peer
List<String> clusterIds = Lists.newArrayList("us-east1", "us-east2");
Set<String> clusterIds = Sets.newHashSet("us-east1", "us-east2");
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);

// (2) conflicting peer
clusterIds = Lists.newArrayList("us-west2", "us-west3", "us-west1");
clusterIds = Sets.newHashSet("us-west2", "us-west3", "us-west1");
try {
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
fail("Peer-cluster can't coexist in replication cluster list");
} catch (PulsarAdminException.ConflictException e) {
// Ok
}

clusterIds = Lists.newArrayList("us-west2", "us-west3");
clusterIds = Sets.newHashSet("us-west2", "us-west3");
// no peer coexist in replication clusters
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);

clusterIds = Lists.newArrayList("us-west1", "us-west4");
clusterIds = Sets.newHashSet("us-west1", "us-west4");
// no peer coexist in replication clusters
admin.namespaces().setNamespaceReplicationClusters(namespace, clusterIds);
}
Expand Down
Expand Up @@ -52,9 +52,9 @@
import org.apache.pulsar.broker.admin.v1.BrokerStats;
import org.apache.pulsar.broker.admin.v1.Brokers;
import org.apache.pulsar.broker.admin.v1.Clusters;
import org.apache.pulsar.broker.admin.v1.Properties;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.admin.v1.Properties;
import org.apache.pulsar.broker.admin.v1.ResourceQuotas;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
Expand Down Expand Up @@ -358,7 +358,7 @@ void properties() throws Exception {
verify(properties, times(1)).validateSuperUserAccess();

Set<String> allowedClusters = Sets.newHashSet();
PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), allowedClusters);
PropertyAdmin propertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "role2"), allowedClusters);
properties.createProperty("test-property", propertyAdmin);
verify(properties, times(2)).validateSuperUserAccess();

Expand All @@ -368,7 +368,7 @@ void properties() throws Exception {
assertEquals(properties.getPropertyAdmin("test-property"), propertyAdmin);
verify(properties, times(4)).validateSuperUserAccess();

PropertyAdmin newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), allowedClusters);
PropertyAdmin newPropertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "other-role"), allowedClusters);
properties.updateProperty("test-property", newPropertyAdmin);
verify(properties, times(5)).validateSuperUserAccess();

Expand Down Expand Up @@ -466,7 +466,7 @@ void properties() throws Exception {

// Create a namespace to test deleting a non-empty property
clusters.createCluster("use", new ClusterData());
newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), Sets.newHashSet("use"));
newPropertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "other-role"), Sets.newHashSet("use"));
properties.createProperty("my-property", newPropertyAdmin);

namespaces.createNamespace("my-property", "use", "my-namespace", new BundlesData());
Expand Down
Expand Up @@ -50,7 +50,7 @@ public void setup() throws Exception {

// Setup namespaces
admin.clusters().createCluster("use", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));
PropertyAdmin propertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "role2"), Sets.newHashSet("use"));
PropertyAdmin propertyAdmin = new PropertyAdmin(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use"));
admin.properties().createProperty("prop-xyz", propertyAdmin);
admin.namespaces().createNamespace("prop-xyz/use/ns1");
}
Expand Down

0 comments on commit f52f4ef

Please sign in to comment.