
[SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore #11748

Closed
JoshRosen wants to merge 20 commits into master from JoshRosen:chunked-block-serialization

Conversation

JoshRosen
Contributor

This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer.

This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted.

This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not been implemented yet).
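To make the memory arithmetic concrete: with a grow-by-doubling buffer, writing 129 MB forces the backing array to grow to 256 MB, and the old 128 MB array must be held live during the copy. Below is a minimal sketch of the chunked alternative (hypothetical code, not the actual MemoryStore implementation; the 4 MB chunk size is an assumed parameter):

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: accumulate output in fixed-size chunks instead of one
// doubling array. Only the final chunk can contain slack, so a 129 MB block
// occupies ~132 MB (33 x 4 MB chunks) rather than 256 MB.
class ChunkedOutputSketch(chunkSize: Int = 4 * 1024 * 1024) {
  private val chunks = ArrayBuffer.empty[ByteBuffer]
  private var current: ByteBuffer = null

  def write(bytes: Array[Byte], offset: Int, length: Int): Unit = {
    var pos = offset
    val end = offset + length
    while (pos < end) {
      if (current == null || !current.hasRemaining) {
        current = ByteBuffer.allocate(chunkSize) // fixed size: no doubling, no large copies
        chunks += current
      }
      val n = math.min(end - pos, current.remaining())
      current.put(bytes, pos, n)
      pos += n
    }
  }

  /** Flip each chunk for reading; callers see the block as a sequence of buffers. */
  def toChunks(): Array[ByteBuffer] = chunks.map { c =>
    val dup = c.duplicate()
    dup.flip()
    dup
  }.toArray
}
```

A ChunkedByteBuffer-style wrapper over `toChunks()` can then expose the block without ever materializing a single contiguous array.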

@JoshRosen JoshRosen changed the title Store serialized blocks as multiple chunks in MemoryStore [SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore Mar 16, 2016
@SparkQA

SparkQA commented Mar 16, 2016

Test build #53251 has finished for PR 11748 at commit e5e663f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


class ChunkedByteBufferSuite extends SparkFunSuite {

  test("must have at least one chunk") {
Contributor

any reason we want to enforce this?

Contributor Author

I think it marginally simplified some initialization code.

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53290 has finished for PR 11748 at commit 0a347fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

@rxin, I believe that I've addressed all of your review comments (plus the outstanding TODOs that I left).

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53355 has finished for PR 11748 at commit 719ad3c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin
Contributor

rxin commented Mar 16, 2016

Looks good from my side. It would be good for somebody else to take a look at this too, e.g. @andrewor14?

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53352 has finished for PR 11748 at commit b6ddf3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53351 has finished for PR 11748 at commit 4f5074e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 17, 2016

Test build #53363 has finished for PR 11748 at commit 3fc0b66.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Mar 17, 2016

Test build #53442 has finished for PR 11748 at commit 3fc0b66.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 17, 2016

Test build #53445 has finished for PR 11748 at commit cb9311b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

// Skip forward within the current chunk; once it is exhausted, advance to the next one.
currentChunk.position(currentChunk.position + amountToSkip)
if (currentChunk.remaining() == 0) {
  if (chunks.hasNext) {
    currentChunk = chunks.next()
Contributor

Does this need to call dispose()?

Contributor Author

Currently we dispose of all chunks at the same time (in the close() call).

Contributor

Any reason this behaves differently here than in read()?

Contributor Author

Oh, whoops. Stupid mistake.
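
For context, a minimal sketch of the dispose-everything-in-close() pattern described above, under assumed names (not the actual Spark code):

```scala
import java.io.InputStream
import java.nio.ByteBuffer

// Sketch under assumptions: the stream walks chunk by chunk; individual chunks
// are NOT disposed as they are exhausted. close() disposes all of them at once.
class ChunkedInputSketch(allChunks: Array[ByteBuffer], dispose: ByteBuffer => Unit)
    extends InputStream {
  private val chunks = allChunks.iterator
  private var current: ByteBuffer = if (chunks.hasNext) chunks.next() else null

  override def read(): Int = {
    while (current != null && !current.hasRemaining) {
      current = if (chunks.hasNext) chunks.next() else null // advance, no per-chunk dispose
    }
    if (current == null) -1 else current.get() & 0xFF
  }

  override def close(): Unit = {
    allChunks.foreach(dispose) // all chunks disposed together, as described above
    current = null
  }
}
```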

@SparkQA

SparkQA commented Mar 17, 2016

Test build #53452 has finished for PR 11748 at commit cb9311b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 18, 2016

Test build #53489 has finished for PR 11748 at commit 2970932.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Merging to master.

@asfgit asfgit closed this in 6c2d894 Mar 18, 2016
@JoshRosen JoshRosen deleted the chunked-block-serialization branch March 18, 2016 03:03
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#11748 from JoshRosen/chunked-block-serialization.
@bonitao

bonitao commented Jul 6, 2016

Hi @JoshRosen,

I am trying Spark 2.0, and I believe I am hitting a bug that was introduced in this commit. In summary: when Kryo serialization is enabled and an RDD with fewer elements than the default parallelism is persisted, Spark attempts to create an empty ChunkedByteBuffer and this code throws "chunks must be non-empty". If you believe there is a better forum for me to discuss this, let me know. Happy to contribute pull requests if appropriate.
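
For reference, the check that trips appears to be a constructor requirement; here is a minimal sketch inferred from the stack trace below (not the exact Spark source):

```scala
import java.nio.ByteBuffer

// Sketch of the invariant that fails (inferred from the stack trace below):
class ChunkedByteBufferSketch(chunks: Array[ByteBuffer]) {
  require(chunks != null, "chunks must not be null")
  require(chunks.nonEmpty, "chunks must be non-empty") // trips for empty blocks
}

// An empty partition serializes to zero bytes, so reading it back from disk
// ends up constructing the equivalent of:
//   new ChunkedByteBufferSketch(Array.empty[ByteBuffer])
// => java.lang.IllegalArgumentException: requirement failed: chunks must be non-empty
```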

The problem is easy to reproduce. First, open a Spark shell.

spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.default.parallelism=2

Then try to persist an RDD with a single element (two or more elements work fine, and non-Kryo serialization works fine):

sc.makeRDD("element" :: Nil).persist(org.apache.spark.storage.StorageLevel.DISK_ONLY).count

And you get back:

[Stage 0:>                                                          (0 + 0) / 2]ERROR  [12:35:15.701] [Executor task launch worker-0] org.apache.spark.executor.Executor -  Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: requirement failed: chunks must be non-empty
        at scala.Predef$.require(Predef.scala:224) ~[scala-library-2.11.8.jar:na]
        at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:41) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:52) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:101) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1286) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:281) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.scheduler.Task.run(Task.scala:85) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) ~[spark-core_2.11-2.0.0.jar:2.0.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_91]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_91]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
ERROR  [12:35:15.743] [task-result-getter-1] org.apache.spark.scheduler.TaskSetManager -  Task 0 in stage 0.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalArgumentException: requirement failed: chunks must be non-empty
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:41)
        at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:52)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:101)
        at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1286)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1872)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1885)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1898)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1111)
  ... 48 elided
Caused by: java.lang.IllegalArgumentException: requirement failed: chunks must be non-empty
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:41)
  at org.apache.spark.util.io.ChunkedByteBuffer.<init>(ChunkedByteBuffer.scala:52)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:101)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1286)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
  at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

I am on a Mac, and I used the stock preview binary from http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-preview-bin-hadoop2.7.tgz. A custom-built v2.0.0-rc1 behaved the same on a Linux box. The 1.6.x series has no problems.

@rxin
Contributor

rxin commented Jul 8, 2016

@bonitao here's a patch that fixes it: #14099

@bonitao

bonitao commented Jul 8, 2016

Amazing. Thanks a lot @JoshRosen.

asfgit pushed a commit that referenced this pull request Jul 9, 2016
…ByteBuffer

## What changes were proposed in this pull request?

It's possible to also change the callers to not pass in empty chunks, but it seems cleaner to just allow `ChunkedByteBuffer` to handle empty arrays. cc JoshRosen

## How was this patch tested?

Unit tests, also checked that the original reproduction case in #11748 (comment) is resolved.

Author: Eric Liang <ekl@databricks.com>

Closes #14099 from ericl/spark-16432.
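
A sketch of the relaxed invariant described above (assumed shape, not the verbatim patch): the non-empty requirement is dropped, so an empty chunk array simply yields a zero-size buffer.

```scala
import java.nio.ByteBuffer

// Sketch (assumption): drop the non-empty requirement so empty blocks round-trip.
class ChunkedByteBufferSketch(val chunks: Array[ByteBuffer]) {
  require(chunks != null, "chunks must not be null")
  // NOTE: no require(chunks.nonEmpty, ...) any more; an empty array is valid.

  // Total size is the sum of chunk limits; zero for an empty array.
  val size: Long = chunks.map(_.limit().toLong).sum
}

// new ChunkedByteBufferSketch(Array.empty[ByteBuffer]).size == 0L  // now succeeds
```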
asfgit pushed a commit that referenced this pull request Jul 9, 2016
…ByteBuffer

(cherry picked from commit d8b06f1)
Signed-off-by: Reynold Xin <rxin@databricks.com>