From 44bb55759a4059d8bb0e60c361a8a3210a234f92 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Tue, 21 Aug 2018 13:34:31 -0400 Subject: [PATCH 01/24] SPARK-24355 Spark external shuffle server improvement to better handle block fetch requests. --- .../spark/network/TransportContext.java | 31 ++++- .../server/ChunkFetchRequestHandler.java | 131 ++++++++++++++++++ .../server/TransportChannelHandler.java | 20 ++- .../server/TransportRequestHandler.java | 49 ++----- .../spark/network/util/TransportConf.java | 19 +++ .../ChunkFetchRequestHandlerSuite.java | 102 ++++++++++++++ .../spark/network/ExtendedChannelPromise.java | 53 +++++++ .../network/TransportRequestHandlerSuite.java | 59 ++------ 8 files changed, 374 insertions(+), 90 deletions(-) create mode 100644 common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java create mode 100644 common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java create mode 100644 common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index ae91bc9cfdd08..1af06af08cd63 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -21,6 +21,7 @@ import java.util.List; import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; @@ -32,11 +33,13 @@ import org.apache.spark.network.client.TransportResponseHandler; import org.apache.spark.network.protocol.MessageDecoder; import org.apache.spark.network.protocol.MessageEncoder; +import org.apache.spark.network.server.ChunkFetchRequestHandler; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.TransportChannelHandler; import org.apache.spark.network.server.TransportRequestHandler; import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; +import org.apache.spark.network.util.IOMode; import org.apache.spark.network.util.NettyUtils; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.util.TransportFrameDecoder; @@ -77,6 +80,11 @@ public class TransportContext { private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE; private static final MessageDecoder DECODER = MessageDecoder.INSTANCE; + // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling + // max number of TransportServer worker threads that are blocked on writing response + // of ChunkFetchRequest message back to the client via the underlying channel. 
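+  // (Illustrative note: Netty lets a handler be bound to its own executor group via
+  // ChannelPipeline.addLast(EventExecutorGroup, String, ChannelHandler); that handler's
+  // callbacks then run on the group's threads rather than on the channel's event loop.
+  // This is the mechanism relied on for chunkFetchWorkers below.)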
+ private static EventLoopGroup chunkFetchWorkers; + public TransportContext(TransportConf conf, RpcHandler rpcHandler) { this(conf, rpcHandler, false); } @@ -88,6 +96,15 @@ public TransportContext( this.conf = conf; this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; + + synchronized(this.getClass()) { + if (chunkFetchWorkers == null) { + chunkFetchWorkers = NettyUtils.createEventLoop( + IOMode.valueOf(conf.ioMode()), + conf.chunkFetchHandlerThreads(), + "chunk-fetch-handler"); + } + } } /** @@ -144,6 +161,7 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); + ChunkFetchRequestHandler chunkFetchHandler = createChunkFetchHandler(channelHandler, channelRpcHandler); channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) @@ -151,7 +169,9 @@ public TransportChannelHandler initializePipeline( .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. - .addLast("handler", channelHandler); + .addLast("handler", channelHandler) + // Use a separate EventLoopGroup to handle ChunkFetchRequest messages. + .addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); return channelHandler; } catch (RuntimeException e) { logger.error("Error while initializing Netty pipeline", e); @@ -173,5 +193,14 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler conf.connectionTimeoutMs(), closeIdleConnections); } + /** + * Creates the dedicated ChannelHandler for ChunkFetchRequest messages. + */ + private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler, + RpcHandler rpcHandler) { + return new ChunkFetchRequestHandler(channelHandler.getClient(), + rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred()); + } + public TransportConf getConf() { return conf; } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java new file mode 100644 index 0000000000000..cf5d60da53fbb --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.network.server;
+
+import java.net.SocketAddress;
+
+import com.google.common.base.Throwables;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.protocol.ChunkFetchFailure;
+import org.apache.spark.network.protocol.ChunkFetchRequest;
+import org.apache.spark.network.protocol.ChunkFetchSuccess;
+import org.apache.spark.network.protocol.Encodable;
+
+import static org.apache.spark.network.util.NettyUtils.*;
+
+/**
+ * A dedicated ChannelHandler for processing ChunkFetchRequest messages. When sending the
+ * response of a ChunkFetchRequest message to a client, the thread performing the I/O on the
+ * underlying channel could potentially be blocked due to disk contention. If several hundred
+ * clients send ChunkFetchRequest to the server at the same time, they could occupy all of the
+ * threads in TransportServer's default EventLoopGroup, each waiting on disk reads before it
+ * can send the block data back to the client as part of a ChunkFetchSuccess message. As a
+ * result, no threads would be left to process other RPC messages, which take much less time
+ * to handle, and clients could time out while performing SASL authentication, registering
+ * executors, or waiting for the response to an OpenBlocks message.
+ */
+public class ChunkFetchRequestHandler extends SimpleChannelInboundHandler<ChunkFetchRequest> {
+  private static final Logger logger = LoggerFactory.getLogger(ChunkFetchRequestHandler.class);
+
+  private final TransportClient client;
+  private final StreamManager streamManager;
+  /** The max number of chunks being transferred and not finished yet.
*/ + private final long maxChunksBeingTransferred; + + public ChunkFetchRequestHandler( + TransportClient client, + StreamManager streamManager, + Long maxChunksBeingTransferred) { + this.client = client; + this.streamManager = streamManager; + this.maxChunksBeingTransferred = maxChunksBeingTransferred; + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), + cause); + ctx.close(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest msg) throws Exception { + Channel channel = ctx.channel(); + if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), + msg.streamChunkId); + } + long chunksBeingTransferred = streamManager.chunksBeingTransferred(); + if (chunksBeingTransferred >= maxChunksBeingTransferred) { + logger.warn("The number of chunks being transferred {} is above {}, close the connection.", + chunksBeingTransferred, maxChunksBeingTransferred); + channel.close(); + return; + } + ManagedBuffer buf; + try { + streamManager.checkAuthorization(client, msg.streamChunkId.streamId); + streamManager.registerChannel(channel, msg.streamChunkId.streamId); + buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex); + } catch (Exception e) { + logger.error(String.format("Error opening block %s for request from %s", + msg.streamChunkId, getRemoteAddress(channel)), e); + respond(channel, new ChunkFetchFailure(msg.streamChunkId, Throwables.getStackTraceAsString(e))); + return; + } + + streamManager.chunkBeingSent(msg.streamChunkId.streamId); + respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener( + (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId)); + } + + /** + * The invocation to channel.writeAndFlush is async, and the actual I/O on the channel will be handled + * by the EventLoop the channel is registered to. So even though we are processing the ChunkFetchRequest + * in a separate thread pool, the actual I/O, which is the potentially blocking call that could deplete + * server handler threads, is still being processed by TransportServer's default EventLoopGroup. In order + * to throttle the max number of threads that channel I/O for sending response to ChunkFetchRequest, + * the thread calling channel.writeAndFlush will wait for the completion of sending response back to client + * by invoking sync(). This will throttle the rate at which threads from ChunkFetchRequest dedicated + * EventLoopGroup submit channel I/O requests to TransportServer's default EventLoopGroup, thus making sure + * that we can reserve some threads in TransportServer's default EventLoopGroup for handling other RPC + * messages. 
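+   *
+   * <p>A sketch of the contrast ({@code writeAndFlush} and {@code sync} are Netty APIs;
+   * the juxtaposition itself is illustrative, not part of this handler):
+   * <pre>{@code
+   * channel.writeAndFlush(result);          // async: the handler thread returns immediately
+   * channel.writeAndFlush(result).sync();   // parks the handler thread until the default
+   *                                         // event loop has completed the actual I/O
+   * }</pre>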
+ */ + private ChannelFuture respond(final Channel channel, final Encodable result) throws InterruptedException { + final SocketAddress remoteAddress = channel.remoteAddress(); + return channel.writeAndFlush(result).sync().addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + logger.trace("Sent result {} to client {}", result, remoteAddress); + } else { + logger.error(String.format("Error sending result %s to %s; closing connection", + result, remoteAddress), future.cause()); + channel.close(); + } + }); + } +} diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index 56782a8327876..d2a69f0d7625d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; @@ -26,6 +27,8 @@ import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.client.TransportResponseHandler; +import org.apache.spark.network.protocol.ChunkFetchRequest; +import org.apache.spark.network.protocol.Message; import org.apache.spark.network.protocol.RequestMessage; import org.apache.spark.network.protocol.ResponseMessage; import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; @@ -47,7 +50,7 @@ * on the channel for at least `requestTimeoutMs`. Note that this is duplex traffic; we will not * timeout if the client is continuously sending but getting no responses, for simplicity. */ -public class TransportChannelHandler extends ChannelInboundHandlerAdapter { +public class TransportChannelHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(TransportChannelHandler.class); private final TransportClient client; @@ -112,8 +115,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } + /** + * Overwrite acceptInboundMessage to properly delegate ChunkFetchRequest messages + * to ChunkFetchRequestHandler. 
+ */ @Override - public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception { + public boolean acceptInboundMessage(Object msg) throws Exception { + if (msg instanceof ChunkFetchRequest) { + return false; + } else { + return super.acceptInboundMessage(msg); + } + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception { if (request instanceof RequestMessage) { requestHandler.handle((RequestMessage) request); } else if (request instanceof ResponseMessage) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index c6fd56b9291e5..77332eaa06307 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -29,10 +29,17 @@ import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.buffer.NioManagedBuffer; -import org.apache.spark.network.client.*; -import org.apache.spark.network.protocol.*; -import org.apache.spark.network.util.TransportFrameDecoder; - +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.Encodable; +import org.apache.spark.network.protocol.OneWayMessage; +import org.apache.spark.network.protocol.RequestMessage; +import org.apache.spark.network.protocol.RpcFailure; +import org.apache.spark.network.protocol.RpcRequest; +import org.apache.spark.network.protocol.RpcResponse; +import org.apache.spark.network.protocol.StreamFailure; +import org.apache.spark.network.protocol.StreamRequest; +import org.apache.spark.network.protocol.StreamResponse; import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; /** @@ -97,9 +104,7 @@ public void channelInactive() { @Override public void handle(RequestMessage request) { - if (request instanceof ChunkFetchRequest) { - processFetchRequest((ChunkFetchRequest) request); - } else if (request instanceof RpcRequest) { + if (request instanceof RpcRequest) { processRpcRequest((RpcRequest) request); } else if (request instanceof OneWayMessage) { processOneWayMessage((OneWayMessage) request); @@ -112,36 +117,6 @@ public void handle(RequestMessage request) { } } - private void processFetchRequest(final ChunkFetchRequest req) { - if (logger.isTraceEnabled()) { - logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), - req.streamChunkId); - } - long chunksBeingTransferred = streamManager.chunksBeingTransferred(); - if (chunksBeingTransferred >= maxChunksBeingTransferred) { - logger.warn("The number of chunks being transferred {} is above {}, close the connection.", - chunksBeingTransferred, maxChunksBeingTransferred); - channel.close(); - return; - } - ManagedBuffer buf; - try { - streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId); - streamManager.registerChannel(channel, req.streamChunkId.streamId); - buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex); - } catch (Exception e) { - logger.error(String.format("Error opening block %s for request from %s", - req.streamChunkId, getRemoteAddress(channel)), e); - respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e))); - return; - } - - 
streamManager.chunkBeingSent(req.streamChunkId.streamId); - respond(new ChunkFetchSuccess(req.streamChunkId, buf)).addListener(future -> { - streamManager.chunkSent(req.streamChunkId.streamId); - }); - } - private void processStreamRequest(final StreamRequest req) { if (logger.isTraceEnabled()) { logger.trace("Received req from {} to fetch stream {}", getRemoteAddress(channel), diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 91497b9492219..a62724325dea2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -281,4 +281,23 @@ public Properties cryptoConf() { public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + + /** + * Number of threads to process ChunkFetchRequest. Shuffle server will use a separate + * EventLoopGroup to process ChunkFetchRequest messages. Although when calling the + * async writeAndFlush on the underlying channel to send response back to client, + * the I/O on the channel is still being handled by + * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup + * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler + * threads for the completion of sending back responses, we are able to put a limit on + * the max number of threads from TransportServer's default EventLoopGroup that are + * going to be consumed by writing response to ChunkFetchRequest, which are I/O intensive + * and could take long time to process due to disk contentions. By configuring a slightly + * higher number of shuffler server threads, we are able to reserve some threads for + * handling other RPC messages, thus making the Client less likely to experience timeout + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores. + */ + public int chunkFetchHandlerThreads() { + return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreads", 0); + } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java new file mode 100644 index 0000000000000..9dfea19e9e7b1 --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network; + +import io.netty.channel.ChannelHandlerContext; +import java.util.ArrayList; +import java.util.List; + +import io.netty.channel.Channel; +import org.apache.spark.network.server.ChunkFetchRequestHandler; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.protocol.*; +import org.apache.spark.network.server.NoOpRpcHandler; +import org.apache.spark.network.server.OneForOneStreamManager; +import org.apache.spark.network.server.RpcHandler; + +public class ChunkFetchRequestHandlerSuite { + + @Test + public void handleChunkFetchRequest() throws Exception { + RpcHandler rpcHandler = new NoOpRpcHandler(); + OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager()); + Channel channel = mock(Channel.class); + ChannelHandlerContext context = mock(ChannelHandlerContext.class); + when(context.channel()) + .thenAnswer(invocationOnMock0 -> { + return channel; + }); + List> responseAndPromisePairs = + new ArrayList<>(); + when(channel.writeAndFlush(any())) + .thenAnswer(invocationOnMock0 -> { + Object response = invocationOnMock0.getArguments()[0]; + ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel); + responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture)); + return channelFuture; + }); + + // Prepare the stream. + List managedBuffers = new ArrayList<>(); + managedBuffers.add(new TestManagedBuffer(10)); + managedBuffers.add(new TestManagedBuffer(20)); + managedBuffers.add(new TestManagedBuffer(30)); + managedBuffers.add(new TestManagedBuffer(40)); + long streamId = streamManager.registerStream("test-app", managedBuffers.iterator()); + streamManager.registerChannel(channel, streamId); + TransportClient reverseClient = mock(TransportClient.class); + ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient, + rpcHandler.getStreamManager(), 2L); + + RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0)); + requestHandler.channelRead(context, request0); + assert responseAndPromisePairs.size() == 1; + assert responseAndPromisePairs.get(0).getLeft() instanceof ChunkFetchSuccess; + assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(0).getLeft())).body() == + managedBuffers.get(0); + + RequestMessage request1 = new ChunkFetchRequest(new StreamChunkId(streamId, 1)); + requestHandler.channelRead(context, request1); + assert responseAndPromisePairs.size() == 2; + assert responseAndPromisePairs.get(1).getLeft() instanceof ChunkFetchSuccess; + assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(1).getLeft())).body() == + managedBuffers.get(1); + + // Finish flushing the response for request0. 
+ responseAndPromisePairs.get(0).getRight().finish(true); + + RequestMessage request2 = new ChunkFetchRequest(new StreamChunkId(streamId, 2)); + requestHandler.channelRead(context, request2); + assert responseAndPromisePairs.size() == 3; + assert responseAndPromisePairs.get(2).getLeft() instanceof ChunkFetchSuccess; + assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(2).getLeft())).body() == + managedBuffers.get(2); + + RequestMessage request3 = new ChunkFetchRequest(new StreamChunkId(streamId, 3)); + requestHandler.channelRead(context, request3); + verify(channel, times(1)).close(); + assert responseAndPromisePairs.size() == 3; + } +} diff --git a/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java new file mode 100644 index 0000000000000..bd59ade4f4ac6 --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java @@ -0,0 +1,53 @@ +package org.apache.spark.network; + +import java.util.ArrayList; +import java.util.List; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + + +class ExtendedChannelPromise extends DefaultChannelPromise { + + private List>> listeners = new ArrayList<>(); + private boolean success; + + ExtendedChannelPromise(Channel channel) { + super(channel); + success = false; + } + + @Override + public ChannelPromise addListener( + GenericFutureListener> listener) { + @SuppressWarnings("unchecked") + GenericFutureListener> gfListener = + (GenericFutureListener>) listener; + listeners.add(gfListener); + return super.addListener(listener); + } + + @Override + public boolean isSuccess() { + return success; + } + + @Override + public ChannelPromise sync() throws InterruptedException { + return this; + } + + public void finish(boolean success) { + this.success = success; + listeners.forEach(listener -> { + try { + listener.operationComplete(this); + } catch (Exception e) { + // do nothing + } + }); + } +} \ No newline at end of file diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java index 2656cbee95a20..bf6a03f9ed0f4 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java @@ -21,10 +21,6 @@ import java.util.List; import io.netty.channel.Channel; -import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultChannelPromise; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import org.junit.Test; import static org.mockito.Mockito.*; @@ -42,7 +38,7 @@ public class TransportRequestHandlerSuite { @Test - public void handleFetchRequestAndStreamRequest() throws Exception { + public void handleStreamRequest() throws Exception { RpcHandler rpcHandler = new NoOpRpcHandler(); OneForOneStreamManager streamManager = (OneForOneStreamManager) (rpcHandler.getStreamManager()); Channel channel = mock(Channel.class); @@ -68,19 +64,19 @@ public void handleFetchRequestAndStreamRequest() throws Exception { TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient, 
rpcHandler, 2L); - RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0)); + RequestMessage request0 = new StreamRequest(String.format("%d_%d", streamId, 0)); requestHandler.handle(request0); assert responseAndPromisePairs.size() == 1; - assert responseAndPromisePairs.get(0).getLeft() instanceof ChunkFetchSuccess; - assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(0).getLeft())).body() == - managedBuffers.get(0); + assert responseAndPromisePairs.get(0).getLeft() instanceof StreamResponse; + assert ((StreamResponse) (responseAndPromisePairs.get(0).getLeft())).body() == + managedBuffers.get(0); - RequestMessage request1 = new ChunkFetchRequest(new StreamChunkId(streamId, 1)); + RequestMessage request1 = new StreamRequest(String.format("%d_%d", streamId, 1)); requestHandler.handle(request1); assert responseAndPromisePairs.size() == 2; - assert responseAndPromisePairs.get(1).getLeft() instanceof ChunkFetchSuccess; - assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(1).getLeft())).body() == - managedBuffers.get(1); + assert responseAndPromisePairs.get(1).getLeft() instanceof StreamResponse; + assert ((StreamResponse) (responseAndPromisePairs.get(1).getLeft())).body() == + managedBuffers.get(1); // Finish flushing the response for request0. responseAndPromisePairs.get(0).getRight().finish(true); @@ -99,41 +95,4 @@ public void handleFetchRequestAndStreamRequest() throws Exception { verify(channel, times(1)).close(); assert responseAndPromisePairs.size() == 3; } - - private class ExtendedChannelPromise extends DefaultChannelPromise { - - private List>> listeners = new ArrayList<>(); - private boolean success; - - ExtendedChannelPromise(Channel channel) { - super(channel); - success = false; - } - - @Override - public ChannelPromise addListener( - GenericFutureListener> listener) { - @SuppressWarnings("unchecked") - GenericFutureListener> gfListener = - (GenericFutureListener>) listener; - listeners.add(gfListener); - return super.addListener(listener); - } - - @Override - public boolean isSuccess() { - return success; - } - - public void finish(boolean success) { - this.success = success; - listeners.forEach(listener -> { - try { - listener.operationComplete(this); - } catch (Exception e) { - // do nothing - } - }); - } - } } From 3bab74ca84fe1b6682000741b958c8792f792472 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Tue, 21 Aug 2018 12:49:50 -0400 Subject: [PATCH 02/24] make chunk fetch handler threads as a percentage of transport server threads --- .../spark/network/util/TransportConf.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index a62724325dea2..0441a828c4190 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -21,6 +21,7 @@ import java.util.Properties; import com.google.common.primitives.Ints; +import io.netty.util.NettyRuntime; /** * A central location that tracks all the settings we expose to users. @@ -283,10 +284,10 @@ public long maxChunksBeingTransferred() { } /** - * Number of threads to process ChunkFetchRequest. Shuffle server will use a separate - * EventLoopGroup to process ChunkFetchRequest messages. 
Although when calling the - * async writeAndFlush on the underlying channel to send response back to client, - * the I/O on the channel is still being handled by + * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. + * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. + * Although when calling the async writeAndFlush on the underlying channel to send + * response back to client, the I/O on the channel is still being handled by * {@link org.apache.spark.network.server.TransportServer}'s default EventLoopGroup * that's registered with the Channel, by waiting inside the ChunkFetchRequest handler * threads for the completion of sending back responses, we are able to put a limit on @@ -295,9 +296,16 @@ public long maxChunksBeingTransferred() { * and could take long time to process due to disk contentions. By configuring a slightly * higher number of shuffler server threads, we are able to reserve some threads for * handling other RPC messages, thus making the Client less likely to experience timeout - * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores. + * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores + * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads + * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. */ public int chunkFetchHandlerThreads() { - return conf.getInt("spark.shuffle.server.chunkFetchHandlerThreads", 0); + if(!this.getModuleName().equalsIgnoreCase("shuffle")) { + return 0; + } + int chunkFetchHandlerThreadsPercent = conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); + return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: + (2* NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; } } From cc40d9beb518d961f021835228d9ada60f1388cf Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Mon, 27 Aug 2018 10:17:57 -0400 Subject: [PATCH 03/24] Add apache license header --- .../spark/network/ExtendedChannelPromise.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java index bd59ade4f4ac6..27fa8784f3c05 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java +++ b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.spark.network; import java.util.ArrayList; @@ -50,4 +67,4 @@ public void finish(boolean success) { } }); } -} \ No newline at end of file +} From d86503cf34f66d7082df8677e78f5f793e1064a0 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Mon, 27 Aug 2018 12:24:02 -0400 Subject: [PATCH 04/24] do not create event loops for other rpcs except shuffle complete expanion of imports rearrange imports --- .../org/apache/spark/network/TransportContext.java | 13 ++++++++----- .../network/server/TransportRequestHandler.java | 5 +++++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 1af06af08cd63..c51927959b977 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -21,6 +21,7 @@ import java.util.List; import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; @@ -98,7 +99,7 @@ public TransportContext( this.closeIdleConnections = closeIdleConnections; synchronized(this.getClass()) { - if (chunkFetchWorkers == null) { + if (chunkFetchWorkers == null && conf.getModuleName().equalsIgnoreCase("shuffle")) { chunkFetchWorkers = NettyUtils.createEventLoop( IOMode.valueOf(conf.ioMode()), conf.chunkFetchHandlerThreads(), @@ -162,16 +163,18 @@ public TransportChannelHandler initializePipeline( try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); ChunkFetchRequestHandler chunkFetchHandler = createChunkFetchHandler(channelHandler, channelRpcHandler); - channel.pipeline() + ChannelPipeline pipeline = channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. - .addLast("handler", channelHandler) - // Use a separate EventLoopGroup to handle ChunkFetchRequest messages. - .addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); + .addLast("handler", channelHandler); + // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs. 
+ if (conf.getModuleName().equalsIgnoreCase("shuffle")) { + pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); + } return channelHandler; } catch (RuntimeException e) { logger.error("Error while initializing Netty pipeline", e); diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 77332eaa06307..54b275145c406 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -24,6 +24,7 @@ import com.google.common.base.Throwables; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +32,8 @@ import org.apache.spark.network.buffer.NioManagedBuffer; import org.apache.spark.network.client.RpcResponseCallback; import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.client.StreamCallbackWithID; +import org.apache.spark.network.client.StreamInterceptor; import org.apache.spark.network.protocol.Encodable; import org.apache.spark.network.protocol.OneWayMessage; import org.apache.spark.network.protocol.RequestMessage; @@ -40,6 +43,8 @@ import org.apache.spark.network.protocol.StreamFailure; import org.apache.spark.network.protocol.StreamRequest; import org.apache.spark.network.protocol.StreamResponse; +import org.apache.spark.network.protocol.UploadStream; +import org.apache.spark.network.util.TransportFrameDecoder; import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; /** From 470e9a60c8708ebe5fc819ba95481a59b6dcfc82 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Tue, 28 Aug 2018 09:16:27 -0400 Subject: [PATCH 05/24] add null check --- .../main/java/org/apache/spark/network/TransportContext.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index c51927959b977..95dc9e5fb1453 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -99,7 +99,8 @@ public TransportContext( this.closeIdleConnections = closeIdleConnections; synchronized(this.getClass()) { - if (chunkFetchWorkers == null && conf.getModuleName().equalsIgnoreCase("shuffle")) { + if (chunkFetchWorkers == null && conf.getModuleName() != null && + conf.getModuleName().equalsIgnoreCase("shuffle")) { chunkFetchWorkers = NettyUtils.createEventLoop( IOMode.valueOf(conf.ioMode()), conf.chunkFetchHandlerThreads(), @@ -172,7 +173,7 @@ public TransportChannelHandler initializePipeline( // would require more logic to guarantee if this were not part of the same event loop. .addLast("handler", channelHandler); // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs. 
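      // (Illustrative: a TransportConf's module name is fixed at construction, e.g.
      //  new TransportConf("shuffle", configProvider) -- the constructor shown later in
      //  this series -- so contexts built for other modules never add the extra stage;
      //  "configProvider" here is an assumed placeholder.)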
- if (conf.getModuleName().equalsIgnoreCase("shuffle")) { + if (conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle")) { pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); } return channelHandler; From dcc41f5f6a26878dd5c7764e432bcf10cb04d5e8 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 29 Aug 2018 10:51:34 -0400 Subject: [PATCH 06/24] fix styling issues --- .../spark/network/TransportContext.java | 6 ++-- .../server/ChunkFetchRequestHandler.java | 28 +++++++++++-------- .../server/TransportChannelHandler.java | 1 - .../spark/network/util/TransportConf.java | 3 +- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 95dc9e5fb1453..fbd6a8eaddcc4 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -163,12 +163,14 @@ public TransportChannelHandler initializePipeline( RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); - ChunkFetchRequestHandler chunkFetchHandler = createChunkFetchHandler(channelHandler, channelRpcHandler); + ChunkFetchRequestHandler chunkFetchHandler = + createChunkFetchHandler(channelHandler, channelRpcHandler); ChannelPipeline pipeline = channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) - .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) + .addLast("idleStateHandler", + new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. 
.addLast("handler", channelHandler); diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index cf5d60da53fbb..a1aa5a26d4721 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -74,7 +74,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } @Override - protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest msg) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { Channel channel = ctx.channel(); if (logger.isTraceEnabled()) { logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), @@ -95,7 +96,9 @@ protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest m } catch (Exception e) { logger.error(String.format("Error opening block %s for request from %s", msg.streamChunkId, getRemoteAddress(channel)), e); - respond(channel, new ChunkFetchFailure(msg.streamChunkId, Throwables.getStackTraceAsString(e))); + respond(channel, + new ChunkFetchFailure(msg.streamChunkId, + Throwables.getStackTraceAsString(e))); return; } @@ -105,16 +108,17 @@ protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest m } /** - * The invocation to channel.writeAndFlush is async, and the actual I/O on the channel will be handled - * by the EventLoop the channel is registered to. So even though we are processing the ChunkFetchRequest - * in a separate thread pool, the actual I/O, which is the potentially blocking call that could deplete - * server handler threads, is still being processed by TransportServer's default EventLoopGroup. In order - * to throttle the max number of threads that channel I/O for sending response to ChunkFetchRequest, - * the thread calling channel.writeAndFlush will wait for the completion of sending response back to client - * by invoking sync(). This will throttle the rate at which threads from ChunkFetchRequest dedicated - * EventLoopGroup submit channel I/O requests to TransportServer's default EventLoopGroup, thus making sure - * that we can reserve some threads in TransportServer's default EventLoopGroup for handling other RPC - * messages. + * The invocation to channel.writeAndFlush is async, and the actual I/O on the + * channel will be handled by the EventLoop the channel is registered to. So even + * though we are processing the ChunkFetchRequest in a separate thread pool, the actual I/O, + * which is the potentially blocking call that could deplete server handler threads, is still + * being processed by TransportServer's default EventLoopGroup. In order to throttle the max + * number of threads that channel I/O for sending response to ChunkFetchRequest, the thread + * calling channel.writeAndFlush will wait for the completion of sending response back to + * client by invoking sync(). This will throttle the rate at which threads from + * ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to TransportServer's + * default EventLoopGroup, thus making sure that we can reserve some threads in + * TransportServer's default EventLoopGroup for handling other RPC messages. 
*/ private ChannelFuture respond(final Channel channel, final Encodable result) throws InterruptedException { final SocketAddress remoteAddress = channel.remoteAddress(); diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index d2a69f0d7625d..c824a7b0d4740 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -18,7 +18,6 @@ package org.apache.spark.network.server; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 0441a828c4190..a0bf6d37e4f68 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -304,7 +304,8 @@ public int chunkFetchHandlerThreads() { if(!this.getModuleName().equalsIgnoreCase("shuffle")) { return 0; } - int chunkFetchHandlerThreadsPercent = conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); + int chunkFetchHandlerThreadsPercent = + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: (2* NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; } From b1105bd159bbb8a57947d38223ca7dddf0a35299 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 29 Aug 2018 14:54:24 -0400 Subject: [PATCH 07/24] fix more style requests --- .../apache/spark/network/server/ChunkFetchRequestHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index a1aa5a26d4721..63c8e6fe91b9d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -120,7 +120,8 @@ protected void channelRead0(ChannelHandlerContext ctx, * default EventLoopGroup, thus making sure that we can reserve some threads in * TransportServer's default EventLoopGroup for handling other RPC messages. 
*/ - private ChannelFuture respond(final Channel channel, final Encodable result) throws InterruptedException { + private ChannelFuture respond(final Channel channel, + final Encodable result) throws InterruptedException { final SocketAddress remoteAddress = channel.remoteAddress(); return channel.writeAndFlush(result).sync().addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { From 8153de5e4bebcad3e0faa497102898bd0bcc4483 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Mon, 10 Sep 2018 08:46:48 -0400 Subject: [PATCH 08/24] address nits --- .../org/apache/spark/network/util/TransportConf.java | 10 +++++----- .../apache/spark/network/ExtendedChannelPromise.java | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index a0bf6d37e4f68..35c6e6d0c039b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -297,16 +297,16 @@ public long maxChunksBeingTransferred() { * higher number of shuffler server threads, we are able to reserve some threads for * handling other RPC messages, thus making the Client less likely to experience timeout * when sending RPC messages to the shuffle server. Default to 0, which is 2*#cores - * or io.serverThreads. 10 would mean 10% of 2*#cores or 10% of io.serverThreads - * which equals 0.1 * 2*#cores or 0.1 * io.serverThreads. + * or io.serverThreads. 90 would mean 90% of 2*#cores or 90% of io.serverThreads + * which equals 0.9 * 2*#cores or 0.9 * io.serverThreads. */ public int chunkFetchHandlerThreads() { - if(!this.getModuleName().equalsIgnoreCase("shuffle")) { + if (!this.getModuleName().equalsIgnoreCase("shuffle")) { return 0; } int chunkFetchHandlerThreadsPercent = conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); - return this.serverThreads() > 0? (this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: - (2* NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; + return this.serverThreads() > 0 ? 
(this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: + (2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; } } diff --git a/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java index 27fa8784f3c05..4d522c3ea7425 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java +++ b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java @@ -26,7 +26,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; - class ExtendedChannelPromise extends DefaultChannelPromise { private List>> listeners = new ArrayList<>(); From b488f0c11422488b7d68652ed4780950d1245557 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Tue, 18 Sep 2018 15:52:07 -0400 Subject: [PATCH 09/24] address nits --- .../spark/network/TransportContext.java | 13 ++++++----- .../server/ChunkFetchRequestHandler.java | 22 +++++++++---------- .../server/TransportRequestHandler.java | 18 ++++++--------- 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index fbd6a8eaddcc4..3aa8cf4c7e712 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -98,13 +98,14 @@ public TransportContext( this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; - synchronized(this.getClass()) { - if (chunkFetchWorkers == null && conf.getModuleName() != null && - conf.getModuleName().equalsIgnoreCase("shuffle")) { + synchronized(TransportContext.class) { + if (chunkFetchWorkers == null && + conf.getModuleName() != null && + conf.getModuleName().equalsIgnoreCase("shuffle")) { chunkFetchWorkers = NettyUtils.createEventLoop( IOMode.valueOf(conf.ioMode()), conf.chunkFetchHandlerThreads(), - "chunk-fetch-handler"); + "shuffle-chunk-fetch-handler"); } } } @@ -164,13 +165,13 @@ public TransportChannelHandler initializePipeline( try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); ChunkFetchRequestHandler chunkFetchHandler = - createChunkFetchHandler(channelHandler, channelRpcHandler); + createChunkFetchHandler(channelHandler, channelRpcHandler); ChannelPipeline pipeline = channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", - new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) + new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. 
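      // (Ordering note, assuming Netty's threading model: even once chunkFetchHandler
      //  moves to the dedicated chunkFetchWorkers group, a channel is served by a single
      //  EventExecutor chosen from that group, so chunk responses for one channel should
      //  still be written in request order.)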
.addLast("handler", channelHandler); diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index 63c8e6fe91b9d..9a19e14752631 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -68,23 +68,22 @@ public ChunkFetchRequestHandler( @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), - cause); + logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause); ctx.close(); } @Override - protected void channelRead0(ChannelHandlerContext ctx, - final ChunkFetchRequest msg) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest msg) + throws Exception { Channel channel = ctx.channel(); if (logger.isTraceEnabled()) { logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), - msg.streamChunkId); + msg.streamChunkId); } long chunksBeingTransferred = streamManager.chunksBeingTransferred(); if (chunksBeingTransferred >= maxChunksBeingTransferred) { logger.warn("The number of chunks being transferred {} is above {}, close the connection.", - chunksBeingTransferred, maxChunksBeingTransferred); + chunksBeingTransferred, maxChunksBeingTransferred); channel.close(); return; } @@ -95,16 +94,15 @@ protected void channelRead0(ChannelHandlerContext ctx, buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex); } catch (Exception e) { logger.error(String.format("Error opening block %s for request from %s", - msg.streamChunkId, getRemoteAddress(channel)), e); - respond(channel, - new ChunkFetchFailure(msg.streamChunkId, - Throwables.getStackTraceAsString(e))); + msg.streamChunkId, getRemoteAddress(channel)), e); + respond(channel, new ChunkFetchFailure(msg.streamChunkId, + Throwables.getStackTraceAsString(e))); return; } streamManager.chunkBeingSent(msg.streamChunkId.streamId); respond(channel, new ChunkFetchSuccess(msg.streamChunkId, buf)).addListener( - (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId)); + (ChannelFutureListener) future -> streamManager.chunkSent(msg.streamChunkId.streamId)); } /** @@ -128,7 +126,7 @@ private ChannelFuture respond(final Channel channel, logger.trace("Sent result {} to client {}", result, remoteAddress); } else { logger.error(String.format("Error sending result %s to %s; closing connection", - result, remoteAddress), future.cause()); + result, remoteAddress), future.cause()); channel.close(); } }); diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 54b275145c406..6130df48c4ac6 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -25,18 +25,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.network.buffer.ManagedBuffer; -import 
org.apache.spark.network.buffer.NioManagedBuffer; -import org.apache.spark.network.client.RpcResponseCallback; -import org.apache.spark.network.client.TransportClient; -import org.apache.spark.network.client.StreamCallbackWithID; -import org.apache.spark.network.client.StreamInterceptor; -import org.apache.spark.network.protocol.Encodable; import org.apache.spark.network.protocol.OneWayMessage; -import org.apache.spark.network.protocol.RequestMessage; import org.apache.spark.network.protocol.RpcFailure; import org.apache.spark.network.protocol.RpcRequest; import org.apache.spark.network.protocol.RpcResponse; @@ -44,6 +33,13 @@ import org.apache.spark.network.protocol.StreamRequest; import org.apache.spark.network.protocol.StreamResponse; import org.apache.spark.network.protocol.UploadStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.buffer.NioManagedBuffer; +import org.apache.spark.network.client.*; +import org.apache.spark.network.protocol.*; import org.apache.spark.network.util.TransportFrameDecoder; import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; From 9bf5fe09c0b21e78357ef5ef7b41063c6fd7ef88 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Tue, 18 Sep 2018 17:03:58 -0400 Subject: [PATCH 10/24] Avoid creating event loop if it is a shuffle client --- .../apache/spark/network/TransportContext.java | 7 +++++-- .../apache/spark/network/util/TransportConf.java | 15 +++++++++++++++ .../org/apache/spark/storage/BlockManager.scala | 1 + 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 3aa8cf4c7e712..898728d696aa4 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -101,7 +101,8 @@ public TransportContext( synchronized(TransportContext.class) { if (chunkFetchWorkers == null && conf.getModuleName() != null && - conf.getModuleName().equalsIgnoreCase("shuffle")) { + conf.getModuleName().equalsIgnoreCase("shuffle") && + !conf.shuffleClient()) { chunkFetchWorkers = NettyUtils.createEventLoop( IOMode.valueOf(conf.ioMode()), conf.chunkFetchHandlerThreads(), @@ -176,7 +177,9 @@ public TransportChannelHandler initializePipeline( // would require more logic to guarantee if this were not part of the same event loop. .addLast("handler", channelHandler); // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs. 
- if (conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle")) { + if (conf.getModuleName() != null && + conf.getModuleName().equalsIgnoreCase("shuffle") + && !conf.shuffleClient()) { pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); } return channelHandler; diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 35c6e6d0c039b..4b2910fa556df 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -47,6 +47,8 @@ public class TransportConf { private final String module; + private boolean isShuffleClient; + public TransportConf(String module, ConfigProvider conf) { this.module = module; this.conf = conf; @@ -283,6 +285,19 @@ public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } + /** + * Check if it is a shuffle client + * and avoid creating unnecessary event loops + * in the TransportClientHandler + */ + public boolean shuffleClient() { + return this.isShuffleClient; + } + + public void setShuffleClient(boolean isShuffleClient) { + this.isShuffleClient = isShuffleClient; + } + /** * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 5cd21e31c9554..43a13c20acce9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -185,6 +185,7 @@ private[spark] class BlockManager( // standard BlockTransferService to directly connect to other Executors. 
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) + transConf.setShuffleClient(true); new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)) } else { From 0a43f223fb97da4fcf355dba945a3a350245c5f3 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Tue, 18 Sep 2018 18:58:57 -0400 Subject: [PATCH 11/24] Use await instead of sync --- .../apache/spark/network/server/ChunkFetchRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index 9a19e14752631..d5194524e2811 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -121,7 +121,7 @@ protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest m private ChannelFuture respond(final Channel channel, final Encodable result) throws InterruptedException { final SocketAddress remoteAddress = channel.remoteAddress(); - return channel.writeAndFlush(result).sync().addListener((ChannelFutureListener) future -> { + return channel.writeAndFlush(result).await().addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { logger.trace("Sent result {} to client {}", result, remoteAddress); } else { From f05924a52f8dcb0c8acc928108628d9ddf5c33d9 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 09:34:47 -0400 Subject: [PATCH 12/24] change description from sync to await --- .../apache/spark/network/server/ChunkFetchRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index d5194524e2811..1951dcd53ce3c 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -113,7 +113,7 @@ protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest m * being processed by TransportServer's default EventLoopGroup. In order to throttle the max * number of threads that channel I/O for sending response to ChunkFetchRequest, the thread * calling channel.writeAndFlush will wait for the completion of sending response back to - * client by invoking sync(). This will throttle the rate at which threads from + * client by invoking await(). This will throttle the rate at which threads from * ChunkFetchRequest dedicated EventLoopGroup submit channel I/O requests to TransportServer's * default EventLoopGroup, thus making sure that we can reserve some threads in * TransportServer's default EventLoopGroup for handling other RPC messages. 
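Note (illustrative, not part of the patch series): patches 11 and 12 above replace sync() with await() in ChunkFetchRequestHandler#respond. The sketch below shows the same Netty pattern outside of Spark, with invented names (ThrottledWriteSketch, ThrottledChunkWriteHandler, blockingHandlers). A handler bound to its own EventLoopGroup may block on writeAndFlush(...).await(), because the write itself is executed by the channel's own event loop on a different thread; only the dedicated group's threads stall, which is exactly the throttling the javadoc describes. Unlike sync(), await() does not rethrow a failed write's cause, so the failure is handled in the listener via future.cause().

    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;

    public class ThrottledWriteSketch {
      // Dedicated group, analogous to chunkFetchWorkers; its size caps how many
      // writes can be awaited at once, since each handler thread blocks below.
      private static final EventLoopGroup blockingHandlers = new NioEventLoopGroup(2);

      static class ThrottledChunkWriteHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
          // Blocking is legal here only because this method runs on
          // blockingHandlers, while the write is performed by the channel's
          // own event loop, i.e. a different thread.
          ctx.channel().writeAndFlush("echo: " + msg)
              .await()
              .addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                  ctx.close(); // as in the patch: close the channel on a failed write
                }
              });
        }
      }

      static ChannelInitializer<SocketChannel> initializer() {
        return new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            // Binding the handler to a separate EventLoopGroup mirrors what
            // initializePipeline() does with chunkFetchWorkers.
            ch.pipeline().addLast(blockingHandlers, "throttler",
                new ThrottledChunkWriteHandler());
          }
        };
      }
    }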
From ec912d6ca4e6911de218dc48ea05eecf9f0ff23e Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 09:38:51 -0400 Subject: [PATCH 13/24] nit description --- .../main/java/org/apache/spark/network/util/TransportConf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 4b2910fa556df..432246f7a6cdc 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -288,7 +288,7 @@ public long maxChunksBeingTransferred() { /** * Check if it is a shuffle client * and avoid creating unnecessary event loops - * in the TransportClientHandler + * in the TransportChannelHandler */ public boolean shuffleClient() { return this.isShuffleClient; From 573033c5b42abf9220b6bf656b4c2f04ea615ab7 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 10:23:31 -0400 Subject: [PATCH 14/24] address nits --- .../spark/network/server/ChunkFetchRequestHandler.java | 10 ++++++---- .../spark/network/server/TransportRequestHandler.java | 9 +-------- .../org/apache/spark/network/util/TransportConf.java | 4 ++-- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java index 1951dcd53ce3c..a09b2947e62e7 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java @@ -73,8 +73,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } @Override - protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest msg) - throws Exception { + protected void channelRead0( + ChannelHandlerContext ctx, + final ChunkFetchRequest msg) throws Exception { Channel channel = ctx.channel(); if (logger.isTraceEnabled()) { logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), @@ -118,8 +119,9 @@ protected void channelRead0(ChannelHandlerContext ctx, final ChunkFetchRequest m * default EventLoopGroup, thus making sure that we can reserve some threads in * TransportServer's default EventLoopGroup for handling other RPC messages. 
*/ - private ChannelFuture respond(final Channel channel, - final Encodable result) throws InterruptedException { + private ChannelFuture respond( + final Channel channel, + final Encodable result) throws InterruptedException { final SocketAddress remoteAddress = channel.remoteAddress(); return channel.writeAndFlush(result).await().addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index 6130df48c4ac6..24a05749004eb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -25,14 +25,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import org.apache.spark.network.protocol.OneWayMessage; -import org.apache.spark.network.protocol.RpcFailure; -import org.apache.spark.network.protocol.RpcRequest; -import org.apache.spark.network.protocol.RpcResponse; -import org.apache.spark.network.protocol.StreamFailure; -import org.apache.spark.network.protocol.StreamRequest; -import org.apache.spark.network.protocol.StreamResponse; -import org.apache.spark.network.protocol.UploadStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +33,7 @@ import org.apache.spark.network.client.*; import org.apache.spark.network.protocol.*; import org.apache.spark.network.util.TransportFrameDecoder; + import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; /** diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 432246f7a6cdc..67aea53e35928 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -320,8 +320,8 @@ public int chunkFetchHandlerThreads() { return 0; } int chunkFetchHandlerThreadsPercent = - conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); + conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 0); return this.serverThreads() > 0 ? 
(this.serverThreads() * chunkFetchHandlerThreadsPercent)/100: - (2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; + (2 * NettyRuntime.availableProcessors() * chunkFetchHandlerThreadsPercent)/100; } } From e7b47e9c37e42e8de251f9f91d9f85428ea7df73 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 10:41:46 -0400 Subject: [PATCH 15/24] nit indentation --- .../apache/spark/network/TransportRequestHandlerSuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java index bf6a03f9ed0f4..ad640415a8e6d 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportRequestHandlerSuite.java @@ -69,14 +69,14 @@ public void handleStreamRequest() throws Exception { assert responseAndPromisePairs.size() == 1; assert responseAndPromisePairs.get(0).getLeft() instanceof StreamResponse; assert ((StreamResponse) (responseAndPromisePairs.get(0).getLeft())).body() == - managedBuffers.get(0); + managedBuffers.get(0); RequestMessage request1 = new StreamRequest(String.format("%d_%d", streamId, 1)); requestHandler.handle(request1); assert responseAndPromisePairs.size() == 2; assert responseAndPromisePairs.get(1).getLeft() instanceof StreamResponse; assert ((StreamResponse) (responseAndPromisePairs.get(1).getLeft())).body() == - managedBuffers.get(1); + managedBuffers.get(1); // Finish flushing the response for request0. responseAndPromisePairs.get(0).getRight().finish(true); From 1ea0655c9001fb8cd257ee5d677d4324658dfe28 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 13:11:03 -0400 Subject: [PATCH 16/24] move the client isClient logic to TransportContext --- .../apache/spark/network/TransportContext.java | 16 +++++++++++++--- .../apache/spark/network/util/TransportConf.java | 15 --------------- .../org/apache/spark/storage/BlockManager.scala | 1 - 3 files changed, 13 insertions(+), 19 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 898728d696aa4..ff02eca1625bb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -65,6 +65,7 @@ public class TransportContext { private final TransportConf conf; private final RpcHandler rpcHandler; private final boolean closeIdleConnections; + private final boolean isClient; /** * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created @@ -87,22 +88,31 @@ public class TransportContext { private static EventLoopGroup chunkFetchWorkers; public TransportContext(TransportConf conf, RpcHandler rpcHandler) { - this(conf, rpcHandler, false); + this(conf, rpcHandler, false, false); } public TransportContext( TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections) { + this(conf, rpcHandler, closeIdleConnections, false); + } + + public TransportContext( + TransportConf conf, + RpcHandler rpcHandler, + boolean closeIdleConnections, + boolean isClient) { this.conf = conf; this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; + 
this.isClient = isClient; synchronized(TransportContext.class) { if (chunkFetchWorkers == null && conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle") && - !conf.shuffleClient()) { + !isClient) { chunkFetchWorkers = NettyUtils.createEventLoop( IOMode.valueOf(conf.ioMode()), conf.chunkFetchHandlerThreads(), @@ -179,7 +189,7 @@ public TransportChannelHandler initializePipeline( // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs. if (conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle") - && !conf.shuffleClient()) { + && !isClient) { pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); } return channelHandler; diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 67aea53e35928..91bb6abdc2662 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -47,8 +47,6 @@ public class TransportConf { private final String module; - private boolean isShuffleClient; - public TransportConf(String module, ConfigProvider conf) { this.module = module; this.conf = conf; @@ -285,19 +283,6 @@ public long maxChunksBeingTransferred() { return conf.getLong("spark.shuffle.maxChunksBeingTransferred", Long.MAX_VALUE); } - /** - * Check if it is a shuffle client - * and avoid creating unnecessary event loops - * in the TransportChannelHandler - */ - public boolean shuffleClient() { - return this.isShuffleClient; - } - - public void setShuffleClient(boolean isShuffleClient) { - this.isShuffleClient = isShuffleClient; - } - /** * Percentage of io.serverThreads used by netty to process ChunkFetchRequest. * Shuffle server will use a separate EventLoopGroup to process ChunkFetchRequest messages. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 43a13c20acce9..5cd21e31c9554 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -185,7 +185,6 @@ private[spark] class BlockManager( // standard BlockTransferService to directly connect to other Executors. 
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) - transConf.setShuffleClient(true); new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)) } else {
From fdad2721cc69732cc86eea5a65d995cb8e818162 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Wed, 19 Sep 2018 13:15:07 -0400
Subject: [PATCH 17/24] set client to true
---
.../org/apache/spark/network/shuffle/ExternalShuffleClient.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 7ed0b6e93a7a8..5de8dbd1bbf67 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -76,7 +76,7 @@ protected void checkInit() { @Override public void init(String appId) { this.appId = appId; - TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true); + TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true, true); List<TransportClientBootstrap> bootstraps = Lists.newArrayList(); if (authEnabled) { bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder));
From 5d4b477595d3c2df5722ae5cb042a16c2f53fc9b Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Wed, 19 Sep 2018 15:09:00 -0400
Subject: [PATCH 18/24] Fix nits
---
.../spark/network/TransportContext.java | 2 +-
.../server/ChunkFetchRequestHandler.java | 1 -
.../ChunkFetchRequestHandlerSuite.java | 28 +++++++++----------
3 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index ff02eca1625bb..746e9bc6a59c8 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -219,7 +219,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler channelHandler, RpcHandler rpcHandler) { return new ChunkFetchRequestHandler(channelHandler.getClient(), - rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred()); + rpcHandler.getStreamManager(), conf.maxChunksBeingTransferred()); } public TransportConf getConf() { return conf; } }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index a09b2947e62e7..f08d8b0f984cf 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -37,7 +37,6 @@ import static org.apache.spark.network.util.NettyUtils.*; - /** * A dedicated ChannelHandler for processing ChunkFetchRequest messages.
When sending response * of ChunkFetchRequest messages to the clients, the thread performing the I/O on the underlying
diff --git a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
index 9dfea19e9e7b1..2c72c53a33ae8 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/ChunkFetchRequestHandlerSuite.java
@@ -45,18 +45,18 @@ public void handleChunkFetchRequest() throws Exception { Channel channel = mock(Channel.class); ChannelHandlerContext context = mock(ChannelHandlerContext.class); when(context.channel()) - .thenAnswer(invocationOnMock0 -> { - return channel; - }); + .thenAnswer(invocationOnMock0 -> { + return channel; + }); List<Pair<Object, ExtendedChannelPromise>> responseAndPromisePairs = - new ArrayList<>(); + new ArrayList<>(); when(channel.writeAndFlush(any())) - .thenAnswer(invocationOnMock0 -> { - Object response = invocationOnMock0.getArguments()[0]; - ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel); - responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture)); - return channelFuture; - }); + .thenAnswer(invocationOnMock0 -> { + Object response = invocationOnMock0.getArguments()[0]; + ExtendedChannelPromise channelFuture = new ExtendedChannelPromise(channel); + responseAndPromisePairs.add(ImmutablePair.of(response, channelFuture)); + return channelFuture; + }); // Prepare the stream. List<ManagedBuffer> managedBuffers = new ArrayList<>(); @@ -68,21 +68,21 @@ public void handleChunkFetchRequest() throws Exception { streamManager.registerChannel(channel, streamId); TransportClient reverseClient = mock(TransportClient.class); ChunkFetchRequestHandler requestHandler = new ChunkFetchRequestHandler(reverseClient, - rpcHandler.getStreamManager(), 2L); + rpcHandler.getStreamManager(), 2L); RequestMessage request0 = new ChunkFetchRequest(new StreamChunkId(streamId, 0)); requestHandler.channelRead(context, request0); assert responseAndPromisePairs.size() == 1; assert responseAndPromisePairs.get(0).getLeft() instanceof ChunkFetchSuccess; assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(0).getLeft())).body() == - managedBuffers.get(0); + managedBuffers.get(0); RequestMessage request1 = new ChunkFetchRequest(new StreamChunkId(streamId, 1)); requestHandler.channelRead(context, request1); assert responseAndPromisePairs.size() == 2; assert responseAndPromisePairs.get(1).getLeft() instanceof ChunkFetchSuccess; assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(1).getLeft())).body() == - managedBuffers.get(1); + managedBuffers.get(1); // Finish flushing the response for request0.
responseAndPromisePairs.get(0).getRight().finish(true); @@ -92,7 +92,7 @@ public void handleChunkFetchRequest() throws Exception { assert responseAndPromisePairs.size() == 3; assert responseAndPromisePairs.get(2).getLeft() instanceof ChunkFetchSuccess; assert ((ChunkFetchSuccess) (responseAndPromisePairs.get(2).getLeft())).body() == - managedBuffers.get(2); + managedBuffers.get(2); RequestMessage request3 = new ChunkFetchRequest(new StreamChunkId(streamId, 3)); requestHandler.channelRead(context, request3); From 9131b005bc37a400520f7d15675a921dc32aae41 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 15:45:12 -0400 Subject: [PATCH 19/24] Add more documentation --- .../spark/network/TransportContext.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 746e9bc6a59c8..a34deb3d5824e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -65,7 +65,7 @@ public class TransportContext { private final TransportConf conf; private final RpcHandler rpcHandler; private final boolean closeIdleConnections; - private final boolean isClient; + private final boolean isClientOnly; /** * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created @@ -98,21 +98,32 @@ public TransportContext( this(conf, rpcHandler, closeIdleConnections, false); } + /** + * + * @param conf TransportConf + * @param rpcHandler RpcHandler responsible for handling requests and responses. + * @param closeIdleConnections Close idle connections if is set to true. + * @param isClientOnly This config is more important when external shuffle is enabled. + * It stops creating extra event loop and subsequent thread pool + * for shuffle clients to handle chunked fetch requests. + * In the case when external shuffle is disabled, the executors are both + * client and server so both share the same event loop which is trivial. + */ public TransportContext( TransportConf conf, RpcHandler rpcHandler, boolean closeIdleConnections, - boolean isClient) { + boolean isClientOnly) { this.conf = conf; this.rpcHandler = rpcHandler; this.closeIdleConnections = closeIdleConnections; - this.isClient = isClient; + this.isClientOnly = isClientOnly; synchronized(TransportContext.class) { if (chunkFetchWorkers == null && conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle") && - !isClient) { + !isClientOnly) { chunkFetchWorkers = NettyUtils.createEventLoop( IOMode.valueOf(conf.ioMode()), conf.chunkFetchHandlerThreads(), @@ -189,7 +200,7 @@ public TransportChannelHandler initializePipeline( // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs. 
if (conf.getModuleName() != null && conf.getModuleName().equalsIgnoreCase("shuffle") - && !isClient) { + && !isClientOnly) { pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler); } return channelHandler; From 574ba81abf3e1d71bac84a83b80e4b67056f7442 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 15:47:01 -0400 Subject: [PATCH 20/24] nit --- .../main/java/org/apache/spark/network/TransportContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index a34deb3d5824e..f9b0ec7facac9 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -102,7 +102,7 @@ public TransportContext( * * @param conf TransportConf * @param rpcHandler RpcHandler responsible for handling requests and responses. - * @param closeIdleConnections Close idle connections if is set to true. + * @param closeIdleConnections Close idle connections if it is set to true. * @param isClientOnly This config is more important when external shuffle is enabled. * It stops creating extra event loop and subsequent thread pool * for shuffle clients to handle chunked fetch requests. From 40cfbed70bd51e30ac451cb2204f34c7105fa15f Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 16:51:40 -0400 Subject: [PATCH 21/24] nit java docs --- .../spark/network/TransportContext.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index f9b0ec7facac9..e8b9079b6b7a1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -98,17 +98,17 @@ public TransportContext( this(conf, rpcHandler, closeIdleConnections, false); } - /** - * - * @param conf TransportConf - * @param rpcHandler RpcHandler responsible for handling requests and responses. - * @param closeIdleConnections Close idle connections if it is set to true. - * @param isClientOnly This config is more important when external shuffle is enabled. - * It stops creating extra event loop and subsequent thread pool - * for shuffle clients to handle chunked fetch requests. - * In the case when external shuffle is disabled, the executors are both - * client and server so both share the same event loop which is trivial. - */ + /** + * + * @param conf TransportConf + * @param rpcHandler RpcHandler responsible for handling requests and responses. + * @param closeIdleConnections Close idle connections if it is set to true. + * @param isClientOnly This config is more important when external shuffle is enabled. + * It stops creating extra event loop and subsequent thread pool + * for shuffle clients to handle chunked fetch requests. + * In the case when external shuffle is disabled, the executors are both + * client and server so both share the same event loop which is trivial. 
+ */ public TransportContext( TransportConf conf, RpcHandler rpcHandler, From 60b93477cbe2ef00ad4bf262fb9ea246f687b8f7 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 16:55:40 -0400 Subject: [PATCH 22/24] reword --- .../main/java/org/apache/spark/network/TransportContext.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index e8b9079b6b7a1..5a56ba6a8a56d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -103,11 +103,10 @@ public TransportContext( * @param conf TransportConf * @param rpcHandler RpcHandler responsible for handling requests and responses. * @param closeIdleConnections Close idle connections if it is set to true. - * @param isClientOnly This config is more important when external shuffle is enabled. + * @param isClientOnly This config indicates the TransportContext is only used by a client. + * This config is more important when external shuffle is enabled. * It stops creating extra event loop and subsequent thread pool * for shuffle clients to handle chunked fetch requests. - * In the case when external shuffle is disabled, the executors are both - * client and server so both share the same event loop which is trivial. */ public TransportContext( TransportConf conf, From 4c5d6f1c2e4f87e74de07435eb4bb196bd150cd0 Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 18:35:20 -0400 Subject: [PATCH 23/24] use await instead of sync for test --- .../java/org/apache/spark/network/ExtendedChannelPromise.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java index 4d522c3ea7425..573ffd627a2e7 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java +++ b/common/network-common/src/test/java/org/apache/spark/network/ExtendedChannelPromise.java @@ -52,7 +52,7 @@ public boolean isSuccess() { } @Override - public ChannelPromise sync() throws InterruptedException { + public ChannelPromise await() throws InterruptedException { return this; } From 0348ec8d5570aab9d744043a3d6a88950f4aeb5c Mon Sep 17 00:00:00 2001 From: Sanket Chintapalli Date: Wed, 19 Sep 2018 19:23:09 -0400 Subject: [PATCH 24/24] nit java doc --- .../src/main/java/org/apache/spark/network/TransportContext.java | 1 + 1 file changed, 1 insertion(+) diff --git a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java index 5a56ba6a8a56d..480b52652de53 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java @@ -99,6 +99,7 @@ public TransportContext( } /** + * Enables TransportContext initialization for underlying client and server. * * @param conf TransportConf * @param rpcHandler RpcHandler responsible for handling requests and responses.
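Note (illustrative, not part of the patch series): the sizing rule in TransportConf#chunkFetchHandlerThreads, reindented in patch 14 above, reduces to simple integer arithmetic: a percentage of io.serverThreads, falling back to a percentage of Netty's default worker count of 2 * cores when io.serverThreads is unset. The standalone sketch below restates it with the "shuffle"-module guard simplified away and invented example numbers; on client-only contexts (patches 16, 17 and 19) the dedicated event loop is never created, so this value is only consulted on the server side.

    public class ChunkFetchThreadSizing {
      // Restates the arithmetic of TransportConf#chunkFetchHandlerThreads;
      // the real method additionally returns 0 for non-"shuffle" modules.
      static int chunkFetchHandlerThreads(int serverThreads, int percent, int cores) {
        if (percent == 0) {
          // spark.shuffle.server.chunkFetchHandlerThreadsPercent defaults to 0
          // in this patch state, which disables the dedicated pool.
          return 0;
        }
        return serverThreads > 0
            ? (serverThreads * percent) / 100
            : (2 * cores * percent) / 100;
      }

      public static void main(String[] args) {
        // io.serverThreads = 64, chunkFetchHandlerThreadsPercent = 75 -> 48 threads
        System.out.println(chunkFetchHandlerThreads(64, 75, 8));
        // io.serverThreads unset -> 2 * 8 cores at 75% -> 12 threads
        System.out.println(chunkFetchHandlerThreads(0, 75, 8));
      }
    }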