Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions parquet/benches/arrow_reader_clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,15 +716,15 @@ impl ReadTest {
};

// setup the reader
let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
parquet_file,
self.arrow_reader_metadata.clone(),
)
.with_batch_size(8192)
.with_projection(self.projection_mask.clone())
.with_row_filter(self.row_filter())
.build()
.unwrap();
.with_scatter_threshold(Some(0.01));
let mut stream = builder.build().unwrap();

// run the stream to its end
let mut row_count = 0;
Expand All @@ -747,15 +747,15 @@ impl ReadTest {
let reader = ParquetObjectReader::new(store, location);

// setup the reader
let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata(
let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
reader,
self.arrow_reader_metadata.clone(),
)
.with_batch_size(8192)
.with_projection(self.projection_mask.clone())
.with_row_filter(self.row_filter())
.build()
.unwrap();
.with_scatter_threshold(Some(0.01));
let mut stream = builder.build().unwrap();

// run the stream to its end
let mut row_count = 0;
Expand All @@ -774,15 +774,15 @@ impl ReadTest {
};

// setup the reader
let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(
let builder = ParquetRecordBatchReaderBuilder::new_with_metadata(
parquet_file,
self.arrow_reader_metadata.clone(),
)
.with_batch_size(8192)
.with_projection(self.projection_mask.clone())
.with_row_filter(self.row_filter())
.build()
.unwrap();
.with_scatter_threshold(Some(0.01));
let reader = builder.build().unwrap();

// run the stream to its end
let mut row_count = 0;
Expand Down
42 changes: 40 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) metrics: ArrowReaderMetrics,

pub(crate) max_predicate_cache_size: usize,

pub(crate) scatter_threshold: Option<f64>,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
Expand All @@ -157,6 +159,7 @@ impl<T: Debug> Debug for ArrowReaderBuilder<T> {
.field("limit", &self.limit)
.field("offset", &self.offset)
.field("metrics", &self.metrics)
.field("scatter_threshold", &self.scatter_threshold)
.finish()
}
}
Expand All @@ -178,6 +181,7 @@ impl<T> ArrowReaderBuilder<T> {
offset: None,
metrics: ArrowReaderMetrics::Disabled,
max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size
scatter_threshold: None,
}
}

Expand Down Expand Up @@ -430,6 +434,32 @@ impl<T> ArrowReaderBuilder<T> {
..self
}
}

/// Set a scatter threshold for filter deferral.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this even more now! This is now dealing with selector density, something that would be hard to do on the datafusion side and is correlated but not equivalent to overall filter selectivity, which I think is what probably ends up mattering more for coarser IO patterns and such given object store range coalescing, pages, etc.

Copy link
Contributor Author

@Dandandan Dandandan Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I also think this is quite a bit better - I think it's OK if a filter "only" selects 50% of the rows, if it is nicely packed that will probably be able to skip almost 50% of the pages and lead to efficient IO and decoding.

But (especially when combining multiple filters), chances are the filter gets too fragmented: a 50% selective but densely packed filter is better than a 25% selective one that just alternates select 1 / skip 1 / select 1, which will do the same amount of IO while having horrible decoding performance.

I think you could even go further and check which combination of filters gives the best of both worlds (selectivity and fragmentation)

/// Sets the maximum allowed **selector density** (`selector_count /
/// row_count`) before a predicate's result is deferred.
///
/// If applying a predicate would produce a selector density above this
/// value, its result is deferred instead of being applied immediately.
/// For example, `0.25` allows at most 25 selectors per 100 rows.
///
/// A high selector density means many small skip/read transitions, which
/// slows subsequent predicate evaluation and data decoding. Deferring
/// scattering predicates keeps the selection contiguous for the
/// intermediate steps.
///
/// Deferred results are still applied at the end via
/// [`RowSelection::intersection`], so correctness is preserved.
///
/// Passing `None` disables deferral (the default).
///
/// [`RowFilter`]: crate::arrow::arrow_reader::RowFilter
/// [`RowSelection::intersection`]: crate::arrow::arrow_reader::RowSelection::intersection
pub fn with_scatter_threshold(mut self, threshold: Option<f64>) -> Self {
    self.scatter_threshold = threshold;
    self
}
}

/// Options that control how [`ParquetMetaData`] is read when constructing
Expand Down Expand Up @@ -1188,6 +1218,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
metrics,
// Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000
max_predicate_cache_size: _,
scatter_threshold,
} = self;

// Try to avoid allocate large buffer
Expand All @@ -1203,7 +1234,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {

let mut plan_builder = ReadPlanBuilder::new(batch_size)
.with_selection(selection)
.with_row_selection_policy(row_selection_policy);
.with_row_selection_policy(row_selection_policy)
.with_scatter_threshold(scatter_threshold);

// Update selection based on any filters
if let Some(filter) = filter.as_mut() {
Expand All @@ -1217,7 +1249,13 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
.with_parquet_metadata(&reader.metadata)
.build_array_reader(fields.as_deref(), predicate.projection())?;

plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
let row_count: usize = reader
.row_groups
.iter()
.map(|&i| reader.metadata.row_group(i).num_rows() as usize)
.sum();
plan_builder =
plan_builder.with_predicate(array_reader, predicate.as_mut(), row_count)?;
}
}

Expand Down
Loading
Loading