-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Refactor InListExpr to support structs by re-using existing hashing infrastructure #18449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| // TODO: serialize the inner ArrayRef directly to avoid materialization into literals | ||
| // by extending the protobuf definition to support both representations and adding a public | ||
| // accessor method to InListExpr to get the inner ArrayRef |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll create a followup issue once we merge this
| 05)--------ProjectionExec: expr=[] | ||
| 06)----------CoalesceBatchesExec: target_batch_size=8192 | ||
| 07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]) | ||
| 07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is because we now support Utf8View for building the sets 😄
| let random_state = RandomState::with_seed(0); | ||
| let mut hashes_buf = vec![0u64; array.len()]; | ||
| let Ok(hashes_buf) = create_hashes_from_arrays( | ||
| &[array.as_ref()], | ||
| &random_state, | ||
| &mut hashes_buf, | ||
| ) else { | ||
| unreachable!("Failed to create hashes for InList array. This shouldn't happen because make_set should have succeeded earlier."); | ||
| }; | ||
| hashes_buf.hash(state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could pre-compute and store a hash: u64 which would be both more performant when Hash is called and avoid this error, but it would add more complexity and some overhead when building the InListExpr
4d4b797 to
9a0f6be
Compare
9a0f6be to
f1f3b66
Compare
## Background This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in apache#17171. A "target state" is tracked in apache#18393. There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own: - (This PR): apache#18448 - apache#18449 (depends on apache#18448) - apache#18451 ## Changes in this PR Change create_hashes and related functions to work with &dyn Array references instead of requiring ArrayRef (Arc-wrapped arrays). This avoids unnecessary Arc::clone() calls and enables calls that only have an &dyn Array to use the hashing utilities. - Add create_hashes_from_arrays(&[&dyn Array]) function - Refactor hash_dictionary, hash_list_array, hash_fixed_list_array to use references instead of cloning - Extract hash_single_array() helper for common logic --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
f1f3b66 to
d1b9d05
Compare
| /// supported. Returns None otherwise. See [`LiteralGuarantee::analyze`] to | ||
| /// create these structures from an predicate (boolean expression). | ||
| fn new<'a>( | ||
| fn new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's worth discussing in this review how far we propagate the changes.
In particular, InListExpr will now have two operations modes:
- Was built with an
ArrayRefor was able to build anArrayReffrom a homogeneously typedVec<Arc<dyn PhysicalExpr>>which are all literals. - Was built with a
Vec<Arc<dyn PhysicalExpr>>which are not literals or homogeneously typed.
If we restrict LiteralGuarantee to only operate on the first cases, I think we could lift out a lot of computation: instead of transforming ArrayRef -> Vec<Arc<dyn PhysicalExpr>> -> Vec<ScalarValue> -> HashSet<ScalarValue> which then gets fed into bloom filters which are per-column and don't really support heterogenous ScalarValues we could re-use the already deduplicated ArraySet that InListExpr has internally or something. The ultimate thing to do, but that would require even more work and changes, would be to make PruningPredicate::contains accept an enum ArrayOrScalars { Array(ArrayRef), Scalars(Vec<ScalarValue>) } so that we can push down and iterate over the Arc'ed ArrayRef the whole way down. I think we could make this backwards compatible.
I think that change is worth it, but it requires a bit more coordination (with arrow-rs) and a bigger change.
The end result would be that:
- When you create an
InListExproperates in mode (1) we are able to push down into bloom filters with no data copies at all. - When the
InListExproperates in mode (2) we'd bail on the pushdown early (e.g.list() -> Option<ArrayRef>) and avoid building theHashSet<ScalarValue>, etc. that won't be used.
Wdyt @alamb ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay I've looked into this and it is entirely possible, I think we should do it.
Basically the status quo currently is that we always try to build an ArrayHashSet which is only possible if we can convert the Vec<ScalarValue> into an ArrayRef.
At that point the only reason to store the Vec<SclarValue> is to later pass it into PruningPredicate -> bloom filters and LiteralGuarantee. If we can refactor those two to also handle an ArrayRef we could probably avoid a lot of cloning and make things more efficient by using arrays. I don't even think we need to support Vec<ScalarValue> at all: the only reason to have that is if you could not build a homogeneously typed array, and if that is the case you probably can't do any sort of pushdown into a bloom filter. E.g. select variant_get(col, 'abc') in (1, 2.0, 'c') might make sense and work but I don't think we could ever push that down into a bloom filter. So InListExpr needs to operate on both but I don't think the pruning machinery does.
So anyway I think I may try to reduce this change to only be about using create_hashes and ignore any inefficiencies as a TODO for a followup issue. At the end of the day if we can make HashJoinExec faster even if that's with some inefficiencies I think that's okay and we can improve more later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll record a preview of some of the changes I had made to explore this (by no means ready) just for future reference: https://github.com/pydantic/datafusion/compare/refactor-in-list...pydantic:datafusion:use-array-in-pruning?expand=1
| pub trait Set: Send + Sync { | ||
| fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>; | ||
| fn has_nulls(&self) -> bool; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We get rid of the Set trait. The only implementer was ArraySet
| array => Arc::new(ArraySet::new(array, make_hash_set(array))), | ||
| DataType::Boolean => { | ||
| let array = as_boolean_array(array)?; | ||
| Arc::new(ArraySet::new(array, make_hash_set(array))) | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We get rid of this type matching logic
| trait IsEqual: HashValue { | ||
| fn is_equal(&self, other: &Self) -> bool; | ||
| } | ||
|
|
||
| impl<T: IsEqual + ?Sized> IsEqual for &T { | ||
| fn is_equal(&self, other: &Self) -> bool { | ||
| T::is_equal(self, other) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We get rid of these custom equality / hash traits
ab74641 to
f412ead
Compare
|
🤖 |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @adriangb
I looked through the code and the basic idea makes a lot of sense to me 👍
I kicked off some benchmarks to see what impact, if any, this change has on performance. Assuming it is the same or better, I think it would be good to merge
I do suggest adding some slt level logic for struct IN lists if we don't already have some, but I don't think it is necessary
| false => Some(negated), | ||
| } | ||
| }) | ||
| let mut hashes_buf = vec![0u64; v.len()]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a follow on PR we could potentially look into reusing this hashes_buf -- aka rather than reallocating each invocations of contains instead make a field (probably needs to be a Mutex or something) that is a Vec
| }) | ||
| let mut hashes_buf = vec![0u64; v.len()]; | ||
| create_hashes([v], &self.state, &mut hashes_buf)?; | ||
| let cmp = make_comparator(v, in_array, SortOptions::default())?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the comparator is some dynamic function -- the overhead of using the dynamic dispatch in the critical path may be substantial).
If it turns out to be too slow, we can potentially create specializations for comparisons (aka make a speicalized hash set for the different physical array types, and fall back to the dynamic comparator)
| /// | ||
| /// The `list` field will be empty when using this constructor, as the array is stored | ||
| /// directly in the static filter. | ||
| pub fn in_list_from_array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would be more discoverable if this was a method on InList rather than a free function
Something like
impl InLIst
fn new_from_array( expr: Arc<dyn PhysicalExpr>,
array: ArrayRef,
negated: bool,
) -> Result<Self> {
...
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree, I was just following the existing patterns
| } | ||
|
|
||
| #[test] | ||
| fn in_list_struct() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we also please add some .slt level tests for IN on a set?
|
🤖: Benchmark completed Details
|
|
It looks like there are indeed some regressions. I propose we do two things:
I think that will get us the broader type support and code re-use while avoiding any slowdown. Once we do the upstreaming into arrow it won’t even be any more code than it is now (a bit more code in arrow but not even that much). And we should be able to do it all in one PR here |
|
I removed the enum comparator, benchmarks showed it was slower than the dynamic dispatch version. The thread local hashing / buffer re-use seems to be a big win though. Although this is +1.7k LOC ~1.5k of those are new tests / docstrings on existing functions. The actual change is closer to ~500LOC, and that includes the new @alamb could you kick off benchmarks again? If they look good are we good to merge this? |
846476b to
a5afb96
Compare
|
🤖 |
|
🤖: Benchmark completed Details
|
|
🤖 |
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @adriangb and @davidhewitt
As long as this PR shows no performance regressions I think it is good to go
I am actually pretty surprised we can get away using the comparator compared to using eq which is typically much faster. I have an idea of how we can potentially have the best of both worlds (full type support as well as faster native implementation).
Let me see if I can bang out something
| NULL | ||
|
|
||
| # ======================================================================== | ||
| # Comprehensive IN LIST tests with NULL handling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I always why @claude and similar bots always insist the code is "comprehensive" 😆
| random_state: &RandomState, | ||
| hashes_buffer: &'a mut Vec<u64>, | ||
| ) -> Result<&'a mut Vec<u64>> | ||
| hashes_buffer: &'a mut [u64], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I double checked that the code already asserts that the hashes_buffer and arrays are the same length (aka doesn't actually use the fact this is a Vec to grow the allocation)
| } | ||
|
|
||
| #[test] | ||
| fn test_with_hashes_reentrancy() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a test / verify the truncate / shrink to fit behavior ? I think that is probably important
| 04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)] | ||
| 05)--------CoalesceBatchesExec: target_batch_size=8192 | ||
| 06)----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3 IN ([SM CASE, SM BOX, SM PACK, SM PKG]) AND l_quantity@0 >= Some(100),15,2 AND l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND p_container@3 IN ([MED BAG, MED BOX, MED PKG, MED PACK]) AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND p_container@3 IN ([LG CASE, LG BOX, LG PACK, LG PKG]) AND l_quantity@0 >= Some(2000),15,2 AND l_quantity@0 <= Some(3000),15,2 AND p_size@2 <= 15, projection=[l_extendedprice@2, l_discount@3] | ||
| 06)----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3 IN (SET) ([SM CASE, SM BOX, SM PACK, SM PKG]) AND l_quantity@0 >= Some(100),15,2 AND l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND p_container@3 IN (SET) ([MED BAG, MED BOX, MED PKG, MED PACK]) AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND p_container@3 IN (SET) ([LG CASE, LG BOX, LG PACK, LG PKG]) AND l_quantity@0 >= Some(2000),15,2 AND l_quantity@0 <= Some(3000),15,2 AND p_size@2 <= 15, projection=[l_extendedprice@2, l_discount@3] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did these queries start being able to use the pre-calculated set? Is it because InList didn't have a special case for Utf8View before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep exactly
| .map | ||
| .raw_entry() | ||
| .from_hash(hash, |idx| in_array.value(*idx).is_equal(&v)) | ||
| .from_hash(hash, |idx| cmp(i, *idx).is_eq()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we ever need to make this faster, we could potentially add specializations for different primitive types, and still fall back to the dynamic comparator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried that. It seemed slower once the enum got large enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean specialize the entire thing (including hash table) - so that you pay the dispatch once (either at InLIstExpr creation time or maybe once per batch), rather than on each row
Another problem with the dyn comparator approach is that it prevents inlining/vectorization
Here is one way to specialize the hashset:
- WIP: Hack out specialized Int32 static filter pydantic/datafusion#45
(I haven't fully worked out the generics yet)
| // SQL three-valued logic: null IN (...) is always null | ||
| // The code below would handle this correctly but this is a faster path | ||
| return Ok(ColumnarValue::Array(Arc::new( | ||
| BooleanArray::from(vec![None; num_rows]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can probably be made faster by bypassing the Vec entirely -- perhaps via https://docs.rs/arrow/latest/arrow/array/struct.BooleanBufferBuilder.html
not necessary, I am just pointing it out
| BooleanArray::from(vec![None; num_rows]) | ||
| } else { | ||
| // Convert scalar to 1-element array | ||
| let array = scalar.to_array()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am really surprised using this comparator does not cause a performance_regression compared to using eq
| use hashbrown::hash_map::RawEntryMut; | ||
|
|
||
| /// Static filter for InList that stores the array and hash set for O(1) lookups | ||
| #[derive(Debug, Clone)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason to pull StaticFilter out from ArrayHashSet? It took me a little bit to grok that the fields in ArrayHashSet refer to StaticFilter
In other words, why not something like
/// Static filter for InList that stores the array and hash set for O(1) lookups
#[derive(Debug, Clone)]
struct StaticFilter {
in_array: ArrayRef,
state: RandomState,
/// Used to provide a lookup from value to in list index
///
/// Note: usize::hash is not used, instead the raw entry
/// API is used to store entries w.r.t their value
map: HashMap<usize, (), ()>,
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update -- I tried this and it seems to work great
|
Looks like in_list is still slower for i32 - #18449 (comment) I have an idea of how to fix this (use eq rather than cmp) |
|
🤖: Benchmark completed Details
|
|
If we merge this PR in as written, I think we should file a ticket to follow up restoring the performance |
…nfrastructure Co-authored-by: David Hewitt <mail@davidhewitt.dev>
* Consolidate StaticFilter and ArrayHashSet * Fix docs
2f5e435 to
8a2ee06
Compare
|
I'm surprised that doing dynamic dispatch once per batch we evaluate as opposed to twice per batch we evaluate makes that much of a difference. What would make sense that makes a difference to me is doing it once per element vs. once per batch. But I guess that's what benchmarks say! That does leave me with a question... could we squeeze out even more performance if we specialize for ~ all scalar types? It wouldn't be that hard to write a macro and have AI do the copy pasta of implementing it for all of the types... I'll open a follow up ticket. |
|
Also thank you for your help getting this across the line @alamb! I'm excited to continue the work. |
Yes this is what I think we should do |
| } | ||
|
|
||
| ArrayHashSet { state, map } | ||
| struct Int32StaticFilter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yeah, we should totally do the same thing here for the other types. I'll file a ticket to track that
…for more precise filters (#18451) ## Background This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171. A "target state" is tracked in #18393. There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own: - #18448 - #18449 (depends on #18448) - (This PR): #18451 ## Changes in this PR This PR refactors state management in HashJoinExec to make filter pushdown more efficient and prepare for pushing down membership tests. - Refactor internal data structures to clean up state management and make usage more idiomatic (use `Option` instead of comparing integers, etc.) - Uses CASE expressions to evaluate pushed-down filters selectively by partition Example: `CASE hash_repartition % N WHEN partition_id THEN condition ELSE false END` --------- Co-authored-by: Lía Adriana <lia.castaneda@datadoghq.com>
Background
This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171.
A "target state" is tracked in #18393.
There is a series of PRs to get us to this target state in smaller more reviewable changes that are still valuable on their own:
HashJoinExecand use CASE expressions for more precise filters #18451Changes in this PR
by adding an internal InListStorage enum with Array and Exprs variants
in_list_from_array(expr, list_array, negated)for creating InList from arraysAlthough the diff looks large most of it is actually tests and docs. I think the actual code change is a negative LOC change, or at least negative complexity (eliminates a trait, a macro, matching on data types).