Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,7 @@ async fn parquet_explain_analyze() {
&formatted,
"row_groups_pruned_statistics=1 total \u{2192} 1 matched"
);
assert_contains!(&formatted, "scan_efficiency_ratio=14% (259/1851)");

// The order of metrics is expected to be the same as the actual pruning order
// (file-> row-group -> page)
Expand Down
15 changes: 14 additions & 1 deletion datafusion/datasource-parquet/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics, Time,
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
RatioMergeStrategy, RatioMetrics, Time,
};

/// Stores metrics about the parquet execution for a particular parquet file.
Expand Down Expand Up @@ -66,6 +67,8 @@ pub struct ParquetFileMetrics {
pub page_index_eval_time: Time,
/// Total time spent reading and parsing metadata from the footer
pub metadata_load_time: Time,
/// Scan Efficiency Ratio, calculated as bytes_scanned / total_file_size
pub scan_efficiency_ratio: RatioMetrics,
/// Predicate Cache: number of records read directly from the inner reader.
/// This is the number of rows decoded while evaluating predicates
pub predicate_cache_inner_records: Count,
Expand Down Expand Up @@ -114,6 +117,15 @@ impl ParquetFileMetrics {
.with_type(MetricType::SUMMARY)
.pruning_metrics("files_ranges_pruned_statistics", partition);

let scan_efficiency_ratio = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::SUMMARY)
.ratio_metrics_with_strategy(
"scan_efficiency_ratio",
partition,
RatioMergeStrategy::AddPartSetTotal,
);

// -----------------------
// 'dev' level metrics
// -----------------------
Expand Down Expand Up @@ -164,6 +176,7 @@ impl ParquetFileMetrics {
bloom_filter_eval_time,
page_index_eval_time,
metadata_load_time,
scan_efficiency_ratio,
predicate_cache_inner_records,
predicate_cache_records,
}
Expand Down
26 changes: 26 additions & 0 deletions datafusion/datasource-parquet/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl DefaultParquetFileReaderFactory {
pub struct ParquetFileReader {
pub file_metrics: ParquetFileMetrics,
pub inner: ParquetObjectReader,
pub partitioned_file: PartitionedFile,
Comment on lines 97 to +100
Copy link
Contributor Author

@petern48 petern48 Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure, would this technically be considered a breaking change? This struct is only constructed manually throughout the crate (no constructor).

ParquetFileMetrics is in this ParquetFileReader struct, but ParquetFileReader didn't have a way to access the total file size, since the file_size field in ParquetObjectReader was private (and inside of arrow-rs). This is the change that made the most sense to me, since CachedParquetFileReader already has this as a field.

pub struct CachedParquetFileReader {
pub file_metrics: ParquetFileMetrics,
store: Arc<dyn ObjectStore>,
pub inner: ParquetObjectReader,
partitioned_file: PartitionedFile,

Alternatively, maybe I could add a file_size() getter to ParquetObjectReader in arrow-rs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, but here I think a breaking change is acceptable

}

impl AsyncFileReader for ParquetFileReader {
Expand Down Expand Up @@ -129,6 +130,18 @@ impl AsyncFileReader for ParquetFileReader {
}
}

impl Drop for ParquetFileReader {
    /// On teardown, fold this reader's scanned byte count into the
    /// file's scan-efficiency ratio metric.
    fn drop(&mut self) {
        let ratio = &self.file_metrics.scan_efficiency_ratio;
        let scanned = self.file_metrics.bytes_scanned.value();
        ratio.add_part(scanned);
        // Multiple ParquetFileReaders may run for the same file, so the
        // denominator is set (not accumulated) to avoid adding the file
        // size more than once.
        ratio.set_total(self.partitioned_file.object_meta.size as usize);
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
Expand Down Expand Up @@ -156,6 +169,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
Ok(Box::new(ParquetFileReader {
inner,
file_metrics,
partitioned_file,
}))
}
}
Expand Down Expand Up @@ -286,6 +300,18 @@ impl AsyncFileReader for CachedParquetFileReader {
}
}

impl Drop for CachedParquetFileReader {
    /// On teardown, fold this reader's scanned byte count into the
    /// file's scan-efficiency ratio metric.
    fn drop(&mut self) {
        let ratio = &self.file_metrics.scan_efficiency_ratio;
        let scanned = self.file_metrics.bytes_scanned.value();
        ratio.add_part(scanned);
        // Multiple CachedParquetFileReaders may run for the same file, so the
        // denominator is set (not accumulated) to avoid adding the file
        // size more than once.
        ratio.set_total(self.partitioned_file.object_meta.size as usize);
    }
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

Expand Down
16 changes: 14 additions & 2 deletions datafusion/datasource-parquet/src/row_group_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1533,6 +1533,7 @@ mod tests {
data: bytes::Bytes,
pruning_predicate: &PruningPredicate,
) -> Result<RowGroupAccessPlanFilter> {
use datafusion_datasource::PartitionedFile;
use object_store::{ObjectMeta, ObjectStore};

let object_meta = ObjectMeta {
Expand All @@ -1551,12 +1552,23 @@ mod tests {
let metrics = ExecutionPlanMetricsSet::new();
let file_metrics =
ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
let inner = ParquetObjectReader::new(Arc::new(in_memory), object_meta.location)
.with_file_size(object_meta.size);
let inner =
ParquetObjectReader::new(Arc::new(in_memory), object_meta.location.clone())
.with_file_size(object_meta.size);

let partitioned_file = PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
metadata_size_hint: None,
};

let reader = ParquetFileReader {
inner,
file_metrics: file_metrics.clone(),
partitioned_file,
};
let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();

Expand Down
14 changes: 12 additions & 2 deletions datafusion/physical-plan/src/metrics/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::{borrow::Cow, sync::Arc};

use crate::metrics::{
value::{PruningMetrics, RatioMetrics},
value::{PruningMetrics, RatioMergeStrategy, RatioMetrics},
MetricType,
};

Expand Down Expand Up @@ -275,7 +275,17 @@ impl<'a> MetricBuilder<'a> {
name: impl Into<Cow<'static, str>>,
partition: usize,
) -> RatioMetrics {
let ratio_metrics = RatioMetrics::new();
self.ratio_metrics_with_strategy(name, partition, RatioMergeStrategy::default())
}

/// Consumes self and creates a new [`RatioMetrics`] with a specific merge strategy
pub fn ratio_metrics_with_strategy(
self,
name: impl Into<Cow<'static, str>>,
partition: usize,
merge_strategy: RatioMergeStrategy,
) -> RatioMetrics {
let ratio_metrics = RatioMetrics::new().with_merge_strategy(merge_strategy);
self.with_partition(partition).build(MetricValue::Ratio {
name: name.into(),
ratio_metrics: ratio_metrics.clone(),
Comment on lines +282 to 291
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, I tried builder.ratio_metrics().with_merge_strategy() in metrics.rs,

but found the merge strategy wasn't working since the ratio_metrics.clone() call here was registering the ratio_metrics before I added the merge strategy to it. Hence the need for this new ratio_metrics_with_strategy()

Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
pub use builder::MetricBuilder;
pub use custom::CustomMetricValue;
pub use value::{
Count, Gauge, MetricValue, PruningMetrics, RatioMetrics, ScopedTimerGuard, Time,
Timestamp,
Count, Gauge, MetricValue, PruningMetrics, RatioMergeStrategy, RatioMetrics,
ScopedTimerGuard, Time, Timestamp,
};

/// Something that tracks a value of interest (metric) of a DataFusion
Expand Down
117 changes: 111 additions & 6 deletions datafusion/physical-plan/src/metrics/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,15 @@ impl PruningMetrics {
pub struct RatioMetrics {
    /// Numerator (`part`) of the ratio; shared so clones observe the same value
    part: Arc<AtomicUsize>,
    /// Denominator (`total`) of the ratio; shared so clones observe the same value
    total: Arc<AtomicUsize>,
    /// How `part` and `total` are combined when merging two instances
    merge_strategy: RatioMergeStrategy,
}

/// Controls how [`RatioMetrics::merge`] combines the `part` and `total`
/// values of another instance into this one.
// Fieldless enum: `Copy`/`PartialEq`/`Eq` are free and backward compatible,
// and let callers compare strategies without pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RatioMergeStrategy {
    /// Add both `part` and `total` from the other instance (the default)
    #[default]
    AddPartAddTotal,
    /// Add `part`, but overwrite `total` with the other instance's value —
    /// used when all instances share one fixed denominator (e.g. a file size)
    /// that must not be double counted
    AddPartSetTotal,
    /// Overwrite `part` with the other instance's value, but add `total`
    SetPartAddTotal,
}

impl RatioMetrics {
Expand All @@ -445,9 +454,15 @@ impl RatioMetrics {
Self {
part: Arc::new(AtomicUsize::new(0)),
total: Arc::new(AtomicUsize::new(0)),
merge_strategy: RatioMergeStrategy::AddPartAddTotal,
}
}

pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy) -> Self {
self.merge_strategy = merge_strategy;
self
}

/// Add `n` to the numerator (`part`) value
pub fn add_part(&self, n: usize) {
self.part.fetch_add(n, Ordering::Relaxed);
Expand All @@ -458,10 +473,32 @@ impl RatioMetrics {
self.total.fetch_add(n, Ordering::Relaxed);
}

/// Overwrite the numerator (`part`) with `n`, discarding the previous value
pub fn set_part(&self, n: usize) {
    self.part.store(n, Ordering::Relaxed);
}

/// Overwrite the denominator (`total`) with `n`, discarding the previous value
pub fn set_total(&self, n: usize) {
    self.total.store(n, Ordering::Relaxed);
}

/// Merge the value from `other` into `self`, applying `self`'s
/// [`RatioMergeStrategy`] to decide whether each side of the ratio is
/// accumulated or overwritten.
pub fn merge(&self, other: &Self) {
    // NOTE: the merged values must be applied exactly once per side; the
    // strategy only picks add-vs-set, so no unconditional add precedes it.
    match self.merge_strategy {
        RatioMergeStrategy::AddPartAddTotal => {
            self.add_part(other.part());
            self.add_total(other.total());
        }
        RatioMergeStrategy::AddPartSetTotal => {
            self.add_part(other.part());
            // The denominator is a shared fixed quantity — overwrite it so
            // merging several instances does not double count it
            self.set_total(other.total());
        }
        RatioMergeStrategy::SetPartAddTotal => {
            self.set_part(other.part());
            self.add_total(other.total());
        }
    }
}

/// Return the numerator (`part`) value
Expand Down Expand Up @@ -776,10 +813,17 @@ impl MetricValue {
name: name.clone(),
pruning_metrics: PruningMetrics::new(),
},
Self::Ratio { name, .. } => Self::Ratio {
name: name.clone(),
ratio_metrics: RatioMetrics::new(),
},
Self::Ratio {
name,
ratio_metrics,
} => {
let merge_strategy = ratio_metrics.merge_strategy.clone();
Self::Ratio {
name: name.clone(),
ratio_metrics: RatioMetrics::new()
.with_merge_strategy(merge_strategy),
}
}
Self::Custom { name, value } => Self::Custom {
name: name.clone(),
value: value.new_empty(),
Expand Down Expand Up @@ -1129,6 +1173,67 @@ mod tests {
assert_eq!("0.033% (1/3000)", tiny_ratio.to_string());
}

#[test]
fn test_ratio_set_methods() {
    // set_* are idempotent overwrites: repeating the same call must not
    // accumulate the value.
    let metrics = RatioMetrics::new();
    for _ in 0..2 {
        metrics.set_part(10);
        metrics.set_total(40);
    }
    assert_eq!("25% (10/40)", metrics.to_string());

    // A later set_* call fully replaces the earlier value.
    let metrics = RatioMetrics::new();
    metrics.set_part(10);
    metrics.set_part(30);
    metrics.set_total(40);
    metrics.set_total(50);
    assert_eq!("60% (30/50)", metrics.to_string());
}

#[test]
fn test_ratio_merge_strategy() {
    // AddPartSetTotal: numerators accumulate, denominator is overwritten.
    let target =
        RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
    target.set_part(10);
    target.set_total(40);
    assert_eq!("25% (10/40)", target.to_string());
    let source =
        RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
    source.set_part(20);
    source.set_total(40);
    assert_eq!("50% (20/40)", source.to_string());
    target.merge(&source);
    assert_eq!("75% (30/40)", target.to_string());

    // SetPartAddTotal: numerator is overwritten, denominators accumulate.
    let target =
        RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
    target.set_part(20);
    target.set_total(50);
    let source = RatioMetrics::new();
    source.set_part(20);
    source.set_total(50);
    target.merge(&source);
    assert_eq!("20% (20/100)", target.to_string());

    // AddPartAddTotal (default): both sides accumulate.
    let target = RatioMetrics::new();
    target.set_part(20);
    target.set_total(50);
    let source = RatioMetrics::new();
    source.set_part(20);
    source.set_total(50);
    target.merge(&source);
    assert_eq!("40% (40/100)", target.to_string());
}

#[test]
fn test_display_timestamp() {
let timestamp = Timestamp::new();
Expand Down