Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
aab78d6
Add REE file column helpers
gbrgr Nov 4, 2025
ee21cab
Add helper tests
gbrgr Nov 4, 2025
37b52e2
Add constants
gbrgr Nov 4, 2025
44463a0
Add support for _file constant
gbrgr Nov 4, 2025
b5449f6
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 4, 2025
e034009
Update tests
gbrgr Nov 4, 2025
4f0a4f1
Fix clippy warning
gbrgr Nov 4, 2025
51f76d3
Fix doc test
gbrgr Nov 4, 2025
d84e16b
Track in field ids
gbrgr Nov 4, 2025
984dacd
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 4, 2025
bd478cb
Add test
gbrgr Nov 4, 2025
8593db0
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 4, 2025
9b186c7
Allow repeated virtual file column selection
gbrgr Nov 4, 2025
30ae5fb
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 5, 2025
adf0da0
Refactor into own transformer step
gbrgr Nov 7, 2025
f4336a8
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 7, 2025
ef3a965
Revert "Refactor into own transformer step"
gbrgr Nov 7, 2025
534490b
Avoid special casing in batch creation
gbrgr Nov 7, 2025
04bf463
.
gbrgr Nov 7, 2025
9e88edf
Modify record batch transformer to support reserved fields
gbrgr Nov 12, 2025
060b45d
Add metadata column helper functions
gbrgr Nov 12, 2025
8572dae
Store fields instead of constants
gbrgr Nov 12, 2025
f273add
Add comment
gbrgr Nov 12, 2025
5aa92ae
Adapt comment
gbrgr Nov 12, 2025
c05b886
.
gbrgr Nov 12, 2025
33bb0ad
Adapt error message
gbrgr Nov 12, 2025
42167ff
Consider field_id range
gbrgr Nov 12, 2025
cbc6b17
Merge remote-tracking branch 'upstream/main' into feature/gb/file-column
gbrgr Nov 13, 2025
977c813
Merge remote-tracking branch 'upstream/main' into feature/gb/file-column
gbrgr Nov 13, 2025
83443aa
Use REE encoding in record batch transformer
gbrgr Nov 14, 2025
35aba12
Fix clippy errors
gbrgr Nov 14, 2025
830e462
Format
gbrgr Nov 14, 2025
4eb8a63
Add `with_file_path_column` helper
gbrgr Nov 14, 2025
9d41b7f
Merge branch 'main' into feature/gb/file-column
gbrgr Nov 17, 2025
f3573e9
Merge branch 'feature/gb/file-column' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
73777bb
Merge branch 'main' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
22fcdd4
Port _file path column changes
gbrgr Nov 17, 2025
0b8f15b
Rename field
gbrgr Nov 17, 2025
7ce462e
Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb…
gbrgr Nov 17, 2025
6cc22ec
Merge branch 'feature/gb/file-column' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
88e1113
Adapt metadata column
gbrgr Nov 17, 2025
fca14bd
Rename method
gbrgr Nov 17, 2025
b7da6d3
Undo some changes
gbrgr Nov 17, 2025
7ebdf87
.
gbrgr Nov 17, 2025
edbc72a
Re-refactor tests
gbrgr Nov 17, 2025
4a08ee6
Undo reader test changes
gbrgr Nov 17, 2025
671fd4f
.
gbrgr Nov 17, 2025
b4d8f81
Merge branch 'feature/gb/file-column' into feature/gb/file-column-inc
gbrgr Nov 17, 2025
facb89a
Merge branch 'main' into feature/gb/file-column-inc
gbrgr Nov 18, 2025
8ed457f
Move import
gbrgr Nov 18, 2025
4cde4fa
PR comments
gbrgr Nov 18, 2025
0ea00bc
Merge branch 'feature/gb/file-column-inc' of github.com:RelationalAI/…
gbrgr Nov 18, 2025
d4cf3fe
PR comments
gbrgr Nov 19, 2025
1257d72
.
gbrgr Nov 19, 2025
c0db19f
.
gbrgr Nov 19, 2025
a4c8425
Clippy fix
gbrgr Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 18 additions & 41 deletions crates/iceberg/src/arrow/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;

