-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-35261][SQL] Support static magic method for stateless Java ScalarFunction #32407
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @sunchao .
BTW, in general, we recommend to generate both Java8 and Java11 benchmark results to avoid any regression in one Java version. Could you generate Java8 result file and add to this PR, please?
Thanks @dongjoon-hyun . Yes will add result for JDK 8 too. |
val caller = Literal.create(scalarFunc, ObjectType(scalarFunc.getClass)) | ||
Invoke(caller, MAGIC_METHOD_NAME, scalarFunc.resultType(), | ||
arguments, returnNullable = scalarFunc.isResultNullable) | ||
StaticInvoke(scalarFunc.getClass, scalarFunc.resultType(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we'd want to check if this method is actually static, otherwise there could be runtime error. However this only works for methods defined in Java; for Scala seems there is no easy way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Scala doesn't have the concept of truly static methods, right? The equivalent (object
methods) are actually just instance methods on a singleton.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that's correct. The StaticInvoke
calls the static method on the non-anonymous class which just forward to the non-static method defined in anonymous/singleton Java class (i.e., the class with $
at the end of its name).
For instance, for the LongAddWithStatic
class, this is the method defined in LongAddWithStaticMagic.class
:
public static long staticInvoke(long, long);
Code:
0: getstatic #16 // Field org/apache/spark/sql/connector/functions/LongAddWithStaticMagic$.MODULE$:Lorg/apache/spark/sql/connector/functions/LongAddWithStaticMagic$;
3: lload_0
4: lload_2
5: invokevirtual #51 // Method org/apache/spark/sql/connector/functions/LongAddWithStaticMagic$.staticInvoke:(JJ)J
8: lreturn
and the same method defined in the singleton class LongAddWithStaticMagic$
:
public long staticInvoke(long, long);
Code:
0: lload_1
1: lload_3
2: ladd
3: lreturn
So I was expecting worse performance from Scala since it calls invokevirtual
underneath while Java uses invokestatic
, but the result doesn't look so. It could be that the performance is dominated by other factors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very interesting, thanks for the explanation!
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138100 has finished for PR 32407 at commit
|
} | ||
|
||
private def scalarBenchmark(N: Long, resultNullable: Boolean): Unit = { | ||
withSQLConf(s"spark.sql.catalog.$catalogName" -> classOf[InMemoryCatalog].getName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my curiosity/education, why do we need to override the catalog here? Can't we just use the default in-memory catalog?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I need a way to specify the V2 InMemoryCatalog
here since it implements the FunctionCatalog
. What is the default in-memory catalog? it seems it is not the same as this InMemoryCatalog
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. It appears I'm mixing up spark.sql.catalogImplementation
(which defaults to org.apache.spark.sql.catalyst.catalog.InMemoryCatalog
) vs. spark.sql.catalog
(which defaults to V2SessionCatalog
but you are overriding it to org.apache.spark.sql.connector.catalog.InMemoryCatalog
). These two similarly named configurations and classes are quite confusing!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yea, it's easy to confuse the two. We can perhaps change its name if necessary. There are a few other classes with the same name between DSv1 and v2, e.g., Table
, AggregateFunction
,
scalar function (long + long) -> long/notnull wholestage off: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
-------------------------------------------------------------------------------------------------------------------------------------------- | ||
with long_add_default 28601 28964 496 17.5 57.2 1.0X | ||
with long_add_magic 6986 7156 150 71.6 14.0 4.1X | ||
with long_add_static_magic 6509 6539 32 76.8 13.0 4.4X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC, one of the concerns with the magic-method approach was that things would get really slow when codegen wasn't used because of reflection overhead. But, these results look pretty much identical to the ones above with codegen on. Do we know why we don't see the performance impact that was expected? (or am I misremembering the concern?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes very good point! I'm puzzled by this result too. There are some interesting stuff in the result:
- why the magic method approach is so much faster than the
produceResult
approach: we didn't see that much difference in @cloud-fan 's benchmark. - why codegen on/off doesn't affect the result much
- why we don't see Java outperform Scala when using the static magic method approach (since it uses
invokestatic
). I did some benchmark with Java UDFs but didn't include the result in this PR.
Let me do some profiling to find out why. Will update later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the reason for 1) is because ApplyFunctionExpression
(especially BoundReference.eval
within the method) is quite expensive as it involves lots of boxing and unboxing
This wasn't present in @cloud-fan 's benchmark.
For 2), there is another config spark.sql.codegen.factoryMode
which controls whether to do codegen on expressions (e.g., in projection). Therefore even though whole stage codegen is turned off, underneath we are still calling Invoke.genCode
to execute the UDF. After turning spark.sql.codegen.factoryMode
off I'm seeing that the magic method is actually half slower than the produceResult
approach. Looking right now why it is so much slower.
For 3) I think it is actually normal, apparently JIT does optimizations which makes the performance almost the same in these two scenarios.
* codegen, removal of Java boxing, etc. | ||
* | ||
* For example, a scalar UDF for adding two integers can be defined as follow with the magic | ||
* a static magic method with name {@link #STATIC_MAGIC_METHOD_NAME}, or non-static magic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does static magic method have significant benefit over non-static magic one? We shouldn't create the UDF object per row, so I think the cost of non-static magic method is not very different than static one?
The benchmark also doesn't show much difference.
Three different entry points to the UDF API look a bit verbose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far I didn't observe any major difference in terms of performance. I'm happy to re-purpose this JIRA/PR to just adding benchmark if others also feel that the static method is not worth it.
Another approach is, we allow UDFs defined in Java to add the static
keyword for the invoke
method, and Spark will translate it to StaticInvoke
accordingly (by looking at Method
modifiers). This keeps it simple as users only need to remember produceResult
and invoke
, and gives them flexibility to define the magic method as static if necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we allow UDFs defined in Java to add the static keyword for the invoke method
Sounds like a better API! But we need to think about if it works for Scala.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since there is no static method in Scala, invoke
defined in object
will not have the static
modifier from the Method
instance obtained through reflection, and so it will just go with the Invoke
path. We might sacrifice a bit of performance such as unnecessary null check on receiver in Invoke
but it shouldn't matter much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall I don't see obvious benefit over non-static from static. Having static as a extra support in Java UDF, sounds okay as to support a language-specific feature.
So for Scala, as it doesn't have static method at all, it seems to me it is also natural to have only non-static invoke path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM, let's fix https://issues.apache.org/jira/browse/SPARK-35281 first and see how much speedup the new static UDF API can provide.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out zipWithIndex
in ApplyFunctionExpression
is also quite expensive comparing to our simple add function. After removing that and fixing SPARK-35281, I no longer see regression when result is nullable, and the gap between the produceResult
and magic method becomes narrower:
[info] OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] Java scalar function (long + long) -> long/notnull wholestage on: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] ------------------------------------------------------------------------------------------------------------------------------------------------
[info] with long_add_default 19794 19860 113 25.3 39.6 1.0X
[info] with long_add_magic 7627 7772 228 65.6 15.3 2.6X
[info] with long_add_static_magic 6631 6725 82 75.4 13.3 3.0X
[info] OpenJDK 64-Bit Server VM 1.8.0_282-b08 on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-10910 CPU @ 3.60GHz
[info] Java scalar function (long + long) -> long/nullable wholestage on: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
[info] -------------------------------------------------------------------------------------------------------------------------------------------------
[info] with long_add_default 19990 20243 275 25.0 40.0 1.0X
[info] with long_add_magic 7285 7435 141 68.6 14.6 2.7X
[info] with long_add_static_magic 7077 7170 145 70.6 14.2 2.8X
c999638
to
ca7ea47
Compare
Kubernetes integration test unable to build dist. exiting with code: 1 |
Test build #138157 has finished for PR 32407 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status failure |
@cloud-fan @xkrogen @viirya @dongjoon-hyun this is ready for another round of review - could you take a look? Thanks! |
Java scalar function (long + long) -> long result_nullable = true codegen = false: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
----------------------------------------------------------------------------------------------------------------------------------------------------------------- | ||
with long_add_default 64039 64306 293 7.8 128.1 1.0X | ||
with long_add_magic 199121 199232 144 2.5 398.2 0.3X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few optimizations can be done to bring this to ~0.8X. I'll do it as a follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If that is orthogonal to this PR, let's have a separate PR for that instead of the follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you know, @sunchao . This is a request as one of the downstream users. I hope each patch is orthogonally backportable although we will not backport this to Apache Spark 3.1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah by "follow-up" I mean a separate PR :)
----------------------------------------------------------------------------------------------------------------------------------------------------------- | ||
with long_add_default 62105 62467 341 8.1 124.2 1.0X | ||
with long_add_magic 20721 22705 1729 24.1 41.4 3.0X | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have result for long_add_static_magic
when codegen is on?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For Scala, no, since it'll always use Invoke
and so there is no difference. We have long_add_static_magic
for Java UDFs above.
Test build #138183 has finished for PR 32407 at commit
|
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java
Show resolved
Hide resolved
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/ScalarFunction.java
Show resolved
Hide resolved
...alyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ApplyFunctionExpression.scala
Outdated
Show resolved
Hide resolved
@@ -0,0 +1,60 @@ | |||
OpenJDK 64-Bit Server VM 11.0.11+9-LTS on Linux 5.4.0-1046-azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, this is Azure
!
------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ||
with long_add_default 60515 60887 453 8.3 121.0 1.0X | ||
with long_add_magic 201052 202036 957 2.5 402.1 0.3X | ||
with long_add_static_magic 202037 202639 584 2.5 404.1 0.3X |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, in this case, static
method is slower than the instance method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes slightly. I think it could be within the margin of benchmark though.
Test build #138228 has finished for PR 32407 at commit
|
Test build #138238 has finished for PR 32407 at commit
|
923d39e
to
51d3edc
Compare
s"codegen = $codegenEnabled" | ||
val benchmark = new Benchmark(name, N, output = output) | ||
benchmark.addCase(s"native_long_add", numIters = 3) { _ => | ||
spark.range(N).selectExpr("id + id").noop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this is a bit unfair that, native add is always not nullable, even though resultNullable = true
. We can create a special expression for this benchmark
case class NativeAdd(left: Expression, right: Expression, nullable: Boolean) extends BinaryArithmetic {
...
}
...
spark.range(N).select(Column(NativeAdd($"id".expr, $"id".expr, resultNullable)))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I've updated the benchmark and result.
Kubernetes integration test starting |
Kubernetes integration test status failure |
Test build #138262 has finished for PR 32407 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
The only left comment is the following, @sunchao ?
Yes, was waiting for benchmark to finish. Updated. |
Kubernetes integration test starting |
Kubernetes integration test status failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for updating, @sunchao .
Merged to master for Apache Spark 3.2.0.
left: Expression, | ||
right: Expression, | ||
override val nullable: Boolean) extends BinaryArithmetic { | ||
override protected val failOnError: Boolean = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to set it to false. The UDF just does left + right
, while this native add is doing math.addExact(left, right)
, which is a bit unfair.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops you are right. I didn't realize this could impact performance too. Let me do it in a follow-up.
Test build #138270 has finished for PR 32407 at commit
|
What changes were proposed in this pull request?
This allows
ScalarFunction
implemented in Java to optionally specify the magic methodinvoke
to be static, which can be used if the UDF is stateless. Comparing to the non-static method, it can potentially give better performance due to elimination of dynamic dispatch, etc.Also added a benchmark to measure performance of: the default
produceResult
, non-static magic method and static magic method.Why are the changes needed?
For UDFs that are stateless (e.g., no need to maintain intermediate state between each function call), it's better to allow users to implement the UDF function as static method which could potentially give better performance.
Does this PR introduce any user-facing change?
Yes. Spark users can now have the choice to define static magic method for
ScalarFunction
when it is written in Java and when the UDF is stateless.How was this patch tested?
Added new UT.