Skip to content

Commit

Permalink
[fix][broker] Fix ResourceGroups loading
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Mar 21, 2024
1 parent e2f94dc commit 68bb959
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,25 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
Expand All @@ -41,52 +51,68 @@
* @see <a href="https://github.com/apache/pulsar/wiki/PIP-82%3A-Tenant-and-namespace-level-rate-limiting">Global-quotas</a>
*
*/
public class ResourceGroupConfigListener implements Consumer<Notification> {
public class ResourceGroupConfigListener implements Consumer<Notification>, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(ResourceGroupConfigListener.class);
private final ResourceGroupService rgService;
private final PulsarService pulsarService;
private final ResourceGroupResources rgResources;
private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;
private volatile ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;
private final ScheduledExecutorService executorService;

public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
this.rgService = rgService;
this.pulsarService = pulsarService;
this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
loadAllResourceGroups();
this.rgResources.getStore().registerListener(this);
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
rgService, pulsarService, this);
this.executorService =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-resource-group-config"));
execute(() -> loadAllResourceGroupsWithRetryAsync(0));
}

private void loadAllResourceGroups() {
rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> {
if (ex != null) {
LOG.error("Exception when fetching resource groups", ex);
return;
private void loadAllResourceGroupsWithRetryAsync(long retry) {
loadAllResourceGroupsAsync().thenAccept(__ -> {
if (rgNamespaceConfigListener == null) {
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(rgService, pulsarService, this);
}
}).exceptionally(e -> {
long nextRetry = retry + 1;
long delay = 500 * nextRetry;
LOG.error("Failed to load all resource groups during initialization, retrying after {}ms: ", delay, e);
schedule(() -> loadAllResourceGroupsWithRetryAsync(nextRetry), delay);
return null;
});
}

private CompletableFuture<Void> loadAllResourceGroupsAsync() {
return rgResources.listResourceGroupsAsync().thenCompose(rgList -> {
final Set<String> existingSet = rgService.resourceGroupGetAll();
HashSet<String> newSet = new HashSet<>();

newSet.addAll(rgList);

final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);

for (String rgName: deleteList) {
for (String rgName : deleteList) {
deleteResourceGroup(rgName);
}

final Sets.SetView<String> addList = Sets.difference(newSet, existingSet);
for (String rgName: addList) {
pulsarService.getPulsarResources().getResourcegroupResources()
.getResourceGroupAsync(rgName).thenAcceptAsync(optionalRg -> {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}).exceptionally((ex1) -> {
LOG.error("Failed to fetch resourceGroup", ex1);
return null;
});
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String rgName : addList) {
futures.add(pulsarService.getPulsarResources()
.getResourcegroupResources()
.getResourceGroupAsync(rgName)
.thenAccept(optionalRg -> {
if (optionalRg.isPresent()) {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}
})
);
}

return FutureUtil.waitForAll(futures);
});
}

Expand Down Expand Up @@ -140,7 +166,10 @@ public void accept(Notification notification) {
Optional<String> rgName = ResourceGroupResources.resourceGroupNameFromPath(notifyPath);
if ((notification.getType() == NotificationType.ChildrenChanged)
|| (notification.getType() == NotificationType.Created)) {
loadAllResourceGroups();
loadAllResourceGroupsAsync().exceptionally((ex) -> {
LOG.error("Exception when fetching resource groups", ex);
return null;
});
} else if (rgName.isPresent()) {
switch (notification.getType()) {
case Modified:
Expand All @@ -151,4 +180,23 @@ public void accept(Notification notification) {
}
}
}

protected void execute(Runnable runnable) {
executorService.execute(catchingAndLoggingThrowables(runnable));
}

protected void schedule(Runnable runnable, long delayMs) {
executorService.schedule(catchingAndLoggingThrowables(runnable), delayMs, TimeUnit.MILLISECONDS);
}


@VisibleForTesting
ResourceGroupNamespaceConfigListener getRgNamespaceConfigListener() {
return rgNamespaceConfigListener;
}

@Override
public void close() throws Exception {
executorService.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ public ResourceGroup getNamespaceResourceGroup(NamespaceName namespaceName) {

@Override
public void close() throws Exception {
rgConfigListener.close();
if (aggregateLocalUsagePeriodicTask != null) {
aggregateLocalUsagePeriodicTask.cancel(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,30 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -288,4 +298,40 @@ private void prepareData() throws PulsarAdminException {
testAddRg.setDispatchRateInBytes(200L);

}

@Test
public void testNewResourceGroupNamespaceConfigListener() throws Exception {
PulsarService pulsarService = mock(PulsarService.class);
PulsarResources pulsarResources = mock(PulsarResources.class);
doReturn(pulsarResources).when(pulsarService).getPulsarResources();

ResourceGroupService resourceGroupService = mock(ResourceGroupService.class);
ResourceGroupResources resourceGroupResources = mock(ResourceGroupResources.class);
RuntimeException exception = new RuntimeException("listResourceGroupsAsync error");
doReturn(CompletableFuture.failedFuture(exception))
.when(resourceGroupResources).listResourceGroupsAsync();
doReturn(mock(MetadataStore.class))
.when(resourceGroupResources).getStore();
doReturn(resourceGroupResources).when(pulsarResources).getResourcegroupResources();

ServiceConfiguration ServiceConfiguration = new ServiceConfiguration();
doReturn(ServiceConfiguration).when(pulsarService).getConfiguration();

@Cleanup
ResourceGroupConfigListener resourceGroupConfigListener =
new ResourceGroupConfigListener(resourceGroupService, pulsarService);

// getResourcegroupResources() returns an error, ResourceGroupNamespaceConfigListener doesn't be created.
Awaitility.await().pollDelay(3, TimeUnit.SECONDS).untilAsserted(() -> {
assertNull(resourceGroupConfigListener.getRgNamespaceConfigListener());
});

// ResourceGroupNamespaceConfigListener will be created, and uses real pulsar resource.
doReturn(CompletableFuture.completedFuture(new ArrayList<String>()))
.when(resourceGroupResources).listResourceGroupsAsync();
doReturn(pulsar.getPulsarResources()).when(pulsarService).getPulsarResources();
Awaitility.await().untilAsserted(() -> {
assertNotNull(resourceGroupConfigListener.getRgNamespaceConfigListener());
});
}
}

0 comments on commit 68bb959

Please sign in to comment.