diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
index f44a47ea6dbf7..79e5c64bfb658 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
@@ -15,6 +15,7 @@
import java.lang.reflect.Method;
+import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -27,7 +28,6 @@
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;
public class NepheleMiniCluster {
diff --git a/stratosphere-runtime/pom.xml b/stratosphere-runtime/pom.xml
index 074725b579743..7d196928daaf0 100644
--- a/stratosphere-runtime/pom.xml
+++ b/stratosphere-runtime/pom.xml
@@ -56,7 +56,7 @@
io.netty
netty-all
- 4.0.19.Final
+ 4.0.20.Final
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ExecutionMode.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ExecutionMode.java
new file mode 100644
index 0000000000000..68ab65bf95545
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/ExecutionMode.java
@@ -0,0 +1,18 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele;
+
+public enum ExecutionMode {
+ LOCAL, CLUSTER
+}
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
index 2bb344f29f03d..e888b3f0eeae1 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/instance/local/LocalInstanceManager.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Iterator;
+import eu.stratosphere.nephele.ExecutionMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -150,6 +151,9 @@ public LocalInstanceManager() throws Exception {
numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
+
+ ExecutionMode executionMode = (numTaskManagers > 1) ? ExecutionMode.CLUSTER : ExecutionMode.LOCAL;
+
for(int i=0; i< numTaskManagers; i++){
Configuration tm = new Configuration();
@@ -163,7 +167,7 @@ public LocalInstanceManager() throws Exception {
GlobalConfiguration.includeConfiguration(tm);
- TaskManager t = new TaskManager();
+ TaskManager t = new TaskManager(executionMode);
taskManagers.add(t);
}
}
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 3ae9f3bb028a4..5e92dd15b608d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import org.apache.commons.cli.CommandLine;
@@ -123,9 +124,6 @@
public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol,
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
- public static enum ExecutionMode { LOCAL, CLUSTER }
-
- // --------------------------------------------------------------------------------------------
private static final Log LOG = LogFactory.getLog(JobManager.class);
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
index 61cc22fda0a10..c576f204a1a63 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
@@ -16,11 +16,11 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import eu.stratosphere.nephele.ExecutionMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;
import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
import eu.stratosphere.util.StringUtils;
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 9b5fdf623ae58..3da26f5668bfa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -38,6 +38,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.runtime.io.network.LocalConnectionManager;
+import eu.stratosphere.runtime.io.network.NetworkConnectionManager;
+import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
@@ -155,10 +159,11 @@ public class TaskManager implements TaskOperationProtocol {
* receive an initial configuration. All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*/
- public TaskManager() throws Exception {
+ public TaskManager(ExecutionMode executionMode) throws Exception {
LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
LOG.info("User system property: " + System.getProperty("user.name"));
+ LOG.info("Execution mode: " + executionMode);
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
@@ -286,27 +291,40 @@ public TaskManager() throws Exception {
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
- int numInThreads = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS);
- int numOutThreads = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
-
- int lowWaterMark = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
- ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
-
- int highWaterMark = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
- ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
// Initialize the channel manager
try {
- this.channelManager = new ChannelManager(
- this.lookupService, this.localInstanceConnectionInfo,
- numBuffers, bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
+ NetworkConnectionManager networkConnectionManager = null;
+
+ switch (executionMode) {
+ case LOCAL:
+ networkConnectionManager = new LocalConnectionManager();
+ break;
+ case CLUSTER:
+ int numInThreads = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS);
+
+ int numOutThreads = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
+
+ int lowWaterMark = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
+
+ int highWaterMark = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
+
+ networkConnectionManager = new NettyConnectionManager(
+ localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(),
+ bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
+ break;
+ }
+
+ channelManager = new ChannelManager(lookupService, localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager);
} catch (IOException ioe) {
LOG.error(StringUtils.stringifyException(ioe));
throw new Exception("Failed to instantiate channel manager. " + ioe.getMessage(), ioe);
@@ -436,7 +454,7 @@ public static void main(String[] args) throws IOException {
// Create a new task manager object
try {
- new TaskManager();
+ new TaskManager(ExecutionMode.CLUSTER);
} catch (Exception e) {
LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
System.exit(FAILURE_RETURN_CODE);
@@ -910,8 +928,12 @@ public void shutdown() {
this.profiler.shutdown();
}
- // Shut down the network channel manager
- this.channelManager.shutdown();
+ // Shut down the channel manager
+ try {
+ this.channelManager.shutdown();
+ } catch (IOException e) {
+ LOG.warn("ChannelManager did not shutdown properly: " + e.getMessage(), e);
+ }
// Shut down the memory manager
if (this.ioManager != null) {
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
index 7b01f4e4114e9..34d35012ae0cc 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/ChannelManager.java
@@ -37,7 +37,6 @@
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.gates.OutputGate;
-import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,7 +65,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
private final GlobalBufferPool globalBufferPool;
- private final NettyConnectionManager nettyConnectionManager;
+ private final NetworkConnectionManager networkConnectionManager;
private final InetSocketAddress ourAddress;
@@ -75,18 +74,15 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
// -----------------------------------------------------------------------------------------------------------------
public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo,
- int numNetworkBuffers, int networkBufferSize,
- int numInThreads, int numOutThreads,
- int lowWatermark, int highWaterMark) throws IOException {
+ int numNetworkBuffers, int networkBufferSize, NetworkConnectionManager networkConnectionManager) throws IOException {
this.channelLookupService = channelLookupService;
this.connectionInfo = connectionInfo;
this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
- this.nettyConnectionManager = new NettyConnectionManager(
- this, connectionInfo.address(), connectionInfo.dataPort(),
- networkBufferSize, numInThreads, numOutThreads, lowWatermark, highWaterMark);
+ this.networkConnectionManager = networkConnectionManager;
+ networkConnectionManager.start(this);
// management data structures
this.channels = new ConcurrentHashMap();
@@ -99,8 +95,9 @@ public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnec
this.discardBufferPool = new DiscardBufferPool();
}
- public void shutdown() {
- this.nettyConnectionManager.shutdown();
+ public void shutdown() throws IOException {
+ this.networkConnectionManager.shutdown();
+
this.globalBufferPool.destroy();
}
@@ -324,7 +321,7 @@ private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) thro
final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
- this.nettyConnectionManager.enqueue(senderHint, receiver);
+ this.networkConnectionManager.enqueue(senderHint, receiver);
}
/**
@@ -459,7 +456,7 @@ else if (receiverList.hasRemoteReceiver()) {
generateSenderHint(envelope, remoteReceiver);
}
- this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
+ this.networkConnectionManager.enqueue(envelope, remoteReceiver);
success = true;
}
} finally {
@@ -507,7 +504,7 @@ else if (receiverList.hasRemoteReceiver()) {
generateSenderHint(envelope, remoteReceiver);
}
- this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
+ this.networkConnectionManager.enqueue(envelope, remoteReceiver);
}
}
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalConnectionManager.java
new file mode 100644
index 0000000000000..85cbbd2a2c88a
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/LocalConnectionManager.java
@@ -0,0 +1,31 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.IOException;
+
+public class LocalConnectionManager implements NetworkConnectionManager {
+
+ @Override
+ public void start(ChannelManager channelManager) throws IOException {
+ }
+
+ @Override
+ public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
+ }
+
+ @Override
+ public void shutdown() throws IOException {
+ }
+}
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
new file mode 100644
index 0000000000000..d2d252dc72f3f
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/NetworkConnectionManager.java
@@ -0,0 +1,25 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network;
+
+import java.io.IOException;
+
+public interface NetworkConnectionManager {
+
+ public void start(ChannelManager channelManager) throws IOException;
+
+ public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException;
+
+ public void shutdown() throws IOException;
+}
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcher.java
similarity index 66%
rename from stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
rename to stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcher.java
index d0270b6d47f3e..1c360c17fff4b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcherHandler.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDispatcher.java
@@ -17,25 +17,19 @@
import eu.stratosphere.runtime.io.network.EnvelopeDispatcher;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-public class InboundEnvelopeDispatcherHandler extends ChannelInboundHandlerAdapter {
+public class InboundEnvelopeDispatcher extends ChannelInboundHandlerAdapter {
- private static final Log LOG = LogFactory.getLog(InboundEnvelopeDispatcherHandler.class);
+ private final EnvelopeDispatcher envelopeDispatcher;
- private final EnvelopeDispatcher channelManager;
-
- public InboundEnvelopeDispatcherHandler(EnvelopeDispatcher channelManager) {
- this.channelManager = channelManager;
+ public InboundEnvelopeDispatcher(EnvelopeDispatcher envelopeDispatcher) {
+ this.envelopeDispatcher = envelopeDispatcher;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Envelope envelope = (Envelope) msg;
-// LOG.debug(String.format("Decoded envelope with seq num %d from source channel %s",
-// envelope.getSequenceNumber(),
-// envelope.getSource()));
- this.channelManager.dispatchFromNetwork(envelope);
+
+ envelopeDispatcher.dispatchFromNetwork(envelope);
}
}
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
index 4e3f0437c2baf..73afcbc297178 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java
@@ -15,7 +15,10 @@
import eu.stratosphere.runtime.io.network.ChannelManager;
import eu.stratosphere.runtime.io.network.Envelope;
+import eu.stratosphere.runtime.io.network.EnvelopeDispatcher;
+import eu.stratosphere.runtime.io.network.NetworkConnectionManager;
import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -28,7 +31,6 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,50 +41,70 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-public class NettyConnectionManager {
+public class NettyConnectionManager implements NetworkConnectionManager {
private static final Log LOG = LogFactory.getLog(NettyConnectionManager.class);
private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000;
- private final ChannelManager channelManager;
+ private final ConcurrentMap outConnections = new ConcurrentHashMap();
- private final ServerBootstrap in;
+ private final InetAddress bindAddress;
- private final Bootstrap out;
+ private final int bindPort;
- private final ConcurrentMap outConnections;
+ private final int bufferSize;
- public NettyConnectionManager(ChannelManager channelManager, InetAddress bindAddress, int bindPort,
- int bufferSize, int numInThreads, int numOutThreads,
- int lowWaterMark, int highWaterMark) {
- this.outConnections = new ConcurrentHashMap();
- this.channelManager = channelManager;
+ private final int numInThreads;
- // --------------------------------------------------------------------
+ private final int numOutThreads;
+
+ private final int lowWaterMark;
+
+ private final int highWaterMark;
+
+ private ServerBootstrap in;
+
+ private Bootstrap out;
+
+ public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads,
+ int numOutThreads, int lowWaterMark, int highWaterMark) {
+
+ this.bindAddress = bindAddress;
+ this.bindPort = bindPort;
+
+ this.bufferSize = bufferSize;
int defaultNumThreads = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1);
- numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
- numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
- LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
- lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
- highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
+ this.numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
+ this.numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
+
+ this.lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
+ this.highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
+ }
+
+ @Override
+ public void start(ChannelManager channelManager) throws IOException {
+ LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark));
+ final BufferProviderBroker bufferProviderBroker = channelManager;
+ final EnvelopeDispatcher envelopeDispatcher = channelManager;
+
// --------------------------------------------------------------------
// server bootstrap (incoming connections)
// --------------------------------------------------------------------
- this.in = new ServerBootstrap();
- this.in.group(new NioEventLoopGroup(numInThreads))
+ in = new ServerBootstrap();
+ in.group(new NioEventLoopGroup(numInThreads))
.channel(NioServerSocketChannel.class)
.localAddress(bindAddress, bindPort)
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
- .addLast(new InboundEnvelopeDecoder(NettyConnectionManager.this.channelManager))
- .addLast(new InboundEnvelopeDispatcherHandler(NettyConnectionManager.this.channelManager));
+ .addLast(new InboundEnvelopeDecoder(bufferProviderBroker))
+ .addLast(new InboundEnvelopeDispatcher(envelopeDispatcher));
}
})
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize))
@@ -91,8 +113,8 @@ public void initChannel(SocketChannel channel) throws Exception {
// --------------------------------------------------------------------
// client bootstrap (outgoing connections)
// --------------------------------------------------------------------
- this.out = new Bootstrap();
- this.out.group(new NioEventLoopGroup(numOutThreads))
+ out = new Bootstrap();
+ out.group(new NioEventLoopGroup(numOutThreads))
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
@@ -108,9 +130,9 @@ public void initChannel(SocketChannel channel) throws Exception {
.option(ChannelOption.SO_KEEPALIVE, true);
try {
- this.in.bind().sync();
+ in.bind().sync();
} catch (InterruptedException e) {
- throw new RuntimeException("Could not bind server socket for incoming connections.");
+ throw new IOException(e);
}
if (LOG.isDebugEnabled()) {
@@ -119,16 +141,14 @@ public void initChannel(SocketChannel channel) throws Exception {
public void run() {
Date date = new Date();
-
while (true) {
try {
Thread.sleep(DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS);
date.setTime(System.currentTimeMillis());
- System.out.println(date);
+ System.out.println(date);
System.out.println(getNonZeroNumQueuedEnvelopes());
-
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -138,18 +158,7 @@ public void run() {
}
}
- public void shutdown() {
- Future> inShutdownFuture = this.in.group().shutdownGracefully();
- Future> outShutdownFuture = this.out.group().shutdownGracefully();
-
- try {
- inShutdownFuture.sync();
- outShutdownFuture.sync();
- } catch (InterruptedException e) {
- throw new RuntimeException("Could not properly shutdown connections.");
- }
- }
-
+ @Override
public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
// Get the channel. The channel may be
// 1) a channel that already exists (usual case) -> just send the data
@@ -198,6 +207,29 @@ else if (old instanceof ChannelInBuildup) {
channel.enqueue(envelope);
}
+ @Override
+ public void shutdown() throws IOException {
+ if (!in.group().isShuttingDown()) {
+ LOG.info("Shutting down incoming connections.");
+
+ try {
+ in.group().shutdownGracefully().sync();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ if (!out.group().isShuttingDown()) {
+ LOG.info("Shutting down outgoing connections.");
+
+ try {
+ out.group().shutdownGracefully().sync();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
private String getNonZeroNumQueuedEnvelopes() {
StringBuilder str = new StringBuilder();
@@ -211,9 +243,10 @@ private String getNonZeroNumQueuedEnvelopes() {
OutboundConnectionQueue queue = (OutboundConnectionQueue) value;
if (queue.getNumQueuedEnvelopes() > 0) {
str.append(String.format("%s> Number of queued envelopes for %s with channel %s: %d\n",
- Thread.currentThread().getId(), receiver, queue.getChannel(), queue.getNumQueuedEnvelopes()));
+ Thread.currentThread().getId(), receiver, queue.toString(), queue.getNumQueuedEnvelopes()));
}
- } else if (value instanceof ChannelInBuildup) {
+ }
+ else if (value instanceof ChannelInBuildup) {
str.append(String.format("%s> Connection to %s is still in buildup\n",
Thread.currentThread().getId(), receiver));
}
@@ -226,13 +259,13 @@ private String getNonZeroNumQueuedEnvelopes() {
private static final class ChannelInBuildup implements ChannelFutureListener {
- private Object lock = new Object();
+ private final Object lock = new Object();
private volatile OutboundConnectionQueue channel;
private volatile Throwable error;
- private int numRetries = 2;
+ private int numRetries = 3;
private final Bootstrap out;
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
index b6ec9156a4287..8fef3c1951d08 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -33,7 +33,7 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
private final ArrayDeque queuedEnvelopes = new ArrayDeque();
- private final AtomicInteger numQueued = new AtomicInteger(0);
+ private final AtomicInteger numQueuedEnvelopes = new AtomicInteger(0);
public OutboundConnectionQueue(Channel channel) {
this.channel = channel;
@@ -58,7 +58,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnque
boolean triggerWrite = this.queuedEnvelopes.isEmpty();
this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
- this.numQueued.incrementAndGet();
+ this.numQueuedEnvelopes.incrementAndGet();
if (triggerWrite) {
writeAndFlushNextEnvelopeIfPossible();
@@ -84,17 +84,18 @@ else if (future.cause() != null) {
}
public int getNumQueuedEnvelopes() {
- return this.numQueued.intValue();
+ return this.numQueuedEnvelopes.intValue();
}
- public Channel getChannel() {
- return this.channel;
+ @Override
+ public String toString() {
+ return channel.toString();
}
private void writeAndFlushNextEnvelopeIfPossible() {
if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
- this.numQueued.decrementAndGet();
+ this.numQueuedEnvelopes.decrementAndGet();
this.channel.writeAndFlush(nextEnvelope).addListener(this);
}
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
index 15775742951b4..92ea5abffee77 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
@@ -15,6 +15,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+
+import eu.stratosphere.nephele.ExecutionMode;
import junit.framework.Assert;
import org.junit.Test;
@@ -22,7 +24,6 @@
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.InstanceType;
import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;
import eu.stratosphere.nephele.util.ServerTestUtils;
/**
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index 06c857ee680e1..063b82733d6c8 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -17,6 +17,7 @@
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.core.fs.Path;
+import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
@@ -26,7 +27,6 @@
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
-import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;
import eu.stratosphere.nephele.taskmanager.Task;
import eu.stratosphere.nephele.taskmanager.TaskManager;
import eu.stratosphere.nephele.util.FileLineReader;
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
index c380431eb386b..dbb4c3f529e05 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -28,12 +28,11 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -47,113 +46,136 @@ public class NettyConnectionManagerTest {
private final static int BIND_PORT = 20000;
- private final static int HIGH_WATERMARK = 32 * 1024;
+ private final static int BUFFER_SIZE = 32 * 1024;
- private int numSubtasks;
-
- private int numToSendPerSubtask;
-
- private int numInThreads;
+ public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
+ Integer[][] configs = new Integer[][]{
+ {64, 4096, 1, 1, 1},
+ {128, 2048, 1, 1, 1},
+ {256, 1024, 1, 1, 1},
+ {512, 512, 1, 1, 1},
+ {64, 4096, 4, 1, 1},
+ {128, 2048, 4, 1, 1},
+ {256, 1024, 4, 1, 1},
+ {512, 512, 4, 1, 1},
+ {64, 4096, 4, 2, 2},
+ {128, 2048, 4, 2, 2},
+ {256, 1024, 4, 2, 2},
+ {512, 512, 4, 2, 2}
+ };
- private int numOutThreads;
+ for (Integer[] params : configs) {
+ System.out.println(String.format("Running %s with config: %d sub tasks, %d envelopes to send per subtasks, "
+ + "%d num channels, %d num in threads, %d num out threads.",
+ "testEnqueueRaceAndDeadlockFreeMultipleChannels", params[0], params[1], params[2], params[3], params[4]));
- private int numChannels;
+ long start = System.currentTimeMillis();
+ doTestEnqueueRaceAndDeadlockFreeMultipleChannels(params[0], params[1], params[2], params[3], params[4]);
+ long end = System.currentTimeMillis();
- public NettyConnectionManagerTest(int numSubtasks, int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads) {
- this.numSubtasks = numSubtasks;
- this.numToSendPerSubtask = numToSendPerSubtask;
- this.numChannels = numChannels;
- this.numInThreads = numInThreads;
- this.numOutThreads = numOutThreads;
+ System.out.println(String.format("Runtime: %d ms.", (end - start)));
+ }
}
- @Parameterized.Parameters
- public static Collection configure() {
- return Arrays.asList(
- new Integer[][]{
- {64, 4096, 1, 1, 1},
- {128, 2048, 1, 1, 1},
- {256, 1024, 1, 1, 1},
- {512, 512, 1, 1, 1},
- {64, 4096, 4, 1, 1},
- {128, 2048, 4, 1, 1},
- {256, 1024, 4, 1, 1},
- {512, 512, 4, 1, 1},
- {64, 4096, 4, 2, 2},
- {128, 2048, 4, 2, 2},
- {256, 1024, 4, 2, 2},
- {512, 512, 4, 2, 2}
- }
- );
- }
+ private void doTestEnqueueRaceAndDeadlockFreeMultipleChannels(
+ int numSubtasks, final int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads)
+ throws Exception {
- public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
final InetAddress localhost = InetAddress.getLocalHost();
- final CountDownLatch latch = new CountDownLatch(this.numSubtasks);
+ final CountDownLatch latch = new CountDownLatch(numSubtasks);
// --------------------------------------------------------------------
// setup
// --------------------------------------------------------------------
ChannelManager channelManager = mock(ChannelManager.class);
- doAnswer(new VerifyEnvelopes(latch)).when(channelManager).dispatchFromNetwork(Matchers.anyObject());
+ doAnswer(new VerifyEnvelopes(latch, numToSendPerSubtask))
+ .when(channelManager).dispatchFromNetwork(Matchers.anyObject());
- NettyConnectionManager connManagerToTest = new NettyConnectionManager(channelManager, localhost,
- BIND_PORT, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1);
+ final NettyConnectionManager senderConnManager = new NettyConnectionManager(localhost, BIND_PORT, BUFFER_SIZE,
+ numInThreads, numOutThreads, -1, -1);
+ senderConnManager.start(channelManager);
- NettyConnectionManager connManagerReceiver = new NettyConnectionManager(channelManager, localhost,
- BIND_PORT + 1, HIGH_WATERMARK, this.numInThreads, this.numOutThreads, -1, -1);
+ NettyConnectionManager receiverConnManager = new NettyConnectionManager(localhost, BIND_PORT + 1, BUFFER_SIZE,
+ numInThreads, numOutThreads, -1, -1);
+ receiverConnManager.start(channelManager);
// --------------------------------------------------------------------
// start sender threads
// --------------------------------------------------------------------
- RemoteReceiver[] receivers = new RemoteReceiver[this.numChannels];
+ RemoteReceiver[] receivers = new RemoteReceiver[numChannels];
- for (int i = 0; i < this.numChannels; i++) {
+ for (int i = 0; i < numChannels; i++) {
receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i);
}
- for (int i = 0; i < this.numSubtasks; i++) {
- RemoteReceiver receiver = receivers[random.nextInt(this.numChannels)];
- new Thread(new SubtaskSenderThread(connManagerToTest, receiver)).start();
+ for (int i = 0; i < numSubtasks; i++) {
+ final RemoteReceiver receiver = receivers[random.nextInt(numChannels)];
+
+ final AtomicInteger seqNum = new AtomicInteger(0);
+ final JobID jobId = new JobID();
+ final ChannelID channelId = new ChannelID();
+
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // enqueue envelopes with ascending seq numbers
+ while (seqNum.get() < numToSendPerSubtask) {
+ try {
+ Envelope env = new Envelope(seqNum.getAndIncrement(), jobId, channelId);
+ senderConnManager.enqueue(env, receiver);
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected exception while enqueuing envelope.");
+ }
+ }
+ }
+ }).start();
}
latch.await();
- connManagerToTest.shutdown();
- connManagerReceiver.shutdown();
+ senderConnManager.shutdown();
+ receiverConnManager.shutdown();
}
-
- private class VerifyEnvelopes implements Answer {
+ /**
+ * Verifies correct ordering of received envelopes (per envelope source channel ID).
+ */
+ private class VerifyEnvelopes implements Answer {
private final ConcurrentMap received = new ConcurrentHashMap();
private final CountDownLatch latch;
- private VerifyEnvelopes(CountDownLatch latch) {
+ private final int numExpectedEnvelopesPerSubtask;
+
+ private VerifyEnvelopes(CountDownLatch latch, int numExpectedEnvelopesPerSubtask) {
this.latch = latch;
+ this.numExpectedEnvelopesPerSubtask = numExpectedEnvelopesPerSubtask;
}
@Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
+ public Void answer(InvocationOnMock invocation) throws Throwable {
Envelope env = (Envelope) invocation.getArguments()[0];
ChannelID channelId = env.getSource();
int seqNum = env.getSequenceNumber();
if (seqNum == 0) {
- Assert.assertNull(
- String.format("Received envelope from %s before, but current seq num is 0", channelId),
- this.received.putIfAbsent(channelId, seqNum));
+ Integer previousSeqNum = this.received.putIfAbsent(channelId, seqNum);
+
+ String msg = String.format("Received envelope from %s before, but current seq num is 0", channelId);
+ Assert.assertNull(msg, previousSeqNum);
}
else {
- Assert.assertTrue(
- String.format("Received seq num %d from %s, but previous was not %d", seqNum, channelId, seqNum - 1),
- this.received.replace(channelId, seqNum - 1, seqNum));
+ boolean isExpectedPreviousSeqNum = this.received.replace(channelId, seqNum - 1, seqNum);
+
+ String msg = String.format("Received seq num %d from %s, but previous was not %d.",
+ seqNum, channelId, seqNum - 1);
+ Assert.assertTrue(msg, isExpectedPreviousSeqNum);
}
// count down the latch if all envelopes received for this source
- if (seqNum == numToSendPerSubtask - 1) {
+ if (seqNum == numExpectedEnvelopesPerSubtask - 1) {
this.latch.countDown();
}
@@ -161,54 +183,13 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
}
- private class SubtaskSenderThread implements Runnable {
-
- private final NettyConnectionManager connectionManager;
-
- private final RemoteReceiver receiver;
-
- private final JobID jobId = new JobID();
-
- private final ChannelID channelId = new ChannelID();
-
- private int seqNum = 0;
-
- private SubtaskSenderThread(NettyConnectionManager connectionManager, RemoteReceiver receiver) {
- this.connectionManager = connectionManager;
- this.receiver = receiver;
- }
-
- @Override
- public void run() {
- // enqueue envelopes with ascending seq nums
- while (this.seqNum < numToSendPerSubtask) {
- try {
- Envelope env = new Envelope(this.seqNum++, this.jobId, this.channelId);
- this.connectionManager.enqueue(env, receiver);
- } catch (IOException e) {
- throw new RuntimeException("Unexpected exception while enqueing envelope");
- }
- }
- }
- }
-
private void runAllTests() throws Exception {
- System.out.println(String.format("Running tests with config: %d sub tasks, %d envelopes to send per subtasks, "
- + "%d num channels, %d num in threads, %d num out threads.",
- this.numSubtasks, this.numToSendPerSubtask, this.numChannels, this.numInThreads, this.numOutThreads));
-
testEnqueueRaceAndDeadlockFreeMultipleChannels();
System.out.println("Done.");
}
public static void main(String[] args) throws Exception {
- Collection configs = configure();
-
- for (Integer[] params : configs) {
-
- NettyConnectionManagerTest test = new NettyConnectionManagerTest(params[0], params[1], params[2], params[3], params[4]);
- test.runAllTests();
- }
+ new NettyConnectionManagerTest().runAllTests();
}
}