
POC: Splitting scalar functions outside Datafusion core #7752

Closed · wants to merge 1 commit
1 change: 1 addition & 0 deletions Cargo.toml
@@ -34,6 +34,7 @@ members = [
"datafusion-examples",
"test-utils",
"benchmarks",
"extension/scalar-function/test-func",
]
resolver = "2"

1 change: 1 addition & 0 deletions datafusion-examples/Cargo.toml
@@ -41,6 +41,7 @@ datafusion-common = { path = "../datafusion/common" }
datafusion-expr = { path = "../datafusion/expr" }
datafusion-optimizer = { path = "../datafusion/optimizer" }
datafusion-sql = { path = "../datafusion/sql" }
datafusion-extension-test-scalar-func = {path = "../extension/scalar-function/test-func"}
Contributor:

Since most of DataFusion is extensions, another possibility might be a path structure like the following:

├── core #  (already exists)
├── functions-aggregate-core
├── functions-aggregate-statistics
├── functions-scalar-array
├── functions-scalar-core
├── functions-scalar-crypto
├── functions-scalar-timestamp
├── physical-expr #  (already exists)
...
└── physical-plan #  (already exists)

env_logger = "0.10"
futures = "0.3"
log = "0.4"
43 changes: 43 additions & 0 deletions datafusion-examples/examples/external_function_package.rs
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_extension_test_scalar_func::TestFunctionPackage;

/// This example demonstrates registering a scalar function package from an
/// external crate and calling its functions in SQL against a CSV data source
#[tokio::main]
async fn main() -> Result<()> {
let ctx = SessionContext::new();
let testdata = datafusion::test_util::arrow_test_data();
ctx.register_csv(
"aggregate_test_100",
&format!("{testdata}/csv/aggregate_test_100.csv"),
CsvReadOptions::new(),
)
.await?;

// Register the add_one(x) and multiply_two(x) functions from `TestFunctionPackage`
ctx.register_scalar_function_package(Box::new(TestFunctionPackage));

let df = ctx
.sql("select add_one(1), multiply_two(c3), add_one(multiply_two(c4)) from aggregate_test_100 limit 5").await?;
df.show().await?;

Ok(())
}
28 changes: 27 additions & 1 deletion datafusion/core/src/execution/context.rs
@@ -36,7 +36,8 @@ use datafusion_common::{
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
ScalarFunctionDef, ScalarFunctionPackage, StringifiedPlan, UserDefinedLogicalNode,
WindowUDF,
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
@@ -79,6 +80,7 @@ use sqlparser::dialect::dialect_from_str;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_plan::functions::make_scalar_function;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -792,6 +794,30 @@ impl SessionContext {
.add_var_provider(variable_type, provider);
}

/// Register a function package into this context
pub fn register_scalar_function_package(
Contributor:

If we are going to implement a new API, maybe we can have it cover all types of functions (window, aggregate, scalar) so there is a place to add table functions eventually too:

Something like

    pub fn register_function_package(&self, func_pkg: Box<dyn FunctionPackage>)

Also I wonder if it should take Box<dyn FunctionPackage> (which is owned) or Arc<dyn FunctionPackage> which can be shared. Looking at this code, it seems like there is no reason for an owned pointer and we could get away with Arc (so function packages can be quickly registered with multiple SessionContexts)
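A minimal, self-contained sketch of that Arc-based shape (the `FunctionPackage` trait and the registry type below are hypothetical stand-ins, not existing DataFusion APIs):

    use std::collections::HashMap;
    use std::sync::Arc;

    // Stand-in for the trait added in this PR.
    pub trait ScalarFunctionDef: Send + Sync {
        fn name(&self) -> &str;
    }

    // Hypothetical umbrella trait covering every function kind; aggregate,
    // window, and table accessors could be added alongside the scalar one.
    pub trait FunctionPackage: Send + Sync {
        fn scalar_functions(&self) -> Vec<Arc<dyn ScalarFunctionDef>>;
    }

    // Stand-in for the SessionContext-side function registry.
    #[derive(Default)]
    struct Registry {
        scalar: HashMap<String, Arc<dyn ScalarFunctionDef>>,
    }

    impl Registry {
        // Taking Arc (not Box) lets one package instance be built once and
        // registered with any number of contexts without rebuilding it.
        fn register_function_package(&mut self, pkg: Arc<dyn FunctionPackage>) {
            for f in pkg.scalar_functions() {
                self.scalar.insert(f.name().to_string(), f);
            }
        }
    }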

&self,
func_pkg: Box<dyn ScalarFunctionPackage>,
) {
// Convert a `dyn ScalarFunctionDef` into the internal `ScalarUDF` struct so
// it can be registered with the context
fn to_scalar_function(func: Box<dyn ScalarFunctionDef>) -> ScalarUDF {
let name = func.name().to_string();
let signature = func.signature();
let return_type = func.return_type();
let func_impl = make_scalar_function(move |args| func.execute(args));

ScalarUDF::new(&name, &signature, &return_type, &func_impl)
}

for func in func_pkg.functions() {
self.state
.write()
.scalar_functions
.insert(func.name().to_string(), Arc::new(to_scalar_function(func)));
}
}

/// Registers a scalar UDF within this context.
///
/// Note in SQL queries, function names are looked up using
2 changes: 1 addition & 1 deletion datafusion/expr/src/lib.rs
@@ -77,7 +77,7 @@ pub use partition_evaluator::PartitionEvaluator;
pub use signature::{Signature, TypeSignature, Volatility};
pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::AggregateUDF;
pub use udf::ScalarUDF;
pub use udf::{ScalarFunctionDef, ScalarFunctionPackage, ScalarUDF};
pub use udwf::WindowUDF;
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
pub use window_function::{BuiltInWindowFunction, WindowFunction};
19 changes: 19 additions & 0 deletions datafusion/expr/src/udf.rs
@@ -18,11 +18,30 @@
//! Udf module contains foundational types that are used to represent UDFs in DataFusion.

use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
use arrow::array::ArrayRef;
use datafusion_common::Result;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

pub trait ScalarFunctionDef: Sync + Send + std::fmt::Debug {
Contributor:

I really like using a trait to define functions. To be honest, I am not sure why DataFusion didn't do that originally.

My only concern is that by adding this we would now have three ways to define scalar functions (ScalarUDF, ScalarFunctionDef, and BuiltinScalarFunction).

I wonder if this trait is needed, or can we extend ScalarUDF to account for aliases?

Or perhaps should we be aiming to consolidate all functions to use ScalarFunctionDef 🤔

Contributor Author:

I think using a trait to define functions is clearer and easier to use; if we extend ScalarUDF instead, we must construct a struct for each function in an imperative way, which can get messy when there are many function packages to manage.
Also, the trait ScalarFunctionDef should be equivalent to ScalarUDF and BuiltinScalarFunction, so it should be possible to replace both with ScalarFunctionDef in the future.
(I also added a more detailed answer to this concern in the original PR rationale section.)

Contributor (@alamb, Oct 9, 2023):

> I think using a trait to define functions is clearer and easier to use

Yes, I agree.

> And the trait ScalarFunctionDef should be equivalent to ScalarUDF

Maybe we could do something like

    impl ScalarFunctionDef for ScalarUDF {
    ...
    }

So all of DataFusion's code would be in terms of ScalarFunctionDef.

And then (eventually) deprecate ScalarUDF as part of helping people migrate to ScalarFunctionDef 🤔
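For illustration, a hedged sketch of that blanket impl; it assumes ScalarUDF's public fields at the time of this PR (name, signature, return_type, fun) and would live in datafusion_expr next to both types:

    use arrow::array::ArrayRef;
    use datafusion_common::Result;
    use datafusion_expr::{ColumnarValue, ReturnTypeFunction, ScalarUDF, Signature};

    impl ScalarFunctionDef for ScalarUDF {
        fn name(&self) -> &str {
            &self.name
        }

        fn signature(&self) -> Signature {
            self.signature.clone()
        }

        fn return_type(&self) -> ReturnTypeFunction {
            self.return_type.clone()
        }

        fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
            // The stored implementation works on ColumnarValue, so wrap the
            // input arrays and unwrap the result; a scalar result is
            // materialized into an array.
            let args: Vec<ColumnarValue> = args
                .iter()
                .map(|a| ColumnarValue::Array(a.clone()))
                .collect();
            match (self.fun)(&args)? {
                ColumnarValue::Array(array) => Ok(array),
                // `ScalarValue::to_array` returned `ArrayRef` directly at the
                // time of this PR.
                ColumnarValue::Scalar(scalar) => Ok(scalar.to_array()),
            }
        }
    }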

// TODO: support alias
fn name(&self) -> &str;

fn signature(&self) -> Signature;

// TODO: ReturnTypeFunction -> an enum;
// most functions' return types are either the same as the first argument's or a fixed type
fn return_type(&self) -> ReturnTypeFunction;
Contributor:

I think the current API requires return_type to be passed the specific argument types

Also, #7657 suggests there is additional room for improvement
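As a reference point, one possible shape for the enum floated in the TODO above (a hypothetical sketch, not part of this PR); the `Custom` variant covers exactly the case where the return type depends on the concrete argument types:

    use arrow::datatypes::DataType;
    use datafusion_common::Result;
    use std::sync::Arc;

    /// Hypothetical replacement for `ReturnTypeFunction` (illustrative only).
    pub enum ReturnType {
        /// Always the same type, e.g. a hash function that always returns Utf8.
        Fixed(DataType),
        /// Same as the first argument, e.g. abs(x).
        SameAsFirstArg,
        /// Fallback: compute the return type from the actual argument types.
        Custom(Arc<dyn Fn(&[DataType]) -> Result<DataType> + Send + Sync>),
    }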

Contributor Author:

I think we might mark this API as experimental for now; it is likely to change frequently, and other unexpected cases like #7657 might show up when porting some of the trickier functions.

Contributor:

I like that idea --

maybe the implementation plan sequence could be:

  1. Introduce ScalarFunctionDef
  2. impl ScalarFunctionDef for ScalarUDF and BuiltinScalarFunction
  3. Update the core DataFusion code to only use ScalarFunctionDef
  4. Port all BuiltinScalarFunction variants to ScalarFunctionDef
  5. Deprecate ScalarUDF
  6. Remove BuiltinScalarFunction

Contributor Author (@2010YOUY01, Oct 13, 2023):

Sounds great 👍🏼 My plan is:
1st PR: impl ScalarFunctionDef for BuiltinScalarFunction and replace the execution code with the new interface. This step makes sure the new interface can cover all existing functionality, and determines what the interface looks like.
2nd PR: replace the current UDF implementation with the new interface.
3rd and after: start porting existing functions.


fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef>;
}

pub trait ScalarFunctionPackage {
fn functions(&self) -> Vec<Box<dyn ScalarFunctionDef>>;
Contributor:

I suggest using Arc here, as I don't see any reason for there to be state or any need for an owned copy.
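For concreteness, the Arc-returning variant would look like this (a sketch of the suggestion, not the PR's code):

    use std::sync::Arc;

    pub trait ScalarFunctionPackage {
        // Arc instead of Box: packages carry no per-registration state, so a
        // shared pointer avoids forcing an owned copy per context.
        fn functions(&self) -> Vec<Arc<dyn ScalarFunctionDef>>;
    }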

}

/// Logical representation of a UDF.
#[derive(Clone)]
pub struct ScalarUDF {
20 changes: 20 additions & 0 deletions extension/scalar-function/test-func/Cargo.toml
@@ -0,0 +1,20 @@
[package]
name = "datafusion-extension-test-scalar-func"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion = { path = "../../../datafusion/core" }
datafusion-common = { path = "../../../datafusion/common" }
datafusion-expr = { path = "../../../datafusion/expr" }
arrow = { workspace = true }
#arrow-flight = { workspace = true }
#arrow-schema = { workspace = true }
79 changes: 79 additions & 0 deletions extension/scalar-function/test-func/src/lib.rs
@@ -0,0 +1,79 @@
use arrow::array::{ArrayRef, Float64Array};
use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
use datafusion_common::cast::as_float64_array;
use datafusion_expr::{ReturnTypeFunction, Signature};
use datafusion_expr::{ScalarFunctionDef, ScalarFunctionPackage};
use std::sync::Arc;

#[derive(Debug)]
pub struct AddOneFunction;

impl ScalarFunctionDef for AddOneFunction {
fn name(&self) -> &str {
"add_one"
}

fn signature(&self) -> Signature {
Signature::exact(vec![DataType::Float64], Volatility::Immutable)
}

fn return_type(&self) -> ReturnTypeFunction {
let return_type = Arc::new(DataType::Float64);
Arc::new(move |_| Ok(return_type.clone()))
}

fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
assert_eq!(args.len(), 1);
let input = as_float64_array(&args[0])?;
let array = input
.iter()
.map(|value| match value {
Some(value) => Some(value + 1.0),
_ => None,
})
.collect::<Float64Array>();
Ok(Arc::new(array) as ArrayRef)
}
}

#[derive(Debug)]
pub struct MultiplyTwoFunction;

impl ScalarFunctionDef for MultiplyTwoFunction {
fn name(&self) -> &str {
"multiply_two"
}

fn signature(&self) -> Signature {
Signature::exact(vec![DataType::Float64], Volatility::Immutable)
}

fn return_type(&self) -> ReturnTypeFunction {
let return_type = Arc::new(DataType::Float64);
Arc::new(move |_| Ok(return_type.clone()))
}

fn execute(&self, args: &[ArrayRef]) -> Result<ArrayRef> {
assert_eq!(args.len(), 1);
let input = as_float64_array(&args[0])?;
let array = input
.iter()
.map(|value| match value {
Some(value) => Some(value * 2.0),
_ => None,
})
.collect::<Float64Array>();
Ok(Arc::new(array) as ArrayRef)
}
}

// Function package declaration
pub struct TestFunctionPackage;

impl ScalarFunctionPackage for TestFunctionPackage {
fn functions(&self) -> Vec<Box<dyn ScalarFunctionDef>> {
vec![Box::new(AddOneFunction), Box::new(MultiplyTwoFunction)]
}
}
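A quick unit test for the package (not part of the PR; a sketch showing how the trait can be exercised directly, with a hypothetical test name):

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn add_one_preserves_nulls() {
            // One valid value and one null, to check both paths of execute().
            let input: ArrayRef = Arc::new(Float64Array::from(vec![Some(1.0), None]));
            let result = AddOneFunction.execute(&[input]).unwrap();
            let result = as_float64_array(&result).unwrap();
            assert_eq!(result.value(0), 2.0);
            assert!(result.is_null(1));
        }
    }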