Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.lang.reflect.Method;

import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -27,7 +28,6 @@
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobmanager.JobManager;
import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;


public class NepheleMiniCluster {
Expand Down
2 changes: 1 addition & 1 deletion stratosphere-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.19.Final</version>
<version>4.0.20.Final</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/***********************************************************************************************************************
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/

package eu.stratosphere.nephele;

public enum ExecutionMode {
LOCAL, CLUSTER
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Iterator;

import eu.stratosphere.nephele.ExecutionMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -150,6 +151,9 @@ public LocalInstanceManager() throws Exception {

numTaskManagers = GlobalConfiguration.getInteger(ConfigConstants
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);

ExecutionMode executionMode = (numTaskManagers > 1) ? ExecutionMode.CLUSTER : ExecutionMode.LOCAL;

for(int i=0; i< numTaskManagers; i++){

Configuration tm = new Configuration();
Expand All @@ -163,7 +167,7 @@ public LocalInstanceManager() throws Exception {

GlobalConfiguration.includeConfiguration(tm);

TaskManager t = new TaskManager();
TaskManager t = new TaskManager(executionMode);
taskManagers.add(t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.taskmanager.TaskKillResult;
import org.apache.commons.cli.CommandLine;
Expand Down Expand Up @@ -123,9 +124,6 @@
public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol,
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
public static enum ExecutionMode { LOCAL, CLUSTER }

// --------------------------------------------------------------------------------------------

private static final Log LOG = LogFactory.getLog(JobManager.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

import eu.stratosphere.nephele.ExecutionMode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.nephele.instance.InstanceManager;
import eu.stratosphere.nephele.jobmanager.JobManager.ExecutionMode;
import eu.stratosphere.nephele.jobmanager.scheduler.AbstractScheduler;
import eu.stratosphere.util.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.runtime.io.network.LocalConnectionManager;
import eu.stratosphere.runtime.io.network.NetworkConnectionManager;
import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
Expand Down Expand Up @@ -155,10 +159,11 @@ public class TaskManager implements TaskOperationProtocol {
* receive an initial configuration. All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*/
public TaskManager() throws Exception {
public TaskManager(ExecutionMode executionMode) throws Exception {

LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
LOG.info("User system property: " + System.getProperty("user.name"));
LOG.info("Execution mode: " + executionMode);

// IMPORTANT! At this point, the GlobalConfiguration must have been read!

Expand Down Expand Up @@ -286,27 +291,40 @@ public TaskManager() throws Exception {
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);

int numInThreads = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS);

int numOutThreads = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);

int lowWaterMark = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);

int highWaterMark = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);

// Initialize the channel manager
try {
this.channelManager = new ChannelManager(
this.lookupService, this.localInstanceConnectionInfo,
numBuffers, bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
NetworkConnectionManager networkConnectionManager = null;

switch (executionMode) {
case LOCAL:
networkConnectionManager = new LocalConnectionManager();
break;
case CLUSTER:
int numInThreads = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS);

int numOutThreads = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);

int lowWaterMark = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);

int highWaterMark = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);

networkConnectionManager = new NettyConnectionManager(
localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(),
bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
break;
}

