From 097a978b74ad02620909e666fd5b2e931078b1f9 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Sun, 29 Mar 2026 17:48:27 +0200 Subject: [PATCH 1/4] [FLINK-39349][table] Support comma-separated RowKind keys in TO_CHANGELOG op_mapping --- .../strategies/ToChangelogTypeStrategy.java | 53 +++++++++++++++--- .../exec/stream/ToChangelogSemanticTests.java | 4 +- .../exec/stream/ToChangelogTestPrograms.java | 55 ++++++++++++++++++- .../functions/ptf/ToChangelogFunction.java | 15 ++++- 4 files changed, 113 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index cb04e7202099c..c83eff3f38440 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -35,6 +35,7 @@ import org.apache.flink.types.ColumnList; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -140,21 +141,55 @@ private static Optional> validateInputs( final Optional opMapping = callContext.getArgumentValue(2, Map.class); if (opMapping.isPresent()) { - final boolean hasInvalidMappingKey = - opMapping.get().keySet().stream() - .anyMatch( - key -> - !(key instanceof String) - || !VALID_ROW_KIND_NAMES.contains(key)); - if (hasInvalidMappingKey) { - return callContext.fail( - throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + final Optional> validationError = + validateOpMappingKeys(callContext, opMapping.get(), throwOnFailure); + if (validationError.isPresent()) { + return validationError; } } return Optional.of(callContext.getArgumentDataTypes()); } + /** + * Validates op_mapping keys. Keys may be comma-separated RowKind names (e.g., "INSERT, + * UPDATE_AFTER") to map multiple RowKinds to the same output code. Each individual RowKind must + * be valid and appear at most once across all entries. + */ + @SuppressWarnings("rawtypes") + private static Optional> validateOpMappingKeys( + final CallContext callContext, final Map opMapping, final boolean throwOnFailure) { + final Set allRowKindsSeen = new HashSet<>(); + for (final Object key : opMapping.keySet()) { + if (!(key instanceof String)) { + return callContext.fail( + throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + } + final String[] rowKindNames = ((String) key).split(","); + for (final String rawName : rowKindNames) { + final String rowKindName = rawName.trim(); + if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. " + + "Unknown RowKind: '%s'. Valid values are: %s.", + rowKindName, VALID_ROW_KIND_NAMES)); + } + final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); + if (isDuplicate) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. " + + "Duplicate RowKind: '%s'.", + rowKindName)); + } + } + } + return Optional.empty(); + } + private static String resolveOpColumnName(final CallContext callContext) { return callContext .getArgumentValue(1, ColumnList.class) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index 798044effa7d1..706f7abd1d3d1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -46,8 +46,10 @@ public List programs() { ToChangelogTestPrograms.TABLE_API_DEFAULT, ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG, ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG, + ToChangelogTestPrograms.DELETION_FLAG, ToChangelogTestPrograms.MISSING_PARTITION_BY, ToChangelogTestPrograms.INVALID_DESCRIPTOR, - ToChangelogTestPrograms.INVALID_OP_MAPPING); + ToChangelogTestPrograms.INVALID_OP_MAPPING, + ToChangelogTestPrograms.DUPLICATE_ROW_KIND); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 22602bcf38a70..54a412a71869a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -322,6 +322,46 @@ public class ToChangelogTestPrograms { + "FROM orders_changelog") .build(); + // -------------------------------------------------------------------------------------------- + // Use case: deletion flag pattern (comma-separated RowKind keys) + // -------------------------------------------------------------------------------------------- + + /** + * Kafka Connect style deletion flag: INSERT and UPDATE_AFTER both produce deleted='false', + * DELETE produces deleted='true', and UPDATE_BEFORE is silently dropped. + */ + public static final TableTestProgram DELETION_FLAG = + TableTestProgram.of( + "to-changelog-deletion-flag", + "comma-separated RowKinds produce deletion flag output") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", 20L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "deleted STRING", "score BIGINT") + .consumedValues( + "+I[Alice, false, 10]", + "+I[Bob, false, 20]", + "+I[Alice, false, 30]", + "+I[Bob, true, 20]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "op => DESCRIPTOR(deleted), " + + "op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true'])") + .build(); + // -------------------------------------------------------------------------------------------- // Error validation tests // -------------------------------------------------------------------------------------------- @@ -360,6 +400,19 @@ public class ToChangelogTestPrograms { + "input => TABLE t PARTITION BY id, " + "op_mapping => MAP['INVALID_KIND', 'X'])", ValidationException.class, - "Invalid target mapping for argument 'op_mapping'.") + "Unknown RowKind: 'INVALID_KIND'") + .build(); + + public static final TableTestProgram DUPLICATE_ROW_KIND = + TableTestProgram.of( + "to-changelog-duplicate-rowkind", + "fails when a RowKind appears in multiple op_mapping entries") + .setupTableSource(SIMPLE_SOURCE) + .runFailingSql( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY id, " + + "op_mapping => MAP['INSERT, DELETE', 'A', 'DELETE', 'B'])", + ValidationException.class, + "Duplicate RowKind: 'DELETE'") .build(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 75211e164f533..f16ea33375f99 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -109,13 +109,22 @@ private static int[] buildNonPartitionIndices( return IntStream.range(0, fieldCount).filter(i -> !partitionKeySet.contains(i)).toArray(); } + /** + * Builds a RowKind-to-output-code map. Keys may be comma-separated (e.g., "INSERT, + * UPDATE_AFTER") to map multiple RowKinds to the same output code. + */ private static Map buildOpMap(@Nullable final Map opMapping) { if (opMapping == null) { return new EnumMap<>(DEFAULT_OP_MAPPING); } - final Map map = new EnumMap<>(RowKind.class); - opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name), code)); - return map; + final Map result = new EnumMap<>(RowKind.class); + opMapping.forEach( + (commaSeparatedRowKinds, outputCode) -> { + for (final String rawName : commaSeparatedRowKinds.split(",")) { + result.put(RowKind.valueOf(rawName.trim()), outputCode); + } + }); + return result; } public void eval( From 5c04b7facd54f8017e6a01a6389887ee73a8dfac Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Sun, 29 Mar 2026 17:48:30 +0200 Subject: [PATCH 2/4] [FLINK-39349][table] Document deletion flag pattern for TO_CHANGELOG --- .../docs/sql/reference/queries/changelog.md | 21 ++++++++++++++++++- .../flink/table/api/PartitionedTable.java | 6 ++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index d7e6e29668ab7..c5def368588f1 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -56,7 +56,7 @@ SELECT * FROM TO_CHANGELOG( |:-------------|:---------|:------------| | `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. | | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | -| `op_mapping` | No | A `MAP` mapping `RowKind` names to custom output codes. When provided, only mapped RowKinds are forwarded - unmapped events are dropped. | +| `op_mapping` | No | A `MAP` mapping `RowKind` names to custom output codes. Keys can contain comma-separated RowKind names to map multiple RowKinds to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped RowKinds are forwarded - unmapped events are dropped. Each RowKind may appear at most once across all entries. | #### Default op_mapping @@ -122,6 +122,19 @@ SELECT * FROM TO_CHANGELOG( -- op_code values are 'I' and 'U' instead of full names ``` +#### Deletion flag pattern + +```sql +SELECT * FROM TO_CHANGELOG( + input => TABLE my_aggregation PARTITION BY id, + op => DESCRIPTOR(deleted), + op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true'] +) +-- INSERT and UPDATE_AFTER produce deleted='false' +-- DELETE produces deleted='true' +-- UPDATE_BEFORE is dropped (not in the mapping) +``` + #### Table API ```java @@ -133,6 +146,12 @@ Table result = myTable.partitionBy($("id")).toChangelog( descriptor("op_code").asArgument("op"), map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping") ); + +// Deletion flag pattern: comma-separated keys map multiple RowKinds to the same code +Table result = myTable.partitionBy($("id")).toChangelog( + descriptor("deleted").asArgument("op"), + map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") +); ``` {{< top >}} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 8d5f1c91b286a..f2eb12fbfde5c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -159,6 +159,12 @@ public interface PartitionedTable { * descriptor("op_code").asArgument("op"), * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping") * ); + * + * // Deletion flag pattern: comma-separated keys map multiple RowKinds to the same code + * table.partitionBy($("id")).toChangelog( + * descriptor("deleted").asArgument("op"), + * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") + * ); * } * * @param arguments optional named arguments for {@code op} and {@code op_mapping} From 305a6f0f4d0d80f08c163058345340bdb5a3dd6d Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Mon, 30 Mar 2026 10:10:30 +0200 Subject: [PATCH 3/4] [FLINK-39349][table] Use 'change operation' instead of 'RowKind' in user-facing text --- .../docs/sql/reference/queries/changelog.md | 10 +++++----- .../flink/table/api/PartitionedTable.java | 4 ++-- .../strategies/ToChangelogTypeStrategy.java | 4 ++-- .../exec/stream/ToChangelogTestPrograms.java | 18 +++++++++--------- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index c5def368588f1..15ad336f4d0d4 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -36,7 +36,7 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan ## TO_CHANGELOG -The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation. +The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation. This is useful when you need to materialize changelog events into a downstream system that only supports appends (e.g., a message queue, log store, or append-only file sink). It is also useful to filter out certain types of updates, for example DELETEs. @@ -56,13 +56,13 @@ SELECT * FROM TO_CHANGELOG( |:-------------|:---------|:------------| | `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. | | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | -| `op_mapping` | No | A `MAP` mapping `RowKind` names to custom output codes. Keys can contain comma-separated RowKind names to map multiple RowKinds to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped RowKinds are forwarded - unmapped events are dropped. Each RowKind may appear at most once across all entries. | +| `op_mapping` | No | A `MAP` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | #### Default op_mapping -When `op_mapping` is omitted, all four RowKinds are mapped to their standard names: +When `op_mapping` is omitted, all four change operations are mapped to their standard names: -| RowKind | Output value | +| Change Operation | Output value | |:----------------|:------------------| | INSERT | `'INSERT'` | | UPDATE_BEFORE | `'UPDATE_BEFORE'` | @@ -147,7 +147,7 @@ Table result = myTable.partitionBy($("id")).toChangelog( map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping") ); -// Deletion flag pattern: comma-separated keys map multiple RowKinds to the same code +// Deletion flag pattern: comma-separated keys map multiple change operations to the same code Table result = myTable.partitionBy($("id")).toChangelog( descriptor("deleted").asArgument("op"), map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index f2eb12fbfde5c..b6cb219bc7432 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -144,7 +144,7 @@ public interface PartitionedTable { * Converts this dynamic table into an append-only table with an explicit operation code column * using the built-in {@code TO_CHANGELOG} process table function. * - *

