Skip to content

Commit

Permalink
[fix][broker] Skip topic.close during unloading if the topic future f…
Browse files Browse the repository at this point in the history
…ails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer
  • Loading branch information
heesung-sn committed Mar 28, 2024
1 parent 6b29382 commit 8b42318
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2244,9 +2244,18 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(
disconnectClients, closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
: CompletableFuture.completedFuture(null))
.exceptionally(e -> {
if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
&& e.getMessage().contains("Please redo the lookup")) {
log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
return null;
}
throw FutureUtil.wrapToCompletionException(e);
}));
}
});

if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
TransactionMetadataStoreService metadataStoreService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ protected CompletableFuture<Boolean> isBundleOwnedByAnyBroker(NamespaceName fqnn
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();

if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return nsService.checkOwnershipPresentAsync(nsBundle);
}
return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -65,18 +64,11 @@ protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
}

protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
// Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
// stuck when doing unload.
conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000);
conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1);
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
return conf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -264,6 +265,30 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
assertTrue(brokerLookupData.isPresent());
}

@Test(timeOut = 30 * 1000)
public void testUnloadUponTopicLookupFailure() throws Exception {
TopicName topicName =
TopicName.get("public/test/testUnloadUponTopicLookupFailure");
NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName);
primaryLoadManager.assign(Optional.empty(), bundle).get();

CompletableFuture future1 = new CompletableFuture();
CompletableFuture future2 = new CompletableFuture();
pulsar1.getBrokerService().getTopics().put(topicName.toString(), future1);
pulsar2.getBrokerService().getTopics().put(topicName.toString(), future2);
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() -> {
future1.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
future2.completeExceptionally(new CompletionException(
new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
});
admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange());

pulsar1.getBrokerService().getTopics().remove(topicName.toString());
pulsar2.getBrokerService().getTopics().remove(topicName.toString());
}


@Test(timeOut = 30 * 1000)
public void testUnloadAdminAPI() throws Exception {
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-unload");
Expand Down

0 comments on commit 8b42318

Please sign in to comment.