Skip to content

Commit

Permalink
SPARK-2543: Allow user to set maximum Kryo buffer size
Browse files Browse the repository at this point in the history
Author: Koert Kuipers <koert@tresata.com>

Closes #735 from koertkuipers/feat-kryo-max-buffersize and squashes the following commits:

15f6d81 [Koert Kuipers] change default for spark.kryoserializer.buffer.max.mb to 64mb and add some documentation
1bcc22c [Koert Kuipers] Merge branch 'master' into feat-kryo-max-buffersize
0c9f8eb [Koert Kuipers] make default for kryo max buffer size 16MB
143ec4d [Koert Kuipers] test resizable buffer in kryo Output
0732445 [Koert Kuipers] support setting maxCapacity to something different than capacity in kryo Output
  • Loading branch information
koertkuipers authored and pwendell committed Jul 30, 2014
1 parent 7003c16 commit 7c5fc28
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ class KryoSerializer(conf: SparkConf)
with Serializable {

private val bufferSize = conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024
private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val registrator = conf.getOption("spark.kryo.registrator")

def newKryoOutput() = new KryoOutput(bufferSize)
def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))

def newKryo(): Kryo = {
val instantiator = new EmptyScalaKryoInstantiator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,36 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
}
}

class KryoSerializerResizableOutputSuite extends FunSuite {
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.LocalSparkContext
import org.apache.spark.SparkException

// trial and error showed this will not serialize with 1mb buffer
val x = (1 to 400000).toArray

test("kryo without resizable output buffer should fail on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.mb", "1")
conf.set("spark.kryoserializer.buffer.max.mb", "1")
val sc = new SparkContext("local", "test", conf)
intercept[SparkException](sc.parallelize(x).collect)
LocalSparkContext.stop(sc)
}

test("kryo with resizable output buffer should succeed on large array") {
val conf = new SparkConf(false)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.mb", "1")
conf.set("spark.kryoserializer.buffer.max.mb", "2")
val sc = new SparkContext("local", "test", conf)
assert(sc.parallelize(x).collect === x)
LocalSparkContext.stop(sc)
}
}

object KryoTest {
case class CaseClass(i: Int, s: String) {}

Expand Down
16 changes: 12 additions & 4 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,18 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.kryoserializer.buffer.mb</code></td>
<td>2</td>
<td>
Maximum object size to allow within Kryo (the library needs to create a buffer at least as
large as the largest single object you'll serialize). Increase this if you get a "buffer limit
exceeded" exception inside Kryo. Note that there will be one buffer <i>per core</i> on each
worker.
Initial size of Kryo's serialization buffer, in megabytes. Note that there will be one buffer
<i>per core</i> on each worker. This buffer will grow up to
<code>spark.kryoserializer.buffer.max.mb</code> if needed.
</td>
</tr>
<tr>
<td><code>spark.kryoserializer.buffer.max.mb</code></td>
<td>64</td>
<td>
Maximum allowable size of Kryo serialization buffer, in megabytes. This must be larger than any
object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception
inside Kryo.
</td>
</tr>
</table>
Expand Down

0 comments on commit 7c5fc28

Please sign in to comment.