[FLINK-39538][table] Support upsert output mode for FROM_CHANGELOG #28164

Open
raminqaf wants to merge 4 commits into apache:master from raminqaf:FLINK-39538

Conversation

@raminqaf
Contributor

What is the purpose of the change

FROM_CHANGELOG now emits an upsert changelog (INSERT, UPDATE_AFTER, full DELETE) when the input table is partitioned (set semantics via PARTITION BY) and the active op_mapping maps to UPDATE_AFTER without UPDATE_BEFORE. The partition key acts as the upsert key. In all other cases the output remains a retract changelog.

Submitting an op_mapping with UPDATE_AFTER but no UPDATE_BEFORE without PARTITION BY is rejected at validation time, because upsert mode requires a key.
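
The rule described above can be sketched as a small decision function. This is a minimal illustration, not the code in this PR: the class `ChangelogModeSketch`, the enum `OutputMode`, and the method names are hypothetical, and `IllegalArgumentException` stands in for the validation error the planner would raise.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of the mode decision; not the actual Flink type strategy. */
final class ChangelogModeSketch {
    enum OutputMode { RETRACT, UPSERT }

    static final String UPDATE_BEFORE = "UPDATE_BEFORE";
    static final String UPDATE_AFTER = "UPDATE_AFTER";

    /** True if any non-null mapping value, after trimming, equals the given change kind. */
    static boolean mapsTo(Map<String, String> opMapping, String kind) {
        return opMapping.values().stream()
                .filter(Objects::nonNull)
                .anyMatch(v -> kind.equals(v.trim()));
    }

    /**
     * Resolves the output mode from the op_mapping and the presence of PARTITION BY.
     * UPDATE_AFTER without UPDATE_BEFORE describes an upsert changelog, which needs a key.
     */
    static OutputMode resolve(Map<String, String> opMapping, boolean hasPartitionBy) {
        boolean describesUpsert =
                mapsTo(opMapping, UPDATE_AFTER) && !mapsTo(opMapping, UPDATE_BEFORE);
        if (!describesUpsert) {
            // Mapping includes UPDATE_BEFORE, or has no updates at all: stay on retract.
            return OutputMode.RETRACT;
        }
        if (!hasPartitionBy) {
            // Assumption: stand-in for the validation failure described in the PR.
            throw new IllegalArgumentException(
                    "UPDATE_AFTER without UPDATE_BEFORE requires PARTITION BY");
        }
        return OutputMode.UPSERT;
    }
}
```

Only one combination switches the output to upsert; every other valid combination keeps the retract default, and the key-less upsert case fails early rather than producing updates without a key.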

To enable the strategy to inspect the resolved op_mapping and the input table's partition keys, ChangelogFunction.ChangelogContext is extended with two default methods: getArgumentValue(int, Class) and getTableSemantics(int). Defaults return Optional.empty() to preserve source compatibility for existing implementations.
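
The compatibility argument can be illustrated with a stripped-down interface. The sketch below is an assumption-laden stand-in: `SketchContext`, `LegacyContext`, and `LiteralContext` are hypothetical names, and the generic signature of `getArgumentValue` is guessed from the `(int, Class)` parameter list mentioned above.

```java
import java.util.Optional;

/** Hypothetical stand-in for a context interface that gains a new accessor. */
interface SketchContext {
    String functionName();

    /** New method with a default body, so existing implementations compile unchanged. */
    default <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
        return Optional.empty();
    }
}

/** Pre-existing implementation that knows nothing about the new method. */
final class LegacyContext implements SketchContext {
    @Override
    public String functionName() {
        return "legacy";
    }
}

/** Newer implementation that overrides the default and exposes literal arguments. */
final class LiteralContext implements SketchContext {
    private final Object[] args;

    LiteralContext(Object... args) {
        this.args = args;
    }

    @Override
    public String functionName() {
        return "literal";
    }

    @Override
    public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
        // Return empty when the position is out of range or the type does not match.
        if (pos < 0 || pos >= args.length || !clazz.isInstance(args[pos])) {
            return Optional.empty();
        }
        return Optional.of(clazz.cast(args[pos]));
    }
}
```

Callers must treat an empty result as "unknown" and fall back to the conservative behavior, which in this PR is the retract changelog.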

The planner-side wrapper in FlinkChangelogModeInferenceProgram delegates the two new methods to the underlying CallContext.

Upsert mode uses full deletes (ChangelogMode.upsert(false)) because the runtime forwards each input delete row with all fields populated; only the RowKind is rewritten. This matches the runtime's behavior and avoids forcing downstream operators to reconstruct rows from state.

The upsert key derivation in FlinkRelMdUniqueKeys.getPtfUniqueKeys already returns the partition columns when a PTF emits upsert, so no metadata changes are needed.

Verifying this change

Changes are backed by semantic tests and plan verification tests

  • Added semantics tests in FromChangelogTestProgram
  • Added tests in FromChangelogTest to verify generated plan and upsertKeys

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs )

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Opus 4.7

@flinkbot
Collaborator

flinkbot commented May 15, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Contributor

@fhueske fhueske left a comment

Thanks for the PR @raminqaf

I've left a few comments.

Cheers, Fabian


If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly.

#### Upsert output
Contributor

I'm wondering to what extent we need to document this behavior.
FROM_CHANGELOG converts an explicit changelog into a Flink dynamic table that is handled internally. From a user's point of view, does it really matter how Flink encodes changes? It shouldn't change the semantics of the following SQL operations. Is using upsert changelog mode not "just" an internal optimization?

Contributor

That's a fair point about keeping this simple. I thought about it a bit and still think it'd be good to have the section. Retract is the default mode. The changelog mode of a table is a user-facing property that users are aware of and often deal with. So a section telling them what to pay attention to when they want to fabricate an upsert table makes sense, IMO.

-- -D[id:2, name:'Bob']
```

Without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog.
Contributor

Document that validation fails if there is no PARTITION BY and no UPDATE_BEFORE?

Contributor Author

Added

}
final Map<String, String> mapping = opMapping.get();
final boolean hasUpdateAfter =
mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim()));
Contributor

Was it already checked that v isn't null, or might v.trim() throw an NPE?

Contributor Author

Good catch! I now filter out the nulls before matching.
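
The concern and the fix can be shown side by side. This is a hedged sketch, not the PR's code: `NullSafeMatchSketch` and both method names are hypothetical, and the first method only mirrors the shape of the reviewed expression.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical contrast between the reviewed expression and a null-safe variant. */
final class NullSafeMatchSketch {
    static final String UPDATE_AFTER = "UPDATE_AFTER";

    /** Mirrors the reviewed expression: throws NullPointerException when a null value is visited. */
    static boolean unsafeHasUpdateAfter(Map<String, String> mapping) {
        return mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim()));
    }

    /** Drops null values first, so trim() is only called on real strings. */
    static boolean safeHasUpdateAfter(Map<String, String> mapping) {
        return mapping.values().stream()
                .filter(Objects::nonNull)
                .anyMatch(v -> UPDATE_AFTER.equals(v.trim()));
    }
}
```

Because `anyMatch` short-circuits, the unsafe variant fails only if a null value is visited before a match, which makes the bug dependent on iteration order and easy to miss in tests.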

* cannot be resolved as a literal, since the default mapping includes both (retract).
*/
@SuppressWarnings("unchecked")
public static boolean isUpsertConfig(final ChangelogContext ctx) {
Contributor

Does this function need to be public?

Contributor Author

This can be (package-)private

final Map<String, String> mapping,
final boolean throwOnFailure) {
final boolean hasUpdateAfter =
mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim()));
Contributor

Same question about the NPE as above.

Comment on lines +276 to +279
callContext
.getTableSemantics(ARG_TABLE)
.map(ts -> ts.partitionByColumns().length > 0)
.orElse(false);
Contributor

Could be extracted into a helper method (it is also used in isUpsertConfig()).

Contributor Author

The extraction would not be as beautiful as (you and) I imagined it to be. One uses CallContext, the other uses ChangelogContext. I will try to play around and make it nice 🙂

* describes an upsert changelog. Upsert mode requires a key, so the input table must use set
* semantics via {@code PARTITION BY}.
*/
private static Optional<List<DataType>> validateUpsertRequiresPartitionBy(
Contributor

The return type of this method looks strange.
It never returns an actual List, only Optional.empty().
I'd rather return a boolean and move the error message into validateOpMapping().

Contributor Author

Addressed

mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim()));
final boolean hasUpdateBefore =
mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim()));
if (!hasUpdateAfter || hasUpdateBefore) {
Contributor

Add a comment describing the rationale for this condition.

Contributor Author

Added

+ "'DELETE', 'DELETE'])")
.build();

public static final TableTestProgram UPSERT_PARTITION_BY_CUSTOM_MAPPING =
Contributor

Isn't the only difference to the other added test that the mapping uses "c" instead of "INSERT"?
Isn't that essentially the same test?

Contributor

Rather add a validation test for the case of UPDATE_AFTER without UPDATE_BEFORE and without PARTITION BY?

Contributor

Agree with @fhueske

Contributor Author

Sorry for the confusion. It has been removed.

Contributor

@gustavodemorais gustavodemorais left a comment

The PR in general looks very good, @raminqaf!

I've added multiple comments and doc suggestions and only one change requested.



If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly.

#### Upsert output
Contributor

Suggested change
#### Upsert output
#### Upsert table

Contributor Author

Addressed

-- -D[id:2, name:'Bob']
```

Without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog.
Contributor

Suggested change
Without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog.
By default, without `PARTITION BY`, or when the active `op_mapping` includes `UPDATE_BEFORE`, the output remains a retract changelog.

Contributor Author

Added


#### Upsert output

When `PARTITION BY` is combined with an `op_mapping` that does NOT include `UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event with the partition key acting as the upsert key.
Contributor

Suggested change
When `PARTITION BY` is combined with an `op_mapping` that does NOT include `UPDATE_BEFORE`, the output changelog is an upsert table keyed on the partition columns. Each input row produces an `INSERT`, `UPDATE_AFTER`, or `DELETE` event with the partition key acting as the upsert key.
To generate an `Upsert table`, the following requirements must be met:
* **Key Partitioning:** You must use `PARTITION BY <key>`, where the partition key corresponds to the unique/primary key of the dataset.
* **Op Mapping Configuration:** The `op_mapping` must include `UPDATE_AFTER` and must NOT include `UPDATE_BEFORE`.
**How it works:**
The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause.
The FROM_CHANGELOG PTF assumes that events arrive in order. If the source itself does not guarantee ordering of events for the same PARTITION BY keys, consider using ORDER BY <link>.

Contributor Author

Added

*
* <ul>
* <li><b>Retract</b> (default): the active {@code op_mapping} includes {@code UPDATE_BEFORE}
* or no updates at all. The output emits {@code INSERT}, {@code UPDATE_BEFORE}, {@code
Contributor

Suggested change
* or no updates at all. The output emits {@code INSERT}, {@code UPDATE_BEFORE}, {@code
* or only {@code INSERT} and {@code DELETE} pairs. The output emits {@code INSERT}, {@code UPDATE_BEFORE}, {@code

Contributor Author

Added

Comment on lines +111 to +113
* <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) upsert(false)})
* because the runtime forwards each input delete row with all fields populated; only the {@link
* org.apache.flink.types.RowKind} is rewritten.
Contributor

Suggested change
* <p>Upsert mode uses full deletes ({@link ChangelogMode#upsert(boolean) upsert(false)})
* because the runtime forwards each input delete row with all fields populated; only the {@link
* org.apache.flink.types.RowKind} is rewritten.
* <p>Upsert mode uses full deletes by default ({@link ChangelogMode#upsert(boolean) upsert(false)}). We currently only support consuming from a changelog stream with full deletes (where not only the primary keys, but all fields are populated).

nit: we could maybe even delete this and only add more info once we add "consume_full_updates"

Contributor Author

Removed for now

* Returns {@code true} when the FROM_CHANGELOG call should emit an upsert changelog: the input
* table is partitioned AND the resolved {@code op_mapping} contains {@code UPDATE_AFTER}
* without {@code UPDATE_BEFORE}. Falls back to {@code false} when the mapping is absent or
* cannot be resolved as a literal, since the default mapping includes both (retract).
Contributor

Suggested change
* cannot be resolved as a literal, since the default mapping includes both (retract).
* cannot be resolved as a literal. The default mapping maps to retract table.

.item("uid", uid)
.item("select", String.join(",", getRowType().getFieldNames()))
.item("rowType", getRowType());
final Set<ImmutableBitSet> upsertKeys =
Contributor

I don't think you want to do this. This would change the plan output for all PTFs and you'd theoretically have to regenerate all PTF tests in the code base.

Contributor

You just want to output the upsertKeys in the plan, right? Then you want to tinker with your tests so they use the testing utility that allows you to do so, instead of changing the default explain output for PTF nodes.

Contributor Author

Reverted this and now stringify the plan through TableTestBase

@raminqaf raminqaf force-pushed the FLINK-39538 branch 2 times, most recently from 08d52fc to 742d0db Compare May 18, 2026 15:17
raminqaf added 2 commits May 18, 2026 17:18
@raminqaf raminqaf force-pushed the FLINK-39538 branch 2 times, most recently from ca6642b to 7da043f Compare May 18, 2026 15:22
Contributor

@fhueske fhueske left a comment

Thanks for the update @raminqaf!

PR is good to merge.
Left one comment with an improvement suggestion.

Fabian

Comment on lines +126 to +127
return ctx.getArgumentValue(ARG_OP_MAPPING, Map.class).stream()
.anyMatch(FromChangelogTypeStrategy::describesUpsert);
Contributor

nit

Suggested change
return ctx.getArgumentValue(ARG_OP_MAPPING, Map.class).stream()
.anyMatch(FromChangelogTypeStrategy::describesUpsert);
return ctx.getArgumentValue(ARG_OP_MAPPING, Map.class).map(FromChangelogTypeStrategy::describesUpsert).orElse(false);

stream() on an Optional surely works, but it somewhat hides that this is just a single element.
map() is clearer, IMO.
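
A minimal sketch of the two styles, assuming Java 9+ for `Optional.stream()`. The class `OptionalStyleSketch` and its simplified `describesUpsert` predicate are hypothetical; they only illustrate that both forms return the same result for present and empty values.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical comparison of Optional.stream().anyMatch(...) and Optional.map(...).orElse(false). */
final class OptionalStyleSketch {
    /** Simplified predicate: UPDATE_AFTER is mapped, UPDATE_BEFORE is not. */
    static boolean describesUpsert(Map<String, String> mapping) {
        return mapping.containsValue("UPDATE_AFTER") && !mapping.containsValue("UPDATE_BEFORE");
    }

    /** Stream form: treats the Optional as a stream of zero or one elements. */
    static boolean viaStream(Optional<Map<String, String>> mapping) {
        return mapping.stream().anyMatch(OptionalStyleSketch::describesUpsert);
    }

    /** Map form: makes the single-element nature and the empty fallback explicit. */
    static boolean viaMap(Optional<Map<String, String>> mapping) {
        return mapping.map(OptionalStyleSketch::describesUpsert).orElse(false);
    }
}
```

Both variants yield `false` for an empty Optional; the `map` form states that fallback directly instead of relying on `anyMatch` over an empty stream.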

Contributor Author

Thanks! Addressed!

@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label May 18, 2026
Contributor

@gustavodemorais gustavodemorais left a comment

Thanks for addressing the feedback, @raminqaf! Take a look

Comment on lines +159 to +162
The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause.

<span class="label label-danger">Note</span>
- An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. `PARTITION BY` must be present on the table argument; otherwise the call would produce key-less updates and is rejected at validation time with a `ValidationException`.
Contributor

The note is a bit repetitive: we already stated that it's an upsert table, and we made explicit above that PARTITION BY is required.

Suggested change
The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause.
<span class="label label-danger">Note</span>
- An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. `PARTITION BY` must be present on the table argument; otherwise the call would produce key-less updates and is rejected at validation time with a `ValidationException`.
An `op_mapping` that produces `UPDATE_AFTER` without `UPDATE_BEFORE` describes an upsert changelog and requires a key. The engine assumes that the keys provided in the `PARTITION BY` clause function as the unique upsert keys. The resulting output changelog becomes an upsert table keyed on these partition columns. Each incoming row is evaluated and produces `INSERT`, `UPDATE_AFTER`, or `DELETE` events, using the partition key as the explicit upsert key. Therefore, if the incoming changelog contains unique keys (such as a primary key), they **must** be used in the `PARTITION BY` clause.

* or no updates at all. The output emits {@code INSERT}, {@code UPDATE_BEFORE}, {@code
* UPDATE_AFTER}, and {@code DELETE}.
* <li><b>Upsert</b>: the {@code op_mapping} maps to {@code UPDATE_AFTER} without {@code
* UPDATE_BEFORE}. The output emits {@code INSERT}, {@code UPDATE_AFTER}, and full {@code
Contributor

hmmm, it wasn't :D. I see "full"

* Verify the AST and the optimized rel plan for the given SELECT query. The rendered optimized
* rel plan includes the `upsertKeys=[...]` term for rel nodes that derive upsert keys.
*/
def verifyRelPlanWithUpsertKey(query: String, extraDetails: ExplainDetail*): Unit = {
Contributor

@gustavodemorais gustavodemorais May 19, 2026

This does not seem to fit the style of the utilities we have here. We don't have a unique verifyRelPlan for each param we can configure. Take a look at how withDuplicateChanges and withQueryBlockAlias were implemented.

Labels

community-reviewed PR has been reviewed by the community.

4 participants