Skip to content

Commit

Permalink
KAFKA-14834: [11/N] Update table joins to identify out-of-order recor…
Browse files Browse the repository at this point in the history
…ds with `isLatest` (#13609)

This PR fixes a bug in the table-table join handling of out-of-order records in versioned tables where if the latest value for a particular key is a tombstone, by using the isLatest value from the Change object instead of calling get(key) on the state store to fetch timestamps to compare against. As part of this fix, this PR also updates table-table joins to determine whether upstream tables are versioned by using the GraphNode mechanism, instead of checking the table's value getter.

Part of KIP-914.

Reviewer: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
vcrfxia authored and mjsax committed Apr 19, 2023
1 parent 34f037c commit 6cf1010
Show file tree
Hide file tree
Showing 25 changed files with 730 additions and 743 deletions.
Expand Up @@ -620,11 +620,17 @@ private <K, V> GroupedInternal<K, V> getRepartitionSerdes(final Collection<Optim
}

private void enableVersionedSemantics() {
versionedSemanticsNodes.forEach(node -> ((VersionedSemanticsGraphNode) node).enableVersionedSemantics(isVersionedUpstream(node)));
versionedSemanticsNodes.forEach(node -> {
for (final GraphNode parentNode : node.parentNodes()) {
if (isVersionedOrVersionedUpstream(parentNode)) {
((VersionedSemanticsGraphNode) node).enableVersionedSemantics(true, parentNode.nodeName());
}
}
});
tableSuppressNodesNodes.forEach(node -> {
if (isVersionedUpstream(node)) {
throw new TopologyException("suppress() is only supported for non-versioned KTables " +
"(note that version semantics might be inherited from upstream)");
"(note that version semantics might be inherited from upstream)");
}
});
}
Expand Down
Expand Up @@ -48,6 +48,8 @@
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionReceiveProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde;
import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionSendNode;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
Expand Down Expand Up @@ -804,6 +806,7 @@ private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other,
kTableKTableJoinNode.setOutputVersioned(isOutputVersioned);

builder.addGraphNode(this.graphNode, kTableKTableJoinNode);
builder.addGraphNode(((KTableImpl<?, ?, ?>) other).graphNode, kTableKTableJoinNode);

// we can inherit parent key serde if user do not provide specific overrides
return new KTableImpl<K, Change<VR>, VR>(
Expand Down Expand Up @@ -1098,7 +1101,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
//not be done needlessly.
((KTableImpl<?, ?, ?>) foreignKeyTable).enableSendingOldValues(true);

//Old values must be sent such that the ForeignJoinSubscriptionSendProcessorSupplier can propagate deletions to the correct node.
//Old values must be sent such that the SubscriptionSendProcessorSupplier can propagate deletions to the correct node.
//This occurs whenever the extracted foreignKey changes values.
enableSendingOldValues(true);

Expand Down Expand Up @@ -1138,22 +1141,18 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
keySerde
);

final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final StatefulProcessorNode<K, Change<V>> subscriptionSendNode = new StatefulProcessorNode<>(
final ProcessorGraphNode<K, Change<V>> subscriptionSendNode = new ForeignJoinSubscriptionSendNode<>(
new ProcessorParameters<>(
new SubscriptionSendProcessorSupplier<>(
foreignKeyExtractor,
subscriptionForeignKeySerdePseudoTopic,
valueHashSerdePseudoTopic,
foreignKeySerde,
valueSerde == null ? null : valueSerde.serializer(),
leftJoin,
primaryKeyValueGetter
leftJoin
),
renamed.suffixWithOrElseGet("-subscription-registration-processor", builder, SUBSCRIPTION_REGISTRATION)
),
Collections.emptySet(),
Collections.singleton(primaryKeyValueGetter)
)
);
builder.addGraphNode(graphNode, subscriptionSendNode);

Expand Down Expand Up @@ -1219,13 +1218,13 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
);
builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);

final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new StatefulProcessorNode<>(
final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>(
new ProcessorParameters<>(
new ForeignTableJoinProcessorSupplier<>(subscriptionStore, combinedKeySchema, foreignKeyValueGetter),
new ForeignTableJoinProcessorSupplier<>(subscriptionStore, combinedKeySchema),
renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR)
),
Collections.singleton(subscriptionStore),
Collections.singleton(foreignKeyValueGetter)
Collections.emptySet()
);
builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignTableJoinNode);

Expand Down Expand Up @@ -1259,6 +1258,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
resultSourceNodes.add(foreignResponseSource.nodeName());
builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);

