[SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths" #20119
…tream/FileOutputStream in some critical paths" This reverts commit 5fd0294.
Test build #85532 has finished for PR 20119 at commit
LGTM
Thanks! Merging to master.
Let's also cc the author. @jerryshao, do you know if there is a way to fix the regression?
Sorry, I haven't checked the details yet; let me take a look. The change I made was meant to fix a memory issue in shuffle (especially the external shuffle service) that occurred in our prod cluster. Let me think about whether there's a way to fix it.
  try {
-   is = Files.newInputStream(file.toPath());
+   is = new FileInputStream(file);
    ByteStreams.skipFully(is, offset);
    return new LimitedInputStream(is, length);
I think these two lines might be the place that suffers from the `skip` issue. Can we revert only this place? @zsxwing @cloud-fan @gatorsmile
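To make the `skip` concern concrete, here is a minimal sketch of what happens when a stream inherits the default `InputStream.skip`. The `CountingStream` wrapper and the `bytesReadWhileSkipping` helper are hypothetical names invented for this illustration; they are not part of Spark or the JDK. The wrapper deliberately does not override `skip`, so every skipped byte is pulled through `read` and thrown away.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class SkipDemo {
    /** Hypothetical wrapper: counts bytes pulled through read() and inherits the default skip(). */
    static final class CountingStream extends InputStream {
        private final InputStream in;
        long bytesRead = 0;

        CountingStream(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            int b = in.read();
            if (b >= 0) {
                bytesRead++;
            }
            return b;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            int n = in.read(buf, off, len);
            if (n > 0) {
                bytesRead += n;
            }
            return n;
        }
        // skip() is deliberately NOT overridden, so InputStream.skip is used.
    }

    /** Skips `offset` bytes and returns how many bytes had to be read to do so. */
    static long bytesReadWhileSkipping(int size, long offset) throws IOException {
        CountingStream s = new CountingStream(new ByteArrayInputStream(new byte[size]));
        long remaining = offset;
        while (remaining > 0) {
            long skipped = s.skip(remaining);
            if (skipped <= 0) {
                break; // end of stream or no progress
            }
            remaining -= skipped;
        }
        return s.bytesRead;
    }

    public static void main(String[] args) throws IOException {
        // Skipping 512 KiB of a 1 MiB stream reads all 512 KiB.
        System.out.println(bytesReadWhileSkipping(1 << 20, 1 << 19));
    }
}
```

The cost of skipping therefore grows with the offset, which matters when `ByteStreams.skipFully(is, offset)` is called with a large offset into a shuffle file.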
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
    // find out the consolidated file, then the offset within that from our index
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

-   val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+   val in = new DataInputStream(new FileInputStream(indexFile))
@jerryshao this is another place. In addition, I'm not sure whether any compression codec uses `skip`.
I also noticed that `sun.nio.ch.ChannelInputStream` has extra `synchronized` blocks, since the stream returned by `Files.newInputStream` needs to be thread-safe. I'm not sure whether that may cause a performance regression.
I see.
@@ -165,7 +165,7 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {

    DownloadCallback(int chunkIndex) throws IOException {
      this.targetFile = tempFileManager.createTempFile();
-     this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+     this.channel = Channels.newChannel(new FileOutputStream(targetFile));
Seems `Channels.newChannel` has an optimization for `FileOutputStream`: http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8-b132/java/nio/channels/Channels.java#424
Here I think we can use `FileChannel.open` instead.
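As a rough sketch of that suggestion, the channel could be opened directly with `FileChannel.open` so that no `FileOutputStream` is created at all. The class name `ChannelWriteDemo`, the helper names, and the temp-file prefix are assumptions made for illustration; this is not the code from `OneForOneBlockFetcher`. The open options are chosen to mirror what `new FileOutputStream(file)` does (create if missing, truncate if present).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ChannelWriteDemo {
    /** Opens a write channel directly, without going through FileOutputStream. */
    static WritableByteChannel openForWrite(Path target) throws IOException {
        return FileChannel.open(
            target,
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE,
            StandardOpenOption.TRUNCATE_EXISTING);
    }

    /** Writes `payload` to a temp file via the channel and returns what ended up on disk. */
    static String roundTrip(String payload) throws IOException {
        Path tmp = Files.createTempFile("download-", ".tmp"); // hypothetical prefix
        try {
            try (WritableByteChannel ch = openForWrite(tmp)) {
                ByteBuffer buf = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));
                while (buf.hasRemaining()) {
                    ch.write(buf); // a single write() may be partial, so loop
                }
            }
            return new String(Files.readAllBytes(tmp), StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("chunk-0"));
    }
}
```

Whether this is preferable to keeping `Channels.newChannel(new FileOutputStream(...))` depends on the trade-off discussed in this thread; the sketch only shows that the two produce an equivalent writable channel.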
@@ -133,7 +132,7 @@ public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new DefaultFileRegion(file, offset, length);
    } else {
-     FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+     FileChannel fileChannel = new FileInputStream(file).getChannel();
@jerryshao I think this is the only line that may reduce the memory pressure for external shuffle service. Right?
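For illustration only, here is one way a file segment could be read through `FileChannel.open` without relying on `skip`: position the channel at the offset first, then read. This is a sketch under assumptions, not the change that was merged; `SegmentReadDemo`, `readSegment`, and `demo` are hypothetical names.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SegmentReadDemo {
    /** Reads `length` bytes starting at `offset`, seeking via the channel instead of skip(). */
    static byte[] readSegment(Path file, long offset, int length) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ch.position(offset); // moves the read position; no data is read here
            InputStream in = Channels.newInputStream(ch);
            byte[] out = new byte[length];
            int done = 0;
            while (done < length) {
                int n = in.read(out, done, length - done);
                if (n < 0) {
                    throw new EOFException("segment extends past end of file");
                }
                done += n;
            }
            return out;
        }
    }

    /** Writes a small temp file and reads four bytes starting at offset 3. */
    static String demo() throws IOException {
        Path tmp = Files.createTempFile("segment-", ".data"); // hypothetical prefix
        try {
            Files.write(tmp, "0123456789".getBytes(StandardCharsets.US_ASCII));
            return new String(readSegment(tmp, 3, 4), StandardCharsets.US_ASCII);
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```

The design point is that the offset is applied with `FileChannel.position` rather than by skipping through the stream, so the cost of reaching the segment does not depend on how far into the file it starts.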
@@ -39,7 +39,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException {
    offsets = buffer.asLongBuffer();
    DataInputStream dis = null;
    try {
-     dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+     dis = new DataInputStream(new FileInputStream(indexFile));
@zsxwing I think this line also affects the external shuffle service.
@zsxwing maybe we only need to fix the two places above that relate to the external shuffle service. What do you think?
@jerryshao sgtm. Could you submit a PR?
OK, I will do it.
…ternal shuffle service

## What changes were proposed in this pull request?

This PR is the second attempt of #18684. The `InputStream` returned by NIO's Files API doesn't override the `skip` method, so it introduces a performance issue (mentioned in #20119). But using `FileInputStream`/`FileOutputStream` also introduces a memory issue (https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), which is severe for a long-running external shuffle service. So this proposal fixes only the code related to the external shuffle service.

## How was this patch tested?

Existing tests.

Author: jerryshao <sshao@hortonworks.com>

Closes #20144 from jerryshao/SPARK-21475-v2.

(cherry picked from commit 93f92c0)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
What changes were proposed in this pull request?
This reverts commit 5fd0294 because of a huge performance regression.
I manually fixed a minor conflict in `OneForOneBlockFetcher.java`.
`Files.newInputStream` returns `sun.nio.ch.ChannelInputStream`. `ChannelInputStream` doesn't override `InputStream.skip`, so it uses the default `InputStream.skip`, which just consumes and discards data. This causes a huge performance regression when reading shuffle files.
How was this patch tested?
Jenkins