[pip][design] PIP-274: Support pluggable topic compactor #20493
Conversation
### Configuration
broker.conf
compactorClassName=org.apache.pulsar.compaction.TwoPhaseCompactor
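For context, a broker honoring such a setting would typically instantiate the configured class reflectively. Below is a minimal sketch; the `CompactorLoader` name and the no-arg construction are illustrative assumptions, since a real compactor constructor would receive broker dependencies:

```java
// Hypothetical sketch of how a broker could honor the compactorClassName
// setting: reflectively instantiate the configured class by name.
final class CompactorLoader {
    static Object loadCompactor(String className) throws Exception {
        // Look up the configured class and call its no-arg constructor.
        Class<?> clazz = Class.forName(className);
        return clazz.getDeclaredConstructor().newInstance();
    }
}
```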
If I want namespace A to apply policy TwoPhaseCompactor, but namespace B to apply policy KafkaCompactor, how can I configure it?
Namespace-level configuration is not supported; this is a global configuration.
By the way, since a topic may contain Kafka Format and Pulsar Format messages at the same time, KafkaCompactor needs to be compatible with Pulsar Format.
If I have a Pulsar cluster with some existing topics, and we need to migrate Kafka to Pulsar now, how can we do that?
since a topic may contain Kafka Format and Pulsar Format messages at the same time, KafkaCompactor needs to be compatible with Pulsar Format.
Then, you copied the logic from the existing compactor? :(
IMO, I wish we could support namespace- or topic-level configuration, so that we avoid someday having to implement one all-powerful compactor (a single compactor that supports every protocol).
If I have a Pulsar cluster with some existing topics, and we need to migrate Kafka to Pulsar now, how can we do that?
For KafkaCompactor, you need to deal with both Kafka Format and Pulsar Format, since KoP can send either Pulsar Format or Kafka Format.
Did you change the interface to support all functions of TwoPhaseCompactor?
Did you change the interface to support all functions of TwoPhaseCompactor?
Supporting all functions of TwoPhaseCompactor is not required; it is just a requirement in the case of KoP.
oh
For KafkaCompactor, you need to deal with both Kafka Format and Pulsar Format, since KoP can send either Pulsar Format or Kafka Format.
Supporting all functions of TwoPhaseCompactor is not required; it is just a requirement in the case of KoP.
I have three questions:
- The format rules of "Pulsar Format" and existing Pulsar are exactly the same, right?
- KafkaCompactor can handle both Kafka Format and Pulsar Format, right?
- If not, when users want to migrate Kafka to an existing Pulsar cluster, how can they do it?
The format rules of "Pulsar Format" and existing Pulsar are exactly the same, right?
Yes
KafkaCompactor can handle both Kafka Format and Pulsar Format, right?
Yes
If not, when users want to migrate Kafka to an existing Pulsar cluster, how can they do it?
If KafkaCompactor is not implemented, you may need to use Pulsar Format in KoP to produce messages, and KoP will automatically transform Kafka Format to Pulsar Format.
BookKeeper bk,
ScheduledExecutorService scheduler);
It's not a good abstraction to have BookKeeper (or even PulsarClient) involved. These parameters are just copied from the existing Compactor's constructor, which is not abstracted well.
If the motivation is to handle messages whose format is other than pulsar, we might need to pass some functions like:
```java
interface CompactEntryProcessor {
    // Just an example method
    void handle(Entry entry);
}
```
and pass the processor to the compactor.
@BewareMyPower Do you mean to abstract the CompactEntryProcessor and add a compactEntryProcessorClassName configuration?
But we still need to construct this CompactEntryProcessor with a BookKeeper instance.
How about abstracting a CompactStorage like:
```java
interface CompactionHandle {
    CompletableFuture<Void> asyncAddEntry(ByteBuf entry);
    CompletableFuture<List<ByteBuf>> asyncReadEntries(long from, long end);
    CompletableFuture<Long> getId();
    CompletableFuture<Void> flush();
    CompletableFuture<Void> close();
}

public interface CompactStorage {
    CompletableFuture<CompactionHandle> createCompactionStorage();
    void deleteCompactionStorage(CompactionHandle compactionHandle);
}
```
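To make the proposal concrete, here is a minimal in-memory sketch of such a CompactionHandle. It is a hypothetical illustration, not Pulsar API: `byte[]` stands in for Netty's ByteBuf so the example stays self-contained, and a real implementation would wrap a BookKeeper ledger instead of a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory sketch of the proposed CompactionHandle.
final class InMemoryCompactionHandle {
    private final List<byte[]> entries = new ArrayList<>();

    CompletableFuture<Void> asyncAddEntry(byte[] entry) {
        entries.add(entry);
        return CompletableFuture.completedFuture(null);
    }

    // Inclusive range [from, end], mirroring BookKeeper's read semantics.
    CompletableFuture<List<byte[]>> asyncReadEntries(long from, long end) {
        return CompletableFuture.completedFuture(
                new ArrayList<>(entries.subList((int) from, (int) end + 1)));
    }
}
```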
No, you misunderstood. Adding these two interfaces is complicated for users to implement. I mean, we only need a way to handle entries of different formats, so users don't need to implement the same logic for reading and writing entries again.
For example, we only need to replace
```java
if (RawBatchConverter.isReadableBatch(m)) {
    // ...
} else {
    // ...
}
```
with
```java
processor.handle(m, latestForKey);
```
where the processor is an interface:

```java
interface CompactMessageProcessor {
    /**
     * Process the message to update the latest key => value map.
     *
     * @param message the raw message
     * @param latestForKey the map that maps the key to the latest value
     */
    void handle(RawMessage message, Map<String, MessageId> latestForKey);
}
```
Then, we don't need to rewrite the same logic again. We only need to change the logic about how to process a RawMessage.
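As a self-contained illustration of this idea, the following sketch replaces RawMessage and MessageId with trivial stand-ins (none of these names are actual Pulsar API) and implements a last-write-wins processor with tombstone handling, which is the core of topic compaction:

```java
import java.util.Map;

// Illustrative stand-in for RawMessage so the sketch is self-contained.
final class SimpleMessage {
    final long id;          // stand-in for MessageId
    final String key;
    final byte[] value;     // null value acts as a tombstone

    SimpleMessage(long id, String key, byte[] value) {
        this.id = id;
        this.key = key;
        this.value = value;
    }
}

interface CompactMessageProcessor {
    // Update latestForKey so each key maps to the id of its newest message.
    void handle(SimpleMessage message, Map<String, Long> latestForKey);
}

final class LastWriteWinsProcessor implements CompactMessageProcessor {
    @Override
    public void handle(SimpleMessage message, Map<String, Long> latestForKey) {
        if (message.value == null) {
            latestForKey.remove(message.key); // tombstone drops the key
        } else {
            latestForKey.put(message.key, message.id);
        }
    }
}
```

A format-specific compactor (e.g. for Kafka Format entries) would only swap in a different `handle` implementation while the read/write plumbing stays shared.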
Can you please follow https://github.com/apache/pulsar/blob/master/pip/README.md regarding the PR title and get the number of the pip?
CompletableFuture<Long> compact(String topic);
CompactorMXBean getStats();
I think CompactorMXBean is a very specific abstraction for metrics. It doesn't allow a compactor to add its own metrics and will make future updates to the metrics hard.
First, some metrics are going to be common to any compactor used:
- CompactionSucceedCount - how many compactions have been successful
- CompactionFailedCount - ... failed
- CompactionDurationInMillis - a counter counting how much time was spent in compactions so far

Those metrics should be maintained by the service that calls the compactor's compact(topic) interface. It only needs to know whether the compaction was a success based on the return value.
I suggest using two interfaces which will be given in initialize():

```java
interface CompactionMetricsRecorder {
    TopicCompactionMetricsRecorder provideTopicCompactionMetricsRecorder(String topic);
}

interface TopicCompactionMetricsRecorder {
    void recordMessageRemoved();
    void recordMessagesWritten(long numOfMessages, long messagesSizeInBytes,
                               long duration, TimeUnit durationUnit);
    void recordMessagesRead(long numOfMessages, long messagesSizeInBytes,
                            long duration, TimeUnit durationUnit);
    // ...
}
```
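A hypothetical counter-backed implementation of the suggested topic-level recorder could look like the sketch below; the method signatures follow the interface sketch above and are not an existing Pulsar API:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical counter-backed recorder for one topic's compaction metrics.
final class SimpleTopicCompactionMetricsRecorder {
    final AtomicLong messagesRemoved = new AtomicLong();
    final AtomicLong messagesWritten = new AtomicLong();
    final AtomicLong bytesWritten = new AtomicLong();
    final AtomicLong writeDurationMillis = new AtomicLong();

    void recordMessageRemoved() {
        messagesRemoved.incrementAndGet();
    }

    void recordMessagesWritten(long numOfMessages, long sizeInBytes,
                               long duration, TimeUnit unit) {
        messagesWritten.addAndGet(numOfMessages);
        bytesWritten.addAndGet(sizeInBytes);
        // Normalize all durations to milliseconds for aggregation.
        writeDurationMillis.addAndGet(unit.toMillis(duration));
    }
}
```

Because the service (not the compactor) owns the recorder, swapping the backing store (Prometheus, OTel, plain counters) would not require touching any compactor implementation.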
There are some metrics I still need to figure out where to fit, since I need help understanding something. The metrics are:
- compactedEntriesCount - how many entries were written in the last compaction
- compactedEntriesSize - what's the total size of entries written in the last compaction
Once compact() has ended, how does the compactor hand over the details of the output - maybe the ledger ID, the new compaction horizon, etc.? I don't see that in the interface.
The reason I'm asking is that maybe other compactors would do incremental compaction, so it changes what to expect from it.
Now, what's left is a way to record custom metrics specific to your compactor.
Need to think about it
It seems that using CompactionMetricsRecorder instead of CompactorMXBean still doesn't resolve the problem that other compactors can't add their own metrics. Can you explain the benefits of doing this?
For CompactionSucceedCount/CompactionFailedCount/CompactionDurationInMillis, I think they are already contained in AbstractCompactor; other compactors only need to extend it.
Extending is the worst. We should keep to interfaces only.
An interface creates a clean design that makes it easier to switch to some other mechanism in the future (say OTel...). Somebody external to Pulsar needs to be able to depend on Pulsar as little as possible. CompactorMXBean is the opposite of that. First, nobody is using MXBean, and it has a different meaning (it was named like this to be integrated into JMX back in 2012). Second, we need to separate the metrics as I wrote.
Today, adding metrics that are high cardinality (contain a topic label) is done by writing them directly to SimpleTextOutputStream. You usually add a method for that, and you call it from the PrometheusMetricsGenerator.
Thinking out loud - maybe we can add a method to CompactionMetricsRecorder:
void writeCustomMetrics(SimpleTextOutputStream)
WDYT?
## Public-facing Changes

<!--
I think it's ok to remove all comments if you're done writing
-->
### Configuration
@coderzc Have you seen that you are colliding with another PIP?
Closing this PIP. I opened PIP-278 to discuss the related topics.
Motivation
This is a PIP to support a pluggable topic compactor. The PR contents contain the motivation.
Documentation
doc
doc-required
doc-not-needed
doc-complete