Skip to content

Commit

Permalink
[feat][broker][branch-3.0] PIP-321 Introduce allowed-cluster at the n…
Browse files Browse the repository at this point in the history
…amespace level (apache#22378) (apache#22960)

(cherry-picked from commit apache@36bae69)
Co-authored-by: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com>

(cherry picked from commit 7b2e724)
  • Loading branch information
Demogorgon314 authored and srinath-ctds committed Jul 1, 2024
1 parent 1a44c3d commit 3d16cf7
Show file tree
Hide file tree
Showing 13 changed files with 546 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,32 +321,28 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
}

protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
if (policies.isPresent()) {
return pulsar()
.getNamespaceService()
.getNamespaceBundleFactory()
.getBundlesAsync(namespaceName)
.thenCompose(bundles -> {
BundlesData bundleData = null;
try {
bundleData = bundles.getBundlesData();
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
if (policies.get().is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
CompletableFuture<Policies> result = new CompletableFuture<>();
namespaceResources().getPoliciesAsync(namespaceName)
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> {
if (pl.isPresent()) {
Policies policies = pl.get();
localPolicies.ifPresent(value -> policies.bundles = value.bundles);
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return
// broker value here for keeping compatibility.
policies.is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
}
result.complete(policies);
} else {
result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
return CompletableFuture.completedFuture(policies.get());
return null;
}).exceptionally(ex -> {
result.completeExceptionally(ex.getCause());
return null;
});
} else {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
});
return result;
}

protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,21 @@ protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<S
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, replicationClusterSet)
.thenCompose(__ ->
validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId));
.thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName)
.thenCompose(nsPolicies -> {
if (nsPolicies.allowed_clusters.isEmpty()) {
return validateClusterForTenantAsync(
namespaceName.getTenant(), clusterId);
}
if (!nsPolicies.allowed_clusters.contains(clusterId)) {
String msg = String.format("Cluster [%s] is not in the "
+ "list of allowed clusters list for namespace "
+ "[%s]", clusterId, namespaceName.toString());
log.info(msg);
throw new RestException(Status.FORBIDDEN, msg);
}
return CompletableFuture.completedFuture(null);
}));
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet);
}))
Expand Down Expand Up @@ -2695,4 +2707,65 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
return null;
});
}

protected CompletableFuture<Void> internalSetNamespaceAllowedClusters(List<String> clusterIds) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
// Allowed clusters in the namespace policy should be included in the allowed clusters in the tenant
// policy.
.thenCompose(__ -> FutureUtil.waitForAll(clusterIds.stream().map(clusterId ->
validateClusterForTenantAsync(namespaceName.getTenant(), clusterId))
.collect(Collectors.toList())))
// Allowed clusters should include all the existed replication clusters and could not contain global
// cluster.
.thenCompose(__ -> {
checkNotNull(clusterIds, "ClusterIds should not be null");
if (clusterIds.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of allowed clusters");
}
return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> {
namespacePolicies.replication_clusters.forEach(replicationCluster -> {
if (!clusterIds.contains(replicationCluster)) {
throw new RestException(Status.BAD_REQUEST,
String.format("Allowed clusters do not contain the replication cluster %s. "
+ "Please remove the replication cluster if the cluster is not allowed "
+ "for this namespace", replicationCluster));
}
});
return Sets.newHashSet(clusterIds);
});
})
// Verify the allowed clusters are valid and they do not contain the peer clusters.
.thenCompose(allowedClusters -> clustersAsync()
.thenCompose(clusters -> {
List<CompletableFuture<Void>> futures =
allowedClusters.stream().map(clusterId -> {
if (!clusters.contains(clusterId)) {
throw new RestException(Status.FORBIDDEN,
"Invalid cluster id: " + clusterId);
}
return validatePeerClusterConflictAsync(clusterId, allowedClusters);
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(__ -> allowedClusters);
}))
// Update allowed clusters into policies.
.thenCompose(allowedClusterSet -> updatePoliciesAsync(namespaceName, policies -> {
policies.allowed_clusters = allowedClusterSet;
return policies;
}));
}

protected CompletableFuture<Set<String>> internalGetNamespaceAllowedClustersAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ)
.thenAccept(__ -> {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot get the allowed clusters for a non-global namespace");
}
}).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.allowed_clusters);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -2976,5 +2976,51 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse,
});
}

@POST
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Set the allowed clusters for a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 400, message = "The list of allowed clusters should include all replication clusters."),
@ApiResponse(code = 403, message = "The requester does not have admin permissions."),
@ApiResponse(code = 404, message = "The specified tenant, cluster, or namespace does not exist."),
@ApiResponse(code = 409, message = "A peer-cluster cannot be part of an allowed-cluster."),
@ApiResponse(code = 412, message = "The namespace is not global or the provided cluster IDs are invalid.")})
public void setNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "List of allowed clusters", required = true)
List<String> clusterIds) {
validateNamespaceName(tenant, namespace);
internalSetNamespaceAllowedClusters(clusterIds)
.thenAccept(asyncResponse::resume)
.exceptionally(e -> {
log.error("[{}] Failed to set namespace allowed clusters on namespace {}",
clientAppId(), namespace, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/allowedClusters")
@ApiOperation(value = "Get the allowed clusters for a namespace.",
response = String.class, responseContainer = "List")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Namespace is not global")})
public void getNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetNamespaceAllowedClustersAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(e -> {
log.error("[{}] Failed to get namespace allowed clusters on namespace {}", clientAppId(),
namespace, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1758,52 +1758,78 @@ public CompletableFuture<Void> checkReplication() {
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}

List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
if (CollectionUtils.isEmpty(configuredClusters)) {
log.warn("[{}] No replication clusters configured", name);
return CompletableFuture.completedFuture(null);
}

int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of "
+ " global namespace repl list {}", topic, configuredClusters);
return deleteForcefully();
}

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}

// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
return checkAllowedCluster(localCluster).thenCompose(success -> {
if (!success) {
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully
// because pulsar doesn't serve global topic without local repl-cluster configured.
return deleteForcefully();
}

int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

removeTerminatedReplicators(replicators);
List<CompletableFuture<Void>> futures = new ArrayList<>();

// The replication clusters at namespace level will get local cluster when creating a namespace.
// If there are only one cluster in the replication clusters, it means the replication is not enabled.
// If the cluster 1 and cluster 2 use the same configuration store and the namespace is created in cluster1
// without enabling geo-replication, then the replication clusters always has cluster1.
//
// When a topic under the namespace is load in the cluster2, the `cluster1` may be identified as
// remote cluster and start geo-replication. This check is to avoid the above case.
if (!(configuredClusters.size() == 1 && replicators.isEmpty())) {
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
}
});

futures.add(checkShadowReplication());
futures.add(checkShadowReplication());

return FutureUtil.waitForAll(futures);
return FutureUtil.waitForAll(futures);
});
}

private CompletableFuture<Boolean> checkAllowedCluster(String localCluster) {
List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional -> {
Set<String> allowedClusters = Set.of();
if (policiesOptional.isPresent()) {
allowedClusters = policiesOptional.get().allowed_clusters;
}
if (TopicName.get(topic).isGlobal() && !replicationClusters.contains(localCluster)
&& !allowedClusters.contains(localCluster)) {
log.warn("Local cluster {} is not part of global namespace repl list {} and allowed list {}",
localCluster, replicationClusters, allowedClusters);
return CompletableFuture.completedFuture(false);
} else {
return CompletableFuture.completedFuture(true);
}
});
}

private CompletableFuture<Void> checkShadowReplication() {
Expand Down
Loading

0 comments on commit 3d16cf7

Please sign in to comment.