Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,6 @@ protected void validateAdminAccessForTenant(String property) {
super.validateAdminAccessForTenant(property);
}

// This is a stub method for Mockito
@Override
protected void validateNamespaceOwnershipWithBundles(String property, String cluster, String namespace,
boolean authoritative, boolean readOnly, BundlesData bundleData) {
super.validateNamespaceOwnershipWithBundles(property, cluster, namespace, authoritative, readOnly, bundleData);
}

// This is a stub method for Mockito
@Override
protected void validateBundleOwnership(String property, String cluster, String namespace, boolean authoritative,
boolean readOnly, NamespaceBundle bundle) {
super.validateBundleOwnership(property, cluster, namespace, authoritative, readOnly, bundle);
}

// This is a stub method for Mockito
@Override
protected boolean isLeaderBroker() {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,14 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace bundle is not empty") })
public void deleteNamespaceBundle(@PathParam("property") String property,
public void deleteNamespaceBundle(
@Suspended AsyncResponse response,
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalDeleteNamespaceBundle(bundleRange, authoritative);
internalDeleteNamespaceBundleAsync(response, bundleRange, authoritative);
}

@GET
Expand Down Expand Up @@ -561,12 +563,14 @@ public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission") })
public void splitNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void splitNamespaceBundle(
@Suspended AsyncResponse response,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload) {
validateNamespaceName(property, cluster, namespace);
internalSplitNamespaceBundle(bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
internalSplitNamespaceBundleAsync(response, bundleRange, authoritative, unload, NamespaceBundleSplitAlgorithm.rangeEquallyDivideName);
}

@POST
Expand Down Expand Up @@ -793,12 +797,14 @@ public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBundleBacklog(@PathParam("property") String property,
public void clearNamespaceBundleBacklog(
@Suspended AsyncResponse response,
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalClearNamespaceBundleBacklog(bundleRange, authoritative);
internalClearNamespaceBundleBacklog(response, bundleRange, authoritative);
}

@POST
Expand Down Expand Up @@ -827,12 +833,14 @@ public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBundleBacklogForSubscription(@PathParam("property") String property,
public void clearNamespaceBundleBacklogForSubscription(
@Suspended AsyncResponse response,
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalClearNamespaceBundleBacklogForSubscription(subscription, bundleRange, authoritative);
internalClearNamespaceBundleBacklogForSubscription(response, subscription, bundleRange, authoritative);
}

@POST
Expand All @@ -859,12 +867,16 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse,
@ApiOperation(hidden = true, value = "Unsubscribes the given subscription on all topics on a namespace bundle.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void unsubscribeNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
public void unsubscribeNamespaceBundle(
@Suspended AsyncResponse response,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative);
internalUnsubscribeNamespaceBundle(response, subscription, bundleRange, authoritative);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -232,8 +233,12 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist") })
public List<String> getListFromBundle(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange) {
public void getListFromBundle(
@Suspended AsyncResponse response,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange) {
log.info("[{}] list of topics on namespace bundle {}/{}/{}/{}", clientAppId(), property, cluster, namespace,
bundleRange);
validateAdminAccessForTenant(property);
Expand All @@ -246,25 +251,35 @@ public List<String> getListFromBundle(@PathParam("property") String property, @P
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)) {
log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property, cluster,
namespace, bundleRange);


isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)
.thenAccept(isOwned -> {
if (!isOwned) {
log.info("[{}] Namespace bundle is not owned by any broker {}/{}", clientAppId(), namespaceName,
bundleRange);
response.resume(Collections.emptyList());
return;
}

validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true,
true)
.thenAccept(nsBundle -> {
List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().forEachTopic(topic -> {
TopicName topicName = TopicName.get(topic.getName());
if (nsBundle.includes(topicName)) {
topicList.add(topic.getName());
}
});
response.resume(topicList);
});
}).exceptionally(ex -> {
log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), namespaceName, bundleRange,
ex);
response.resume(ex);
return null;
}
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
try {
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().forEachTopic(topic -> {
TopicName topicName = TopicName.get(topic.getName());
if (nsBundle.includes(topicName)) {
topicList.add(topic.getName());
}
});
return topicList;
} catch (Exception e) {
log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
throw new RestException(e);
}
});
}

protected void validateAdminOperationOnTopic(TopicName topicName, boolean authoritative) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand All @@ -63,14 +65,10 @@
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;

import org.apache.pulsar.common.policies.data.TenantOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -167,11 +165,14 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Namespace bundle is not empty") })
public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void deleteNamespaceBundle(
@Suspended AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalDeleteNamespaceBundle(bundleRange, authoritative);
internalDeleteNamespaceBundleAsync(response, bundleRange, authoritative);
}

@GET
Expand Down Expand Up @@ -467,13 +468,16 @@ public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission") })
public void splitNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void splitNamespaceBundle(
@Suspended AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("unload") @DefaultValue("false") boolean unload,
@QueryParam("splitAlgorithmName") String splitAlgorithmName) {
validateNamespaceName(tenant, namespace);
internalSplitNamespaceBundle(bundleRange, authoritative, unload, splitAlgorithmName);
internalSplitNamespaceBundleAsync(response, bundleRange, authoritative, unload, splitAlgorithmName);
}

@POST
Expand Down Expand Up @@ -735,11 +739,13 @@ public void clearNamespaceBacklog(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBundleBacklog(@PathParam("tenant") String tenant,
public void clearNamespaceBundleBacklog(
@Suspended AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalClearNamespaceBundleBacklog(bundleRange, authoritative);
internalClearNamespaceBundleBacklog(response, bundleRange, authoritative);
}

@POST
Expand Down Expand Up @@ -768,12 +774,14 @@ public void clearNamespaceBacklogForSubscription(@Suspended final AsyncResponse
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void clearNamespaceBundleBacklogForSubscription(@PathParam("tenant") String tenant,
public void clearNamespaceBundleBacklogForSubscription(
@Suspended AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalClearNamespaceBundleBacklogForSubscription(subscription, bundleRange, authoritative);
internalClearNamespaceBundleBacklogForSubscription(response, subscription, bundleRange, authoritative);
}

@POST
Expand All @@ -800,12 +808,15 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @
@ApiOperation(value = "Unsubscribes the given subscription on all topics on a namespace bundle.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void unsubscribeNamespaceBundle(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
public void unsubscribeNamespaceBundle(
@Suspended AsyncResponse response,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative);
internalUnsubscribeNamespaceBundle(response, subscription, bundleRange, authoritative);
}

@POST
Expand Down
Loading