final KTableValueGetterSupplier<K, V> primaryKeyValueGetter = valueGetterSupplier();
final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>> responseJoinNode = new StatefulProcessorNode<>(
new ProcessorParameters<>(
new ResponseJoinProcessorSupplier<>(
Expand Down
Expand Up @@ -18,17 +18,16 @@

import org.apache.kafka.streams.kstream.ValueJoiner;

abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
public abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
KTableProcessorSupplier<K, V1, K, VOut> {

private final KTableImpl<K, ?, V1> table1;
private final KTableImpl<K, ?, V2> table2;
final KTableValueGetterSupplier<K, V1> valueGetterSupplier1;
final KTableValueGetterSupplier<K, V2> valueGetterSupplier2;
final KTableValueGetter<K, V1> valueGetter1;
final KTableValueGetter<K, V2> valueGetter2;
final ValueJoiner<? super V1, ? super V2, ? extends VOut> joiner;

boolean useVersionedSemantics = false;
boolean sendOldValues = false;

KTableKTableAbstractJoin(final KTableImpl<K, ?, V1> table1,
Expand All @@ -38,8 +37,6 @@ abstract class KTableKTableAbstractJoin<K, V1, V2, VOut> implements
this.table2 = table2;
this.valueGetterSupplier1 = table1.valueGetterSupplier();
this.valueGetterSupplier2 = table2.valueGetterSupplier();
this.valueGetter1 = valueGetterSupplier1.get();
this.valueGetter2 = valueGetterSupplier2.get();
this.joiner = joiner;
}

Expand All @@ -51,4 +48,17 @@ public final boolean enableSendingOldValues(final boolean forceMaterialization)
sendOldValues = true;
return true;
}

public void setUseVersionedSemantics(final boolean useVersionedSemantics) {
this.useVersionedSemantics = useVersionedSemantics;
}

// VisibleForTesting
public boolean isUseVersionedSemantics() {
return useVersionedSemantics;
}

public String joinThisParentNodeName() {
return table1.graphNode.nodeName();
}
}
Expand Up @@ -45,7 +45,7 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,

@Override
public Processor<K, Change<V1>, K, Change<VOut>> get() {
return new KTableKTableJoinProcessor(valueGetter1, valueGetter2);
return new KTableKTableJoinProcessor(valueGetterSupplier2.get());
}

@Override
Expand All @@ -67,14 +67,11 @@ public KTableValueGetter<K, VOut> get() {

private class KTableKTableJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {

private final KTableValueGetter<K, V1> thisValueGetter;
private final KTableValueGetter<K, V2> otherValueGetter;
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;

KTableKTableJoinProcessor(final KTableValueGetter<K, V1> thisValueGetter,
final KTableValueGetter<K, V2> otherValueGetter) {
this.thisValueGetter = thisValueGetter;
this.otherValueGetter = otherValueGetter;
KTableKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}

@Override
Expand All @@ -85,8 +82,7 @@ public void init(final ProcessorContext<K, Change<VOut>> context) {
context.taskId().toString(),
(StreamsMetricsImpl) context.metrics()
);
thisValueGetter.init(context);
otherValueGetter.init(context);
valueGetter.init(context);
}

@Override
Expand All @@ -110,20 +106,17 @@ public void process(final Record<K, Change<V1>> record) {
}

// drop out-of-order records from versioned tables (cf. KIP-914)
if (thisValueGetter.isVersioned()) {
final ValueAndTimestamp<V1> valueAndTimestampLeft = thisValueGetter.get(record.key());
if (valueAndTimestampLeft != null && valueAndTimestampLeft.timestamp() > record.timestamp()) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}
if (useVersionedSemantics && !record.value().isLatest) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}

VOut newValue = null;
final long resultTimestamp;
VOut oldValue = null;

final ValueAndTimestamp<V2> valueAndTimestampRight = otherValueGetter.get(record.key());
final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(record.key());
final V2 valueRight = getValueOrNull(valueAndTimestampRight);
if (valueRight == null) {
return;
Expand All @@ -144,8 +137,7 @@ public void process(final Record<K, Change<V1>> record) {

@Override
public void close() {
thisValueGetter.close();
otherValueGetter.close();
valueGetter.close();
}
}

Expand Down
Expand Up @@ -43,7 +43,7 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,

@Override
public Processor<K, Change<V1>, K, Change<VOut>> get() {
return new KTableKTableLeftJoinProcessor(valueGetter1, valueGetter2);
return new KTableKTableLeftJoinProcessor(valueGetterSupplier2.get());
}

@Override
Expand All @@ -66,14 +66,11 @@ public KTableValueGetter<K, VOut> get() {

private class KTableKTableLeftJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {

private final KTableValueGetter<K, V1> thisValueGetter;
private final KTableValueGetter<K, V2> otherValueGetter;
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;

KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V1> thisValueGetter,
final KTableValueGetter<K, V2> otherValueGetter) {
this.thisValueGetter = thisValueGetter;
this.otherValueGetter = otherValueGetter;
KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}

@Override
Expand All @@ -84,8 +81,7 @@ public void init(final ProcessorContext<K, Change<VOut>> context) {
context.taskId().toString(),
(StreamsMetricsImpl) context.metrics()
);
thisValueGetter.init(context);
otherValueGetter.init(context);
valueGetter.init(context);
}

@Override
Expand All @@ -109,20 +105,17 @@ public void process(final Record<K, Change<V1>> record) {
}

// drop out-of-order records from versioned tables (cf. KIP-914)
if (thisValueGetter.isVersioned()) {
final ValueAndTimestamp<V1> valueAndTimestampLeft = thisValueGetter.get(record.key());
if (valueAndTimestampLeft != null && valueAndTimestampLeft.timestamp() > record.timestamp()) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}
if (useVersionedSemantics && !record.value().isLatest) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}

VOut newValue = null;
final long resultTimestamp;
VOut oldValue = null;

final ValueAndTimestamp<V2> valueAndTimestampRight = otherValueGetter.get(record.key());
final ValueAndTimestamp<V2> valueAndTimestampRight = valueGetter.get(record.key());
final V2 value2 = getValueOrNull(valueAndTimestampRight);
final long timestampRight;

Expand Down Expand Up @@ -150,8 +143,7 @@ public void process(final Record<K, Change<V1>> record) {

@Override
public void close() {
thisValueGetter.close();
otherValueGetter.close();
valueGetter.close();
}
}

Expand Down
Expand Up @@ -43,7 +43,7 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends KTableKTableAbstractJoin<K,

@Override
public Processor<K, Change<V1>, K, Change<VOut>> get() {
return new KTableKTableOuterJoinProcessor(valueGetter1, valueGetter2);
return new KTableKTableOuterJoinProcessor(valueGetterSupplier2.get());
}

@Override
Expand All @@ -65,14 +65,11 @@ public KTableValueGetter<K, VOut> get() {

private class KTableKTableOuterJoinProcessor extends ContextualProcessor<K, Change<V1>, K, Change<VOut>> {

private final KTableValueGetter<K, V2> otherValueGetter;
private final KTableValueGetter<K, V1> thisValueGetter;
private final KTableValueGetter<K, V2> valueGetter;
private Sensor droppedRecordsSensor;

KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V1> thisValueGetter,
final KTableValueGetter<K, V2> otherValueGetter) {
this.thisValueGetter = thisValueGetter;
this.otherValueGetter = otherValueGetter;
KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
this.valueGetter = valueGetter;
}

@Override
Expand All @@ -83,8 +80,7 @@ public void init(final ProcessorContext<K, Change<VOut>> context) {
context.taskId().toString(),
(StreamsMetricsImpl) context.metrics()
);
thisValueGetter.init(context);
otherValueGetter.init(context);
valueGetter.init(context);
}

@Override
Expand All @@ -108,20 +104,17 @@ public void process(final Record<K, Change<V1>> record) {
}

// drop out-of-order records from versioned tables (cf. KIP-914)
if (thisValueGetter.isVersioned()) {
final ValueAndTimestamp<V1> valueAndTimestamp1 = thisValueGetter.get(record.key());
if (valueAndTimestamp1 != null && valueAndTimestamp1.timestamp() > record.timestamp()) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}
if (useVersionedSemantics && !record.value().isLatest) {
LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
droppedRecordsSensor.record();
return;
}

VOut newValue = null;
final long resultTimestamp;
VOut oldValue = null;

final ValueAndTimestamp<V2> valueAndTimestamp2 = otherValueGetter.get(record.key());
final ValueAndTimestamp<V2> valueAndTimestamp2 = valueGetter.get(record.key());
final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (value2 == null) {
if (record.value().newValue == null && record.value().oldValue == null) {
Expand All @@ -145,8 +138,7 @@ public void process(final Record<K, Change<V1>> record) {

@Override
public void close() {
thisValueGetter.close();
otherValueGetter.close();
valueGetter.close();
}
}

Expand Down

0 comments on commit 6cf1010

Please sign in to comment.