From a0d3530aa137290bf2d690339bf028f272c12129 Mon Sep 17 00:00:00 2001 From: uce Date: Fri, 13 Jun 2014 10:41:32 +0200 Subject: [PATCH 1/2] [FLINK-930] Netty Initialization is sometimes very slow --- .../minicluster/NepheleMiniCluster.java | 2 +- stratosphere-runtime/pom.xml | 2 +- .../stratosphere/nephele/ExecutionMode.java | 18 ++ .../instance/local/LocalInstanceManager.java | 6 +- .../nephele/jobmanager/JobManager.java | 4 +- .../nephele/jobmanager/JobManagerUtils.java | 2 +- .../nephele/taskmanager/TaskManager.java | 58 ++++-- .../runtime/io/network/ChannelManager.java | 27 +-- .../io/network/LocalConnectionManager.java | 31 +++ .../io/network/NetworkConnectionManager.java | 25 +++ ...er.java => InboundEnvelopeDispatcher.java} | 18 +- .../network/netty/NettyConnectionManager.java | 121 +++++++----- .../netty/OutboundConnectionQueue.java | 13 +- .../local/LocalInstanceManagerTest.java | 3 +- .../nephele/jobmanager/JobManagerITCase.java | 2 +- .../netty/NettyConnectionManagerTest.java | 185 ++++++++---------- 16 files changed, 311 insertions(+), 206 deletions(-) create mode 100644 stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ExecutionMode.java create mode 100644 stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalConnectionManager.java create mode 100644 stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java rename stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/{InboundEnvelopeDispatcherHandler.java => InboundEnvelopeDispatcher.java} (66%) diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java index f44a47ea6dbf7..79e5c64bfb658 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java @@ -15,6 +15,7 @@ import java.lang.reflect.Method; +import eu.stratosphere.nephele.ExecutionMode; import eu.stratosphere.nephele.instance.HardwareDescriptionFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,7 +28,6 @@ import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobmanager.JobManager; -import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode; public class NepheleMiniCluster { diff --git a/stratosphere-runtime/pom.xml b/stratosphere-runtime/pom.xml index 074725b579743..7d196928daaf0 100644 --- a/stratosphere-runtime/pom.xml +++ b/stratosphere-runtime/pom.xml @@ -56,7 +56,7 @@ io.netty netty-all - 4.0.19.Final + 4.0.20.Final diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ExecutionMode.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ExecutionMode.java new file mode 100644 index 0000000000000..68ab65bf95545 --- /dev/null +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ExecutionMode.java @@ -0,0 +1,18 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.nephele; + +public enum ExecutionMode { + LOCAL, CLUSTER +} diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java index 2bb344f29f03d..e888b3f0eeae1 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Iterator; +import eu.stratosphere.nephele.ExecutionMode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -150,6 +151,9 @@ public LocalInstanceManager() throws Exception { numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants .LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1); + + ExecutionMode executionMode = (numTaskManagers > 1) ? ExecutionMode.CLUSTER : ExecutionMode.LOCAL; + for(int i=0; i< numTaskManagers; i++){ Configuration tm = new Configuration(); @@ -163,7 +167,7 @@ public LocalInstanceManager() throws Exception { GlobalConfiguration.includeConfiguration(tm); - TaskManager t = new TaskManager(); + TaskManager t = new TaskManager(executionMode); taskManagers.add(t); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java index 3ae9f3bb028a4..5e92dd15b608d 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import eu.stratosphere.nephele.ExecutionMode; import eu.stratosphere.nephele.managementgraph.ManagementVertexID; import eu.stratosphere.nephele.taskmanager.TaskKillResult; import org.apache.commons.cli.CommandLine; @@ -123,9 +124,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol, JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol { - public static enum ExecutionMode { LOCAL, CLUSTER } - - // -------------------------------------------------------------------------------------------- private static final Log LOG = LogFactory.getLog(JobManager.class); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java index 61cc22fda0a10..c576f204a1a63 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java @@ -16,11 +16,11 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import eu.stratosphere.nephele.ExecutionMode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.stratosphere.nephele.instance.InstanceManager; -import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode; import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler; import eu.stratosphere.util.StringUtils; diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index 9b5fdf623ae58..b6307ebad142f 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -38,6 +38,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import eu.stratosphere.nephele.ExecutionMode; +import eu.stratosphere.runtime.io.network.LocalConnectionManager; +import eu.stratosphere.runtime.io.network.NetworkConnectionManager; +import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -155,10 +159,11 @@ public class TaskManager implements TaskOperationProtocol { * receive an initial configuration. All parameters are obtained from the * {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager. */ - public TaskManager() throws Exception { + public TaskManager(ExecutionMode executionMode) throws Exception { LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName()); LOG.info("User system property: " + System.getProperty("user.name")); + LOG.info("Execution mode: " + executionMode); // IMPORTANT! At this point, the GlobalConfiguration must have been read! @@ -286,27 +291,40 @@ public TaskManager() throws Exception { ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE); - int numInThreads = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS); - int numOutThreads = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS); - - int lowWaterMark = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK); - - int highWaterMark = GlobalConfiguration.getInteger( - ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK, - ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK); // Initialize the channel manager try { - this.channelManager = new ChannelManager( - this.lookupService, this.localInstanceConnectionInfo, - numBuffers, bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark); + NetworkConnectionManager networkConnectionManager = null; + + switch (executionMode) { + case LOCAL: + networkConnectionManager = new LocalConnectionManager(); + break; + case CLUSTER: + int numInThreads = GlobalConfiguration.getInteger( + ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS); + + int numOutThreads = GlobalConfiguration.getInteger( + ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS); + + int lowWaterMark = GlobalConfiguration.getInteger( + ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK, + ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK); + + int highWaterMark = GlobalConfiguration.getInteger( + ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK, + ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK); + + networkConnectionManager = new NettyConnectionManager( + localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(), + bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark); + break; + } + + channelManager = new ChannelManager(lookupService, localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager); } catch (IOException ioe) { LOG.error(StringUtils.stringifyException(ioe)); throw new Exception("Failed to instantiate channel manager. " + ioe.getMessage(), ioe); @@ -436,7 +454,7 @@ public static void main(String[] args) throws IOException { // Create a new task manager object try { - new TaskManager(); + new TaskManager(ExecutionMode.CLUSTER); } catch (Exception e) { LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e); System.exit(FAILURE_RETURN_CODE); @@ -910,7 +928,7 @@ public void shutdown() { this.profiler.shutdown(); } - // Shut down the network channel manager + // Shut down the channel manager this.channelManager.shutdown(); // Shut down the memory manager diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java index 7b01f4e4114e9..0e9edee513608 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java @@ -37,7 +37,6 @@ import eu.stratosphere.runtime.io.gates.GateID; import eu.stratosphere.runtime.io.gates.InputGate; import eu.stratosphere.runtime.io.gates.OutputGate; -import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,7 +65,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker private final GlobalBufferPool globalBufferPool; - private final NettyConnectionManager nettyConnectionManager; + private final NetworkConnectionManager networkConnectionManager; private final InetSocketAddress ourAddress; @@ -75,18 +74,15 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker // ----------------------------------------------------------------------------------------------------------------- public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo, - int numNetworkBuffers, int networkBufferSize, - int numInThreads, int numOutThreads, - int lowWatermark, int highWaterMark) throws IOException { + int numNetworkBuffers, int networkBufferSize, NetworkConnectionManager networkConnectionManager) throws IOException { this.channelLookupService = channelLookupService; this.connectionInfo = connectionInfo; this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize); - this.nettyConnectionManager = new NettyConnectionManager( - this, connectionInfo.address(), connectionInfo.dataPort(), - networkBufferSize, numInThreads, numOutThreads, lowWatermark, highWaterMark); + this.networkConnectionManager = networkConnectionManager; + networkConnectionManager.start(this); // management data structures this.channels = new ConcurrentHashMap(); @@ -99,8 +95,13 @@ public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnec this.discardBufferPool = new DiscardBufferPool(); } - public void shutdown() { - this.nettyConnectionManager.shutdown(); + public void shutdown() { + try { + this.networkConnectionManager.shutdown(); + } catch (IOException e) { + LOG.warn("NetworkConnectionManager did not shutdown properly."); + } + this.globalBufferPool.destroy(); } @@ -324,7 +325,7 @@ private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) thro final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex); final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress); - this.nettyConnectionManager.enqueue(senderHint, receiver); + this.networkConnectionManager.enqueue(senderHint, receiver); } /** @@ -459,7 +460,7 @@ else if (receiverList.hasRemoteReceiver()) { generateSenderHint(envelope, remoteReceiver); } - this.nettyConnectionManager.enqueue(envelope, remoteReceiver); + this.networkConnectionManager.enqueue(envelope, remoteReceiver); success = true; } } finally { @@ -507,7 +508,7 @@ else if (receiverList.hasRemoteReceiver()) { generateSenderHint(envelope, remoteReceiver); } - this.nettyConnectionManager.enqueue(envelope, remoteReceiver); + this.networkConnectionManager.enqueue(envelope, remoteReceiver); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalConnectionManager.java new file mode 100644 index 0000000000000..85cbbd2a2c88a --- /dev/null +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalConnectionManager.java @@ -0,0 +1,31 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.runtime.io.network; + +import java.io.IOException; + +public class LocalConnectionManager implements NetworkConnectionManager { + + @Override + public void start(ChannelManager channelManager) throws IOException { + } + + @Override + public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException { + } + + @Override + public void shutdown() throws IOException { + } +} diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java new file mode 100644 index 0000000000000..d2d252dc72f3f --- /dev/null +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java @@ -0,0 +1,25 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.runtime.io.network; + +import java.io.IOException; + +public interface NetworkConnectionManager { + + public void start(ChannelManager channelManager) throws IOException; + + public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException; + + public void shutdown() throws IOException; +} diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcher.java similarity index 66% rename from stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java rename to stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcher.java index d0270b6d47f3e..1c360c17fff4b 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcher.java @@ -17,25 +17,19 @@ import eu.stratosphere.runtime.io.network.EnvelopeDispatcher; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -public class InboundEnvelopeDispatcherHandler extends ChannelInboundHandlerAdapter { +public class InboundEnvelopeDispatcher extends ChannelInboundHandlerAdapter { - private static final Log LOG = LogFactory.getLog(InboundEnvelopeDispatcherHandler.class); + private final EnvelopeDispatcher envelopeDispatcher; - private final EnvelopeDispatcher channelManager; - - public InboundEnvelopeDispatcherHandler(EnvelopeDispatcher channelManager) { - this.channelManager = channelManager; + public InboundEnvelopeDispatcher(EnvelopeDispatcher envelopeDispatcher) { + this.envelopeDispatcher = envelopeDispatcher; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Envelope envelope = (Envelope) msg; -// LOG.debug(String.format("Decoded envelope with seq num %d from source channel %s", -// envelope.getSequenceNumber(), -// envelope.getSource())); - this.channelManager.dispatchFromNetwork(envelope); + + envelopeDispatcher.dispatchFromNetwork(envelope); } } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java index 4e3f0437c2baf..909c80ccb7fd3 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java @@ -15,7 +15,10 @@ import eu.stratosphere.runtime.io.network.ChannelManager; import eu.stratosphere.runtime.io.network.Envelope; +import eu.stratosphere.runtime.io.network.EnvelopeDispatcher; +import eu.stratosphere.runtime.io.network.NetworkConnectionManager; import eu.stratosphere.runtime.io.network.RemoteReceiver; +import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; @@ -28,7 +31,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,50 +41,70 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -public class NettyConnectionManager { +public class NettyConnectionManager implements NetworkConnectionManager { private static final Log LOG = LogFactory.getLog(NettyConnectionManager.class); private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000; - private final ChannelManager channelManager; + private final ConcurrentMap outConnections = new ConcurrentHashMap(); - private final ServerBootstrap in; + private final InetAddress bindAddress; - private final Bootstrap out; + private final int bindPort; - private final ConcurrentMap outConnections; + private final int bufferSize; - public NettyConnectionManager(ChannelManager channelManager, InetAddress bindAddress, int bindPort, - int bufferSize, int numInThreads, int numOutThreads, - int lowWaterMark, int highWaterMark) { - this.outConnections = new ConcurrentHashMap(); - this.channelManager = channelManager; + private final int numInThreads; - // -------------------------------------------------------------------- + private final int numOutThreads; + + private final int lowWaterMark; + + private final int highWaterMark; + + private ServerBootstrap in; + + private Bootstrap out; + + public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads, + int numOutThreads, int lowWaterMark, int highWaterMark) { + + this.bindAddress = bindAddress; + this.bindPort = bindPort; + + this.bufferSize = bufferSize; int defaultNumThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1); - numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads; - numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads; - LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads)); - lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark; - highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark; + this.numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads; + this.numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads; + + this.lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark; + this.highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark; + } + + @Override + public void start(ChannelManager channelManager) throws IOException { + LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads)); LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark)); + final BufferProviderBroker bufferProviderBroker = channelManager; + final EnvelopeDispatcher envelopeDispatcher = channelManager; + // -------------------------------------------------------------------- // server bootstrap (incoming connections) // -------------------------------------------------------------------- - this.in = new ServerBootstrap(); - this.in.group(new NioEventLoopGroup(numInThreads)) + in = new ServerBootstrap(); + in.group(new NioEventLoopGroup(numInThreads)) .channel(NioServerSocketChannel.class) .localAddress(bindAddress, bindPort) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() - .addLast(new InboundEnvelopeDecoder(NettyConnectionManager.this.channelManager)) - .addLast(new InboundEnvelopeDispatcherHandler(NettyConnectionManager.this.channelManager)); + .addLast(new InboundEnvelopeDecoder(bufferProviderBroker)) + .addLast(new InboundEnvelopeDispatcher(envelopeDispatcher)); } }) .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize)) @@ -91,8 +113,8 @@ public void initChannel(SocketChannel channel) throws Exception { // -------------------------------------------------------------------- // client bootstrap (outgoing connections) // -------------------------------------------------------------------- - this.out = new Bootstrap(); - this.out.group(new NioEventLoopGroup(numOutThreads)) + out = new Bootstrap(); + out.group(new NioEventLoopGroup(numOutThreads)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override @@ -108,9 +130,9 @@ public void initChannel(SocketChannel channel) throws Exception { .option(ChannelOption.SO_KEEPALIVE, true); try { - this.in.bind().sync(); + in.bind().sync(); } catch (InterruptedException e) { - throw new RuntimeException("Could not bind server socket for incoming connections."); + throw new IOException("Interrupted while trying to bind server socket."); } if (LOG.isDebugEnabled()) { @@ -119,16 +141,14 @@ public void initChannel(SocketChannel channel) throws Exception { public void run() { Date date = new Date(); - while (true) { try { Thread.sleep(DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS); date.setTime(System.currentTimeMillis()); - System.out.println(date); + System.out.println(date); System.out.println(getNonZeroNumQueuedEnvelopes()); - } catch (InterruptedException e) { e.printStackTrace(); } @@ -138,18 +158,7 @@ public void run() { } } - public void shutdown() { - Future inShutdownFuture = this.in.group().shutdownGracefully(); - Future outShutdownFuture = this.out.group().shutdownGracefully(); - - try { - inShutdownFuture.sync(); - outShutdownFuture.sync(); - } catch (InterruptedException e) { - throw new RuntimeException("Could not properly shutdown connections."); - } - } - + @Override public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException { // Get the channel. The channel may be // 1) a channel that already exists (usual case) -> just send the data @@ -198,6 +207,29 @@ else if (old instanceof ChannelInBuildup) { channel.enqueue(envelope); } + @Override + public void shutdown() throws IOException { + if (!in.group().isShuttingDown()) { + LOG.info("Shutting down incoming connections."); + + try { + in.group().shutdownGracefully().sync(); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while trying to shutdown incoming connections."); + } + } + + if (!out.group().isShuttingDown()) { + LOG.info("Shutting down outgoing connections."); + + try { + out.group().shutdownGracefully().sync(); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while trying to shutdown outgoing connections."); + } + } + } + private String getNonZeroNumQueuedEnvelopes() { StringBuilder str = new StringBuilder(); @@ -211,9 +243,10 @@ private String getNonZeroNumQueuedEnvelopes() { OutboundConnectionQueue queue = (OutboundConnectionQueue) value; if (queue.getNumQueuedEnvelopes() > 0) { str.append(String.format("%s> Number of queued envelopes for %s with channel %s: %d\n", - Thread.currentThread().getId(), receiver, queue.getChannel(), queue.getNumQueuedEnvelopes())); + Thread.currentThread().getId(), receiver, queue.toString(), queue.getNumQueuedEnvelopes())); } - } else if (value instanceof ChannelInBuildup) { + } + else if (value instanceof ChannelInBuildup) { str.append(String.format("%s> Connection to %s is still in buildup\n", Thread.currentThread().getId(), receiver)); } @@ -226,13 +259,13 @@ private String getNonZeroNumQueuedEnvelopes() { private static final class ChannelInBuildup implements ChannelFutureListener { - private Object lock = new Object(); + private final Object lock = new Object(); private volatile OutboundConnectionQueue channel; private volatile Throwable error; - private int numRetries = 2; + private int numRetries = 3; private final Bootstrap out; diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java index b6ec9156a4287..8fef3c1951d08 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java @@ -33,7 +33,7 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem private final ArrayDeque queuedEnvelopes = new ArrayDeque(); - private final AtomicInteger numQueued = new AtomicInteger(0); + private final AtomicInteger numQueuedEnvelopes = new AtomicInteger(0); public OutboundConnectionQueue(Channel channel) { this.channel = channel; @@ -58,7 +58,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnque boolean triggerWrite = this.queuedEnvelopes.isEmpty(); this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue); - this.numQueued.incrementAndGet(); + this.numQueuedEnvelopes.incrementAndGet(); if (triggerWrite) { writeAndFlushNextEnvelopeIfPossible(); @@ -84,17 +84,18 @@ else if (future.cause() != null) { } public int getNumQueuedEnvelopes() { - return this.numQueued.intValue(); + return this.numQueuedEnvelopes.intValue(); } - public Channel getChannel() { - return this.channel; + @Override + public String toString() { + return channel.toString(); } private void writeAndFlushNextEnvelopeIfPossible() { if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) { Envelope nextEnvelope = this.queuedEnvelopes.pollFirst(); - this.numQueued.decrementAndGet(); + this.numQueuedEnvelopes.decrementAndGet(); this.channel.writeAndFlush(nextEnvelope).addListener(this); } diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java index 15775742951b4..92ea5abffee77 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java @@ -15,6 +15,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; + +import eu.stratosphere.nephele.ExecutionMode; import junit.framework.Assert; import org.junit.Test; @@ -22,7 +24,6 @@ import eu.stratosphere.configuration.GlobalConfiguration; import eu.stratosphere.nephele.instance.InstanceType; import eu.stratosphere.nephele.jobmanager.JobManager; -import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode; import eu.stratosphere.nephele.util.ServerTestUtils; /** diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java index 06c857ee680e1..063b82733d6c8 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java @@ -17,6 +17,7 @@ import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.GlobalConfiguration; import eu.stratosphere.core.fs.Path; +import eu.stratosphere.nephele.ExecutionMode; import eu.stratosphere.nephele.client.JobClient; import eu.stratosphere.nephele.client.JobExecutionException; import eu.stratosphere.nephele.jobgraph.DistributionPattern; @@ -26,7 +27,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException; import eu.stratosphere.nephele.jobgraph.JobTaskVertex; -import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode; import eu.stratosphere.nephele.taskmanager.Task; import eu.stratosphere.nephele.taskmanager.TaskManager; import eu.stratosphere.nephele.util.FileLineReader; diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java index c380431eb386b..dbb4c3f529e05 100644 --- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java +++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java @@ -28,12 +28,11 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Collection; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -47,113 +46,136 @@ public class NettyConnectionManagerTest { private final static int BIND_PORT = 20000; - private final static int HIGH_WATERMARK = 32 * 1024; + private final static int BUFFER_SIZE = 32 * 1024; - private int numSubtasks; - - private int numToSendPerSubtask; - - private int numInThreads; + public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception { + Integer[][] configs = new Integer[][]{ + {64, 4096, 1, 1, 1}, + {128, 2048, 1, 1, 1}, + {256, 1024, 1, 1, 1}, + {512, 512, 1, 1, 1}, + {64, 4096, 4, 1, 1}, + {128, 2048, 4, 1, 1}, + {256, 1024, 4, 1, 1}, + {512, 512, 4, 1, 1}, + {64, 4096, 4, 2, 2}, + {128, 2048, 4, 2, 2}, + {256, 1024, 4, 2, 2}, + {512, 512, 4, 2, 2} + }; - private int numOutThreads; + for (Integer[] params : configs) { + System.out.println(String.format("Running %s with config: %d sub tasks, %d envelopes to send per subtasks, " + + "%d num channels, %d num in threads, %d num out threads.", + "testEnqueueRaceAndDeadlockFreeMultipleChannels", params[0], params[1], params[2], params[3], params[4])); - private int numChannels; + long start = System.currentTimeMillis(); + doTestEnqueueRaceAndDeadlockFreeMultipleChannels(params[0], params[1], params[2], params[3], params[4]); + long end = System.currentTimeMillis(); - public NettyConnectionManagerTest(int numSubtasks, int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads) { - this.numSubtasks = numSubtasks; - this.numToSendPerSubtask = numToSendPerSubtask; - this.numChannels = numChannels; - this.numInThreads = numInThreads; - this.numOutThreads = numOutThreads; + System.out.println(String.format("Runtime: %d ms.", (end - start))); + } } - @Parameterized.Parameters - public static Collection configure() { - return Arrays.asList( - new Integer[][]{ - {64, 4096, 1, 1, 1}, - {128, 2048, 1, 1, 1}, - {256, 1024, 1, 1, 1}, - {512, 512, 1, 1, 1}, - {64, 4096, 4, 1, 1}, - {128, 2048, 4, 1, 1}, - {256, 1024, 4, 1, 1}, - {512, 512, 4, 1, 1}, - {64, 4096, 4, 2, 2}, - {128, 2048, 4, 2, 2}, - {256, 1024, 4, 2, 2}, - {512, 512, 4, 2, 2} - } - ); - } + private void doTestEnqueueRaceAndDeadlockFreeMultipleChannels( + int numSubtasks, final int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads) + throws Exception { - public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception { final InetAddress localhost = InetAddress.getLocalHost(); - final CountDownLatch latch = new CountDownLatch(this.numSubtasks); + final CountDownLatch latch = new CountDownLatch(numSubtasks); // -------------------------------------------------------------------- // setup // -------------------------------------------------------------------- ChannelManager channelManager = mock(ChannelManager.class); - doAnswer(new VerifyEnvelopes(latch)).when(channelManager).dispatchFromNetwork(Matchers.anyObject()); + doAnswer(new VerifyEnvelopes(latch, numToSendPerSubtask)) + .when(channelManager).dispatchFromNetwork(Matchers.anyObject()); - NettyConnectionManager connManagerToTest = new NettyConnectionManager(channelManager, localhost, - BIND_PORT, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1); + final NettyConnectionManager senderConnManager = new NettyConnectionManager(localhost, BIND_PORT, BUFFER_SIZE, + numInThreads, numOutThreads, -1, -1); + senderConnManager.start(channelManager); - NettyConnectionManager connManagerReceiver = new NettyConnectionManager(channelManager, localhost, - BIND_PORT + 1, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1); + NettyConnectionManager receiverConnManager = new NettyConnectionManager(localhost, BIND_PORT + 1, BUFFER_SIZE, + numInThreads, numOutThreads, -1, -1); + receiverConnManager.start(channelManager); // -------------------------------------------------------------------- // start sender threads // -------------------------------------------------------------------- - RemoteReceiver[] receivers = new RemoteReceiver[this.numChannels]; + RemoteReceiver[] receivers = new RemoteReceiver[numChannels]; - for (int i = 0; i < this.numChannels; i++) { + for (int i = 0; i < numChannels; i++) { receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i); } - for (int i = 0; i < this.numSubtasks; i++) { - RemoteReceiver receiver = receivers[random.nextInt(this.numChannels)]; - new Thread(new SubtaskSenderThread(connManagerToTest, receiver)).start(); + for (int i = 0; i < numSubtasks; i++) { + final RemoteReceiver receiver = receivers[random.nextInt(numChannels)]; + + final AtomicInteger seqNum = new AtomicInteger(0); + final JobID jobId = new JobID(); + final ChannelID channelId = new ChannelID(); + + new Thread(new Runnable() { + @Override + public void run() { + // enqueue envelopes with ascending seq numbers + while (seqNum.get() < numToSendPerSubtask) { + try { + Envelope env = new Envelope(seqNum.getAndIncrement(), jobId, channelId); + senderConnManager.enqueue(env, receiver); + } catch (IOException e) { + throw new RuntimeException("Unexpected exception while enqueuing envelope."); + } + } + } + }).start(); } latch.await(); - connManagerToTest.shutdown(); - connManagerReceiver.shutdown(); + senderConnManager.shutdown(); + receiverConnManager.shutdown(); } - - private class VerifyEnvelopes implements Answer { + /** + * Verifies correct ordering of received envelopes (per envelope source channel ID). + */ + private class VerifyEnvelopes implements Answer { private final ConcurrentMap received = new ConcurrentHashMap(); private final CountDownLatch latch; - private VerifyEnvelopes(CountDownLatch latch) { + private final int numExpectedEnvelopesPerSubtask; + + private VerifyEnvelopes(CountDownLatch latch, int numExpectedEnvelopesPerSubtask) { this.latch = latch; + this.numExpectedEnvelopesPerSubtask = numExpectedEnvelopesPerSubtask; } @Override - public Object answer(InvocationOnMock invocation) throws Throwable { + public Void answer(InvocationOnMock invocation) throws Throwable { Envelope env = (Envelope) invocation.getArguments()[0]; ChannelID channelId = env.getSource(); int seqNum = env.getSequenceNumber(); if (seqNum == 0) { - Assert.assertNull( - String.format("Received envelope from %s before, but current seq num is 0", channelId), - this.received.putIfAbsent(channelId, seqNum)); + Integer previousSeqNum = this.received.putIfAbsent(channelId, seqNum); + + String msg = String.format("Received envelope from %s before, but current seq num is 0", channelId); + Assert.assertNull(msg, previousSeqNum); } else { - Assert.assertTrue( - String.format("Received seq num %d from %s, but previous was not %d", seqNum, channelId, seqNum - 1), - this.received.replace(channelId, seqNum - 1, seqNum)); + boolean isExpectedPreviousSeqNum = this.received.replace(channelId, seqNum - 1, seqNum); + + String msg = String.format("Received seq num %d from %s, but previous was not %d.", + seqNum, channelId, seqNum - 1); + Assert.assertTrue(msg, isExpectedPreviousSeqNum); } // count down the latch if all envelopes received for this source - if (seqNum == numToSendPerSubtask - 1) { + if (seqNum == numExpectedEnvelopesPerSubtask - 1) { this.latch.countDown(); } @@ -161,54 +183,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } } - private class SubtaskSenderThread implements Runnable { - - private final NettyConnectionManager connectionManager; - - private final RemoteReceiver receiver; - - private final JobID jobId = new JobID(); - - private final ChannelID channelId = new ChannelID(); - - private int seqNum = 0; - - private SubtaskSenderThread(NettyConnectionManager connectionManager, RemoteReceiver receiver) { - this.connectionManager = connectionManager; - this.receiver = receiver; - } - - @Override - public void run() { - // enqueue envelopes with ascending seq nums - while (this.seqNum < numToSendPerSubtask) { - try { - Envelope env = new Envelope(this.seqNum++, this.jobId, this.channelId); - this.connectionManager.enqueue(env, receiver); - } catch (IOException e) { - throw new RuntimeException("Unexpected exception while enqueing envelope"); - } - } - } - } - private void runAllTests() throws Exception { - System.out.println(String.format("Running tests with config: %d sub tasks, %d envelopes to send per subtasks, " - + "%d num channels, %d num in threads, %d num out threads.", - this.numSubtasks, this.numToSendPerSubtask, this.numChannels, this.numInThreads, this.numOutThreads)); - testEnqueueRaceAndDeadlockFreeMultipleChannels(); System.out.println("Done."); } public static void main(String[] args) throws Exception { - Collection configs = configure(); - - for (Integer[] params : configs) { - - NettyConnectionManagerTest test = new NettyConnectionManagerTest(params[0], params[1], params[2], params[3], params[4]); - test.runAllTests(); - } + new NettyConnectionManagerTest().runAllTests(); } } From 5254ef3fbd99736e5e97ed38e8c92efbead2e25f Mon Sep 17 00:00:00 2001 From: uce Date: Tue, 17 Jun 2014 11:11:54 +0200 Subject: [PATCH 2/2] Address review comments --- .../eu/stratosphere/nephele/taskmanager/TaskManager.java | 6 +++++- .../stratosphere/runtime/io/network/ChannelManager.java | 8 ++------ .../runtime/io/network/netty/NettyConnectionManager.java | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index b6307ebad142f..3da26f5668bfa 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -929,7 +929,11 @@ public void shutdown() { } // Shut down the channel manager - this.channelManager.shutdown(); + try { + this.channelManager.shutdown(); + } catch (IOException e) { + LOG.warn("ChannelManager did not shutdown properly: " + e.getMessage(), e); + } // Shut down the memory manager if (this.ioManager != null) { diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java index 0e9edee513608..34d35012ae0cc 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java @@ -95,12 +95,8 @@ public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnec this.discardBufferPool = new DiscardBufferPool(); } - public void shutdown() { - try { - this.networkConnectionManager.shutdown(); - } catch (IOException e) { - LOG.warn("NetworkConnectionManager did not shutdown properly."); - } + public void shutdown() throws IOException { + this.networkConnectionManager.shutdown(); this.globalBufferPool.destroy(); } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java index 909c80ccb7fd3..73afcbc297178 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java @@ -132,7 +132,7 @@ public void initChannel(SocketChannel channel) throws Exception { try { in.bind().sync(); } catch (InterruptedException e) { - throw new IOException("Interrupted while trying to bind server socket."); + throw new IOException(e); } if (LOG.isDebugEnabled()) { @@ -215,7 +215,7 @@ public void shutdown() throws IOException { try { in.group().shutdownGracefully().sync(); } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while trying to shutdown incoming connections."); + throw new IOException(e); } } @@ -225,7 +225,7 @@ public void shutdown() throws IOException { try { out.group().shutdownGracefully().sync(); } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while trying to shutdown outgoing connections."); + throw new IOException(e); } } }