From b59d2c2c4e9fe0ca1d3be72e2e78fa2c968d5831 Mon Sep 17 00:00:00 2001 From: Graham Dennis Date: Fri, 8 Aug 2014 10:40:39 +1000 Subject: [PATCH 1/4] SPARK-2893: Do not swallow Exceptions when running a custom spark.kryo.registrator The previous behaviour of swallowing ClassNotFound exceptions when running a custom Kryo registrator could lead to difficult to debug problems later on at serialisation / deserialisation time, see SPARK-2878. Instead it is better to fail fast. Added test case. --- .../org/apache/spark/serializer/KryoSerializer.scala | 3 ++- .../apache/spark/serializer/KryoSerializerSuite.scala | 10 ++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 407cb9db6ee9a..4722826347630 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -87,7 +87,8 @@ class KryoSerializer(conf: SparkConf) reg.registerClasses(kryo) } } catch { - case e: Exception => logError("Failed to run spark.kryo.registrator", e) + case e: Exception => throw + new SparkException("Failed to run spark.kryo.registrator", e) } // Register Chill's classes; we do this after our ranges and the user's own classes to let diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 789b773bae316..4daf92400f6cb 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -207,6 +207,16 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x assert(10 + control.sum === result) } + + test("kryo with nonexistant custom registrator should fail") { + import org.apache.spark.{SparkConf, SparkException} + + val conf = new SparkConf(false) + conf.set("spark.kryo.registrator", "this.class.does.not.exist") + + val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance()) + assert(thrown.getMessage.contains("Failed to run spark.kryo.registrator")) + } } class KryoSerializerResizableOutputSuite extends FunSuite { From f480d85cb14d8fa21fa3148a29a98924ec04fe3d Mon Sep 17 00:00:00 2001 From: Graham Dennis Date: Thu, 14 Aug 2014 17:08:58 +1000 Subject: [PATCH 2/4] [SPARK-2893] Fix typo. --- .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 4daf92400f6cb..e816e66dea023 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -208,7 +208,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert(10 + control.sum === result) } - test("kryo with nonexistant custom registrator should fail") { + test("kryo with nonexistent custom registrator should fail") { import org.apache.spark.{SparkConf, SparkException} val conf = new SparkConf(false) From 65e53c5ffaccc17a66d4153bb523a85e33e01f84 Mon Sep 17 00:00:00 2001 From: Graham Dennis Date: Thu, 14 Aug 2014 17:18:09 +1000 Subject: [PATCH 3/4] [SPARK-2893]: Improve message when a spark.kryo.registrator fails. --- .../org/apache/spark/serializer/KryoSerializer.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 4722826347630..85944eabcfefc 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -79,16 +79,16 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) // Allow the user to register their own classes by setting spark.kryo.registrator - try { - for (regCls <- registrator) { - logDebug("Running user registrator: " + regCls) + for (regCls <- registrator) { + logDebug("Running user registrator: " + regCls) + try { val reg = Class.forName(regCls, true, classLoader).newInstance() .asInstanceOf[KryoRegistrator] reg.registerClasses(kryo) + } catch { + case e: Exception => + throw new SparkException(s"Failed to invoke $regCls", e) } - } catch { - case e: Exception => throw - new SparkException("Failed to run spark.kryo.registrator", e) } // Register Chill's classes; we do this after our ranges and the user's own classes to let From fbe4cb68270cd1058d3390ae7fb2faa011676484 Mon Sep 17 00:00:00 2001 From: Graham Dennis Date: Thu, 14 Aug 2014 18:14:10 +1000 Subject: [PATCH 4/4] [SPARK-2878]: Update the test case to match the updated exception message --- .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index e816e66dea023..3bf9efebb39d2 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -215,7 +215,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { conf.set("spark.kryo.registrator", "this.class.does.not.exist") val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance()) - assert(thrown.getMessage.contains("Failed to run spark.kryo.registrator")) + assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist")) } }