-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches #9036
Conversation
other => other, | ||
Some(Ok(batch)) => { | ||
let start = Instant::now(); | ||
let capacity = if self.num_output_batches == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #9032 it was benchmarked to be a bit faster currently to initialize with capacity 0 than to use the correct capacity upfront. I think that is more something curious and hopefully will change, but that might be the case here too? What is the performance if we initialize with 0 everywhere?
An important source of slowness seems to be in the (use and inefficiency of) creating the Changing the piece of code to generate a let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) {
Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
Err(_) => {
match secondary[0].schema().index_of(field.name()) {
Ok(i) => Ok((false, secondary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())),
_ => Err(DataFusionError::Internal(
format!("During execution, the column {} was not found in neither the left or right side of the join", field.name()).to_string()
))
}
}
}.map_err(DataFusionError::into_arrow_external_error)?; |
Opened a PR with that change here: #9042 |
I think part of a further speed up could be moving the building of the left / build-side |
FYI @andygrove @jorgecarleitao This is related to findings here: #9036 This is a ~20% win on smaller batch sizes, but also has an effect on bigger sizes (about 10% on size of 16k locally) Query 12 with batch size 4096 PR: ``` Query 12 iteration 0 took 484.0 ms Query 12 iteration 1 took 482.5 ms Query 12 iteration 2 took 496.9 ms Query 12 iteration 3 took 488.1 ms Query 12 iteration 4 took 504.9 ms Query 12 iteration 5 took 490.4 ms Query 12 iteration 6 took 486.8 ms Query 12 iteration 7 took 499.3 ms Query 12 iteration 8 took 485.0 ms Query 12 iteration 9 took 488.8 ms Query 12 avg time: 490.67 ms ``` Master: ``` Query 12 iteration 0 took 669.6 ms Query 12 iteration 1 took 764.8 ms Query 12 iteration 2 took 705.8 ms Query 12 iteration 3 took 645.1 ms Query 12 iteration 4 took 640.9 ms Query 12 iteration 5 took 639.2 ms Query 12 iteration 6 took 658.9 ms Query 12 iteration 7 took 643.2 ms Query 12 iteration 8 took 639.0 ms Query 12 iteration 9 took 639.2 ms Query 12 avg time: 664.57 ms ``` Closes #9042 from Dandandan/join_perf Authored-by: Heres, Daniel <danielheres@gmail.com> Signed-off-by: Andy Grove <andygrove73@gmail.com>
I think this is something we should merge. We can maybe tweak the extra |
@andygrove thought more about this, I think we are able to use |
Closed in favor of #9048 |
…sh join This applies some refactoring to `build_batch_from_indices` which is supposed to make further changes easier, e.g. solving https://issues.apache.org/jira/browse/ARROW-11030 * This starts handling right (1) batch and left (many) batches differently as for the right batches we can directly use `take` on it. This should be more efficient anyway, and also allows in the future to build the index array directly instead of doing extra copying. * Use `indices.len()` for the capacity parameter, rather than the number of rows at the left. This is of impact at larger sizes (e.g. SF 100), see: #9036 Rather than estimating it based on previous batches, this does it based on the (known) number of resulting rows. * Reuse "computed" right indices across multiple columns. * The refactoring makes it easier to apply changes needed for https://issues.apache.org/jira/browse/ARROW-11030 where we need to remove the n*n work that is done for the build side * The changes don't have a big impact locally on performance on TPC-H with small scale factor, but I believe it should have a similar effect as #9036 on SF=100 by using `indices.len()` rather than the number of rows in the build side. FYI @jorgecarleitao @andygrove Closes #9048 from Dandandan/join_right_refactor Authored-by: Heres, Daniel <danielheres@gmail.com> Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
FYI @andygrove @jorgecarleitao This is related to findings here: apache#9036 This is a ~20% win on smaller batch sizes, but also has an effect on bigger sizes (about 10% on size of 16k locally) Query 12 with batch size 4096 PR: ``` Query 12 iteration 0 took 484.0 ms Query 12 iteration 1 took 482.5 ms Query 12 iteration 2 took 496.9 ms Query 12 iteration 3 took 488.1 ms Query 12 iteration 4 took 504.9 ms Query 12 iteration 5 took 490.4 ms Query 12 iteration 6 took 486.8 ms Query 12 iteration 7 took 499.3 ms Query 12 iteration 8 took 485.0 ms Query 12 iteration 9 took 488.8 ms Query 12 avg time: 490.67 ms ``` Master: ``` Query 12 iteration 0 took 669.6 ms Query 12 iteration 1 took 764.8 ms Query 12 iteration 2 took 705.8 ms Query 12 iteration 3 took 645.1 ms Query 12 iteration 4 took 640.9 ms Query 12 iteration 5 took 639.2 ms Query 12 iteration 6 took 658.9 ms Query 12 iteration 7 took 643.2 ms Query 12 iteration 8 took 639.0 ms Query 12 iteration 9 took 639.2 ms Query 12 avg time: 664.57 ms ``` Closes apache#9042 from Dandandan/join_perf Authored-by: Heres, Daniel <danielheres@gmail.com> Signed-off-by: Andy Grove <andygrove73@gmail.com>
…sh join This applies some refactoring to `build_batch_from_indices` which is supposed to make further changes easier, e.g. solving https://issues.apache.org/jira/browse/ARROW-11030 * This starts handling right (1) batch and left (many) batches differently as for the right batches we can directly use `take` on it. This should be more efficient anyway, and also allows in the future to build the index array directly instead of doing extra copying. * Use `indices.len()` for the capacity parameter, rather than the number of rows at the left. This is of impact at larger sizes (e.g. SF 100), see: apache#9036 Rather than estimating it based on previous batches, this does it based on the (known) number of resulting rows. * Reuse "computed" right indices across multiple columns. * The refactoring makes it easier to apply changes needed for https://issues.apache.org/jira/browse/ARROW-11030 where we need to remove the n*n work that is done for the build side * The changes don't have a big impact locally on performance on TPC-H with small scale factor, but I believe it should have a similar effect as apache#9036 on SF=100 by using `indices.len()` rather than the number of rows in the build side. FYI @jorgecarleitao @andygrove Closes apache#9048 from Dandandan/join_right_refactor Authored-by: Heres, Daniel <danielheres@gmail.com> Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
This builds on #9035.
I am investigating why join performance is so bad with smaller batch sizes (see https://issues.apache.org/jira/browse/ARROW-11030) and this is one optimization that I have found so far that helps a bit.
Prior to this PR, we use the size of left or right batches to guess the capacity of output batches and this results in a lot of over allocation in some cases. For TPC-H q12 at SF=100, I see vectors created with capacity of ~3,000,000 (the size of the build-side of the join) and then we only populate it with ~700 entries.
This PR attempts to learn a good capacity based on previously processed batches.
Here are query times in seconds at different batch sizes: