PIP-215: Configurable TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView #18099

Closed
heesung-sn opened this issue Oct 18, 2022 · 4 comments

heesung-sn commented Oct 18, 2022

Motivation

Currently, the topic compaction logic implemented in TwoPhaseCompactor keeps only the last message for each key.

Here, we want to make topic compaction configurable with different strategies. For example, to support Conflict State Resolution (race conditions) in PIP-192 (#16691), we need to compact messages to the first valid state for each key.

Goal

  • Create another Topic compactor, StrategicTwoPhaseCompactor, where we can configure a compaction strategy, TopicCompactionStrategy.

  • Update the TableViewConfigurationData to load and consider the TopicCompactionStrategy when updating the internal key-value map in TableView.

  • Add TopicCompactionStrategy in Topic-level Policy to selectively run StrategicTwoPhaseCompactor or TwoPhaseCompactor when executing compaction.

  • Do not change the default behaviors of topic compaction and table views. Enable this feature only when a TopicCompactionStrategy is configured.

  • Make a conservative release. Initially, use this strategic compaction feature only for internal system topics. Do not expose it until it is proven stable and requested by Pulsar users.

API Changes

```java
public interface TopicCompactionStrategy<T> {

    /**
     * Returns the schema object for this strategy.
     *
     * @return the schema used to read message values
     */
    Schema<T> getSchema();

    /**
     * Tests if the compaction needs to keep the left (previous message value)
     * compared with the right (current message value) for the same key.
     *
     * @param prev previous message value
     * @param cur current message value
     * @return true if it needs to keep prev and ignore cur; otherwise, false
     */
    boolean shouldKeepLeft(T prev, T cur);

    /**
     * Checks if merge is enabled. If enabled, compaction will run T merge(..).
     *
     * @return true if merge is enabled
     */
    boolean isMergeEnabled();

    /**
     * Merges the previous message value and the current message value.
     *
     * @param prev previous message value
     * @param cur current message value
     * @return the merged value
     */
    T merge(T prev, T cur);

    static TopicCompactionStrategy load(String topicCompactionStrategy) {
        if (topicCompactionStrategy == null) {
            return null;
        }
        try {
            // Instantiate the strategy from its fully qualified class name
            // via the no-arg constructor.
            Class<?> clazz = Class.forName(topicCompactionStrategy);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return (TopicCompactionStrategy) instance;
        } catch (Exception e) {
            throw new IllegalArgumentException("Error when loading topic compaction strategy.", e);
        }
    }
}
```
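
For illustration, here is a minimal sketch of a strategy implementation. The `FirstValidStateStrategy` class, the `StateData` value type, and its `version` field are hypothetical, not part of this proposal; the sketch keeps the earlier value whenever the newer one is not a valid successor, matching the "first valid state" behavior described in the motivation.

```java
// Hypothetical example strategy; StateData and its version field are
// illustrative only and not part of this proposal.
public class FirstValidStateStrategy implements TopicCompactionStrategy<StateData> {

    @Override
    public Schema<StateData> getSchema() {
        return Schema.JSON(StateData.class);
    }

    @Override
    public boolean shouldKeepLeft(StateData prev, StateData cur) {
        // Keep the first (previous) value when the current one does not
        // advance the version monotonically, i.e. it lost the race.
        return cur.getVersion() <= prev.getVersion();
    }

    @Override
    public boolean isMergeEnabled() {
        return false;
    }

    @Override
    public StateData merge(StateData prev, StateData cur) {
        throw new UnsupportedOperationException("merge is disabled for this strategy");
    }
}

// Minimal value type for the example above.
class StateData {
    private long version;

    public long getVersion() {
        return version;
    }

    public void setVersion(long version) {
        this.version = version;
    }
}
```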
```java
public interface TableView<T> extends Closeable {

    ...

    /**
     * Performs the given action for each future entry in this map until the
     * action throws an exception.
     *
     * @param action the action to be performed for each entry
     */
    void listen(BiConsumer<String, T> action);
}
```
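
As a usage sketch (the topic name is a placeholder, and `StateData` is the hypothetical value type from the example above): a listener registered through `listen` observes every update that survives the configured compaction strategy.

```java
// Illustrative usage; the topic name is a placeholder.
TableView<StateData> tableView = client.newTableViewBuilder(Schema.JSON(StateData.class))
        .topic("persistent://tenant/ns/state-topic")
        .create();

// Invoked for each (key, value) update applied to the internal map.
tableView.listen((key, value) ->
        System.out.println("key=" + key + " value=" + value));
```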
```java
public class TableViewConfigurationData implements Serializable, Cloneable {
    ...
    + private String topicCompactionStrategy;
}
```
```java
public class TableViewImpl<T> implements TableView<T> {

    private final TableViewConfigurationData conf;

    ...

    + private TopicCompactionStrategy<T> compactionStrategy;

    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
        ...
        + this.compactionStrategy = TopicCompactionStrategy.load(conf.getTopicCompactionStrategy());
    }
}
```
```java
public class ReaderConfigurationData<T> implements Serializable, Cloneable {

    + private SubscriptionMode subscriptionMode = SubscriptionMode.NonDurable;

    + private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;
}
```
```java
public class CompactionReaderImpl<T> extends ReaderImpl<T> {

    ConsumerBase<T> consumer;

    private CompactionReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T> readerConfiguration,
                                 ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> consumerFuture,
                                 Schema<T> schema) {
        super(client, readerConfiguration, executorProvider, consumerFuture, schema);
        this.consumer = getConsumer();
    }

    public static <T> CompactionReaderImpl<T> create(PulsarClientImpl client, Schema<T> schema, String topic,
                                                     CompletableFuture<Consumer<T>> consumerFuture) {
        ReaderConfigurationData<T> conf = new ReaderConfigurationData<>();
        conf.setTopicName(topic);
        conf.setSubscriptionName(COMPACTION_SUBSCRIPTION);
        conf.setStartMessageId(MessageId.earliest);
        conf.setAutoUpdatePartitions(true);
        conf.setAutoUpdatePartitionsIntervalSeconds(30);
        conf.setReadCompacted(true);
        conf.setPoolMessages(true);
        // Unlike regular readers, the compaction reader uses a durable
        // subscription starting from the earliest position.
        conf.setSubscriptionMode(SubscriptionMode.Durable);
        conf.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
        return new CompactionReaderImpl<>(client, conf, client.externalExecutorProvider(), consumerFuture, schema);
    }

    ...

    @Override
    public CompletableFuture<Message<T>> readNextAsync() {
        return consumer.receiveAsync();
    }

    ...

    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties) {
        return consumer.doAcknowledgeWithTxn(messageId, CommandAck.AckType.Cumulative, properties, null);
    }
}
```

```java
public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor {
    ...
    public <T> CompletableFuture<Long> compact(String topic,
                                               TopicCompactionStrategy<T> strategy) {
        CompletableFuture<Consumer<T>> consumerFuture = new CompletableFuture<>();
        CompactionReaderImpl<T> reader = CompactionReaderImpl.create(
                (PulsarClientImpl) pulsar, strategy.getSchema(), topic, consumerFuture);

        return consumerFuture.thenComposeAsync(__ -> compactAndCloseReader(reader, strategy), scheduler);
    }
    ...
}
```
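
A usage sketch of the new compact API (the helper method and the logging are illustrative; it assumes an already-constructed compactor and strategy):

```java
// Illustrative helper; assumes an already-constructed compactor and strategy.
static <T> CompletableFuture<Long> compactWithStrategy(StrategicTwoPhaseCompactor compactor,
                                                       String topic,
                                                       TopicCompactionStrategy<T> strategy) {
    return compactor.compact(topic, strategy)
            .whenComplete((ledgerId, ex) -> {
                if (ex == null) {
                    System.out.println("Compaction finished, compacted ledger: " + ledgerId);
                }
            });
}
```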
```shell
pulsar-admin topicPolicies set-compaction-strategy options
pulsar-admin topicPolicies get-compaction-strategy options
```

Implementation

# Goal 1:
- Create another Topic compactor, `StrategicTwoPhaseCompactor`, where we can configure a compaction strategy, `TopicCompactionStrategy`.

StrategicTwoPhaseCompactor will have two phases.

First Phase:
Using the CompactionReader<T> instead of the RawReader, it will iterate over each message and compact messages with the same key by following shouldKeepLeft() (and merge(), when enabled) in TopicCompactionStrategy.

The CompactionReader will be added to pulsar-broker only (not to pulsar-client).

Second Phase:
The compacted messages will be written to a ledger.
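
A simplified sketch of the two phases under the API above (the `writeToCompactedLedger` parameter and the in-memory map are illustrative; the real implementation reads messages asynchronously rather than materializing them in a list):

```java
// Simplified, synchronous sketch of the two compaction phases.
static <T> void compactSketch(List<Message<T>> messages,
                              TopicCompactionStrategy<T> strategy,
                              BiConsumer<String, T> writeToCompactedLedger) {
    // Phase one: resolve each key down to a single surviving value.
    Map<String, T> retained = new LinkedHashMap<>();
    for (Message<T> msg : messages) { // messages read via CompactionReaderImpl
        String key = msg.getKey();
        T cur = msg.getValue();
        T prev = retained.get(key);
        if (prev == null) {
            retained.put(key, cur);
        } else if (strategy.isMergeEnabled()) {
            retained.put(key, strategy.merge(prev, cur));
        } else if (!strategy.shouldKeepLeft(prev, cur)) {
            retained.put(key, cur); // cur wins; otherwise prev is kept
        }
    }

    // Phase two: write the surviving values to the compacted ledger.
    retained.forEach(writeToCompactedLedger);
}
```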

# Goal 2:
- Update the `TableViewConfigurationData` to load and consider the `TopicCompactionStrategy` when updating the internal key-value map in `TableView`. 

When updating the internal key-value map, it will follow the same compaction logic defined in the TopicCompactionStrategy, as sketched below.
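
A minimal sketch of how TableViewImpl could gate updates to its internal map (the method name `handleMessage` and the `data` field are illustrative; `compactionStrategy` is the field loaded from TableViewConfigurationData above):

```java
// Illustrative sketch inside TableViewImpl; `data` is the internal map.
private void handleMessage(Message<T> msg) {
    String key = msg.getKey();
    T cur = msg.getValue();
    if (compactionStrategy == null) {
        data.put(key, cur); // default TableView behavior is unchanged
        return;
    }
    T prev = data.get(key);
    if (prev != null && compactionStrategy.shouldKeepLeft(prev, cur)) {
        return; // drop the incoming value; keep the previous one
    }
    data.put(key, compactionStrategy.isMergeEnabled() && prev != null
            ? compactionStrategy.merge(prev, cur)
            : cur);
}
```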

# Goal 3:
- Add `TopicCompactionStrategy` in Topic-level Policy to run `StrategicTwoPhaseCompactor` instead of `TwoPhaseCompactor` when executing compaction.

When running compaction, the broker will look up the TopicCompactionStrategy in the Topic-level Policy and run StrategicTwoPhaseCompactor if one is configured. By default, it should run TwoPhaseCompactor. A sketch of this dispatch follows.
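
A sketch of the selection logic, assuming hypothetical accessor and field names (the `getCompactionStrategy` accessor on the topic policies and the two compactor fields are illustrative):

```java
// Illustrative dispatch; the policy accessor name is hypothetical.
String strategyClassName = topicPolicies.getCompactionStrategy();
if (strategyClassName != null) {
    // Strategy configured: run the strategic compactor.
    TopicCompactionStrategy strategy = TopicCompactionStrategy.load(strategyClassName);
    strategicTwoPhaseCompactor.compact(topic, strategy);
} else {
    // Default path: the existing TwoPhaseCompactor.
    twoPhaseCompactor.compact(topic);
}
```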

Alternatives

Why not resolve conflicts via a single broker (the leader broker) using two topics, un-compacted and compacted (pre-filtered)?

  • Brokers broadcast messages to the un-compacted topic first.
  • Only the leader broker consumes this un-compacted topic and compacts any conflicting messages. The leader then
    produces the compacted messages to the compacted topic (conflicts are resolved by a single writer).
  • Brokers consume the compacted topic (no need to compact the messages separately).

In the worst case, when there are many conflicting messages, this PIP can incur more repeated custom compaction than the alternative, since individual consumers need to compact messages themselves (in both topic compaction and table views). However, one advantage of this proposal is that pub/sub is faster, since it uses a single topic. For example, in PIP-192, if the "bundle assignment" broadcast is fast enough, conflicting bundle assignment requests can be reduced (a broadcast-filter effect).

Anything else?

No response

@heesung-sn

Raised a local-fork PR for the implementation reference: heesung-sn#12

@heesung-sn

Raised a PR to the apache/master branch. #18195

@heesung-sn heesung-sn changed the title PIP-215: Configurable Topic Compaction Strategy PIP-215: Configurable TopicCompactionStrategy for StrategicTwoPhaseCompactor and TableView Nov 3, 2022
github-actions bot commented Dec 9, 2022

The issue had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Dec 9, 2022
Demogorgon314 pushed a commit that referenced this issue Mar 29, 2023
Master Issue: #16691, #18099

### Motivation

Raising a PR to implement Master Issue: #16691, #18099

We want to reduce unload frequencies from flaky traffic.

### Modifications
This PR 
- Introduced a config `loadBalancerSheddingConditionHitCountThreshold` to further restrict shedding conditions based on the hit count.
- Normalized offload traffic
- Lowered the default `loadBalanceSheddingDelayInSeconds` value from 600 to 180, as 10 minutes is too long; 3 minutes is long enough to catch the new load after unloads.
- Changed the config `loadBalancerBundleLoadReportPercentage` to `loadBalancerMaxNumberOfBundlesInBundleLoadReport` to make the topk bundle count absolute instead of relative.
- Renamed `loadBalancerNamespaceBundleSplitConditionThreshold` to `loadBalancerNamespaceBundleSplitConditionHitCountThreshold` to be consistent with `*ConditionHitCountThreshold`.
- Renamed `loadBalancerMaxNumberOfBrokerTransfersPerCycle` to `loadBalancerMaxNumberOfBrokerSheddingPerCycle`.
- Added LoadDataStore cleanup logic in BSC monitor.
- Added `msgThroughputEMA` in BrokerLoadData to smooth the broker throughput info.
- Updated topk bundles to be sorted in ascending order (instead of descending).
- Updated some info logs to only show in debug mode.
- Added load data tombstone upon Own, Releasing, Splitting
- Added the bundle ownership(isOwned) check upon split and unload.
- Added swap unload logic