ARROW-9679: [Rust] [DataFusion] More efficient creation of final batch from HashAggregateExec #7936
@alamb @jorgecarleitao @houqp FYI, since you've all been contributing to DataFusion lately.
I reviewed the logic carefully and I didn't see any issues. My only comment in the "really should fix before merging" category is to remove the `println!`. The rest are stylistic.
Otherwise I think this PR is looking really good. Nice work @andygrove
@@ -413,20 +413,7 @@ impl Max {

impl AggregateExpr for Max {
    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
        match self.expr.data_type(input_schema)? {
I agree that the type of `Max` and `Min` should be the same as the input, as those functions are never going to overflow the value type of their input the way `Sum` or `Avg` could.
I actually think the change results in cleaner code too, which is a nice bonus
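To illustrate the overflow argument, here is a minimal standard-library sketch. It is not DataFusion code, and the function names `max_i8`, `sum_i8`, and `checked_sum_i8` are hypothetical. The maximum of a set of values is always one of the inputs, so the input type is wide enough, whereas a sum can leave the input type's range.

```rust
/// MAX over i8 values: the result is one of the inputs, so i8 always suffices.
fn max_i8(values: &[i8]) -> Option<i8> {
    values.iter().copied().max()
}

/// SUM over i8 values: accumulate into i64 because the total can leave the i8 range.
fn sum_i8(values: &[i8]) -> i64 {
    values.iter().map(|&v| i64::from(v)).sum()
}

/// Summing in the input type instead; returns None as soon as the total overflows.
fn checked_sum_i8(values: &[i8]) -> Option<i8> {
    values.iter().try_fold(0i8, |acc, &v| acc.checked_add(v))
}
```

With the inputs `[100, 120, 7]`, the maximum still fits in `i8`, but the sum only fits in the wider accumulator.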
/// Append an aggregate expression value to a builder
macro_rules! append_aggr_value {
    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
I think the names `$BUILDER_TY` and `$SCALAR_TY` are easier to understand than the previous `$TY:ident, $TY2:ty` 👍
/// Append an aggregate expression value to a builder
macro_rules! append_aggr_value {
    ($BUILDER:expr, $BUILDER_TY:ident, $VALUE:expr, $SCALAR_TY:ident) => {{
        println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
Suggested change:
- println!("downcast {:#?} to {:#?}", $BUILDER, $VALUE);
        match $VALUE {
            Some(ScalarValue::$SCALAR_TY(n)) => builder.append_value(n)?,
            None => builder.append_null()?,
            Some(_) => panic!(),
Suggested change:
- Some(_) => panic!(),
+ Some(_) => Err(ExecutionError::ExecutionError("unexpected type when creating aggregate value".to_string())),
Thanks. Fixed.
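As a rough sketch of the pattern discussed in this thread (returning an error for an unexpected variant rather than panicking), assuming a hypothetical `Scalar` enum and a plain `String` error in place of DataFusion's `ScalarValue` and `ExecutionError`:

```rust
/// Hypothetical stand-in for a scalar aggregate value (not DataFusion's `ScalarValue`).
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int64(i64),
    Float64(f64),
}

/// Append an optional scalar to a column of i64 values.
/// A mismatched variant is reported as an `Err` instead of panicking.
fn append_int64(column: &mut Vec<Option<i64>>, value: Option<Scalar>) -> Result<(), String> {
    match value {
        Some(Scalar::Int64(n)) => column.push(Some(n)),
        None => column.push(None),
        Some(other) => {
            // Early return so that the caller decides how to handle the mismatch.
            return Err(format!(
                "unexpected type when creating aggregate value: {:?}",
                other
            ));
        }
    }
    Ok(())
}
```

The mismatched arm uses an early `return` because the other arms evaluate to `()`, so a bare `Err(...)` expression in that position would not type-check in this sketch.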
    output_schema: &Schema,
) -> Result<RecordBatch> {
    let mut builders: Vec<Box<dyn ArrayBuilder>> = vec![];
    for i in 0..num_group_expr + num_aggr_expr {
Suggested change:
- for i in 0..num_group_expr + num_aggr_expr {
+ for i in 0..output_schema.fields().len() {
This is just a style / defensive-coding suggestion (as `output_schema.field(i)` is matched below).
If you wanted to get all Rusty / functional, you could also think about rewriting this as a map over fields. Something like this (untested):
    let builders = output_schema
        .fields()
        .iter()
        .map(|f| match f.data_type() { /* ... the match statement below ... */ })
        .collect::<Result<Vec<_>>>()?;
I took a quick look at doing this, but I still needed an explicit assignment; otherwise the compiler complained about the match arms returning different types, so it ended up not being much cleaner. However, I did change it to use `for data_type in &output_types`, which is a little cleaner.
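For readers curious about the "match arms returning different types" problem, the following is a self-contained sketch of the map-and-collect approach. All names here (`Kind`, `Builder`, `Int64Builder`, `StringBuilder`, `make_builders`) are hypothetical stand-ins rather than Arrow's real types, and the error is a plain `String`. It shows why an explicit `Box<dyn Builder>` annotation is needed: each arm boxes a different concrete type, and the annotation gives them a common type to coerce to.

```rust
/// Hypothetical stand-in for a column data type (illustration only).
#[derive(Debug, Clone, Copy)]
enum Kind {
    Int64,
    Utf8,
    Unsupported,
}

/// Hypothetical stand-in for a type-erased array builder.
trait Builder {
    fn name(&self) -> &'static str;
}

struct Int64Builder;
struct StringBuilder;

impl Builder for Int64Builder {
    fn name(&self) -> &'static str {
        "Int64Builder"
    }
}

impl Builder for StringBuilder {
    fn name(&self) -> &'static str {
        "StringBuilder"
    }
}

/// Create one builder per output type, failing on the first unsupported type.
fn make_builders(kinds: &[Kind]) -> Result<Vec<Box<dyn Builder>>, String> {
    kinds
        .iter()
        .map(|kind| {
            // The explicit annotation lets each arm coerce to the same boxed trait object.
            let builder: Box<dyn Builder> = match kind {
                Kind::Int64 => Box::new(Int64Builder),
                Kind::Utf8 => Box::new(StringBuilder),
                Kind::Unsupported => return Err(format!("unsupported type {:?}", kind)),
            };
            Ok(builder)
        })
        // Collecting into a Result stops at the first Err.
        .collect()
}
```

As noted in the reply above, the explicit assignment remains even in this form, so whether it reads better than a plain `for` loop is a matter of taste.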
            DataType::Float64 => {
                append_aggr_value!(builder, Float64Builder, value, Float64)
            }
            DataType::Utf8 => {
I am sure you have a good reason, but I didn't quite see why this match arm can't use `append_aggr_value!` as well.
The issue here was that `ScalarValue::Utf8` contains a `String` and the builder wants `&str`. In all other cases the scalar and builder types are the same.
That might be worth adding as a comment so future readers don't have to wonder.
Done.
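The `String` versus `&str` mismatch described in this thread can be sketched as follows. This assumes a hypothetical `StrScalar` enum and `StringColumnBuilder` type; neither is the real Arrow or DataFusion API. The scalar owns its string, so the Utf8 arm has to borrow it before appending, which a macro written for "same type on both sides" would not do.

```rust
/// Hypothetical scalar type whose string variant owns its data.
enum StrScalar {
    Utf8(String),
}

/// Hypothetical builder whose append method borrows a string slice.
#[derive(Default)]
struct StringColumnBuilder {
    values: Vec<Option<String>>,
}

impl StringColumnBuilder {
    fn append_value(&mut self, value: &str) {
        self.values.push(Some(value.to_owned()));
    }

    fn append_null(&mut self) {
        self.values.push(None);
    }
}

/// The scalar holds a `String`, so it is borrowed as `&str` before appending.
fn append_utf8(builder: &mut StringColumnBuilder, value: Option<StrScalar>) {
    match value {
        // `&s` is a `&String`, which deref-coerces to the `&str` the builder expects.
        Some(StrScalar::Utf8(s)) => builder.append_value(&s),
        None => builder.append_null(),
    }
}
```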
            join?.iter()
                .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
        }
        Err(e) => return Err(ExecutionError::General(format!("{:?}", e)))
This is another good change (don't panic if there is a thread error while merging) -- maybe worth a mention in the PR title (or maybe even its own PR).
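A minimal standard-library sketch of the idea (turning a failed thread join into an `Err` rather than a panic) might look like this. It assumes a hypothetical `merge_results` function, `i32` payloads, and a `String` error in place of `MergeExec`, record batches, and `ExecutionError::General`.

```rust
use std::thread;

/// Run each task on its own thread and merge the results.
/// A panicking thread is reported as an `Err` instead of panicking the caller.
fn merge_results(tasks: Vec<fn() -> Vec<i32>>) -> Result<Vec<i32>, String> {
    // Spawn everything first so the tasks can run concurrently.
    let handles: Vec<_> = tasks.into_iter().map(|task| thread::spawn(task)).collect();

    let mut combined = Vec::new();
    for handle in handles {
        match handle.join() {
            Ok(part) => combined.extend(part),
            // `join` returns Err when the thread panicked; propagate that to the caller.
            Err(e) => return Err(format!("thread failed: {:?}", e)),
        }
    }
    Ok(combined)
}
```

Results are merged in spawn order because the handles are joined in the order they were created.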
Thanks for the review @alamb. I've addressed the formatting, removed debug printlns, and tests are now passing.
@nevi-me @paddyhoran PTAL if you have time.
LGTM! Thanks a lot for this, nice cleanup! I closed #7687 in favor of this one as the overall is too high to salvage.
…h from HashAggregateExec

Instead of walking through the map containing accumulators multiple times (once per grouping expression and once per aggregate expression) let's just walk through it once! Safer and faster.

Other changes in this PR:

- min and max had the wrong data type. MIN(expr) and MAX(expr) should always have the same data type as the underlying expression. They cannot overflow.
- I shortened the name of one of the existing macros so that match statements would fit on a single line.
- Improved error handling in MergeExec so that the reason for threads failing is now propagated in an `Err`
- Removed unused `MapStruct` struct

Closes apache#7936 from andygrove/ARROW-9679

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
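The single-pass idea from the commit message can be sketched with standard-library types only. This is a simplified illustration, not the `HashAggregateExec` implementation: the function name `build_columns`, the single `String` group key, and the `i64` accumulator values are all assumptions made for the example.

```rust
use std::collections::HashMap;

/// Build output columns from a map of group key -> accumulator values in one pass.
/// Hypothetical simplification: one string group key and i64 accumulator values.
fn build_columns(
    groups: &HashMap<String, Vec<i64>>,
    num_aggr: usize,
) -> (Vec<String>, Vec<Vec<i64>>) {
    let mut keys = Vec::with_capacity(groups.len());
    let mut aggr_columns: Vec<Vec<i64>> = vec![Vec::new(); num_aggr];

    // Visit each map entry exactly once, appending to every column as we go.
    for (key, accumulators) in groups {
        keys.push(key.clone());
        for (column, value) in aggr_columns.iter_mut().zip(accumulators) {
            column.push(*value);
        }
    }
    (keys, aggr_columns)
}
```

Because every column is filled during the same traversal, row `i` of each column always comes from the same map entry, regardless of the map's iteration order.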