diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 260065f69af9..32eac90c3eec 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -92,7 +92,7 @@ pub use sqlparser; pub use table_source::{TableProviderFilterPushDown, TableSource, TableType}; pub use udaf::{AggregateUDF, AggregateUDFImpl, ReversedUDAF}; pub use udf::{ScalarUDF, ScalarUDFImpl}; -pub use udwf::{WindowUDF, WindowUDFImpl}; +pub use udwf::{ReversedUDWF, WindowUDF, WindowUDFImpl}; pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits}; #[cfg(test)] diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs index 7cc57523a14d..678a0b62cd9a 100644 --- a/datafusion/expr/src/udwf.rs +++ b/datafusion/expr/src/udwf.rs @@ -172,6 +172,14 @@ impl WindowUDF { pub fn coerce_types(&self, arg_types: &[DataType]) -> Result> { self.inner.coerce_types(arg_types) } + + /// Returns the reversed user-defined window function when the + /// order of evaluation is reversed. + /// + /// See [`WindowUDFImpl::reverse_expr`] for more details. + pub fn reverse_expr(&self) -> ReversedUDWF { + self.inner.reverse_expr() + } } impl From for WindowUDF @@ -351,6 +359,24 @@ pub trait WindowUDFImpl: Debug + Send + Sync { fn coerce_types(&self, _arg_types: &[DataType]) -> Result> { not_impl_err!("Function {} does not implement coerce_types", self.name()) } + + /// Allows customizing the behavior of the user-defined window + /// function when it is evaluated in reverse order. + fn reverse_expr(&self) -> ReversedUDWF { + ReversedUDWF::NotSupported + } +} + +pub enum ReversedUDWF { + /// The result of evaluating the user-defined window function + /// remains identical when reversed. + Identical, + /// A window function which does not support evaluating the result + /// in reverse order. + NotSupported, + /// Customize the user-defined window function for evaluating the + /// result in reverse order. + Reversed(Arc), } impl PartialEq for dyn WindowUDFImpl { diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 6e1cb8db5f09..b6f34ec69f68 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -34,8 +34,8 @@ use datafusion_common::{ exec_datafusion_err, exec_err, DataFusionError, Result, ScalarValue, }; use datafusion_expr::{ - BuiltInWindowFunction, PartitionEvaluator, WindowFrame, WindowFunctionDefinition, - WindowUDF, + BuiltInWindowFunction, PartitionEvaluator, ReversedUDWF, WindowFrame, + WindowFunctionDefinition, WindowUDF, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::equivalence::collapse_lex_req; @@ -130,7 +130,7 @@ pub fn create_window_expr( } // TODO: Ordering not supported for Window UDFs yet WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new( - create_udwf_window_expr(fun, args, input_schema, name)?, + create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?, partition_by, order_by, window_frame, @@ -329,6 +329,7 @@ fn create_udwf_window_expr( args: &[Arc], input_schema: &Schema, name: String, + ignore_nulls: bool, ) -> Result> { // need to get the types into an owned vec for some reason let input_types: Vec<_> = args @@ -341,6 +342,8 @@ fn create_udwf_window_expr( args: args.to_vec(), input_types, name, + is_reversed: false, + ignore_nulls, })) } @@ -353,6 +356,12 @@ struct WindowUDFExpr { name: String, /// Types of input expressions input_types: Vec, + /// This is set to `true` only if the user-defined window function + /// expression supports evaluation in reverse order, and the + /// evaluation order is reversed. + is_reversed: bool, + /// Set to `true` if `IGNORE NULLS` is defined, `false` otherwise. + ignore_nulls: bool, } impl BuiltInWindowFunctionExpr for WindowUDFExpr { @@ -378,7 +387,18 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr { } fn reverse_expr(&self) -> Option> { - None + match self.fun.reverse_expr() { + ReversedUDWF::Identical => Some(Arc::new(self.clone())), + ReversedUDWF::NotSupported => None, + ReversedUDWF::Reversed(fun) => Some(Arc::new(WindowUDFExpr { + fun, + args: self.args.clone(), + name: self.name.clone(), + input_types: self.input_types.clone(), + is_reversed: !self.is_reversed, + ignore_nulls: self.ignore_nulls, + })), + } } fn get_result_ordering(&self, schema: &SchemaRef) -> Option {