Merged

Changes from all commits · 76 commits
fb5c737
WIP, initial draft of incremental scan
gbrgr Oct 23, 2025
0fec28d
Merge branch 'main' into gb/incremental-bootstrap
gbrgr Oct 23, 2025
f255405
.
gbrgr Oct 23, 2025
be932c8
.
gbrgr Oct 23, 2025
a378abe
cargo fmt
gbrgr Oct 23, 2025
317b5c6
Implement unzipped stream
gbrgr Oct 23, 2025
06694a8
Remove printlns
gbrgr Oct 23, 2025
4514695
Add API method for unzipped stream
gbrgr Oct 23, 2025
f56e068
.
gbrgr Oct 23, 2025
4125328
Remove comment
gbrgr Oct 23, 2025
9fa6360
Rename var
gbrgr Oct 23, 2025
14dd4e9
Add import
gbrgr Oct 23, 2025
837947a
Measure time
gbrgr Oct 23, 2025
5a7a223
Fix typo
gbrgr Oct 23, 2025
065c754
Undo some changes
gbrgr Oct 23, 2025
f9ea05c
Change type name
gbrgr Oct 23, 2025
06cba66
Add comment header
gbrgr Oct 24, 2025
d3d57e3
Merge branch 'main' into gb/incremental-bootstrap
gbrgr Oct 24, 2025
2ddde41
Fail when encountering equality deletes
gbrgr Oct 24, 2025
facbeeb
Add comments
gbrgr Oct 24, 2025
ae76630
Add some preliminary tests
gbrgr Oct 25, 2025
2780634
Format
gbrgr Oct 25, 2025
aeeab92
Merge branch 'main' into gb/incremental-bootstrap
gbrgr Oct 25, 2025
536bbc4
Remove playground
gbrgr Oct 25, 2025
1f35b46
Merge branch 'gb/incremental-bootstrap' of github.com:RelationalAI/ic…
gbrgr Oct 25, 2025
5ac1887
Add more tests
gbrgr Oct 27, 2025
1f72025
Clippy
gbrgr Oct 27, 2025
6d72c8c
.
gbrgr Oct 27, 2025
a024e60
.
gbrgr Oct 27, 2025
773a9d0
Adapt tests
gbrgr Oct 27, 2025
416a56d
.
gbrgr Oct 27, 2025
e06c118
Add test
gbrgr Oct 27, 2025
dc56ff7
Add tests
gbrgr Oct 27, 2025
3b2b6a0
Add tests
gbrgr Oct 27, 2025
af28705
Format
gbrgr Oct 27, 2025
feb1cfe
Add test
gbrgr Oct 27, 2025
dd1eec8
Format
gbrgr Oct 27, 2025
ba5d3b1
.
gbrgr Oct 27, 2025
d8e6bc9
Rm newline
gbrgr Oct 27, 2025
32cf060
Rename trait function
gbrgr Oct 27, 2025
3b57c88
Reuse schema
gbrgr Oct 27, 2025
07a4394
.
gbrgr Oct 27, 2025
87587d6
remove clone
gbrgr Oct 27, 2025
1e66b4b
Add test for adding file_path column
gbrgr Oct 28, 2025
b77f37f
Make `from_snapshot` mandatory
gbrgr Oct 28, 2025
2204b5c
Error out if incremental scan encounters neither Append nor Delete
gbrgr Oct 28, 2025
a281d7f
.
gbrgr Oct 28, 2025
7240249
Add materialized variant of add_file_path_column
gbrgr Oct 28, 2025
170e0cf
.
gbrgr Oct 28, 2025
c02af9e
Allow dead code
gbrgr Oct 28, 2025
299e274
Some PR comments
gbrgr Oct 28, 2025
2c77259
.
gbrgr Oct 28, 2025
70683de
More PR comments
gbrgr Oct 28, 2025
e4ad209
.
gbrgr Oct 28, 2025
279df44
Add comments
gbrgr Oct 28, 2025
630d623
Avoid cloning
gbrgr Oct 28, 2025
337c73e
Add reference to PR
gbrgr Oct 29, 2025
18ff3b3
Merge branch 'main' into gb/incremental-bootstrap
gbrgr Oct 29, 2025
e31f76e
Some PR comments
gbrgr Oct 29, 2025
02f9a81
.
gbrgr Oct 29, 2025
ec24817
format
gbrgr Oct 29, 2025
efc57e8
Allow overwrite operation for now
gbrgr Oct 29, 2025
f78fcca
Fix file_path column
gbrgr Oct 29, 2025
97efb4c
Add overwrite test
gbrgr Oct 30, 2025
055b9b0
Merge branch 'main' into gb/incremental-bootstrap
gbrgr Oct 30, 2025
3aa40cb
Unwrap delete vector
gbrgr Oct 30, 2025
584a684
.
gbrgr Oct 30, 2025
e666dfd
Merge branch 'gb/incremental-bootstrap' of github.com:RelationalAI/ic…
gbrgr Oct 30, 2025
ff8a38c
Add assertion
gbrgr Oct 30, 2025
f31bb64
Avoid cloning the mutex guard
gbrgr Oct 30, 2025
c6f5ab6
Abort when encountering a deleted delete file
gbrgr Oct 30, 2025
c0989b0
Adjust comment
gbrgr Oct 30, 2025
b95414c
Update crates/iceberg/src/arrow/reader.rs
gbrgr Oct 30, 2025
87da883
Add check
gbrgr Oct 30, 2025
4982ee2
Merge branch 'gb/incremental-bootstrap' of github.com:RelationalAI/ic…
gbrgr Oct 30, 2025
6c60b90
Update crates/iceberg/src/scan/incremental/mod.rs
vustef Oct 30, 2025
35 changes: 34 additions & 1 deletion crates/iceberg/src/arrow/delete_filter.rs
@@ -35,11 +35,22 @@ enum EqDelState {
}

#[derive(Debug, Default)]
-struct DeleteFileFilterState {
+pub(crate) struct DeleteFileFilterState {
    delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
    equality_deletes: HashMap<String, EqDelState>,
}

impl DeleteFileFilterState {
    pub fn delete_vectors(&self) -> &HashMap<String, Arc<Mutex<DeleteVector>>> {
        &self.delete_vectors
    }

    /// Remove and return the delete vector for the given data file path.
    pub fn remove_delete_vector(&mut self, path: &str) -> Option<Arc<Mutex<DeleteVector>>> {
        self.delete_vectors.remove(path)
    }
}

#[derive(Clone, Debug, Default)]
pub(crate) struct DeleteFilter {
    state: Arc<RwLock<DeleteFileFilterState>>,
@@ -65,6 +76,28 @@ impl DeleteFilter {
            .and_then(|st| st.delete_vectors.get(delete_file_path).cloned())
    }

    pub(crate) fn with_read<F, G>(&self, f: F) -> Result<G>
    where
        F: FnOnce(&DeleteFileFilterState) -> Result<G>,
    {
        let state = self.state.read().map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to acquire read lock: {}", e),
            )
        })?;
        f(&state)
    }

    pub(crate) fn with_write<F, G>(&self, f: F) -> Result<G>
    where
        F: FnOnce(&mut DeleteFileFilterState) -> Result<G>,
    {
        let mut state = self.state.write().map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to acquire write lock: {}", e),
            )
        })?;
        f(&mut state)
    }

    pub(crate) fn try_start_eq_del_load(&self, file_path: &str) -> Option<Arc<Notify>> {
        let mut state = self.state.write().unwrap();
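
For orientation, a minimal sketch (not part of the diff) of how the new closure-based helpers might be called from inside the crate. The closure scopes the `RwLock` guard, so the lock is released when the callback returns; `evict_delete_vector` is a hypothetical caller.

// Hypothetical crate-internal caller of the helpers added above.
fn evict_delete_vector(filter: &DeleteFilter, path: &str) -> Result<usize> {
    // Read access: the closure receives `&DeleteFileFilterState`.
    let loaded = filter.with_read(|state| Ok(state.delete_vectors().len()))?;

    // Write access: the closure receives `&mut DeleteFileFilterState`
    // and can, for example, drop the delete vector for one data file.
    filter.with_write(|state| {
        let _ = state.remove_delete_vector(path);
        Ok(())
    })?;

    Ok(loaded)
}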
271 changes: 271 additions & 0 deletions crates/iceberg/src/arrow/incremental.rs
@@ -0,0 +1,271 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::pin::Pin;
use std::sync::Arc;

use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use futures::channel::mpsc::channel;
use futures::stream::select;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{
    ArrowReader, RESERVED_COL_NAME_FILE_PATH, RESERVED_FIELD_ID_FILE_PATH, StreamsInto,
};
use crate::delete_vector::DeleteVector;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::scan::ArrowRecordBatchStream;
use crate::scan::incremental::{
    AppendedFileScanTask, IncrementalFileScanTask, IncrementalFileScanTaskStream,
};
use crate::{Error, ErrorKind, Result};

/// The type of incremental batch: appended data or deleted records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IncrementalBatchType {
    /// Appended records.
    Append,
    /// Deleted records.
    Delete,
}

/// The stream of incremental Arrow `RecordBatch`es with batch type.
pub type CombinedIncrementalBatchRecordStream =
    Pin<Box<dyn Stream<Item = Result<(IncrementalBatchType, RecordBatch)>> + Send + 'static>>;

/// Stream type for obtaining a separate stream of appended and deleted record batches.
pub type UnzippedIncrementalBatchRecordStream = (ArrowRecordBatchStream, ArrowRecordBatchStream);

impl StreamsInto<ArrowReader, CombinedIncrementalBatchRecordStream>
    for IncrementalFileScanTaskStream
{
    /// Takes a stream of `IncrementalFileScanTask`s and reads all the files. Returns a
    /// stream of Arrow `RecordBatch`es containing the data from the files.
    fn stream(self, reader: ArrowReader) -> Result<CombinedIncrementalBatchRecordStream> {
        let (appends, deletes) =
            StreamsInto::<ArrowReader, UnzippedIncrementalBatchRecordStream>::stream(self, reader)?;

        let left = appends.map(|res| res.map(|batch| (IncrementalBatchType::Append, batch)));
        let right = deletes.map(|res| res.map(|batch| (IncrementalBatchType::Delete, batch)));

        Ok(Box::pin(select(left, right)) as CombinedIncrementalBatchRecordStream)
    }
}

impl StreamsInto<ArrowReader, UnzippedIncrementalBatchRecordStream>
    for IncrementalFileScanTaskStream
{
    /// Takes a stream of `IncrementalFileScanTask`s and reads all the files. Returns two
    /// separate streams of Arrow `RecordBatch`es containing appended data and deleted records.
    fn stream(self, reader: ArrowReader) -> Result<UnzippedIncrementalBatchRecordStream> {
        let (appends_tx, appends_rx) = channel(reader.concurrency_limit_data_files);
        let (deletes_tx, deletes_rx) = channel(reader.concurrency_limit_data_files);

        let batch_size = reader.batch_size;
        let concurrency_limit_data_files = reader.concurrency_limit_data_files;

        spawn(async move {
            let _ = self
                .try_for_each_concurrent(concurrency_limit_data_files, |task| {
                    let file_io = reader.file_io.clone();
                    let mut appends_tx = appends_tx.clone();
                    let mut deletes_tx = deletes_tx.clone();
                    async move {
                        match task {
                            IncrementalFileScanTask::Append(append_task) => {
                                spawn(async move {
                                    let record_batch_stream = process_incremental_append_task(
                                        append_task,
                                        batch_size,
                                        file_io,
                                    )
                                    .await;

                                    match record_batch_stream {
                                        Ok(mut stream) => {
                                            while let Some(batch) = stream.next().await {
                                                let result = appends_tx
                                                    .send(batch.map_err(|e| {
                                                        Error::new(
                                                            ErrorKind::Unexpected,
                                                            "failed to read appended record batch",
                                                        )
                                                        .with_source(e)
                                                    }))
                                                    .await;

                                                if result.is_err() {
                                                    break;
                                                }
                                            }
                                        }
                                        Err(e) => {
                                            let _ = appends_tx.send(Err(e)).await;
                                        }
                                    }
                                });
                            }
                            IncrementalFileScanTask::Delete(file_path, delete_vector) => {
                                spawn(async move {
                                    let record_batch_stream = process_incremental_delete_task(
                                        file_path,
                                        delete_vector,
                                        batch_size,
                                    );

                                    match record_batch_stream {
                                        Ok(mut stream) => {
                                            while let Some(batch) = stream.next().await {
                                                let result = deletes_tx
                                                    .send(batch.map_err(|e| {
                                                        Error::new(
                                                            ErrorKind::Unexpected,
                                                            "failed to read deleted record batch",
                                                        )
                                                        .with_source(e)
                                                    }))
                                                    .await;

                                                if result.is_err() {
                                                    break;
                                                }
                                            }
                                        }
                                        Err(e) => {
                                            let _ = deletes_tx.send(Err(e)).await;
                                        }
                                    }
                                });
                            }
                        };

                        Ok(())
                    }
                })
                .await;
        });

        Ok((
            Box::pin(appends_rx) as ArrowRecordBatchStream,
            Box::pin(deletes_rx) as ArrowRecordBatchStream,
        ))
    }
}

async fn process_incremental_append_task(
    task: AppendedFileScanTask,
    batch_size: Option<usize>,
    file_io: FileIO,
) -> Result<ArrowRecordBatchStream> {
    let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder(
        &task.data_file_path,
        file_io,
        true,
    )
    .await?;

    // Create a projection mask for the batch stream to select which columns of the
    // Parquet file we want in the response.
    let projection_mask = ArrowReader::get_arrow_projection_mask(
        &task.project_field_ids,
        &task.schema_ref(),
        record_batch_stream_builder.parquet_schema(),
        record_batch_stream_builder.schema(),
    )?;
    record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

    // RecordBatchTransformer performs any transformations required on the RecordBatches
    // that come back from the file, such as type promotion, default column insertion,
    // and column re-ordering.
    let mut record_batch_transformer =
        RecordBatchTransformer::build(task.schema_ref(), &task.project_field_ids);

    if let Some(batch_size) = batch_size {
        record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
    }

    // Apply positional deletes as row selections.
    let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes {
        Some(ArrowReader::build_deletes_row_selection(
            record_batch_stream_builder.metadata().row_groups(),
            &None,
            &positional_delete_indexes.lock().unwrap(),
        )?)
    } else {
        None
    };

    if let Some(row_selection) = row_selection {
        record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection);
    }

    // Build the batch stream and send all the RecordBatches that it generates
    // to the requester.
    let record_batch_stream = record_batch_stream_builder
        .build()?
        .map(move |batch| match batch {
            Ok(batch) => record_batch_transformer.process_record_batch(batch),
            Err(err) => Err(err.into()),
        });

    Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

fn process_incremental_delete_task(
    file_path: String,
    delete_vector: DeleteVector,
    batch_size: Option<usize>,
) -> Result<ArrowRecordBatchStream> {
    let schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "pos",
        DataType::UInt64,
        false,
    )]));

    let batch_size = batch_size.unwrap_or(1024);

    let treemap = delete_vector.inner;

    let stream = futures::stream::iter(treemap)
        .chunks(batch_size)
        .map(move |chunk| {
            let array = UInt64Array::from_iter(chunk);
            RecordBatch::try_new(
                Arc::clone(&schema), // Cheap Arc clone instead of full schema creation
                vec![Arc::new(array)],
            )
            .map_err(|_| {
                Error::new(
                    ErrorKind::Unexpected,
                    "Failed to create RecordBatch for DeleteVector",
                )
            })
            .and_then(|batch| {
                ArrowReader::add_file_path_column(
                    batch,
                    &file_path,
                    RESERVED_COL_NAME_FILE_PATH,
                    RESERVED_FIELD_ID_FILE_PATH,
                )
            })
        });

    Ok(Box::pin(stream) as ArrowRecordBatchStream)
}
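
To illustrate how these pieces compose, a hedged sketch of a consumer of the combined stream; `tasks` and `reader` stand in for an incremental scan's task stream and a configured `ArrowReader`, and only the `StreamsInto` call and the `IncrementalBatchType` match come from this diff.

// Hypothetical consumer: drain the combined stream, routing batches by type.
use futures::TryStreamExt;

async fn drain_combined(tasks: IncrementalFileScanTaskStream, reader: ArrowReader) -> Result<()> {
    // Disambiguate between the two `StreamsInto` impls, as the diff itself does.
    let mut stream =
        StreamsInto::<ArrowReader, CombinedIncrementalBatchRecordStream>::stream(tasks, reader)?;

    while let Some((batch_type, batch)) = stream.try_next().await? {
        match batch_type {
            // Appended rows carry the projected table columns.
            IncrementalBatchType::Append => println!("append: {} rows", batch.num_rows()),
            // Deleted rows carry a `pos` column plus the reserved file-path column.
            IncrementalBatchType::Delete => println!("delete: {} rows", batch.num_rows()),
        }
    }
    Ok(())
}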
2 changes: 2 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
@@ -33,6 +33,8 @@ pub mod record_batch_projector;
pub(crate) mod record_batch_transformer;
mod value;

mod incremental;
pub use incremental::*;
pub use reader::*;
pub use value::*;
/// Partition value calculator for computing partition values
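
And the unzipped variant, for callers that want to process appends and deletes independently; again a sketch, with the two drain loops standing in for real application logic.

// Hypothetical consumer of the unzipped (append, delete) stream pair.
use futures::{TryStreamExt, try_join};

async fn drain_unzipped(tasks: IncrementalFileScanTaskStream, reader: ArrowReader) -> Result<()> {
    let (mut appends, mut deletes) =
        StreamsInto::<ArrowReader, UnzippedIncrementalBatchRecordStream>::stream(tasks, reader)?;

    // Drive both sides concurrently; each future completes when its stream ends.
    try_join!(
        async move {
            while let Some(_batch) = appends.try_next().await? { /* apply appended rows */ }
            Ok::<(), Error>(())
        },
        async move {
            while let Some(_batch) = deletes.try_next().await? { /* apply deleted positions */ }
            Ok::<(), Error>(())
        },
    )?;
    Ok(())
}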