Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Determine ordering of file groups #9593

Merged
merged 52 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
7587a07
add statistics to PartitionedFile
suremarc Nov 3, 2023
1e380b2
just dump work for now
suremarc Nov 3, 2023
263453f
working test case
suremarc Nov 3, 2023
5634bd7
fix jumbled rebase
suremarc Mar 13, 2024
7428fe0
forgot to annotate #[test]
suremarc Mar 13, 2024
4816343
more refactoring
suremarc Mar 13, 2024
c7be9e0
add a link
suremarc Mar 13, 2024
fc1a668
refactor again
suremarc Mar 13, 2024
1c42e00
whitespace
suremarc Mar 13, 2024
3446fed
format debug log
suremarc Mar 13, 2024
3fe8558
remove useless itertools
suremarc Mar 13, 2024
8ba4001
refactor test
suremarc Mar 13, 2024
9c8729a
fix bug
suremarc Mar 15, 2024
6df9832
use sort_file_groups in ListingTable
suremarc Mar 15, 2024
f855a8a
move check into a better place
suremarc Mar 15, 2024
3e5263b
refactor test a bit
suremarc Mar 15, 2024
5b7b307
more testing
suremarc Mar 15, 2024
4761096
more testing
suremarc Mar 15, 2024
a95dffa
better error message
suremarc Mar 15, 2024
1a66604
fix log msg
suremarc Mar 15, 2024
cca5f0f
fix again
suremarc Mar 15, 2024
e6e10e8
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc Mar 21, 2024
8f7a2d7
add sqllogictest and fixes
suremarc Mar 21, 2024
e9fad54
fix test
suremarc Mar 21, 2024
e982f0f
Update datafusion/core/src/datasource/listing/mod.rs
suremarc Mar 30, 2024
cc9f144
Update datafusion/core/src/datasource/physical_plan/file_scan_config.rs
suremarc Mar 30, 2024
95bb790
more unit tests
suremarc Mar 31, 2024
0e60230
rename to split_groups_by_statistics
suremarc Mar 31, 2024
9f375e8
only use groups if there's <= target_partitions
suremarc Mar 31, 2024
3d9d293
refactor a bit, no need for projected_schema
suremarc Mar 31, 2024
1366c99
fix reverse order
suremarc Mar 31, 2024
a29be69
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc Apr 9, 2024
2ef8006
save work for now
suremarc Apr 9, 2024
b112c26
Merge branch 'main' into statistics-planning
suremarc Apr 24, 2024
0153acf
lots of test cases in new slt
suremarc Apr 25, 2024
4e03528
remove output check
suremarc Apr 25, 2024
695e674
fix
suremarc Apr 25, 2024
ec4282b
fix last test
suremarc Apr 25, 2024
1030b30
comment on params
suremarc Apr 25, 2024
2f34684
clippy
suremarc Apr 25, 2024
24c0bc5
revert parquet.slt
suremarc Apr 25, 2024
61f883f
no need to pass projection separately
suremarc Apr 26, 2024
9bc29cf
Update datafusion/core/src/datasource/listing/mod.rs
suremarc Apr 30, 2024
aa89433
update comment on in
suremarc Apr 30, 2024
d7fc78a
fix test?
suremarc Apr 30, 2024
f3a69e5
un-fix?
suremarc Apr 30, 2024
1a010b7
add fix back in?
suremarc Apr 30, 2024
f41d1c9
move indices_sorted_by_min to MinMaxStatistics
suremarc May 1, 2024
15e1339
move MinMaxStatistics to its own module
suremarc May 1, 2024
a2c9b4e
fix license
suremarc May 1, 2024
d7c9af6
add feature flag
suremarc May 1, 2024
82166fd
update config
suremarc May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,17 +643,20 @@ impl TableProvider for ListingTable {
}

let output_ordering = self.try_create_output_ordering()?;
if let Some(new_groups) = output_ordering.first().and_then(|output_ordering| {
match output_ordering.first().map(|output_ordering| {
FileScanConfig::sort_file_groups(
&self.table_schema,
&projected_schema,
&partitioned_file_lists,
output_ordering,
)
.ok()
}) {
partitioned_file_lists = new_groups;
}
Some(Err(e)) => log::debug!("failed to sort file groups: {e}"),
Some(Ok(new_groups)) => {
partitioned_file_lists = new_groups;
}
None => {} // no ordering required
};

// extract types of partition columns
let table_partition_cols = self
Expand Down Expand Up @@ -838,10 +841,11 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
let mut part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ impl FileScanConfig {
table_schema,
projected_schema,
flattened_files.iter().copied(),
)?;
)
.map_err(|e| e.context("construct min/max statistics"))?;
suremarc marked this conversation as resolved.
Show resolved Hide resolved

let indices_sorted_by_min = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could move this into statistics itself somehing like

        let indices_sorted_by_min = statistics.indices_sorted_by_min()

