Skip to content

Commit

Permalink
[SPARK-32283][CORE] Kryo should support multiple user registrators
Browse files Browse the repository at this point in the history
  • Loading branch information
LantaoJin committed Jul 15, 2020
1 parent 4609f1f commit 45d1e43
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 12 deletions.
Expand Up @@ -29,7 +29,8 @@ private[spark] object Kryo {
val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator")
.version("0.5.0")
.stringConf
.createOptional
.toSequence
.createWithDefault(Nil)

val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
.version("1.2.0")
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Expand Up @@ -221,7 +221,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
conf.registerKryoClasses(Array(classOf[Class1]))
assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet)

conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[CustomRegistrator].getName))

// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
Expand Down
Expand Up @@ -125,7 +125,7 @@ object KryoBenchmark extends BenchmarkBase {
def createSerializer(useUnsafe: Boolean): SerializerInstance = {
val conf = new SparkConf()
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_UNSAFE, useUnsafe)

new KryoSerializer(conf).newInstance()
Expand Down
Expand Up @@ -76,7 +76,7 @@ object KryoSerializerBenchmark extends BenchmarkBase {
conf.set(EXECUTOR_EXTRA_JAVA_OPTIONS,
"-XX:+UseParallelGC -XX:-UseDynamicNumberOfGCThreads")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_POOL, usePool)

if (sc != null) {
Expand Down
Expand Up @@ -29,7 +29,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
test("kryo objects are serialised consistently in different processes") {
val conf = new SparkConf(false)
.set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
.set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
.set(config.Kryo.KRYO_USER_REGISTRATORS, Seq(classOf[AppJarRegistrator].getName))
.set(config.TASK_MAX_FAILURES, 1)
.set(config.BLACKLIST_ENABLED, false)

Expand Down
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.util.ThreadUtils

class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_UNSAFE, false)

test("SPARK-7392 configuration limits") {
Expand Down Expand Up @@ -313,7 +313,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
import org.apache.spark.SparkException

val conf = new SparkConf(false)
conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist")
conf.set(KRYO_USER_REGISTRATORS, Seq("this.class.does.not.exist"))

val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1))
assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
Expand Down Expand Up @@ -412,7 +412,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
Seq(classOf[RegistratorWithoutAutoReset].getName))
val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(!ser2.getAutoReset)
}
Expand Down Expand Up @@ -443,7 +443,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
.set(KRYO_REFERENCE_TRACKING, referenceTracking)
.set(KRYO_USE_POOL, usePool)
if (!autoReset) {
conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
}
val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
Expand Down Expand Up @@ -530,7 +530,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {

class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, classOf[KryoSerializer].getName)
conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
conf.set(KRYO_REFERENCE_TRACKING, true)
conf.set(SHUFFLE_MANAGER, "sort")
conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 200)
Expand Down
Expand Up @@ -52,7 +52,7 @@ class SerializerPropertiesSuite extends SparkFunSuite {

test("KryoSerializer does not support relocation when auto-reset is disabled") {
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
Seq(classOf[RegistratorWithoutAutoReset].getName))
val ser = new KryoSerializer(conf)
assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
Expand Down
Expand Up @@ -34,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSparkSessio

override protected def sparkConf: SparkConf = {
// Make sure we use the KryoRegistrator
super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
super.sparkConf.set(KRYO_USER_REGISTRATORS, Seq(TestRegistrator().getClass.getCanonicalName))
}

test("Kryo registrator") {
Expand Down

0 comments on commit 45d1e43

Please sign in to comment.