From 5641367687760fcc2799a4cfe849d4bc9726f36b Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Sun, 1 May 2016 21:41:33 +0530 Subject: [PATCH 01/18] [SPARK-928][CORE] Add support for Unsafe-based serializer in Kryo --- .../spark/serializer/KryoSerializer.scala | 41 ++++-- .../spark/scheduler/KryoBenchmark.scala | 125 ++++++++++++++++++ 2 files changed, 155 insertions(+), 11 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 1fba552f70501..496b1e627b3e8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -27,6 +27,7 @@ import scala.reflect.ClassTag import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} @@ -78,8 +79,14 @@ class KryoSerializer(conf: SparkConf) .filter(!_.isEmpty) private val avroSchemas = conf.getAvroSchema + private val useUnsafe = conf.getBoolean("spark.kryo.useUnsafe", false) - def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) + def newKryoOutput(): KryoOutput = + if (useUnsafe) { + new KryoUnsafeOutput(bufferSize, math.max(bufferSize, maxBufferSize)) + } else { + new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) + } def newKryo(): Kryo = { val instantiator = new EmptyScalaKryoInstantiator @@ -172,7 +179,7 @@ class KryoSerializer(conf: SparkConf) } override def newInstance(): SerializerInstance = { - new KryoSerializerInstance(this) + new KryoSerializerInstance(this, useUnsafe) } private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = { @@ -186,9 +193,15 @@ class KryoSerializer(conf: SparkConf) private[spark] class KryoSerializationStream( serInstance: KryoSerializerInstance, - outStream: OutputStream) extends SerializationStream { + outStream: OutputStream, + useUnsafe: Boolean) extends SerializationStream { - private[this] var output: KryoOutput = new KryoOutput(outStream) + private[this] var output: KryoOutput = + if (useUnsafe) { + new KryoUnsafeOutput(outStream) + } else { + new KryoOutput(outStream) + } private[this] var kryo: Kryo = serInstance.borrowKryo() override def writeObject[T: ClassTag](t: T): SerializationStream = { @@ -219,9 +232,14 @@ class KryoSerializationStream( private[spark] class KryoDeserializationStream( serInstance: KryoSerializerInstance, - inStream: InputStream) extends DeserializationStream { + inStream: InputStream, + useUnsafe: Boolean) extends DeserializationStream { - private[this] var input: KryoInput = new KryoInput(inStream) + private[this] var input: KryoInput = if (useUnsafe) { + new KryoUnsafeInput(inStream) + } else { + new KryoInput(inStream) + } private[this] var kryo: Kryo = serInstance.borrowKryo() override def readObject[T: ClassTag](): T = { @@ -248,8 +266,9 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { - 
+private[spark] class KryoSerializerInstance( + ks: KryoSerializer, + useUnsafe: Boolean) extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching @@ -288,7 +307,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ // Make these lazy vals to avoid creating a buffer unless we use them. private lazy val output = ks.newKryoOutput() - private lazy val input = new KryoInput() + private lazy val input = if (useUnsafe) new KryoUnsafeInput() else new KryoInput() override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() @@ -329,11 +348,11 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ } override def serializeStream(s: OutputStream): SerializationStream = { - new KryoSerializationStream(this, s) + new KryoSerializationStream(this, s, useUnsafe) } override def deserializeStream(s: InputStream): DeserializationStream = { - new KryoDeserializationStream(this, s) + new KryoDeserializationStream(this, s, useUnsafe) } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala new file mode 100644 index 0000000000000..03c37eb176485 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import scala.reflect.ClassTag +import scala.util.Random + +import org.apache.spark.{SharedSparkContext, SparkFunSuite} +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.KryoTest._ +import org.apache.spark.util.Benchmark + +class KryoBenchmark extends SparkFunSuite with SharedSparkContext { + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) + + val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10) + + test(s"Benchmark Kryo Unsafe vs safe Serialization") { + Seq (false, true).foreach (runBenchmark) + benchmark.run() + + // scalastyle:off + /* + Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11.4 + Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz + + Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + basicTypes: Int unsafe:false 2 / 4 8988.0 0.1 1.0X + basicTypes: Long unsafe:false 1 / 1 13981.3 0.1 1.6X + basicTypes: Float unsafe:false 1 / 1 14460.6 0.1 1.6X + basicTypes: Double unsafe:false 1 / 1 15876.9 0.1 1.8X + Array: Int unsafe:false 33 / 44 474.8 2.1 0.1X + Array: Long unsafe:false 18 / 25 888.6 1.1 0.1X + Array: Float unsafe:false 10 / 16 1627.4 0.6 0.2X + Array: Double unsafe:false 10 / 13 1523.1 0.7 0.2X + Map of string->Double unsafe:false 413 / 447 38.1 26.3 0.0X + basicTypes: Int unsafe:true 1 / 1 16402.6 0.1 1.8X + basicTypes: Long unsafe:true 1 / 1 19732.1 0.1 2.2X + basicTypes: Float unsafe:true 1 / 1 19752.9 0.1 2.2X + basicTypes: Double unsafe:true 1 / 1 23111.4 0.0 2.6X + Array: Int unsafe:true 7 / 8 2239.9 0.4 0.2X + Array: Long unsafe:true 8 / 9 2000.1 0.5 0.2X + Array: Float unsafe:true 7 / 8 2191.5 0.5 0.2X + Array: Double unsafe:true 9 / 10 1841.2 0.5 0.2X + Map of string->Double unsafe:true 387 / 407 40.7 24.6 0.0X + */ + // scalastyle:on + } + + private def runBenchmark(useUnsafe: Boolean): Unit = { + conf.set("spark.kryo.useUnsafe", useUnsafe.toString) + val ser = new KryoSerializer(conf).newInstance() + + def addBenchmark(name: String, values: Long)(f: => Long): Unit = { + benchmark.addCase(s"$name unsafe:$useUnsafe") { iters => + f + 1 + } + } + + def check[T: ClassTag](t: T): Int = { + if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0 + } + + var N = 5000000 + basicTypes("Int", Random.nextInt) + basicTypes("Long", Random.nextLong) + basicTypes("Float", Random.nextFloat) + basicTypes("Double", Random.nextDouble) + + N = 100000 + basicTypeArray("Int", Random.nextInt) + basicTypeArray("Long", Random.nextLong) + basicTypeArray("Float", Random.nextFloat) + basicTypeArray("Double", Random.nextDouble) + + N = 500 + addBenchmark("Map of string->Double", N) { + var sum = 0L + for (i <- 1 to N) { + val map = Array.fill(i)((Random.nextString(i/10), Random.nextDouble())).toMap + sum += check(map) + } + sum + } + + def basicTypes[T: ClassTag](name: String, fn: () => T): Unit = { + addBenchmark(s"basicTypes: $name", N) { + var sum = 0L + for (i <- 1 to N) { + sum += check(fn) + } + sum + } + } + + def basicTypeArray[T: ClassTag](name: String, fn: () => T): Unit = { + addBenchmark(s"Array: $name", N) { + var sum = 0L + for (i <- 1 to N) { + val arr = Array.fill[T](i)(fn()) + sum += check(arr) + } + sum + } + } + } + +} From 112497b9f565d9f2a9429ad05e5f49d1b0b285a1 Mon Sep 17 00:00:00 2001 From: Sandeep 
Singh Date: Thu, 5 May 2016 12:38:27 +0530 Subject: [PATCH 02/18] move Benchmark to serialisation from scheduler --- .../spark/scheduler/KryoBenchmark.scala | 125 ---------------- .../spark/serializer/KryoBenchmark.scala | 133 ++++++++++++++++++ .../serializer/KryoSerializerSuite.scala | 8 ++ 3 files changed, 141 insertions(+), 125 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala create mode 100644 core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala diff --git a/core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala deleted file mode 100644 index 03c37eb176485..0000000000000 --- a/core/src/test/scala/org/apache/spark/scheduler/KryoBenchmark.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import scala.reflect.ClassTag -import scala.util.Random - -import org.apache.spark.{SharedSparkContext, SparkFunSuite} -import org.apache.spark.serializer.KryoSerializer -import org.apache.spark.serializer.KryoTest._ -import org.apache.spark.util.Benchmark - -class KryoBenchmark extends SparkFunSuite with SharedSparkContext { - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) - - val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10) - - test(s"Benchmark Kryo Unsafe vs safe Serialization") { - Seq (false, true).foreach (runBenchmark) - benchmark.run() - - // scalastyle:off - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11.4 - Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz - - Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - basicTypes: Int unsafe:false 2 / 4 8988.0 0.1 1.0X - basicTypes: Long unsafe:false 1 / 1 13981.3 0.1 1.6X - basicTypes: Float unsafe:false 1 / 1 14460.6 0.1 1.6X - basicTypes: Double unsafe:false 1 / 1 15876.9 0.1 1.8X - Array: Int unsafe:false 33 / 44 474.8 2.1 0.1X - Array: Long unsafe:false 18 / 25 888.6 1.1 0.1X - Array: Float unsafe:false 10 / 16 1627.4 0.6 0.2X - Array: Double unsafe:false 10 / 13 1523.1 0.7 0.2X - Map of string->Double unsafe:false 413 / 447 38.1 26.3 0.0X - basicTypes: Int unsafe:true 1 / 1 16402.6 0.1 1.8X - basicTypes: Long unsafe:true 1 / 1 19732.1 0.1 2.2X - basicTypes: Float unsafe:true 1 / 1 19752.9 0.1 2.2X - basicTypes: Double unsafe:true 1 / 1 23111.4 0.0 2.6X - Array: Int unsafe:true 7 / 8 2239.9 0.4 0.2X - Array: Long unsafe:true 8 / 9 2000.1 0.5 0.2X - Array: Float unsafe:true 7 / 8 
2191.5 0.5 0.2X - Array: Double unsafe:true 9 / 10 1841.2 0.5 0.2X - Map of string->Double unsafe:true 387 / 407 40.7 24.6 0.0X - */ - // scalastyle:on - } - - private def runBenchmark(useUnsafe: Boolean): Unit = { - conf.set("spark.kryo.useUnsafe", useUnsafe.toString) - val ser = new KryoSerializer(conf).newInstance() - - def addBenchmark(name: String, values: Long)(f: => Long): Unit = { - benchmark.addCase(s"$name unsafe:$useUnsafe") { iters => - f + 1 - } - } - - def check[T: ClassTag](t: T): Int = { - if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0 - } - - var N = 5000000 - basicTypes("Int", Random.nextInt) - basicTypes("Long", Random.nextLong) - basicTypes("Float", Random.nextFloat) - basicTypes("Double", Random.nextDouble) - - N = 100000 - basicTypeArray("Int", Random.nextInt) - basicTypeArray("Long", Random.nextLong) - basicTypeArray("Float", Random.nextFloat) - basicTypeArray("Double", Random.nextDouble) - - N = 500 - addBenchmark("Map of string->Double", N) { - var sum = 0L - for (i <- 1 to N) { - val map = Array.fill(i)((Random.nextString(i/10), Random.nextDouble())).toMap - sum += check(map) - } - sum - } - - def basicTypes[T: ClassTag](name: String, fn: () => T): Unit = { - addBenchmark(s"basicTypes: $name", N) { - var sum = 0L - for (i <- 1 to N) { - sum += check(fn) - } - sum - } - } - - def basicTypeArray[T: ClassTag](name: String, fn: () => T): Unit = { - addBenchmark(s"Array: $name", N) { - var sum = 0L - for (i <- 1 to N) { - val arr = Array.fill[T](i)(fn()) - sum += check(arr) - } - sum - } - } - } - -} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala new file mode 100644 index 0000000000000..385e1cb1edc5a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.serializer + +import scala.reflect.ClassTag +import scala.util.Random + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.serializer.KryoTest._ +import org.apache.spark.util.Benchmark + +class KryoBenchmark extends SparkFunSuite { + val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10) + + ignore(s"Benchmark Kryo Unsafe vs safe Serialization") { + Seq (true, false).foreach (runBenchmark) + benchmark.run() + + // scalastyle:off + /* + Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------------ + basicTypes: Int unsafe:true 160 / 178 98.5 10.1 1.0X + basicTypes: Long unsafe:true 210 / 218 74.9 13.4 0.8X + basicTypes: Float unsafe:true 203 / 213 77.5 12.9 0.8X + basicTypes: Double unsafe:true 226 / 235 69.5 14.4 0.7X + Array: Int unsafe:true 1087 / 1101 14.5 69.1 0.1X + Array: Long unsafe:true 2758 / 2844 5.7 175.4 0.1X + Array: Float unsafe:true 1511 / 1552 10.4 96.1 0.1X + Array: Double unsafe:true 2942 / 2972 5.3 187.0 0.1X + Map of string->Double unsafe:true 2645 / 2739 5.9 168.2 0.1X + basicTypes: Int unsafe:false 211 / 218 74.7 13.4 0.8X + basicTypes: Long unsafe:false 247 / 253 63.6 15.7 0.6X + basicTypes: Float unsafe:false 211 / 216 74.5 13.4 0.8X + basicTypes: Double unsafe:false 227 / 233 69.2 14.4 0.7X + Array: Int unsafe:false 3012 / 3032 5.2 191.5 0.1X + Array: Long unsafe:false 4463 / 4515 3.5 283.8 0.0X + Array: Float unsafe:false 2788 / 2868 5.6 177.2 0.1X + Array: Double unsafe:false 3558 / 3752 4.4 226.2 0.0X + Map of string->Double unsafe:false 2806 / 2933 5.6 178.4 0.1X + */ + // scalastyle:on + } + + private def runBenchmark(useUnsafe: Boolean): Unit = { + def addBenchmark(name: String, values: Long)(f: => Long): Unit = { + benchmark.addCase(s"$name unsafe:$useUnsafe") { _ => + f + } + } + + def check[T: ClassTag](t: T, ser: SerializerInstance): Int = { + if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0 + } + + val N1 = 1000000 + def basicTypes[T: ClassTag](name: String, fn: () => T): Unit = { + lazy val ser = createSerializer(useUnsafe) + addBenchmark(s"basicTypes: $name", N1) { + var sum = 0L + var i = 0 + while (i < N1) { + sum += check(fn(), ser) + i += 1 + } + sum + } + } + basicTypes("Int", Random.nextInt) + basicTypes("Long", Random.nextLong) + basicTypes("Float", Random.nextFloat) + basicTypes("Double", Random.nextDouble) + + val N2 = 10000 + def basicTypeArray[T: ClassTag](name: String, fn: () => T): Unit = { + lazy val ser = createSerializer(useUnsafe) + addBenchmark(s"Array: $name", N2) { + var sum = 0L + var i = 0 + while (i < N2) { + val arr = Array.fill[T](i)(fn()) + sum += check(arr, ser) + i += 1 + } + sum + } + } + basicTypeArray("Int", Random.nextInt) + basicTypeArray("Long", Random.nextLong) + basicTypeArray("Float", Random.nextFloat) + basicTypeArray("Double", Random.nextDouble) + + { + val N3 = 1000 + lazy val ser = createSerializer(useUnsafe) + addBenchmark("Map of string->Double", N3) { + var sum = 0L + var i = 0 + while (i < N3) { + val map = Array.fill(i)((Random.nextString(i / 10), Random.nextDouble())).toMap + sum += check(map, ser) + i += 1 + } + sum + } + } + } + + def createSerializer(useUnsafe: Boolean): SerializerInstance = { + val conf = new SparkConf() + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) 
+ conf.set("spark.kryo.useUnsafe", useUnsafe.toString) + + new KryoSerializer(conf).newInstance() + } + +} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 57a82312008e9..074d2cc95daac 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -399,6 +399,14 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { assert(!ser2.getAutoReset) } + private def testBothUnsafeAndSafe(f: SparkConf => Unit): Unit = { + Seq(true, false).foreach { useUnsafe => + val conf = new SparkConf() + conf.set("spark.kryo.useUnsafe", useUnsafe.toString) + f(conf) + } + } + private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = { val conf = new SparkConf(loadDefaults = false) .set("spark.kryo.referenceTracking", referenceTracking.toString) From 498a98451229b9ca3860d18f7aa7ea78a9ebfc6f Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Thu, 4 Aug 2016 20:44:32 +0530 Subject: [PATCH 03/18] test both safe and unsafe --- .../serializer/KryoSerializerSuite.scala | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 074d2cc95daac..c37e3c64d848e 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -75,9 +75,11 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("basic types") { - val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - assert(ser.deserialize[T](ser.serialize(t)) === t) + testBothUnsafeAndSafe(conf) { sparkConf => + val ser = new KryoSerializer(sparkConf).newInstance() + assert(ser.deserialize[T](ser.serialize(t)) === t) + } } check(1) check(1L) @@ -105,9 +107,11 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("pairs") { - val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - assert(ser.deserialize[T](ser.serialize(t)) === t) + testBothUnsafeAndSafe(conf) { sparkConf => + val ser = new KryoSerializer(sparkConf).newInstance() + assert(ser.deserialize[T](ser.serialize(t)) === t) + } } check((1, 1)) check((1, 1L)) @@ -129,9 +133,11 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("Scala data structures") { - val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - assert(ser.deserialize[T](ser.serialize(t)) === t) + testBothUnsafeAndSafe(conf) { sparkConf => + val ser = new KryoSerializer(sparkConf).newInstance() + assert(ser.deserialize[T](ser.serialize(t)) === t) + } } check(List[Int]()) check(List[Int](1, 2, 3)) @@ -154,10 +160,12 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("Bug: SPARK-10251") { - val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true")) - .newInstance() def check[T: ClassTag](t: T) { - assert(ser.deserialize[T](ser.serialize(t)) === t) + testBothUnsafeAndSafe(conf) { sparkConf => + val ser = new KryoSerializer(sparkConf.clone.set("spark.kryo.registrationRequired", "true")) + .newInstance() + assert(ser.deserialize[T](ser.serialize(t)) === t) + } } 
check((1, 3)) check(Array((1, 3))) @@ -184,11 +192,13 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("ranges") { - val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - assert(ser.deserialize[T](ser.serialize(t)) === t) - // Check that very long ranges don't get written one element at a time - assert(ser.serialize(t).limit < 100) + testBothUnsafeAndSafe(conf) { sparkConf => + val ser = new KryoSerializer(sparkConf).newInstance() + assert(ser.deserialize[T](ser.serialize(t)) === t) + // Check that very long ranges don't get written one element at a time + assert(ser.serialize(t).limit < 100) + } } check(1 to 1000000) check(1 to 1000000 by 2) @@ -399,11 +409,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { assert(!ser2.getAutoReset) } - private def testBothUnsafeAndSafe(f: SparkConf => Unit): Unit = { + private def testBothUnsafeAndSafe(conf: SparkConf)(f: SparkConf => Unit): Unit = { Seq(true, false).foreach { useUnsafe => - val conf = new SparkConf() - conf.set("spark.kryo.useUnsafe", useUnsafe.toString) - f(conf) + f(conf.clone.set("spark.kryo.useUnsafe", useUnsafe.toString)) } } From 69c6317d931032dbb055be1324826f65485bf9be Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Wed, 19 Oct 2016 14:33:59 +0300 Subject: [PATCH 04/18] spark.kryo.unsafe --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 496b1e627b3e8..d852cdad98263 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -79,7 +79,7 @@ class KryoSerializer(conf: SparkConf) .filter(!_.isEmpty) private val avroSchemas = conf.getAvroSchema - private val useUnsafe = conf.getBoolean("spark.kryo.useUnsafe", false) + private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) def newKryoOutput(): KryoOutput = if (useUnsafe) { From 7dd32e103b6a73a0153952ab71e33380e8ed7834 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Wed, 19 Oct 2016 16:02:45 +0300 Subject: [PATCH 05/18] replace all --- .../test/scala/org/apache/spark/serializer/KryoBenchmark.scala | 2 +- .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index 385e1cb1edc5a..5f20ce0753b51 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -125,7 +125,7 @@ class KryoBenchmark extends SparkFunSuite { val conf = new SparkConf() conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) - conf.set("spark.kryo.useUnsafe", useUnsafe.toString) + conf.set("spark.kryo.unsafe", useUnsafe.toString) new KryoSerializer(conf).newInstance() } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c37e3c64d848e..ee2958bf35836 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -411,7 +411,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { private def testBothUnsafeAndSafe(conf: SparkConf)(f: SparkConf => Unit): Unit = { Seq(true, false).foreach { useUnsafe => - f(conf.clone.set("spark.kryo.useUnsafe", useUnsafe.toString)) + f(conf.clone.set("spark.kryo.unsafe", useUnsafe.toString)) } } From 82a7d089e43ae8c18bdb2e6dfbfa535cede42676 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Wed, 19 Oct 2016 16:10:07 +0300 Subject: [PATCH 06/18] revert kryo serializer suite --- .../serializer/KryoSerializerSuite.scala | 42 ++++++------------- 1 file changed, 13 insertions(+), 29 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index ee2958bf35836..57a82312008e9 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -75,11 +75,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("basic types") { + val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - testBothUnsafeAndSafe(conf) { sparkConf => - val ser = new KryoSerializer(sparkConf).newInstance() - assert(ser.deserialize[T](ser.serialize(t)) === t) - } + assert(ser.deserialize[T](ser.serialize(t)) === t) } check(1) check(1L) @@ -107,11 +105,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("pairs") { + val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - testBothUnsafeAndSafe(conf) { sparkConf => - val ser = new KryoSerializer(sparkConf).newInstance() - assert(ser.deserialize[T](ser.serialize(t)) === t) - } + assert(ser.deserialize[T](ser.serialize(t)) === t) } check((1, 1)) check((1, 1L)) @@ -133,11 +129,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("Scala data structures") { + val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - testBothUnsafeAndSafe(conf) { sparkConf => - val ser = new KryoSerializer(sparkConf).newInstance() - assert(ser.deserialize[T](ser.serialize(t)) === t) - } + assert(ser.deserialize[T](ser.serialize(t)) === t) } check(List[Int]()) check(List[Int](1, 2, 3)) @@ -160,12 +154,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("Bug: SPARK-10251") { + val ser = new KryoSerializer(conf.clone.set("spark.kryo.registrationRequired", "true")) + .newInstance() def check[T: ClassTag](t: T) { - testBothUnsafeAndSafe(conf) { sparkConf => - val ser = new KryoSerializer(sparkConf.clone.set("spark.kryo.registrationRequired", "true")) - .newInstance() - assert(ser.deserialize[T](ser.serialize(t)) === t) - } + assert(ser.deserialize[T](ser.serialize(t)) === t) } check((1, 3)) check(Array((1, 3))) @@ -192,13 +184,11 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { } test("ranges") { + val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { - testBothUnsafeAndSafe(conf) { sparkConf => - val ser = new KryoSerializer(sparkConf).newInstance() - assert(ser.deserialize[T](ser.serialize(t)) === t) - // Check that very long ranges don't get written one element at a time - assert(ser.serialize(t).limit < 100) - } + assert(ser.deserialize[T](ser.serialize(t)) === t) + // Check that very long ranges don't get 
written one element at a time + assert(ser.serialize(t).limit < 100) } check(1 to 1000000) check(1 to 1000000 by 2) @@ -409,12 +399,6 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { assert(!ser2.getAutoReset) } - private def testBothUnsafeAndSafe(conf: SparkConf)(f: SparkConf => Unit): Unit = { - Seq(true, false).foreach { useUnsafe => - f(conf.clone.set("spark.kryo.unsafe", useUnsafe.toString)) - } - } - private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = { val conf = new SparkConf(loadDefaults = false) .set("spark.kryo.referenceTracking", referenceTracking.toString) From d5f2dbec8ab7be8c355ae497225ba62b21a7baef Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Wed, 19 Oct 2016 16:17:20 +0300 Subject: [PATCH 07/18] UnsafeKryoSerializerSuite should extend from KryoSerializerSuite --- .../UnsafeKryoSerializerSuite.scala | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala new file mode 100644 index 0000000000000..e2ba6681e146b --- /dev/null +++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.serializer + +class UnsafeKryoSerializerSuite extends KryoSerializerSuite { + override def beforeAll() { + super.beforeAll() + conf.set("spark.kryo.unsafe", "true") + } + + override def afterAll(): Unit = { + conf.set("spark.kryo.unsafe", "false") + super.afterAll() + } +} From 764793054795519e240da61622ddea040f672169 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Wed, 19 Oct 2016 16:18:41 +0300 Subject: [PATCH 08/18] remove afterAll --- .../apache/spark/serializer/UnsafeKryoSerializerSuite.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala index e2ba6681e146b..dcab3d6158b42 100644 --- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala @@ -22,9 +22,4 @@ class UnsafeKryoSerializerSuite extends KryoSerializerSuite { super.beforeAll() conf.set("spark.kryo.unsafe", "true") } - - override def afterAll(): Unit = { - conf.set("spark.kryo.unsafe", "false") - super.afterAll() - } } From 37ccea41caf9f9f4bbd4a4cb0fc70c43f23f564f Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Wed, 19 Oct 2016 16:19:56 +0300 Subject: [PATCH 09/18] add comment --- .../apache/spark/serializer/UnsafeKryoSerializerSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala index dcab3d6158b42..84adedb3de25a 100644 --- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala @@ -18,6 +18,9 @@ package org.apache.spark.serializer class UnsafeKryoSerializerSuite extends KryoSerializerSuite { + + // This test suite should run all tests in KryoSerializerSuite with kryo unsafe. 
+
   override def beforeAll() {
     super.beforeAll()
     conf.set("spark.kryo.unsafe", "true")
   }
 }

From 41939703f0ccebcfb9e767594f1bb9d87366bbfb Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 20 Oct 2016 21:46:51 +0300
Subject: [PATCH 10/18] use unsafe false in kryoSerSuite

---
 .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 57a82312008e9..ca6393d9b71ae 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -36,6 +36,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)
+  conf.set("spark.kryo.unsafe", "false")

   test("SPARK-7392 configuration limits") {
     val kryoBufferProperty = "spark.kryoserializer.buffer"

From b9c81a5c07797f4d975c2b5a741c497f98235ab5 Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Thu, 20 Oct 2016 22:01:25 +0300
Subject: [PATCH 11/18] Add doc for unsafe IO

---
 docs/configuration.md | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/docs/configuration.md b/docs/configuration.md
index a4a99d6fa4630..b07867d99aa9d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -799,6 +799,14 @@ Apart from these, the following properties are also available, and may be useful
     See the <a href="tuning.html">tuning guide</a> for more details.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kryo.unsafe</code></td>
+  <td>false</td>
+  <td>
+    Whether to use an unsafe-based Kryo serializer. Can be
+    substantially faster by using unsafe-based IO.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kryoserializer.buffer.max</code></td>
   <td>64m</td>

From a23b2de8884fe07c5e55f5622709a20abb1d5c8a Mon Sep 17 00:00:00 2001
From: Sandeep Singh
Date: Sat, 22 Oct 2016 08:36:38 +0300
Subject: [PATCH 12/18] set conf before spark context is created

---
 .../org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
index 84adedb3de25a..d77671c138164 100644
--- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala
@@ -22,7 +22,7 @@ class UnsafeKryoSerializerSuite extends KryoSerializerSuite {

   // This test suite should run all tests in KryoSerializerSuite with kryo unsafe.

override def beforeAll() { - super.beforeAll() conf.set("spark.kryo.unsafe", "true") + super.beforeAll() } } From 084542b50d8c9726b96d1f4d6e5ea9b8f422612b Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Sat, 22 Oct 2016 08:42:22 +0300 Subject: [PATCH 13/18] reset afterAll --- .../apache/spark/serializer/UnsafeKryoSerializerSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala index d77671c138164..d63a45ae4a6a9 100644 --- a/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/UnsafeKryoSerializerSuite.scala @@ -25,4 +25,9 @@ class UnsafeKryoSerializerSuite extends KryoSerializerSuite { conf.set("spark.kryo.unsafe", "true") super.beforeAll() } + + override def afterAll() { + conf.set("spark.kryo.unsafe", "false") + super.afterAll() + } } From 3977ccb2f58f7bf23d0bd4807d615c38939d6342 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Sat, 22 Oct 2016 08:45:54 +0300 Subject: [PATCH 14/18] address comments --- .../spark/serializer/KryoSerializer.scala | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index d852cdad98263..9817a10e0a58c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -197,11 +197,8 @@ class KryoSerializationStream( useUnsafe: Boolean) extends SerializationStream { private[this] var output: KryoOutput = - if (useUnsafe) { - new KryoUnsafeOutput(outStream) - } else { - new KryoOutput(outStream) - } + if (useUnsafe) new KryoUnsafeOutput(outStream) else new KryoOutput(outStream) + private[this] var kryo: Kryo = serInstance.borrowKryo() override def writeObject[T: ClassTag](t: T): SerializationStream = { @@ -235,11 +232,9 @@ class KryoDeserializationStream( inStream: InputStream, useUnsafe: Boolean) extends DeserializationStream { - private[this] var input: KryoInput = if (useUnsafe) { - new KryoUnsafeInput(inStream) - } else { - new KryoInput(inStream) - } + private[this] var input: KryoInput = + if (useUnsafe) new KryoUnsafeInput(inStream) else new KryoInput(inStream) + private[this] var kryo: Kryo = serInstance.borrowKryo() override def readObject[T: ClassTag](): T = { @@ -266,9 +261,8 @@ class KryoDeserializationStream( } } -private[spark] class KryoSerializerInstance( - ks: KryoSerializer, - useUnsafe: Boolean) extends SerializerInstance { +private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean) + extends SerializerInstance { /** * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do * their work, then release the instance by calling `releaseKryo()`. 
Logically, this is a caching From 976e336672c66df8de8b4dc2e96d98a187d103ee Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Sat, 22 Oct 2016 08:49:42 +0300 Subject: [PATCH 15/18] add comment --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 9817a10e0a58c..0d26281fe1076 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -79,6 +79,7 @@ class KryoSerializer(conf: SparkConf) .filter(!_.isEmpty) private val avroSchemas = conf.getAvroSchema + // whether to use unsafe based IO for serialization private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false) def newKryoOutput(): KryoOutput = From 4de5c6db4817f13c912626278970b17b7bfa965a Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Sat, 22 Oct 2016 09:32:23 +0300 Subject: [PATCH 16/18] fix benchmark --- .../spark/serializer/KryoBenchmark.scala | 62 ++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index 5f20ce0753b51..5ccd0bc1a8809 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -58,24 +58,21 @@ class KryoBenchmark extends SparkFunSuite { } private def runBenchmark(useUnsafe: Boolean): Unit = { - def addBenchmark(name: String, values: Long)(f: => Long): Unit = { - benchmark.addCase(s"$name unsafe:$useUnsafe") { _ => - f - } - } - def check[T: ClassTag](t: T, ser: SerializerInstance): Int = { if (ser.deserialize[T](ser.serialize(t)) === t) 1 else 0 } - val N1 = 1000000 - def basicTypes[T: ClassTag](name: String, fn: () => T): Unit = { + // Benchmark Primitives + val basicTypeCount = 1000000 + def basicTypes[T: ClassTag](name: String, gen: () => T): Unit = { lazy val ser = createSerializer(useUnsafe) - addBenchmark(s"basicTypes: $name", N1) { + val arrayOfBasicType: Array[T] = Array.fill(basicTypeCount)(gen()) + + benchmark.addCase(s"basicTypes: $name with unsafe:$useUnsafe") { _ => var sum = 0L var i = 0 - while (i < N1) { - sum += check(fn(), ser) + while (i < basicTypeCount) { + sum += check(arrayOfBasicType(i), ser) i += 1 } sum @@ -86,14 +83,18 @@ class KryoBenchmark extends SparkFunSuite { basicTypes("Float", Random.nextFloat) basicTypes("Double", Random.nextDouble) - val N2 = 10000 - def basicTypeArray[T: ClassTag](name: String, fn: () => T): Unit = { + // Benchmark Array of Primitives + val arrayCount = 10000 + def basicTypeArray[T: ClassTag](name: String, gen: () => T): Unit = { lazy val ser = createSerializer(useUnsafe) - addBenchmark(s"Array: $name", N2) { + val arrayOfArrays: Array[Array[T]] = + Array.fill(arrayCount)(Array.fill[T](Random.nextInt(arrayCount))(gen())) + + benchmark.addCase(s"Array: $name with unsafe:$useUnsafe") { _ => var sum = 0L var i = 0 - while (i < N2) { - val arr = Array.fill[T](i)(fn()) + while (i < arrayCount) { + val arr = arrayOfArrays(i) sum += check(arr, ser) i += 1 } @@ -105,19 +106,24 @@ class KryoBenchmark extends SparkFunSuite { basicTypeArray("Float", Random.nextFloat) basicTypeArray("Double", Random.nextDouble) - { - val N3 = 1000 - lazy val ser = createSerializer(useUnsafe) - addBenchmark("Map of 
string->Double", N3) { - var sum = 0L - var i = 0 - while (i < N3) { - val map = Array.fill(i)((Random.nextString(i / 10), Random.nextDouble())).toMap - sum += check(map, ser) - i += 1 - } - sum + // Benchmark Maps + val mapsCount = 1000 + lazy val ser = createSerializer(useUnsafe) + val arrayOfMaps: Array[Map[String, Double]] = Array.fill(mapsCount) { + Array.fill(Random.nextInt(mapsCount)) { + (Random.nextString(mapsCount / 10), Random.nextDouble()) + }.toMap + } + + benchmark.addCase(s"Map of string->Double with unsafe:$useUnsafe") { _ => + var sum = 0L + var i = 0 + while (i < mapsCount) { + val map = arrayOfMaps(i) + sum += check(map, ser) + i += 1 } + sum } } From f81f0a067ca82aeeaee6e072462f971d36999b05 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Sat, 22 Oct 2016 09:33:07 +0300 Subject: [PATCH 17/18] add results --- .../spark/serializer/KryoBenchmark.scala | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index 5ccd0bc1a8809..6a32a59635b76 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -27,7 +27,7 @@ import org.apache.spark.util.Benchmark class KryoBenchmark extends SparkFunSuite { val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10) - ignore(s"Benchmark Kryo Unsafe vs safe Serialization") { + test(s"Benchmark Kryo Unsafe vs safe Serialization") { Seq (true, false).foreach (runBenchmark) benchmark.run() @@ -35,24 +35,24 @@ class KryoBenchmark extends SparkFunSuite { /* Benchmark Kryo Unsafe vs safe Serialization: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ - basicTypes: Int unsafe:true 160 / 178 98.5 10.1 1.0X - basicTypes: Long unsafe:true 210 / 218 74.9 13.4 0.8X - basicTypes: Float unsafe:true 203 / 213 77.5 12.9 0.8X - basicTypes: Double unsafe:true 226 / 235 69.5 14.4 0.7X - Array: Int unsafe:true 1087 / 1101 14.5 69.1 0.1X - Array: Long unsafe:true 2758 / 2844 5.7 175.4 0.1X - Array: Float unsafe:true 1511 / 1552 10.4 96.1 0.1X - Array: Double unsafe:true 2942 / 2972 5.3 187.0 0.1X - Map of string->Double unsafe:true 2645 / 2739 5.9 168.2 0.1X - basicTypes: Int unsafe:false 211 / 218 74.7 13.4 0.8X - basicTypes: Long unsafe:false 247 / 253 63.6 15.7 0.6X - basicTypes: Float unsafe:false 211 / 216 74.5 13.4 0.8X - basicTypes: Double unsafe:false 227 / 233 69.2 14.4 0.7X - Array: Int unsafe:false 3012 / 3032 5.2 191.5 0.1X - Array: Long unsafe:false 4463 / 4515 3.5 283.8 0.0X - Array: Float unsafe:false 2788 / 2868 5.6 177.2 0.1X - Array: Double unsafe:false 3558 / 3752 4.4 226.2 0.0X - Map of string->Double unsafe:false 2806 / 2933 5.6 178.4 0.1X + basicTypes: Int with unsafe:true 151 / 170 104.2 9.6 1.0X + basicTypes: Long with unsafe:true 175 / 191 89.8 11.1 0.9X + basicTypes: Float with unsafe:true 177 / 184 88.8 11.3 0.9X + basicTypes: Double with unsafe:true 193 / 216 81.4 12.3 0.8X + Array: Int with unsafe:true 513 / 587 30.7 32.6 0.3X + Array: Long with unsafe:true 1211 / 1358 13.0 77.0 0.1X + Array: Float with unsafe:true 890 / 964 17.7 56.6 0.2X + Array: Double with unsafe:true 1335 / 1428 11.8 84.9 0.1X + Map of string->Double with unsafe:true 931 / 988 16.9 59.2 0.2X + basicTypes: Int with unsafe:false 197 / 217 79.9 12.5 0.8X + 
basicTypes: Long with unsafe:false 219 / 240 71.8 13.9 0.7X + basicTypes: Float with unsafe:false 208 / 217 75.7 13.2 0.7X + basicTypes: Double with unsafe:false 208 / 225 75.6 13.2 0.7X + Array: Int with unsafe:false 2559 / 2681 6.1 162.7 0.1X + Array: Long with unsafe:false 3425 / 3516 4.6 217.8 0.0X + Array: Float with unsafe:false 2025 / 2134 7.8 128.7 0.1X + Array: Double with unsafe:false 2241 / 2358 7.0 142.5 0.1X + Map of string->Double with unsafe:false 1044 / 1085 15.1 66.4 0.1X */ // scalastyle:on } From cdf535e82c7c482ae030975d53683164854ef588 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Sat, 22 Oct 2016 09:33:45 +0300 Subject: [PATCH 18/18] ignore benchmark --- .../test/scala/org/apache/spark/serializer/KryoBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index 6a32a59635b76..64be966276140 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -27,7 +27,7 @@ import org.apache.spark.util.Benchmark class KryoBenchmark extends SparkFunSuite { val benchmark = new Benchmark("Benchmark Kryo Unsafe vs safe Serialization", 1024 * 1024 * 15, 10) - test(s"Benchmark Kryo Unsafe vs safe Serialization") { + ignore(s"Benchmark Kryo Unsafe vs safe Serialization") { Seq (true, false).foreach (runBenchmark) benchmark.run()
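
A minimal usage sketch for the spark.kryo.unsafe flag introduced by this series. The object name and sample data below are illustrative; the configuration keys and the KryoSerializer/SerializerInstance API are the ones shown in the patches above. Note that the unsafe and safe wire formats are not interchangeable, so bytes written with spark.kryo.unsafe=true should be read back under the same setting.

    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoSerializer

    object UnsafeKryoRoundTrip {
      def main(args: Array[String]): Unit = {
        // Opt in to Kryo with the unsafe-based Input/Output classes.
        val conf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.unsafe", "true")

        // KryoSerializer reads spark.kryo.unsafe once and passes it down to
        // KryoSerializerInstance, which builds KryoUnsafeInput/KryoUnsafeOutput
        // instead of the regular KryoInput/KryoOutput.
        val ser = new KryoSerializer(conf).newInstance()

        // Round-trip an array of longs through the serializer instance.
        val original = Array.fill(1000)(scala.util.Random.nextLong())
        val bytes = ser.serialize(original) // returns a ByteBuffer
        val restored = ser.deserialize[Array[Long]](bytes)
        assert(original.sameElements(restored))
      }
    }

Since KryoSerializerInstance only threads the boolean through to the stream constructors, the same setting also governs serializeStream and deserializeStream, which is the path the shuffle exercises.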