[SEDONA-231] Redundant Serde Elimination #792
I don't know much about Java/Scala testing frameworks. Is there a way to spy on the geometry serializer and count how many times it gets called, as a means to add a test?
sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
@douglasdennis To check the number of serialization calls made in each query, you could consider using a SparkListener. Sedona already has a test listener for RDD join (https://github.com/apache/sedona/blob/master/core/src/main/scala/org/apache/sedona/core/monitoring/Listener.scala). It accumulates the Sedona Metric (https://github.com/apache/sedona/blob/master/core/src/main/scala/org/apache/sedona/core/monitoring/Metric.scala). You can probably refactor it a bit to incorporate this test. In the test listener, you can add an assertion to validate the result.
@Kontinuation Hi Kristin, any comments?
We can mock the static methods in GeometrySerializer. To verify that serialization calls were actually eliminated, we can construct a simple catalyst expression and eval it instead of running a full-fledged Spark job. Here is an example:
class SerdeAwareFunctionSpec extends AnyFunSpec {
describe("SerdeAwareFunction") {
it("should save us some serialization and deserialization cost") {
// Mock GeometrySerializer
val factory = new GeometryFactory
val stubGeom = factory.createPoint(new Coordinate(1, 2))
val mocked = mockStatic(classOf[GeometrySerializer])
mocked.when(() => GeometrySerializer.deserialize(any(classOf[Array[Byte]]))).thenReturn(stubGeom)
mocked.when(() => GeometrySerializer.serialize(any(classOf[Geometry]))).thenReturn(Array[Byte](1, 2, 3))
val expr = ST_Union(Seq(
ST_Buffer(Seq(ST_GeomFromText(Seq(Literal("POINT (1 2)"), Literal(0))), Literal(1.0))),
ST_Point(Seq(Literal(1.0), Literal(2.0), Literal(null)))
))
try {
// Evaluate an expression
expr.eval(null)
// Verify number of invocations
mocked.verify(
() => GeometrySerializer.deserialize(any(classOf[Array[Byte]])),
atMost(0))
mocked.verify(
() => GeometrySerializer.serialize(any(classOf[Geometry])),
atMost(1))
} finally {
// Undo the mock
mocked.close()
}
}
}
}
This test will pass on this branch, and will fail on the master branch:
Note that we've mocked the |
Thank you, @Kontinuation! I tried looking at a similar path but I wasn't sure how to get mockito to work. I'll get this added to the PR.
@Kontinuation Thank you again for the test. I wasn't able to use version 5.1.1 of Mockito, though; it didn't seem compatible with Java 8, so I went with version 4.11.0 instead. @jiayuasu This PR adds Mockito as a testing dependency. I think this is also ready to go if everyone agrees.
@douglasdennis The Mockito dependency looks good to me. If @Kontinuation has no problem with this, I think we are good to go.
LGTM. |
Did you read the Contributor Guide?
Is this PR related to a JIRA ticket?
What changes were proposed in this PR?
To avoid redundant serde during Spark execution, I propose the following changes:
Add a SerdeAware trait that has a single abstract method called evalWithoutSerialization. This method acts like the normal eval methods except that it will not attempt to serialize its results into a Spark internal type.
Implement evalWithoutSerialization for those classes such that they do not attempt to serialize any Geometry object they return (returning the actual Geometry object instead).
Check whether an input expression is SerdeAware. If the input expression is SerdeAware then the evalWithoutSerialization method is called and an actual Geometry object is returned, skipping the serialization and deserialization that would normally happen.
To facilitate this, some changes had to be made to a few functions. Specifically, any function that could return LinearRing objects had to be changed so that it is guaranteed not to return them. This allows compliance with OGC to be maintained.
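The idea described above can be sketched in miniature without Spark. This is a toy model and not Sedona's actual code: strings stand in for Geometry objects, byte arrays stand in for Spark's internal type, and the names SerdeSketch, Expr, toGeometry, RawBytes, Point and Buffer are hypothetical. Only SerdeAware and evalWithoutSerialization are taken from the description.

```scala
// Toy model of the proposal; every name except SerdeAware and
// evalWithoutSerialization is made up for illustration.
object SerdeSketch {
  // Counters so a test can observe how often serde actually happens.
  var serializeCalls = 0
  var deserializeCalls = 0

  def serialize(geom: String): Array[Byte] = {
    serializeCalls += 1
    geom.getBytes("UTF-8")
  }

  def deserialize(bytes: Array[Byte]): String = {
    deserializeCalls += 1
    new String(bytes, "UTF-8")
  }

  trait Expr { def eval(): Any }

  // Expressions that can hand back the raw object without serializing it.
  trait SerdeAware extends Expr {
    def evalWithoutSerialization(): Any
    // The regular eval still serializes, so unaware callers see no change.
    override def eval(): Any =
      serialize(evalWithoutSerialization().asInstanceOf[String])
  }

  // Resolve a child to a geometry, skipping serde when the child allows it.
  def toGeometry(child: Expr): String = child match {
    case aware: SerdeAware => aware.evalWithoutSerialization().asInstanceOf[String]
    case other             => deserialize(other.eval().asInstanceOf[Array[Byte]])
  }

  // An expression that is not SerdeAware and only offers serialized bytes.
  case class RawBytes(bytes: Array[Byte]) extends Expr {
    def eval(): Any = bytes
  }

  case class Point(x: Double, y: Double) extends SerdeAware {
    def evalWithoutSerialization(): Any = s"POINT ($x $y)"
  }

  case class Buffer(child: Expr, radius: Double) extends SerdeAware {
    def evalWithoutSerialization(): Any =
      s"BUFFER(${toGeometry(child)}, $radius)"
  }
}
```

In this sketch, evaluating `SerdeSketch.Buffer(SerdeSketch.Point(1.0, 2.0), 1.0).eval()` serializes once at the outermost expression and never deserializes, whereas wrapping the child in `RawBytes` forces one deserialization at the boundary.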
I have three points of concern, which is why I marked this PR as draft:
How was this patch tested?
Unit tests pass. A few in flink had to be updated since certain functions no longer return LinearRing objects.
As for ensuring the redundant serde is eliminated, please see point # 2 above.
Did this PR include necessary documentation updates?