diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index d7e6e29668ab7..15ad336f4d0d4 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -36,7 +36,7 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan ## TO_CHANGELOG -The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation. +The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation. This is useful when you need to materialize changelog events into a downstream system that only supports appends (e.g., a message queue, log store, or append-only file sink). It is also useful to filter out certain types of updates, for example DELETEs. @@ -56,13 +56,13 @@ SELECT * FROM TO_CHANGELOG( |:-------------|:---------|:------------| | `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. | | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | -| `op_mapping` | No | A `MAP` mapping `RowKind` names to custom output codes. When provided, only mapped RowKinds are forwarded - unmapped events are dropped. | +| `op_mapping` | No | A `MAP` mapping change operation names to custom output codes. 
Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | #### Default op_mapping -When `op_mapping` is omitted, all four RowKinds are mapped to their standard names: +When `op_mapping` is omitted, all four change operations are mapped to their standard names: -| RowKind | Output value | +| Change Operation | Output value | |:----------------|:------------------| | INSERT | `'INSERT'` | | UPDATE_BEFORE | `'UPDATE_BEFORE'` | @@ -122,6 +122,19 @@ SELECT * FROM TO_CHANGELOG( -- op_code values are 'I' and 'U' instead of full names ``` +#### Deletion flag pattern + +```sql +SELECT * FROM TO_CHANGELOG( + input => TABLE my_aggregation PARTITION BY id, + op => DESCRIPTOR(deleted), + op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true'] +) +-- INSERT and UPDATE_AFTER produce deleted='false' +-- DELETE produces deleted='true' +-- UPDATE_BEFORE is dropped (not in the mapping) +``` + #### Table API ```java @@ -133,6 +146,12 @@ Table result = myTable.partitionBy($("id")).toChangelog( descriptor("op_code").asArgument("op"), map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping") ); + +// Deletion flag pattern: comma-separated keys map multiple change operations to the same code +Table result = myTable.partitionBy($("id")).toChangelog( + descriptor("deleted").asArgument("op"), + map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") +); ``` {{< top >}} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 8d5f1c91b286a..7db750a296d35 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -144,9 +144,9 @@ public interface PartitionedTable { * Converts this dynamic table into an append-only table with an explicit operation code column * using the built-in {@code TO_CHANGELOG} process table function. * - * <p>Each input row - regardless of its original RowKind - is emitted as an INSERT-only row - * with a string {@code "op"} column indicating the original operation (INSERT, UPDATE_AFTER, - * DELETE, etc.). + * <p>Each input row - regardless of its original change operation - is emitted as an + * INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT, + * UPDATE_AFTER, DELETE, etc.). * * <p>
Optional arguments can be passed using named expressions: * @@ -159,6 +159,12 @@ public interface PartitionedTable { * descriptor("op_code").asArgument("op"), * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping") * ); + * + * // Deletion flag pattern: comma-separated keys map multiple change operations to the same code + * table.partitionBy($("id")).toChangelog( + * descriptor("deleted").asArgument("op"), + * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") + * ); * } * * @param arguments optional named arguments for {@code op} and {@code op_mapping} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index cb04e7202099c..d13d52a49c772 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -35,6 +35,7 @@ import org.apache.flink.types.ColumnList; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -140,21 +141,56 @@ private static Optional> validateInputs( final Optional opMapping = callContext.getArgumentValue(2, Map.class); if (opMapping.isPresent()) { - final boolean hasInvalidMappingKey = - opMapping.get().keySet().stream() - .anyMatch( - key -> - !(key instanceof String) - || !VALID_ROW_KIND_NAMES.contains(key)); - if (hasInvalidMappingKey) { - return callContext.fail( - throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + final Optional> validationError = + validateOpMappingKeys(callContext, opMapping.get(), throwOnFailure); + if (validationError.isPresent()) { + return validationError; } } return 
Optional.of(callContext.getArgumentDataTypes()); } + /** + * Validates op_mapping keys. Keys may be comma-separated (e.g., {@code "INSERT, UPDATE_AFTER"}) + * to map multiple change operations to the same output code. Whitespace around names is + * trimmed. Names are case-sensitive and must match exactly (e.g., {@code INSERT}, not {@code + * insert}). Each name must be valid and appear at most once across all entries. + */ + @SuppressWarnings("rawtypes") + private static Optional> validateOpMappingKeys( + final CallContext callContext, final Map opMapping, final boolean throwOnFailure) { + final Set allRowKindsSeen = new HashSet<>(); + for (final Object key : opMapping.keySet()) { + if (!(key instanceof String)) { + return callContext.fail( + throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); + } + final String[] rowKindNames = ((String) key).split(","); + for (final String rawName : rowKindNames) { + final String rowKindName = rawName.trim(); + if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. " + + "Unknown change operation: '%s'. Valid values are: %s.", + rowKindName, VALID_ROW_KIND_NAMES)); + } + final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); + if (isDuplicate) { + return callContext.fail( + throwOnFailure, + String.format( + "Invalid target mapping for argument 'op_mapping'. 
" + + "Duplicate change operation: '%s'.", + rowKindName)); + } + } + } + return Optional.empty(); + } + private static String resolveOpColumnName(final CallContext callContext) { return callContext .getArgumentValue(1, ColumnList.class) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index 798044effa7d1..706f7abd1d3d1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -46,8 +46,10 @@ public List programs() { ToChangelogTestPrograms.TABLE_API_DEFAULT, ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG, ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG, + ToChangelogTestPrograms.DELETION_FLAG, ToChangelogTestPrograms.MISSING_PARTITION_BY, ToChangelogTestPrograms.INVALID_DESCRIPTOR, - ToChangelogTestPrograms.INVALID_OP_MAPPING); + ToChangelogTestPrograms.INVALID_OP_MAPPING, + ToChangelogTestPrograms.DUPLICATE_ROW_KIND); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 22602bcf38a70..bfc670971121d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -96,7 +96,7 @@ public class ToChangelogTestPrograms { public static final TableTestProgram CUSTOM_OP_MAPPING = 
TableTestProgram.of( "to-changelog-custom-op-mapping", - "custom op_mapping maps RowKinds to user-defined codes and drops unmapped") + "custom op_mapping maps change operations to user-defined codes and drops unmapped") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -322,6 +322,46 @@ public class ToChangelogTestPrograms { + "FROM orders_changelog") .build(); + // -------------------------------------------------------------------------------------------- + // Use case: deletion flag pattern (comma-separated change operation keys) + // -------------------------------------------------------------------------------------------- + + /** + * Kafka Connect style deletion flag: INSERT and UPDATE_AFTER both produce deleted='false' and + * DELETE produces deleted='true'. UPDATE_BEFORE is silently dropped. + */ + public static final TableTestProgram DELETION_FLAG = + TableTestProgram.of( + "to-changelog-deletion-flag", + "comma-separated change operations produce deletion flag output") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", 20L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "deleted STRING", "score BIGINT") + .consumedValues( + "+I[Alice, false, 10]", + "+I[Bob, false, 20]", + "+I[Alice, false, 30]", + "+I[Bob, true, 20]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "op => DESCRIPTOR(deleted), " + + "op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true'])") + .build(); + // -------------------------------------------------------------------------------------------- // 
Error validation tests // -------------------------------------------------------------------------------------------- @@ -353,13 +393,26 @@ public class ToChangelogTestPrograms { public static final TableTestProgram INVALID_OP_MAPPING = TableTestProgram.of( "to-changelog-invalid-op-mapping", - "fails when op_mapping has invalid RowKind name") + "fails when op_mapping has invalid change operation name") .setupTableSource(SIMPLE_SOURCE) .runFailingSql( "SELECT * FROM TO_CHANGELOG(" + "input => TABLE t PARTITION BY id, " + "op_mapping => MAP['INVALID_KIND', 'X'])", ValidationException.class, - "Invalid target mapping for argument 'op_mapping'.") + "Unknown change operation: 'INVALID_KIND'") + .build(); + + public static final TableTestProgram DUPLICATE_ROW_KIND = + TableTestProgram.of( + "to-changelog-duplicate-rowkind", + "fails when a change operation appears in multiple op_mapping entries") + .setupTableSource(SIMPLE_SOURCE) + .runFailingSql( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY id, " + + "op_mapping => MAP['INSERT, DELETE', 'A', 'DELETE', 'B'])", + ValidationException.class, + "Duplicate change operation: 'DELETE'") .build(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 75211e164f533..f16ea33375f99 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -109,13 +109,22 @@ private static int[] buildNonPartitionIndices( return IntStream.range(0, fieldCount).filter(i -> !partitionKeySet.contains(i)).toArray(); } + /** + * Builds a RowKind-to-output-code map. 
Keys may be comma-separated (e.g., "INSERT, + * UPDATE_AFTER") to map multiple RowKinds to the same output code. + */ private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, String> opMapping) { if (opMapping == null) { return new EnumMap<>(DEFAULT_OP_MAPPING); } - final Map<RowKind, String> map = new EnumMap<>(RowKind.class); - opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name), code)); - return map; + final Map<RowKind, String> result = new EnumMap<>(RowKind.class); + opMapping.forEach( + (commaSeparatedRowKinds, outputCode) -> { + for (final String rawName : commaSeparatedRowKinds.split(",")) { + result.put(RowKind.valueOf(rawName.trim()), outputCode); + } + }); + return result; } public void eval(