From 830d0d098f01564339a9568b2e736d233c7378bb Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 24 May 2015 07:30:34 -0700 Subject: [PATCH 1/7] Kryo buffer size configured in mb should be properly supported --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 5 ++++- .../org/apache/spark/serializer/KryoSerializerSuite.scala | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 217957963437d..7c20e2521698a 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -50,7 +50,10 @@ class KryoSerializer(conf: SparkConf) with Logging with Serializable { - private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") + private val bufferSize = conf.get("spark.kryoserializer.buffer", "64k") + private val bufferSizeKb = bufferSize.endsWith("k") || bufferSize.endsWith("kb") ? + conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") : + conf.getSizeAsMb("spark.kryoserializer.buffer", "64k") if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) { throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " + diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 0bd91a8dba2ab..424186f8315bd 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -62,6 +62,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val thrown3 = intercept[IllegalArgumentException](new KryoSerializer(conf4).newInstance()) assert(thrown3.getMessage.contains(kryoBufferProperty)) assert(!thrown3.getMessage.contains(kryoBufferMaxProperty)) + val conf5 = conf.clone() + conf5.set(kryoBufferProperty, "8m") + conf5.set(kryoBufferMaxProperty, "9m") + KryoSerializer(conf2).newInstance() } test("basic types") { From 47399986b8bc9b92298afdc1dfa317f7baa03863 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 24 May 2015 07:45:30 -0700 Subject: [PATCH 2/7] Rewrite bufferSize checking --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 7c20e2521698a..282d95f4d09fa 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -51,9 +51,12 @@ class KryoSerializer(conf: SparkConf) with Serializable { private val bufferSize = conf.get("spark.kryoserializer.buffer", "64k") - private val bufferSizeKb = bufferSize.endsWith("k") || bufferSize.endsWith("kb") ? - conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") : + private val bufferSizeKb + if (bufferSize.endsWith("k") || bufferSize.endsWith("kb")) { + conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") + } else { conf.getSizeAsMb("spark.kryoserializer.buffer", "64k") + } if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) { throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " + From 9a17277c64f18db5b5bef3deea580325d6e5b4c8 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 24 May 2015 07:46:43 -0700 Subject: [PATCH 3/7] Rewrite bufferSize checking --- .../scala/org/apache/spark/serializer/KryoSerializer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 282d95f4d09fa..875e1a41008e8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -53,9 +53,9 @@ class KryoSerializer(conf: SparkConf) private val bufferSize = conf.get("spark.kryoserializer.buffer", "64k") private val bufferSizeKb if (bufferSize.endsWith("k") || bufferSize.endsWith("kb")) { - conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") + bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") } else { - conf.getSizeAsMb("spark.kryoserializer.buffer", "64k") + bufferSizeKb = conf.getSizeAsMb("spark.kryoserializer.buffer", "64k") } if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) { From d2fdbc4dc9f55018ca0d4dc4cb822127552a5f64 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 24 May 2015 07:57:21 -0700 Subject: [PATCH 4/7] Give bufferSizeKb initial value --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 875e1a41008e8..8bcce86a45775 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -51,7 +51,7 @@ class KryoSerializer(conf: SparkConf) with Serializable { private val bufferSize = conf.get("spark.kryoserializer.buffer", "64k") - private val bufferSizeKb + private val bufferSizeKb = _ if (bufferSize.endsWith("k") || bufferSize.endsWith("kb")) { bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") } else { From 642de51b491ee594ff5eb607631cde9fa502ee37 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 24 May 2015 08:16:41 -0700 Subject: [PATCH 5/7] Drop change in KryoSerializer so that the new test runs --- .../org/apache/spark/serializer/KryoSerializer.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 8bcce86a45775..217957963437d 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -50,13 +50,7 @@ class KryoSerializer(conf: SparkConf) with Logging with Serializable { - private val bufferSize = conf.get("spark.kryoserializer.buffer", "64k") - private val bufferSizeKb = _ - if (bufferSize.endsWith("k") || bufferSize.endsWith("kb")) { - bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") - } else { - bufferSizeKb = conf.getSizeAsMb("spark.kryoserializer.buffer", "64k") - } + private val bufferSizeKb = conf.getSizeAsKb("spark.kryoserializer.buffer", "64k") if (bufferSizeKb >= ByteUnit.GiB.toKiB(2)) { throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than " + From f12ee049b0008632f2779f282b9e99cc15467cd0 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 24 May 2015 08:17:38 -0700 Subject: [PATCH 6/7] Correct conf variable name in test --- .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 424186f8315bd..5f758be4ec35d 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -65,7 +65,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val conf5 = conf.clone() conf5.set(kryoBufferProperty, "8m") conf5.set(kryoBufferMaxProperty, "9m") - KryoSerializer(conf2).newInstance() + KryoSerializer(conf5).newInstance() } test("basic types") { From c51ea6430d9b5cc7c18bfcd77f033613551c1a32 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 24 May 2015 09:29:01 -0700 Subject: [PATCH 7/7] Fix KryoSerializer creation --- .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 5f758be4ec35d..5faf108b394a1 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -65,7 +65,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val conf5 = conf.clone() conf5.set(kryoBufferProperty, "8m") conf5.set(kryoBufferMaxProperty, "9m") - KryoSerializer(conf5).newInstance() + new KryoSerializer(conf5).newInstance() } test("basic types") {