Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docs/content/docs/sql/reference/queries/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,44 @@ Prefer row semantics, when possible. `PARTITION BY` is only necessary when downs

If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly.

#### Upsert table

To generate an upsert table, two requirements must be met:

* **Key partitioning**: use `PARTITION BY <key>`, where the partition key corresponds to the unique/primary key of the dataset.
* **Op mapping configuration**: the `op_mapping` must include `UPDATE_AFTER` and must NOT include `UPDATE_BEFORE`.

The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause.

<span class="label label-danger">Note</span>
- An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. `PARTITION BY` must be present on the table argument; otherwise the call would produce key-less updates and is rejected at validation time with a `ValidationException`.
Comment on lines +159 to +162
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The note is a bit repetitive, we already stated that it's an upsert table and we have explicit above that the partition by is required

Suggested change
The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause.
<span class="label label-danger">Note</span>
- An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. `PARTITION BY` must be present on the table argument; otherwise the call would produce key-less updates and is rejected at validation time with a `ValidationException`.
An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause.


```sql
-- Upsert input: INSERT / UPDATE_AFTER / DELETE only
-- +I[id:1, op:'INSERT', name:'Alice']
-- +I[id:2, op:'INSERT', name:'Bob']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']

SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream PARTITION BY id,
op_mapping => MAP[
'INSERT', 'INSERT',
'UPDATE_AFTER', 'UPDATE_AFTER',
'DELETE', 'DELETE']
)

-- Output (upsert changelog, upsert key = id):
-- +I[id:1, name:'Alice']
-- +I[id:2, name:'Bob']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']
```

