[SPARK-49308][CONNECT] Support UserDefinedAggregateFunction in Spark Connect Scala Client #49785
Conversation
Merging to master/4.0
### What changes were proposed in this pull request?
This PR adds support for `UserDefinedAggregateFunction` in the Spark Connect Scala Client. While this is a deprecated feature, we still believe it is useful to support it to ensure we reduce incompatibilities between classic and connect. Implementation-wise I opted to convert the `UserDefinedAggregateFunction` to an `Aggregator`, and use that code path for execution. This is probably not as fast as the original implementation (more allocations).

### Why are the changes needed?
This reduces friction between the classic and connect implementations.

### Does this PR introduce _any_ user-facing change?
Yes. It enables Spark Connect Scala Client users to use `UserDefinedAggregateFunction`s.

### How was this patch tested?
Added tests to

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #49785 from hvanhovell/SPARK-49308.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 4953a9c)
Signed-off-by: Herman van Hovell <herman@databricks.com>
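To illustrate the approach the description outlines, here is a minimal, self-contained sketch of wrapping a mutable-buffer UDAF contract behind an `Aggregator`-style `zero`/`reduce`/`merge`/`finish` interface. All names here (`SimpleUdaf`, `UdafAsAggregator`, `LongSum`) are hypothetical stand-ins, not Spark's real classes, which carry `Row` and schema machinery.

```scala
// Hypothetical, simplified stand-in for Spark's UserDefinedAggregateFunction
// contract: a mutable buffer plus initialize/update/merge/evaluate.
trait SimpleUdaf {
  def bufferSize: Int
  def initialize(buffer: Array[Any]): Unit
  def update(buffer: Array[Any], input: Any): Unit
  def merge(b1: Array[Any], b2: Array[Any]): Unit
  def evaluate(buffer: Array[Any]): Any
}

// Aggregator-style adaptor: zero/reduce/merge/finish delegate to the UDAF.
// The extra Array allocations mirror the "probably not as fast" caveat above.
final class UdafAsAggregator(udaf: SimpleUdaf) {
  def zero: Array[Any] = {
    val b = new Array[Any](udaf.bufferSize)
    udaf.initialize(b)
    b
  }
  def reduce(b: Array[Any], input: Any): Array[Any] = { udaf.update(b, input); b }
  def merge(b1: Array[Any], b2: Array[Any]): Array[Any] = { udaf.merge(b1, b2); b1 }
  def finish(b: Array[Any]): Any = udaf.evaluate(b)
}

// Example UDAF: sum of Long inputs.
object LongSum extends SimpleUdaf {
  val bufferSize = 1
  def initialize(buffer: Array[Any]): Unit = buffer(0) = 0L
  def update(buffer: Array[Any], input: Any): Unit =
    buffer(0) = buffer(0).asInstanceOf[Long] + input.asInstanceOf[Long]
  def merge(b1: Array[Any], b2: Array[Any]): Unit =
    b1(0) = b1(0).asInstanceOf[Long] + b2(0).asInstanceOf[Long]
  def evaluate(buffer: Array[Any]): Any = buffer(0)
}
```

A partial-aggregation round trip (two partitions reduced separately, then merged) exercises all four methods of the adaptor.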
The diff hunk under review:

```scala
        NewInstance(cls, arguments, Nil, propagateNull = false, dt, outerPointerGetter))

    case AgnosticEncoders.RowEncoder(fields) =>
      val isExternalRow = !path.dataType.isInstanceOf[StructType]
```
Is it really safe to call `dataType` here? The `path` expression might not be resolved, and then this will throw an exception.
It should be. If you don't know the `dataType` at this point, then you can't build a deserializer.
The problem comes up if you have a `RowEncoder` being used inside a `ProductEncoder`. Then the `path` in the recursion will come from
`spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala`, line 401 at 9a99ecb:

```scala
createDeserializer(field.enc, getter, newTypePath),
```
and then
`spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala`, line 398 at 9a99ecb:

```scala
addToPath(path, field.name, field.enc.dataType, newTypePath)
```

`spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala`, line 37 at 9a99ecb:

```scala
val newPath = UnresolvedExtractValue(path, expressions.Literal(part))
```
creates an `UnresolvedExtractValue`, and the `.dataType` call will throw:

```
org.apache.spark.sql.catalyst.analysis.UnresolvedException: [INTERNAL_ERROR] Invalid call to dataType on unresolved object SQLSTATE: XX000
	at org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue.dataType(unresolved.scala:939)
	at org.apache.spark.sql.catalyst.DeserializerBuildHelper$.createDeserializer(DeserializerBuildHelper.scala:411)
```
Is there some assumption somewhere that the encoders should not be fully composable and `RowEncoder` can only be used in certain cases?
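The failure mode described above can be reproduced in miniature without Spark. All names in this sketch (`Expr`, `BoundRef`, `UnresolvedExtract`) are hypothetical stand-ins for Catalyst's expression classes: an analysis-time placeholder node simply cannot answer `dataType` until it has been resolved.

```scala
import scala.util.Try

// Hypothetical stand-ins for Catalyst expressions (not Spark's real classes).
sealed trait Expr { def resolved: Boolean; def dataType: String }

// A fully resolved reference: dataType is known.
final case class BoundRef(dataType: String) extends Expr {
  val resolved = true
}

// Mirrors UnresolvedExtractValue: created while recursing into a nested
// field, and unable to report a dataType before analysis resolves it.
final case class UnresolvedExtract(child: Expr, field: String) extends Expr {
  val resolved = false
  def dataType: String = throw new UnsupportedOperationException(
    "Invalid call to dataType on unresolved object")
}

// A RowEncoder nested inside a ProductEncoder sees a path like this one,
// so an unguarded dataType check blows up.
val nestedPath: Expr = UnresolvedExtract(BoundRef("struct"), "field")
val unguardedCheck: Try[Boolean] = Try(nestedPath.dataType == "struct")
```

At the top level of a query, the path would be a resolved reference and the same check succeeds, which is why the bug only surfaces when the `RowEncoder` is nested.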
@hvanhovell Created this PR #51319 that fixes the issue.
### What changes were proposed in this pull request?
This fixes support for using a `RowEncoder` inside a `ProductEncoder`.

### Why are the changes needed?
The current code does a `dataType` check on a path when constructing the `RowEncoder` deserializer. This is not safe: if the `RowEncoder` is used inside a `ProductEncoder`, it will throw because the path `Expression` is unresolved. The check was introduced in #49785.

### Does this PR introduce _any_ user-facing change?
Yes, it makes it possible to use `RowEncoder` in more cases.

### How was this patch tested?
Existing and new unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #51319 from eejbyfeldt/SPARK-52614.

Authored-by: Emil Ejbyfeldt <emil.ejbyfeldt@choreograph.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
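The general shape of such a fix can be sketched in miniature: guard the `dataType` call behind a `resolved` check so an unresolved nested path no longer throws. All names here are hypothetical stand-ins, and this is not necessarily the exact code in #51319.

```scala
// Hypothetical stand-ins for Catalyst expressions (not Spark's real classes).
sealed trait PathExpr { def resolved: Boolean; def dataType: String }

final case class ResolvedRef(dataType: String) extends PathExpr {
  val resolved = true
}

final case class UnresolvedField(field: String) extends PathExpr {
  val resolved = false
  def dataType: String = throw new UnsupportedOperationException(
    "Invalid call to dataType on unresolved object")
}

// Unguarded version (the spirit of the original check): throws on an
// unresolved path.
def isExternalRowUnsafe(path: PathExpr): Boolean = path.dataType != "struct"

// Guarded version: only consults dataType when it is safe to do so.
def isExternalRowSafe(path: PathExpr): Boolean =
  !(path.resolved && path.dataType == "struct")
```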
This is a backport of SPARK-52614 (#51319) to branch-4.0.

### What changes were proposed in this pull request?
This fixes support for using a `RowEncoder` inside a `ProductEncoder`.

### Why are the changes needed?
The current code does a `dataType` check on a path when constructing the `RowEncoder` deserializer. This is not safe: if the `RowEncoder` is used inside a `ProductEncoder`, it will throw because the path `Expression` is unresolved. The check was introduced in #49785.

### Does this PR introduce _any_ user-facing change?
Yes, it makes it possible to use `RowEncoder` in more cases.

### How was this patch tested?
Existing and new unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #52503 from eejbyfeldt/SPARK-52614-4.0.

Authored-by: Emil Ejbyfeldt <emil.ejbyfeldt@choreograph.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>