diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index b02215129e5da..3a7a97534c3ae 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -149,6 +149,41 @@ Prefer row semantics, when possible. `PARTITION BY` is only necessary when downs If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly. +#### Upsert table + +To generate an upsert table, two requirements must be met: + +* **Key partitioning**: use `PARTITION BY <key>`, where the partition key corresponds to the unique/primary key of the dataset. +* **Op mapping configuration**: the `op_mapping` must include `UPDATE_AFTER` and must NOT include `UPDATE_BEFORE`. + +An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause. 
+ +```sql +-- Upsert input: INSERT / UPDATE_AFTER / DELETE only +-- +I[id:1, op:'INSERT', name:'Alice'] +-- +I[id:2, op:'INSERT', name:'Bob'] +-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2'] +-- +I[id:2, op:'DELETE', name:'Bob'] + +SELECT * FROM FROM_CHANGELOG( + input => TABLE cdc_stream PARTITION BY id, + op_mapping => MAP[ + 'INSERT', 'INSERT', + 'UPDATE_AFTER', 'UPDATE_AFTER', + 'DELETE', 'DELETE'] +) + +-- Output (upsert changelog, upsert key = id): +-- +I[id:1, name:'Alice'] +-- +I[id:2, name:'Bob'] +-- +U[id:1, name:'Alice2'] +-- -D[id:2, name:'Bob'] +``` + +The `FROM_CHANGELOG` PTF assumes events arrive in order. If the source itself does not guarantee ordering for events sharing the same `PARTITION BY` key, use [`ORDER BY`]({{< ref "docs/dev/table/functions/ptfs" >}}#ordering) to reorder them. + +By default, without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog. + #### Ordering CDC events with ORDER BY CDC streams can deliver events out of order. For example, a key's `UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are partitioned across upstream brokers. If the source itself does not guarantee ordering, applying such a changelog directly produces incorrect state. diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index bebf1cafa292a..583167fa47838 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -1234,6 +1234,12 @@ def from_changelog(self, *arguments: Expression) -> 'Table': in the mapping; pass ``error_handling => 'SKIP'`` to silently drop those rows instead. + The output is a retract changelog. To emit an upsert changelog instead, combine + ``PARTITION BY`` (set semantics on the table argument) with an ``op_mapping`` that + maps to ``UPDATE_AFTER`` without ``UPDATE_BEFORE``. The partition key becomes the + upsert key. 
An upsert mapping without ``PARTITION BY`` is rejected at validation + time, since upsert mode requires a key. + Example: :: diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 3f65a22075b89..51086f1edfe22 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -226,6 +226,18 @@ public interface PartitionedTable { *

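<p>For row semantics (each row processed independently), use {@link Table#fromChangelog} on
      * the unpartitioned table.
      *
+     * <p>Output changelog mode:
+     *
+     * <ul>
+     *   <li><b>Retract</b> (default): emitted when {@code op_mapping} is omitted or maps to
+     *       {@code UPDATE_BEFORE}. Partitioning alone does not switch the output to upsert
+     *       mode.
+     *   <li><b>Upsert</b>: emitted when {@code op_mapping} maps to {@code UPDATE_AFTER} without
+     *       {@code UPDATE_BEFORE}. The partition key becomes the upsert key, so it must match
+     *       the unique/primary key of the input. The output then consists of {@code INSERT},
+     *       {@code UPDATE_AFTER}, and {@code DELETE} rows.
+     * </ul>
+     *
      * <p>Examples:
      *
      * <pre>{@code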
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index e7f690263a41c..0ffb90c5fcc02 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1489,6 +1489,11 @@ default TableResult executeInsert(
      *     .fromChangelog();
      * }</pre>
      *
+     * <p>The output is a retract changelog. To emit an upsert changelog instead, combine {@code
+     * PARTITION BY} with an {@code op_mapping} that maps to {@code UPDATE_AFTER} without {@code
+     * UPDATE_BEFORE}. The partition key becomes the upsert key. See {@link
+     * PartitionedTable#fromChangelog(Expression...)} for details.
+     *
      * <p>Optional arguments can be passed using named expressions:
      *
      * <pre>{@code
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6d17bf880e841..cf5e7804e2c62 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -27,7 +27,6 @@
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -39,6 +38,7 @@
 import org.apache.flink.table.types.inference.TraitCondition;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy;
 import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
 import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
@@ -854,7 +854,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                                     DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
                                     true),
                             StaticArgument.scalar("error_handling", DataTypes.STRING(), true))
-                    .changelogModeStrategy(ctx -> ChangelogMode.all())
+                    .changelogModeStrategy(FromChangelogTypeStrategy.CHANGELOG_MODE_STRATEGY)
                     .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                     .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
                     .runtimeClass(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
index ad6e56d09d924..e9b16bff6cfc3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java
@@ -21,8 +21,11 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.annotation.ArgumentTrait;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 
+import java.util.Optional;
+
 /**
  * An extension that allows a process table function (PTF) to emit results with changelog semantics.
  *
@@ -105,5 +108,28 @@ interface ChangelogContext {
          * are required and {@link ChangelogMode#keyOnlyDeletes()} are supported.
          */
         ChangelogMode getRequiredChangelogMode();
+
+        /**
+         * Returns the resolved literal value of the scalar argument at the given position.
+         *
+         * 

Returns empty if the argument is absent, NULL, not a literal, or cannot be expressed + * as an instance of the provided class. Conversions follow the default conversion classes + * of {@link LogicalType LogicalTypes}. + */ + default Optional getArgumentValue(int pos, Class clazz) { + return Optional.empty(); + } + + /** + * Returns information about the table that has been passed to a table argument at the given + * position. + * + *

Semantics are only available for table arguments. They expose, for example, the + * partition-by columns when the table argument uses {@link ArgumentTrait#SET_SEMANTIC_TABLE + * set semantics}. + */ + default Optional getTableSemantics(int pos) { + return Optional.empty(); + } } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java index f4d54f276d73c..13fa1245e6280 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java @@ -22,6 +22,9 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext; +import org.apache.flink.table.functions.ChangelogModeStrategy; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; @@ -36,9 +39,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.function.IntFunction; import java.util.stream.Collectors; /** Type strategies for the {@code FROM_CHANGELOG} process table function. 
*/ @@ -95,6 +100,57 @@ public Optional> inferInputTypes( return Optional.of(DataTypes.ROW(outputFields).notNull()); }; + // -------------------------------------------------------------------------------------------- + // Changelog mode inference + // -------------------------------------------------------------------------------------------- + + /** + * Emits an upsert changelog when the input is partitioned (set semantics) and the resolved + * {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code UPDATE_BEFORE}. In all other + * cases the output is a retract changelog. When upsert mode is selected, the partition key acts + * as the upsert key. + */ + public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY = + ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) : ChangelogMode.all(); + + /** + * Returns {@code true} when the FROM_CHANGELOG call should emit an upsert changelog: the input + * table is partitioned AND the resolved {@code op_mapping} contains {@code UPDATE_AFTER} + * without {@code UPDATE_BEFORE}. Falls back to {@code false} when the mapping is absent or + * cannot be resolved as a literal. The default mapping maps to a retract table. + */ + private static boolean isUpsertConfig(final ChangelogContext ctx) { + if (!isPartitioned(ctx::getTableSemantics)) { + return false; + } + return ctx.getArgumentValue(ARG_OP_MAPPING, Map.class) + .map(FromChangelogTypeStrategy::describesUpsert) + .orElse(false); + } + + /** + * Returns {@code true} if the mapping maps {@code UPDATE_AFTER} without {@code UPDATE_BEFORE}, + * i.e. it describes an upsert changelog. 
+ */ + private static boolean describesUpsert(final Map mapping) { + return mappingContains(mapping, UPDATE_AFTER) && !mappingContains(mapping, UPDATE_BEFORE); + } + + private static boolean mappingContains( + final Map mapping, final String rowKindName) { + return mapping.values().stream() + .filter(Objects::nonNull) + .anyMatch(v -> rowKindName.equals(v.trim())); + } + + private static boolean isPartitioned( + final IntFunction> tableSemantics) { + return tableSemantics + .apply(ARG_TABLE) + .map(ts -> ts.partitionByColumns().length > 0) + .orElse(false); + } + // -------------------------------------------------------------------------------------------- // Helpers // -------------------------------------------------------------------------------------------- @@ -200,17 +256,17 @@ private static Optional> validateOpMapping( if (validationError.isPresent()) { return validationError; } - - final boolean hasUpdateBefore = - mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim())); - final boolean hasUpdateAfter = - mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim())); - if (hasUpdateAfter && !hasUpdateBefore) { + // A mapping that produces UPDATE_AFTER without UPDATE_BEFORE describes an upsert + // changelog. Upsert mode requires a key, so PARTITION BY must be present on the + // table argument; otherwise the call would produce key-less updates. + if (describesUpsert(mapping) && !isPartitioned(callContext::getTableSemantics)) { return callContext.fail( throwOnFailure, - "The 'op_mapping' must include UPDATE_BEFORE for retract mode. " - + "Upsert mode (without UPDATE_BEFORE) is not supported " - + "in this version."); + "An 'op_mapping' that produces UPDATE_AFTER without UPDATE_BEFORE " + + "describes an upsert changelog and requires a key. 
Add " + + "PARTITION BY to the input table to define the upsert key, " + + "or include UPDATE_BEFORE in the mapping for retract " + + "semantics."); } } return Optional.empty(); diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java index 322cf94a49e49..70595f1c5971f 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java @@ -139,11 +139,33 @@ protected Stream testData() { .calledWithLiteralAt(3, "FAIL") .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE), - // Error: UPDATE_AFTER without UPDATE_BEFORE not supported + // Valid: upsert mapping (INSERT + UPDATE_AFTER + DELETE, no UPDATE_BEFORE) with + // PARTITION BY. FROM_CHANGELOG emits an upsert changelog keyed on the partition + // column. TestSpec.forStrategy( - "UPDATE_AFTER requires UPDATE_BEFORE", + "Valid upsert mapping with PARTITION BY", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) - .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE) + .calledWithTableSemanticsAt( + 0, + new TableSemanticsMock( + TABLE_TYPE, new int[] {0}, new int[0], -1, null)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt( + 2, + Map.of( + "c", "INSERT", + "u", "UPDATE_AFTER", + "d", "DELETE")) + .calledWithLiteralAt(3, "FAIL") + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE), + + // Error: upsert mapping (INSERT + UPDATE_AFTER + DELETE) without PARTITION BY. + // Upsert mode requires a key, so the call must use set semantics. 
+ TestSpec.forStrategy( + "Upsert mapping without PARTITION BY rejected", + FROM_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE) .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) .calledWithLiteralAt(1, ColumnList.of("op")) .calledWithLiteralAt( @@ -152,7 +174,13 @@ protected Stream testData() { "c", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE")) - .expectErrorMessage("must include UPDATE_BEFORE"), + .calledWithLiteralAt(3, "FAIL") + .expectErrorMessage( + "An 'op_mapping' that produces UPDATE_AFTER without " + + "UPDATE_BEFORE describes an upsert changelog and " + + "requires a key. Add PARTITION BY to the input table " + + "to define the upsert key, or include UPDATE_BEFORE in " + + "the mapping for retract semantics."), // Error: Invalid error_handling mode TestSpec.forStrategy( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 6839cc1c28c13..fd646a34148b4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -23,7 +23,7 @@ import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize import org.apache.flink.table.connector.ChangelogMode -import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction} +import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction, TableSemantics} import 
org.apache.flink.table.functions.ChangelogFunction.ChangelogContext import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall} import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction @@ -1678,8 +1678,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val callContext = function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) - // Expose a simplified context to let users focus on important characteristics. - // If necessary, we can expose the full CallContext in the future. + // Expose a simplified context focused on changelog-relevant inputs: changelog modes, + // resolved literal arguments, and table semantics (e.g., partition-by columns). new ChangelogContext { override def getTableChangelogMode(pos: Int): ChangelogMode = { val tableSemantics = callContext.getTableSemantics(pos).orElse(null) @@ -1692,6 +1692,14 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti override def getRequiredChangelogMode: ChangelogMode = { callContext.getOutputChangelogMode.orElse(null) } + + override def getArgumentValue[T](pos: Int, clazz: Class[T]): java.util.Optional[T] = { + callContext.getArgumentValue(pos, clazz) + } + + override def getTableSemantics(pos: Int): java.util.Optional[TableSemantics] = { + callContext.getTableSemantics(pos) + } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java index c3922fb28adf1..2892d22eda95e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java @@ 
-43,6 +43,7 @@ public List programs() { FromChangelogTestPrograms.CUSTOM_OP_MAPPING, FromChangelogTestPrograms.CUSTOM_OP_NAME, FromChangelogTestPrograms.RETRACT_PARTITION_BY, + FromChangelogTestPrograms.UPSERT_PARTITION_BY, FromChangelogTestPrograms.DELETION_FLAG_PARTITION_BY, FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING, FromChangelogTestPrograms.SKIP_NULL_OP_CODE, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java index a964a57bdbe84..2ab92f052d700 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java @@ -200,6 +200,38 @@ public class FromChangelogTestPrograms { + "input => TABLE cdc_stream PARTITION BY id)") .build(); + public static final TableTestProgram UPSERT_PARTITION_BY = + TableTestProgram.of( + "from-changelog-upsert-partition-by", + "PARTITION BY + op_mapping without UPDATE_BEFORE produces an " + + "upsert changelog keyed on the partition columns") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema("name STRING", "id INT", "op STRING") + .producedValues( + Row.of("Alice", 1, "INSERT"), + Row.of("Bob", 2, "INSERT"), + Row.of("Alice2", 1, "UPDATE_AFTER"), + Row.of("Bob", 2, "DELETE")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT PRIMARY KEY NOT ENFORCED", "name STRING") + .consumedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"), + Row.ofKind(RowKind.DELETE, 2, "Bob")) + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input 
=> TABLE cdc_stream PARTITION BY id, " + + "op_mapping => MAP[" + + "'INSERT', 'INSERT', " + + "'UPDATE_AFTER', 'UPDATE_AFTER', " + + "'DELETE', 'DELETE'])") + .build(); + public static final TableTestProgram DELETION_FLAG_PARTITION_BY = TableTestProgram.of( "from-changelog-deletion-flag-partition-by", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java index 17d7f64fa4812..100d7b3e505a3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.utils.PlanKind; import org.apache.flink.table.planner.utils.TableTestBase; import org.apache.flink.table.planner.utils.TableTestUtil; @@ -29,6 +30,8 @@ import java.util.Collections; import java.util.List; +import scala.Enumeration; + /** * Plan tests for the FROM_CHANGELOG built-in process table function. Uses {@link * ExplainDetail#CHANGELOG_MODE} to verify changelog mode propagation through the plan. 
@@ -71,4 +74,27 @@ void testRetractPartitionBy() { "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id)", CHANGELOG_MODE); } + + @Test + void testUpsertPartitionBy() { + util.tableEnv() + .executeSql( + "CREATE TABLE cdc_stream (" + + " id INT," + + " op STRING," + + " name STRING" + + ") WITH ('connector' = 'values')"); + util.doVerifyPlan( + "SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream PARTITION BY id, " + + "op_mapping => MAP[" + + "'INSERT', 'INSERT', " + + "'UPDATE_AFTER', 'UPDATE_AFTER', " + + "'DELETE', 'DELETE'])", + new ExplainDetail[] {ExplainDetail.CHANGELOG_MODE}, + false, + new Enumeration.Value[] {PlanKind.AST(), PlanKind.OPT_REL()}, + false, + true); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml index 2addfe9f5ff5f..5e632be57881d 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml @@ -50,6 +50,26 @@ LogicalProject(id=[$0], name=[$1]) + + + + + TABLE cdc_stream PARTITION BY id, op_mapping => MAP['INSERT', 'INSERT', 'UPDATE_AFTER', 'UPDATE_AFTER', 'DELETE', 'DELETE'])]]> + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 3e8d5729ee73c..368289cd1aca0 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -614,7 +614,7 @@ abstract class 
TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) /** Java-friendly overload that accepts a list of [[ExplainDetail]]s. */ def verifyRelPlan(query: String, extraDetails: java.util.List[ExplainDetail]): Unit = { - verifyRelPlan(query, extraDetails.asScala.toSeq: _*) + verifyRelPlan(query, extraDetails.asScala: _*) } /** @@ -1048,6 +1048,22 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) withRowType: Boolean, expectedPlans: Array[PlanKind], withQueryBlockAlias: Boolean): Unit = { + doVerifyPlan( + query, + extraDetails, + withRowType, + expectedPlans, + withQueryBlockAlias, + withUpsertKey = false) + } + + def doVerifyPlan( + query: String, + extraDetails: Array[ExplainDetail], + withRowType: Boolean, + expectedPlans: Array[PlanKind], + withQueryBlockAlias: Boolean, + withUpsertKey: Boolean): Unit = { val table = getTableEnv.sqlQuery(query) val relNode = TableTestUtil.toRelNode(table) @@ -1057,7 +1073,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) withRowType, expectedPlans, () => assertEqualsOrExpand("sql", query), - withQueryBlockAlias) + withQueryBlockAlias, + withUpsertKey) } /** @@ -1192,7 +1209,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) expectedPlans: Array[PlanKind], assertSqlEqualsOrExpandFunc: () => Unit, withQueryBlockAlias: Boolean = false, - withDuplicateChanges: Boolean = false): Unit = { + withDuplicateChanges: Boolean = false, + withUpsertKey: Boolean = false): Unit = { val expectedPlanKinds = new util.HashSet[PlanKind](expectedPlans.toSeq.asJava) @@ -1219,7 +1237,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) optimizedRels.toArray, extraDetails, withRowType = withRowType, - withDuplicateChanges = withDuplicateChanges) + withDuplicateChanges = withDuplicateChanges, + withUpsertKey = withUpsertKey) // build optimized exec plan if `expectedPlanKinds` contains OPT_EXEC val optimizedExecPlan 
= if (expectedPlanKinds.contains(PlanKind.OPT_EXEC)) { @@ -1276,7 +1295,8 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) optimizedRels: Array[RelNode], extraDetails: Array[ExplainDetail], withRowType: Boolean, - withDuplicateChanges: Boolean): String = { + withDuplicateChanges: Boolean, + withUpsertKey: Boolean = false): String = { require(optimizedRels.nonEmpty) val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) { SqlExplainLevel.ALL_ATTRIBUTES @@ -1303,7 +1323,9 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean) detailLevel = explainLevel, withChangelogTraits = withChangelogTraits, withRowType = withRowType, - withDuplicateChangesTrait = withDuplicateChanges) + withUpsertKey = withUpsertKey, + withDuplicateChangesTrait = withDuplicateChanges + ) } .mkString("\n") }