Skip to content

Commit

Permalink
Succeed when unloading namespace bundles not owned by any broker (#709)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Aug 22, 2017
1 parent 45ea0bc commit db85e87
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 116 deletions.
Expand Up @@ -769,9 +769,15 @@ public void unloadNamespaceBundle(@PathParam("property") String property, @PathP

NamespaceName fqnn = new NamespaceName(property, cluster, namespace);
validatePoliciesReadOnlyAccess();

if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)) {
log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property, cluster,
namespace, bundleRange);
return;
}

NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, authoritative,
true);

try {
pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle);
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), nsBundle.toString());
Expand Down Expand Up @@ -1346,13 +1352,13 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc
}
for (Topic topic : topicList) {
if(topic instanceof PersistentTopic) {
futures.add(((PersistentTopic)topic).clearBacklog(subscription));
futures.add(((PersistentTopic)topic).clearBacklog(subscription));
}
}
} else {
for (Topic topic : topicList) {
if(topic instanceof PersistentTopic) {
futures.add(((PersistentTopic)topic).clearBacklog());
futures.add(((PersistentTopic)topic).clearBacklog());
}
}
}
Expand Down
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pulsar.broker.lookup;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.Commands.newLookupResponse;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import javax.ws.rs.DefaultValue;
Expand All @@ -37,18 +39,15 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -100,16 +99,17 @@ public void lookupDestinationAsync(@PathParam("property") String property, @Path
return;
}

