Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Determine ordering of file groups #9593

Merged
merged 52 commits into from
May 1, 2024

Conversation

suremarc
Copy link
Contributor

@suremarc suremarc commented Mar 13, 2024

Which issue does this PR close?

Closes #7490 .

Rationale for this change

See details in #7490 - this feature helps DataFusion eliminate sorts when files can be shown to be non-overlapping in terms of min/max statistics.

What changes are included in this PR?

  • Add a new FileScanConfig::sort_file_groups method that distribute files via a bin packing algorithm, ensuring that no two files have overlapping statistics
  • Make FileScanConfig::project check if file groups are sorted when determining projected output orderings
  • Add a new internal MinMaxStatistics struct that uses the Arrow Row API to efficiently sort & compare file statistics.

Are these changes tested?

Yes - there is a unit test and a sqllogictest.

Are there any user-facing changes?

Yes - there is a new optional statistics field in PartitionedFile, which is part of the proposal in #7490.

There is also the new FileScanConfig::sort_file_groups API

@github-actions github-actions bot added core Core datafusion crate substrait labels Mar 13, 2024
@Dandandan
Copy link
Contributor

/benchmark

@Dandandan
Copy link
Contributor

/benchmark

Copy link

Benchmark results

Benchmarks comparing 3d13091 (main) and cca5f0f (PR)
Comparing 3d13091 and cca5f0f
Note: Skipping /home/runner/work/arrow-datafusion/arrow-datafusion/benchmarks/results/3d13091/tpch.json as /home/runner/work/arrow-datafusion/arrow-datafusion/benchmarks/results/cca5f0f/tpch.json does not exist

@Dandandan
Copy link
Contributor

/benchmark

@Dandandan
Copy link
Contributor

@suremarc sorry for the noise, just trying to run the benchmark command!

Copy link

Benchmark results

Benchmarks comparing 6e90f01 (main) and cca5f0f (PR)
Comparing 6e90f01 and cca5f0f
Note: Skipping /home/runner/work/arrow-datafusion/arrow-datafusion/benchmarks/results/6e90f01/tpch.json as /home/runner/work/arrow-datafusion/arrow-datafusion/benchmarks/results/cca5f0f/tpch.json does not exist

@alamb
Copy link
Contributor

alamb commented Apr 29, 2024

@NGA-TRAN do you have time to review this PR as well?

@alamb
Copy link
Contributor

alamb commented Apr 29, 2024

No worries @suremarc -- I am very excited about this PR. I plan to review it sometime this week (hopefully later today)

@NGA-TRAN
Copy link
Contributor

I will review this either today or tomorrow

Copy link
Contributor

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @suremarc for adding tests and all refactoring works. The PR looks good to me.

I have some suggestions in the tests to keep them deterministic. Since I went over commit by commit, you might have moved the files/tests around but I think you will get the ideas

Comment on lines 72 to 73
/// DataFusion relies on these statistics for planning so if they are incorrect
/// incorrect answers may result.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// DataFusion relies on these statistics for planning so if they are incorrect
/// incorrect answers may result.
/// DataFusion relies on these statistics for planning so if they are incorrect,
/// incorrect answers may result.

I am guessing you use statistics for column min and max and determine whether data overlaps or not, right? And if they do not overlap, we do not need to merge them before sorting. Maybe adding that to make it clear what you mean about incorrect statistics will lead to incorrect results

--SortExec: expr=[string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST]
----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col]

--ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So nice 🎉

],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics\ncaused by\ncollect min/max values\ncaused by\nError during planning: statistics not found"),
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice newly added tests

