From 901e87e5fa135e56fae6fb6eadba8b73a58883ac Mon Sep 17 00:00:00 2001 From: Athul T R Date: Fri, 6 Sep 2024 12:34:38 +0530 Subject: [PATCH] Removed Arc wrapping for AggregateFunctionExpr --- .../physical_optimizer/update_aggr_exprs.rs | 6 +- datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/src/test_util/mod.rs | 2 +- .../combine_partial_final_agg.rs | 6 +- datafusion/physical-expr/src/aggregate.rs | 16 +-- .../physical-expr/src/window/aggregate.rs | 6 +- .../src/window/sliding_aggregate.rs | 6 +- .../src/combine_partial_final_agg.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 97 ++++++++++--------- .../physical-plan/src/aggregates/row_hash.rs | 4 +- datafusion/physical-plan/src/windows/mod.rs | 2 +- datafusion/proto/src/physical_plan/mod.rs | 2 +- .../proto/src/physical_plan/to_proto.rs | 6 +- .../tests/cases/roundtrip_physical_plan.rs | 10 +- 14 files changed, 84 insertions(+), 83 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs index a2726d62e9f6..8b5084c67e42 100644 --- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs +++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs @@ -118,7 +118,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { /// /// # Parameters /// -/// * `aggr_exprs` - A vector of `Arc` representing the +/// * `aggr_exprs` - A vector of `AggregateFunctionExpr` representing the /// aggregate expressions to be optimized. /// * `prefix_requirement` - An array slice representing the ordering /// requirements preceding the aggregate expressions. @@ -131,10 +131,10 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { /// successfully. Any errors occurring during the conversion process are /// passed through. fn try_convert_aggregate_if_better( - aggr_exprs: Vec>, + aggr_exprs: Vec, prefix_requirement: &[PhysicalSortRequirement], eq_properties: &EquivalenceProperties, -) -> Result>> { +) -> Result> { aggr_exprs .into_iter() .map(|aggr_expr| { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 82405dd98e30..cc35255dfe29 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1541,7 +1541,7 @@ pub fn create_window_expr( } type AggregateExprWithOptionalArgs = ( - Arc, + AggregateFunctionExpr, // The filter clause, if any Option>, // Ordering requirements, if any diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index dd8b697666ee..edccb3844c84 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -427,7 +427,7 @@ impl TestAggregate { } /// Return appropriate expr depending if COUNT is for col or table (*) - pub fn count_expr(&self, schema: &Schema) -> Arc { + pub fn count_expr(&self, schema: &Schema) -> AggregateFunctionExpr { AggregateExprBuilder::new(count_udaf(), vec![self.column()]) .schema(Arc::new(schema.clone())) .alias(self.column_name()) diff --git a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs index 5152afa6c050..24e46b3ad97c 100644 --- a/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs @@ -84,7 +84,7 @@ fn parquet_exec(schema: &SchemaRef) -> Arc { fn partial_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -104,7 +104,7 @@ fn partial_aggregate_exec( fn final_aggregate_exec( input: Arc, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec, ) -> Arc { let schema = input.schema(); let n_aggr = aggr_expr.len(); @@ -130,7 +130,7 @@ fn count_expr( expr: Arc, name: &str, schema: &Schema, -) -> Arc { +) -> AggregateFunctionExpr { AggregateExprBuilder::new(count_udaf(), vec![expr]) .schema(Arc::new(schema.clone())) .alias(name) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index d62dc27ece86..866596d0b690 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -88,7 +88,7 @@ impl AggregateExprBuilder { } } - pub fn build(self) -> Result> { + pub fn build(self) -> Result { let Self { fun, args, @@ -132,7 +132,7 @@ impl AggregateExprBuilder { Some(alias) => alias, }; - Ok(Arc::new(AggregateFunctionExpr { + Ok(AggregateFunctionExpr { fun: Arc::unwrap_or_clone(fun), args, data_type, @@ -145,7 +145,7 @@ impl AggregateExprBuilder { input_types: input_exprs_types, is_reversed, is_nullable, - })) + }) } pub fn alias(mut self, alias: impl Into) -> Self { @@ -328,9 +328,9 @@ impl AggregateFunctionExpr { /// not implement the method, returns an error. Order insensitive and hard /// requirement aggregators return `Ok(None)`. pub fn with_beneficial_ordering( - self: Arc, + self, beneficial_ordering: bool, - ) -> Result>> { + ) -> Result> { let Some(updated_fn) = self .fun .clone() @@ -457,10 +457,10 @@ impl AggregateFunctionExpr { /// Typically the "reverse" expression is itself (e.g. SUM, COUNT). /// For aggregates that do not support calculation in reverse, /// returns None (which is the default value). - pub fn reverse_expr(&self) -> Option> { + pub fn reverse_expr(&self) -> Option { match self.fun.reverse_udf() { ReversedUDAF::NotSupported => None, - ReversedUDAF::Identical => Some(Arc::new(self.clone())), + ReversedUDAF::Identical => Some(self.clone()), ReversedUDAF::Reversed(reverse_udf) => { let reverse_ordering_req = reverse_order_bys(&self.ordering_req); let mut name = self.name().to_string(); @@ -507,7 +507,7 @@ impl AggregateFunctionExpr { &self, _args: Vec>, _order_by_exprs: Vec>, - ) -> Option> { + ) -> Option { None } diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 1cc08a4e99aa..d012fef93b67 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct PlainAggregateWindowExpr { - aggregate: Arc, + aggregate: AggregateFunctionExpr, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -50,7 +50,7 @@ pub struct PlainAggregateWindowExpr { impl PlainAggregateWindowExpr { /// Create a new aggregate window function expression pub fn new( - aggregate: Arc, + aggregate: AggregateFunctionExpr, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -64,7 +64,7 @@ impl PlainAggregateWindowExpr { } /// Get aggregate expr of AggregateWindowExpr - pub fn get_aggregate_expr(&self) -> &Arc { + pub fn get_aggregate_expr(&self) -> &AggregateFunctionExpr { &self.aggregate } } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index b3848e15ee42..143d59eb4495 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -41,7 +41,7 @@ use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr}; /// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct SlidingAggregateWindowExpr { - aggregate: Arc, + aggregate: AggregateFunctionExpr, partition_by: Vec>, order_by: Vec, window_frame: Arc, @@ -50,7 +50,7 @@ pub struct SlidingAggregateWindowExpr { impl SlidingAggregateWindowExpr { /// Create a new (sliding) aggregate window function expression. pub fn new( - aggregate: Arc, + aggregate: AggregateFunctionExpr, partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, @@ -64,7 +64,7 @@ impl SlidingAggregateWindowExpr { } /// Get the [AggregateFunctionExpr] of this object. - pub fn get_aggregate_expr(&self) -> &Arc { + pub fn get_aggregate_expr(&self) -> &AggregateFunctionExpr { &self.aggregate } } diff --git a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs index 12ff13f8f6ae..67e40c9b507e 100644 --- a/datafusion/physical-optimizer/src/combine_partial_final_agg.rs +++ b/datafusion/physical-optimizer/src/combine_partial_final_agg.rs @@ -125,7 +125,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate { type GroupExprsRef<'a> = ( &'a PhysicalGroupBy, - &'a [Arc], + &'a [AggregateFunctionExpr], &'a [Option>], ); diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 375c6421b0d9..2b35852c07b3 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -262,7 +262,7 @@ pub struct AggregateExec { /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions - aggr_expr: Vec>, + aggr_expr: Vec, /// FILTER (WHERE clause) expression for each aggregate expression filter_expr: Vec>>, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause @@ -289,10 +289,7 @@ impl AggregateExec { /// Function used in `OptimizeAggregateOrder` optimizer rule, /// where we need parts of the new value, others cloned from the old one /// Rewrites aggregate exec with new aggregate expressions. - pub fn with_new_aggr_exprs( - &self, - aggr_expr: Vec>, - ) -> Self { + pub fn with_new_aggr_exprs(&self, aggr_expr: Vec) -> Self { Self { aggr_expr, // clone the rest of the fields @@ -318,7 +315,7 @@ impl AggregateExec { pub fn try_new( mode: AggregateMode, group_by: PhysicalGroupBy, - aggr_expr: Vec>, + aggr_expr: Vec, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -355,7 +352,7 @@ impl AggregateExec { fn try_new_with_schema( mode: AggregateMode, group_by: PhysicalGroupBy, - mut aggr_expr: Vec>, + mut aggr_expr: Vec, filter_expr: Vec>>, input: Arc, input_schema: SchemaRef, @@ -463,7 +460,7 @@ impl AggregateExec { } /// Aggregate expressions - pub fn aggr_expr(&self) -> &[Arc] { + pub fn aggr_expr(&self) -> &[AggregateFunctionExpr] { &self.aggr_expr } @@ -800,7 +797,7 @@ impl ExecutionPlan for AggregateExec { fn create_schema( input_schema: &Schema, group_expr: &[(Arc, String)], - aggr_expr: &[Arc], + aggr_expr: &[AggregateFunctionExpr], group_expr_nullable: Vec, mode: AggregateMode, ) -> Result { @@ -846,7 +843,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { /// /// # Parameters /// -/// - `aggr_expr`: A reference to an `Arc` representing the +/// - `aggr_expr`: A reference to an `AggregateFunctionExpr` representing the /// aggregate expression. /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the /// physical GROUP BY expression. @@ -858,7 +855,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef { /// A `LexOrdering` instance indicating the lexical ordering requirement for /// the aggregate expression. fn get_aggregate_expr_req( - aggr_expr: &Arc, + aggr_expr: &AggregateFunctionExpr, group_by: &PhysicalGroupBy, agg_mode: &AggregateMode, ) -> LexOrdering { @@ -906,7 +903,7 @@ fn get_aggregate_expr_req( /// the aggregator requirement is incompatible. fn finer_ordering( existing_req: &LexOrdering, - aggr_expr: &Arc, + aggr_expr: &AggregateFunctionExpr, group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, @@ -924,7 +921,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// /// # Parameters /// -/// - `aggr_exprs`: A slice of `Arc` containing all the +/// - `aggr_exprs`: A slice of `AggregateFunctionExpr` containing all the /// aggregate expressions. /// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the /// physical GROUP BY expression. @@ -938,7 +935,7 @@ pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. pub fn get_finer_aggregate_exprs_requirement( - aggr_exprs: &mut [Arc], + aggr_exprs: &mut [AggregateFunctionExpr], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, agg_mode: &AggregateMode, @@ -1012,7 +1009,7 @@ pub fn get_finer_aggregate_exprs_requirement( /// * Partial: AggregateFunctionExpr::expressions /// * Final: columns of `AggregateFunctionExpr::state_fields()` pub fn aggregate_expressions( - aggr_expr: &[Arc], + aggr_expr: &[AggregateFunctionExpr], mode: &AggregateMode, col_idx_base: usize, ) -> Result>>> { @@ -1053,7 +1050,7 @@ pub fn aggregate_expressions( /// `index_base` is the starting physical column index for the next expanded state field. fn merge_expressions( index_base: usize, - expr: &Arc, + expr: &AggregateFunctionExpr, ) -> Result>> { expr.state_fields().map(|fields| { fields @@ -1067,7 +1064,7 @@ fn merge_expressions( pub type AccumulatorItem = Box; pub fn create_accumulators( - aggr_expr: &[Arc], + aggr_expr: &[AggregateFunctionExpr], ) -> Result> { aggr_expr .iter() @@ -1506,12 +1503,13 @@ mod tests { groups: vec![vec![false]], }; - let aggregates: Vec> = vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec = + vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + ]; let task_ctx = if spill { // set to an appropriate value to trigger spill @@ -1802,7 +1800,7 @@ mod tests { } // Median(a) - fn test_median_agg_expr(schema: SchemaRef) -> Result> { + fn test_median_agg_expr(schema: SchemaRef) -> Result { AggregateExprBuilder::new(median_udaf(), vec![col("a", &schema)?]) .schema(schema) .alias("MEDIAN(a)") @@ -1828,16 +1826,17 @@ mod tests { }; // something that allocates within the aggregator - let aggregates_v0: Vec> = + let aggregates_v0: Vec = vec![test_median_agg_expr(Arc::clone(&input_schema))?]; // use fast-path in `row_hash.rs`. - let aggregates_v2: Vec> = vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) - .schema(Arc::clone(&input_schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates_v2: Vec = + vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) + .schema(Arc::clone(&input_schema)) + .alias("AVG(b)") + .build()?, + ]; for (version, groups, aggregates) in [ (0, groups_none, aggregates_v0), @@ -1891,12 +1890,13 @@ mod tests { let groups = PhysicalGroupBy::default(); - let aggregates: Vec> = vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(a)") - .build()?, - ]; + let aggregates: Vec = + vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(a)") + .build()?, + ]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -1930,12 +1930,13 @@ mod tests { let groups = PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); - let aggregates: Vec> = vec![ - AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) - .schema(Arc::clone(&schema)) - .alias("AVG(b)") - .build()?, - ]; + let aggregates: Vec = + vec![ + AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) + .schema(Arc::clone(&schema)) + .alias("AVG(b)") + .build()?, + ]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -1980,7 +1981,7 @@ mod tests { fn test_first_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result> { + ) -> Result { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -1998,7 +1999,7 @@ mod tests { fn test_last_value_agg_expr( schema: &Schema, sort_options: SortOptions, - ) -> Result> { + ) -> Result { let ordering_req = [PhysicalSortExpr { expr: col("b", schema)?, options: sort_options, @@ -2053,7 +2054,7 @@ mod tests { descending: false, nulls_first: false, }; - let aggregates: Vec> = if is_first_acc { + let aggregates: Vec = if is_first_acc { vec![test_first_value_agg_expr(&schema, sort_options)?] } else { vec![test_last_value_agg_expr(&schema, sort_options)?] @@ -2218,7 +2219,7 @@ mod tests { }; let groups = PhysicalGroupBy::new_single(vec![(col_a, "a".to_string())]); - let aggregates: Vec> = vec![ + let aggregates: Vec = vec![ test_first_value_agg_expr(&schema, option_desc)?, test_last_value_agg_expr(&schema, option_desc)?, ]; @@ -2276,7 +2277,7 @@ mod tests { ], ); - let aggregates: Vec> = + let aggregates: Vec = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)]) .schema(Arc::clone(&schema)) .alias("1") diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index c38137994d44..fd2d26d9b49e 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -578,7 +578,7 @@ impl GroupedHashAggregateStream { /// that is supported by the aggregate, or a /// [`GroupsAccumulatorAdapter`] if not. pub(crate) fn create_group_accumulator( - agg_expr: &Arc, + agg_expr: &AggregateFunctionExpr, ) -> Result> { if agg_expr.groups_accumulator_supported() { agg_expr.create_groups_accumulator() @@ -588,7 +588,7 @@ pub(crate) fn create_group_accumulator( "Creating GroupsAccumulatorAdapter for {}: {agg_expr:?}", agg_expr.name() ); - let agg_expr_captured = Arc::clone(agg_expr); + let agg_expr_captured = agg_expr.clone(); let factory = move || agg_expr_captured.create_accumulator(); Ok(Box::new(GroupsAccumulatorAdapter::new(factory))) } diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 56823e6dec2d..0275cd2441a9 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -141,7 +141,7 @@ fn window_expr_from_aggregate_expr( partition_by: &[Arc], order_by: &[PhysicalSortExpr], window_frame: Arc, - aggregate: Arc, + aggregate: AggregateFunctionExpr, ) -> Arc { // Is there a potentially unlimited sized window frame? let unbounded_window = window_frame.start_bound.is_unbounded(); diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index e622af745062..115ac4fd4a7b 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -468,7 +468,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { }) .collect::, _>>()?; - let physical_aggr_expr: Vec> = hash_agg + let physical_aggr_expr: Vec = hash_agg .aggr_expr .iter() .zip(hash_agg.aggr_expr_name.iter()) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 25be7de61cc3..6981c77228a8 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -49,7 +49,7 @@ use crate::protobuf::{ use super::PhysicalExtensionCodec; pub fn serialize_physical_aggr_expr( - aggr_expr: Arc, + aggr_expr: AggregateFunctionExpr, codec: &dyn PhysicalExtensionCodec, ) -> Result { let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?; @@ -171,7 +171,7 @@ pub fn serialize_physical_window_expr( expr.downcast_ref::() { serialize_physical_window_aggr_expr( - plain_aggr_window_expr.get_aggregate_expr().as_ref(), + plain_aggr_window_expr.get_aggregate_expr(), window_frame, codec, )? @@ -179,7 +179,7 @@ pub fn serialize_physical_window_expr( expr.downcast_ref::() { serialize_physical_window_aggr_expr( - sliding_aggr_window_expr.get_aggregate_expr().as_ref(), + sliding_aggr_window_expr.get_aggregate_expr(), window_frame, codec, )? diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index b2ded88dfaf4..54f281eeb6f6 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -360,7 +360,7 @@ fn rountrip_aggregate() -> Result<()> { .alias("NTH_VALUE(b, 1)") .build()?; - let test_cases: Vec>> = vec![ + let test_cases: Vec> = vec![ // AVG vec![avg_expr], // NTH_VALUE @@ -393,7 +393,7 @@ fn rountrip_aggregate_with_limit() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = + let aggregates: Vec = vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -422,7 +422,7 @@ fn rountrip_aggregate_with_approx_pencentile_cont() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = vec![AggregateExprBuilder::new( + let aggregates: Vec = vec![AggregateExprBuilder::new( approx_percentile_cont_udaf(), vec![col("b", &schema)?, lit(0.5)], ) @@ -457,7 +457,7 @@ fn rountrip_aggregate_with_sort() -> Result<()> { }, }]; - let aggregates: Vec> = + let aggregates: Vec = vec![ AggregateExprBuilder::new(array_agg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) @@ -524,7 +524,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = + let aggregates: Vec = vec![ AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?]) .schema(Arc::clone(&schema))