[flink] introduce a simplified MERGE INTO procedure on data-evolution-table for flink #7128
Purpose
Linked issue: #7019
This PR introduces a simplified MERGE INTO action/procedure on DataEvolutionTable for Flink.
The motivation is that for data-evolution tables, we can efficiently update or insert columns without rewriting existing data files. The paimon-spark module implements this through `MERGE INTO` syntax, which is not supported by Flink, so we introduce this action to simulate `MERGE INTO` behavior. NOTE: Due to limitations in Flink's implementation, compared with paimon-spark we currently only support the `UPDATE SET` branch of MERGE and do not support inserting new records yet.
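The per-row semantics this action simulates can be sketched as follows. This is a minimal, illustrative Java sketch, not the PR's actual classes; the map-based row model, method names, and column names here are assumptions made for readability:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the simulated semantics (all names here are illustrative,
// not the PR's API). It behaves like:
//   MERGE INTO target USING source ON target.key = source.key
//   WHEN MATCHED THEN UPDATE SET target.col = source.col
public final class MergeUpdateSetSketch {

    /** A row is modeled as a mutable column-name -> value map for simplicity. */
    static void applyUpdateSet(
            List<Map<String, Object>> targetRows,
            List<Map<String, Object>> sourceRows,
            String keyColumn,
            String updateColumn) {
        // Index target rows by the merge key (the "ON" condition).
        Map<Object, Map<String, Object>> targetByKey = new HashMap<>();
        for (Map<String, Object> t : targetRows) {
            targetByKey.put(t.get(keyColumn), t);
        }
        for (Map<String, Object> s : sourceRows) {
            Map<String, Object> matched = targetByKey.get(s.get(keyColumn));
            if (matched != null) {
                // Only the UPDATE SET branch is supported: overwrite the listed column.
                matched.put(updateColumn, s.get(updateColumn));
            }
            // No WHEN NOT MATCHED branch yet: unmatched source rows are not inserted.
        }
    }
}
```

Because there is no `WHEN NOT MATCHED` branch, the action behaves like an inner-join-driven update rather than a full upsert.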
The process can be illustrated as follows (a sketch of the writer step is shown after this list):

1. Assign a `_row_id` to each row in the source table.
2. Shuffle rows by the `FirstRowId` of each newly assigned `_row_id`, to ensure that rows belonging to the same file are processed by the same writer operator.
3. In each writer:
   a. Sort rows by `_row_id`.
   b. Read the original data for each row range and merge it with the new rows.
   c. Write out the merged data.
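A hedged Java sketch of the writer step follows. The `OriginalReader` / `FileWriter` interfaces and the `UpdatedRow` class are hypothetical stand-ins, not the PR's actual classes:

```java
import java.util.Comparator;
import java.util.List;

// Sketch of step 3 inside one writer operator, under the assumption that a
// row's position inside its file is (rowId - firstRowId).
class WriterStepSketch {

    /** A new row carrying its assigned _row_id. */
    static class UpdatedRow {
        long rowId;
        Object[] columns;
    }

    /** Hypothetical reader over the original file's row range [first, last]. */
    interface OriginalReader {
        List<UpdatedRow> read(long firstRowId, long lastRowId);
    }

    /** Hypothetical writer for the merged output file. */
    interface FileWriter {
        void write(List<UpdatedRow> rows);
    }

    static void mergeRange(
            List<UpdatedRow> newRows,
            long firstRowId,
            long lastRowId,
            OriginalReader reader,
            FileWriter writer) {
        // a. Sort new rows by _row_id so they line up with file positions.
        newRows.sort(Comparator.comparingLong(r -> r.rowId));

        // b. Read the original rows for this range and overlay the new rows,
        //    replacing each original row at position (rowId - firstRowId).
        List<UpdatedRow> merged = reader.read(firstRowId, lastRowId);
        for (UpdatedRow updated : newRows) {
            merged.set((int) (updated.rowId - firstRowId), updated);
        }

        // c. Write out the merged data, aligned with the original file.
        writer.write(merged);
    }
}
```

Replacing rows positionally is what keeps the new files aligned with the existing files, as described in the Merge Detail section below.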
This implementation is specifically designed for cases where the source table may be much smaller than the target table: each writer is responsible for reading the original file data. An alternative approach would be to perform a left outer join of the target table with the source table, rather than an inner join.
Merge Detail
New rows will be merged with existing rows to keep new files aligned with existing files. For example, consider a set of existing rows that all belong to the same file, whose row range is [1, 5]. When a new updated row arrives, we merge the existing file with the new row and write out a merged file covering the same row range. An illustration follows.
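As a hedged illustration (the concrete column names and values below are invented, since the PR's original example tables are not reproduced here), suppose the existing file covers `_row_id` 1 through 5 and the source updates the row with `_row_id = 3`:

Existing file (row range [1, 5]):

| `_row_id` | k | v  |
|-----------|---|----|
| 1         | a | 10 |
| 2         | b | 20 |
| 3         | c | 30 |
| 4         | d | 40 |
| 5         | e | 50 |

New updated row:

| `_row_id` | k | v  |
|-----------|---|----|
| 3         | c | 99 |

Merged output (still aligned with row range [1, 5]):

| `_row_id` | k | v  |
|-----------|---|----|
| 1         | a | 10 |
| 2         | b | 20 |
| 3         | c | 99 |
| 4         | d | 40 |
| 5         | e | 50 |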
Tests
Please see `org.apache.paimon.flink.action.DataEvolutionMergeIntoActionITCase`.
API and Format
This PR does not modify any existing API.
Documentation
Will be added ASAP