Skip to content

Commit

Permalink
[SPARK-32283][CORE] Kryo should support multiple user registrators
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
`spark.kryo.registrator` in 3.0 has a regression problem. From [SPARK-12080](https://issues.apache.org/jira/browse/SPARK-12080), it supports multiple user registrators by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)
```
But it doesn't work in 3.0. Fix it by using `toSequence` in `Kryo.scala`.

### Why are the changes needed?
In previous Spark version (2.x), it supported multiple user registrators by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)
```
But it doesn't work in 3.0. It should be considered a regression.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #29123 from LantaoJin/SPARK-32283.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
LantaoJin authored and cloud-fan committed Jul 29, 2020
1 parent 45b7212 commit 26e6574
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 12 deletions.
Expand Up @@ -29,7 +29,8 @@ private[spark] object Kryo {
val KRYO_USER_REGISTRATORS = ConfigBuilder("spark.kryo.registrator")
.version("0.5.0")
.stringConf
.createOptional
.toSequence
.createWithDefault(Nil)

val KRYO_CLASSES_TO_REGISTER = ConfigBuilder("spark.kryo.classesToRegister")
.version("1.2.0")
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Expand Up @@ -221,7 +221,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
conf.registerKryoClasses(Array(classOf[Class1]))
assert(conf.get(KRYO_CLASSES_TO_REGISTER).toSet === Seq(classOf[Class1].getName).toSet)

conf.set(KRYO_USER_REGISTRATORS, classOf[CustomRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[CustomRegistrator].getName))

// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
Expand Down
Expand Up @@ -125,7 +125,7 @@ object KryoBenchmark extends BenchmarkBase {
def createSerializer(useUnsafe: Boolean): SerializerInstance = {
val conf = new SparkConf()
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_UNSAFE, useUnsafe)

new KryoSerializer(conf).newInstance()
Expand Down
Expand Up @@ -76,7 +76,7 @@ object KryoSerializerBenchmark extends BenchmarkBase {
conf.set(EXECUTOR_EXTRA_JAVA_OPTIONS,
"-XX:+UseParallelGC -XX:-UseDynamicNumberOfGCThreads")
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_POOL, usePool)

if (sc != null) {
Expand Down
Expand Up @@ -29,7 +29,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex
test("kryo objects are serialised consistently in different processes") {
val conf = new SparkConf(false)
.set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
.set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName)
.set(config.Kryo.KRYO_USER_REGISTRATORS, Seq(classOf[AppJarRegistrator].getName))
.set(config.TASK_MAX_FAILURES, 1)
.set(config.BLACKLIST_ENABLED, false)

Expand Down
Expand Up @@ -42,7 +42,7 @@ import org.apache.spark.util.ThreadUtils

class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
conf.set(KRYO_USER_REGISTRATORS, classOf[MyRegistrator].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[MyRegistrator].getName))
conf.set(KRYO_USE_UNSAFE, false)

test("SPARK-7392 configuration limits") {
Expand Down Expand Up @@ -313,7 +313,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
import org.apache.spark.SparkException

val conf = new SparkConf(false)
conf.set(KRYO_USER_REGISTRATORS, "this.class.does.not.exist")
conf.set(KRYO_USER_REGISTRATORS, Seq("this.class.does.not.exist"))

val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance().serialize(1))
assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
Expand Down Expand Up @@ -412,7 +412,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(ser.getAutoReset)
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
Seq(classOf[RegistratorWithoutAutoReset].getName))
val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance]
assert(!ser2.getAutoReset)
}
Expand Down Expand Up @@ -443,7 +443,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
.set(KRYO_REFERENCE_TRACKING, referenceTracking)
.set(KRYO_USE_POOL, usePool)
if (!autoReset) {
conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
}
val ser = new KryoSerializer(conf)
val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance]
Expand Down Expand Up @@ -530,7 +530,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {

class KryoSerializerAutoResetDisabledSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, classOf[KryoSerializer].getName)
conf.set(KRYO_USER_REGISTRATORS, classOf[RegistratorWithoutAutoReset].getName)
conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[RegistratorWithoutAutoReset].getName))
conf.set(KRYO_REFERENCE_TRACKING, true)
conf.set(SHUFFLE_MANAGER, "sort")
conf.set(SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD, 200)
Expand Down
Expand Up @@ -52,7 +52,7 @@ class SerializerPropertiesSuite extends SparkFunSuite {

test("KryoSerializer does not support relocation when auto-reset is disabled") {
val conf = new SparkConf().set(KRYO_USER_REGISTRATORS,
classOf[RegistratorWithoutAutoReset].getName)
Seq(classOf[RegistratorWithoutAutoReset].getName))
val ser = new KryoSerializer(conf)
assert(!ser.newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset())
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
Expand Down
Expand Up @@ -34,7 +34,7 @@ class DatasetSerializerRegistratorSuite extends QueryTest with SharedSparkSessio

override protected def sparkConf: SparkConf = {
// Make sure we use the KryoRegistrator
super.sparkConf.set(KRYO_USER_REGISTRATORS, TestRegistrator().getClass.getCanonicalName)
super.sparkConf.set(KRYO_USER_REGISTRATORS, Seq(TestRegistrator().getClass.getCanonicalName))
}

test("Kryo registrator") {
Expand Down

0 comments on commit 26e6574

Please sign in to comment.