feat(udf)!: switch ScalarFunction.evaluate to ColumnarValue API (closes #62)#64
Conversation
Replace evaluate(BufferAllocator, List<FieldVector>) -> FieldVector with evaluate(BufferAllocator, ScalarFunctionArgs) -> ColumnarValue. Args now carry per-arg Array-or-Scalar information plus the batch row count, and the return distinguishes a length-N Array from a broadcast Scalar. JniBridge.invokeScalarUdf now ships args as two struct arrays (length-N for Array args, length-1 for Scalar args) plus a positional byte[] argKinds, and returns a byte indicating the result variant. Existing test UDFs (AddOne, Concat, Square, ReturnsNull, WrongRowCount, WrongType, ThrowsIAE) and AddOneExample are updated to the new signature. Native side will be updated in the next commit. Refs apache#62.
JavaScalarUdf::invoke_with_args now partitions ScalarFunctionArgs::args by ColumnarValue variant: array args travel through a length-N struct, scalar args through a separate length-1 struct (one element each), and a positional byte slice tells the Java side how to interleave them back. The JNI signature of invokeScalarUdf becomes (Lorg/apache/datafusion/ScalarFunction;JJJJ[BJJI)B; the returned byte indicates Array (0) or Scalar (1) so the native side reconstructs the right ColumnarValue variant via ScalarValue::try_from_array. Drops the prior scalar-materialisation step, which was the workaround that PR apache#57 attempted to patch by passing rowCount; nullary UDFs that broadcast a value now return ColumnarValue.Scalar instead. Closes apache#62.
|
@pgwhalen fyi |
pgwhalen
left a comment
There was a problem hiding this comment.
Makes sense to me! My bindings apparently solved/noticed this problem by passing the row count like the other PR. Unfortunately I missed that that was actually important in your PR and assumed that you just had a cleaner version.
All the more reason to emulate the rust API closely.
Thanks for the review! After chatting with @mbutrovich about making a similar change in Comet, he pointed out that we may still need row count in some cases, like for |
Which issue does this PR close?
Closes #62.
Rationale for this change
DataFusion's Rust
ScalarUDFImpl::invoke_with_argsspeaksColumnarValue(ArrayorScalar) rather than raw Arrow arrays. The Java binding previously materialised every scalar arg to a length-N array before crossing the JNI boundary, which lost the scalar-vs-array distinction and forced nullary UDFs to learn the batch row count by some out-of-band channel (the workaround proposed in PR #57).Aligning the Java API with the Rust enum eliminates the workaround: a nullary UDF can return
ColumnarValue.scalar(...)and the framework broadcasts it, and a UDF that takes literals sees them as Scalars without per-row duplication.What changes are included in this PR?
ColumnarValuesealed interface (Array/Scalarrecords, factory enforcing length-1 invariant on scalars).ScalarFunctionArgsrecord bundlingList<ColumnarValue>androwCount.ScalarFunction.evaluateis nowevaluate(BufferAllocator, ScalarFunctionArgs) -> ColumnarValue(source-breaking).JniBridge.invokeScalarUdfrewritten to ship two struct arrays (length-N Array args + length-1 Scalar args) plus abyte[] argKindspositional mask, returning abyteindicating the result variant. JNI signature is now(Lorg/apache/datafusion/ScalarFunction;JJJJ[BJJI)B.invoke_with_argsno longer materialises scalars; it partitions args byColumnarValuevariant and reconstructs the result from the returned kind byte viaScalarValue::try_from_array.AddOneExampleanddocs/source/user-guide/scalar-udf.mdupdated; new "Returning a Scalar" section added to the user guide.How are these changes tested?
make test— 135 tests pass (12 pre-existing skips). ExistingScalarUdfTestcases (AddOne,Concat,Square, error paths, volatility round-trip) adapted to the new signature, plus three new tests:nullaryScalarReturnUdf_overMultiRowQuery_broadcasts— a nullaryjava_pireturnsColumnarValue.scalar(...)and the framework expands it across rows, replacing the rowCount workaround.scalarLiteralArg_arrivesAsScalarColumnarValue— UDF asserts that a SQL literal arrives asColumnarValue.Scalar(length 1), proving scalar-ness survives the FFI.udfReturningScalar_isBroadcastByFramework— explicit scalar-return path test.Also covered by
cargo clippy --all-targets --workspace -- -D warnings(clean) and./mvnw spotless:check(clean).Are there any user-facing changes?
Yes — source-breaking signature change to
ScalarFunction.evaluate. Implementations must:Before:
After:
Nullary or broadcast-style UDFs can return
ColumnarValue.scalar(...)over a length-1 vector.