Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix ResourceGroups loading #21781

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
Expand All @@ -47,46 +54,59 @@ public class ResourceGroupConfigListener implements Consumer<Notification> {
private final ResourceGroupService rgService;
private final PulsarService pulsarService;
private final ResourceGroupResources rgResources;
private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;
private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;

public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
this.rgService = rgService;
this.pulsarService = pulsarService;
this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
loadAllResourceGroups();
this.rgResources.getStore().registerListener(this);
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
rgService, pulsarService, this);
execute(() -> loadAllResourceGroupsWithRetryAsync(0));
}

private void loadAllResourceGroups() {
rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> {
if (ex != null) {
LOG.error("Exception when fetching resource groups", ex);
return;
private void loadAllResourceGroupsWithRetryAsync(long retry) {
loadAllResourceGroupsAsync().thenAccept(__ -> {
if (rgNamespaceConfigListener == null) {
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(rgService, pulsarService, this);
}
}).exceptionally(e -> {
long nextRetry = retry + 1;
long delay = 500 * nextRetry;
LOG.error("Failed to load all resource groups during initialization, retrying after {}ms: ", delay, e);
schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), delay);
return null;
});
}

private CompletableFuture<Void> loadAllResourceGroupsAsync() {
return rgResources.listResourceGroupsAsync().thenCompose(rgList -> {
final Set<String> existingSet = rgService.resourceGroupGetAll();
HashSet<String> newSet = new HashSet<>();

newSet.addAll(rgList);

final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);

for (String rgName: deleteList) {
for (String rgName : deleteList) {
deleteResourceGroup(rgName);
}

final Sets.SetView<String> addList = Sets.difference(newSet, existingSet);
for (String rgName: addList) {
pulsarService.getPulsarResources().getResourcegroupResources()
.getResourceGroupAsync(rgName).thenAcceptAsync(optionalRg -> {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}).exceptionally((ex1) -> {
LOG.error("Failed to fetch resourceGroup", ex1);
return null;
});
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String rgName : addList) {
futures.add(pulsarService.getPulsarResources()
.getResourcegroupResources()
.getResourceGroupAsync(rgName)
.thenAccept(optionalRg -> {
if (optionalRg.isPresent()) {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}
})
);
}

return FutureUtil.waitForAll(futures);
});
}

Expand Down Expand Up @@ -140,7 +160,10 @@ public void accept(Notification notification) {
Optional<String> rgName = ResourceGroupResources.resourceGroupNameFromPath(notifyPath);
if ((notification.getType() == NotificationType.ChildrenChanged)
|| (notification.getType() == NotificationType.Created)) {
loadAllResourceGroups();
loadAllResourceGroupsAsync().exceptionally((ex) -> {
LOG.error("Exception when fetching resource groups", ex);
nodece marked this conversation as resolved.
Show resolved Hide resolved
return null;
});
} else if (rgName.isPresent()) {
switch (notification.getType()) {
case Modified:
Expand All @@ -151,4 +174,17 @@ public void accept(Notification notification) {
}
}
}

protected void execute(Runnable runnable) {
pulsarService.getExecutor().execute(catchingAndLoggingThrowables(runnable));
}

protected void schedule(Runnable runnable, long delayMs) {
pulsarService.getExecutor().schedule(catchingAndLoggingThrowables(runnable), delayMs, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
ResourceGroupNamespaceConfigListener getRgNamespaceConfigListener() {
return rgNamespaceConfigListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,31 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -288,4 +299,41 @@ private void prepareData() throws PulsarAdminException {
testAddRg.setDispatchRateInBytes(200L);

}

@Test
public void testNewResourceGroupNamespaceConfigListener() {
PulsarService pulsarService = mock(PulsarService.class);
PulsarResources pulsarResources = mock(PulsarResources.class);
doReturn(pulsarResources).when(pulsarService).getPulsarResources();
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
doReturn(scheduledExecutorService).when(pulsarService).getExecutor();

ResourceGroupService resourceGroupService = mock(ResourceGroupService.class);
ResourceGroupResources resourceGroupResources = mock(ResourceGroupResources.class);
nodece marked this conversation as resolved.
Show resolved Hide resolved
RuntimeException exception = new RuntimeException("listResourceGroupsAsync error");
doReturn(CompletableFuture.failedFuture(exception))
.when(resourceGroupResources).listResourceGroupsAsync();
doReturn(mock(MetadataStore.class))
.when(resourceGroupResources).getStore();
doReturn(resourceGroupResources).when(pulsarResources).getResourcegroupResources();

ServiceConfiguration ServiceConfiguration = new ServiceConfiguration();
doReturn(ServiceConfiguration).when(pulsarService).getConfiguration();

ResourceGroupConfigListener resourceGroupConfigListener =
new ResourceGroupConfigListener(resourceGroupService, pulsarService);

// getResourcegroupResources() returns an error, ResourceGroupNamespaceConfigListener doesn't be created.
Awaitility.await().pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
assertNull(resourceGroupConfigListener.getRgNamespaceConfigListener());
});

// ResourceGroupNamespaceConfigListener will be created, and uses real pulsar resource.
doReturn(CompletableFuture.completedFuture(new ArrayList<String>()))
.when(resourceGroupResources).listResourceGroupsAsync();
doReturn(pulsar.getPulsarResources()).when(pulsarService).getPulsarResources();
Awaitility.await().untilAsserted(() -> {
assertNotNull(resourceGroupConfigListener.getRgNamespaceConfigListener());
});
}
}