From b699e894aba829dea439b3b74a2a4a2f6a989c66 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Thu, 19 Mar 2026 17:13:00 +0000 Subject: [PATCH 1/7] refactor(common): remove ResolvedFile in favor of Segment --- .../common/src/catalog/physical/snapshot.rs | 35 ++++++++++--------- crates/core/common/src/physical_table.rs | 6 +--- .../common/src/physical_table/resolved.rs | 12 ------- .../common/src/physical_table/snapshot.rs | 14 +------- 4 files changed, 21 insertions(+), 46 deletions(-) delete mode 100644 crates/core/common/src/physical_table/resolved.rs diff --git a/crates/core/common/src/catalog/physical/snapshot.rs b/crates/core/common/src/catalog/physical/snapshot.rs index 65a2b5958..0c385e813 100644 --- a/crates/core/common/src/catalog/physical/snapshot.rs +++ b/crates/core/common/src/catalog/physical/snapshot.rs @@ -25,7 +25,7 @@ use crate::{ BlockRange, catalog::logical::LogicalTable, physical_table::{ - MultiNetworkSegmentsError, SnapshotError, resolved::ResolvedFile, + MultiNetworkSegmentsError, SnapshotError, segments::Segment, snapshot::TableSnapshot as PhyTableSnapshot, table::PhysicalTable, }, sql::TableReference, @@ -115,14 +115,14 @@ pub struct FromCatalogError(#[source] pub SnapshotError); /// The query execution wrapper over a resolved physical table. /// /// Wraps `physical_table::TableSnapshot` with DataFusion execution concerns. -/// Does not import `Segment`, `Chain`, or `canonical_chain` — works entirely with -/// [`ResolvedFile`] values produced by `physical_table::TableSnapshot`. +/// Holds [`Segment`] values from the canonical chain, giving the execution layer +/// access to block ranges for file-level pruning. #[derive(Debug, Clone)] pub struct QueryableSnapshot { /// The underlying physical table providing storage access. physical_table: Arc, - /// Parquet files resolved from the table snapshot's canonical chain. - resolved_files: Vec, + /// Segments from the table snapshot's canonical chain. 
+ segments: Vec, /// The contiguous block range covered by synced data, if any. synced_range: Option, /// Factory for creating parquet readers with caching and store access. @@ -150,7 +150,7 @@ impl QueryableSnapshot { }); Ok(Self { physical_table: snapshot.physical_table().clone(), - resolved_files: snapshot.resolved_files(), + segments: snapshot.canonical_segments().to_vec(), synced_range: snapshot.synced_range()?, reader_factory, sql_schema_name, @@ -226,28 +226,31 @@ impl QueryableSnapshot { Ok(predicate) } - /// Converts a resolved file into a DataFusion `PartitionedFile` with cached metadata. + /// Converts a segment into a DataFusion `PartitionedFile` with cached metadata. async fn to_partitioned_file( &self, - file: &ResolvedFile, + segment: &Segment, ) -> Result { - let metadata = self.reader_factory.get_cached_metadata(file.id).await?; - let pf = PartitionedFile::from(file.object.clone()) - .with_extensions(Arc::new(file.id)) + let metadata = self + .reader_factory + .get_cached_metadata(segment.id()) + .await?; + let pf = PartitionedFile::from(segment.object().clone()) + .with_extensions(Arc::new(segment.id())) .with_statistics(metadata.statistics); Ok(pf) } /// Resolves file metadata and computes statistics for the scan plan. 
- #[tracing::instrument(skip_all, err, fields(files = self.resolved_files.len()))] + #[tracing::instrument(skip_all, err, fields(files = self.segments.len()))] async fn resolve_file_groups( &self, target_partitions: usize, table_schema: SchemaRef, ) -> DataFusionResult<(Vec, datafusion::common::Statistics)> { - let file_count = self.resolved_files.len(); + let file_count = self.segments.len(); let file_stream = - futures::stream::iter(self.resolved_files.iter()).then(|f| self.to_partitioned_file(f)); + futures::stream::iter(self.segments.iter()).then(|s| self.to_partitioned_file(s)); let partitioned = round_robin(file_stream, file_count, target_partitions) .await .map_err(|e| DataFusionError::External(e.into()))?; @@ -269,7 +272,7 @@ impl TableProvider for QueryableSnapshot { TableType::Base } - #[tracing::instrument(skip_all, err, fields(table = %self.table_ref(), files = %self.resolved_files.len()))] + #[tracing::instrument(skip_all, err, fields(table = %self.table_ref(), files = %self.segments.len()))] async fn scan( &self, state: &dyn Session, @@ -277,7 +280,7 @@ impl TableProvider for QueryableSnapshot { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - tracing::debug!("creating scan execution plan"); + tracing::warn!(filters = ?filters, filters_len = filters.len(), "creating scan execution plan — filter pushdown check"); if self.synced_range.is_none() { // This is necessary to work around empty tables tripping the DF sanity checker diff --git a/crates/core/common/src/physical_table.rs b/crates/core/common/src/physical_table.rs index deceee107..51f68f53e 100644 --- a/crates/core/common/src/physical_table.rs +++ b/crates/core/common/src/physical_table.rs @@ -1,19 +1,15 @@ //! Physical table — segment resolution and table identity. //! //! This module owns the concept of segments and physical table resolution. -//! The query execution layer becomes agnostic to segments — it receives resolved -//! 
file lists and table metadata without knowing how they were derived. //! //! ## Module layout //! //! - `file` — `FileMetadata`: file identity and metadata from DB rows //! - `segments` — `Segment`, `Chain`, `canonical_chain()`, `missing_ranges()` -//! - `resolved` — `ResolvedFile`: resolved file view for execution layer //! - `table` — `PhysicalTable`: identity + segment resolution -//! - `snapshot` — `TableSnapshot`: resolved segments + resolved files view +//! - `snapshot` — `TableSnapshot`: resolved segments view pub mod file; -pub mod resolved; pub mod segments; pub mod snapshot; pub mod table; diff --git a/crates/core/common/src/physical_table/resolved.rs b/crates/core/common/src/physical_table/resolved.rs deleted file mode 100644 index 44b81b065..000000000 --- a/crates/core/common/src/physical_table/resolved.rs +++ /dev/null @@ -1,12 +0,0 @@ -use metadata_db::files::FileId; -use object_store::ObjectMeta; - -/// A file resolved for query execution. Contains everything the execution -/// layer needs without exposing segment/chain/block-range concepts. -#[derive(Debug, Clone)] -pub struct ResolvedFile { - /// File identifier for metadata cache lookups. - pub id: FileId, - /// Object store metadata (location, size, last_modified). - pub object: ObjectMeta, -} diff --git a/crates/core/common/src/physical_table/snapshot.rs b/crates/core/common/src/physical_table/snapshot.rs index 3d601d365..c02e26714 100644 --- a/crates/core/common/src/physical_table/snapshot.rs +++ b/crates/core/common/src/physical_table/snapshot.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::{ BlockRange, - physical_table::{resolved::ResolvedFile, segments::Segment, table::PhysicalTable}, + physical_table::{segments::Segment, table::PhysicalTable}, }; /// A segment-resolved view of a table. 
@@ -47,18 +47,6 @@ impl TableSnapshot { &self.physical_table } - // Resolved file access (for execution layer) - - pub fn resolved_files(&self) -> Vec { - self.canonical_segments - .iter() - .map(|s| ResolvedFile { - id: s.id(), - object: s.object().clone(), - }) - .collect() - } - pub fn file_count(&self) -> usize { self.canonical_segments.len() } From 0a865c049755c9d9731ae8995ad91252c097aa76 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Thu, 19 Mar 2026 19:50:51 +0000 Subject: [PATCH 2/7] perf(core): Logical pushdown for _block_num --- crates/core/common/src/catalog/physical.rs | 1 + .../src/catalog/physical/block_num_filter.rs | 277 ++++++++++++++++++ .../common/src/catalog/physical/snapshot.rs | 46 ++- tests/specs/segment-pruning-anvil.yaml | 105 +++++++ tests/src/tests/it_segment_pruning.rs | 108 +++++++ tests/src/tests/mod.rs | 1 + 6 files changed, 528 insertions(+), 10 deletions(-) create mode 100644 crates/core/common/src/catalog/physical/block_num_filter.rs create mode 100644 tests/specs/segment-pruning-anvil.yaml create mode 100644 tests/src/tests/it_segment_pruning.rs diff --git a/crates/core/common/src/catalog/physical.rs b/crates/core/common/src/catalog/physical.rs index 0b4550b93..6bdfa399f 100644 --- a/crates/core/common/src/catalog/physical.rs +++ b/crates/core/common/src/catalog/physical.rs @@ -1,3 +1,4 @@ +mod block_num_filter; mod catalog; pub mod for_dump; pub mod for_query; diff --git a/crates/core/common/src/catalog/physical/block_num_filter.rs b/crates/core/common/src/catalog/physical/block_num_filter.rs new file mode 100644 index 000000000..318eace85 --- /dev/null +++ b/crates/core/common/src/catalog/physical/block_num_filter.rs @@ -0,0 +1,277 @@ +//! `_block_num` filter analysis for segment-level pruning. +//! +//! Extracts block number bounds from DataFusion logical filter expressions, +//! enabling segment-level file pruning before the scan opens any Parquet files. 
+ +use datafusion::{ + common::ScalarValue, + logical_expr::{BinaryExpr, Operator}, + prelude::Expr, +}; +use datasets_common::block_num::RESERVED_BLOCK_NUM_COLUMN_NAME; + +/// Returns `true` if `expr` is a comparison filter on the `_block_num` column. +/// +/// Recognises simple comparisons (`_block_num > N`, `N <= _block_num`, etc.) +/// and conjunctions (`AND`) where both sides are block-num filters. +pub fn is_block_num_filter(expr: &Expr) -> bool { + match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + Operator::Eq | Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { + is_block_num_column(left) || is_block_num_column(right) + } + Operator::And => is_block_num_filter(left) && is_block_num_filter(right), + _ => false, + }, + _ => false, + } +} + +/// Extracts inclusive `(min_block, max_block)` bounds from `_block_num` filter expressions. +/// +/// Returns the tightest bounds found; `None` means unbounded in that direction. +pub fn extract_block_num_bounds(filters: &[Expr]) -> (Option, Option) { + let mut min_block: Option = None; + let mut max_block: Option = None; + + for filter in filters { + collect_block_num_bounds(filter, &mut min_block, &mut max_block); + } + + (min_block, max_block) +} + +fn is_block_num_column(expr: &Expr) -> bool { + matches!(expr, Expr::Column(c) if c.name == RESERVED_BLOCK_NUM_COLUMN_NAME) +} + +fn collect_block_num_bounds(expr: &Expr, min_block: &mut Option, max_block: &mut Option) { + let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr else { + return; + }; + + if *op == Operator::And { + collect_block_num_bounds(left, min_block, max_block); + collect_block_num_bounds(right, min_block, max_block); + return; + } + + // Normalise to (column_side, op_from_column_perspective, literal_value). 
+ let bound = if is_block_num_column(left) { + scalar_to_u64(right).map(|v| (*op, v)) + } else if is_block_num_column(right) { + scalar_to_u64(left).map(|v| (flip_op(*op), v)) + } else { + None + }; + + let Some((op, v)) = bound else { return }; + + match op { + // _block_num = N → min = max = N + Operator::Eq => { + tighten_min(min_block, v); + tighten_max(max_block, v); + } + // _block_num > N → min = N + 1 + Operator::Gt => tighten_min(min_block, v.saturating_add(1)), + // _block_num >= N → min = N + Operator::GtEq => tighten_min(min_block, v), + // _block_num < N → max = N - 1 + Operator::Lt => tighten_max(max_block, v.saturating_sub(1)), + // _block_num <= N → max = N + Operator::LtEq => tighten_max(max_block, v), + _ => {} + } +} + +/// Flip a comparison operator so the column is on the left: `N < col` → `col > N`. +fn flip_op(op: Operator) -> Operator { + match op { + Operator::Lt => Operator::Gt, + Operator::LtEq => Operator::GtEq, + Operator::Gt => Operator::Lt, + Operator::GtEq => Operator::LtEq, + other => other, + } +} + +fn tighten_min(current: &mut Option, candidate: u64) { + *current = Some(current.map_or(candidate, |c| c.max(candidate))); +} + +fn tighten_max(current: &mut Option, candidate: u64) { + *current = Some(current.map_or(candidate, |c| c.min(candidate))); +} + +fn scalar_to_u64(expr: &Expr) -> Option { + match expr { + Expr::Literal(ScalarValue::UInt64(Some(v)), _) => Some(*v), + Expr::Literal(ScalarValue::Int64(Some(v)), _) => u64::try_from(*v).ok(), + Expr::Literal(ScalarValue::UInt32(Some(v)), _) => Some(*v as u64), + Expr::Literal(ScalarValue::Int32(Some(v)), _) => u64::try_from(*v).ok(), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use datafusion::prelude::{col, lit}; + + use super::*; + + // ----------------------------------------------------------------------- + // is_block_num_filter + // ----------------------------------------------------------------------- + + #[test] + fn recognises_block_num_gt() { + let expr = 
col("_block_num").gt(lit(100u64)); + assert!(is_block_num_filter(&expr)); + } + + #[test] + fn recognises_block_num_lt_eq() { + let expr = col("_block_num").lt_eq(lit(200u64)); + assert!(is_block_num_filter(&expr)); + } + + #[test] + fn recognises_block_num_eq() { + let expr = col("_block_num").eq(lit(42u64)); + assert!(is_block_num_filter(&expr)); + } + + #[test] + fn recognises_reversed_literal_op_column() { + // 50 < _block_num → should still be recognised + let expr = lit(50u64).lt(col("_block_num")); + assert!(is_block_num_filter(&expr)); + } + + #[test] + fn recognises_conjunction_of_block_num_filters() { + let expr = col("_block_num") + .gt_eq(lit(10u64)) + .and(col("_block_num").lt(lit(20u64))); + assert!(is_block_num_filter(&expr)); + } + + #[test] + fn rejects_non_block_num_column() { + let expr = col("timestamp").gt(lit(100u64)); + assert!(!is_block_num_filter(&expr)); + } + + #[test] + fn rejects_mixed_conjunction() { + // _block_num > 10 AND timestamp < 100 → false (not all legs are _block_num) + let expr = col("_block_num") + .gt(lit(10u64)) + .and(col("timestamp").lt(lit(100u64))); + assert!(!is_block_num_filter(&expr)); + } + + #[test] + fn rejects_or_operator() { + let expr = col("_block_num") + .gt(lit(10u64)) + .or(col("_block_num").lt(lit(5u64))); + assert!(!is_block_num_filter(&expr)); + } + + // ----------------------------------------------------------------------- + // extract_block_num_bounds + // ----------------------------------------------------------------------- + + #[test] + fn empty_filters_returns_unbounded() { + assert_eq!(extract_block_num_bounds(&[]), (None, None)); + } + + #[test] + fn gt_sets_min_exclusive() { + // _block_num > 100 → min = 101 + let filters = vec![col("_block_num").gt(lit(100u64))]; + assert_eq!(extract_block_num_bounds(&filters), (Some(101), None)); + } + + #[test] + fn gt_eq_sets_min_inclusive() { + let filters = vec![col("_block_num").gt_eq(lit(100u64))]; + assert_eq!(extract_block_num_bounds(&filters), 
(Some(100), None)); + } + + #[test] + fn lt_sets_max_exclusive() { + // _block_num < 50 → max = 49 + let filters = vec![col("_block_num").lt(lit(50u64))]; + assert_eq!(extract_block_num_bounds(&filters), (None, Some(49))); + } + + #[test] + fn lt_eq_sets_max_inclusive() { + let filters = vec![col("_block_num").lt_eq(lit(50u64))]; + assert_eq!(extract_block_num_bounds(&filters), (None, Some(50))); + } + + #[test] + fn eq_sets_both() { + let filters = vec![col("_block_num").eq(lit(42u64))]; + assert_eq!(extract_block_num_bounds(&filters), (Some(42), Some(42))); + } + + #[test] + fn range_from_two_filters() { + // _block_num >= 10, _block_num <= 20 + let filters = vec![ + col("_block_num").gt_eq(lit(10u64)), + col("_block_num").lt_eq(lit(20u64)), + ]; + assert_eq!(extract_block_num_bounds(&filters), (Some(10), Some(20))); + } + + #[test] + fn tightens_to_narrowest_range() { + // _block_num >= 10, _block_num >= 15 → min = 15 + // _block_num <= 100, _block_num <= 50 → max = 50 + let filters = vec![ + col("_block_num").gt_eq(lit(10u64)), + col("_block_num").gt_eq(lit(15u64)), + col("_block_num").lt_eq(lit(100u64)), + col("_block_num").lt_eq(lit(50u64)), + ]; + assert_eq!(extract_block_num_bounds(&filters), (Some(15), Some(50))); + } + + #[test] + fn conjunction_within_single_filter() { + // Single expression: _block_num >= 10 AND _block_num < 20 + let expr = col("_block_num") + .gt_eq(lit(10u64)) + .and(col("_block_num").lt(lit(20u64))); + assert_eq!(extract_block_num_bounds(&[expr]), (Some(10), Some(19))); + } + + #[test] + fn reversed_literal_column_order() { + // 100 <= _block_num → same as _block_num >= 100 + let filters = vec![lit(100u64).lt_eq(col("_block_num"))]; + assert_eq!(extract_block_num_bounds(&filters), (Some(100), None)); + } + + #[test] + fn ignores_non_block_num_filters() { + let filters = vec![ + col("timestamp").gt(lit(1000u64)), + col("_block_num").gt_eq(lit(5u64)), + ]; + assert_eq!(extract_block_num_bounds(&filters), (Some(5), None)); + } + + 
#[test] + fn handles_i64_literals() { + let filters = vec![col("_block_num").gt(lit(100i64))]; + assert_eq!(extract_block_num_bounds(&filters), (Some(101), None)); + } +} diff --git a/crates/core/common/src/catalog/physical/snapshot.rs b/crates/core/common/src/catalog/physical/snapshot.rs index 0c385e813..64b3fa968 100644 --- a/crates/core/common/src/catalog/physical/snapshot.rs +++ b/crates/core/common/src/catalog/physical/snapshot.rs @@ -13,7 +13,7 @@ use datafusion::{ physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource}, }, error::{DataFusionError, Result as DataFusionResult}, - logical_expr::{ScalarUDF, SortExpr, col, utils::conjunction}, + logical_expr::{ScalarUDF, SortExpr, TableProviderFilterPushDown, col, utils::conjunction}, physical_expr::LexOrdering, physical_plan::{ExecutionPlan, PhysicalExpr, empty::EmptyExec}, prelude::Expr, @@ -21,6 +21,7 @@ use datafusion::{ use datafusion_datasource::compute_all_files_statistics; use futures::{Stream, StreamExt as _}; +use super::block_num_filter::{extract_block_num_bounds, is_block_num_filter}; use crate::{ BlockRange, catalog::logical::LogicalTable, @@ -242,15 +243,15 @@ impl QueryableSnapshot { } /// Resolves file metadata and computes statistics for the scan plan. 
- #[tracing::instrument(skip_all, err, fields(files = self.segments.len()))] async fn resolve_file_groups( &self, + segments: &[&Segment], target_partitions: usize, table_schema: SchemaRef, ) -> DataFusionResult<(Vec, datafusion::common::Statistics)> { - let file_count = self.segments.len(); + let file_count = segments.len(); let file_stream = - futures::stream::iter(self.segments.iter()).then(|s| self.to_partitioned_file(s)); + futures::stream::iter(segments.iter()).then(|s| self.to_partitioned_file(s)); let partitioned = round_robin(file_stream, file_count, target_partitions) .await .map_err(|e| DataFusionError::External(e.into()))?; @@ -272,7 +273,23 @@ impl TableProvider for QueryableSnapshot { TableType::Base } - #[tracing::instrument(skip_all, err, fields(table = %self.table_ref(), files = %self.segments.len()))] + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> DataFusionResult> { + Ok(filters + .iter() + .map(|f| { + if is_block_num_filter(f) { + TableProviderFilterPushDown::Inexact + } else { + TableProviderFilterPushDown::Unsupported + } + }) + .collect()) + } + + #[tracing::instrument(skip_all, err, fields(table = %self.table_ref(), segments = %self.segments.len()))] async fn scan( &self, state: &dyn Session, @@ -280,22 +297,31 @@ impl TableProvider for QueryableSnapshot { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - tracing::warn!(filters = ?filters, filters_len = filters.len(), "creating scan execution plan — filter pushdown check"); - if self.synced_range.is_none() { - // This is necessary to work around empty tables tripping the DF sanity checker tracing::debug!("table has no synced data, returning empty execution plan"); let projected_schema = project_schema(&self.schema(), projection)?; return Ok(Arc::new(EmptyExec::new(projected_schema))); } + // Prune segments based on _block_num filters. 
+ let (min_block, max_block) = extract_block_num_bounds(filters); + let segments: Vec<&Segment> = self + .segments + .iter() + .filter(|s| { + let range = s.single_range(); + let below = max_block.is_some_and(|max| range.start() > max); + let above = min_block.is_some_and(|min| range.end() < min); + !below && !above + }) + .collect(); + let target_partitions = state.config_options().execution.target_partitions; let table_schema = self.physical_table.schema(); let (file_groups, statistics) = self - .resolve_file_groups(target_partitions, table_schema.clone()) + .resolve_file_groups(&segments, target_partitions, table_schema.clone()) .await?; if statistics.num_rows == Precision::Absent { - // This log likely signifies a bug in our statistics fetching. tracing::warn!("Table has no row count statistics. Queries may be inefficient."); } diff --git a/tests/specs/segment-pruning-anvil.yaml b/tests/specs/segment-pruning-anvil.yaml new file mode 100644 index 000000000..2492ec7dc --- /dev/null +++ b/tests/specs/segment-pruning-anvil.yaml @@ -0,0 +1,105 @@ +# Segment pruning tests using Anvil +# +# Verifies that _block_num filters correctly prune segments and return +# the right data. Creates two segments (blocks 0-3 and 4-7) then queries +# with various _block_num predicates. 
+ +- anvil: {} + +# First segment: blocks 0-3 +- name: mine_first_batch + anvil_mine: 3 + +- name: dump_first_segment + dataset: _/anvil_rpc@0.0.0 + end: 3 + +# Second segment: blocks 4-7 +- name: mine_second_batch + anvil_mine: 4 + +- name: dump_second_segment + dataset: _/anvil_rpc@0.0.0 + end: 7 + +# All blocks (no filter) — baseline +- name: all_blocks + query: SELECT block_num FROM anvil_rpc.blocks ORDER BY block_num + results: | + [ + {"block_num": 0}, + {"block_num": 1}, + {"block_num": 2}, + {"block_num": 3}, + {"block_num": 4}, + {"block_num": 5}, + {"block_num": 6}, + {"block_num": 7} + ] + +# Filter: only second segment +- name: block_num_gt_filter + query: SELECT block_num FROM anvil_rpc.blocks WHERE _block_num > 3 ORDER BY block_num + results: | + [ + {"block_num": 4}, + {"block_num": 5}, + {"block_num": 6}, + {"block_num": 7} + ] + +# Filter: only first segment +- name: block_num_lt_filter + query: SELECT block_num FROM anvil_rpc.blocks WHERE _block_num < 4 ORDER BY block_num + results: | + [ + {"block_num": 0}, + {"block_num": 1}, + {"block_num": 2}, + {"block_num": 3} + ] + +# Range filter spanning both segments +- name: block_num_range_filter + query: SELECT block_num FROM anvil_rpc.blocks WHERE _block_num >= 2 AND _block_num <= 5 ORDER BY block_num + results: | + [ + {"block_num": 2}, + {"block_num": 3}, + {"block_num": 4}, + {"block_num": 5} + ] + +# Equality filter +- name: block_num_eq_filter + query: SELECT block_num FROM anvil_rpc.blocks WHERE _block_num = 6 ORDER BY block_num + results: | + [ + {"block_num": 6} + ] + +# Filter that excludes all segments +- name: block_num_out_of_range + query: SELECT COUNT(*) AS cnt FROM anvil_rpc.blocks WHERE _block_num > 100 + results: | + [ + {"cnt": 0} + ] + +# gte filter +- name: block_num_gte_filter + query: SELECT block_num FROM anvil_rpc.blocks WHERE _block_num >= 6 ORDER BY block_num + results: | + [ + {"block_num": 6}, + {"block_num": 7} + ] + +# lte filter +- name: block_num_lte_filter + query: 
SELECT block_num FROM anvil_rpc.blocks WHERE _block_num <= 1 ORDER BY block_num + results: | + [ + {"block_num": 0}, + {"block_num": 1} + ] diff --git a/tests/src/tests/it_segment_pruning.rs b/tests/src/tests/it_segment_pruning.rs new file mode 100644 index 000000000..3e6f12dea --- /dev/null +++ b/tests/src/tests/it_segment_pruning.rs @@ -0,0 +1,108 @@ +//! Integration tests for segment-level _block_num filter pruning with Anvil. +//! +//! Creates two segments (blocks 0-3 and 4-7) then verifies that _block_num +//! filters prune segments correctly by checking both result correctness and +//! EXPLAIN output for reduced file group counts. + +use monitoring::logging; + +use crate::{steps::run_spec, testlib::ctx::TestCtxBuilder}; + +/// Runs the YAML spec that validates result correctness for various _block_num filters, +/// then checks EXPLAIN output to verify that segment pruning reduces file groups. +#[tokio::test(flavor = "multi_thread")] +async fn segment_pruning() { + logging::init(); + + let test_ctx = TestCtxBuilder::new("segment_pruning") + .with_anvil_ipc() + .with_dataset_manifest("anvil_rpc") + .build() + .await + .expect("Failed to create test environment"); + + let mut client = test_ctx + .new_flight_client() + .await + .expect("Failed to connect FlightClient"); + + // Run the full spec: sets up two segments (blocks 0-3 and 4-7) and + // validates result correctness for various _block_num filters. + run_spec("segment-pruning-anvil", &test_ctx, &mut client, None) + .await + .expect("Failed to run segment pruning spec"); + + // Now verify EXPLAIN output to confirm segment pruning reduces file groups. + + // Baseline: no filter should see file_groups from both segments. 
+ let baseline_plan = explain(&mut client, "SELECT block_num FROM anvil_rpc.blocks").await; + let baseline_groups = count_file_groups(&baseline_plan); + assert!( + baseline_groups > 0, + "Baseline should have file groups.\nplan: {baseline_plan}" + ); + + // Filtered: _block_num > 3 should only touch the second segment. + let filtered_plan = explain( + &mut client, + "SELECT block_num FROM anvil_rpc.blocks WHERE _block_num > 3", + ) + .await; + let filtered_groups = count_file_groups(&filtered_plan); + + assert!( + filtered_groups < baseline_groups, + "Expected fewer file groups with _block_num filter.\n\ + baseline groups: {baseline_groups}\n\ + filtered groups: {filtered_groups}\n\ + baseline plan: {baseline_plan}\n\ + filtered plan: {filtered_plan}" + ); + + // Out-of-range filter should produce an empty plan (no file groups). + let empty_plan = explain( + &mut client, + "SELECT block_num FROM anvil_rpc.blocks WHERE _block_num > 100", + ) + .await; + let empty_groups = count_file_groups(&empty_plan); + + assert_eq!( + empty_groups, 0, + "Expected zero file groups for out-of-range filter.\nplan: {empty_plan}" + ); +} + +/// Runs EXPLAIN on a query and returns the plan text. +async fn explain(client: &mut crate::testlib::fixtures::FlightClient, query: &str) -> String { + let explain_query = format!("EXPLAIN {query}"); + let (json, _) = client + .run_query(&explain_query, None) + .await + .unwrap_or_else(|e| panic!("EXPLAIN query failed: {e}\nquery: {explain_query}")); + + // EXPLAIN returns rows with "plan_type" and "plan" columns. + match json { + serde_json::Value::Array(rows) => rows + .iter() + .filter_map(|row| row.get("plan").and_then(|v| v.as_str())) + .collect::>() + .join("\n"), + other => panic!("Unexpected EXPLAIN output format: {other:?}"), + } +} + +/// Counts the number of file groups in an EXPLAIN plan. +/// +/// Looks for the `file_groups={N group` pattern in the plan text. +/// Returns 0 if the pattern is not found (e.g. EmptyExec plans). 
+fn count_file_groups(plan: &str) -> usize { + // Pattern: "file_groups={14 groups:" or "file_groups={1 group:" + let prefix = "file_groups={"; + let Some(start) = plan.find(prefix) else { + return 0; + }; + let after = &plan[start + prefix.len()..]; + let end = after.find(' ').unwrap_or(after.len()); + after[..end].parse().unwrap_or(0) +} diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs index aecee243f..875e6a716 100644 --- a/tests/src/tests/mod.rs +++ b/tests/src/tests/mod.rs @@ -21,6 +21,7 @@ mod it_multi_network_batch; mod it_multi_table_continuous; mod it_non_incremental; mod it_reorg; +mod it_segment_pruning; mod it_server_sql_references; mod it_solana_historical_to_json_rpc_transition; mod it_sql; From 62af021238c207c8f23c1f819b86a76db48a5c9f Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 10:29:23 +0000 Subject: [PATCH 3/7] fix(core): address review --- crates/core/common/src/catalog/physical/snapshot.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/crates/core/common/src/catalog/physical/snapshot.rs b/crates/core/common/src/catalog/physical/snapshot.rs index 64b3fa968..f091cb7c5 100644 --- a/crates/core/common/src/catalog/physical/snapshot.rs +++ b/crates/core/common/src/catalog/physical/snapshot.rs @@ -315,6 +315,18 @@ impl TableProvider for QueryableSnapshot { !below && !above }) .collect(); + tracing::debug!( + total = self.segments.len(), + kept = segments.len(), + ?min_block, + ?max_block, + "segment pruning" + ); + + if segments.is_empty() { + let projected_schema = project_schema(&self.schema(), projection)?; + return Ok(Arc::new(EmptyExec::new(projected_schema))); + } let target_partitions = state.config_options().execution.target_partitions; let table_schema = self.physical_table.schema(); From 05bd98e75ef57ec90c2b2112ae8ca87c114e1e2b Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 11:11:00 +0000 Subject: [PATCH 4/7] feat(core): Leverage DF expr analysis for segment pruning 
--- .../src/catalog/physical/block_num_filter.rs | 344 ++++++++++-------- .../common/src/catalog/physical/snapshot.rs | 13 +- 2 files changed, 203 insertions(+), 154 deletions(-) diff --git a/crates/core/common/src/catalog/physical/block_num_filter.rs b/crates/core/common/src/catalog/physical/block_num_filter.rs index 318eace85..4e13f94b7 100644 --- a/crates/core/common/src/catalog/physical/block_num_filter.rs +++ b/crates/core/common/src/catalog/physical/block_num_filter.rs @@ -1,11 +1,19 @@ //! `_block_num` filter analysis for segment-level pruning. //! -//! Extracts block number bounds from DataFusion logical filter expressions, -//! enabling segment-level file pruning before the scan opens any Parquet files. +//! Uses DataFusion's `ExprSimplifier` with interval guarantees to determine +//! whether a predicate can possibly be satisfied by a given block range. + +use std::sync::Arc; use datafusion::{ - common::ScalarValue, - logical_expr::{BinaryExpr, Operator}, + common::{DFSchema, ScalarValue}, + logical_expr::{ + BinaryExpr, Operator, + execution_props::ExecutionProps, + interval_arithmetic::{Interval, NullableInterval}, + simplify::SimplifyContext, + }, + optimizer::simplify_expressions::ExprSimplifier, prelude::Expr, }; use datasets_common::block_num::RESERVED_BLOCK_NUM_COLUMN_NAME; @@ -27,126 +35,111 @@ pub fn is_block_num_filter(expr: &Expr) -> bool { } } -/// Extracts inclusive `(min_block, max_block)` bounds from `_block_num` filter expressions. -/// -/// Returns the tightest bounds found; `None` means unbounded in that direction. 
-pub fn extract_block_num_bounds(filters: &[Expr]) -> (Option, Option) { - let mut min_block: Option = None; - let mut max_block: Option = None; - - for filter in filters { - collect_block_num_bounds(filter, &mut min_block, &mut max_block); - } - - (min_block, max_block) -} - fn is_block_num_column(expr: &Expr) -> bool { matches!(expr, Expr::Column(c) if c.name == RESERVED_BLOCK_NUM_COLUMN_NAME) } -fn collect_block_num_bounds(expr: &Expr, min_block: &mut Option, max_block: &mut Option) { - let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr else { - return; - }; - - if *op == Operator::And { - collect_block_num_bounds(left, min_block, max_block); - collect_block_num_bounds(right, min_block, max_block); - return; +/// Returns `false` only when the filters are **provably unsatisfiable** for a +/// segment covering the block range `[start, end]`. +/// +/// A `true` return does **not** guarantee matching rows exist — it means the +/// simplifier could not rule the segment out. Callers must tolerate false +/// positives (segments kept unnecessarily) but will never see false negatives +/// (segments pruned that contained matching rows). +/// +/// Internally, this feeds the segment's `_block_num` range as a +/// [`NullableInterval`] guarantee into DataFusion's [`ExprSimplifier`]. If any +/// filter simplifies to the literal `false`, the segment is provably empty. +/// Filters the simplifier cannot fully evaluate (e.g. arithmetic expressions, +/// references to other columns) are conservatively treated as satisfiable. +pub fn filters_maybe_satisfiable_for_range( + filters: &[Expr], + schema: &Arc, + start: u64, + end: u64, +) -> bool { + if filters.is_empty() { + return true; } - // Normalise to (column_side, op_from_column_perspective, literal_value). 
- let bound = if is_block_num_column(left) { - scalar_to_u64(right).map(|v| (*op, v)) - } else if is_block_num_column(right) { - scalar_to_u64(left).map(|v| (flip_op(*op), v)) - } else { - None + let interval = NullableInterval::NotNull { + values: match Interval::try_new( + ScalarValue::UInt64(Some(start)), + ScalarValue::UInt64(Some(end)), + ) { + Ok(iv) => iv, + Err(_) => return true, // can't build interval → don't prune + }, }; - let Some((op, v)) = bound else { return }; + let guarantees = vec![( + Expr::Column(RESERVED_BLOCK_NUM_COLUMN_NAME.into()), + interval, + )]; - match op { - // _block_num = N → min = max = N - Operator::Eq => { - tighten_min(min_block, v); - tighten_max(max_block, v); - } - // _block_num > N → min = N + 1 - Operator::Gt => tighten_min(min_block, v.saturating_add(1)), - // _block_num >= N → min = N - Operator::GtEq => tighten_min(min_block, v), - // _block_num < N → max = N - 1 - Operator::Lt => tighten_max(max_block, v.saturating_sub(1)), - // _block_num <= N → max = N - Operator::LtEq => tighten_max(max_block, v), - _ => {} - } -} + let props = ExecutionProps::new(); + let context = SimplifyContext::new(&props).with_schema(Arc::clone(schema)); + let simplifier = ExprSimplifier::new(context).with_guarantees(guarantees); -/// Flip a comparison operator so the column is on the left: `N < col` → `col > N`. 
-fn flip_op(op: Operator) -> Operator { - match op { - Operator::Lt => Operator::Gt, - Operator::LtEq => Operator::GtEq, - Operator::Gt => Operator::Lt, - Operator::GtEq => Operator::LtEq, - other => other, + for filter in filters { + match simplifier.simplify(filter.clone()) { + Ok(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) => return false, + _ => {} + } } -} - -fn tighten_min(current: &mut Option, candidate: u64) { - *current = Some(current.map_or(candidate, |c| c.max(candidate))); -} - -fn tighten_max(current: &mut Option, candidate: u64) { - *current = Some(current.map_or(candidate, |c| c.min(candidate))); -} -fn scalar_to_u64(expr: &Expr) -> Option { - match expr { - Expr::Literal(ScalarValue::UInt64(Some(v)), _) => Some(*v), - Expr::Literal(ScalarValue::Int64(Some(v)), _) => u64::try_from(*v).ok(), - Expr::Literal(ScalarValue::UInt32(Some(v)), _) => Some(*v as u64), - Expr::Literal(ScalarValue::Int32(Some(v)), _) => u64::try_from(*v).ok(), - _ => None, - } + true } #[cfg(test)] mod tests { - use datafusion::prelude::{col, lit}; + use std::sync::Arc; + + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion::{ + common::{DFSchema, ToDFSchema as _}, + prelude::{col, lit}, + }; use super::*; + fn test_schema() -> Arc { + Arc::new( + Schema::new(vec![ + Field::new("_block_num", DataType::UInt64, false), + Field::new("gas_used", DataType::UInt64, false), + ]) + .to_dfschema() + .unwrap(), + ) + } + + fn sat(filters: &[Expr], start: u64, end: u64) -> bool { + filters_maybe_satisfiable_for_range(filters, &test_schema(), start, end) + } + // ----------------------------------------------------------------------- // is_block_num_filter // ----------------------------------------------------------------------- #[test] fn recognises_block_num_gt() { - let expr = col("_block_num").gt(lit(100u64)); - assert!(is_block_num_filter(&expr)); + assert!(is_block_num_filter(&col("_block_num").gt(lit(100u64)))); } #[test] fn recognises_block_num_lt_eq() { - 
let expr = col("_block_num").lt_eq(lit(200u64)); - assert!(is_block_num_filter(&expr)); + assert!(is_block_num_filter(&col("_block_num").lt_eq(lit(200u64)))); } #[test] fn recognises_block_num_eq() { - let expr = col("_block_num").eq(lit(42u64)); - assert!(is_block_num_filter(&expr)); + assert!(is_block_num_filter(&col("_block_num").eq(lit(42u64)))); } #[test] fn recognises_reversed_literal_op_column() { - // 50 < _block_num → should still be recognised - let expr = lit(50u64).lt(col("_block_num")); - assert!(is_block_num_filter(&expr)); + assert!(is_block_num_filter(&lit(50u64).lt(col("_block_num")))); } #[test] @@ -159,13 +152,11 @@ mod tests { #[test] fn rejects_non_block_num_column() { - let expr = col("timestamp").gt(lit(100u64)); - assert!(!is_block_num_filter(&expr)); + assert!(!is_block_num_filter(&col("timestamp").gt(lit(100u64)))); } #[test] fn rejects_mixed_conjunction() { - // _block_num > 10 AND timestamp < 100 → false (not all legs are _block_num) let expr = col("_block_num") .gt(lit(10u64)) .and(col("timestamp").lt(lit(100u64))); @@ -181,97 +172,158 @@ mod tests { } // ----------------------------------------------------------------------- - // extract_block_num_bounds + // filters_maybe_satisfiable_for_range — basic comparisons // ----------------------------------------------------------------------- #[test] - fn empty_filters_returns_unbounded() { - assert_eq!(extract_block_num_bounds(&[]), (None, None)); + fn gt_prunes_below() { + // _block_num > 100, segment [0, 50] → all values ≤ 50, none > 100 + assert!(!sat(&[col("_block_num").gt(lit(100u64))], 0, 50)); } #[test] - fn gt_sets_min_exclusive() { - // _block_num > 100 → min = 101 - let filters = vec![col("_block_num").gt(lit(100u64))]; - assert_eq!(extract_block_num_bounds(&filters), (Some(101), None)); + fn gt_keeps_overlap() { + // _block_num > 100, segment [0, 150] → some values > 100 + assert!(sat(&[col("_block_num").gt(lit(100u64))], 0, 150)); } #[test] - fn gt_eq_sets_min_inclusive() { - 
let filters = vec![col("_block_num").gt_eq(lit(100u64))]; - assert_eq!(extract_block_num_bounds(&filters), (Some(100), None)); + fn gt_keeps_above() { + // _block_num > 100, segment [200, 300] → all values > 100 + assert!(sat(&[col("_block_num").gt(lit(100u64))], 200, 300)); } + // ----------------------------------------------------------------------- + // range conjunctions + // ----------------------------------------------------------------------- + #[test] - fn lt_sets_max_exclusive() { - // _block_num < 50 → max = 49 - let filters = vec![col("_block_num").lt(lit(50u64))]; - assert_eq!(extract_block_num_bounds(&filters), (None, Some(49))); + fn range_keeps_overlap() { + // _block_num >= 10 AND _block_num <= 20, segment [15, 25] + let filters = vec![ + col("_block_num") + .gt_eq(lit(10u64)) + .and(col("_block_num").lt_eq(lit(20u64))), + ]; + assert!(sat(&filters, 15, 25)); } #[test] - fn lt_eq_sets_max_inclusive() { - let filters = vec![col("_block_num").lt_eq(lit(50u64))]; - assert_eq!(extract_block_num_bounds(&filters), (None, Some(50))); + fn range_prunes_disjoint() { + // _block_num >= 10 AND _block_num <= 20, segment [25, 35] + let filters = vec![ + col("_block_num") + .gt_eq(lit(10u64)) + .and(col("_block_num").lt_eq(lit(20u64))), + ]; + assert!(!sat(&filters, 25, 35)); } + // ----------------------------------------------------------------------- + // equality + // ----------------------------------------------------------------------- + #[test] - fn eq_sets_both() { - let filters = vec![col("_block_num").eq(lit(42u64))]; - assert_eq!(extract_block_num_bounds(&filters), (Some(42), Some(42))); + fn eq_keeps_containing_range() { + assert!(sat(&[col("_block_num").eq(lit(42u64))], 40, 50)); } #[test] - fn range_from_two_filters() { - // _block_num >= 10, _block_num <= 20 - let filters = vec![ - col("_block_num").gt_eq(lit(10u64)), - col("_block_num").lt_eq(lit(20u64)), - ]; - assert_eq!(extract_block_num_bounds(&filters), (Some(10), Some(20))); + fn 
eq_prunes_non_containing_range() { + assert!(!sat(&[col("_block_num").eq(lit(42u64))], 50, 60)); } + // ----------------------------------------------------------------------- + // arithmetic in predicates + // ----------------------------------------------------------------------- + + // NOTE: DF guarantee rewriter doesn't currently support all kinds of arithmetic. + // These tests document the current conservative behavior (no pruning). + // Future DF versions may allow upgrading some of these to assert pruning. + #[test] - fn tightens_to_narrowest_range() { - // _block_num >= 10, _block_num >= 15 → min = 15 - // _block_num <= 100, _block_num <= 50 → max = 50 - let filters = vec![ - col("_block_num").gt_eq(lit(10u64)), - col("_block_num").gt_eq(lit(15u64)), - col("_block_num").lt_eq(lit(100u64)), - col("_block_num").lt_eq(lit(50u64)), - ]; - assert_eq!(extract_block_num_bounds(&filters), (Some(15), Some(50))); + fn arithmetic_not_pruned_conservatively() { + // _block_num + 10 > 100 with [0, 50] — could prune but DF can't prove it + assert!(sat( + &[(col("_block_num") + lit(10u64)).gt(lit(100u64))], + 0, + 50 + )); } #[test] - fn conjunction_within_single_filter() { - // Single expression: _block_num >= 10 AND _block_num < 20 - let expr = col("_block_num") - .gt_eq(lit(10u64)) - .and(col("_block_num").lt(lit(20u64))); - assert_eq!(extract_block_num_bounds(&[expr]), (Some(10), Some(19))); + fn arithmetic_keeps_overlap() { + // _block_num + 10 > 100, segment [85, 120]: definitely satisfiable + assert!(sat( + &[(col("_block_num") + lit(10u64)).gt(lit(100u64))], + 85, + 120 + )); } + // ----------------------------------------------------------------------- + // NOT / negation + // ----------------------------------------------------------------------- + #[test] - fn reversed_literal_column_order() { - // 100 <= _block_num → same as _block_num >= 100 - let filters = vec![lit(100u64).lt_eq(col("_block_num"))]; - assert_eq!(extract_block_num_bounds(&filters), 
(Some(100), None)); + fn not_prunes_when_inner_is_always_true() { + // NOT (_block_num > 100), segment [200, 300] + // _block_num > 100 is always true for [200, 300], so NOT is always false + assert!(!sat( + &[Expr::Not(Box::new(col("_block_num").gt(lit(100u64))))], + 200, + 300 + )); } #[test] - fn ignores_non_block_num_filters() { - let filters = vec![ - col("timestamp").gt(lit(1000u64)), - col("_block_num").gt_eq(lit(5u64)), - ]; - assert_eq!(extract_block_num_bounds(&filters), (Some(5), None)); + fn not_keeps_when_inner_is_always_false() { + // NOT (_block_num > 100), segment [0, 50] + // _block_num > 100 is always false for [0, 50], so NOT is always true + assert!(sat( + &[Expr::Not(Box::new(col("_block_num").gt(lit(100u64))))], + 0, + 50 + )); + } + + #[test] + fn not_keeps_partial_overlap() { + // NOT (_block_num > 100), segment [50, 150] + // some values ≤ 100 exist, so NOT can be true + assert!(sat( + &[Expr::Not(Box::new(col("_block_num").gt(lit(100u64))))], + 50, + 150 + )); } + // ----------------------------------------------------------------------- + // non-block-num filters (must not prune) + // ----------------------------------------------------------------------- + #[test] - fn handles_i64_literals() { - let filters = vec![col("_block_num").gt(lit(100i64))]; - assert_eq!(extract_block_num_bounds(&filters), (Some(101), None)); + fn non_block_num_filter_never_prunes() { + assert!(sat(&[col("gas_used").gt(lit(0u64))], 0, 50)); + } + + #[test] + fn empty_filters_never_prunes() { + assert!(sat(&[], 0, 50)); + } + + // ----------------------------------------------------------------------- + // mixed filters + // ----------------------------------------------------------------------- + + #[test] + fn mixed_filters_prune_on_block_num() { + // Two separate filters: _block_num > 100 AND gas_used > 0 + // The _block_num filter should still cause pruning for [0, 50] + let filters = vec![ + col("_block_num").gt(lit(100u64)), + col("gas_used").gt(lit(0u64)), 
+ ]; + assert!(!sat(&filters, 0, 50)); } } diff --git a/crates/core/common/src/catalog/physical/snapshot.rs b/crates/core/common/src/catalog/physical/snapshot.rs index f091cb7c5..e08531081 100644 --- a/crates/core/common/src/catalog/physical/snapshot.rs +++ b/crates/core/common/src/catalog/physical/snapshot.rs @@ -21,7 +21,7 @@ use datafusion::{ use datafusion_datasource::compute_all_files_statistics; use futures::{Stream, StreamExt as _}; -use super::block_num_filter::{extract_block_num_bounds, is_block_num_filter}; +use super::block_num_filter::{filters_maybe_satisfiable_for_range, is_block_num_filter}; use crate::{ BlockRange, catalog::logical::LogicalTable, @@ -303,23 +303,20 @@ impl TableProvider for QueryableSnapshot { return Ok(Arc::new(EmptyExec::new(projected_schema))); } - // Prune segments based on _block_num filters. - let (min_block, max_block) = extract_block_num_bounds(filters); + // Prune segments whose block ranges are provably unsatisfiable + // given the pushed-down filters. 
+ let df_schema = Arc::new(DFSchema::try_from(self.physical_table.schema())?); let segments: Vec<&Segment> = self .segments .iter() .filter(|s| { let range = s.single_range(); - let below = max_block.is_some_and(|max| range.start() > max); - let above = min_block.is_some_and(|min| range.end() < min); - !below && !above + filters_maybe_satisfiable_for_range(filters, &df_schema, range.start(), range.end()) }) .collect(); tracing::debug!( total = self.segments.len(), kept = segments.len(), - ?min_block, - ?max_block, "segment pruning" ); From 0ba36ebb5e8bc67b6a79afc6d668f162a743c3ab Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 11:16:01 +0000 Subject: [PATCH 5/7] fix(common): adjust log --- .../core/common/src/catalog/physical/snapshot.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/crates/core/common/src/catalog/physical/snapshot.rs b/crates/core/common/src/catalog/physical/snapshot.rs index e08531081..da019226b 100644 --- a/crates/core/common/src/catalog/physical/snapshot.rs +++ b/crates/core/common/src/catalog/physical/snapshot.rs @@ -314,11 +314,15 @@ impl TableProvider for QueryableSnapshot { filters_maybe_satisfiable_for_range(filters, &df_schema, range.start(), range.end()) }) .collect(); - tracing::debug!( - total = self.segments.len(), - kept = segments.len(), - "segment pruning" - ); + + if segments.len() != self.segments.len() { + tracing::debug!( + total = self.segments.len(), + kept = segments.len(), + pruned = self.segments.len() - segments.len(), + "pruned segments based on block range filters" + ); + } if segments.is_empty() { let projected_schema = project_schema(&self.schema(), projection)?; From 2c31b66d6fd9b4200902699133b862596c232720 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 11:38:17 +0000 Subject: [PATCH 6/7] chore(core): fix clippy --- .../core/common/src/catalog/physical/block_num_filter.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git 
a/crates/core/common/src/catalog/physical/block_num_filter.rs b/crates/core/common/src/catalog/physical/block_num_filter.rs index 4e13f94b7..b5adbfc14 100644 --- a/crates/core/common/src/catalog/physical/block_num_filter.rs +++ b/crates/core/common/src/catalog/physical/block_num_filter.rs @@ -82,9 +82,10 @@ pub fn filters_maybe_satisfiable_for_range( let simplifier = ExprSimplifier::new(context).with_guarantees(guarantees); for filter in filters { - match simplifier.simplify(filter.clone()) { - Ok(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) => return false, - _ => {} + if let Ok(Expr::Literal(ScalarValue::Boolean(Some(false)), _)) = + simplifier.simplify(filter.clone()) + { + return false; } } From 9f18757ea7f7deeb5bbff2b74a657e10291e4605 Mon Sep 17 00:00:00 2001 From: Leonardo Yvens Date: Fri, 20 Mar 2026 16:30:26 +0000 Subject: [PATCH 7/7] fix(core): relax `supports_filters_pushdown` --- .../src/catalog/physical/block_num_filter.rs | 174 +++++++++--------- .../common/src/catalog/physical/snapshot.rs | 4 +- 2 files changed, 86 insertions(+), 92 deletions(-) diff --git a/crates/core/common/src/catalog/physical/block_num_filter.rs b/crates/core/common/src/catalog/physical/block_num_filter.rs index b5adbfc14..a224b92fd 100644 --- a/crates/core/common/src/catalog/physical/block_num_filter.rs +++ b/crates/core/common/src/catalog/physical/block_num_filter.rs @@ -6,9 +6,8 @@ use std::sync::Arc; use datafusion::{ - common::{DFSchema, ScalarValue}, + common::{DFSchema, ScalarValue, tree_node::TreeNode}, logical_expr::{ - BinaryExpr, Operator, execution_props::ExecutionProps, interval_arithmetic::{Interval, NullableInterval}, simplify::SimplifyContext, @@ -18,25 +17,23 @@ use datafusion::{ }; use datasets_common::block_num::RESERVED_BLOCK_NUM_COLUMN_NAME; -/// Returns `true` if `expr` is a comparison filter on the `_block_num` column. +/// Returns `true` if `expr` references the `_block_num` column anywhere. 
/// -/// Recognises simple comparisons (`_block_num > N`, `N <= _block_num`, etc.) -/// and conjunctions (`AND`) where both sides are block-num filters. -pub fn is_block_num_filter(expr: &Expr) -> bool { - match expr { - Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op { - Operator::Eq | Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { - is_block_num_column(left) || is_block_num_column(right) - } - Operator::And => is_block_num_filter(left) && is_block_num_filter(right), - _ => false, - }, - _ => false, - } -} - -fn is_block_num_column(expr: &Expr) -> bool { - matches!(expr, Expr::Column(c) if c.name == RESERVED_BLOCK_NUM_COLUMN_NAME) +/// Used by `supports_filters_pushdown` to decide which filters to mark +/// `Inexact` so they reach `scan()`. Any filter mentioning `_block_num` is +/// worth pushing down — the simplifier will conservatively keep it if it +/// can't prove unsatisfiability. +pub fn references_block_num(expr: &Expr) -> bool { + let mut found = false; + let _ = expr.apply(|node| { + if matches!(node, Expr::Column(c) if c.name == RESERVED_BLOCK_NUM_COLUMN_NAME) { + found = true; + Ok(datafusion::common::tree_node::TreeNodeRecursion::Stop) + } else { + Ok(datafusion::common::tree_node::TreeNodeRecursion::Continue) + } + }); + found } /// Returns `false` only when the filters are **provably unsatisfiable** for a @@ -115,82 +112,87 @@ mod tests { ) } + /// Mirrors the real pipeline: only filters that pass `references_block_num` + /// (i.e. that `supports_filters_pushdown` would mark `Inexact`) reach the + /// simplifier. 
     fn sat(filters: &[Expr], start: u64, end: u64) -> bool {
-        filters_maybe_satisfiable_for_range(filters, &test_schema(), start, end)
+        let pushed: Vec<Expr> = filters
+            .iter()
+            .filter(|f| references_block_num(f))
+            .cloned()
+            .collect();
+        filters_maybe_satisfiable_for_range(&pushed, &test_schema(), start, end)
     }
 
     // -----------------------------------------------------------------------
-    // is_block_num_filter
+    // references_block_num — gate tests
     // -----------------------------------------------------------------------
 
     #[test]
-    fn recognises_block_num_gt() {
-        assert!(is_block_num_filter(&col("_block_num").gt(lit(100u64))));
+    fn gate_accepts_simple_comparison() {
+        assert!(references_block_num(&col("_block_num").gt(lit(100u64))));
     }
 
     #[test]
-    fn recognises_block_num_lt_eq() {
-        assert!(is_block_num_filter(&col("_block_num").lt_eq(lit(200u64))));
+    fn gate_accepts_conjunction() {
+        let expr = col("_block_num")
+            .gt_eq(lit(10u64))
+            .and(col("_block_num").lt(lit(20u64)));
+        assert!(references_block_num(&expr));
     }
 
     #[test]
-    fn recognises_block_num_eq() {
-        assert!(is_block_num_filter(&col("_block_num").eq(lit(42u64))));
+    fn gate_accepts_not() {
+        assert!(references_block_num(&Expr::Not(Box::new(
+            col("_block_num").gt(lit(100u64))
+        ))));
     }
 
     #[test]
-    fn recognises_reversed_literal_op_column() {
-        assert!(is_block_num_filter(&lit(50u64).lt(col("_block_num"))));
+    fn gate_accepts_arithmetic() {
+        assert!(references_block_num(
+            &(col("_block_num") + lit(10u64)).gt(lit(100u64))
+        ));
     }
 
     #[test]
-    fn recognises_conjunction_of_block_num_filters() {
+    fn gate_accepts_or() {
         let expr = col("_block_num")
-            .gt_eq(lit(10u64))
-            .and(col("_block_num").lt(lit(20u64)));
-        assert!(is_block_num_filter(&expr));
-    }
-
-    #[test]
-    fn rejects_non_block_num_column() {
-        assert!(!is_block_num_filter(&col("timestamp").gt(lit(100u64))));
+            .gt(lit(10u64))
+            .or(col("_block_num").lt(lit(5u64)));
+        assert!(references_block_num(&expr));
     }
 
     #[test]
-    fn rejects_mixed_conjunction() {
+    fn 
gate_accepts_mixed_conjunction() { + // _block_num > 10 AND gas_used < 100 — references _block_num let expr = col("_block_num") .gt(lit(10u64)) - .and(col("timestamp").lt(lit(100u64))); - assert!(!is_block_num_filter(&expr)); + .and(col("gas_used").lt(lit(100u64))); + assert!(references_block_num(&expr)); } #[test] - fn rejects_or_operator() { - let expr = col("_block_num") - .gt(lit(10u64)) - .or(col("_block_num").lt(lit(5u64))); - assert!(!is_block_num_filter(&expr)); + fn gate_rejects_non_block_num() { + assert!(!references_block_num(&col("gas_used").gt(lit(100u64)))); } // ----------------------------------------------------------------------- - // filters_maybe_satisfiable_for_range — basic comparisons + // Full pipeline: gate + simplifier — basic comparisons // ----------------------------------------------------------------------- #[test] fn gt_prunes_below() { - // _block_num > 100, segment [0, 50] → all values ≤ 50, none > 100 assert!(!sat(&[col("_block_num").gt(lit(100u64))], 0, 50)); } #[test] fn gt_keeps_overlap() { - // _block_num > 100, segment [0, 150] → some values > 100 assert!(sat(&[col("_block_num").gt(lit(100u64))], 0, 150)); } #[test] fn gt_keeps_above() { - // _block_num > 100, segment [200, 300] → all values > 100 assert!(sat(&[col("_block_num").gt(lit(100u64))], 200, 300)); } @@ -200,7 +202,6 @@ mod tests { #[test] fn range_keeps_overlap() { - // _block_num >= 10 AND _block_num <= 20, segment [15, 25] let filters = vec![ col("_block_num") .gt_eq(lit(10u64)) @@ -211,7 +212,6 @@ mod tests { #[test] fn range_prunes_disjoint() { - // _block_num >= 10 AND _block_num <= 20, segment [25, 35] let filters = vec![ col("_block_num") .gt_eq(lit(10u64)) @@ -235,41 +235,13 @@ mod tests { } // ----------------------------------------------------------------------- - // arithmetic in predicates - // ----------------------------------------------------------------------- - - // NOTE: DF guarantee rewriter doesn't currently support all kinds of arithmetic. 
- // These tests document the current conservative behavior (no pruning). - // Future DF versions may allow upgrading some of these to assert pruning. - - #[test] - fn arithmetic_not_pruned_conservatively() { - // _block_num + 10 > 100 with [0, 50] — could prune but DF can't prove it - assert!(sat( - &[(col("_block_num") + lit(10u64)).gt(lit(100u64))], - 0, - 50 - )); - } - - #[test] - fn arithmetic_keeps_overlap() { - // _block_num + 10 > 100, segment [85, 120]: definitely satisfiable - assert!(sat( - &[(col("_block_num") + lit(10u64)).gt(lit(100u64))], - 85, - 120 - )); - } - - // ----------------------------------------------------------------------- - // NOT / negation + // NOT / negation — gate now accepts, simplifier evaluates // ----------------------------------------------------------------------- #[test] fn not_prunes_when_inner_is_always_true() { // NOT (_block_num > 100), segment [200, 300] - // _block_num > 100 is always true for [200, 300], so NOT is always false + // inner is always true → NOT is always false → prune assert!(!sat( &[Expr::Not(Box::new(col("_block_num").gt(lit(100u64))))], 200, @@ -280,7 +252,7 @@ mod tests { #[test] fn not_keeps_when_inner_is_always_false() { // NOT (_block_num > 100), segment [0, 50] - // _block_num > 100 is always false for [0, 50], so NOT is always true + // inner is always false → NOT is always true → keep assert!(sat( &[Expr::Not(Box::new(col("_block_num").gt(lit(100u64))))], 0, @@ -290,8 +262,6 @@ mod tests { #[test] fn not_keeps_partial_overlap() { - // NOT (_block_num > 100), segment [50, 150] - // some values ≤ 100 exist, so NOT can be true assert!(sat( &[Expr::Not(Box::new(col("_block_num").gt(lit(100u64))))], 50, @@ -300,7 +270,31 @@ mod tests { } // ----------------------------------------------------------------------- - // non-block-num filters (must not prune) + // arithmetic — gate accepts, simplifier is conservative + // ----------------------------------------------------------------------- + + 
#[test] + fn arithmetic_conservative_no_prune() { + // _block_num + 10 > 100 with [0, 50] — could prune but DF v52 can't + // prove it. Conservative: keep the segment. + assert!(sat( + &[(col("_block_num") + lit(10u64)).gt(lit(100u64))], + 0, + 50 + )); + } + + #[test] + fn arithmetic_keeps_overlap() { + assert!(sat( + &[(col("_block_num") + lit(10u64)).gt(lit(100u64))], + 85, + 120 + )); + } + + // ----------------------------------------------------------------------- + // non-block-num filters (gate rejects → no pruning) // ----------------------------------------------------------------------- #[test] @@ -314,13 +308,13 @@ mod tests { } // ----------------------------------------------------------------------- - // mixed filters + // mixed filters — gate keeps block_num refs, drops the rest // ----------------------------------------------------------------------- #[test] fn mixed_filters_prune_on_block_num() { - // Two separate filters: _block_num > 100 AND gas_used > 0 - // The _block_num filter should still cause pruning for [0, 50] + // _block_num > 100 passes gate, gas_used > 0 is dropped by gate. + // The surviving _block_num filter prunes [0, 50]. 
let filters = vec![ col("_block_num").gt(lit(100u64)), col("gas_used").gt(lit(0u64)), diff --git a/crates/core/common/src/catalog/physical/snapshot.rs b/crates/core/common/src/catalog/physical/snapshot.rs index da019226b..12bd05aae 100644 --- a/crates/core/common/src/catalog/physical/snapshot.rs +++ b/crates/core/common/src/catalog/physical/snapshot.rs @@ -21,7 +21,7 @@ use datafusion::{ use datafusion_datasource::compute_all_files_statistics; use futures::{Stream, StreamExt as _}; -use super::block_num_filter::{filters_maybe_satisfiable_for_range, is_block_num_filter}; +use super::block_num_filter::{filters_maybe_satisfiable_for_range, references_block_num}; use crate::{ BlockRange, catalog::logical::LogicalTable, @@ -280,7 +280,7 @@ impl TableProvider for QueryableSnapshot { Ok(filters .iter() .map(|f| { - if is_block_num_filter(f) { + if references_block_num(f) { TableProviderFilterPushDown::Inexact } else { TableProviderFilterPushDown::Unsupported