Skip to content

Commit

Permalink
[fix][broker] Fix resourceGroups loading
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Dec 21, 2023
1 parent b944f10 commit 6ae4d30
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@
package org.apache.pulsar.broker.resourcegroup;

import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
Expand All @@ -49,44 +55,55 @@ public class ResourceGroupConfigListener implements Consumer<Notification> {
private final ResourceGroupResources rgResources;
private final ResourceGroupNamespaceConfigListener rgNamespaceConfigListener;

@SneakyThrows
public ResourceGroupConfigListener(ResourceGroupService rgService, PulsarService pulsarService) {
this.rgService = rgService;
this.pulsarService = pulsarService;
this.rgResources = pulsarService.getPulsarResources().getResourcegroupResources();
loadAllResourceGroups();
loadAllResourceGroupsAsync()
.get(pulsarService.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
this.rgResources.getStore().registerListener(this);
rgNamespaceConfigListener = new ResourceGroupNamespaceConfigListener(
rgService, pulsarService, this);
}

private void loadAllResourceGroups() {
rgResources.listResourceGroupsAsync().whenCompleteAsync((rgList, ex) -> {
if (ex != null) {
LOG.error("Exception when fetching resource groups", ex);
return;
}
private CompletableFuture<Void> loadAllResourceGroupsAsync() {
return rgResources.listResourceGroupsAsync().thenCompose(rgList -> {
final Set<String> existingSet = rgService.resourceGroupGetAll();
HashSet<String> newSet = new HashSet<>();

newSet.addAll(rgList);

final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);

for (String rgName: deleteList) {
for (String rgName : deleteList) {
deleteResourceGroup(rgName);
}

final Sets.SetView<String> addList = Sets.difference(newSet, existingSet);
for (String rgName: addList) {
pulsarService.getPulsarResources().getResourcegroupResources()
.getResourceGroupAsync(rgName).thenAcceptAsync(optionalRg -> {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}).exceptionally((ex1) -> {
LOG.error("Failed to fetch resourceGroup", ex1);
return null;
});
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String rgName : addList) {
futures.add(pulsarService.getPulsarResources()
.getResourcegroupResources()
.getResourceGroupAsync(rgName)
.thenAccept(optionalRg -> {
if (optionalRg.isPresent()) {
ResourceGroup rg = optionalRg.get();
createResourceGroup(rgName, rg);
}
}));
}

return FutureUtil.waitForAll(futures);
});
}

private void reloadAllResourceGroupsAsync() {
loadAllResourceGroupsAsync().exceptionally(ex -> {
if (ex != null) {
LOG.error("Exception when fetching resource groups", ex);
}
return null;
});
}

Expand Down Expand Up @@ -140,7 +157,7 @@ public void accept(Notification notification) {
Optional<String> rgName = ResourceGroupResources.resourceGroupNameFromPath(notifyPath);
if ((notification.getType() == NotificationType.ChildrenChanged)
|| (notification.getType() == NotificationType.Created)) {
loadAllResourceGroups();
reloadAllResourceGroupsAsync();
} else if (rgName.isPresent()) {
switch (notification.getType()) {
case Modified:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
Expand All @@ -48,56 +54,64 @@ public class ResourceGroupNamespaceConfigListener implements Consumer<Notificati
private final TenantResources tenantResources;
private final ResourceGroupConfigListener rgConfigListener;

@SneakyThrows
public ResourceGroupNamespaceConfigListener(ResourceGroupService rgService, PulsarService pulsarService,
ResourceGroupConfigListener rgConfigListener) {
this.rgService = rgService;
this.pulsarService = pulsarService;
this.namespaceResources = pulsarService.getPulsarResources().getNamespaceResources();
this.tenantResources = pulsarService.getPulsarResources().getTenantResources();
this.rgConfigListener = rgConfigListener;
loadAllNamespaceResourceGroups();
loadAllNamespaceResourceGroupsAsync()
.get(pulsarService.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
this.namespaceResources.getStore().registerListener(this);
}

private void updateNamespaceResourceGroup(NamespaceName nsName) {
namespaceResources.getPoliciesAsync(nsName).whenCompleteAsync((optionalPolicies, ex) -> {
private CompletableFuture<Void> updateNamespaceResourceGroupAsync(NamespaceName nsName) {
return namespaceResources.getPoliciesAsync(nsName).thenAccept((optionalPolicies) -> {
if (optionalPolicies.isPresent()) {
Policies policy = optionalPolicies.get();
reconcileNamespaceResourceGroup(nsName, policy);
}
}).whenComplete((__, ex) -> {
if (ex != null) {
LOG.error("Exception when getting namespace {}", nsName, ex);
return;
}
Policies policy = optionalPolicies.get();
reconcileNamespaceResourceGroup(nsName, policy);
});
}

private void loadAllNamespaceResourceGroups() {
tenantResources.listTenantsAsync().whenComplete((tenantList, ex) -> {
if (ex != null) {
LOG.error("Exception when fetching tenants", ex);
return;
}
for (String ts: tenantList) {
namespaceResources.listNamespacesAsync(ts).whenComplete((nsList, ex1) -> {
if (ex1 != null) {
LOG.error("Exception when fetching namespaces", ex1);
} else {
for (String ns : nsList) {
NamespaceName nsn = NamespaceName.get(ts, ns);
namespaceResources.namespaceExistsAsync(nsn)
.thenAccept(exists -> {
if (exists) {
updateNamespaceResourceGroup(NamespaceName.get(ts, ns));
}
});
}
}
});
private CompletableFuture<Void> loadAllNamespaceResourceGroupsAsync() {
return tenantResources.listTenantsAsync().thenCompose(tenantList -> {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String ts : tenantList) {
futures.add(namespaceResources.listNamespacesAsync(ts)
.thenCompose((nsList) -> {
List<CompletableFuture<Void>> nsFutures = new ArrayList<>();
for (String ns : nsList) {
NamespaceName nsn = NamespaceName.get(ts, ns);
nsFutures.add(namespaceResources.namespaceExistsAsync(nsn)
.thenCompose(exists -> {
if (exists) {
return updateNamespaceResourceGroupAsync(nsn);
}
return CompletableFuture.completedFuture(null);
}));
}

return FutureUtil.waitForAll(nsFutures);
})
);
}

return FutureUtil.waitForAll(futures);
});
}

public void reloadAllNamespaceResourceGroups() {
loadAllNamespaceResourceGroups();
private void reloadAllNamespaceResourceGroups() {
loadAllNamespaceResourceGroupsAsync().exceptionally((e) -> {
LOG.error("Failed to reload all namespace resourceGroups", e);
return null;
});
}

public void reconcileNamespaceResourceGroup(NamespaceName ns, Policies policy) {
Expand Down Expand Up @@ -153,7 +167,7 @@ public void accept(Notification notification) {
switch (notification.getType()) {
case Modified: {
NamespaceName nsName = NamespaceResources.namespaceFromPath(notifyPath);
updateNamespaceResourceGroup(nsName);
updateNamespaceResourceGroupAsync(nsName);
break;
}
case Deleted: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,24 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.expectThrows;
import com.google.common.collect.Sets;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -288,4 +298,27 @@ private void prepareData() throws PulsarAdminException {
testAddRg.setDispatchRateInBytes(200L);

}

@Test
public void testLoadAllResourceGroupsAsyncFailed() {
PulsarService pulsarService = mock(PulsarService.class);
PulsarResources pulsarResources = mock(PulsarResources.class);

doReturn(pulsarResources).when(pulsarService).getPulsarResources();

ResourceGroupService resourceGroupService = mock(ResourceGroupService.class);
ResourceGroupResources resourceGroupResources = mock(ResourceGroupResources.class);
RuntimeException exception = new RuntimeException("listResourceGroupsAsync error");
doReturn(CompletableFuture.failedFuture(exception))
.when(resourceGroupResources).listResourceGroupsAsync();
doReturn(resourceGroupResources).when(pulsarResources).getResourcegroupResources();

ServiceConfiguration ServiceConfiguration = new ServiceConfiguration();
doReturn(ServiceConfiguration).when(pulsarService).getConfiguration();

ExecutionException e = expectThrows(ExecutionException.class,
() -> new ResourceGroupConfigListener(resourceGroupService, pulsarService));
assertEquals(e.getCause(), exception);
verify(resourceGroupResources, times(0)).getStore();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resourcegroup;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.expectThrows;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.testng.annotations.Test;

public class ResourceGroupNamespaceConfigListenerTest {

@Test
public void testLoadAllNamespaceResourceGroupsAsyncFailed() {
PulsarService pulsarService = mock(PulsarService.class);
PulsarResources pulsarResources = mock(PulsarResources.class);

doReturn(pulsarResources).when(pulsarService).getPulsarResources();

TenantResources tenantResources = mock(TenantResources.class);
RuntimeException exception = new RuntimeException("listTenantsAsync error");
doReturn(CompletableFuture.failedFuture(exception))
.when(tenantResources).listTenantsAsync();
doReturn(tenantResources).when(pulsarResources).getTenantResources();

NamespaceResources namespaceResources = mock(NamespaceResources.class);
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();

ServiceConfiguration ServiceConfiguration = new ServiceConfiguration();
doReturn(ServiceConfiguration).when(pulsarService).getConfiguration();

ResourceGroupConfigListener resourceGroupConfigListener = mock(ResourceGroupConfigListener.class);
ResourceGroupService resourceGroupService = mock(ResourceGroupService.class);
ExecutionException e = expectThrows(ExecutionException.class,
() -> new ResourceGroupNamespaceConfigListener(resourceGroupService, pulsarService, resourceGroupConfigListener));
assertEquals(e.getCause(), exception);
verify(namespaceResources, times(0)).getStore();
}
}

0 comments on commit 6ae4d30

Please sign in to comment.