Skip to content

[flink] Support streaming upsert with partial column update for data evolution#7954

Closed
JingsongLi wants to merge 2 commits into
apache:masterfrom
JingsongLi:stream_de
Closed

[flink] Support streaming upsert with partial column update for data evolution#7954
JingsongLi wants to merge 2 commits into
apache:masterfrom
JingsongLi:stream_de

Conversation

@JingsongLi
Copy link
Copy Markdown
Contributor

Add streaming upsert support for append-only tables in data evolution mode. Records are classified as INSERT or UPDATE via a business key index, with NULL-based partial column detection to write only changed columns and keep original files for merge-on-read. Includes partition-aware routing with configurable index parallelism and documentation.

…evolution

Add streaming upsert support for append-only tables in data evolution mode.
Records are classified as INSERT or UPDATE via a business key index, with
NULL-based partial column detection to write only changed columns and keep
original files for merge-on-read. Includes partition-aware routing with
configurable index parallelism and documentation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@JingsongLi JingsongLi changed the title [flink] Support streaming upsert with partial column update for data evolution [WIP][flink] Support streaming upsert with partial column update for data evolution May 25, 2026
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this feature. I found a streaming data-correctness issue in the partial-update write path that should be fixed before merging.

Blocking issue:

UpsertWriteOperator.writePartialUpdate chooses one writeType for the whole original file by calling computeAlwaysNonNullColumns(updates). That method keeps only columns that are non-null in every updated row. However the upsert input uses null as "do not change this column" on a per-row basis. If two rows in the same original file update different columns in the same checkpoint, valid updates are silently dropped.

Example:

  1. Existing file contains (1, 'a', 1.0), (2, 'b', 2.0).
  2. One upsert batch contains (1, 'a_v2', NULL) and (2, NULL, 20.0).
  3. computeAlwaysNonNullColumns keeps only id, because name is null in the second update and value is null in the first update.
  4. The partial file is written with only id, so both the name update for id=1 and the value update for id=2 are lost; the final table remains (1, 'a', 1.0), (2, 'b', 2.0).

The write column selection needs to be based on columns that are updated by at least one row, and for rows where a selected column is null in the update record it should write the original value for that column. Alternatively, split the rewrite into multiple partial files by non-null column set. Please also add a regression test for mixed partial-column updates within one upsert/checkpoint.

Validation: I ran mvn -pl paimon-flink/paimon-flink-common -am -Pfast-build -DskipTests compile successfully on this PR.

The write column selection was based on columns non-NULL in ALL update
rows (intersection), which silently dropped updates when rows in the
same file updated different columns in one batch. Changed to union
(any-row non-NULL) and merge each update row with the original to
backfill NULL columns. Added regression test for mixed partial-column
updates within one upsert batch.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@JingsongLi JingsongLi changed the title [WIP][flink] Support streaming upsert with partial column update for data evolution [flink] Support streaming upsert with partial column update for data evolution May 25, 2026
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick fix. The previous partial-update correctness issue is addressed by using the union of non-null update columns and filling null cells from the original row before writing the partial file. The added testMixedPartialColumnUpdatesInSameBatch covers the reported case.

Validation on my side:

  • mvn -pl paimon-flink/paimon-flink-common -am -Pfast-build -DskipTests compile
  • mvn -pl paimon-api -Pfast-build -DskipTests install
  • mvn -pl paimon-flink/paimon-flink-common -Pfast-build -DfailIfNoTests=false -Dtest=DataEvolutionUpsertITCase#testMixedPartialColumnUpdatesInSameBatch test

LGTM, pending CI.

@JingsongLi
Copy link
Copy Markdown
Contributor Author

If the same UpsertKey is repeated multiple times in the short term, the old data may not have been submitted yet, and the new data will still be treated as the new UpsertKey, so there may still be issues.

@JingsongLi JingsongLi closed this May 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants