From 3d16cf7357dd17cc0d12b3eff1d5d65e3aee3e7a Mon Sep 17 00:00:00 2001 From: Kai Wang Date: Mon, 1 Jul 2024 10:01:10 +0800 Subject: [PATCH] [feat][broker][branch-3.0] PIP-321 Introduce allowed-cluster at the namespace level (#22378) (#22960) (cherry-picked from commit https://github.com/apache/pulsar/commit/36bae695fb07f3ee790bee603149c4c2712187e0) Co-authored-by: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> (cherry picked from commit 7b2e72464b83507c0cf17ff0b364a4883d682d1f) --- .../pulsar/broker/admin/AdminResource.java | 44 +++--- .../broker/admin/impl/NamespacesBase.java | 79 ++++++++++- .../pulsar/broker/admin/v2/Namespaces.java | 46 +++++++ .../service/persistent/PersistentTopic.java | 94 ++++++++----- .../pulsar/broker/web/PulsarWebResource.java | 15 ++- .../namespace/NamespaceServiceTest.java | 127 ++++++++++++++++++ .../pulsar/broker/service/ReplicatorTest.java | 56 ++++++++ .../pulsar/client/admin/Namespaces.java | 84 ++++++++++++ .../pulsar/common/policies/data/Policies.java | 5 +- .../client/admin/internal/NamespacesImpl.java | 22 +++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 8 ++ .../pulsar/admin/cli/CmdNamespaces.java | 32 +++++ .../common/policies/data/PolicyName.java | 3 +- 13 files changed, 546 insertions(+), 69 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 36e2ff0a34890..bd084f84e2aa0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -321,32 +321,28 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) { } protected CompletableFuture getNamespacePoliciesAsync(NamespaceName namespaceName) { - return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> { - if (policies.isPresent()) { - return pulsar() - .getNamespaceService() - .getNamespaceBundleFactory() - .getBundlesAsync(namespaceName) - .thenCompose(bundles -> { - BundlesData bundleData = null; - try { - bundleData = bundles.getBundlesData(); - } catch (Exception e) { - log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e); - return FutureUtil.failedFuture(new RestException(e)); - } - policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles; - if (policies.get().is_allow_auto_update_schema == null) { - // the type changed from boolean to Boolean. return broker value here for keeping compatibility. - policies.get().is_allow_auto_update_schema = pulsar().getConfig() - .isAllowAutoUpdateSchemaEnabled(); + CompletableFuture result = new CompletableFuture<>(); + namespaceResources().getPoliciesAsync(namespaceName) + .thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> { + if (pl.isPresent()) { + Policies policies = pl.get(); + localPolicies.ifPresent(value -> policies.bundles = value.bundles); + if (policies.is_allow_auto_update_schema == null) { + // the type changed from boolean to Boolean. return + // broker value here for keeping compatibility. + policies.is_allow_auto_update_schema = pulsar().getConfig() + .isAllowAutoUpdateSchemaEnabled(); + } + result.complete(policies); + } else { + result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist")); } - return CompletableFuture.completedFuture(policies.get()); + return null; + }).exceptionally(ex -> { + result.completeExceptionally(ex.getCause()); + return null; }); - } else { - return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Namespace does not exist")); - } - }); + return result; } protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 1d750bee1bc95..f9170e65ca7e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -702,9 +702,21 @@ protected CompletableFuture internalSetNamespaceReplicationClusters(List - validateClusterForTenantAsync( - namespaceName.getTenant(), clusterId)); + .thenCompose(__ -> getNamespacePoliciesAsync(this.namespaceName) + .thenCompose(nsPolicies -> { + if (nsPolicies.allowed_clusters.isEmpty()) { + return validateClusterForTenantAsync( + namespaceName.getTenant(), clusterId); + } + if (!nsPolicies.allowed_clusters.contains(clusterId)) { + String msg = String.format("Cluster [%s] is not in the " + + "list of allowed clusters list for namespace " + + "[%s]", clusterId, namespaceName.toString()); + log.info(msg); + throw new RestException(Status.FORBIDDEN, msg); + } + return CompletableFuture.completedFuture(null); + })); }).collect(Collectors.toList()); return FutureUtil.waitForAll(futures).thenApply(__ -> replicationClusterSet); })) @@ -2695,4 +2707,65 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu return null; }); } + + protected CompletableFuture internalSetNamespaceAllowedClusters(List clusterIds) { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + // Allowed clusters in the namespace policy should be included in the allowed clusters in the tenant + // policy. + .thenCompose(__ -> FutureUtil.waitForAll(clusterIds.stream().map(clusterId -> + validateClusterForTenantAsync(namespaceName.getTenant(), clusterId)) + .collect(Collectors.toList()))) + // Allowed clusters should include all the existed replication clusters and could not contain global + // cluster. + .thenCompose(__ -> { + checkNotNull(clusterIds, "ClusterIds should not be null"); + if (clusterIds.contains("global")) { + throw new RestException(Status.PRECONDITION_FAILED, + "Cannot specify global in the list of allowed clusters"); + } + return getNamespacePoliciesAsync(this.namespaceName).thenApply(namespacePolicies -> { + namespacePolicies.replication_clusters.forEach(replicationCluster -> { + if (!clusterIds.contains(replicationCluster)) { + throw new RestException(Status.BAD_REQUEST, + String.format("Allowed clusters do not contain the replication cluster %s. " + + "Please remove the replication cluster if the cluster is not allowed " + + "for this namespace", replicationCluster)); + } + }); + return Sets.newHashSet(clusterIds); + }); + }) + // Verify the allowed clusters are valid and they do not contain the peer clusters. + .thenCompose(allowedClusters -> clustersAsync() + .thenCompose(clusters -> { + List> futures = + allowedClusters.stream().map(clusterId -> { + if (!clusters.contains(clusterId)) { + throw new RestException(Status.FORBIDDEN, + "Invalid cluster id: " + clusterId); + } + return validatePeerClusterConflictAsync(clusterId, allowedClusters); + }).collect(Collectors.toList()); + return FutureUtil.waitForAll(futures).thenApply(__ -> allowedClusters); + })) + // Update allowed clusters into policies. + .thenCompose(allowedClusterSet -> updatePoliciesAsync(namespaceName, policies -> { + policies.allowed_clusters = allowedClusterSet; + return policies; + })); + } + + protected CompletableFuture> internalGetNamespaceAllowedClustersAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ALLOW_CLUSTERS, PolicyOperation.READ) + .thenAccept(__ -> { + if (!namespaceName.isGlobal()) { + throw new RestException(Status.PRECONDITION_FAILED, + "Cannot get the allowed clusters for a non-global namespace"); + } + }).thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> policies.allowed_clusters); + } + + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 259195056e326..f1f4c62ed3439 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2976,5 +2976,51 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse, }); } + @POST + @Path("/{tenant}/{namespace}/allowedClusters") + @ApiOperation(value = "Set the allowed clusters for a namespace.") + @ApiResponses(value = { + @ApiResponse(code = 400, message = "The list of allowed clusters should include all replication clusters."), + @ApiResponse(code = 403, message = "The requester does not have admin permissions."), + @ApiResponse(code = 404, message = "The specified tenant, cluster, or namespace does not exist."), + @ApiResponse(code = 409, message = "A peer-cluster cannot be part of an allowed-cluster."), + @ApiResponse(code = 412, message = "The namespace is not global or the provided cluster IDs are invalid.")}) + public void setNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = "List of allowed clusters", required = true) + List clusterIds) { + validateNamespaceName(tenant, namespace); + internalSetNamespaceAllowedClusters(clusterIds) + .thenAccept(asyncResponse::resume) + .exceptionally(e -> { + log.error("[{}] Failed to set namespace allowed clusters on namespace {}", + clientAppId(), namespace, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); + } + + @GET + @Path("/{tenant}/{namespace}/allowedClusters") + @ApiOperation(value = "Get the allowed clusters for a namespace.", + response = String.class, responseContainer = "List") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), + @ApiResponse(code = 412, message = "Namespace is not global")}) + public void getNamespaceAllowedClusters(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalGetNamespaceAllowedClustersAsync() + .thenAccept(asyncResponse::resume) + .exceptionally(e -> { + log.error("[{}] Failed to get namespace allowed clusters on namespace {}", clientAppId(), + namespace, e); + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index db27e24709883..e78211d5a1d2e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1758,52 +1758,78 @@ public CompletableFuture checkReplication() { if (log.isDebugEnabled()) { log.debug("[{}] Checking replication status", name); } - List configuredClusters = topicPolicies.getReplicationClusters().get(); if (CollectionUtils.isEmpty(configuredClusters)) { log.warn("[{}] No replication clusters configured", name); return CompletableFuture.completedFuture(null); } - int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); - String localCluster = brokerService.pulsar().getConfiguration().getClusterName(); - // if local cluster is removed from global namespace cluster-list : then delete topic forcefully - // because pulsar doesn't serve global topic without local repl-cluster configured. - if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) { - log.info("Deleting topic [{}] because local cluster is not part of " - + " global namespace repl list {}", topic, configuredClusters); - return deleteForcefully(); - } - - removeTerminatedReplicators(replicators); - List> futures = new ArrayList<>(); - - // Check for missing replicators - for (String cluster : configuredClusters) { - if (cluster.equals(localCluster)) { - continue; - } - if (!replicators.containsKey(cluster)) { - futures.add(startReplicator(cluster)); - } - } - - // Check for replicators to be stopped - replicators.forEach((cluster, replicator) -> { - // Update message TTL - ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds); - if (!cluster.equals(localCluster)) { - if (!configuredClusters.contains(cluster)) { - futures.add(removeReplicator(cluster)); + return checkAllowedCluster(localCluster).thenCompose(success -> { + if (!success) { + // if local cluster is removed from global namespace cluster-list : then delete topic forcefully + // because pulsar doesn't serve global topic without local repl-cluster configured. + return deleteForcefully(); + } + + int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); + + removeTerminatedReplicators(replicators); + List> futures = new ArrayList<>(); + + // The replication clusters at namespace level will get local cluster when creating a namespace. + // If there are only one cluster in the replication clusters, it means the replication is not enabled. + // If the cluster 1 and cluster 2 use the same configuration store and the namespace is created in cluster1 + // without enabling geo-replication, then the replication clusters always has cluster1. + // + // When a topic under the namespace is load in the cluster2, the `cluster1` may be identified as + // remote cluster and start geo-replication. This check is to avoid the above case. + if (!(configuredClusters.size() == 1 && replicators.isEmpty())) { + // Check for missing replicators + for (String cluster : configuredClusters) { + if (cluster.equals(localCluster)) { + continue; + } + if (!replicators.containsKey(cluster)) { + futures.add(startReplicator(cluster)); + } } + // Check for replicators to be stopped + replicators.forEach((cluster, replicator) -> { + // Update message TTL + ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLInSeconds); + if (!cluster.equals(localCluster)) { + if (!configuredClusters.contains(cluster)) { + futures.add(removeReplicator(cluster)); + } + } + }); } - }); - futures.add(checkShadowReplication()); + futures.add(checkShadowReplication()); - return FutureUtil.waitForAll(futures); + return FutureUtil.waitForAll(futures); + }); + } + + private CompletableFuture checkAllowedCluster(String localCluster) { + List replicationClusters = topicPolicies.getReplicationClusters().get(); + return brokerService.pulsar().getPulsarResources().getNamespaceResources() + .getPoliciesAsync(TopicName.get(topic).getNamespaceObject()).thenCompose(policiesOptional -> { + Set allowedClusters = Set.of(); + if (policiesOptional.isPresent()) { + allowedClusters = policiesOptional.get().allowed_clusters; + } + if (TopicName.get(topic).isGlobal() && !replicationClusters.contains(localCluster) + && !allowedClusters.contains(localCluster)) { + log.warn("Local cluster {} is not part of global namespace repl list {} and allowed list {}", + localCluster, replicationClusters, allowedClusters); + return CompletableFuture.completedFuture(false); + } else { + return CompletableFuture.completedFuture(true); + } + }); } private CompletableFuture checkShadowReplication() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 2f437962002a3..dafad019613a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -902,14 +902,16 @@ public static CompletableFuture checkLocalOrGetPeerReplicationC log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace is deleted")); - } else if (policies.replication_clusters.isEmpty()) { + } else if (policies.replication_clusters.isEmpty() && policies.allowed_clusters.isEmpty()) { String msg = String.format( "Namespace does not have any clusters configured : local_cluster=%s ns=%s", localCluster, namespace.toString()); log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg)); - } else if (!policies.replication_clusters.contains(localCluster)) { - getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters) + } else if (!policies.replication_clusters.contains(localCluster) && !policies.allowed_clusters + .contains(localCluster)) { + getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters, + policies.allowed_clusters) .thenAccept(ownerPeerCluster -> { if (ownerPeerCluster != null) { // found a peer that own this namespace @@ -949,9 +951,9 @@ public static CompletableFuture checkLocalOrGetPeerReplicationC } private static CompletableFuture getOwnerFromPeerClusterListAsync(PulsarService pulsar, - Set replicationClusters) { + Set replicationClusters, Set allowedClusters) { String currentCluster = pulsar.getConfiguration().getClusterName(); - if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) { + if (replicationClusters.isEmpty() && allowedClusters.isEmpty() || isBlank(currentCluster)) { return CompletableFuture.completedFuture(null); } @@ -961,7 +963,8 @@ private static CompletableFuture getOwnerFromPeerClusterListAsy return CompletableFuture.completedFuture(null); } for (String peerCluster : cluster.get().getPeerClusterNames()) { - if (replicationClusters.contains(peerCluster)) { + if (replicationClusters.contains(peerCluster) + || allowedClusters.contains(peerCluster)) { return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster) .thenApply(ret -> { if (!ret.isPresent()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 38a60165d5606..e975fe3cfa926 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -32,6 +32,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.google.common.collect.Sets; import com.google.common.hash.Hashing; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -40,6 +41,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -64,6 +66,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; @@ -76,8 +79,11 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.GetResult; @@ -828,6 +834,127 @@ public void testCheckTopicExists(String topicDomain) throws Exception { }); } + @Test + public void testAllowedClustersAtNamespaceLevelShouldBeIncludedInAllowedClustersAtTenantLevel() throws Exception { + // 1. Setup + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + pulsar.getConfiguration().setForceDeleteTenantAllowed(true); + Set tenantAllowedClusters = Set.of("test", "r1", "r2"); + Set allowedClusters1 = Set.of("test", "r1", "r2", "r3"); + Set allowedClusters2 = Set.of("test", "r1", "r2"); + Set clusters = Set.of("r1", "r2", "r3", "r4"); + final String tenant = "my-tenant"; + final String namespace = tenant + "/testAllowedCluster"; + admin.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace(namespace); + pulsar.getPulsarResources().getTenantResources().updateTenantAsync(tenant, tenantInfo -> + TenantInfo.builder().allowedClusters(tenantAllowedClusters).build()); + for (String cluster : clusters) { + pulsar.getPulsarResources().getClusterResources().createCluster(cluster, ClusterData.builder().build()); + } + // 2. Verify + admin.namespaces().setNamespaceAllowedClusters(namespace, allowedClusters2); + + try { + admin.namespaces().setNamespaceAllowedClusters(namespace, allowedClusters1); + fail(); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(), 403); + assertEquals(e.getMessage(), + "Cluster [r3] is not in the list of allowed clusters list for tenant [my-tenant]"); + } + // 3. Clean up + admin.namespaces().deleteNamespace(namespace, true); + admin.tenants().deleteTenant(tenant, true); + for (String cluster : clusters) { + pulsar.getPulsarResources().getClusterResources().deleteCluster(cluster); + } + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + pulsar.getConfiguration().setForceDeleteTenantAllowed(false); + } + + /** + * Test case: + * 1. Replication clusters should be included in the allowed clusters. For compatibility, the replication + * clusters could be set before the allowed clusters are set. + * 2. Peer cluster can not be a part of the allowed clusters. + */ + @Test + public void testNewAllowedClusterAdminAPIAndItsImpactOnReplicationClusterAPI() throws Exception { + // 1. Setup + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + pulsar.getConfiguration().setForceDeleteTenantAllowed(true); + // Setup: Prepare cluster resource, tenant and namespace + Set replicationClusters = Set.of("test", "r1", "r2"); + Set tenantAllowedClusters = Set.of("test", "r1", "r2", "r3"); + Set allowedClusters = Set.of("test", "r1", "r2", "r3"); + Set clusters = Set.of("r1", "r2", "r3", "r4"); + final String tenant = "my-tenant"; + final String namespace = tenant + "/testAllowedCluster"; + admin.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace(namespace); + pulsar.getPulsarResources().getTenantResources().updateTenantAsync(tenant, tenantInfo -> + TenantInfo.builder().allowedClusters(tenantAllowedClusters).build()); + + Namespaces namespaces = admin.namespaces(); + for (String cluster : clusters) { + pulsar.getPulsarResources().getClusterResources().createCluster(cluster, ClusterData.builder().build()); + } + // 2. Verify + // 2.1 Replication clusters should be included in the allowed clusters. + + // SUCCESS + // 2.1.1. Set replication clusters without allowed clusters at namespace level. + namespaces.setNamespaceReplicationClusters(namespace, replicationClusters); + // 2..1.2 Set allowed clusters. + namespaces.setNamespaceAllowedClusters(namespace, allowedClusters); + // 2.1.3. Get allowed clusters and replication clusters. + List allowedClustersResponse = namespaces.getNamespaceAllowedClusters(namespace); + + List replicationClustersResponse = namespaces.getNamespaceReplicationClusters(namespace); + + assertEquals(replicationClustersResponse.size(), replicationClusters.size()); + assertEquals(allowedClustersResponse.size(), allowedClusters.size()); + + // FAIL + // 2.1.4. Fail: Set allowed clusters whose scope is smaller than replication clusters. + Set allowedClustersSmallScope = Set.of("r1", "r3"); + try { + namespaces.setNamespaceAllowedClusters(namespace, allowedClustersSmallScope); + fail(); + } catch (PulsarAdminException ignore) {} + // 2.1.5. Fail: Set replication clusters whose scope is excel the allowed clusters. + Set replicationClustersExcel = Set.of("r1", "r4"); + try { + namespaces.setNamespaceReplicationClusters(namespace, replicationClustersExcel); + fail(); + //Todo: The status code in the old implementation is confused. + } catch (PulsarAdminException.NotAuthorizedException ignore) {} + + // 2.2 Peer cluster can not be a part of the allowed clusters. + LinkedHashSet peerCluster = new LinkedHashSet<>(); + peerCluster.add("r2"); + pulsar.getPulsarResources().getClusterResources().deleteCluster("r1"); + pulsar.getPulsarResources().getClusterResources().createCluster("r1", + ClusterData.builder().peerClusterNames(peerCluster).build()); + try { + namespaces.setNamespaceAllowedClusters(namespace, Set.of("test", "r1", "r2", "r3")); + fail(); + } catch (PulsarAdminException.ConflictException ignore) {} + + // CleanUp: Namespace with replication clusters can not be deleted by force. + namespaces.setNamespaceReplicationClusters(namespace, Set.of(conf.getClusterName())); + admin.namespaces().deleteNamespace(namespace, true); + admin.tenants().deleteTenant(tenant, true); + for (String cluster : clusters) { + pulsar.getPulsarResources().getClusterResources().deleteCluster(cluster); + } + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + pulsar.getConfiguration().setForceDeleteTenantAllowed(false); + } + /** * 1. Manually trigger "LoadReportUpdaterTask" * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice". diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 3cc2ca2457a4b..75ff51055fc7e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -1782,6 +1783,61 @@ public void testReplicatorWithTTL() throws Exception { assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4")); } + @Test + public void testEnableReplicationWithNamespaceAllowedClustersPolices() throws Exception { + log.info("--- testEnableReplicationWithNamespaceAllowedClustersPolices ---"); + String namespace1 = "pulsar/ns" + RandomUtils.nextLong(); + admin1.namespaces().createNamespace(namespace1); + admin2.namespaces().createNamespace(namespace1 + "init_cluster_node"); + admin1.namespaces().setNamespaceAllowedClusters(namespace1, Sets.newHashSet("r1", "r2")); + final TopicName topicName = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1")); + + @Cleanup PulsarClient client1 = PulsarClient + .builder() + .serviceUrl(pulsar1.getBrokerServiceUrl()) + .build(); + @Cleanup Producer producer = client1 + .newProducer() + .topic(topicName.toString()) + .create(); + producer.newMessage().send(); + // Enable replication at the topic level in the cluster1. + admin1.topics().setReplicationClusters(topicName.toString(), List.of("r1", "r2")); + + PersistentTopic persistentTopic1 = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName.toString(), + false) + .get() + .get(); + // Verify the replication from cluster1 to cluster2 is ready, but the replication form the cluster2 to cluster1 + // is not ready. + Awaitility.await().untilAsserted(() -> { + ConcurrentOpenHashMap replicatorMap = persistentTopic1.getReplicators(); + assertEquals(replicatorMap.size(), 1); + Replicator replicator = replicatorMap.get(replicatorMap.keys().get(0)); + assertTrue(replicator.isConnected()); + }); + + PersistentTopic persistentTopic2 = (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName.toString(), + false) + .get() + .get(); + + Awaitility.await().untilAsserted(() -> { + ConcurrentOpenHashMap replicatorMap = persistentTopic2.getReplicators(); + assertEquals(replicatorMap.size(), 0); + }); + // Enable replication at the topic level in the cluster2. + admin2.topics().setReplicationClusters(topicName.toString(), List.of("r1", "r2")); + // Verify the replication between cluster1 and cluster2 is ready. + Awaitility.await().untilAsserted(() -> { + ConcurrentOpenHashMap replicatorMap = persistentTopic2.getReplicators(); + assertEquals(replicatorMap.size(), 1); + Replicator replicator = replicatorMap.get(replicatorMap.keys().get(0)); + assertTrue(replicator.isConnected()); + }); + } + private void pauseReplicator(PersistentReplicator replicator) { Awaitility.await().untilAsserted(() -> { assertTrue(replicator.isConnected()); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 2690df658b7be..32c659dc01db5 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -4623,4 +4623,88 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem * @return */ CompletableFuture removeNamespaceEntryFiltersAsync(String namespace); + + /** + * Get the allowed clusters for a namespace. + *

+ * Response example: + * + *

+     * ["use", "usw", "usc"]
+     * 
+ * + * @param namespace + * Namespace name + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PreconditionFailedException + * Namespace is not global + * @throws PulsarAdminException + * Unexpected error + */ + List getNamespaceAllowedClusters(String namespace) throws PulsarAdminException; + + /** + * Get the allowed clusters for a namespace asynchronously. + *

+ * Response example: + * + *

+     * ["use", "usw", "usc"]
+     * 
+ * + * @param namespace + * Namespace name + */ + CompletableFuture> getNamespaceAllowedClustersAsync(String namespace); + + /** + * Set the allowed clusters for a namespace. + *

+ * Request example: + * + *

+     * ["us-west", "us-east", "us-cent"]
+     * 
+ * + * @param namespace + * Namespace name + * @param clusterIds + * Pulsar Cluster Ids + * + * @throws ConflictException + * Peer-cluster cannot be part of an allowed-cluster + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PreconditionFailedException + * Namespace is not global + * @throws PreconditionFailedException + * Invalid cluster ids + * @throws PulsarAdminException + * The list of allowed clusters should include all replication clusters. + * @throws PulsarAdminException + * Unexpected error + */ + void setNamespaceAllowedClusters(String namespace, Set clusterIds) throws PulsarAdminException; + + /** + * Set the allowed clusters for a namespace asynchronously. + *

+ * Request example: + * + *

+     * ["us-west", "us-east", "us-cent"]
+     * 
+ * + * @param namespace + * Namespace name + * @param clusterIds + * Pulsar Cluster Ids + */ + CompletableFuture setNamespaceAllowedClustersAsync(String namespace, Set clusterIds); + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 066fdf1df4f09..4e0c68bed3a88 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -36,6 +36,8 @@ public class Policies { public final AuthPolicies auth_policies = AuthPolicies.builder().build(); @SuppressWarnings("checkstyle:MemberName") public Set replication_clusters = new HashSet<>(); + @SuppressWarnings("checkstyle:MemberName") + public Set allowed_clusters = new HashSet<>(); public BundlesData bundles; @SuppressWarnings("checkstyle:MemberName") public Map backlog_quota_map = new HashMap<>(); @@ -135,7 +137,7 @@ public enum BundleType { @Override public int hashCode() { - return Objects.hash(auth_policies, replication_clusters, + return Objects.hash(auth_policies, replication_clusters, allowed_clusters, backlog_quota_map, publishMaxMessageRate, clusterDispatchRate, topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate, clusterSubscribeRate, deduplicationEnabled, autoTopicCreationOverride, @@ -165,6 +167,7 @@ public boolean equals(Object obj) { Policies other = (Policies) obj; return Objects.equals(auth_policies, other.auth_policies) && Objects.equals(replication_clusters, other.replication_clusters) + && Objects.equals(allowed_clusters, other.allowed_clusters) && Objects.equals(backlog_quota_map, other.backlog_quota_map) && Objects.equals(clusterDispatchRate, other.clusterDispatchRate) && Objects.equals(topicDispatchRate, other.topicDispatchRate) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 59f0ef3b34763..792fbdc91d1ff 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1950,4 +1950,26 @@ public CompletableFuture removeNamespaceEntryFiltersAsync(String namespace WebTarget path = namespacePath(ns, "entryFilters"); return asyncDeleteRequest(path); } + + @Override + public List getNamespaceAllowedClusters(String namespace) throws PulsarAdminException { + return sync(() -> getNamespaceAllowedClustersAsync(namespace)); + } + + @Override + public CompletableFuture> getNamespaceAllowedClustersAsync(String namespace) { + return asyncGetNamespaceParts(new FutureCallback>(){}, namespace, "allowedClusters"); + } + + @Override + public void setNamespaceAllowedClusters(String namespace, Set clusterIds) throws PulsarAdminException { + sync(() -> setNamespaceAllowedClustersAsync(namespace, clusterIds)); + } + + @Override + public CompletableFuture setNamespaceAllowedClustersAsync(String namespace, Set clusterIds) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "allowedClusters"); + return asyncPostRequest(path, Entity.entity(clusterIds, MediaType.APPLICATION_JSON)); + } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 8dc6f752c09ad..64f0f8bebb2f3 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -428,6 +428,14 @@ public void namespaces() throws Exception { namespaces.run(split("get-clusters myprop/clust/ns1")); verify(mockNamespaces).getNamespaceReplicationClusters("myprop/clust/ns1"); + namespaces.run(split("set-allowed-clusters myprop/clust/ns1 -c use,usw,usc")); + verify(mockNamespaces).setNamespaceAllowedClusters("myprop/clust/ns1", + Sets.newHashSet("use", "usw", "usc")); + + namespaces.run(split("get-allowed-clusters myprop/clust/ns1")); + verify(mockNamespaces).getNamespaceAllowedClusters("myprop/clust/ns1"); + + namespaces.run(split("set-subscription-types-enabled myprop/clust/ns1 -t Shared,Failover")); verify(mockNamespaces).setSubscriptionTypesEnabled("myprop/clust/ns1", Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover)); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 5f2006edeafdf..ac0b424301bb8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -2680,6 +2680,35 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Set allowed clusters for a namespace") + private class SetAllowedClusters extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Parameter(names = { "--clusters", + "-c" }, description = "Replication Cluster Ids list (comma separated values)", required = true) + private String clusterIds; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + List clusters = Lists.newArrayList(clusterIds.split(",")); + getAdmin().namespaces().setNamespaceAllowedClusters(namespace, Sets.newHashSet(clusters)); + } + } + + @Parameters(commandDescription = "Get allowed clusters for a namespace") + private class GetAllowedClusters extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(getAdmin().namespaces().getNamespaceAllowedClusters(namespace)); + } + } + public CmdNamespaces(Supplier admin) { super("namespaces", admin); jcommander.addCommand("list", new GetNamespacesPerProperty()); @@ -2707,6 +2736,9 @@ public CmdNamespaces(Supplier admin) { jcommander.addCommand("get-subscription-types-enabled", new GetSubscriptionTypesEnabled()); jcommander.addCommand("remove-subscription-types-enabled", new RemoveSubscriptionTypesEnabled()); + jcommander.addCommand("set-allowed-clusters", new SetAllowedClusters()); + jcommander.addCommand("get-allowed-clusters", new GetAllowedClusters()); + jcommander.addCommand("get-backlog-quotas", new GetBacklogQuotaMap()); jcommander.addCommand("set-backlog-quota", new SetBacklogQuota()); jcommander.addCommand("remove-backlog-quota", new RemoveBacklogQuota()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index 456d4b9270cd6..a01e1e90027be 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -51,5 +51,6 @@ public enum PolicyName { MAX_TOPICS, RESOURCEGROUP, ENTRY_FILTERS, - SHADOW_TOPIC + SHADOW_TOPIC, + ALLOW_CLUSTERS }