diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index b853baa993..e029912bbe 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -35,11 +35,22 @@ enum EqDelState {
 }
 
 #[derive(Debug, Default)]
-struct DeleteFileFilterState {
+pub(crate) struct DeleteFileFilterState {
     delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
     equality_deletes: HashMap<String, EqDelState>,
 }
 
+impl DeleteFileFilterState {
+    pub fn delete_vectors(&self) -> &HashMap<String, Arc<Mutex<DeleteVector>>> {
+        &self.delete_vectors
+    }
+
+    /// Remove and return the delete vector for the given data file path.
+    pub fn remove_delete_vector(&mut self, path: &str) -> Option<Arc<Mutex<DeleteVector>>> {
+        self.delete_vectors.remove(path)
+    }
+}
+
 #[derive(Clone, Debug, Default)]
 pub(crate) struct DeleteFilter {
     state: Arc<RwLock<DeleteFileFilterState>>,
@@ -65,6 +76,28 @@ impl DeleteFilter {
             .and_then(|st| st.delete_vectors.get(delete_file_path).cloned())
     }
 
+    pub(crate) fn with_read<T, F>(&self, f: F) -> Result<T>
+    where F: FnOnce(&DeleteFileFilterState) -> Result<T> {
+        let state = self.state.read().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Failed to acquire read lock: {}", e),
+            )
+        })?;
+        f(&state)
+    }
+
+    pub(crate) fn with_write<T, F>(&self, f: F) -> Result<T>
+    where F: FnOnce(&mut DeleteFileFilterState) -> Result<T> {
+        let mut state = self.state.write().map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                format!("Failed to acquire write lock: {}", e),
+            )
+        })?;
+        f(&mut state)
+    }
+
     pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
         let mut state = self.state.write().unwrap();
 
diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs
new file mode 100644
index 0000000000..e7ca2521bf
--- /dev/null
+++ b/crates/iceberg/src/arrow/incremental.rs
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::pin::Pin;
+use std::sync::Arc;
+
+use arrow_array::{RecordBatch, UInt64Array};
+use arrow_schema::{DataType, Field, Schema as ArrowSchema};
+use futures::channel::mpsc::channel;
+use futures::stream::select;
+use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
+
+use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::{
+    ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_FIELD_ID_FILE_PATH, StreamsInto,
+};
+use crate::delete_vector::DeleteVector;
+use crate::io::FileIO;
+use crate::runtime::spawn;
+use crate::scan::ArrowRecordBatchStream;
+use crate::scan::incremental::{
+    AppendedFileScanTask, IncrementalFileScanTask, IncrementalFileScanTaskStream,
+};
+use crate::{Error, ErrorKind, Result};
+
+/// The type of incremental batch: appended data or deleted records.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum IncrementalBatchType {
+    /// Appended records.
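+    /// Rows coming from data files added between the two snapshots.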
+ Append, + /// Deleted records. + Delete, +} + +/// The stream of incremental Arrow `RecordBatch`es with batch type. +pub type CombinedIncrementalBatchRecordStream = + Pin> + Send + 'static>>; + +/// Stream type for obtaining a separate stream of appended and deleted record batches. +pub type UnzippedIncrementalBatchRecordStream = (ArrowRecordBatchStream, ArrowRecordBatchStream); + +impl StreamsInto + for IncrementalFileScanTaskStream +{ + /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns a + /// stream of Arrow `RecordBatch`es containing the data from the files. + fn stream(self, reader: ArrowReader) -> Result { + let (appends, deletes) = + StreamsInto::::stream(self, reader)?; + + let left = appends.map(|res| res.map(|batch| (IncrementalBatchType::Append, batch))); + let right = deletes.map(|res| res.map(|batch| (IncrementalBatchType::Delete, batch))); + + Ok(Box::pin(select(left, right)) as CombinedIncrementalBatchRecordStream) + } +} + +impl StreamsInto + for IncrementalFileScanTaskStream +{ + /// Takes a stream of `IncrementalFileScanTasks` and reads all the files. Returns two + /// separate streams of Arrow `RecordBatch`es containing appended data and deleted records. + fn stream(self, reader: ArrowReader) -> Result { + let (appends_tx, appends_rx) = channel(reader.concurrency_limit_data_files); + let (deletes_tx, deletes_rx) = channel(reader.concurrency_limit_data_files); + + let batch_size = reader.batch_size; + let concurrency_limit_data_files = reader.concurrency_limit_data_files; + + spawn(async move { + let _ = self + .try_for_each_concurrent(concurrency_limit_data_files, |task| { + let file_io = reader.file_io.clone(); + let mut appends_tx = appends_tx.clone(); + let mut deletes_tx = deletes_tx.clone(); + async move { + match task { + IncrementalFileScanTask::Append(append_task) => { + spawn(async move { + let record_batch_stream = process_incremental_append_task( + append_task, + batch_size, + file_io, + ) + .await; + + match record_batch_stream { + Ok(mut stream) => { + while let Some(batch) = stream.next().await { + let result = appends_tx + .send(batch.map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "failed to read appended record batch", + ) + .with_source(e) + })) + .await; + + if result.is_err() { + break; + } + } + } + Err(e) => { + let _ = appends_tx.send(Err(e)).await; + } + } + }); + } + IncrementalFileScanTask::Delete(file_path, delete_vector) => { + spawn(async move { + let record_batch_stream = process_incremental_delete_task( + file_path, + delete_vector, + batch_size, + ); + + match record_batch_stream { + Ok(mut stream) => { + while let Some(batch) = stream.next().await { + let result = deletes_tx + .send(batch.map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "failed to read deleted record batch", + ) + .with_source(e) + })) + .await; + + if result.is_err() { + break; + } + } + } + Err(e) => { + let _ = deletes_tx.send(Err(e)).await; + } + } + }); + } + }; + + Ok(()) + } + }) + .await; + }); + + Ok(( + Box::pin(appends_rx) as ArrowRecordBatchStream, + Box::pin(deletes_rx) as ArrowRecordBatchStream, + )) + } +} + +async fn process_incremental_append_task( + task: AppendedFileScanTask, + batch_size: Option, + file_io: FileIO, +) -> Result { + let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder( + &task.data_file_path, + file_io, + true, + ) + .await?; + + // Create a projection mask for the batch stream to select which columns in the + // Parquet file that we want in the 
response + let projection_mask = ArrowReader::get_arrow_projection_mask( + &task.project_field_ids, + &task.schema_ref(), + record_batch_stream_builder.parquet_schema(), + record_batch_stream_builder.schema(), + )?; + record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); + + // RecordBatchTransformer performs any transformations required on the RecordBatches + // that come back from the file, such as type promotion, default column insertion + // and column re-ordering + let mut record_batch_transformer = + RecordBatchTransformer::build(task.schema_ref(), &task.project_field_ids); + + if let Some(batch_size) = batch_size { + record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); + } + + // Apply positional deletes as row selections. + let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes { + Some(ArrowReader::build_deletes_row_selection( + record_batch_stream_builder.metadata().row_groups(), + &None, + &positional_delete_indexes.lock().unwrap(), + )?) + } else { + None + }; + + if let Some(row_selection) = row_selection { + record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection); + } + + // Build the batch stream and send all the RecordBatches that it generates + // to the requester. + let record_batch_stream = record_batch_stream_builder + .build()? + .map(move |batch| match batch { + Ok(batch) => record_batch_transformer.process_record_batch(batch), + Err(err) => Err(err.into()), + }); + + Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) +} + +fn process_incremental_delete_task( + file_path: String, + delete_vector: DeleteVector, + batch_size: Option, +) -> Result { + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "pos", + DataType::UInt64, + false, + )])); + + let batch_size = batch_size.unwrap_or(1024); + + let treemap = delete_vector.inner; + + let stream = futures::stream::iter(treemap) + .chunks(batch_size) + .map(move |chunk| { + let array = UInt64Array::from_iter(chunk); + RecordBatch::try_new( + Arc::clone(&schema), // Cheap Arc clone instead of full schema creation + vec![Arc::new(array)], + ) + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RecordBatch for DeleteVector", + ) + }) + .and_then(|batch| { + ArrowReader::add_file_path_column( + batch, + &file_path, + RESERVED_COL_NAME_FILE_PATH, + RESERVED_FIELD_ID_FILE_PATH, + ) + }) + }); + + Ok(Box::pin(stream) as ArrowRecordBatchStream) +} diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index c091c45177..db85cd5730 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -33,6 +33,8 @@ pub mod record_batch_projector; pub(crate) mod record_batch_transformer; mod value; +mod incremental; +pub use incremental::*; pub use reader::*; pub use value::*; /// Partition value calculator for computing partition values diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c6f5af2f2a..ebb5491a16 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -23,11 +23,14 @@ use std::str::FromStr; use std::sync::Arc; use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene}; -use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar}; +use arrow_array::{ + Array, ArrayRef, BooleanArray, Datum as ArrowDatum, Int32Array, RecordBatch, RunArray, Scalar, + StringArray, +}; use 
arrow_cast::cast::cast; use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow_schema::{ - ArrowError, DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + ArrowError, DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; use arrow_string::like::starts_with; use bytes::Bytes; @@ -59,6 +62,22 @@ use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; +/// Reserved field ID for the file path (_file) column per Iceberg spec +/// This is dead code for now but will be used when we add the _file column support. +#[allow(dead_code)] +pub(crate) const RESERVED_FIELD_ID_FILE: i32 = 2147483646; + +/// Column name for the file path metadata column per Iceberg spec +/// This is dead code for now but will be used when we add the _file column support. +#[allow(dead_code)] +pub(crate) const RESERVED_COL_NAME_FILE: &str = "_file"; + +/// Reserved field ID for the file path column used in delete file reading. +pub(crate) const RESERVED_FIELD_ID_FILE_PATH: i32 = 2147483546; + +/// Column name for the file path metadata column used in delete file reading. +pub(crate) const RESERVED_COL_NAME_FILE_PATH: &str = "file_path"; + /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, @@ -126,15 +145,22 @@ impl ArrowReaderBuilder { /// Reads data from Parquet files #[derive(Clone)] pub struct ArrowReader { - batch_size: Option, - file_io: FileIO, - delete_file_loader: CachingDeleteFileLoader, + pub(crate) batch_size: Option, + pub(crate) file_io: FileIO, + pub(crate) delete_file_loader: CachingDeleteFileLoader, /// the maximum number of data files that can be fetched at the same time - concurrency_limit_data_files: usize, + pub(crate) concurrency_limit_data_files: usize, - row_group_filtering_enabled: bool, - row_selection_enabled: bool, + pub(crate) row_group_filtering_enabled: bool, + pub(crate) row_selection_enabled: bool, +} + +/// Trait indicating that the implementing type streams into a stream of type `S` using +/// a reader of type `R`. +pub trait StreamsInto { + /// Stream from the reader and produce a stream of type `S`. + fn stream(self, reader: R) -> Result; } impl ArrowReader { @@ -378,7 +404,7 @@ impl ArrowReader { /// Using the Parquet page index, we build a `RowSelection` that rejects rows that are indicated /// as having been deleted by a positional delete, taking into account any row groups that have /// been skipped entirely by the filter predicate - fn build_deletes_row_selection( + pub(crate) fn build_deletes_row_selection( row_group_metadata_list: &[RowGroupMetaData], selected_row_groups: &Option>, positional_deletes: &DeleteVector, @@ -483,6 +509,103 @@ impl ArrowReader { Ok(results.into()) } + /// Helper function to add a `_file` column to a RecordBatch. + /// Takes the array and field to add, reducing code duplication. 
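+    /// The appended field is tagged with the given field ID via the `PARQUET_FIELD_ID_META_KEY`
+    /// metadata entry, mirroring how regular columns carry their Iceberg field IDs.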
+ fn create_file_field( + batch: RecordBatch, + file_array: ArrayRef, + file_field: Field, + field_id: i32, + ) -> Result { + let mut columns = batch.columns().to_vec(); + columns.push(file_array); + + let mut fields: Vec<_> = batch.schema().fields().iter().cloned().collect(); + fields.push(Arc::new(file_field.with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + field_id.to_string(), + )])))); + + let schema = Arc::new(ArrowSchema::new(fields)); + RecordBatch::try_new(schema, columns).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to add _file column to RecordBatch", + ) + .with_source(e) + }) + } + + /// Adds a `_file` column to the RecordBatch containing the file path. + /// Uses Run-End Encoding (REE) for maximum memory efficiency when the same + /// file path is repeated across all rows. + /// Note: This is only used in tests for now, for production usage we use the + /// non-REE version as it is Julia-compatible. + #[allow(dead_code)] + pub(crate) fn add_file_path_column_ree( + batch: RecordBatch, + file_path: &str, + field_name: &str, + field_id: i32, + ) -> Result { + let num_rows = batch.num_rows(); + + // Use Run-End Encoded array for optimal memory efficiency + // For a constant value repeated num_rows times, this stores: + // - run_ends: [num_rows] (one i32) for non-empty batches, or [] for empty batches + // - values: [file_path] (one string) for non-empty batches, or [] for empty batches + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + let values = if num_rows == 0 { + StringArray::from(Vec::<&str>::new()) + } else { + StringArray::from(vec![file_path]) + }; + + let file_array = RunArray::try_new(&run_ends, &values).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for _file column", + ) + .with_source(e) + })?; + + // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE + // DataType is RunEndEncoded with Int32 run ends and Utf8 values + // Note: values field is nullable to match what RunArray::try_new(..) expects. + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + let file_field = Field::new( + field_name, + DataType::RunEndEncoded(run_ends_field, values_field), + false, + ); + + Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) + } + + /// Adds a `_file` column to the RecordBatch containing the file path. + /// Materializes the file path string for each row (no compression). 
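+    /// Unlike [`Self::add_file_path_column_ree`], the resulting column is a plain `Utf8` array,
+    /// which keeps consumers that cannot handle run-end encoded arrays working, at the cost of
+    /// repeating the path once per row.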
+ pub(crate) fn add_file_path_column( + batch: RecordBatch, + file_path: &str, + field_name: &str, + field_id: i32, + ) -> Result { + let num_rows = batch.num_rows(); + + // Create a StringArray with the file path repeated num_rows times + let file_array = StringArray::from(vec![file_path; num_rows]); + + // Per Iceberg spec, the _file column has reserved field ID RESERVED_FIELD_ID_FILE + let file_field = Field::new(field_name, DataType::Utf8, false); + + Self::create_file_field(batch, Arc::new(file_array), file_field, field_id) + } + fn build_field_id_set_and_map( parquet_schema: &SchemaDescriptor, predicate: &BoundPredicate, @@ -521,7 +644,7 @@ impl ArrowReader { } } - fn get_arrow_projection_mask( + pub(crate) fn get_arrow_projection_mask( field_ids: &[i32], iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, @@ -1479,6 +1602,7 @@ mod tests { use arrow_array::cast::AsArray; use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit}; + use as_any::Downcast; use futures::TryStreamExt; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::arrow::{ArrowWriter, ProjectionMask}; @@ -1492,7 +1616,9 @@ mod tests { use crate::ErrorKind; use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY}; - use crate::arrow::{ArrowReader, ArrowReaderBuilder}; + use crate::arrow::{ + ArrowReader, ArrowReaderBuilder, RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, + }; use crate::delete_vector::DeleteVector; use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; @@ -2286,4 +2412,319 @@ message schema { assert!(col_b.is_null(1)); assert!(col_b.is_null(2)); } + + #[test] + fn test_add_file_path_column_ree() { + use arrow_array::{Array, Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + + // Create a simple test batch with 2 columns and 3 rows + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let id_array = Int32Array::from(vec![1, 2, 3]); + let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]); + + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(name_array), + ]) + .unwrap(); + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + + // Add file path column with REE + let file_path = "/path/to/data/file.parquet"; + let result = ArrowReader::add_file_path_column_ree( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + assert!(result.is_ok(), "Should successfully add file path column"); + + let new_batch = result.unwrap(); + + // Verify the new batch has 3 columns + assert_eq!(new_batch.num_columns(), 3); + assert_eq!(new_batch.num_rows(), 3); + + // Verify schema has the _file column + let schema = new_batch.schema(); + assert_eq!(schema.fields().len(), 3); + + let file_field = schema.field(2); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + assert!(!file_field.is_nullable()); + + // Verify the field has the correct metadata + let metadata = file_field.metadata(); + assert_eq!( + metadata.get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the data type is RunEndEncoded + match file_field.data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.name(), "run_ends"); + 
assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert!(!run_ends_field.is_nullable()); + + assert_eq!(values_field.name(), "values"); + assert_eq!(values_field.data_type(), &DataType::Utf8); + } + _ => panic!("Expected RunEndEncoded data type for _file column"), + } + + // Verify the original columns are intact + let id_col = new_batch + .column(0) + .as_primitive::(); + assert_eq!(id_col.values(), &[1, 2, 3]); + + let name_col = new_batch.column(1).as_string::(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(name_col.value(2), "Charlie"); + + // Verify the file path column contains the correct value + // The _file column is a RunArray, so we need to decode it + let file_col = new_batch.column(2); + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("Expected RunArray for _file column"); + + // Verify the run array structure (should be optimally encoded) + let run_ends = run_array.run_ends(); + assert_eq!(run_ends.values().len(), 1, "Should have only 1 run end"); + assert_eq!( + run_ends.values()[0], + new_batch.num_rows() as i32, + "Run end should equal number of rows" + ); + + // Check that the single value in the RunArray is the expected file path + let values = run_array.values(); + let string_values = values.as_string::(); + assert_eq!(string_values.len(), 1, "Should have only 1 value"); + assert_eq!(string_values.value(0), file_path); + + assert!( + string_values + .downcast_ref::() + .unwrap() + .iter() + .all(|v| v == Some(file_path)) + ) + } + + #[test] + fn test_add_file_path_column_ree_empty_batch() { + use arrow_array::RecordBatch; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + // Create an empty batch + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let id_array = arrow_array::Int32Array::from(Vec::::new()); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); + + assert_eq!(batch.num_rows(), 0); + + // Add file path column to empty batch with REE + let file_path = "/empty/file.parquet"; + let result = ArrowReader::add_file_path_column_ree( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + + // Should succeed with empty RunArray for empty batches + assert!(result.is_ok()); + let new_batch = result.unwrap(); + assert_eq!(new_batch.num_rows(), 0); + assert_eq!(new_batch.num_columns(), 2); + + // Verify the _file column exists with correct schema + let schema = new_batch.schema(); + let file_field = schema.field(1); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + + // Should use RunEndEncoded even for empty batches + match file_field.data_type() { + DataType::RunEndEncoded(run_ends_field, values_field) => { + assert_eq!(run_ends_field.data_type(), &DataType::Int32); + assert_eq!(values_field.data_type(), &DataType::Utf8); + } + _ => panic!("Expected RunEndEncoded data type for _file column"), + } + + // Verify metadata with reserved field ID + assert_eq!( + file_field.metadata().get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the file path column is empty but properly structured + let file_path_column = new_batch.column(1); + assert_eq!(file_path_column.len(), 0); + } + + #[test] + fn test_add_file_path_column_special_characters() { + use arrow_array::{Int32Array, RecordBatch}; + use arrow_schema::{DataType, Field, Schema}; + + let schema = Arc::new(Schema::new(vec![Field::new("id", 
DataType::Int32, false)])); + + let id_array = Int32Array::from(vec![42]); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); + + // Test with file path containing special characters (materialized version) + let file_path = "/path/with spaces/and-dashes/file_name.parquet"; + let result = ArrowReader::add_file_path_column( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + assert!(result.is_ok()); + + let new_batch = result.unwrap(); + let file_col = new_batch.column(1); + + // Verify the file path is correctly stored as a materialized StringArray + let str_arr = file_col.as_string::(); + assert_eq!(str_arr.value(0), file_path); + } + + #[test] + fn test_add_file_path_column() { + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + + // Create a simple test batch with 2 columns and 3 rows + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let id_array = Int32Array::from(vec![1, 2, 3]); + let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]); + + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(name_array), + ]) + .unwrap(); + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 3); + + // Add file path column with materialization + let file_path = "/path/to/data/file.parquet"; + let result = ArrowReader::add_file_path_column( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + assert!(result.is_ok(), "Should successfully add file path column"); + + let new_batch = result.unwrap(); + + // Verify the new batch has 3 columns + assert_eq!(new_batch.num_columns(), 3); + assert_eq!(new_batch.num_rows(), 3); + + // Verify schema has the _file column + let schema = new_batch.schema(); + assert_eq!(schema.fields().len(), 3); + + let file_field = schema.field(2); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + assert!(!file_field.is_nullable()); + + // Verify the field has the correct metadata + let metadata = file_field.metadata(); + assert_eq!( + metadata.get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the data type is Utf8 (materialized strings) + assert_eq!(file_field.data_type(), &DataType::Utf8); + + // Verify the original columns are intact + let id_col = new_batch + .column(0) + .as_primitive::(); + assert_eq!(id_col.values(), &[1, 2, 3]); + + let name_col = new_batch.column(1).as_string::(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(name_col.value(2), "Charlie"); + + // Verify the file path column contains the correct value for all rows + let file_col = new_batch.column(2).as_string::(); + for i in 0..new_batch.num_rows() { + assert_eq!(file_col.value(i), file_path); + } + } + + #[test] + fn test_add_file_path_column_empty_batch() { + use arrow_array::RecordBatch; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + // Create an empty batch + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + + let id_array = arrow_array::Int32Array::from(Vec::::new()); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(id_array)]).unwrap(); + + assert_eq!(batch.num_rows(), 0); + + // Add file path column to empty batch (materialized version) + let file_path = "/empty/file.parquet"; + let result = 
ArrowReader::add_file_path_column( + batch, + file_path, + RESERVED_COL_NAME_FILE, + RESERVED_FIELD_ID_FILE, + ); + + // Should succeed with empty StringArray + assert!(result.is_ok()); + let new_batch = result.unwrap(); + assert_eq!(new_batch.num_rows(), 0); + assert_eq!(new_batch.num_columns(), 2); + + // Verify the _file column exists with correct schema + let schema = new_batch.schema(); + let file_field = schema.field(1); + assert_eq!(file_field.name(), RESERVED_COL_NAME_FILE); + + // Should use Utf8 (materialized strings) + assert_eq!(file_field.data_type(), &DataType::Utf8); + + // Verify metadata with reserved field ID + assert_eq!( + file_field.metadata().get(PARQUET_FIELD_ID_META_KEY), + Some(&RESERVED_FIELD_ID_FILE.to_string()) + ); + + // Verify the file path column is empty but properly structured + let file_path_column = new_batch.column(1); + assert_eq!(file_path_column.len(), 0); + } } diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs index 4f6fd28483..3a3dfbb529 100644 --- a/crates/iceberg/src/delete_file_index.rs +++ b/crates/iceberg/src/delete_file_index.rs @@ -107,6 +107,26 @@ impl DeleteFileIndex { _ => unreachable!("Cannot be any other state than loaded"), } } + + pub(crate) async fn positional_deletes(&self) -> Vec { + let notifier = { + let guard = self.state.read().unwrap(); + match *guard { + DeleteFileIndexState::Populating(ref notifier) => notifier.clone(), + DeleteFileIndexState::Populated(ref index) => { + return index.positional_deletes(); + } + } + }; + + notifier.notified().await; + + let guard = self.state.read().unwrap(); + match guard.deref() { + DeleteFileIndexState::Populated(index) => index.positional_deletes(), + _ => unreachable!("Cannot be any other state than loaded"), + } + } } impl PopulatedDeleteFileIndex { @@ -210,6 +230,14 @@ impl PopulatedDeleteFileIndex { results } + + fn positional_deletes(&self) -> Vec { + self.pos_deletes_by_partition + .values() + .flatten() + .map(|ctx| ctx.as_ref().into()) + .collect() + } } #[cfg(test)] diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index f382bf079e..1040796034 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -23,9 +23,9 @@ use roaring::treemap::BitmapIter; use crate::{Error, ErrorKind, Result}; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct DeleteVector { - inner: RoaringTreemap, + pub inner: RoaringTreemap, } impl DeleteVector { diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index aae8efed74..9c9c7460fb 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -97,3 +97,6 @@ pub mod writer; mod delete_vector; pub mod puffin; + +/// Utility functions and modules. 
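+/// Includes the snapshot ancestry helpers (`util::snapshot`) used by the incremental scan to
+/// walk the snapshot history between two snapshots.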
+pub mod util; diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 3f7c29dbf4..e35c260be4 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -33,19 +33,23 @@ use crate::spec::{ }; use crate::{Error, ErrorKind, Result}; +pub(crate) type ManifestEntryFilterFn = dyn Fn(&ManifestEntryRef) -> bool + Send + Sync; + /// Wraps a [`ManifestFile`] alongside the objects that are needed /// to process it in a thread-safe manner pub(crate) struct ManifestFileContext { - manifest_file: ManifestFile, + pub manifest_file: ManifestFile, - sender: Sender, + pub sender: Sender, - field_ids: Arc>, - bound_predicates: Option>, - object_cache: Arc, - snapshot_schema: SchemaRef, - expression_evaluator_cache: Arc, - delete_file_index: DeleteFileIndex, + pub field_ids: Arc>, + pub bound_predicates: Option>, + pub object_cache: Arc, + pub snapshot_schema: SchemaRef, + pub expression_evaluator_cache: Arc, + pub delete_file_index: DeleteFileIndex, + + pub filter_fn: Option>, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -74,12 +78,15 @@ impl ManifestFileContext { mut sender, expression_evaluator_cache, delete_file_index, + filter_fn, .. } = self; + let filter_fn = filter_fn.unwrap_or_else(|| Arc::new(|_| true)); + let manifest = object_cache.get_manifest(&manifest_file).await?; - for manifest_entry in manifest.entries() { + for manifest_entry in manifest.entries().iter().filter(|e| filter_fn(e)) { let manifest_entry_context = ManifestEntryContext { // TODO: refactor to avoid the expensive ManifestEntry clone manifest_entry: manifest_entry.clone(), @@ -189,7 +196,6 @@ impl PlanContext { ) -> Result> + 'static>> { let manifest_files = manifest_list.entries().iter(); - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; for manifest_file in manifest_files { let tx = if manifest_file.content == ManifestContentType::Deletes { @@ -224,6 +230,7 @@ impl PlanContext { partition_bound_predicate, tx, delete_file_idx.clone(), + None, ); filtered_mfcs.push(Ok(mfc)); @@ -238,6 +245,7 @@ impl PlanContext { partition_filter: Option>, sender: Sender, delete_file_index: DeleteFileIndex, + filter_fn: Option>, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = @@ -260,6 +268,7 @@ impl PlanContext { field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), delete_file_index, + filter_fn, } } } diff --git a/crates/iceberg/src/scan/incremental/context.rs b/crates/iceberg/src/scan/incremental/context.rs new file mode 100644 index 0000000000..04db28a4c4 --- /dev/null +++ b/crates/iceberg/src/scan/incremental/context.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use futures::channel::mpsc::Sender; + +use crate::Result; +use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::delete_file_index::DeleteFileIndex; +use crate::io::object_cache::ObjectCache; +use crate::scan::ExpressionEvaluatorCache; +use crate::scan::context::{ManifestEntryContext, ManifestEntryFilterFn, ManifestFileContext}; +use crate::spec::{ + ManifestContentType, ManifestEntryRef, ManifestFile, Operation, SchemaRef, SnapshotRef, + TableMetadataRef, +}; + +#[derive(Debug)] +pub(crate) struct IncrementalPlanContext { + /// The snapshots involved in the incremental scan. + pub snapshots: Vec, + + /// The snapshot to start the incremental scan from. + pub from_snapshot: SnapshotRef, + + /// The metadata of the table being scanned. + pub table_metadata: TableMetadataRef, + + /// The schema of the snapshot to end the incremental scan at. + pub to_snapshot_schema: SchemaRef, + + /// The object cache to use for the scan. + pub object_cache: Arc, + + /// The field IDs to scan. + pub field_ids: Arc>, + + /// The expression evaluator cache to use for the scan. + pub expression_evaluator_cache: Arc, + + /// The caching delete file loader to use for the scan. + pub caching_delete_file_loader: CachingDeleteFileLoader, +} + +impl IncrementalPlanContext { + pub(crate) async fn build_manifest_file_contexts( + &self, + tx_data: Sender, + delete_file_idx: DeleteFileIndex, + delete_file_tx: Sender, + ) -> Result> + 'static>> { + // Validate that all snapshots are Append or Delete operations and collect their IDs + let snapshot_ids: HashSet = { + let mut ids = HashSet::new(); + for snapshot in self.snapshots.iter() { + let operation = &snapshot.summary().operation; + if !matches!( + operation, + Operation::Append | Operation::Overwrite | Operation::Delete + ) { + return Err(crate::Error::new( + crate::ErrorKind::FeatureUnsupported, + format!( + "Incremental scan only supports Append, Overwrite and Delete operations, but snapshot {} has operation {:?}", + snapshot.snapshot_id(), + operation + ), + )); + } + ids.insert(snapshot.snapshot_id()); + } + ids + }; + + let (manifest_files, filter_fn) = { + let mut manifest_files = HashSet::::new(); + for snapshot in self.snapshots.iter() { + let manifest_list = self + .object_cache + .get_manifest_list(snapshot, &self.table_metadata) + .await?; + for entry in manifest_list.entries() { + if !snapshot_ids.contains(&entry.added_snapshot_id) { + continue; + } + manifest_files.insert(entry.clone()); + } + } + let filter_fn: Option> = + Some(Arc::new(move |entry: &ManifestEntryRef| { + entry + .snapshot_id() + .map(|id| snapshot_ids.contains(&id)) + .unwrap_or(true) // Include entries without `snapshot_id`. + })); + + (manifest_files, filter_fn) + }; + + // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. 
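+        // Each selected manifest file is wrapped in a `ManifestFileContext` below: delete
+        // manifests are routed to `delete_file_tx`, data manifests to `tx_data`, and every
+        // context carries the same `filter_fn`, so only entries added by the in-range
+        // snapshots are streamed out of the manifests.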
+ let mut mfcs = vec![]; + for manifest_file in &manifest_files { + let tx = if manifest_file.content == ManifestContentType::Deletes { + delete_file_tx.clone() + } else { + tx_data.clone() + }; + + let mfc = ManifestFileContext { + manifest_file: manifest_file.clone(), + bound_predicates: None, + sender: tx, + object_cache: self.object_cache.clone(), + snapshot_schema: self.to_snapshot_schema.clone(), + field_ids: self.field_ids.clone(), + expression_evaluator_cache: self.expression_evaluator_cache.clone(), + delete_file_index: delete_file_idx.clone(), + filter_fn: filter_fn.clone(), + }; + + mfcs.push(Ok(mfc)); + } + + Ok(Box::new(mfcs.into_iter())) + } +} diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs new file mode 100644 index 0000000000..1253353b4d --- /dev/null +++ b/crates/iceberg/src/scan/incremental/mod.rs @@ -0,0 +1,588 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Incremental table scan implementation. + +use std::collections::HashSet; +use std::sync::Arc; + +use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; +use crate::arrow::delete_filter::DeleteFilter; +use crate::arrow::{ + ArrowReaderBuilder, CombinedIncrementalBatchRecordStream, StreamsInto, + UnzippedIncrementalBatchRecordStream, +}; +use crate::delete_file_index::DeleteFileIndex; +use crate::io::FileIO; +use crate::scan::DeleteFileContext; +use crate::scan::cache::ExpressionEvaluatorCache; +use crate::scan::context::ManifestEntryContext; +use crate::spec::{DataContentType, ManifestStatus, Snapshot, SnapshotRef}; +use crate::table::Table; +use crate::util::snapshot::ancestors_between; +use crate::utils::available_parallelism; +use crate::{Error, ErrorKind, Result}; + +mod context; +use context::*; +mod task; +use futures::channel::mpsc::{Sender, channel}; +use futures::{SinkExt, StreamExt, TryStreamExt}; +use itertools::Itertools; +pub use task::*; + +use crate::runtime::spawn; + +/// Builder for an incremental table scan. +#[derive(Debug)] +pub struct IncrementalTableScanBuilder<'a> { + table: &'a Table, + // Defaults to `None`, which means all columns. 
+ column_names: Option>, + from_snapshot_id: i64, + to_snapshot_id: i64, + batch_size: Option, + concurrency_limit_data_files: usize, + concurrency_limit_manifest_entries: usize, + concurrency_limit_manifest_files: usize, +} + +impl<'a> IncrementalTableScanBuilder<'a> { + pub(crate) fn new(table: &'a Table, from_snapshot_id: i64, to_snapshot_id: i64) -> Self { + let num_cpus = available_parallelism().get(); + Self { + table, + column_names: None, + from_snapshot_id, + to_snapshot_id, + batch_size: None, + concurrency_limit_data_files: num_cpus, + concurrency_limit_manifest_entries: num_cpus, + concurrency_limit_manifest_files: num_cpus, + } + } + + /// Set the batch size for reading data files. + pub fn with_batch_size(mut self, batch_size: Option) -> Self { + self.batch_size = batch_size; + self + } + + /// Select all columns of the table. + pub fn select_all(mut self) -> Self { + self.column_names = None; + self + } + + /// Select no columns of the table. + pub fn select_empty(mut self) -> Self { + self.column_names = Some(vec![]); + self + } + + /// Select some columns of the table. + pub fn select(mut self, column_names: impl IntoIterator) -> Self { + self.column_names = Some( + column_names + .into_iter() + .map(|item| item.to_string()) + .collect(), + ); + self + } + + /// Set the `from_snapshot_id` for the incremental scan. + pub fn from_snapshot_id(mut self, from_snapshot_id: i64) -> Self { + self.from_snapshot_id = from_snapshot_id; + self + } + + /// Set the `to_snapshot_id` for the incremental scan. + pub fn to_snapshot_id(mut self, to_snapshot_id: i64) -> Self { + self.to_snapshot_id = to_snapshot_id; + self + } + + /// Set the concurrency limit for reading data files. + pub fn with_concurrency_limit_data_files(mut self, limit: usize) -> Self { + self.concurrency_limit_data_files = limit; + self + } + + /// Set the concurrency limit for reading manifest entries. + pub fn with_concurrency_limit_manifest_entries(mut self, limit: usize) -> Self { + self.concurrency_limit_manifest_entries = limit; + self + } + + /// Set the concurrency limit for reading manifest files. + pub fn with_concurrency_limit_manifest_files(mut self, limit: usize) -> Self { + self.concurrency_limit_manifest_files = limit; + self + } + + /// Build the incremental table scan. + pub fn build(self) -> Result { + let snapshot_from: Arc = self + .table + .metadata() + .snapshot_by_id(self.from_snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", self.from_snapshot_id), + ) + })? + .clone(); + + let snapshot_to: Arc = self + .table + .metadata() + .snapshot_by_id(self.to_snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", self.to_snapshot_id), + ) + })? + .clone(); + + let snapshots = ancestors_between( + &self.table.metadata_ref(), + snapshot_to.snapshot_id(), + Some(snapshot_from.snapshot_id()), + ) + .collect_vec(); + + if !snapshots.is_empty() { + assert_eq!( + snapshots.first().map(|s| s.snapshot_id()), + Some(snapshot_to.snapshot_id()) + ); + } + + let schema = snapshot_to.schema(self.table.metadata())?; + + if let Some(column_names) = self.column_names.as_ref() { + for column_name in column_names { + if schema.field_by_name(column_name).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Column {} not found in table. 
Schema: {}", + column_name, schema + ), + )); + } + } + } + + let mut field_ids = vec![]; + let column_names = self.column_names.clone().unwrap_or_else(|| { + schema + .as_struct() + .fields() + .iter() + .map(|f| f.name.clone()) + .collect() + }); + + for column_name in column_names.iter() { + let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Column {} not found in table. Schema: {}", + column_name, schema + ), + ) + })?; + + schema + .as_struct() + .field_by_id(field_id) + .ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}", + column_name, schema + ), + ) + })?; + + field_ids.push(field_id); + } + + let plan_context = IncrementalPlanContext { + snapshots, + from_snapshot: snapshot_from, + table_metadata: self.table.metadata_ref(), + to_snapshot_schema: schema, + object_cache: self.table.object_cache().clone(), + field_ids: Arc::new(field_ids), + expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), + caching_delete_file_loader: CachingDeleteFileLoader::new( + self.table.file_io().clone(), + self.concurrency_limit_data_files, + ), + }; + + Ok(IncrementalTableScan { + plan_context, + file_io: self.table.file_io().clone(), + column_names: Some(column_names), + batch_size: self.batch_size, + concurrency_limit_data_files: self.concurrency_limit_data_files, + concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries, + concurrency_limit_manifest_files: self.concurrency_limit_manifest_files, + }) + } +} + +/// An incremental table scan. +#[derive(Debug)] +pub struct IncrementalTableScan { + plan_context: IncrementalPlanContext, + file_io: FileIO, + column_names: Option>, + batch_size: Option, + concurrency_limit_data_files: usize, + concurrency_limit_manifest_entries: usize, + concurrency_limit_manifest_files: usize, +} + +impl IncrementalTableScan { + /// Returns the `from` snapshot of this incremental table scan. + pub fn snapshot_from(&self) -> &SnapshotRef { + &self.plan_context.from_snapshot + } + + /// Returns the snapshots involved in this incremental table scan. + pub fn snapshots(&self) -> &[SnapshotRef] { + &self.plan_context.snapshots + } + + /// Returns the `to` snapshot of this incremental table scan. + pub fn snapshot_to(&self) -> &SnapshotRef { + self.snapshots() + .first() + .expect("There is always at least one snapshot") + } + + /// Returns the selected column names of this incremental table scan. + /// If `None`, all columns are selected. + pub fn column_names(&self) -> Option<&[String]> { + self.column_names.as_deref() + } + + /// Plans the files to be read in this incremental table scan. + pub async fn plan_files(&self) -> Result { + let concurrency_limit_manifest_files = self.concurrency_limit_manifest_files; + let concurrency_limit_manifest_entries = self.concurrency_limit_manifest_entries; + + // Used to stream `ManifestEntryContexts` between stages of the planning operation. + let (manifest_entry_data_ctx_tx, manifest_entry_data_ctx_rx) = + channel(concurrency_limit_manifest_files); + let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = + channel(concurrency_limit_manifest_files); + + // Used to stream the results back to the caller. 
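+        // Both the generated `IncrementalFileScanTask`s and any errors raised while planning
+        // are funneled through this channel, so the caller observes failures as stream items.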
+ let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + + let (delete_file_idx, delete_file_tx) = DeleteFileIndex::new(); + + let manifest_file_contexts = self + .plan_context + .build_manifest_file_contexts( + manifest_entry_data_ctx_tx, + delete_file_idx.clone(), + manifest_entry_delete_ctx_tx, + ) + .await?; + + let mut channel_for_manifest_error: Sender> = file_scan_task_tx.clone(); + + // Concurrently load all [`Manifest`]s and stream their [`ManifestEntry`]s + spawn(async move { + let result = futures::stream::iter(manifest_file_contexts) + .try_for_each_concurrent(concurrency_limit_manifest_files, |ctx| async move { + ctx.fetch_manifest_and_stream_manifest_entries().await + }) + .await; + + if let Err(error) = result { + let _ = channel_for_manifest_error.send(Err(error)).await; + } + }); + + let mut channel_for_data_manifest_entry_error = file_scan_task_tx.clone(); + let mut channel_for_delete_manifest_entry_error = file_scan_task_tx.clone(); + + // Process the delete file [`ManifestEntry`] stream in parallel. Builds the delete + // index below. + spawn(async move { + let result = manifest_entry_delete_ctx_rx + .map(|me_ctx| Ok((me_ctx, delete_file_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| async move { + spawn(async move { + Self::process_delete_manifest_entry(tx, manifest_entry_context).await + }) + .await + }, + ) + .await; + + if let Err(error) = result { + let _ = channel_for_delete_manifest_entry_error + .send(Err(error)) + .await; + } + }) + .await; + + // TODO: Streaming this into the delete index seems somewhat redundant, as we + // could directly stream into the CachingDeleteFileLoader and instantly load the + // delete files. + let positional_deletes = delete_file_idx.positional_deletes().await; + let result = self + .plan_context + .caching_delete_file_loader + .load_deletes( + &positional_deletes, + self.plan_context.to_snapshot_schema.clone(), + ) + .await; + + // Build the delete filter from the loaded deletes. + let delete_filter = match result { + Ok(loaded_deletes) => loaded_deletes.unwrap(), + Err(e) => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Failed to load positional deletes: {}", e), + )); + } + }; + + // Process the data file [`ManifestEntry`] stream in parallel + let filter = delete_filter.clone(); + spawn(async move { + let result = manifest_entry_data_ctx_rx + .map(|me_ctx| Ok((me_ctx, file_scan_task_tx.clone()))) + .try_for_each_concurrent( + concurrency_limit_manifest_entries, + |(manifest_entry_context, tx)| { + let filter = filter.clone(); + async move { + if manifest_entry_context.manifest_entry.status() + == ManifestStatus::Added + { + spawn(async move { + Self::process_data_manifest_entry( + tx, + manifest_entry_context, + &filter, + ) + .await + }) + .await + } else if manifest_entry_context.manifest_entry.status() + == ManifestStatus::Deleted + { + // TODO (RAI-43291): Process deleted files + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Processing deleted data files is not supported yet in incremental scans", + )) + } else { + Ok(()) + } + } + }, + ) + .await; + + if let Err(error) = result { + let _ = channel_for_data_manifest_entry_error.send(Err(error)).await; + } + }); + + // Collect all append tasks. + let mut tasks = file_scan_task_rx.try_collect::>().await?; + + // Compute those file paths that have been appended. 
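+        // Delete vectors that refer to files appended within this scan range are skipped below:
+        // their deletes are already attached as positional deletes to the corresponding append
+        // tasks, so standalone `Delete` tasks are only emitted for files that pre-date the range.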
+ let appended_files = tasks + .iter() + .filter_map(|task| match task { + IncrementalFileScanTask::Append(append_task) => { + Some(append_task.data_file_path.clone()) + } + _ => None, + }) + .collect::>(); + + // Augment `tasks` with delete tasks. + // First collect paths to process (paths that weren't appended in this scan range) + let delete_paths: Vec = delete_filter.with_read(|state| { + Ok(state + .delete_vectors() + .keys() + .filter(|path| !appended_files.contains::(path)) + .cloned() + .collect()) + })?; + + // Now remove and take ownership of each delete vector + for path in delete_paths { + let delete_vector_arc = delete_filter.with_write(|state| { + state.remove_delete_vector(&path).ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!("DeleteVector for path {} not found", path), + ) + }) + })?; + + // Try to unwrap the Arc to avoid cloning the DeleteVector + let delete_vector_inner = Arc::try_unwrap(delete_vector_arc) + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "DeleteVector Arc has multiple references, cannot take ownership", + ) + })? + .into_inner() + .map_err(|e| { + Error::new(ErrorKind::Unexpected, "Failed to unwrap DeleteVector Mutex") + .with_source(e) + })?; + + let delete_task = IncrementalFileScanTask::Delete(path, delete_vector_inner); + tasks.push(delete_task); + } + + // We actually would not need a stream here, but we can keep it compatible with + // other scan types. + Ok(futures::stream::iter(tasks).map(Ok).boxed()) + } + + /// Returns an [`CombinedIncrementalBatchRecordStream`] for this incremental table scan. + pub async fn to_arrow(&self) -> Result { + let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) + .with_data_file_concurrency_limit(self.concurrency_limit_data_files) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true); + + if let Some(batch_size) = self.batch_size { + arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); + } + + let arrow_reader = arrow_reader_builder.build(); + let file_scan_task_stream = self.plan_files().await?; + file_scan_task_stream.stream(arrow_reader) + } + + /// Returns an [`UnzippedIncrementalBatchRecordStream`] for this incremental table scan. + /// This stream will yield separate streams for appended and deleted record batches. + pub async fn to_unzipped_arrow(&self) -> Result { + let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone()) + .with_data_file_concurrency_limit(self.concurrency_limit_data_files) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true); + + if let Some(batch_size) = self.batch_size { + arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size); + } + + let arrow_reader = arrow_reader_builder.build(); + let file_scan_task_stream = self.plan_files().await?; + file_scan_task_stream.stream(arrow_reader) + } + + async fn process_delete_manifest_entry( + mut delete_file_ctx_tx: Sender, + manifest_entry_context: ManifestEntryContext, + ) -> Result<()> { + // Abort the plan if we encounter a manifest entry for a data file or equality + // deletes. 
+ if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Encountered an entry for a data file in a delete file manifest", + )); + } else if manifest_entry_context.manifest_entry.content_type() + == DataContentType::EqualityDeletes + { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Equality deletes are not supported yet in incremental scans", + )); + } + + // Abort if it has been marked as deleted. + if !manifest_entry_context.manifest_entry.is_alive() + && manifest_entry_context.manifest_entry.content_type() + == DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Processing deleted (position) delete files is not supported yet in incremental scans", + )); + } + + delete_file_ctx_tx + .send(DeleteFileContext { + manifest_entry: manifest_entry_context.manifest_entry.clone(), + partition_spec_id: manifest_entry_context.partition_spec_id, + }) + .await?; + Ok(()) + } + + async fn process_data_manifest_entry( + mut file_scan_task_tx: Sender>, + manifest_entry_context: ManifestEntryContext, + delete_filter: &DeleteFilter, + ) -> Result<()> { + // Skip processing this manifest entry if it has been marked as deleted. + if !manifest_entry_context.manifest_entry.is_alive() { + return Ok(()); + } + + // Abort the plan if we encounter a manifest entry for a delete file + if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Encountered an entry for a delete file in a data file manifest", + )); + } + + let file_scan_task = IncrementalFileScanTask::append_from_manifest_entry( + &manifest_entry_context, + delete_filter, + ); + + file_scan_task_tx.send(Ok(file_scan_task)).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests; diff --git a/crates/iceberg/src/scan/incremental/task.rs b/crates/iceberg/src/scan/incremental/task.rs new file mode 100644 index 0000000000..4b2f244410 --- /dev/null +++ b/crates/iceberg/src/scan/incremental/task.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::{Arc, Mutex}; + +use futures::stream::BoxStream; + +use crate::Result; +use crate::arrow::delete_filter::DeleteFilter; +use crate::delete_vector::DeleteVector; +use crate::scan::context::ManifestEntryContext; +use crate::spec::{DataFileFormat, Schema, SchemaRef}; + +/// A file scan task for appended data files in an incremental scan. +#[derive(Debug, Clone)] +pub struct AppendedFileScanTask { + /// The start offset of the file to scan. + pub start: u64, + /// The length of the file to scan. + pub length: u64, + /// The number of records in the file. 
+ pub record_count: Option, + /// The path to the data file to scan. + pub data_file_path: String, + /// The format of the data file to scan. + pub data_file_format: DataFileFormat, + /// The schema of the data file to scan. + pub schema: crate::spec::SchemaRef, + /// The field ids to project. + pub project_field_ids: Vec, + /// The optional positional deletes associated with this data file. + pub positional_deletes: Option>>, +} + +impl AppendedFileScanTask { + /// Returns the data file path of this appended file scan task. + pub fn data_file_path(&self) -> &str { + &self.data_file_path + } + + /// Returns the schema of this file scan task as a reference + pub fn schema(&self) -> &Schema { + &self.schema + } + + /// Returns the schema of this file scan task as a SchemaRef + pub fn schema_ref(&self) -> SchemaRef { + self.schema.clone() + } +} + +/// The stream of incremental file scan tasks. +pub type IncrementalFileScanTaskStream = BoxStream<'static, Result>; + +/// An incremental file scan task, which can be either an appended data file or positional +/// deletes. +#[derive(Debug, Clone)] +pub enum IncrementalFileScanTask { + /// An appended data file. + Append(AppendedFileScanTask), + /// Deleted records of a data file. First argument is the file path, second the delete + /// vector. + Delete(String, DeleteVector), +} + +impl IncrementalFileScanTask { + /// Create an `IncrementalFileScanTask::Append` from a `ManifestEntryContext` and `DeleteFilter`. + pub(crate) fn append_from_manifest_entry( + manifest_entry_context: &ManifestEntryContext, + delete_filter: &DeleteFilter, + ) -> Self { + let data_file_path = manifest_entry_context.manifest_entry.file_path(); + IncrementalFileScanTask::Append(AppendedFileScanTask { + start: 0, + length: manifest_entry_context.manifest_entry.file_size_in_bytes(), + record_count: Some(manifest_entry_context.manifest_entry.record_count()), + data_file_path: data_file_path.to_string(), + data_file_format: manifest_entry_context.manifest_entry.file_format(), + schema: manifest_entry_context.snapshot_schema.clone(), + project_field_ids: manifest_entry_context.field_ids.as_ref().clone(), + positional_deletes: delete_filter.get_delete_vector_for_path(data_file_path), + }) + } + + /// Returns the data file path of this incremental file scan task. + pub fn data_file_path(&self) -> &str { + match self { + IncrementalFileScanTask::Append(task) => task.data_file_path(), + IncrementalFileScanTask::Delete(path, _) => path, + } + } +} diff --git a/crates/iceberg/src/scan/incremental/tests.rs b/crates/iceberg/src/scan/incremental/tests.rs new file mode 100644 index 0000000000..05724a54a4 --- /dev/null +++ b/crates/iceberg/src/scan/incremental/tests.rs @@ -0,0 +1,1703 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
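+
+// A minimal usage sketch of the incremental scan API exercised by these tests (not itself a
+// test). The `incremental_scan` entry point on `Table` is assumed here for illustration; the
+// builder methods, `to_arrow`, and the `(IncrementalBatchType, RecordBatch)` stream items come
+// from the code above.
+//
+//     use futures::TryStreamExt;
+//
+//     let scan = table
+//         .incremental_scan(from_snapshot_id, to_snapshot_id) // hypothetical entry point
+//         .select(["id", "data"])
+//         .build()?;
+//     let mut stream = scan.to_arrow().await?;
+//     while let Some((batch_type, batch)) = stream.try_next().await? {
+//         match batch_type {
+//             IncrementalBatchType::Append => { /* newly appended rows */ }
+//             IncrementalBatchType::Delete => { /* "pos" + "file_path" columns of deleted rows */ }
+//         }
+//     }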
+
+use std::collections::HashMap;
+use std::fs;
+use std::fs::File;
+use std::sync::Arc;
+
+use arrow_array::cast::AsArray;
+use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use futures::TryStreamExt;
+use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+use parquet::basic::Compression;
+use parquet::file::properties::WriterProperties;
+use tempfile::TempDir;
+use uuid::Uuid;
+
+use crate::TableIdent;
+use crate::io::{FileIO, OutputFile};
+use crate::spec::{
+    DataContentType, DataFileBuilder, DataFileFormat, ManifestEntry, ManifestListWriter,
+    ManifestStatus, ManifestWriterBuilder, PartitionSpec, Struct, TableMetadata,
+};
+use crate::table::Table;
+
+/// Represents an operation to perform on a snapshot of a table with schema (n: Int32,
+/// data: String).
+#[derive(Debug, Clone)]
+pub enum Operation {
+    /// Add rows with the given (n, data) tuples, and write to the specified parquet file name.
+    /// Example: `Add(vec![(1, "a".to_string()), (2, "b".to_string())], "data-1.parquet".to_string())`
+    /// adds two rows with n=1,2 and data="a","b" to a file named "data-1.parquet".
+    Add(Vec<(i32, String)>, String),
+
+    /// Delete rows by their positions within specific parquet files (uses positional deletes).
+    /// Takes a vector of (position, file_name) tuples specifying which position in which file to delete.
+    /// Example: `Delete(vec![(0, "data-1.parquet"), (1, "data-1.parquet")])` deletes positions 0 and 1 from data-1.parquet.
+    Delete(Vec<(i64, String)>),
+
+    /// Overwrite operation that can append new rows, delete specific positions, and remove entire data files.
+    /// This is a combination of append and delete operations in a single atomic snapshot.
+    ///
+    /// Parameters:
+    /// 1. Rows to append: Vec<(n, data)> tuples and the filename to write them to
+    /// 2. Positions to delete: Vec<(position, file_name)> tuples for positional deletes
+    /// 3. Data files to delete: Vec of file names to completely remove
+    ///
+    /// All three parameters can be empty, allowing for various combinations:
+    /// - Pure append: `Overwrite((rows, "file.parquet"), vec![], vec![])`
+    /// - Pure positional delete: `Overwrite((vec![], ""), vec![(pos, "file")], vec![])`
+    /// - Pure file deletion: `Overwrite((vec![], ""), vec![], vec!["file.parquet"])`
+    /// - Delete entire files: `Overwrite((vec![], ""), vec![], vec!["old-file.parquet"])`
+    ///
+    /// Example: `Overwrite((vec![(1, "new".to_string())], "new.parquet"), vec![(0, "old.parquet")], vec!["remove.parquet"])`
+    /// This adds new data to "new.parquet", deletes position 0 from "old.parquet", and removes "remove.parquet" entirely.
+    Overwrite(
+        (Vec<(i32, String)>, String),
+        Vec<(i64, String)>,
+        Vec<String>,
+    ),
+}
+
+/// Tracks the state of data files across snapshots.
+#[derive(Debug, Clone)]
+struct DataFileInfo {
+    path: String,
+    snapshot_id: i64,
+    sequence_number: i64,
+    n_values: Vec<i32>,
+}
+
+/// Test fixture that creates a table with custom snapshots based on operations.
+/// +/// # Example +/// ``` +/// let fixture = IncrementalTestFixture::new(vec![ +/// Operation::Add(vec![], "empty.parquet".to_string()), // Empty snapshot +/// Operation::Add( +/// vec![ +/// (1, "1".to_string()), +/// (2, "2".to_string()), +/// (3, "3".to_string()), +/// ], +/// "data-1.parquet".to_string(), +/// ), // Add 3 rows +/// Operation::Delete(vec![(1, "data-1.parquet".to_string())]), // Delete position 1 from data-1.parquet +/// ]) +/// .await; +/// ``` +pub struct IncrementalTestFixture { + pub table_location: String, + pub table: Table, + _tmp_dir: TempDir, // Keep temp dir alive +} + +impl IncrementalTestFixture { + /// Create a new test fixture with the given operations. + pub async fn new(operations: Vec) -> Self { + // Use pwd + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().join("incremental_test_table"); + + // Create directory structure + fs::create_dir_all(table_location.join("metadata")).unwrap(); + fs::create_dir_all(table_location.join("data")).unwrap(); + + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + let num_snapshots = operations.len(); + let current_snapshot_id = num_snapshots as i64; + let last_sequence_number = (num_snapshots - 1) as i64; + + // Build the snapshots JSON dynamically + let mut snapshots_json = Vec::new(); + let mut snapshot_log_json = Vec::new(); + let mut manifest_list_locations = Vec::new(); + + for (i, op) in operations.iter().enumerate() { + let snapshot_id = (i + 1) as i64; + let parent_id = if i == 0 { None } else { Some(i as i64) }; + let sequence_number = i as i64; + let timestamp = 1515100955770 + (i as i64 * 1000); + + let operation_type = match op { + Operation::Add(..) => "append", + Operation::Delete(..) => "delete", + Operation::Overwrite(..) 
=> "overwrite", + }; + + let manifest_list_location = + table_location.join(format!("metadata/snap-{}-manifest-list.avro", snapshot_id)); + manifest_list_locations.push(manifest_list_location.clone()); + + let parent_str = if let Some(pid) = parent_id { + format!(r#""parent-snapshot-id": {},"#, pid) + } else { + String::new() + }; + + snapshots_json.push(format!( + r#" {{ + "snapshot-id": {}, + {} + "timestamp-ms": {}, + "sequence-number": {}, + "summary": {{"operation": "{}"}}, + "manifest-list": "{}", + "schema-id": 0 + }}"#, + snapshot_id, + parent_str, + timestamp, + sequence_number, + operation_type, + manifest_list_location.display() + )); + + snapshot_log_json.push(format!( + r#" {{"snapshot-id": {}, "timestamp-ms": {}}}"#, + snapshot_id, timestamp + )); + } + + let snapshots_str = snapshots_json.join(",\n"); + let snapshot_log_str = snapshot_log_json.join(",\n"); + + // Create the table metadata + let metadata_json = format!( + r#"{{ + "format-version": 2, + "table-uuid": "{}", + "location": "{}", + "last-sequence-number": {}, + "last-updated-ms": 1602638573590, + "last-column-id": 2, + "current-schema-id": 0, + "schemas": [ + {{ + "type": "struct", + "schema-id": 0, + "fields": [ + {{"id": 1, "name": "n", "required": true, "type": "int"}}, + {{"id": 2, "name": "data", "required": true, "type": "string"}} + ] + }} + ], + "default-spec-id": 0, + "partition-specs": [ + {{ + "spec-id": 0, + "fields": [] + }} + ], + "last-partition-id": 0, + "default-sort-order-id": 0, + "sort-orders": [ + {{ + "order-id": 0, + "fields": [] + }} + ], + "properties": {{}}, + "current-snapshot-id": {}, + "snapshots": [ +{} + ], + "snapshot-log": [ +{} + ], + "metadata-log": [] +}}"#, + Uuid::new_v4(), + table_location.display(), + last_sequence_number, + current_snapshot_id, + snapshots_str, + snapshot_log_str + ); + + let table_metadata_location = table_location.join("metadata/v1.json"); + let table_metadata = serde_json::from_str::(&metadata_json).unwrap(); + + let table = Table::builder() + .metadata(table_metadata) + .identifier(TableIdent::from_strs(["db", "incremental_test"]).unwrap()) + .file_io(file_io.clone()) + .metadata_location(table_metadata_location.as_os_str().to_str().unwrap()) + .build() + .unwrap(); + + let mut fixture = Self { + table_location: table_location.to_str().unwrap().to_string(), + table, + _tmp_dir: tmp_dir, + }; + + // Setup all snapshots based on operations + fixture.setup_snapshots(operations).await; + + fixture + } + + fn next_manifest_file(&self) -> OutputFile { + self.table + .file_io() + .new_output(format!( + "{}/metadata/manifest_{}.avro", + self.table_location, + Uuid::new_v4() + )) + .unwrap() + } + + async fn setup_snapshots(&mut self, operations: Vec) { + let current_schema = self + .table + .metadata() + .current_snapshot() + .unwrap() + .schema(self.table.metadata()) + .unwrap(); + let partition_spec = Arc::new(PartitionSpec::unpartition_spec()); + let empty_partition = Struct::empty(); + + // Track all data files and their contents across snapshots + let mut data_files: Vec = Vec::new(); + #[allow(clippy::type_complexity)] + let mut delete_files: Vec<(String, i64, i64, Vec<(String, i64)>)> = Vec::new(); // (path, snapshot_id, sequence_number, [(data_file_path, position)]) + + for (snapshot_idx, operation) in operations.iter().enumerate() { + let snapshot_id = (snapshot_idx + 1) as i64; + let sequence_number = snapshot_idx as i64; + let parent_snapshot_id = if snapshot_idx == 0 { + None + } else { + Some(snapshot_idx as i64) + }; + + match operation { + 
Operation::Add(rows, file_name) => { + // Extract n_values and data_values from tuples + let n_values: Vec = rows.iter().map(|(n, _)| *n).collect(); + let data_values: Vec = rows.iter().map(|(_, d)| d.clone()).collect(); + + // Create data manifest + let mut data_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + // Add existing data files from previous snapshots + for data_file in &data_files { + data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(1024) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + // Add new data if not empty + if !n_values.is_empty() { + let data_file_path = format!("{}/data/{}", &self.table_location, file_name); + self.write_parquet_file(&data_file_path, &n_values, &data_values) + .await; + + data_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(1024) + .record_count(n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this data file + data_files.push(DataFileInfo { + path: data_file_path, + snapshot_id, + sequence_number, + n_values, + }); + } + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Create delete manifest if there are any delete files + let mut manifests = vec![data_manifest]; + if !delete_files.is_empty() { + let mut delete_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_deletes(); + + for (delete_path, del_snapshot_id, del_sequence_number, _) in &delete_files + { + let delete_count = delete_files + .iter() + .filter(|(p, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes)| deletes.len()) + .sum::(); + + delete_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(*del_snapshot_id) + .sequence_number(*del_sequence_number) + .file_sequence_number(*del_sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(512) + .record_count(delete_count as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + manifests.push(delete_writer.write_manifest_file().await.unwrap()); + } + + // Write manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + "{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + 
manifest_list_write + .add_manifests(manifests.into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + } + + Operation::Delete(positions_to_delete) => { + // Group deletes by file + let mut deletes_by_file: HashMap> = HashMap::new(); + + for (position, file_name) in positions_to_delete { + let data_file_path = format!("{}/data/{}", &self.table_location, file_name); + deletes_by_file + .entry(data_file_path) + .or_default() + .push(*position); + } + + // Create data manifest with existing data files + let mut data_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + for data_file in &data_files { + data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(1024) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Create delete manifest + let mut delete_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_deletes(); + + // Add existing delete files + for (delete_path, del_snapshot_id, del_sequence_number, _) in &delete_files { + let delete_count = delete_files + .iter() + .filter(|(p, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes)| deletes.len()) + .sum::(); + + delete_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(*del_snapshot_id) + .sequence_number(*del_sequence_number) + .file_sequence_number(*del_sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(512) + .record_count(delete_count as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + // Add new delete files + for (data_file_path, positions) in deletes_by_file { + let delete_file_path = format!( + "{}/data/delete-{}-{}.parquet", + &self.table_location, + snapshot_id, + Uuid::new_v4() + ); + self.write_positional_delete_file( + &delete_file_path, + &data_file_path, + &positions, + ) + .await; + + delete_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(512) + .record_count(positions.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this delete file + delete_files.push(( + delete_file_path, + snapshot_id, + sequence_number, + positions + .into_iter() + .map(|pos| (data_file_path.clone(), pos)) + .collect(), + )); + } + + let delete_manifest = 
delete_writer.write_manifest_file().await.unwrap(); + + // Write manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + "{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + manifest_list_write + .add_manifests(vec![data_manifest, delete_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + } + + Operation::Overwrite((rows, file_name), positions_to_delete, files_to_delete) => { + // Overwrite creates a single snapshot that can: + // 1. Add new data files + // 2. Delete positions from existing files + // 3. Remove entire data files + + // Create data manifest + let mut data_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_data(); + + // Determine which files to delete + let files_to_delete_set: std::collections::HashSet = files_to_delete + .iter() + .map(|f| format!("{}/data/{}", &self.table_location, f)) + .collect(); + + // Add existing data files (mark deleted ones as DELETED, others as EXISTING) + for data_file in &data_files { + if files_to_delete_set.contains(&data_file.path) { + // Mark file for deletion + data_writer + .add_delete_entry( + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(1024) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } else { + // Keep existing file + data_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(data_file.snapshot_id) + .sequence_number(data_file.sequence_number) + .file_sequence_number(data_file.sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file.path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(1024) + .record_count(data_file.n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + } + + // Add new data file if rows provided + if !rows.is_empty() { + let n_values: Vec = rows.iter().map(|(n, _)| *n).collect(); + let data_values: Vec = + rows.iter().map(|(_, d)| d.clone()).collect(); + let data_file_path = format!("{}/data/{}", &self.table_location, file_name); + + self.write_parquet_file(&data_file_path, &n_values, &data_values) + .await; + + data_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::Data) + .file_path(data_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(1024) + .record_count(n_values.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this new data file + data_files.push(DataFileInfo { + path: data_file_path, + snapshot_id, + sequence_number, + n_values, + 
}); + } + + // Remove deleted files from tracking + data_files.retain(|df| !files_to_delete_set.contains(&df.path)); + + let data_manifest = data_writer.write_manifest_file().await.unwrap(); + + // Handle positional deletes if any + let mut manifests = vec![data_manifest]; + + if !positions_to_delete.is_empty() || !delete_files.is_empty() { + let mut delete_writer = ManifestWriterBuilder::new( + self.next_manifest_file(), + Some(snapshot_id), + None, + current_schema.clone(), + partition_spec.as_ref().clone(), + ) + .build_v2_deletes(); + + // Add existing delete files + for (delete_path, del_snapshot_id, del_sequence_number, _) in &delete_files + { + let delete_count = delete_files + .iter() + .filter(|(p, _, _, _)| p == delete_path) + .map(|(_, _, _, deletes)| deletes.len()) + .sum::(); + + delete_writer + .add_existing_entry( + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(*del_snapshot_id) + .sequence_number(*del_sequence_number) + .file_sequence_number(*del_sequence_number) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(512) + .record_count(delete_count as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + } + + // Add new positional delete files + if !positions_to_delete.is_empty() { + // Group deletes by file + let mut deletes_by_file: HashMap> = HashMap::new(); + for (position, file_name) in positions_to_delete { + let data_file_path = + format!("{}/data/{}", &self.table_location, file_name); + deletes_by_file + .entry(data_file_path) + .or_default() + .push(*position); + } + + for (data_file_path, positions) in deletes_by_file { + let delete_file_path = format!( + "{}/data/delete-{}-{}.parquet", + &self.table_location, + snapshot_id, + Uuid::new_v4() + ); + self.write_positional_delete_file( + &delete_file_path, + &data_file_path, + &positions, + ) + .await; + + delete_writer + .add_entry( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .partition_spec_id(0) + .content(DataContentType::PositionDeletes) + .file_path(delete_file_path.clone()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(512) + .record_count(positions.len() as u64) + .partition(empty_partition.clone()) + .key_metadata(None) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + + // Track this delete file + delete_files.push(( + delete_file_path, + snapshot_id, + sequence_number, + positions + .into_iter() + .map(|pos| (data_file_path.clone(), pos)) + .collect(), + )); + } + } + + manifests.push(delete_writer.write_manifest_file().await.unwrap()); + } + + // Write manifest list + let mut manifest_list_write = ManifestListWriter::v2( + self.table + .file_io() + .new_output(format!( + "{}/metadata/snap-{}-manifest-list.avro", + self.table_location, snapshot_id + )) + .unwrap(), + snapshot_id, + parent_snapshot_id, + sequence_number, + ); + manifest_list_write + .add_manifests(manifests.into_iter()) + .unwrap(); + + manifest_list_write.close().await.unwrap(); + } + } + } + } + + async fn write_parquet_file(&self, path: &str, n_values: &[i32], data_values: &[String]) { + let schema = { + let fields = vec![ + arrow_schema::Field::new("n", arrow_schema::DataType::Int32, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]), + ), + 
arrow_schema::Field::new("data", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + let col_n = Arc::new(Int32Array::from(n_values.to_vec())) as ArrayRef; + let col_data = Arc::new(StringArray::from(data_values.to_vec())) as ArrayRef; + + let batch = RecordBatch::try_new(schema.clone(), vec![col_n, col_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(path).unwrap(); + let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + async fn write_positional_delete_file( + &self, + path: &str, + data_file_path: &str, + positions: &[i64], + ) { + let schema = { + let fields = vec![ + arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2147483546".to_string(), + )])), + arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2147483545".to_string(), + )])), + ]; + Arc::new(arrow_schema::Schema::new(fields)) + }; + + let file_paths: Vec<&str> = vec![data_file_path; positions.len()]; + let col_file_path = Arc::new(StringArray::from(file_paths)) as ArrayRef; + let col_pos = Arc::new(arrow_array::Int64Array::from(positions.to_vec())) as ArrayRef; + + let batch = RecordBatch::try_new(schema.clone(), vec![col_file_path, col_pos]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(path).unwrap(); + let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + /// Verify incremental scan results. + /// + /// Verifies that the incremental scan contains the expected appended and deleted records. 
+ pub async fn verify_incremental_scan( + &self, + from_snapshot_id: i64, + to_snapshot_id: i64, + expected_appends: Vec<(i32, &str)>, + expected_deletes: Vec<(u64, &str)>, + ) { + use arrow_array::cast::AsArray; + use arrow_select::concat::concat_batches; + use futures::TryStreamExt; + + let incremental_scan = self + .table + .incremental_scan(from_snapshot_id, to_snapshot_id) + .build() + .unwrap(); + + let stream = incremental_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + // Separate appends and deletes + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + let delete_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete) + .map(|(_, b)| b.clone()) + .collect(); + + // Verify appended records + if !append_batches.is_empty() { + let append_batch = + concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap(); + + let n_array = append_batch + .column(0) + .as_primitive::(); + let data_array = append_batch.column(1).as_string::(); + + let mut appended_pairs: Vec<(i32, String)> = (0..append_batch.num_rows()) + .map(|i| (n_array.value(i), data_array.value(i).to_string())) + .collect(); + appended_pairs.sort(); + + let expected_appends: Vec<(i32, String)> = expected_appends + .into_iter() + .map(|(n, s)| (n, s.to_string())) + .collect(); + + assert_eq!(appended_pairs, expected_appends); + } else { + assert!(expected_appends.is_empty(), "Expected appends but got none"); + } + + // Verify deleted records + if !delete_batches.is_empty() { + let delete_batch = + concat_batches(&delete_batches[0].schema(), delete_batches.iter()).unwrap(); + + let pos_array = delete_batch + .column(0) + .as_primitive::(); + + // The file path column is a StringArray with materialized values + let file_path_column = delete_batch.column(1); + let file_path_array = file_path_column.as_string::(); + + let mut deleted_pairs: Vec<(u64, String)> = (0..delete_batch.num_rows()) + .map(|i| { + let pos = pos_array.value(i); + let file_path = file_path_array.value(i).to_string(); + (pos, file_path) + }) + .collect(); + deleted_pairs.sort(); + + let expected_deletes: Vec<(u64, String)> = expected_deletes + .into_iter() + .map(|(pos, file)| (pos, file.to_string())) + .collect(); + + assert_eq!(deleted_pairs, expected_deletes); + } else { + assert!(expected_deletes.is_empty(), "Expected deletes but got none"); + } + } +} + +#[tokio::test] +async fn test_incremental_fixture_simple() { + let fixture = IncrementalTestFixture::new(vec![ + Operation::Add(vec![], "empty.parquet".to_string()), + Operation::Add( + vec![ + (1, "1".to_string()), + (2, "2".to_string()), + (3, "3".to_string()), + ], + "data-2.parquet".to_string(), + ), + Operation::Delete(vec![(1, "data-2.parquet".to_string())]), // Delete position 1 (n=2, data="2") + ]) + .await; + + // Verify we have 3 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 3); + + // Verify snapshot IDs + assert_eq!(snapshots[0].snapshot_id(), 1); + assert_eq!(snapshots[1].snapshot_id(), 2); + assert_eq!(snapshots[2].snapshot_id(), 3); + + // Verify parent relationships + assert_eq!(snapshots[0].parent_snapshot_id(), None); + assert_eq!(snapshots[1].parent_snapshot_id(), Some(1)); + assert_eq!(snapshots[2].parent_snapshot_id(), Some(2)); + + // Verify incremental 
scan from snapshot 1 to snapshot 3. + // Expected appends: snapshot 2 adds [1, 2, 3] + // Expected deletes: snapshot 3 deletes [2] + // In total we expect appends [1, 3] and deletes [] + fixture + .verify_incremental_scan(1, 3, vec![(1, "1"), (3, "3")], vec![]) + .await; + + // Verify incremental scan from snapshot 2 to snapshot 3. + let data_file_path = format!("{}/data/data-2.parquet", fixture.table_location); + fixture + .verify_incremental_scan(2, 3, vec![], vec![(1, &data_file_path)]) + .await; + + // Verify incremental scan from snapshot 1 to snapshot 1. + fixture.verify_incremental_scan(1, 1, vec![], vec![]).await; +} + +#[tokio::test] +async fn test_incremental_fixture_complex() { + let fixture = IncrementalTestFixture::new(vec![ + Operation::Add(vec![], "empty.parquet".to_string()), // Snapshot 1: Empty + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + (4, "d".to_string()), + (5, "e".to_string()), + ], + "data-2.parquet".to_string(), + ), // Snapshot 2: Add 5 rows (positions 0-4) + Operation::Delete(vec![ + (1, "data-2.parquet".to_string()), + (3, "data-2.parquet".to_string()), + ]), // Snapshot 3: Delete positions 1,3 (n=2,4; data=b,d) + Operation::Add( + vec![(6, "f".to_string()), (7, "g".to_string())], + "data-4.parquet".to_string(), + ), // Snapshot 4: Add 2 more rows (positions 5-6) + Operation::Delete(vec![ + (0, "data-2.parquet".to_string()), + (2, "data-2.parquet".to_string()), + (4, "data-2.parquet".to_string()), + (0, "data-4.parquet".to_string()), + (1, "data-4.parquet".to_string()), + ]), // Snapshot 5: Delete positions 0,2,4,5,6 (all remaining rows: n=1,3,5,6,7) + ]) + .await; + + // Verify we have 5 snapshots + let mut snapshots = fixture.table.metadata().snapshots().collect::>(); + snapshots.sort_by_key(|s| s.snapshot_id()); + assert_eq!(snapshots.len(), 5); + + // Verify parent chain + assert_eq!(snapshots[0].parent_snapshot_id(), None); + for (i, snapshot) in snapshots.iter().enumerate().take(5).skip(1) { + assert_eq!(snapshot.parent_snapshot_id(), Some(i as i64)); + } + + // Verify current snapshot + assert_eq!( + fixture + .table + .metadata() + .current_snapshot() + .unwrap() + .snapshot_id(), + 5 + ); + + // Verify incremental scan from snapshot 1 to snapshot 5. + // All data has been deleted, so we expect the empty result. + fixture.verify_incremental_scan(1, 5, vec![], vec![]).await; + + // Verify incremental scan from snapshot 2 to snapshot 5. 
+ // Snapshot 2 starts with: (1,a), (2,b), (3,c), (4,d), (5,e) in data-2.parquet + // Snapshot 3: Deletes positions 1,3 from data-2.parquet (n=2,4; data=b,d) + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // Snapshot 5: Deletes positions 0,2,4 from data-2.parquet and 0,1 from data-4.parquet (n=1,3,5,6,7; data=a,c,e,f,g) + // + // The incremental scan computes the NET EFFECT between snapshot 2 and 5: + // - Files added in snapshot 4 were completely deleted in snapshot 5, so NO net appends + // - Net deletes from data-2.parquet: positions 0,1,2,3,4 (all 5 rows deleted across snapshots 3 and 5) + // - Since data-4 was added and deleted between 2 and 5, it doesn't appear in the scan + let data_2_path = format!("{}/data/data-2.parquet", fixture.table_location); + fixture + .verify_incremental_scan( + 2, + 5, + vec![], // No net appends (data-4 was added and fully deleted) + vec![ + (0, data_2_path.as_str()), // All 5 positions from data-2.parquet + (1, data_2_path.as_str()), + (2, data_2_path.as_str()), + (3, data_2_path.as_str()), + (4, data_2_path.as_str()), + ], + ) + .await; + + // Verify incremental scan from snapshot 3 to snapshot 5. + // Snapshot 3 state: (1,a), (3,c), (5,e) remain in data-2.parquet at positions 0,2,4 + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // Snapshot 5: Deletes positions 0,2,4 from data-2.parquet (n=1,3,5) and 0,1 from data-4.parquet (n=6,7) + // + // Net effect from snapshot 3 to 5: + // - No net appends (data-4 was added and fully deleted between 3 and 5) + // - Net deletes from data-2.parquet: positions 0,2,4 (the three remaining rows deleted in snapshot 5) + fixture + .verify_incremental_scan( + 3, + 5, + vec![], // No net appends (data-4 was added and fully deleted) + vec![ + (0, data_2_path.as_str()), // Positions 0,2,4 from data-2.parquet + (2, data_2_path.as_str()), // (n=1,3,5; data=a,c,e) + (4, data_2_path.as_str()), + ], + ) + .await; + + // Verify incremental scan from snapshot 1 to snapshot 4. + // Snapshot 1: Empty + // Snapshot 2: Adds (1,a), (2,b), (3,c), (4,d), (5,e) in data-2.parquet + // Snapshot 3: Deletes positions 1,3 from data-2.parquet (n=2,4; data=b,d) + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // + // Net effect from snapshot 1 to 4: + // - Net appends: (1,a), (3,c), (5,e), (6,f), (7,g) - all rows that exist at snapshot 4 + // - No deletes: rows deleted in snapshot 3 were added after snapshot 1, so they don't count as deletes + fixture + .verify_incremental_scan( + 1, + 4, + vec![(1, "a"), (3, "c"), (5, "e"), (6, "f"), (7, "g")], + vec![], // No deletes (deleted rows were added after snapshot 1) + ) + .await; + + // Verify incremental scan from snapshot 2 to snapshot 4. + // Snapshot 2: Has (1,a), (2,b), (3,c), (4,d), (5,e) in data-2.parquet + // Snapshot 3: Deletes positions 1,3 from data-2.parquet (n=2,4; data=b,d) + // Snapshot 4: Adds (6,f), (7,g) in data-4.parquet + // + // Net effect from snapshot 2 to 4: + // - Net appends: (6,f), (7,g) from data-4.parquet + // - Net deletes: positions 1,3 from data-2.parquet (n=2,4; data=b,d) - existed at snapshot 2 and deleted in 3 + fixture + .verify_incremental_scan(2, 4, vec![(6, "f"), (7, "g")], vec![ + (1, data_2_path.as_str()), // Positions 1,3 from data-2.parquet + (3, data_2_path.as_str()), // (n=2,4; data=b,d) + ]) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_edge_cases() { + // This test covers several edge cases: + // 1. Multiple data files added in separate snapshots + // 2. Deletes spread across multiple data files + // 3. 
Partial deletes from multiple files + // 4. Cross-file delete operations in a single snapshot + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Empty starting point + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add file A with 3 rows + Operation::Add( + vec![ + (1, "a1".to_string()), + (2, "a2".to_string()), + (3, "a3".to_string()), + ], + "file-a.parquet".to_string(), + ), + // Snapshot 3: Add file B with 4 rows + Operation::Add( + vec![ + (10, "b1".to_string()), + (20, "b2".to_string()), + (30, "b3".to_string()), + (40, "b4".to_string()), + ], + "file-b.parquet".to_string(), + ), + // Snapshot 4: Partial delete from file A (delete middle row n=2) + Operation::Delete(vec![(1, "file-a.parquet".to_string())]), + // Snapshot 5: Partial delete from file B (delete first and last rows n=10,40) + Operation::Delete(vec![ + (0, "file-b.parquet".to_string()), + (3, "file-b.parquet".to_string()), + ]), + // Snapshot 6: Add file C with 2 rows + Operation::Add( + vec![(100, "c1".to_string()), (200, "c2".to_string())], + "file-c.parquet".to_string(), + ), + // Snapshot 7: Delete from multiple files in one snapshot (cross-file deletes) + Operation::Delete(vec![ + (0, "file-a.parquet".to_string()), // n=1 + (1, "file-b.parquet".to_string()), // n=20 + (0, "file-c.parquet".to_string()), // n=100 + ]), + ]) + .await; + + // Verify we have 7 snapshots + let n_snapshots = fixture.table.metadata().snapshots().count(); + assert_eq!(n_snapshots, 7); + + let file_a_path = format!("{}/data/file-a.parquet", fixture.table_location); + let file_b_path = format!("{}/data/file-b.parquet", fixture.table_location); + + // Test 1: Scan from snapshot 1 to 4 + // Should see: file-a (1,2,3), file-b (10,20,30,40) added, then (2) deleted from file-a + // BUT: The row n=2 was added AFTER snapshot 1, so it won't show as a delete! + // Net: appends (1,3) from file-a (n=2 added then deleted = net zero), (10,20,30,40) from file-b + // No deletes (n=2 was added and deleted between snapshots 1 and 4) + fixture + .verify_incremental_scan( + 1, + 4, + vec![ + (1, "a1"), + (3, "a3"), + (10, "b1"), + (20, "b2"), + (30, "b3"), + (40, "b4"), + ], + vec![], // No deletes - n=2 was added and deleted between snapshots + ) + .await; + + // Test 2: Scan from snapshot 4 to 6 + // Snapshot 4: has file-a (1,3) and file-b (10,20,30,40) + // Snapshot 5: deletes positions 0,3 from file-b (n=10,40) + // Snapshot 6: adds file-c (100,200) + // Net: appends (100,200) from file-c; deletes pos 0,3 from file-b + fixture + .verify_incremental_scan(4, 6, vec![(100, "c1"), (200, "c2")], vec![ + (0, file_b_path.as_str()), // n=10 + (3, file_b_path.as_str()), // n=40 + ]) + .await; + + // Test 3: Scan from snapshot 2 to 7 + // This tests the full lifecycle: multiple adds, partial deletes, more adds, cross-file deletes + // Starting at snapshot 2: file-a (1,2,3) exists + // File-b is added in snapshot 3 (after snapshot 2) + // File-c is added in snapshot 6 (after snapshot 2) + // By snapshot 7: file-a has (3) at position 2, file-b has (30), file-c has (200) + // + // Net appends: file-b (30) and file-c (200) were added after snapshot 2 + // Net deletes: positions 0,1 from file-a (n=1,2) existed at snapshot 2 and were deleted + // Note: (3) from file-a already existed at snapshot 2, so it's not a net append! 
+ fixture + .verify_incremental_scan(2, 7, vec![(30, "b3"), (200, "c2")], vec![ + (0, file_a_path.as_str()), // n=1 + (1, file_a_path.as_str()), // n=2 + ]) + .await; + + // Test 4: Scan from snapshot 5 to 6 + // Simple test: just adding a new file + // Snapshot 5 state: file-a (1,3), file-b (20,30) + // Snapshot 6: adds file-c (100,200) + // Net: appends (100,200) from file-c, no deletes + fixture + .verify_incremental_scan(5, 6, vec![(100, "c1"), (200, "c2")], vec![]) + .await; + + // Test 5: Scan from snapshot 3 to 4 + // Tests a single delete operation + // State at 3: file-a (1,2,3), file-b (10,20,30,40) + // State at 4: file-a (1,3), file-b (10,20,30,40) + // Net: no appends, 1 delete (position 1, n=2) from file-a + fixture + .verify_incremental_scan( + 3, + 4, + vec![], + vec![(1, file_a_path.as_str())], // n=2 + ) + .await; +} + +#[tokio::test] +async fn test_incremental_scan_builder_options() { + // This test demonstrates using the incremental scan builder API with various options: + // - Column projection (selecting specific columns) + // - Batch size configuration + // - Verifying the schema and batch structure + let fixture = IncrementalTestFixture::new(vec![ + Operation::Add(vec![], "empty.parquet".to_string()), + // Snapshot 2: Add 10 rows to test batch size behavior + Operation::Add( + vec![ + (1, "data-1".to_string()), + (2, "data-2".to_string()), + (3, "data-3".to_string()), + (4, "data-4".to_string()), + (5, "data-5".to_string()), + (6, "data-6".to_string()), + (7, "data-7".to_string()), + (8, "data-8".to_string()), + (9, "data-9".to_string()), + (10, "data-10".to_string()), + ], + "data-2.parquet".to_string(), + ), + // Snapshot 3: Delete some rows + Operation::Delete(vec![ + (2, "data-2.parquet".to_string()), // n=3 + (5, "data-2.parquet".to_string()), // n=6 + (8, "data-2.parquet".to_string()), // n=9 + ]), + // Snapshot 4: Add more rows + Operation::Add( + vec![ + (20, "data-20".to_string()), + (21, "data-21".to_string()), + (22, "data-22".to_string()), + (23, "data-23".to_string()), + (24, "data-24".to_string()), + ], + "data-4.parquet".to_string(), + ), + ]) + .await; + + // Test 1: Column projection - select only the "n" column + let scan = fixture + .table + .incremental_scan(1, 4) + .select(vec!["n"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + // Verify we have both append and delete batches + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + assert!(!append_batches.is_empty(), "Should have append batches"); + + // Check schema - should only have "n" column + for batch in &append_batches { + assert_eq!( + batch.schema().fields().len(), + 1, + "Should have only 1 column when projecting 'n'" + ); + assert_eq!( + batch.schema().field(0).name(), + "n", + "Projected column should be 'n'" + ); + } + + // Test 2: Column projection - select only the "data" column + let scan = fixture + .table + .incremental_scan(1, 4) + .select(vec!["data"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + for batch in &append_batches { + assert_eq!( + batch.schema().fields().len(), + 1, + "Should have only 1 column when projecting 'data'" 
+ ); + assert_eq!( + batch.schema().field(0).name(), + "data", + "Projected column should be 'data'" + ); + } + + // Test 3: Select both columns explicitly + let scan = fixture + .table + .incremental_scan(1, 4) + .select(vec!["n", "data"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + for batch in &append_batches { + assert_eq!( + batch.schema().fields().len(), + 2, + "Should have 2 columns when projecting both" + ); + assert_eq!(batch.schema().field(0).name(), "n"); + assert_eq!(batch.schema().field(1).name(), "data"); + } + + // Test 4: Batch size configuration + let scan = fixture + .table + .incremental_scan(1, 2) + .with_batch_size(Some(3)) // Small batch size to test batching + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()); + + for batch in append_batches { + // Each batch should have at most 3 rows (except possibly the last) + assert!( + batch.num_rows() <= 3, + "Batch size should be <= 3 as configured" + ); + } + + // Test 5: Combining column projection and batch size + let scan = fixture + .table + .incremental_scan(1, 4) + .select(vec!["n"]) + .with_batch_size(Some(4)) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()); + + for batch in append_batches { + assert_eq!(batch.schema().fields().len(), 1, "Should project only 'n'"); + assert!(batch.num_rows() <= 4, "Batch size should be <= 4"); + } + + // Test 6: Verify actual data with column projection + let scan = fixture + .table + .incremental_scan(1, 2) + .select(vec!["n"]) + .build() + .unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let append_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Append) + .map(|(_, b)| b.clone()) + .collect(); + + if !append_batches.is_empty() { + use arrow_select::concat::concat_batches; + let combined_batch = + concat_batches(&append_batches[0].schema(), append_batches.iter()).unwrap(); + + let n_array = combined_batch + .column(0) + .as_primitive::(); + + let mut n_values: Vec = (0..combined_batch.num_rows()) + .map(|i| n_array.value(i)) + .collect(); + n_values.sort(); + + assert_eq!( + n_values, + vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "Should have all 10 n values from snapshot 2" + ); + } + + // Test 7: Delete batches always have the same schema. 
+ let scan = fixture.table.incremental_scan(2, 3).build().unwrap(); + + let stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = stream.try_collect().await.unwrap(); + + let delete_batches: Vec<_> = batches + .iter() + .filter(|(t, _)| *t == crate::arrow::IncrementalBatchType::Delete) + .map(|(_, b)| b.clone()) + .collect(); + + if !delete_batches.is_empty() { + for batch in &delete_batches { + // Delete batches should have "pos" and "_file" columns + assert!( + batch.schema().fields().len() == 2, + "Delete batch should have exactly position and file columns" + ); + assert_eq!( + batch.num_rows(), + 3, + "Should have 3 deleted positions from snapshot 3" + ); + } + } +} + +#[tokio::test] +async fn test_incremental_scan_with_deleted_files_errors() { + // This test verifies that incremental scans properly error out when entire data files + // are deleted (overwrite operation), since this is not yet supported. + // + // Test scenario: + // Snapshot 1: Add file-1.parquet with data + // Snapshot 2: Add file-2.parquet with data + // Snapshot 3: Overwrite - delete file-1.parquet entirely + // Snapshot 4: Add file-3.parquet with data + // + // Incremental scan from snapshot 1 to snapshot 3 should error because file-1 + // was completely removed in the overwrite operation. + + let fixture = IncrementalTestFixture::new(vec![ + // Snapshot 1: Add file-1 with rows + Operation::Add( + vec![ + (1, "a".to_string()), + (2, "b".to_string()), + (3, "c".to_string()), + ], + "file-1.parquet".to_string(), + ), + // Snapshot 2: Add file-2 with rows + Operation::Add( + vec![(10, "x".to_string()), (20, "y".to_string())], + "file-2.parquet".to_string(), + ), + // Snapshot 3: Overwrite - delete file-1 entirely + Operation::Overwrite( + (vec![], "".to_string()), // No new data to add + vec![], // No positional deletes + vec!["file-1.parquet".to_string()], // Delete file-1 completely + ), + // Snapshot 4: Add file-3 (to have more snapshots) + Operation::Add(vec![(100, "p".to_string())], "file-3.parquet".to_string()), + ]) + .await; + + // Test 1: Incremental scan from snapshot 1 to 3 should error when building the stream + // because file-1 was deleted entirely in snapshot 3 + let scan = fixture + .table + .incremental_scan(1, 3) + .build() + .expect("Building the scan should succeed"); + + let stream_result = scan.to_arrow().await; + + match stream_result { + Err(error) => { + assert_eq!( + error.message(), + "Processing deleted data files is not supported yet in incremental scans", + "Error message should indicate that deleted files are not supported. Got: {}", + error + ); + } + Ok(_) => panic!( + "Expected error when building stream over a snapshot that deletes entire data files" + ), + } + + // Test 2: Incremental scan from snapshot 2 to 4 should also error + // because it includes snapshot 3 which deletes a file + let scan = fixture + .table + .incremental_scan(2, 4) + .build() + .expect("Building the scan should succeed"); + + let stream_result = scan.to_arrow().await; + + match stream_result { + Err(_) => { + // Expected error + } + Ok(_) => panic!("Expected error when scan range includes a snapshot that deletes files"), + } + + // Test 3: Incremental scan from snapshot 1 to 2 should work fine + // (no files deleted) + let scan = fixture + .table + .incremental_scan(1, 2) + .build() + .expect("Building the scan should succeed"); + + let stream_result = scan.to_arrow().await; + + assert!( + stream_result.is_ok(), + "Scan should succeed when no files are deleted. 
Error: {:?}", + stream_result.err() + ); + + // Test 4: Incremental scan from snapshot 3 to 4 should work + // (starting from after the deletion) + let scan = fixture + .table + .incremental_scan(3, 4) + .build() + .expect("Building the scan should succeed"); + + let stream_result = scan.to_arrow().await; + + assert!( + stream_result.is_ok(), + "Scan should succeed when starting after the file deletion. Error: {:?}", + stream_result.err() + ); +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 3d14b3cce4..682691dc49 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -21,6 +21,9 @@ mod cache; use cache::*; mod context; use context::*; + +pub mod incremental; + mod task; use std::sync::Arc; diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index d4e696ce84..1c8b43ca04 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -24,6 +24,7 @@ use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; use crate::scan::TableScanBuilder; +use crate::scan::incremental::IncrementalTableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -224,6 +225,15 @@ impl Table { TableScanBuilder::new(self) } + /// Creates an incremental table scan between two snapshots. + pub fn incremental_scan( + &self, + from_snapshot_id: i64, + to_snapshot_id: i64, + ) -> IncrementalTableScanBuilder<'_> { + IncrementalTableScanBuilder::new(self, from_snapshot_id, to_snapshot_id) + } + /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. pub fn inspect(&self) -> MetadataTable<'_> { diff --git a/crates/iceberg/src/util/mod.rs b/crates/iceberg/src/util/mod.rs new file mode 100644 index 0000000000..b614c981ec --- /dev/null +++ b/crates/iceberg/src/util/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Utilities for working with snapshots. +pub mod snapshot; diff --git a/crates/iceberg/src/util/snapshot.rs b/crates/iceberg/src/util/snapshot.rs new file mode 100644 index 0000000000..513e23c110 --- /dev/null +++ b/crates/iceberg/src/util/snapshot.rs @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Taken from https://github.com/apache/iceberg-rust/pull/1470
+
+use crate::spec::{SnapshotRef, TableMetadataRef};
+
+struct Ancestors {
+    next: Option<SnapshotRef>,
+    get_snapshot: Box<dyn Fn(i64) -> Option<SnapshotRef> + Send>,
+}
+
+impl Iterator for Ancestors {
+    type Item = SnapshotRef;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let snapshot = self.next.take()?;
+        let result = snapshot.clone();
+        self.next = snapshot
+            .parent_snapshot_id()
+            .and_then(|id| (self.get_snapshot)(id));
+        Some(result)
+    }
+}
+
+/// Iterate starting from `snapshot` (inclusive) to the root snapshot.
+pub fn ancestors_of(
+    table_metadata: &TableMetadataRef,
+    snapshot: i64,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    if let Some(snapshot) = table_metadata.snapshot_by_id(snapshot) {
+        let table_metadata = table_metadata.clone();
+        Box::new(Ancestors {
+            next: Some(snapshot.clone()),
+            get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()),
+        })
+    } else {
+        Box::new(std::iter::empty())
+    }
+}
+
+/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive).
+pub fn ancestors_between(
+    table_metadata: &TableMetadataRef,
+    latest_snapshot_id: i64,
+    oldest_snapshot_id: Option<i64>,
+) -> Box<dyn Iterator<Item = SnapshotRef> + Send> {
+    let Some(oldest_snapshot_id) = oldest_snapshot_id else {
+        return Box::new(ancestors_of(table_metadata, latest_snapshot_id));
+    };
+
+    if latest_snapshot_id == oldest_snapshot_id {
+        return Box::new(std::iter::empty());
+    }
+
+    Box::new(
+        ancestors_of(table_metadata, latest_snapshot_id)
+            .take_while(move |snapshot| snapshot.snapshot_id() != oldest_snapshot_id),
+    )
+}
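
Reviewer note: to show how the pieces added in this diff compose end to end, here is a minimal usage sketch of the new incremental scan API, not part of the patch itself. It assumes the crate-external paths `iceberg::table::Table` and `iceberg::arrow::IncrementalBatchType` re-export the items exercised by the tests above, and the `net_changes` helper name is purely illustrative.

use futures::TryStreamExt;
use iceberg::arrow::IncrementalBatchType;
use iceberg::table::Table;

/// Print the net appended rows and deleted positions between two snapshots.
async fn net_changes(
    table: &Table,
    from_snapshot_id: i64,
    to_snapshot_id: i64,
) -> iceberg::Result<()> {
    let scan = table
        .incremental_scan(from_snapshot_id, to_snapshot_id)
        .select(vec!["n", "data"])   // optional column projection
        .with_batch_size(Some(1024)) // optional batch size
        .build()?;

    // `to_arrow` yields a stream of (IncrementalBatchType, RecordBatch) pairs.
    let mut batches = scan.to_arrow().await?;
    while let Some((batch_type, batch)) = batches.try_next().await? {
        match batch_type {
            IncrementalBatchType::Append => println!("appended {} rows", batch.num_rows()),
            // Delete batches carry the deleted positions and the source data file path.
            IncrementalBatchType::Delete => println!("deleted {} positions", batch.num_rows()),
        }
    }
    Ok(())
}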