
[Rust] [DataFusion] HashJoinExec slow with many batches #26949

Closed
asfimport opened this issue Dec 24, 2020 · 20 comments

Comments


Performance of joins slows down dramatically with smaller batches.

The issue relates to the slow performance of MutableArrayData::new() when passed a large number of batches. This happens when passing in all of the batches from the build side of the join, and it happens once per build-side join key for each probe-side batch.

It seems to get disproportionately slower as the number of arrays increases, even though the total number of rows is the same.

I modified hash_join.rs to have this debug code:

use std::time::Instant; // `debug!` is the macro from the `log` crate

let start = Instant::now();
let row_count: usize = arrays.iter().map(|arr| arr.len()).sum();
let num_arrays = arrays.len();
let mut mutable = MutableArrayData::new(arrays, true, capacity);
if num_arrays > 0 {
    debug!(
        "MutableArrayData::new() with {} arrays containing {} rows took {} ms",
        num_arrays,
        row_count,
        start.elapsed().as_millis()
    );
}

Batch size 131072:

MutableArrayData::new() with 4584 arrays containing 3115341 rows took 1 ms
MutableArrayData::new() with 4584 arrays containing 3115341 rows took 1 ms
MutableArrayData::new() with 4584 arrays containing 3115341 rows took 1 ms 

Batch size 16384:

MutableArrayData::new() with 36624 arrays containing 3115341 rows took 19 ms
MutableArrayData::new() with 36624 arrays containing 3115341 rows took 16 ms
MutableArrayData::new() with 36624 arrays containing 3115341 rows took 17 ms 

Batch size 4096:

MutableArrayData::new() with 146496 arrays containing 3115341 rows took 88 ms
MutableArrayData::new() with 146496 arrays containing 3115341 rows took 89 ms
MutableArrayData::new() with 146496 arrays containing 3115341 rows took 88 ms

Reporter: Andy Grove / @andygrove
Assignee: Daniël Heres / @Dandandan

Note: This issue was originally created as ARROW-11030. Please see the migration documentation for further details.


Andy Grove / @andygrove:
@jorgecarleitao I'll have more time tomorrow to work on this, but please let me know if you have any ideas that might help.


Andy Grove / @andygrove:
I am going to work on https://issues.apache.org/jira/browse/ARROW-11058 next, which will likely help with this issue.


Jorge Leitão / @jorgecarleitao:
No idea why: I cannot find an N*N operation in new(), so as far as I can tell, it should not even be polynomial. :/

I agree that having an average of 20 rows per batch may be breaking some of the assumptions we make about the performance of arrays.


Andy Grove / @andygrove:
I have a theory about what might be happening here, but I am struggling to really understand it.

It looks like we create a buffer and extend it once per input array. Each time we extend it, the buffer is larger, so the cost of extending it again grows each time?

Is there a way we can compute upfront how much to extend it by and do a single extend operation?
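
A minimal standalone sketch of that idea, using a plain Vec<u8> in place of Arrow's buffers (sizes here are illustrative only): reserving the full capacity upfront replaces repeated grow-and-copy cycles with a single allocation followed by plain copies.

use std::time::Instant;

fn main() {
    // Many small chunks, mimicking many small build-side arrays.
    let chunks: Vec<Vec<u8>> = (0..100_000).map(|_| vec![0u8; 32]).collect();
    let total: usize = chunks.iter().map(|c| c.len()).sum();

    // Incremental growth: the Vec may reallocate and copy as it grows.
    let start = Instant::now();
    let mut grown: Vec<u8> = Vec::new();
    for c in &chunks {
        grown.extend_from_slice(c);
    }
    println!("incremental: {:?} ({} bytes)", start.elapsed(), grown.len());

    // Upfront reservation: one allocation; each extend is a plain copy.
    let start = Instant::now();
    let mut reserved: Vec<u8> = Vec::with_capacity(total);
    for c in &chunks {
        reserved.extend_from_slice(c);
    }
    println!("reserved:    {:?} ({} bytes)", start.elapsed(), reserved.len());
}

Note that Vec's doubling growth keeps incremental extension amortized O(1) per byte, so this alone would not explain a quadratic slowdown, but the reallocate-and-copy work is still avoidable, and MutableArrayData's capacity parameter exists for exactly this kind of upfront reservation (see Jorge's explanation below).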


Daniël Heres / @Dandandan:
One comment I put in a PR, which I think is part of the problem:

I think a further speedup could come from moving the construction of the left/build-side Vec<&ArrayData> so that it is created only once instead of once for each right batch in build_batch_from_indices. Currently, when the batch size is made smaller, the build-side Vec is built more times and also contains more (smaller) batches itself, which could explain (part of) the big, quadratic slowdown on smaller batches. See the sketch below.
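
A hypothetical sketch of that hoisting (function and variable names here are illustrative, not the actual hash_join.rs code):

use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;

// Collect the build-side column references once, up front.
fn build_side_column(left_batches: &[RecordBatch], col: usize) -> Vec<&ArrayRef> {
    left_batches.iter().map(|batch| batch.column(col)).collect()
}

fn probe_all(left_batches: &[RecordBatch], right_batches: &[RecordBatch]) {
    // Built once, outside the loop: O(n) total rather than O(n) work
    // repeated for each of the m probe-side batches (O(n * m) overall).
    let build_arrays = build_side_column(left_batches, 0);
    for _probe_batch in right_batches {
        // build_batch_from_indices would reuse `build_arrays` here
        // instead of re-collecting it for every probe-side batch.
        let _ = &build_arrays;
    }
}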


Daniël Heres / @Dandandan:
It's not directly related to MutableArrayData::new(), but it still gives the hash join a component that takes quadratic time.


Daniël Heres / @Dandandan:
But the same applies to MutableArrayData: the left side of the join generates n batches, which are iterated n times in MutableArrayData::new().


Jorge Leitão / @jorgecarleitao:
@andygrove, the MutableArrayData is basically:

  1. bound to a bunch of existing arrays (of the same type), so that it can "copy slices of slots" from any of those arrays

  2. It uses the arrays' DataType to extend the buffers and child_data according to the spec

    The parameter "capacity" (in number of slots) is used to reserve upfront all the buffers that can be reserved upfront (e.g. primitive types and offsets). In the case of a concat, we can compute that parameter exactly, since it is the sum of all the arrays' lengths:

    let capacity = lengths.iter().sum();

    The new function allocates closures (one per array without child data) that are optimized to extend the buffers accordingly.
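
A sketch of the concat-style use described above, assuming the arrow crate's MutableArrayData API as used by hash_join.rs at the time (new(arrays, use_nulls, capacity), extend(array_index, start, end), freeze()):

use arrow::array::{ArrayData, MutableArrayData};

// Concatenate several arrays of the same DataType into one ArrayData.
fn concat_like(arrays: &[&ArrayData]) -> ArrayData {
    // For a concat the capacity is known exactly: the sum of the input
    // lengths, so all fixed-size buffers can be reserved upfront.
    let capacity: usize = arrays.iter().map(|a| a.len()).sum();
    let mut mutable = MutableArrayData::new(arrays.to_vec(), false, capacity);
    // Copy the full slot range of every input array, in order.
    for (i, array) in arrays.iter().enumerate() {
        mutable.extend(i, 0, array.len());
    }
    mutable.freeze()
}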


Andy Grove / @andygrove:
Thanks @jorgecarleitao and @Dandandan  for the information. It looks like I may have misunderstood the issue a bit. I am going to unassign this for now and change the title back to being specific to hash join.


Daniël Heres / @Dandandan:
So, in summary, the problem here looks like this:

  • for the left side of the join, we .iter() over the build-side batches (a couple of times: in the join implementation itself and in MutableArrayData) every time we process a probe-side batch.

  • if we halve the batch size, the number of items in the build-side array doubles, but so does the number of probe-side batches we process in the join, hence the n*n slowdown. For example, going from batch size 131072 to 4096 multiplies both factors by 32. (The MutableArrayData structure itself also seems not super efficient, which makes the cost visible even at bigger batch sizes.)

    I am not sure of the best way to solve it. I think we need something for the build-side batches so that we can reuse a "prepared" structure which build_batch_from_indices can use in O(1) time.


Andy Grove / @andygrove:
I just did an experiment where I concatenated all the batches when building the build-side, so we always have a single batch on that side. It did not make any noticeable difference to performance.

The work is in https://github.com/andygrove/arrow/tree/coalesce-left


Daniël Heres / @Dandandan:
@andygrove, interesting take, will check it out! Doesn't that branch already have the coalesce-batches optimization, though? If so, it shouldn't make much difference now anyway, as it doesn't generate the tiny batches like before.


Daniël Heres / @Dandandan:
I think it would be nice to experiment further with concatenating the build side into a single batch.

Thoughts:

  • copying is very fast per element (compared to hashing, random-access indexing, etc.), so it seems a small penalty to pay upfront

  • indexing can use Arrow's take, which is relatively fast as it avoids the overhead of the extra indirection, the less efficient MutableArrayData structure, and the current remaining quadratic behavior (small batches will always be slower, though, as we always have some constant overhead to pay per batch). take also has potential for further optimization (e.g. SIMD), which I think is hard or impossible for something like MutableArrayData (see the sketch after this list)

  • makes the code for the left / right side more similar

  • allows other simplifications / speedups / avoided copies by utilizing the simplified structure and Arrow's kernels more.

    I added a PR extending take to support u64 indices: ARROW-11086: [Rust] Extend take implementation to more index types #9057
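
A sketch of that take-based direction, assuming recent arrow-rs signatures for the concat and take kernels (the helper names are hypothetical):

use arrow::array::{Array, ArrayRef, UInt64Array};
use arrow::compute::{concat, take};
use arrow::error::Result;

// Done once while collecting the build side: merge each column's
// per-batch arrays into a single contiguous array.
fn merge_build_column(per_batch: &[ArrayRef]) -> Result<ArrayRef> {
    let refs: Vec<&dyn Array> = per_batch.iter().map(|a| a.as_ref()).collect();
    concat(&refs)
}

// Per probe-side batch: gather the matched build-side rows directly,
// one take per column, instead of re-walking all build-side batches.
fn gather_build_rows(build_columns: &[ArrayRef], indices: &UInt64Array) -> Result<Vec<ArrayRef>> {
    build_columns
        .iter()
        .map(|col| take(col.as_ref(), indices, None))
        .collect()
}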


Jorge Leitão / @jorgecarleitao:
@Dandandan, I agree with that approach.

I originally implemented the MutableArrayData under the assumption that we had to "hash-merge" multiple arrays into a single array, building a new array out of N arrays. The original use cases were "hash-merge" and "sort-merge". If we can merge all batches into a single batch prior to the join, then I think we should do it and use take instead.


Daniël Heres / @Dandandan:
Thanks @jorgecarleitao  for the context & your opinion (y)


Andy Grove / @andygrove:
One other thing to note here is that we may not always be able to merge into a single batch due to size restrictions on arrays, so technically we should talk about merging into as few batches as possible, rather than a single batch.
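
A hypothetical sketch of that grouping (pure std; names are illustrative): pack the build-side batches into as few merge groups as possible, with each group kept under a maximum row count.

// Returns groups of batch indices; the batches in one group would be
// merged into a single batch of at most `max_rows` rows.
fn plan_merges(batch_lens: &[usize], max_rows: usize) -> Vec<Vec<usize>> {
    let mut groups: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_rows = 0usize;
    for (i, &len) in batch_lens.iter().enumerate() {
        // Close the current group if adding this batch would overflow it.
        if current_rows + len > max_rows && !current.is_empty() {
            groups.push(std::mem::take(&mut current));
            current_rows = 0;
        }
        current.push(i);
        current_rows += len;
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}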


Daniël Heres / @Dandandan:
Are you referring to the maximum of 2^64 elements in an array (because of the length), or some other restriction in Arrow? The only thing I can find is that for multi-language support it is recommended to stay at 2^31 max, but I assumed we can use the full 64 bits here?

If the limit is 2^64, maybe we could say at this point that a higher limit is not yet realistic for an in-memory hash join? I think technically you could make it a Vec and group/append the indices by the first index, still using the "direct index" approach mentioned above and still utilizing take, etc. But if a single batch is technically not possible, is it even necessary?


Daniël Heres / @Dandandan:
Added a PR here, still using MutableArrayData (so some optimization opportunities remain) but with the batches merged into one batch: #9070
Collecting into one batch seems very promising, and removes a large amount of the remaining overhead when using smaller batches.


Andrew Lamb / @alamb:
Issue resolved by pull request #9070.


Andy Grove / @andygrove:
Yes, that is what I am referring to, as well as the limit on data length in variable-width arrays.

I am specifically thinking about the 32-bit variants, which will be important on some hardware platforms even when just using the Rust implementation.

asfimport added this to the 3.0.0 milestone on Jan 11, 2023