From 29cf9f603c60c010560ede9e6ffb930ecb90f4af Mon Sep 17 00:00:00 2001 From: Ravid Gontov Date: Thu, 16 Apr 2026 20:40:23 +0300 Subject: [PATCH 1/2] perf(reader): replace O(N*M) equality delete predicate tree with O(N+M) HashSet-based filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The existing equality delete implementation builds a predicate AST with one node per delete record and evaluates every node against every data batch. For N delete records and M data rows, this is O(N*M) — 25 minutes for 30M rows with ~5000 delete records in our benchmark. Replace the predicate tree with a `HashSet<EqDeleteKey>` of delete key tuples, matching the approach used by the Java/Spark implementation (`StructLikeSet`). Delete records are collected into a hash set during parsing (O(N)), then each data row is checked with an O(1) lookup. Key changes: - `caching_delete_file_loader.rs`: `parse_equality_deletes_record_batch_stream` now returns an `EqDeleteSet` (hash set + field metadata) instead of a `Predicate`. The balanced binary tree construction and `rewrite_not()` calls are eliminated entirely. `Datum` already derives `Hash` and `Eq` (via `OrderedFloat` for floats), so no new trait implementations are needed. - `delete_filter.rs`: `EqDelState::Loaded` holds `Arc<EqDeleteSet>` instead of `Predicate`. The new `build_equality_delete_sets()` groups delete files by their `equality_ids` field layout before unioning, preventing incorrect merges when different delete files use different equality column sets. Single-file groups return the cached `Arc` directly with no deep clone. - `reader.rs`: Equality delete filtering is decoupled from the scan predicate `RowFilter`. The scan predicate stays in the Parquet `RowFilter` pipeline (page/row-group pruning preserved). 
Equality deletes are applied as a lazy post-read `.map()` step on the record batch stream via `apply_eq_delete_filter()`, which reuses a single `EqDeleteKey` allocation across all rows to avoid per-row `Vec` allocations. --- .../src/arrow/caching_delete_file_loader.rs | 201 ++++++++++---- crates/iceberg/src/arrow/delete_filter.rs | 258 ++++++++++++++---- crates/iceberg/src/arrow/reader.rs | 145 ++++++++-- 3 files changed, 472 insertions(+), 132 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index ae97534d83..f6393beae9 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -16,7 +16,6 @@ // under the License. use std::collections::{HashMap, HashSet}; -use std::ops::Not; use std::sync::Arc; use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray}; @@ -27,8 +26,6 @@ use super::delete_filter::{DeleteFilter, PosDelLoadAction}; use crate::arrow::delete_file_loader::BasicDeleteFileLoader; use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema}; use crate::delete_vector::DeleteVector; -use crate::expr::Predicate::AlwaysTrue; -use crate::expr::{Predicate, Reference}; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{ @@ -38,6 +35,41 @@ use crate::spec::{ }; use crate::{Error, ErrorKind, Result}; +/// A composite key for equality delete lookups. Each element corresponds to one +/// equality_id column. For single-column deletes this contains one element. +#[derive(Hash, Eq, PartialEq, Debug, Clone)] +pub(crate) struct EqDeleteKey(pub(crate) Vec>); + +/// Bundles the hash set of delete keys with the field metadata needed to extract +/// matching keys from data record batches. +#[derive(Debug, Clone)] +pub(crate) struct EqDeleteSet { + /// Delete key tuples to filter out of data batches. 
+ pub(crate) keys: HashSet, + /// Ordered list of (field_name, field_id) used to locate the key columns in + /// data record batches. The order matches the element order in `EqDeleteKey`. + pub(crate) fields: Vec<(String, i32)>, +} + +impl EqDeleteSet { + fn new(fields: Vec<(String, i32)>) -> Self { + Self { + keys: HashSet::new(), + fields, + } + } + + /// Returns true when the set contains no delete keys. + pub(crate) fn is_empty(&self) -> bool { + self.keys.is_empty() + } + + /// Merge another set (with the same field layout) into this one. + pub(crate) fn union(&mut self, other: &EqDeleteSet) { + self.keys.extend(other.keys.iter().cloned()); + } +} + #[derive(Clone, Debug)] pub(crate) struct CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader, @@ -59,7 +91,7 @@ enum DeleteFileContext { FreshEqDel { batch_stream: ArrowRecordBatchStream, equality_ids: HashSet, - sender: tokio::sync::oneshot::Sender, + sender: tokio::sync::oneshot::Sender>, }, } @@ -99,16 +131,17 @@ impl CachingDeleteFileLoader { /// another concurrently processing data file scan task. If it is, we skip it. /// If not, the DeleteFilter is updated to contain a notifier to prevent other data file /// tasks from starting to load the same equality delete file. We spawn a task to load - /// the EQ delete's record batch stream, convert it to a predicate, update the delete filter, - /// and notify any task that was waiting for it. + /// the EQ delete's record batch stream, convert it to an `EqDeleteSet` (hash set of + /// delete key tuples), update the delete filter, and notify any task that was waiting + /// for it. /// * When this gets updated to add support for delete vectors, the load phase will return /// a PuffinReader for them. /// * The parse phase parses each record batch stream according to its associated data type. /// The result of this is a map of data file paths to delete vectors for the positional /// delete tasks (and in future for the delete vector tasks). 
For equality delete - /// file tasks, this results in an unbound Predicate. - /// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot - /// channel to store them in the right place in the delete file managers state. + /// file tasks, this results in an `EqDeleteSet` (hash set of delete key tuples). + /// * The `EqDeleteSet`s resulting from equality deletes are sent to their associated oneshot + /// channel to store them in the right place in the delete file manager's state. /// * The results of all of these futures are awaited on in parallel with the specified /// level of concurrency and collected into a vec. We then combine all the delete /// vector maps that resulted from any positional delete or delete vector files into a @@ -130,7 +163,7 @@ impl CachingDeleteFileLoader { /// Pos Del Del Vec (Not yet Implemented) EQ Del /// | | | /// [parse pos del stream] [parse del vec puffin] [parse eq del] - /// HashMap HashMap (Predicate, Sender) + /// HashMap HashMap (EqDeleteSet, Sender) /// | | | /// | | [persist to state] /// | | () @@ -249,9 +282,17 @@ impl CachingDeleteFileLoader { let (sender, receiver) = channel(); del_filter.insert_equality_delete(&task.file_path, receiver); - // Per the Iceberg spec, evolve schema for equality deletes but only for the - // equality_ids columns, not all table columns. - let equality_ids_vec = task.equality_ids.clone().unwrap(); + // Per the Iceberg spec, equality_ids is required for equality delete files. + // Evolve schema only for the equality_ids columns, not all table columns. 
+ let equality_ids_vec = task.equality_ids.clone().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "equality_ids is required for equality delete file '{}' but was not set", + task.file_path + ), + ) + })?; let evolved_stream = BasicDeleteFileLoader::evolve_schema( basic_delete_file_loader .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) @@ -293,16 +334,16 @@ impl CachingDeleteFileLoader { batch_stream, equality_ids, } => { - let predicate = + let eq_delete_set = Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids) .await?; sender - .send(predicate) + .send(Arc::new(eq_delete_set)) .map_err(|err| { Error::new( ErrorKind::Unexpected, - "Could not send eq delete predicate to state", + "Could not send eq delete set to state", ) }) .map(|_| ParsedDeleteFileContext::EqDel) @@ -354,19 +395,23 @@ impl CachingDeleteFileLoader { Ok(result) } + /// Parses equality delete record batches into a hash-based delete set. + /// + /// We collect delete key tuples into a `HashSet` for O(1) per-row lookups. async fn parse_equality_deletes_record_batch_stream( mut stream: ArrowRecordBatchStream, equality_ids: HashSet, - ) -> Result { - let mut row_predicates = Vec::new(); + ) -> Result { let mut batch_schema_iceberg: Option = None; let accessor = EqDelRecordBatchPartnerAccessor; + // Discover field metadata from the first non-empty batch. + let mut eq_delete_set: Option = None; while let Some(record_batch) = stream.next().await { let record_batch = record_batch?; if record_batch.num_columns() == 0 { - return Ok(AlwaysTrue); + return Ok(EqDeleteSet::new(Vec::new())); } let schema = match &batch_schema_iceberg { @@ -388,49 +433,37 @@ impl CachingDeleteFileLoader { continue; } - // Process the collected columns in lockstep + // Lazily initialize the EqDeleteSet with field metadata from the + // first batch that has columns. The field order is stable across + // batches because it comes from the delete file schema. 
+ let delete_set = eq_delete_set.get_or_insert_with(|| { + let fields = datum_columns_with_names + .iter() + .map(|(_, name, field_id)| (name.clone(), *field_id)) + .collect(); + EqDeleteSet::new(fields) + }); + + // Collect delete key tuples by iterating all columns in lockstep. #[allow(clippy::len_zero)] while datum_columns_with_names[0].0.len() > 0 { - let mut row_predicate = AlwaysTrue; - for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names { + let mut key_values = Vec::with_capacity(datum_columns_with_names.len()); + for (column, _, _) in &mut datum_columns_with_names { if let Some(item) = column.next() { - let cell_predicate = if let Some(datum) = item? { - Reference::new(field_name.clone()).equal_to(datum.clone()) - } else { - Reference::new(field_name.clone()).is_null() - }; - row_predicate = row_predicate.and(cell_predicate) + key_values.push(item?); } } - row_predicates.push(row_predicate.not().rewrite_not()); + delete_set.keys.insert(EqDeleteKey(key_values)); } } - // All row predicates are combined to a single predicate by creating a balanced binary tree. - // Using a simple fold would result in a deeply nested predicate that can cause a stack overflow. 
- while row_predicates.len() > 1 { - let mut next_level = Vec::with_capacity(row_predicates.len().div_ceil(2)); - let mut iter = row_predicates.into_iter(); - while let Some(p1) = iter.next() { - if let Some(p2) = iter.next() { - next_level.push(p1.and(p2)); - } else { - next_level.push(p1); - } - } - row_predicates = next_level; - } - - match row_predicates.pop() { - Some(p) => Ok(p), - None => Ok(AlwaysTrue), - } + Ok(eq_delete_set.unwrap_or_else(|| EqDeleteSet::new(Vec::new()))) } } struct EqDelColumnProcessor<'a> { equality_ids: &'a HashSet, - collected_columns: Vec<(ArrayRef, String, Type)>, + collected_columns: Vec<(ArrayRef, String, i32, Type)>, } impl<'a> EqDelColumnProcessor<'a> { @@ -441,6 +474,7 @@ impl<'a> EqDelColumnProcessor<'a> { } } + /// Produces per-column Datum iterators alongside (field_name, field_id) metadata. #[allow(clippy::type_complexity)] fn finish( self, @@ -448,11 +482,12 @@ impl<'a> EqDelColumnProcessor<'a> { Vec<( Box>>>, String, + i32, )>, > { self.collected_columns .into_iter() - .map(|(array, field_name, field_type)| { + .map(|(array, field_name, field_id, field_type)| { let primitive_type = field_type .as_primitive_type() .ok_or_else(|| { @@ -477,7 +512,7 @@ impl<'a> EqDelColumnProcessor<'a> { .transpose() })); - Ok((datum_iterator, field_name)) + Ok((datum_iterator, field_name, field_id)) }) .collect::>>() } @@ -495,6 +530,7 @@ impl SchemaWithPartnerVisitor for EqDelColumnProcessor<'_> { self.collected_columns.push(( partner.clone(), field.name.clone(), + field.id, field.field_type.as_ref().clone(), )); } @@ -629,11 +665,58 @@ mod tests { ) .await .expect("error parsing batch stream"); - println!("{parsed_eq_delete}"); - let expected = "(((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) OR (b != 62696E6172795F64617461)) AND (((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5)) OR (b IS NOT NULL))".to_string(); + // The delete file has 2 rows, so we expect 2 keys in the set + 
assert_eq!(parsed_eq_delete.keys.len(), 2); + + // Field metadata should list the 5 equality columns (y, z, a, sa, b) + assert_eq!(parsed_eq_delete.fields.len(), 5); + let field_names: Vec<&str> = parsed_eq_delete + .fields + .iter() + .map(|(n, _)| n.as_str()) + .collect(); + assert!(field_names.contains(&"y")); + assert!(field_names.contains(&"z")); + assert!(field_names.contains(&"a")); + assert!(field_names.contains(&"sa")); + assert!(field_names.contains(&"b")); + + // Row 1: y=1, z=100, a="HELP", sa=4, b=binary_data + let row1 = EqDeleteKey(vec![ + Some(Datum::long(1)), + Some(Datum::long(100)), + Some(Datum::string("HELP")), + Some(Datum::int(4)), + Some(Datum::binary(b"binary_data".to_vec())), + ]); + assert!( + parsed_eq_delete.keys.contains(&row1), + "Row 1 should be in delete set" + ); + + // Row 2: y=2, z=NULL, a=NULL, sa=5, b=NULL + let row2 = EqDeleteKey(vec![ + Some(Datum::long(2)), + None, + None, + Some(Datum::int(5)), + None, + ]); + assert!( + parsed_eq_delete.keys.contains(&row2), + "Row 2 should be in delete set" + ); - assert_eq!(parsed_eq_delete.to_string(), expected); + // A non-existent key should not be in the set + let non_existent = EqDeleteKey(vec![ + Some(Datum::long(999)), + Some(Datum::long(0)), + Some(Datum::string("NOPE")), + Some(Datum::int(0)), + Some(Datum::binary(b"nope".to_vec())), + ]); + assert!(!parsed_eq_delete.keys.contains(&non_existent)); } /// Create a simple field with metadata. 
@@ -955,13 +1038,19 @@ mod tests { // Verify both delete types can be processed together let result = delete_filter - .build_equality_delete_predicate(&file_scan_task) + .build_equality_delete_sets(&file_scan_task) .await; assert!( result.is_ok(), - "Failed to build equality delete predicate: {:?}", + "Failed to build equality delete sets: {:?}", result.err() ); + // The equality delete sets should contain delete keys + let eq_sets = result.unwrap(); + assert!( + !eq_sets.is_empty(), + "Expected at least one equality delete set" + ); } #[tokio::test] diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 6369938ce2..8e0e002906 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -21,9 +21,8 @@ use std::sync::{Arc, Mutex, RwLock}; use tokio::sync::Notify; use tokio::sync::oneshot::Receiver; +use super::caching_delete_file_loader::EqDeleteSet; use crate::delete_vector::DeleteVector; -use crate::expr::Predicate::AlwaysTrue; -use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::scan::{FileScanTask, FileScanTaskDeleteFile}; use crate::spec::DataContentType; use crate::{Error, ErrorKind, Result}; @@ -31,7 +30,7 @@ use crate::{Error, ErrorKind, Result}; #[derive(Debug)] enum EqDelState { Loading(Arc), - Loaded(Predicate), + Loaded(Arc), } /// State tracking for positional delete files. @@ -148,17 +147,18 @@ impl DeleteFilter { } } - /// Retrieve the equality delete predicate for a given eq delete file path - pub(crate) async fn get_equality_delete_predicate_for_delete_file_path( + /// Retrieve the equality delete set for a given eq delete file path. + /// Waits asynchronously if the set is still being loaded. 
+ pub(crate) async fn get_equality_delete_set_for_delete_file_path( &self, file_path: &str, - ) -> Option { + ) -> Option> { let notifier = { match self.state.read().unwrap().equality_deletes.get(file_path) { None => return None, Some(EqDelState::Loading(notifier)) => notifier.clone(), - Some(EqDelState::Loaded(predicate)) => { - return Some(predicate.clone()); + Some(EqDelState::Loaded(eq_delete_set)) => { + return Some(eq_delete_set.clone()); } } }; @@ -166,50 +166,73 @@ impl DeleteFilter { notifier.notified().await; match self.state.read().unwrap().equality_deletes.get(file_path) { - Some(EqDelState::Loaded(predicate)) => Some(predicate.clone()), + Some(EqDelState::Loaded(eq_delete_set)) => Some(eq_delete_set.clone()), _ => unreachable!("Cannot be any other state than loaded"), } } - /// Builds eq delete predicate for the provided task. - pub(crate) async fn build_equality_delete_predicate( + /// Builds equality delete sets for the provided task. + /// + /// Returns a list of delete sets, one per distinct `equality_ids` group. + /// Most tables use a single `equality_ids` set, so this typically returns + /// zero or one element. Multiple elements occur only when different delete + /// files on the same partition use different equality column sets. + /// + /// When only one delete file applies for a group, returns the cached `Arc` + /// directly — no deep clone of the hash set. + pub(crate) async fn build_equality_delete_sets( &self, file_scan_task: &FileScanTask, - ) -> Result> { - // * Filter the task's deletes into just the Equality deletes - // * Retrieve the unbound predicate for each from self.state.equality_deletes - // * Logical-AND them all together to get a single combined `Predicate` - // * Bind the predicate to the task's schema to get a `BoundPredicate` + ) -> Result>> { + // Collect all applicable equality delete sets, reusing cached Arcs. + // Group by field layout so we only union sets with matching columns. 
+ let mut groups: HashMap, Vec>> = HashMap::new(); - let mut combined_predicate = AlwaysTrue; for delete in &file_scan_task.deletes { if !is_equality_delete(delete) { continue; } - let Some(predicate) = self - .get_equality_delete_predicate_for_delete_file_path(&delete.file_path) + let Some(eq_set) = self + .get_equality_delete_set_for_delete_file_path(&delete.file_path) .await else { return Err(Error::new( ErrorKind::Unexpected, format!( - "Missing predicate for equality delete file '{}'", + "Missing equality delete set for file '{}'", delete.file_path ), )); }; - combined_predicate = combined_predicate.and(predicate); + if !eq_set.is_empty() { + groups + .entry(eq_set.fields.clone()) + .or_default() + .push(eq_set); + } } - if combined_predicate == AlwaysTrue { - return Ok(None); + // For each group, union all sets into one. + let mut result = Vec::with_capacity(groups.len()); + for (_fields, sets) in groups { + match sets.len() { + 0 => {} + // Single file in group: return the cached Arc directly. + 1 => result.push(sets.into_iter().next().unwrap()), + // Multiple files with same fields: union into a new set. + _ => { + let mut combined = (*sets[0]).clone(); + for set in &sets[1..] 
{ + combined.union(set); + } + result.push(Arc::new(combined)); + } + } } - let bound_predicate = combined_predicate - .bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?; - Ok(Some(bound_predicate)) + Ok(result) } pub(crate) fn upsert_delete_vector( @@ -232,7 +255,7 @@ impl DeleteFilter { pub(crate) fn insert_equality_delete( &self, delete_file_path: &str, - eq_del: Receiver, + eq_del: Receiver>, ) { let notify = Arc::new(Notify::new()); { @@ -276,8 +299,9 @@ pub(crate) mod tests { use tempfile::TempDir; use super::*; - use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; - use crate::expr::Reference; + use crate::arrow::caching_delete_file_loader::{ + CachingDeleteFileLoader, EqDeleteKey, EqDeleteSet, + }; use crate::io::FileIO; use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; @@ -468,18 +492,17 @@ pub(crate) mod tests { } #[tokio::test] - async fn test_build_equality_delete_predicate_case_sensitive() { + async fn test_build_equality_delete_set_unions_multiple_files() { let schema = Arc::new( Schema::builder() .with_schema_id(1) .with_fields(vec![ - NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), ]) .build() .unwrap(), ); - // ---------- fake FileScanTask ---------- let task = FileScanTask { file_size_in_bytes: 0, start: 0, @@ -488,15 +511,24 @@ pub(crate) mod tests { data_file_path: "data.parquet".to_string(), data_file_format: crate::spec::DataFileFormat::Parquet, schema: schema.clone(), - project_field_ids: vec![], + project_field_ids: vec![1], predicate: None, - deletes: vec![FileScanTaskDeleteFile { - file_path: "eq-del.parquet".to_string(), - file_size_in_bytes: 1, // never read; this test fails before opening the file - file_type: DataContentType::EqualityDeletes, - partition_spec_id: 0, - equality_ids: None, - }], + deletes: vec![ + FileScanTaskDeleteFile { + file_path: 
"eq-del-1.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }, + FileScanTaskDeleteFile { + file_path: "eq-del-2.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }, + ], partition: None, partition_spec: None, name_mapping: None, @@ -505,20 +537,148 @@ pub(crate) mod tests { let filter = DeleteFilter::default(); - // ---------- insert equality delete predicate ---------- - let pred = Reference::new("id").equal_to(Datum::long(10)); + // Insert two equality delete sets with different keys + let mut set1 = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("id".to_string(), 1)], + }; + set1.keys.insert(EqDeleteKey(vec![Some(Datum::long(10))])); + set1.keys.insert(EqDeleteKey(vec![Some(Datum::long(20))])); - let (tx, rx) = tokio::sync::oneshot::channel(); - filter.insert_equality_delete("eq-del.parquet", rx); + let mut set2 = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("id".to_string(), 1)], + }; + set2.keys.insert(EqDeleteKey(vec![Some(Datum::long(30))])); - tx.send(pred).unwrap(); + let (tx1, rx1) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-1.parquet", rx1); + tx1.send(Arc::new(set1)).unwrap(); - // ---------- should FAIL ---------- - let result = filter.build_equality_delete_predicate(&task).await; + let (tx2, rx2) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-2.parquet", rx2); + tx2.send(Arc::new(set2)).unwrap(); + // Small delay to allow the spawned tasks to complete + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + let result = filter.build_equality_delete_sets(&task).await; + assert!(result.is_ok()); + + let eq_sets = result.unwrap(); + // Same equality_ids → unioned into one set + assert_eq!(eq_sets.len(), 1); + let eq_set = 
&eq_sets[0]; + // Union of {10, 20} and {30} should contain all three + assert_eq!(eq_set.keys.len(), 3); + assert!( + eq_set + .keys + .contains(&EqDeleteKey(vec![Some(Datum::long(10))])) + ); + assert!( + eq_set + .keys + .contains(&EqDeleteKey(vec![Some(Datum::long(20))])) + ); assert!( - result.is_err(), - "case_sensitive=true should fail when column case mismatches" + eq_set + .keys + .contains(&EqDeleteKey(vec![Some(Datum::long(30))])) + ); + } + + /// Delete files with different equality_ids must NOT be unioned — they + /// produce separate sets, each applied independently. + #[tokio::test] + async fn test_build_equality_delete_sets_different_equality_ids() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + let task = FileScanTask { + file_size_in_bytes: 0, + start: 0, + length: 0, + record_count: None, + data_file_path: "data.parquet".to_string(), + data_file_format: crate::spec::DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![ + FileScanTaskDeleteFile { + file_path: "eq-del-by-id.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + }, + FileScanTaskDeleteFile { + file_path: "eq-del-by-name.parquet".to_string(), + file_size_in_bytes: 1, + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![2]), + }, + ], + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + }; + + let filter = DeleteFilter::default(); + + // Delete file 1: delete by id + let mut set_by_id = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("id".to_string(), 1)], + }; + set_by_id + .keys + 
.insert(EqDeleteKey(vec![Some(Datum::long(10))])); + + // Delete file 2: delete by name + let mut set_by_name = EqDeleteSet { + keys: std::collections::HashSet::new(), + fields: vec![("name".to_string(), 2)], + }; + set_by_name + .keys + .insert(EqDeleteKey(vec![Some(Datum::string("alice"))])); + + let (tx1, rx1) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-by-id.parquet", rx1); + tx1.send(Arc::new(set_by_id)).unwrap(); + + let (tx2, rx2) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete("eq-del-by-name.parquet", rx2); + tx2.send(Arc::new(set_by_name)).unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + let eq_sets = filter + .build_equality_delete_sets(&task) + .await + .expect("should succeed"); + + // Different equality_ids → two separate sets, NOT unioned + assert_eq!( + eq_sets.len(), + 2, + "Delete files with different equality_ids must produce separate sets" ); + + // Each set should have exactly one key + let key_counts: Vec = eq_sets.iter().map(|s| s.keys.len()).collect(); + assert!(key_counts.contains(&1)); } } diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 488f41cf20..4fbbd1b676 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -29,6 +29,7 @@ use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; +use arrow_select::filter::filter_record_batch; use arrow_string::like::starts_with; use bytes::Bytes; use fnv::FnvHashSet; @@ -45,10 +46,10 @@ use parquet::file::metadata::{ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use typed_builder::TypedBuilder; -use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::arrow::caching_delete_file_loader::{CachingDeleteFileLoader, EqDeleteKey, EqDeleteSet}; use crate::arrow::int96::coerce_int96_timestamps; use 
crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; -use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; +use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; use crate::error::Result; use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit}; @@ -462,36 +463,21 @@ impl ArrowReader { } let delete_filter = delete_filter_rx.await.unwrap()?; - let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?; - - // In addition to the optional predicate supplied in the `FileScanTask`, - // we also have an optional predicate resulting from equality delete files. - // If both are present, we logical-AND them together to form a single filter - // predicate that we can pass to the `RecordBatchStreamBuilder`. - let final_predicate = match (&task.predicate, delete_predicate) { - (None, None) => None, - (Some(predicate), None) => Some(predicate.clone()), - (None, Some(ref predicate)) => Some(predicate.clone()), - (Some(filter_predicate), Some(delete_predicate)) => { - Some(filter_predicate.clone().and(delete_predicate)) - } - }; + let eq_delete_sets = delete_filter.build_equality_delete_sets(&task).await?; + + // The scan predicate (if any) is applied via the Parquet RowFilter. + // Equality deletes are applied as a separate post-read filter step using + // a HashSet for O(1) per-row lookups instead of O(N) predicate evaluation. + let final_predicate = task.predicate.clone(); - // There are three possible sources for potential lists of selected RowGroup indices, - // and two for `RowSelection`s. - // Selected RowGroup index lists can come from three sources: + // Selected RowGroup index lists can come from two sources: // * When task.start and task.length specify a byte range (file splitting); - // * When there are equality delete files that are applicable; // * When there is a scan predicate and row_group_filtering_enabled = true. 
// `RowSelection`s can be created in either or both of the following cases: // * When there are positional delete files that are applicable; // * When there is a scan predicate and row_selection_enabled = true - // Note that row group filtering from predicates only happens when - // there is a scan predicate AND row_group_filtering_enabled = true, - // but we perform row selection filtering if there are applicable - // equality delete files OR (there is a scan predicate AND row_selection_enabled), - // since the only implemented method of applying positional deletes is - // by using a `RowSelection`. + // Equality deletes are applied as a post-read hash-based filter (not via + // RowFilter or RowSelection) for O(1) per-row lookups. let mut selected_row_group_indices = None; let mut row_selection = None; @@ -600,7 +586,112 @@ impl ArrowReader { Err(err) => Err(err.into()), }); - Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + // Apply equality delete filtering as a post-read step using hash-based + // lookups. This runs after decompression and the record batch transformer, + // checking each row against each delete set in O(1) per row. + // Multiple sets occur only when delete files use different equality_ids. + if eq_delete_sets.is_empty() { + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + } else { + let filtered_stream = record_batch_stream.map(move |batch_result| { + let mut batch = batch_result?; + for eq_delete_set in &eq_delete_sets { + batch = Self::apply_eq_delete_filter(&batch, eq_delete_set)?; + } + Ok(batch) + }); + Ok(Box::pin(filtered_stream) as ArrowRecordBatchStream) + } + } + + /// Filters a record batch by removing rows whose equality-delete key columns + /// match an entry in the delete set. Uses O(1) hash lookups per row. 
+    fn apply_eq_delete_filter(
+        batch: &RecordBatch,
+        delete_set: &EqDeleteSet,
+    ) -> Result<RecordBatch> {
+        // Locate delete key columns in the batch by field_id (stored in Arrow
+        // field metadata under the "PARQUET:field_id" key).
+        // For each delete key field, locate the corresponding column in the batch
+        // and convert it to a Vec<Option<Datum>> for hash-based lookups.
+        let datum_columns: Vec<Vec<Option<Datum>>> = delete_set
+            .fields
+            .iter()
+            .map(|(field_name, field_id)| {
+                // Find the column by field_id in the batch schema metadata,
+                // falling back to name-based lookup.
+                let col = batch
+                    .schema()
+                    .fields()
+                    .iter()
+                    .enumerate()
+                    .find_map(|(col_idx, field)| {
+                        let id = field
+                            .metadata()
+                            .get(PARQUET_FIELD_ID_META_KEY)?
+                            .parse::<i32>()
+                            .ok()?;
+                        (id == *field_id).then(|| batch.column(col_idx))
+                    })
+                    .map(Ok)
+                    .unwrap_or_else(|| {
+                        batch.schema().index_of(field_name).map(|idx| batch.column(idx)).map_err(|_| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                format!(
+                                    "Equality delete key column '{}' (field_id={}) not found in batch",
+                                    field_name, field_id
+                                ),
+                            )
+                        })
+                    })?;
+                // Resolve the Iceberg PrimitiveType from the Arrow data type.
+                let iceberg_type =
+                    crate::arrow::arrow_type_to_type(col.data_type())?;
+                let literals = arrow_primitive_to_literal(col, &iceberg_type)?;
+                // Convert Literal → Datum
+                let primitive_type = iceberg_type
+                    .as_primitive_type()
+                    .ok_or_else(|| {
+                        Error::new(ErrorKind::Unexpected, "field is not a primitive type")
+                    })?
+                    .clone();
+                let datums = literals
+                    .into_iter()
+                    .map(|opt_lit| {
+                        opt_lit
+                            .and_then(|lit| lit.as_primitive_literal())
+                            .map(|prim_lit| Datum::new(primitive_type.clone(), prim_lit))
+                    })
+                    .collect::<Vec<_>>();
+                Ok(datums)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let num_rows = batch.num_rows();
+        let num_cols = datum_columns.len();
+        let mut keep = vec![true; num_rows];
+
+        // Reuse a single EqDeleteKey allocation across all rows to avoid
+        // per-row Vec allocation + clone. We swap in new values each iteration.
+        let mut probe_key = EqDeleteKey(vec![None; num_cols]);
+
+        for row_idx in 0..num_rows {
+            for (col_idx, col) in datum_columns.iter().enumerate() {
+                probe_key.0[col_idx].clone_from(&col[row_idx]);
+            }
+            if delete_set.keys.contains(&probe_key) {
+                keep[row_idx] = false;
+            }
+        }
+
+        let mask = BooleanArray::from(keep);
+        filter_record_batch(batch, &mask).map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Failed to filter record batch: {e}"),
+            )
+        })
     }
 
     /// Opens a Parquet file and loads its metadata, returning both the reader and metadata.

