Skip to content

Commit

Permalink
[fix] [broker] Update topic policies as much as possible when some ex…
Browse files Browse the repository at this point in the history
… was thrown (#21810)

### Motivation

After the topic policies update, there are many components will be updated one by one, even if the config of components has not been modified. There are the 11 components that need update:
- `7` rate limiters(`publish`, `dispatch topic-level`, `dispatch subscription-level`,  `dispatch resourceGroup-level`, `subscribe API`, `replication`, `shadow topic replication`)
- update ManagedLedger configs(`retention`, `offloader`)
- start/stop replication
- start/stop compaction
- start/stop deduplication

Once a component update fails, the following update will be skipped. It would cause a confusing thing: you want to set a retention policy, but it will be skipped due to the `update subscribe rate limiter` failure (you did not edit the `subscribe rate limitation policy`)

Since none of the components in the above list have any additional dependencies for individual updates, ensuring success as much as possible is appropriate.

### Modifications
- Update topic policies as much as possible even if some component updates fail, all component updates are still in the same thread, and they still update one by one, just throw the error later.
- Rename `updatePublishDispatcher` to `updatePublishRateLimiter`
  • Loading branch information
poorbarcode committed Jan 3, 2024
1 parent 1a4a9d9 commit ed59967
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon
/**
* update topic publish dispatcher for this topic.
*/
public void updatePublishDispatcher() {
public void updatePublishRateLimiter() {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2706,7 +2706,7 @@ private void updateMaxPublishRatePerTopicInMessages() {
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerPublishRate();
((AbstractTopic) topic).updatePublishDispatcher();
((AbstractTopic) topic).updatePublishRateLimiter();
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public CompletableFuture<Void> initialize() {
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
}
updatePublishDispatcher();
updatePublishRateLimiter();
updateResourceGroupLimiter(policies);
return updateClusterMigrated();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public CompletableFuture<Void> initialize() {
.thenAcceptAsync(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
updatePublishDispatcher();
updatePublishRateLimiter();
updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
Expand All @@ -372,7 +372,7 @@ public CompletableFuture<Void> initialize() {

updateSubscribeRateLimiter();

updatePublishDispatcher();
updatePublishRateLimiter();

updateResourceGroupLimiter(policies);

Expand Down Expand Up @@ -3086,39 +3086,60 @@ public CompletableFuture<Void> onPoliciesUpdate(@Nonnull Policies data) {
return CompletableFuture.completedFuture(null);
}

// Update props.
// The component "EntryFilters" is update in the method "updateTopicPolicyByNamespacePolicy(data)".
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicyByNamespacePolicy(data);
checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;

isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;

updateDispatchRateLimiter();

updateSubscribeRateLimiter();
// Apply policies for components.
List<CompletableFuture<Void>> applyPolicyTasks = applyUpdatedTopicPolicies();
applyPolicyTasks.add(applyUpdatedNamespacePolicies(data));
return FutureUtil.waitForAll(applyPolicyTasks)
.thenAccept(__ -> log.info("[{}] namespace-level policies updated successfully", topic))
.exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
});
}

updatePublishDispatcher();
private CompletableFuture<Void> applyUpdatedNamespacePolicies(Policies namespaceLevelPolicies) {
return FutureUtil.runWithCurrentThread(() -> updateResourceGroupLimiter(namespaceLevelPolicies));
}

updateResourceGroupLimiter(data);
private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
List<CompletableFuture<Void>> applyPoliciesFutureList = new ArrayList<>();

List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
// Client permission check.
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(consumer -> applyPoliciesFutureList.add(consumer.checkPermissionsAsync()));
});
producers.values().forEach(producer -> applyPoliciesFutureList.add(
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
// Check message expiry.
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkMessageExpiry()));

return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
preCreateSubscriptionForCompactionIfNeeded());
});
}).exceptionally(ex -> {
log.error("[{}] update namespace polices : {} error", this.getName(), data, ex);
throw FutureUtil.wrapToCompletionException(ex);
});
// Update rate limiters.
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateDispatchRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscribeRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updatePublishRateLimiter()));

applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> updateSubscriptionsDispatcherRateLimiter()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
() -> replicators.forEach((name, replicator) -> replicator.updateRateLimiter())));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
() -> shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter())));

// Other components.
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkReplicationAndRetryOnFailure()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkDeduplicationStatus()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> checkPersistencePolicies()));
applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
() -> preCreateSubscriptionForCompactionIfNeeded()));

