Skip to content

PIP 92: Topic policy across multiple clusters

feynmanlin edited this page Aug 12, 2021 · 1 revision
  • Status: Proposal
  • Authors: Penghui Li、Chen Hang、 Lin Lin
  • Pull Request:
  • Mailing List discussion:
  • Release:

Motivation

When setting the topic policy for a geo-replicated cluster, some policies want to affect the whole geo-replicated cluster but some only want to affect the local cluster. So the proposal is to support global topic policy and local topic policy.

Approach

Currently, we are using the TopicPolicies construction to store the topic policy for a topic. An easy way to achieve different topic policies for multiple clusters is to add a global flag to TopicPolicies .Replicator will replicate policies with a global flag to other clusters

public class TopicPolicies {
     bBoolean isGlobal = false
}

We only cache one local Topic Policies in the memory before. Now it will become two, one is Local and the other is Global.

After adding the global topic policy, the topic applied priority is:

Local cluster topic policy Global topic policy Namespace policy Broker default configuration

When setting a global topic policy, we can use the --global option, it should be:

bin/pulsar-admin topics set-retention -s 1G -t 1d --global my-topic

If the --global option is not added, the behavior is consistent with before, and only updates the local policies. Topic policies are stored in System Topic, we can directly use Replicator to replicate data to other Clusters. We need to add a new API to the Replicator interface, which can set the Filter Function. Then add a Function to filter out the data with isGlobal = false

Delete a cluster? Deleting a cluster will now delete local topic policies and will not affect other clusters, because only global policies will be replicated to other clusters Changes Every topic policy API adds the --global option. Including broker REST API, Admin SDK, CMD.

Add API public void setFilterFunction(Function<Message, Boolean>). If Function returns false, then filter out the input message.

Compatibility

The solution does not introduce any compatibility issues, isGlobal in TopicPolicies is false by default in existing Policies.

Test Plan

Existing TopicPolicies will not be affected Only replicate Global Topic Policies, FilterFunction can run as expected Priority of Topic Policies matches our setting

Clone this wiki locally