diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 039829f1b975..cf0491a19860 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -716,15 +716,15 @@ impl ReadTest { }; // setup the reader - let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata( + let builder = ParquetRecordBatchStreamBuilder::new_with_metadata( parquet_file, self.arrow_reader_metadata.clone(), ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .build() - .unwrap(); + .with_scatter_threshold(Some(0.01)); + let mut stream = builder.build().unwrap(); // run the stream to its end let mut row_count = 0; @@ -747,15 +747,15 @@ impl ReadTest { let reader = ParquetObjectReader::new(store, location); // setup the reader - let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata( + let builder = ParquetRecordBatchStreamBuilder::new_with_metadata( reader, self.arrow_reader_metadata.clone(), ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .build() - .unwrap(); + .with_scatter_threshold(Some(0.01)); + let mut stream = builder.build().unwrap(); // run the stream to its end let mut row_count = 0; @@ -774,15 +774,15 @@ impl ReadTest { }; // setup the reader - let reader = ParquetRecordBatchReaderBuilder::new_with_metadata( + let builder = ParquetRecordBatchReaderBuilder::new_with_metadata( parquet_file, self.arrow_reader_metadata.clone(), ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .build() - .unwrap(); + .with_scatter_threshold(Some(0.01)); + let reader = builder.build().unwrap(); // run the stream to its end let mut row_count = 0; diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1b02c4ae25d3..1e12e4141d60 100644 --- 
a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -139,6 +139,8 @@ pub struct ArrowReaderBuilder<T> { pub(crate) metrics: ArrowReaderMetrics, pub(crate) max_predicate_cache_size: usize, + + pub(crate) scatter_threshold: Option<f64>, } impl<T> Debug for ArrowReaderBuilder<T> { @@ -157,6 +159,7 @@ impl<T> Debug for ArrowReaderBuilder<T> { .field("limit", &self.limit) .field("offset", &self.offset) .field("metrics", &self.metrics) + .field("scatter_threshold", &self.scatter_threshold) .finish() } } @@ -178,6 +181,7 @@ impl<T> ArrowReaderBuilder<T> { offset: None, metrics: ArrowReaderMetrics::Disabled, max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size + scatter_threshold: None, } } @@ -430,6 +434,32 @@ impl<T> ArrowReaderBuilder<T> { ..self } } + + /// Set a scatter threshold for filter deferral. + /// + /// The threshold is the maximum allowed **selector density** + /// (`selector_count / row_count`). If applying a predicate would + /// produce a density above this value, its result is deferred. + /// For example, `0.25` allows at most 25 selectors per 100 rows. + /// + /// A high selector density means many small skip/read transitions, + /// which slows subsequent predicate evaluation and data decoding. + /// Deferring scattering predicates keeps the selection contiguous + /// for intermediate steps. + /// + /// The deferred results are applied at the end via + /// [`RowSelection::intersection`], so correctness is preserved. + /// + /// `None` disables deferral (the default). 
+ /// + /// [`RowFilter`]: crate::arrow::arrow_reader::RowFilter + /// [`RowSelection::intersection`]: crate::arrow::arrow_reader::RowSelection::intersection + pub fn with_scatter_threshold(self, threshold: Option<f64>) -> Self { + Self { + scatter_threshold: threshold, + ..self + } + } } /// Options that control how [`ParquetMetaData`] is read when constructing @@ -1188,6 +1218,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> { metrics, // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000 max_predicate_cache_size: _, + scatter_threshold, } = self; // Try to avoid allocate large buffer @@ -1203,7 +1234,8 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> { let mut plan_builder = ReadPlanBuilder::new(batch_size) .with_selection(selection) - .with_row_selection_policy(row_selection_policy); + .with_row_selection_policy(row_selection_policy) + .with_scatter_threshold(scatter_threshold); // Update selection based on any filters if let Some(filter) = filter.as_mut() { @@ -1217,7 +1249,13 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> { .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), predicate.projection())?; - plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; + let row_count: usize = reader + .row_groups + .iter() + .map(|&i| reader.metadata.row_group(i).num_rows() as usize) + .sum(); + plan_builder = + plan_builder.with_predicate(array_reader, predicate.as_mut(), row_count)?; } } diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs index 99ffe0febc95..7d49d9d30350 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -37,6 +37,18 @@ pub struct ReadPlanBuilder { selection: Option<RowSelection>, /// Policy to use when materializing the row selection row_selection_policy: RowSelectionPolicy, + /// Maximum allowed selector density (selectors / rows) for applying a + /// predicate result. 
+ /// + /// When set, if applying a predicate would produce a selector density + /// above this threshold, the result is deferred into `deferred_selection` + /// instead. For example, `0.25` means at most 25 selectors per 100 rows; + /// anything more fragmented gets deferred. + /// + /// `None` disables deferral (all predicates applied immediately). + scatter_threshold: Option<f64>, + /// Accumulated deferred selections, merged via `intersection` at build time. + deferred_selection: Option<RowSelection>, } impl ReadPlanBuilder { @@ -46,6 +58,8 @@ impl ReadPlanBuilder { batch_size, selection: None, row_selection_policy: RowSelectionPolicy::default(), + scatter_threshold: None, + deferred_selection: None, } } @@ -68,6 +82,27 @@ impl ReadPlanBuilder { &self.row_selection_policy } + /// Set the scatter threshold for filter deferral. + /// + /// The threshold is the maximum allowed **selector density** + /// (`selector_count / row_count`). If applying a predicate would produce + /// a density above this value, its result is deferred. For example, + /// `0.25` allows at most 25 selectors per 100 rows. + /// + /// A high selector density means many small skip/read transitions, + /// which is expensive for subsequent predicate evaluation and data + /// decoding. Deferring scattering predicates keeps the selection + /// contiguous for intermediate steps. + /// + /// The deferred results are merged via [`RowSelection::intersection`] at + /// build time, so correctness is preserved. + /// + /// `None` disables deferral (all predicates applied immediately). 
+ pub fn with_scatter_threshold(mut self, threshold: Option<f64>) -> Self { + self.scatter_threshold = threshold; + self + } + /// Returns the current selection, if any pub fn selection(&self) -> Option<&RowSelection> { self.selection.as_ref() } @@ -147,8 +182,19 @@ impl ReadPlanBuilder { mut self, array_reader: Box<dyn ArrayReader>, predicate: &mut dyn ArrowPredicate, + row_count: usize, ) -> Result<Self> { - let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build()); + // Build a ReadPlan for the predicate reader using only the current + // selection (not deferred). We avoid cloning the entire builder by + // constructing a minimal ReadPlanBuilder with just what build() needs. + let plan_for_reader = ReadPlanBuilder { + batch_size: self.batch_size, + selection: self.selection.clone(), + row_selection_policy: self.row_selection_policy, + scatter_threshold: None, + deferred_selection: None, + }; + let reader = ParquetRecordBatchReader::new(array_reader, plan_for_reader.build()); let mut filters = vec![]; for maybe_batch in reader { let maybe_batch = maybe_batch?; @@ -175,15 +221,54 @@ impl ReadPlanBuilder { return Ok(self); } let raw = RowSelection::from_filters(&filters); - self.selection = match self.selection.take() { - Some(selection) => Some(selection.and_then(&raw)), - None => Some(raw), + + // Compute the absolute-position result + let absolute = match self.selection.as_ref() { + Some(selection) => selection.and_then(&raw), + None => raw, }; + + // Check if applying this predicate would scatter the selection too much. + // Measure selector density: selectors / total_rows. A high density means + // many small skip/read transitions per row, which is expensive for decoding. + // Only defer if the predicate actually increases fragmentation — if it + // reduces the selector count, always apply it. 
+ let current_selectors = self.selection.as_ref().map_or(0, |s| s.selector_count()); + let should_defer = self.scatter_threshold.is_some_and(|threshold| { + row_count > 0 + && absolute.selector_count() > current_selectors + && absolute.selector_count() as f64 / row_count as f64 > threshold + }); + + if should_defer { + // Defer: accumulate into deferred_selection, leave self.selection unchanged + self.deferred_selection = Some(match self.deferred_selection.take() { + Some(existing) => existing.intersection(&absolute), + None => absolute, + }); + } else { + // Apply normally + self.selection = Some(absolute); + } + Ok(self) } + /// Merge any deferred selection into the main selection. + fn merge_deferred(&mut self) { + if let Some(deferred) = self.deferred_selection.take() { + self.selection = Some(match self.selection.take() { + Some(selection) => selection.intersection(&deferred), + None => deferred, + }); + } + } + /// Create a final `ReadPlan` the read plan for the scan pub fn build(mut self) -> ReadPlan { + // Merge any deferred selection before finalizing + self.merge_deferred(); + // If selection is empty, truncate if !self.selects_any() { self.selection = Some(RowSelection::from(vec![])); @@ -196,6 +281,8 @@ impl ReadPlanBuilder { batch_size, selection, row_selection_policy: _, + scatter_threshold: _, + deferred_selection: _, } = self; let selection = selection.map(|s| s.trim()); @@ -268,6 +355,10 @@ impl LimitedReadPlanBuilder { limit, } = self; + // Merge deferred selection before applying offset/limit so that + // offset and limit operate on the correctly filtered row set. 
+ inner.merge_deferred(); + // If the selection is empty, truncate if !inner.selects_any() { inner.selection = Some(RowSelection::from(vec![])); } @@ -367,4 +458,103 @@ mod tests { RowSelectionStrategy::Selectors ); } + + /// Helper to build a `ReadPlanBuilder` with deferred selection set directly + fn builder_with_deferred( + selection: Option<RowSelection>, + deferred: RowSelection, + ) -> ReadPlanBuilder { + ReadPlanBuilder { + batch_size: 1024, + selection, + row_selection_policy: RowSelectionPolicy::default(), + scatter_threshold: None, + deferred_selection: Some(deferred), + } + } + + #[test] + fn test_merge_deferred_no_prior_selection() { + // Deferred selection with no main selection: result = deferred + let deferred = RowSelection::from(vec![RowSelector::select(90), RowSelector::skip(10)]); + let mut builder = builder_with_deferred(None, deferred); + builder.merge_deferred(); + let sel = builder.selection.unwrap(); + assert_eq!(sel.row_count(), 90); + assert_eq!(sel.skipped_row_count(), 10); + assert!(builder.deferred_selection.is_none()); + } + + #[test] + fn test_merge_deferred_with_prior_selection() { + // Main selects first 50, deferred selects rows 0..40 and 50..100 + // Intersection should select rows 0..40 (first 40 of 100) + let main_sel = RowSelection::from(vec![RowSelector::select(50), RowSelector::skip(50)]); + let deferred = RowSelection::from(vec![ + RowSelector::select(40), + RowSelector::skip(10), + RowSelector::select(50), + ]); + let mut builder = builder_with_deferred(Some(main_sel), deferred); + builder.merge_deferred(); + let sel = builder.selection.unwrap(); + assert_eq!(sel.row_count(), 40); + } + + #[test] + fn test_merge_deferred_in_build() { + // Verify that build() merges deferred before creating the ReadPlan + let deferred = RowSelection::from(vec![RowSelector::select(80), RowSelector::skip(20)]); + let builder = builder_with_deferred(None, deferred); + // build() should merge and produce a plan that selects 80 rows + let _plan = 
builder.build(); + // If it didn't panic, the merge worked (selection was properly set) + } + + #[test] + fn test_merge_deferred_in_build_limited() { + // Verify that build_limited() merges deferred before applying offset/limit + let deferred = RowSelection::from(vec![RowSelector::select(80), RowSelector::skip(20)]); + let builder = builder_with_deferred(None, deferred); + let limited = builder.limited(100).with_limit(Some(50)).build_limited(); + let sel = limited.selection.unwrap(); + // After merge: 80 selected, 20 skipped. After limit(50): 50 selected. + assert_eq!(sel.row_count(), 50); + } + + #[test] + fn test_scatter_threshold_setter() { + let builder = ReadPlanBuilder::new(1024); + assert!(builder.scatter_threshold.is_none()); + assert!(builder.deferred_selection.is_none()); + + let builder = builder.with_scatter_threshold(Some(0.9)); + assert_eq!(builder.scatter_threshold, Some(0.9)); + } + + #[test] + fn test_no_deferred_when_threshold_disabled() { + // Without threshold, deferred_selection should always remain None + let builder = ReadPlanBuilder::new(1024); + assert!(builder.scatter_threshold.is_none()); + assert!(builder.deferred_selection.is_none()); + } + + #[test] + fn test_multiple_deferred_selections_intersected() { + // Two deferred selections should be intersected + let deferred1 = RowSelection::from(vec![RowSelector::select(80), RowSelector::skip(20)]); + let deferred2 = RowSelection::from(vec![ + RowSelector::skip(10), + RowSelector::select(70), + RowSelector::skip(20), + ]); + // intersection: only rows 10..80 (70 rows) + let mut builder = builder_with_deferred(None, deferred1); + builder.deferred_selection = + Some(builder.deferred_selection.unwrap().intersection(&deferred2)); + builder.merge_deferred(); + let sel = builder.selection.unwrap(); + assert_eq!(sel.row_count(), 70); + } } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2ddf812f9c39..cf040644893d 100644 --- 
a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -504,6 +504,14 @@ impl RowSelection { self.selectors.iter() } + /// Returns the number of selectors (select/skip segments) in this selection. + /// + /// A higher count indicates a more fragmented selection, which can slow + /// sequential reads due to many small skip/read transitions. + pub fn selector_count(&self) -> usize { + self.selectors.len() + } + /// Returns the number of selected rows pub fn row_count(&self) -> usize { self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum() diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 9e45a0c3168c..622d5a029b6e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -497,6 +497,7 @@ impl ParquetRecordBatchStreamBuilder { offset, metrics, max_predicate_cache_size, + scatter_threshold, } = self; // Ensure schema of ParquetRecordBatchStream respects projection, and does @@ -522,6 +523,7 @@ impl ParquetRecordBatchStreamBuilder { offset, metrics, max_predicate_cache_size, + scatter_threshold, } .build()?; diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index cdb0715edb55..80e1cdaaeacf 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -176,6 +176,7 @@ impl ParquetPushDecoderBuilder { metrics, row_selection_policy, max_predicate_cache_size, + scatter_threshold, } = self; // If no row groups were specified, read all of them @@ -197,6 +198,7 @@ impl ParquetPushDecoderBuilder { max_predicate_cache_size, buffers, row_selection_policy, + scatter_threshold, ); // Initialize the decoder with the configured options diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index d3d78ca7c263..ed74967f47a4 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ 
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -160,6 +160,9 @@ pub(crate) struct RowGroupReaderBuilder { /// Strategy for materialising row selections row_selection_policy: RowSelectionPolicy, + /// Scatter threshold for filter deferral + scatter_threshold: Option<f64>, + /// Current state of the decoder. /// /// It is taken when processing, and must be put back before returning @@ -185,6 +188,7 @@ impl RowGroupReaderBuilder { max_predicate_cache_size: usize, buffers: PushBuffers, row_selection_policy: RowSelectionPolicy, + scatter_threshold: Option<f64>, ) -> Self { Self { batch_size, @@ -197,6 +201,7 @@ impl RowGroupReaderBuilder { metrics, max_predicate_cache_size, row_selection_policy, + scatter_threshold, state: Some(RowGroupDecoderState::Finished), buffers, } } @@ -242,7 +247,8 @@ impl RowGroupReaderBuilder { } let plan_builder = ReadPlanBuilder::new(self.batch_size) .with_selection(selection) - .with_row_selection_policy(self.row_selection_policy); + .with_row_selection_policy(self.row_selection_policy) + .with_scatter_threshold(self.scatter_threshold); let row_group_info = RowGroupInfo { row_group_idx, @@ -454,8 +460,11 @@ impl RowGroupReaderBuilder { ); // `with_predicate` actually evaluates the filter - plan_builder = - plan_builder.with_predicate(array_reader, filter_info.current_mut())?; + plan_builder = plan_builder.with_predicate( + array_reader, + filter_info.current_mut(), + row_count, + )?; let row_group_info = RowGroupInfo { row_group_idx, @@ -729,6 +738,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 200); + assert_eq!(std::mem::size_of::<RowGroupDecoderState>(), 240); } }