Each input row - regardless of its original RowKind - is emitted as an INSERT-only row + *

Each input row - regardless of its original change operation - is emitted as an INSERT-only row * with a string {@code "op"} column indicating the original operation (INSERT, UPDATE_AFTER, * DELETE, etc.). * @@ -160,7 +160,7 @@ public interface PartitionedTable { * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping") * ); * - * // Deletion flag pattern: comma-separated keys map multiple RowKinds to the same code + * // Deletion flag pattern: comma-separated keys map multiple change operations to the same code * table.partitionBy($("id")).toChangelog( * descriptor("deleted").asArgument("op"), * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index c83eff3f38440..81643e451db1b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -173,7 +173,7 @@ private static Optional> validateOpMappingKeys( throwOnFailure, String.format( "Invalid target mapping for argument 'op_mapping'. " - + "Unknown RowKind: '%s'. Valid values are: %s.", + + "Unknown change operation: '%s'. Valid values are: %s.", rowKindName, VALID_ROW_KIND_NAMES)); } final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); @@ -182,7 +182,7 @@ private static Optional> validateOpMappingKeys( throwOnFailure, String.format( "Invalid target mapping for argument 'op_mapping'. " - + "Duplicate RowKind: '%s'.", + + "Duplicate change operation: '%s'.", rowKindName)); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 54a412a71869a..bfc670971121d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -96,7 +96,7 @@ public class ToChangelogTestPrograms { public static final TableTestProgram CUSTOM_OP_MAPPING = TableTestProgram.of( "to-changelog-custom-op-mapping", - "custom op_mapping maps RowKinds to user-defined codes and drops unmapped") + "custom op_mapping maps change operations to user-defined codes and drops unmapped") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -323,17 +323,17 @@ public class ToChangelogTestPrograms { .build(); // -------------------------------------------------------------------------------------------- - // Use case: deletion flag pattern (comma-separated RowKind keys) + // Use case: deletion flag pattern (comma-separated change operation keys) // -------------------------------------------------------------------------------------------- /** - * Kafka Connect style deletion flag: INSERT and UPDATE_AFTER both produce deleted='false', - * DELETE produces deleted='true', and UPDATE_BEFORE is silently dropped. + * Kafka Connect style deletion flag: INSERT and UPDATE_AFTER both produce deleted='false' and + * DELETE produces deleted='true'. UPDATE_BEFORE is silently dropped. */ public static final TableTestProgram DELETION_FLAG = TableTestProgram.of( "to-changelog-deletion-flag", - "comma-separated RowKinds produce deletion flag output") + "comma-separated change operations produce deletion flag output") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -393,26 +393,26 @@ public class ToChangelogTestPrograms { public static final TableTestProgram INVALID_OP_MAPPING = TableTestProgram.of( "to-changelog-invalid-op-mapping", - "fails when op_mapping has invalid RowKind name") + "fails when op_mapping has invalid change operation name") .setupTableSource(SIMPLE_SOURCE) .runFailingSql( "SELECT * FROM TO_CHANGELOG(" + "input => TABLE t PARTITION BY id, " + "op_mapping => MAP['INVALID_KIND', 'X'])", ValidationException.class, - "Unknown RowKind: 'INVALID_KIND'") + "Unknown change operation: 'INVALID_KIND'") .build(); public static final TableTestProgram DUPLICATE_ROW_KIND = TableTestProgram.of( "to-changelog-duplicate-rowkind", - "fails when a RowKind appears in multiple op_mapping entries") + "fails when a change operation appears in multiple op_mapping entries") .setupTableSource(SIMPLE_SOURCE) .runFailingSql( "SELECT * FROM TO_CHANGELOG(" + "input => TABLE t PARTITION BY id, " + "op_mapping => MAP['INSERT, DELETE', 'A', 'DELETE', 'B'])", ValidationException.class, - "Duplicate RowKind: 'DELETE'") + "Duplicate change operation: 'DELETE'") .build(); } From e47fadb241e1849d7062d3436805e729a553e511 Mon Sep 17 00:00:00 2001 From: Gustavo de Morais Date: Mon, 30 Mar 2026 10:52:12 +0200 Subject: [PATCH 4/4] [FLINK-39349][table] Clarify whitespace and case handling in op_mapping validation JavaDoc --- .../java/org/apache/flink/table/api/PartitionedTable.java | 6 +++--- .../inference/strategies/ToChangelogTypeStrategy.java | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index b6cb219bc7432..7db750a296d35 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -144,9 +144,9 @@ public interface PartitionedTable { * Converts this dynamic table into an append-only table with an explicit operation code column * using the built-in {@code TO_CHANGELOG} process table function. * - *

Each input row - regardless of its original change operation - is emitted as an INSERT-only row - * with a string {@code "op"} column indicating the original operation (INSERT, UPDATE_AFTER, - * DELETE, etc.). + *

Each input row - regardless of its original change operation - is emitted as an + * INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT, + * UPDATE_AFTER, DELETE, etc.). * *

Optional arguments can be passed using named expressions: * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index 81643e451db1b..d13d52a49c772 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -152,9 +152,10 @@ private static Optional> validateInputs( } /** - * Validates op_mapping keys. Keys may be comma-separated RowKind names (e.g., "INSERT, - * UPDATE_AFTER") to map multiple RowKinds to the same output code. Each individual RowKind must - * be valid and appear at most once across all entries. + * Validates op_mapping keys. Keys may be comma-separated (e.g., {@code "INSERT, UPDATE_AFTER"}) + * to map multiple change operations to the same output code. Whitespace around names is + * trimmed. Names are case-sensitive and must match exactly (e.g., {@code INSERT}, not {@code + * insert}). Each name must be valid and appear at most once across all entries. */ @SuppressWarnings("rawtypes") private static Optional> validateOpMappingKeys(