Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 31 additions & 2 deletions datafusion/catalog-listing/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::file_format::FileFormat;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::SortExpr;
use datafusion_physical_expr::Partitioning;
use futures::StreamExt;
use futures::TryStreamExt;
use itertools::Itertools;
Expand All @@ -42,8 +43,10 @@ pub struct ListingOptions {
/// This can add a lot of overhead as it will usually require files
/// to be opened and at least partially parsed.
pub collect_stat: bool,
/// Group files to avoid that the number of partitions exceeds
/// this limit
/// Group files to avoid that the number of partitions exceeds this limit.
///
/// If [`Self::output_partitioning`] is set, its partition count is used
/// instead, even when it exceeds this value.
pub target_partitions: usize,
/// Optional pre-known sort order(s). Must be `SortExpr`s.
///
Expand All @@ -61,6 +64,17 @@ pub struct ListingOptions {
/// multiple equivalent orderings, the outer `Vec` will have a
/// single element.
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Optional declared output partitioning for this table.
///
/// Expressions are specified against the full table schema. When set,
/// [`ListingTable`](crate::ListingTable) creates one scan file group per
/// declared output partition instead of using [`Self::target_partitions`].
/// Empty file groups are added when needed to preserve that count.
///
/// Files are sorted by path before grouping. DataFusion does not validate
/// that rows match the declaration, so callers must ensure file group `i`
/// contains only rows for declared output partition `i`.
pub output_partitioning: Option<Partitioning>,
}

impl ListingOptions {
Expand All @@ -78,6 +92,7 @@ impl ListingOptions {
collect_stat: false,
target_partitions: 1,
file_sort_order: vec![],
output_partitioning: None,
}
}

Expand Down Expand Up @@ -136,6 +151,18 @@ impl ListingOptions {
self
}

/// Set declared output partitioning on [`ListingOptions`] and returns self.
///
/// See [`Self::output_partitioning`]. Empty file groups are added when
/// needed to preserve the declared partition count.
pub fn with_output_partitioning(
mut self,
output_partitioning: Option<Partitioning>,
) -> Self {
self.output_partitioning = output_partitioning;
self
}

/// Set `table partition columns` on [`ListingOptions`] and returns self.
///
/// "partition columns," used to support [Hive Partitioning], are
Expand Down Expand Up @@ -224,6 +251,8 @@ impl ListingOptions {

/// Set number of target partitions on [`ListingOptions`] and returns self.
///
/// This controls file grouping when no explicit output partitioning is set.
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_catalog_listing::ListingOptions;
Expand Down
117 changes: 87 additions & 30 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::Partitioning;
use datafusion_physical_plan::empty::EmptyExec;
use futures::{Stream, StreamExt, TryStreamExt, future, stream};
use object_store::ObjectStore;
Expand Down Expand Up @@ -505,12 +506,31 @@ impl TableProvider for ListingTable {
// at the same time. This is because the limit should be applied after the filters are applied.
let statistic_file_limit = if filters.is_empty() { limit } else { None };

let declared_output_partitioning = if partition_filters.is_empty() {
self.options.output_partitioning.clone()
} else {
// Partition pruning can remove files before grouping. Without a
// stable file-to-declared-partition mapping, regrouping the
// remaining files could shift them into the wrong partition index.
None
};
let target_partitions = declared_output_partitioning
.as_ref()
.map(Partitioning::partition_count)
.unwrap_or(self.options.target_partitions);

let ListFilesResult {
file_groups: mut partitioned_file_lists,
statistics,
grouped_by_partition: partitioned_by_file_group,
} = self
.list_files_for_scan(state, &partition_filters, statistic_file_limit)
.list_files_for_scan_with_target(
state,
&partition_filters,
statistic_file_limit,
target_partitions,
declared_output_partitioning.is_some(),
)
.await?;

// if no files need to be read, return an `EmptyExec`
Expand All @@ -523,25 +543,27 @@ impl TableProvider for ListingTable {
state.execution_props(),
&partitioned_file_lists,
)?;
match state
.config_options()
.execution
.split_file_groups_by_statistics
let split_file_groups_by_statistics = declared_output_partitioning.is_none()
&& state
.config_options()
.execution
.split_file_groups_by_statistics;
match split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
self.options.target_partitions,
target_partitions,
)
})
})
.flatten()
{
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
if new_groups.len() <= target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!(
Expand All @@ -561,24 +583,27 @@ impl TableProvider for ListingTable {
};

let file_source = self.create_file_source();
let mut scan_config_builder =
FileScanConfigBuilder::new(object_store_url, file_source)
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection_indices(projection)?
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_output_partitioning(declared_output_partitioning)
.with_expr_adapter(self.expr_adapter_factory.clone());
if partitioned_by_file_group {
scan_config_builder =
scan_config_builder.with_partitioned_by_file_group(true);
}
let scan_config = scan_config_builder.build();

// create the execution plan
let plan = self
.options
.format
.create_physical_plan(
state,
FileScanConfigBuilder::new(object_store_url, file_source)
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
.with_projection_indices(projection)?
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_partitioned_by_file_group(partitioned_by_file_group)
.build(),
)
.create_physical_plan(state, scan_config)
.await?;

Ok(ScanResult::new(plan))
Expand Down Expand Up @@ -690,12 +715,45 @@ impl ListingTable {
/// Get the list of files for a scan as well as the file level statistics.
/// The list is grouped to let the execution plan know how the files should
/// be distributed to different threads / executors.
///
/// If [`ListingOptions::output_partitioning`] is set, the returned file
/// groups preserve that declared partition count, including empty trailing
/// groups when needed, rather than using
/// [`ListingOptions::target_partitions`].
pub async fn list_files_for_scan<'a>(
&'a self,
ctx: &'a dyn Session,
filters: &'a [Expr],
limit: Option<usize>,
) -> datafusion_common::Result<ListFilesResult> {
let declared_output_partitioning = self.options.output_partitioning.as_ref();
let target_partitions = declared_output_partitioning
.map(Partitioning::partition_count)
.unwrap_or(self.options.target_partitions);
self.list_files_for_scan_with_target(
ctx,
filters,
limit,
target_partitions,
declared_output_partitioning.is_some(),
)
.await
}

async fn list_files_for_scan_with_target<'a>(
&'a self,
ctx: &'a dyn Session,
filters: &'a [Expr],
limit: Option<usize>,
target_partitions: usize,
preserve_partition_count: bool,
) -> datafusion_common::Result<ListFilesResult> {
if target_partitions == 0 {
return plan_err!(
"ListingTable requires target_partitions to be greater than zero"
);
}

let store = if let Some(url) = self.table_paths.first() {
ctx.runtime_env().object_store(url)?
} else {
Expand Down Expand Up @@ -747,27 +805,26 @@ impl ListingTable {
// hash repartitioning for aggregates and joins on partition columns.
let threshold = ctx.config_options().optimizer.preserve_file_partitions;

let (file_groups, grouped_by_partition) = if threshold > 0
&& !self.options.table_partition_cols.is_empty()
{
let grouped =
file_group.group_by_partition_values(self.options.target_partitions);
let (mut file_groups, grouped_by_partition) = if preserve_partition_count {
(file_group.split_files(target_partitions), false)
} else if threshold > 0 && !self.options.table_partition_cols.is_empty() {
let grouped = file_group.group_by_partition_values(target_partitions);
if grouped.len() >= threshold {
(grouped, true)
} else {
let all_files: Vec<_> =
grouped.into_iter().flat_map(|g| g.into_inner()).collect();
(
FileGroup::new(all_files).split_files(self.options.target_partitions),
FileGroup::new(all_files).split_files(target_partitions),
false,
)
}
} else {
(
file_group.split_files(self.options.target_partitions),
false,
)
(file_group.split_files(target_partitions), false)
};
if preserve_partition_count && !file_groups.is_empty() {
file_groups.resize_with(target_partitions, || FileGroup::new(vec![]));
}

let (file_groups, stats) = compute_all_files_statistics(
file_groups,
Expand Down
80 changes: 78 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ mod tests {
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::expressions::binary;
use datafusion_physical_expr::expressions::{Column, binary};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::{ExecutionPlanProperties, collect};
use datafusion_physical_plan::{
ExecutionPlanProperties, Partitioning, RangePartitioning, SplitPoint, collect,
};
use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;
Expand Down Expand Up @@ -1286,6 +1288,80 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_list_files_uses_declared_output_partitioning_count() -> Result<()> {
let files = ["bucket/key-prefix/file0", "bucket/key-prefix/file1"];

let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
.with_file_extension_opt(Some(""))
.with_target_partitions(1)
.with_output_partitioning(Some(Partitioning::RoundRobinBatch(4)));

let table_path = ListingTableUrl::parse("test:///bucket/key-prefix/")?;
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
let table = ListingTable::try_new(config)?;

let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
let group_sizes = result
.file_groups
.iter()
.map(|group| group.len())
.collect::<Vec<_>>();

assert_eq!(group_sizes, vec![1, 1, 0, 0]);

Ok(())
}

#[tokio::test]
async fn test_partition_filter_drops_declared_output_partitioning() -> Result<()> {
let files = ["bucket/test/pid=1/file1", "bucket/test/pid=2/file2"];

let ctx = SessionContext::new();
register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

let output_partitioning = Partitioning::Range(RangePartitioning::try_new(
LexOrdering::new(vec![PhysicalSortExpr::new(
Arc::new(Column::new("pid", 1)),
SortOptions::default(),
)])
.unwrap(),
vec![SplitPoint::new(vec![ScalarValue::Int32(Some(2))])],
)?);

let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
.with_file_extension_opt(Some(""))
.with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)])
.with_output_partitioning(Some(output_partitioning.clone()));

let table_path = ListingTableUrl::parse("test:///bucket/test/")?;
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
let table = ListingTable::try_new(config)?;

let unfiltered = table.scan(&ctx.state(), None, &[], None).await?;
assert_eq!(unfiltered.output_partitioning(), &output_partitioning);

let filter = Expr::eq(col("pid"), lit(2_i32));
let filtered = table.scan(&ctx.state(), None, &[filter], None).await?;
assert!(matches!(
filtered.output_partitioning(),
Partitioning::UnknownPartitioning(1)
));

Ok(())
}

#[tokio::test]
async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> {
let files = [
Expand Down
Loading
Loading