94 changes: 94 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1874,6 +1874,100 @@ mod tests {
assert_contains!(&explain, "projection=[c1]");
}

#[tokio::test]
async fn parquet_exec_metrics_with_multiple_predicates() {
// Test that metrics are correctly calculated when multiple predicates
// are pushed down (connected with AND). This ensures we don't double-count
// rows when multiple predicates filter the data sequentially.

// Create a batch with two columns: c1 (string) and c2 (int32)
// Total: 10 rows
let c1: ArrayRef = Arc::new(StringArray::from(vec![
Some("foo"), // 0 - passes c1 filter, fails c2 filter (5 <= 10)
Some("bar"), // 1 - fails c1 filter
Some("bar"), // 2 - fails c1 filter
Some("baz"), // 3 - passes both filters (20 > 10)
Some("foo"), // 4 - passes both filters (12 > 10)
Some("bar"), // 5 - fails c1 filter
Some("baz"), // 6 - passes both filters (25 > 10)
Some("foo"), // 7 - passes c1 filter, fails c2 filter (7 <= 10)
Some("bar"), // 8 - fails c1 filter
Some("qux"), // 9 - passes both filters (30 > 10)
]));

let c2: ArrayRef = Arc::new(Int32Array::from(vec![
Some(5),
Some(15),
Some(8),
Some(20),
Some(12),
Some(9),
Some(25),
Some(7),
Some(18),
Some(30),
]));

let batch = create_batch(vec![("c1", c1), ("c2", c2)]);

// Create filter: c1 != 'bar' AND c2 > 10
//
// First predicate (c1 != 'bar'):
// - Rows passing: 0, 3, 4, 6, 7, 9 (6 rows)
// - Rows pruned: 1, 2, 5, 8 (4 rows)
//
// Second predicate (c2 > 10) on remaining 6 rows:
// - Rows passing: 3, 4, 6, 9 (4 rows with c2 = 20, 12, 25, 30)
// - Rows pruned: 0, 7 (2 rows with c2 = 5, 7)
//
// Expected final metrics:
// - pushdown_rows_matched: 4 (final result)
// - pushdown_rows_pruned: 4 + 2 = 6 (cumulative)
// - Total: 4 + 6 = 10

let filter = col("c1").not_eq(lit("bar")).and(col("c2").gt(lit(10)));

let rt = RoundTrip::new()
.with_predicate(filter)
.with_pushdown_predicate()
.round_trip(vec![batch])
.await;

let metrics = rt.parquet_exec.metrics().unwrap();

// Verify the result rows
assert_snapshot!(batches_to_string(&rt.batches.unwrap()),@r###"
+-----+----+
| c1 | c2 |
+-----+----+
| baz | 20 |
| foo | 12 |
| baz | 25 |
| qux | 30 |
+-----+----+
"###);

// Verify metrics - this is the key test
let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");

assert_eq!(
pushdown_rows_matched, 4,
"Expected 4 rows to pass both predicates"
);
assert_eq!(
pushdown_rows_pruned, 6,
"Expected 6 rows to be pruned (4 by first predicate + 2 by second predicate)"
);

// The sum should equal the total number of rows
assert_eq!(
pushdown_rows_matched + pushdown_rows_pruned,
10,
"matched + pruned should equal total rows"
);
}

#[tokio::test]
async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
// batch1: c1(string)
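As a plain-Rust sanity check of the numbers in the test's comments, the sketch below reproduces the expected metric arithmetic directly from the two column values. It is only the arithmetic, not the DataFusion pushdown machinery; the plain arrays stand in for the Arrow arrays used in the test above.

```rust
// Standalone check of the expected metrics in parquet_exec_metrics_with_multiple_predicates.
// The arrays mirror the test data; the filtering here is ordinary Rust, not pushdown.
fn main() {
    let c1 = ["foo", "bar", "bar", "baz", "foo", "bar", "baz", "foo", "bar", "qux"];
    let c2 = [5, 15, 8, 20, 12, 9, 25, 7, 18, 30];

    // First predicate: c1 != 'bar'
    let after_c1: Vec<usize> = (0..c1.len()).filter(|&i| c1[i] != "bar").collect();
    let pruned_by_c1 = c1.len() - after_c1.len(); // 4 rows pruned

    // Second predicate: c2 > 10, applied only to the survivors of the first predicate
    let matched = after_c1.iter().filter(|&&i| c2[i] > 10).count(); // 4 rows matched
    let pruned_by_c2 = after_c1.len() - matched; // 2 more rows pruned

    assert_eq!(matched, 4); // pushdown_rows_matched
    assert_eq!(pruned_by_c1 + pruned_by_c2, 6); // pushdown_rows_pruned
    assert_eq!(matched + pruned_by_c1 + pruned_by_c2, c1.len()); // matched + pruned = total
}
```

These are the same 4 / 6 / 10 values the test asserts for `pushdown_rows_matched`, `pushdown_rows_pruned`, and their sum.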
25 changes: 22 additions & 3 deletions datafusion/datasource-parquet/src/row_filter.rs
@@ -470,14 +470,33 @@ pub fn build_row_filter(
});
}

// To avoid double-counting metrics when multiple predicates are used:
// - All predicates should count rows_pruned (cumulative pruned rows)
// - Only the last predicate should count rows_matched (final result)
// This ensures: rows_matched + rows_pruned = total rows processed
let total_candidates = candidates.len();

candidates
.into_iter()
.map(|candidate| {
.enumerate()
.map(|(idx, candidate)| {
let is_last = idx == total_candidates - 1;

// All predicates share the pruned counter (cumulative)
let predicate_rows_pruned = rows_pruned.clone();

// Only the last predicate tracks matched rows (final result)
let predicate_rows_matched = if is_last {
rows_matched.clone()
Contributor: So rows_matched will only be correct after the final round of pruning?

Member (Author): Yes, multiple predicates are connected with AND, so the last predicate's output is the final matched set.
} else {
metrics::Count::new()
};

DatafusionArrowPredicate::try_new(
candidate,
metadata,
rows_pruned.clone(),
rows_matched.clone(),
predicate_rows_pruned,
predicate_rows_matched,
time.clone(),
)
.map(|pred| Box::new(pred) as _)
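For readers following the review thread above, here is a minimal, self-contained sketch of the counter wiring this change introduces. The `Count` and `Predicate` types are simplified, hypothetical stand-ins for datafusion's `metrics::Count` and `DatafusionArrowPredicate`, not the real API: every predicate shares the cumulative pruned counter, while only the last predicate writes to the matched counter.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Stand-in for a shared metrics counter.
#[derive(Clone, Default)]
struct Count(Rc<Cell<usize>>);

impl Count {
    fn add(&self, n: usize) {
        self.0.set(self.0.get() + n);
    }
    fn value(&self) -> usize {
        self.0.get()
    }
}

// Stand-in for one pushed-down predicate: it records into whichever counters it was handed.
struct Predicate {
    keep: fn(i32) -> bool,
    rows_pruned: Count,
    rows_matched: Count,
}

impl Predicate {
    fn evaluate(&self, rows: Vec<i32>) -> Vec<i32> {
        let survivors: Vec<i32> = rows.iter().copied().filter(|&v| (self.keep)(v)).collect();
        self.rows_pruned.add(rows.len() - survivors.len());
        self.rows_matched.add(survivors.len());
        survivors
    }
}

fn main() {
    let rows_pruned = Count::default();
    let rows_matched = Count::default();

    let filters: [fn(i32) -> bool; 2] = [|v| v % 2 == 0, |v| v > 10];
    let total = filters.len();

    // Mirror of the change in build_row_filter: every predicate shares the pruned
    // counter, only the last predicate writes to the shared matched counter.
    let predicates: Vec<Predicate> = filters
        .into_iter()
        .enumerate()
        .map(|(idx, keep)| Predicate {
            keep,
            rows_pruned: rows_pruned.clone(),
            rows_matched: if idx == total - 1 {
                rows_matched.clone()
            } else {
                Count::default() // throwaway counter for intermediate predicates
            },
        })
        .collect();

    let mut rows: Vec<i32> = (1..=20).collect();
    let total_rows = rows.len();
    for p in &predicates {
        rows = p.evaluate(rows);
    }

    // Even rows greater than 10 are 12, 14, 16, 18, 20: 5 matched, 15 pruned in total.
    assert_eq!(rows_matched.value(), 5);
    assert_eq!(rows_pruned.value(), 15);
    assert_eq!(rows_matched.value() + rows_pruned.value(), total_rows);
}
```

Handing intermediate predicates a throwaway matched counter keeps the invariant `rows_matched + rows_pruned = total rows` without changing how each predicate evaluates its batch.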