Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 99 additions & 6 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,19 @@ This is useful when you need to materialize changelog events into a downstream s
SELECT * FROM TO_CHANGELOG(
input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],]
[produces_full_deletes => BOOLEAN]
)
```

### Parameters

| Parameter | Required | Description |
|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |
| Parameter | Required | Description |
|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |
| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, the full image. When `false`, only the identifying key columns are preserved and the rest are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details. |

#### Default op_mapping

Expand Down Expand Up @@ -397,6 +399,90 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```

#### Upsert key

An **upsert key** is a column or set of columns that uniquely identifies a row across its lifecycle in a changelog. It is what downstream operators and sinks use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.

The planner derives the upsert key from the input table:

* A declared `PRIMARY KEY` on the source table when reading directly.
* The grouping columns of an upstream `GROUP BY <key>`.
* The keys propagated by operators that preserve them (e.g. lookup joins, calc-projections that keep the key columns).

When no upsert key can be derived (e.g. a plain append-only source with no key constraint and no grouping upstream), the input has no row identity and downstream operators must treat it as append-only or fall back to retract semantics.

`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve when emitting partial DELETE rows. See [Full vs partial deletes](#full-vs-partial-deletes) below.

#### Full vs partial deletes

The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The matrix below shows each combination with `PARTITION BY` (set semantics) and without (row semantics). When `false`, the function relies on the input table's [upsert key](#upsert-key) to decide which columns to preserve.

##### `produces_full_deletes => true` (default)

The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE.

**Row semantics** (no `PARTITION BY`):

```sql
-- Upsert source: -D[id:5] (key-only).
-- ChangelogNormalize materializes the full pre-image from state.
-- Output: +I[op:'DELETE', id:5, name:'Alice']
SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)

-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
-- No ChangelogNormalize inserted; the input row is passed through unchanged.
-- Output: +I[op:'DELETE', id:5, name:'Alice']
SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
```

**Set semantics** (`PARTITION BY`):

```sql
-- Upsert source: -D[id:5] (key-only).
-- ChangelogNormalize materializes the full pre-image from state.
-- Output: +I[id:5, op:'DELETE', name:'Alice']
SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)

-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
-- No ChangelogNormalize inserted; the input row is passed through unchanged.
-- Output: +I[id:5, op:'DELETE', name:'Alice']
SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
```

##### `produces_full_deletes => false`

The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. Requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error.

**Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY` when directly reading from a source or the key provided in a `GROUP BY <key>`.

```sql
-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
-- Output: +I[op:'DELETE', id:5, name:null]
SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes => false)

-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full pre-image).
-- Output: +I[op:'DELETE', id:5, name:null]
SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, produces_full_deletes => false)
```

**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. The key used as the partition-key column should be the unique key that will be used as the record identifier. This matches the shape expected by upsert sinks and Kafka compacted topics.

```sql
-- Upsert source: -D[id:5] (key-only).
-- Output: +I[id:5, op:'DELETE', name:null]
SELECT * FROM TO_CHANGELOG(
input => TABLE upsert_source PARTITION BY id,
produces_full_deletes => false
)

-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
-- Output: +I[id:5, op:'DELETE', name:null]
SELECT * FROM TO_CHANGELOG(
input => TABLE retract_source PARTITION BY id,
produces_full_deletes => false
)
```

#### Partitioning by a key

