Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,6 @@ protected Transformation<Object> createSinkTransformation(
Optional<LineageVertex> lineageVertexOpt =
TableLineageUtils.extractLineageDataset(outputObject);

// only add materialization if input has change
final boolean needMaterialization = !inputInsertOnly && upsertMaterialize;

Transformation<RowData> sinkTransform =
applyConstraintValidations(inputTransform, config, persistedRowType);

Expand All @@ -202,10 +199,10 @@ protected Transformation<Object> createSinkTransformation(
primaryKeys,
sinkParallelism,
inputParallelism,
needMaterialization);
upsertMaterialize);
}

if (needMaterialization) {
if (upsertMaterialize) {
sinkTransform =
applyUpsertMaterialize(
sinkTransform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.InsertConflictStrategy;
import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions.RowtimeInserter;
import org.apache.flink.table.api.config.ExecutionConfigOptions.SinkUpsertMaterializeStrategy;
Expand Down Expand Up @@ -55,6 +55,8 @@
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerV2;
import org.apache.flink.table.runtime.operators.sink.WatermarkCompactingSinkMaterializer;
import org.apache.flink.table.runtime.operators.sink.WatermarkTimestampAssigner;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
Expand Down Expand Up @@ -100,6 +102,7 @@
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
Expand All @@ -124,6 +127,7 @@
producedTransformations = {
CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
CommonExecSink.PARTITIONER_TRANSFORMATION,
StreamExecSink.WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
CommonExecSink.SINK_TRANSFORMATION
Expand All @@ -133,6 +137,9 @@
public class StreamExecSink extends CommonExecSink implements StreamExecNode<Object> {
private static final Logger LOG = LoggerFactory.getLogger(StreamExecSink.class);

/**
 * Name of the transformation that wraps the {@code WatermarkTimestampAssigner} operator. That
 * transformation is inserted in front of the upsert materializer when the conflict strategy is
 * ERROR or NOTHING.
 */
public static final String WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION =
"watermark-timestamp-assigner";

public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode";
public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
public static final String FIELD_NAME_UPSERT_MATERIALIZE_STRATEGY = "upsertMaterializeStrategy";
Expand Down Expand Up @@ -237,17 +244,6 @@ public StreamExecSink(
@Override
protected Transformation<Object> translateToPlanInternal(
PlannerBase planner, ExecNodeConfig config) {
// TODO: FLINK-38928 Remove this validation once runtime support for ERROR and NOTHING is
// implemented
if (InsertConflictStrategy.error().equals(conflictStrategy)
|| InsertConflictStrategy.nothing().equals(conflictStrategy)) {
throw new ValidationException(
"ON CONFLICT DO "
+ conflictStrategy
+ " is not yet supported. "
+ "Please use ON CONFLICT DO DEDUPLICATE instead.");
}

final ExecEdge inputEdge = getInputEdges().get(0);
final Transformation<RowData> inputTransform =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
Expand Down Expand Up @@ -358,9 +354,29 @@ protected Transformation<RowData> applyUpsertMaterialize(
.mapToObj(idx -> fieldNames[idx])
.collect(Collectors.toList());

// For ERROR/NOTHING strategies, apply WatermarkTimestampAssigner first
// This assigns the current watermark as the timestamp to each record,
// which is required for the WatermarkCompactingSinkMaterializer to work correctly
Transformation<RowData> transformForMaterializer = inputTransform;
if (isErrorOrNothingConflictStrategy()) {
// Use input parallelism to preserve watermark semantics
transformForMaterializer =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
createTransformationMeta(
WATERMARK_TIMESTAMP_ASSIGNER_TRANSFORMATION,
"WatermarkTimestampAssigner",
"WatermarkTimestampAssigner",
config),
new WatermarkTimestampAssigner(),
inputTransform.getOutputType(),
inputTransform.getParallelism(),
false);
}

OneInputTransformation<RowData, RowData> materializeTransform =
ExecNodeUtil.createOneInputTransformation(
inputTransform,
transformForMaterializer,
createTransformationMeta(
UPSERT_MATERIALIZE_TRANSFORMATION,
String.format(
Expand Down Expand Up @@ -390,6 +406,17 @@ private OneInputStreamOperator<RowData, RowData> createSumOperator(
GeneratedRecordEqualiser rowEqualiser,
GeneratedHashFunction rowHashFunction) {

// Check if we should use the watermark-compacting materializer for ERROR/NOTHING strategies
if (isErrorOrNothingConflictStrategy()) {
return WatermarkCompactingSinkMaterializer.create(
conflictStrategy,
physicalRowType,
rowEqualiser,
upsertKeyEqualiser,
inputUpsertKey);
}

// Use existing logic for DEDUPLICATE (legacy behavior)
SinkUpsertMaterializeStrategy sinkUpsertMaterializeStrategy =
Optional.ofNullable(upsertMaterializeStrategy)
.orElse(SinkUpsertMaterializeStrategy.LEGACY);
Expand All @@ -415,6 +442,12 @@ private OneInputStreamOperator<RowData, RowData> createSumOperator(
config));
}

/**
 * Tells whether a conflict strategy is configured and its behavior is either {@code ERROR} or
 * {@code NOTHING}.
 *
 * @return {@code true} for ERROR/NOTHING, {@code false} otherwise (including when no strategy
 *     is set)
 */
private boolean isErrorOrNothingConflictStrategy() {
    if (conflictStrategy == null) {
        return false;
    }
    final ConflictBehavior behavior = conflictStrategy.getBehavior();
    return behavior == ConflictBehavior.ERROR || behavior == ConflictBehavior.NOTHING;
}

private static SequencedMultiSetStateConfig createStateConfig(
SinkUpsertMaterializeStrategy strategy,
TimeDomain ttlTimeDomain,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream

import org.apache.flink.table.api.InsertConflictStrategy
import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
import org.apache.flink.table.api.config.ExecutionConfigOptions.{SinkUpsertMaterializeStrategy, TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY}
import org.apache.flink.table.catalog.ContextResolvedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
Expand All @@ -34,9 +35,12 @@ import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rel.hint.RelHint
import org.apache.calcite.util.ImmutableBitSet

import java.util

import scala.collection.JavaConversions._

/**
* Stream physical RelNode to write data into an external sink defined by a [[DynamicTableSink]].
*/
Expand Down Expand Up @@ -134,4 +138,35 @@ class StreamPhysicalSink(
.itemIf("upsertMaterialize", "true", upsertMaterialize)
.itemIf("conflictStrategy", conflictStrategy, conflictStrategy != null)
}

/** Whether a conflict strategy is set and its behavior is DEDUPLICATE. */
def isDeduplicateConflictStrategy: Boolean = {
  Option(conflictStrategy).exists(strategy => strategy.getBehavior == ConflictBehavior.DEDUPLICATE)
}

/**
 * Whether at least one upsert key reported for the input is contained in the sink's primary key
 * columns. Returns false when the metadata query reports no upsert keys (null).
 */
def primaryKeysContainsUpsertKey: Boolean = {
  val pkBits = ImmutableBitSet.of(contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes: _*)
  val inputUpsertKeys =
    FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery).getUpsertKeys(getInput)
  if (inputUpsertKeys == null) {
    false
  } else {
    inputUpsertKeys.exists(upsertKey => pkBits.contains(upsertKey))
  }
}

/**
 * Renders the input's upsert keys as column names, each key formatted as "[a, b]" and the keys
 * joined by ", ". Returns "none" when the metadata query reports no upsert keys (null).
 */
def getUpsertKeyNames: String = {
  val inputUpsertKeys =
    FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery).getUpsertKeys(getInput)
  Option(inputUpsertKeys) match {
    case None => "none"
    case Some(keys) =>
      val columnNames = contextResolvedTable.getResolvedSchema.getColumnNames
      keys
        .map(key => key.toArray.map(idx => columnNames.get(idx)).mkString("[", ", ", "]"))
        .mkString(", ")
  }
}

/** Renders the sink's primary key column names in the form "[a, b]". */
def getPrimaryKeyNames: String = {
  val schema = contextResolvedTable.getResolvedSchema
  val columnNames = schema.getColumnNames
  schema.getPrimaryKeyIndexes
    .map(idx => columnNames.get(idx))
    .mkString("[", ", ", "]")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1060,38 +1060,30 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
case UpsertMaterialize.NONE => false
case UpsertMaterialize.AUTO =>
if (
(inputIsAppend && InsertConflictStrategy
.deduplicate()
.equals(sink.conflictStrategy)) || sinkIsAppend || sinkIsRetract
) {
// if the sink is not an UPSERT sink (has no PK, or is an APPEND or RETRACT sink)
// we don't need to materialize results
if (primaryKeys.isEmpty || sinkIsAppend || sinkIsRetract) {
return false
}
if (primaryKeys.isEmpty) {

// For a DEDUPLICATE strategy and INSERT only input, we simply let the inserts be handled
// as UPSERT_AFTER and overwrite previous value
if (inputIsAppend && sink.isDeduplicateConflictStrategy) {
return false
}
val pks = ImmutableBitSet.of(primaryKeys: _*)
val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
// if input has updates and primary key != upsert key (upsert key can be null) we should
// enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink
// pk(s) contains input changeLogUpsertKeys
val upsertKeyDiffersFromPk =
changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)

// If the input has updates and the primary key != upsert key, we should enable
// upsertMaterialize.
//
// An optimization: do not enable upsertMaterialize when the sink pk(s) contain the input
// changeLogUpsertKeys.
val upsertKeyDiffersFromPk = !sink.primaryKeysContainsUpsertKey

// Validate that ON CONFLICT is specified when upsert key differs from primary key
val requireOnConflict =
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_REQUIRE_ON_CONFLICT)
if (requireOnConflict && upsertKeyDiffersFromPk && sink.conflictStrategy == null) {
val fieldNames = sink.contextResolvedTable.getResolvedSchema.getColumnNames
val pkNames = primaryKeys.map(fieldNames.get(_)).mkString("[", ", ", "]")
val upsertKeyNames = if (changeLogUpsertKeys == null) {
"none"
} else {
changeLogUpsertKeys
.map(uk => uk.toArray.map(fieldNames.get(_)).mkString("[", ", ", "]"))
.mkString(", ")
}
val pkNames = sink.getPrimaryKeyNames
val upsertKeyNames = sink.getUpsertKeyNames
throw new ValidationException(
"The query has an upsert key that differs from the primary key of the sink table " +
s"'${sink.contextResolvedTable.getIdentifier.asSummaryString}'. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,48 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.table.test.program.TestStep;
import org.apache.flink.table.test.program.TestStep.TestKind;

import java.util.EnumSet;
import java.util.List;
import java.util.Map;

/** Semantic tests for {@link StreamExecSink}. */
public class SinkSemanticTests extends SemanticTestBase {

@Override
public EnumSet<TestKind> supportedSetupSteps() {
EnumSet<TestKind> steps = super.supportedSetupSteps();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the reason for this change? all sinks seem to have data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The failing tests' sinks don't have data. To be honest, I am not sure how it worked before this PR 🤔

steps.add(TestKind.SINK_WITHOUT_DATA);
return steps;
}

/**
 * Runs a single test step. Steps of kind {@code SINK_WITHOUT_DATA} are applied directly with
 * the "values" connector and {@code sink-insert-only} set to "false"; every other step is
 * delegated to the parent implementation.
 */
@Override
protected void runStep(TestStep testStep, TableEnvironment env) throws Exception {
    if (testStep.getKind() != TestKind.SINK_WITHOUT_DATA) {
        super.runStep(testStep, env);
        return;
    }

    final Map<String, String> sinkOptions =
            Map.of("connector", "values", "sink-insert-only", "false");
    ((SinkTestStep) testStep).apply(env, sinkOptions);
}

@Override
public List<TableTestProgram> programs() {
return List.of(
SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK,
SinkTestPrograms.INSERT_RETRACT_WITH_PK,
SinkTestPrograms.ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED,
SinkTestPrograms.ON_CONFLICT_DO_ERROR_NOT_SUPPORTED,
SinkTestPrograms.ON_CONFLICT_DO_NOTHING_KEEPS_FIRST,
SinkTestPrograms.ON_CONFLICT_DO_ERROR_NO_CONFLICT,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITH_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_MATCHES_PK_WITHOUT_ON_CONFLICT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,46 +89,47 @@ public class SinkTestPrograms {
"INSERT INTO sink_t SELECT UPPER(name), SUM(score) FROM source_t GROUP BY name")
.build();

// --- ON CONFLICT validation tests ---
// --- ON CONFLICT tests ---

public static final TableTestProgram ON_CONFLICT_DO_NOTHING_NOT_SUPPORTED =
public static final TableTestProgram ON_CONFLICT_DO_NOTHING_KEEPS_FIRST =
TableTestProgram.of(
"sink-on-conflict-do-nothing-not-supported",
"ON CONFLICT DO NOTHING is not yet supported and should throw ValidationException.")
"sink-on-conflict-do-nothing-keeps-first",
"ON CONFLICT DO NOTHING keeps the first record when multiple records have the same PK.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
.producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
.producedValues(
Row.ofKind(RowKind.INSERT, 1, 10L),
Row.ofKind(RowKind.INSERT, 1, 20L),
Row.ofKind(RowKind.INSERT, 2, 30L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT PRIMARY KEY NOT ENFORCED", "b BIGINT")
.consumedValues("+I[1, 10]", "+I[2, 30]")
.build())
.runFailingSql(
"INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING",
ValidationException.class,
"ON CONFLICT DO NOTHING is not yet supported")
.runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING")
.build();

public static final TableTestProgram ON_CONFLICT_DO_ERROR_NOT_SUPPORTED =
public static final TableTestProgram ON_CONFLICT_DO_ERROR_NO_CONFLICT =
TableTestProgram.of(
"sink-on-conflict-do-error-not-supported",
"ON CONFLICT DO ERROR is not yet supported and should throw ValidationException.")
"sink-on-conflict-do-error-no-conflict",
"ON CONFLICT DO ERROR with no conflicts passes through all records.")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
.producedValues(Row.ofKind(RowKind.INSERT, 1, 1L))
.producedValues(
Row.ofKind(RowKind.INSERT, 1, 10L),
Row.ofKind(RowKind.INSERT, 2, 20L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT PRIMARY KEY NOT ENFORCED", "b BIGINT")
.consumedValues("+I[1, 10]", "+I[2, 20]")
.build())
.runFailingSql(
"INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR",
ValidationException.class,
"ON CONFLICT DO ERROR is not yet supported")
.runSql("INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO ERROR")
.build();

public static final TableTestProgram UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT =
Expand Down
Loading