The `FROM_CHANGELOG` PTF assumes events arrive in order. If the source itself does not guarantee ordering for events sharing the same `PARTITION BY` key, use [`ORDER BY`]({{< ref "docs/dev/table/functions/ptfs" >}}#ordering) to reorder them.

By default, without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog.

#### Ordering CDC events with ORDER BY

CDC streams can deliver events out of order. For example, a key's `UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are partitioned across upstream brokers. If the source itself does not guarantee ordering, applying such a changelog directly produces incorrect state.
Expand Down
6 changes: 6 additions & 0 deletions flink-python/pyflink/table/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,12 @@ def from_changelog(self, *arguments: Expression) -> 'Table':
in the mapping; pass ``error_handling => 'SKIP'`` to silently drop those
rows instead.
The output is a retract changelog. To emit an upsert changelog instead, combine
``PARTITION BY`` (set semantics on the table argument) with an ``op_mapping`` that
maps to ``UPDATE_AFTER`` without ``UPDATE_BEFORE``. The partition key becomes the
upsert key. An upsert mapping without ``PARTITION BY`` is rejected at validation
time, since upsert mode requires a key.
Example:
::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,18 @@ public interface PartitionedTable {
* <p>For row semantics (each row processed independently), use {@link Table#fromChangelog} on
* the unpartitioned table.
*
* <p>Output changelog mode:
*
* <ul>
* <li><b>Retract</b> (default): the active {@code op_mapping} includes {@code UPDATE_BEFORE}
* or no updates at all. The output possibly emits {@code INSERT}, {@code UPDATE_BEFORE},
* {@code UPDATE_AFTER}, and {@code DELETE}.
* <li><b>Upsert</b>: the {@code op_mapping} maps {@code UPDATE_AFTER} without {@code
* UPDATE_BEFORE}. The output emits {@code INSERT}, {@code UPDATE_AFTER}, and full {@code
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* UPDATE_BEFORE}. The output emits {@code INSERT}, {@code UPDATE_AFTER}, and full {@code
* UPDATE_BEFORE}. The output emits {@code INSERT}, {@code UPDATE_AFTER}, and {@code

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm, it wasn't :D. I see "full"

* DELETE}, keyed on the partition columns. An upsert mapping without {@code PARTITION BY}
* is rejected at validation time, since upsert mode requires a key.
* </ul>
*
* <p>Examples:
*
* <pre>{@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,11 @@ default TableResult executeInsert(
* .fromChangelog();
* }</pre>
*
* <p>The output is a retract changelog. To emit an upsert changelog instead, combine {@code
* PARTITION BY} with an {@code op_mapping} that maps to {@code UPDATE_AFTER} without {@code
* UPDATE_BEFORE}. The partition key becomes the upsert key. See {@link
* PartitionedTable#fromChangelog(Expression...)} for details.
*
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.table.api.JsonType;
import org.apache.flink.table.api.JsonValueOnEmptyOrError;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import org.apache.flink.table.expressions.TimePointUnit;
import org.apache.flink.table.expressions.ValueLiteralExpression;
Expand All @@ -39,6 +38,7 @@
import org.apache.flink.table.types.inference.TraitCondition;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.inference.strategies.ArrayOfStringArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy;
import org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
import org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
Expand Down Expand Up @@ -854,7 +854,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
true),
StaticArgument.scalar("error_handling", DataTypes.STRING(), true))
.changelogModeStrategy(ctx -> ChangelogMode.all())
.changelogModeStrategy(FromChangelogTypeStrategy.CHANGELOG_MODE_STRATEGY)
.inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
.runtimeClass(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;

import java.util.Optional;

/**
* An extension that allows a process table function (PTF) to emit results with changelog semantics.
*
Expand Down Expand Up @@ -105,5 +108,28 @@ interface ChangelogContext {
* are required and {@link ChangelogMode#keyOnlyDeletes()} are supported.
*/
ChangelogMode getRequiredChangelogMode();

/**
* Returns the resolved literal value of the scalar argument at the given position.
*
* <p>Returns empty if the argument is absent, NULL, not a literal, or cannot be expressed
* as an instance of the provided class. Conversions follow the default conversion classes
* of {@link LogicalType LogicalTypes}.
*/
default <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
return Optional.empty();
}

/**
* Returns information about the table that has been passed to a table argument at the given
* position.
*
* <p>Semantics are only available for table arguments. They expose, for example, the
* partition-by columns when the table argument uses {@link ArgumentTrait#SET_SEMANTIC_TABLE
* set semantics}.
*/
default Optional<TableSemantics> getTableSemantics(int pos) {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.DataTypes.Field;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
import org.apache.flink.table.functions.ChangelogModeStrategy;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
Expand All @@ -36,9 +39,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.IntFunction;
import java.util.stream.Collectors;

/** Type strategies for the {@code FROM_CHANGELOG} process table function. */
Expand Down Expand Up @@ -95,6 +100,57 @@ public Optional<List<DataType>> inferInputTypes(
return Optional.of(DataTypes.ROW(outputFields).notNull());
};

// --------------------------------------------------------------------------------------------
// Changelog mode inference
// --------------------------------------------------------------------------------------------

/**
* Emits an upsert changelog when the input is partitioned (set semantics) and the resolved
* {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code UPDATE_BEFORE}. In all other
* cases the output is a retract changelog. When upsert mode is selected, the partition key acts
* as the upsert key.
*/
public static final ChangelogModeStrategy CHANGELOG_MODE_STRATEGY =
ctx -> isUpsertConfig(ctx) ? ChangelogMode.upsert(false) : ChangelogMode.all();
Comment thread
raminqaf marked this conversation as resolved.

/**
* Returns {@code true} when the FROM_CHANGELOG call should emit an upsert changelog: the input
* table is partitioned AND the resolved {@code op_mapping} contains {@code UPDATE_AFTER}
* without {@code UPDATE_BEFORE}. Falls back to {@code false} when the mapping is absent or
* cannot be resolved as a literal. The default mapping maps to a retract table.
*/
private static boolean isUpsertConfig(final ChangelogContext ctx) {
if (!isPartitioned(ctx::getTableSemantics)) {
return false;
}
return ctx.getArgumentValue(ARG_OP_MAPPING, Map.class)
.map(FromChangelogTypeStrategy::describesUpsert)
.orElse(false);
}

/**
* Returns {@code true} if the mapping maps {@code UPDATE_AFTER} without {@code UPDATE_BEFORE},
* i.e. it describes an upsert changelog.
*/
private static boolean describesUpsert(final Map<String, String> mapping) {
return mappingContains(mapping, UPDATE_AFTER) && !mappingContains(mapping, UPDATE_BEFORE);
}

private static boolean mappingContains(
final Map<String, String> mapping, final String rowKindName) {
return mapping.values().stream()
.filter(Objects::nonNull)
.anyMatch(v -> rowKindName.equals(v.trim()));
}

private static boolean isPartitioned(
final IntFunction<Optional<TableSemantics>> tableSemantics) {
return tableSemantics
.apply(ARG_TABLE)
.map(ts -> ts.partitionByColumns().length > 0)
.orElse(false);
}

// --------------------------------------------------------------------------------------------
// Helpers
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -200,17 +256,17 @@ private static Optional<List<DataType>> validateOpMapping(
if (validationError.isPresent()) {
return validationError;
}

final boolean hasUpdateBefore =
mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim()));
final boolean hasUpdateAfter =
mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim()));
if (hasUpdateAfter && !hasUpdateBefore) {
// A mapping that produces UPDATE_AFTER without UPDATE_BEFORE describes an upsert
// changelog. Upsert mode requires a key, so PARTITION BY must be present on the
// table argument; otherwise the call would produce key-less updates.
if (describesUpsert(mapping) && !isPartitioned(callContext::getTableSemantics)) {
return callContext.fail(
throwOnFailure,
"The 'op_mapping' must include UPDATE_BEFORE for retract mode. "
+ "Upsert mode (without UPDATE_BEFORE) is not supported "
+ "in this version.");
"An 'op_mapping' that produces UPDATE_AFTER without UPDATE_BEFORE "
+ "describes an upsert changelog and requires a key. Add "
+ "PARTITION BY to the input table to define the upsert key, "
+ "or include UPDATE_BEFORE in the mapping for retract "
+ "semantics.");
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,33 @@ protected Stream<TestSpec> testData() {
.calledWithLiteralAt(3, "FAIL")
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE),

// Error: UPDATE_AFTER without UPDATE_BEFORE not supported
// Valid: upsert mapping (INSERT + UPDATE_AFTER + DELETE, no UPDATE_BEFORE) with
// PARTITION BY. FROM_CHANGELOG emits an upsert changelog keyed on the partition
// column.
TestSpec.forStrategy(
"UPDATE_AFTER requires UPDATE_BEFORE",
"Valid upsert mapping with PARTITION BY",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE)
.calledWithTableSemanticsAt(
0,
new TableSemanticsMock(
TABLE_TYPE, new int[] {0}, new int[0], -1, null))
.calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(
2,
Map.of(
"c", "INSERT",
"u", "UPDATE_AFTER",
"d", "DELETE"))
.calledWithLiteralAt(3, "FAIL")
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE),

// Error: upsert mapping (INSERT + UPDATE_AFTER + DELETE) without PARTITION BY.
// Upsert mode requires a key, so the call must use set semantics.
TestSpec.forStrategy(
"Upsert mapping without PARTITION BY rejected",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, STRING_TYPE)
.calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
.calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(
Expand All @@ -152,7 +174,13 @@ protected Stream<TestSpec> testData() {
"c", "INSERT",
"u", "UPDATE_AFTER",
"d", "DELETE"))
.expectErrorMessage("must include UPDATE_BEFORE"),
.calledWithLiteralAt(3, "FAIL")
.expectErrorMessage(
"An 'op_mapping' that produces UPDATE_AFTER without "
+ "UPDATE_BEFORE describes an upsert changelog and "
+ "requires a key. Add PARTITION BY to the input table "
+ "to define the upsert key, or include UPDATE_BEFORE in "
+ "the mapping for retract semantics."),

// Error: Invalid error_handling mode
TestSpec.forStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction}
import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction, TableSemantics}
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext
import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall}
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
Expand Down Expand Up @@ -1678,8 +1678,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
val callContext =
function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode)

// Expose a simplified context to let users focus on important characteristics.
// If necessary, we can expose the full CallContext in the future.
// Expose a simplified context focused on changelog-relevant inputs: changelog modes,
// resolved literal arguments, and table semantics (e.g., partition-by columns).
new ChangelogContext {
override def getTableChangelogMode(pos: Int): ChangelogMode = {
val tableSemantics = callContext.getTableSemantics(pos).orElse(null)
Expand All @@ -1692,6 +1692,14 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
override def getRequiredChangelogMode: ChangelogMode = {
callContext.getOutputChangelogMode.orElse(null)
}

override def getArgumentValue[T](pos: Int, clazz: Class[T]): java.util.Optional[T] = {
callContext.getArgumentValue(pos, clazz)
}

override def getTableSemantics(pos: Int): java.util.Optional[TableSemantics] = {
callContext.getTableSemantics(pos)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public List<TableTestProgram> programs() {
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
FromChangelogTestPrograms.CUSTOM_OP_NAME,
FromChangelogTestPrograms.RETRACT_PARTITION_BY,
FromChangelogTestPrograms.UPSERT_PARTITION_BY,
FromChangelogTestPrograms.DELETION_FLAG_PARTITION_BY,
FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
Expand Down
Loading