partitioned_file_lists = new_groups;
} else {
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


# File 1:
query ITID
COPY (SELECT * FROM src_table LIMIT 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit surprised this query always return the first 3 rows and its dat tis sorted. Maybe you want to make it deterministic in case this behavior no longer holds in the future by using SELECT * FROM src_table where int_col <=3 order by int_col

I think since you make your columns have corresponding increasing data, the order by can be on any column and your data is always sorted on any column

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what happened here but I think you were looking at an old copy of the code -- GH says this is outdated. The latest version has a sort in this command

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. See my comment here #9593 (review)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok that makes sense


# File 2:
query ITID
COPY (SELECT * FROM src_table WHERE int_col > 3 LIMIT 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar as above, SELECT * FROM src_table where int_col >=4 and int_col <=6 order by int_col


# Add another file to the directory underlying test_table
query ITID
COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, you want filter that provide deterministic result and data is sorted

# Check output plan again, expect an "output_ordering" clause in the physical_plan -> ParquetExec:
# After https://github.com/apache/arrow-datafusion/pull/9593 this should not require a sort.
# Check output plan again, expect no "output_ordering" clause in the physical_plan -> ParquetExec,
# due to there being more files than partitions:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice negative test

@alamb alamb added the api change Changes the API exposed to users of the crate label Apr 30, 2024
@suremarc
Copy link
Contributor Author

I'm not sure why changing a comment caused the tests to start failing.... oof.

@alamb
Copy link
Contributor

alamb commented Apr 30, 2024

Added API change label as I it adds a new field to PartitionedFile

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much @suremarc -- this PR is pretty epic

I am a little worried about the lack of coverage in MinMaxStatistics, especially around some of the tricky edge cases with projected schemas and projections. However this PR is already quite large and been open for quite a while

Here is how I suggest we proceed:

  1. Add a config value for this feature and default it to false (not enabled by default)
  2. Merge this PR
  3. File additional tickets / tests before enabling it
  4. Add additional test coverage as follow on PRs and do some more testing.

Once we feel good about that we can make a PR to turn it on by default.

Thank you very much @NGA-TRAN for your reviews

///
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
/// so if they are incorrect, incorrect answers may result.
pub statistics: Option<Statistics>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually a nice API to potentially provide pre-known statistics 👍

}
all_orderings
}

/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please put this structure into its own module (e.g. datafusion/core/src/datasource/physical_plan/statistics.rs) so that it is easier to find

fn new_from_files<'a>(
projected_sort_order: &[PhysicalSortExpr], // Sort order with respect to projected schema
projected_schema: &SchemaRef, // Projected schema
projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see any tests that covered a non None projection and I am a little confused about how it could be correct if the projection was in terms of another schema 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what you mean, but projection is what was used to produce projected_schema. It tells us what position the columns of projected_schema would be in the full schema. Does that make it more clear?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I was thinking of subtle bugs related to when:

  1. The schema of the files is different but compatible (e.g. one file as (time, date, symbol) but the other file had (date, symbol, time) for example
  2. The query orders by a subset of the columns (e.g. ORDER BY time)
  3. The query orders by a subset of the columns that is not the sort order (ORDER BY date)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... I didn't even think about option 1. But I was assuming that the layout of the file statistics should match the table schema and not the individual file's schema. It seems that that's what DataFusion does currently.

e.context("construct min/max statistics for split_groups_by_statistics")
})?;

let indices_sorted_by_min = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could move this into statistics itself somehing like

        let indices_sorted_by_min = statistics.indices_sorted_by_min()

sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
},
TestCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please add a test for a single input file too?

false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all these tests also always have the first file with the minimum stastistics value -- can you possibly also test what happens when it is not (aka add a test that runs this test with file ids 2, 1, 0)?

@suremarc
Copy link
Contributor Author

suremarc commented May 1, 2024

@alamb I added a config value, and I moved MinMaxStatistics to its own module as requested. I wasn't sure if I should delay addressing your feedback on tests to the next PR, since it seems like the suggested plan is to merge this PR first.

@suremarc suremarc requested a review from alamb May 1, 2024 15:53
@alamb
Copy link
Contributor

alamb commented May 1, 2024

@alamb I added a config value, and I moved MinMaxStatistics to its own module as requested. I wasn't sure if I should delay addressing your feedback on tests to the next PR, since it seems like the suggested plan is to merge this PR first.

Sorry -- sounds good. I am going to give this PR another look and file some follow on tickets.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @suremarc -- epic work. Let's merge this one in and keep iterating on main

Thanks again for sticking with this -- this is very exciting

@alamb alamb merged commit 7c1c794 into apache:main May 1, 2024
24 checks passed
@alamb
Copy link
Contributor

alamb commented May 1, 2024

Filed #10336 to track enable this flag by default

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate core Core datafusion crate sqllogictest substrait
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Use file statistics in query planning to avoid sorting when unecessary
5 participants