Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,22 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val sinkIsAppend = sinkChangelogMode.containsOnly(RowKind.INSERT)
val sinkIsRetract = sinkChangelogMode.contains(RowKind.UPDATE_BEFORE)

// Validate ON CONFLICT is only allowed for upsert sinks
if (sink.conflictStrategy != null) {
val isUpsertSink = !sinkIsAppend && !sinkIsRetract
if (!isUpsertSink) {
val reason = if (sinkIsAppend) {
"it only accepts INSERT (append-only) changes"
} else {
"it requires UPDATE_BEFORE (retract mode)"
}
throw new ValidationException(
s"ON CONFLICT clause is only allowed for upsert sinks. " +
s"The sink '${sink.contextResolvedTable.getIdentifier.asSummaryString}' " +
s"is not an upsert sink because $reason.")
}
}

tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
case UpsertMaterialize.NONE => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public List<TableTestProgram> programs() {
SinkTestPrograms.UPSERT_KEY_MATCHES_PK_WITHOUT_ON_CONFLICT,
SinkTestPrograms.APPEND_ONLY_WITH_PK_WITHOUT_ON_CONFLICT,
SinkTestPrograms.APPEND_ONLY_WITH_PK_WITH_ON_CONFLICT,
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT_DISABLED);
SinkTestPrograms.UPSERT_KEY_DIFFERS_FROM_PK_WITHOUT_ON_CONFLICT_DISABLED,
SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_APPEND_ONLY_SINK,
SinkTestPrograms.ON_CONFLICT_NOT_ALLOWED_FOR_RETRACT_SINK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,50 @@ public class SinkTestPrograms {
.build())
.runSql("INSERT INTO sink_t SELECT a, b FROM source_t")
.build();

public static final TableTestProgram ON_CONFLICT_NOT_ALLOWED_FOR_APPEND_ONLY_SINK =
TableTestProgram.of(
"sink-on-conflict-not-allowed-for-append-only-sink",
"ON CONFLICT clause is not allowed for append-only sinks (no primary key).")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
.producedValues(Row.ofKind(RowKind.INSERT, 1, 10L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT", "b BIGINT")
.addOption("sink-insert-only", "true")
.build())
.runFailingSql(
"INSERT INTO sink_t SELECT a, b FROM source_t ON CONFLICT DO NOTHING",
ValidationException.class,
"ON CONFLICT clause is only allowed for upsert sinks. "
+ "The sink 'default_catalog.default_database.sink_t' is not an upsert sink "
+ "because it only accepts INSERT (append-only) changes.")
.build();

public static final TableTestProgram ON_CONFLICT_NOT_ALLOWED_FOR_RETRACT_SINK =
TableTestProgram.of(
"sink-on-conflict-not-allowed-for-retract-sink",
"ON CONFLICT clause is not allowed for retract sinks (requires UPDATE_BEFORE).")
.setupTableSource(
SourceTestStep.newBuilder("source_t")
.addSchema("a INT", "b BIGINT")
.addOption("changelog-mode", "I")
.producedValues(Row.ofKind(RowKind.INSERT, 1, 10L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema("a INT PRIMARY KEY NOT ENFORCED", "cnt BIGINT")
.addOption("sink-changelog-mode-enforced", "I,UB,UA,D")
.build())
.runFailingSql(
"INSERT INTO sink_t SELECT a, COUNT(*) AS cnt FROM source_t GROUP BY a ON CONFLICT DO DEDUPLICATE",
ValidationException.class,
"ON CONFLICT clause is only allowed for upsert sinks. "
+ "The sink 'default_catalog.default_database.sink_t' is not an upsert sink "
+ "because it requires UPDATE_BEFORE (retract mode).")
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b
:- Reused(reference_id=[1])
+- Reused(reference_id=[2])

Sink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, a3, b0, b2, b1], conflictStrategy=[DEDUPLICATE])
Sink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, a3, b0, b2, b1])
+- Join(joinType=[InnerJoin], where=[((a1 = b1) AND (a2 = b2))], select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Reused(reference_id=[1])
+- Reused(reference_id=[2])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,11 +634,11 @@ class DeltaJoinTest extends TableTestBase {
"and src1.a2 = src2.b2 " +
"on conflict do deduplicate")

// snk2 is a retract sink, so ON CONFLICT is not allowed
stmt.addInsertSql(
"insert into snk2 select * from src1 join src2 " +
"on src1.a1 = src2.b1 " +
"and src1.a2 = src2.b2 " +
"on conflict do deduplicate")
"and src1.a2 = src2.b2")

util.verifyExecPlan(stmt)
}
Expand Down