ARROW-10540 [Rust] Improve filtering #8630
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on JIRA? Could you then also rename the pull request title in the following format?
```diff
-for i in 0..size {
-    builder.append_value(value_fn(i)).unwrap();
+for _ in 0..size {
+    let value = rng.gen::<f32>() < trues_density;
```
using random numbers to generate the filter arrays makes it difficult to control filter selectivity; also, doesn't that make each benchmark run unique? I think that is the opposite of what we want: we want consistent benchmarks with stable, repeatable conditions
I wonder if this is the reason for the inconsistency in the benchmark results. Usually a highly selective filter, where most filter bits are 0s and only a small number of values are selected / copied to the output array, will have the best performance, because most batches of filter bits can be skipped quickly and only a few values are copied to the output array.

This relationship can clearly be seen in the benchmark results listed in the earlier PR #7798.

However, in the benchmark results listed in the description of this PR, in many cases the opposite is true: low-selectivity filter benchmarks achieve much better performance than their high-selectivity counterparts. I wonder what's the reason for that?
Thank you for your comments and for going through the code.

> using random numbers to generate the filter arrays makes it difficult to control filter selectivity

Could you elaborate? Isn't the selectivity controlled above? AFAIK we just do not place the selected entries on regularly-spaced intervals (`i % X`) but according to some seed/randomness.

> also doesn't that make each benchmark run unique which I think is the opposite of what we want - we want consistent benchmarks with stable, repeatable conditions

I agree. Would freezing it with a seed address the concern? My main concern with `if i % 2 == 0` and the like is that these are highly predictable patterns, unlikely in real-world situations. This predictability can make our benchmarks not very informative, as they end up benchmarking speculative execution and other optimizations, not the code.

Another way of looking at it is that our benchmarks need entropy to represent the lack of information that we possess about the underlying distribution of the data (nulls, selectivity, values, etc). Patterns like `i % 2` are super informative (almost no entropy).
> usually a highly selective filter, where most filter bits are 0s and only a small number of values are selected

Sorry, it is just notation: I used "high selectivity" in the text above to mean a lot of 1s (i.e. many items are selected => high selectivity), not a lot of 0s. But I see that you use the opposite meaning.
In the benchmarks above:

* low selectivity -> select 50%
* high selectivity -> do not select 1 in every 8000 entries
* very low selectivity -> select 1 in every 8000 entries
@yordan-pavlov Thanks for reminding me of that; it was on my plate for a while, and it is now addressed in #8635
> I agree. Would freezing it with a seed address the concern? My main concern with if i % 2 == 0 and the like is that these are highly predictable patterns and unlikely in real world situations. This predictability can make our benchmarks not very informative as they are benchmarking speculative execution and other optimizations, not the code (and again, these patterns are unlikely in real-world).
The thread id is xored with the seed, so `thread_rng` doesn't fit a reproducible-benchmarks point of view; check out the PR I've opened, @jorgecarleitao, and tell me what you think.
As @vertexclique mentions, he has provided a good PR related to this conversation: #8635
@jorgecarleitao the filter benchmarks may not simulate real-world use cases, but they are designed to test the code under specific conditions, such as the worst-case scenario of alternating 1s and 0s, where no batch can be skipped and all selected values have to be copied individually; how can this scenario be achieved with a randomly generated filter array?

The other scenarios, which test mostly 0s (best performance, because most filter batches can be skipped and only a small number of selected values have to be copied) and mostly 1s (not as fast, but still faster than the worst case, because filter batches can be checked quickly and most values are copied in slices), should be easier to achieve with random filter arrays, but are they going to be repeatable?
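For reference, the three deterministic scenarios described above can be generated directly (a sketch; the helper names are assumptions, not the benchmark's actual API):

```rust
// Worst case: alternating 1s and 0s, so no batch of filter bits can be skipped.
fn alternating_mask(size: usize) -> Vec<bool> {
    (0..size).map(|i| i % 2 == 0).collect()
}

// Mostly 0s: only one entry in every `period` is selected.
fn sparse_mask(size: usize, period: usize) -> Vec<bool> {
    (0..size).map(|i| i % period == 0).collect()
}

// Mostly 1s: all but one entry in every `period` is selected.
fn dense_mask(size: usize, period: usize) -> Vec<bool> {
    (0..size).map(|i| i % period != 0).collect()
}
```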
@jorgecarleitao once the approach to filter benchmarks has been finalized would it be possible to rearrange the commits so that the same benchmark code can be used to benchmark both the old and new implementations so that we can do a direct comparison?
@yordan-pavlov, that is already the case :) I put any changes to benchmarks in the first commit (as I need to benchmark them myself).
```rust
/// and for a size of `len`.
/// # Panic
/// This function panics if the range is out of bounds, i.e. if `start + len >= array.len()`.
pub fn extend(&mut self, start: usize, end: usize) {
```
I am not sure `extend()` correctly describes what the method does; the idea being that we take / copy a slice (defined by `start` and `end`) into the output array. Would `take()` or `copy()` be a better name?
I was following Rust's notation here, `Vec::extend`, which extends from a slice. The difference here is that we can't use the `&[]` notation, as I could not find an object to pass in this case. So, I tried my best to mimic `[start..end]` (via two arguments). The end is exclusive.

I also played with `push` (which is often used in Rust for a single element).

I do not have strong feelings, though.
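For comparison, the `Vec`-style, end-exclusive semantics being mimicked look like this in plain Rust (an illustration of the naming precedent, not the PR's code):

```rust
// `extend(1, 4)` in the PR's API corresponds to copying `src[1..4]`
// (end exclusive) into the output, much like Vec::extend_from_slice.
fn demo() -> Vec<i32> {
    let src = vec![10, 20, 30, 40, 50];
    let mut dst: Vec<i32> = Vec::new();
    dst.extend_from_slice(&src[1..4]); // copies elements at indices 1, 2, 3
    dst
}
```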
Some thoughts:

Naming: I have seen similar concepts called "masks" (as they are similar to bit masks) -- so perhaps `ArrayDataMask` or `MaskedArrayData`. Or perhaps `ArrayRowSet`.

When I actually read the code of `MutableArrayData`, I realized that it isn't quite the mask concept, but it is similar (intermediate results want to represent "what indexes pass a certain test", eventually copying only those indexes to a new array).

This type of structure might also be useful for performing multiple boolean operations: e.g. when doing `(A > 5) AND (A < 10)`, you can compute the row ids/indexes that pass `A > 5` and then, rather than actually copying those rows just to compare them against `< 10`, you can operate directly on the original copy of `A` (checking only the rows where the mask is true).

I believe many other database systems use "bitsets" to represent this concept (we are using https://crates.io/crates/croaring in https://github.com/influxdata/influxdb_iox for the concept).
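The mask-combination idea above can be sketched in a few lines (plain `Vec<bool>` stands in for a real bitset; the function name is hypothetical):

```rust
// Evaluate (A > 5) AND (A < 10) as two boolean masks, combine them, and only
// then materialize the selected values -- rows are copied exactly once.
fn mask_and_filter(a: &[i32]) -> Vec<i32> {
    let gt5: Vec<bool> = a.iter().map(|&x| x > 5).collect();
    let lt10: Vec<bool> = a.iter().map(|&x| x < 10).collect();
    let mask: Vec<bool> = gt5.iter().zip(&lt10).map(|(g, l)| *g && *l).collect();
    a.iter()
        .zip(&mask)
        .filter(|(_, keep)| **keep)
        .map(|(v, _)| *v)
        .collect()
}
```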
```rust
type ExtendNullBits<'a> = Box<Fn(&mut _MutableArrayData, usize, usize) -> () + 'a>;
// function that extends `[start..start+len]` to the mutable array.
// this is dynamic because different data_types influence how buffers and children are extended.
type Extend<'a> = Box<Fn(&mut _MutableArrayData, usize, usize) -> () + 'a>;
```
Rather than a dynamic function pointer to extend such a structure, I wonder if you could get by with `element_length` and `number_of_elements`.
That unfortunately would only work for primitive buffers. For string arrays, extending an array data requires a complex operation that is fundamentally different from extending a single buffer. For nested types, the operation is recursive on the child data.
This is fundamentally a dynamic operation: we only know what to do when we see which `DataType` the user wants to build an `ArrayData` from. We can see that the `Builders` use a similar approach: they use `dyn Builder` for the same reason.

The builders have an extra complexity associated with the fact that their input type is not uniform: i.e. their API supports extending from a `&[T]` (e.g. `i32` or `i16`), which is the reason why they need to be implemented via a dynamic type, each implementation of which has methods for each type. In `MutableArrayData`, the only "thing" that we extend from is an `ArrayData`, which has a uniform (rust) type but requires a different behavior based on its `data_type` => one function pointer per data-type.
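A stripped-down sketch of the "one function pointer per data-type" pattern (the simplified `DataType` enum and `Vec<u8>` buffer are assumptions for illustration, not the arrow API):

```rust
enum DataType {
    Int32,
    Int16,
}

// closure that extends `[start..start+len]` elements from a source byte buffer
type Extend<'a> = Box<dyn Fn(&mut Vec<u8>, usize, usize) + 'a>;

// The dispatch happens once, when binding to the source data; the returned
// closure then runs with the element width already resolved.
fn build_extend<'a>(data_type: &DataType, values: &'a [u8]) -> Extend<'a> {
    let size = match data_type {
        DataType::Int32 => 4,
        DataType::Int16 => 2,
    };
    Box::new(move |mutable: &mut Vec<u8>, start: usize, len: usize| {
        mutable.extend_from_slice(&values[start * size..(start + len) * size]);
    })
}
```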
```rust
let values = &array.buffers()[0].data()[array.offset() * size_of::<T>()..];
Box::new(
    move |mutable: &mut _MutableArrayData, start: usize, len: usize| {
        let start = start * size_of::<T>();
```
it seems to me that `size_of::<T>` is the part here that is dependent on type -- maybe you could just pass that as a number
I try to use generics for things that are known at compile time and arguments for things that are only known at runtime. Maybe that is not the correct way of thinking?

In this case in particular, the difference would be between `build_extend::<i32>(array)` and `build_extend(array, size_of::<i32>())`.
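The trade-off between the two signatures can be shown side by side (hypothetical helper names; both compute the same byte range, but the generic version fixes the element width at compile time via monomorphization):

```rust
use std::mem::size_of;

// width known at compile time: `size_of::<T>()` is a constant per instantiation
fn byte_range_generic<T>(start: usize, len: usize) -> (usize, usize) {
    (start * size_of::<T>(), (start + len) * size_of::<T>())
}

// width known only at runtime: passed as an ordinary argument
fn byte_range_runtime(start: usize, len: usize, size: usize) -> (usize, usize) {
    (start * size, (start + len) * size)
}
```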
> I try to use generics for things that are known at compile time and arguments for things that are only known at runtime.

I agree that is a good general rule of thumb.

I guess this was a second way of asking if we could avoid dynamic dispatch - clearly the answer was "no" :)
I found the whole of this comment true. There are other approaches to do this, but the main approach is that one. I believe a scratchpad implementation could also solve this problem from a different angle.
Thanks a lot, @alamb and @vertexclique. I agree with the naming issues here, and great insight into those crates. I do not have strong feelings about naming; I tried to be close to what I am familiar with from Rust (e.g. `Vec::extend`).

A bit of background: @yordan-pavlov did an impressive job on #7798 to make filtering performant; it is really hard to get that performance with a generalization. I had to re-write this code at least 3 times to get it to a stage with comparable performance, and even then, you can see that it depends on the filter conditions.

But there is a (IMO good) reason for generalizing the code. Specifically, the use-case I am looking at is to remove most code from the kernels that build arrays out of existing arrays.

Also, note that this goes beyond masking: it supports taking values repeatedly. I see it as a "builder" bound to a specific `ArrayData`.

The reason the performance remains comparable(-ish) to master is that when it binds to an `ArrayData`, it pre-computes as much as possible. For example,

```rust
let values = &array.buffers()[0].data()[array.offset() * size_of::<T>()..];
Box::new(move |mutable: &mut _MutableArrayData, start: usize, len: usize| {
```

instead of

```rust
Box::new(move |mutable: &mut _MutableArrayData, array: &ArrayData, start: usize, len: usize| {
    let values = &array.buffers()[0].data()[array.offset() * size_of::<T>()..];
```

has a meaningful performance impact because it avoids repeating those lookups and checks on every call.

There is a further boost possible that I explored but did not finish; it requires a bit of extra machinery: use

```rust
type Extend<'a> = Box<Fn(usize, usize) -> () + 'a>;
```

and bind the mutable array itself to the closure.

My point is that there are non-trivial optimizations done in this proposal that were borrowed from the excellent work from @yordan-pavlov, and that are required to keep up with the very high bar set by #7798. This draft is trying to leverage that work on two of our kernels, `filter` and `take`.
@jorgecarleitao thank you for this PR; overall I think it's a great idea to reuse the code between the `take` and `filter` kernels if possible - and you have demonstrated how it can be done; we just have to find a way to keep performance at a good level. I have been wondering whether the `BitChunkIterator` from https://github.com/apache/arrow/blob/master/rust/arrow/src/util/bit_chunk_iterator.rs could be used to improve the filter kernel, so I am happy to see you have already done it.
I have rebased against latest master and further improved the code. IMO this is now ready to review: benchmarks are now better (1.1-3x) for all cases where we pre-compute the filter, except for 1 case (very low selectivity), where it is 1.5x worse. For the single-filter case, I know how to improve it: we need to not pre-compute the filter and instead use an iterator. I can continue to investigate this, but IMO this is already an improvement to the code base and performance.
I will try and review this carefully tomorrow.

FYI I was unable to make this sufficiently efficient for the …

This code can be re-used to implement …
```diff
-let sparse_filter_array = create_bool_array(size, |i| matches!(i % 8000, 0));
-let dense_filter_array = create_bool_array(size, |i| !matches!(i % 8000, 0));
+let filter_array = create_bool_array(size, 0.5);
+let sparse_filter_array = create_bool_array(size, 1.0 - 1.0 / 8000.0);
```
the names of the `sparse_filter_array` (intended to contain mostly 0s) and `dense_filter_array` (intended to contain mostly 1s) variables no longer correspond to the values; should they be reversed?
```rust
pub fn extend(&mut self, len: usize) {
    let remaining_capacity = self.capacity - self.len;
    if len > remaining_capacity {
        self.reserve(self.len + len);
```
should this be `self.reserve(self.len + (len - remaining_capacity));`?
I am sorry, I do not follow: my understanding is that we need to `reserve` so that `self.len <= self.capacity` is always true.

If we have a buffer with capacity 64 and `len` 54, and we extend it by 8, we want to ensure that its new capacity is at least 54+8=62, no?

Your formula yields 54 + (8 - (64 - 54)) = 54 + (8 - 10) = 52 (panics with an underflow, I think).

Another way of writing this code would be:

```rust
self.len += len;
if self.len > self.capacity {
    self.reserve(self.len);
}
```
@jorgecarleitao good spot - what I suggested is more obvious when `len > remaining_capacity`: for example, if `len = 256` in your example, the requested capacity would be 54 + (256 - (64 - 54)) = 54 + (256 - 10) = 300. I think it will still work even in your example, because the requested capacity of 52 will be lower than the existing capacity, so no allocation will be performed; none is necessary, because the requested extension of 8 is smaller than the remaining capacity of 10.
`reserve`'s API is to guarantee a capacity of up to the argument, and `extend`'s API is to increase `self.len` by `len`.

If we call `self.reserve(300)`, but then increase the `len` by `256` to `54+256=310`, we will be able to offset the pointer past the allocated memory (310 > 300), no?
you are right, I misunderstood `reserve(x)` to mean that we want to increase capacity by `x`; apologies
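A toy model of the contract the thread converged on (the struct and growth policy are assumptions for illustration): `reserve` takes an absolute capacity, not an additional amount, and `extend` only reserves when the remaining capacity is insufficient.

```rust
struct Buf {
    len: usize,
    capacity: usize,
}

impl Buf {
    /// guarantees `self.capacity >= capacity` (absolute, not additional);
    /// the power-of-two growth here is just an assumed policy
    fn reserve(&mut self, capacity: usize) {
        if capacity > self.capacity {
            self.capacity = capacity.next_power_of_two();
        }
    }

    /// increases `self.len` by `len`, allocating only if needed
    fn extend(&mut self, len: usize) {
        let remaining_capacity = self.capacity - self.len;
        if len > remaining_capacity {
            self.reserve(self.len + len);
        }
        self.len += len;
    }
}
```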
FWIW keeping "filter" as a special case that is very fast is not unreasonable -- it is likely to be one of the most performance-critical pieces of code in analytics systems, so it is naturally likely to end up as the most complex / optimized.
This PR is based on top of #8630 and contains a physical node to perform an inner join in DataFusion. This is still a draft, but IMO the design is here and the two tests already pass. This is co-authored with @andygrove, who contributed to the design of how to perform this operation in the context of DataFusion (see ARROW-9555 for details). The API used for the computation of the join at the arrow level is briefly discussed in [this document](https://docs.google.com/document/d/1KKuBvfx7uKi-x-tWOL60R1FNjDW8B790zPAv6yAlYcU/edit). There is still a lot to work on, but I thought it would be a good time to have a first round of discussions, and also to gauge timings wrt the 3.0 release.

There are two main issues being addressed in this PR:

* How do we perform the join at the partition level: this PR collects all batches from the left, and then issues a stream per part on the right. Each batch on that stream joins itself with all the ones from the left (N) via a hash. This allows us to only compute the hash of a row once (first all of the left, then one by one on the right).
* How do we build an array from `N (left)` arrays and a set of indices (matching the hash from the right): this is done using the `MutableArrayData` being worked on in #8630, which incrementally memcopies slots from each of the N arrays based on the index. This implementation is useful because it works for all array types and does not require casting anything to rust native types (i.e. it operates on `ArrayData`, not specific implementations).

There are still some steps left to have a join in SQL, most notably the whole logical planning, the output_partition logic, the bindings to the SQL and DataFrame APIs, updating the optimizers to handle nodes with 2 children, and a whole battery of tests.

There is also a natural path for the other joins, as it will be a matter of incorporating the work already in PR #8689, which introduces the option to extend the `MutableArrayData` with nulls, the operation required for left and right joins.

Closes #8709 from jorgecarleitao/join2

Authored-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
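The second bullet above — building an output column from N left batches plus a set of matching indices — has a simple shape. A sketch, simplified to `i32` with hypothetical names (the real code gathers via `MutableArrayData` over `ArrayData`):

```rust
// Gather rows from N left batches by (batch_index, row_index) pairs,
// as produced by the hash probe on the right side.
fn gather(batches: &[Vec<i32>], indices: &[(usize, usize)]) -> Vec<i32> {
    indices.iter().map(|&(batch, row)| batches[batch][row]).collect()
}
```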
## Motivation

The current code-base to `filter` an array is typed, which means that we can't easily generalize it for arbitrary depths. That code is also similar to the code in `take`: both perform a very similar operation: from an array, produce another array by taking elements from it.

## This PR

This is a proposal to introduce a new `struct` (`MutableArrayData`, for the lack of a better name) that is useful to efficiently construct an array taken from another array. The main use-case of this `struct` is to be used in unary operations that are type-independent, such as `filter`, `take` and `sort`.

With this `struct`, this PR:

* introduces `MutableArrayData` to efficiently create `ArrayData` as partial `memcopies` from another `ArrayData`
* supports `List`s of arbitrary depth and types

The gist of this `struct` can be seen in the example reproduced below:

## Design

There are 3 core ideas on this design:

* it operates on `ArrayData`, i.e. it is (rust-)type-independent and thus can be used on a variety of use-cases.

With this in mind, the API has 3 methods: `new`, `extend` and `freeze`. The code in `filter` was reducing memcopies to some extent, but this PR further expands on this idea by copying the largest possible chunk, instead of up to 64 slots.

## Implementation

* iterate in `filter.rs` via the `BitChunkIterator`
* move code from `filter.rs` to a new module (`array/transform/`)
* let the `filter` code be focused on how to efficiently perform the filter (instead of how to build the buffers)

## Benchmarks

```bash
set -e
git checkout 010d260173a9e110901ca67372a4ac379a615a13
cargo bench --bench filter_kernels
git checkout mutable_filter
cargo bench --bench filter_kernels
```
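The description names a `new`/`extend`/`freeze` API but the example itself did not survive extraction. As a stand-in, here is a toy model of that flow (an assumption: the real `MutableArrayData` operates on `ArrayData` and its buffers; this sketch uses a plain `i32` slice to show the shape only):

```rust
struct MutableArray<'a> {
    source: &'a [i32],
    buffer: Vec<i32>,
}

impl<'a> MutableArray<'a> {
    /// binds to a source array and pre-allocates the output
    fn new(source: &'a [i32], capacity: usize) -> Self {
        Self { source, buffer: Vec::with_capacity(capacity) }
    }

    /// memcopies `source[start..end]` (end exclusive) into the output,
    /// copying the largest possible contiguous chunk in one call
    fn extend(&mut self, start: usize, end: usize) {
        self.buffer.extend_from_slice(&self.source[start..end]);
    }

    /// finishes the construction, yielding the immutable result
    fn freeze(self) -> Vec<i32> {
        self.buffer
    }
}
```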