Skip to content

Commit

Permalink
Introduce admin api to get broker and namespace-isolation policy map (#…
Browse files Browse the repository at this point in the history
…1565)

* Introduce admin api to get broker and namespace-isolation policy map

* add missed commit
  • Loading branch information
rdhabalia authored and merlimat committed Apr 13, 2018
1 parent aba0d42 commit fc67fe2
Show file tree
Hide file tree
Showing 15 changed files with 335 additions and 11 deletions.
Expand Up @@ -95,7 +95,7 @@ public Map<String, NamespaceOwnershipStatus> getOwnedNamespaes(@PathParam("clust
throw new RestException(e);
}
}

@POST
@Path("/configuration/{configName}/{configValue}")
@ApiOperation(value = "Update dynamic serviceconfiguration into zk only. This operation requires Pulsar super-user privileges.")
Expand Down
Expand Up @@ -40,12 +40,15 @@
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand All @@ -56,6 +59,7 @@

import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import io.swagger.annotations.ApiOperation;
Expand Down Expand Up @@ -262,7 +266,7 @@ public void deleteCluster(@PathParam("cluster") String cluster) {
}

// check the namespaceIsolationPolicies associated with the cluster
String path = path("clusters", cluster, "namespaceIsolationPolicies");
String path = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
Optional<NamespaceIsolationPolicies> nsIsolationPolicies = namespaceIsolationPoliciesCache().get(path);

// Need to delete the isolation policies if present
Expand Down Expand Up @@ -332,7 +336,7 @@ public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(@PathPa

try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
.get(path("clusters", cluster, "namespaceIsolationPolicies"))
.get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to NamespaceisolationData map
Expand All @@ -356,7 +360,7 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster")

try {
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
.get(path("clusters", cluster, "namespaceIsolationPolicies"))
.get(path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES))
.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
// construct the response to NamespaceisolationData map
Expand All @@ -374,6 +378,105 @@ public NamespaceIsolationData getNamespaceIsolationPolicy(@PathParam("cluster")
}
}

@GET
@Path("/{cluster}/namespaceIsolationPolicies/brokers")
@ApiOperation(value = "Get list of brokers with namespace-isolation policies attached to them", response = BrokerNamespaceIsolationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace-isolation policies not found"),
@ApiResponse(code = 412, message = "Cluster doesn't exist") })
public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(
@PathParam("cluster") String cluster) {
validateSuperUserAccess();
validateClusterExists(cluster);

Set<String> availableBrokers;
final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
Map<String, NamespaceIsolationData> nsPolicies;
try {
availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
} catch (Exception e) {
log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
try {
Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPoliciesCache()
.get(nsIsolationPoliciesPath);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
}
nsPolicies = nsPoliciesResult.get().getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
throw new RestException(e);
}
return availableBrokers.stream().map(broker -> {
BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData();
brokerIsolationData.brokerName = broker;
if (nsPolicies != null) {
nsPolicies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
if (brokerIsolationData.namespaceRegex == null) {
brokerIsolationData.namespaceRegex = Lists.newArrayList();
}
brokerIsolationData.namespaceRegex.addAll(policyData.namespaces);
}
});
}
return brokerIsolationData;
}).collect(Collectors.toList());
}

@GET
@Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}")
@ApiOperation(value = "Get a broker with namespace-isolation policies attached to it", response = BrokerNamespaceIsolationData.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace-isolation policies/ Broker not found"),
@ApiResponse(code = 412, message = "Cluster doesn't exist") })
public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
@PathParam("broker") String broker) {
validateSuperUserAccess();
validateClusterExists(cluster);

Set<String> availableBrokers;
final String nsIsolationPoliciesPath = AdminResource.path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
Map<String, NamespaceIsolationData> nsPolicies;
try {
availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
} catch (Exception e) {
log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
if (availableBrokers == null || !availableBrokers.contains(broker)) {
throw new RestException(Status.NOT_FOUND, "Broker is not part of active broker list " + broker);
}
try {
Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPoliciesCache()
.get(nsIsolationPoliciesPath);
if (!nsPoliciesResult.isPresent()) {
throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
}
nsPolicies = nsPoliciesResult.get().getPolicies();
} catch (Exception e) {
log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
throw new RestException(e);
}
BrokerNamespaceIsolationData brokerIsolationData = new BrokerNamespaceIsolationData();
brokerIsolationData.brokerName = broker;
if (nsPolicies != null) {
nsPolicies.forEach((name, policyData) -> {
NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
if (brokerIsolationData.namespaceRegex == null) {
brokerIsolationData.namespaceRegex = Lists.newArrayList();
}
brokerIsolationData.namespaceRegex.addAll(policyData.namespaces);
}
});
}
return brokerIsolationData;
}

@POST
@Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
@ApiOperation(value = "Set namespace isolation policy")
Expand All @@ -389,7 +492,7 @@ public void setNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
// validate the policy data before creating the node
policyData.validate();

