-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ARROW-11076: [Rust][DataFusion] Refactor usage of right indices in hash join #9048
Conversation
// Note that we take `.data_ref()` to gather the [ArrayData] of each array. | ||
let (is_primary, arrays) = match primary[0].schema().index_of(field.name()) { | ||
Ok(i) => Ok((true, primary.iter().map(|batch| batch.column(i).data_ref().as_ref()).collect::<Vec<_>>())), | ||
let (is_primary, column_index) = match primary[0].schema().index_of(field.name()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing we should consider (in a separate PR) is determining upfront which colums are left/rigth and avoid calling schema.index_of
for each column in each batch. It is a small cost but we could do it once upfront.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, agreed, anything we can move out of this inner loop is an improvement, and avoids slowdowns, e.g. with large nr. of columns / smaller batch sizes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could in a next PR move the code and pass a column_indices: Vec<usize>
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! This provides insane speedups at SF=100 and I will post numbers shortly.
For TPC-H q12 at SF=100 and 8 partitions:
Thank you @Dandandan this is superb 💯 |
Awesome, better than I expected! |
…sh join This applies some refactoring to `build_batch_from_indices` which is supposed to make further changes easier, e.g. solving https://issues.apache.org/jira/browse/ARROW-11030 * This starts handling right (1) batch and left (many) batches differently as for the right batches we can directly use `take` on it. This should be more efficient anyway, and also allows in the future to build the index array directly instead of doing extra copying. * Use `indices.len()` for the capacity parameter, rather than the number of rows at the left. This is of impact at larger sizes (e.g. SF 100), see: apache#9036 Rather than estimating it based on previous batches, this does it based on the (known) number of resulting rows. * Reuse "computed" right indices across multiple columns. * The refactoring makes it easier to apply changes needed for https://issues.apache.org/jira/browse/ARROW-11030 where we need to remove the n*n work that is done for the build side * The changes don't have a big impact locally on performance on TPC-H with small scale factor, but I believe it should have a similar effect as apache#9036 on SF=100 by using `indices.len()` rather than the number of rows in the build side. FYI @jorgecarleitao @andygrove Closes apache#9048 from Dandandan/join_right_refactor Authored-by: Heres, Daniel <danielheres@gmail.com> Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
This applies some refactoring to
build_batch_from_indices
which is supposed to make further changes easier, e.g. solving https://issues.apache.org/jira/browse/ARROW-11030take
on it. This should be more efficient anyway, and also allows in the future to build the index array directly instead of doing extra copying.indices.len()
for the capacity parameter, rather than the number of rows at the left. This is of impact at larger sizes (e.g. SF 100), see: ARROW-11053: [Rust] [DataFusion] Optimize joins with dynamic capacity for output batches #9036 Rather than estimating it based on previous batches, this does it based on the (known) number of resulting rows.indices.len()
rather than the number of rows in the build side.FYI @jorgecarleitao @andygrove