[FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before #27911

Open
gustavodemorais wants to merge 4 commits into apache:master from gustavodemorais:FLINK-39419
@gustavodemorais
Contributor

What is the purpose of the change

Switches TO_CHANGELOG from SET_SEMANTIC_TABLE to ROW_SEMANTIC_TABLE, removing the PARTITION BY requirement. For upsert sources, the planner inserts ChangelogNormalize to guarantee UPDATE_BEFORE and full DELETE rows (correctness over performance).

Brief change log

  • Switched TO_CHANGELOG to row semantics (no PARTITION BY)
  • Added REQUIRE_UPDATE_BEFORE + REQUIRE_FULL_DELETE for correctness guarantees
  • Moved toChangelog() from PartitionedTable to Table interface
  • Simplified runtime and type strategy by removing partition key projection
  • Added UPSERT_INPUT semantic test with key-only deletes to verify ChangelogNormalize fills full rows
  • Fixed CallBindingCallContext.convertMap() to unwrap implicit CASTs on MAP literal operands
  • Re-enabled DUPLICATE_ROW_KIND validation test
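
To make the "UPDATE_BEFORE and full DELETE rows" guarantee concrete, here is a minimal conceptual sketch in plain Java. It is not Flink's ChangelogNormalize operator: the class `ChangelogNormalizeSketch`, its `Change` type, and the string-valued rows are all hypothetical and exist only to illustrate how keeping the last row per key lets an upsert stream with key-only deletes be expanded into a fully described changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model only; NOT Flink's ChangelogNormalize operator.
final class ChangelogNormalizeSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical change event; value is null for a key-only DELETE.
    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind + "(" + key + ", " + value + ")";
        }
    }

    // Last known value per key, standing in for keyed operator state.
    private final Map<String, String> lastByKey = new HashMap<>();

    // Expands one upsert event into zero or more fully described changes.
    List<Change> normalize(Change in) {
        List<Change> out = new ArrayList<>();
        String previous = lastByKey.get(in.key);
        if (in.kind == Kind.DELETE) {
            // Fill the full row from state; drop deletes for unknown keys.
            if (previous != null) {
                lastByKey.remove(in.key);
                out.add(new Change(Kind.DELETE, in.key, previous));
            }
            return out;
        }
        if (previous == null) {
            out.add(new Change(Kind.INSERT, in.key, in.value));
        } else if (!previous.equals(in.value)) {
            // Retract the old row before emitting the new one.
            out.add(new Change(Kind.UPDATE_BEFORE, in.key, previous));
            out.add(new Change(Kind.UPDATE_AFTER, in.key, in.value));
        }
        lastByKey.put(in.key, in.value);
        return out;
    }

    public static void main(String[] args) {
        ChangelogNormalizeSketch op = new ChangelogNormalizeSketch();
        System.out.println(op.normalize(new Change(Kind.UPDATE_AFTER, "k1", "a")));
        System.out.println(op.normalize(new Change(Kind.UPDATE_AFTER, "k1", "b")));
        System.out.println(op.normalize(new Change(Kind.DELETE, "k1", null)));
    }
}
```

The per-key state is what the "correctness over performance" trade-off refers to in this model: the downstream function sees complete rows, at the cost of remembering the last row for every key.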

Verifying this change

  • Plan tests pass
  • Semantic tests pass

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs / JavaDocs)

@gustavodemorais gustavodemorais changed the title from "[FLINK-39419][table] Switch TO_CHANGELOG to row semantics supporting with full deletes + require update before" to "[FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before" on Apr 10, 2026
@flinkbot
Collaborator

flinkbot commented Apr 10, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Contributor

@twalthr twalthr left a comment

Thanks for this simplification @gustavodemorais. This aligns better with existing concepts.

Comment on lines +787 to +789
// UPDATE_BEFORE and full DELETE rows. REQUIRE_FULL_DELETE is
// redundant without partition keys (isPtfUpsert is always false)
// but documents the intent explicitly.
Contributor

Suggested change
- // UPDATE_BEFORE and full DELETE rows. REQUIRE_FULL_DELETE is
- // redundant without partition keys (isPtfUpsert is always false)
- // but documents the intent explicitly.
+ // UPDATE_BEFORE and implicitly full DELETE rows.

Contributor Author

Done

Contributor Author

I added REQUIRE_FULL_DELETE in the PR just to state explicitly what we're doing, even though it's not necessary. I think it makes it clearer that we always generate full deletes.

I've added a short note about that above REQUIRE_FULL_DELETE. I could also delete it; I don't have a strong opinion.

 * different lengths (e.g., {@code CAST('INSERT' AS VARCHAR(12))}).
 */
private static SqlNode unwrapCast(final SqlNode node) {
    if (node.getKind() == SqlKind.CAST) {
Contributor

We also need to check the type of the cast; otherwise you might strip casts like CAST(int AS double) or CAST(string AS boolean).

Contributor Author

True, done
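
The concern in this thread can be illustrated with a small self-contained sketch. The updated code from the PR is not shown here, so everything below is an assumption: `UnwrapCastSketch`, `Node`, and `Family` are hypothetical stand-ins rather than Calcite's `SqlNode` or Flink's `CallBindingCallContext`. The sketch strips a cast only when both the operand and the target are character strings (the length-only case from the Javadoc above) and leaves value-changing casts in place.

```java
// Hypothetical mini-model; NOT Calcite's SqlNode or Flink's CallBindingCallContext.
final class UnwrapCastSketch {

    enum Family { CHARACTER, NUMERIC, BOOLEAN }

    static final class Node {
        final String text;
        final Family family;    // type family of this expression's result
        final Node castOperand; // non-null only when this node is a CAST

        private Node(String text, Family family, Node castOperand) {
            this.text = text;
            this.family = family;
            this.castOperand = castOperand;
        }

        static Node literal(String text, Family family) {
            return new Node(text, family, null);
        }

        static Node cast(Node operand, Family target) {
            return new Node("CAST(" + operand.text + " AS " + target + ")", target, operand);
        }
    }

    // Strips a cast only if it stays within the character-string family.
    static Node unwrapCast(Node node) {
        boolean isCast = node.castOperand != null;
        if (isCast
                && node.family == Family.CHARACTER
                && node.castOperand.family == Family.CHARACTER) {
            return node.castOperand;
        }
        // Casts such as int -> double or string -> boolean are kept as-is.
        return node;
    }

    public static void main(String[] args) {
        Node literal = Node.literal("'INSERT'", Family.CHARACTER);
        System.out.println(unwrapCast(Node.cast(literal, Family.CHARACTER)).text);
        System.out.println(unwrapCast(Node.cast(literal, Family.BOOLEAN)).text);
    }
}
```

Checking both sides of the cast, rather than only `SqlKind.CAST`, is the design point of the review comment: a cast that changes the value's type carries meaning and must survive.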

ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
ToChangelogTestPrograms.DELETION_FLAG,
ToChangelogTestPrograms.MISSING_PARTITION_BY,
ToChangelogTestPrograms.REJECTS_PARTITION_BY,
Contributor

Not necessary; this is already tested by the PTF framework.

Contributor Author

deleted

@gustavodemorais
Contributor Author

Thanks for the review, @twalthr! I've addressed your feedback; please take another look.


3 participants