From 26e6574d58429add645db820a83b70ef9dcd49fe Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Wed, 29 Jul 2020 03:58:03 +0000 Subject: [PATCH] [SPARK-32283][CORE] Kryo should support multiple user registrators ### What changes were proposed in this pull request? `spark.kryo.registrator` in 3.0 has a regression problem. From [SPARK-12080](https://issues.apache.org/jira/browse/SPARK-12080), it supports multiple user registrators by ```scala private val userRegistrators = conf.get("spark.kryo.registrator", "") .split(',').map(_.trim) .filter(!_.isEmpty) ``` But it doesn't work in 3.0. Fix it by using `toSequence` in `Kryo.scala`. ### Why are the changes needed? In previous Spark versions (2.x), multiple user registrators were supported by ```scala private val userRegistrators = conf.get("spark.kryo.registrator", "") .split(',').map(_.trim) .filter(!_.isEmpty) ``` But it doesn't work in 3.0. It should be considered a regression. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes #29123 from LantaoJin/SPARK-32283. 
Authored-by: LantaoJin Signed-off-by: Wenchen Fan --- .../scala/org/apache/spark/internal/config/Kryo.scala | 3 ++- .../test/scala/org/apache/spark/SparkConfSuite.scala | 2 +- .../org/apache/spark/serializer/KryoBenchmark.scala | 2 +- .../spark/serializer/KryoSerializerBenchmark.scala | 2 +- .../serializer/KryoSerializerDistributedSuite.scala | 2 +- .../apache/spark/serializer/KryoSerializerSuite.scala | 10 +++++----- .../spark/serializer/SerializerPropertiesSuite.scala | 2 +- .../spark/sql/DatasetSerializerRegistratorSuite.scala | 2 +- 8 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala index 646d8556538c2..90c59b079461c 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Kryo.scala @@ -29,7 +29,8 @@ private[spark] object Kryo { val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator") .version("0.5.0") .stringConf - .createOptional + .toSequence + .createWithDefault(Nil) val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister") .version("1.2.0") diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 3bc2061c4f2ad..72e7ee0214187 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -221,7 +221,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst conf.registerKryoClasses(Array(classOf[Class1])) assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet) - conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName) + conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[CustomRegistrator].getName)) // Kryo doesn't expose a way to discover registered classes, but at least make sure this 
doesn't // blow up. diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index fd228cded783a..525e682dd5d42 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -125,7 +125,7 @@ object KryoBenchmark extends BenchmarkBase { def createSerializer(useUnsafe: Boolean): SerializerInstance = { val conf = new SparkConf() conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") - conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName) + conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName)) conf.set(KRYO_USE_UNSAFE, useUnsafe) new KryoSerializer(conf).newInstance() diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala index 953b651c72a83..dde0c98704d00 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala @@ -76,7 +76,7 @@ object KryoSerializerBenchmark extends BenchmarkBase { conf.set(EXECUTOR_EXTRA_JAVA_OPTIONS, "-XX:+UseParallelGC -XX:-UseDynamicNumberOfGCThreads") conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") - conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName) + conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName)) conf.set(KRYO_USE_POOL, usePool) if (sc != null) { diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala index d4fafab4a5d64..397fdce8ae6e3 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala @@ -29,7 +29,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex test("kryo objects are serialised consistently in different processes") { val conf = new SparkConf(false) .set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer") - .set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName) + .set(config.Kryo.KRYO_USER_REGISTRATORS, Seq(classOf[AppJarRegistrator].getName)) .set(config.TASK_MAX_FAILURES, 1) .set(config.BLACKLIST_ENABLED, false) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 4c47a67ee9ffc..229ef69973775 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.ThreadUtils class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") - conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName) + conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName)) conf.set(KRYO_USE_UNSAFE, false) test("SPARK-7392 configuration limits") { @@ -313,7 +313,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { import org.apache.spark.SparkException val conf = new SparkConf(false) - conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist") + conf.set(KRYO_USER_REGISTRATORS, Seq("this.class.does.not.exist")) val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1)) assert(thrown.getMessage.contains("Failed to register classes with Kryo")) @@ -412,7 +412,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { val ser = new KryoSerializer(new 
SparkConf).newInstance().asInstanceOf[KryoSerializerInstance] assert(ser.getAutoReset) val conf = new SparkConf().set(KRYO_USER_REGISTRATORS, - classOf[RegistratorWithoutAutoReset].getName) + Seq(classOf[RegistratorWithoutAutoReset].getName)) val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance] assert(!ser2.getAutoReset) } @@ -443,7 +443,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { .set(KRYO_REFERENCE_TRACKING, referenceTracking) .set(KRYO_USE_POOL, usePool) if (!autoReset) { - conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName) + conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName)) } val ser = new KryoSerializer(conf) val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance] @@ -530,7 +530,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext { conf.set(SERIALIZER, classOf[KryoSerializer].getName) - conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName) + conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName)) conf.set(KRYO_REFERENCE_TRACKING, true) conf.set(SHUFFLE_MANAGER, "sort") conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 200) diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala index dad080c5fc161..9747f5780dd1e 100644 --- a/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/SerializerPropertiesSuite.scala @@ -52,7 +52,7 @@ class SerializerPropertiesSuite extends SparkFunSuite { test("KryoSerializer does not support relocation when auto-reset is disabled") { val conf = new SparkConf().set(KRYO_USER_REGISTRATORS, - 
classOf[RegistratorWithoutAutoReset].getName) + Seq(classOf[RegistratorWithoutAutoReset].getName)) val ser = new KryoSerializer(conf) assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()) testSupportsRelocationOfSerializedObjects(ser, generateRandomItem) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala index 43de2663b1236..b20d050f2fc4a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSerializerRegistratorSuite.scala @@ -34,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSparkSessio override protected def sparkConf: SparkConf = { // Make sure we use the KryoRegistrator - super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName) + super.sparkConf.set(KRYO_USER_REGISTRATORS, Seq(TestRegistrator().getClass.getCanonicalName)) } test("Kryo registrator") {