
[SPARK-25905][CORE] When getting a remote block, avoid forcing a conversion to a ChunkedByteBuffer #23058

Closed · wants to merge 3 commits

Conversation

@wypoon (Contributor) commented Nov 16, 2018

What changes were proposed in this pull request?

In `BlockManager`, `getRemoteValues` gets a `ChunkedByteBuffer` (by calling `getRemoteBytes`) and creates an `InputStream` from it. `getRemoteBytes`, in turn, gets a `ManagedBuffer` and converts it to a `ChunkedByteBuffer`.
Instead, expose a `getRemoteManagedBuffer` method so `getRemoteValues` can just get this `ManagedBuffer` and use its `InputStream`.
When reading a remote cache block from disk, this reduces heap memory usage significantly.
Retain `getRemoteBytes` for other callers.
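
A simplified sketch of the reshaped read path in `BlockManager` (illustrative only, not the exact diff; `getRemoteManagedBuffer` is the new method described above):

private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val ct = implicitly[ClassTag[T]]
  // Stream straight out of the ManagedBuffer rather than first materializing
  // the whole block as a ChunkedByteBuffer on the heap.
  getRemoteManagedBuffer(blockId).map { data =>
    val values =
      serializerManager.dataDeserializeStream(blockId, data.createInputStream())(ct)
    new BlockResult(values, DataReadMethod.Network, data.size)
  }
}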

How was this patch tested?

Imran Rashid wrote an application (https://github.com/squito/spark_2gb_test/blob/master/src/main/scala/com/cloudera/sparktest/LargeBlocks.scala) that, among other things, tests reading remote cache blocks. I ran this application, using 2500MB blocks, to test reading a cache block on disk. Without this change, with `--executor-memory 5g`, the test fails with `java.lang.OutOfMemoryError: Java heap space`. With the change, the test passes with `--executor-memory 2g`.
I also ran the unit tests in core. In particular, `DistributedSuite` has a set of tests that exercise the `getRemoteValues` code path. `BlockManagerSuite` has several tests that call `getRemoteBytes`; I left these unchanged, so `getRemoteBytes` still gets exercised.

@SparkQA commented Nov 16, 2018

Test build #98923 has finished for PR 23058 at commit 2516ec6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Nov 19, 2018

@attilapiros can you review this please?

@squito (Contributor) commented Nov 19, 2018

Can we also make the same change to TaskResultGetter? We could avoid the same inefficiency for large task results that also get fetched to disk. (That would actually get us closer to removing the 2GB block limit on task results, but to fully remove it we'd need to fix the task serialization in the executor, which is a little trickier.)
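
For context, a paraphrased sketch of where the same inefficiency lives today; `fetchIndirectResult` is a stand-in wrapper, not Spark's actual method, and the body is paraphrased from memory of the `IndirectTaskResult` handling in `TaskResultGetter` (exact code differs by version):

def fetchIndirectResult(blockId: BlockId): Option[DirectTaskResult[_]] = {
  blockManager.getRemoteBytes(blockId).map { chunkedBytes =>
    // toByteBuffer forces the whole serialized result into a single on-heap
    // ByteBuffer, which is also what ties task results to the 2GB limit.
    serializer.deserialize[DirectTaskResult[_]](chunkedBytes.toByteBuffer)
  }
}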

@wypoon (Contributor, Author) commented Nov 21, 2018

> can we also make the same change to TaskResultGetter?

I had an offline conversation with Imran. Since the task result's value ends up sitting in a ByteBuffer (inside the DirectTaskResult) anyway, this change does not seem worthwhile there.

@SparkQA commented Nov 21, 2018

Test build #99091 has finished for PR 23058 at commit 125d746.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Nov 26, 2018

lgtm

I looked more into the lifecycle of the buffers and when they get disposed, and it looks fine to me. (In fact I think there is no need for the dispose in the first place, as hinted at here: #22511 (comment))

I also checked about whether we should buffer the input stream, but dataDeserializeStream already does that.
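
For reference, the buffering happens inside `dataDeserializeStream` itself; a minimal sketch of the pattern (`deserializeValues` is a stand-in name, not Spark's API):

import java.io.{BufferedInputStream, InputStream}

// The deserialization entry point wraps the raw stream in a BufferedInputStream,
// so callers like getRemoteValues need no buffering layer of their own.
def deserializeValues[T](in: InputStream)(deserialize: InputStream => Iterator[T]): Iterator[T] =
  deserialize(new BufferedInputStream(in))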

@wypoon one thing, can you update the testing section of the pr description to mention the coverage you found in the existing unit tests?

@wypoon (Contributor, Author) commented Nov 26, 2018

Thanks @squito. I updated the testing section of the PR.

@attilapiros (Contributor) left a comment


lgtm

I have checked `TaskResultGetter` and the deserializers, and I have an idea/question:
what is your opinion about extending `SerializerInstance` with a new method which accepts a `ManagedBuffer`?

  def deserialize[T: ClassTag](bytes: ManagedBuffer): T

Since every `ManagedBuffer` has a `createInputStream` method, both serializer implementations (Kryo and Java) can be extended:

Kryo (not tested):

def deserialize[T: ClassTag](bytes: ManagedBuffer): T = {
  val kryo = borrowKryo()
  try {
    // `input` is the serializer instance's reusable Kryo Input; point it at
    // the ManagedBuffer's stream instead of a materialized byte buffer.
    input.setInputStream(bytes.createInputStream())
    kryo.readClassAndObject(input).asInstanceOf[T]
  } finally {
    releaseKryo(kryo)
  }
}

Java (not tested):

def deserialize[T: ClassTag](bytes: ManagedBuffer): T = {
  val bis = bytes.createInputStream()
  val in = deserializeStream(bis)
  try {
    in.readObject()
  } finally {
    in.close() // also closes the underlying stream from the ManagedBuffer
  }
}

This way we can get rid of the wasteful `getRemoteBytes`.

It is fine for me if it is done separately.

@squito (Contributor) commented Nov 27, 2018

@attilapiros yes, something like that would be possible. I was thinking you'd just use the existing serializer methods to do it, something like:

val buffer = getRemoteManagedBuffer()
val valueItr = deserializeStream(buffer.createInputStream()).asIterator
val result = valueItr.next()
assert(!valueItr.hasNext) // exhausting the iterator makes sure it's closed too

My reluctance to bother with it is that you'd still be getting a DirectTaskResult, which has the data sitting in a ByteBuffer anyway. Also, it's not that big a deal, as this is only for a single task result, which is generally not large. The change here is about avoiding reading an entire partition into memory.

@squito (Contributor) commented Nov 27, 2018

@mridulm @jerryshao @Ngone51 @vanzin just checking if you want to look at this before I merge, will leave open a bit.

@ankuriitg (Contributor) commented

The change looks good to me. I understand that this change uses memory efficiently, but I am wondering whether it causes any performance degradation compared to memory mapping. If so, can we measure the performance impact and document it with this change?

@squito (Contributor) commented Nov 28, 2018

> causes any performance degradation compared to memory mapping

@ankuriitg good question, though if you look at what the old code was doing, it wasn't memory-mapping the file; it was reading it into memory from a regular input stream (take a look at `ChunkedByteBuffer.fromFile`). The new code is basically doing the same thing, just without the extra memory overhead.
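
To make that concrete, here is a minimal, self-contained sketch of what a `ChunkedByteBuffer.fromFile`-style read amounts to (an illustration of the pattern, not Spark's actual code):

import java.io.InputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Read an entire stream into on-heap chunks, the way the old getRemoteBytes
// path materialized a block before deserializing it.
def readFully(in: InputStream, chunkSize: Int = 4 * 1024 * 1024): Array[ByteBuffer] = {
  val chunks = new ArrayBuffer[ByteBuffer]()
  val buf = new Array[Byte](chunkSize)
  var n = in.read(buf)
  while (n != -1) {
    // Each chunk is a fresh heap copy; for a 2500MB block, that is 2500MB of heap.
    chunks += ByteBuffer.wrap(java.util.Arrays.copyOf(buf, n))
    n = in.read(buf)
  }
  chunks.toArray
}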

@SparkQA commented Nov 29, 2018

Test build #99420 has finished for PR 23058 at commit 86ba18c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Nov 29, 2018

merged to master, thanks @wypoon

@asfgit closed this in 5974188 on Nov 29, 2018
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request on Feb 18, 2019

Closes apache#23058 from wypoon/SPARK-25905.

Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>