From 6247d359de5edb4672def9a1f9e733d5cd5bdef4 Mon Sep 17 00:00:00 2001
From: jizezhang
Date: Mon, 3 Nov 2025 23:27:25 -0800
Subject: [PATCH 1/7] fix: spark array return type mismatch when inner data type is LargeList

---
 .../spark/src/function/array/spark_array.rs   | 20 ++-----------------
 .../test_files/spark/array/array.slt          | 15 ++++++++++++++
 2 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/datafusion/spark/src/function/array/spark_array.rs b/datafusion/spark/src/function/array/spark_array.rs
index bf5842cb5a5a..f2a43559fdd8 100644
--- a/datafusion/spark/src/function/array/spark_array.rs
+++ b/datafusion/spark/src/function/array/spark_array.rs
@@ -22,12 +22,12 @@ use arrow::array::{
     MutableArrayData, NullArray, OffsetSizeTrait,
 };
 use arrow::buffer::OffsetBuffer;
-use arrow::datatypes::{DataType, Field, FieldRef};
+use arrow::datatypes::{DataType, Field};
 use datafusion_common::utils::SingleRowListArrayBuilder;
 use datafusion_common::{plan_datafusion_err, plan_err, Result};
 use datafusion_expr::type_coercion::binary::comparison_coercion;
 use datafusion_expr::{
-    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    ColumnarValue,ScalarFunctionArgs, ScalarUDFImpl, Signature,
     TypeSignature, Volatility,
 };
 
@@ -92,21 +92,6 @@ impl ScalarUDFImpl for SparkArray {
         ))))
     }
 
-    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
-        let data_types = args
-            .arg_fields
-            .iter()
-            .map(|f| f.data_type())
-            .cloned()
-            .collect::<Vec<_>>();
-        let return_type = self.return_type(&data_types)?;
-        Ok(Arc::new(Field::new(
-            "this_field_name_is_irrelevant",
-            return_type,
-            false,
-        )))
-    }
-
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
         let ScalarFunctionArgs { args, .. } = args;
         make_scalar_function(make_array_inner)(args.as_slice())
@@ -166,7 +151,6 @@ pub fn make_array_inner(arrays: &[ArrayRef]) -> Result<ArrayRef> {
                     .build_list_array(),
             ))
         }
-        DataType::LargeList(..) => array_array::<i64>(arrays, data_type),
         _ => array_array::<i32>(arrays, data_type),
     }
 }
diff --git a/datafusion/sqllogictest/test_files/spark/array/array.slt b/datafusion/sqllogictest/test_files/spark/array/array.slt
index 09821e6d582d..79dca1c10a7d 100644
--- a/datafusion/sqllogictest/test_files/spark/array/array.slt
+++ b/datafusion/sqllogictest/test_files/spark/array/array.slt
@@ -70,3 +70,18 @@ query ?
 SELECT array(array(1,2));
 ----
 [[1, 2]]
+
+query ?
+SELECT array(arrow_cast(array(1), 'LargeList(Int64)'));
+----
+[[1]]
+
+query ?
+SELECT array(arrow_cast(array(1), 'LargeList(Int64)'), arrow_cast(array(), 'LargeList(Int64)'));
+----
+[[1], []]
+
+query ?
+SELECT array(arrow_cast(array(1,2), 'LargeList(Int64)'), array(3));
+----
+[[1, 2], [3]]

From 4e1a93dc52502e58914c5345d4c5c90e214300b3 Mon Sep 17 00:00:00 2001
From: jizezhang
Date: Tue, 4 Nov 2025 07:22:44 -0800
Subject: [PATCH 2/7] formatting

---
 datafusion/spark/src/function/array/spark_array.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/spark/src/function/array/spark_array.rs b/datafusion/spark/src/function/array/spark_array.rs
index f2a43559fdd8..0b71bb963082 100644
--- a/datafusion/spark/src/function/array/spark_array.rs
+++ b/datafusion/spark/src/function/array/spark_array.rs
@@ -27,8 +27,8 @@ use datafusion_common::utils::SingleRowListArrayBuilder;
 use datafusion_common::{plan_datafusion_err, plan_err, Result};
 use datafusion_expr::type_coercion::binary::comparison_coercion;
 use datafusion_expr::{
-    ColumnarValue,ScalarFunctionArgs, ScalarUDFImpl, Signature,
-    TypeSignature, Volatility,
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
+    Volatility,
 };
 
 use crate::function::functions_nested_utils::make_scalar_function;

From 9ff0bb9be04ecf65b3c5cdf1c7c9b18a596ef8f9 Mon Sep 17 00:00:00 2001
From: Michael Kleen
Date: Tue, 4 Nov 2025 19:13:25 +0100
Subject: [PATCH 3/7] Move generate_series projection logic into LazyMemoryStream (#18373)

## Which issue does this PR close?

- None. This is a follow-up to https://github.com/apache/datafusion/pull/18298

## Rationale for this change

This moves the projection logic for `generate_series` out of the generator and into `LazyMemoryStream`, as discussed in https://github.com/apache/datafusion/pull/18298#discussion_r2465670378. This makes the projection logic generic over all generators.

## What changes are included in this PR?

The projection logic is moved from `generate_series` into `LazyMemoryStream`, and the tests that use `LazyMemoryStream` are adapted accordingly.

## Are these changes tested?

This is only a small refactoring; the changes are covered by the tests from https://github.com/apache/datafusion/pull/18298.

## Are there any user-facing changes?
There is a new `with_projection` method on `LazyMemoryExec`, and the `projection` parameter was removed from `GenerateSeriesTable::as_generator`.
---
 .../functions-table/src/generate_series.rs    | 23 ++++----------
 datafusion/physical-plan/src/memory.rs        | 30 ++++++++++++++++++-
 datafusion/proto/src/physical_plan/mod.rs     |  3 +-
 3 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs
index c66e652147eb..d71c5945aafc 100644
--- a/datafusion/functions-table/src/generate_series.rs
+++ b/datafusion/functions-table/src/generate_series.rs
@@ -237,7 +237,6 @@ impl GenerateSeriesTable {
     pub fn as_generator(
         &self,
         batch_size: usize,
-        projection: Option<Vec<usize>>,
     ) -> Result<Arc<RwLock<dyn LazyBatchGenerator>>> {
         let generator: Arc<RwLock<dyn LazyBatchGenerator>> = match &self.args {
             GenSeriesArgs::ContainsNull { name } => Arc::new(RwLock::new(Empty { name })),
@@ -256,7 +255,6 @@ impl GenerateSeriesTable {
                 batch_size,
                 include_end: *include_end,
                 name,
-                projection,
             })),
             GenSeriesArgs::TimestampArgs {
                 start,
@@ -297,7 +295,6 @@ impl GenerateSeriesTable {
                     batch_size,
                     include_end: *include_end,
                     name,
-                    projection,
                 }))
             }
             GenSeriesArgs::DateArgs {
@@ -327,7 +324,6 @@ impl GenerateSeriesTable {
                 batch_size,
                 include_end: *include_end,
                 name,
-                projection,
             })),
         };
@@ -345,7 +341,6 @@ pub struct GenericSeriesState {
     current: T,
     include_end: bool,
     name: &'static str,
-    projection: Option<Vec<usize>>,
 }
 
 impl GenericSeriesState {
@@ -401,11 +396,7 @@ impl LazyBatchGenerator for GenericSeriesState {
         let array = self.current.create_array(buf)?;
         let batch = RecordBatch::try_new(Arc::clone(&self.schema), vec![array])?;
 
-        let projected = match self.projection.as_ref() {
-            Some(projection) => batch.project(projection)?,
-            None => batch,
-        };
-        Ok(Some(projected))
+        Ok(Some(batch))
     }
 }
 
@@ -481,14 +472,12 @@ impl TableProvider for GenerateSeriesTable {
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let batch_size = state.config_options().execution.batch_size;
-        let schema = match projection {
-            Some(projection) => Arc::new(self.schema.project(projection)?),
-            None => self.schema(),
-        };
-
-        let generator = self.as_generator(batch_size, projection.cloned())?;
+        let generator = self.as_generator(batch_size)?;
 
-        Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
+        Ok(Arc::new(
+            LazyMemoryExec::try_new(self.schema(), vec![generator])?
+                .with_projection(projection.cloned()),
+        ))
     }
 }
 
diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs
index 1bf1e04efb53..09710ae1e2ed 100644
--- a/datafusion/physical-plan/src/memory.rs
+++ b/datafusion/physical-plan/src/memory.rs
@@ -153,6 +153,8 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display {
 pub struct LazyMemoryExec {
     /// Schema representing the data
     schema: SchemaRef,
+    /// Optional projection for which columns to load
+    projection: Option<Vec<usize>>,
     /// Functions to generate batches for each partition
     batch_generators: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
     /// Plan properties cache storing equivalence properties, partitioning, and execution mode
@@ -199,12 +201,28 @@ impl LazyMemoryExec {
 
         Ok(Self {
             schema,
+            projection: None,
             batch_generators: generators,
             cache,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
 
+    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
+        match projection.as_ref() {
+            Some(columns) => {
+                let projected = Arc::new(self.schema.project(columns).unwrap());
+                self.cache = self.cache.with_eq_properties(EquivalenceProperties::new(
+                    Arc::clone(&projected),
+                ));
+                self.schema = projected;
+                self.projection = projection;
+                self
+            }
+            _ => self,
+        }
+    }
+
     pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> {
         if partitioning.partition_count() != self.batch_generators.len() {
             internal_err!(
@@ -320,6 +338,7 @@ impl ExecutionPlan for LazyMemoryExec {
 
         let stream = LazyMemoryStream {
             schema: Arc::clone(&self.schema),
+            projection: self.projection.clone(),
             generator: Arc::clone(&self.batch_generators[partition]),
             baseline_metrics,
         };
@@ -338,6 +357,8 @@ impl ExecutionPlan for LazyMemoryExec {
 /// Stream that generates record batches on demand
 pub struct LazyMemoryStream {
     schema: SchemaRef,
+    /// Optional projection for which columns to load
+    projection: Option<Vec<usize>>,
     /// Generator to produce batches
     ///
     /// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream
@@ -361,7 +382,14 @@ impl Stream for LazyMemoryStream {
         let batch = self.generator.write().generate_next_batch();
 
         let poll = match batch {
-            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
+            Ok(Some(batch)) => {
+                // return just the columns requested
+                let batch = match self.projection.as_ref() {
+                    Some(columns) => batch.project(columns)?,
+                    None => batch,
+                };
+                Poll::Ready(Some(Ok(batch)))
+            }
             Ok(None) => Poll::Ready(None),
             Err(e) => Poll::Ready(Some(Err(e))),
         };
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 0ebbb373f2d1..e5f4a1f7d026 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -1940,8 +1940,7 @@ impl protobuf::PhysicalPlanNode {
         };
 
         let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
-        let generator =
-            table.as_generator(generate_series.target_batch_size as usize, None)?;
+        let generator = table.as_generator(generate_series.target_batch_size as usize)?;
 
         Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
     }

From 312d5c8368a9394fd8144f7a27126af1266ccfd7 Mon Sep 17 00:00:00 2001
From: Sergey Zhukov <62326549+cj-zhukov@users.noreply.github.com>
Date: Tue, 4 Nov 2025 22:18:33 +0300
Subject: [PATCH 4/7] Consolidate flight examples (#18142) (#18442)

## Which issue does this PR close?

- part of https://github.com/apache/datafusion/issues/18142.
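For context, the consolidated binary selects which example to run via a subcommand (see the new `main.rs` below). A typical invocation, following the usage string it prints (and assuming the usual server-then-client workflow of the original two binaries), would look like:

```shell
# start the server in one terminal, then run the client in another
cargo run --example flight -- server
cargo run --example flight -- client
```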
## Rationale for this change

As discussed in https://github.com/apache/datafusion/pull/18289, this PR consolidates all the `flight` examples into a single example binary. This lets us agree on the pattern before applying it to the remaining examples.

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?

---------

Co-authored-by: Sergey Zhukov
Co-authored-by: Andrew Lamb
---
 datafusion-examples/Cargo.toml                |  12 ---
 datafusion-examples/README.md                 |   4 +-
 .../flight/{flight_client.rs => client.rs}    |   3 +-
 datafusion-examples/examples/flight/main.rs   |  94 +++++++++++++++++++
 .../flight/{flight_server.rs => server.rs}    |   3 +-
 .../{flight_sql_server.rs => sql_server.rs}   |   3 +-
 6 files changed, 99 insertions(+), 20 deletions(-)
 rename datafusion-examples/examples/flight/{flight_client.rs => client.rs} (97%)
 create mode 100644 datafusion-examples/examples/flight/main.rs
 rename datafusion-examples/examples/flight/{flight_server.rs => server.rs} (99%)
 rename datafusion-examples/examples/flight/{flight_sql_server.rs => sql_server.rs} (99%)

diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index bb0525e57753..0ec410ecc6b2 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -32,18 +32,6 @@ rust-version = { workspace = true }
 [lints]
 workspace = true
 
-[[example]]
-name = "flight_sql_server"
-path = "examples/flight/flight_sql_server.rs"
-
-[[example]]
-name = "flight_server"
-path = "examples/flight/flight_server.rs"
-
-[[example]]
-name = "flight_client"
-path = "examples/flight/flight_client.rs"
-
 [[example]]
 name = "dataframe_to_s3"
 path = "examples/external_dependency/dataframe-to-s3.rs"
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index f1bcbcce8200..f6783a643f76 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -65,7 +65,7 @@ cargo run --example dataframe
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
-- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
+- [`flight/sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from Flight and FlightSQL (e.g. JDBC) clients
 - [`function_factory.rs`](examples/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
 - [`memory_pool_execution_plan.rs`](examples/memory_pool_execution_plan.rs): Shows how to implement memory-aware ExecutionPlan with memory reservation and spilling
@@ -94,4 +94,4 @@ cargo run --example dataframe
 
 ## Distributed
 
-- [`flight_client.rs`](examples/flight/flight_client.rs) and [`flight_server.rs`](examples/flight/flight_server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
+- [`examples/flight/client.rs`](examples/flight/client.rs) and [`examples/flight/server.rs`](examples/flight/server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Arrow Flight protocol.
diff --git a/datafusion-examples/examples/flight/flight_client.rs b/datafusion-examples/examples/flight/client.rs
similarity index 97%
rename from datafusion-examples/examples/flight/flight_client.rs
rename to datafusion-examples/examples/flight/client.rs
index ff4b5903ad88..031beea47d57 100644
--- a/datafusion-examples/examples/flight/flight_client.rs
+++ b/datafusion-examples/examples/flight/client.rs
@@ -30,8 +30,7 @@ use datafusion::arrow::util::pretty;
 /// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
 /// Parquet files and executing SQL queries against them on a remote server.
 /// This example is run along-side the example `flight_server`.
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
+pub async fn client() -> Result<(), Box<dyn std::error::Error>> {
     let testdata = datafusion::test_util::parquet_test_data();
 
     // Create Flight client
diff --git a/datafusion-examples/examples/flight/main.rs b/datafusion-examples/examples/flight/main.rs
new file mode 100644
index 000000000000..a448789b353b
--- /dev/null
+++ b/datafusion-examples/examples/flight/main.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Arrow Flight Examples
+//!
+//! These examples demonstrate Arrow Flight usage.
+//!
+//! Each subcommand runs a corresponding example:
+//! - `client` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol
+//! - `server` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol
+//! - `sql_server` — run DataFusion as a standalone process and execute SQL queries from JDBC clients
+
+mod client;
+mod server;
+mod sql_server;
+
+use std::str::FromStr;
+
+use datafusion::error::{DataFusionError, Result};
+
+enum ExampleKind {
+    Client,
+    Server,
+    SqlServer,
+}
+
+impl AsRef<str> for ExampleKind {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::Client => "client",
+            Self::Server => "server",
+            Self::SqlServer => "sql_server",
+        }
+    }
+}
+
+impl FromStr for ExampleKind {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s {
+            "client" => Ok(Self::Client),
+            "server" => Ok(Self::Server),
+            "sql_server" => Ok(Self::SqlServer),
+            _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
+        }
+    }
+}
+
+impl ExampleKind {
+    const ALL: [Self; 3] = [Self::Client, Self::Server, Self::SqlServer];
+
+    const EXAMPLE_NAME: &str = "flight";
+
+    fn variants() -> Vec<&'static str> {
+        Self::ALL.iter().map(|x| x.as_ref()).collect()
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let usage = format!(
+        "Usage: cargo run --example {} -- [{}]",
+        ExampleKind::EXAMPLE_NAME,
+        ExampleKind::variants().join("|")
+    );
+
+    let arg = std::env::args().nth(1).ok_or_else(|| {
+        eprintln!("{usage}");
+        DataFusionError::Execution("Missing argument".to_string())
+    })?;
+
+    match arg.parse::<ExampleKind>()? {
+        ExampleKind::Client => client::client().await?,
+        ExampleKind::Server => server::server().await?,
+        ExampleKind::SqlServer => sql_server::sql_server().await?,
+    }
+
+    Ok(())
+}
diff --git a/datafusion-examples/examples/flight/flight_server.rs b/datafusion-examples/examples/flight/server.rs
similarity index 99%
rename from datafusion-examples/examples/flight/flight_server.rs
rename to datafusion-examples/examples/flight/server.rs
index 22265e415fbd..dc75287cf2e2 100644
--- a/datafusion-examples/examples/flight/flight_server.rs
+++ b/datafusion-examples/examples/flight/server.rs
@@ -194,8 +194,7 @@ fn to_tonic_err(e: datafusion::error::DataFusionError) -> Status {
 /// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
 /// Parquet files and executing SQL queries against them on a remote server.
 /// This example is run along-side the example `flight_client`.
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
+pub async fn server() -> Result<(), Box<dyn std::error::Error>> {
     let addr = "0.0.0.0:50051".parse()?;
     let service = FlightServiceImpl {};
 
diff --git a/datafusion-examples/examples/flight/flight_sql_server.rs b/datafusion-examples/examples/flight/sql_server.rs
similarity index 99%
rename from datafusion-examples/examples/flight/flight_sql_server.rs
rename to datafusion-examples/examples/flight/sql_server.rs
index c35debec7d71..fc7d0817bd5f 100644
--- a/datafusion-examples/examples/flight/flight_sql_server.rs
+++ b/datafusion-examples/examples/flight/sql_server.rs
@@ -69,8 +69,7 @@ macro_rules! 
status { /// Based heavily on Ballista's implementation: https://github.com/apache/datafusion-ballista/blob/main/ballista/scheduler/src/flight_sql.rs /// and the example in arrow-rs: https://github.com/apache/arrow-rs/blob/master/arrow-flight/examples/flight_sql_server.rs /// -#[tokio::main] -async fn main() -> Result<(), Box> { +pub async fn sql_server() -> Result<(), Box> { env_logger::init(); let addr = "0.0.0.0:50051".parse()?; let service = FlightSqlServiceImpl { From 215174103cb3d072e06b49322020af136d30ec82 Mon Sep 17 00:00:00 2001 From: bubulalabu Date: Wed, 5 Nov 2025 02:46:09 +0100 Subject: [PATCH 5/7] feat: support named arguments for aggregate and window udfs (#18389) ## Which issue does this PR close? Addresses portions of https://github.com/apache/datafusion/issues/17379. ## Rationale for this change Add support for aggregate and window UDFs in the same way as we did it for scalar UDFs here: https://github.com/apache/datafusion/pull/18019 ## Are these changes tested? Yes ## Are there any user-facing changes? Yes, the changes are user-facing, documented, purely additive and non-breaking. --- .../functions-aggregate/src/correlation.rs | 4 +- .../src/percentile_cont.rs | 4 +- datafusion/functions-window/src/lead_lag.rs | 8 +- datafusion/sql/src/expr/function.rs | 56 +++++++- .../test_files/named_arguments.slt | 132 ++++++++++++++++++ .../functions/adding-udfs.md | 48 ++----- 6 files changed, 210 insertions(+), 42 deletions(-) diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs index 20f23662cade..f2a464de4155 100644 --- a/datafusion/functions-aggregate/src/correlation.rs +++ b/datafusion/functions-aggregate/src/correlation.rs @@ -88,7 +88,9 @@ impl Correlation { signature: Signature::exact( vec![DataType::Float64, DataType::Float64], Volatility::Immutable, - ), + ) + .with_parameter_names(vec!["y".to_string(), "x".to_string()]) + .expect("valid parameter names for corr"), } } } diff --git a/datafusion/functions-aggregate/src/percentile_cont.rs b/datafusion/functions-aggregate/src/percentile_cont.rs index 7ef0f8baf08d..1e06461e569f 100644 --- a/datafusion/functions-aggregate/src/percentile_cont.rs +++ b/datafusion/functions-aggregate/src/percentile_cont.rs @@ -146,7 +146,9 @@ impl PercentileCont { variants.push(TypeSignature::Exact(vec![num.clone(), DataType::Float64])); } Self { - signature: Signature::one_of(variants, Volatility::Immutable), + signature: Signature::one_of(variants, Volatility::Immutable) + .with_parameter_names(vec!["expr".to_string(), "percentile".to_string()]) + .expect("valid parameter names for percentile_cont"), aliases: vec![String::from("quantile_cont")], } } diff --git a/datafusion/functions-window/src/lead_lag.rs b/datafusion/functions-window/src/lead_lag.rs index 3910a0be574d..02d7fc290b32 100644 --- a/datafusion/functions-window/src/lead_lag.rs +++ b/datafusion/functions-window/src/lead_lag.rs @@ -137,7 +137,13 @@ impl WindowShift { TypeSignature::Any(3), ], Volatility::Immutable, - ), + ) + .with_parameter_names(vec![ + "expr".to_string(), + "offset".to_string(), + "default".to_string(), + ]) + .expect("valid parameter names for lead/lag"), kind, } } diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 2d20aaf52358..50e479af3620 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -386,7 +386,30 @@ impl SqlToRel<'_, S> { }; if let Ok(fun) = self.find_window_func(&name) { - let args = 
self.function_args_to_expr(args, schema, planner_context)?; + let (args, arg_names) = + self.function_args_to_expr_with_names(args, schema, planner_context)?; + + let resolved_args = if arg_names.iter().any(|name| name.is_some()) { + let signature = match &fun { + WindowFunctionDefinition::AggregateUDF(udaf) => udaf.signature(), + WindowFunctionDefinition::WindowUDF(udwf) => udwf.signature(), + }; + + if let Some(param_names) = &signature.parameter_names { + datafusion_expr::arguments::resolve_function_arguments( + param_names, + args, + arg_names, + )? + } else { + return plan_err!( + "Window function '{}' does not support named arguments", + name + ); + } + } else { + args + }; // Plan FILTER clause if present let filter = filter @@ -396,7 +419,7 @@ impl SqlToRel<'_, S> { let mut window_expr = RawWindowExpr { func_def: fun, - args, + args: resolved_args, partition_by, order_by, window_frame, @@ -464,8 +487,8 @@ impl SqlToRel<'_, S> { ); } - let mut args = - self.function_args_to_expr(args, schema, planner_context)?; + let (mut args, mut arg_names) = + self.function_args_to_expr_with_names(args, schema, planner_context)?; let order_by = if fm.supports_within_group_clause() { let within_group = self.order_by_to_sort_expr( @@ -479,6 +502,12 @@ impl SqlToRel<'_, S> { // Add the WITHIN GROUP ordering expressions to the front of the argument list // So function(arg) WITHIN GROUP (ORDER BY x) becomes function(x, arg) if !within_group.is_empty() { + // Prepend None arg names for each WITHIN GROUP expression + let within_group_count = within_group.len(); + arg_names = std::iter::repeat_n(None, within_group_count) + .chain(arg_names) + .collect(); + args = within_group .iter() .map(|sort| sort.expr.clone()) @@ -506,9 +535,26 @@ impl SqlToRel<'_, S> { .transpose()? .map(Box::new); + let resolved_args = if arg_names.iter().any(|name| name.is_some()) { + if let Some(param_names) = &fm.signature().parameter_names { + datafusion_expr::arguments::resolve_function_arguments( + param_names, + args, + arg_names, + )? 
+ } else { + return plan_err!( + "Aggregate function '{}' does not support named arguments", + fm.name() + ); + } + } else { + args + }; + let mut aggregate_expr = RawAggregateExpr { func: fm, - args, + args: resolved_args, distinct, filter, order_by, diff --git a/datafusion/sqllogictest/test_files/named_arguments.slt b/datafusion/sqllogictest/test_files/named_arguments.slt index c93da7e7a8f9..4eab799fd261 100644 --- a/datafusion/sqllogictest/test_files/named_arguments.slt +++ b/datafusion/sqllogictest/test_files/named_arguments.slt @@ -137,3 +137,135 @@ SELECT substr(str => 'hello world', start_pos => 7, length => 5); # Reset to default dialect statement ok set datafusion.sql_parser.dialect = 'Generic'; + +############# +## Aggregate UDF Tests - using corr(y, x) function +############# + +# Setup test data +statement ok +CREATE TABLE correlation_test(col1 DOUBLE, col2 DOUBLE) AS VALUES + (1.0, 2.0), + (2.0, 4.0), + (3.0, 6.0), + (4.0, 8.0); + +# Test positional arguments (baseline) +query R +SELECT corr(col1, col2) FROM correlation_test; +---- +1 + +# Test named arguments out of order (proves named args work for aggregates) +query R +SELECT corr(x => col2, y => col1) FROM correlation_test; +---- +1 + +# Error: function doesn't support named arguments (count has no parameter names) +query error DataFusion error: Error during planning: Aggregate function 'count' does not support named arguments +SELECT count(value => col1) FROM correlation_test; + +# Cleanup +statement ok +DROP TABLE correlation_test; + +############# +## Aggregate UDF with WITHIN GROUP Tests - using percentile_cont(expression, percentile) +## This tests the special handling where WITHIN GROUP ORDER BY expressions are prepended to args +############# + +# Setup test data +statement ok +CREATE TABLE percentile_test(salary DOUBLE) AS VALUES + (50000.0), + (60000.0), + (70000.0), + (80000.0), + (90000.0); + +# Test positional arguments (baseline) - standard call without WITHIN GROUP +query R +SELECT percentile_cont(salary, 0.5) FROM percentile_test; +---- +70000 + +# Test WITHIN GROUP with positional argument +query R +SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY salary) FROM percentile_test; +---- +70000 + +# Test WITHIN GROUP with named argument for percentile +# The ORDER BY expression (salary) is prepended internally, becoming: percentile_cont(salary, 0.5) +# We use named argument for percentile, which should work correctly +query R +SELECT percentile_cont(percentile => 0.5) WITHIN GROUP (ORDER BY salary) FROM percentile_test; +---- +70000 + +# Verify the WITHIN GROUP prepending logic with different percentile value +query R +SELECT percentile_cont(percentile => 0.25) WITHIN GROUP (ORDER BY salary) FROM percentile_test; +---- +60000 + +# Cleanup +statement ok +DROP TABLE percentile_test; + +############# +## Window UDF Tests - using lead(expression, offset, default) function +############# + +# Setup test data +statement ok +CREATE TABLE window_test(id INT, value INT) AS VALUES + (1, 10), + (2, 20), + (3, 30), + (4, 40); + +# Test positional arguments (baseline) +query II +SELECT id, lead(value, 1, 0) OVER (ORDER BY id) FROM window_test ORDER BY id; +---- +1 20 +2 30 +3 40 +4 0 + +# Test named arguments out of order (proves named args work for window functions) +query II +SELECT id, lead(default => 0, offset => 1, expr => value) OVER (ORDER BY id) FROM window_test ORDER BY id; +---- +1 20 +2 30 +3 40 +4 0 + +# Test with 1 argument (offset and default use defaults) +query II +SELECT id, lead(expr => value) OVER (ORDER 
BY id) FROM window_test ORDER BY id; +---- +1 20 +2 30 +3 40 +4 NULL + +# Test with 2 arguments (default uses default) +query II +SELECT id, lead(expr => value, offset => 2) OVER (ORDER BY id) FROM window_test ORDER BY id; +---- +1 30 +2 40 +3 NULL +4 NULL + +# Error: function doesn't support named arguments (row_number has no parameter names) +query error DataFusion error: Error during planning: Window function 'row_number' does not support named arguments +SELECT row_number(value => 1) OVER (ORDER BY id) FROM window_test; + +# Cleanup +statement ok +DROP TABLE window_test; diff --git a/docs/source/library-user-guide/functions/adding-udfs.md b/docs/source/library-user-guide/functions/adding-udfs.md index 7581d8b6505e..e56790a4b7d8 100644 --- a/docs/source/library-user-guide/functions/adding-udfs.md +++ b/docs/source/library-user-guide/functions/adding-udfs.md @@ -588,10 +588,17 @@ For async UDF implementation details, see [`async_udf.rs`](https://github.com/ap ## Named Arguments -DataFusion supports PostgreSQL-style named arguments for scalar functions, allowing you to pass arguments by parameter name: +DataFusion supports named arguments for Scalar, Window, and Aggregate UDFs, allowing you to pass arguments by parameter name: ```sql +-- Scalar function SELECT substr(str => 'hello', start_pos => 2, length => 3); + +-- Window function +SELECT lead(expr => value, offset => 1) OVER (ORDER BY id) FROM table; + +-- Aggregate function +SELECT corr(y => col1, x => col2) FROM table; ``` Named arguments can be mixed with positional arguments, but positional arguments must come first: @@ -602,38 +609,7 @@ SELECT substr('hello', start_pos => 2, length => 3); -- Valid ### Implementing Functions with Named Arguments -To support named arguments in your UDF, add parameter names to your function's signature using `.with_parameter_names()`: - -```rust -# use arrow::datatypes::DataType; -# use datafusion_expr::{Signature, Volatility}; -# -# #[derive(Debug)] -# struct MyFunction { -# signature: Signature, -# } -# -impl MyFunction { - fn new() -> Self { - Self { - signature: Signature::uniform( - 2, - vec![DataType::Float64], - Volatility::Immutable - ) - .with_parameter_names(vec![ - "base".to_string(), - "exponent".to_string() - ]) - .expect("valid parameter names"), - } - } -} -``` - -The parameter names should match the order of arguments in your function's signature. DataFusion automatically resolves named arguments to the correct positional order before invoking your function. - -### Example +To support named arguments in your UDF, add parameter names to your function's signature using `.with_parameter_names()`. This works the same way for Scalar, Window, and Aggregate UDFs: ```rust # use std::sync::Arc; @@ -681,10 +657,14 @@ impl ScalarUDFImpl for PowerFunction { } ``` -Once registered, users can call your function with named arguments: +The parameter names should match the order of arguments in your function's signature. DataFusion automatically resolves named arguments to the correct positional order before invoking your function. + +Once registered, users can call your functions with named arguments in any order: ```sql +-- All equivalent SELECT power(base => 2.0, exponent => 3.0); +SELECT power(exponent => 3.0, base => 2.0); SELECT power(2.0, exponent => 3.0); ``` From d98547565ce3827abd686516e1745d26b91e46e4 Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Wed, 5 Nov 2025 02:49:25 +0100 Subject: [PATCH 6/7] Support reverse for ListView (#18424) ## Which issue does this PR close? 
- Closes #18350.

## Rationale for this change

We want to be able to reverse a ListView.

## What changes are included in this PR?

- Downcast `&dyn Array` to `ListView`: `as_list_view_array`
- Downcast `&dyn Array` to `LargeListView`: `as_large_list_view_array`
- Branches in `array_reverse_inner` to reverse `ListView` and `LargeListView`
- Main logic in `list_view_reverse`, which materializes a new values array using `take`

## Are these changes tested?

Yes
---
 datafusion/common/src/cast.rs                |  14 +-
 datafusion/functions-nested/src/reverse.rs   | 254 ++++++++++++++++++-
 datafusion/sqllogictest/test_files/array.slt |   7 +
 3 files changed, 268 insertions(+), 7 deletions(-)

diff --git a/datafusion/common/src/cast.rs b/datafusion/common/src/cast.rs
index e6eda3c585e8..b95167ca1390 100644
--- a/datafusion/common/src/cast.rs
+++ b/datafusion/common/src/cast.rs
@@ -24,8 +24,8 @@ use crate::{downcast_value, Result};
 use arrow::array::{
     BinaryViewArray, Decimal32Array, Decimal64Array, DurationMicrosecondArray,
     DurationMillisecondArray, DurationNanosecondArray, DurationSecondArray, Float16Array,
-    Int16Array, Int8Array, LargeBinaryArray, LargeStringArray, StringViewArray,
-    UInt16Array,
+    Int16Array, Int8Array, LargeBinaryArray, LargeListViewArray, LargeStringArray,
+    ListViewArray, StringViewArray, UInt16Array,
 };
 use arrow::{
     array::{
@@ -324,3 +324,13 @@ pub fn as_generic_string_array<T: OffsetSizeTrait>(
 ) -> Result<&GenericStringArray<T>> {
     Ok(downcast_value!(array, GenericStringArray, T))
 }
+
+// Downcast Array to ListViewArray
+pub fn as_list_view_array(array: &dyn Array) -> Result<&ListViewArray> {
+    Ok(downcast_value!(array, ListViewArray))
+}
+
+// Downcast Array to LargeListViewArray
+pub fn as_large_list_view_array(array: &dyn Array) -> Result<&LargeListViewArray> {
+    Ok(downcast_value!(array, LargeListViewArray))
+}
diff --git a/datafusion/functions-nested/src/reverse.rs b/datafusion/functions-nested/src/reverse.rs
index 870e54f59000..635f23967a19 100644
--- a/datafusion/functions-nested/src/reverse.rs
+++ b/datafusion/functions-nested/src/reverse.rs
@@ -19,14 +19,18 @@
 use crate::utils::make_scalar_function;
 use arrow::array::{
-    Array, ArrayRef, Capacities, FixedSizeListArray, GenericListArray, MutableArrayData,
-    OffsetSizeTrait,
+    Array, ArrayRef, Capacities, FixedSizeListArray, GenericListArray,
+    GenericListViewArray, MutableArrayData, OffsetSizeTrait, UInt32Array,
+};
+use arrow::buffer::{OffsetBuffer, ScalarBuffer};
+use arrow::compute::take;
+use arrow::datatypes::DataType::{
+    FixedSizeList, LargeList, LargeListView, List, ListView, Null,
 };
-use arrow::buffer::OffsetBuffer;
-use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
 use arrow::datatypes::{DataType, FieldRef};
 use datafusion_common::cast::{
-    as_fixed_size_list_array, as_large_list_array, as_list_array,
+    as_fixed_size_list_array, as_large_list_array, as_large_list_view_array,
+    as_list_array, as_list_view_array,
 };
 use datafusion_common::{exec_err, utils::take_function_args, Result};
 use datafusion_expr::{
@@ -134,6 +138,14 @@ pub fn array_reverse_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
             fixed_size_array_reverse(array, field)
         }
         Null => Ok(Arc::clone(input_array)),
+        ListView(field) => {
+            let array = as_list_view_array(input_array)?;
+            list_view_reverse::<i32>(array, field)
+        }
+        LargeListView(field) => {
+            let array = as_large_list_view_array(input_array)?;
+            list_view_reverse::<i64>(array, field)
+        }
         array_type => exec_err!("array_reverse does not support type '{array_type}'."),
     }
 }
@@ -175,6 +187,75 @@ fn general_array_reverse<O: OffsetSizeTrait>(
     )?))
 }
 
+/// Reverses a list view array.
+///
+/// Construct indices, sizes and offsets for the reversed array by iterating over
+/// the list view array in the logical order, and reversing the order of the elements.
+/// We end up with a list view array where the elements are in order,
+/// even if the original array had elements out of order.
+fn list_view_reverse<O: OffsetSizeTrait>(
+    array: &GenericListViewArray<O>,
+    field: &FieldRef,
+) -> Result<ArrayRef> {
+    let offsets = array.offsets();
+    let values = array.values();
+    let sizes = array.sizes();
+
+    let mut new_offsets: Vec<O> = Vec::with_capacity(offsets.len());
+    let mut indices: Vec<O> = Vec::with_capacity(values.len());
+    let mut new_sizes = Vec::with_capacity(sizes.len());
+
+    let mut current_offset = O::zero();
+    for (row_index, offset) in offsets.iter().enumerate() {
+        new_offsets.push(current_offset);
+
+        // If this array is null, we set its size to 0 and continue
+        if array.is_null(row_index) {
+            new_sizes.push(O::zero());
+            continue;
+        }
+        let size = sizes[row_index];
+        new_sizes.push(size);
+
+        // Each array is located at [offset, offset + size), collect indices in the reverse order
+        let array_start = *offset;
+        let array_end = array_start + size;
+        let mut idx = array_end - O::one();
+        while idx >= array_start {
+            indices.push(idx);
+            idx = idx - O::one();
+        }
+
+        current_offset += size;
+    }
+
+    // Materialize values from underlying array with take
+    let indices_array: ArrayRef = if O::IS_LARGE {
+        Arc::new(arrow::array::UInt64Array::from(
+            indices
+                .iter()
+                .map(|i| i.as_usize() as u64)
+                .collect::<Vec<_>>(),
+        ))
+    } else {
+        Arc::new(UInt32Array::from(
+            indices
+                .iter()
+                .map(|i| i.as_usize() as u32)
+                .collect::<Vec<_>>(),
+        ))
+    };
+    let values_reversed = take(&values, &indices_array, None)?;
+
+    Ok(Arc::new(GenericListViewArray::<O>::try_new(
+        Arc::clone(field),
+        ScalarBuffer::from(new_offsets),
+        ScalarBuffer::from(new_sizes),
+        values_reversed,
+        array.nulls().cloned(),
+    )?))
+}
+
 fn fixed_size_array_reverse(
     array: &FixedSizeListArray,
     field: &FieldRef,
@@ -207,3 +288,166 @@ fn fixed_size_array_reverse(
         array.nulls().cloned(),
     )?))
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::reverse::list_view_reverse;
+    use arrow::{
+        array::{
+            AsArray, GenericListViewArray, Int32Array, LargeListViewArray, ListViewArray,
+            OffsetSizeTrait,
+        },
+        buffer::{NullBuffer, ScalarBuffer},
+        datatypes::{DataType, Field, Int32Type},
+    };
+    use datafusion_common::Result;
+    use std::sync::Arc;
+
+    fn list_view_values<O: OffsetSizeTrait>(
+        array: &GenericListViewArray<O>,
+    ) -> Vec<Option<Vec<i32>>> {
+        array
+            .iter()
+            .map(|x| x.map(|x| x.as_primitive::<Int32Type>().values().to_vec()))
+            .collect()
+    }
+
+    #[test]
+    fn test_reverse_list_view() -> Result<()> {
+        let field = Arc::new(Field::new("a", DataType::Int32, false));
+        let offsets = ScalarBuffer::from(vec![0, 1, 6, 6]);
+        let sizes = ScalarBuffer::from(vec![1, 5, 0, 3]);
+        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+        let nulls = Some(NullBuffer::from(vec![true, true, false, true]));
+        let list_view = ListViewArray::new(field, offsets, sizes, values, nulls);
+        let result = list_view_reverse(
+            &list_view,
+            &Arc::new(Field::new("test", DataType::Int32, true)),
+        )?;
+        let reversed = list_view_values(result.as_list_view::<i32>());
+        let expected = vec![
+            Some(vec![1]),
+            Some(vec![6, 5, 4, 3, 2]),
+            None,
+            Some(vec![9, 8, 7]),
+        ];
+        assert_eq!(expected, reversed);
+        Ok(())
+    }
+
+    #[test]
+    fn test_reverse_large_list_view() -> Result<()> {
+        let field = Arc::new(Field::new("a", DataType::Int32, false));
+        let offsets = ScalarBuffer::from(vec![0, 1, 6, 6]);
+        let sizes = ScalarBuffer::from(vec![1, 5, 0, 3]);
+        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));
+        let nulls = Some(NullBuffer::from(vec![true, true, false, true]));
+        let list_view = LargeListViewArray::new(field, offsets, sizes, values, nulls);
+        let result = list_view_reverse(
+            &list_view,
+            &Arc::new(Field::new("test", DataType::Int32, true)),
+        )?;
+        let reversed = list_view_values(result.as_list_view::<i64>());
+        let expected = vec![
+            Some(vec![1]),
+            Some(vec![6, 5, 4, 3, 2]),
+            None,
+            Some(vec![9, 8, 7]),
+        ];
+        assert_eq!(expected, reversed);
+        Ok(())
+    }
+
+    #[test]
+    fn test_reverse_list_view_out_of_order() -> Result<()> {
+        let field = Arc::new(Field::new("a", DataType::Int32, false));
+        let offsets = ScalarBuffer::from(vec![6, 1, 6, 0]); // out of order
+        let sizes = ScalarBuffer::from(vec![3, 5, 0, 1]);
+        let values = Arc::new(Int32Array::from(vec![
+            1, // fourth array: offset 0, size 1
+            2, 3, 4, 5, 6, // second array: offset 1, size 5
+            // third array: offset 6, size 0 (and null)
+            7, 8, 9, // first array: offset 6, size 3
+        ]));
+        let nulls = Some(NullBuffer::from(vec![true, true, false, true]));
+        let list_view = ListViewArray::new(field, offsets, sizes, values, nulls);
+        let result = list_view_reverse(
+            &list_view,
+            &Arc::new(Field::new("test", DataType::Int32, true)),
+        )?;
+        let reversed = list_view_values(result.as_list_view::<i32>());
+        let expected = vec![
+            Some(vec![9, 8, 7]),
+            Some(vec![6, 5, 4, 3, 2]),
+            None,
+            Some(vec![1]),
+        ];
+        assert_eq!(expected, reversed);
+        Ok(())
+    }
+
+    #[test]
+    fn test_reverse_list_view_with_nulls() -> Result<()> {
+        let field = Arc::new(Field::new("a", DataType::Int32, false));
+        let offsets = ScalarBuffer::from(vec![16, 1, 6, 0]); // out of order
+        let sizes = ScalarBuffer::from(vec![3, 5, 10, 1]);
+        let values = Arc::new(Int32Array::from(vec![
+            1, // fourth array: offset 0, size 1
+            2, 3, 4, 5, 6, // second array: offset 1, size 5
+            0, 1, 2, 3, 4, 5, 6, 7, 8, 9, // third array: offset 6, size 10
+            7, 8, 9, // first array: offset 16, size 3
+        ]));
+        let nulls = Some(NullBuffer::from(vec![true, true, false, true]));
+        let list_view = ListViewArray::new(field, offsets, sizes, values, nulls);
+        let result = list_view_reverse(
+            &list_view,
+            &Arc::new(Field::new("test", DataType::Int32, true)),
+        )?;
+        let reversed = list_view_values(result.as_list_view::<i32>());
+        let expected = vec![
+            Some(vec![9, 8, 7]),
+            Some(vec![6, 5, 4, 3, 2]),
+            None,
+            Some(vec![1]),
+        ];
+        assert_eq!(expected, reversed);
+        Ok(())
+    }
+
+    #[test]
+    fn test_reverse_list_view_empty() -> Result<()> {
+        let field = Arc::new(Field::new("a", DataType::Int32, false));
+        let offsets = ScalarBuffer::from(vec![]);
+        let sizes = ScalarBuffer::from(vec![]);
+        let empty_array: Vec<i32> = vec![];
+        let values = Arc::new(Int32Array::from(empty_array));
+        let nulls = None;
+        let list_view = ListViewArray::new(field, offsets, sizes, values, nulls);
+        let result = list_view_reverse(
+            &list_view,
+            &Arc::new(Field::new("test", DataType::Int32, true)),
+        )?;
+        let reversed = list_view_values(result.as_list_view::<i32>());
+        let expected: Vec<Option<Vec<i32>>> = vec![];
+        assert_eq!(expected, reversed);
+        Ok(())
+    }
+
+    #[test]
+    fn test_reverse_list_view_all_nulls() -> Result<()> {
+        let field = Arc::new(Field::new("a", DataType::Int32, false));
+        let offsets = ScalarBuffer::from(vec![0, 1, 2, 3]);
+        let sizes = ScalarBuffer::from(vec![0, 1, 1, 1]);
+        let values = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+        let nulls = Some(NullBuffer::from(vec![false, false, false, false]));
+        let list_view = ListViewArray::new(field, offsets, sizes, values, nulls);
+        let result = list_view_reverse(
+            &list_view,
+            &Arc::new(Field::new("test", DataType::Int32, true)),
+        )?;
+        let reversed = list_view_values(result.as_list_view::<i32>());
+        let expected: Vec<Option<Vec<i32>>> = vec![None, None, None, None];
+        assert_eq!(expected, reversed);
+        Ok(())
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt
index 38bdd7f3e3eb..00629c392df4 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -8384,6 +8384,13 @@ select array_contains(a, b) from array_has order by 1 nulls last;
 true
 NULL
 
+# TODO: Enable once arrow_cast supports ListView types.
+# Expected output (once supported):
+# ----
+# [5, 4, 3, 2, 1]
+query error
+select array_reverse(arrow_cast(make_array(1, 2, 3, 4, 5), 'ListView(Int64)'));
+
 
 ### Delete tables
 
 statement ok

From e91b14958361c8693cb7bb63efa7073903f2f7c9 Mon Sep 17 00:00:00 2001
From: jizezhang
Date: Tue, 4 Nov 2025 22:34:48 -0800
Subject: [PATCH 7/7] fold return_type logic into return_field_from_args

---
 .../spark/src/function/array/spark_array.rs   | 33 ++++++++++++++-----
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/datafusion/spark/src/function/array/spark_array.rs b/datafusion/spark/src/function/array/spark_array.rs
index 0b71bb963082..bb9665613de9 100644
--- a/datafusion/spark/src/function/array/spark_array.rs
+++ b/datafusion/spark/src/function/array/spark_array.rs
@@ -22,13 +22,13 @@ use arrow::array::{
     MutableArrayData, NullArray, OffsetSizeTrait,
 };
 use arrow::buffer::OffsetBuffer;
-use arrow::datatypes::{DataType, Field};
+use arrow::datatypes::{DataType, Field, FieldRef};
 use datafusion_common::utils::SingleRowListArrayBuilder;
-use datafusion_common::{plan_datafusion_err, plan_err, Result};
+use datafusion_common::{internal_err, plan_datafusion_err, plan_err, Result};
 use datafusion_expr::type_coercion::binary::comparison_coercion;
 use datafusion_expr::{
-    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
-    Volatility,
+    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    TypeSignature, Volatility,
 };
 
 use crate::function::functions_nested_utils::make_scalar_function;
@@ -72,9 +72,20 @@ impl ScalarUDFImpl for SparkArray {
         &self.signature
     }
 
-    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!("return_field_from_args should be used instead")
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+        let data_types = args
+            .arg_fields
+            .iter()
+            .map(|f| f.data_type())
+            .cloned()
+            .collect::<Vec<_>>();
+
         let mut expr_type = DataType::Null;
-        for arg_type in arg_types {
+        for arg_type in &data_types {
             if !arg_type.equals_datatype(&DataType::Null) {
                 expr_type = arg_type.clone();
                 break;
@@ -85,11 +96,17 @@ impl ScalarUDFImpl for SparkArray {
             expr_type = DataType::Int32;
         }
 
-        Ok(DataType::List(Arc::new(Field::new(
+        let return_type = DataType::List(Arc::new(Field::new(
             ARRAY_FIELD_DEFAULT_NAME,
             expr_type,
             true,
-        ))))
+        )));
+
+        Ok(Arc::new(Field::new(
+            "this_field_name_is_irrelevant",
+            return_type,
+            false,
+        )))
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {