From 7cda9f8491ba5bf7860db6fb650f9d43dfe3d686 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Thu, 14 May 2026 21:57:41 +0200 Subject: [PATCH 1/6] [FLINK-39538][table] Support upsert output mode for FROM_CHANGELOG FROM_CHANGELOG now emits an upsert changelog (INSERT, UPDATE_AFTER, full DELETE) when the input table is partitioned (set semantics via PARTITION BY) and the active op_mapping maps to UPDATE_AFTER without UPDATE_BEFORE. The partition key acts as the upsert key. In all other cases the output remains a retract changelog. Submitting an op_mapping with UPDATE_AFTER but no UPDATE_BEFORE without PARTITION BY is rejected at validation time, because upsert mode requires a key. To enable the strategy to inspect the resolved op_mapping and the input table's partition keys, ChangelogFunction.ChangelogContext is extended with two default methods: getArgumentValue(int, Class) and getTableSemantics(int). Defaults return Optional.empty() to preserve source compatibility for existing implementations. The planner-side wrapper in FlinkChangelogModeInferenceProgram delegates the two new methods to the underlying CallContext. Upsert mode uses full deletes (ChangelogMode.upsert(false)) because the runtime forwards each input delete row with all fields populated; only the RowKind is rewritten. This matches the runtime's behavior and avoids forcing downstream operators to reconstruct rows from state. The upsert key derivation in FlinkRelMdUniqueKeys.getPtfUniqueKeys already returns the partition columns when a PTF emits upsert, so no metadata changes are needed. --- .../docs/sql/reference/queries/changelog.md | 28 ++++++ flink-python/pyflink/table/table.py | 6 ++ .../flink/table/api/PartitionedTable.java | 12 +++ .../org/apache/flink/table/api/Table.java | 5 + .../functions/BuiltInFunctionDefinitions.java | 4 +- .../table/functions/ChangelogFunction.java | 26 +++++ .../strategies/FromChangelogTypeStrategy.java | 94 ++++++++++++++++--- .../FromChangelogInputTypeStrategyTest.java | 36 ++++++- .../FlinkChangelogModeInferenceProgram.scala | 14 ++- .../stream/FromChangelogSemanticTests.java | 2 + .../stream/FromChangelogTestPrograms.java | 63 +++++++++++++ .../plan/stream/sql/FromChangelogTest.java | 19 ++++ .../plan/stream/sql/FromChangelogTest.xml | 20 ++++ 13 files changed, 309 insertions(+), 20 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index b02215129e5da..e247ee613c09b 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -149,6 +149,34 @@ Prefer row semantics, when possible. `PARTITION BY` is only necessary when downs If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly. +#### Upsert output + +When `PARTITION BY` is combined with an `op_mapping` that does NOT include `UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event with the partition key acting as the upsert key. + +```sql +-- Upsert input: INSERT / UPDATE_AFTER / DELETE only +-- +I[id:1, op:'INSERT', name:'Alice'] +-- +I[id:2, op:'INSERT', name:'Bob'] +-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2'] +-- +I[id:2, op:'DELETE', name:'Bob'] + +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream PARTITION BY id, + op_mapping => MAP[ + 'INSERT', 'INSERT', + 'UPDATE_AFTER', 'UPDATE_AFTER', + 'DELETE', 'DELETE'] +) + +-- Output (upsert changelog, upsert key = id): +-- +I[id:1, name:'Alice'] +-- +I[id:2, name:'Bob'] +-- +U[id:1, name:'Alice2'] +-- -D[id:2, name:'Bob'] +``` + +Without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog. + #### Ordering CDC events with ORDER BY CDC streams can deliver events out of order. For example, a key's `UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are partitioned across upstream brokers. If the source itself does not guarantee ordering, applying such a changelog directly produces incorrect state. diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index bebf1cafa292a..583167fa47838 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -1234,6 +1234,12 @@ def from_changelog(self, *arguments: Expression) -> 'Table': in the mapping; pass ``error_handling => 'SKIP'`` to silently drop those rows instead. + The output is a retract changelog. To emit an upsert changelog instead, combine + ``PARTITION BY`` (set semantics on the table argument) with an ``op_mapping`` that + maps to ``UPDATE_AFTER`` without ``UPDATE_BEFORE``. The partition key becomes the + upsert key. An upsert mapping without ``PARTITION BY`` is rejected at validation + time, since upsert mode requires a key. + Example: :: diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 3f65a22075b89..863455329e16b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -226,6 +226,18 @@ public interface PartitionedTable { *

For row semantics (each row processed independently), use {@link Table#fromChangelog} on * the unpartitioned table. * + *

Output changelog mode: + * + *

+ * *

Examples: * *

{@code
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index e7f690263a41c..0ffb90c5fcc02 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1489,6 +1489,11 @@ default TableResult executeInsert(
      *     .fromChangelog();
      * }
* + *

The output is a retract changelog. To emit an upsert changelog instead, combine {@code + * PARTITION BY} with an {@code op_mapping} that maps to {@code UPDATE_AFTER} without {@code + * UPDATE_BEFORE}. The partition key becomes the upsert key. See {@link + * PartitionedTable#fromChangelog(Expression...)} for details. + * *

Optional arguments can be passed using named expressions: * *

{@code
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6d17bf880e841..cf5e7804e2c62 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -27,7 +27,6 @@
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -39,6 +38,7 @@
 import org.apache.flink.table.types.inference.TraitCondition;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
 import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -854,7 +854,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                                     DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
                                     true),
                             StaticArgument.scalar("error_handling", DataTypes.STRING(), true))
-                    .changelogModeStrategy(ctx -> ChangelogMode.all())
+                    .changelogModeStrategy(FromChangelogTypeStrategy.CHANGELOG_MODE_STRATEGY)
                     .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                     .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
                     .runtimeClass(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
index ad6e56d09d924..e9b16bff6cfc3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
@@ -21,8 +21,11 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.annotation.ArgumentTrait;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 
+import java.util.Optional;
+
 /**
  * An extension that allows a process table function (PTF) to emit results with changelog semantics.
  *
@@ -105,5 +108,28 @@ interface ChangelogContext {
          * are required and {@link ChangelogMode#keyOnlyDeletes()} are supported.
          */
         ChangelogMode getRequiredChangelogMode();
+
+        /**
+         * Returns the resolved literal value of the scalar argument at the given position.
+         *
+         * 

Returns empty if the argument is absent, NULL, not a literal, or cannot be expressed + * as an instance of the provided class. Conversions follow the default conversion classes + * of {@link LogicalType LogicalTypes}. + */ + default Optional getArgumentValue(int pos, Class clazz) { + return Optional.empty(); + } + + /** + * Returns information about the table that has been passed to a table argument at the given + * position. + * + *

Semantics are only available for table arguments. They expose, for example, the + * partition-by columns when the table argument uses {@link ArgumentTrait#SET_SEMANTIC_TABLE + * set semantics}. + */ + default Optional getTableSemantics(int pos) { + return Optional.empty(); + } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java index f4d54f276d73c..3165c83ace6c4 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java @@ -22,6 +22,9 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext; +import org.apache.flink.table.functions.ChangelogModeStrategy; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; @@ -95,6 +98,50 @@ public Optional> inferInputTypes( return Optional.of(DataTypes.ROW(outputFields).notNull()); }; + // -------------------------------------------------------------------------------------------- + // Changelog mode inference + // -------------------------------------------------------------------------------------------- + + /** + * Emits an upsert changelog when the input is partitioned (set semantics) and the resolved + * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code UPDATE_BEFORE}. In all other + * cases the output is a retract changelog. When upsert mode is selected, the partition key acts + * as the upsert key. + * + *

Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) upsert(false)}) + * because the runtime forwards each input delete row with all fields populated; only the {@link + * org.apache.flink.types.RowKind} is rewritten. + */ + public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY = + ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) : ChangelogMode.all(); + + /** + * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert changelog: the input + * table is partitioned AND the resolved {@code op_mapping} contains {@code UPDATE_AFTER} + * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the mapping is absent or + * cannot be resolved as a literal, since the default mapping includes both (retract). + */ + @SuppressWarnings("unchecked") + public static boolean isUpsertConfig(final ChangelogContext ctx) { + final boolean partitioned = + ctx.getTableSemantics(ARG_TABLE) + .map(ts -> ts.partitionByColumns().length > 0) + .orElse(false); + if (!partitioned) { + return false; + } + final Optional opMapping = ctx.getArgumentValue(ARG_OP_MAPPING, Map.class); + if (opMapping.isEmpty()) { + return false; + } + final Map mapping = opMapping.get(); + final boolean hasUpdateAfter = + mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim())); + final boolean hasUpdateBefore = + mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim())); + return hasUpdateAfter && !hasUpdateBefore; + } + // -------------------------------------------------------------------------------------------- // Helpers // -------------------------------------------------------------------------------------------- @@ -200,22 +247,47 @@ private static Optional> validateOpMapping( if (validationError.isPresent()) { return validationError; } - - final boolean hasUpdateBefore = - mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim())); - final boolean hasUpdateAfter = - mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim())); - if (hasUpdateAfter && !hasUpdateBefore) { - return callContext.fail( - throwOnFailure, - "The 'op_mapping' must include UPDATE_BEFORE for retract mode. " - + "Upsert mode (without UPDATE_BEFORE) is not supported " - + "in this version."); + final Optional> upsertKeyError = + validateUpsertRequiresPartitionBy(callContext, mapping, throwOnFailure); + if (upsertKeyError.isPresent()) { + return upsertKeyError; } } return Optional.empty(); } + /** + * An {@code op_mapping} that produces {@code UPDATE_AFTER} without {@code UPDATE_BEFORE} + * describes an upsert changelog. Upsert mode requires a key, so the input table must use set + * semantics via {@code PARTITION BY}. + */ + private static Optional> validateUpsertRequiresPartitionBy( + final CallContext callContext, + final Map mapping, + final boolean throwOnFailure) { + final boolean hasUpdateAfter = + mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim())); + final boolean hasUpdateBefore = + mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim())); + if (!hasUpdateAfter || hasUpdateBefore) { + return Optional.empty(); + } + final boolean partitioned = + callContext + .getTableSemantics(ARG_TABLE) + .map(ts -> ts.partitionByColumns().length > 0) + .orElse(false); + if (partitioned) { + return Optional.empty(); + } + return callContext.fail( + throwOnFailure, + "An 'op_mapping' that produces UPDATE_AFTER without UPDATE_BEFORE describes an " + + "upsert changelog and requires a key. Add PARTITION BY to the input " + + "table to define the upsert key, or include UPDATE_BEFORE in the " + + "mapping for retract semantics."); + } + /** * Validates op_mapping values. Values must be valid Flink change operation names. Each name * must appear at most once across all entries. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java index 322cf94a49e49..70595f1c5971f 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java @@ -139,11 +139,33 @@ protected Stream testData() { .calledWithLiteralAt(3, "FAIL") .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE), - // Error: UPDATE_AFTER without UPDATE_BEFORE not supported + // Valid: upsert mapping (INSERT + UPDATE_AFTER + DELETE, no UPDATE_BEFORE) with + // PARTITION BY. FROM_CHANGELOG emits an upsert changelog keyed on the partition + // column. TestSpec.forStrategy( - "UPDATE_AFTER requires UPDATE_BEFORE", + "Valid upsert mapping with PARTITION BY", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) - .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE) + .calledWithTableSemanticsAt( + 0, + new TableSemanticsMock( + TABLE_TYPE, new int[] {0}, new int[0], -1, null)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "u", "UPDATE_AFTER", + "d", "DELETE")) + .calledWithLiteralAt(3, "FAIL") + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE), + + // Error: upsert mapping (INSERT + UPDATE_AFTER + DELETE) without PARTITION BY. + // Upsert mode requires a key, so the call must use set semantics. + TestSpec.forStrategy( + "Upsert mapping without PARTITION BY rejected", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE) .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) .calledWithLiteralAt(1, ColumnList.of("op")) .calledWithLiteralAt( @@ -152,7 +174,13 @@ protected Stream testData() { "c", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE")) - .expectErrorMessage("must include UPDATE_BEFORE"), + .calledWithLiteralAt(3, "FAIL") + .expectErrorMessage( + "An 'op_mapping' that produces UPDATE_AFTER without " + + "UPDATE_BEFORE describes an upsert changelog and " + + "requires a key. Add PARTITION BY to the input table " + + "to define the upsert key, or include UPDATE_BEFORE in " + + "the mapping for retract semantics."), // Error: Invalid error_handling mode TestSpec.forStrategy( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 6839cc1c28c13..fd646a34148b4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize import org.apache.flink.table.connector.ChangelogMode -import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction} +import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction, TableSemantics} import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall} import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction @@ -1678,8 +1678,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val callContext = function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) - // Expose a simplified context to let users focus on important characteristics. - // If necessary, we can expose the full CallContext in the future. + // Expose a simplified context focused on changelog-relevant inputs: changelog modes, + // resolved literal arguments, and table semantics (e.g., partition-by columns). new ChangelogContext { override def getTableChangelogMode(pos: Int): ChangelogMode = { val tableSemantics = callContext.getTableSemantics(pos).orElse(null) @@ -1692,6 +1692,14 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti override def getRequiredChangelogMode: ChangelogMode = { callContext.getOutputChangelogMode.orElse(null) } + + override def getArgumentValue[T](pos: Int, clazz: Class[T]): java.util.Optional[T] = { + callContext.getArgumentValue(pos, clazz) + } + + override def getTableSemantics(pos: Int): java.util.Optional[TableSemantics] = { + callContext.getTableSemantics(pos) + } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java index c3922fb28adf1..449a3e138db94 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java @@ -43,6 +43,8 @@ public List programs() { FromChangelogTestPrograms.CUSTOM_OP_MAPPING, FromChangelogTestPrograms.CUSTOM_OP_NAME, FromChangelogTestPrograms.RETRACT_PARTITION_BY, + FromChangelogTestPrograms.UPSERT_PARTITION_BY, + FromChangelogTestPrograms.UPSERT_PARTITION_BY_CUSTOM_MAPPING, FromChangelogTestPrograms.DELETION_FLAG_PARTITION_BY, FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING, FromChangelogTestPrograms.SKIP_NULL_OP_CODE, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java index a964a57bdbe84..71a8200381711 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java @@ -200,6 +200,69 @@ public class FromChangelogTestPrograms { + "input => TABLE cdc_stream PARTITION BY id)") .build(); + public static final TableTestProgram UPSERT_PARTITION_BY = + TableTestProgram.of( + "from-changelog-upsert-partition-by", + "PARTITION BY + op_mapping without UPDATE_BEFORE produces an " + + "upsert changelog keyed on the partition columns") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("name STRING", "id INT", "op STRING") + .producedValues( + Row.of("Alice", 1, "INSERT"), + Row.of("Bob", 2, "INSERT"), + Row.of("Alice2", 1, "UPDATE_AFTER"), + Row.of("Bob", 2, "DELETE")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT PRIMARY KEY NOT ENFORCED", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id, " + + "op_mapping => MAP[" + + "'INSERT', 'INSERT', " + + "'UPDATE_AFTER', 'UPDATE_AFTER', " + + "'DELETE', 'DELETE'])") + .build(); + + public static final TableTestProgram UPSERT_PARTITION_BY_CUSTOM_MAPPING = + TableTestProgram.of( + "from-changelog-upsert-partition-by-custom-mapping", + "PARTITION BY + custom upsert op codes produces an upsert changelog") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("name STRING", "id INT", "op STRING") + .producedValues( + Row.of("Alice", 1, "c"), + Row.of("Bob", 2, "c"), + Row.of("Alice2", 1, "ua"), + Row.of("Bob", 2, "d")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT PRIMARY KEY NOT ENFORCED", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id, " + + "op_mapping => MAP[" + + "'c', 'INSERT', " + + "'ua', 'UPDATE_AFTER', " + + "'d', 'DELETE'])") + .build(); + public static final TableTestProgram DELETION_FLAG_PARTITION_BY = TableTestProgram.of( "from-changelog-deletion-flag-partition-by", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java index 17d7f64fa4812..50ea0e1834a0d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java @@ -71,4 +71,23 @@ void testRetractPartitionBy() { "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id)", CHANGELOG_MODE); } + + @Test + void testUpsertPartitionBy() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.verifyRelPlan( + "SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id, " + + "op_mapping => MAP[" + + "'INSERT', 'INSERT', " + + "'UPDATE_AFTER', 'UPDATE_AFTER', " + + "'DELETE', 'DELETE'])", + CHANGELOG_MODE); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml index 2addfe9f5ff5f..0f6895162cfcd 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml @@ -50,6 +50,26 @@ LogicalProject(id=[$0], name=[$1]) + + + + + TABLE cdc_stream PARTITION BY id, op_mapping => MAP['INSERT', 'INSERT', 'UPDATE_AFTER', 'UPDATE_AFTER', 'DELETE', 'DELETE'])]]> + + + + + + From dc10ce52f6ec5d009e46f6dbf2159bd0ff3c8543 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Mon, 18 May 2026 17:04:50 +0200 Subject: [PATCH 2/6] [FLINK-39538][table] Address first round of Feedbacks --- .../docs/sql/reference/queries/changelog.md | 16 ++- .../flink/table/api/PartitionedTable.java | 6 +- .../strategies/FromChangelogTypeStrategy.java | 97 +++++++++---------- .../stream/FromChangelogSemanticTests.java | 1 - .../stream/FromChangelogTestPrograms.java | 31 ------ .../plan/stream/sql/FromChangelogTest.java | 2 +- .../plan/stream/sql/FromChangelogTest.xml | 2 +- .../table/planner/utils/TableTestBase.scala | 38 +++++++- 8 files changed, 97 insertions(+), 96 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index e247ee613c09b..8fa058f555c0a 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -149,9 +149,17 @@ Prefer row semantics, when possible. `PARTITION BY` is only necessary when downs If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly. -#### Upsert output +#### Upsert table -When `PARTITION BY` is combined with an `op_mapping` that does NOT include `UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event with the partition key acting as the upsert key. +To generate an upsert table, two requirements must be met: + +* **Key partitioning**: use `PARTITION BY `, where the partition key corresponds to the unique/primary key of the dataset. +* **Op mapping configuration**: the `op_mapping` must include `UPDATE_AFTER` and must NOT include `UPDATE_BEFORE`. + +The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause. + +Note +- An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. `PARTITION BY` must be present on the table argument; otherwise the call would produce key-less updates and is rejected at validation time with a `ValidationException`. ```sql -- Upsert input: INSERT / UPDATE_AFTER / DELETE only @@ -175,7 +183,9 @@ SELECT * FROM FROM_CHANGELOG( -- -D[id:2, name:'Bob'] ``` -Without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog. +The `FROM_CHANGELOG` PTF assumes events arrive in order. If the source itself does not guarantee ordering for events sharing the same `PARTITION BY` key, use [`ORDER BY`]({{< ref "docs/dev/table/functions/ptfs" >}}#ordering) to reorder them. + +By default, without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog. #### Ordering CDC events with ORDER BY diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 863455329e16b..d827138cc523f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -230,9 +230,9 @@ public interface PartitionedTable { * *

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java index e0739cfcc2c6d..100d7b3e505a3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.utils.PlanKind; import org.apache.flink.table.planner.utils.TableTestBase; import org.apache.flink.table.planner.utils.TableTestUtil; @@ -29,6 +30,8 @@ import java.util.Collections; import java.util.List; +import scala.Enumeration; + /** * Plan tests for the FROM_CHANGELOG built-in process table function. Uses {@link * ExplainDetail#CHANGELOG_MODE} to verify changelog mode propagation through the plan. @@ -81,13 +84,17 @@ void testUpsertPartitionBy() { + " op STRING," + " name STRING" + ") WITH ('connector' = 'values')"); - util.verifyRelPlanWithUpsertKey( + util.doVerifyPlan( "SELECT * FROM FROM_CHANGELOG(" + "input => TABLE cdc_stream PARTITION BY id, " + "op_mapping => MAP[" + "'INSERT', 'INSERT', " + "'UPDATE_AFTER', 'UPDATE_AFTER', " + "'DELETE', 'DELETE'])", - CHANGELOG_MODE); + new ExplainDetail[] {ExplainDetail.CHANGELOG_MODE}, + false, + new Enumeration.Value[] {PlanKind.AST(), PlanKind.OPT_REL()}, + false, + true); } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 839dd8f98d6d1..60bda8e43ffc6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -617,31 +617,6 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) verifyRelPlan(query, extraDetails.asScala: _*) } - /** - * Verify the AST and the optimized rel plan for the given SELECT query. The rendered optimized - * rel plan includes the `upsertKeys=[...]` term for rel nodes that derive upsert keys. - */ - def verifyRelPlanWithUpsertKey(query: String, extraDetails: ExplainDetail*): Unit = { - val table = getTableEnv.sqlQuery(query) - val relNode = TableTestUtil.toRelNode(table) - assertPlanEquals( - Array(relNode), - extraDetails.toArray, - withRowType = false, - Array(PlanKind.AST, PlanKind.OPT_REL), - () => assertEqualsOrExpand("sql", query), - withQueryBlockAlias = false, - withUpsertKey = true - ) - } - - /** Java-friendly overload that accepts a list of [[ExplainDetail]]s. */ - def verifyRelPlanWithUpsertKey( - query: String, - extraDetails: java.util.List[ExplainDetail]): Unit = { - verifyRelPlanWithUpsertKey(query, extraDetails.asScala: _*) - } - /** * Verify the AST (abstract syntax tree) and the optimized rel plan for the given INSERT * statement. @@ -1073,6 +1048,22 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) withRowType: Boolean, expectedPlans: Array[PlanKind], withQueryBlockAlias: Boolean): Unit = { + doVerifyPlan( + query, + extraDetails, + withRowType, + expectedPlans, + withQueryBlockAlias, + withUpsertKey = false) + } + + def doVerifyPlan( + query: String, + extraDetails: Array[ExplainDetail], + withRowType: Boolean, + expectedPlans: Array[PlanKind], + withQueryBlockAlias: Boolean, + withUpsertKey: Boolean): Unit = { val table = getTableEnv.sqlQuery(query) val relNode = TableTestUtil.toRelNode(table) @@ -1082,7 +1073,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) withRowType, expectedPlans, () => assertEqualsOrExpand("sql", query), - withQueryBlockAlias) + withQueryBlockAlias, + withUpsertKey = withUpsertKey) } /** From 935f9f8538d3e743a7bb300acf6a7f74d214b054 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Tue, 19 May 2026 17:36:21 +0200 Subject: [PATCH 6/6] [FLINK-39538][table] Address feedback --- .../org/apache/flink/table/planner/utils/TableTestBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 60bda8e43ffc6..368289cd1aca0 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1074,7 +1074,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) expectedPlans, () => assertEqualsOrExpand("sql", query), withQueryBlockAlias, - withUpsertKey = withUpsertKey) + withUpsertKey) } /**