20 changes: 20 additions & 0 deletions datafusion/catalog-listing/src/options.rs
@@ -45,6 +45,11 @@ pub struct ListingOptions {
/// Group files so that the number of partitions does not exceed
/// this limit
pub target_partitions: usize,
/// Preserve the partition column value boundaries when forming file groups.
/// When true, files that share the same partition column values are kept in
/// the same execution partition, allowing downstream operators to rely on
/// those columns being constant per partition.
pub preserve_partition_values: bool,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
///
/// DataFusion may take advantage of this ordering to omit sorts
@@ -77,6 +82,7 @@ impl ListingOptions {
table_partition_cols: vec![],
collect_stat: false,
target_partitions: 1,
preserve_partition_values: false,
file_sort_order: vec![],
}
}
@@ -89,6 +95,12 @@
pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
self = self.with_target_partitions(config.target_partitions());
self = self.with_collect_stat(config.collect_statistics());
if !self.table_partition_cols.is_empty() {
self.preserve_partition_values = config
.options()
.execution
.listing_table_preserve_partition_values;
}
self
}

@@ -239,6 +251,14 @@ impl ListingOptions {
self
}

/// Control whether files that share the same partition column values should
/// remain in the same execution partition. Defaults to `false`; when partition
/// columns are declared, `with_session_config_options` takes the value from the
/// `listing_table_preserve_partition_values` session setting instead.
pub fn with_preserve_partition_values(mut self, preserve: bool) -> Self {
self.preserve_partition_values = preserve;
self
}

/// Set file sort order on [`ListingOptions`] and returns self.
///
/// ```
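A minimal sketch of how the new builder might be used when constructing a `ListingTable` by hand; the `part_col` name and the target partition count are illustrative, not taken from this PR:

```rust
use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

// Declare a Hive-style partition column and keep its value boundaries
// intact when file groups are formed: files sharing the same `part_col`
// value stay in one execution partition.
let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    .with_table_partition_cols(vec![("part_col".to_string(), DataType::Int32)])
    .with_target_partitions(8)
    .with_preserve_partition_values(true);
```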
69 changes: 44 additions & 25 deletions datafusion/catalog-listing/src/table.rs
@@ -47,6 +47,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::ExecutionPlan;
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use log::debug;
use object_store::ObjectStore;
use std::any::Any;
use std::collections::HashMap;
@@ -457,6 +458,42 @@ impl TableProvider for ListingTable {
}

let output_ordering = self.try_create_output_ordering(state.execution_props())?;
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
)
if !self.options.preserve_partition_values {
// Split file groups by statistics to optimize sorted scans
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
)
})
})
})
.flatten()
{
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
.flatten()
{
Some(Err(e)) => {
log::debug!("failed to split file groups by statistics: {e}")
}
}
None => {} // no ordering required
};
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
}
}
None => {} // no ordering required
};
} else {
debug!(
"Skipping statistics-based file splitting because preserve_partition_values is enabled. \
File groups will respect partition boundaries instead of sort statistics."
);
}

let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
@@ -508,6 +519,9 @@
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_preserve_partition_values(
self.options.preserve_partition_values,
)
.build(),
)
.await?;
@@ -653,7 +667,12 @@ impl ListingTable {
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;

let file_groups = file_group.split_files(self.options.target_partitions);
let target_partitions = self.options.target_partitions.max(1);
let file_groups = if self.options.preserve_partition_values {
file_group.split_by_partition_values()
} else {
file_group.split_files(target_partitions)
};
let (mut file_groups, mut stats) = compute_all_files_statistics(
file_groups,
self.schema(),
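For intuition, a hypothetical simplification of what `split_by_partition_values` does (the real method operates on `FileGroup` and `PartitionedFile`; the string-keyed types here are stand-ins):

```rust
use std::collections::BTreeMap;

// Hypothetical sketch: group file paths by their partition-column values so
// each resulting group, and hence each execution partition, carries exactly
// one combination of partition values. Real partition values are
// `ScalarValue`s; plain `String`s are used here for illustration.
fn split_by_partition_values(
    files: Vec<(Vec<String>, String)>, // (partition values, file path)
) -> Vec<Vec<String>> {
    let mut groups: BTreeMap<Vec<String>, Vec<String>> = BTreeMap::new();
    for (values, path) in files {
        groups.entry(values).or_default().push(path);
    }
    groups.into_values().collect()
}
```

Note that, unlike `split_files`, this grouping ignores `target_partitions`: the number of groups equals the number of distinct partition-value combinations, which is why the config documentation below warns about very high partition counts.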
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
@@ -591,6 +591,14 @@ config_namespace! {
/// inferred and will be represented in the table schema).
pub listing_table_factory_infer_partitions: bool, default = true

/// Should a `ListingTable` created through the `ListingTableFactory` keep files that share
/// the same partition column values in the same execution partition when `PARTITIONED BY`
/// columns are declared. Defaults to false to avoid overhead on non-partitioned tables.
/// The optimization works best with moderate partition counts (10-100) and large partitions
/// (>500K rows); with many small partitions (500+), task scheduling overhead may outweigh
/// the shuffle savings.
pub listing_table_preserve_partition_values: bool, default = false

/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true

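A minimal sketch of enabling the option for a session, mirroring what the benchmark below does; the same key should also be settable via `SET datafusion.execution.listing_table_preserve_partition_values = true` in SQL, as with other execution options:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Opt in for this session; per the ListingOptions change above, it only
// takes effect for tables that declare partition columns.
let config = SessionConfig::new().set_bool(
    "datafusion.execution.listing_table_preserve_partition_values",
    true,
);
let ctx = SessionContext::new_with_config(config);
```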
5 changes: 5 additions & 0 deletions datafusion/core/Cargo.toml
@@ -271,3 +271,8 @@ name = "dataframe"
[[bench]]
harness = false
name = "spm"

[[bench]]
harness = false
name = "hive_partitioned"
required-features = ["parquet"]
191 changes: 191 additions & 0 deletions datafusion/core/benches/hive_partitioned.rs
@@ -0,0 +1,191 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::runtime::Runtime;

/// Generate partitioned data for benchmarking.
/// Creates `num_partitions` directories, each with one Parquet file containing
/// `rows_per_partition` rows.
fn generate_partitioned_data(
base_dir: &Path,
num_partitions: usize,
rows_per_partition: usize,
) {
let schema = Arc::new(Schema::new(vec![Field::new("val", DataType::Int32, false)]));

let props = WriterProperties::builder().build();

for i in 0..num_partitions {
let part_dir = base_dir.join(format!("part_col={i}"));
fs::create_dir_all(&part_dir).unwrap();
let file_path = part_dir.join("data.parquet");
let file = File::create(file_path).unwrap();

let mut writer =
ArrowWriter::try_new(file, schema.clone(), Some(props.clone())).unwrap();

// Generate data: just a simple sequence
let vals: Vec<i32> = (0..rows_per_partition).map(|x| x as i32).collect();
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vals)) as ArrayRef],
)
.unwrap();

writer.write(&batch).unwrap();
writer.close().unwrap();
}
}

/// Save execution plans to file
async fn save_plans(table_path: &str, output_file: &Path) {
let query = "SELECT part_col, count(*), sum(val), avg(val) FROM t GROUP BY part_col";
let mut file = File::create(output_file).unwrap();

writeln!(file, "KeyPartitioned Aggregation Benchmark Plans\n").unwrap();
writeln!(file, "Query: {query}\n").unwrap();

// Optimized plan
let config_opt = SessionConfig::new().with_target_partitions(20).set_bool(
"datafusion.execution.listing_table_preserve_partition_values",
true,
);
let ctx_opt = SessionContext::new_with_config(config_opt);
let options = ParquetReadOptions {
table_partition_cols: vec![("part_col".to_string(), DataType::Int32)],
..Default::default()
};
ctx_opt
.register_parquet("t", table_path, options.clone())
.await
.unwrap();

let df_opt = ctx_opt.sql(query).await.unwrap();
let plan_opt = df_opt
.explain(false, false)
.unwrap()
.collect()
.await
.unwrap();
writeln!(file, "=== WITH KeyPartitioned Optimization ===").unwrap();
writeln!(file, "{}\n", pretty_format_batches(&plan_opt).unwrap()).unwrap();

// Unoptimized plan
let config_unopt = SessionConfig::new().with_target_partitions(20).set_bool(
"datafusion.execution.listing_table_preserve_partition_values",
false,
);
let ctx_unopt = SessionContext::new_with_config(config_unopt);
ctx_unopt
.register_parquet("t", table_path, options)
.await
.unwrap();

let df_unopt = ctx_unopt.sql(query).await.unwrap();
let plan_unopt = df_unopt
.explain(false, false)
.unwrap()
.collect()
.await
.unwrap();
writeln!(file, "=== WITHOUT KeyPartitioned Optimization ===").unwrap();
writeln!(file, "{}", pretty_format_batches(&plan_unopt).unwrap()).unwrap();
}

fn run_benchmark(c: &mut Criterion) {
// Benchmark KeyPartitioned optimization for aggregations on Hive-partitioned tables
// 20 partitions × 500K rows = 10M total rows
let partitions = 20;
let rows_per_partition = 500_000;
let tmp_dir = TempDir::new().unwrap();

generate_partitioned_data(tmp_dir.path(), partitions, rows_per_partition);
let table_path = tmp_dir.path().to_str().unwrap();

let rt = Runtime::new().unwrap();

// Save execution plans if SAVE_PLANS env var is set
if std::env::var("SAVE_PLANS").is_ok() {
let output_path = Path::new("hive_partitioned_plans.txt");
rt.block_on(save_plans(table_path, output_path));
println!("Execution plans saved to {}", output_path.display());
}

let query = "SELECT part_col, count(*), sum(val), avg(val) FROM t GROUP BY part_col";
let mut group = c.benchmark_group("hive_partitioned_agg");

group.bench_function("with_key_partitioned", |b| {
b.to_async(&rt).iter(|| async {
let config = SessionConfig::new().with_target_partitions(20).set_bool(
"datafusion.execution.listing_table_preserve_partition_values",
true,
);
let ctx = SessionContext::new_with_config(config);

let options = ParquetReadOptions {
table_partition_cols: vec![("part_col".to_string(), DataType::Int32)],
..Default::default()
};

ctx.register_parquet("t", table_path, options)
.await
.unwrap();

let df = ctx.sql(query).await.unwrap();
df.collect().await.unwrap();
})
});

group.bench_function("without_key_partitioned", |b| {
b.to_async(&rt).iter(|| async {
let config = SessionConfig::new().with_target_partitions(20).set_bool(
"datafusion.execution.listing_table_preserve_partition_values",
false,
);
let ctx = SessionContext::new_with_config(config);

let options = ParquetReadOptions {
table_partition_cols: vec![("part_col".to_string(), DataType::Int32)],
..Default::default()
};

ctx.register_parquet("t", table_path, options)
.await
.unwrap();

let df = ctx.sql(query).await.unwrap();
df.collect().await.unwrap();
})
});

group.finish();
}

criterion_group!(benches, run_benchmark);
criterion_main!(benches);
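Given the `[[bench]]` entry added to `Cargo.toml` above, this should be runnable with `cargo bench --bench hive_partitioned` from `datafusion/core` (with the `parquet` feature enabled, per `required-features`); setting the `SAVE_PLANS` environment variable additionally writes both explain plans to `hive_partitioned_plans.txt`.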