channelManager = new ChannelManager(lookupService, localInstanceConnectionInfo, numBuffers, bufferSize, networkConnectionManager);
} catch (IOException ioe) {
LOG.error(StringUtils.stringifyException(ioe));
throw new Exception("Failed to instantiate channel manager. " + ioe.getMessage(), ioe);
Expand Down Expand Up @@ -436,7 +454,7 @@ public static void main(String[] args) throws IOException {

// Create a new task manager object
try {
new TaskManager();
new TaskManager(ExecutionMode.CLUSTER);
} catch (Exception e) {
LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
System.exit(FAILURE_RETURN_CODE);
Expand Down Expand Up @@ -910,8 +928,12 @@ public void shutdown() {
this.profiler.shutdown();
}

// Shut down the network channel manager
this.channelManager.shutdown();
// Shut down the channel manager
try {
this.channelManager.shutdown();
} catch (IOException e) {
LOG.warn("ChannelManager did not shutdown properly: " + e.getMessage(), e);
}

// Shut down the memory manager
if (this.ioManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import eu.stratosphere.runtime.io.gates.GateID;
import eu.stratosphere.runtime.io.gates.InputGate;
import eu.stratosphere.runtime.io.gates.OutputGate;
import eu.stratosphere.runtime.io.network.netty.NettyConnectionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -66,7 +65,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker

private final GlobalBufferPool globalBufferPool;

private final NettyConnectionManager nettyConnectionManager;
private final NetworkConnectionManager networkConnectionManager;

private final InetSocketAddress ourAddress;

Expand All @@ -75,18 +74,15 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
// -----------------------------------------------------------------------------------------------------------------

public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnectionInfo connectionInfo,
int numNetworkBuffers, int networkBufferSize,
int numInThreads, int numOutThreads,
int lowWatermark, int highWaterMark) throws IOException {
int numNetworkBuffers, int networkBufferSize, NetworkConnectionManager networkConnectionManager) throws IOException {

this.channelLookupService = channelLookupService;
this.connectionInfo = connectionInfo;

this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);

this.nettyConnectionManager = new NettyConnectionManager(
this, connectionInfo.address(), connectionInfo.dataPort(),
networkBufferSize, numInThreads, numOutThreads, lowWatermark, highWaterMark);
this.networkConnectionManager = networkConnectionManager;
networkConnectionManager.start(this);

// management data structures
this.channels = new ConcurrentHashMap<ChannelID, Channel>();
Expand All @@ -99,8 +95,9 @@ public ChannelManager(ChannelLookupProtocol channelLookupService, InstanceConnec
this.discardBufferPool = new DiscardBufferPool();
}

public void shutdown() {
this.nettyConnectionManager.shutdown();
public void shutdown() throws IOException {
this.networkConnectionManager.shutdown();

this.globalBufferPool.destroy();
}

Expand Down Expand Up @@ -324,7 +321,7 @@ private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) thro
final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);

this.nettyConnectionManager.enqueue(senderHint, receiver);
this.networkConnectionManager.enqueue(senderHint, receiver);
}

/**
Expand Down Expand Up @@ -459,7 +456,7 @@ else if (receiverList.hasRemoteReceiver()) {
generateSenderHint(envelope, remoteReceiver);
}

this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
this.networkConnectionManager.enqueue(envelope, remoteReceiver);
success = true;
}
} finally {
Expand Down Expand Up @@ -507,7 +504,7 @@ else if (receiverList.hasRemoteReceiver()) {
generateSenderHint(envelope, remoteReceiver);
}

this.nettyConnectionManager.enqueue(envelope, remoteReceiver);
this.networkConnectionManager.enqueue(envelope, remoteReceiver);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/***********************************************************************************************************************
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/

package eu.stratosphere.runtime.io.network;

import java.io.IOException;

public class LocalConnectionManager implements NetworkConnectionManager {

@Override
public void start(ChannelManager channelManager) throws IOException {
}

@Override
public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
}

@Override
public void shutdown() throws IOException {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/***********************************************************************************************************************
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/

package eu.stratosphere.runtime.io.network;

import java.io.IOException;

public interface NetworkConnectionManager {

public void start(ChannelManager channelManager) throws IOException;

public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException;

public void shutdown() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,19 @@
import eu.stratosphere.runtime.io.network.EnvelopeDispatcher;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class InboundEnvelopeDispatcherHandler extends ChannelInboundHandlerAdapter {
public class InboundEnvelopeDispatcher extends ChannelInboundHandlerAdapter {

private static final Log LOG = LogFactory.getLog(InboundEnvelopeDispatcherHandler.class);
private final EnvelopeDispatcher envelopeDispatcher;

private final EnvelopeDispatcher channelManager;

public InboundEnvelopeDispatcherHandler(EnvelopeDispatcher channelManager) {
this.channelManager = channelManager;
public InboundEnvelopeDispatcher(EnvelopeDispatcher envelopeDispatcher) {
this.envelopeDispatcher = envelopeDispatcher;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Envelope envelope = (Envelope) msg;
// LOG.debug(String.format("Decoded envelope with seq num %d from source channel %s",
// envelope.getSequenceNumber(),
// envelope.getSource()));
this.channelManager.dispatchFromNetwork(envelope);

envelopeDispatcher.dispatchFromNetwork(envelope);
}
}
Loading