Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions datafusion/core/tests/parquet/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,22 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
.await
}

#[tokio::test]
async fn predicate_cache_stats_issue_19561() -> datafusion_common::Result<()> {
    // Enable filter pushdown; the predicate cache is on by default and is
    // exercised whenever pushdown is enabled.
    let mut session_config = SessionConfig::new();
    let options = session_config.options_mut();
    options.execution.parquet.pushdown_filters = true;
    // A batch size of 1 forces the reader to produce many small batches,
    // which previously caused the cache metrics to be compounded on every
    // read (the bug reported in issue #19561).
    options.execution.batch_size = 1;

    let ctx = SessionContext::new_with_config(session_config);
    let test = PredicateCacheTest {
        expected_inner_records: 8,
        expected_records: 4,
    };
    test.run(&ctx).await
}

#[tokio::test]
async fn predicate_cache_pushdown_default_selections_only()
-> datafusion_common::Result<()> {
Expand Down
15 changes: 10 additions & 5 deletions datafusion/datasource-parquet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricType, PruningMetrics,
RatioMergeStrategy, RatioMetrics, Time,
};

Expand Down Expand Up @@ -77,11 +77,16 @@ pub struct ParquetFileMetrics {
/// Parquet.
///
/// This is the expensive path (IO + Decompression + Decoding).
pub predicate_cache_inner_records: Count,
///
/// We use a Gauge here because arrow-rs reports absolute (cumulative)
/// numbers rather than incremental readings, so we want a `set`
/// operation rather than an `add`. This was previously a `Count`,
/// which led to double counting; see:
/// github.com/apache/datafusion/issues/19334
pub predicate_cache_inner_records: Gauge,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be somewhat subtle why this is a gauge. Maybe we can add a comment explaining the context

Something like

Suggested change
pub predicate_cache_inner_records: Gauge,
///
/// Use a Gauge to avoid double counting inner counter values from parquet reader
pub predicate_cache_inner_records: Gauge,

(also below)

Copy link
Contributor Author

@feniljain feniljain Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good catch, should have thought about that 🙈

I have tried to add a slightly more detailed comment, lemme know if that does not work 😅

/// Predicate Cache: number of records read from the cache. This is the
/// number of rows that were stored in the cache after evaluating predicates
/// reused for the output.
pub predicate_cache_records: Count,
pub predicate_cache_records: Gauge,
}

impl ParquetFileMetrics {
Expand Down Expand Up @@ -162,11 +167,11 @@ impl ParquetFileMetrics {

let predicate_cache_inner_records = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("predicate_cache_inner_records", partition);
.gauge("predicate_cache_inner_records", partition);

let predicate_cache_records = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("predicate_cache_records", partition);
.gauge("predicate_cache_records", partition);

Self {
files_ranges_pruned_statistics,
Expand Down
10 changes: 5 additions & 5 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use datafusion_physical_expr_common::physical_expr::{
PhysicalExpr, is_dynamic_physical_expr,
};
use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
};
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};

Expand Down Expand Up @@ -674,15 +674,15 @@ impl FileOpener for ParquetOpener {
/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
fn copy_arrow_reader_metrics(
arrow_reader_metrics: &ArrowReaderMetrics,
predicate_cache_inner_records: &Count,
predicate_cache_records: &Count,
predicate_cache_inner_records: &Gauge,
predicate_cache_records: &Gauge,
) {
if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
predicate_cache_inner_records.add(v);
predicate_cache_inner_records.set(v);
}

if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
predicate_cache_records.add(v);
predicate_cache_records.set(v);
}
}

Expand Down