From 41e7bcb543969808d14cd46516b9af8efb8bea23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Feb 2026 16:51:49 +0100 Subject: [PATCH 01/15] Use per-predicate projection masks in ClickBench benchmark Previously, every predicate in the RowFilter received the same ProjectionMask containing ALL filter columns. This caused unnecessary decoding of expensive string columns when evaluating cheap integer predicates. Now each predicate receives a mask with only the single column it needs. Key sync improvements (vs baseline): - Q37: 63.7ms -> 7.3ms (-88.6%, Title LIKE with CounterID=62 filter) - Q36: 117ms -> 24ms (-79.5%, URL <> '' with CounterID=62 filter) - Q40: 17.9ms -> 5.1ms (-71.5%, multi-pred with RefererHash eq) - Q41: 17.3ms -> 5.5ms (-68.1%, multi-pred with URLHash eq) - Q22: 303ms -> 127ms (-58.2%, 3 string predicates) - Q42: 7.6ms -> 3.9ms (-48.5%, int-only multi-predicate) - Q38: 19.1ms -> 12.4ms (-34.9%, 5 int predicates) - Q21: 159ms -> 98ms (-38.5%, URL LIKE + SearchPhrase) Co-Authored-By: Claude Opus 4.6 --- parquet/benches/arrow_reader_clickbench.rs | 96 ++++------------------ 1 file changed, 16 insertions(+), 80 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 8635a5955715..9cf6d02b05f0 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -638,65 +638,6 @@ fn find_file_if_exists(mut current_dir: PathBuf, file_name: &str) -> Option `ProjectionMask` will be `[true, false, true]` = `[A, C]` -/// -/// `FilterIndices` will be `[1, 0]`, because column `C` (index 0 in -/// filter_columns) is selected at index 1 of the `ProjectionMask` and column -/// `A` (index 1 in `filter_columns`) is selected at index 0 of the -/// `ProjectionMask`. -struct FilterIndices { - /// * index is offset in Query::filter_columns - /// * value is offset in column selected by filter ProjectionMask - inner: Vec, -} - -impl FilterIndices { - /// Create a new `FilterIndices` from a list of column indices - /// - /// Parameters: - /// * `schema_descriptor`: The schema of the file - /// * `filter_schema_indices`: a list of column indices in the schema - fn new(schema_descriptor: &SchemaDescriptor, filter_schema_indices: Vec) -> Self { - for &filter_index in &filter_schema_indices { - assert!(filter_index < schema_descriptor.num_columns()); - } - // When the columns are selected using a ProjectionMask, they are - // returned in the order of the schema (not the order they were specified) - // - // So if the original schema indices are 5, 1, 3 (select the sixth and - // second and fourth column), the RecordBatch returned will select them - // in order 1, 3, 5, - // - // Thus we need a map to convert back to the original selection order - // `[1, 2, 0]` - let mut reordered: Vec<_> = filter_schema_indices.iter().enumerate().collect(); - reordered.sort_by_key(|(_projection_idx, original_schema_idx)| **original_schema_idx); - let mut inner = vec![0; reordered.len()]; - for (output_idx, (projection_idx, _original_schema_idx)) in - reordered.into_iter().enumerate() - { - inner[projection_idx] = output_idx; - } - Self { inner } - } - - /// Given the index of a column in `filter_columns`, return the index of the - /// column in the columns selected from `ProjectionMask` - fn map_column(&self, filter_columns_index: usize) -> usize { - // The selection index is the index in the filter mask - // The inner index is the index in the filter columns - self.inner[filter_columns_index] - } -} /// Encapsulates the test parameters for a single benchmark struct ReadTest { @@ -706,10 +647,8 @@ struct ReadTest { arrow_reader_metadata: ArrowReaderMetadata, /// Which columns in the file should be projected (decoded after filter)? projection_mask: ProjectionMask, - /// Which columns in the file should be passed to the filter? - filter_mask: ProjectionMask, - /// Mapping from column selected in filter mask to `Query::filter_columns` - filter_indices: FilterIndices, + /// Schema indices for each filter column (in filter_columns order) + filter_schema_indices: Vec, /// Predicates to apply predicates: Vec, /// How many rows are expected to pass the predicate? @@ -744,16 +683,12 @@ impl ReadTest { }; let filter_schema_indices = column_indices(schema_descr, &filter_columns); - let filter_mask = - ProjectionMask::leaves(schema_descr, filter_schema_indices.iter().cloned()); - let filter_indices = FilterIndices::new(schema_descr, filter_schema_indices); Self { name, arrow_reader_metadata, projection_mask, - filter_mask, - filter_indices, + filter_schema_indices, predicates, expected_row_count, } @@ -851,25 +786,26 @@ impl ReadTest { /// Return a `RowFilter` to apply to the reader. /// - /// Note that since `RowFilter` does not implement Clone, we need to create - /// the filter for each row + /// Each predicate gets a ProjectionMask containing only the single column + /// it needs, rather than all filter columns. This avoids decoding expensive + /// columns (e.g. strings) when evaluating cheap predicates (e.g. integer equality). fn row_filter(&self) -> RowFilter { - // Note: The predicates are in terms columns in the filter mask - // but the record batch passed back has columns in the order of the file - // schema + let schema_descr = self + .arrow_reader_metadata + .metadata() + .file_metadata() + .schema_descr(); - // Convert the predicates to ArrowPredicateFn to conform to the RowFilter API let arrow_predicates: Vec<_> = self .predicates .iter() .map(|pred| { - let orig_column_index = pred.column_index(); - let column_index = self.filter_indices.map_column(orig_column_index); + let schema_index = self.filter_schema_indices[pred.column_index()]; + let predicate_mask = ProjectionMask::leaves(schema_descr, [schema_index]); let mut predicate_fn = pred.predicate_fn(); - Box::new(ArrowPredicateFn::new( - self.filter_mask.clone(), - move |batch| (predicate_fn)(batch.column(column_index)), - )) as Box + Box::new(ArrowPredicateFn::new(predicate_mask, move |batch| { + (predicate_fn)(batch.column(0)) + })) as Box }) .collect(); From 8d7533b023c22fac1c639a112b35148e4f97200d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Feb 2026 17:42:35 +0100 Subject: [PATCH 02/15] Add statistics-based page pruning to ClickBench benchmark Use page-level min/max statistics (via StatisticsConverter) to compute a RowSelection that skips pages where equality predicates cannot match. For each equality predicate with an integer literal, we check if the literal falls within each page's [min, max] range and skip pages where it doesn't. Impact is data-dependent - most effective when data is sorted/clustered by the filter column. For this particular 100K-row sample file the data isn't sorted by filter columns, so improvements are modest (~5% for some CounterID=62 queries). Would show larger gains on sorted datasets. Co-Authored-By: Claude Opus 4.6 --- parquet/benches/arrow_reader_clickbench.rs | 163 ++++++++++++++++++--- 1 file changed, 146 insertions(+), 17 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 9cf6d02b05f0..95c5525d8f51 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -30,23 +30,25 @@ //! //! [ClickBench]: https://benchmark.clickhouse.com/ -use arrow::compute::kernels::cmp::{eq, neq}; -use arrow::compute::{like, nlike, or}; +use arrow::compute::kernels::cmp::{eq, gt_eq, lt_eq, neq}; +use arrow::compute::{and, like, nlike, or}; use arrow_array::types::{Int16Type, Int32Type, Int64Type}; use arrow_array::{ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray, StringViewArray}; use arrow_schema::{ArrowError, DataType, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use futures::StreamExt; use object_store::local::LocalFileSystem; +use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, - ParquetRecordBatchReaderBuilder, RowFilter, + ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, }; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::PageIndexPolicy; use parquet::schema::types::SchemaDescriptor; use std::fmt::{Display, Formatter}; +use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::{Arc, OnceLock}; @@ -482,6 +484,14 @@ fn all_queries() -> Vec { ] } +/// An equality literal value that can be used for page-level pruning +/// using min/max statistics from the page index. +#[derive(Clone)] +enum PagePruningLiteral { + Int32(i32), + Int64(i64), +} + /// Evaluate a predicate the input column specified offset relative to /// the provided filter column /// @@ -495,6 +505,9 @@ struct ClickBenchPredicate { /// This is necessary (and awkward) because `ArrowPredicateFn` does not /// implement `Clone`, so it must be created for each reader instance. predicate_factory: Box Box>, + /// If set, this predicate is an equality check that can use page-level + /// min/max statistics to skip pages that cannot contain the literal. + page_pruning_literal: Option, } impl ClickBenchPredicate { @@ -510,6 +523,7 @@ impl ClickBenchPredicate { Self { column_index, predicate_factory: Box::new(predicate_factory), + page_pruning_literal: None, } } @@ -524,10 +538,23 @@ impl ClickBenchPredicate { /// Create Predicate: col = literal fn eq_literal(column_index: usize, literal_value: T::Native) -> Self { - Self::new(column_index, move || { + let mut pred = Self::new(column_index, move || { let literal = PrimitiveArray::::new_scalar(literal_value); Box::new(move |col| eq(col, &literal)) - }) + }); + // Set page pruning literal for types we can prune on. + // Note: Int16 is stored as Int32 in parquet, so we widen to i32. + if std::any::TypeId::of::() == std::any::TypeId::of::() { + let val: i32 = unsafe { *(&literal_value as *const T::Native as *const i32) }; + pred.page_pruning_literal = Some(PagePruningLiteral::Int32(val)); + } else if std::any::TypeId::of::() == std::any::TypeId::of::() { + let val: i64 = unsafe { *(&literal_value as *const T::Native as *const i64) }; + pred.page_pruning_literal = Some(PagePruningLiteral::Int64(val)); + } else if std::any::TypeId::of::() == std::any::TypeId::of::() { + let val: i16 = unsafe { *(&literal_value as *const T::Native as *const i16) }; + pred.page_pruning_literal = Some(PagePruningLiteral::Int32(val as i32)); + } + pred } /// Create Predicate: col IN (lit1, lit2) @@ -706,15 +733,17 @@ impl ReadTest { }; // setup the reader - let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata( + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( parquet_file, self.arrow_reader_metadata.clone(), ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) - .with_row_filter(self.row_filter()) - .build() - .unwrap(); + .with_row_filter(self.row_filter()); + if let Some(selection) = self.compute_page_selection() { + builder = builder.with_row_selection(selection); + } + let mut stream = builder.build().unwrap(); // run the stream to its end let mut row_count = 0; @@ -737,15 +766,17 @@ impl ReadTest { let reader = ParquetObjectReader::new(store, location); // setup the reader - let mut stream = ParquetRecordBatchStreamBuilder::new_with_metadata( + let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( reader, self.arrow_reader_metadata.clone(), ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) - .with_row_filter(self.row_filter()) - .build() - .unwrap(); + .with_row_filter(self.row_filter()); + if let Some(selection) = self.compute_page_selection() { + builder = builder.with_row_selection(selection); + } + let mut stream = builder.build().unwrap(); // run the stream to its end let mut row_count = 0; @@ -764,15 +795,17 @@ impl ReadTest { }; // setup the reader - let reader = ParquetRecordBatchReaderBuilder::new_with_metadata( + let mut builder = ParquetRecordBatchReaderBuilder::new_with_metadata( parquet_file, self.arrow_reader_metadata.clone(), ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) - .with_row_filter(self.row_filter()) - .build() - .unwrap(); + .with_row_filter(self.row_filter()); + if let Some(selection) = self.compute_page_selection() { + builder = builder.with_row_selection(selection); + } + let reader = builder.build().unwrap(); // run the stream to its end let mut row_count = 0; @@ -812,6 +845,102 @@ impl ReadTest { RowFilter::new(arrow_predicates) } + /// Compute a `RowSelection` based on page-level min/max statistics. + /// + /// For equality predicates, pages where `literal < min OR literal > max` are + /// skipped entirely, avoiding decoding them at all. + fn compute_page_selection(&self) -> Option { + let metadata = self.arrow_reader_metadata.metadata(); + let arrow_schema = self.arrow_reader_metadata.schema(); + let parquet_schema = metadata.file_metadata().schema_descr(); + let column_index = metadata.column_index()?; + let offset_index = metadata.offset_index()?; + let row_group_metadatas = metadata.row_groups(); + let row_groups: Vec = (0..metadata.num_row_groups()).collect(); + + let total_rows: usize = row_group_metadatas.iter().map(|rg| rg.num_rows() as usize).sum(); + let mut combined_selection: Option = None; + + for pred in &self.predicates { + let literal = match &pred.page_pruning_literal { + Some(lit) => lit, + None => continue, + }; + + let schema_index = self.filter_schema_indices[pred.column_index()]; + let col_name = parquet_schema + .root_schema() + .get_fields()[schema_index] + .name(); + + let converter = match StatisticsConverter::try_new(col_name, arrow_schema, parquet_schema) { + Ok(c) => c, + Err(_) => continue, + }; + + let page_mins = match converter.data_page_mins(column_index, offset_index, row_groups.iter()) { + Ok(m) => m, + Err(_) => continue, + }; + let page_maxes = match converter.data_page_maxes(column_index, offset_index, row_groups.iter()) { + Ok(m) => m, + Err(_) => continue, + }; + let page_row_counts = match converter.data_page_row_counts(offset_index, row_group_metadatas, row_groups.iter()) { + Ok(Some(c)) => c, + _ => continue, + }; + + // For an equality predicate `col = literal`: + // A page can contain matching rows only if min <= literal <= max + let pages_match = match literal { + PagePruningLiteral::Int32(val) => { + let scalar = PrimitiveArray::::new_scalar(*val); + let min_ok = lt_eq(&page_mins, &scalar).ok(); + let max_ok = gt_eq(&page_maxes, &scalar).ok(); + match (min_ok, max_ok) { + (Some(min_check), Some(max_check)) => and(&min_check, &max_check).ok(), + _ => None, + } + } + PagePruningLiteral::Int64(val) => { + let scalar = PrimitiveArray::::new_scalar(*val); + let min_ok = lt_eq(&page_mins, &scalar).ok(); + let max_ok = gt_eq(&page_maxes, &scalar).ok(); + match (min_ok, max_ok) { + (Some(min_check), Some(max_check)) => and(&min_check, &max_check).ok(), + _ => None, + } + } + }; + + let pages_match = match pages_match { + Some(m) => m, + None => continue, + }; + + // Build row ranges for pages that match + let mut ranges: Vec> = Vec::new(); + let mut row_offset = 0usize; + for (i, count) in page_row_counts.values().iter().enumerate() { + let count = *count as usize; + if i < pages_match.len() && pages_match.value(i) { + ranges.push(row_offset..row_offset + count); + } + row_offset += count; + } + + let page_selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows); + + combined_selection = Some(match combined_selection { + Some(existing) => existing.intersection(&page_selection), + None => page_selection, + }); + } + + combined_selection + } + fn check_row_count(&self, row_count: usize) { let expected_row_count = self.expected_row_count; assert_eq!( From 15142b6b1a80842346b75a23cb8b4cebdd85dad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Feb 2026 17:44:32 +0100 Subject: [PATCH 03/15] Reorder Q22 predicates by selectivity for ClickBench benchmark Put the cheapest/most selective predicate first: SearchPhrase <> '' filters ~87% of rows before expensive LIKE predicates run. This reduces string column decoding for Title and URL significantly. Q22 sync: ~6% improvement, Q22 async: ~13% improvement. Co-Authored-By: Claude Opus 4.6 --- parquet/benches/arrow_reader_clickbench.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 95c5525d8f51..f2f92ad001a3 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -251,14 +251,16 @@ fn all_queries() -> Vec { expected_row_count: 16, }, // Q22: SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; + // Predicate order: cheapest/most selective first. not_empty filters ~87% of rows + // before expensive LIKE predicates run, reducing string decoding significantly. Query { name: "Q22", - filter_columns: vec!["Title", "URL", "SearchPhrase"], + filter_columns: vec!["SearchPhrase", "Title", "URL"], projection_columns: vec!["SearchPhrase", "URL", "Title", "UserID"], predicates: vec![ - ClickBenchPredicate::like_Google(0), - ClickBenchPredicate::nlike_google(1), - ClickBenchPredicate::not_empty(2), + ClickBenchPredicate::not_empty(0), + ClickBenchPredicate::like_Google(1), + ClickBenchPredicate::nlike_google(2), ], expected_row_count: 46, }, From 498160ea418f0dc7308508fcfeeb81ef5b31af5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Feb 2026 20:22:17 +0100 Subject: [PATCH 04/15] [Test] Filter selectivity --- parquet/benches/arrow_reader_clickbench.rs | 9 +- parquet/src/arrow/arrow_reader/mod.rs | 30 ++- parquet/src/arrow/arrow_reader/read_plan.rs | 205 +++++++++++++++++- parquet/src/arrow/async_reader/mod.rs | 2 + parquet/src/arrow/push_decoder/mod.rs | 2 + .../arrow/push_decoder/reader_builder/mod.rs | 10 +- 6 files changed, 248 insertions(+), 10 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index f2f92ad001a3..ee44cda17919 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -741,7 +741,8 @@ impl ReadTest { ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) - .with_row_filter(self.row_filter()); + .with_row_filter(self.row_filter()) + .with_selectivity_threshold(Some(0.5)); if let Some(selection) = self.compute_page_selection() { builder = builder.with_row_selection(selection); } @@ -774,7 +775,8 @@ impl ReadTest { ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) - .with_row_filter(self.row_filter()); + .with_row_filter(self.row_filter()) + .with_selectivity_threshold(Some(0.5)); if let Some(selection) = self.compute_page_selection() { builder = builder.with_row_selection(selection); } @@ -803,7 +805,8 @@ impl ReadTest { ) .with_batch_size(8192) .with_projection(self.projection_mask.clone()) - .with_row_filter(self.row_filter()); + .with_row_filter(self.row_filter()) + .with_selectivity_threshold(Some(0.5)); if let Some(selection) = self.compute_page_selection() { builder = builder.with_row_selection(selection); } diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 670f9d80c5a3..7c45130c2346 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -139,6 +139,8 @@ pub struct ArrowReaderBuilder { pub(crate) metrics: ArrowReaderMetrics, pub(crate) max_predicate_cache_size: usize, + + pub(crate) selectivity_threshold: Option, } impl Debug for ArrowReaderBuilder { @@ -157,6 +159,7 @@ impl Debug for ArrowReaderBuilder { .field("limit", &self.limit) .field("offset", &self.offset) .field("metrics", &self.metrics) + .field("selectivity_threshold", &self.selectivity_threshold) .finish() } } @@ -178,6 +181,7 @@ impl ArrowReaderBuilder { offset: None, metrics: ArrowReaderMetrics::Disabled, max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size + selectivity_threshold: None, } } @@ -430,6 +434,28 @@ impl ArrowReaderBuilder { ..self } } + + /// Set a selectivity threshold for filter deferral. + /// + /// When a predicate in the [`RowFilter`] passes more than `threshold` + /// fraction of rows (e.g., 0.9 means 90%), its result is deferred rather + /// than immediately applied to the row selection. This prevents + /// non-selective predicates from creating fragmented selections that slow + /// subsequent predicate evaluation and the final data read. + /// + /// The deferred results are applied at the end via + /// [`RowSelection::intersection`], so correctness is preserved. + /// + /// `None` disables deferral (the default). + /// + /// [`RowFilter`]: crate::arrow::arrow_reader::RowFilter + /// [`RowSelection::intersection`]: crate::arrow::arrow_reader::RowSelection::intersection + pub fn with_selectivity_threshold(self, threshold: Option) -> Self { + Self { + selectivity_threshold: threshold, + ..self + } + } } /// Options that control how [`ParquetMetaData`] is read when constructing @@ -1188,6 +1214,7 @@ impl ParquetRecordBatchReaderBuilder { metrics, // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000 max_predicate_cache_size: _, + selectivity_threshold, } = self; // Try to avoid allocate large buffer @@ -1203,7 +1230,8 @@ impl ParquetRecordBatchReaderBuilder { let mut plan_builder = ReadPlanBuilder::new(batch_size) .with_selection(selection) - .with_row_selection_policy(row_selection_policy); + .with_row_selection_policy(row_selection_policy) + .with_selectivity_threshold(selectivity_threshold); // Update selection based on any filters if let Some(filter) = filter.as_mut() { diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 7c9eb36befe3..b22fae95e717 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -37,6 +37,18 @@ pub struct ReadPlanBuilder { selection: Option, /// Policy to use when materializing the row selection row_selection_policy: RowSelectionPolicy, + /// Selectivity threshold above which a predicate's result is deferred + /// rather than applied immediately to `selection`. + /// + /// When set, predicates whose selectivity (fraction of rows passing) + /// exceeds this threshold will have their result accumulated in + /// `deferred_selection` instead of fragmenting `selection`. This keeps + /// subsequent predicate evaluations operating on a contiguous selection. + /// + /// `None` disables deferral (all predicates applied immediately). + selectivity_threshold: Option, + /// Accumulated deferred selections, merged via `intersection` at build time. + deferred_selection: Option, } impl ReadPlanBuilder { @@ -46,6 +58,8 @@ impl ReadPlanBuilder { batch_size, selection: None, row_selection_policy: RowSelectionPolicy::default(), + selectivity_threshold: None, + deferred_selection: None, } } @@ -68,6 +82,22 @@ impl ReadPlanBuilder { &self.row_selection_policy } + /// Set the selectivity threshold for filter deferral. + /// + /// When a predicate's selectivity (fraction of rows passing) exceeds this + /// threshold, its result is deferred rather than immediately applied to + /// the row selection. This prevents non-selective predicates from + /// fragmenting the selection and slowing subsequent predicate evaluation. + /// + /// The deferred results are merged via [`RowSelection::intersection`] at + /// build time, so correctness is preserved. + /// + /// `None` disables deferral (all predicates applied immediately). + pub fn with_selectivity_threshold(mut self, threshold: Option) -> Self { + self.selectivity_threshold = threshold; + self + } + /// Returns the current selection, if any pub fn selection(&self) -> Option<&RowSelection> { self.selection.as_ref() @@ -148,7 +178,13 @@ impl ReadPlanBuilder { array_reader: Box, predicate: &mut dyn ArrowPredicate, ) -> Result { - let reader = ParquetRecordBatchReader::new(array_reader, self.clone().build()); + // Build a ReadPlan from only self.selection (not deferred) so the + // reader sees the same rows that self.selection describes. Deferred + // selections must not be merged here because the raw filter result + // produced below is relative to self.selection. + let mut plan_for_reader = self.clone(); + plan_for_reader.deferred_selection = None; + let reader = ParquetRecordBatchReader::new(array_reader, plan_for_reader.build()); let mut filters = vec![]; for maybe_batch in reader { let maybe_batch = maybe_batch?; @@ -168,15 +204,49 @@ impl ReadPlanBuilder { } let raw = RowSelection::from_filters(&filters); - self.selection = match self.selection.take() { - Some(selection) => Some(selection.and_then(&raw)), - None => Some(raw), + + // Check if this predicate should be deferred due to high selectivity + let should_defer = self.selectivity_threshold.is_some_and(|threshold| { + let selected = raw.row_count(); + let total = selected + raw.skipped_row_count(); + total > 0 && (selected as f64 / total as f64) > threshold + }); + + // Compute the absolute-position result + let absolute = match self.selection.as_ref() { + Some(selection) => selection.and_then(&raw), + None => raw, }; + + if should_defer { + // Defer: accumulate into deferred_selection, leave self.selection unchanged + self.deferred_selection = Some(match self.deferred_selection.take() { + Some(existing) => existing.intersection(&absolute), + None => absolute, + }); + } else { + // Apply normally + self.selection = Some(absolute); + } + Ok(self) } + /// Merge any deferred selection into the main selection. + fn merge_deferred(&mut self) { + if let Some(deferred) = self.deferred_selection.take() { + self.selection = Some(match self.selection.take() { + Some(selection) => selection.intersection(&deferred), + None => deferred, + }); + } + } + /// Create a final `ReadPlan` the read plan for the scan pub fn build(mut self) -> ReadPlan { + // Merge any deferred selection before finalizing + self.merge_deferred(); + // If selection is empty, truncate if !self.selects_any() { self.selection = Some(RowSelection::from(vec![])); @@ -189,6 +259,8 @@ impl ReadPlanBuilder { batch_size, selection, row_selection_policy: _, + selectivity_threshold: _, + deferred_selection: _, } = self; let selection = selection.map(|s| s.trim()); @@ -261,6 +333,10 @@ impl LimitedReadPlanBuilder { limit, } = self; + // Merge deferred selection before applying offset/limit so that + // offset and limit operate on the correctly filtered row set. + inner.merge_deferred(); + // If the selection is empty, truncate if !inner.selects_any() { inner.selection = Some(RowSelection::from(vec![])); @@ -360,4 +436,125 @@ mod tests { RowSelectionStrategy::Selectors ); } + + /// Helper to build a `ReadPlanBuilder` with deferred selection set directly + fn builder_with_deferred( + selection: Option, + deferred: RowSelection, + ) -> ReadPlanBuilder { + ReadPlanBuilder { + batch_size: 1024, + selection, + row_selection_policy: RowSelectionPolicy::default(), + selectivity_threshold: None, + deferred_selection: Some(deferred), + } + } + + #[test] + fn test_merge_deferred_no_prior_selection() { + // Deferred selection with no main selection: result = deferred + let deferred = RowSelection::from(vec![ + RowSelector::select(90), + RowSelector::skip(10), + ]); + let mut builder = builder_with_deferred(None, deferred); + builder.merge_deferred(); + let sel = builder.selection.unwrap(); + assert_eq!(sel.row_count(), 90); + assert_eq!(sel.skipped_row_count(), 10); + assert!(builder.deferred_selection.is_none()); + } + + #[test] + fn test_merge_deferred_with_prior_selection() { + // Main selects first 50, deferred selects rows 0..40 and 50..100 + // Intersection should select rows 0..40 (first 40 of 100) + let main_sel = RowSelection::from(vec![ + RowSelector::select(50), + RowSelector::skip(50), + ]); + let deferred = RowSelection::from(vec![ + RowSelector::select(40), + RowSelector::skip(10), + RowSelector::select(50), + ]); + let mut builder = builder_with_deferred(Some(main_sel), deferred); + builder.merge_deferred(); + let sel = builder.selection.unwrap(); + assert_eq!(sel.row_count(), 40); + } + + #[test] + fn test_merge_deferred_in_build() { + // Verify that build() merges deferred before creating the ReadPlan + let deferred = RowSelection::from(vec![ + RowSelector::select(80), + RowSelector::skip(20), + ]); + let builder = builder_with_deferred(None, deferred); + // build() should merge and produce a plan that selects 80 rows + let _plan = builder.build(); + // If it didn't panic, the merge worked (selection was properly set) + } + + #[test] + fn test_merge_deferred_in_build_limited() { + // Verify that build_limited() merges deferred before applying offset/limit + let deferred = RowSelection::from(vec![ + RowSelector::select(80), + RowSelector::skip(20), + ]); + let builder = builder_with_deferred(None, deferred); + let limited = builder + .limited(100) + .with_limit(Some(50)) + .build_limited(); + let sel = limited.selection.unwrap(); + // After merge: 80 selected, 20 skipped. After limit(50): 50 selected. + assert_eq!(sel.row_count(), 50); + } + + #[test] + fn test_selectivity_threshold_setter() { + let builder = ReadPlanBuilder::new(1024); + assert!(builder.selectivity_threshold.is_none()); + assert!(builder.deferred_selection.is_none()); + + let builder = builder.with_selectivity_threshold(Some(0.9)); + assert_eq!(builder.selectivity_threshold, Some(0.9)); + } + + #[test] + fn test_no_deferred_when_threshold_disabled() { + // Without threshold, deferred_selection should always remain None + let builder = ReadPlanBuilder::new(1024); + assert!(builder.selectivity_threshold.is_none()); + assert!(builder.deferred_selection.is_none()); + } + + #[test] + fn test_multiple_deferred_selections_intersected() { + // Two deferred selections should be intersected + let deferred1 = RowSelection::from(vec![ + RowSelector::select(80), + RowSelector::skip(20), + ]); + let deferred2 = RowSelection::from(vec![ + RowSelector::skip(10), + RowSelector::select(70), + RowSelector::skip(20), + ]); + // intersection: only rows 10..80 (70 rows) + let mut builder = builder_with_deferred(None, deferred1); + builder.deferred_selection = Some( + builder + .deferred_selection + .unwrap() + .intersection(&deferred2), + ); + builder.merge_deferred(); + let sel = builder.selection.unwrap(); + assert_eq!(sel.row_count(), 70); + } } diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 9e45a0c3168c..28e32dd3d98e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -497,6 +497,7 @@ impl ParquetRecordBatchStreamBuilder { offset, metrics, max_predicate_cache_size, + selectivity_threshold, } = self; // Ensure schema of ParquetRecordBatchStream respects projection, and does @@ -522,6 +523,7 @@ impl ParquetRecordBatchStreamBuilder { offset, metrics, max_predicate_cache_size, + selectivity_threshold, } .build()?; diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index cdb0715edb55..65ad3c08c21e 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -176,6 +176,7 @@ impl ParquetPushDecoderBuilder { metrics, row_selection_policy, max_predicate_cache_size, + selectivity_threshold, } = self; // If no row groups were specified, read all of them @@ -197,6 +198,7 @@ impl ParquetPushDecoderBuilder { max_predicate_cache_size, buffers, row_selection_policy, + selectivity_threshold, ); // Initialize the decoder with the configured options diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 8fa299be884f..39b846c2649c 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -160,6 +160,9 @@ pub(crate) struct RowGroupReaderBuilder { /// Strategy for materialising row selections row_selection_policy: RowSelectionPolicy, + /// Selectivity threshold for filter deferral + selectivity_threshold: Option, + /// Current state of the decoder. /// /// It is taken when processing, and must be put back before returning @@ -185,6 +188,7 @@ impl RowGroupReaderBuilder { max_predicate_cache_size: usize, buffers: PushBuffers, row_selection_policy: RowSelectionPolicy, + selectivity_threshold: Option, ) -> Self { Self { batch_size, @@ -197,6 +201,7 @@ impl RowGroupReaderBuilder { metrics, max_predicate_cache_size, row_selection_policy, + selectivity_threshold, state: Some(RowGroupDecoderState::Finished), buffers, } @@ -242,7 +247,8 @@ impl RowGroupReaderBuilder { } let plan_builder = ReadPlanBuilder::new(self.batch_size) .with_selection(selection) - .with_row_selection_policy(self.row_selection_policy); + .with_row_selection_policy(self.row_selection_policy) + .with_selectivity_threshold(self.selectivity_threshold); let row_group_info = RowGroupInfo { row_group_idx, @@ -722,6 +728,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 200); + assert_eq!(std::mem::size_of::(), 240); } } From 907f82d0c0c2abcdf9dd1cfab23c874bdb631196 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Feb 2026 20:34:06 +0100 Subject: [PATCH 05/15] [Test] Filter selectivity --- parquet/benches/arrow_reader_clickbench.rs | 156 ++------------------- 1 file changed, 12 insertions(+), 144 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index ee44cda17919..7910cd0e5172 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -30,15 +30,14 @@ //! //! [ClickBench]: https://benchmark.clickhouse.com/ -use arrow::compute::kernels::cmp::{eq, gt_eq, lt_eq, neq}; -use arrow::compute::{and, like, nlike, or}; +use arrow::compute::kernels::cmp::{eq, neq}; +use arrow::compute::{like, nlike, or}; use arrow_array::types::{Int16Type, Int32Type, Int64Type}; use arrow_array::{ArrayRef, ArrowPrimitiveType, BooleanArray, PrimitiveArray, StringViewArray}; use arrow_schema::{ArrowError, DataType, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use futures::StreamExt; use object_store::local::LocalFileSystem; -use parquet::arrow::arrow_reader::statistics::StatisticsConverter; use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, @@ -251,16 +250,14 @@ fn all_queries() -> Vec { expected_row_count: 16, }, // Q22: SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10; - // Predicate order: cheapest/most selective first. not_empty filters ~87% of rows - // before expensive LIKE predicates run, reducing string decoding significantly. Query { name: "Q22", - filter_columns: vec!["SearchPhrase", "Title", "URL"], + filter_columns: vec!["Title", "URL", "SearchPhrase"], projection_columns: vec!["SearchPhrase", "URL", "Title", "UserID"], predicates: vec![ - ClickBenchPredicate::not_empty(0), - ClickBenchPredicate::like_Google(1), - ClickBenchPredicate::nlike_google(2), + ClickBenchPredicate::like_Google(0), + ClickBenchPredicate::nlike_google(1), + ClickBenchPredicate::not_empty(2), ], expected_row_count: 46, }, @@ -486,14 +483,6 @@ fn all_queries() -> Vec { ] } -/// An equality literal value that can be used for page-level pruning -/// using min/max statistics from the page index. -#[derive(Clone)] -enum PagePruningLiteral { - Int32(i32), - Int64(i64), -} - /// Evaluate a predicate the input column specified offset relative to /// the provided filter column /// @@ -507,9 +496,6 @@ struct ClickBenchPredicate { /// This is necessary (and awkward) because `ArrowPredicateFn` does not /// implement `Clone`, so it must be created for each reader instance. predicate_factory: Box Box>, - /// If set, this predicate is an equality check that can use page-level - /// min/max statistics to skip pages that cannot contain the literal. - page_pruning_literal: Option, } impl ClickBenchPredicate { @@ -525,7 +511,6 @@ impl ClickBenchPredicate { Self { column_index, predicate_factory: Box::new(predicate_factory), - page_pruning_literal: None, } } @@ -540,23 +525,11 @@ impl ClickBenchPredicate { /// Create Predicate: col = literal fn eq_literal(column_index: usize, literal_value: T::Native) -> Self { - let mut pred = Self::new(column_index, move || { + Self::new(column_index, move || { let literal = PrimitiveArray::::new_scalar(literal_value); Box::new(move |col| eq(col, &literal)) - }); - // Set page pruning literal for types we can prune on. - // Note: Int16 is stored as Int32 in parquet, so we widen to i32. - if std::any::TypeId::of::() == std::any::TypeId::of::() { - let val: i32 = unsafe { *(&literal_value as *const T::Native as *const i32) }; - pred.page_pruning_literal = Some(PagePruningLiteral::Int32(val)); - } else if std::any::TypeId::of::() == std::any::TypeId::of::() { - let val: i64 = unsafe { *(&literal_value as *const T::Native as *const i64) }; - pred.page_pruning_literal = Some(PagePruningLiteral::Int64(val)); - } else if std::any::TypeId::of::() == std::any::TypeId::of::() { - let val: i16 = unsafe { *(&literal_value as *const T::Native as *const i16) }; - pred.page_pruning_literal = Some(PagePruningLiteral::Int32(val as i32)); - } - pred + }) + } /// Create Predicate: col IN (lit1, lit2) @@ -735,7 +708,7 @@ impl ReadTest { }; // setup the reader - let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + let builder = ParquetRecordBatchStreamBuilder::new_with_metadata( parquet_file, self.arrow_reader_metadata.clone(), ) @@ -743,9 +716,6 @@ impl ReadTest { .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) .with_selectivity_threshold(Some(0.5)); - if let Some(selection) = self.compute_page_selection() { - builder = builder.with_row_selection(selection); - } let mut stream = builder.build().unwrap(); // run the stream to its end @@ -769,7 +739,7 @@ impl ReadTest { let reader = ParquetObjectReader::new(store, location); // setup the reader - let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata( + let builder = ParquetRecordBatchStreamBuilder::new_with_metadata( reader, self.arrow_reader_metadata.clone(), ) @@ -777,9 +747,6 @@ impl ReadTest { .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) .with_selectivity_threshold(Some(0.5)); - if let Some(selection) = self.compute_page_selection() { - builder = builder.with_row_selection(selection); - } let mut stream = builder.build().unwrap(); // run the stream to its end @@ -799,7 +766,7 @@ impl ReadTest { }; // setup the reader - let mut builder = ParquetRecordBatchReaderBuilder::new_with_metadata( + let builder = ParquetRecordBatchReaderBuilder::new_with_metadata( parquet_file, self.arrow_reader_metadata.clone(), ) @@ -807,9 +774,6 @@ impl ReadTest { .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) .with_selectivity_threshold(Some(0.5)); - if let Some(selection) = self.compute_page_selection() { - builder = builder.with_row_selection(selection); - } let reader = builder.build().unwrap(); // run the stream to its end @@ -850,102 +814,6 @@ impl ReadTest { RowFilter::new(arrow_predicates) } - /// Compute a `RowSelection` based on page-level min/max statistics. - /// - /// For equality predicates, pages where `literal < min OR literal > max` are - /// skipped entirely, avoiding decoding them at all. - fn compute_page_selection(&self) -> Option { - let metadata = self.arrow_reader_metadata.metadata(); - let arrow_schema = self.arrow_reader_metadata.schema(); - let parquet_schema = metadata.file_metadata().schema_descr(); - let column_index = metadata.column_index()?; - let offset_index = metadata.offset_index()?; - let row_group_metadatas = metadata.row_groups(); - let row_groups: Vec = (0..metadata.num_row_groups()).collect(); - - let total_rows: usize = row_group_metadatas.iter().map(|rg| rg.num_rows() as usize).sum(); - let mut combined_selection: Option = None; - - for pred in &self.predicates { - let literal = match &pred.page_pruning_literal { - Some(lit) => lit, - None => continue, - }; - - let schema_index = self.filter_schema_indices[pred.column_index()]; - let col_name = parquet_schema - .root_schema() - .get_fields()[schema_index] - .name(); - - let converter = match StatisticsConverter::try_new(col_name, arrow_schema, parquet_schema) { - Ok(c) => c, - Err(_) => continue, - }; - - let page_mins = match converter.data_page_mins(column_index, offset_index, row_groups.iter()) { - Ok(m) => m, - Err(_) => continue, - }; - let page_maxes = match converter.data_page_maxes(column_index, offset_index, row_groups.iter()) { - Ok(m) => m, - Err(_) => continue, - }; - let page_row_counts = match converter.data_page_row_counts(offset_index, row_group_metadatas, row_groups.iter()) { - Ok(Some(c)) => c, - _ => continue, - }; - - // For an equality predicate `col = literal`: - // A page can contain matching rows only if min <= literal <= max - let pages_match = match literal { - PagePruningLiteral::Int32(val) => { - let scalar = PrimitiveArray::::new_scalar(*val); - let min_ok = lt_eq(&page_mins, &scalar).ok(); - let max_ok = gt_eq(&page_maxes, &scalar).ok(); - match (min_ok, max_ok) { - (Some(min_check), Some(max_check)) => and(&min_check, &max_check).ok(), - _ => None, - } - } - PagePruningLiteral::Int64(val) => { - let scalar = PrimitiveArray::::new_scalar(*val); - let min_ok = lt_eq(&page_mins, &scalar).ok(); - let max_ok = gt_eq(&page_maxes, &scalar).ok(); - match (min_ok, max_ok) { - (Some(min_check), Some(max_check)) => and(&min_check, &max_check).ok(), - _ => None, - } - } - }; - - let pages_match = match pages_match { - Some(m) => m, - None => continue, - }; - - // Build row ranges for pages that match - let mut ranges: Vec> = Vec::new(); - let mut row_offset = 0usize; - for (i, count) in page_row_counts.values().iter().enumerate() { - let count = *count as usize; - if i < pages_match.len() && pages_match.value(i) { - ranges.push(row_offset..row_offset + count); - } - row_offset += count; - } - - let page_selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows); - - combined_selection = Some(match combined_selection { - Some(existing) => existing.intersection(&page_selection), - None => page_selection, - }); - } - - combined_selection - } - fn check_row_count(&self, row_count: usize) { let expected_row_count = self.expected_row_count; assert_eq!( From b4275cbf45394f7b8e2eac628b32490dd07f3712 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 14 Feb 2026 20:36:05 +0100 Subject: [PATCH 06/15] [Test] Filter selectivity --- parquet/benches/arrow_reader_clickbench.rs | 5 +-- parquet/src/arrow/arrow_reader/read_plan.rs | 38 +++++---------------- 2 files changed, 9 insertions(+), 34 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 7910cd0e5172..b30177a579bf 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -40,14 +40,13 @@ use futures::StreamExt; use object_store::local::LocalFileSystem; use parquet::arrow::arrow_reader::{ ArrowPredicate, ArrowPredicateFn, ArrowReaderMetadata, ArrowReaderOptions, - ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, + ParquetRecordBatchReaderBuilder, RowFilter, }; use parquet::arrow::async_reader::ParquetObjectReader; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::file::metadata::PageIndexPolicy; use parquet::schema::types::SchemaDescriptor; use std::fmt::{Display, Formatter}; -use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::{Arc, OnceLock}; @@ -529,7 +528,6 @@ impl ClickBenchPredicate { let literal = PrimitiveArray::::new_scalar(literal_value); Box::new(move |col| eq(col, &literal)) }) - } /// Create Predicate: col IN (lit1, lit2) @@ -640,7 +638,6 @@ fn find_file_if_exists(mut current_dir: PathBuf, file_name: &str) -> Option Date: Thu, 19 Mar 2026 21:28:17 +0100 Subject: [PATCH 07/15] Improve selectivity threshold: measure absolute selectivity and avoid full clone - Measure selectivity against absolute row count (after and_then) instead of relative to current selection, making predicates comparable regardless of prior filtering - Avoid cloning the full ReadPlanBuilder (including deferred_selection) by constructing a minimal builder for the predicate reader Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/src/arrow/arrow_reader/read_plan.rs | 33 +++++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 44d424c95676..7d04863c985d 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -178,12 +178,16 @@ impl ReadPlanBuilder { array_reader: Box, predicate: &mut dyn ArrowPredicate, ) -> Result { - // Build a ReadPlan from only self.selection (not deferred) so the - // reader sees the same rows that self.selection describes. Deferred - // selections must not be merged here because the raw filter result - // produced below is relative to self.selection. - let mut plan_for_reader = self.clone(); - plan_for_reader.deferred_selection = None; + // Build a ReadPlan for the predicate reader using only the current + // selection (not deferred). We avoid cloning the entire builder by + // constructing a minimal ReadPlanBuilder with just what build() needs. + let plan_for_reader = ReadPlanBuilder { + batch_size: self.batch_size, + selection: self.selection.clone(), + row_selection_policy: self.row_selection_policy, + selectivity_threshold: None, + deferred_selection: None, + }; let reader = ParquetRecordBatchReader::new(array_reader, plan_for_reader.build()); let mut filters = vec![]; for maybe_batch in reader { @@ -212,19 +216,22 @@ impl ReadPlanBuilder { } let raw = RowSelection::from_filters(&filters); - // Check if this predicate should be deferred due to high selectivity - let should_defer = self.selectivity_threshold.is_some_and(|threshold| { - let selected = raw.row_count(); - let total = selected + raw.skipped_row_count(); - total > 0 && (selected as f64 / total as f64) > threshold - }); - // Compute the absolute-position result let absolute = match self.selection.as_ref() { Some(selection) => selection.and_then(&raw), None => raw, }; + // Check if this predicate should be deferred due to high selectivity. + // Selectivity is measured in absolute terms (fraction of total row + // group rows) so that predicates remain comparable regardless of how + // much prior filtering has been applied. + let should_defer = self.selectivity_threshold.is_some_and(|threshold| { + let selected = absolute.row_count(); + let total = selected + absolute.skipped_row_count(); + total > 0 && (selected as f64 / total as f64) > threshold + }); + if should_defer { // Defer: accumulate into deferred_selection, leave self.selection unchanged self.deferred_selection = Some(match self.deferred_selection.take() { From 75073e12a72e5d34c803b5a00a2eab3388aaa585 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 07:32:04 +0100 Subject: [PATCH 08/15] Revert selectivity to measure against raw (relative) counts Measuring selectivity against the absolute result makes the threshold less intuitive since it becomes more scattered after and_then. Revert to measuring against the raw predicate result (relative to current selection). Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/src/arrow/arrow_reader/read_plan.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 7d04863c985d..e447b47f0185 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -216,22 +216,19 @@ impl ReadPlanBuilder { } let raw = RowSelection::from_filters(&filters); + // Check if this predicate should be deferred due to high selectivity + let should_defer = self.selectivity_threshold.is_some_and(|threshold| { + let selected = raw.row_count(); + let total = selected + raw.skipped_row_count(); + total > 0 && (selected as f64 / total as f64) > threshold + }); + // Compute the absolute-position result let absolute = match self.selection.as_ref() { Some(selection) => selection.and_then(&raw), None => raw, }; - // Check if this predicate should be deferred due to high selectivity. - // Selectivity is measured in absolute terms (fraction of total row - // group rows) so that predicates remain comparable regardless of how - // much prior filtering has been applied. - let should_defer = self.selectivity_threshold.is_some_and(|threshold| { - let selected = absolute.row_count(); - let total = selected + absolute.skipped_row_count(); - total > 0 && (selected as f64 / total as f64) > threshold - }); - if should_defer { // Defer: accumulate into deferred_selection, leave self.selection unchanged self.deferred_selection = Some(match self.deferred_selection.take() { From 5f2be6e96e561d86d4d496c2a6a953175a568901 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 07:39:19 +0100 Subject: [PATCH 09/15] Replace selectivity threshold with scatter-based filter deferral Instead of measuring selectivity (fraction of rows passing), measure scattering: how much applying a predicate would fragment the selection. A predicate is deferred if it would increase the selector count beyond `current_selectors * scatter_threshold`. This directly targets what makes fragmented selections expensive: many small skip/read transitions during decoding. - Rename selectivity_threshold -> scatter_threshold - Add RowSelection::selector_count() (O(1) via Vec::len) - Use selector count ratio instead of row selectivity ratio Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/benches/arrow_reader_clickbench.rs | 6 +- parquet/src/arrow/arrow_reader/mod.rs | 30 ++++---- parquet/src/arrow/arrow_reader/read_plan.rs | 71 +++++++++++-------- parquet/src/arrow/arrow_reader/selection.rs | 8 +++ parquet/src/arrow/async_reader/mod.rs | 4 +- parquet/src/arrow/push_decoder/mod.rs | 4 +- .../arrow/push_decoder/reader_builder/mod.rs | 10 +-- 7 files changed, 78 insertions(+), 55 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 2f163aebc973..129ab77973ea 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -723,7 +723,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_selectivity_threshold(Some(0.5)); + .with_scatter_threshold(Some(2.0)); let mut stream = builder.build().unwrap(); // run the stream to its end @@ -754,7 +754,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_selectivity_threshold(Some(0.5)); + .with_scatter_threshold(Some(2.0)); let mut stream = builder.build().unwrap(); // run the stream to its end @@ -781,7 +781,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_selectivity_threshold(Some(0.5)); + .with_scatter_threshold(Some(2.0)); let reader = builder.build().unwrap(); // run the stream to its end diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index cc8fce9dc5ea..34aa3ee3113e 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -140,7 +140,7 @@ pub struct ArrowReaderBuilder { pub(crate) max_predicate_cache_size: usize, - pub(crate) selectivity_threshold: Option, + pub(crate) scatter_threshold: Option, } impl Debug for ArrowReaderBuilder { @@ -159,7 +159,7 @@ impl Debug for ArrowReaderBuilder { .field("limit", &self.limit) .field("offset", &self.offset) .field("metrics", &self.metrics) - .field("selectivity_threshold", &self.selectivity_threshold) + .field("scatter_threshold", &self.scatter_threshold) .finish() } } @@ -181,7 +181,7 @@ impl ArrowReaderBuilder { offset: None, metrics: ArrowReaderMetrics::Disabled, max_predicate_cache_size: 100 * 1024 * 1024, // 100MB default cache size - selectivity_threshold: None, + scatter_threshold: None, } } @@ -435,13 +435,17 @@ impl ArrowReaderBuilder { } } - /// Set a selectivity threshold for filter deferral. + /// Set a scatter threshold for filter deferral. /// - /// When a predicate in the [`RowFilter`] passes more than `threshold` - /// fraction of rows (e.g., 0.9 means 90%), its result is deferred rather - /// than immediately applied to the row selection. This prevents - /// non-selective predicates from creating fragmented selections that slow - /// subsequent predicate evaluation and the final data read. + /// When applying a predicate's result would increase the number of + /// selectors (select/skip transitions) by more than `threshold` times + /// the current count, the result is deferred rather than applied + /// immediately. For example, a threshold of `2.0` defers predicates + /// that would more than double the selector count. + /// + /// This prevents highly-scattering predicates from fragmenting the + /// row selection, which slows subsequent predicate evaluation and + /// data decoding due to many small skip/read transitions. /// /// The deferred results are applied at the end via /// [`RowSelection::intersection`], so correctness is preserved. @@ -450,9 +454,9 @@ impl ArrowReaderBuilder { /// /// [`RowFilter`]: crate::arrow::arrow_reader::RowFilter /// [`RowSelection::intersection`]: crate::arrow::arrow_reader::RowSelection::intersection - pub fn with_selectivity_threshold(self, threshold: Option) -> Self { + pub fn with_scatter_threshold(self, threshold: Option) -> Self { Self { - selectivity_threshold: threshold, + scatter_threshold: threshold, ..self } } @@ -1214,7 +1218,7 @@ impl ParquetRecordBatchReaderBuilder { metrics, // Not used for the sync reader, see https://github.com/apache/arrow-rs/issues/8000 max_predicate_cache_size: _, - selectivity_threshold, + scatter_threshold, } = self; // Try to avoid allocate large buffer @@ -1231,7 +1235,7 @@ impl ParquetRecordBatchReaderBuilder { let mut plan_builder = ReadPlanBuilder::new(batch_size) .with_selection(selection) .with_row_selection_policy(row_selection_policy) - .with_selectivity_threshold(selectivity_threshold); + .with_scatter_threshold(scatter_threshold); // Update selection based on any filters if let Some(filter) = filter.as_mut() { diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index e447b47f0185..af5518d82091 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -37,16 +37,17 @@ pub struct ReadPlanBuilder { selection: Option, /// Policy to use when materializing the row selection row_selection_policy: RowSelectionPolicy, - /// Selectivity threshold above which a predicate's result is deferred - /// rather than applied immediately to `selection`. + /// Maximum allowed scatter factor for applying a predicate result. /// - /// When set, predicates whose selectivity (fraction of rows passing) - /// exceeds this threshold will have their result accumulated in - /// `deferred_selection` instead of fragmenting `selection`. This keeps - /// subsequent predicate evaluations operating on a contiguous selection. + /// When set, if applying a predicate would increase the number of + /// selectors (transitions between select/skip) beyond + /// `current_selectors * scatter_threshold`, the result is deferred + /// into `deferred_selection` instead. This prevents highly-scattering + /// predicates from fragmenting the selection and slowing subsequent + /// predicate evaluation and data reads. /// /// `None` disables deferral (all predicates applied immediately). - selectivity_threshold: Option, + scatter_threshold: Option, /// Accumulated deferred selections, merged via `intersection` at build time. deferred_selection: Option, } @@ -58,7 +59,7 @@ impl ReadPlanBuilder { batch_size, selection: None, row_selection_policy: RowSelectionPolicy::default(), - selectivity_threshold: None, + scatter_threshold: None, deferred_selection: None, } } @@ -82,19 +83,24 @@ impl ReadPlanBuilder { &self.row_selection_policy } - /// Set the selectivity threshold for filter deferral. + /// Set the scatter threshold for filter deferral. /// - /// When a predicate's selectivity (fraction of rows passing) exceeds this - /// threshold, its result is deferred rather than immediately applied to - /// the row selection. This prevents non-selective predicates from - /// fragmenting the selection and slowing subsequent predicate evaluation. + /// When applying a predicate's result would increase the number of + /// selectors (select/skip transitions) by more than this factor compared + /// to the current selection, the result is deferred rather than applied + /// immediately. For example, a threshold of `2.0` means a predicate is + /// deferred if it would more than double the number of selectors. + /// + /// This directly targets what makes fragmented selections expensive: + /// many small skip/read transitions during subsequent predicate evaluation + /// and data decoding. /// /// The deferred results are merged via [`RowSelection::intersection`] at /// build time, so correctness is preserved. /// /// `None` disables deferral (all predicates applied immediately). - pub fn with_selectivity_threshold(mut self, threshold: Option) -> Self { - self.selectivity_threshold = threshold; + pub fn with_scatter_threshold(mut self, threshold: Option) -> Self { + self.scatter_threshold = threshold; self } @@ -185,7 +191,7 @@ impl ReadPlanBuilder { batch_size: self.batch_size, selection: self.selection.clone(), row_selection_policy: self.row_selection_policy, - selectivity_threshold: None, + scatter_threshold: None, deferred_selection: None, }; let reader = ParquetRecordBatchReader::new(array_reader, plan_for_reader.build()); @@ -216,19 +222,24 @@ impl ReadPlanBuilder { } let raw = RowSelection::from_filters(&filters); - // Check if this predicate should be deferred due to high selectivity - let should_defer = self.selectivity_threshold.is_some_and(|threshold| { - let selected = raw.row_count(); - let total = selected + raw.skipped_row_count(); - total > 0 && (selected as f64 / total as f64) > threshold - }); - // Compute the absolute-position result let absolute = match self.selection.as_ref() { Some(selection) => selection.and_then(&raw), None => raw, }; + // Check if applying this predicate would scatter the selection too much. + // Compare the number of selectors before and after: if the result has + // significantly more transitions, defer it to avoid fragmenting reads. + let should_defer = self.scatter_threshold.is_some_and(|threshold| { + let current_selectors = self + .selection + .as_ref() + .map_or(1, |s| s.selector_count().max(1)); + let new_selectors = absolute.selector_count(); + new_selectors as f64 > current_selectors as f64 * threshold + }); + if should_defer { // Defer: accumulate into deferred_selection, leave self.selection unchanged self.deferred_selection = Some(match self.deferred_selection.take() { @@ -270,7 +281,7 @@ impl ReadPlanBuilder { batch_size, selection, row_selection_policy: _, - selectivity_threshold: _, + scatter_threshold: _, deferred_selection: _, } = self; @@ -457,7 +468,7 @@ mod tests { batch_size: 1024, selection, row_selection_policy: RowSelectionPolicy::default(), - selectivity_threshold: None, + scatter_threshold: None, deferred_selection: Some(deferred), } } @@ -512,20 +523,20 @@ mod tests { } #[test] - fn test_selectivity_threshold_setter() { + fn test_scatter_threshold_setter() { let builder = ReadPlanBuilder::new(1024); - assert!(builder.selectivity_threshold.is_none()); + assert!(builder.scatter_threshold.is_none()); assert!(builder.deferred_selection.is_none()); - let builder = builder.with_selectivity_threshold(Some(0.9)); - assert_eq!(builder.selectivity_threshold, Some(0.9)); + let builder = builder.with_scatter_threshold(Some(0.9)); + assert_eq!(builder.scatter_threshold, Some(0.9)); } #[test] fn test_no_deferred_when_threshold_disabled() { // Without threshold, deferred_selection should always remain None let builder = ReadPlanBuilder::new(1024); - assert!(builder.selectivity_threshold.is_none()); + assert!(builder.scatter_threshold.is_none()); assert!(builder.deferred_selection.is_none()); } diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2ddf812f9c39..cf040644893d 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -504,6 +504,14 @@ impl RowSelection { self.selectors.iter() } + /// Returns the number of selectors (select/skip segments) in this selection. + /// + /// A higher count indicates a more fragmented selection, which can slow + /// sequential reads due to many small skip/read transitions. + pub fn selector_count(&self) -> usize { + self.selectors.len() + } + /// Returns the number of selected rows pub fn row_count(&self) -> usize { self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum() diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 28e32dd3d98e..622d5a029b6e 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -497,7 +497,7 @@ impl ParquetRecordBatchStreamBuilder { offset, metrics, max_predicate_cache_size, - selectivity_threshold, + scatter_threshold, } = self; // Ensure schema of ParquetRecordBatchStream respects projection, and does @@ -523,7 +523,7 @@ impl ParquetRecordBatchStreamBuilder { offset, metrics, max_predicate_cache_size, - selectivity_threshold, + scatter_threshold, } .build()?; diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 65ad3c08c21e..80e1cdaaeacf 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -176,7 +176,7 @@ impl ParquetPushDecoderBuilder { metrics, row_selection_policy, max_predicate_cache_size, - selectivity_threshold, + scatter_threshold, } = self; // If no row groups were specified, read all of them @@ -198,7 +198,7 @@ impl ParquetPushDecoderBuilder { max_predicate_cache_size, buffers, row_selection_policy, - selectivity_threshold, + scatter_threshold, ); // Initialize the decoder with the configured options diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index bdfbdd763dde..6ea93808b8b6 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -160,8 +160,8 @@ pub(crate) struct RowGroupReaderBuilder { /// Strategy for materialising row selections row_selection_policy: RowSelectionPolicy, - /// Selectivity threshold for filter deferral - selectivity_threshold: Option, + /// Scatter threshold for filter deferral + scatter_threshold: Option, /// Current state of the decoder. /// @@ -188,7 +188,7 @@ impl RowGroupReaderBuilder { max_predicate_cache_size: usize, buffers: PushBuffers, row_selection_policy: RowSelectionPolicy, - selectivity_threshold: Option, + scatter_threshold: Option, ) -> Self { Self { batch_size, @@ -201,7 +201,7 @@ impl RowGroupReaderBuilder { metrics, max_predicate_cache_size, row_selection_policy, - selectivity_threshold, + scatter_threshold, state: Some(RowGroupDecoderState::Finished), buffers, } @@ -248,7 +248,7 @@ impl RowGroupReaderBuilder { let plan_builder = ReadPlanBuilder::new(self.batch_size) .with_selection(selection) .with_row_selection_policy(self.row_selection_policy) - .with_selectivity_threshold(self.selectivity_threshold); + .with_scatter_threshold(self.scatter_threshold); let row_group_info = RowGroupInfo { row_group_idx, From a9bc94fa6c2dd725288847e2f7d0c64e40ac92bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 07:54:55 +0100 Subject: [PATCH 10/15] Use selector density (selectors/rows) for scatter threshold MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of comparing selector count ratios, measure selector density: selectors / total_rows. A density of 0.25 means at most 25 selectors per 100 rows — anything more fragmented gets deferred. This is more intuitive and directly proportional to the per-row cost of skip/read transitions during decoding. Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/benches/arrow_reader_clickbench.rs | 6 +-- parquet/src/arrow/arrow_reader/mod.rs | 16 ++++---- parquet/src/arrow/arrow_reader/read_plan.rs | 42 ++++++++++----------- 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 129ab77973ea..36cf30d86982 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -723,7 +723,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_scatter_threshold(Some(2.0)); + .with_scatter_threshold(Some(0.25)); let mut stream = builder.build().unwrap(); // run the stream to its end @@ -754,7 +754,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_scatter_threshold(Some(2.0)); + .with_scatter_threshold(Some(0.25)); let mut stream = builder.build().unwrap(); // run the stream to its end @@ -781,7 +781,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_scatter_threshold(Some(2.0)); + .with_scatter_threshold(Some(0.25)); let reader = builder.build().unwrap(); // run the stream to its end diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 34aa3ee3113e..8a370ea9fddc 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -437,15 +437,15 @@ impl ArrowReaderBuilder { /// Set a scatter threshold for filter deferral. /// - /// When applying a predicate's result would increase the number of - /// selectors (select/skip transitions) by more than `threshold` times - /// the current count, the result is deferred rather than applied - /// immediately. For example, a threshold of `2.0` defers predicates - /// that would more than double the selector count. + /// The threshold is the maximum allowed **selector density** + /// (`selector_count / row_count`). If applying a predicate would + /// produce a density above this value, its result is deferred. + /// For example, `0.25` allows at most 25 selectors per 100 rows. /// - /// This prevents highly-scattering predicates from fragmenting the - /// row selection, which slows subsequent predicate evaluation and - /// data decoding due to many small skip/read transitions. + /// A high selector density means many small skip/read transitions, + /// which slows subsequent predicate evaluation and data decoding. + /// Deferring scattering predicates keeps the selection contiguous + /// for intermediate steps. /// /// The deferred results are applied at the end via /// [`RowSelection::intersection`], so correctness is preserved. diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index af5518d82091..53e9155c599f 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -37,14 +37,13 @@ pub struct ReadPlanBuilder { selection: Option, /// Policy to use when materializing the row selection row_selection_policy: RowSelectionPolicy, - /// Maximum allowed scatter factor for applying a predicate result. + /// Maximum allowed selector density (selectors / rows) for applying a + /// predicate result. /// - /// When set, if applying a predicate would increase the number of - /// selectors (transitions between select/skip) beyond - /// `current_selectors * scatter_threshold`, the result is deferred - /// into `deferred_selection` instead. This prevents highly-scattering - /// predicates from fragmenting the selection and slowing subsequent - /// predicate evaluation and data reads. + /// When set, if applying a predicate would produce a selector density + /// above this threshold, the result is deferred into `deferred_selection` + /// instead. For example, `0.25` means at most 25 selectors per 100 rows; + /// anything more fragmented gets deferred. /// /// `None` disables deferral (all predicates applied immediately). scatter_threshold: Option, @@ -85,15 +84,15 @@ impl ReadPlanBuilder { /// Set the scatter threshold for filter deferral. /// - /// When applying a predicate's result would increase the number of - /// selectors (select/skip transitions) by more than this factor compared - /// to the current selection, the result is deferred rather than applied - /// immediately. For example, a threshold of `2.0` means a predicate is - /// deferred if it would more than double the number of selectors. + /// The threshold is the maximum allowed **selector density** + /// (`selector_count / row_count`). If applying a predicate would produce + /// a density above this value, its result is deferred. For example, + /// `0.25` allows at most 25 selectors per 100 rows. /// - /// This directly targets what makes fragmented selections expensive: - /// many small skip/read transitions during subsequent predicate evaluation - /// and data decoding. + /// A high selector density means many small skip/read transitions, + /// which is expensive for subsequent predicate evaluation and data + /// decoding. Deferring scattering predicates keeps the selection + /// contiguous for intermediate steps. /// /// The deferred results are merged via [`RowSelection::intersection`] at /// build time, so correctness is preserved. @@ -229,15 +228,12 @@ impl ReadPlanBuilder { }; // Check if applying this predicate would scatter the selection too much. - // Compare the number of selectors before and after: if the result has - // significantly more transitions, defer it to avoid fragmenting reads. + // Measure selector density: selectors / rows. A high density means many + // small skip/read transitions per row, which is expensive for decoding. let should_defer = self.scatter_threshold.is_some_and(|threshold| { - let current_selectors = self - .selection - .as_ref() - .map_or(1, |s| s.selector_count().max(1)); - let new_selectors = absolute.selector_count(); - new_selectors as f64 > current_selectors as f64 * threshold + let row_count = absolute.row_count() + absolute.skipped_row_count(); + row_count > 0 + && absolute.selector_count() as f64 / row_count as f64 > threshold }); if should_defer { From a1e52a364e80e2598487f8f3a473cf757a97c207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 08:03:51 +0100 Subject: [PATCH 11/15] Add cached row_count to RowSelection and O(1) total_row_count() Store total row count in RowSelection at construction time, enabling O(1) total_row_count() instead of iterating all selectors. Also add selector_count() for O(1) fragmentation measurement. Update split_off() and limit() to maintain the cached row_count. Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/src/arrow/arrow_reader/read_plan.rs | 10 +++--- parquet/src/arrow/arrow_reader/selection.rs | 33 ++++++++++++++----- .../arrow/push_decoder/reader_builder/mod.rs | 2 +- 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 53e9155c599f..e336bbf1566e 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -228,12 +228,12 @@ impl ReadPlanBuilder { }; // Check if applying this predicate would scatter the selection too much. - // Measure selector density: selectors / rows. A high density means many - // small skip/read transitions per row, which is expensive for decoding. + // Measure selector density: selectors / total_rows. A high density means + // many small skip/read transitions per row, which is expensive for decoding. let should_defer = self.scatter_threshold.is_some_and(|threshold| { - let row_count = absolute.row_count() + absolute.skipped_row_count(); - row_count > 0 - && absolute.selector_count() as f64 / row_count as f64 > threshold + let total_rows = absolute.total_row_count(); + total_rows > 0 + && absolute.selector_count() as f64 / total_rows as f64 > threshold }); if should_defer { diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index cf040644893d..a015487903ab 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -138,9 +138,19 @@ impl RowSelector { #[derive(Debug, Clone, Default, Eq, PartialEq)] pub struct RowSelection { selectors: Vec, + /// Cached total row count (selected + skipped) for O(1) access. + row_count: usize, } impl RowSelection { + fn from_selectors(selectors: Vec) -> Self { + let row_count = selectors.iter().map(|s| s.row_count).sum(); + Self { + selectors, + row_count, + } + } + /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] /// /// # Panic @@ -191,7 +201,7 @@ impl RowSelection { selectors.push(RowSelector::skip(total_rows - last_end)) } - Self { selectors } + Self::from_selectors(selectors) } /// Given an offset index, return the byte ranges for all data pages selected by `self` @@ -297,7 +307,8 @@ impl RowSelection { Some(idx) => idx, None => { let selectors = std::mem::take(&mut self.selectors); - return Self { selectors }; + self.row_count = 0; + return Self::from_selectors(selectors); } }; @@ -316,9 +327,9 @@ impl RowSelection { next.row_count = overflow; std::mem::swap(&mut remaining, &mut self.selectors); - Self { - selectors: remaining, - } + // Update cached row_count for self (now the tail portion) + self.row_count = self.selectors.iter().map(|s| s.row_count).sum(); + Self::from_selectors(remaining) } /// returns a [`RowSelection`] representing rows that are selected in both /// input [`RowSelection`]s. @@ -402,7 +413,7 @@ impl RowSelection { selectors.push(RowSelector::skip(to_skip)); } - Self { selectors } + Self::from_selectors(selectors) } /// Compute the intersection of two [`RowSelection`] @@ -475,7 +486,7 @@ impl RowSelection { selectors.push(RowSelector::select(selected_count - offset)); selectors.extend_from_slice(&self.selectors[split_idx + 1..]); - Self { selectors } + Self::from_selectors(selectors) } /// Limit this [`RowSelection`] to only select `limit` rows @@ -495,6 +506,7 @@ impl RowSelection { } } } + self.row_count = self.selectors.iter().map(|s| s.row_count).sum(); self } @@ -512,6 +524,11 @@ impl RowSelection { self.selectors.len() } + /// Returns the total number of rows (selected + skipped) in O(1). + pub fn total_row_count(&self) -> usize { + self.row_count + } + /// Returns the number of selected rows pub fn row_count(&self) -> usize { self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum() @@ -607,7 +624,7 @@ impl FromIterator for RowSelection { } } - Self { selectors } + Self::from_selectors(selectors) } } diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 6ea93808b8b6..d8919b69a3d2 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -735,6 +735,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 240); + assert_eq!(std::mem::size_of::(), 256); } } From 7e7e0659103b8b4ffbeb7dc7aa6b177c79341926 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 08:18:36 +0100 Subject: [PATCH 12/15] Pass row_count to with_predicate instead of caching in RowSelection The total row count needed for scatter density calculation is already available at both call sites (sync reader sums row group sizes, async reader has row_count in scope). Pass it as a parameter instead of storing it in RowSelection. Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/src/arrow/arrow_reader/mod.rs | 8 ++++- parquet/src/arrow/arrow_reader/read_plan.rs | 6 ++-- parquet/src/arrow/arrow_reader/selection.rs | 31 ++++--------------- .../arrow/push_decoder/reader_builder/mod.rs | 9 ++++-- 4 files changed, 22 insertions(+), 32 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 8a370ea9fddc..1e12e4141d60 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -1249,7 +1249,13 @@ impl ParquetRecordBatchReaderBuilder { .with_parquet_metadata(&reader.metadata) .build_array_reader(fields.as_deref(), predicate.projection())?; - plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?; + let row_count: usize = reader + .row_groups + .iter() + .map(|&i| reader.metadata.row_group(i).num_rows() as usize) + .sum(); + plan_builder = + plan_builder.with_predicate(array_reader, predicate.as_mut(), row_count)?; } } diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index e336bbf1566e..0d4098c59b3b 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -182,6 +182,7 @@ impl ReadPlanBuilder { mut self, array_reader: Box, predicate: &mut dyn ArrowPredicate, + row_count: usize, ) -> Result { // Build a ReadPlan for the predicate reader using only the current // selection (not deferred). We avoid cloning the entire builder by @@ -231,9 +232,8 @@ impl ReadPlanBuilder { // Measure selector density: selectors / total_rows. A high density means // many small skip/read transitions per row, which is expensive for decoding. let should_defer = self.scatter_threshold.is_some_and(|threshold| { - let total_rows = absolute.total_row_count(); - total_rows > 0 - && absolute.selector_count() as f64 / total_rows as f64 > threshold + row_count > 0 + && absolute.selector_count() as f64 / row_count as f64 > threshold }); if should_defer { diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index a015487903ab..517ed095e203 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -138,19 +138,9 @@ impl RowSelector { #[derive(Debug, Clone, Default, Eq, PartialEq)] pub struct RowSelection { selectors: Vec, - /// Cached total row count (selected + skipped) for O(1) access. - row_count: usize, } impl RowSelection { - fn from_selectors(selectors: Vec) -> Self { - let row_count = selectors.iter().map(|s| s.row_count).sum(); - Self { - selectors, - row_count, - } - } - /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] /// /// # Panic @@ -201,7 +191,7 @@ impl RowSelection { selectors.push(RowSelector::skip(total_rows - last_end)) } - Self::from_selectors(selectors) + Self { selectors } } /// Given an offset index, return the byte ranges for all data pages selected by `self` @@ -307,8 +297,7 @@ impl RowSelection { Some(idx) => idx, None => { let selectors = std::mem::take(&mut self.selectors); - self.row_count = 0; - return Self::from_selectors(selectors); + return Self { selectors }; } }; @@ -327,9 +316,7 @@ impl RowSelection { next.row_count = overflow; std::mem::swap(&mut remaining, &mut self.selectors); - // Update cached row_count for self (now the tail portion) - self.row_count = self.selectors.iter().map(|s| s.row_count).sum(); - Self::from_selectors(remaining) + Self { selectors: remaining } } /// returns a [`RowSelection`] representing rows that are selected in both /// input [`RowSelection`]s. @@ -413,7 +400,7 @@ impl RowSelection { selectors.push(RowSelector::skip(to_skip)); } - Self::from_selectors(selectors) + Self { selectors } } /// Compute the intersection of two [`RowSelection`] @@ -486,7 +473,7 @@ impl RowSelection { selectors.push(RowSelector::select(selected_count - offset)); selectors.extend_from_slice(&self.selectors[split_idx + 1..]); - Self::from_selectors(selectors) + Self { selectors } } /// Limit this [`RowSelection`] to only select `limit` rows @@ -506,7 +493,6 @@ impl RowSelection { } } } - self.row_count = self.selectors.iter().map(|s| s.row_count).sum(); self } @@ -524,11 +510,6 @@ impl RowSelection { self.selectors.len() } - /// Returns the total number of rows (selected + skipped) in O(1). - pub fn total_row_count(&self) -> usize { - self.row_count - } - /// Returns the number of selected rows pub fn row_count(&self) -> usize { self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum() @@ -624,7 +605,7 @@ impl FromIterator for RowSelection { } } - Self::from_selectors(selectors) + Self { selectors } } } diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index d8919b69a3d2..ed74967f47a4 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -460,8 +460,11 @@ impl RowGroupReaderBuilder { ); // `with_predicate` actually evaluates the filter - plan_builder = - plan_builder.with_predicate(array_reader, filter_info.current_mut())?; + plan_builder = plan_builder.with_predicate( + array_reader, + filter_info.current_mut(), + row_count, + )?; let row_group_info = RowGroupInfo { row_group_idx, @@ -735,6 +738,6 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 256); + assert_eq!(std::mem::size_of::(), 240); } } From 9cc2660a005eace5565315e3276194368ced1228 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 08:38:30 +0100 Subject: [PATCH 13/15] Use scatter density threshold of 0.01 and remove debug prints Based on ClickBench profiling, scattering predicates have densities of 0.008-0.054 while clean predicates are <0.001. A threshold of 0.01 defers the scattering ones while applying the clean ones. Also removes the eprintln debug instrumentation. Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/benches/arrow_reader_clickbench.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/parquet/benches/arrow_reader_clickbench.rs b/parquet/benches/arrow_reader_clickbench.rs index 36cf30d86982..cf0491a19860 100644 --- a/parquet/benches/arrow_reader_clickbench.rs +++ b/parquet/benches/arrow_reader_clickbench.rs @@ -723,7 +723,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_scatter_threshold(Some(0.25)); + .with_scatter_threshold(Some(0.01)); let mut stream = builder.build().unwrap(); // run the stream to its end @@ -754,7 +754,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_scatter_threshold(Some(0.25)); + .with_scatter_threshold(Some(0.01)); let mut stream = builder.build().unwrap(); // run the stream to its end @@ -781,7 +781,7 @@ impl ReadTest { .with_batch_size(8192) .with_projection(self.projection_mask.clone()) .with_row_filter(self.row_filter()) - .with_scatter_threshold(Some(0.25)); + .with_scatter_threshold(Some(0.01)); let reader = builder.build().unwrap(); // run the stream to its end From 070fb5a01ed320c669eed38e09016516f2a6d54a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 09:13:16 +0100 Subject: [PATCH 14/15] Only defer predicates that increase fragmentation Don't defer a predicate if applying it would reduce the selector count (make the selection less fragmented). Only defer when the predicate both increases selectors AND exceeds the density threshold. Co-Authored-By: Claude Opus 4.6 (1M context) --- parquet/src/arrow/arrow_reader/read_plan.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 0d4098c59b3b..504aeb6cbbe3 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -231,8 +231,15 @@ impl ReadPlanBuilder { // Check if applying this predicate would scatter the selection too much. // Measure selector density: selectors / total_rows. A high density means // many small skip/read transitions per row, which is expensive for decoding. + // Only defer if the predicate actually increases fragmentation — if it + // reduces the selector count, always apply it. + let current_selectors = self + .selection + .as_ref() + .map_or(0, |s| s.selector_count()); let should_defer = self.scatter_threshold.is_some_and(|threshold| { row_count > 0 + && absolute.selector_count() > current_selectors && absolute.selector_count() as f64 / row_count as f64 > threshold }); @@ -270,6 +277,7 @@ impl ReadPlanBuilder { self.selection = Some(RowSelection::from(vec![])); } + // Preferred strategy must not be Auto let selection_strategy = self.resolve_selection_strategy(); From 0da100d3cd618bd49c376a43b7078efb73884991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 20 Mar 2026 15:33:06 +0100 Subject: [PATCH 15/15] Fmt --- parquet/src/arrow/arrow_reader/read_plan.rs | 6 +----- parquet/src/arrow/arrow_reader/selection.rs | 4 +++- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 504aeb6cbbe3..7d49d9d30350 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -233,10 +233,7 @@ impl ReadPlanBuilder { // many small skip/read transitions per row, which is expensive for decoding. // Only defer if the predicate actually increases fragmentation — if it // reduces the selector count, always apply it. - let current_selectors = self - .selection - .as_ref() - .map_or(0, |s| s.selector_count()); + let current_selectors = self.selection.as_ref().map_or(0, |s| s.selector_count()); let should_defer = self.scatter_threshold.is_some_and(|threshold| { row_count > 0 && absolute.selector_count() > current_selectors @@ -277,7 +274,6 @@ impl ReadPlanBuilder { self.selection = Some(RowSelection::from(vec![])); } - // Preferred strategy must not be Auto let selection_strategy = self.resolve_selection_strategy(); diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 517ed095e203..cf040644893d 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -316,7 +316,9 @@ impl RowSelection { next.row_count = overflow; std::mem::swap(&mut remaining, &mut self.selectors); - Self { selectors: remaining } + Self { + selectors: remaining, + } } /// returns a [`RowSelection`] representing rows that are selected in both /// input [`RowSelection`]s.