diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java index 0fc273ca684fd..31c0bc4074feb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java @@ -187,9 +187,6 @@ protected Transformation createSinkTransformation( Optional lineageVertexOpt = TableLineageUtils.extractLineageDataset(outputObject); - // only add materialization if input has change - final boolean needMaterialization = !inputInsertOnly && upsertMaterialize; - Transformation sinkTransform = applyConstraintValidations(inputTransform, config, persistedRowType); @@ -202,10 +199,10 @@ protected Transformation createSinkTransformation( primaryKeys, sinkParallelism, inputParallelism, - needMaterialization); + upsertMaterialize); } - if (needMaterialization) { + if (upsertMaterialize) { sinkTransform = applyUpsertMaterialize( sinkTransform, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index 1726105dee87e..aaeb7d009682f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -26,8 +26,8 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter; import org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy; @@ -55,6 +55,8 @@ import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer; import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2; +import org.apache.flink.table.runtime.operators.sink.WatermarkCompactingSinkMaterializer; +import org.apache.flink.table.runtime.operators.sink.WatermarkTimestampAssigner; import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.TypeCheckUtils; @@ -100,6 +102,7 @@ producedTransformations = { CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION, CommonExecSink.PARTITIONER_TRANSFORMATION, + StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION, CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION, CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION, CommonExecSink.SINK_TRANSFORMATION @@ -124,6 +127,7 @@ producedTransformations = { CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION, CommonExecSink.PARTITIONER_TRANSFORMATION, + 
StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION, CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION, CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION, CommonExecSink.SINK_TRANSFORMATION @@ -133,6 +137,9 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode { private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class); + public static final String WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION = + "watermark-timestamp-assigner"; + public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode"; public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize"; public static final String FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY = "upsertMaterializeStrategy"; @@ -237,17 +244,6 @@ public StreamExecSink( @Override protected Transformation translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { - // TODO: FLINK-38928 Remove this validation once runtime support for ERROR and NOTHING is - // implemented - if (InsertConflictStrategy.error().equals(conflictStrategy) - || InsertConflictStrategy.nothing().equals(conflictStrategy)) { - throw new ValidationException( - "ON CONFLICT DO " - + conflictStrategy - + " is not yet supported. " - + "Please use ON CONFLICT DO DEDUPLICATE instead."); - } - final ExecEdge inputEdge = getInputEdges().get(0); final Transformation inputTransform = (Transformation) inputEdge.translateToPlan(planner); @@ -358,9 +354,29 @@ protected Transformation applyUpsertMaterialize( .mapToObj(idx -> fieldNames[idx]) .collect(Collectors.toList()); + // For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first + // This assigns the current watermark as the timestamp to each record, + // which is required for the WatermarkCompactingSinkMaterializer to work correctly + Transformation transformForMaterializer = inputTransform; + if (isErrorOrNothingConflictStrategy()) { + // Use input parallelism to preserve watermark semantics + transformForMaterializer = + ExecNodeUtil.createOneInputTransformation( + inputTransform, + createTransformationMeta( + WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION, + "WatermarkTimestampAssigner", + "WatermarkTimestampAssigner", + config), + new WatermarkTimestampAssigner(), + inputTransform.getOutputType(), + inputTransform.getParallelism(), + false); + } + OneInputTransformation materializeTransform = ExecNodeUtil.createOneInputTransformation( - inputTransform, + transformForMaterializer, createTransformationMeta( UPSERT_MATERIALIZE_TRANSFORMATION, String.format( @@ -390,6 +406,17 @@ private OneInputStreamOperator createSumOperator( GeneratedRecordEqualiser rowEqualiser, GeneratedHashFunction rowHashFunction) { + // Check if we should use the watermark-compacting materializer for ERROR/NOTHING strategies + if (isErrorOrNothingConflictStrategy()) { + return WatermarkCompactingSinkMaterializer.create( + conflictStrategy, + physicalRowType, + rowEqualiser, + upsertKeyEqualiser, + inputUpsertKey); + } + + // Use existing logic for DEDUPLICATE (legacy behavior) SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy = Optional.ofNullable(upsertMaterializeStrategy) .orElse(SinkUpsertMaterializeStrategy.LEGACY); @@ -415,6 +442,12 @@ private OneInputStreamOperator createSumOperator( config)); } + private boolean isErrorOrNothingConflictStrategy() { + return conflictStrategy != null + && (conflictStrategy.getBehavior() == ConflictBehavior.ERROR + || conflictStrategy.getBehavior() == ConflictBehavior.NOTHING); + } + private static 
SequencedMultiSetStateConfig createStateConfig( SinkUpsertMaterializeStrategy strategy, TimeDomain ttlTimeDomain, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala index c77d8312d523b..cf2a7d53c9e71 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.stream import org.apache.flink.table.api.InsertConflictStrategy +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior import org.apache.flink.table.api.config.ExecutionConfigOptions.{SinkUpsertMaterializeStrategy, TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY} import org.apache.flink.table.catalog.ContextResolvedTable import org.apache.flink.table.connector.sink.DynamicTableSink @@ -34,9 +35,12 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.{RelNode, RelWriter} import org.apache.calcite.rel.hint.RelHint +import org.apache.calcite.util.ImmutableBitSet import java.util +import scala.collection.JavaConversions._ + /** * Stream physical RelNode to write data into an external sink defined by a [[DynamicTableSink]]. */ @@ -134,4 +138,35 @@ class StreamPhysicalSink( .itemIf("upsertMaterialize", "true", upsertMaterialize) .itemIf("conflictStrategy", conflictStrategy, conflictStrategy != null) } + + def isDeduplicateConflictStrategy: Boolean = { + conflictStrategy != null && conflictStrategy.getBehavior == ConflictBehavior.DEDUPLICATE + } + + def primaryKeysContainsUpsertKey: Boolean = { + val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes + val pks = ImmutableBitSet.of(primaryKeys: _*) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery) + val changeLogUpsertKeys = fmq.getUpsertKeys(getInput) + changeLogUpsertKeys != null && changeLogUpsertKeys.exists(pks.contains) + } + + def getUpsertKeyNames: String = { + val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery) + val changeLogUpsertKeys = fmq.getUpsertKeys(getInput) + if (changeLogUpsertKeys == null) { + "none" + } else { + val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames + changeLogUpsertKeys + .map(uk => uk.toArray.map(fieldNames.get).mkString("[", ", ", "]")) + .mkString(", ") + } + } + + def getPrimaryKeyNames: String = { + val fieldNames = contextResolvedTable.getResolvedSchema.getColumnNames + val primaryKeys = contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes + primaryKeys.map(fieldNames.get).mkString("[", ", ", "]") + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index baa9b25db025e..9cc84db2b7bae 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -1060,38 +1060,30 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
       case UpsertMaterialize.NONE => false
       case UpsertMaterialize.AUTO =>
-        if (
-          (inputIsAppend && InsertConflictStrategy
-            .deduplicate()
-            .equals(sink.conflictStrategy)) || sinkIsAppend || sinkIsRetract
-        ) {
+        // if the sink is not an UPSERT sink (has no PK, or is an APPEND or RETRACT sink)
+        // we don't need to materialize results
+        if (primaryKeys.isEmpty || sinkIsAppend || sinkIsRetract) {
           return false
         }
-        if (primaryKeys.isEmpty) {
+
+        // For a DEDUPLICATE strategy and INSERT-only input, we simply let the inserts be
+        // handled as UPSERT_AFTER and overwrite the previous value
+        if (inputIsAppend && sink.isDeduplicateConflictStrategy) {
           return false
         }
-        val pks = ImmutableBitSet.of(primaryKeys: _*)
-        val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
-        val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
-        // if input has updates and primary key != upsert key (upsert key can be null) we should
-        // enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink
-        // pk(s) contains input changeLogUpsertKeys
-        val upsertKeyDiffersFromPk =
-          changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)
+
+        // if the input has updates and the primary key != upsert key, we should enable
+        // upsertMaterialize.
+        //
+        // An optimization: do not enable upsertMaterialize when the sink pk(s) contains the
+        // input changeLogUpsertKeys
+        val upsertKeyDiffersFromPk = !sink.primaryKeysContainsUpsertKey

         // Validate that ON CONFLICT is specified when upsert key differs from primary key
         val requireOnConflict =
           tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT)
         if (requireOnConflict && upsertKeyDiffersFromPk && sink.conflictStrategy == null) {
-          val fieldNames = sink.contextResolvedTable.getResolvedSchema.getColumnNames
-          val pkNames = primaryKeys.map(fieldNames.get(_)).mkString("[", ", ", "]")
-          val upsertKeyNames = if (changeLogUpsertKeys == null) {
-            "none"
-          } else {
-            changeLogUpsertKeys
-              .map(uk => uk.toArray.map(fieldNames.get(_)).mkString("[", ", ", "]"))
-              .mkString(", ")
-          }
+          val pkNames = sink.getPrimaryKeyNames
+          val upsertKeyNames = sink.getUpsertKeyNames
           throw new ValidationException(
             "The query has an upsert key that differs from the primary key of the sink table " +
               s"'${sink.contextResolvedTable.getIdentifier.asSummaryString}'. 
" + diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java index 7eaa6c06c7f33..6dba8353f0296 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java @@ -18,21 +18,48 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; +import org.apache.flink.table.test.program.SinkTestStep; import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.table.test.program.TestStep; +import org.apache.flink.table.test.program.TestStep.TestKind; +import java.util.EnumSet; import java.util.List; +import java.util.Map; /** Semantic tests for {@link StreamExecSink}. */ public class SinkSemanticTests extends SemanticTestBase { + @Override + public EnumSet supportedSetupSteps() { + EnumSet steps = super.supportedSetupSteps(); + steps.add(TestKind.SINK_WITHOUT_DATA); + return steps; + } + + @Override + protected void runStep(TestStep testStep, TableEnvironment env) throws Exception { + if (testStep.getKind() == TestKind.SINK_WITHOUT_DATA) { + final SinkTestStep sinkTestStep = (SinkTestStep) testStep; + sinkTestStep.apply( + env, + Map.ofEntries( + Map.entry("connector", "values"), + Map.entry("sink-insert-only", "false"))); + } else { + super.runStep(testStep, env); + } + } + @Override public List programs() { return List.of( SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK, SinkTestPrograms.INSERT_RETRACT_WITH_PK, - SinkTestPrograms.ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED, - SinkTestPrograms.ON_CONFLICT_DO_ERROR_NOT_SUPPORTED, + SinkTestPrograms.ON_CONFLICT_DO_NOTHING_KEEPS_FIRST, + SinkTestPrograms.ON_CONFLICT_DO_ERROR_NO_CONFLICT, SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT, SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITH_ON_CONFLICT, SinkTestPrograms.UPSERT_KEY_MATCHES_PK_WITHOUT_ON_CONFLICT, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java index c59fb88521d8d..6ddc55cc18b90 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java @@ -89,46 +89,47 @@ public class SinkTestPrograms { "INSERT INTO sink_t SELECT UPPER(name), SUM(score) FROM source_t GROUP BY name") .build(); - // --- ON CONFLICT validation tests --- + // --- ON CONFLICT tests --- - public static final TableTestProgram ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED = + public static final TableTestProgram ON_CONFLICT_DO_NOTHING_KEEPS_FIRST = TableTestProgram.of( - "sink-on-conflict-do-nothing-not-supported", - "ON CONFLICT DO NOTHING is not yet supported and should throw ValidationException.") + "sink-on-conflict-do-nothing-keeps-first", + "ON CONFLICT DO NOTHING keeps the first record when multiple records have the same PK.") .setupTableSource( 
SourceTestStep.newBuilder("source_t") .addSchema("a INT", "b BIGINT") .addOption("changelog-mode", "I") - .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L)) + .producedValues( + Row.ofKind(RowKind.INSERT, 1, 10L), + Row.ofKind(RowKind.INSERT, 1, 20L), + Row.ofKind(RowKind.INSERT, 2, 30L)) .build()) .setupTableSink( SinkTestStep.newBuilder("sink_t") .addSchema("a INT PRIMARY KEY NOT ENFORCED", "b BIGINT") + .consumedValues("+I[1, 10]", "+I[2, 30]") .build()) - .runFailingSql( - "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING", - ValidationException.class, - "ON CONFLICT DO NOTHING is not yet supported") + .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING") .build(); - public static final TableTestProgram ON_CONFLICT_DO_ERROR_NOT_SUPPORTED = + public static final TableTestProgram ON_CONFLICT_DO_ERROR_NO_CONFLICT = TableTestProgram.of( - "sink-on-conflict-do-error-not-supported", - "ON CONFLICT DO ERROR is not yet supported and should throw ValidationException.") + "sink-on-conflict-do-error-no-conflict", + "ON CONFLICT DO ERROR with no conflicts passes through all records.") .setupTableSource( SourceTestStep.newBuilder("source_t") .addSchema("a INT", "b BIGINT") .addOption("changelog-mode", "I") - .producedValues(Row.ofKind(RowKind.INSERT, 1, 1L)) + .producedValues( + Row.ofKind(RowKind.INSERT, 1, 10L), + Row.ofKind(RowKind.INSERT, 2, 20L)) .build()) .setupTableSink( SinkTestStep.newBuilder("sink_t") .addSchema("a INT PRIMARY KEY NOT ENFORCED", "b BIGINT") + .consumedValues("+I[1, 10]", "+I[2, 20]") .build()) - .runFailingSql( - "INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR", - ValidationException.class, - "ON CONFLICT DO ERROR is not yet supported") + .runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR") .build(); public static final TableTestProgram UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT = diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java new file mode 100644 index 0000000000000..4a31a1d2da1e7 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SortedLongSerializer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.MathUtils; + +import java.io.IOException; + +/** + * A serializer for {@code Long} that produces a lexicographically sortable byte representation. + * + *

Standard big-endian long serialization does not maintain numeric ordering for negative values + * in lexicographic byte comparison because negative numbers have their sign bit set (1), which + * makes them appear greater than positive numbers (sign bit 0) in unsigned byte comparison. + * + *

This serializer flips the sign bit during serialization, converting from signed to unsigned + * ordering. This ensures that when iterating over keys in a sorted state backend (like RocksDB), + * the entries are returned in numeric order. + * + * @see org.apache.flink.streaming.api.operators.TimerSerializer + */ +@Internal +public final class SortedLongSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + /** Sharable instance of the SortedLongSerializer. */ + public static final SortedLongSerializer INSTANCE = new SortedLongSerializer(); + + private static final Long ZERO = 0L; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public Long createInstance() { + return ZERO; + } + + @Override + public Long copy(Long from) { + return from; + } + + @Override + public Long copy(Long from, Long reuse) { + return from; + } + + @Override + public int getLength() { + return Long.BYTES; + } + + @Override + public void serialize(Long record, DataOutputView target) throws IOException { + target.writeLong(MathUtils.flipSignBit(record)); + } + + @Override + public Long deserialize(DataInputView source) throws IOException { + return MathUtils.flipSignBit(source.readLong()); + } + + @Override + public Long deserialize(Long reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + target.writeLong(source.readLong()); + } + + @Override + public TypeSerializerSnapshot snapshotConfiguration() { + return new SortedLongSerializerSnapshot(); + } + + // ------------------------------------------------------------------------ + + /** Serializer configuration snapshot for compatibility and format evolution. */ + @SuppressWarnings("WeakerAccess") + public static final class SortedLongSerializerSnapshot + extends SimpleTypeSerializerSnapshot { + + public SortedLongSerializerSnapshot() { + super(() -> INSTANCE); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java new file mode 100644 index 0000000000000..43d41d82b6739 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; + +/** + * A sink materializer that buffers records and compacts them on watermark progression. + * + *

This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will result in an incorrect result. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+
+    private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
+    private transient MapState<Long, List<RowData>> buffer;
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData instances for comparing upsert keys when hasUpsertKey is true.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.bufferDescriptor =
+                new MapStateDescriptor<>(
+                        "watermark-buffer",
+                        SortedLongSerializer.INSTANCE,
+                        new ListSerializer<>(serializer));
+        this.buffer = context.getKeyedStateStore().getMapState(bufferDescriptor);
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        if (context.isRestored()) {
+            // Detect ordered state backend before consolidation
+            detectOrderedStateBackend();
+
+            // Consolidate all buffered records to MIN_VALUE for each key.
+            // This ensures they are compacted on the first watermark after restore.
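            // [Editor's note, not part of the patch] Why MIN_VALUE: after a restore the
            // watermark timeline restarts from Long.MIN_VALUE, so pre-checkpoint bucket
            // timestamps are meaningless on the new timeline. Hypothetical buffer for one key:
            //     before checkpoint: {1000 -> [+I(a)], 2000 -> [+U(b)]}
            //     after restore:     {Long.MIN_VALUE -> [+I(a), +U(b)]}   (order preserved)
            // The first post-restore watermark, whatever its value, then flushes the whole
            // list; without consolidation the 2000 bucket could linger until the new timeline
            // happened to pass 2000.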
+            getKeyedStateBackend()
+                    .applyToAllKeys(
+                            VoidNamespace.INSTANCE,
+                            VoidNamespaceSerializer.INSTANCE,
+                            bufferDescriptor,
+                            (key, state) -> consolidateBufferToMinValue());
+        }
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+        // checkpointed before restore. Assign to MIN_VALUE.
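        // [Editor's note, not part of the patch] This guards the in-flight path: with
        // unaligned checkpoints, a record stamped (say) t=5000 by WatermarkTimestampAssigner
        // can be replayed from channel state after restore, before any watermark has been
        // received on the new timeline. Left at t=5000 it would wait in the buffer until the
        // new timeline happened to pass 5000; re-bucketed to MIN_VALUE it is flushed by the
        // first watermark, matching the state consolidation in initializeState(). The t=5000
        // value is illustrative only.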
+ if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) { + assignedTimestamp = Long.MIN_VALUE; + } + + bufferRecord(assignedTimestamp, row); + } + + private void bufferRecord(long timestamp, RowData row) throws Exception { + List records = buffer.get(timestamp); + if (records == null) { + records = new ArrayList<>(); + } + switch (row.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + // Try to cancel out a pending retraction; if none, just append + if (!tryCancelRetraction(records, row)) { + records.add(row); + } + break; + case UPDATE_BEFORE: + case DELETE: + // Try to cancel out an existing addition; if none, keep for cross-bucket + if (!tryCancelAddition(records, row)) { + records.add(row); + } + break; + } + buffer.put(timestamp, records); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + final long watermarkTimestamp = mark.getTimestamp(); + this.currentWatermark = watermarkTimestamp; + + // Iterate over all keys and compact their buffered records + this.getKeyedStateBackend() + .applyToAllKeys( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + bufferDescriptor, + (key, state) -> compactAndEmit(watermarkTimestamp)); + + super.processWatermark(mark); + } + + private void compactAndEmit(long newWatermark) throws Exception { + RowData previousValue = currentValue.value(); + List pendingRecords = collectPendingRecords(previousValue, newWatermark); + + if (pendingRecords.size() > 1) { + if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) { + throw new TableRuntimeException( + "Primary key constraint violation: multiple distinct records with " + + "the same primary key detected. Use ON CONFLICT DO NOTHING " + + "to keep the first record."); + } else if (previousValue == null) { + final RowData newValue = pendingRecords.get(0); + emit(newValue, INSERT); + currentValue.update(newValue); + } + } else if (pendingRecords.isEmpty()) { + if (previousValue != null) { + emit(previousValue, DELETE); + currentValue.clear(); + } + } else { + final RowData newValue = pendingRecords.get(0); + if (previousValue == null) { + emit(newValue, INSERT); + currentValue.update(newValue); + } else if (!recordEquals(previousValue, newValue)) { + emit(newValue, UPDATE_AFTER); + currentValue.update(newValue); + } + } + } + + private List collectPendingRecords(RowData previousValue, long newWatermark) + throws Exception { + List records = new ArrayList<>(); + if (previousValue != null) { + records.add(previousValue); + } + Iterator>> iterator = buffer.entries().iterator(); + + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + if (entry.getKey() <= newWatermark) { + for (RowData pendingRecord : entry.getValue()) { + switch (pendingRecord.getRowKind()) { + case INSERT: + case UPDATE_AFTER: + addRow(records, pendingRecord); + break; + + case UPDATE_BEFORE: + case DELETE: + retractRow(records, pendingRecord); + break; + } + } + iterator.remove(); + } else if (isOrderedStateBackend) { + break; + } + } + return records; + } + + private void addRow(List values, RowData add) { + if (hasUpsertKey) { + int index = findFirst(values, add); + if (index == -1) { + values.add(add); + } else { + values.set(index, add); + } + } else { + values.add(add); + } + } + + private void retractRow(List values, RowData retract) { + final int index = findFirst(values, retract); + if (index == -1) { + LOG.info(STATE_CLEARED_WARN_MSG); + } else { + // Remove first found row + values.remove(index); + } + } + + /** + * Attempts to cancel out a 
retraction by finding a matching retractive record + * (DELETE/UPDATE_BEFORE) with identical content. + * + * @return true if a matching retraction was found and removed, false otherwise + */ + private boolean tryCancelRetraction(List values, RowData addition) { + final Iterator iterator = values.iterator(); + while (iterator.hasNext()) { + RowData candidate = iterator.next(); + RowKind kind = candidate.getRowKind(); + if ((kind == DELETE || kind == RowKind.UPDATE_BEFORE) + && recordEquals(addition, candidate)) { + iterator.remove(); + return true; + } + } + return false; + } + + /** + * Attempts to cancel out an addition by finding a matching additive record + * (INSERT/UPDATE_AFTER) with identical content. + * + * @return true if a matching addition was found and removed, false otherwise + */ + private boolean tryCancelAddition(List values, RowData retraction) { + final Iterator iterator = values.iterator(); + while (iterator.hasNext()) { + RowData candidate = iterator.next(); + RowKind kind = candidate.getRowKind(); + if ((kind == INSERT || kind == UPDATE_AFTER) && recordEquals(retraction, candidate)) { + iterator.remove(); + return true; + } + } + return false; + } + + private int findFirst(List values, RowData target) { + final Iterator iterator = values.iterator(); + int i = 0; + while (iterator.hasNext()) { + if (equalsIgnoreRowKind(target, iterator.next())) { + return i; + } + i++; + } + return -1; + } + + private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) { + newRow.setRowKind(oldRow.getRowKind()); + if (hasUpsertKey) { + return this.upsertKeyEqualiser.equals( + upsertKeyProjectedRow1.replaceRow(newRow), + upsertKeyProjectedRow2.replaceRow(oldRow)); + } + return equaliser.equals(newRow, oldRow); + } + + private void emit(RowData row, RowKind kind) { + RowKind originalKind = row.getRowKind(); + row.setRowKind(kind); + collector.collect(row); + row.setRowKind(originalKind); + } + + private boolean recordEquals(RowData row1, RowData row2) { + RowKind kind1 = row1.getRowKind(); + RowKind kind2 = row2.getRowKind(); + row1.setRowKind(RowKind.INSERT); + row2.setRowKind(RowKind.INSERT); + boolean result = equaliser.equals(row1, row2); + row1.setRowKind(kind1); + row2.setRowKind(kind2); + return result; + } + + /** Factory method to create a new instance. */ + public static WatermarkCompactingSinkMaterializer create( + InsertConflictStrategy conflictStrategy, + RowType physicalRowType, + GeneratedRecordEqualiser rowEqualiser, + @Nullable GeneratedRecordEqualiser upsertKeyEqualiser, + @Nullable int[] inputUpsertKey) { + return new WatermarkCompactingSinkMaterializer( + conflictStrategy, + InternalSerializers.create(physicalRowType), + rowEqualiser, + upsertKeyEqualiser, + inputUpsertKey); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java new file mode 100644 index 0000000000000..17598df521d4a --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkTimestampAssigner.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; + +/** + * Operator that assigns the current watermark as the timestamp to each incoming StreamRecord. + * + *

This is used in conjunction with {@link WatermarkCompactingSinkMaterializer} which buffers + * records by their timestamp. Without meaningful timestamps, all records would be buffered under + * the same key, breaking the watermark-based compaction logic. + * + *

If the current watermark is {@code Long.MIN_VALUE} (the initial state before any watermark
+ * arrives), records will be assigned that value and will be compacted when the first watermark
+ * arrives.
+ */
+@Internal
+public class WatermarkTimestampAssigner extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        element.setTimestamp(currentWatermark);
+        output.collect(element);
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
new file mode 100644
index 0000000000000..482a5e61ea2c0
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.delete; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insert; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfter; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBefore; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link WatermarkCompactingSinkMaterializer}. 
*/ +class WatermarkCompactingSinkMaterializerTest { + + private static final int PRIMARY_KEY_INDEX = 1; + + private static final LogicalType[] LOGICAL_TYPES = + new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; + + private static final GeneratedRecordEqualiser RECORD_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + }; + + private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestUpsertKeyEqualiser(); + } + }; + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(behavior)) { + harness.open(); + + // Insert first record (watermark is MIN_VALUE) + harness.processElement(insertRecord(1L, 1, "a1")); + assertEmitsNothing(harness); // Buffered, waiting for watermark + + // Advance watermark to trigger compaction + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + + // Update with same upsert key (this is the expected pattern for single-source updates) + harness.processElement(updateAfterRecord(1L, 1, "a2")); + assertEmitsNothing(harness); + + // Advance watermark again + harness.processWatermark(200L); + assertEmits(harness, updateAfter(1L, 1, "a2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + + // Delete and compact + harness.processElement(deleteRecord(1L, 1, "a1")); + harness.processWatermark(200L); + assertEmits(harness, delete(1L, 1, "a1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(behavior)) { + harness.open(); + + // Insert and delete before watermark advances + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Compact - should emit nothing since insert and delete cancel out + harness.processWatermark(100L); + assertEmitsNothing(harness); + } + } + + @Test + void testDoNothingKeepsFirstRecord() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(ConflictBehavior.NOTHING)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should keep the first record + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "first")); + } + } + + @Test + void testDoErrorThrowsOnConflict() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records 
with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should throw exception + assertThatThrownBy(() -> harness.processWatermark(100L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @Test + void testDoErrorAllowsSameUpsertKey() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with same upsert key (updates to same source) + harness.processElement(insertRecord(1L, 1, "v1")); + harness.processElement(updateAfterRecord(1L, 1, "v2")); + + // Compact - should not throw, just keep the latest + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "v2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(behavior)) { + harness.open(); + + // Simulate changelog disorder from FLIP-558 example: + // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1) + // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1) + // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1) + + // The +U from source 2 arrives first (upsert key = 2L) + harness.processElement(updateAfterRecord(2L, 1, "b1")); + // Then +I and -U from source 1 arrive (upsert key = 1L) + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Net result: only (2L, 1, "b1") remains after cancellation, no conflict + harness.processWatermark(100L); + assertEmits(harness, insert(2L, 1, "b1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "value")); + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "value")); + + // Insert same value again (same upsert key) + harness.processElement(updateAfterRecord(1L, 1, "value")); + harness.processWatermark(200L); + // Should not emit since value is the same + assertEmitsNothing(harness); + } + } + + /** + * Tests that record timestamps are handled correctly when multiple inputs send records that + * arrive out of timestamp order. This simulates the case where records from different upstream + * tasks have different timestamps and arrive interleaved. + * + *

Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary + * key (1). + * + *

Sequence: + * + *

    + *
  1. INSERT(input=1, t=2) + *
  2. watermark=3 -> emits INSERT + *
  3. UPDATE_BEFORE(input=1, t=4) + *
  4. UPDATE_AFTER(input=1, t=6) + *
  5. UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp + *
  6. watermark=5 -> compacts t<=5 records + *
  7. UPDATE_BEFORE(input=2, t=6) + *
  8. watermark=10 -> compacts remaining t=6 records + *
+ */ + @Test + void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarness(ConflictBehavior.NOTHING)) { + harness.open(); + + // INSERT from input 1 with timestamp 2 + harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 2L)); + assertEmitsNothing(harness); + + // watermark=3: compacts records with t<=3, emits INSERT(t=2) + harness.processWatermark(3L); + assertEmits(harness, insert(1L, 1, "v1")); + + // UPDATE_BEFORE from input 1 with timestamp 4 + harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 1L, 1, "v1", 4L)); + assertEmitsNothing(harness); + + // UPDATE_AFTER from input 1 with timestamp 6 + harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v4", 6L)); + assertEmitsNothing(harness); + + // UPDATE_AFTER from input 2 with timestamp 4 + // This record arrives after the t=6 record but has a smaller timestamp + harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 2L, 1, "v3", 4L)); + + // watermark=5: compacts records with t<=5 + // Buffered: UPDATE_BEFORE(1L, t=4) and UPDATE_AFTER(2L, t=4) cancel out for input 1, + // UPDATE_AFTER(2L, t=4) is emitted + harness.processWatermark(5L); + assertEmits(harness, updateAfter(2L, 1, "v3")); + + // UPDATE_BEFORE from input 2 with timestamp 6 + harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 2L, 1, "v3", 6L)); + assertEmitsNothing(harness); + + // Final watermark to flush all remaining buffered records (t=6) + // Buffered: UPDATE_AFTER(1L, t=6) and UPDATE_BEFORE(2L, t=6) + // After compaction: UPDATE_AFTER(1L, "v4") remains as final value + harness.processWatermark(10L); + assertEmits(harness, updateAfter(1L, 1, "v4")); + } + } + + // --- Tests without upsert key --- + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testBasicInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert first record + harness.processElement(insertRecord(1L, 1, "a1")); + assertEmitsNothing(harness); + + // Advance watermark to trigger compaction + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + } + } + + @Test + void testUpdateWithoutUpsertKeyNothingStrategy() throws Exception { + // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate) + // NOTHING strategy keeps the first value + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) { + harness.open(); + + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + + // UPDATE_AFTER with different value - creates conflict, NOTHING keeps first + harness.processElement(updateAfterRecord(2L, 1, "a2")); + harness.processWatermark(200L); + // NOTHING keeps the previous value (1L, 1, "a1"), no emission since unchanged + assertEmitsNothing(harness); + } + } + + @Test + void testUpdateWithoutUpsertKeyErrorStrategy() throws Exception { + // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate) + // ERROR strategy throws + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) { + harness.open(); + + harness.processElement(insertRecord(1L, 1, "a1")); + 
harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + + // UPDATE_AFTER with different value - creates conflict, ERROR throws + harness.processElement(updateAfterRecord(2L, 1, "a2")); + assertThatThrownBy(() -> harness.processWatermark(200L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testDeleteAfterInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "a1")); + + // Delete with exact same row and compact + harness.processElement(deleteRecord(1L, 1, "a1")); + harness.processWatermark(200L); + assertEmits(harness, delete(1L, 1, "a1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertAndDeleteInSameWindowWithoutUpsertKey(ConflictBehavior behavior) + throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert and delete with exact same row before watermark advances + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Compact - should emit nothing since insert and delete cancel out + harness.processWatermark(100L); + assertEmitsNothing(harness); + } + } + + @Test + void testIdenticalInsertsWithoutUpsertKeyNothingKeepsFirst() throws Exception { + // Without upsert key, even identical inserts are separate entries + // NOTHING strategy just keeps the first one + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) { + harness.open(); + + // Insert two identical records (same full row) + harness.processElement(insertRecord(1L, 1, "same")); + harness.processElement(insertRecord(1L, 1, "same")); + + // Compact - NOTHING keeps first record + harness.processWatermark(100L); + assertEmits(harness, insert(1L, 1, "same")); + } + } + + @Test + void testIdenticalInsertsWithoutUpsertKeyErrorThrows() throws Exception { + // Without upsert key, even identical inserts are separate entries + // ERROR strategy throws because there are multiple records + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two identical records (same full row) + harness.processElement(insertRecord(1L, 1, "same")); + harness.processElement(insertRecord(1L, 1, "same")); + + // Compact - ERROR throws because there are multiple pending records + assertThatThrownBy(() -> harness.processWatermark(100L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertUpdateDeleteWithoutUpsertKey(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert, then update_before + update_after sequence + harness.processElement(insertRecord(1L, 1, "v1")); + harness.processWatermark(100L); + 
+            assertEmits(harness, insert(1L, 1, "v1"));
+
+            // Update: retract old value, insert new value
+            harness.processElement(updateBeforeRecord(1L, 1, "v1"));
+            harness.processElement(updateAfterRecord(2L, 1, "v2"));
+            harness.processWatermark(200L);
+            assertEmits(harness, updateAfter(2L, 1, "v2"));
+
+            // Delete the current value
+            harness.processElement(deleteRecord(2L, 1, "v2"));
+            harness.processWatermark(300L);
+            assertEmits(harness, delete(2L, 1, "v2"));
+        }
+    }
+
+    // --- Restore Tests ---
+
+    /**
+     * Tests that buffered records at different timestamps before checkpoint are consolidated to
+     * MIN_VALUE on restore and compacted on the first watermark.
+     */
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBufferedRecordsConsolidatedOnRestore(ConflictBehavior behavior) throws Exception {
+        OperatorSubtaskState snapshot;
+
+        // First harness: buffer records at different timestamps, then take snapshot
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Buffer records at different timestamps (simulating records before checkpoint)
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 1000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 2000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v3", 3000L));
+
+            // No watermark yet, so nothing emitted
+            assertEmitsNothing(harness);
+
+            // Take snapshot with buffered records
+            snapshot = harness.snapshot(1L, 1L);
+        }
+
+        // Second harness: restore from snapshot
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.initializeState(snapshot);
+            harness.open();
+
+            // After restore, watermarks restart from MIN_VALUE.
+            // The buffered records should have been consolidated to MIN_VALUE.
+            // First watermark (even a small one) should trigger compaction of all consolidated
+            // records.
+            harness.processWatermark(100L);
+
+            // All records were from same upsert key, so only final value is emitted
+            assertEmits(harness, insert(1L, 1, "v3"));
+        }
+    }
+
+    /**
+     * Tests that in-flight records from unaligned checkpoints (records with timestamps > MIN_VALUE
+     * arriving before first watermark after restore) are correctly handled.
+     */
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInFlightRecordsAfterRestore(ConflictBehavior behavior) throws Exception {
+        OperatorSubtaskState snapshot;
+
+        // First harness: take empty snapshot
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+            snapshot = harness.snapshot(1L, 1L);
+        }
+
+        // Second harness: restore and simulate in-flight records
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.initializeState(snapshot);
+            harness.open();
+
+            // Simulate in-flight records that were checkpointed with their old timestamps.
+            // These arrive after restore but before any watermark is received.
+            // They have timestamps > MIN_VALUE from before the checkpoint.
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 5000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v2", 5100L));
+
+            // No watermark yet, nothing emitted
+            assertEmitsNothing(harness);
+
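+            // The pre-checkpoint timestamps (5000/5100) refer to a watermark clock that no
+            // longer applies after restore, so the operator is expected to fold these records
+            // under MIN_VALUE instead of waiting for a watermark past 5100.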
+            // First watermark after restore. Since currentWatermark was MIN_VALUE,
+            // in-flight records should have been assigned to MIN_VALUE and will be compacted.
+            harness.processWatermark(100L);
+
+            // Both records had same upsert key, so only final value is emitted
+            assertEmits(harness, insert(1L, 1, "v2"));
+        }
+    }
+
+    /**
+     * Tests restore with multiple keys having buffered records at different timestamps. Verifies
+     * that each key's records are correctly consolidated and compacted.
+     */
+    @Test
+    void testRestoreWithMultipleKeys() throws Exception {
+        OperatorSubtaskState snapshot;
+
+        // First harness: buffer records for multiple keys
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Key 1: multiple updates
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "k1v1", 1000L));
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "k1v2", 2000L));
+
+            // Key 2: single insert
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 2L, 2, "k2v1", 1500L));
+
+            // Key 3: insert then delete (should result in nothing)
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 3L, 3, "k3v1", 1000L));
+            harness.processElement(recordWithTimestamp(RowKind.DELETE, 3L, 3, "k3v1", 2500L));
+
+            snapshot = harness.snapshot(1L, 1L);
+        }
+
+        // Second harness: restore and verify
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.initializeState(snapshot);
+            harness.open();
+
+            // First watermark compacts all consolidated records
+            harness.processWatermark(100L);
+
+            // Extract and verify results (order depends on key processing order)
+            List<RowData> emitted = extractRecords(harness);
+            assertThat(emitted).hasSize(2);
+            // Key 1 should have final value "k1v2", Key 2 should have "k2v1", Key 3 cancelled out
+            assertThat(emitted)
+                    .anySatisfy(
+                            row -> {
+                                assertThat(row.getInt(1)).isEqualTo(1);
+                                assertThat(row.getString(2).toString()).isEqualTo("k1v2");
+                            })
+                    .anySatisfy(
+                            row -> {
+                                assertThat(row.getInt(1)).isEqualTo(2);
+                                assertThat(row.getString(2).toString()).isEqualTo("k2v1");
+                            });
+        }
+    }
+
+    // --- Helper Methods ---
+
+    private StreamRecord<RowData> recordWithTimestamp(
+            RowKind kind, long upsertKey, int pk, String value, long timestamp) {
+        return new StreamRecord<>(rowOfKind(kind, upsertKey, pk, value), timestamp);
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createHarness(
+            ConflictBehavior behavior) throws Exception {
+        WatermarkCompactingSinkMaterializer operator =
+                WatermarkCompactingSinkMaterializer.create(
+                        toStrategy(behavior),
+                        RowType.of(LOGICAL_TYPES),
+                        RECORD_EQUALISER,
+                        UPSERT_KEY_EQUALISER,
+                        new int[] {0}); // upsert key is first column
+
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator,
+                HandwrittenSelectorUtil.getRowDataSelector(
+                        new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
+                HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
+                        .getProducedType());
+    }
+
+    private KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>
+            createHarnessWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        WatermarkCompactingSinkMaterializer operator =
+                WatermarkCompactingSinkMaterializer.create(
+                        toStrategy(behavior),
+                        RowType.of(LOGICAL_TYPES),
+                        RECORD_EQUALISER,
+                        null, // no upsert key equaliser
+                        null); // no upsert key
+
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator,
+                HandwrittenSelectorUtil.getRowDataSelector(
+                        new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES),
+                HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {PRIMARY_KEY_INDEX}, LOGICAL_TYPES)
+                        .getProducedType());
+    }
+
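+    // For orientation: the per-key contract exercised by the tests above, sketched with
+    // RowKind shorthand (+I insert, -U update_before, +U update_after). This is an editorial
+    // summary, not operator API:
+    //
+    //   processElement(+I "v1", t=1)  -> buffered in keyed state, nothing emitted
+    //   processElement(-U "v1", t=2)  -> buffered
+    //   processElement(+U "v2", t=3)  -> buffered
+    //   processWatermark(5)           -> compacts all records with t<=5 and emits the net
+    //                                    change, here a single +U "v2"
+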
+    private static InsertConflictStrategy toStrategy(ConflictBehavior behavior) {
+        switch (behavior) {
+            case ERROR:
+                return InsertConflictStrategy.error();
+            case NOTHING:
+                return InsertConflictStrategy.nothing();
+            case DEDUPLICATE:
+                return InsertConflictStrategy.deduplicate();
+            default:
+                throw new IllegalArgumentException("Unknown behavior: " + behavior);
+        }
+    }
+
+    private void assertEmitsNothing(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
+        assertThat(extractRecords(harness)).isEmpty();
+    }
+
+    private void assertEmits(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness,
+            RowData... expected) {
+        List<RowData> emitted = extractRecords(harness);
+        assertThat(emitted).containsExactly(expected);
+    }
+
+    private List<RowData> extractRecords(
+            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness) {
+        final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[LOGICAL_TYPES.length];
+        for (int i = 0; i < LOGICAL_TYPES.length; i++) {
+            fieldGetters[i] = RowData.createFieldGetter(LOGICAL_TYPES[i], i);
+        }
+
+        final List<RowData> rows = new ArrayList<>();
+        Object o;
+        while ((o = harness.getOutput().poll()) != null) {
+            // Skip watermarks, only process StreamRecords
+            if (o instanceof StreamRecord) {
+                RowData value = (RowData) ((StreamRecord<?>) o).getValue();
+                Object[] row = new Object[LOGICAL_TYPES.length];
+                for (int i = 0; i < LOGICAL_TYPES.length; i++) {
+                    row[i] = fieldGetters[i].getFieldOrNull(value);
+                }
+                GenericRowData newRow = GenericRowData.of(row);
+                newRow.setRowKind(value.getRowKind());
+                rows.add(newRow);
+            }
+        }
+        return rows;
+    }
+
+    /** Test equaliser that compares all fields. */
+    private static class TestRecordEqualiser implements RecordEqualiser {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getRowKind() == row2.getRowKind()
+                    && row1.getLong(0) == row2.getLong(0)
+                    && row1.getInt(1) == row2.getInt(1)
+                    && Objects.equals(row1.getString(2), row2.getString(2));
+        }
+    }
+
+    /** Test equaliser that only compares the upsert key (first column). */
+    private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getLong(0) == row2.getLong(0);
+        }
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
index 40929ed6ac64b..99a5deb41e93d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/StreamRecordUtils.java
@@ -104,6 +104,26 @@ public static StreamRecord<RowData> deleteRecord(Object... fields) {
         return new StreamRecord<>(row);
     }
 
+    /** Creates a new {@link RowData} with INSERT kind. Alias for {@link #row(Object...)}. */
+    public static RowData insert(Object... fields) {
+        return rowOfKind(RowKind.INSERT, fields);
+    }
+
+    /** Creates a new {@link RowData} with UPDATE_BEFORE kind. */
+    public static RowData updateBefore(Object... fields) {
+        return rowOfKind(RowKind.UPDATE_BEFORE, fields);
+    }
+
+    /** Creates a new {@link RowData} with UPDATE_AFTER kind. */
+    public static RowData updateAfter(Object... fields) {
+        return rowOfKind(RowKind.UPDATE_AFTER, fields);
+    }
+
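+    // Note: unlike insertRecord(...) and friends above, which wrap the row in a StreamRecord,
+    // these helpers return the bare RowData, which keeps assertions on emitted rows concise.
+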
+    /** Creates a new {@link RowData} with DELETE kind. */
+    public static RowData delete(Object... fields) {
+        return rowOfKind(RowKind.DELETE, fields);
+    }
+
     /** Receives a object array, generates a RowData based on the array. */
     public static RowData rowOfKind(RowKind rowKind, Object... fields) {
         Object[] objects = new Object[fields.length];