bug(udf): nullary scalar UDFs cannot determine batch row count #56

@LantaoJin

Describe the bug

ScalarFunction.evaluate(BufferAllocator allocator, List<FieldVector> args) (core/src/main/java/org/apache/datafusion/ScalarFunction.java) is the contract every Java-implemented scalar UDF must satisfy. An implementation must return a FieldVector whose getValueCount() matches the row count of the batch DataFusion is driving through the operator tree.

For UDFs with at least one argument, the body can read args.get(0).getValueCount() to learn how many rows to produce. For nullary UDFs -- zero arguments, e.g. analogs of random(), pi(), now() -- args is the empty list. There is no other channel to learn the expected row count, so the body has no way to size its output.

The native side already knows the value: ScalarFunctionArgs::number_rows is read at native/src/udf.rs:100 and used to materialise scalar arg columns at native/src/udf.rs:106. The Java bridge (core/src/main/java/org/apache/datafusion/internal/JniBridge.java:54-92) receives it but uses it only after the fact, to validate the returned vector's length (:74-84). It is never passed to impl.evaluate(...) (:69), and the user-facing ScalarFunction interface has no parameter for it.

The result: any nullary UDF that DataFusion does not constant-fold (i.e. anything declared Volatility.VOLATILE, or any STABLE call inside an aggregation that crosses partitioning boundaries) trips the post-hoc row-count validation as soon as it runs over a batch with more than one row.
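The call-then-validate flow described above can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: ModelScalarFunction, ModelBridge and BridgeDemo are hypothetical stand-ins, columns are plain double[] arrays instead of Arrow FieldVectors, and none of it is the actual JniBridge code. It shows why a unary function can size its output while a nullary one can only guess.

```java
import java.util.List;

/** Hypothetical stand-in for ScalarFunction: columns are double[] rather than FieldVector. */
interface ModelScalarFunction {
  double[] evaluate(List<double[]> args);
}

/** Hypothetical model of the bridge: evaluate first, check the length afterwards. */
final class ModelBridge {
  static double[] invoke(ModelScalarFunction impl, List<double[]> args, int numberRows) {
    // numberRows is known here but is not handed to the implementation.
    double[] out = impl.evaluate(args);
    if (out.length != numberRows) {
      throw new IllegalStateException(
          "ScalarFunction.evaluate returned vector with " + out.length
              + " rows; expected " + numberRows);
    }
    return out;
  }
}

final class BridgeDemo {
  /** Unary function: learns the row count from its first argument column. */
  static final ModelScalarFunction NEGATE = args -> {
    double[] in = args.get(0);
    double[] out = new double[in.length];
    for (int i = 0; i < in.length; i++) {
      out[i] = -in[i];
    }
    return out;
  };

  /** Nullary function: args is empty, so it can only guess a single row. */
  static final ModelScalarFunction PI = args -> new double[] {Math.PI};
}
```

In this model, BridgeDemo.NEGATE passes validation for any batch size, while BridgeDemo.PI passes only when numberRows happens to be 1 and throws for every other batch size.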

To Reproduce

A Volatility.VOLATILE nullary UDF (which the optimizer cannot constant-fold), invoked over a 3-row table:

static final class VolatileNullary implements ScalarFunction {
  public String name() { return "java_volatile_pi"; }
  public List<ArrowType> argTypes() { return List.of(); }
  public ArrowType returnType() {
    return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
  }
  public Volatility volatility() { return Volatility.VOLATILE; }

  @Override
  public FieldVector evaluate(BufferAllocator allocator, List<FieldVector> args) {
    // No way to know the batch row count: args is empty.
    Float8Vector out = new Float8Vector("pi_out", allocator);
    out.allocateNew(1);
    out.set(0, Math.PI);
    out.setValueCount(1);
    return out;
  }
}

// SELECT java_volatile_pi() FROM (3-row VALUES) → fails:
// Java UDF 'java_volatile_pi' threw java.lang.IllegalStateException:
// ScalarFunction.evaluate returned vector with 1 rows; expected 3

For UDFs marked Volatility.IMMUTABLE, the optimizer constant-folds the call into a single broadcast value, so the bug is masked at the SQL layer. The underlying API gap is the same, though, and it surfaces as soon as the UDF is non-deterministic or the optimizer can't fold the call (e.g. a STABLE now() analog in a streaming plan).

Expected behavior

A nullary UDF can produce N rows, where N is the batch row count DataFusion has chosen. This is the normal shape DataFusion expects from any scalar function -- upstream's Rust API exposes ScalarFunctionArgs::number_rows to every implementation for exactly this reason.
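One possible shape for closing the gap is to hand the row count to the implementation directly. The sketch below is an assumption for illustration only: this issue does not prescribe a fix, RowCountAwareFunction and RowCountSketch are hypothetical names, and plain double[] arrays again stand in for Arrow vectors.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical variant of the UDF contract that receives the batch row count. */
interface RowCountAwareFunction {
  double[] evaluate(List<double[]> args, int rowCount);
}

final class RowCountSketch {
  /** Nullary pi() analog: with rowCount available it can emit exactly N rows. */
  static final RowCountAwareFunction PI = (args, rowCount) -> {
    double[] out = new double[rowCount];
    Arrays.fill(out, Math.PI);
    return out;
  };
}
```

With the count passed in, RowCountSketch.PI.evaluate(List.of(), 3) yields three rows, and an empty batch yields an empty result, so a length check after the call would always pass.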