String nsIsolationPolicyPath = path("clusters", cluster, "namespaceIsolationPolicies");
String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
.get(nsIsolationPolicyPath).orElseGet(() -> {
try {
Expand Down Expand Up @@ -458,7 +561,7 @@ public void deleteNamespaceIsolationPolicy(@PathParam("cluster") String cluster,

try {

String nsIsolationPolicyPath = path("clusters", cluster, "namespaceIsolationPolicies");
String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
.get(nsIsolationPolicyPath).orElseGet(() -> {
try {
Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -107,6 +108,14 @@ public interface LoadManager {
* @throws Exception
*/
public void disableBroker() throws Exception;

/**
* Get list of available brokers in cluster
*
* @return
* @throws Exception
*/
Set<String> getAvailableBrokers() throws Exception;

public void stop() throws PulsarServerException;

Expand Down Expand Up @@ -139,4 +148,5 @@ static LoadManager create(final PulsarService pulsar) {
// If we failed to create a load manager, default to SimpleLoadManagerImpl.
return new SimpleLoadManagerImpl(pulsar);
}

}
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance;

import java.util.Optional;
import java.util.Set;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -103,4 +104,11 @@ public interface ModularLoadManager {
* @return
*/
Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();

/**
* Get available broker list in cluster
*
* @return
*/
Set<String> getAvailableBrokers();
}
Expand Up @@ -335,6 +335,7 @@ private void reapDeadBrokerPreallocations(Set<String> aliveBrokers) {
}
}

@Override
public Set<String> getAvailableBrokers() {
try {
return availableActiveBrokers.get();
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -122,4 +123,9 @@ public Deserializer<? extends ServiceLookupData> getLoadReportDeserializer() {
public ModularLoadManager getLoadManager() {
return loadManager;
}

@Override
public Set<String> getAvailableBrokers() throws Exception {
return loadManager.getAvailableBrokers();
}
}
Expand Up @@ -351,6 +351,11 @@ public ZooKeeperChildrenCache getActiveBrokersCache() {
return this.availableActiveBrokers;
}

@Override
public Set<String> getAvailableBrokers() throws Exception {
return this.availableActiveBrokers.get();
}

public ZooKeeperDataCache<LoadReport> getLoadReportCache() {
return this.loadReportCacheZk;
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.broker.loadbalance.LoadReport;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.ServiceUnit;
import static org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
Expand All @@ -51,7 +52,7 @@ public boolean canAssign(ServiceUnit srvUnit, ResourceUnit rescrUnit, Map<Resour
private Optional<NamespaceIsolationPolicies> getIsolationPolicies(String clusterName) {
try {
return namespaceIsolationPolicies
.get(AdminResource.path("clusters", clusterName, "namespaceIsolationPolicies"));
.get(AdminResource.path("clusters", clusterName, NAMESPACE_ISOLATION_POLICIES));
} catch (Exception e) {
LOG.warn("GetIsolationPolicies: Unable to get the namespaceIsolationPolicies [{}]", e);
return Optional.empty();
Expand Down
Expand Up @@ -119,6 +119,8 @@ public enum AddressType {
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s";

public static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies";

/**
* Default constructor.
Expand Down Expand Up @@ -519,7 +521,7 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() throws Exception {
String localCluster = pulsar.getConfiguration().getClusterName();
return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache()
.get(AdminResource.path("clusters", localCluster, "namespaceIsolationPolicies")).orElseGet(() -> {
.get(AdminResource.path("clusters", localCluster, NAMESPACE_ISOLATION_POLICIES)).orElseGet(() -> {
// the namespace isolation policies are empty/undefined = an empty object
return new NamespaceIsolationPolicies();
});
Expand Down
Expand Up @@ -27,6 +27,8 @@
import static org.testng.Assert.fail;

import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -54,9 +56,13 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand All @@ -65,6 +71,7 @@
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -805,4 +812,47 @@ public void testTenantNameWithInvalidCharacters() throws Exception {
// Expected
}
}

@Test
public void brokerNamespaceIsolationPolicies() throws Exception {

// create
String policyName1 = "policy-1";
String namespaceRegex = "other/use/other.*";
String cluster = "use";
String brokerName = pulsar.getAdvertisedAddress();
String brokerAddress = brokerName + ":" + pulsar.getConfiguration().getWebServicePort();
NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData();
nsPolicyData1.namespaces = new ArrayList<String>();
nsPolicyData1.namespaces.add(namespaceRegex);
nsPolicyData1.primary = new ArrayList<String>();
nsPolicyData1.primary.add(brokerName + ":[0-9]*");
nsPolicyData1.secondary = new ArrayList<String>();
nsPolicyData1.secondary.add(brokerName + ".*");
nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData();
nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
nsPolicyData1.auto_failover_policy.parameters = new HashMap<String, String>();
nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1");
nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100");
admin.clusters().createNamespaceIsolationPolicy(cluster, policyName1, nsPolicyData1);

List<BrokerNamespaceIsolationData> brokerIsolationDataList = admin.clusters()
.getBrokersWithNamespaceIsolationPolicy(cluster);
assertEquals(brokerIsolationDataList.size(), 1);
assertEquals(brokerIsolationDataList.get(0).brokerName, brokerAddress);
assertEquals(brokerIsolationDataList.get(0).namespaceRegex.size(), 1);
assertEquals(brokerIsolationDataList.get(0).namespaceRegex.get(0), namespaceRegex);

BrokerNamespaceIsolationData brokerIsolationData = admin.clusters()
.getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
assertEquals(brokerIsolationData.brokerName, brokerAddress);
assertEquals(brokerIsolationData.namespaceRegex.size(), 1);
assertEquals(brokerIsolationData.namespaceRegex.get(0), namespaceRegex);

try {
admin.clusters().getBrokerWithNamespaceIsolationPolicy(cluster, "invalid-broker");
Assert.fail("should have failed due to invalid broker address");
} catch (PulsarAdminException.NotFoundException e) {// expected
}
}
}

0 comments on commit fc67fe2

Please sign in to comment.