```sql
Expand Down Expand Up @@ -434,6 +520,13 @@ Table result = myTable.toChangelog(
map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
);

// Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full
// pre-image. When `false`, only the identifying key columns are preserved and the rest
// are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details.
Table result = myTable.toChangelog(
lit(false).asArgument("produces_full_deletes")
);

// Set semantics: co-locate rows with the same key in the same parallel operator instance.
// Equivalent to PARTITION BY in SQL. The partition keys are prepended to the output columns.
Table result = myTable.partitionBy($("id")).toChangelog();
Expand Down
19 changes: 17 additions & 2 deletions flink-python/pyflink/table/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1196,10 +1196,19 @@ def to_changelog(self, *arguments: Expression) -> 'Table':
INSERT-only row with a string ``op`` column indicating the original operation
(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).

The optional ``produces_full_deletes`` boolean controls how DELETE rows are
emitted. When ``True`` (default), the planner inserts a ``ChangelogNormalize``
operator for upsert sources that emit key-only deletes so the function emits
fully populated DELETE rows downstream. When ``False``, the function emits
partial DELETE rows: row semantics preserves the planner-derived upsert key
columns and nulls the rest, set semantics (``PARTITION BY``) preserves the
partition key and nulls the rest. Requires an upsert key or ``PARTITION BY``;
otherwise the call is rejected.

Example:
::

>>> from pyflink.table.expressions import descriptor, map_
>>> from pyflink.table.expressions import descriptor, map_, lit, col
>>> # Default: adds 'op' column with standard change operation names
>>> result = table.to_changelog()
>>> # Custom op column name and mapping
Expand All @@ -1213,8 +1222,14 @@ def to_changelog(self, *arguments: Expression) -> 'Table':
... map_("INSERT, UPDATE_AFTER", "false",
... "DELETE", "true").as_argument("op_mapping")
... )
>>> # Opt out of full-delete semantics to emit partial DELETE rows.
>>> # Requires an upsert key or PARTITION BY; otherwise rejected.
>>> result = table.to_changelog(
... lit(False).as_argument("produces_full_deletes")
... )

:param arguments: Optional named arguments for ``op`` and ``op_mapping``.
:param arguments: Optional named arguments for ``op``, ``op_mapping``, and
``produces_full_deletes``.
:return: An append-only :class:`~pyflink.table.Table` with an ``op`` column prepended
to the input columns.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,19 @@ public interface PartitionedTable {
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
* );
*
* // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full
* // pre-image. When `false`, only the identifying key columns are preserved and the rest
* // are nulled. See [Full vs partial deletes](
* // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
* // for more details.
* Table result = table
* .partitionBy($("id"))
* .toChangelog(lit(false).asArgument("produces_full_deletes"));
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
* @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code
* produces_full_deletes}
* @return an append-only {@link Table} with output schema {@code [partition_keys, op,
* non_partition_input_columns]}
* @see Table#toChangelog(Expression...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,9 +1459,19 @@ default TableResult executeInsert(
* descriptor("deleted").asArgument("op"),
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
* );
*
* // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full
* // pre-image. When `false`, only the identifying key columns are preserved and the rest
* // are nulled. See [Full vs partial deletes](
* // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
* // for more details.
* Table result = table.toChangelog(
* lit(false).asArgument("produces_full_deletes")
* );
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
* @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code
* produces_full_deletes}
* @return an append-only {@link Table} with an {@code op} column prepended to the input columns
*/
Table toChangelog(Expression... arguments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,11 @@ public Optional<ChangelogMode> changelogMode() {
return Optional.empty();
}

@Override
public List<int[]> upsertKeyColumns() {
return Collections.emptyList();
}

private PartitionQueryOperation findPartitionOperation(QueryOperation op) {
if (op instanceof PartitionQueryOperation) {
return (PartitionQueryOperation) op;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,25 +840,41 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"UPDATE_BEFORE"))))
.withConditionalTrait(
StaticArgumentTrait.REQUIRE_FULL_DELETE,
TraitCondition.or(
// op_mapping omitted: default mapping includes
// DELETE.
TraitCondition.not(
TraitCondition.argIsPresent(
"op_mapping")),
TraitCondition.argMatches(
"op_mapping",
Map.class,
mapping ->
opMappingContainsKey(
(Map<String, String>)
mapping,
"DELETE")))),
// Require full deletes by default. The user can opt
// out via produces_full_deletes=FALSE.
// REQUIRE_FULL_DELETE
// still gates on the active op_mapping mapping DELETE;
// otherwise no DELETE rows reach the function and there
// is no point inserting ChangelogNormalize upstream.
TraitCondition.and(
TraitCondition.or(
TraitCondition.not(
TraitCondition.argIsPresent(
"produces_full_deletes")),
TraitCondition.argIsEqualTo(
"produces_full_deletes",
Boolean.TRUE)),
TraitCondition.or(
TraitCondition.not(
TraitCondition.argIsPresent(
"op_mapping")),
TraitCondition.argMatches(
"op_mapping",
Map.class,
mapping ->
opMappingContainsKey(
(Map<
String,
String>)
mapping,
"DELETE"))))),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
true))
true),
StaticArgument.scalar(
"produces_full_deletes", DataTypes.BOOLEAN(), true))
.inputTypeStrategy(TO_CHANGELOG_INPUT_TYPE_STRATEGY)
.outputTypeStrategy(TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
.runtimeClass(
Expand Down
Loading