Skip to content

Commit

Permalink
KAFKA-14936: fix grace period partition issue (#14269)
Browse files Browse the repository at this point in the history
Move the store creation to builder pattern and recover mintimestamp

Reviewers: John Roesler<vvcephei@apache.org>, Bill Bejeck <bbejeck@gmail.com>
  • Loading branch information
wcarlson5 committed Aug 21, 2023
1 parent 0191745 commit bae7ea5
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@
import java.util.Set;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;

Expand Down Expand Up @@ -1232,7 +1229,7 @@ private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> g
leftJoin);
final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode =
new StreamTableJoinNode<>(name, processorParameters, new String[] {}, null, null);
new StreamTableJoinNode<>(name, processorParameters, new String[] {}, null, null, Optional.empty());

builder.addGraphNode(graphNode, streamTableJoinNode);

Expand Down Expand Up @@ -1262,32 +1259,31 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,

final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);

Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
Optional<String> bufferStoreName = Optional.empty();

if (joined.gracePeriod() != null) {
if (!((KTableImpl<K, ?, VO>) table).graphNode.isOutputVersioned().orElse(true)) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
final String bufferStoreName = name + "-Buffer";
final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();

buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name, true));
bufferStoreName = Optional.of(name + "-Buffer");
builder.addStateStore(new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joined.gracePeriod(), name));
}

final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
joiner,
leftJoin,
Optional.ofNullable(joined.gracePeriod()),
buffer);
bufferStoreName);

final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(
name,
processorParameters,
((KTableImpl<K, ?, VO>) table).valueGetterSupplier().storeNames(),
this.name,
joined.gracePeriod()
joined.gracePeriod(),
bufferStoreName
);

builder.addGraphNode(graphNode, streamTableJoinNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

import java.time.Duration;
import java.util.Optional;
Expand All @@ -32,24 +31,24 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K,
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner;
private final boolean leftJoin;
private final Optional<Duration> gracePeriod;
private final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>> buffer;
private final Optional<String> storeName;


KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>> buffer) {
final Optional<String> storeName) {
this.valueGetterSupplier = valueGetterSupplier;
this.joiner = joiner;
this.leftJoin = leftJoin;
this.gracePeriod = gracePeriod;
this.buffer = buffer;
this.storeName = storeName;
}

@Override
public Processor<K, V1, K, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, buffer);
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, storeName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.time.Duration;
import java.util.Optional;

import static java.util.Objects.requireNonNull;
import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
Expand All @@ -49,27 +50,25 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcess
private final boolean leftJoin;
private Sensor droppedRecordsSensor;
private final Optional<Duration> gracePeriod;
private final Optional<TimeOrderedKeyValueBuffer<K1, V1, V1>> buffer;
private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private InternalProcessorContext internalProcessorContext;
private final boolean useBuffer;
private final String storeName;

KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,
final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner,
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<TimeOrderedKeyValueBuffer<K1, V1, V1>> buffer) {
final Optional<String> storeName) {
this.valueGetter = valueGetter;
this.keyMapper = keyMapper;
this.joiner = joiner;
this.leftJoin = leftJoin;
this.useBuffer = buffer.isPresent();
if (gracePeriod.isPresent() ^ buffer.isPresent()) {
throw new IllegalArgumentException("Grace Period requires a buffer");
}
this.useBuffer = gracePeriod.isPresent();
this.gracePeriod = gracePeriod;
this.buffer = buffer;
this.storeName = storeName.orElse("");
}

@Override
Expand All @@ -83,9 +82,8 @@ public void init(final ProcessorContext<K1, VOut> context) {
if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}

buffer.get().setSerdesIfNull(new SerdeGetter(context));
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) context(), null);
buffer = requireNonNull(context.getStateStore(storeName));
buffer.setSerdesIfNull(new SerdeGetter(context));
}
}

Expand All @@ -100,10 +98,10 @@ public void process(final Record<K1, V1> record) {
if (!useBuffer) {
doJoin(record);
} else {
if (!buffer.get().put(observedStreamTime, record, internalProcessorContext.recordContext())) {
if (!buffer.put(observedStreamTime, record, internalProcessorContext.recordContext())) {
doJoin(record);
} else {
buffer.get().evictWhile(() -> true, this::emit);
buffer.evictWhile(() -> true, this::emit);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;

/**
* Represents a join between a KStream and a KTable or GlobalKTable
Expand All @@ -33,20 +34,23 @@ public class StreamTableJoinNode<K, V> extends GraphNode {
private final ProcessorParameters<K, V, ?, ?> processorParameters;
private final String otherJoinSideNodeName;
private final Duration gracePeriod;
private final Optional<String> bufferName;


public StreamTableJoinNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final String[] storeNames,
final String otherJoinSideNodeName,
final Duration gracePeriod) {
final Duration gracePeriod,
final Optional<String> bufferName) {
super(nodeName);

// in the case of Stream-Table join the state stores associated with the KTable
this.storeNames = storeNames;
this.processorParameters = processorParameters;
this.otherJoinSideNodeName = otherJoinSideNodeName;
this.gracePeriod = gracePeriod;
this.bufferName = bufferName;
}

@Override
Expand All @@ -69,6 +73,7 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
// Steam - KTable join only
if (otherJoinSideNodeName != null) {
topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
bufferName.ifPresent(s -> topologyBuilder.connectProcessorAndStateStores(processorName, s));
if (gracePeriod != null) {
for (final String storeName : storeNames) {
if (!topologyBuilder.isStoreVersioned(storeName)) {
Expand Down

0 comments on commit bae7ea5

Please sign in to comment.