
[SPARK-24578][Core] Cap sub-region's size of returned nio buffer #21593

Closed
wants to merge 1 commit

Conversation

WenboZhao
Contributor

@WenboZhao WenboZhao commented Jun 19, 2018

What changes were proposed in this pull request?

This PR tries to fix the performance regression introduced by SPARK-21517.

In our production jobs we perform many parallel computations, so it is quite likely that some task gets scheduled to host-2 and needs to read cached block data from host-1. This large transfer often causes the cluster to hit timeouts (it retries 3 times, each with a 120s timeout, and then recomputes the block to put it into the local MemoryStore).

The root cause is that we no longer do `consolidateIfNeeded`, because we now use

Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)

in `ChunkedByteBuffer`. If the buffer consists of many small chunks, `buf.nioBuffer(...)` can perform very badly, since `copyByteBuf(...)` has to be called many times.
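The capping idea can be sketched with plain `java.nio`. Everything below is illustrative, not Spark's actual code: the class and method names, the 256 KB limit, and the buffer sizes are assumptions made for the example; the real fix applies the same cap to the `length` passed to `buf.nioBuffer(...)` on a Netty `ByteBuf`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class CappedCopy {
    // Illustrative cap on how many bytes we hand to the channel per call.
    static final int NIO_BUFFER_LIMIT = 256 * 1024;

    // Write at most NIO_BUFFER_LIMIT bytes of src to target, advancing src
    // by however many bytes the channel actually accepted.
    static int copyCapped(ByteBuffer src, WritableByteChannel target) throws IOException {
        int length = Math.min(src.remaining(), NIO_BUFFER_LIMIT);
        ByteBuffer slice = src.duplicate();
        slice.limit(slice.position() + length);
        int written = target.write(slice);
        src.position(src.position() + written);
        return written;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer src = ByteBuffer.wrap(new byte[1_000_000]);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        WritableByteChannel ch = Channels.newChannel(out);
        while (src.hasRemaining()) {
            copyCapped(src, ch); // each call copies at most 256 KB
        }
        System.out.println(out.size()); // 1000000
    }
}
```

Capping each write keeps the per-call cost bounded regardless of how many components back the source buffer.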

How was this patch tested?

Existing unit tests, plus testing in production.

@squito
Contributor

squito commented Jun 20, 2018

Jenkins, ok to test

Contributor

@squito squito left a comment


lgtm

I made a suggestion for another improvement we could do while we're at it, but it's small and this is a really important fix, so it's OK to leave it.

// SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
// for the case that the passed-in buffer has too many components.
int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
Contributor


I think you can go one step further here, and call buf.nioBuffers(int, int) (plural)
https://github.com/netty/netty/blob/4.1/buffer/src/main/java/io/netty/buffer/ByteBuf.java#L2355

that will avoid the copying required to create the merged buffer (though it's a bit complicated, as you have to check for incomplete writes from any single target.write() call).

Also OK to leave this for now as this is a pretty important fix.

Contributor Author


Sure, I will make a follow up PR to address this.

Member


Why not do this in this PR, since it's a small change and we haven't had a new release recently?

Contributor


This PR is fixing a pretty serious issue, we know Wenbo is going to roll this out immediately, and I suspect even more users will. This fix is also "obviously correct" -- the followup here is not super complicated, but it is more prone to bugs. So I'm inclined to just get this in.

Anyway, if @WenboZhao can do the other part today, then sure, but I think we should get this in quickly.

Contributor Author


Thanks @squito and @zsxwing. I would prefer to do it in a different PR with more careful benchmarking and testing. As @squito said, that change is more prone to bugs.

Member


Fair enough.

I just spent a few minutes writing the following code:

  private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
    // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
    // for the case that the passed-in buffer has too many components.
    int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
    ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
    int totalWritten = 0;
    for (ByteBuffer buffer : buffers) {
      int remaining = buffer.remaining();
      int written = target.write(buffer);
      totalWritten += written;
      if (written < remaining) {
        // The channel accepted only part of this buffer; stop and let the caller retry later.
        break;
      }
    }
    buf.skipBytes(totalWritten);
    return totalWritten;
  }

Feel free to use it in your follow-up PR.

Contributor


@WenboZhao did you ever follow up on this, or at least file another JIRA for it? Sorry if I missed it.

Contributor


For anybody watching this: SPARK-25115 was eventually opened (it currently has a PR).

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92114 has finished for PR 21593 at commit a30d4de.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Jun 20, 2018

Test build #92122 has finished for PR 21593 at commit a30d4de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @zsxwing @JoshRosen

@zsxwing
Member

zsxwing commented Jun 20, 2018

Thanks! Merging to master and 2.3.

asfgit pushed a commit that referenced this pull request Jun 20, 2018
Author: Wenbo Zhao <wzhao@twosigma.com>

Closes #21593 from WenboZhao/spark-24578.

(cherry picked from commit 3f4bda7)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
@asfgit asfgit closed this in 3f4bda7 Jun 20, 2018
curtishoward pushed a commit to twosigma/spark that referenced this pull request Jun 20, 2018
curtishoward pushed a commit to twosigma/spark that referenced this pull request Jun 20, 2018
@nebi-frame

Which release has this commit?

@Anubisxcw

> which release does have this commit?

Since 2.3.2.
https://issues.apache.org/jira/browse/SPARK-24578
