[SPARK-17839][CORE] Use Nio's directbuffer instead of BufferedInputStream in order to avoid additional copy from os buffer cache to user buffer #15408
…ffer to read the spill files in order to avoid additional copy
|
cc - @rxin |
Seems conceptually sound and the benchmark shows good results. I wonder, is this not something already available in the JDK? I could only find this from the JDK, which doesn't do internal buffering, but does some more stuff that might be worth comparing notes with: http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/sun/nio/ch/ChannelInputStream.java/
| */ | ||
| public final class NioBasedBufferedFileInputStream extends InputStream { | ||
|
|
||
| ByteBuffer bb; |
private final
done.
|
|
||
| FileChannel ch; | ||
|
|
||
| public NioBasedBufferedFileInputStream(File file, int bufferSize) throws IOException { |
Would it be reasonable to establish a default constructor with some default buffer size, like BufferedInputStream?
that makes sense, changed accordingly.
|
|
||
| public NioBasedBufferedFileInputStream(File file, int bufferSize) throws IOException { | ||
| bb = ByteBuffer.allocateDirect(bufferSize); | ||
| FileInputStream f = new FileInputStream(file); |
I'm not entirely sure about this, but it seems so from reading the source: closing the channel won't free the file descriptor? Can you just hang on to this ref as well and close it in close() for completeness?
Better yet, can't we just use FileChannel.open(...) and not bother with the stream?
changed accordingly.
| bb.flip(); | ||
| } | ||
|
|
||
| public boolean refill() throws IOException { |
private?
done.
| public void close() throws IOException { | ||
| ch.close(); | ||
| } | ||
| } |
I think you'll want to implement skip() and available() too.
skip() is already implemented by the underlying InputStream, so that is not needed. Implemented available().
skip() in InputStream will call read(), this is not the optimal solution
Although we are not using skip() anywhere, I implemented an optimized version of it anyway.
|
|
||
| final BufferedInputStream bs = | ||
| new BufferedInputStream(new FileInputStream(file), (int) bufferSizeBytes); | ||
| final NioBasedBufferedFileInputStream bs = |
BTW I think IndexShuffleBlockResolver, RowQueue have instances that could use a similar treatment.
Can this reference be InputStream?
Yes, there are other instances in the code where we are using BufferedInputStream. I will raise another PR to replace those with NioBasedBufferedFileInputStream.
|
Test build #66610 has finished for PR 15408 at commit
|
Yes, I looked around a bit and unfortunately I could not find any implementation in the JDK that supports buffered NIO reads. |
|
Test build #66612 has finished for PR 15408 at commit
|
| * but does not support buffering. | ||
| * | ||
| */ | ||
| public final class NioBasedBufferedFileInputStream extends InputStream { |
You should add a test suite specifically for this.
Added a test suite for this.
Great - thanks. Just FYI in general I find ScalaTest to be a better framework than JUnit, so it is a good idea to write tests in Scala even if the class is implemented in Java. Not a big deal though.
|
Can you also expand on what the 7% means? Is it some workload end-to-end that's been improved by 7%, or the sorting itself improves by 7%? |
| bb = ByteBuffer.allocateDirect(bufferSize); | ||
| ch = FileChannel.open(file.toPath(), StandardOpenOption.READ); | ||
| ch.read(bb); | ||
| bb.flip(); |
ch.read(bb); can be removed
removed, thanks!
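For readers following along, here is a minimal sketch of the shape being discussed at this point in the review: a direct ByteBuffer filled from a FileChannel opened with FileChannel.open, with the buffer flipped while still empty so that the first read triggers the refill. The class name SketchNioBufferedInputStream and the roundTrip helper are made up for illustration; this is not the code from the patch, and it is not thread-safe.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch for illustration only; not the class from this PR.
final class SketchNioBufferedInputStream extends InputStream {
  private final ByteBuffer byteBuffer;
  private final FileChannel fileChannel;

  SketchNioBufferedInputStream(Path path, int bufferSizeInBytes) throws IOException {
    if (bufferSizeInBytes <= 0) {
      throw new IllegalArgumentException("buffer size must be positive");
    }
    byteBuffer = ByteBuffer.allocateDirect(bufferSizeInBytes);
    fileChannel = FileChannel.open(path, StandardOpenOption.READ);
    // Flip the still-empty buffer so remaining() == 0. The first read then
    // calls refill(), so no eager channel read is needed in the constructor.
    byteBuffer.flip();
  }

  /** Returns true if the buffer has bytes to serve, false at end of file. */
  private boolean refill() throws IOException {
    if (!byteBuffer.hasRemaining()) {
      byteBuffer.clear();
      int nRead = 0;
      while (nRead == 0) {
        nRead = fileChannel.read(byteBuffer);
      }
      byteBuffer.flip();
      if (nRead < 0) {
        return false;
      }
    }
    return true;
  }

  @Override
  public int read() throws IOException {
    if (!refill()) {
      return -1;
    }
    return byteBuffer.get() & 0xFF;
  }

  @Override
  public int read(byte[] b, int offset, int len) throws IOException {
    if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
      throw new IndexOutOfBoundsException();
    }
    if (len == 0) {
      return 0;
    }
    if (!refill()) {
      return -1;
    }
    int n = Math.min(len, byteBuffer.remaining());
    byteBuffer.get(b, offset, n);
    return n;
  }

  @Override
  public int available() {
    return byteBuffer.remaining();
  }

  @Override
  public void close() throws IOException {
    fileChannel.close();
  }

  /** Writes data to a temp file and reads it back through the sketch stream. */
  static byte[] roundTrip(byte[] data, int bufferSizeInBytes) {
    try {
      Path tmp = Files.createTempFile("nio-sketch", ".bin");
      try {
        Files.write(tmp, data);
        try (InputStream in = new SketchNioBufferedInputStream(tmp, bufferSizeInBytes)) {
          ByteArrayOutputStream out = new ByteArrayOutputStream();
          byte[] chunk = new byte[7];
          int n;
          while ((n = in.read(chunk, 0, chunk.length)) != -1) {
            out.write(chunk, 0, n);
          }
          return out.toByteArray();
        }
      } finally {
        Files.deleteIfExists(tmp);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Calling SketchNioBufferedInputStream.roundTrip(data, 16) with a buffer smaller than the file forces several refills, which is the path that matters for spill files.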
The perf improvement was end-to-end which means the sorting improvement is definitely more than that. Also, changed the PR description to make that clear. |
|
Test build #66621 has finished for PR 15408 at commit
|
| */ | ||
| public final class NioBasedBufferedFileInputStream extends InputStream { | ||
|
|
||
| private static int DEFAULT_BUFFER_SIZE = 8192; |
we don't do this consistently, but would be useful to rename this DEFAULT_BUFFER_SIZE_BYTES so it is clear what the unit is
changed.
|
|
||
| private final FileChannel fileChannel; | ||
|
|
||
| public NioBasedBufferedFileInputStream(File file, int bufferSize) throws IOException { |
here too, bufferSizeInBytes
done.
| this(file, DEFAULT_BUFFER_SIZE); | ||
| } | ||
|
|
||
| private boolean refill() throws IOException { |
document this function and explain what the ret value means?
done
| } | ||
|
|
||
| @Override | ||
| public int read(byte[] b, int off, int len) throws IOException { |
off -> offset
done.
| public int available() throws IOException { | ||
| return byteBuffer.remaining(); | ||
| } | ||
|
|
remove one blank line
done
| } | ||
|
|
||
| @Override | ||
| public int available() throws IOException { |
One question: how is this used? AFAIK InputStream always returns 0 for this. Is this used somewhere else for performance improvements?
It's a standard API method for InputStream, so if something does call it, it's better to return a useful answer because it's pretty easy to do so.
I agree, InputStream simply has a stub impl (just like read(byte[]) loops through read). Providing .remaining() can be useful
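To make the point about the stub concrete, here is a small sketch comparing a stream that inherits InputStream.available() with one that overrides it. AvailableDemo and its helpers are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class; names are made up for this example.
final class AvailableDemo {
  /** A stream that only overrides read(), so it inherits the default available(). */
  static InputStream bareStream() {
    return new InputStream() {
      @Override
      public int read() {
        return 42;
      }
    };
  }

  /** Calls available() and converts the checked exception for easier use. */
  static int availableOf(InputStream in) {
    try {
      return in.available();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The inherited implementation reports 0 even though the bare stream can always produce a byte, while ByteArrayInputStream reports what is left in its array. A buffered stream can do the same cheaply by returning the buffer's remaining count.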
|
|
||
| @Override | ||
| public long skip(long n) throws IOException { | ||
| if(n < 0L) { |
nit: add a space after if
done.
| fileChannel.position(size); | ||
| return size - currentFilePosition; | ||
| } | ||
| else { |
move else to previous line
done.
| return skipFromFileChannel(toSkip); | ||
| } | ||
|
|
||
| private long skipFromFileChannel(long n) throws IOException { |
maybe just inline this in the previous method, and add an else case so both cases are indented the same level?
actually never mind this comment
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| fileChannel.close(); |
do we need to free the buffer or anything?
No, there is a cleaner in DirectByteBuffer which frees up the memory when the object is garbage collected.
Ah OK. I vaguely remember Spark ran into an issue in ~ Spark 1.0 that the direct bytebuffers were not cleared fast enough (because the life cycle is decoupled from GC). We introduced StorageUtils.dispose and made buffer clearing more aggressive. It might be good to call that here too, just in case.
I see...that makes sense. Added an explicit call to cleanup the byte buffer.
| } | ||
|
|
||
| @Test | ||
| public void testBytesSkipped() throws IOException { |
can we add a test case for negative skips?
done.
| } | ||
|
|
||
| @Test | ||
| public void testReadMultipleBytes() throws IOException { |
we should construct a test case that triggers the code path in skipFromFileChannel very explicitly
Good idea. Actually I discovered a bug with my previous implementation after adding this test case, which I have fixed.
This reminds me --- we should add test coverage reporting to Spark so it becomes obvious what branches are not tested sufficiently.
|
Looks pretty good overall. I left some comments mostly on styling, documentation, and test cases. |
Looking good; I don't know that it makes sense to change other instances separately? Changing all instances of buffering a file input stream seems like one coherent logical change (but the titles need to be updated)
| if(n < 0L) { | ||
| return 0L; | ||
| } | ||
| if (byteBuffer.remaining() > n) { |
= n ? or did I miss something obvious?
If we have enough data in the bytebuffer to skip then we can skip from the buffer and no need to skip from the file. Looks good to me.
Oh, my comment rendered incorrectly. I'm asking if the condition can be >= n. If there are exactly n bytes remaining then you can skip the entire buffer, right? it need not be > n?
Yes, we can change the condition to be >= n. The behavior remains the same either way, because skipping 0 bytes from the underlying file channel is essentially a no-op, but I changed it to avoid confusion.
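As a sketch of the buffer-side half of skip() under the >= n condition, the helper below advances a ByteBuffer by up to n bytes and reports how many were consumed. BufferSkipDemo and skipBuffered are hypothetical names, and the sketch assumes the caller skips any shortfall from the file channel separately.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not the method from the patch.
final class BufferSkipDemo {
  /**
   * Skips up to n bytes that are already buffered and returns how many were skipped.
   * Any shortfall (n minus the return value) would still have to be skipped from the file.
   */
  static long skipBuffered(ByteBuffer buffer, long n) {
    if (n <= 0L) {
      return 0L;
    }
    if (buffer.remaining() >= n) {
      // The cast is safe: n is at most remaining(), which is an int.
      buffer.position(buffer.position() + (int) n);
      return n;
    }
    long skipped = buffer.remaining();
    buffer.position(buffer.limit());
    return skipped;
  }
}
```

With exactly n bytes remaining, the >= branch consumes the whole buffer and nothing needs to be skipped from the channel, which is the case the review comment asks about.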
| */ | ||
| public class NioBasedBufferedFileInputStreamSuite { | ||
|
|
||
| byte[] randomBytes; |
Nit: private should always be the default
done.
|
Test build #66667 has finished for PR 15408 at commit
|
|
Test build #66749 has finished for PR 15408 at commit
|
|
jenkins retest this please. |
- Please add synchronized to the public methods except close (the current close method is already thread-safe). The javadoc of InputStream says nothing about thread-safety, but implementations like ChannelInputStream, ByteArrayInputStream, and BufferedInputStream imply it.
- A minor behavior change is that NioBufferedFileInputStream doesn't close the fd in finalize like FileInputStream does. But that's not a big deal since the user should do it instead of counting on GC.
| private long skipFromFileChannel(long n) throws IOException { | ||
| long currentFilePosition = fileChannel.position(); | ||
| long size = fileChannel.size(); | ||
| if (currentFilePosition + n > size) { |
nit: use n > size - currentFilePosition to avoid overflow (e.g., n is Long.MAX_VALUE).
good point, changed accordingly.
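The overflow concern can be shown with plain arithmetic. The sketch below contrasts the two forms of the comparison; SkipBoundsDemo and its method names are hypothetical, and it assumes 0 <= position <= size, which holds for a channel that has only been read sequentially.

```java
// Hypothetical demo of the comparison discussed above; not the patch's code.
final class SkipBoundsDemo {
  /** Original form: position + n wraps around to a negative value when n is huge. */
  static boolean naiveExceeds(long position, long n, long size) {
    return position + n > size;
  }

  /** Rearranged form: size - position cannot overflow when 0 <= position <= size. */
  static boolean safeExceeds(long position, long n, long size) {
    return n > size - position;
  }

  /** Number of bytes a skip of n can actually cover before hitting end of file. */
  static long bytesToSkip(long position, long n, long size) {
    return safeExceeds(position, n, size) ? size - position : n;
  }
}
```

With position 10, size 100, and n equal to Long.MAX_VALUE, the naive sum wraps negative and the check wrongly reports that the skip fits, while the rearranged check caps the skip at the 90 bytes that remain.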
|
|
||
| @Override | ||
| public int read(byte[] b, int offset, int len) throws IOException { | ||
| if (!refill()) { |
nit: please add defensive checks like BufferedInputStream does:
  if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
    throw new IndexOutOfBoundsException();
  } else if (len == 0) {
    return 0;
  }
done.
|
Test build #66765 has finished for PR 15408 at commit
|
I also missed why close() shouldn't be synchronized. You wouldn't want it to take place during a read, right?
I think a finalizer wouldn't hurt here because FileInputStream does it. It's exactly because people may not properly close the stream.
WDYT @zsxwing ?
Getting quite close here I think.
| if (!refill()) { | ||
| return -1; | ||
| } | ||
| if ((offset | len | (offset + len) | (b.length - (offset + len))) < 0) { |
I think this must be a typo... this is a bitwise-or of a bunch of ints. I think maybe the example was given as a sort of pseudo code. offset and len can't be negative and their sum shouldn't exceed the array length, but that seems like it. 'else' below isn't really needed, but that's a nit. Actually, you want to check whether len is after updating on line 94 right? if no data is available you also return 0 without a no-op read?
@srowen I think this check is correct. It checks if any of these values is negative.
offset and len can't be negative and their sum shouldn't exceed the array length.
It's possible. offset and len are set by the caller.
@sitalkedia Could you move the argument check before refill? If the argument is invalid, we should always throw an exception instead of returning -1.
Removed the 'else' part and moved the argument check before refill as suggested.
@zsxwing oh I get it, you're really just checking if any top bit is set. Hm, that seems fairly obscure compared to just checking the arguments. Sure it's a little more code, but it's readable and i don't think this is performance critical? I hadn't seen this before and had to think a while to get what it does.
Yes, I'm agreeing that offset and len can't be allowed to be negative, not that it can't be specified by the caller as something negative.
@srowen this line comes from BufferedInputStream. I'm ok with rewriting it for readability.
Oh I see. Well it's reasonable to keep it then, but I'd at least add a comment I think. I didn't recognize this at all until I thought about it for a good few minutes.
@srowen - Agreed, the condition was a little confusing. Changed it to make it clearer.
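For anyone else puzzled by the bitwise form: OR-ing the values together and testing < 0 checks whether any of them has its sign bit set. The sketch below puts it next to an explicit version so the two can be compared. BoundsCheckDemo is a hypothetical name, and the equivalence assumes arrayLength is non-negative, as it is for a real array.

```java
// Hypothetical side-by-side of the two argument checks discussed above.
final class BoundsCheckDemo {
  /** BufferedInputStream-style check: negative if any operand has its sign bit set. */
  static boolean invalidBitwise(int offset, int len, int arrayLength) {
    return (offset | len | (offset + len) | (arrayLength - (offset + len))) < 0;
  }

  /** The same conditions spelled out; offset + len < 0 catches int overflow. */
  static boolean invalidExplicit(int offset, int len, int arrayLength) {
    return offset < 0 || len < 0 || offset + len < 0 || offset + len > arrayLength;
  }
}
```

The explicit form costs a few more characters but reads without the sign-bit trick, which is the trade-off debated in this thread.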
|
Test build #66830 has finished for PR 15408 at commit
|
Good point. I was thinking
Agreed that it doesn't hurt the performance since FileInputStream does it. |
|
Test build #66860 has finished for PR 15408 at commit
|
|
Test build #66972 has finished for PR 15408 at commit
|
|
jenkins retest this please. |
|
Test build #66978 has finished for PR 15408 at commit
|
Now we're down to pretty trivial changes, thank you. This will be a nice win and I am glad we've scrutinized it because it will be in a lot of critical paths.
| * TODO: support {@link #mark(int)}/{@link #reset()} | ||
| * | ||
| */ | ||
| @ThreadSafe |
I only see one location in the codebase where we use this annotation, and I think we probably shouldn't use it at all if not used consistently.
Alright, removed it for consistency.
| @ThreadSafe | ||
| public final class NioBufferedFileInputStream extends InputStream { | ||
|
|
||
| private static int DEFAULT_BUFFER_SIZE_BYTES = 8192; |
Oops, forgot to say this should be final
done
| * {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio, | ||
| * but does not support buffering. | ||
| * | ||
| * TODO: support {@link #mark(int)}/{@link #reset()} |
I don't really care, but this could be a comment inside the class rather than user-facing. In fact I don't even know it's a to-do.
Okay, I removed the TODO here.
|
|
||
| @Override | ||
| public synchronized int read(byte[] b, int offset, int len) throws IOException { | ||
| if (offset < 0 || len < 0 || (offset + len) < 0 || (b.length - (offset + len)) < 0) { |
Hardly matters, but now that this condition has been made more explicit, the final condition is simpler as offset + len > b.length
We still need to check if offset and len are less than 0, right? Removed the offset + len < 0 condition because that is covered in the last condition (b.length - (offset + len)) < 0
Ignore my previous comment, we still need it.
|
|
||
| @Override | ||
| protected void finalize() throws IOException { | ||
| close(); |
Nit: 2-space indent not 4
good eye, wonder why the checkstyle did not catch it though?
|
|
||
| import org.apache.spark.storage.StorageUtils; | ||
|
|
||
| import javax.annotation.concurrent.ThreadSafe; |
Nit^2 : no longer needed as an import
|
|
||
| @Override | ||
| public synchronized int read(byte[] b, int offset, int len) throws IOException { | ||
| if (offset < 0 || len < 0 || (b.length - (offset + len)) < 0) { |
Ah no I think that condition was needed. I mean: if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
LGTM pending tests
|
Test build #67014 has finished for PR 15408 at commit
|
|
Test build #67016 has finished for PR 15408 at commit
|
|
Going once, going twice, any more comments? |
|
LGTM. Merging to master. Thanks! |
…ream in order to avoid additional copy from os buffer cache to user buffer
## What changes were proposed in this pull request?
Currently we use BufferedInputStream to read the shuffle file which copies the file content from os buffer cache to the user buffer. This adds additional latency in reading the spill files. We made a change to use java nio's direct buffer to read the spill files and for certain pipelines spilling significant amount of data, we see up to 7% speedup for the entire pipeline.
## How was this patch tested?
Tested by running the job in the cluster and observed up to 7% speedup.
Author: Sital Kedia <skedia@fb.com>
Closes apache#15408 from sitalkedia/skedia/nio_spill_read.
What changes were proposed in this pull request?
Currently we use BufferedInputStream to read the shuffle file which copies the file content from os buffer cache to the user buffer. This adds additional latency in reading the spill files. We made a change to use java nio's direct buffer to read the spill files and for certain pipelines spilling significant amount of data, we see up to 7% speedup for the entire pipeline.
How was this patch tested?
Tested by running the job in the cluster and observed up to 7% speedup.