Skip to content

Commit

Permalink
KAFKA-14936: Add Grace period logic to Stream Table Join (2/N) (#13855)
Browse files Browse the repository at this point in the history
This PR adds the interface for grace period to the Joined object as well as uses the buffer. The majority of it is tests and moving some of the existing join logic.

Reviewers: Victoria Xia <victoria.xia@confluent.io>, Bruno Cadonna <cadonna@apache.org>
  • Loading branch information
wcarlson5 committed Jun 29, 2023
1 parent 0054168 commit 12be344
Show file tree
Hide file tree
Showing 12 changed files with 455 additions and 40 deletions.
77 changes: 64 additions & 13 deletions streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.common.serialization.Serde;

import java.time.Duration;

/**
* The {@code Joined} class represents optional params that can be passed to
* {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} and
Expand All @@ -29,19 +31,22 @@ public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> {
protected final Serde<V> valueSerde;
protected final Serde<VO> otherValueSerde;
protected final String name;
protected final Duration gracePeriod;

private Joined(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde,
final String name) {
final String name,
final Duration gracePeriod) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.otherValueSerde = otherValueSerde;
this.name = name;
this.gracePeriod = gracePeriod;
}

protected Joined(final Joined<K, V, VO> joined) {
this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name);
this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name, joined.gracePeriod);
}

/**
Expand All @@ -59,7 +64,7 @@ protected Joined(final Joined<K, V, VO> joined) {
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, null);
return new Joined<>(keySerde, valueSerde, otherValueSerde, null, null);
}

/**
Expand All @@ -84,7 +89,34 @@ public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde,
final String name) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, null);
}

/**
* Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances.
* {@code null} values are accepted and will be replaced by the default serdes as defined in
* config.
*
* @param keySerde the key serde to use. If {@code null} the default key serde from config will be
* used
* @param valueSerde the value serde to use. If {@code null} the default value serde from config
* will be used
* @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde
* from config will be used
* @param name the name used as the base for naming components of the join including any
* repartition topics
* @param gracePeriod stream buffer time
* @param <K> key type
* @param <V> value type
* @param <VO> other value type
* @return new {@code Joined} instance with the provided serdes
*/
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde,
final String name,
final Duration gracePeriod) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
}

/**
Expand All @@ -98,7 +130,7 @@ public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
* @return new {@code Joined} instance configured with the keySerde
*/
public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde) {
return new Joined<>(keySerde, null, null, null);
return new Joined<>(keySerde, null, null, null, null);
}

/**
Expand All @@ -112,9 +144,10 @@ public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde) {
* @return new {@code Joined} instance configured with the valueSerde
*/
public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde) {
return new Joined<>(null, valueSerde, null, null);
return new Joined<>(null, valueSerde, null, null, null);
}


/**
* Create an instance of {@code Joined} with an other value {@link Serde}.
* {@code null} values are accepted and will be replaced by the default value serde as defined in config.
Expand All @@ -126,7 +159,7 @@ public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)
* @return new {@code Joined} instance configured with the otherValueSerde
*/
public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde) {
return new Joined<>(null, null, otherValueSerde, null);
return new Joined<>(null, null, otherValueSerde, null, null);
}

/**
Expand All @@ -142,10 +175,9 @@ public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherV
*
*/
public static <K, V, VO> Joined<K, V, VO> as(final String name) {
return new Joined<>(null, null, null, name);
return new Joined<>(null, null, null, name, null);
}


/**
* Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default
* key serde as defined in config
Expand All @@ -154,7 +186,7 @@ public static <K, V, VO> Joined<K, V, VO> as(final String name) {
* @return new {@code Joined} instance configured with the {@code name}
*/
public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
}

/**
Expand All @@ -165,7 +197,7 @@ public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
* @return new {@code Joined} instance configured with the {@code valueSerde}
*/
public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
}

/**
Expand All @@ -176,7 +208,7 @@ public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
* @return new {@code Joined} instance configured with the {@code valueSerde}
*/
public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
}

/**
Expand All @@ -189,7 +221,26 @@ public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) {
*/
@Override
public Joined<K, V, VO> withName(final String name) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
}

/**
* Set the grace period on the stream side of the join. Records will enter a buffer before being processed.
* Out of order records in the grace period will be processed in timestamp order. Late records, out of the
* grace period, will be executed right as they come in, if it is past the table history retention this could
* result in a null join. Long gaps in stream side arriving records will cause
* records to be delayed in processing.
*
*
* @param gracePeriod the duration of the grace period. Must be less than the joining table's history retention.
* @return new {@code Joined} instance configured with the gracePeriod
*/
public Joined<K, V, VO> withGracePeriod(final Duration gracePeriod) {
return new Joined<>(keySerde, valueSerde, otherValueSerde, name, gracePeriod);
}

public Duration gracePeriod() {
return gracePeriod;
}

public Serde<K> keySerde() {
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import java.util.Optional;

class KStreamGlobalKTableJoin<K1, V1, K2, V2, VOut> implements ProcessorSupplier<K1, V1, K1, VOut> {

Expand All @@ -40,6 +41,6 @@ class KStreamGlobalKTableJoin<K1, V1, K2, V2, VOut> implements ProcessorSupplier

@Override
public Processor<K1, V1, K1, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), mapper, joiner, leftJoin);
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), mapper, joiner, leftJoin, Optional.empty(), Optional.empty());
}
}
Expand Up @@ -72,8 +72,13 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;

Expand Down Expand Up @@ -1256,10 +1261,25 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
final NamedInternal renamed = new NamedInternal(joinedInternal.name());

final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);

Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();

if (joined.gracePeriod() != null) {
if (!((KTableImpl<K, ?, VO>) table).graphNode.isOutputVersioned().orElse(true)) {
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
final String bufferStoreName = name + "-Buffer";
final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();

buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, joined.gracePeriod(), name));
}

final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
joiner,
leftJoin);
leftJoin,
Optional.ofNullable(joined.gracePeriod()),
buffer);

final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>(
Expand Down
Expand Up @@ -20,25 +20,36 @@
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

import java.time.Duration;
import java.util.Optional;

class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {

private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner;
private final boolean leftJoin;
private final Optional<Duration> gracePeriod;
private final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>> buffer;


KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> valueGetterSupplier,
final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> joiner,
final boolean leftJoin) {
final boolean leftJoin,
final Optional<Duration> gracePeriod,
final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>> buffer) {
this.valueGetterSupplier = valueGetterSupplier;
this.joiner = joiner;
this.leftJoin = leftJoin;
this.gracePeriod = gracePeriod;
this.buffer = buffer;
}

@Override
public Processor<K, V1, K, VOut> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin);
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), keyValueMapper, joiner, leftJoin, gracePeriod, buffer);
}

}

0 comments on commit 12be344

Please sign in to comment.