Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ This is useful when you need to materialize changelog events into a downstream s

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE source_table PARTITION BY key_col,
input => TABLE source_table,
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
)
Expand All @@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG(

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
| `input` | Yes | The input table. Accepts insert-only, retract, and upsert tables. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |

Expand All @@ -74,7 +74,7 @@ When `op_mapping` is omitted, all four change operations are mapped to their sta
The output columns are ordered as:

```
[partition_key_columns, op_column, remaining_columns]
[op_column, all_input_columns]
```

All output rows carry the `INSERT` change kind - the resulting table is always append-only.
Expand All @@ -85,25 +85,25 @@ All output rows have `INSERT` - the table is always append-only.

```sql
-- Input: retract table from an aggregation
-- +I[id:1, name:'Alice', cnt:1]
-- +U[id:1, name:'Alice', cnt:2]
-- -D[id:2, name:'Bob', cnt:1]
-- +I[name:'Alice', cnt:1]
-- +U[name:'Alice', cnt:2]
-- -D[name:'Bob', cnt:1]

SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id
input => TABLE my_aggregation
)

-- Output (append-only):
-- +I[id:1, op:'INSERT', name:'Alice', cnt:1]
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice', cnt:2]
-- +I[id:2, op:'DELETE', name:'Bob', cnt:1]
-- +I[op:'INSERT', name:'Alice', cnt:1]
-- +I[op:'UPDATE_AFTER', name:'Alice', cnt:2]
-- +I[op:'DELETE', name:'Bob', cnt:1]
```

#### Custom operation column name

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id,
input => TABLE my_aggregation,
op => DESCRIPTOR(operation)
)
-- The op column is now named 'operation' instead of 'op'
Expand All @@ -113,7 +113,7 @@ SELECT * FROM TO_CHANGELOG(

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id,
input => TABLE my_aggregation,
op => DESCRIPTOR(op_code),
op_mapping => MAP['INSERT', 'I', 'UPDATE_AFTER', 'U']
)
Expand All @@ -126,7 +126,7 @@ SELECT * FROM TO_CHANGELOG(

```sql
SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id,
input => TABLE my_aggregation,
op => DESCRIPTOR(deleted),
op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']
)
Expand All @@ -139,16 +139,16 @@ SELECT * FROM TO_CHANGELOG(

```java
// Default: adds 'op' column and supports all changelog modes
Table result = myTable.partitionBy($("id")).toChangelog();
Table result = myTable.toChangelog();

// With custom parameters
Table result = myTable.partitionBy($("id")).toChangelog(
Table result = myTable.toChangelog(
descriptor("op_code").asArgument("op"),
map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
);

// Deletion flag pattern: comma-separated keys map multiple change operations to the same code
Table result = myTable.partitionBy($("id")).toChangelog(
Table result = myTable.toChangelog(
descriptor("deleted").asArgument("op"),
map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;

Expand Down Expand Up @@ -139,37 +138,4 @@ public interface PartitionedTable {
* @see ProcessTableFunction
*/
Table process(Class<? extends UserDefinedFunction> function, Object... arguments);

/**
* Converts this dynamic table into an append-only table with an explicit operation code column
* using the built-in {@code TO_CHANGELOG} process table function.
*
* <p>Each input row - regardless of its original change operation - is emitted as an
* INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT,
* UPDATE_AFTER, DELETE, etc.).
*
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
* // Default: adds 'op' column and supports all changelog modes
* table.partitionBy($("id")).toChangelog();
*
* // Custom op column name and mapping
* table.partitionBy($("id")).toChangelog(
* descriptor("op_code").asArgument("op"),
* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
* );
*
* // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
* table.partitionBy($("id")).toChangelog(
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
* );
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
* @return an append-only {@link Table} with an {@code op} column prepended to the non-partition
* columns
*/
Table toChangelog(Expression... arguments);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1422,4 +1422,36 @@ default TableResult executeInsert(
* @see ProcessTableFunction
*/
Table process(Class<? extends UserDefinedFunction> function, Object... arguments);

/**
* Converts this dynamic table into an append-only table with an explicit operation code column
* using the built-in {@code TO_CHANGELOG} process table function.
*
* <p>Each input row - regardless of its original change operation - is emitted as an
* INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT,
* UPDATE_AFTER, DELETE, etc.).
*
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
* // Default: adds 'op' column and supports all changelog modes
* table.toChangelog();
*
* // Custom op column name and mapping
* table.toChangelog(
* descriptor("op_code").asArgument("op"),
* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
* );
*
* // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
* table.toChangelog(
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
* );
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
* @return an append-only {@link Table} with an {@code op} column prepended to the input columns
*/
Table toChangelog(Expression... arguments);
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,11 @@ public Table process(Class<? extends UserDefinedFunction> function, Object... ar
function, unionTableAndArguments(operationTree, tableEnvironment, arguments));
}

@Override
public Table toChangelog(Expression... arguments) {
    // Route through the generic process() entry point using the built-in
    // TO_CHANGELOG process table function. The widening to Object[] keeps the
    // varargs array flat instead of nesting it as a single Object argument.
    final Object[] functionArguments = arguments;
    return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), functionArguments);
}

private TablePipeline insertInto(
ContextResolvedTable contextResolvedTable,
@Nullable InsertConflictStrategy conflictStrategy,
Expand Down Expand Up @@ -901,11 +906,6 @@ public Table process(Class<? extends UserDefinedFunction> function, Object... ar
createPartitionQueryOperation(), table.tableEnvironment, arguments));
}

// Delegates to the generic process() entry point with the built-in
// TO_CHANGELOG function name. The (Object[]) cast spreads the varargs
// array instead of passing it as one nested Object argument.
@Override
public Table toChangelog(Expression... arguments) {
return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), (Object[]) arguments);
}

