Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Determine ordering of file groups #9593

Merged
merged 52 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7587a07
add statistics to PartitionedFile
suremarc Nov 3, 2023
1e380b2
just dump work for now
suremarc Nov 3, 2023
263453f
working test case
suremarc Nov 3, 2023
5634bd7
fix jumbled rebase
suremarc Mar 13, 2024
7428fe0
forgot to annotate #[test]
suremarc Mar 13, 2024
4816343
more refactoring
suremarc Mar 13, 2024
c7be9e0
add a link
suremarc Mar 13, 2024
fc1a668
refactor again
suremarc Mar 13, 2024
1c42e00
whitespace
suremarc Mar 13, 2024
3446fed
format debug log
suremarc Mar 13, 2024
3fe8558
remove useless itertools
suremarc Mar 13, 2024
8ba4001
refactor test
suremarc Mar 13, 2024
9c8729a
fix bug
suremarc Mar 15, 2024
6df9832
use sort_file_groups in ListingTable
suremarc Mar 15, 2024
f855a8a
move check into a better place
suremarc Mar 15, 2024
3e5263b
refactor test a bit
suremarc Mar 15, 2024
5b7b307
more testing
suremarc Mar 15, 2024
4761096
more testing
suremarc Mar 15, 2024
a95dffa
better error message
suremarc Mar 15, 2024
1a66604
fix log msg
suremarc Mar 15, 2024
cca5f0f
fix again
suremarc Mar 15, 2024
e6e10e8
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc Mar 21, 2024
8f7a2d7
add sqllogictest and fixes
suremarc Mar 21, 2024
e9fad54
fix test
suremarc Mar 21, 2024
e982f0f
Update datafusion/core/src/datasource/listing/mod.rs
suremarc Mar 30, 2024
cc9f144
Update datafusion/core/src/datasource/physical_plan/file_scan_config.rs
suremarc Mar 30, 2024
95bb790
more unit tests
suremarc Mar 31, 2024
0e60230
rename to split_groups_by_statistics
suremarc Mar 31, 2024
9f375e8
only use groups if there's <= target_partitions
suremarc Mar 31, 2024
3d9d293
refactor a bit, no need for projected_schema
suremarc Mar 31, 2024
1366c99
fix reverse order
suremarc Mar 31, 2024
a29be69
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc Apr 9, 2024
2ef8006
save work for now
suremarc Apr 9, 2024
b112c26
Merge branch 'main' into statistics-planning
suremarc Apr 24, 2024
0153acf
lots of test cases in new slt
suremarc Apr 25, 2024
4e03528
remove output check
suremarc Apr 25, 2024
695e674
fix
suremarc Apr 25, 2024
ec4282b
fix last test
suremarc Apr 25, 2024
1030b30
comment on params
suremarc Apr 25, 2024
2f34684
clippy
suremarc Apr 25, 2024
24c0bc5
revert parquet.slt
suremarc Apr 25, 2024
61f883f
no need to pass projection separately
suremarc Apr 26, 2024
9bc29cf
Update datafusion/core/src/datasource/listing/mod.rs
suremarc Apr 30, 2024
aa89433
update comment on in
suremarc Apr 30, 2024
d7fc78a
fix test?
suremarc Apr 30, 2024
f3a69e5
un-fix?
suremarc Apr 30, 2024
1a010b7
add fix back in?
suremarc Apr 30, 2024
f41d1c9
move indices_sorted_by_min to MinMaxStatistics
suremarc May 1, 2024
15e1339
move MinMaxStatistics to its own module
suremarc May 1, 2024
a2c9b4e
fix license
suremarc May 1, 2024
d7c9af6
add feature flag
suremarc May 1, 2024
82166fd
update config
suremarc May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ config_namespace! {

/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true

/// Attempt to eliminate sorts by packing & sorting files with non-overlapping
/// statistics into the same file groups.
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub(crate) mod test_util {
object_meta: meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
}]];

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ pub async fn pruned_partition_list<'a>(
object_meta,
partition_values: partition_values.clone(),
range: None,
statistics: None,
extensions: None,
})
}));
Expand Down
12 changes: 10 additions & 2 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod url;

use crate::error::Result;
use chrono::TimeZone;
use datafusion_common::ScalarValue;
use datafusion_common::{ScalarValue, Statistics};
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
Expand Down Expand Up @@ -67,6 +67,11 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
/// Optional statistics that describe the data in this file if known.
///
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
/// so if they are incorrect, incorrect answers may result.
pub statistics: Option<Statistics>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually a nice API to potentially provide pre-known statistics 👍

/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
Expand All @@ -83,6 +88,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
}
}
Expand All @@ -98,7 +104,8 @@ impl PartitionedFile {
version: None,
},
partition_values: vec![],
range: None,
range: Some(FileRange { start, end }),
statistics: None,
extensions: None,
}
.with_range(start, end)
Expand Down Expand Up @@ -128,6 +135,7 @@ impl From<ObjectMeta> for PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
statistics: None,
extensions: None,
}
}
Expand Down
39 changes: 34 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,16 +739,43 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (partitioned_file_lists, statistics) =
let (mut partitioned_file_lists, statistics) =
self.list_files_for_scan(state, filters, limit).await?;

let projected_schema = project_schema(&self.schema(), projection)?;

// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let output_ordering = self.try_create_output_ordering()?;
match state
.config_options()
.execution
.split_file_groups_by_statistics
.then(|| {
output_ordering.first().map(|output_ordering| {
FileScanConfig::split_groups_by_statistics(
&self.table_schema,
&partitioned_file_lists,
output_ordering,
)
})
})
.flatten()
{
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
Some(Ok(new_groups)) => {
if new_groups.len() <= self.options.target_partitions {
partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
None => {} // no ordering required
};

// extract types of partition columns
let table_partition_cols = self
.options
Expand All @@ -772,6 +799,7 @@ impl TableProvider for ListingTable {
} else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};

// create the execution plan
self.options
.format
Expand All @@ -784,7 +812,7 @@ impl TableProvider for ListingTable {
statistics,
projection: projection.cloned(),
limit,
output_ordering: self.try_create_output_ordering()?,
output_ordering,
table_partition_cols,
},
filters.as_ref(),
Expand Down Expand Up @@ -937,10 +965,11 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let mut part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
Expand Down
Loading