From 72bbff2fe5a3d66d237eb37a698238fdcef9867d Mon Sep 17 00:00:00 2001
From: Ravid Gontov
Date: Sun, 19 Apr 2026 00:49:54 +0300
Subject: [PATCH 2/2] fix(reader): auto-include equality delete key columns in projection

When a user scans with `.select(["col_a", "col_b"])` and the table has
merge-on-read equality delete files keyed on a column NOT in the select
list (e.g. `id`), the HashSet-based `apply_eq_delete_filter` fails with:

    Equality delete key column 'id' (field_id=1) not found in batch

The fix augments the Parquet projection mask and RecordBatchTransformer
with any equality delete key field IDs that are missing from the user's
projection. After applying equality deletes, the extra columns are
stripped from the output batches so the user sees only their requested
columns.

This matches the behavior of Spark, Flink, and Trino, which transparently
widen the internal projection for delete evaluation.
---
 crates/iceberg/src/arrow/reader.rs | 464 ++++++++++++++++++++++++++++-
 1 file changed, 452 insertions(+), 12 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 4fbbd1b676..766f3aae45 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -17,7 +17,7 @@
 
 //!
Parquet file data reader
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeSet, HashMap, HashSet};
 use std::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -59,7 +59,7 @@ use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{DataContentType, Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
 use crate::util::available_parallelism;
 use crate::{Error, ErrorKind};
 
@@ -421,12 +421,41 @@ impl ArrowReader {
             .copied()
             .collect();
 
-        // Create projection mask based on field IDs
-        // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false)
-        // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match)
-        // - If fallback IDs: position-based projection (missing_field_ids=true)
+        // Collect equality delete key field IDs from the task's delete files.
+        // These may reference columns NOT in the user's projection. We must
+        // include them in the Parquet read so equality deletes can be applied,
+        // then strip them from the output batches afterward.
+        let eq_delete_key_field_ids: BTreeSet<i32> = task
+            .deletes
+            .iter()
+            .filter(|d| matches!(d.file_type, DataContentType::EqualityDeletes))
+            .filter_map(|d| d.equality_ids.as_ref())
+            .flatten()
+            .copied()
+            .collect();
+
+        // Augment the Parquet projection with any equality delete key columns
+        // that the user didn't request. Guard: when the user's projection is
+        // empty, ProjectionMask::all() reads all columns — no augmentation needed.
+        let augmented_field_ids: Vec<i32> = if !eq_delete_key_field_ids.is_empty()
+            && !project_field_ids_without_metadata.is_empty()
+        {
+            let user_set: HashSet<i32> =
+                project_field_ids_without_metadata.iter().copied().collect();
+            let mut augmented = project_field_ids_without_metadata.clone();
+            for &id in &eq_delete_key_field_ids {
+                if !user_set.contains(&id) && !is_metadata_field(id) {
+                    augmented.push(id);
+                }
+            }
+            augmented
+        } else {
+            project_field_ids_without_metadata.clone()
+        };
+
+        // Create projection mask based on field IDs (augmented with eq delete keys)
         let projection_mask = Self::get_arrow_projection_mask(
-            &project_field_ids_without_metadata,
+            &augmented_field_ids,
             &task.schema,
             record_batch_stream_builder.parquet_schema(),
             record_batch_stream_builder.schema(),
@@ -438,9 +467,25 @@ impl ArrowReader {
 
         // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion,
-        // column re-ordering, partition constants, and virtual field addition (like _file)
+        // column re-ordering, partition constants, and virtual field addition (like _file).
+        // When equality delete key columns were added to the projection, the transformer
+        // must also know about them so it can apply type promotion correctly.
+        let transformer_field_ids: Vec<i32> =
+            if !eq_delete_key_field_ids.is_empty() && !task.project_field_ids.is_empty() {
+                let user_set: HashSet<i32> = task.project_field_ids.iter().copied().collect();
+                let mut ids = task.project_field_ids.to_vec();
+                for &id in &eq_delete_key_field_ids {
+                    if !user_set.contains(&id) {
+                        ids.push(id);
+                    }
+                }
+                ids
+            } else {
+                task.project_field_ids.to_vec()
+            };
+
         let mut record_batch_transformer_builder =
-            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids());
+            RecordBatchTransformerBuilder::new(task.schema_ref(), &transformer_field_ids);
 
         // Add the _file metadata column if it's in the projected fields
         if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) {
@@ -590,17 +635,42 @@ impl ArrowReader {
         // lookups. This runs after decompression and the record batch transformer,
         // checking each row against each delete set in O(1) per row.
         // Multiple sets occur only when delete files use different equality_ids.
+        //
+        // If we augmented the projection with equality delete key columns that
+        // the user didn't request, strip those extra columns after applying
+        // deletes so the output schema matches the user's original projection.
+        let extra_eq_cols_to_strip = if !eq_delete_key_field_ids.is_empty()
+            && !task.project_field_ids.is_empty()
+            && eq_delete_key_field_ids
+                .iter()
+                .any(|id| !task.project_field_ids.contains(id))
+        {
+            task.project_field_ids.len()
+        } else {
+            0
+        };
+
         if eq_delete_sets.is_empty() {
-            Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
+            if extra_eq_cols_to_strip > 0 {
+                let stripped = record_batch_stream.map(move |batch_result| {
+                    Self::strip_extra_columns(batch_result?, extra_eq_cols_to_strip)
+                });
+                Ok(Box::pin(stripped) as ArrowRecordBatchStream)
+            } else {
+                Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
+            }
         } else {
-            let filtered_stream = record_batch_stream.map(move |batch_result| {
+            let filtered = record_batch_stream.map(move |batch_result| {
                 let mut batch = batch_result?;
                 for eq_delete_set in &eq_delete_sets {
                     batch = Self::apply_eq_delete_filter(&batch, eq_delete_set)?;
                 }
+                if extra_eq_cols_to_strip > 0 {
+                    batch = Self::strip_extra_columns(batch, extra_eq_cols_to_strip)?;
+                }
                 Ok(batch)
             });
-            Ok(Box::pin(filtered_stream) as ArrowRecordBatchStream)
+            Ok(Box::pin(filtered) as ArrowRecordBatchStream)
         }
     }
 
@@ -694,6 +764,21 @@ impl ArrowReader {
         })
     }
 
+    /// Strips columns beyond `num_cols_to_keep` from the batch.
+    ///
+    /// Used to remove equality delete key columns that were added to the
+    /// projection solely for delete evaluation. The extra columns are always
+    /// appended at the end by the augmentation logic in `process_file_scan_task`.
+    fn strip_extra_columns(batch: RecordBatch, num_cols_to_keep: usize) -> Result<RecordBatch> {
+        let indices: Vec<usize> = (0..num_cols_to_keep).collect();
+        batch.project(&indices).map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("stripping eq delete key columns: {e}"),
+            )
+        })
+    }
+
     /// Opens a Parquet file and loads its metadata, returning both the reader and metadata.
     /// The reader can be reused to build a `ParquetRecordBatchStreamBuilder` without
     /// reopening the file.
