Merged

20 commits
31d5855
projection exec is updated, get_ordering method is added to physical …
berkaysynnada Aug 10, 2023
5fbed2d
Merge branch 'apache_main' into feature/projection-order-propagation
berkaysynnada Aug 10, 2023
3b7198f
fix after merge
berkaysynnada Aug 10, 2023
6032cef
simplifications
mustafasrepo Aug 11, 2023
d58b8de
Refactor, normalization code
mustafasrepo Aug 14, 2023
f341250
Simplifications
mustafasrepo Aug 14, 2023
07ee951
ScalarFunctionExpr's have a maintain order flag, each func labeled as…
berkaysynnada Aug 15, 2023
f4bfe52
Simplifications
mustafasrepo Aug 17, 2023
9f3878f
Merge branch 'apache_main' into feature/scalar-func-expr-ordering
berkaysynnada Aug 24, 2023
621e88f
fix after merge
berkaysynnada Aug 24, 2023
0fa1dfe
fix the test file location
berkaysynnada Aug 24, 2023
48605ec
Merge branch 'apache_main' into feature/scalar-func-expr-ordering
berkaysynnada Aug 25, 2023
2188bc3
the format of the monotonicity for scalar functions changed
berkaysynnada Aug 25, 2023
f4ccf6b
Update datafusion/physical-expr/src/functions.rs
berkaysynnada Aug 25, 2023
b8b6abd
Update datafusion/physical-expr/src/functions.rs
berkaysynnada Aug 25, 2023
68d0234
Update datafusion/physical-expr/src/functions.rs
berkaysynnada Aug 25, 2023
ea34b49
Update datafusion/physical-expr/src/functions.rs
berkaysynnada Aug 25, 2023
cc4807b
renaming
berkaysynnada Aug 25, 2023
75818ec
Merge branch 'feature/scalar-func-expr-ordering' of https://github.co…
berkaysynnada Aug 25, 2023
0f16321
Merge branch 'apache:main' into feature/scalar-func-expr-ordering
berkaysynnada Aug 31, 2023
107 changes: 107 additions & 0 deletions datafusion/physical-expr/src/functions.rs
@@ -31,6 +31,7 @@
//! argument is automatically coerced to f64.

use crate::execution_props::ExecutionProps;
use crate::sort_properties::SortProperties;
use crate::{
array_expressions, conditional_expressions, datetime_expressions,
expressions::{cast_column, nullif_func},
@@ -47,6 +48,7 @@ use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
BuiltinScalarFunction, ColumnarValue, ScalarFunctionImplementation,
};
use std::ops::Neg;
use std::sync::Arc;

/// Create a physical (function) expression.
@@ -173,11 +175,14 @@ pub fn create_physical_expr(
_ => create_physical_fun(fun, execution_props)?,
};

let monotonicity = get_func_monotonicity(fun);

Ok(Arc::new(ScalarFunctionExpr::new(
&format!("{fun}"),
fun_expr,
input_phy_exprs.to_vec(),
&data_type,
monotonicity,
)))
}

@@ -879,6 +884,108 @@ pub fn create_physical_fun(
})
}

/// Monotonicity of a `ScalarFunctionExpr` with respect to its arguments.
/// Each element of this vector corresponds to one argument and indicates
/// whether the function is monotonic in that argument:
/// - `None` signifies unknown monotonicity or non-monotonicity.
/// - `Some(true)` indicates that the function is monotonically increasing w.r.t. the argument in question.
/// - `Some(false)` indicates that the function is monotonically decreasing w.r.t. the argument in question.
pub type FuncMonotonicity = Vec<Option<bool>>;
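
For illustration, here is how this type encodes two of the functions handled below; the variable names are only for exposition:

```rust
// sqrt(x) is monotonically increasing in its single argument:
let sqrt_mono: FuncMonotonicity = vec![Some(true)];
// date_trunc(granularity, ts) has unknown monotonicity in the granularity
// argument and is monotonically increasing in the timestamp argument
// (matching `get_func_monotonicity` below):
let date_trunc_mono: FuncMonotonicity = vec![None, Some(true)];
```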

/// Determines the output ordering of a [`ScalarFunctionExpr`] given the sort
/// properties of its arguments and the function's monotonicity w.r.t. each argument.
pub fn out_ordering(
func: &FuncMonotonicity,
arg_orderings: &[SortProperties],
) -> SortProperties {
func.iter().zip(arg_orderings).fold(
SortProperties::Singleton,
|prev_sort, (item, arg)| {
let current_sort = func_order_in_one_dimension(item, arg);

match (prev_sort, current_sort) {
(_, SortProperties::Unordered) => SortProperties::Unordered,
(SortProperties::Singleton, SortProperties::Ordered(_)) => current_sort,
(SortProperties::Ordered(prev), SortProperties::Ordered(current))
if prev.descending != current.descending =>
{
SortProperties::Unordered
}
_ => prev_sort,
}
},
)
}
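
A minimal sketch of the fold's behavior, assuming `SortProperties` wraps arrow's `SortOptions` and is `Copy`/`PartialEq` (as the code above implies); the assertions are illustrative, not part of the PR:

```rust
use arrow::compute::SortOptions;

let asc = SortProperties::Ordered(SortOptions { descending: false, nulls_first: false });
let desc = SortProperties::Ordered(SortOptions { descending: true, nulls_first: false });

// A monotonically increasing one-argument function preserves an ascending input:
let mono: FuncMonotonicity = vec![Some(true)];
assert!(matches!(out_ordering(&mono, &[asc]), SortProperties::Ordered(o) if !o.descending));

// Arguments ordered in conflicting directions collapse to Unordered:
let mono2: FuncMonotonicity = vec![Some(true), Some(true)];
assert!(matches!(out_ordering(&mono2, &[asc, desc]), SortProperties::Unordered));
```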

/// This function decides the monotonicity property of a [`ScalarFunctionExpr`]
/// for a single argument (i.e. across a single dimension), given that
/// argument's sort properties.
fn func_order_in_one_dimension(
func_monotonicity: &Option<bool>,
arg: &SortProperties,
) -> SortProperties {
if *arg == SortProperties::Singleton {
SortProperties::Singleton
} else {
match func_monotonicity {
None => SortProperties::Unordered,
Some(false) => {
if let SortProperties::Ordered(_) = arg {
arg.neg()
} else {
SortProperties::Unordered
}
}
Some(true) => {
if let SortProperties::Ordered(_) = arg {
*arg
} else {
SortProperties::Unordered
}
}
}
}
}

/// This function specifies monotonicity behaviors for built-in scalar functions.
/// The list can be extended; only mathematical and datetime functions are
/// considered in the initial implementation of this feature.
pub fn get_func_monotonicity(fun: &BuiltinScalarFunction) -> Option<FuncMonotonicity> {
if matches!(
fun,
BuiltinScalarFunction::Atan
| BuiltinScalarFunction::Acosh
| BuiltinScalarFunction::Asinh
| BuiltinScalarFunction::Atanh
| BuiltinScalarFunction::Ceil
| BuiltinScalarFunction::Degrees
| BuiltinScalarFunction::Exp
| BuiltinScalarFunction::Factorial
| BuiltinScalarFunction::Floor
| BuiltinScalarFunction::Ln
| BuiltinScalarFunction::Log10
| BuiltinScalarFunction::Log2
| BuiltinScalarFunction::Radians
| BuiltinScalarFunction::Round
| BuiltinScalarFunction::Signum
| BuiltinScalarFunction::Sinh
| BuiltinScalarFunction::Sqrt
| BuiltinScalarFunction::Cbrt
| BuiltinScalarFunction::Tanh
| BuiltinScalarFunction::Trunc
| BuiltinScalarFunction::Pi
) {
Some(vec![Some(true)])
} else if matches!(
fun,
BuiltinScalarFunction::DateTrunc | BuiltinScalarFunction::DateBin
) {
Some(vec![None, Some(true)])
} else if *fun == BuiltinScalarFunction::Log {
Some(vec![Some(true), Some(false)])
} else {
None
}
}
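
These mappings can be checked directly; a few illustrative assertions grounded in the branches above:

```rust
assert_eq!(
    get_func_monotonicity(&BuiltinScalarFunction::Sqrt),
    Some(vec![Some(true)])
);
assert_eq!(
    get_func_monotonicity(&BuiltinScalarFunction::Log),
    Some(vec![Some(true), Some(false)])
);
// Functions outside the lists above report no known monotonicity:
assert_eq!(get_func_monotonicity(&BuiltinScalarFunction::Concat), None);
```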

#[cfg(test)]
mod tests {
use super::*;
18 changes: 18 additions & 0 deletions datafusion/physical-expr/src/scalar_function.rs
@@ -29,7 +29,10 @@
//! This module also has a set of coercion rules to improve user experience: if an `i32` argument
//! is passed to a function that supports `f64`, it is coerced to `f64`.

use crate::functions::out_ordering;
use crate::functions::FuncMonotonicity;
use crate::physical_expr::down_cast_any_ref;
use crate::sort_properties::SortProperties;
use crate::utils::expr_list_eq_strict_order;
use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
@@ -51,6 +54,11 @@ pub struct ScalarFunctionExpr {
name: String,
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: DataType,
// Keeps monotonicity information of the function.
// The `FuncMonotonicity` vector is mapped one-to-one to `args`,
// and it specifies the effect of an increase or decrease in
// the corresponding `arg` on the function's value.
monotonicity: Option<FuncMonotonicity>,
}

impl Debug for ScalarFunctionExpr {
@@ -71,12 +79,14 @@ impl ScalarFunctionExpr {
fun: ScalarFunctionImplementation,
args: Vec<Arc<dyn PhysicalExpr>>,
return_type: &DataType,
monotonicity: Option<FuncMonotonicity>,
) -> Self {
Self {
fun,
name: name.to_owned(),
args,
return_type: return_type.clone(),
monotonicity,
}
}

@@ -157,6 +167,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
self.fun.clone(),
children,
self.return_type(),
self.monotonicity.clone(),
)))
}

@@ -167,6 +178,13 @@
self.return_type.hash(&mut s);
// Add `self.fun` when hash is available
}

fn get_ordering(&self, children: &[SortProperties]) -> SortProperties {
self.monotonicity
.as_ref()
.map(|monotonicity| out_ordering(monotonicity, children))
.unwrap_or(SortProperties::Unordered)
}
}
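
To see the new hook end to end, a minimal sketch assuming the physical-expr `col` helper, arrow's `SortOptions`, and a placeholder function body (never evaluated here, since `get_ordering` does not execute the function):

```rust
use std::sync::Arc;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};

let schema = Schema::new(vec![Field::new("a", DataType::Float64, false)]);
// Placeholder implementation; never invoked in this sketch.
let fun: ScalarFunctionImplementation =
    Arc::new(|_args: &[ColumnarValue]| unimplemented!());
let expr = ScalarFunctionExpr::new(
    "sqrt",
    fun,
    vec![col("a", &schema).unwrap()],
    &DataType::Float64,
    Some(vec![Some(true)]), // sqrt is monotonically increasing
);
// An ascending child propagates through a monotonically increasing function:
let child = SortProperties::Ordered(SortOptions { descending: false, nulls_first: false });
assert!(matches!(
    expr.get_ordering(&[child]),
    SortProperties::Ordered(o) if !o.descending
));
```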

impl PartialEq<dyn Any> for ScalarFunctionExpr {
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/udf.rs
@@ -39,5 +39,6 @@ pub fn create_physical_expr(
fun.fun.clone(),
input_phy_exprs.to_vec(),
(fun.return_type)(&input_exprs_types)?.as_ref(),
None,
)))
}
2 changes: 2 additions & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -255,6 +255,7 @@ pub fn parse_physical_expr(
fun_expr,
args,
&convert_required!(e.return_type)?,
None,
))
}
ExprType::ScalarUdf(e) => {
@@ -271,6 +272,7 @@
scalar_fun,
args,
&convert_required!(e.return_type)?,
None,
))
}
ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new(
2 changes: 2 additions & 0 deletions datafusion/proto/src/physical_plan/mod.rs
@@ -1813,6 +1813,7 @@ mod roundtrip_tests {
fun_expr,
vec![col("a", &schema)?],
&DataType::Int64,
None,
);

let project =
@@ -1846,6 +1847,7 @@
scalar_fn,
vec![col("a", &schema)?],
&DataType::Int64,
None,
);

let project =
135 changes: 134 additions & 1 deletion datafusion/sqllogictest/test_files/order.slt
@@ -444,4 +444,137 @@ SortPreservingMergeExec: [result@0 ASC NULLS LAST]
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true

statement ok
drop table multiple_ordered_table;

# Create tables having some ordered columns. In the queries below, we expect
# scalar functions (mathematical functions like atan(), ceil() and sqrt(), and
# datetime functions like date_bin() and date_trunc()) to maintain the order
# of their argument columns.
statement ok
CREATE EXTERNAL TABLE csv_with_timestamps (
name VARCHAR,
ts TIMESTAMP
)
STORED AS CSV
WITH ORDER (ts ASC NULLS LAST)
LOCATION '../core/tests/data/timestamps.csv';

query TT
EXPLAIN SELECT DATE_BIN(INTERVAL '15 minutes', ts, TIMESTAMP '2022-08-03 14:40:00Z') as db15
FROM csv_with_timestamps
ORDER BY db15;
----
logical_plan
Sort: db15 ASC NULLS LAST
--Projection: date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) AS db15
----TableScan: csv_with_timestamps projection=[ts]
physical_plan
SortPreservingMergeExec: [db15@0 ASC NULLS LAST]
--ProjectionExec: expr=[date_bin(900000000000, ts@0, 1659537600000000000) as db15]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], has_header=false

query TT
EXPLAIN SELECT DATE_TRUNC('DAY', ts) as dt_day
FROM csv_with_timestamps
ORDER BY dt_day;
----
logical_plan
Sort: dt_day ASC NULLS LAST
--Projection: date_trunc(Utf8("DAY"), csv_with_timestamps.ts) AS dt_day
----TableScan: csv_with_timestamps projection=[ts]
physical_plan
SortPreservingMergeExec: [dt_day@0 ASC NULLS LAST]
--ProjectionExec: expr=[date_trunc(DAY, ts@0) as dt_day]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/timestamps.csv]]}, projection=[ts], output_ordering=[ts@0 ASC NULLS LAST], has_header=false

statement ok
drop table csv_with_timestamps;

statement ok
drop table aggregate_test_100;

statement ok
CREATE EXTERNAL TABLE aggregate_test_100 (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
c4 SMALLINT,
c5 INT,
c6 BIGINT NOT NULL,
c7 SMALLINT NOT NULL,
c8 INT NOT NULL,
c9 BIGINT UNSIGNED NOT NULL,
c10 VARCHAR NOT NULL,
c11 FLOAT NOT NULL,
c12 DOUBLE NOT NULL,
c13 VARCHAR NOT NULL
)
STORED AS CSV
WITH HEADER ROW
WITH ORDER(c11)
WITH ORDER(c12 DESC)
LOCATION '../../testing/data/csv/aggregate_test_100.csv'

query TT
EXPLAIN SELECT ATAN(c11) as atan_c11
FROM aggregate_test_100
ORDER BY atan_c11;
----
logical_plan
Sort: atan_c11 ASC NULLS LAST
--Projection: atan(aggregate_test_100.c11) AS atan_c11
----TableScan: aggregate_test_100 projection=[c11]
physical_plan
SortPreservingMergeExec: [atan_c11@0 ASC NULLS LAST]
--ProjectionExec: expr=[atan(c11@0) as atan_c11]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], has_header=true

query TT
EXPLAIN SELECT CEIL(c11) as ceil_c11
FROM aggregate_test_100
ORDER BY ceil_c11;
----
logical_plan
Sort: ceil_c11 ASC NULLS LAST
--Projection: ceil(aggregate_test_100.c11) AS ceil_c11
----TableScan: aggregate_test_100 projection=[c11]
physical_plan
SortPreservingMergeExec: [ceil_c11@0 ASC NULLS LAST]
--ProjectionExec: expr=[ceil(c11@0) as ceil_c11]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11], output_ordering=[c11@0 ASC NULLS LAST], has_header=true

query TT
EXPLAIN SELECT LOG(c11, c12) as log_c11_base_c12
FROM aggregate_test_100
ORDER BY log_c11_base_c12;
----
logical_plan
Sort: log_c11_base_c12 ASC NULLS LAST
--Projection: log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) AS log_c11_base_c12
----TableScan: aggregate_test_100 projection=[c11, c12]
physical_plan
SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST]
--ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c11_base_c12]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_ordering=[c11@0 ASC NULLS LAST], has_header=true

query TT
EXPLAIN SELECT LOG(c12, c11) as log_c12_base_c11
FROM aggregate_test_100
ORDER BY log_c12_base_c11 DESC;
----
logical_plan
Sort: log_c12_base_c11 DESC NULLS FIRST
--Projection: log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) AS log_c12_base_c11
----TableScan: aggregate_test_100 projection=[c11, c12]
physical_plan
SortPreservingMergeExec: [log_c12_base_c11@0 DESC]
--ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c12_base_c11]
----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c11, c12], output_ordering=[c11@0 ASC NULLS LAST], has_header=true

statement ok
drop table aggregate_test_100;