return applyPoliciesFutureList;
}

/**
Expand Down Expand Up @@ -3778,42 +3799,30 @@ public void onUpdate(TopicPolicies policies) {
if (policies == null) {
return;
}
// Update props.
// The component "EntryFilters" is update in the method "updateTopicPolicy(data)".
// see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicy(policies);
shadowTopics = policies.getShadowTopics();
updateDispatchRateLimiter();
checkReplicatedSubscriptionControllerState();
updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
updatePublishDispatcher();
updateSubscribeRateLimiter();
replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
shadowReplicators.forEach((name, replicator) -> replicator.updateRateLimiter());
checkMessageExpiry();
})
.thenCompose(__ -> checkReplicationAndRetryOnFailure())
.thenCompose(__ -> checkDeduplicationStatus())
.thenCompose(__ -> preCreateSubscriptionForCompactionIfNeeded())
.thenCompose(__ -> checkPersistencePolicies())
.thenAccept(__ -> log.info("[{}] Policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic policy error: {}", topic, t.getMessage(), t);
return null;
});

// Apply policies for components(not contains the specified policies which only defined in namespace policies).
FutureUtil.waitForAll(applyUpdatedTopicPolicies())
.thenAccept(__ -> log.info("[{}] topic-level policies updated successfully", topic))
.exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] update topic-level policy error: {}", topic, t.getMessage(), t);
return null;
});
}

private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
private void updateSubscriptionsDispatcherRateLimiter() {
subscriptions.forEach((subName, sub) -> {
List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.updateRateLimiter();
}
}));
Dispatcher dispatcher = sub.getDispatcher();
if (dispatcher != null) {
dispatcher.updateRateLimiter();
}
});
return FutureUtil.waitForAll(subscriptionCheckFutures);
}

protected CompletableFuture<Void> initTopicPolicy() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
Expand All @@ -50,6 +52,7 @@
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -83,8 +86,11 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -3157,4 +3163,49 @@ public void testProduceChangesWithEncryptionRequired() throws Exception {
});
}

@Test
public void testUpdateRetentionWithPartialFailure() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
admin.topics().createNonPartitionedTopic(tpName);

// Load topic up.
admin.topics().getInternalStats(tpName);

// Inject an error that makes dispatch rate update fail.
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
WhiteboxImpl.getInternalState(persistentTopic, "subscriptions");
PersistentSubscription mockedSubscription = Mockito.mock(PersistentSubscription.class);
Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new RuntimeException("Mocked error: getDispatcher"));
subscriptions.put("mockedSubscription", mockedSubscription);

// Update namespace-level retention policies.
RetentionPolicies retentionPolicies1 = new RetentionPolicies(1, 1);
admin.namespaces().setRetentionAsync(myNamespace, retentionPolicies1);

// Verify: update retention will be success even if other component update throws exception.
Awaitility.await().untilAsserted(() -> {
ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
assertEquals(ML.getConfig().getRetentionSizeInMB(), 1);
assertEquals(ML.getConfig().getRetentionTimeMillis(), 1 * 60 * 1000);
});

// Update topic-level retention policies.
RetentionPolicies retentionPolicies2 = new RetentionPolicies(2, 2);
admin.topics().setRetentionAsync(tpName, retentionPolicies2);

// Verify: update retention will be success even if other component update throws exception.
Awaitility.await().untilAsserted(() -> {
ManagedLedgerImpl ML = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
assertEquals(ML.getConfig().getRetentionSizeInMB(), 2);
assertEquals(ML.getConfig().getRetentionTimeMillis(), 2 * 60 * 1000);
});

// Cleanup.
subscriptions.clear();
admin.namespaces().removeRetention(myNamespace);
admin.topics().delete(tpName, false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.util;

import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -55,6 +56,11 @@ public static CompletableFuture<Void> waitForAll(Collection<? extends Completabl
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

public static CompletableFuture<Void> runWithCurrentThread(Runnable runnable) {
return CompletableFuture.runAsync(
() -> runnable.run(), MoreExecutors.directExecutor());
}

public static <T> CompletableFuture<List<T>> waitForAll(Stream<CompletableFuture<List<T>>> futures) {
return futures.reduce(CompletableFuture.completedFuture(new ArrayList<>()),
(pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> {
Expand Down

0 comments on commit ed59967

Please sign in to comment.