diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 90953e3f5df9a..4d7c4a8788885 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1874,6 +1874,100 @@ mod tests {
         assert_contains!(&explain, "projection=[c1]");
     }
 
+    #[tokio::test]
+    async fn parquet_exec_metrics_with_multiple_predicates() {
+        // Test that metrics are correctly calculated when multiple predicates
+        // are pushed down (connected with AND). This ensures we don't double-count
+        // rows when multiple predicates filter the data sequentially.
+
+        // Create a batch with two columns: c1 (string) and c2 (int32)
+        // Total: 10 rows
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("foo"), // 0 - passes c1 filter, fails c2 filter (5 <= 10)
+            Some("bar"), // 1 - fails c1 filter
+            Some("bar"), // 2 - fails c1 filter
+            Some("baz"), // 3 - passes both filters (20 > 10)
+            Some("foo"), // 4 - passes both filters (12 > 10)
+            Some("bar"), // 5 - fails c1 filter
+            Some("baz"), // 6 - passes both filters (25 > 10)
+            Some("foo"), // 7 - passes c1 filter, fails c2 filter (7 <= 10)
+            Some("bar"), // 8 - fails c1 filter
+            Some("qux"), // 9 - passes both filters (30 > 10)
+        ]));
+
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(5),
+            Some(15),
+            Some(8),
+            Some(20),
+            Some(12),
+            Some(9),
+            Some(25),
+            Some(7),
+            Some(18),
+            Some(30),
+        ]));
+
+        let batch = create_batch(vec![("c1", c1), ("c2", c2)]);
+
+        // Create filter: c1 != 'bar' AND c2 > 10
+        //
+        // First predicate (c1 != 'bar'):
+        // - Rows passing: 0, 3, 4, 6, 7, 9 (6 rows)
+        // - Rows pruned: 1, 2, 5, 8 (4 rows)
+        //
+        // Second predicate (c2 > 10) on remaining 6 rows:
+        // - Rows passing: 3, 4, 6, 9 (4 rows with c2 = 20, 12, 25, 30)
+        // - Rows pruned: 0, 7 (2 rows with c2 = 5, 7)
+        //
+        // Expected final metrics:
+        // - pushdown_rows_matched: 4 (final result)
+        // - pushdown_rows_pruned: 4 + 2 = 6 (cumulative)
+        // - Total: 4 + 6 = 10
+
+        let filter = col("c1").not_eq(lit("bar")).and(col("c2").gt(lit(10)));
+
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .round_trip(vec![batch])
+            .await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // Verify the result rows
+        assert_snapshot!(batches_to_string(&rt.batches.unwrap()),@r###"
+        +-----+----+
+        | c1  | c2 |
+        +-----+----+
+        | baz | 20 |
+        | foo | 12 |
+        | baz | 25 |
+        | qux | 30 |
+        +-----+----+
+        "###);
+
+        // Verify metrics - this is the key test
+        let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
+        let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
+
+        assert_eq!(
+            pushdown_rows_matched, 4,
+            "Expected 4 rows to pass both predicates"
+        );
+        assert_eq!(
+            pushdown_rows_pruned, 6,
+            "Expected 6 rows to be pruned (4 by first predicate + 2 by second predicate)"
+        );
+
+        // The sum should equal the total number of rows
+        assert_eq!(
+            pushdown_rows_matched + pushdown_rows_pruned,
+            10,
+            "matched + pruned should equal total rows"
+        );
+    }
+
     #[tokio::test]
     async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
         // batch1: c1(string)
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 660b32f486120..de9fe181f84f1 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -470,14 +470,33 @@ pub fn build_row_filter(
         });
     }
 
+    // To avoid double-counting metrics when multiple predicates are used:
+    // - All predicates should count rows_pruned (cumulative pruned rows)
+    // - Only the last predicate should count rows_matched (final result)
+    // This ensures: rows_matched + rows_pruned = total rows processed
+    let total_candidates = candidates.len();
+
     candidates
         .into_iter()
-        .map(|candidate| {
+        .enumerate()
+        .map(|(idx, candidate)| {
+            let is_last = idx == total_candidates - 1;
+
+            // All predicates share the pruned counter (cumulative)
+            let predicate_rows_pruned = rows_pruned.clone();
+
+            // Only the last predicate tracks matched rows (final result)
+            let predicate_rows_matched = if is_last {
+                rows_matched.clone()
+            } else {
+                metrics::Count::new()
+            };
+
             DatafusionArrowPredicate::try_new(
                 candidate,
                 metadata,
-                rows_pruned.clone(),
-                rows_matched.clone(),
+                predicate_rows_pruned,
+                predicate_rows_matched,
                 time.clone(),
             )
             .map(|pred| Box::new(pred) as _)