[SPARK-36627][CORE] Fix java deserialization of proxy classes #33879
Conversation
@EugenCepoi @squito could either of you take a look at this PR? Pinging you both because you seem to have worked together on https://issues.apache.org/jira/browse/SPARK-8730 (#7122), which is similar to this one.
    override def resolveProxyClass(ifaces: Array[String]): Class[_] = {
      // scalastyle:off classforname
      val resolved = ifaces.map(iface => Class.forName(iface, false, loader))
We definitely want better exceptions if this fails, but not sure what exactly we'd want here.
2-space indent, and maybe you can use the Utils method in Spark for loading the class?
Utils#classForName uses either getContextOrSparkClassLoader or Thread.currentThread().getContextClassLoader, which might not be the loaders we want depending on the context we're deserializing in -- I'd rather use the same classloader we're using above (when deserializing non-proxy Java objects).
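To make the classloader choice concrete, here is a minimal, self-contained sketch of the idea being discussed: resolve both plain classes and proxy interfaces through one explicit loader instead of letting `resolveProxyClass` fall back to the default `latestUserDefinedLoader()`. This is not Spark's actual code; `LoaderAwareObjectInputStream` and `NoopHandler` are made-up names for illustration.

```java
import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Sketch only: resolve plain classes AND proxy interfaces through the same
// explicit loader. Class names here are hypothetical, not Spark's.
class LoaderAwareObjectInputStream extends ObjectInputStream {
  private final ClassLoader loader;

  LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
    super(in);
    this.loader = loader;
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc)
      throws IOException, ClassNotFoundException {
    try {
      return Class.forName(desc.getName(), false, loader);
    } catch (ClassNotFoundException e) {
      return super.resolveClass(desc); // fall back, e.g. for primitive descriptors
    }
  }

  @Override
  protected Class<?> resolveProxyClass(String[] ifaces)
      throws IOException, ClassNotFoundException {
    Class<?>[] resolved = new Class<?>[ifaces.length];
    for (int i = 0; i < ifaces.length; i++) {
      // Resolve each interface with the SAME loader as non-proxy classes.
      resolved[i] = Class.forName(ifaces[i], false, loader);
    }
    return Proxy.getProxyClass(loader, resolved);
  }
}

// A serializable no-op handler so a proxy instance can round-trip.
class NoopHandler implements InvocationHandler, Serializable {
  public Object invoke(Object proxy, Method method, Object[] args) {
    return null;
  }
}
```

A round-trip through this stream exercises `resolveProxyClass` with the chosen loader rather than the one `ObjectInputStream` would pick on its own.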
    instance.deserialize[ContainsProxyClass](instance.serialize(new ContainsProxyClass()))
    // enforce class cast
    obj.getClass
    assert(classesLoaded.exists(klass => klass.contains("MyInterface")))
This is a very roundabout way of testing this, but I couldn't think of another way - the way we hit this issue was running Spark in YARN, and I don't think we can trivially reproduce it with Spark local. To repro exactly what happened in YARN we'd need to make sun.misc.VM.latestUserDefinedLoader() unable to resolve MyInterface, but I couldn't figure out how to do that.
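One way to make the assertion observable locally, sketched below: wrap the loader handed to the deserializer in a delegating classloader that records every class name it is asked for, then check that the interface's name shows up. `RecordingClassLoader` is a made-up name for illustration, not necessarily what the test uses.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for the kind of roundabout assertion described above:
// records every class name this loader is asked to load, so a test can check
// that e.g. "MyInterface" was resolved through it rather than through
// whatever loader ObjectInputStream would pick by default.
class RecordingClassLoader extends ClassLoader {
  final Set<String> loaded = ConcurrentHashMap.newKeySet();

  RecordingClassLoader(ClassLoader parent) {
    super(parent);
  }

  @Override
  protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
    loaded.add(name); // record the request before delegating to the parent
    return super.loadClass(name, resolve);
  }
}
```

Passing such a loader as the third argument of `Class.forName(name, false, loader)` is enough to capture the lookup, since `forName` delegates to the given loader's `loadClass`.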
Out of curiosity, where do proxy classes typically come up?

Sorry for the late response - proxy classes come up when we're building HTTP clients. We usually build them from interfaces (annotated with JAX-RS annotations or custom ones) and reference them by their interface. See feign for an example.
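For readers unfamiliar with the pattern: clients like feign hand you an implementation of your annotated interface backed by a dynamic proxy, so application code only ever sees the interface type. A minimal illustration, not feign's actual API; `UserApi`, `buildClient`, and the echoed string are invented for this sketch:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Illustration of the proxy-based client pattern mentioned above.
// A real client would turn each call into an HTTP request based on the
// method's annotations; here we just echo what would be requested.
class ProxyClientSketch {
  interface UserApi {
    String getUser(String id); // imagine a JAX-RS @GET/@Path annotation here
  }

  static UserApi buildClient(String baseUrl) {
    InvocationHandler handler = (proxy, method, args) ->
        "GET " + baseUrl + "/" + method.getName() + "/" + args[0];
    return (UserApi) Proxy.newProxyInstance(
        UserApi.class.getClassLoader(),
        new Class<?>[] {UserApi.class},
        handler);
  }
}
```

Because the object handed back is a `java.lang.reflect.Proxy` instance, serializing it (e.g. when it is captured in a Spark closure) goes through `resolveProxyClass` on the deserializing side, which is exactly the path this PR fixes.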
This probably can't hurt, right? It should only make something work that couldn't before.
core/src/test/java/org/apache/spark/serializer/ProxySerializerTest.java
    import java.lang.reflect.Proxy;

    class ContainsProxyClass implements Serializable {
      final MyInterface proxy = (MyInterface) Proxy.newProxyInstance(
Likewise, 2-space indent here.
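For context, a self-contained version of what such a fixture boils down to: an object whose only interesting field is a dynamic proxy, so Java-serializing it forces the read side through `resolveProxyClass`. `MyInterface` and `AnswerHandler` here are stand-ins, not the exact classes from the PR.

```java
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Stand-in test fixture: deserializing ContainsProxyClass forces the
// stream through resolveProxyClass for MyInterface.
interface MyInterface {
  int answer();
}

// The handler must be Serializable for the proxy instance to round-trip.
class AnswerHandler implements InvocationHandler, Serializable {
  public Object invoke(Object proxy, Method method, Object[] args) {
    return 42; // every invocation answers 42 in this sketch
  }
}

class ContainsProxyClass implements Serializable {
  final MyInterface proxy = (MyInterface) Proxy.newProxyInstance(
      MyInterface.class.getClassLoader(),
      new Class<?>[] {MyInterface.class},
      new AnswerHandler());
}
```

A plain `ObjectOutputStream`/`ObjectInputStream` round-trip of `new ContainsProxyClass()` is enough to hit the proxy-resolution path in the same JVM.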
      out.writeInt(counterReset)
      out.writeBoolean(extraDebugInfo)
    }
    override def writeExternal(out: ObjectOutput): Unit =
Ran ./dev/scalafmt, which modified a few more lines.
Looks OK, let me run tests. If you have a sec, go ahead and copy info from the JIRA into the description above.
    @@ -28,8 +28,10 @@ import org.apache.spark.internal.config._
    import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}

    private[spark] class JavaSerializationStream(
        out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
      extends SerializationStream {
        out: OutputStream,
I think this is fine, if this is what scalafmt does to this file - otherwise I'd say leave it
Yep, it was all scalafmt.
Jenkins test this please

Test build #144660 has finished for PR 33879 at commit

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #144662 has finished for PR 33879 at commit

Jenkins test this please

Kubernetes integration test starting

Kubernetes integration test status failure

Test build #144719 has finished for PR 33879 at commit

Merged to master
## Upstream SPARK-XXXXX ticket and PR link (if not applicable, explain)

https://issues.apache.org/jira/browse/SPARK-36627

## What changes were proposed in this pull request?

In JavaSerializer.JavaDeserializationStream we override resolveClass of ObjectInputStream to use the thread's contextClassLoader. However, we do not override resolveProxyClass, which is used when deserializing Java proxy objects. This makes Spark use the wrong classloader when deserializing such objects, which causes the job to fail with the following exception:

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, <host>, executor 1): java.lang.ClassNotFoundException: <class>
  at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
  at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
  at java.base/java.lang.Class.forName0(Native Method)
  at java.base/java.lang.Class.forName(Class.java:398)
  at java.base/java.io.ObjectInputStream.resolveProxyClass(ObjectInputStream.java:829)
  at java.base/java.io.ObjectInputStream.readProxyDesc(ObjectInputStream.java:1917)
  ...
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
```

### Why are the changes needed?

Spark deserialization fails with no recourse for the user.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes apache#33879 from fsamuel-bs/SPARK-36627.

Authored-by: Samuel Souza <ssouza@palantir.com>
Signed-off-by: Sean Owen <srowen@gmail.com>