feat: add user-facing CometUDF registration for custom JVM UDFs #4233
Draft
andygrove wants to merge 6 commits into apache:main from
Conversation
Add a framework that allows Comet to invoke JVM-side UDF implementations operating on Arrow data via JNI, avoiding expensive fallback to Spark while maintaining 100% Spark compatibility for expressions not yet implemented natively in Rust. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add CometUdfRegistry that allows end users to register their own CometUDF implementations to be accelerated by Comet's native execution. When a ScalaUDF is encountered during planning whose name matches a registry entry, Comet emits a JvmScalarUdf proto instead of falling back to Spark's row-at-a-time execution. Also adds user guide documentation explaining how to write, register, and deploy custom JVM UDFs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Adds end-to-end tests verifying: - Basic CometUDF execution (integer doubling via Arrow vectors) - Unregistered UDFs correctly fall back to Spark - Multiple UDF invocations in a single query - UDF combined with WHERE filter - CometUdfRegistry API (register, lookup, remove) Also fixes KnownNotNull unwrapping in CometScalaUdf — Spark wraps UDF arguments in KnownNotNull when the UDF is non-nullable, which needs to be stripped before serializing the underlying expression. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
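The "integer doubling via Arrow vectors" test above exercises the batch-at-a-time contract that distinguishes a CometUDF from a row-based Spark UDF. As a hedged sketch (the real interface operates on shaded Arrow `ValueVector`s via JNI, which are not modeled here; trait and object names are illustrative, not Comet's actual API), the columnar shape looks like this:

```scala
// Illustrative model only: plain arrays stand in for Arrow vectors so the
// batch-at-a-time evaluation contract is visible without Arrow dependencies.
trait ColumnarUdf {
  // One call per batch of values, not one call per row.
  def eval(input: Array[Int]): Array[Int]
}

object DoubleIntUdf extends ColumnarUdf {
  override def eval(input: Array[Int]): Array[Int] = input.map(_ * 2)
}

object Demo {
  def main(args: Array[String]): Unit = {
    val batch = Array(1, 2, 3)
    println(DoubleIntUdf.eval(batch).mkString(",")) // 2,4,6
  }
}
```

The point of the batch contract is amortization: per-invocation overhead (JNI crossing, dispatch) is paid once per batch rather than once per row.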
comphead reviewed on May 5, 2026
```scala
}

test("user CometUDF - basic integer doubling") {
  CometUdfRegistry.register(
```
Contributor
Non-critical, just from dev experience: we could probably have a single facade method that registers the function in all registries.
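The reviewer's suggestion can be sketched as a single entry point that writes to every registry at once. This is a hypothetical shape, not Comet's API: `SparkRegistry` and `CometRegistry` below are plain-map stand-ins for `spark.udf.register(...)` and `CometUdfRegistry.register(...)`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-ins for the two real registries a user currently updates separately.
object SparkRegistry { val udfs = new ConcurrentHashMap[String, String]() }
object CometRegistry { val udfs = new ConcurrentHashMap[String, String]() }

object UdfFacade {
  // One call registers the UDF for both the row-based Spark fallback path
  // and the columnar Comet path, so the two can never drift apart.
  def registerEverywhere(name: String, implClass: String): Unit = {
    SparkRegistry.udfs.put(name, implClass)
    CometRegistry.udfs.put(name, implClass)
  }
}
```

A facade like this avoids the failure mode where a UDF is registered with Spark but not Comet (silently losing acceleration) or vice versa (failing at lookup time).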
comphead reviewed on May 5, 2026
```scala
sql("CREATE TABLE t (x INT) USING parquet")
sql("INSERT INTO t VALUES (1), (2), (3)")
// Should still produce correct results via Spark fallback
checkSparkAnswer(sql("SELECT triple_int(x) FROM t"))
```
Contributor
Should we also check the fallback message?
Move the DoubleIntUdf test fixture from spark/src/test/ to common/src/main/ so that its bytecode references to org.apache.arrow are relocated by common's shade plugin to org.apache.comet.shaded.arrow, matching the shaded CometUDF interface that user code sees at runtime. A test-scope class in spark/ was compiled against common/target/classes (unshaded) due to Maven workspace resolution and failed at runtime with AbstractMethodError when dispatched through the shaded interface. Update the user-guide page to import Arrow from org.apache.comet.shaded.arrow, which is the package real users compile against in the published comet-spark JAR.
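The relocation described above is what the common module's shade plugin already performs; as a hedged illustration (the actual configuration lives in `common/pom.xml` and may carry additional patterns and filters), the relevant fragment has this shape:

```xml
<!-- Sketch of the relocation: bytecode references to org.apache.arrow are
     rewritten to the shaded package that user code compiles against in the
     published comet-spark JAR. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>org.apache.arrow</pattern>
        <shadedPattern>org.apache.comet.shaded.arrow</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

This is why the fixture had to move into `common/src/main/`: only classes that pass through this plugin get their Arrow references rewritten, and a class compiled against the unshaded interface throws `AbstractMethodError` when dispatched through the shaded one.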
The PR's new ScalaUDF dispatch in QueryPlanSerde changes the fallback message emitted for an anonymous (no-name) UDF from the generic "scalaudf is not supported" to "ScalaUDF has no name, cannot look up CometUDF registration". Update the test's expected fallback reasons accordingly.
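The updated expectation can be captured as a predicate on the fallback reason string. This is a sketch only; Comet's test suite has its own assertion helpers, and the constant below simply restates the message quoted above.

```scala
// Sketch: the fallback reason an anonymous (no-name) ScalaUDF now produces,
// replacing the old generic "scalaudf is not supported" message.
object FallbackReasonCheck {
  val newReason = "ScalaUDF has no name, cannot look up CometUDF registration"

  def isExpectedFallback(reason: String): Boolean =
    reason.contains(newReason)
}
```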
Which issue does this PR close?
Part of #4193
Builds on #4232 (JVM UDF framework)
Rationale for this change
This PR enables end users to provide their own `CometUDF` implementations that operate on Arrow columnar data, registered alongside standard Spark UDFs. When Comet encounters a matching UDF during planning, it routes to the vectorized Arrow implementation instead of falling back to Spark's row-at-a-time execution.

What changes are included in this PR?
- `CometUdfRegistry` — a thread-safe registry mapping Spark UDF names to `CometUDF` implementation class names plus metadata. Includes a convenience method that registers with both Spark and Comet in one call.
- `CometScalaUdf` serde handler — intercepts `ScalaUDF` expressions during query planning; if the UDF name is registered in `CometUdfRegistry`, emits a `JvmScalarUdf` proto for native execution.
- User guide documentation (`custom-jvm-udfs.md`) — documents how to write, register, and deploy custom JVM UDFs.

User-facing API:
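As a hedged sketch of the registry surface the tests exercise (register, lookup, remove) — class and method names below model the shape described in this PR, not the exact signatures, and the real registry also carries type metadata and the Spark-side convenience registration:

```scala
import java.util.concurrent.ConcurrentHashMap

// Minimal model of a thread-safe name -> implementation-class registry.
case class CometUdfEntry(implClassName: String)

object CometUdfRegistrySketch {
  private val entries = new ConcurrentHashMap[String, CometUdfEntry]()

  def register(name: String, implClassName: String): Unit =
    entries.put(name, CometUdfEntry(implClassName))

  // Planning-time lookup: a hit means emit JvmScalarUdf, a miss means
  // fall back to Spark's row-at-a-time ScalaUDF execution.
  def lookup(name: String): Option[CometUdfEntry] =
    Option(entries.get(name))

  def remove(name: String): Unit =
    entries.remove(name)
}
```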
How are these changes tested?
- `mvn compile` passes for common + spark modules

Test plan:
- `CometUdfRegistry.register` + `ScalaUDF` interception emits `JvmScalarUdf` proto

🤖 Generated with Claude Code