let mut sort: Vec<_> = statistics.min.iter().enumerate().collect();
Expand Down
37 changes: 28 additions & 9 deletions datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,25 +564,42 @@ impl MinMaxStatistics {
.fields()
.iter()
.map(|field| {
let (min, max) = get_min_max(table_schema.index_of(field.name())?);
let (min, max) =
get_min_max(table_schema.index_of(field.name()).map_err(|e| {
DataFusionError::ArrowError(
e,
Some(format!("get min/max for field: '{}'", field.name())),
)
})?);
Ok((
ScalarValue::iter_to_array(min)?,
ScalarValue::iter_to_array(max)?,
))
})
.collect::<Result<Vec<_>>>()?
.collect::<Result<Vec<_>>>()
.map_err(|e| e.context("collect min/max values"))?
.into_iter()
.unzip();

Self::new(
sort_order,
RecordBatch::try_new(Arc::clone(projected_schema), min_values)?,
RecordBatch::try_new(Arc::clone(projected_schema), max_values)?,
table_schema,
RecordBatch::try_new(Arc::clone(projected_schema), min_values).map_err(
|e| {
DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string()))
},
)?,
RecordBatch::try_new(Arc::clone(projected_schema), max_values).map_err(
|e| {
DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string()))
},
)?,
)
}

fn new(
sort_order: &[PhysicalSortExpr],
table_schema: &SchemaRef,
min_values: RecordBatch,
max_values: RecordBatch,
) -> Result<Self> {
Expand All @@ -592,10 +609,11 @@ impl MinMaxStatistics {
.iter()
.map(|expr| {
expr.expr
.data_type(&min_values.schema())
.data_type(table_schema)
.map(|data_type| SortField::new_with_options(data_type, expr.options))
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>()
.map_err(|e| e.context("create sort fields"))?;
let converter = RowConverter::new(sort_fields)?;

let [min, max] = [min_values, max_values].map(|values| {
Expand Down Expand Up @@ -627,7 +645,8 @@ impl MinMaxStatistics {
options: Some(sort_expr.options),
})
})
.collect::<Result<Vec<_>>>()?;
.collect::<Result<Vec<_>>>()
.map_err(|e| e.context("create sorting columns"))?;
converter
.convert_columns(
&sorting_columns
Expand All @@ -641,8 +660,8 @@ impl MinMaxStatistics {
});

Ok(Self {
min: min?,
max: max?,
min: min.map_err(|e| e.context("build min rows"))?,
max: max.map_err(|e| e.context("build max rows"))?,
})
}

Expand Down
44 changes: 39 additions & 5 deletions datafusion/sqllogictest/test_files/parquet.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
statement ok
set datafusion.execution.target_partitions = 2;

# Collect statistics -- used for sorting files
statement ok
set datafusion.execution.collect_statistics = true;

# Create a table as a data source
statement ok
CREATE TABLE src_table (
Expand Down Expand Up @@ -132,8 +136,8 @@ STORED AS PARQUET;
----
3

# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec,
# due to there being more files than partitions:
# Check output plan again, expect an "output_ordering" clause in the physical_plan -> ParquetExec:
# After https://github.com/apache/arrow-datafusion/pull/9593 this should not require a sort.
query TT
EXPLAIN SELECT int_col, string_col
FROM test_table
Expand All @@ -144,9 +148,7 @@ Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST
--TableScan: test_table projection=[int_col, string_col]
physical_plan
SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
--SortExec: expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col]

--ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So nice 🎉


# Perform queries using MIN and MAX
query I
Expand All @@ -169,6 +171,38 @@ SELECT min(date_col) FROM test_table;
----
1970-01-02

# Clean up
statement ok
DROP TABLE test_table;

# Do one more test, but order by numeric columns:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to request moving the tests for this optimization to its own .slt file (e.g. sorted_parquet.slt or something). It is important enough and tricky enough I think to warrant specialized tests

# This is to exercise file group sorting, which uses file-level statistics
# DataFusion doesn't currently support string column statistics
statement ok
CREATE EXTERNAL TABLE test_table (
int_col INT NOT NULL,
string_col TEXT NOT NULL,
bigint_col BIGINT NOT NULL,
date_col DATE NOT NULL
)
STORED AS PARQUET
WITH HEADER ROW
WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
LOCATION 'test_files/scratch/parquet/test_table';

# Check output plan again, expect an "output_ordering" clause in the physical_plan -> ParquetExec:
# After https://github.com/apache/arrow-datafusion/pull/9593 this should not require a sort.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand this comment -- this is a new test added as part of that PR, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I guess the wording is confusing since it implies the test existed before this PR. Either way I'm going to make a new sqllogictest so I'll rewrite this comment

query TT
EXPLAIN SELECT int_col, bigint_col
FROM test_table
ORDER BY int_col, bigint_col;
----
logical_plan
Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
--TableScan: test_table projection=[int_col, bigint_col]
physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST]


# Clean up
statement ok
DROP TABLE test_table;
Expand Down