Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions datafusion/core/tests/sql/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,64 @@ async fn scalar_udf() -> Result<()> {
Ok(())
}

/// Tests that a scalar UDF declared with zero parameters can be registered
/// and called from SQL.
///
/// Three query shapes are exercised:
/// 1. a scan over a four-row table (one output value per input row),
/// 2. a `SELECT` without a `FROM` clause (a single output row),
/// 3. a scan whose filter matches no rows (an empty result).
#[tokio::test]
async fn scalar_udf_zero_params() -> Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);

    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from_slice([1, 10, 10, 100]))],
    )?;
    let ctx = SessionContext::new();

    ctx.register_batch("t", batch)?;

    // The function just returns 100 regardless of input.
    //
    // Although the UDF is registered with an empty signature, the closure
    // reads `args[0]`: it relies on the engine supplying one placeholder
    // array for zero-parameter functions, whose length conveys the number of
    // rows to produce.
    let myfunc = |args: &[ArrayRef]| {
        let num_rows = args[0].len();
        Ok(Arc::new((0..num_rows).map(|_| 100).collect::<Int32Array>()) as ArrayRef)
    };
    let myfunc = make_scalar_function(myfunc);

    ctx.register_udf(create_udf(
        "get_100",
        vec![],
        Arc::new(DataType::Int32),
        Volatility::Immutable,
        myfunc,
    ));

    // One output row per row of `t`.
    let result = plan_and_collect(&ctx, "select get_100() a from t").await?;
    let expected = vec![
        "+-----+", //
        "| a   |", //
        "+-----+", //
        "| 100 |", //
        "| 100 |", //
        "| 100 |", //
        "| 100 |", //
        "+-----+", //
    ];
    assert_batches_eq!(expected, &result);

    // No table at all: a single row is expected.
    let result = plan_and_collect(&ctx, "select get_100() a").await?;
    let expected = vec![
        "+-----+", //
        "| a   |", //
        "+-----+", //
        "| 100 |", //
        "+-----+", //
    ];
    assert_batches_eq!(expected, &result);

    // The filter matches no rows, so the result is expected to be empty.
    let result = plan_and_collect(&ctx, "select get_100() from t where a=999").await?;
    let expected = vec![
        "++", //
        "++",
    ];
    assert_batches_eq!(expected, &result);
    Ok(())
}

/// tests the creation, registration and usage of a UDAF
#[tokio::test]
async fn simple_udaf() -> Result<()> {
Expand Down
5 changes: 4 additions & 1 deletion datafusion/physical-expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,10 @@ pub fn create_physical_expr(
execution_props,
)?);
}

// udfs with zero params expect null array as input
if args.is_empty() {
physical_args.push(Arc::new(Literal::new(ScalarValue::Null)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though a convenient implementation, I'm wary that generating a fake argument by which to transfer row count will cause problems later.

I'd prefer to see a solution where the row count is passed out-of-band, for example in a change to https://github.com/apache/arrow-datafusion/blob/cef119da9ee8672b1b1e50ac01387dcb1640d96e/datafusion/expr/src/function.rs#L39 that would add an extra argument (i.e. len: usize) for this purpose.

If that were present, we could populate it up in the call stack where we know the RecordBatch size, probably here: https://github.com/apache/arrow-datafusion/blob/cef119da9ee8672b1b1e50ac01387dcb1640d96e/datafusion/physical-expr/src/scalar_function.rs#L147

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Thanks for the suggestions!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it was not the best solution. I did it this way because the existing docs state:

...with the exception of zero param function, where a singular element vec
will be passed. In that case the single element is a null array to indicate
the batch's row count (so that the generative zero-argument function can know
the result array size).

Is it safe to assume I should update this part of the docs according to my implementation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry I missed that. I didn't realize this was part of the existing design.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries, I should've put that in the PR itself. Just edited it.

}
udf::create_physical_expr(fun.clone().as_ref(), &physical_args, input_schema)
}
Expr::Between(Between {
Expand Down