Skip to content

Commit

Permalink
KAFKA-7223: Make suppression buffer durable (apache#5724)
Browse files Browse the repository at this point in the history
This is Part 4 of suppression (durability)
Part 1 was apache#5567 (the API)
Part 2 was apache#5687 (the tests)
Part 3 was apache#5693 (in-memory buffering)

Implement a changelog for the suppression buffer so that the buffer state may be recovered on restart or recovery.
As of this PR, suppression is suitable for general usage.

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
vvcephei authored and pengxiaolong committed Jun 14, 2019
1 parent a7571c8 commit 4ef08fc
Show file tree
Hide file tree
Showing 10 changed files with 927 additions and 485 deletions.
Expand Up @@ -36,13 +36,15 @@
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;

import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -356,20 +358,24 @@ public <K1> KStream<K1, V> toStream(final KeyValueMapper<? super K, ? super V, ?
@Override
public KTable<K, V> suppress(final Suppressed<K> suppressed) {
final String name = builder.newProcessorName(SUPPRESS_NAME);
final String storeName = builder.newStoreName(SUPPRESS_NAME);

final ProcessorSupplier<K, Change<V>> suppressionSupplier =
() -> new KTableSuppressProcessor<>(
buildSuppress(suppressed),
storeName,
keySerde,
valSerde == null ? null : new FullChangeSerde<>(valSerde)
);

final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
suppressionSupplier,
name
);

final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false);
final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
null,
new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName),
false
);

builder.addGraphNode(streamsGraphNode, node);

Expand Down

This file was deleted.

Expand Up @@ -28,32 +28,37 @@
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
private final long maxRecords;
private final long maxBytes;
private final long suppressDurationMillis;
private final TimeOrderedKeyValueBuffer buffer;
private final TimeDefinition<K> bufferTimeDefinition;
private final BufferFullStrategy bufferFullStrategy;
private final boolean shouldSuppressTombstones;
private final String storeName;
private TimeOrderedKeyValueBuffer buffer;
private InternalProcessorContext internalProcessorContext;

private Serde<K> keySerde;
private Serde<Change<V>> valueSerde;
private FullChangeSerde<V> valueSerde;

public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
final String storeName,
final Serde<K> keySerde,
final FullChangeSerde<V> valueSerde) {
this.storeName = storeName;
requireNonNull(suppress);
this.keySerde = keySerde;
this.valueSerde = valueSerde;
maxRecords = suppress.getBufferConfig().maxRecords();
maxBytes = suppress.getBufferConfig().maxBytes();
suppressDurationMillis = suppress.getTimeToWaitForMoreEvents().toMillis();
buffer = new InMemoryTimeOrderedKeyValueBuffer();
bufferTimeDefinition = suppress.getTimeDefinition();
bufferFullStrategy = suppress.getBufferConfig().bufferFullStrategy();
shouldSuppressTombstones = suppress.shouldSuppressTombstones();
Expand All @@ -63,8 +68,9 @@ public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
@Override
public void init(final ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
this.keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
this.valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName));
}

@Override
Expand Down

This file was deleted.

0 comments on commit 4ef08fc

Please sign in to comment.