diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 60329d7670460..59027c2d48710 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -95,7 +95,7 @@ public Map getOwnedNamespaes(@PathParam("clust throw new RestException(e); } } - + @POST @Path("/configuration/{configName}/{configValue}") @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index de670c136ebfc..c05ea77a1c56e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -40,12 +40,15 @@ import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; +import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; +import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -56,6 +59,7 @@ import com.fasterxml.jackson.core.JsonGenerationException; import com.fasterxml.jackson.databind.JsonMappingException; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.swagger.annotations.ApiOperation; @@ -262,7 +266,7 @@ public void deleteCluster(@PathParam("cluster") String cluster) { } // check the namespaceIsolationPolicies associated with the cluster - String path = path("clusters", cluster, "namespaceIsolationPolicies"); + String path = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); Optional nsIsolationPolicies = namespaceIsolationPoliciesCache().get(path); // Need to delete the isolation policies if present @@ -332,7 +336,7 @@ public Map getNamespaceIsolationPolicies(@PathPa try { NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(path("clusters", cluster, "namespaceIsolationPolicies")) + .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); // construct the response to NamespaceisolationData map @@ -356,7 +360,7 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster") try { NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() - .get(path("clusters", cluster, "namespaceIsolationPolicies")) + .get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES)) .orElseThrow(() -> new RestException(Status.NOT_FOUND, "NamespaceIsolationPolicies for cluster " + cluster + " does not exist")); // construct the response to NamespaceisolationData map @@ -374,6 +378,105 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster") } } + @GET + @Path("/{cluster}/namespaceIsolationPolicies/brokers") + @ApiOperation(value = "Get list of brokers with namespace-isolation policies attached to them", response = BrokerNamespaceIsolationData.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace-isolation policies not found"), + @ApiResponse(code = 412, message = "Cluster doesn't exist") }) + public List getBrokersWithNamespaceIsolationPolicy( + @PathParam("cluster") String cluster) { + validateSuperUserAccess(); + validateClusterExists(cluster); + + Set availableBrokers; + final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); + Map nsPolicies; + try { + availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers(); + } catch (Exception e) { + log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + try { + Optional nsPoliciesResult = namespaceIsolationPoliciesCache() + .get(nsIsolationPoliciesPath); + if (!nsPoliciesResult.isPresent()) { + throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster); + } + nsPolicies = nsPoliciesResult.get().getPolicies(); + } catch (Exception e) { + log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e); + throw new RestException(e); + } + return availableBrokers.stream().map(broker -> { + BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData(); + brokerIsolationData.brokerName = broker; + if (nsPolicies != null) { + nsPolicies.forEach((name, policyData) -> { + NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData); + if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) { + if (brokerIsolationData.namespaceRegex == null) { + brokerIsolationData.namespaceRegex = Lists.newArrayList(); + } + brokerIsolationData.namespaceRegex.addAll(policyData.namespaces); + } + }); + } + return brokerIsolationData; + }).collect(Collectors.toList()); + } + + @GET + @Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}") + @ApiOperation(value = "Get a broker with namespace-isolation policies attached to it", response = BrokerNamespaceIsolationData.class) + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace-isolation policies/ Broker not found"), + @ApiResponse(code = 412, message = "Cluster doesn't exist") }) + public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(@PathParam("cluster") String cluster, + @PathParam("broker") String broker) { + validateSuperUserAccess(); + validateClusterExists(cluster); + + Set availableBrokers; + final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); + Map nsPolicies; + try { + availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers(); + } catch (Exception e) { + log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e); + throw new RestException(e); + } + if (availableBrokers == null || !availableBrokers.contains(broker)) { + throw new RestException(Status.NOT_FOUND, "Broker is not part of active broker list " + broker); + } + try { + Optional nsPoliciesResult = namespaceIsolationPoliciesCache() + .get(nsIsolationPoliciesPath); + if (!nsPoliciesResult.isPresent()) { + throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster); + } + nsPolicies = nsPoliciesResult.get().getPolicies(); + } catch (Exception e) { + log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e); + throw new RestException(e); + } + BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData(); + brokerIsolationData.brokerName = broker; + if (nsPolicies != null) { + nsPolicies.forEach((name, policyData) -> { + NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData); + if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) { + if (brokerIsolationData.namespaceRegex == null) { + brokerIsolationData.namespaceRegex = Lists.newArrayList(); + } + brokerIsolationData.namespaceRegex.addAll(policyData.namespaces); + } + }); + } + return brokerIsolationData; + } + @POST @Path("/{cluster}/namespaceIsolationPolicies/{policyName}") @ApiOperation(value = "Set namespace isolation policy") @@ -389,7 +492,7 @@ public void setNamespaceIsolationPolicy(@PathParam("cluster") String cluster, // validate the policy data before creating the node policyData.validate(); - String nsIsolationPolicyPath = path("clusters", cluster, "namespaceIsolationPolicies"); + String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() .get(nsIsolationPolicyPath).orElseGet(() -> { try { @@ -458,7 +561,7 @@ public void deleteNamespaceIsolationPolicy(@PathParam("cluster") String cluster, try { - String nsIsolationPolicyPath = path("clusters", cluster, "namespaceIsolationPolicies"); + String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES); NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache() .get(nsIsolationPolicyPath).orElseGet(() -> { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java index 3a7409e96c728..6a9dc38f33256 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -107,6 +108,14 @@ public interface LoadManager { * @throws Exception */ public void disableBroker() throws Exception; + + /** + * Get list of available brokers in cluster + * + * @return + * @throws Exception + */ + Set getAvailableBrokers() throws Exception; public void stop() throws PulsarServerException; @@ -139,4 +148,5 @@ static LoadManager create(final PulsarService pulsar) { // If we failed to create a load manager, default to SimpleLoadManagerImpl. return new SimpleLoadManagerImpl(pulsar); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java index 9d5603d2e3a0d..369bf5a5b3982 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -103,4 +104,11 @@ public interface ModularLoadManager { * @return */ Deserializer getLoadReportDeserializer(); + + /** + * Get available broker list in cluster + * + * @return + */ + Set getAvailableBrokers(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 23f651e710400..714389fe0993d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -335,6 +335,7 @@ private void reapDeadBrokerPreallocations(Set aliveBrokers) { } } + @Override public Set getAvailableBrokers() { try { return availableActiveBrokers.get(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java index 54b9d5680f07a..82d99b62aefe7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -122,4 +123,9 @@ public Deserializer getLoadReportDeserializer() { public ModularLoadManager getLoadManager() { return loadManager; } + + @Override + public Set getAvailableBrokers() throws Exception { + return loadManager.getAvailableBrokers(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index bbaf8bbb84d57..da4c0e3e6df37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -351,6 +351,11 @@ public ZooKeeperChildrenCache getActiveBrokersCache() { return this.availableActiveBrokers; } + @Override + public Set getAvailableBrokers() throws Exception { + return this.availableActiveBrokers.get(); + } + public ZooKeeperDataCache getLoadReportCache() { return this.loadReportCacheZk; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java index fa67c0d7b98ca..2c1aa99e7b6dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java @@ -26,6 +26,7 @@ import org.apache.pulsar.broker.loadbalance.LoadReport; import org.apache.pulsar.broker.loadbalance.ResourceUnit; import org.apache.pulsar.broker.loadbalance.ServiceUnit; +import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.NamespaceIsolationPolicy; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; @@ -51,7 +52,7 @@ public boolean canAssign(ServiceUnit srvUnit, ResourceUnit rescrUnit, Map getIsolationPolicies(String clusterName) { try { return namespaceIsolationPolicies - .get(AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies")); + .get(AdminResource.path("clusters", clusterName, NAMESPACE_ISOLATION_POLICIES)); } catch (Exception e) { LOG.warn("GetIsolationPolicies: Unable to get the namespaceIsolationPolicies [{}]", e); return Optional.empty(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 2668f9540c194..a712aec397169 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -119,6 +119,8 @@ public enum AddressType { public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)"); public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s"; public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s"; + + public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies"; /** * Default constructor. @@ -519,7 +521,7 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj, private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception { String localCluster = pulsar.getConfiguration().getClusterName(); return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache() - .get(AdminResource.path("clusters", localCluster, "namespaceIsolationPolicies")).orElseGet(() -> { + .get(AdminResource.path("clusters", localCluster, NAMESPACE_ISOLATION_POLICIES)).orElseGet(() -> { // the namespace isolation policies are empty/undefined = an empty object return new NamespaceIsolationPolicies(); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index c8c3b75804ef8..7e4262fed7398 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -27,6 +27,8 @@ import static org.testng.Assert.fail; import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -54,9 +56,13 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; +import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -65,6 +71,7 @@ import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -805,4 +812,47 @@ public void testTenantNameWithInvalidCharacters() throws Exception { // Expected } } + + @Test + public void brokerNamespaceIsolationPolicies() throws Exception { + + // create + String policyName1 = "policy-1"; + String namespaceRegex = "other/use/other.*"; + String cluster = "use"; + String brokerName = pulsar.getAdvertisedAddress(); + String brokerAddress = brokerName + ":" + pulsar.getConfiguration().getWebServicePort(); + NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData(); + nsPolicyData1.namespaces = new ArrayList(); + nsPolicyData1.namespaces.add(namespaceRegex); + nsPolicyData1.primary = new ArrayList(); + nsPolicyData1.primary.add(brokerName + ":[0-9]*"); + nsPolicyData1.secondary = new ArrayList(); + nsPolicyData1.secondary.add(brokerName + ".*"); + nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData(); + nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available; + nsPolicyData1.auto_failover_policy.parameters = new HashMap(); + nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1"); + nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100"); + admin.clusters().createNamespaceIsolationPolicy(cluster, policyName1, nsPolicyData1); + + List brokerIsolationDataList = admin.clusters() + .getBrokersWithNamespaceIsolationPolicy(cluster); + assertEquals(brokerIsolationDataList.size(), 1); + assertEquals(brokerIsolationDataList.get(0).brokerName, brokerAddress); + assertEquals(brokerIsolationDataList.get(0).namespaceRegex.size(), 1); + assertEquals(brokerIsolationDataList.get(0).namespaceRegex.get(0), namespaceRegex); + + BrokerNamespaceIsolationData brokerIsolationData = admin.clusters() + .getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress); + assertEquals(brokerIsolationData.brokerName, brokerAddress); + assertEquals(brokerIsolationData.namespaceRegex.size(), 1); + assertEquals(brokerIsolationData.namespaceRegex.get(0), namespaceRegex); + + try { + admin.clusters().getBrokerWithNamespaceIsolationPolicy(cluster, "invalid-broker"); + Assert.fail("should have failed due to invalid broker address"); + } catch (PulsarAdminException.NotFoundException e) {// expected + } + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 4048367de1a32..54c9c86af9f27 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -227,6 +228,28 @@ public interface Clusters { void createNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException; + + /** + * Returns list of active brokers with namespace-isolation policies attached to it. + * + * @param cluster + * @return + * @throws PulsarAdminException + */ + List getBrokersWithNamespaceIsolationPolicy(String cluster) + throws PulsarAdminException; + + /** + * Returns active broker with namespace-isolation policies attached to it. + * + * @param cluster + * @param broker + * @return + * @throws PulsarAdminException + */ + BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker) + throws PulsarAdminException; + /** * Update a namespace isolation policy for a cluster diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 562fbee7f61b1..32cb5c0981218 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.Clusters; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.ErrorData; @@ -124,6 +125,30 @@ public Map getNamespaceIsolationPolicies(String } } + + @Override + public List getBrokersWithNamespaceIsolationPolicy(String cluster) + throws PulsarAdminException { + try { + return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers")) + .get(new GenericType>() { + }); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker) + throws PulsarAdminException { + try { + return request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker)) + .get(BrokerNamespaceIsolationData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void createNamespaceIsolationPolicy(String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index e70f20b8d1838..d081ccf2e4e93 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -23,9 +23,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import java.util.EnumSet; import org.apache.pulsar.client.admin.BrokerStats; @@ -53,6 +50,9 @@ import org.mockito.Mockito; import org.testng.annotations.Test; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + @Test public class PulsarAdminToolTest { @@ -426,6 +426,21 @@ void resourceQuotas() throws Exception { verify(mockResourceQuotas).resetNamespaceBundleResourceQuota("myprop/clust/ns1", "0x80000000_0xffffffff"); } + @Test + void namespaceIsolationPolicy() throws Exception { + PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); + Clusters mockClusters = mock(Clusters.class); + when(admin.clusters()).thenReturn(mockClusters); + + CmdNamespaceIsolationPolicy nsIsolationPoliciesCmd = new CmdNamespaceIsolationPolicy(admin); + + nsIsolationPoliciesCmd.run(split("brokers use")); + verify(mockClusters).getBrokersWithNamespaceIsolationPolicy("use"); + + nsIsolationPoliciesCmd.run(split("broker use --broker my-broker")); + verify(mockClusters).getBrokerWithNamespaceIsolationPolicy("use", "my-broker"); + } + @Test void persistentTopics() throws Exception { PulsarAdmin admin = Mockito.mock(PulsarAdmin.class); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java index f215462acfe16..ed3bda6985300 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import com.beust.jcommander.Parameter; @@ -82,6 +83,39 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "List all brokers with namespace-isolation policies attached to it. This operation requires Pulsar super-user privileges") + private class GetAllBrokersWithPolicies extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private List params; + + void run() throws PulsarAdminException { + String clusterName = getOneArgument(params); + + List brokers = admin.clusters() + .getBrokersWithNamespaceIsolationPolicy(clusterName); + + print(brokers); + } + } + + @Parameters(commandDescription = "Get broker with namespace-isolation policies attached to it. This operation requires Pulsar super-user privileges") + private class GetBrokerWithPolicies extends CliCommand { + @Parameter(description = "cluster-name\n", required = true) + private List params; + + @Parameter(names = "--broker", description = "Broker-name to get namespace-isolation policies attached to it", required = true) + private String broker; + + void run() throws PulsarAdminException { + String clusterName = getOneArgument(params); + + BrokerNamespaceIsolationData brokerData = admin.clusters() + .getBrokerWithNamespaceIsolationPolicy(clusterName, broker); + + print(brokerData); + } + } + @Parameters(commandDescription = "Get namespace isolation policy of a cluster. This operation requires Pulsar super-user privileges") private class GetPolicy extends CliCommand { @Parameter(description = "cluster-name policy-name\n", required = true) @@ -195,6 +229,8 @@ public CmdNamespaceIsolationPolicy(PulsarAdmin admin) { jcommander.addCommand("get", new GetPolicy()); jcommander.addCommand("list", new GetAllPolicies()); jcommander.addCommand("delete", new DeletePolicy()); + jcommander.addCommand("brokers", new GetAllBrokersWithPolicies()); + jcommander.addCommand("broker", new GetBrokerWithPolicies()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java new file mode 100644 index 0000000000000..35fc73c1f2cad --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import java.util.List; + +import com.google.common.base.Objects; + +public class BrokerNamespaceIsolationData { + + public String brokerName; + public List namespaceRegex; //isolated namespace regex + + @Override + public boolean equals(Object obj) { + if (obj instanceof BrokerNamespaceIsolationData) { + BrokerNamespaceIsolationData other = (BrokerNamespaceIsolationData) obj; + return Objects.equal(brokerName, other.brokerName) && Objects.equal(namespaceRegex, other.namespaceRegex); + } + return false; + } + +}