Skip to content

Commit

Permalink
[fix][broker] Make ExtensibleLoadManagerImpl.getOwnedServiceUnits asy…
Browse files Browse the repository at this point in the history
…nc (apache#22727)

(cherry picked from commit fd5916c)
  • Loading branch information
heesung-sn committed May 17, 2024
1 parent 22b3a27 commit 05317c8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
/**
* Get all the bundles that are owned by this broker.
*/
public Set<NamespaceBundle> getOwnedServiceUnits() {
public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
if (!started) {
log.warn("Failed to get owned service units, load manager is not started.");
return Collections.emptySet();
return CompletableFuture.completedFuture(Collections.emptySet());
}
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();

String brokerId = brokerRegistry.getBrokerId();
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
Expand All @@ -214,34 +215,26 @@ public Set<NamespaceBundle> getOwnedServiceUnits() {
}).collect(Collectors.toSet());
// Add heartbeat and SLA monitor namespace bundle.
NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
try {
NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespace);
ownedServiceUnits.add(fullBundle);
} catch (Exception e) {
log.warn("Failed to get heartbeat namespace bundle.", e);
}
NamespaceName heartbeatNamespaceV2 = NamespaceService
.getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
try {
NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(heartbeatNamespaceV2);
ownedServiceUnits.add(fullBundle);
} catch (Exception e) {
log.warn("Failed to get heartbeat namespace V2 bundle.", e);
}

NamespaceName slaMonitorNamespace = NamespaceService
.getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
try {
NamespaceBundle fullBundle = pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundle(slaMonitorNamespace);
ownedServiceUnits.add(fullBundle);
} catch (Exception e) {
log.warn("Failed to get SLA Monitor namespace bundle.", e);
}

return ownedServiceUnits;
return pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(heartbeatNamespace)
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get heartbeat namespace bundle.", e);
return null;
}).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(heartbeatNamespaceV2))
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get heartbeat namespace V2 bundle.", e);
return null;
}).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(slaMonitorNamespace))
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get SLA Monitor namespace bundle.", e);
return null;
}).thenApply(__ -> ownedServiceUnits);
}

public enum Role {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,12 +808,12 @@ public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpac
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager =
ExtensibleLoadManagerImpl.get(loadManager.get());
var statusMap = extensibleLoadManager.getOwnedServiceUnits().stream()
.collect(Collectors.toMap(NamespaceBundle::toString,
bundle -> getNamespaceOwnershipStatus(true,
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceObject()))));
return CompletableFuture.completedFuture(statusMap);
return extensibleLoadManager.getOwnedServiceUnitsAsync()
.thenApply(OwnedServiceUnits -> OwnedServiceUnits.stream()
.collect(Collectors.toMap(NamespaceBundle::toString,
bundle -> getNamespaceOwnershipStatus(true,
namespaceIsolationPolicies.getPolicyByNamespace(
bundle.getNamespaceObject())))));
}
Collection<CompletableFuture<OwnedBundle>> futures =
ownershipCache.getOwnedBundlesAsync().values();
Expand Down Expand Up @@ -1129,7 +1129,12 @@ public OwnershipCache getOwnershipCache() {
public Set<NamespaceBundle> getOwnedServiceUnits() {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnedServiceUnits();
try {
return extensibleLoadManager.getOwnedServiceUnitsAsync()
.get(config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return ownershipCache.getOwnedBundles().values().stream().map(OwnedBundle::getNamespaceBundle)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,13 +1126,15 @@ public void testGetOwnedServiceUnitsAndGetOwnedNamespaceStatus() throws Exceptio
.getFullBundle(slaMonitorNamespacePulsar2);


Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnits();
Set<NamespaceBundle> ownedServiceUnitsByPulsar1 = primaryLoadManager.getOwnedServiceUnitsAsync()
.get(5, TimeUnit.SECONDS);
log.info("Owned service units: {}", ownedServiceUnitsByPulsar1);
// heartbeat namespace bundle will own by pulsar1
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle1));
assertTrue(ownedServiceUnitsByPulsar1.contains(bundle2));
assertTrue(ownedServiceUnitsByPulsar1.contains(slaBundle1));
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnits();
Set<NamespaceBundle> ownedServiceUnitsByPulsar2 = secondaryLoadManager.getOwnedServiceUnitsAsync()
.get(5, TimeUnit.SECONDS);
log.info("Owned service units: {}", ownedServiceUnitsByPulsar2);
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle3));
assertTrue(ownedServiceUnitsByPulsar2.contains(bundle4));
Expand Down Expand Up @@ -1168,7 +1170,8 @@ private void assertOwnedServiceUnits(
ExtensibleLoadManagerImpl extensibleLoadManager,
NamespaceBundle bundle) throws PulsarAdminException {
Awaitility.await().untilAsserted(() -> {
Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnits();
Set<NamespaceBundle> ownedBundles = extensibleLoadManager.getOwnedServiceUnitsAsync()
.get(5, TimeUnit.SECONDS);
assertTrue(ownedBundles.contains(bundle));
});
Map<String, NamespaceOwnershipStatus> ownedNamespaces =
Expand All @@ -1181,9 +1184,11 @@ private void assertOwnedServiceUnits(
}

@Test(timeOut = 30 * 1000)
public void testGetOwnedServiceUnitsWhenLoadManagerNotStart() {
public void testGetOwnedServiceUnitsWhenLoadManagerNotStart()
throws Exception {
ExtensibleLoadManagerImpl loadManager = new ExtensibleLoadManagerImpl();
Set<NamespaceBundle> ownedServiceUnits = loadManager.getOwnedServiceUnits();
Set<NamespaceBundle> ownedServiceUnits = loadManager.getOwnedServiceUnitsAsync()
.get(5, TimeUnit.SECONDS);
assertNotNull(ownedServiceUnits);
assertTrue(ownedServiceUnits.isEmpty());
}
Expand All @@ -1198,7 +1203,7 @@ public void testTryAcquiringOwnership()
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
.contains(namespaceEphemeralData.getNativeUrl()));
admin.namespaces().deleteNamespace(namespace, true);
admin.namespaces().deleteNamespace(namespace);
}

@Test(timeOut = 30 * 1000)
Expand Down

0 comments on commit 05317c8

Please sign in to comment.