feat: support data evolution table mode #193
QuakeWang
left a comment
@JingsongLi Hi, I have reviewed the PR and left some minor comments.
Ok(try_stream! {
for split in &splits {
if split.raw_convertible() || split.data_files().len() == 1 {
The raw-convertible / single-file branch yields read_single_file() directly, but this helper does not reorder columns back to projected_column_names after ProjectionMask, unlike the existing read() path.
Parquet returns file-schema order, not request order, so a projection like ["value", "id"] will come back as ["id", "value"] for single-file splits. That makes this path inconsistent with the current read() behavior, and it can also make the new projection test flaky when the plan contains single-file groups.
Should we preserve the same reorder logic here so that the data-evolution raw path matches the normal read path?
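To make the reordering concern concrete, here is a minimal sketch of restoring request order after a file-order read. `Batch` and `reorder_to_projection` are hypothetical stand-ins (a real implementation would work on Arrow record batches); they are not names from this PR.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: named columns in order.
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub columns: Vec<(String, Vec<i64>)>,
}

/// Reorder `batch` so that its columns follow `projected` (the request order).
/// Returns `None` if a requested column is missing from the batch.
pub fn reorder_to_projection(batch: &Batch, projected: &[&str]) -> Option<Batch> {
    let columns = projected
        .iter()
        .map(|name| {
            batch
                .columns
                .iter()
                .find(|(col, _)| col.as_str() == *name)
                .cloned()
        })
        .collect::<Option<Vec<_>>>()?;
    Some(Batch { columns })
}
```

With a batch that arrives as ["id", "value"] and a projection of ["value", "id"], the helper returns the columns as ["value", "id"], which is what the single-file branch would need in order to match the normal read path.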
let mut column_source: HashMap<String, (usize, i64)> = HashMap::new();

for (file_idx, file_meta) in data_files.iter().enumerate() {
let file_columns: Vec<String> = if let Some(ref wc) = file_meta.write_cols {
Falling back to “all columns from the current table schema” when write_cols == None is not correct here.
One of the main data-evolution scenarios is reading old files after the table has added new columns. In that case, an old file only contains fields from the schema identified by file_meta.schema_id; using the current table schema here will incorrectly mark later-added columns as if this file already provided them. That affects winning-column resolution, and it can also drop entire row groups when the projection only contains newly added columns.
I don't get your point. Could you give an example?
> I don't get your point. Could you give an example?
My point is about old files after schema evolution.
If an old file was written when the table schema was (id, name), and later the table becomes (id, name, age), then write_cols == None should not mean that this old file contains age too. But the current fallback uses the current table schema, so it will treat the old file as if it also provides the newly added columns.
I think for write_cols == None, we should use the file schema from file_meta.schema_id, not the current table schema.
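A rough sketch of that suggestion, assuming a hypothetical lookup table from schema id to column names. `FileMeta` here is a simplified stand-in, and `file_columns` and `schemas` are illustrative names rather than code from this PR.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the file metadata discussed above.
pub struct FileMeta {
    pub schema_id: i64,
    pub write_cols: Option<Vec<String>>,
}

/// Columns a file physically provides: `write_cols` when present, otherwise
/// the columns of the schema the file was written with (looked up by
/// `schema_id`), never the current table schema.
pub fn file_columns(
    meta: &FileMeta,
    schemas: &HashMap<i64, Vec<String>>,
) -> Option<Vec<String>> {
    match &meta.write_cols {
        Some(cols) => Some(cols.clone()),
        None => schemas.get(&meta.schema_id).cloned(),
    }
}
```

For a file written under schema (id, name), this returns only those two columns even after the table has evolved to (id, name, age).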
The old file can be treated as (id, name, age), where age is null. In effect, this is schema evolution.
I don't want to introduce reading with old file schemas here, because that would also have to address issues such as column type changes.
> The old file can be treated as (id, name, age), where age is null. In effect, this is schema evolution.
> I don't want to introduce reading with old file schemas here, because that would also have to address issues such as column type changes.
@JingsongLi I get your point. I agree we do not necessarily need to introduce historical schema reading here, especially if that expands the scope to type evolution.
My concern is mainly that, even under the “current schema + NULL for missing columns” semantics, the current fallback may still behave incorrectly in some cases. For example, when projecting only a newly added column, I think this path may return 0 rows instead of preserving the row count and filling NULLs.
So my point is less about requiring old-schema reads, and more about whether the current fallback already implements the expected schema-evolution behavior correctly.
81665a1
Do you mean these three lines of code? We should remove them.
Yes, those three lines are part of the issue, so removing them makes sense.
I just want to make sure this is sufficient: for the add-column case, if the projected column does not physically exist in the old files, we should still preserve the row count and fill NULL, instead of dropping rows.
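The expected behavior can be sketched as follows, under the simplifying assumption that every column is a nullable 64-bit integer. A real implementation would need the Arrow type of each missing column to build a typed null array. `Column` and `project_with_nulls` are hypothetical names used only for this illustration.

```rust
use std::collections::HashMap;

/// Simplified column type: nullable 64-bit integers only (an assumption).
pub type Column = Vec<Option<i64>>;

/// Project `projected` columns out of one file. A column the file does not
/// physically contain is filled with `row_count` NULLs, so the row count is
/// preserved even when no projected column exists in the file.
pub fn project_with_nulls(
    file_columns: &HashMap<String, Column>,
    row_count: usize,
    projected: &[&str],
) -> Vec<Column> {
    projected
        .iter()
        .map(|name| match file_columns.get(*name) {
            Some(col) => col.clone(),
            None => vec![None; row_count],
        })
        .collect()
}
```

Projecting only a newly added column over a three-row old file then yields one column of three NULLs rather than zero rows.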
I found it quite troublesome to fill in NULL, and we also need the corresponding Arrow type, which we don't have here, so the current implementation doesn't have a corresponding column. In the future, when we need to support schema evolution, we will see how to change it.
> I found it quite troublesome to fill in NULL, and we also need the corresponding Arrow type, which we don't have here, so the current implementation doesn't have a corresponding column. In the future, when we need to support schema evolution, we will see how to change it.
OK, that makes sense to me.
if let Some(files) = data_deletion_files {
builder = builder.with_data_deletion_files(files);
if data_evolution_enabled {
let file_groups = split_by_row_id(data_files);
The split-generation logic here diverges from upstream DataEvolutionSplitGenerator.
Upstream does not simply group by equal first_row_id and emit one split per group. It first merges overlapping row_id_ranges, then applies ordered bin packing using target_split_size/open_file_cost, and computes rawConvertible from the packed result. The current implementation introduces two regressions:
- source.split.* is effectively bypassed in data-evolution mode, so splits become much more fragmented.
- Grouping only by first_row_id misses the overlapping row-id-range case, which no longer matches upstream grouping semantics.
I think we should align this with the Java split generator before merging the new read path.
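The two steps described above (merge overlapping row-id ranges, then ordered bin packing) could look roughly like this. It is a sketch, not a port of the Java generator: `FileRange`, `merge_overlapping`, and `pack_ordered` are hypothetical names, and weighting each file as max(file_size, open_file_cost) is an assumption about how the cost is applied.

```rust
/// Hypothetical, simplified view of a data file for split generation.
#[derive(Debug, Clone, PartialEq)]
pub struct FileRange {
    pub first_row_id: u64,
    pub row_count: u64,
    pub file_size: u64,
}

impl FileRange {
    /// Exclusive end of the row-id range covered by this file.
    fn end(&self) -> u64 {
        self.first_row_id + self.row_count
    }
}

/// Step 1: group files whose row-id ranges overlap.
pub fn merge_overlapping(mut files: Vec<FileRange>) -> Vec<Vec<FileRange>> {
    files.sort_by_key(|f| (f.first_row_id, f.end()));
    let mut groups: Vec<Vec<FileRange>> = Vec::new();
    let mut current_end = 0u64;
    for f in files {
        if !groups.is_empty() && f.first_row_id < current_end {
            // Overlaps the running range: extend it and join the group.
            current_end = current_end.max(f.end());
            groups.last_mut().expect("groups is non-empty").push(f);
        } else {
            current_end = f.end();
            groups.push(vec![f]);
        }
    }
    groups
}

/// Step 2: ordered bin packing. Groups are kept in order and a new split is
/// started once adding the next group would exceed `target_split_size`.
pub fn pack_ordered(
    groups: Vec<Vec<FileRange>>,
    target_split_size: u64,
    open_file_cost: u64,
) -> Vec<Vec<Vec<FileRange>>> {
    let mut splits = Vec::new();
    let mut current: Vec<Vec<FileRange>> = Vec::new();
    let mut current_weight = 0u64;
    for group in groups {
        // Assumed weight: each file costs at least `open_file_cost`.
        let weight: u64 = group
            .iter()
            .map(|f| f.file_size.max(open_file_cost))
            .sum();
        if !current.is_empty() && current_weight + weight > target_split_size {
            splits.push(std::mem::take(&mut current));
            current_weight = 0;
        }
        current_weight += weight;
        current.push(group);
    }
    if !current.is_empty() {
        splits.push(current);
    }
    splits
}
```

Each packed split may contain several row-id groups, which is the regrouping cost at read time that the reply below refers to.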
Packing files with different row IDs into one split makes the implementation very complex, because it requires regrouping during read, and that is not necessary for Rust.
+1
Purpose
Supports https://paimon.apache.org/docs/master/append-table/data-evolution/
Sub task of #173
Brief change log
Tests
API and Format
Documentation