/** Wraps the underlying table's operation tree in a PARTITION BY operation over the stored keys. */
private QueryOperation createPartitionQueryOperation() {
    final QueryOperation partitioned =
            table.operationTreeBuilder.partition(partitionKeys, table.operationTree);
    return partitioned;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,15 +782,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.name("TO_CHANGELOG")
.kind(PROCESS_TABLE)
.staticArguments(
// Row semantics (no PARTITION BY). Accepts updating
// inputs. The planner inserts ChangelogNormalize for
// upsert sources to produce UPDATE_BEFORE and full
// DELETE rows.
StaticArgument.table(
"input",
Row.class,
false,
EnumSet.of(
StaticArgumentTrait.TABLE,
StaticArgumentTrait.SET_SEMANTIC_TABLE,
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
StaticArgumentTrait.SUPPORT_UPDATES,
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
// Not strictly necessary but explicitly state that
// we require full deletes.
StaticArgumentTrait.REQUIRE_FULL_DELETE)),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Type strategies for the {@code TO_CHANGELOG} process table function. */
@Internal
Expand Down Expand Up @@ -98,15 +96,10 @@ public List<Signature> getExpectedSignatures(final FunctionDefinition definition

final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields = DataType.getFields(semantics.dataType());
final Set<Integer> partitionKeys = intArrayToSet(semantics.partitionByColumns());

final List<Field> outputFields = new ArrayList<>();
outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING()));
for (int i = 0; i < inputFields.size(); i++) {
if (!partitionKeys.contains(i)) {
outputFields.add(inputFields.get(i));
}
}
outputFields.addAll(inputFields);

return Optional.of(DataTypes.ROW(outputFields).notNull());
};
Expand Down Expand Up @@ -199,9 +192,5 @@ private static String resolveOpColumnName(final CallContext callContext) {
.orElse(DEFAULT_OP_COLUMN_NAME);
}

private static Set<Integer> intArrayToSet(final int[] array) {
return IntStream.of(array).boxed().collect(Collectors.toSet());
}

private ToChangelogTypeStrategy() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,11 @@ private static ColumnList convertColumnList(List<SqlNode> operands) {
final Map<String, String> map = new LinkedHashMap<>();
try {
for (int i = 0; i < operands.size(); i += 2) {
final String key = SqlLiteral.unchain(operands.get(i)).getValueAs(String.class);
final String key =
SqlLiteral.unchain(unwrapCast(operands.get(i))).getValueAs(String.class);
final String value =
SqlLiteral.unchain(operands.get(i + 1)).getValueAs(String.class);
SqlLiteral.unchain(unwrapCast(operands.get(i + 1)))
.getValueAs(String.class);
map.put(key, value);
}
} catch (Exception e) {
Expand All @@ -380,6 +382,18 @@ private static ColumnList convertColumnList(List<SqlNode> operands) {
return map;
}

/**
 * Unwraps implicit CHAR-type CASTs added by Calcite for length normalization.
 *
 * <p>Returns the inner literal only when the node is a CAST call whose first
 * operand is a character-type {@link SqlLiteral}; any other node is returned
 * unchanged.
 */
private static SqlNode unwrapCast(final SqlNode node) {
    // Guard: only CAST calls are candidates for unwrapping.
    if (node.getKind() != SqlKind.CAST || !(node instanceof SqlCall)) {
        return node;
    }
    final SqlNode operand = ((SqlCall) node).operand(0);
    if (!(operand instanceof SqlLiteral)) {
        return node;
    }
    final SqlLiteral literal = (SqlLiteral) operand;
    return SqlTypeName.CHAR_TYPES.contains(literal.getTypeName()) ? operand : node;
}

/** A MAP constructor is a string literal if all its key-value children are string literals. */
private static boolean isLiteralMap(SqlNode sqlNode) {
if (sqlNode.getKind() != SqlKind.MAP_VALUE_CONSTRUCTOR) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public List<TableTestProgram> programs() {
return List.of(
ToChangelogTestPrograms.INSERT_ONLY_INPUT,
ToChangelogTestPrograms.UPDATING_INPUT,
ToChangelogTestPrograms.UPSERT_INPUT,
ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
ToChangelogTestPrograms.CUSTOM_OP_NAME,
ToChangelogTestPrograms.TABLE_API_DEFAULT,
ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
ToChangelogTestPrograms.DELETION_FLAG,
ToChangelogTestPrograms.MISSING_PARTITION_BY,
ToChangelogTestPrograms.INVALID_DESCRIPTOR,
ToChangelogTestPrograms.INVALID_OP_MAPPING,
ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
Expand Down
Loading