[SPARK-12591][Streaming]Register OpenHashMapBasedStateMap for Kryo #10609
Test build #48798 has finished for PR 10609 at commit
Test build #48808 has finished for PR 10609 at commit
Test build #48843 has finished for PR 10609 at commit
CC @tdas
// Add classes for Streaming
try {
  kryo.register(
    Utils.classForName("org.apache.spark.streaming.util.OpenHashMapBasedStateMap"),
There is a tricky thing here.
Kryo assigns a unique id to each registered class and writes only that id, so the classes must be registered in the same order everywhere. Otherwise the ids won't match and deserialization will fail.
However, in tests that start a local cluster, the executors have OpenHashMapBasedStateMap but the driver doesn't. So I registered OpenHashMapBasedStateMap as the last class to make sure the ids of the other classes stay the same.
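To illustrate the ordering concern, here is a minimal sketch in plain Scala. It does not call Kryo; it only models the assumption that ids are handed out sequentially in registration order. The `org.example.*` class names, `assignIds`, and `sharedIdsMatch` are hypothetical names made up for the illustration.

```scala
object RegistrationOrderSketch {
  // Simplified model, not the Kryo API: each class gets the next integer id
  // in the order it is registered.
  def assignIds(classNames: Seq[String]): Map[String, Int] =
    classNames.zipWithIndex.toMap

  // Hypothetical class names standing in for classes that every JVM registers.
  val shared = Seq("org.example.A", "org.example.B", "org.example.C")
  val streamingOnly = "org.apache.spark.streaming.util.OpenHashMapBasedStateMap"

  // The driver cannot load the streaming class, so it registers only the shared ones.
  val driverIds = assignIds(shared)
  // An executor that registers the streaming class last.
  val executorIdsAppended = assignIds(shared :+ streamingOnly)
  // An executor that registers the streaming class first.
  val executorIdsPrepended = assignIds(streamingOnly +: shared)

  // True when both sides agree on the id of every shared class.
  def sharedIdsMatch(a: Map[String, Int], b: Map[String, Int]): Boolean =
    shared.forall(name => a(name) == b(name))
}
```

In this model, appending the optional class leaves the ids of the shared classes untouched, while registering it earlier shifts every id after it.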
Maybe we should add an internal API to let other projects register their classes to KryoSerializer.
Test build #48884 has finished for PR 10609 at commit
@tdas updated as we discussed offline.
Utils.serialize(map), Thread.currentThread().getContextClassLoader)
assertMap(deserMap, map, 1, msg)
deserMap
val deserMaps = Array(new JavaSerializer(conf), new KryoSerializer(conf)).map {
Easier to read if this is made a function and called with two different serializers.
import org.apache.spark.streaming.util.OpenHashMapBasedStateMap._
import org.apache.spark.util.collection.OpenHashMap

/** Internal interface for defining the map that keeps track of sessions. */
private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] extends Serializable {
I removed ClassTag because EmptyStateMap doesn't need it. With the ClassTags removed, we don't need to add any code for EmptyStateMap, since it doesn't contain any fields.
Good idea.
retest this please
Test build #48958 has finished for PR 10609 at commit
Test build #48960 has finished for PR 10609 at commit
}

private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
private[spark] class KryoOutputObjectOutputBridge(
Can you put some docs on this class to explain what this does? Same for the above class.
LGTM, except one minor comment.
@@ -426,6 +439,7 @@ private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends
  override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
  override def writeLong(v: Long): Unit = output.writeLong(v)
  override def writeByte(v: Int): Unit = output.writeByte(v)
  override def writeObject(obj: AnyRef): Unit = kryo.writeClassAndObject(output, obj)
Should there be a new unit test in the KryoSerializerSuite to test this?
done
Test build #48969 has finished for PR 10609 at commit
retest this please
Test build #48963 has finished for PR 10609 at commit
Test build #48967 has finished for PR 10609 at commit
retest this please
By the way, I will send another PR for branch 1.6 due to the conflicts of MimaExcludes.scala.
retest this please
Test build #48979 has finished for PR 10609 at commit
LGTM. Merging this to master. Please send another PR for 1.6 ASAP. Thanks for catching and fixing this bug.
…branch 1.6)
backport #10609 to branch 1.6
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10656 from zsxwing/SPARK-12591-branch-1.6.
The default serializer in Kryo is FieldSerializer and it ignores transient fields and never calls writeObject or readObject. So we should register OpenHashMapBasedStateMap using @DefaultSerializer to make it work with Kryo.
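The effect described above can be reproduced with the JDK alone. The sketch below is an analogy, not Spark or Kryo code: `HookedMap`, `NoHooksMap`, and `TransientFieldSketch.roundTrip` are hypothetical names. `HookedMap` keeps its contents in a transient field and relies on `writeObject`/`readObject`, roughly the pattern the description attributes to OpenHashMapBasedStateMap. `NoHooksMap` has the same transient field but no hooks, which approximates what a serializer sees when it copies only non-transient fields and never calls the hooks.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import scala.collection.mutable

// Hypothetical map whose contents live only in a transient field.
class HookedMap extends Serializable {
  @transient private var entries = new mutable.HashMap[String, Int]()

  def put(key: String, value: Int): Unit = entries(key) = value
  def get(key: String): Option[Int] = if (entries == null) None else entries.get(key)

  // Java serialization calls this hook reflectively; it writes the transient state by hand.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeInt(entries.size)
    val it = entries.iterator
    while (it.hasNext) {
      val (k, v) = it.next()
      out.writeUTF(k)
      out.writeInt(v)
    }
  }

  // Counterpart hook: rebuilds the transient field from the stream.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    entries = new mutable.HashMap[String, Int]()
    var remaining = in.readInt()
    while (remaining > 0) {
      val k = in.readUTF()
      entries(k) = in.readInt()
      remaining -= 1
    }
  }
}

// Same transient field, but no hooks: nothing writes or restores the contents.
class NoHooksMap extends Serializable {
  @transient private var entries = new mutable.HashMap[String, Int]()

  def put(key: String, value: Int): Unit = entries(key) = value
  def get(key: String): Option[Int] = if (entries == null) None else entries.get(key)
}

object TransientFieldSketch {
  // Serializes and deserializes obj with plain Java serialization.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray)) {
      // Resolve classes with the loader that defined obj.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, obj.getClass.getClassLoader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After a round trip, a `HookedMap` still returns the values that were put into it, while a `NoHooksMap` comes back with its transient field unset and returns `None` for every key.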