use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use arrow_schema::Schema as ArrowSchema;
use futures::channel::mpsc::channel;
use futures::stream::select;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::{
ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_FILE_PATH,
RESERVED_FIELD_ID_POS, StreamsInto,
};
use crate::arrow::{ArrowReader, StreamsInto};
use crate::delete_vector::DeleteVector;
use crate::io::FileIO;
use crate::runtime::spawn;
Expand All @@ -43,19 +38,6 @@ use crate::{Error, ErrorKind, Result};
/// Default batch size for incremental delete operations.
const DEFAULT_BATCH_SIZE: usize = 1024;

/// Creates the schema for positional delete records containing the "pos" column.
/// The pos field includes the reserved field ID as metadata.
fn create_pos_delete_schema() -> Arc<ArrowSchema> {
let pos_field =
Field::new(RESERVED_COL_NAME_POS, DataType::UInt64, false).with_metadata(HashMap::from([
(
PARQUET_FIELD_ID_META_KEY.to_string(),
RESERVED_FIELD_ID_POS.to_string(),
),
]));
Arc::new(ArrowSchema::new(vec![pos_field]))
}

/// The type of incremental batch: appended data or deleted records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IncrementalBatchType {
Expand Down Expand Up @@ -254,10 +236,15 @@ async fn process_incremental_append_task(
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, and virtual field addition (like _file)
let mut record_batch_transformer =
RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids).build();
RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids)
.with_constant(
crate::metadata_columns::RESERVED_FIELD_ID_FILE,
crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()),
)?
.build();

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
Expand Down Expand Up @@ -295,7 +282,9 @@ fn process_incremental_delete_task(
delete_vector: DeleteVector,
batch_size: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
let schema = create_pos_delete_schema();
let schema = Arc::new(ArrowSchema::new(vec![Arc::clone(
crate::metadata_columns::pos_field(),
)]));

let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

Expand All @@ -315,14 +304,7 @@ fn process_incremental_delete_task(
"Failed to create RecordBatch for DeleteVector",
)
})
.and_then(|batch| {
ArrowReader::add_file_path_column(
batch,
&file_path,
RESERVED_COL_NAME_FILE_PATH,
RESERVED_FIELD_ID_FILE_PATH,
)
})
.and_then(|batch| ArrowReader::add_file_path_column(batch, &file_path))
});

Ok(Box::pin(stream) as ArrowRecordBatchStream)
Expand All @@ -333,7 +315,9 @@ fn process_incremental_deleted_file_task(
total_records: u64,
batch_size: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
let schema = create_pos_delete_schema();
let schema = Arc::new(ArrowSchema::new(vec![Arc::clone(
crate::metadata_columns::pos_field(),
)]));

let batch_size = batch_size.unwrap_or(DEFAULT_BATCH_SIZE);

Expand All @@ -352,14 +336,7 @@ fn process_incremental_deleted_file_task(
"Failed to create RecordBatch for deleted file",
)
})
.and_then(|batch| {
ArrowReader::add_file_path_column(
batch,
&file_path,
RESERVED_COL_NAME_FILE_PATH,
RESERVED_FIELD_ID_FILE_PATH,
)
})
.and_then(|batch| ArrowReader::add_file_path_column(batch, &file_path))
});

Ok(Box::pin(stream) as ArrowRecordBatchStream)
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ mod incremental;
pub use incremental::*;
pub use reader::*;
pub use value::*;

// Re-export delete file constants for convenience
/// Partition value calculator for computing partition values
pub mod partition_value_calculator;
pub use partition_value_calculator::*;
Expand Down
Loading
Loading