From 1379b9650e48c4f4f70784f11c9db38140b16c67 Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Fri, 4 Jul 2014 18:16:52 +0700 Subject: [PATCH 1/4] SPARK-2368 Making FileServerHandler send gracefully to prevent OOM Busy wait for the channel to be ready before writing to it to prevent the buffer from exploding. Also making some small code changes for clarity. --- .../spark/network/netty/FileClient.java | 4 ++++ .../network/netty/FileClientHandler.java | 11 +++++---- .../spark/network/netty/FileServer.java | 6 ++++- .../network/netty/FileServerHandler.java | 23 +++++++++++-------- 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java index 0d31894d6ec7a..88e3bd0e75c43 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -53,6 +54,9 @@ public void init() { .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) + .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) + .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(new FileClientChannelInitializer(handler)); } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java index 63d3d927255f9..0d3b027b38081 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java +++ 
b/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java @@ -44,10 +44,13 @@ public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { } // get file if(in.readableBytes() >= currentHeader.fileLen()) { - handle(ctx, in, currentHeader); - handlerCalled = true; - currentHeader = null; - ctx.close(); + try { + handle(ctx, in, currentHeader); + } finally { + handlerCalled = true; + currentHeader = null; + ctx.close(); + } } } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java index c93425e2787dc..6ea95ef257968 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -52,7 +53,10 @@ class FileServer { .channel(OioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .option(ChannelOption.SO_RCVBUF, 1500) - .childHandler(new FileServerChannelInitializer(pResolver)); + .childHandler(new FileServerChannelInitializer(pResolver)) + .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // Start the server. 
channelFuture = bootstrap.bind(addr); try { diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index c0133e19c7f79..90bec43906bc3 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.FileInputStream; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.DefaultFileRegion; @@ -51,28 +52,25 @@ public void channelRead0(ChannelHandlerContext ctx, String blockIdString) { File file = fileSegment.file(); if (file.exists()) { if (!file.isFile()) { - ctx.write(new FileHeader(0, blockId).buffer()); - ctx.flush(); + writeIfPossible(ctx.channel(), new FileHeader(0, blockId).buffer()); return; } long length = fileSegment.length(); if (length > Integer.MAX_VALUE || length <= 0) { - ctx.write(new FileHeader(0, blockId).buffer()); - ctx.flush(); + writeIfPossible(ctx.channel(), new FileHeader(0, blockId).buffer()); return; } int len = (int) length; - ctx.write((new FileHeader(len, blockId)).buffer()); + writeIfPossible(ctx.channel(), new FileHeader(len, blockId).buffer()); try { - ctx.write(new DefaultFileRegion(new FileInputStream(file) - .getChannel(), fileSegment.offset(), fileSegment.length())); + writeIfPossible(ctx.channel(), new DefaultFileRegion(new FileInputStream(file) + .getChannel(), fileSegment.offset(), fileSegment.length())); } catch (Exception e) { LOG.error("Exception: ", e); } } else { - ctx.write(new FileHeader(0, blockId).buffer()); + writeIfPossible(ctx.channel(), new FileHeader(0, blockId).buffer()); } - ctx.flush(); } @Override @@ -80,4 +78,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LOG.error("Exception: ", cause); ctx.close(); } + + // We want to 
send right away without over loading the channel so the busy wait is needed. + private void writeIfPossible(Channel channel, Object object) { + while (!channel.isWritable()) { + channel.writeAndFlush(object); + } + } } From 54fff4417af3b794f8edf93798344af39d404d3c Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Fri, 4 Jul 2014 20:57:11 +0700 Subject: [PATCH 2/4] fix wrong logic --- .../org/apache/spark/network/netty/FileServerHandler.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java index 90bec43906bc3..f11f22cd9bf14 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java @@ -81,8 +81,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // We want to send right away without over loading the channel so the busy wait is needed. 
private void writeIfPossible(Channel channel, Object object) { - while (!channel.isWritable()) { - channel.writeAndFlush(object); + while (true) { + if (channel.isWritable()) { + channel.writeAndFlush(object); + return; + } } } } From 166b8bc7092cb5e9b2b042279f7c83d77aeeb1f7 Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Sat, 5 Jul 2014 10:54:23 +0700 Subject: [PATCH 3/4] Expose more Netty parameters + plus documentation --- .../spark/network/netty/FileClient.java | 14 +++-- .../spark/network/netty/FileServer.java | 12 ++-- .../spark/network/netty/ShuffleCopier.scala | 3 +- .../spark/network/netty/ShuffleSender.scala | 8 +-- .../spark/storage/DiskBlockManager.scala | 2 +- docs/configuration.md | 56 +++++++++++++++++++ 6 files changed, 78 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java index 88e3bd0e75c43..5d9035ee2fa78 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileClient.java @@ -27,6 +27,7 @@ import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioSocketChannel; +import org.apache.spark.SparkConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +40,14 @@ class FileClient { private Bootstrap bootstrap = null; private EventLoopGroup group = null; private final int connectTimeout; - private final int sendTimeout = 60; // 1 min + private final int sendTimeout; + private final SparkConf conf; - FileClient(FileClientHandler handler, int connectTimeout) { + FileClient(FileClientHandler handler, SparkConf conf) { this.handler = handler; - this.connectTimeout = connectTimeout; + this.connectTimeout = conf.getInt("spark.shuffle.fileclient.connect.timeout", 60000); + this.sendTimeout = conf.getInt("spark.shuffle.fileclient.send.timeout", 60); + this.conf = conf; } public void init() { @@ -54,8 +58,8 @@ 
public void init() { .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) - .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) - .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024) + .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, conf.getInt("spark.shuffle.fileclient.watermark.high", 32) * 1024) + .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, conf.getInt("spark.shuffle.fileclient.watermark.low", 8) * 1024) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .handler(new FileClientChannelInitializer(handler)); } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java index 6ea95ef257968..c9628bbcd1e41 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java +++ b/core/src/main/java/org/apache/spark/network/netty/FileServer.java @@ -26,6 +26,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.oio.OioEventLoopGroup; import io.netty.channel.socket.oio.OioServerSocketChannel; +import org.apache.spark.SparkConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,7 @@ class FileServer { private ChannelFuture channelFuture = null; private int port = 0; - FileServer(PathResolver pResolver, int port) { + FileServer(PathResolver pResolver, int port, SparkConf conf) { InetSocketAddress addr = new InetSocketAddress(port); // Configure the server. 
@@ -49,13 +50,14 @@ class FileServer { workerGroup = new OioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) .channel(OioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, 100) - .option(ChannelOption.SO_RCVBUF, 1500) + .option(ChannelOption.SO_BACKLOG, conf.getInt("spark.shuffle.fileserver.backlog", 100)) + .option(ChannelOption.SO_RCVBUF, conf.getInt("spark.shuffle.fileserver.receivebuf", 1536)) .childHandler(new FileServerChannelInitializer(pResolver)) - .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024) - .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024) + .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, conf.getInt("spark.shuffle.fileserver.watermark.high", 1024) * 1024) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, conf.getInt("spark.shuffle.fileserver.watermark.low", 256) * 1024) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); // Start the server. 
channelFuture = bootstrap.bind(addr); diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index e7b2855e1ec91..149a3a4dfbb85 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -35,8 +35,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging { resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) { val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback) - val connectTimeout = conf.getInt("spark.shuffle.netty.connect.timeout", 60000) - val fc = new FileClient(handler, connectTimeout) + val fc = new FileClient(handler, conf) try { fc.init() diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 7ef7aecc6a9fb..bbc07ebdafe27 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -19,13 +19,13 @@ package org.apache.spark.network.netty import java.io.File -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.util.Utils import org.apache.spark.storage.{BlockId, FileSegment} -private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging { +private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver, val conf: SparkConf) extends Logging { - val server = new FileServer(pResolver, portIn) + val server = new FileServer(pResolver, portIn, conf) server.start() def stop() { @@ -66,6 +66,6 @@ private[spark] object ShuffleSender { new FileSegment(file, 0, file.length()) } } - val sender = new ShuffleSender(port, pResovler) + val sender = new ShuffleSender(port, pResovler, new SparkConf) } } diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 673fc19c060a4..f10a9c8a1af99 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -179,7 +179,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD } private[storage] def startShuffleBlockSender(port: Int): Int = { - shuffleSender = new ShuffleSender(port, this) + shuffleSender = new ShuffleSender(port, this, shuffleManager.conf) logInfo(s"Created ShuffleSender binding to port: ${shuffleSender.port}") shuffleSender.port } diff --git a/docs/configuration.md b/docs/configuration.md index b84104cc7e653..af83ea4407863 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -574,6 +574,62 @@ Apart from these, the following properties are also available, and may be useful between nodes leading to flooding the network with those. + + spark.shuffle.fileclient.connect.timeout + 60000 + + The maximum allowed time in milliseconds for FileClient to connect to FileServer. + + + + spark.shuffle.fileclient.send.timeout + 60 + + The maximum allowed wait time in milliseconds for FileClient to send a request to FileServer after the connection has been established. + + + + spark.shuffle.fileclient.watermark.high + 512 + + The high watermark for FileClient in kbytes. If the number of bytes queued in the write buffer exceeds this value, Netty Channel.isWritable() will start to return false. + + + + spark.shuffle.fileclient.watermark.low + 128 + + The low watermark for FileClient in kbytes. Once the number of bytes queued in the write buffer exceeded the high water mark and then dropped down below this value, Netty Channel.isWritable() will start to return true again. + + + + spark.shuffle.fileserver.watermark.high + 1024 + + The high watermark for FileServer in kbytes. 
+ If the number of bytes queued in the write buffer exceeds this value, Netty Channel.isWritable() will start to return false. + + + + spark.shuffle.fileserver.watermark.low + 256 + + The low watermark for FileServer in kbytes. Once the number of bytes queued in the write buffer exceeded the high water mark and then dropped down below this value, Netty Channel.isWritable() will start to return true again. + + + + spark.shuffle.fileserver.receivebuf + 1536 + + Set SO_RCVBUF of the FileServer socket. It specifies the size of the buffer the kernel allocates to hold the data arriving into the given socket during the time between it arrives over the network and when it is read by the program that owns this socket. With TCP, if data arrives and you are not reading it, the buffer will fill up, and the sender will be told to slow down. + + + + spark.shuffle.fileserver.backlog + 100 + + Set SO_BACKLOG of the FileServer socket. It specifies the maximum queue length for incoming connection indications (requests to connect). If a connection indication arrives when the queue is full, the connection is refused. 
+ + #### Scheduling From 6eb86cf16c962384d4bd38acac1a7f2306c06d6c Mon Sep 17 00:00:00 2001 From: Binh Nguyen Date: Sat, 5 Jul 2014 10:58:41 +0700 Subject: [PATCH 4/4] Fix too long lines --- .../scala/org/apache/spark/network/netty/ShuffleSender.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index bbc07ebdafe27..f0433a38189ae 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -23,7 +23,8 @@ import org.apache.spark.{SparkConf, Logging} import org.apache.spark.util.Utils import org.apache.spark.storage.{BlockId, FileSegment} -private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver, val conf: SparkConf) extends Logging { +private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver, + val conf: SparkConf) extends Logging { val server = new FileServer(pResolver, portIn, conf) server.start()