@@ -5555,4 +5640,359 @@ message schema {
             ts_array.value(0)
         );
     }
+
+    // =========================================================================
+    // Equality delete + column projection tests
+    //
+    // Verify that equality deletes work correctly when the user's column
+    // projection does not include the equality delete key columns.
+    // =========================================================================
+
+    /// Helper: create a 3-column data file and an equality delete Parquet file.
+    ///
+    /// Data: 5 rows — id(1..=5), name(a..e), value(10,20,30,40,50).
+    /// Delete file schema uses the caller-provided Arrow schema + batches.
+    fn create_eq_delete_test_fixtures(
+        table_location: &str,
+        delete_batches: Vec<RecordBatch>,
+    ) -> (String, String, SchemaRef) {
+        use arrow_array::Int32Array;
+
+        let table_schema: SchemaRef = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(3, "value", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let data_arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("value", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+        ]));
+
+        let data_file_path = format!("{table_location}/data.parquet");
+        let data_batch = RecordBatch::try_new(data_arrow_schema.clone(), vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
+            Arc::new(StringArray::from(vec!["a", "b", "c", "d", "e"])),
+            Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
+        ])
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let file = File::create(&data_file_path).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file, data_arrow_schema.clone(), Some(props)).unwrap();
+        writer.write(&data_batch).unwrap();
+        writer.close().unwrap();
+
+        let delete_file_path = format!("{table_location}/eq_deletes.parquet");
+        let delete_schema = delete_batches[0].schema();
+        let delete_props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+        let delete_file = File::create(&delete_file_path).unwrap();
+        let mut delete_writer =
+            ArrowWriter::try_new(delete_file, delete_schema, Some(delete_props)).unwrap();
+        for batch in &delete_batches {
+            delete_writer.write(batch).unwrap();
+        }
+        delete_writer.close().unwrap();
+
+        (data_file_path, delete_file_path, table_schema)
+    }
+
+    /// Helper: build a single-column delete batch for field "id" (field_id=1).
+    fn eq_delete_batch_for_id(ids: Vec<i32>) -> RecordBatch {
+        use arrow_array::Int32Array;
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(ids))]).unwrap()
+    }
+
+    /// Test A: Projected scan EXCLUDES the equality delete key column.
+    #[tokio::test]
+    async fn test_eq_delete_projection_excludes_delete_key() {
+        let tmp_dir = TempDir::new().unwrap();
+        let loc = tmp_dir.path().to_str().unwrap().to_string();
+
+        let (data_path, del_path, schema) =
+            create_eq_delete_test_fixtures(&loc, vec![eq_delete_batch_for_id(vec![3])]);
+
+        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build();
+
+        let task = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(),
+            start: 0,
+            length: 0,
+            record_count: Some(5),
+            data_file_path: data_path,
+            data_file_format: DataFileFormat::Parquet,
+            schema,
+            project_field_ids: vec![2, 3], // name, value — NOT id
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(),
+                file_path: del_path,
+                file_type: DataContentType::EqualityDeletes,
+                partition_spec_id: 0,
+                equality_ids: Some(vec![1]),
+            }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+        let result: Vec<RecordBatch> = reader.read(tasks).unwrap().try_collect().await.unwrap();
+
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Expected 4 rows after deleting id=3");
+
+        let out_schema = result[0].schema();
+        let col_names: Vec<&str> = out_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        assert_eq!(
+            col_names,
+            vec!["name", "value"],
+            "Output must only contain projected columns"
+        );
+
+        let names: Vec<String> = result
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .unwrap()
+                    .iter()
+                    .map(|s| s.unwrap().to_string())
+                    .collect::<Vec<_>>()
+            })
+            .collect();
+        assert_eq!(names, vec!["a", "b", "d", "e"]);
+    }
+
+    /// Test B: Projected scan INCLUDES the equality delete key column.
+    #[tokio::test]
+    async fn test_eq_delete_projection_includes_delete_key() {
+        use arrow_array::Int32Array;
+
+        let tmp_dir = TempDir::new().unwrap();
+        let loc = tmp_dir.path().to_str().unwrap().to_string();
+
+        let (data_path, del_path, schema) =
+            create_eq_delete_test_fixtures(&loc, vec![eq_delete_batch_for_id(vec![3])]);
+
+        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build();
+
+        let task = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(),
+            start: 0,
+            length: 0,
+            record_count: Some(5),
+            data_file_path: data_path,
+            data_file_format: DataFileFormat::Parquet,
+            schema,
+            project_field_ids: vec![1, 2, 3], // id, name, value — includes delete key
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(),
+                file_path: del_path,
+                file_type: DataContentType::EqualityDeletes,
+                partition_spec_id: 0,
+                equality_ids: Some(vec![1]),
+            }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+        let result: Vec<RecordBatch> = reader.read(tasks).unwrap().try_collect().await.unwrap();
+
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4);
+
+        let out_schema = result[0].schema();
+        let col_names: Vec<&str> = out_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        assert_eq!(col_names, vec!["id", "name", "value"]);
+
+        let ids: Vec<i32> = result
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .iter()
+                    .map(|v| v.unwrap())
+                    .collect::<Vec<_>>()
+            })
+            .collect();
+        assert_eq!(ids, vec![1, 2, 4, 5]);
+    }
+
+    /// Test C: Full scan (all fields projected) with equality deletes.
+    #[tokio::test]
+    async fn test_eq_delete_full_scan_no_projection() {
+        let tmp_dir = TempDir::new().unwrap();
+        let loc = tmp_dir.path().to_str().unwrap().to_string();
+
+        let (data_path, del_path, schema) =
+            create_eq_delete_test_fixtures(&loc, vec![eq_delete_batch_for_id(vec![3])]);
+
+        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build();
+
+        let task = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(),
+            start: 0,
+            length: 0,
+            record_count: Some(5),
+            data_file_path: data_path,
+            data_file_format: DataFileFormat::Parquet,
+            schema,
+            project_field_ids: vec![1, 2, 3],
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(),
+                file_path: del_path,
+                file_type: DataContentType::EqualityDeletes,
+                partition_spec_id: 0,
+                equality_ids: Some(vec![1]),
+            }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+        let result: Vec<RecordBatch> = reader.read(tasks).unwrap().try_collect().await.unwrap();
+
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4);
+
+        let out_schema = result[0].schema();
+        let col_names: Vec<&str> = out_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        assert_eq!(col_names, vec!["id", "name", "value"]);
+    }
+
+    /// Test D: Multi-field equality delete, NEITHER key in projection.
+    #[tokio::test]
+    async fn test_eq_delete_multi_field_key_excluded_from_projection() {
+        use arrow_array::Int32Array;
+
+        let tmp_dir = TempDir::new().unwrap();
+        let loc = tmp_dir.path().to_str().unwrap().to_string();
+
+        // Delete keyed on BOTH id(1) AND name(2)
+        let del_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+        let del_batch = RecordBatch::try_new(del_schema, vec![
+            Arc::new(Int32Array::from(vec![3])),
+            Arc::new(StringArray::from(vec!["c"])),
+        ])
+        .unwrap();
+
+        let (data_path, del_path, schema) = create_eq_delete_test_fixtures(&loc, vec![del_batch]);
+
+        let reader = ArrowReaderBuilder::new(FileIO::new_with_fs()).build();
+
+        let task = FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&data_path).unwrap().len(),
+            start: 0,
+            length: 0,
+            record_count: Some(5),
+            data_file_path: data_path,
+            data_file_format: DataFileFormat::Parquet,
+            schema,
+            project_field_ids: vec![3], // only value — neither delete key
+            predicate: None,
+            deletes: vec![FileScanTaskDeleteFile {
+                file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(),
+                file_path: del_path,
+                file_type: DataContentType::EqualityDeletes,
+                partition_spec_id: 0,
+                equality_ids: Some(vec![1, 2]),
+            }],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        };
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
+        let result: Vec<RecordBatch> = reader.read(tasks).unwrap().try_collect().await.unwrap();
+
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 4, "Expected 4 rows after deleting id=3,name=c");
+
+        let out_schema = result[0].schema();
+        let col_names: Vec<&str> = out_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        assert_eq!(
+            col_names,
+            vec!["value"],
+            "Output must only contain projected columns"
+        );
+
+        let values: Vec<i32> = result
+            .iter()
+            .flat_map(|b| {
+                b.column(0)
+                    .as_any()
+                    .downcast_ref::<Int32Array>()
+                    .unwrap()
+                    .iter()
+                    .map(|v| v.unwrap())
+                    .collect::<Vec<_>>()
+            })
+            .collect();
+        assert_eq!(values, vec![10, 20, 40, 50]);
+    }
 }