diff --git a/pip/pip-274.md b/pip/pip-274.md new file mode 100644 index 0000000000000..855c75f23f55f --- /dev/null +++ b/pip/pip-274.md @@ -0,0 +1,126 @@ +# Background knowledge + +Apache Pulsar is a distributed messaging system that supports multiple messaging protocols and storage methods. Among them, Pulsar Topic Compaction is a mechanism to clean up duplicate messages in topics to reduce storage space and improve system efficiency. +More topic compaction details can be found in [Pulsar Topic Compaction](https://pulsar.apache.org/docs/en/concepts-topic-compaction/). + +# Motivation + +Currently, the implementation of Pulsar Topic Compaction is fixed and does not support custom strategy, which limits users from using more Compactor policies in their applications. + + +For example, we need to parse the Kafka format then compact message in Kop, but the current implementation of Pulsar topic compaction does not support this feature. +Another the topic compaction logic implemented in `TwoPhaseCompactor` only compacts messages to the last one, but sometimes we need to keep the first valid message e.g [`StrategicTwoPhaseCompactor`](https://github.com/coderzc/pulsar/blob/0e9935c493060b13b322a84c5418146423992369/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java). + +So we need to make the topic compactor pluggable to support more compaction strategy. + +# Goals + +## In Scope + + + +Make the compactor pluggable. + +## Out of Scope + + + + +# High Level Design + + + +Make the topic compactor pluggable, users can customize the compactor implementation according to their own special scenarios. + + +# Detailed Design + +## Design & Implementation Details + + +* Define a standard Compactor interface that specifies the methods and properties that the Compactor implementation needs to implement. This interface should include methods for Compactor initialization, Compactor execution, and getting Compactor stats. +```java +public interface Compactor { + + void initialize(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler); + + CompletableFuture compact(String topic); + + CompactorMXBean getStats(); +} +``` + +* Rename `org.apache.pulsar.compaction.Compactor` to `org.apache.pulsar.compaction.AbstractCompactor` and make it implement `Compactor` interface. + +* Load custom compactor based on configuration in `org.apache.pulsar.broker.PulsarService.newCompactor` and `CompactorTool`. + +## Public-facing Changes + + + + +### Configuration + +broker.conf +``` +compactorClassName=org.apache.pulsar.compaction.TwoPhaseCompactor +``` + +# Backward & Forward Compatability + +## Revert + + + + +## Upgrade + + + +# Alternatives + + + +# General Notes + +# Links + + +* Mailing List discussion thread: https://lists.apache.org/thread/svyodx3q1nrdd6fyrsfv3xmo0lgn6j2j +* Mailing List voting thread: