[pip][design] PIP-274: Support pluggable topic compactor #20493

Closed
126 changes: 126 additions & 0 deletions pip/pip-274.md
@@ -0,0 +1,126 @@
# Background knowledge

Apache Pulsar is a distributed messaging system that supports multiple messaging protocols and storage methods. Pulsar Topic Compaction is a mechanism that removes older messages superseded by a newer message with the same key, which reduces storage space and improves system efficiency.
More topic compaction details can be found in [Pulsar Topic Compaction](https://pulsar.apache.org/docs/en/concepts-topic-compaction/).

# Motivation

Currently, the implementation of Pulsar Topic Compaction is fixed and does not support custom strategies, which prevents users from applying other compaction policies in their applications.


For example, KoP needs to parse Kafka-format entries before compacting them, but the current implementation of Pulsar topic compaction does not support this.
In addition, the topic compaction logic implemented in `TwoPhaseCompactor` always keeps only the last message for each key, but sometimes we need to keep the first valid message, e.g. [`StrategicTwoPhaseCompactor`](https://github.com/coderzc/pulsar/blob/0e9935c493060b13b322a84c5418146423992369/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java).

So we need to make the topic compactor pluggable to support more compaction strategies.

# Goals

## In Scope

<!--
What this PIP intend to achieve once It's integrated into Pulsar.
Why does it benefit Pulsar.
-->

Make the compactor pluggable.

## Out of Scope

<!--
Describe what you have decided to keep out of scope, perhaps left for a different PIP/s.
-->


# High Level Design

<!--
Describe the design of your solution in *high level*.
Describe the solution end to end, from a birds-eye view.
Don't go into implementation details in this section.

I should be able to finish reading from beginning of the PIP to here (including) and understand the feature and
how you intend to solve it, end to end.

DON'T
* Avoid code snippets, unless it's essential to explain your intent.
-->

Make the topic compactor pluggable so that users can supply a compactor implementation suited to their own scenarios.


# Detailed Design

## Design & Implementation Details

<!--
This is the section where you dive into the details. It can be:
* Concrete class names and their roles and responsibility, including methods.
* Code snippets of existing code.
* Interface names and its methods.
* ...
-->
* Define a standard Compactor interface that specifies the methods and properties that the Compactor implementation needs to implement. This interface should include methods for Compactor initialization, Compactor execution, and getting Compactor stats.
```java
public interface Compactor {

    void initialize(ServiceConfiguration conf,
                    PulsarClient pulsar,
                    BookKeeper bk,
                    ScheduledExecutorService scheduler);
Comment on lines +68 to +69
Contributor

It's not a good abstraction to have BookKeeper (or even PulsarClient) involved. These parameters are just copied from the existing Compactor's constructor, which is not abstracted well.

Contributor

If the motivation is to handle messages whose format is other than pulsar, we might need to pass some functions like:

interface CompactEntryProcessor {

    // Just an example method
    void handle(Entry entry);
}

and pass the processor to the compactor.

Member Author (@coderzc, Jun 7, 2023)

@BewareMyPower Do you mean to abstract the CompactEntryProcessor and add a compactEntryProcessorClassName configuration?
But we still need to construct this CompactEntryProcessor by BookKeeper.

How about abstracting a CompactStorage like:

interface CompactionHandle {

    CompletableFuture<Void> asyncAddEntry(ByteBuf entry);

    CompletableFuture<List<ByteBuf>> asyncReadEntries(long from, long end);

    CompletableFuture<Long> getId();

    CompletableFuture<Void> flush();

    CompletableFuture<Void> close();
}

public interface CompactStorage {

    CompletableFuture<CompactionHandle> createCompactionStorage();

    void deleteCompactionStorage(CompactionHandle compactionHandle);
}

Contributor

No. You understood wrong. Adding these two interfaces is complicated for users to implement. I mean, we only need ways to handle entries of different format, so users don't need to implement the same logic of reading entries or writing entries again.

For example, we only need to replace

if (RawBatchConverter.isReadableBatch(m)) {
    // ...
} else {
    // ...
}

with

processor.handle(m, latestForKey);

Where the processor is an interface:

interface CompactMessageProcessor {

    /**
     * Process the message to update the latest key => value map.
     * 
     * @param message the raw message
     * @param latestForKey the map that maps the key to the latest value
     */
    void handle(RawMessage message, Map<String, MessageId> latestForKey);
}

Then, we don't need to rewrite the same logic again. We only need to change the logic about how to process a RawMessage.


    CompletableFuture<Long> compact(String topic);

    CompactorMXBean getStats();
Contributor

I think CompactorMXBean is a very specific abstraction for metrics. It doesn't allow a compactor to add its own metrics and will make future updates to metrics hard.

First, some metrics are going to be common for any compactor used:

  • CompactionSucceedCount - how many compactions have been successful
  • CompactionFailedCount - ... failed
  • CompactionDurationInMillis - a counter counting how much was spent in compactions so far

Those metrics should be maintained by the service, which calls the compactor interface's compact(topic). It only needs to know whether the compaction succeeded, based on the return value.

I suggest using two interfaces which will be given in initialize():

interface CompactionMetricsRecorder {
      TopicCompactionMetricsRecorder provideTopicCompactionMetricsRecorder(String topic)
}

interface TopicCompactionMetricsRecorder {
      recordMessageRemoved()
      recordMessagesWritten(numOfMessages, messagesSizeInBytes, duration, durationUnit)
      recordMessagesRead(numOfMessages, messagesSizeInBytes, duration, durationUnit)
     ...
}

There are some metrics I still need to figure out where to fit, since I need help understanding something. The metrics are:

  • compactedEntriesCount - how many entries were written in the last compaction
  • compactedEntriesSize - what's the total size of entries written in the last compaction

Once compact() has ended, how does the compactor hand over the details of the output, maybe the ledger ID, the new compaction horizon, etc.? I don't see that in the interface.
The reason I'm asking is that maybe other compactors would do incremental compaction, so it changes what to expect from it.

Now, what's left is a way to record custom metrics specific to your compactor.
Need to think about it

Member Author

It seems that using CompactionMetricsRecorder instead of CompactorMXBean still does not solve the problem that other compactors can't add their own metrics. Can you explain the benefits of doing this?

Member Author

For CompactionSucceedCount/CompactionFailedCount/CompactionDurationInMillis, I think they are already contained in AbstractCompactor; other compactors only need to extend it.

Contributor

Extending is the worst. We should keep interfacing only.

The interface creates a clean design that makes it easier to change in the future to some other mechanism (say OTel...). Somebody external to Pulsar needs to be able to depend as little as possible on Pulsar. CompactorMXBean is the opposite of that. First, nobody is using MXBean, and it has a different meaning (it was named like this to be integrated into JMX back in 2012). Second, we need to separate the metrics as I wrote.

Today adding metrics that are high cardinality (contain topic label) is done by writing them directly to SimpleTextOutputStream. You usually add a method for that, and you call it from the PrometheusMetricsGenerator.

Thinking out loud - maybe we can add a method to CompactionMetricsRecorder

   void  writeCustomMetrics(SimpleTextOutputStream)

WDYT?

}
```

* Rename `org.apache.pulsar.compaction.Compactor` to `org.apache.pulsar.compaction.AbstractCompactor` and make it implement the `Compactor` interface.

* Load the custom compactor based on configuration in `org.apache.pulsar.broker.PulsarService.newCompactor` and `CompactorTool`.
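
The review of the interface above raises an alternative: keep the compaction loop fixed and plug in only the per-message decision. A minimal, self-contained sketch of that idea follows. It does not use Pulsar's types. `Msg`, `KeySelector`, `KeepLast`, `KeepFirst`, and `CompactionSketch` are hypothetical names chosen for illustration, and a `long` stands in for `MessageId`. `KeepFirst` is only a simplified reading of "keep the first valid message", not the logic of `StrategicTwoPhaseCompactor`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a raw message: a key plus its position in the topic.
final class Msg {
    final String key;
    final long id;

    Msg(String key, long id) {
        this.key = key;
        this.id = id;
    }
}

// Hypothetical strategy hook, modeled on the CompactMessageProcessor idea from the review.
interface KeySelector {
    // Update the key -> retained-message-id map for one message.
    void handle(Msg message, Map<String, Long> retainedForKey);
}

// Keep the last message seen for each key.
final class KeepLast implements KeySelector {
    @Override
    public void handle(Msg message, Map<String, Long> retainedForKey) {
        retainedForKey.put(message.key, message.id);
    }
}

// Keep the first message seen for each key (simplified: no validity check).
final class KeepFirst implements KeySelector {
    @Override
    public void handle(Msg message, Map<String, Long> retainedForKey) {
        retainedForKey.putIfAbsent(message.key, message.id);
    }
}

final class CompactionSketch {
    // Scan the topic once and let the selector decide which id is retained per key.
    static Map<String, Long> selectRetained(List<Msg> topic, KeySelector selector) {
        Map<String, Long> retained = new LinkedHashMap<>();
        for (Msg m : topic) {
            selector.handle(m, retained);
        }
        return retained;
    }

    public static void main(String[] args) {
        List<Msg> topic = List.of(new Msg("a", 0), new Msg("b", 1), new Msg("a", 2));
        System.out.println("keep last:  " + selectRetained(topic, new KeepLast()));
        System.out.println("keep first: " + selectRetained(topic, new KeepFirst()));
    }
}
```

In this sketch the scanning loop is written once, and a different strategy only swaps the `KeySelector` passed to `selectRetained`. Whether such a narrow hook is enough for Kafka-format entries is an open question in the discussion.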

## Public-facing Changes

<!--
Contributor

I think it's ok to remove all comments if you're done writing

Describe the additions you plan to make for each public facing component.
Remove the sections you are not changing.
Clearly mark any changes which are BREAKING backward compatibility.
-->


### Configuration
Contributor (@poorbarcode, Jun 5, 2023)

broker.conf

compactorClassName=org.apache.pulsar.compaction.TwoPhaseCompactor

If I want namespace A to apply policy TwoPhaseCompactor, but namespace B applies policy KafkaCompactor, how can I configure it?

Member Author

Namespace-level configuration is not supported; this is a global configuration.
By the way, since a topic may have Kafka Format and Pulsar Format at the same time, you need to be compatible with Pulsar Format in KafkaCompactor.

Contributor

If I have a Pulsar cluster and there are some topics on this cluster, and we need to migrate Kafka to Pulsar now, how can I do that?

Member

> since a topic may have Kafka Format and Pulsar Format at the same time, you need to be compatible with Pulsar Format in KafkaCompactor.

Then, you copied the logic from the existing compactor? :(
IMO, I wish we could support namespace- or topic-level configuration, so that we don't someday have to implement one all-powerful compactor (a single compactor that supports every protocol).

Member Author

> If I have a Pulsar cluster and there are some topics on this cluster, and we need to migrate Kafka to Pulsar now, how can I do that?

For KafkaCompactor, you need to handle both Kafka Format and Pulsar Format, because KoP can send either.

Contributor

Did you change the interface to support all functions of TwoPhaseCompactor?

Member Author (@coderzc, Jun 7, 2023)

> Did you change the interface to support all functions of TwoPhaseCompactor?

Supporting all functions of TwoPhaseCompactor is not required; it is only a requirement in the KoP case.

Contributor

oh

Contributor (@poorbarcode, Jun 7, 2023)

> For KafkaCompactor, you need to handle both Kafka Format and Pulsar Format, because KoP can send either.
>
> Supporting all functions of TwoPhaseCompactor is not required; it is only a requirement in the KoP case.

I have three questions:

  • The format rules of "Pulsar Format" and existing pulsar are exactly the same, right?
  • KafkaCompactor can handle both Kafka Format and Pulsar Format, right?[2]
  • If not[2], when users want to migrate Kafka to an existing Pulsar cluster, how can they do it?

Member Author

> The format rules of "Pulsar Format" and existing pulsar are exactly the same, right?

Yes

> KafkaCompactor can handle both Kafka Format and Pulsar Format, right?

Yes

> If not[2], when users want to migrate Kafka to an existing Pulsar cluster, how can they do it?

If KafkaCompactor is not implemented, you may need to use Pulsar Format in KoP to produce messages, and KoP will automatically transform Kafka Format to Pulsar Format.


broker.conf
```
compactorClassName=org.apache.pulsar.compaction.TwoPhaseCompactor
```
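
A class-name setting like the one above is usually resolved reflectively at startup. The following is a rough sketch of that step under stated assumptions, not the code in `PulsarService.newCompactor`. `PluggableCompactor`, `DefaultCompactor`, and `CompactorLoader` are hypothetical names, the interface is trimmed to a single method, and the sketch assumes the implementation has a no-argument constructor.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down version of the proposed interface (no Pulsar types).
interface PluggableCompactor {
    CompletableFuture<Long> compact(String topic);
}

// Hypothetical default implementation used only to exercise the loader.
class DefaultCompactor implements PluggableCompactor {
    @Override
    public CompletableFuture<Long> compact(String topic) {
        // Placeholder result; a real compactor would do the work asynchronously.
        return CompletableFuture.completedFuture(-1L);
    }
}

final class CompactorLoader {
    // Instantiate the class named in the configuration through its no-arg constructor.
    static PluggableCompactor load(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            if (!PluggableCompactor.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement PluggableCompactor");
            }
            return (PluggableCompactor) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing constructor, and instantiation failures.
            throw new IllegalArgumentException("Cannot load compactor " + className, e);
        }
    }

    public static void main(String[] args) {
        PluggableCompactor compactor = load(DefaultCompactor.class.getName());
        System.out.println("Loaded " + compactor.getClass().getName());
    }
}
```

Failing fast with a descriptive exception when the configured class is missing or of the wrong type keeps a misconfigured broker from starting with no compactor at all. Because the setting is global, a loader of this shape cannot pick different compactors per namespace or topic, which is the limitation raised in the review.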

# Backward & Forward Compatibility

## Revert

<!--
Describe a cookbook detailing the steps required to revert pulsar to previous version *without* this feature.
-->


## Upgrade

<!--
Specify the list of instructions, if there are such, needed to perform before/after upgrading to Pulsar version containing this feature.
-->

# Alternatives

<!--
If there are alternatives that were already considered by the authors or, after the discussion, by the community, and were rejected, please list them here along with the reason why they were rejected.
-->

# General Notes

# Links

<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/svyodx3q1nrdd6fyrsfv3xmo0lgn6j2j
* Mailing List voting thread: