Skip to content

Commit

Permalink
PIP-10: Removing cluster from topic name (#1150)
Browse files Browse the repository at this point in the history
* Make DestinationName and NamespaceName classes to handle both formats

* Refactor /namespaces and /resource-quotas handlers for new-style namespace names

* Refactor topics APIs to accept new name formats

* Separate the broker admin rest endpoints in different pacakges.

v1 - legacy endpoints with clusters name
v2 - endpoints without the cluster name

The new admin v2 endpoints are mounted ad /admin/v2.

* Remove refactor remnants.

* Fix list namespaces to handle v1 and v2 formats.

* Add default namespace policies on create if none are sent.

* Fix internals that assumed a cluster in the path.

* Fix merge compile issue.

* Fix compile issues from merge with master.
  • Loading branch information
cckellogg authored and merlimat committed Feb 13, 2018
1 parent a5a7c79 commit 35e84c3
Show file tree
Hide file tree
Showing 42 changed files with 4,572 additions and 2,650 deletions.
Expand Up @@ -275,7 +275,8 @@ public void start() throws PulsarServerException {

this.webService = new WebService(this);
this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false);
this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin", true);
this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true);
this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true);
this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true);

this.webService.addServlet("/metrics",
Expand Down Expand Up @@ -462,8 +463,7 @@ public void loadNamespaceDestinations(NamespaceBundle bundle) {
List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
long topicLoadStart = System.nanoTime();

for (String topic : getNamespaceService().getListOfDestinations(nsName.getProperty(), nsName.getCluster(),
nsName.getLocalName())) {
for (String topic : getNamespaceService().getListOfDestinations(nsName)) {
try {
DestinationName dn = DestinationName.get(topic);
if (bundle.includes(dn)) {
Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
Expand Down Expand Up @@ -189,11 +190,22 @@ public void validatePoliciesReadOnlyAccess() {
protected List<String> getListOfNamespaces(String property) throws Exception {
List<String> namespaces = Lists.newArrayList();

for (String cluster : globalZk().getChildren(path(POLICIES, property), false)) {
// this will return a cluster in v1 and a namespace in v2
for (String clusterOrNamespace : globalZk().getChildren(path(POLICIES, property), false)) {
// Then get the list of namespaces
try {
for (String namespace : globalZk().getChildren(path(POLICIES, property, cluster), false)) {
namespaces.add(String.format("%s/%s/%s", property, cluster, namespace));
final List<String> children = globalZk().getChildren(path(POLICIES, property, clusterOrNamespace), false);
if (children == null || children.isEmpty()) {
String namespace = NamespaceName.get(property, clusterOrNamespace).toString();
// if the length is 0 then this is probably a leftover cluster from namespace created
// with the v1 admin format (prop/cluster/ns) and then deleted, so no need to add it to the list
if (globalZk().getData(path(POLICIES, namespace), false, null).length != 0) {
namespaces.add(namespace);
}
} else {
children.forEach(ns -> {
namespaces.add(NamespaceName.get(property, clusterOrNamespace, ns).toString());
});
}
} catch (KeeperException.NoNodeException e) {
// A cluster was deleted between the 2 getChildren() calls, ignoring
Expand All @@ -204,7 +216,56 @@ protected List<String> getListOfNamespaces(String property) throws Exception {
return namespaces;
}


protected NamespaceName namespaceName;

protected void validateNamespaceName(String property, String namespace) {
try {
this.namespaceName = NamespaceName.get(property, namespace);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create namespace with invalid name {}", clientAppId(), namespace, e);
throw new RestException(Status.PRECONDITION_FAILED, "Namespace name is not valid");
}
}

@Deprecated
protected void validateNamespaceName(String property, String cluster, String namespace) {
try {
this.namespaceName = NamespaceName.get(property, cluster, namespace);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to create namespace with invalid name {}", clientAppId(), namespace, e);
throw new RestException(Status.PRECONDITION_FAILED, "Namespace name is not valid");
}
}

protected DestinationName destinationName;

protected void validateDestinationName(String property, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
try {
this.namespaceName = NamespaceName.get(property, namespace);
this.destinationName = DestinationName.get(domain(), namespaceName, topic);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to validate topic name {}://{}/{}/{}", clientAppId(), domain(), property, namespace,
topic, e);
throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
}

this.destinationName = DestinationName.get(domain(), namespaceName, topic);
}

@Deprecated
protected void validateDestinationName(String property, String cluster, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
try {
this.namespaceName = NamespaceName.get(property, cluster, namespace);
this.destinationName = DestinationName.get(domain(), namespaceName, topic);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to validate topic name {}://{}/{}/{}/{}", clientAppId(), domain(), property, cluster,
namespace, topic, e);
throw new RestException(Status.PRECONDITION_FAILED, "Topic name is not valid");
}
}

/**
* Redirect the call to the specified broker
*
Expand All @@ -227,20 +288,20 @@ protected void validateBrokerName(String broker) throws MalformedURLException {
}
}

protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
Policies policies = policiesCache().get(AdminResource.path(POLICIES, property, cluster, namespace))
Policies policies = policiesCache().get(AdminResource.path(POLICIES, namespaceName.toString()))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace));
.getBundles(namespaceName);
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}/{}/{}", clientAppId(), property, cluster, namespace, e);
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
Expand All @@ -249,35 +310,35 @@ public static ObjectMapper jsonMapper() {
return ObjectMapperFactory.getThreadLocal();
}

ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
public ZooKeeperDataCache<PropertyAdmin> propertiesCache() {
return pulsar().getConfigurationCache().propertiesCache();
}

ZooKeeperDataCache<Policies> policiesCache() {
protected ZooKeeperDataCache<Policies> policiesCache() {
return pulsar().getConfigurationCache().policiesCache();
}

ZooKeeperDataCache<LocalPolicies> localPoliciesCache() {
protected ZooKeeperDataCache<LocalPolicies> localPoliciesCache() {
return pulsar().getLocalZkCacheService().policiesCache();
}

ZooKeeperDataCache<ClusterData> clustersCache() {
protected ZooKeeperDataCache<ClusterData> clustersCache() {
return pulsar().getConfigurationCache().clustersCache();
}

ZooKeeperChildrenCache managedLedgerListCache() {
protected ZooKeeperChildrenCache managedLedgerListCache() {
return pulsar().getLocalZkCacheService().managedLedgerListCache();
}

Set<String> clusters() {
protected Set<String> clusters() {
try {
return pulsar().getConfigurationCache().clustersListCache().get();
} catch (Exception e) {
throw new RestException(e);
}
}

ZooKeeperChildrenCache clustersListCache() {
protected ZooKeeperChildrenCache clustersListCache() {
return pulsar().getConfigurationCache().clustersListCache();
}

Expand All @@ -297,32 +358,30 @@ protected ZooKeeperChildrenCache failureDomainListCache() {
return pulsar().getConfigurationCache().failureDomainListCache();
}

protected PartitionedTopicMetadata getPartitionedTopicMetadata(String property, String cluster, String namespace,
String destination, boolean authoritative) {
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateClusterOwnership(dn.getCluster());
protected PartitionedTopicMetadata getPartitionedTopicMetadata(DestinationName destinationName,
boolean authoritative) {
validateClusterOwnership(destinationName.getCluster());
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
validateGlobalNamespaceOwnership(dn.getNamespaceObject());
validateGlobalNamespaceOwnership(destinationName.getNamespaceObject());

try {
checkConnect(dn);
checkConnect(destinationName);
} catch (WebApplicationException e) {
validateAdminAccessOnProperty(dn.getProperty());
validateAdminAccessOnProperty(destinationName.getProperty());
} catch (Exception e) {
// unknown error marked as internal server error
log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destination,
log.warn("Unexpected error while authorizing lookup. destination={}, role={}. Error: {}", destinationName,
clientAppId(), e.getMessage(), e);
throw new RestException(e);
}

String path = path(PARTITIONED_TOPIC_PATH_ZNODE, property, cluster, namespace, domain(),
dn.getEncodedLocalName());
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), destinationName.getEncodedLocalName());
PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);

if (log.isDebugEnabled()) {
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), dn,
log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), destinationName,
partitionMetadata.partitions);
}
return partitionMetadata;
Expand All @@ -339,8 +398,8 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarSe
}
}

protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(PulsarService pulsar,
String path) {
protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(
PulsarService pulsar, String path) {
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
try {
// gets the number of partitions from the zk cache
Expand Down Expand Up @@ -375,4 +434,22 @@ protected void validateClusterExists(String cluster) {
throw new RestException(e);
}
}

protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
try {
Policies policies = policiesCache().get(AdminResource.path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace));
BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}/{}/{}", clientAppId(), property, cluster, namespace, e);
throw new RestException(e);
}
}
}

0 comments on commit 35e84c3

Please sign in to comment.