From e265d0e237660644f106f24c96495cdcfdc5888c Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 10 Dec 2014 10:45:21 -0800 Subject: [PATCH 1/2] [investigation-only] Disable transferTo in Netty. DO NOT MERGE. --- .../buffer/FileSegmentManagedBuffer.java | 42 ++++++++++++------- .../spark/network/buffer/ManagedBuffer.java | 5 ++- .../network/buffer/NettyManagedBuffer.java | 3 +- .../network/buffer/NioManagedBuffer.java | 3 +- .../network/protocol/MessageEncoder.java | 2 +- .../spark/network/util/TransportConf.java | 8 ++++ .../spark/network/TestManagedBuffer.java | 5 ++- 7 files changed, 48 insertions(+), 20 deletions(-) diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 844eff4f4c701..77708dc089af3 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -27,6 +27,9 @@ import com.google.common.base.Objects; import com.google.common.io.ByteStreams; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.channel.DefaultFileRegion; import org.apache.spark.network.util.JavaUtils; @@ -62,15 +65,7 @@ public ByteBuffer nioByteBuffer() throws IOException { // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead. if (length < conf.memoryMapBytes()) { ByteBuffer buf = ByteBuffer.allocate((int) length); - channel.position(offset); - while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { - throw new IOException(String.format("Reached EOF before filling buffer\n" + - "offset=%s\nfile=%s\nbuf.remaining=%s", - offset, file.getAbsoluteFile(), buf.remaining())); - } - } - buf.flip(); + read(buf); return buf; } else { return channel.map(FileChannel.MapMode.READ_ONLY, offset, length); @@ -128,12 +123,18 @@ public ManagedBuffer release() { } @Override - public Object convertToNetty() throws IOException { - if (conf.lazyFileDescriptor()) { - return new LazyFileRegion(file, offset, length); + public Object convertToNetty(ByteBufAllocator allocator) throws IOException { + if (length < conf.transferToBytes()) { + ByteBuffer buf = ByteBuffer.allocate((int) length); + read(buf); + return Unpooled.wrappedBuffer(buf); } else { - FileChannel fileChannel = new FileInputStream(file).getChannel(); - return new DefaultFileRegion(fileChannel, offset, length); + if (conf.lazyFileDescriptor()) { + return new LazyFileRegion(file, offset, length); + } else { + FileChannel fileChannel = new FileInputStream(file).getChannel(); + return new DefaultFileRegion(fileChannel, offset, length); + } } } @@ -151,4 +152,17 @@ public String toString() { .add("length", length) .toString(); } + + private void read(ByteBuffer buf) throws IOException { + FileChannel channel = new RandomAccessFile(file, "r").getChannel(); + channel.position(offset); + while (buf.remaining() != 0) { + if (channel.read(buf) == -1) { + throw new IOException(String.format("Reached EOF before filling buffer\n" + + "offset=%s\nfile=%s\nbuf.remaining=%s", + offset, file.getAbsoluteFile(), buf.remaining())); + } + } + buf.flip(); + } } diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java index a415db593a788..77536f118378d 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java @@ -21,6 +21,8 @@ import java.io.InputStream; import java.nio.ByteBuffer; +import io.netty.buffer.ByteBufAllocator; + /** * This interface provides an immutable view for data in the form of bytes. The implementation * should specify how the data is provided: @@ -66,6 +68,7 @@ public abstract class ManagedBuffer { /** * Convert the buffer into an Netty object, used to write the data out. + * @param allocator */ - public abstract Object convertToNetty() throws IOException; + public abstract Object convertToNetty(ByteBufAllocator allocator) throws IOException; } diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java index c806bfa45bef3..aa8fc089a1ce8 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java @@ -23,6 +23,7 @@ import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; /** @@ -63,7 +64,7 @@ public ManagedBuffer release() { } @Override - public Object convertToNetty() throws IOException { + public Object convertToNetty(ByteBufAllocator allocator) throws IOException { return buf.duplicate(); } diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java index f55b884bc45ce..496e5f0e3cb84 100644 --- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java +++ b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import com.google.common.base.Objects; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; @@ -61,7 +62,7 @@ public ManagedBuffer release() { } @Override - public Object convertToNetty() throws IOException { + public Object convertToNetty(ByteBufAllocator allocator) throws IOException { return Unpooled.wrappedBuffer(buf); } diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java b/network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java index 91d1e8a538a77..138d030469af5 100644 --- a/network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java +++ b/network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java @@ -52,7 +52,7 @@ public void encode(ChannelHandlerContext ctx, Message in, List out) { ChunkFetchSuccess resp = (ChunkFetchSuccess) in; try { bodyLength = resp.buffer.size(); - body = resp.buffer.convertToNetty(); + body = resp.buffer.convertToNetty(ctx.alloc()); } catch (Exception e) { // Re-encode this message as BlockFetchFailure. logger.error(String.format("Error opening block %s for client %s", diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 13b37f96f8ce2..640eed5262165 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -90,6 +90,14 @@ public int memoryMapBytes() { return conf.getInt("spark.storage.memoryMapThreshold", 2 * 1024 * 1024); } + /** + * Minimum size of a block that we should start using transferTo in the JVM. Blocks smaller than + * this will be read in using traditional IO into JVM heap and send to network. + */ + public int transferToBytes() { + return conf.getInt("spark.storage.transferToThreshold", 2 * 1024 * 1024); + } + /** * Whether to initialize shuffle FileDescriptor lazily or not. If true, file descriptors are * created only when data is going to be transferred. This can reduce the number of open files. diff --git a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java index 38113a918f795..3d3c88f5a5ef5 100644 --- a/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java +++ b/network/common/src/test/java/org/apache/spark/network/TestManagedBuffer.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import org.apache.spark.network.buffer.ManagedBuffer; @@ -76,8 +77,8 @@ public ManagedBuffer release() { } @Override - public Object convertToNetty() throws IOException { - return underlying.convertToNetty(); + public Object convertToNetty(ByteBufAllocator allocator) throws IOException { + return underlying.convertToNetty(allocator); } @Override From 5789e2be6ce83167d446d52320b96b988808d50d Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 10 Dec 2014 10:46:41 -0800 Subject: [PATCH 2/2] Use a very large value for spark.storage.transferToThreshold. --- .../main/java/org/apache/spark/network/util/TransportConf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index 640eed5262165..845e46807c768 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -95,7 +95,7 @@ public int memoryMapBytes() { * this will be read in using traditional IO into JVM heap and send to network. */ public int transferToBytes() { - return conf.getInt("spark.storage.transferToThreshold", 2 * 1024 * 1024); + return conf.getInt("spark.storage.transferToThreshold", 1024 * 1024 * 1024); } /**