CompletableFuture<LookupResult> lookupFuture = pulsar().getNamespaceService().getBrokerServiceUrlAsync(topic,
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService().getBrokerServiceUrlAsync(topic,
authoritative);

lookupFuture.thenAccept(result -> {
if (result == null) {
lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
log.warn("No broker was found available for topic {}", topic);
completeLookupResponseExceptionally(asyncResponse, new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}

LookupResult result = optionalResult.get();
// We have found either a broker that owns the topic, or a broker to which we should redirect the client to
if (result.isRedirect()) {
boolean newAuthoritative = this.isLeaderBroker();
Expand Down Expand Up @@ -223,8 +223,13 @@ public static CompletableFuture<ByteBuf> lookupDestinationAsync(PulsarService pu
log.debug("[{}] Lookup result {}", fqdn.toString(), lookupResult);
}

LookupData lookupData = lookupResult.getLookupData();
if (lookupResult.isRedirect()) {
if (!lookupResult.isPresent()) {
lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, "Namespace bundle is not owned by any broker", requestId));
return;
}

LookupData lookupData = lookupResult.get().getLookupData();
if (lookupResult.get().isRedirect()) {
boolean newAuthoritative = isLeaderBroker(pulsarService);
lookupfuture.complete(
newLookupResponse(lookupData.getBrokerUrl(), lookupData.getBrokerUrlTls(),
Expand Down
Expand Up @@ -129,7 +129,8 @@ public NamespaceService(PulsarService pulsar) {
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
}

public CompletableFuture<LookupResult> getBrokerServiceUrlAsync(DestinationName topic, boolean authoritative) {
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(DestinationName topic,
boolean authoritative) {
return getBundleAsync(topic)
.thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false /* read-only */));
}
Expand All @@ -151,7 +152,12 @@ private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
return bundleFactory.getFullBundle(fqnn);
}

public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
/**
* Return the URL of the broker who's owning a particular service unit.
*
* If the service unit is not owned, return an empty optional
*/
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps, boolean readOnly)
throws Exception {
if (suName instanceof DestinationName) {
DestinationName name = (DestinationName) suName;
Expand All @@ -172,23 +178,21 @@ public URL getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean
throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
}

private CompletableFuture<URL> internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
boolean isRequestHttps, boolean readOnly) {

return findBrokerServiceUrl(bundle, authoritative, readOnly).thenApply(lookupResult -> {
if (lookupResult != null) {
if (lookupResult.isPresent()) {
try {
LookupData lookupData = lookupResult.getLookupData();
LookupData lookupData = lookupResult.get().getLookupData();
final String redirectUrl = isRequestHttps ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
return new URL(redirectUrl);

return Optional.of(new URL(redirectUrl));
} catch (Exception e) {
// just log the exception, nothing else to do
LOG.warn("internalGetWebServiceUrl [{}]", e.getMessage(), e);
}

}
return null;
return Optional.empty();
});
}

Expand Down Expand Up @@ -273,13 +277,13 @@ private boolean registerNamespace(String namespace, boolean ensureOwned) throws
* @return
* @throws PulsarServerException
*/
private CompletableFuture<LookupResult> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
boolean readOnly) {
if (LOG.isDebugEnabled()) {
LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, readOnly);
}

CompletableFuture<LookupResult> future = new CompletableFuture<>();
CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>();

// First check if we or someone else already owns the bundle
ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> {
Expand All @@ -288,8 +292,7 @@ private CompletableFuture<LookupResult> findBrokerServiceUrl(NamespaceBundle bun

if (readOnly) {
// Do not attempt to acquire ownership
future.completeExceptionally(
new IllegalStateException(String.format("Can't find owner of ServiceUnit: %s", bundle)));
future.complete(Optional.empty());
} else {
// Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
pulsar.getExecutor().execute(() -> {
Expand All @@ -303,7 +306,7 @@ private CompletableFuture<LookupResult> findBrokerServiceUrl(NamespaceBundle bun
if (LOG.isDebugEnabled()) {
LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
}
future.complete(new LookupResult(nsData.get()));
future.complete(Optional.of(new LookupResult(nsData.get())));
}
}).exceptionally(exception -> {
LOG.warn("Failed to check owner for bundle {}: {}", bundle, exception.getMessage(), exception);
Expand All @@ -314,8 +317,8 @@ private CompletableFuture<LookupResult> findBrokerServiceUrl(NamespaceBundle bun
return future;
}

private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<LookupResult> lookupFuture,
boolean authoritative) {
private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative) {
String candidateBroker = null;
try {
// check if this is Heartbeat or SLAMonitor namespace
Expand Down Expand Up @@ -365,7 +368,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<
// Schedule the task to pre-load destinations
pulsar.loadNamespaceDestinations(bundle);

lookupFuture.complete(new LookupResult(ownerInfo));
lookupFuture.complete(Optional.of(new LookupResult(ownerInfo)));
}
}).exceptionally(exception -> {
LOG.warn("Failed to acquire ownership for namespace bundle {}: ", bundle, exception.getMessage(),
Expand All @@ -383,7 +386,8 @@ private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<
}

// Now setting the redirect url
createLookupResult(candidateBroker).thenAccept(lookupResult -> lookupFuture.complete(lookupResult))
createLookupResult(candidateBroker)
.thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
.exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
return null;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.net.URI;
import java.net.URL;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import javax.servlet.ServletContext;
Expand Down Expand Up @@ -120,7 +121,7 @@ public static String splitPath(String source, int slice) {
public String clientAppId() {
return (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
}

public boolean isRequestHttps() {
return "https".equalsIgnoreCase(httpRequest.getScheme());
}
Expand All @@ -140,7 +141,7 @@ protected void validateSuperUserAccess() {
String appId = clientAppId();
if(log.isDebugEnabled()) {
log.debug("[{}] Check super user access: Authenticated: {} -- Role: {}", uri.getRequestUri(),
isClientAuthenticated(appId), appId);
isClientAuthenticated(appId), appId);
}
if (!config().getSuperUserRoles().contains(appId)) {
throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access");
Expand All @@ -166,7 +167,7 @@ protected void validateAdminAccessOnProperty(String property) {
throw new RestException(e);
}
}

protected static void validateAdminAccessOnProperty(PulsarService pulsar, String clientAppId, String property) throws RestException, Exception{
if (pulsar.getConfiguration().isAuthenticationEnabled() && pulsar.getConfiguration().isAuthorizationEnabled()) {
log.debug("check admin access on property: {} - Authenticated: {} -- role: {}", property,
Expand Down Expand Up @@ -258,7 +259,7 @@ protected void validateClusterOwnership(String cluster) throws WebApplicationExc
}

}

protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster(PulsarService pulsar,
String cluster, String clientAppId) {

Expand Down Expand Up @@ -373,6 +374,21 @@ protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, Bundl
}
}

/**
* Checks whether a given bundle is currently loaded by any broker
*/
protected boolean isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles,
String bundleRange) {
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();
try {
return nsService.getWebServiceUrl(nsBundle, /*authoritative */ false, isRequestHttps(), /* read-only */ true).isPresent();
} catch (Exception e) {
log.error("[{}] Failed to check whether namespace bundle is owned {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
throw new RestException(e);
}
}

protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles,
String bundleRange, boolean authoritative, boolean readOnly) {
try {
Expand Down Expand Up @@ -402,9 +418,9 @@ public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritativ
// - If authoritative is false and this broker is not leader, forward to leader
// - If authoritative is false and this broker is leader, determine owner and forward w/ authoritative=true
// - If authoritative is true, own the namespace and continue
URL webUrl = nsService.getWebServiceUrl(bundle, authoritative, isRequestHttps(), readOnly);
Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, authoritative, isRequestHttps(), readOnly);
// Ensure we get a url
if (webUrl == null) {
if (webUrl == null || !webUrl.isPresent()) {
log.warn("Unable to get web service url");
throw new RestException(Status.PRECONDITION_FAILED,
"Failed to find ownership for ServiceUnit:" + bundle.toString());
Expand All @@ -413,8 +429,8 @@ public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritativ
if (!nsService.isServiceUnitOwned(bundle)) {
boolean newAuthoritative = this.isLeaderBroker();
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort())
.replaceQueryParam("authoritative", newAuthoritative).build();
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative", newAuthoritative).build();

log.debug("{} is not a service unit owned", bundle);

Expand Down Expand Up @@ -454,18 +470,18 @@ protected void validateDestinationOwnership(DestinationName fqdn, boolean author

try {
// per function name, this is trying to acquire the whole namespace ownership
URL webUrl = nsService.getWebServiceUrl(fqdn, authoritative, isRequestHttps(), false);
Optional<URL> webUrl = nsService.getWebServiceUrl(fqdn, authoritative, isRequestHttps(), false);
// Ensure we get a url
if (webUrl == null) {
if (webUrl == null || !webUrl.isPresent()) {
log.info("Unable to get web service url");
throw new RestException(Status.PRECONDITION_FAILED, "Failed to find ownership for destination:" + fqdn);
}

if (!nsService.isServiceUnitOwned(fqdn)) {
boolean newAuthoritative = this.isLeaderBroker(pulsar());
boolean newAuthoritative = isLeaderBroker(pulsar());
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort())
.replaceQueryParam("authoritative", newAuthoritative).build();
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative", newAuthoritative).build();
// Redirect
log.debug("Redirecting the rest call to {}", redirect);
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
Expand Down Expand Up @@ -514,7 +530,7 @@ protected static void validateReplicationSettingsOnNamespace(PulsarService pulsa
"Failed to validate global cluster configuration : ns=%s emsg=%s", namespace, e.getMessage()));
}
}

protected static CompletableFuture<Void> validateReplicationSettingsOnNamespaceAsync(PulsarService pulsarService,
NamespaceName namespace) {

Expand Down Expand Up @@ -575,7 +591,7 @@ protected static CompletableFuture<Void> validateReplicationSettingsOnNamespaceA
protected void checkConnect(DestinationName destination) throws RestException, Exception {
checkAuthorization(pulsar(), destination, clientAppId());
}

protected static void checkAuthorization(PulsarService pulsarService, DestinationName destination, String role)
throws RestException, Exception {
if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
Expand All @@ -597,7 +613,7 @@ public void setPulsar(PulsarService pulsar) {
protected boolean isLeaderBroker() {
return isLeaderBroker(pulsar());
}

protected static boolean isLeaderBroker(PulsarService pulsar) {

String leaderAddress = pulsar.getLeaderElectionService().getCurrentLeader().getServiceUrl();
Expand Down

0 comments on commit db85e87

Please sign in to comment.