Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions datafusion-examples/examples/relation_planner/table_sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ use datafusion::{
},
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, StatisticsContext,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput},
},
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
Expand Down Expand Up @@ -722,8 +722,12 @@ impl ExecutionPlan for SampleExec {
Some(self.metrics.clone_inner())
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
let mut stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
fn partition_statistics(
&self,
_partition: Option<usize>,
ctx: &StatisticsContext,
) -> Result<Arc<Statistics>> {
let mut stats = Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0]));
let ratio = self.upper_bound - self.lower_bound;

// Scale statistics by sampling ratio (inexact due to randomness)
Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mod tests {
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::write::BatchSerializer;
use datafusion_expr::{col, lit};
use datafusion_physical_plan::{ExecutionPlan, collect};
use datafusion_physical_plan::{ExecutionPlan, collect, compute_statistics};

use arrow::array::{
Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
Expand Down Expand Up @@ -215,9 +215,12 @@ mod tests {
assert_eq!(tt_batches, 50 /* 100/2 */);

// test metadata
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
compute_statistics(exec.as_ref(), None)?.num_rows,
Precision::Absent
);
assert_eq!(
compute_statistics(exec.as_ref(), None)?.total_byte_size,
Precision::Absent
);

Expand Down
9 changes: 6 additions & 3 deletions datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mod tests {
BatchDeserializer, DecoderDeserializer, DeserializerOutput,
};
use datafusion_datasource::file_format::FileFormat;
use datafusion_physical_plan::{ExecutionPlan, collect};
use datafusion_physical_plan::{ExecutionPlan, collect, compute_statistics};

use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field};
Expand Down Expand Up @@ -117,9 +117,12 @@ mod tests {
assert_eq!(tt_batches, 6 /* 12/2 */);

// test metadata
assert_eq!(exec.partition_statistics(None)?.num_rows, Precision::Absent);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
compute_statistics(exec.as_ref(), None)?.num_rows,
Precision::Absent
);
assert_eq!(
compute_statistics(exec.as_ref(), None)?.total_byte_size,
Precision::Absent
);

Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ mod tests {
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{ExecutionPlan, collect};
use datafusion_physical_plan::{ExecutionPlan, collect, compute_statistics};

use crate::test_util::bounded_stream;
use arrow::array::{
Expand Down Expand Up @@ -715,12 +715,12 @@ mod tests {

// test metadata
assert_eq!(
exec.partition_statistics(None)?.num_rows,
compute_statistics(exec.as_ref(), None)?.num_rows,
Precision::Exact(8)
);
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
compute_statistics(exec.as_ref(), None)?.total_byte_size,
Precision::Absent,
);

Expand Down Expand Up @@ -764,11 +764,11 @@ mod tests {

// note: even if the limit is set, the executor rounds up to the batch size
assert_eq!(
exec.partition_statistics(None)?.num_rows,
compute_statistics(exec.as_ref(), None)?.num_rows,
Precision::Exact(8)
);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
compute_statistics(exec.as_ref(), None)?.total_byte_size,
Precision::Absent,
);
let batches = collect(exec, task_ctx).await?;
Expand Down
20 changes: 11 additions & 9 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ mod tests {
use datafusion_physical_expr::expressions::binary;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::{ExecutionPlanProperties, collect};
use datafusion_physical_plan::{
ExecutionPlanProperties, collect, compute_statistics,
};
use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;
Expand Down Expand Up @@ -247,11 +249,11 @@ mod tests {

// test metadata
assert_eq!(
exec.partition_statistics(None)?.num_rows,
compute_statistics(exec.as_ref(), None)?.num_rows,
Precision::Exact(8)
);
assert_eq!(
exec.partition_statistics(None)?.total_byte_size,
compute_statistics(exec.as_ref(), None)?.total_byte_size,
Precision::Absent,
);

Expand Down Expand Up @@ -1355,13 +1357,13 @@ mod tests {

let exec_default = table_default.scan(&state, None, &[], None).await?;
assert_eq!(
exec_default.partition_statistics(None)?.num_rows,
compute_statistics(exec_default.as_ref(), None)?.num_rows,
Precision::Absent
);

// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(
exec_default.partition_statistics(None)?.total_byte_size,
compute_statistics(exec_default.as_ref(), None)?.total_byte_size,
Precision::Absent
);

Expand All @@ -1376,11 +1378,11 @@ mod tests {

let exec_disabled = table_disabled.scan(&state, None, &[], None).await?;
assert_eq!(
exec_disabled.partition_statistics(None)?.num_rows,
compute_statistics(exec_disabled.as_ref(), None)?.num_rows,
Precision::Absent
);
assert_eq!(
exec_disabled.partition_statistics(None)?.total_byte_size,
compute_statistics(exec_disabled.as_ref(), None)?.total_byte_size,
Precision::Absent
);

Expand All @@ -1395,12 +1397,12 @@ mod tests {

let exec_enabled = table_enabled.scan(&state, None, &[], None).await?;
assert_eq!(
exec_enabled.partition_statistics(None)?.num_rows,
compute_statistics(exec_enabled.as_ref(), None)?.num_rows,
Precision::Exact(8)
);
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
assert_eq!(
exec_enabled.partition_statistics(None)?.total_byte_size,
compute_statistics(exec_enabled.as_ref(), None)?.total_byte_size,
Precision::Absent,
);

Expand Down
7 changes: 6 additions & 1 deletion datafusion/core/tests/custom_sources_cases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::PlanProperties;
use datafusion_physical_plan::StatisticsContext;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;

Expand Down Expand Up @@ -179,7 +180,11 @@ impl ExecutionPlan for CustomExecutionPlan {
Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
fn partition_statistics(
&self,
partition: Option<usize>,
_ctx: &StatisticsContext,
) -> Result<Arc<Statistics>> {
if partition.is_some() {
return Ok(Arc::new(Statistics::new_unknown(&self.schema())));
}
Expand Down
18 changes: 12 additions & 6 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use datafusion_catalog::Session;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{project_schema, stats::Precision};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::StatisticsContext;
use datafusion_physical_plan::compute_statistics;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use async_trait::async_trait;
Expand Down Expand Up @@ -174,7 +176,11 @@ impl ExecutionPlan for StatisticsValidation {
unimplemented!("This plan only serves for testing statistics")
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
fn partition_statistics(
&self,
partition: Option<usize>,
_ctx: &StatisticsContext,
) -> Result<Arc<Statistics>> {
if partition.is_some() {
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
} else {
Expand Down Expand Up @@ -247,7 +253,7 @@ async fn sql_basic() -> Result<()> {
let physical_plan = df.create_physical_plan().await.unwrap();

// the statistics should be those of the source
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
assert_eq!(stats, *compute_statistics(physical_plan.as_ref(), None)?);

Ok(())
}
Expand All @@ -263,7 +269,7 @@ async fn sql_filter() -> Result<()> {
.unwrap();

let physical_plan = df.create_physical_plan().await.unwrap();
let stats = physical_plan.partition_statistics(None)?;
let stats = compute_statistics(physical_plan.as_ref(), None)?;
assert_eq!(stats.num_rows, Precision::Inexact(7));

Ok(())
Expand All @@ -278,7 +284,7 @@ async fn sql_limit() -> Result<()> {
let physical_plan = df.create_physical_plan().await.unwrap();
// when the limit is smaller than the original number of lines we mark the statistics as inexact
// and cap NDV at the new row count
let limit_stats = physical_plan.partition_statistics(None)?;
let limit_stats = compute_statistics(physical_plan.as_ref(), None)?;
assert_eq!(limit_stats.num_rows, Precision::Exact(5));
// c1: NDV=2 stays at 2 (already below limit of 5)
assert_eq!(
Expand All @@ -297,7 +303,7 @@ async fn sql_limit() -> Result<()> {
.unwrap();
let physical_plan = df.create_physical_plan().await.unwrap();
// when the limit is larger than the original number of lines, statistics remain unchanged
assert_eq!(stats, *physical_plan.partition_statistics(None)?);
assert_eq!(stats, *compute_statistics(physical_plan.as_ref(), None)?);

Ok(())
}
Expand All @@ -314,7 +320,7 @@ async fn sql_window() -> Result<()> {

let physical_plan = df.create_physical_plan().await.unwrap();

let result = physical_plan.partition_statistics(None)?;
let result = compute_statistics(physical_plan.as_ref(), None)?;

assert_eq!(stats.num_rows, result.num_rows);
let col_stats = &result.column_statistics;
Expand Down
25 changes: 17 additions & 8 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::compute_statistics;
use datafusion_physical_plan::filter::FilterExec;
use tempfile::tempdir;

Expand All @@ -61,7 +62,7 @@ async fn check_stats_precision_with_filter_pushdown() {
// Scan without filter, stats are exact
let exec = table.scan(&state, None, &[], None).await.unwrap();
assert_eq!(
exec.partition_statistics(None).unwrap().num_rows,
compute_statistics(exec.as_ref(), None).unwrap().num_rows,
Precision::Exact(8),
"Stats without filter should be exact"
);
Expand Down Expand Up @@ -93,7 +94,9 @@ async fn check_stats_precision_with_filter_pushdown() {
);
// Scan with filter pushdown, stats are inexact
assert_eq!(
optimized_exec.partition_statistics(None).unwrap().num_rows,
compute_statistics(optimized_exec.as_ref(), None)
.unwrap()
.num_rows,
Precision::Inexact(8),
"Stats after filter pushdown should be inexact"
);
Expand Down Expand Up @@ -121,11 +124,13 @@ async fn load_table_stats_with_session_level_cache() {
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();

assert_eq!(
exec1.partition_statistics(None).unwrap().num_rows,
compute_statistics(exec1.as_ref(), None).unwrap().num_rows,
Precision::Exact(8)
);
assert_eq!(
exec1.partition_statistics(None).unwrap().total_byte_size,
compute_statistics(exec1.as_ref(), None)
.unwrap()
.total_byte_size,
// Byte size is absent because we cannot estimate the output size
// of the Arrow data since there are variable length columns.
Precision::Absent,
Expand All @@ -137,11 +142,13 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(get_static_cache_size(&state2), 0);
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
assert_eq!(
exec2.partition_statistics(None).unwrap().num_rows,
compute_statistics(exec2.as_ref(), None).unwrap().num_rows,
Precision::Exact(8)
);
assert_eq!(
exec2.partition_statistics(None).unwrap().total_byte_size,
compute_statistics(exec2.as_ref(), None)
.unwrap()
.total_byte_size,
// Absent because the data contains variable length columns
Precision::Absent,
);
Expand All @@ -152,11 +159,13 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(get_static_cache_size(&state1), 1);
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
assert_eq!(
exec3.partition_statistics(None).unwrap().num_rows,
compute_statistics(exec3.as_ref(), None).unwrap().num_rows,
Precision::Exact(8)
);
assert_eq!(
exec3.partition_statistics(None).unwrap().total_byte_size,
compute_statistics(exec3.as_ref(), None)
.unwrap()
.total_byte_size,
// Absent because the data contains variable length columns
Precision::Absent,
);
Expand Down
Loading
Loading