From 8b6ff2d7636d2bcd90324a0747135b51c1b6c3de Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Fri, 1 Mar 2024 20:55:15 +0800 Subject: [PATCH] cleanup Signed-off-by: jayzhan211 --- datafusion/expr/src/expr_fn.rs | 2 -- datafusion/expr/src/udaf.rs | 19 ++++++++++--------- datafusion/physical-plan/src/udaf.rs | 8 +++----- datafusion/proto/src/physical_plan/mod.rs | 4 +++- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index 9d3bea273734..0d74b5e09fa2 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -1066,7 +1066,6 @@ pub fn create_udaf( /// Creates a new UDAF with a specific signature, state type and return type. /// The signature and state type must match the `Accumulator's implementation`. -#[allow(clippy::too_many_arguments)] pub fn create_udaf_with_ordering( name: &str, input_type: Vec, @@ -1202,7 +1201,6 @@ impl Debug for SimpleOrderedAggregateUDF { impl SimpleOrderedAggregateUDF { /// Create a new `AggregateUDFImpl` from a name, input types, return type, state type and /// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility - #[allow(clippy::too_many_arguments)] pub fn new( name: impl Into, input_type: Vec, diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index b5caf860163d..8119a45e9f0c 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -160,8 +160,6 @@ impl AggregateUDF { sort_exprs: &[Expr], schema: &Schema, ) -> Result> { - // let sort_exprs = self.inner.sort_exprs(); - // let schema = self.inner.schema(); self.inner.accumulator(return_type, sort_exprs, schema) } @@ -180,10 +178,6 @@ impl AggregateUDF { pub fn create_groups_accumulator(&self) -> Result> { self.inner.create_groups_accumulator() } - - pub fn sort_exprs() -> Vec { - vec![] - } } impl From for AggregateUDF @@ -238,7 +232,7 @@ where /// Ok(DataType::Float64) /// } /// // This is the accumulator factory; DataFusion uses it to create new accumulators. -/// fn accumulator(&self, _arg: &DataType, _sort_exprs: Vec, _schema: Option<&Schema>) -> Result> { unimplemented!() } +/// fn accumulator(&self, _arg: &DataType, _sort_exprs: &[Expr], _schema: &Schema) -> Result> { unimplemented!() } /// fn state_type(&self, _return_type: &DataType) -> Result> { /// Ok(vec![DataType::Float64, DataType::UInt32]) /// } @@ -266,8 +260,15 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { fn return_type(&self, arg_types: &[DataType]) -> Result; /// Return a new [`Accumulator`] that aggregates values for a specific - /// group during query execution. sort_exprs is a list of ordering expressions, - /// and schema is used while ordering. + /// group during query execution. + /// + /// `arg`: the type of the argument to this accumulator + /// + /// `sort_exprs`: contains a list of `Expr::SortExpr`s if the + /// aggregate is called with an explicit `ORDER BY`. For example, + /// `ARRAY_AGG(x ORDER BY y ASC)`. In this case, `sort_exprs` would contain `[y ASC]` + /// + /// `schema` is the input schema to the udaf fn accumulator( &self, arg: &DataType, diff --git a/datafusion/physical-plan/src/udaf.rs b/datafusion/physical-plan/src/udaf.rs index b74a2d971d36..3d15e3d012c5 100644 --- a/datafusion/physical-plan/src/udaf.rs +++ b/datafusion/physical-plan/src/udaf.rs @@ -118,11 +118,9 @@ impl AggregateExpr for AggregateFunctionExpr { } fn create_sliding_accumulator(&self) -> Result> { - let accumulator = self.fun.accumulator( - &self.data_type, - self.sort_exprs.as_slice(), - &self.schema, - )?; + let accumulator = + self.fun + .accumulator(&self.data_type, &self.sort_exprs, &self.schema)?; // Accumulators that have window frame startings different // than `UNBOUNDED PRECEDING`, such as `1 PRECEEDING`, need to diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 5f2675476a32..0d9975687996 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -471,8 +471,10 @@ impl AsExecutionPlan for PhysicalPlanNode { } AggregateFunction::UserDefinedAggrFunction(udaf_name) => { let agg_udf = registry.udaf(udaf_name)?; + // TODO: `order by` is not supported for UDAF yet + let sort_exprs = &[]; let ordering_req = &[]; - udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, &[], ordering_req, &physical_schema, name) + udaf::create_aggregate_expr(agg_udf.as_ref(), &input_phy_expr, sort_exprs, ordering_req, &physical_schema, name) } } }).transpose()?.ok_or_else(|| {