Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan

## TO_CHANGELOG

The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation.
The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation.

This is useful when you need to materialize changelog events into a downstream system that only supports appends (e.g., a message queue, log store, or append-only file sink). It is also useful to filter out certain types of updates, for example DELETEs.

Expand All @@ -56,13 +56,13 @@ SELECT * FROM TO_CHANGELOG(
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping `RowKind` names to custom output codes. When provided, only mapped RowKinds are forwarded - unmapped events are dropped. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |

#### Default op_mapping

When `op_mapping` is omitted, all four RowKinds are mapped to their standard names:
When `op_mapping` is omitted, all four change operations are mapped to their standard names:

| RowKind | Output value |
| Change Operation | Output value |
|:----------------|:------------------|
| INSERT | `'INSERT'` |
| UPDATE_BEFORE | `'UPDATE_BEFORE'` |
Expand Down Expand Up @@ -122,6 +122,19 @@ SELECT * FROM TO_CHANGELOG(
-- op_code values are 'I' and 'U' instead of full names
```

#### Deletion flag pattern

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id,
op => DESCRIPTOR(deleted),
op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']
)
-- INSERT and UPDATE_AFTER produce deleted='false'
-- DELETE produces deleted='true'
-- UPDATE_BEFORE is dropped (not in the mapping)
```

#### Table API

```java
Expand All @@ -133,6 +146,12 @@ Table result = myTable.partitionBy($("id")).toChangelog(
descriptor("op_code").asArgument("op"),
map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
);

// Deletion flag pattern: comma-separated keys map multiple change operations to the same code
Table result = myTable.partitionBy($("id")).toChangelog(
descriptor("deleted").asArgument("op"),
map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
);
```

{{< top >}}
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ public interface PartitionedTable {
* Converts this dynamic table into an append-only table with an explicit operation code column
* using the built-in {@code TO_CHANGELOG} process table function.
*
* <p>Each input row - regardless of its original RowKind - is emitted as an INSERT-only row
* with a string {@code "op"} column indicating the original operation (INSERT, UPDATE_AFTER,
* DELETE, etc.).
* <p>Each input row - regardless of its original change operation - is emitted as an
* INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT,
* UPDATE_AFTER, DELETE, etc.).
*
* <p>Optional arguments can be passed using named expressions:
*
Expand All @@ -159,6 +159,12 @@ public interface PartitionedTable {
* descriptor("op_code").asArgument("op"),
* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
* );
*
* // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uggest enhancing Javadoc to clarify how multiple RowKinds mapping to the same output may affect UPDATE_BEFORE behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, Feat Zhang. The existing JavaDoc and documentation already explain that unmapped operations are dropped. Adding UPDATE_BEFORE-specific details to the generic toChangelog() JavaDoc would be too narrow, since it depends on the use case

* table.partitionBy($("id")).toChangelog(
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
* );
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.types.ColumnList;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -140,21 +141,56 @@ private static Optional<List<DataType>> validateInputs(

final Optional<Map> opMapping = callContext.getArgumentValue(2, Map.class);
if (opMapping.isPresent()) {
final boolean hasInvalidMappingKey =
opMapping.get().keySet().stream()
.anyMatch(
key ->
!(key instanceof String)
|| !VALID_ROW_KIND_NAMES.contains(key));
if (hasInvalidMappingKey) {
return callContext.fail(
throwOnFailure, "Invalid target mapping for argument 'op_mapping'.");
final Optional<List<DataType>> validationError =
validateOpMappingKeys(callContext, opMapping.get(), throwOnFailure);
if (validationError.isPresent()) {
return validationError;
}
}

return Optional.of(callContext.getArgumentDataTypes());
}

/**
* Validates op_mapping keys. Keys may be comma-separated (e.g., {@code "INSERT, UPDATE_AFTER"})
* to map multiple change operations to the same output code. Whitespace around names is
* trimmed. Names are case-sensitive and must match exactly (e.g., {@code INSERT}, not {@code
* insert}). Each name must be valid and appear at most once across all entries.
*/
@SuppressWarnings("rawtypes")
private static Optional<List<DataType>> validateOpMappingKeys(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend adding comments about whitespace and case handling for better user understanding.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the comment to be more specific about this e8d4956

final CallContext callContext, final Map opMapping, final boolean throwOnFailure) {
final Set<String> allRowKindsSeen = new HashSet<>();
for (final Object key : opMapping.keySet()) {
if (!(key instanceof String)) {
return callContext.fail(
throwOnFailure, "Invalid target mapping for argument 'op_mapping'.");
}
final String[] rowKindNames = ((String) key).split(",");
for (final String rawName : rowKindNames) {
final String rowKindName = rawName.trim();
if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) {
return callContext.fail(
throwOnFailure,
String.format(
"Invalid target mapping for argument 'op_mapping'. "
+ "Unknown change operation: '%s'. Valid values are: %s.",
rowKindName, VALID_ROW_KIND_NAMES));
}
final boolean isDuplicate = !allRowKindsSeen.add(rowKindName);
if (isDuplicate) {
return callContext.fail(
throwOnFailure,
String.format(
"Invalid target mapping for argument 'op_mapping'. "
+ "Duplicate change operation: '%s'.",
rowKindName));
}
}
}
return Optional.empty();
}

private static String resolveOpColumnName(final CallContext callContext) {
return callContext
.getArgumentValue(1, ColumnList.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public List<TableTestProgram> programs() {
ToChangelogTestPrograms.TABLE_API_DEFAULT,
ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
ToChangelogTestPrograms.DELETION_FLAG,
ToChangelogTestPrograms.MISSING_PARTITION_BY,
ToChangelogTestPrograms.INVALID_DESCRIPTOR,
ToChangelogTestPrograms.INVALID_OP_MAPPING);
ToChangelogTestPrograms.INVALID_OP_MAPPING,
ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class ToChangelogTestPrograms {
public static final TableTestProgram CUSTOM_OP_MAPPING =
TableTestProgram.of(
"to-changelog-custom-op-mapping",
"custom op_mapping maps RowKinds to user-defined codes and drops unmapped")
"custom op_mapping maps change operations to user-defined codes and drops unmapped")
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema(
Expand Down Expand Up @@ -322,6 +322,46 @@ public class ToChangelogTestPrograms {
+ "FROM orders_changelog")
.build();

// --------------------------------------------------------------------------------------------
// Use case: deletion flag pattern (comma-separated change operation keys)
// --------------------------------------------------------------------------------------------

/**
* Kafka Connect style deletion flag: INSERT and UPDATE_AFTER both produce deleted='false' and
* DELETE produces deleted='true'. UPDATE_BEFORE is silently dropped.
*/
public static final TableTestProgram DELETION_FLAG =
TableTestProgram.of(
"to-changelog-deletion-flag",
"comma-separated change operations produce deletion flag output")
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema(
"name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
.addMode(ChangelogMode.all())
.producedValues(
Row.ofKind(RowKind.INSERT, "Alice", 10L),
Row.ofKind(RowKind.INSERT, "Bob", 20L),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
Row.ofKind(RowKind.DELETE, "Bob", 20L))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink")
.addSchema("name STRING", "deleted STRING", "score BIGINT")
.consumedValues(
"+I[Alice, false, 10]",
"+I[Bob, false, 20]",
"+I[Alice, false, 30]",
"+I[Bob, true, 20]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ "input => TABLE t PARTITION BY name, "
+ "op => DESCRIPTOR(deleted), "
+ "op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true'])")
.build();

// --------------------------------------------------------------------------------------------
// Error validation tests
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -353,13 +393,26 @@ public class ToChangelogTestPrograms {
public static final TableTestProgram INVALID_OP_MAPPING =
TableTestProgram.of(
"to-changelog-invalid-op-mapping",
"fails when op_mapping has invalid RowKind name")
"fails when op_mapping has invalid change operation name")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend adding notes on why UPDATE_BEFORE is dropped and practical use cases of deletion flag pattern.

.setupTableSource(SIMPLE_SOURCE)
.runFailingSql(
"SELECT * FROM TO_CHANGELOG("
+ "input => TABLE t PARTITION BY id, "
+ "op_mapping => MAP['INVALID_KIND', 'X'])",
ValidationException.class,
"Invalid target mapping for argument 'op_mapping'.")
"Unknown change operation: 'INVALID_KIND'")
.build();

public static final TableTestProgram DUPLICATE_ROW_KIND =
TableTestProgram.of(
"to-changelog-duplicate-rowkind",
"fails when a change operation appears in multiple op_mapping entries")
.setupTableSource(SIMPLE_SOURCE)
.runFailingSql(
"SELECT * FROM TO_CHANGELOG("
+ "input => TABLE t PARTITION BY id, "
+ "op_mapping => MAP['INSERT, DELETE', 'A', 'DELETE', 'B'])",
ValidationException.class,
"Duplicate change operation: 'DELETE'")
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,22 @@ private static int[] buildNonPartitionIndices(
return IntStream.range(0, fieldCount).filter(i -> !partitionKeySet.contains(i)).toArray();
}

/**
* Builds a RowKind-to-output-code map. Keys may be comma-separated (e.g., "INSERT,
* UPDATE_AFTER") to map multiple RowKinds to the same output code.
*/
private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, String> opMapping) {
if (opMapping == null) {
return new EnumMap<>(DEFAULT_OP_MAPPING);
}
final Map<RowKind, String> map = new EnumMap<>(RowKind.class);
opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name), code));
return map;
final Map<RowKind, String> result = new EnumMap<>(RowKind.class);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend adding logging for conflicts or invalid mappings to help users debug configuration issues.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, for TO_CHANGELOG invalid mappings are already caught at planning time by ToChangelogTypeStrategy, the user will see a validation error

opMapping.forEach(
(commaSeparatedRowKinds, outputCode) -> {
for (final String rawName : commaSeparatedRowKinds.split(",")) {
result.put(RowKind.valueOf(rawName.trim()), outputCode);
}
});
return result;
}

public void eval(
Expand Down