[SPARK-48008][1/2] Support UDAFs in Spark Connect #46245
Conversation
Files with review threads (resolved):

- sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
- connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
- ...connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala (outdated)
- connector/connect/common/src/main/protobuf/spark/connect/expressions.proto (outdated)
```scala
@@ -346,4 +347,42 @@ class UserDefinedFunctionE2ETestSuite extends QueryTest {
    val result = df.select(f($"id")).as[Long].head()
    assert(result == 1L)
  }

  test("UDAF custom Aggregator - primitive types") {
```
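The hunk above adds an end-to-end test for an `Aggregator` over primitive types. The logic such a test exercises can be sketched without a Spark cluster; the trait below is a hypothetical stand-in for Spark's `Aggregator` contract (`zero` / `reduce` / `merge` / `finish`), and `runLocally` mimics Spark's per-partition reduce followed by a merge of partial buffers:

```scala
// Hypothetical stand-in for Spark's Aggregator contract, with no
// Spark dependency, so the aggregation logic can run anywhere.
abstract class SimpleAggregator[IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// A primitive-type aggregator of the kind the new test exercises.
object LongSum extends SimpleAggregator[Long, Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: Long): Long = b + a
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(reduction: Long): Long = reduction
}

// Simulate how Spark evaluates the aggregator: reduce within each
// partition, merge the partial buffers, then finish.
def runLocally(partitions: Seq[Seq[Long]]): Long = {
  val partials = partitions.map(_.foldLeft(LongSum.zero)(LongSum.reduce))
  LongSum.finish(partials.foldLeft(LongSum.zero)(LongSum.merge))
}
```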
Can you add a test for a UDAF with a custom toColumn implementation?
Let's do this in the next PR where we add support for toColumn.
# Conflicts:
#   connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala

# Conflicts:
#   python/pyspark/sql/connect/proto/commands_pb2.py
LGTM
@HyukjinKwon I think the current protobuf style checks are a bit too strict. The changes made by @xupefei are wire-compatible. Can we make this a bit more lenient?

@xupefei can you fix the style, and perhaps revert the renaming of the messages?

Done! I've reverted the Protobuf change but kept the naming changes in the Scala code.

Just saw this. Tests seem to be passing fine (?).

This is the Proto test that is failing: https://github.com/xupefei/spark/actions/runs/9255404729/job/25459140837, for commit 82b802d. I reverted the Proto naming change.

cc @grundprinzip ^^^

Merging!
### What changes were proposed in this pull request?

This PR changes Spark Connect to support defining and registering `Aggregator[IN, BUF, OUT]` UDAFs. The mechanism is similar to the one used for Scalar UDFs: on the client side, we serialize the `Aggregator` instance and send it to the server, where the data is deserialized into an `Aggregator` instance recognized by Spark Core.

With this PR we now have two `Aggregator` interfaces defined, one in the Connect API and one in Core. They declare exactly the same abstract methods and share the same `SerialVersionUID`, so the Java serialization engine can map one to the other. It is very important to keep these two definitions in sync.

The second part of this effort will add the `Aggregator.toColumn` API (currently not implemented due to dependencies on Spark Core).

### Why are the changes needed?

Spark Connect does not have UDAF support. We need to fix that.

### Does this PR introduce _any_ user-facing change?

Yes, Connect users can now define an `Aggregator` and register it:

```scala
val agg = new Aggregator[Int, Int, Int] { ... }
spark.udf.register("agg", udaf(agg))
val ds: Dataset[Data] = ...
val aggregated = ds.selectExpr("agg(i)")
```

### How was this patch tested?

Added new tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46245 from xupefei/connect-udaf.

Authored-by: Paddy Xu <xupaddy@gmail.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
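The description above hinges on the two `Aggregator` definitions sharing the same `SerialVersionUID` so that a client-serialized instance deserializes cleanly on the server. The round trip can be sketched with a hypothetical mirror of the contract; the UID below is an illustrative value, not Spark's real one, and `ClientAggregator` is a stand-in name, not the Connect API:

```scala
import java.io._

// Hypothetical mirror of the paired Aggregator definitions: the
// Connect-side copy and the Core-side copy must declare the same
// abstract members and the same serialVersionUID, otherwise Java
// deserialization on the server rejects the client's payload.
@SerialVersionUID(1234567890123456789L) // illustrative value only
abstract class ClientAggregator[IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// A concrete aggregator a client might define and register.
class SumAgg extends ClientAggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = b + a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
}

// Round-trip through Java serialization, as the client does when
// shipping the aggregator instance to the server.
def roundTrip[T <: Serializable](value: T): T = {
  val buf = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buf)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
  in.readObject().asInstanceOf[T]
}
```

If the two sides declared different UIDs, `readObject` would throw an `InvalidClassException`, which is why the description stresses keeping the definitions in sync.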
### What changes were proposed in this pull request?

This PR follows #46245 to add support for the `udaf.toColumn` API in Spark Connect. Here we introduce a new Protobuf message, `proto.TypedAggregateExpression`, that includes a serialized UDF packet. On the server, we unpack it into an `Aggregator` object and generate a real `TypedAggregateExpression` instance with the encoder information passed along with the UDF.

### Why are the changes needed?

Because the `toColumn` API is not supported in the previous PR.

### Does this PR introduce _any_ user-facing change?

Yes, from now on users can create a typed UDAF using the `udaf.toColumn` API.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46849 from xupefei/connect-udaf-tocolumn.

Authored-by: Paddy Xu <xupaddy@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
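Semantically, selecting a typed aggregate column reduces the whole Dataset to a single value. The sketch below is a hypothetical local model of that evaluation, not the Spark Connect API: `selectToColumn` stands in for what `ds.select(agg.toColumn)` computes once the server has rebuilt the `Aggregator` from the serialized packet:

```scala
// Hypothetical stand-in for the Aggregator contract; names are
// illustrative and do not come from the Spark Connect API.
abstract class MiniAggregator[IN, BUF, OUT] extends Serializable {
  def zero: BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// A typed average: buffer is (running sum, count).
object AvgDouble extends MiniAggregator[Double, (Double, Long), Double] {
  def zero: (Double, Long) = (0.0, 0L)
  def reduce(b: (Double, Long), a: Double): (Double, Long) =
    (b._1 + a, b._2 + 1)
  def merge(x: (Double, Long), y: (Double, Long)): (Double, Long) =
    (x._1 + y._1, x._2 + y._2)
  def finish(r: (Double, Long)): Double = r._1 / r._2
}

// Local model of ds.select(agg.toColumn): fold the whole input
// through reduce, then finish the buffer into the output value.
def selectToColumn[IN, BUF, OUT](
    data: Seq[IN],
    agg: MiniAggregator[IN, BUF, OUT]): OUT =
  agg.finish(data.foldLeft(agg.zero)(agg.reduce))
```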