-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP 92: part-2 Topic policy across multiple clusters #12517
PIP 92: part-2 Topic policy across multiple clusters #12517
Conversation
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java # pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@315157973:Thanks for your contribution. For this PR, do we need to update docs? |
if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) { | ||
return super.checkReplication(); | ||
} | ||
return CompletableFuture.completedFuture(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SystemTopic was previously used for Topic Policies and transaction. Only Topic Policies need to be replicate remotely, so I created a new class TopicPoliciesSystemTopic.
return producer.newMessage().key(getEventKey(event)).value(event).send(); | ||
return producer.newMessage() | ||
.key(getEventKey(event)) | ||
.properties(event.getProperties() == null ? Collections.emptyMap() : event.getProperties()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isGlobal is field of the TopicPolicies object. In order to avoid deserializing the entire message, I put the isGlobal tag in Properties so that only the header of the Entry need to be deserialized
661ab90
to
5786ed6
Compare
93a0ff1
to
0e70b0d
Compare
# Conflicts: # pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
/pulsarbot run-failure-checks |
e1d6829
to
e4ec371
Compare
cebb328
to
8e1bee1
Compare
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@codelipenghui PTAL, thanks, I have resolved the conflicts so many times....Is it ready to merge ? |
* The incoming parameter is the original message, the output is whether the message should be filtered. | ||
* @param filterFunction | ||
*/ | ||
default void setFilterFunction(Function<MessageImpl, Boolean> filterFunction) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can leverage the selective replication
Line 319 in a2b7cae
if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) { |
...ker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
Show resolved
Hide resolved
pulsar-common/src/main/java/org/apache/pulsar/common/events/PulsarEvent.java
Outdated
Show resolved
Hide resolved
ping @eolivelli Could you please help take a final look? There are lots of changes since your last review. |
Hello, is this PR ready to merge or somewhere to change? |
### Motivation When setting the topic policy for a geo-replicated cluster, some policies want to affect the whole geo-replicated cluster but some only want to affect the local cluster. So the proposal is to support global topic policy and local topic policy. ### Modifications 1. Topic policies support cross-cluster replication. For local topic policies, set the `replicateTo` property of the message to avoid being replicated to the remote 2. Retention supports setting global parameters 3. Add global topic policies for `SystemTopicBasedTopicPoliciesService` ### Verifying this change Verify the addition, deletion, modification, and inspection of the interface is normal Verify whether the policies can be replicate normally in the multi-Broker scenario
### Motivation PIP 92 has introduced topic policies across clusters. But after #12517, if the policy is not global, it set the replicate cluster to an empty set. ``` PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder(); if (policies == null || !policies.isGlobalPolicies()) { // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty. builder.replicateTo(new HashSet<>()); } ``` It should set the `replicateTo` with the local cluster, not an empty set. Otherwise, it will cause the system event to be replicated. Details are here : https://github.com/apache/pulsar/blob/d4c2e613d305f8f785b5ef357b7cbe2ccc271043/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L319-L328
### Motivation PIP 92 has introduced topic policies across clusters. But after #12517, if the policy is not global, it set the replicate cluster to an empty set. ``` PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder(); if (policies == null || !policies.isGlobalPolicies()) { // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty. builder.replicateTo(new HashSet<>()); } ``` It should set the `replicateTo` with the local cluster, not an empty set. Otherwise, it will cause the system event to be replicated. Details are here : https://github.com/apache/pulsar/blob/d4c2e613d305f8f785b5ef357b7cbe2ccc271043/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L319-L328 (cherry picked from commit e470de5)
### Motivation PIP 92 has introduced topic policies across clusters. But after apache#12517, if the policy is not global, it set the replicate cluster to an empty set. ``` PulsarEvent.PulsarEventBuilder builder = PulsarEvent.builder(); if (policies == null || !policies.isGlobalPolicies()) { // we don't need to replicate local policies to remote cluster, so set `replicateTo` to empty. builder.replicateTo(new HashSet<>()); } ``` It should set the `replicateTo` with the local cluster, not an empty set. Otherwise, it will cause the system event to be replicated. Details are here : https://github.com/apache/pulsar/blob/d4c2e613d305f8f785b5ef357b7cbe2ccc271043/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java#L319-L328
Motivation
When setting the topic policy for a geo-replicated cluster, some policies want to affect the whole geo-replicated cluster but some only want to affect the local cluster. So the proposal is to support global topic policy and local topic policy.
Modifications
replicateTo
property of the message to avoid being replicated to the remoteSystemTopicBasedTopicPoliciesService
Verifying this change
Verify the addition, deletion, modification, and inspection of the interface is normal
Verify whether the policies can be replicate normally in the multi-Broker scenario
Documentation