-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] fix the global topic policy config override the local topic policy config #21212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@chenhongSZ Please add the following content to your PR description and select a checkbox: |
mattisonchao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add a test to verify the changes and explain how you can avoid the compactor to override local policy with global policy(same event key) :)
|
@mattisonchao This pr does not fix the problem you mentioned(same event key), do I need to submit another pr to fix it or fix it together in this pr? |
|
Lines 387 to 397 in 66271e3
|
|
@Technoboy-
|
ah, good catch. please raise a new pr to fixing it. |
|
there are three issues here. right?
I fixed the first two in this pr, the third issue I'll fix with another pr. @Technoboy- @aloyszhang PTAL |
| if (hasMore) { | ||
| reader.readNextAsync().thenAccept(msg -> { | ||
| refreshTopicPoliciesCache(msg); | ||
| notifyListener(msg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's best to wait for initPolicesCache to complete before notifying the listeners.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sense to me, I have fixed it. PTAL
I have fixed this issue and added a test case in my latest commit, PTAL, thanks |
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
|
/pulsarbot run-failure-checks |
| return CompletableFuture.completedFuture(null).thenRunAsync(() -> { | ||
| TopicPoliciesService topicPoliciesService = brokerService.getPulsar().getTopicPoliciesService(); | ||
| onUpdate(topicPoliciesService.getLocalTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))); | ||
| onUpdate(topicPoliciesService.getGlobalTopicPoliciesIfExists(TopicName.getPartitionedTopicName(topic))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change fixed the bug that the global policies were discarded when loading a topic up. We also need to add a test to cover this fix. The test flow should be like this:
- create a topic and unload it
- set global policies
- start a consumer/producer to load the topic up
- ensure the global policies affect the topic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the reminder, I've added a test for this change in the latest commit, PTAL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wrote a test for this case, could you add it into current PR?
@Test
public void testGetTopicPoliciesWhenDeleteTopicPolicy2() throws Exception {
final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
final String subscriptionName = "s1";
final int dispatchThrottlingRateInMsg = 1000;
admin.topics().createNonPartitionedTopic(tpName);
PersistentTopic persistentTopic1 =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest);
Producer producer = pulsarClient.newProducer().topic(tpName).create();
// Set global policy.
DispatchRate dispatchRate = new DispatchRateImpl(dispatchThrottlingRateInMsg, 1, false, 1);
admin.topicPolicies(true).setDispatchRate(tpName, dispatchRate);
// Assert policy was affected.
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
HierarchyTopicPolicies policies = persistentTopic1.getHierarchyTopicPolicies();
assertNotNull(policies);
assertEquals(policies.getDispatchRate().getTopicValue().getDispatchThrottlingRateInMsg(),
dispatchThrottlingRateInMsg);
DispatchRate dispatchRateAp = admin.topicPolicies(true).getDispatchRate(tpName, true);
assertEquals(dispatchRateAp.getDispatchThrottlingRateInMsg(), dispatchThrottlingRateInMsg);
});
// Unload topic and check again.
admin.topics().unload(tpName);
// Wait topic load complete.
producer.send("1".getBytes(StandardCharsets.UTF_8));
PersistentTopic persistentTopic2 =
(PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get();
// Assert policy was affected.
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
HierarchyTopicPolicies policies = persistentTopic2.getHierarchyTopicPolicies();
assertNotNull(policies);
assertEquals(policies.getDispatchRate().getTopicValue().getDispatchThrottlingRateInMsg(),
dispatchThrottlingRateInMsg);
DispatchRate dispatchRateAp = admin.topicPolicies(true).getDispatchRate(tpName, true);
assertEquals(dispatchRateAp.getDispatchThrottlingRateInMsg(), dispatchThrottlingRateInMsg);
});
// cleanup.
producer.close();
admin.topics().delete(tpName, false);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| } | ||
|
|
||
| @Test | ||
| public void testInitPolicesCacheAndNotifyListenersAfterCompaction() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A new channel to trace the comment #21212 (review)
@mattisonchao
Could you please add a test to verify the changes and explain how you can avoid the compactor to override local policy with global policy(same event key) :)
Agree with @mattisonchao , the test flow might be like this:
- set local policies
- set global policies
- wait for the compaction task to finish
- read the
change_eventtopic to verify there are two events for this topic. Or find a way to start a newSystemTopicBasedTopicPoliciesServiceand verify the two caches (policiesCacheandglobalPoliciesCache) each has one item for this topic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test case(testInitPolicesCacheAndNotifyListenersAfterCompaction) is exactly what you described, please correct me if I have misunderstood.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think the test you provided can cover the case. Could you try to run the test below?
private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception {
PersistentTopic tp =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
// Wait for the old task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction");
assertTrue(compactionTask == null || compactionTask.isDone());
});
// Trigger a new task.
tp.triggerCompaction();
// Wait for the new task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction");
assertTrue(compactionTask == null || compactionTask.isDone());
});
}
/***
* It is not a thread safety method, something will go to a wrong pointer if there is a task is trying to load a
* topic policies.
*/
private void clearTopicPoliciesCache() {
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
if (topicPoliciesService instanceof TopicPoliciesService.TopicPoliciesServiceDisabled) {
return;
}
assertTrue(topicPoliciesService instanceof SystemTopicBasedTopicPoliciesService);
Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap =
WhiteboxImpl.getInternalState(topicPoliciesService, "policyCacheInitMap");
for (CompletableFuture<Void> future : policyCacheInitMap.values()) {
future.join();
}
Map<TopicName, TopicPolicies> policiesCache =
WhiteboxImpl.getInternalState(topicPoliciesService, "policiesCache");
Map<TopicName, TopicPolicies> globalPoliciesCache =
WhiteboxImpl.getInternalState(topicPoliciesService, "globalPoliciesCache");
policyCacheInitMap.clear();
policiesCache.clear();
globalPoliciesCache.clear();
}
@Test
public void testLocalPolicyAffectAfterCompaction() throws Exception {
final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
final String tpNameChangeEvents = "persistent://" + myNamespace + "/" + NAMESPACE_EVENTS_LOCAL_NAME;
final String subscriptionName = "s1";
final int rateMsgLocal = 2000;
final int rateMsgGlobal = 1000;
admin.topics().createNonPartitionedTopic(tpName);
admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest);
// Set global policy and local policy.
DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, false, 1);
DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 1, false, 1);
admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal);
admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
// Trigger __change_events compaction and clear topic policies cache.
triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
clearTopicPoliciesCache();
// Reload the topic policies.
// Verify the local policies was affected.
Optional<TopicPolicies> topicPoliciesOptional =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName)).join();
assertTrue(topicPoliciesOptional.isPresent());
assertEquals(topicPoliciesOptional.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
rateMsgLocal);
// cleanup.
admin.topics().delete(tpName, false);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the test case(testInitPolicesCacheAndNotifyListenersAfterCompaction) can cover the fix about compaction, the test case will fail without this fix.
I also added the test cases you provided above, and it's passed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, there is a mistake in the test. Could you change the line like below? Thanks
private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception {
PersistentTopic tp =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
// Wait for the old task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction");
assertTrue(compactionTask == null || compactionTask.isDone());
});
// Trigger a new task.
tp.triggerCompaction();
// Wait for the new task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction");
assertTrue(compactionTask == null || compactionTask.isDone());
});
}
@Test
public void testLocalPolicyAffectAfterCompaction() throws Exception {
final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
final String tpNameChangeEvents = "persistent://" + myNamespace + "/" + NAMESPACE_EVENTS_LOCAL_NAME;
final String subscriptionName = "s1";
final int rateMsgLocal = 2000;
final int rateMsgGlobal = 1000;
admin.topics().createNonPartitionedTopic(tpName);
admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest);
// Set global policy and local policy.
DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, false, 1);
DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 1, false, 1);
admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal);
admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
// Trigger __change_events compaction.
triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
// Create a new SystemTopicBasedTopicPoliciesService and verify the local policies was affected.
Optional<TopicPolicies> topicPoliciesOptional =
new SystemTopicBasedTopicPoliciesService(pulsar).getTopicPoliciesAsync(TopicName.get(tpName)).join();
assertTrue(topicPoliciesOptional.isPresent());
assertEquals(topicPoliciesOptional.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
rateMsgLocal);
// cleanup.
admin.topics().delete(tpName, false);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The global topic policy and the local topic policy use the same message key, and they will overwrite each other by topic compression.
The changes below solved this issue.
But this fix also changed the rule of key building, which makes it not forward-compatible. For example:
- The user updates local policies, and Pulsar sends a message with the key "persistent://public/default/topic_1"
- The user upgrades the Pulsar cluster to the newest version.
- The user updates local policies, and Pulsar sends a message with the key "persistent/public/default/topic_1/false"
- The user updates global policies, and Pulsar sends a message with the key "persistent/public/default/topic_1/true"
At this time, there should be three messages in the system topic, but the behavior will be correct even if it is not expected, but the behavior will be wrong after the steps below.
- Delete the topic
- Pulsar deletes the key "persistent/public/default/topic_1/false"
- Pulsar deletes the key "persistent/public/default/topic_1/true"
- Create a new topic with the same name as the old one.
Currently, the message whose key is "persistent://public/default/topic_1" is still there.
I think once the new key is generated, the data of the old key should be deleted. And could you also add a test for this case?
By the way, the original rule is persistent://xxx, do not change it to persistent/xxx.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please negotiate this special key name. Do not make it casually. :)
Plus, please care about the compatibility of this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix the compatibility problem and add a test later.
I didn't change the rules of the key building, just added a property, because we need to get the topic name from the key
original: persistent://tenant/namespace/topic/
new:persistent://tenant/namespace/topic/isGlobal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, there is a mistake in the test. Could you change the line like below? Thanks
private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception { PersistentTopic tp = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); // Wait for the old task finish. Awaitility.await().untilAsserted(() -> { CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction"); assertTrue(compactionTask == null || compactionTask.isDone()); }); // Trigger a new task. tp.triggerCompaction(); // Wait for the new task finish. Awaitility.await().untilAsserted(() -> { CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction"); assertTrue(compactionTask == null || compactionTask.isDone()); }); } @Test public void testLocalPolicyAffectAfterCompaction() throws Exception { final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp"); final String tpNameChangeEvents = "persistent://" + myNamespace + "/" + NAMESPACE_EVENTS_LOCAL_NAME; final String subscriptionName = "s1"; final int rateMsgLocal = 2000; final int rateMsgGlobal = 1000; admin.topics().createNonPartitionedTopic(tpName); admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest); // Set global policy and local policy. DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, false, 1); DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 1, false, 1); admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal); admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal); // Trigger __change_events compaction. triggerAndWaitNewTopicCompaction(tpNameChangeEvents); // Create a new SystemTopicBasedTopicPoliciesService and verify the local policies was affected. Optional<TopicPolicies> topicPoliciesOptional = new SystemTopicBasedTopicPoliciesService(pulsar).getTopicPoliciesAsync(TopicName.get(tpName)).join(); assertTrue(topicPoliciesOptional.isPresent()); assertEquals(topicPoliciesOptional.get().getDispatchRate().getDispatchThrottlingRateInMsg(), rateMsgLocal); // cleanup. admin.topics().delete(tpName, false); }
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've fixed the compatibility problem and added a test to cover this change. PTAL
|
Also, we need to clean the global topic policy in Lines 443 to 461 in 09a1720
|
fixed and added a test to cover this change, PTAL |
| } | ||
|
|
||
| @Test | ||
| public void testGetTopicPoliciesWhenDeleteTopicPolicy2() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I forgot to rename this method. Please help to rename it to "testGlobalPolicyAfterUnloadTopic"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| } | ||
|
|
||
| @Test | ||
| public void testInitPolicesCacheAndNotifyListenersAfterCompaction() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think the test you provided can cover the case. Could you try to run the test below?
private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception {
PersistentTopic tp =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
// Wait for the old task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction");
assertTrue(compactionTask == null || compactionTask.isDone());
});
// Trigger a new task.
tp.triggerCompaction();
// Wait for the new task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture<Long> compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction");
assertTrue(compactionTask == null || compactionTask.isDone());
});
}
/***
* It is not a thread safety method, something will go to a wrong pointer if there is a task is trying to load a
* topic policies.
*/
private void clearTopicPoliciesCache() {
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
if (topicPoliciesService instanceof TopicPoliciesService.TopicPoliciesServiceDisabled) {
return;
}
assertTrue(topicPoliciesService instanceof SystemTopicBasedTopicPoliciesService);
Map<NamespaceName, CompletableFuture<Void>> policyCacheInitMap =
WhiteboxImpl.getInternalState(topicPoliciesService, "policyCacheInitMap");
for (CompletableFuture<Void> future : policyCacheInitMap.values()) {
future.join();
}
Map<TopicName, TopicPolicies> policiesCache =
WhiteboxImpl.getInternalState(topicPoliciesService, "policiesCache");
Map<TopicName, TopicPolicies> globalPoliciesCache =
WhiteboxImpl.getInternalState(topicPoliciesService, "globalPoliciesCache");
policyCacheInitMap.clear();
policiesCache.clear();
globalPoliciesCache.clear();
}
@Test
public void testLocalPolicyAffectAfterCompaction() throws Exception {
final String tpName = BrokerTestUtil.newUniqueName("persistent://" + myNamespace + "/tp");
final String tpNameChangeEvents = "persistent://" + myNamespace + "/" + NAMESPACE_EVENTS_LOCAL_NAME;
final String subscriptionName = "s1";
final int rateMsgLocal = 2000;
final int rateMsgGlobal = 1000;
admin.topics().createNonPartitionedTopic(tpName);
admin.topics().createSubscription(tpName, subscriptionName, MessageId.earliest);
// Set global policy and local policy.
DispatchRate dispatchRateLocal = new DispatchRateImpl(rateMsgLocal, 1, false, 1);
DispatchRate dispatchRateGlobal = new DispatchRateImpl(rateMsgGlobal, 1, false, 1);
admin.topicPolicies(false).setDispatchRate(tpName, dispatchRateLocal);
admin.topicPolicies(true).setDispatchRate(tpName, dispatchRateGlobal);
// Trigger __change_events compaction and clear topic policies cache.
triggerAndWaitNewTopicCompaction(tpNameChangeEvents);
clearTopicPoliciesCache();
// Reload the topic policies.
// Verify the local policies was affected.
Optional<TopicPolicies> topicPoliciesOptional =
pulsar.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get(tpName)).join();
assertTrue(topicPoliciesOptional.isPresent());
assertEquals(topicPoliciesOptional.get().getDispatchRate().getDispatchThrottlingRateInMsg(),
rateMsgLocal);
// cleanup.
admin.topics().delete(tpName, false);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry, but I don't think this change suits me.
There are some reasons are as follows:
- Apache Pulsar has the
broker/namespace/topiclevel policy. The global topic policy is just a particular topic policy based on the geo-replication. We should refrain from introducing this concept into the topic entity. - We should encapsulate this concept into topic policy service, which can decide to give the topic a global or local policy because the topic doesn't care if the policy is global or not.
Therefore, I would like to let [SystemTopicBasedTopicPoliciesService.java](https://github.com/apache/pulsar/pull/21212/files#diff-9d2948d863c111e4be6d508a1c573667a1326b98c4314e917ba9e344bb61dc27) decide to update global or local policy to the topic to avoid isGlobal param everywhere.
WDYT?
/cc @codelipenghui @Technoboy- @poorbarcode
|
Plus, we didn't support the
|
I agree, It is indeed we should encapsulate this concept into topic policy service. I will make some changes as below:
|
those changes have been done in my latest commit, PTAL, thanks @mattisonchao |
|
@chenhongSZ Thanks for your great work! I will review it today. |
|
Sorry for some other tasks. I have to postpone this review. |
| } | ||
|
|
||
| TopicPolicies topicPolicies = new TopicPolicies(); | ||
| BeanUtil.copyProperties(globalTopicPolicies, topicPolicies, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I think the original implementation is better

Fixes #21202
Motivation
fix the local and global topic policies override each other.
Modifications
org.apache.pulsar.common.policies.data.PolicyHierarchyValue.topicValueintoglobalTopicValueand variablelocalTopicValueinitPolicesCache.Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: