[incremental scan] Support for equality deletes #60
Conversation
| /// Spawns a concurrent task to collect all manifest entries from the `from_snapshot`.
| /// Returns a receiver that will yield the manifest entries as they are collected.
| /// Errors are sent through `error_tx`.
| fn spawn_baseline_file_collection(
@vustef this is the code that reads metadata about the `from_snapshot`. I could not really reuse code from the full scan here, because the manifest reading logic is tightly coupled to `plan_files`. It should not hurt here, though.
I don't see, though, how this function differs from any other function that reads metadata about a snapshot.
Could `from_snapshot` be a parameter, so that the function is not special-cased and tied to the "baseline file collection"?
As said, the way metadata is read in the full scan is tightly coupled to the specific manifest file context types, etc.
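The spawn-and-stream shape described in the doc comment can be sketched roughly as follows. This is a minimal illustration using std threads and channels rather than the crate's async runtime; `ManifestEntry`, `spawn_entry_collection`, and the pre-collected `entries` input are stand-ins, not the real types or API.

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, PartialEq)]
struct ManifestEntry {
    path: String,
}

/// Spawns a task that streams entries to a receiver; errors go out of band
/// through `error_tx`, mirroring the pattern in the doc comment above.
fn spawn_entry_collection(
    entries: Vec<Result<ManifestEntry, String>>,
    error_tx: mpsc::Sender<String>,
) -> mpsc::Receiver<ManifestEntry> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for entry in entries {
            match entry {
                Ok(e) => {
                    if tx.send(e).is_err() {
                        break; // receiver dropped; stop producing
                    }
                }
                Err(err) => {
                    let _ = error_tx.send(err); // report the error and stop
                    break;
                }
            }
        }
    });
    rx
}

fn main() {
    let (error_tx, error_rx) = mpsc::channel();
    let entries = vec![
        Ok(ManifestEntry { path: "m1.avro".into() }),
        Ok(ManifestEntry { path: "m2.avro".into() }),
    ];
    let rx = spawn_entry_collection(entries, error_tx);
    let collected: Vec<ManifestEntry> = rx.iter().collect();
    assert_eq!(collected.len(), 2);
    assert!(error_rx.try_recv().is_err()); // no errors were reported
    println!("collected {} entries", collected.len());
}
```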
| if !equality_deletes.is_empty() {
| // The predicate from build_combined_equality_delete_predicate is a "survival"
| // filter (keeps non-deleted rows). Negate it to select rows TO DELETE.
| let survival_predicate = delete_filter
Notice that for the full scan, the equality-delete predicate is built at read time in the Arrow reader. Since the incremental scan builds the delete index beforehand, however, we can compile the predicate here at plan time.
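The negation described in the code comment can be illustrated with a toy predicate type. This is a hypothetical sketch, not the iceberg-rust predicate API: a "survival" filter keeps rows that are not deleted, and negating it at plan time yields the set of rows to delete.

```rust
// Toy predicate type for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Predicate {
    Eq(&'static str, i64),
    Not(Box<Predicate>),
}

impl Predicate {
    /// Logical negation; a double negation cancels out.
    fn negate(self) -> Predicate {
        match self {
            Predicate::Not(inner) => *inner,
            other => Predicate::Not(Box::new(other)),
        }
    }
}

fn main() {
    // Survival filter: keep rows where NOT (id = 42), i.e. non-deleted rows.
    let survival = Predicate::Not(Box::new(Predicate::Eq("id", 42)));
    // At plan time, flip it to select exactly the rows TO DELETE.
    let delete = survival.negate();
    assert_eq!(delete, Predicate::Eq("id", 42));
    println!("{:?}", delete);
}
```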
vustef
left a comment
First round; I will review the tests later. Thanks, Gerald.
vustef
left a comment
Went through everything now. My main concern is the code duplication between all these different tasks (the full scan and all the incremental ones). Let's discuss after you go through the comments.
| // Three-branch strategy matching Java's ReadConf constructor:
| // Branch 1: file has embedded field IDs → use as-is
| // Branch 2: name_mapping present → apply name mapping, reopen
| // Branch 3: fallback → assign position-based IDs, reopen
Not sure if this comment is needed, since it's already stated on the function. Otherwise, I expected to see this logic here.
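The three-branch selection the comment describes can be sketched as a simple decision function. This is illustrative only; the enum and function names below are not the actual iceberg-rust (or Java ReadConf) types.

```rust
// Hypothetical stand-in for the three projection strategies.
#[derive(Debug, PartialEq)]
enum FieldIdStrategy {
    EmbeddedIds,   // Branch 1: the Parquet file carries Iceberg field IDs, use as-is
    NameMapping,   // Branch 2: map column names to field IDs, then reopen
    PositionBased, // Branch 3: fallback, assign IDs by column position, then reopen
}

fn choose_strategy(has_field_ids: bool, has_name_mapping: bool) -> FieldIdStrategy {
    if has_field_ids {
        FieldIdStrategy::EmbeddedIds
    } else if has_name_mapping {
        FieldIdStrategy::NameMapping
    } else {
        FieldIdStrategy::PositionBased
    }
}

fn main() {
    // Embedded IDs win even when a name mapping is also present.
    assert_eq!(choose_strategy(true, true), FieldIdStrategy::EmbeddedIds);
    assert_eq!(choose_strategy(false, true), FieldIdStrategy::NameMapping);
    assert_eq!(choose_strategy(false, false), FieldIdStrategy::PositionBased);
    println!("ok");
}
```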
| ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
| // Metadata fields (e.g. _file, _pos) are virtual — they don't exist as Parquet columns.
| // Filter them out so get_arrow_projection_mask only sees real schema field IDs.
| let real_field_ids: Vec<i32> = field_ids
nit: perhaps we should keep the names, in this case `project_field_ids_without_metadata`, so that it's easier to correlate this with the previous code / upstream.
I know this is the difference, but I think we can control this difference through a parameter, which might allow us to skip this step. We either get `delete_predicate` as a parameter in a common function, or we get a receiver.
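The filtering step being discussed can be sketched as follows. The two reserved IDs come from the Iceberg table spec (`_file` = 2147483646, `_pos` = 2147483645); the helper name is illustrative, not the crate's API, and real code would match against the full set of reserved metadata IDs.

```rust
// Reserved Iceberg field IDs for virtual metadata columns (per the table spec).
const FILE_PATH_FIELD_ID: i32 = 2147483646; // `_file`
const ROW_POSITION_FIELD_ID: i32 = 2147483645; // `_pos`

/// Drops virtual metadata field IDs so the projection mask only ever sees
/// IDs that exist as real Parquet columns.
fn real_field_ids(field_ids: &[i32]) -> Vec<i32> {
    field_ids
        .iter()
        .copied()
        .filter(|id| *id != FILE_PATH_FIELD_ID && *id != ROW_POSITION_FIELD_ID)
        .collect()
}

fn main() {
    let requested = vec![1, 2, FILE_PATH_FIELD_ID, ROW_POSITION_FIELD_ID];
    assert_eq!(real_field_ids(&requested), vec![1, 2]);
    println!("{:?}", real_field_ids(&requested));
}
```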
| let (iceberg_field_ids, field_id_map) =
|     Self::build_field_id_set_and_map(builder.parquet_schema(), bound_predicate)?;
|
| if let Some(use_fallback) = projection {
I don't get this: so incremental always uses the fallback? Why? And why do we call this a fallback?
Incremental should not always use this fallback; it's parameterized by `has_missing_field_ids`. From Claude:

So "fallback" is not about incremental code at all; it refers to position-based projection for migrated tables that lack embedded Parquet field IDs, as opposed to the normal field-ID-based projection. It corresponds directly to `has_missing_field_ids`.
Breaking it down:
- `projection = None` → skip projection entirely (append task: projection is handled separately afterwards via `apply_projection`)
- `projection = Some(false)` → apply field-ID-based projection (the file has embedded field IDs, the normal case)
- `projection = Some(true)` → apply position-based projection (the file lacks field IDs, i.e. `has_missing_field_ids = true`, a migrated table)
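The three-way `Option<bool>` semantics above can be made concrete with a small match. This is a hypothetical sketch; the function name and the descriptions are illustrative, not the crate's API.

```rust
/// Describes what each value of the `Option<bool>` projection flag means.
fn describe_projection(projection: Option<bool>) -> &'static str {
    match projection {
        // Append task: projection is applied separately via apply_projection.
        None => "skip projection here",
        // Normal case: the file has embedded Iceberg field IDs.
        Some(false) => "field-ID-based projection",
        // Migrated table: has_missing_field_ids = true, fall back to positions.
        Some(true) => "position-based fallback projection",
    }
}

fn main() {
    assert_eq!(describe_projection(None), "skip projection here");
    assert_eq!(describe_projection(Some(false)), "field-ID-based projection");
    assert_eq!(describe_projection(Some(true)), "position-based fallback projection");
    println!("ok");
}
```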
| }
| };
|
| // There are three possible sources for potential lists of selected RowGroup indices,
TODO: we lost this comment; will reinsert.
https://relationalai.atlassian.net/browse/RAI-44214
+
https://relationalai.atlassian.net/browse/RAI-44483