Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Readding the refactor now that the tests are fixed. This refactor inc…

…ludes:

1) Moving the relationship between the MpCluster manager and the RoutingStrategy down into the RoutingStrategy rather than having the Router be an intermediary.
2) More robust handling of issues with communicating with the MpCluster manager.
3) Test log tuning
  • Loading branch information...
commit cfb699a12ca882d75ed742b1016947f2b0793026 1 parent b817f41
Jim Carroll authored
Showing with 794 additions and 607 deletions.
  1. +11 −0 lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ApplicationDefinition.java
  2. +1 −0  lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ClusterDefinition.java
  3. +1 −1  lib-dempsycore/src/main/java/com/nokia/dempsy/mpcluster/MpCluster.java
  4. +85 −18 lib-dempsycore/src/main/java/com/nokia/dempsy/router/RoutingStrategy.java
  5. +49 −70 lib-dempsyimpl/src/main/java/com/nokia/dempsy/Dempsy.java
  6. +2 −0  lib-dempsyimpl/src/main/java/com/nokia/dempsy/messagetransport/tcp/TcpDestination.java
  7. +40 −1 lib-dempsyimpl/src/main/java/com/nokia/dempsy/mpcluster/invm/LocalVmMpClusterSessionFactory.java
  8. +126 −102 lib-dempsyimpl/src/main/java/com/nokia/dempsy/mpcluster/zookeeper/ZookeeperSession.java
  9. +258 −60 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/DefaultRoutingStrategy.java
  10. +100 −120 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/Router.java
  11. +23 −28 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestDempsy.java
  12. +19 −0 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestUtils.java
  13. +2 −13 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/TestAllMpClusterImpls.java
  14. +12 −23 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/zookeeper/TestFullApp.java
  15. +7 −14 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/zookeeper/TestZookeeperClusterResilience.java
  16. +0 −99 lib-dempsyimpl/src/test/java/com/nokia/dempsy/router/MpClusterTestImpl.java
  17. +30 −50 lib-dempsyimpl/src/test/java/com/nokia/dempsy/router/TestRouterClusterManagement.java
  18. +5 −5 lib-dempsyimpl/src/test/resources/log4j.properties
  19. +1 −0  lib-dempsyimpl/src/test/resources/testDempsy/Dempsy-IndividualClusterStart.xml
  20. +1 −0  lib-dempsyimpl/src/test/resources/testDempsy/Dempsy.xml
  21. +3 −0  lib-dempsyimpl/src/test/resources/testDempsy/MultistageApplicationExplicitDestinationsActx.xml
  22. +4 −0 lib-dempsyimpl/src/test/resources/testDempsy/SimpleMultistageApplicationActx.xml
  23. +5 −0 lib-dempsyimpl/src/test/resources/testDempsy/SinglestageApplicationActx.xml
  24. +6 −3 lib-dempsyimpl/src/test/resources/testDempsy/SinglestageOutputApplicationActx.xml
  25. +3 −0  lib-dempsyimpl/src/test/resources/testDempsy/SinglestageWithKeyStoreApplicationActx.xml
View
11 lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ApplicationDefinition.java
@@ -121,6 +121,17 @@ public ApplicationDefinition setClusterDefinitions(List<ClusterDefinition> clust
public List<ClusterDefinition> getClusterDefinitions() { return Collections.unmodifiableList(clusterDefinitions); }
/**
+ * Get the {@link ClusterDefinition} that corresponds to the given clusterId.
+ */
+ public ClusterDefinition getClusterDefinition(ClusterId clusterId)
+ {
+ for (ClusterDefinition cur : clusterDefinitions)
+ if (cur.getClusterId().equals(clusterId))
+ return cur;
+ return null;
+ }
+
+ /**
* When configuring by hand, this method
* @param clusterDefinitions is the {@link ClusterDefinition}s that make
* up this Application.
View
1  lib-dempsyapi/src/main/java/com/nokia/dempsy/config/ClusterDefinition.java
@@ -216,6 +216,7 @@ public void validate() throws DempsyException
if (messageProcessorPrototype != null && adaptor != null)
throw new DempsyException("A dempsy cluster must contain either an 'adaptor' or a message processor prototype but not both. " +
clusterId + " appears to be configured with both.");
+
if (messageProcessorPrototype != null)
{
View
2  lib-dempsycore/src/main/java/com/nokia/dempsy/mpcluster/MpCluster.java
@@ -74,7 +74,7 @@
* Every MpCluster instance participating in a cluster will have the
* same cluster Id, which identifies the total set of Mps of the same prototype.
*/
- public ClusterId getClusterId() throws MpClusterException;
+ public ClusterId getClusterId();
/**
* Sets the cluster level data.
View
103 lib-dempsycore/src/main/java/com/nokia/dempsy/router/RoutingStrategy.java
@@ -16,16 +16,17 @@
package com.nokia.dempsy.router;
-import java.util.List;
+import java.util.Collection;
import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.annotations.MessageKey;
import com.nokia.dempsy.annotations.MessageProcessor;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;
+import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.mpcluster.MpCluster;
-import com.nokia.dempsy.mpcluster.MpClusterException;
+import com.nokia.dempsy.router.RoutingStrategy.Outbound.Coordinator;
/**
* <p>A {@link RoutingStrategy} is responsible for determining how to find the appropriate
@@ -72,15 +73,56 @@
*/
public static interface Outbound
{
- public SlotInformation selectSlotForMessageKey(Object messageKey) throws DempsyException;
+ /**
+ * This method needs to be implemented to determine the specific node that the outgoing
+ * message is to be routed to.
+ *
+ * @param messageKey is the message key for the message to be routed
+ * @param message is the message to be routed.
+ * @return a transport Destination indicating the unique node in the downstream cluster
+ * that the message should go to.
+ * @throws DempsyException when something distasteful happens.
+ */
+ public Destination selectDestinationForMessage(Object messageKey, Object message) throws DempsyException;
+
+ /**
+ * The {@link Outbound} is responsible for providing the {@link ClusterId} for which it is the
+ * {@link Outbound} for.
+ */
+ public ClusterId getClusterId();
+
+ /**
+ * <p>Each node can have many Outbound instances and those Outbound cluster references
+ * can come and go. In order to tell Dempsy about what's going on in the cluster
+ * the Outbound should be updating the state of the OutboundCoordinator.</p>
+ *
+ * <p>Implementors of the RoutingStrategy do not need to implement this interface.
+ * There is only one implementation and that instance will be supplied by the
+ * framework.</p>
+ */
+ public static interface Coordinator
+ {
+ /**
+ * registers the outbound with the Coordinator and provide what types the destination
+ * cluster can handle. Note that the Outbound is allowed to call registerOutbound
+ * more than once, without calling unregisterOutbound first but the results should
+ * be the same.
+ */
+ public void registerOutbound(Outbound outbound, Collection<Class<?>> classes);
+
+ /**
+ * registers the outbound with the Coordinator and provide what types the destination
+ * cluster can handle.
+ */
+ public void unregisterOutbound(Outbound outbound);
+ }
+
/**
- * resetCluster is called when the cluster for the Outbound side changes. In this
- * way implementations of this class do not need to be MpClusterWatchers
- * @param cluster - the cluster handle containing the new state.
- * @throws MpClusterException when the implementation has a problem accessing the cluster
+ * Shut down and reclaim any resources associated with the {@link Outbound} instance.
*/
- public void resetCluster(MpCluster<ClusterInformation, SlotInformation> cluster) throws MpClusterException;
+ public void stop();
+
}
/**
@@ -93,21 +135,46 @@
public static interface Inbound
{
/**
- * <p>resetCluster is called when the cluster for the Inbound side changes. In this
- * way implementations of this class do not need to be MpClusterWatchers.</p>
- *
- * @param cluster - the cluster handle containing the new state.
- * @throws MpClusterException when the implementation has a problem accessing the cluster
+ * Since the {@link Inbound} has the responsibility to determine which instances of a
+ * {@link MessageProcessor} are valid in 'this' node, it should be able to privide that
+ * information through the implementataion of this method. This is used as part of the
+ * Pre-instantiation phase of the Message Processor's lifecylce.
*/
- public void resetCluster(MpCluster<ClusterInformation, SlotInformation> cluster,
- List<Class<?>> messageTypes, Destination thisDestination) throws MpClusterException;
-
public boolean doesMessageKeyBelongToNode(Object messageKey);
+
+ /**
+ * Shut down and reclaim any resources associated with the {@link Inbound} instance.
+ */
+ public void stop();
}
- public Inbound createInbound();
+ /**
+ * This method will be called from the Dempsy framework in order to instantiate the one Inbound for
+ * 'this' node. Keep in mind that when running in LocalVm mode there can be more than one inbound per
+ * process.
+ *
+ * @param cluster is the cluster information manager handle for 'this' node.
+ * @param messageTypes is the types of messages that Dempsy determined could be handled by the {@link MessageProcessor}
+ * in this node. This information should be shared across to the {@link Outbound} and registered with
+ * the {@link Coordinator} to allow Dempsy to restrict messages to the appropriate types.
+ * @param thisDestination is the transport Destination instance that represents how to communicate
+ * with 'this' node.
+ * @return the {@link Inbound} instance.
+ */
+ public Inbound createInbound(MpCluster<ClusterInformation, SlotInformation> cluster, Collection<Class<?>> messageTypes, Destination thisDestination);
- public Outbound createOutbound();
+ /**
+ * The RoutingStrategy needs to create an {@link Outbound} that corresponds to the given cluster. It should do this
+ * in a manner that absolutely succeeds even if the cluster information manager is in a bad state. This is why
+ * this method takes stable parameters and throws no exception.
+ *
+ * @param coordinator is the coordinator that the newly created {@link Outbound} can use to call back on the
+ * framework.
+ * @param clusterId is the cluster id that the {@link Outbound} is being created for.
+ * @return a new {@link Outbound} that manages the selection of a {@link Destination} given a message destined for
+ * the given cluster.
+ */
+ public Outbound createOutbound(Outbound.Coordinator coordinator, MpCluster<ClusterInformation, SlotInformation> cluster);
}
View
119 lib-dempsyimpl/src/main/java/com/nokia/dempsy/Dempsy.java
@@ -34,6 +34,7 @@
import com.nokia.dempsy.container.ContainerException;
import com.nokia.dempsy.container.MpContainer;
import com.nokia.dempsy.internal.util.SafeString;
+import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.messagetransport.Receiver;
import com.nokia.dempsy.messagetransport.Transport;
import com.nokia.dempsy.monitoring.StatsCollector;
@@ -42,7 +43,6 @@
import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSession;
import com.nokia.dempsy.mpcluster.MpClusterSessionFactory;
-import com.nokia.dempsy.mpcluster.MpClusterWatcher;
import com.nokia.dempsy.output.OutputExecuter;
import com.nokia.dempsy.router.ClusterInformation;
import com.nokia.dempsy.router.CurrentClusterCheck;
@@ -78,7 +78,7 @@
* Currently a Node is instantiated within the Dempsy orchestrator as a one to one with the
* {@link Cluster}.
*/
- public class Node implements MpClusterWatcher
+ public class Node
{
protected ClusterDefinition clusterDefinition;
@@ -119,37 +119,34 @@ private void start() throws DempsyException
container.setSerializer(serializer);
// there is only a reciever if we have an Mp (that is, we aren't an adaptor) and start accepting messages
+ Destination thisDestination = null;
if (messageProcessorPrototype != null && acceptedMessageClasses != null && acceptedMessageClasses.size() > 0)
{
receiver = transport.createInbound();
receiver.setListener(container);
+ thisDestination = receiver.getDestination();
}
StatsCollectorFactory statsFactory = (StatsCollectorFactory)clusterDefinition.getStatsCollectorFactory();
if (statsFactory != null)
{
statsCollector = statsFactory.createStatsCollector(currentClusterId,
- receiver != null ? receiver.getDestination() : null);
+ receiver != null ? thisDestination : null);
router.setStatsCollector(statsCollector);
container.setStatCollector(statsCollector);
}
RoutingStrategy strategy = (RoutingStrategy)clusterDefinition.getRoutingStrategy();
+ currentClusterHandle = clusterSession.getCluster(currentClusterId);
+
// there is only an inbound strategy if we have an Mp (that is, we aren't an adaptor) and
// we actually accept messages
if (messageProcessorPrototype != null && acceptedMessageClasses != null && acceptedMessageClasses.size() > 0)
- strategyInbound = strategy.createInbound();
-
- currentClusterHandle = clusterSession.getCluster(currentClusterId);
+ strategyInbound = strategy.createInbound(currentClusterHandle,acceptedMessageClasses, thisDestination);
// this can fail because of down cluster manager server ... but it should eventually recover.
- try
- {
- if (strategyInbound != null && receiver != null)
- strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
- router.initialize();
- }
+ try { router.initialize(); }
catch (MpClusterException e)
{
logger.warn("Strategy failed to initialize. Continuing anyway. The cluster manager issue will be resolved automatically.",e);
@@ -206,22 +203,6 @@ public void run()
}, "Pre-Instantation Thread");
t.start();
}
-
- // now we want to set the Node as the watcher.
- if (strategyInbound != null && receiver != null)
- {
- currentClusterHandle.addWatcher(this);
-
- // and reset just in case something happened
- try
- {
- strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
- }
- catch (MpClusterException e)
- {
- logger.warn("Strategy failed to initialize. Continuing anyway. The cluster manager issue will be resolved automatically.",e);
- }
- }
}
catch(RuntimeException e) { throw e; }
catch(Exception e) { throw new DempsyException(e); }
@@ -231,20 +212,6 @@ public void run()
public MpContainer getMpContainer() { return container; }
- @Override
- public void process()
- {
- try
- {
- if (strategyInbound != null)
- strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
- }
- // TODO: fix these catches... .need to take note of a failure for a later retry
- // using a scheduled task.
- catch(RuntimeException e) { throw e; }
- catch(Exception e) { throw new RuntimeException(e); }
- }
-
public void stop()
{
if (receiver != null)
@@ -268,6 +235,8 @@ public void stop()
if (statsCollector != null)
try { statsCollector.stop(); statsCollector = null;} catch (Throwable th) { logger.error("Problem shutting down node for " + SafeString.valueOf(clusterDefinition), th); }
+ if (strategyInbound != null)
+ try { strategyInbound.stop(); strategyInbound = null;} catch (Throwable th) { logger.error("Problem shutting down node for " + SafeString.valueOf(clusterDefinition), th); }
}
/**
@@ -514,39 +483,49 @@ public synchronized void start() throws DempsyException
if (defaultStatsCollectorFactory == null)
throw new DempsyException("Cannot start this application because there's no default stats collector factory defined.");
-
- applications = new ArrayList<Application>(applicationDefinitions.size());
- for(ApplicationDefinition appDef: this.applicationDefinitions)
+
+ try
{
- appDef.initialize();
- if (clusterCheck.isThisNodePartOfApplication(appDef.getApplicationName()))
+ applications = new ArrayList<Application>(applicationDefinitions.size());
+ for(ApplicationDefinition appDef: this.applicationDefinitions)
{
- Application app = new Application(appDef);
-
- // set the default routing strategy if there isn't one already set.
- if (appDef.getRoutingStrategy() == null)
- appDef.setRoutingStrategy(defaultRoutingStrategy);
-
- if (appDef.getSerializer() == null)
- appDef.setSerializer(defaultSerializer);
-
- if (appDef.getStatsCollectorFactory() == null)
- appDef.setStatsCollectorFactory(defaultStatsCollectorFactory);
-
- applications.add(app);
+ appDef.initialize();
+ if (clusterCheck.isThisNodePartOfApplication(appDef.getApplicationName()))
+ {
+ Application app = new Application(appDef);
+
+ // set the default routing strategy if there isn't one already set.
+ if (appDef.getRoutingStrategy() == null)
+ appDef.setRoutingStrategy(defaultRoutingStrategy);
+
+ if (appDef.getSerializer() == null)
+ appDef.setSerializer(defaultSerializer);
+
+ if (appDef.getStatsCollectorFactory() == null)
+ appDef.setStatsCollectorFactory(defaultStatsCollectorFactory);
+
+ applications.add(app);
+ }
+ }
+
+ boolean clusterStarted = false;
+ for (Application app : applications)
+ clusterStarted = app.start();
+
+ if(!clusterStarted)
+ {
+ throw new DempsyException("Cannot start this application because cluster defination was not found.");
}
+ // if we got to here we can assume we're started
+ synchronized(isRunningEvent) { isRunning = true; }
}
-
- boolean clusterStarted = false;
- for (Application app : applications)
- clusterStarted = app.start();
-
- if(!clusterStarted)
+ catch (RuntimeException rte)
{
- throw new DempsyException("Cannot start this application because cluster defination was not found.");
+ logger.error("Failed to start Dempsy. Attempting to stop.");
+ // if something unpexpected happened then we should attempt to stop
+ try { stop(); } catch (Throwable th) {}
+ throw rte;
}
- // if we got to here we can assume we're started
- synchronized(isRunningEvent) { isRunning = true; }
}
public synchronized void stop()
@@ -628,7 +607,7 @@ public boolean waitToBeStopped(long timeInMillis) throws InterruptedException
}
}
- private static List<Class<?>> getAcceptedMessages(ClusterDefinition clusterDef)
+ protected static List<Class<?>> getAcceptedMessages(ClusterDefinition clusterDef)
{
List<Class<?>> messageClasses = new ArrayList<Class<?>>();
Object prototype = clusterDef.getMessageProcessorPrototype();
View
2  lib-dempsyimpl/src/main/java/com/nokia/dempsy/messagetransport/tcp/TcpDestination.java
@@ -47,6 +47,8 @@ public String toString()
@Override
public boolean equals(Object other)
{
+ if (other == null)
+ return false;
TcpDestination otherTcpDestination = (TcpDestination)other;
return inetAddress.equals(otherTcpDestination.inetAddress ) && (port == otherTcpDestination.port);
}
View
41 lib-dempsyimpl/src/main/java/com/nokia/dempsy/mpcluster/invm/LocalVmMpClusterSessionFactory.java
@@ -239,7 +239,46 @@ public void setClusterData(T data)
} // end cluster definition
- private final void callUpdateWatchersForCluster(ClusterId clusterId) { updateClusterWatchers(clusterId); }
+ private volatile boolean inProcess = false;
+ private volatile boolean recursionAttempt = false;
+
+ private final void callUpdateWatchersForCluster(ClusterId clusterId)
+ {
+ // this needs to avoid recursion but if there is a recursion attempt it
+ // needs to execute another process call at the end.
+ LocalVmMpSession.LocalVmMpCluster cluster = (LocalVmMpSession.LocalVmMpCluster)cache.get(clusterId);
+ if (cluster != null)
+ {
+ synchronized(cluster.processLock)
+ {
+ if (inProcess)
+ {
+ recursionAttempt = true;
+ return;
+ }
+
+ do
+ {
+ recursionAttempt = false;
+ inProcess = true;
+
+ for(MpClusterWatcher watcher: cluster.watchers)
+ {
+ try
+ {
+ watcher.process();
+ }
+ catch (RuntimeException e)
+ {
+ logger.error("Failed to handle process for watcher " + SafeString.objectDescription(watcher),e);
+ }
+ }
+ } while (recursionAttempt);
+
+ inProcess = false;
+ }
+ }
+ }
private final void callUpdateWatchersForApplication(String applicationId)
{
View
228 lib-dempsyimpl/src/main/java/com/nokia/dempsy/mpcluster/zookeeper/ZookeeperSession.java
@@ -64,6 +64,7 @@
private Logger logger = LoggerFactory.getLogger(ZookeeperSession.class);
protected volatile AtomicReference<ZooKeeper> zkref;
+ private volatile boolean isRunning = true;
private Map<ClusterId, ZookeeperCluster> cachedClusters = new HashMap<ClusterId, ZookeeperCluster>();
private Map<String, ZookeeperApplication> cachedApps = new HashMap<String, ZookeeperApplication>();
protected long resetDelay = 500;
@@ -137,6 +138,7 @@ public void stop()
AtomicReference<ZooKeeper> curZk;
synchronized(this)
{
+ isRunning = false;
curZk = zkref;
zkref = null; // this blows up any more usage
}
@@ -154,22 +156,27 @@ public void stop()
private List<String> getChildren(ZookeeperPath path, Watcher watcher) throws MpClusterException
{
- ZooKeeper cur = zkref.get();
- try
+ if (isRunning)
{
- return cur.getChildren(path.path, watcher);
- }
- catch (KeeperException e)
- {
- resetZookeeper(cur);
- throw new MpClusterException("Failed to get active slots (" + path +
- ") on provided zookeeper instance.",e);
- }
- catch (InterruptedException e)
- {
- throw new MpClusterException("Failed to get active slots (" + path +
- ") on provided zookeeper instance.",e);
+ ZooKeeper cur = zkref.get();
+ try
+ {
+ return cur.getChildren(path.path, watcher);
+ }
+ catch (KeeperException e)
+ {
+ resetZookeeper(cur);
+ throw new MpClusterException("Failed to get active slots (" + path +
+ ") on provided zookeeper instance.",e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new MpClusterException("Failed to get active slots (" + path +
+ ") on provided zookeeper instance.",e);
+ }
}
+ throw new MpClusterException("getChildren called on stopped MpCluster (" + path +
+ ") on provided zookeeper instance.");
}
private synchronized void setNewZookeeper(ZooKeeper newZk)
@@ -177,7 +184,7 @@ private synchronized void setNewZookeeper(ZooKeeper newZk)
if (logger.isTraceEnabled())
logger.trace("reestablished connection to " + connectString);
- if (zkref != null)
+ if (isRunning)
{
ZooKeeper last = zkref.getAndSet(newZk);
if (last != null)
@@ -221,14 +228,14 @@ public void run()
}
finally
{
- if (newZk == null && zkref != null) // if zk is null then we stopped so no point in continuing.
+ if (newZk == null && isRunning)
// reschedule me.
scheduler.schedule(this, resetDelay, TimeUnit.MILLISECONDS);
}
// this is true if the reset worked and we're not in the process
// of shutting down.
- if (newZk != null && zkref != null)
+ if (newZk != null && isRunning)
{
// we want the setNewZookeeper and the clearing of the
// beingReset flag to be atomic so future failures that result
@@ -287,7 +294,7 @@ private boolean mkdir(ZooKeeper cur, String path, Watcher watcher, CreateMode mo
}
catch (KeeperException e)
{
- logger.error("Failed to create the root node (" + path + ") on provided zookeeper instance.",e);
+ logger.warn("Failed to create the root node (" + path + ") on provided zookeeper instance.",e);
resetZookeeper(cur);
}
catch (InterruptedException e)
@@ -301,9 +308,8 @@ private boolean mkdir(ZooKeeper cur, String path, Watcher watcher, CreateMode mo
private void initializeApplication(ZookeeperApplication application)
{
ZooKeeper cur = null;
- try { cur = zkref.get();}
- // this means zk has been set to null which means we're stopping so just allow it to stop
- catch (NullPointerException npe) { }
+ if (isRunning)
+ cur = zkref.get();
if (cur != null)
{
@@ -321,9 +327,8 @@ private void initializeCluster(ZookeeperCluster cluster, boolean forceWatcherCal
}
ZooKeeper cur = null;
- try { cur = zkref.get();}
- // this means zk has been set to null which means we're stopping so just allow it to stop
- catch (NullPointerException npe) { }
+ if (isRunning)
+ cur = zkref.get();
if (cur != null)
{
@@ -388,7 +393,7 @@ public void process(WatchedEvent event)
clearState(event);
- if (zkref != null)
+ if (isRunning)
resetZookeeper(zkref.get());
}
}
@@ -561,51 +566,63 @@ public void setSlotInformation(TS info) throws MpClusterException
private boolean join() throws MpClusterException
{
- ZooKeeper cur = zkref.get();
- try
- {
- cur.create(slotPath.path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- return true;
- }
- catch(KeeperException.NodeExistsException e)
- {
- if(logger.isDebugEnabled())
- logger.debug("Failed to join the cluster " + clusterId +
- ". Couldn't create the node within zookeeper using \"" + slotPath + "\"");
- return false;
- }
- catch(KeeperException e)
+ if (isRunning)
{
- resetZookeeper(cur);
- throw new MpClusterException("Zookeeper failed while trying to join the cluster " + clusterId +
- ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
- }
- catch(InterruptedException e)
- {
- resetZookeeper(cur);
- throw new MpClusterException("Interrupted while trying to join the cluster " + clusterId +
- ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
+ ZooKeeper cur = zkref.get();
+ try
+ {
+ cur.create(slotPath.path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ return true;
+ }
+ catch(KeeperException.NodeExistsException e)
+ {
+ if(logger.isDebugEnabled())
+ logger.debug("Failed to join the cluster " + clusterId +
+ ". Couldn't create the node within zookeeper using \"" + slotPath + "\"");
+ return false;
+ }
+ catch(KeeperException e)
+ {
+ resetZookeeper(cur);
+ throw new MpClusterException("Zookeeper failed while trying to join the cluster " + clusterId +
+ ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
+ }
+ catch(InterruptedException e)
+ {
+ resetZookeeper(cur);
+ throw new MpClusterException("Interrupted while trying to join the cluster " + clusterId +
+ ". Couldn't create the node within zookeeper using \"" + slotPath + "\"",e);
+ }
}
+
+ throw new MpClusterException("join called on stopped MpClusterSlot (" + slotPath +
+ ") on provided zookeeper instance.");
}
@Override
public synchronized void leave() throws MpClusterException
{
- try
- {
- zkref.get().delete(slotPath.path,-1);
- }
- catch(KeeperException e)
+ if (isRunning)
{
- throw new MpClusterException("Failed to leave the cluster " + clusterId.getApplicationName() +
- ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
- }
- catch(InterruptedException e)
- {
- throw new MpClusterException("Interrupted while trying to leave the cluster " + clusterId.getApplicationName() +
- ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
+ try
+ {
+ zkref.get().delete(slotPath.path,-1);
+ }
+ catch(KeeperException e)
+ {
+ throw new MpClusterException("Failed to leave the cluster " + clusterId.getApplicationName() +
+ ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
+ }
+ catch(InterruptedException e)
+ {
+ throw new MpClusterException("Interrupted while trying to leave the cluster " + clusterId.getApplicationName() +
+ ". Couldn't delete the node within zookeeper using \"" + slotPath + "\"",e);
+ }
}
+ else
+ throw new MpClusterException("leave called on stopped MpClusterSlot (" + slotPath +
+ ") on provided zookeeper instance.");
}
public String toString() { return slotPath.toString(); }
@@ -708,60 +725,67 @@ public void process(WatchedEvent event)
private Object readInfoFromPath(ZookeeperPath path) throws MpClusterException
{
- ObjectInputStream is = null;
- try
+ if (isRunning)
{
- byte[] ret = zkref.get().getData(path.path, true, null);
-
- if (ret != null && ret.length > 0)
+ ObjectInputStream is = null;
+ try
{
- is = new ObjectInputStream(new ByteArrayInputStream(ret));
- return is.readObject();
+ byte[] ret = zkref.get().getData(path.path, true, null);
+
+ if (ret != null && ret.length > 0)
+ {
+ is = new ObjectInputStream(new ByteArrayInputStream(ret));
+ return is.readObject();
+ }
+ return null;
+ }
+ // this is an indication that the node has disappeared since we retrieved
+ // this MpContainerClusterNode
+ catch (KeeperException.NoNodeException e) { return null; }
+ catch (RuntimeException e) { throw e; }
+ catch (Exception e)
+ {
+ throw new MpClusterException("Failed to get node information for (" + path + ").",e);
+ }
+ finally
+ {
+ IOUtils.closeQuietly(is);
}
- return null;
- }
- // this is an indication that the node has disappeared since we retrieved
- // this MpContainerClusterNode
- catch (KeeperException.NoNodeException e) { return null; }
- catch (RuntimeException e) { throw e; }
- catch (Exception e)
- {
- throw new MpClusterException("Failed to get node information for (" + path + ").",e);
- }
- finally
- {
- IOUtils.closeQuietly(is);
}
+ return null;
}
private void setInfoToPath(ZookeeperPath path, Object info) throws MpClusterException
{
- ObjectOutputStream os = null;
- try
+ if (isRunning)
{
- byte[] buf = null;
- if (info != null)
+ ObjectOutputStream os = null;
+ try
{
- // Serialize to a byte array
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- os = new ObjectOutputStream(bos);
- os.writeObject(info);
- os.close(); // flush
+ byte[] buf = null;
+ if (info != null)
+ {
+ // Serialize to a byte array
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ os = new ObjectOutputStream(bos);
+ os.writeObject(info);
+ os.close(); // flush
+
+ // Get the bytes of the serialized object
+ buf = bos.toByteArray();
+ }
- // Get the bytes of the serialized object
- buf = bos.toByteArray();
+ zkref.get().setData(path.path, buf, -1);
+ }
+ catch (RuntimeException e) { throw e;}
+ catch (Exception e)
+ {
+ throw new MpClusterException("Failed to get node information for (" + path + ").",e);
+ }
+ finally
+ {
+ IOUtils.closeQuietly(os);
}
-
- zkref.get().setData(path.path, buf, -1);
- }
- catch (RuntimeException e) { throw e;}
- catch (Exception e)
- {
- throw new MpClusterException("Failed to get node information for (" + path + ").",e);
- }
- finally
- {
- IOUtils.closeQuietly(os);
}
}
View
318 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/DefaultRoutingStrategy.java
@@ -17,23 +17,33 @@
package com.nokia.dempsy.router;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nokia.dempsy.DempsyException;
+import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.internal.util.SafeString;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.mpcluster.MpCluster;
import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSlot;
+import com.nokia.dempsy.mpcluster.MpClusterWatcher;
+import com.nokia.dempsy.router.RoutingStrategy.Outbound.Coordinator;
/**
* This Routing Strategy uses the {@link MpCluster} to negotiate with other instances in the
@@ -41,10 +51,13 @@
*/
public class DefaultRoutingStrategy implements RoutingStrategy
{
+ private static final int resetDelay = 500;
+
private static Logger logger = LoggerFactory.getLogger(DefaultRoutingStrategy.class);
private int defaultTotalSlots;
private int defaultNumNodes;
+ private CountDownLatch injectedLatch = null;
public DefaultRoutingStrategy(int defaultTotalSlots, int defaultNumNodes)
{
@@ -52,89 +65,259 @@ public DefaultRoutingStrategy(int defaultTotalSlots, int defaultNumNodes)
this.defaultNumNodes = defaultNumNodes;
}
- private class Outbound implements RoutingStrategy.Outbound
+ private class Outbound implements RoutingStrategy.Outbound, MpClusterWatcher
{
- private ConcurrentHashMap<Integer, DefaultRouterSlotInfo> destinations = new ConcurrentHashMap<Integer, DefaultRouterSlotInfo>();
- private int totalAddressCounts = -1;
+ private AtomicReference<Destination[]> destinations = new AtomicReference<Destination[]>();
+ private RoutingStrategy.Outbound.Coordinator coordinator;
+ private MpCluster<ClusterInformation, SlotInformation> cluster;
+ private ClusterId clusterId;
+ private Set<Class<?>> messageTypesHandled = null;
+
+ private ScheduledExecutorService scheduler = null;
+
+ private Outbound(RoutingStrategy.Outbound.Coordinator coordinator,
+ MpCluster<ClusterInformation, SlotInformation> cluster)
+ {
+ this.coordinator = coordinator;
+ this.cluster = cluster;
+ this.clusterId = cluster.getClusterId();
+ cluster.addWatcher(this);
+ execSetupDestinations(false);
+ }
@Override
- public synchronized SlotInformation selectSlotForMessageKey(Object messageKey) throws DempsyException
+ public ClusterId getClusterId() { return clusterId; }
+
+ @Override
+ public Destination selectDestinationForMessage(Object messageKey, Object message) throws DempsyException
{
- if (totalAddressCounts < 0)
+ Destination[] destinationArr = destinations.get();
+ if (destinationArr == null)
throw new DempsyException("It appears the Outbound strategy for the message key " +
SafeString.objectDescription(messageKey) +
" is being used prior to initialization.");
- int calculatedModValue = Math.abs(messageKey.hashCode()%totalAddressCounts);
- return destinations.get(calculatedModValue);
+ int calculatedModValue = Math.abs(messageKey.hashCode()%destinationArr.length);
+ return destinationArr[calculatedModValue];
+ }
+
+ @Override
+ public void process()
+ {
+ execSetupDestinations(true);
+ }
+
+ @Override
+ public synchronized void stop()
+ {
+ if (scheduler != null)
+ scheduler.shutdown();
+ scheduler = null;
+ }
+
+ /**
+ * This method is protected for testing purposes. Otherwise it would be private.
+ * @return whether or not the setup was successful.
+ */
+ protected synchronized boolean setupDestinations()
+ {
+ try
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Resetting Outbound Strategy for cluster " + clusterId);
+
+ Map<Integer,DefaultRouterSlotInfo> slotNumbersToSlots = new HashMap<Integer,DefaultRouterSlotInfo>();
+ int newtotalAddressCounts = fillMapFromActiveSlots(slotNumbersToSlots,cluster);
+ if (newtotalAddressCounts == 0)
+ throw new MpClusterException("The cluster " + cluster.getClusterId() +
+ " seems to have invalid slot information. Someone has set the total number of slots to zero.");
+ if (newtotalAddressCounts > 0)
+ {
+ Destination[] newDestinations = new Destination[newtotalAddressCounts];
+ for (Map.Entry<Integer,DefaultRouterSlotInfo> entry : slotNumbersToSlots.entrySet())
+ {
+ DefaultRouterSlotInfo slotInfo = entry.getValue();
+ newDestinations[entry.getKey()] = slotInfo.getDestination();
+
+ // only register the very first time ... for now
+ if (messageTypesHandled == null)
+ {
+ messageTypesHandled = new HashSet<Class<?>>();
+ messageTypesHandled.addAll(slotInfo.getMessageClasses());
+ coordinator.registerOutbound(this, messageTypesHandled);
+ }
+ }
+
+ // now see if anything is changed.
+ Destination[] oldDestinations = destinations.get();
+ if (oldDestinations == null || !Arrays.equals(oldDestinations, newDestinations))
+ destinations.set(newDestinations);
+ }
+ else
+ destinations.set(null);
+
+ return destinations.get() != null;
+ }
+ catch(MpClusterException e)
+ {
+ destinations.set(null);
+ logger.warn("Failed to set up the Outbound for " + clusterId, e);
+ }
+ catch (RuntimeException rte)
+ {
+ logger.error("Failed to set up the Outbound for " + clusterId, rte);
+ }
+ return false;
}
- public synchronized void resetCluster(MpCluster<ClusterInformation, SlotInformation> clusterHandle) throws MpClusterException
+ private void execSetupDestinations(boolean fromProcessParam)
{
- if (logger.isTraceEnabled())
- logger.trace("Resetting Outbound Strategy for cluster " + clusterHandle.getClusterId());
+ final boolean fromProcess = fromProcessParam;
- destinations.clear();
- int newtotalAddressCounts = fillMapFromActiveSlots(destinations,clusterHandle);
- if (newtotalAddressCounts == 0)
- throw new MpClusterException("The cluster " + clusterHandle.getClusterId() +
- " seems to have invalid slot information. Someone has set the total number of slots to zero.");
- totalAddressCounts = newtotalAddressCounts > 0 ? newtotalAddressCounts : totalAddressCounts;
+ if (!setupDestinations())
+ {
+ scheduler = Executors.newScheduledThreadPool(1);
+
+ scheduler.schedule(new Runnable(){
+ @Override
+ public void run()
+ {
+ if (!setupDestinations())
+ {
+ synchronized(Outbound.this)
+ {
+ if (scheduler != null)
+ scheduler.schedule(this, resetDelay, TimeUnit.MILLISECONDS);
+ }
+ }
+ else
+ {
+ if (!fromProcess && injectedLatch != null)
+ injectedLatch.countDown();
+
+ synchronized(Outbound.this)
+ {
+ if (scheduler != null)
+ {
+ scheduler.shutdown();
+ scheduler = null;
+ }
+ }
+ }
+ }
+ }, resetDelay, TimeUnit.MILLISECONDS);
+ }
+ else if (!fromProcess && injectedLatch != null)
+ injectedLatch.countDown();
}
+
} // end Outbound class definition
- private class Inbound implements RoutingStrategy.Inbound
+ private class Inbound implements RoutingStrategy.Inbound, MpClusterWatcher
{
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+
private List<Integer> destinationsAcquired = new ArrayList<Integer>();
+ private MpCluster<ClusterInformation, SlotInformation> cluster;
+ private Collection<Class<?>> messageTypes;
+ private Destination thisDestination;
+ private ClusterId clusterId;
+
+ private Inbound(MpCluster<ClusterInformation, SlotInformation> cluster,
+ Collection<Class<?>> messageTypes,
+ Destination thisDestination)
+ {
+ this.cluster = cluster;
+ this.messageTypes = messageTypes;
+ this.thisDestination = thisDestination;
+ this.clusterId = cluster.getClusterId();
+ this.cluster.addWatcher(this);
+ acquireSlots(false);
+ }
@Override
- public synchronized void resetCluster(MpCluster<ClusterInformation, SlotInformation> clusterHandle,
- List<Class<?>> messagesTypes, Destination destination) throws MpClusterException
+ public void process()
{
- if (logger.isTraceEnabled())
- logger.trace("Resetting Inbound Strategy for cluster " + clusterHandle.getClusterId());
-
- int minNodeCount = defaultNumNodes;
- int totalAddressNeeded = defaultTotalSlots;
- Random random = new Random();
-
- //==============================================================================
- // need to verify that the existing slots in destinationsAcquired are still ours
- Map<Integer,DefaultRouterSlotInfo> slotNumbersToSlots = new HashMap<Integer,DefaultRouterSlotInfo>();
- fillMapFromActiveSlots(slotNumbersToSlots,clusterHandle);
- Collection<Integer> slotsToReaquire = new ArrayList<Integer>();
- for (Integer destinationSlot : destinationsAcquired)
- {
- // select the coresponding slot information
- DefaultRouterSlotInfo slotInfo = slotNumbersToSlots.get(destinationSlot);
- if (slotInfo == null || !destination.equals(slotInfo.getDestination()))
- slotsToReaquire.add(destinationSlot);
- }
- //==============================================================================
+ acquireSlots(true);
+ }
+
+ private synchronized void acquireSlots(boolean fromProcessParam)
+ {
+ boolean retry = true;
+ final boolean fromProcess = fromProcessParam;
- //==============================================================================
- // Now reaquire the potentially lost slots
- for (Integer slotToReaquire : slotsToReaquire)
+ try
{
- if (!acquireSlot(slotToReaquire, totalAddressNeeded,
- clusterHandle, messagesTypes, destination))
+ if (logger.isTraceEnabled())
+ logger.trace("Resetting Inbound Strategy for cluster " + clusterId);
+
+ int minNodeCount = defaultNumNodes;
+ int totalAddressNeeded = defaultTotalSlots;
+ Random random = new Random();
+
+ //==============================================================================
+ // need to verify that the existing slots in destinationsAcquired are still ours
+ Map<Integer,DefaultRouterSlotInfo> slotNumbersToSlots = new HashMap<Integer,DefaultRouterSlotInfo>();
+ fillMapFromActiveSlots(slotNumbersToSlots,cluster);
+ Collection<Integer> slotsToReaquire = new ArrayList<Integer>();
+ for (Integer destinationSlot : destinationsAcquired)
{
- // in this case, see if I already own it...
- logger.error("Cannot reaquire the slot " + slotToReaquire + " for the cluster " + clusterHandle.getClusterId());
+ // select the coresponding slot information
+ DefaultRouterSlotInfo slotInfo = slotNumbersToSlots.get(destinationSlot);
+ if (slotInfo == null || !thisDestination.equals(slotInfo.getDestination()))
+ slotsToReaquire.add(destinationSlot);
}
- }
- //==============================================================================
+ //==============================================================================
- while(needToGrabMoreSlots(clusterHandle,minNodeCount,totalAddressNeeded))
+ //==============================================================================
+ // Now re-acquire the potentially lost slots
+ for (Integer slotToReaquire : slotsToReaquire)
+ {
+ if (!acquireSlot(slotToReaquire, totalAddressNeeded,
+ cluster, messageTypes, thisDestination))
+ {
+ // in this case, see if I already own it...
+ logger.warn("Cannot reaquire the slot " + slotToReaquire + " for the cluster " + clusterId);
+ }
+ }
+ //==============================================================================
+
+ while(needToGrabMoreSlots(cluster,minNodeCount,totalAddressNeeded))
+ {
+ int randomValue = random.nextInt(totalAddressNeeded);
+ if(destinationsAcquired.contains(randomValue))
+ continue;
+ if (acquireSlot(randomValue, totalAddressNeeded,
+ cluster, messageTypes, thisDestination))
+ destinationsAcquired.add(randomValue);
+ }
+
+ retry = false;
+ if (injectedLatch != null && !fromProcess)
+ injectedLatch.countDown();
+ }
+ catch(MpClusterException e)
{
- int randomValue = random.nextInt(totalAddressNeeded);
- if(destinationsAcquired.contains(randomValue))
- continue;
- if (acquireSlot(randomValue, totalAddressNeeded,
- clusterHandle, messagesTypes, destination))
- destinationsAcquired.add(randomValue);
+ destinationsAcquired.clear();
+ }
+ finally
+ {
+ // if we never got the destinations set up then kick off a retry
+ if (retry)
+ scheduler.schedule(new Runnable(){
+ @Override
+ public void run() { acquireSlots(fromProcess); }
+ }, resetDelay, TimeUnit.MILLISECONDS);
+ else if (injectedLatch != null && !fromProcess)
+ injectedLatch.countDown();
}
}
+ @Override
+ public void stop()
+ {
+ scheduler.shutdown();
+ }
+
private boolean needToGrabMoreSlots(MpCluster<ClusterInformation, SlotInformation> clusterHandle,
int minNodeCount, int totalAddressNeeded) throws MpClusterException
{
@@ -151,10 +334,25 @@ public boolean doesMessageKeyBelongToNode(Object messageKey)
} // end Inbound class definition
- public RoutingStrategy.Inbound createInbound() { return new Inbound(); }
-
- public RoutingStrategy.Outbound createOutbound() { return new Outbound(); }
+ @Override
+ public RoutingStrategy.Inbound createInbound(MpCluster<ClusterInformation, SlotInformation> cluster,
+ Collection<Class<?>> messageTypes,
+ Destination thisDestination)
+ {
+ return new Inbound(cluster,messageTypes,thisDestination);
+ }
+
+ @Override
+ public RoutingStrategy.Outbound createOutbound(Coordinator coordinator, MpCluster<ClusterInformation, SlotInformation> cluster)
+ {
+ return new Outbound(coordinator,cluster);
+ }
+ /**
+ * This method is to allow the tests a mechanism to know when the strategy
+ */
+ public void setLatch(CountDownLatch latch) { this.injectedLatch = latch; }
+
static class DefaultRouterSlotInfo extends SlotInformation
{
private static final long serialVersionUID = 1L;
@@ -247,7 +445,7 @@ else if (totalAddressCounts != slotInfo.getTotalAddress())
private static boolean acquireSlot(int slotNum, int totalAddressNeeded,
MpCluster<ClusterInformation, SlotInformation> clusterHandle,
- List<Class<?>> messagesTypes, Destination destination) throws MpClusterException
+ Collection<Class<?>> messagesTypes, Destination destination) throws MpClusterException
{
MpClusterSlot<SlotInformation> slot = clusterHandle.join(String.valueOf(slotNum));
if(slot == null)
View
220 lib-dempsyimpl/src/main/java/com/nokia/dempsy/router/Router.java
@@ -34,9 +34,9 @@
import com.nokia.dempsy.Adaptor;
import com.nokia.dempsy.Dempsy;
+import com.nokia.dempsy.Dempsy.Application.Cluster.Node;
import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.Dispatcher;
-import com.nokia.dempsy.Dempsy.Application.Cluster.Node;
import com.nokia.dempsy.annotations.MessageKey;
import com.nokia.dempsy.annotations.MessageProcessor;
import com.nokia.dempsy.config.ApplicationDefinition;
@@ -54,8 +54,7 @@
import com.nokia.dempsy.mpcluster.MpCluster;
import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSession;
-import com.nokia.dempsy.mpcluster.MpClusterSlot;
-import com.nokia.dempsy.mpcluster.MpClusterWatcher;
+import com.nokia.dempsy.router.RoutingStrategy.Outbound;
import com.nokia.dempsy.serialization.SerializationException;
import com.nokia.dempsy.serialization.Serializer;
@@ -87,7 +86,7 @@
*
* <p>A router requires a non-null ApplicationDefinition during construction.</p>
*/
-public class Router implements Dispatcher
+public class Router implements Dispatcher, RoutingStrategy.Outbound.Coordinator
{
private static Logger logger = LoggerFactory.getLogger(Router.class);
@@ -95,9 +94,11 @@
private ApplicationDefinition applicationDefinition = null;
private ConcurrentHashMap<Class<?>, Set<ClusterRouter>> routerMap = new ConcurrentHashMap<Class<?>, Set<ClusterRouter>>();
-
// protected for test access
protected ConcurrentHashMap<Class<?>, Object> missingMsgTypes = new ConcurrentHashMap<Class<?>, Object>();
+
+ private Set<RoutingStrategy.Outbound> outbounds = new HashSet<RoutingStrategy.Outbound>();
+
private MpClusterSession<ClusterInformation, SlotInformation> mpClusterSession = null;
private SenderFactory defaultSenderFactory;
private ClusterId currentCluster = null;
@@ -161,18 +162,31 @@ public void initialize() throws MpClusterException, DempsyException
(currentClusterDef != null && currentClusterDef.hasExplicitDestinations()) ? new HashSet<ClusterId>() : null;
if (explicitClusterDestinations != null)
explicitClusterDestinations.addAll(Arrays.asList(currentClusterDef.getDestinations()));
-
+ //-------------------------------------------------------------------------------------
+ // TODO: This loop will eventually be replaced when the instantiation of the Outbound
+ // is driven from cluster information management events (Zookeeper callbacks).
+ //-------------------------------------------------------------------------------------
// if the currentCluster is set and THAT cluster has explicit destinations
// then those are the only ones we want to consider
for (ClusterDefinition clusterDef : applicationDefinition.getClusterDefinitions())
{
if (explicitClusterDestinations == null || explicitClusterDestinations.contains(clusterDef.getClusterId()))
{
- ClusterRouter router = new ClusterRouter(clusterDef);
- router.setup(false);
+ RoutingStrategy strategy = (RoutingStrategy)clusterDef.getRoutingStrategy();
+ ClusterId clusterId = clusterDef.getClusterId();
+ if (strategy == null)
+ throw new DempsyException("Could not retrieve the routing strategy for " + SafeString.valueOf(clusterId));
+
+ MpCluster<ClusterInformation, SlotInformation> cluster = mpClusterSession.getCluster(clusterId);
+
+ // This create will result in a callback on the Router as the Outbound.Coordinator with a
+ // registration event. The Outbound may (will) call back on the Router to retrieve the
+ // MpClusterSession and register itself with the appropriate cluster.
+ outbounds.add(strategy.createOutbound(this, cluster));
}
}
+ //-------------------------------------------------------------------------------------
}
/**
@@ -266,102 +280,114 @@ public void stop()
routers.addAll(curRouters);
for (ClusterRouter router : routers)
router.stop();
+ for (RoutingStrategy.Outbound ob : outbounds)
+ ob.stop();
+ }
+
+ @Override
+ public void registerOutbound(RoutingStrategy.Outbound outbound, Collection<Class<?>> classes)
+ {
+ synchronized(outbound)
+ {
+ unregisterOutbound(outbound);
+
+ ClusterId clusterId = outbound.getClusterId();
+ if (classes != null && classes.size() > 0)
+ {
+ // find the appropriate ClusterDefinition
+ ClusterDefinition curClusterDef = applicationDefinition.getClusterDefinition(clusterId);
+
+ if (curClusterDef != null)
+ {
+ // create a corresponding ClusterRouter
+ @SuppressWarnings("unchecked")
+ ClusterRouter clusterRouter = new ClusterRouter((Serializer<Object>)curClusterDef.getSerializer(),outbound);
+
+ for (Class<?> clazz : classes)
+ {
+ Set<ClusterRouter> cur = Collections.newSetFromMap(new ConcurrentHashMap<ClusterRouter, Boolean>()); // potential
+ Set<ClusterRouter> tmp = routerMap.putIfAbsent(clazz, cur);
+ if (tmp != null)
+ cur = tmp;
+ cur.add(clusterRouter);
+ }
+ }
+ else
+ {
+ logger.error("Couldn't find the ClusterDefinition for " + clusterId + " while registering the Outbound " +
+ SafeString.objectDescription(outbound) + " given the ApplicationDefinition " + applicationDefinition);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void unregisterOutbound(RoutingStrategy.Outbound outbound)
+ {
+ // we don't want to register and unregister the same Outbound at the same time
+ // but we can handle registering and unregistering different Outbound's
+ synchronized(outbound)
+ {
+ for (Map.Entry<Class<?>,Set<ClusterRouter>> entry : routerMap.entrySet())
+ {
+ Set<ClusterRouter> crs = entry.getValue();
+ for (Iterator<ClusterRouter> iter = crs.iterator(); iter.hasNext(); )
+ {
+ ClusterRouter cur = iter.next();
+ if (cur.strategyOutbound == outbound)
+ iter.remove();
+ }
+ // we're not going to remove a potentially empty set, or purpose.
+ }
+ }
}
/**
* This class routes messages within a particular cluster. It is protected for test
* access only. Otherwise it would be private.
*/
- protected class ClusterRouter implements MpClusterWatcher
+ protected class ClusterRouter
{
private Serializer<Object> serializer;
- private ClusterId clusterId;
- private MpCluster<ClusterInformation,SlotInformation> clusterHandle;
private SenderFactory senderFactory = defaultSenderFactory;
- private volatile boolean isSetup = false;
- private RoutingStrategy strategy;
private RoutingStrategy.Outbound strategyOutbound;
- @SuppressWarnings("unchecked")
- private ClusterRouter(ClusterDefinition clusterDef) throws MpClusterException, DempsyException
+ private ClusterRouter(Serializer<Object> serializer, Outbound strategyOutbound)
{
- this.clusterId = new ClusterId(clusterDef.getClusterId());
-
- Object clusterRs = clusterDef.getRoutingStrategy();
- if (clusterRs == null)
- throw new DempsyException("Could not retrieve the routing strategy for " + SafeString.valueOf(clusterId));
- strategy = (RoutingStrategy)clusterRs;
-
- strategyOutbound = strategy.createOutbound();
-
- serializer = (Serializer<Object>)clusterDef.getSerializer();
- if (serializer == null)
- throw new DempsyException("Could not retrieve the serializer for " + SafeString.valueOf(clusterId));
+ this.strategyOutbound = strategyOutbound;
+ this.serializer = serializer;
}
- @Override
- public void process()
- {
- // it appears that the cluster configuration has changed.
- // we need to reset up the distributor
- try
- {
- setup(true);
- }
- catch(MpClusterException e)
- {
- logger.error("Major problem with the Router. Can't respond to changes in the cluster information:", e);
- }
- }
-
public void route(Object key, Object message)
{
- SlotInformation target = null;
boolean messageFailed = true;
Sender sender = null;
try
{
- target = strategyOutbound.selectSlotForMessageKey(key);
- if(target == null)
- {
- setup(false);
- target = strategyOutbound.selectSlotForMessageKey(key);
- }
+ Destination destination = strategyOutbound.selectDestinationForMessage(key, message);
- if(target != null)
+ if (destination == null)
{
- Destination destination = target.getDestination();
-
- if (destination == null)
- {
- logger.error("Couldn't find a destination for " + SafeString.objectDescription(message) +
- " from the cluster slot information selected " + SafeString.objectDescription(target));
- return;
- }
-
- sender = senderFactory.getSender(destination);
- if (sender == null)
- logger.error("Couldn't figure out a means to send " + SafeString.objectDescription(message) +
- " to " + SafeString.valueOf(destination) + "");
- else
- {
- byte[] data = serializer.serialize(message);
- sender.send(data);
- messageFailed = false;
- if (statsCollector != null) statsCollector.messageSent(message);
- }
-
+ logger.error("Couldn't find a destination for " + SafeString.objectDescription(message));
+ return;
}
+
+ sender = senderFactory.getSender(destination);
+ if (sender == null)
+ logger.error("Couldn't figure out a means to send " + SafeString.objectDescription(message) +
+ " to " + SafeString.valueOf(destination) + "");
else
{
- logger.warn("No destination found for the message " + SafeString.objectDescription(message) +
- " with the key " + SafeString.objectDescription(key));
+ byte[] data = serializer.serialize(message);
+ sender.send(data);
+ messageFailed = false;
+ if (statsCollector != null) statsCollector.messageSent(message);
}
}
catch(DempsyException e)
{
logger.error("Failed to determine the destination for " + SafeString.objectDescription(message) +
- " using the routing strategy " + SafeString.objectDescription(strategy),e);
+ " using the routing strategy " + SafeString.objectDescription(strategyOutbound),e);
}
catch (SerializationException e)
{
@@ -386,52 +412,6 @@ public void route(Object key, Object message)
}
}
- public void setup(boolean force) throws MpClusterException
- {
- if (!isSetup || force)
- {
- synchronized(this)
- {
- if (!isSetup || force) // double checked locking
- {
- isSetup = false;
- clusterHandle = mpClusterSession.getCluster(clusterId);
- clusterHandle.addWatcher(this);
- strategyOutbound.resetCluster(clusterHandle);
-
- Set<Class<?>> messageClasses = new HashSet<Class<?>>();
-
- Collection<MpClusterSlot<SlotInformation>> nodes = clusterHandle.getActiveSlots();
- if(nodes != null)
- {
- Set<Class<?>> msgClass = null;
- for(MpClusterSlot<SlotInformation> node: nodes)
- {
- SlotInformation slotInfo = node.getSlotInformation();
- if(slotInfo != null)
- {
- msgClass = slotInfo.getMessageClasses();
- if (msgClass != null)
- messageClasses.addAll(msgClass);
- }
- }
-
- // now we may have new messageClasses so we need to register with the Router
- for (Class<?> clazz : messageClasses)
- {
- Set<ClusterRouter> cur = Collections.newSetFromMap(new ConcurrentHashMap<ClusterRouter, Boolean>()); // potential
- Set<ClusterRouter> tmp = routerMap.putIfAbsent(clazz, cur);
- if (tmp != null)
- cur = tmp;
- cur.add(this);
- }
-
- }
- }
- }
- }
- }
-
private void stop()
{
try { if (senderFactory != null) senderFactory.stop(); }
View
51 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestDempsy.java
@@ -16,6 +16,7 @@
package com.nokia.dempsy;
+import static com.nokia.dempsy.TestUtils.poll;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -44,6 +45,7 @@
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.nokia.dempsy.Dempsy.Application.Cluster.Node;
+import com.nokia.dempsy.TestUtils.Condition;
import com.nokia.dempsy.annotations.MessageHandler;
import com.nokia.dempsy.annotations.MessageKey;
import com.nokia.dempsy.annotations.MessageProcessor;
@@ -58,7 +60,7 @@
private static long baseTimeoutMillis = 20000; // 20 seconds
- String[] dempsyConfigs = new String[] { "testDempsy/Dempsy.xml" /*, "testDempsy/Dempsy-MultiThreadedStartup.xml"*/ };
+ String[] dempsyConfigs = new String[] { "testDempsy/Dempsy.xml" };
String[] clusterManagers = new String[]{ "testDempsy/ClusterManager-ZookeeperActx.xml", "testDempsy/ClusterManager-LocalVmActx.xml" };
String[] transports = new String[]
@@ -253,6 +255,12 @@ public void runAllCombinations(String applicationContext, Checker checker) throw
logger.debug("Starting up the appliction context ...");
ClassPathXmlApplicationContext actx = new ClassPathXmlApplicationContext(ctx);
actx.registerShutdownHook();
+
+ CountDownLatch startupLatch = (CountDownLatch)actx.getBean("latch");
+ // if there was a latch set then we should validate that
+ // everything has started before continuing.
+ if (startupLatch != null)
+ assertTrue(startupLatch.await(10, TimeUnit.SECONDS));
Dempsy dempsy = (Dempsy)actx.getBean("dempsy");
@@ -296,6 +304,12 @@ public void testIndividualClusterStart() throws Throwable
"testDempsy/SimpleMultistageApplicationActx.xml"
);
actx.registerShutdownHook();
+ CountDownLatch startupLatch = (CountDownLatch)actx.getBean("latch");
+ assertTrue(poll(baseTimeoutMillis,startupLatch,new Condition<CountDownLatch>()
+ {
+ @Override
+ public boolean conditionMet(CountDownLatch o) { return o.getCount() <= 15; }
+ }));
Dempsy dempsy = (Dempsy)actx.getBean("dempsy");
assertNotNull(dempsy);
@@ -323,15 +337,12 @@ public void testIndividualClusterStart() throws Throwable
@Test(expected=BeanCreationException.class)
public void testInValidClusterStart() throws Throwable
{
- ClassPathXmlApplicationContext actx = new ClassPathXmlApplicationContext(
+ new ClassPathXmlApplicationContext(
"testDempsy/Dempsy-InValidClusterStart.xml",
"testDempsy/Transport-PassthroughActx.xml",
"testDempsy/ClusterManager-LocalVmActx.xml",
"testDempsy/SimpleMultistageApplicationActx.xml"
);
- actx.registerShutdownHook();
- actx.stop();
- actx.destroy();
}
@Test
@@ -401,11 +412,8 @@ public void check(ApplicationContext context) throws Throwable
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && !message.equals(mp.lastReceived.get());)
- Thread.sleep(1);
- assertEquals(message,mp.lastReceived.get());
-
+ final Object msg = message;
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return msg.equals(mp.lastReceived.get()); } }));
// verify we haven't called it again, not that there's really
// a way to given the code
@@ -445,10 +453,8 @@ public void check(ApplicationContext context) throws Throwable
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && !message.equals(mp.lastReceived.get());)
- Thread.sleep(1);
- assertEquals(message,mp.lastReceived.get());
+ final Object msg = message;
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return msg.equals(mp.lastReceived.get()); } }));
assertEquals(adaptor2.lastSent,message);
assertEquals(adaptor2.lastSent,mp.lastReceived.get());
@@ -559,31 +565,20 @@ public void check(ApplicationContext context) throws Throwable
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && mp.cloneCalls.get()<2;)
- Thread.sleep(1);
-
- assertEquals(2, mp.cloneCalls.get());
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return mp.cloneCalls.get()==2; } }));
TestAdaptor adaptor = (TestAdaptor)context.getBean("adaptor");
adaptor.pushMessage(new TestMessage("output")); // this causes the container to clone the Mp
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && mp.cloneCalls.get()<3;)
- Thread.sleep(1);
-
- assertEquals(3, mp.cloneCalls.get());
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return mp.cloneCalls.get()==3; } }));
adaptor.pushMessage(new TestMessage("test1")); // this causes the container to clone the Mp
// instead of the latch we are going to poll for the correct result
// wait for it to be received.
- for (long endTime = System.currentTimeMillis() + baseTimeoutMillis;
- endTime > System.currentTimeMillis() && mp.cloneCalls.get()<3;)
- Thread.sleep(1);
- assertEquals(3, mp.cloneCalls.get());
+ assertTrue(poll(baseTimeoutMillis,mp,new Condition<TestMp>() { @Override public boolean conditionMet(TestMp mp) { return mp.cloneCalls.get()==3; } }));
List<Node> nodes = dempsy.getCluster(new ClusterId("test-app","test-cluster1")).getNodes();
Assert.assertNotNull(nodes);
Assert.assertTrue(nodes.size()>0);
View
19 lib-dempsyimpl/src/test/java/com/nokia/dempsy/TestUtils.java
@@ -0,0 +1,19 @@
+package com.nokia.dempsy;
+
+public class TestUtils
+{
+ public static interface Condition<T>
+ {
+ public boolean conditionMet(T o);
+ }
+
+ public static <T> boolean poll(long timeoutMillis, T userObject, Condition<T> condition) throws InterruptedException
+ {
+ for (long endTime = System.currentTimeMillis() + timeoutMillis;
+ endTime > System.currentTimeMillis() && !condition.conditionMet(userObject);)
+ Thread.sleep(1);
+ return condition.conditionMet(userObject);
+ }
+
+
+}
View
15 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/TestAllMpClusterImpls.java
@@ -21,6 +21,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static com.nokia.dempsy.TestUtils.*;
+
import java.io.IOException;
import java.util.ArrayList;
@@ -92,19 +94,6 @@ public static void shutdownZookeeper()
}
}
- private interface Condition<T>
- {
- public boolean conditionMet(T o);
- }
-
- public static <T> boolean poll(long timeoutMillis, T userObject, Condition<T> condition) throws InterruptedException
- {
- for (long endTime = System.currentTimeMillis() + timeoutMillis;
- endTime > System.currentTimeMillis() && !condition.conditionMet(userObject);)
- Thread.sleep(1);
- return condition.conditionMet(userObject);
- }
-
@Test
public void testMpClusterFromFactory() throws Throwable
{
View
35 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/zookeeper/TestFullApp.java
@@ -16,6 +16,7 @@
package com.nokia.dempsy.mpcluster.zookeeper;
+import static com.nokia.dempsy.TestUtils.poll;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -36,6 +37,7 @@
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.nokia.dempsy.Dempsy;
+import com.nokia.dempsy.TestUtils.Condition;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;
import com.nokia.dempsy.config.ClusterId;
@@ -57,11 +59,6 @@
private static final String transport = "testDempsy/Transport-TcpActx.xml";
private static final long baseTimeoutMillis = 10000;
- private interface Condition
- {
- public boolean conditionMet(Object o);
- }
-
private static String[] ctx = new String[4];
static {
ctx[0] = dempsyConfig;
@@ -88,14 +85,6 @@ public void shutdownZookeeper()
zkServer.stop();
}
- public static boolean poll(long timeoutMillis, Object userObject, Condition condition) throws InterruptedException
- {
- for (long endTime = System.currentTimeMillis() + timeoutMillis;
- endTime > System.currentTimeMillis() && !condition.conditionMet(userObject);)
- Thread.sleep(1);
- return condition.conditionMet(userObject);
- }
-
@Test
public void testStartStop() throws Throwable
{
@@ -114,7 +103,7 @@ public void testStartStop() throws Throwable
final FullApplication app = (FullApplication)actx.getBean("app");
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -182,7 +171,7 @@ public void testStartForceMpDisconnectStop() throws Throwable
final StatsCollector collector = node.getStatsCollector();
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -226,7 +215,7 @@ public boolean conditionMet(Object o)
final long interimMessageCount = prototype.myMpReceived.get();
// and now we should eventually get more as the session recovers.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -297,7 +286,7 @@ public void testStartForceMpDisconnectWithStandby() throws Throwable
cluster.instantiateAndStartAnotherNodeForTesting(); // the code for start instantiates a new node
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -342,7 +331,7 @@ public boolean conditionMet(Object o)
final long interimMessageCount = prototype.myMpReceived.get();
// and now we should eventually get more as the session recovers.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -423,7 +412,7 @@ public void testSeparateClustersInOneVm() throws Throwable
final FullApplication app = (FullApplication)actx.getBean("app");
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -480,7 +469,7 @@ public void testFailover() throws Throwable
final FullApplication app = (FullApplication)actx.getBean("app");
// this checks that the throughput works.
- assertTrue(poll(baseTimeoutMillis * 5, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis * 5, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -519,7 +508,7 @@ public boolean conditionMet(Object o)
final long originalNumMessages = originalprototype.myMpReceived.get();
// makes sure the message count is still advancing
- assertTrue(poll(baseTimeoutMillis, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
@@ -544,7 +533,7 @@ public boolean conditionMet(Object o)
// now we wait until at least numMillisecondsWithoutAMessage goes by without the myMpReceived
// being incremented. This must happen within the baseTimeoutMillis or this check is
// considered failed.
- poll(baseTimeoutMillis + numMillisecondsWithoutAMessage, originalprototype, new Condition()
+ poll(baseTimeoutMillis + numMillisecondsWithoutAMessage, originalprototype, new Condition<Object>()
{
long startCheckingTime = System.currentTimeMillis();
long lastMessage = originalprototype.myMpReceived.get();
@@ -564,7 +553,7 @@ public boolean conditionMet(Object o)
});
// now check to see if the new one picked up.
- assertTrue(poll(baseTimeoutMillis, app, new Condition()
+ assertTrue(poll(baseTimeoutMillis, app, new Condition<Object>()
{
@Override
public boolean conditionMet(Object o)
View
21 lib-dempsyimpl/src/test/java/com/nokia/dempsy/mpcluster/zookeeper/TestZookeeperClusterResilience.java
@@ -16,6 +16,7 @@
package com.nokia.dempsy.mpcluster.zookeeper;
+import static com.nokia.dempsy.TestUtils.poll;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -39,6 +40,7 @@
import org.slf4j.LoggerFactory;
import com.nokia.dempsy.Dempsy;
+import com.nokia.dempsy.TestUtils.Condition;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.messagetransport.tcp.TcpTransport;
@@ -84,19 +86,6 @@ public void process()
}
- private interface Condition<T>
- {
- public boolean conditionMet(T o);
- }
-
- public static <T> boolean poll(long timeoutMillis, T userObject, Condition<T> condition) throws InterruptedException
- {
- for (long endTime = System.currentTimeMillis() + timeoutMillis;
- endTime > System.currentTimeMillis() && !condition.conditionMet(userObject);)
- Thread.sleep(1);
- return condition.conditionMet(userObject);
- }
-
@Test
public void testBouncingServer() throws Throwable
{
@@ -374,6 +363,7 @@ public void testSessionExpiredWithFullApp() throws Throwable
ZookeeperSession session = null;
final AtomicLong processCount = new AtomicLong(0);
+ Dempsy[] dempsy = new Dempsy[3];
try
{
server = new ZookeeperTestServer();
@@ -400,7 +390,6 @@ public void process(WatchedEvent event)
ApplicationDefinition ad = app.getTopology();
assertEquals(0,processCount.intValue()); // no calls yet
- Dempsy[] dempsy = new Dempsy[3];
dempsy[0] = getDempsyFor(new ClusterId(FullApplication.class.getSimpleName(),FullApplication.MyAdaptor.class.getSimpleName()),ad);
dempsy[0].setClusterSessionFactory(new ZookeeperSessionFactory<ClusterInformation, SlotInformation>("127.0.0.1:" + port,5000));
@@ -468,6 +457,10 @@ public boolean conditionMet(Long o)
if (session != null)
session.stop();
+
+ for (int i = 0; i < 3; i++)
+ if (dempsy[i] != null)
+ dempsy[i].stop();
}
}
View
99 lib-dempsyimpl/src/test/java/com/nokia/dempsy/router/MpClusterTestImpl.java
@@ -1,99 +0,0 @@
-/*
- * Copyright 2012 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.nokia.dempsy.router;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import javax.inject.Inject;
-
-import org.junit.Ignore;
-
-import com.nokia.dempsy.config.ClusterId;
-import com.nokia.dempsy.mpcluster.MpCluster;
-import com.nokia.dempsy.mpcluster.MpClusterSlot;
-import com.nokia.dempsy.mpcluster.MpClusterWatcher;
-
-@Ignore
-public class MpClusterTestImpl implements MpCluster<ClusterInformation, SlotInformation>
-{
- public static class MpClusterSlotTestImpl implements MpClusterSlot<SlotInformation>
- {
- @SuppressWarnings("serial")
- private SlotInformation info = new SlotInformation() {};
- private String slotName;
-
- private MpClusterSlotTestImpl(String slotName)
- {
- this.info.addMessageClass(java.lang.Exception.class);
- this.slotName = slotName;
- }
-
- @Override
- public String getSlotName() { return slotName; }
-
- @Override
- public SlotInformation getSlotInformation()
- {
- // TODO Auto-generated method stub
- return info;
- }
-
- @Inject
- public void setSlotInformation(SlotInformation info)
- {
- this.info = info;
- }
-
- @Override
- public void leave() { }
- }
-
- private List<MpClusterSlot<SlotInformation>> nodes = new ArrayList<MpClusterSlot<SlotInformation>>();
-
- private ClusterInformation data = null;
-
- public MpClusterTestImpl() { nodes.add(new MpClusterSlotTestImpl("test")); }
-
- @Override
- public void addWatcher(MpClusterWatcher watch) { }
-
- @Override
- public Collection<MpClusterSlot<SlotInformation>> getActiveSlots() { return this.nodes; }
-
- @Override
- public ClusterInformation getClusterData() { return data; }
-
- @Override
- public ClusterId getClusterId() { return null; }
-
- @Override
- public MpClusterSlot<SlotInformation> join(String nodeName)
- {
- MpClusterSlotTestImpl node = new MpClusterSlotTestImpl(nodeName);
- this.nodes.add(node);
-
- return node;
- }
-
- @Override
- public void setClusterData(ClusterInformation data) { this.data = data; }
-
- public void setNodes(List<MpClusterSlot<SlotInformation>> nodes) { this.nodes = nodes; }
-
-}
View
80 lib-dempsyimpl/src/test/java/com/nokia/dempsy/router/TestRouterClusterManagement.java
@@ -16,97 +16,76 @@
package com.nokia.dempsy.router;
+import java.util.List;
import java.util.Set;
import junit.framework.Assert;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.nokia.dempsy.Dempsy;
-import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.annotations.MessageHandler;
import com.nokia.dempsy.annotations.MessageProcessor;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;
import com.nokia.dempsy.config.ClusterId;
-import com.nokia.dempsy.mpcluster.MpApplication;
+import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.mpcluster.MpCluster;
-import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSession;
import com.nokia.dempsy.mpcluster.MpClusterSessionFactory;
+import com.nokia.dempsy.mpcluster.invm.LocalVmMpClusterSessionFactory;
import com.nokia.dempsy.router.Router.ClusterRouter;
import com.nokia.dempsy.serialization.java.JavaSerializer;
public class TestRouterClusterManagement
{
Router routerFactory = null;
+ RoutingStrategy.Inbound inbound = null;
@MessageProcessor
public static class GoodTestMp
{
@MessageHandler
- public void handle(String message) {}
+ public void handle(Exception message) {}
}
@Before
public void init() throws Throwable
{
- ApplicationDefinition app = new ApplicationDefinition("test");
- app.setRoutingStrategy(new RoutingStrategy()
- {
-
- @Override
- public Outbound createOutbound()
- {
- return new Outbound()
- {
- @SuppressWarnings("serial")
- SlotInformation slotInfo = new SlotInformation(){};
-
- @Override
- public SlotInformation selectSlotForMessageKey(Object messageKey) throws DempsyException
- {
- return slotInfo;
- }
-
- @Override
- public void resetCluster(MpCluster<ClusterInformation, SlotInformation> cluster) { }
- };
- }
-
- @Override
- public Inbound createInbound() { return null; }
- });
+ final ClusterId clusterId = new ClusterId("test", "test-slot");
+ Destination destination = new Destination() {};
+ ApplicationDefinition app = new ApplicationDefinition(clusterId.getApplicationName());
+ DefaultRoutingStrategy strategy = new DefaultRoutingStrategy(1, 1);
+ app.setRoutingStrategy(strategy);
app.setSerializer(new JavaSerializer<Object>());
- ClusterDefinition cd = new ClusterDefinition("test-slot");
+ ClusterDefinition cd = new ClusterDefinition(clusterId.getMpClusterName());
cd.setMessageProcessorPrototype(new GoodTestMp());
app.add(cd);
app.initialize();
- routerFactory = new Router(app);
- routerFactory.setClusterSession(
- new MpClusterSession<ClusterInformation, SlotInformation>()
- {
- @Override
- public MpCluster<ClusterInformation, SlotInformation> getCluster(ClusterId mpClusterId)
- {
- return new MpClusterTestImpl();
- }
-
- @Override
- public void stop() { }
+
+ LocalVmMpClusterSessionFactory<ClusterInformation, SlotInformation> mpfactory = new LocalVmMpClusterSessionFactory<ClusterInformation, SlotInformation>();
+ MpClusterSession<ClusterInformation, SlotInformation> session = mpfactory.createSession();
- @Override
- public MpApplication<ClusterInformation, SlotInformation> getApplication(String applicationId) throws MpClusterException
- {
- // TODO Auto-generated method stub
- return null;
- }
- });
+ // fake the inbound side setup
+ inbound = strategy.createInbound(session.getCluster(clusterId),
+ new Dempsy(){ public List<Class<?>> gm(ClusterDefinition clusterDef) { return super.getAcceptedMessages(clusterDef); }}.gm(cd),
+ destination);
+
+ routerFactory = new Router(app);
+ routerFactory.setClusterSession(session);
routerFactory.initialize();
}
+ @After
+ public void stop() throws Throwable
+ {
+ routerFactory.stop();
+ inbound.stop();
+ }
+
@Test
public void testGetRouterNotFound()
{
@@ -138,6 +117,7 @@ public void testChangingClusterInfo() throws Throwable
MpCluster<ClusterInformation, SlotInformation> ch = session.getCluster(new ClusterId("test-app", "test-cluster1"));
ch.setClusterData(new DefaultRoutingStrategy.DefaultRouterClusterInfo(20,2));
session.stop();
+ dempsy.stop();
}
View
10 lib-dempsyimpl/src/test/resources/log4j.properties
@@ -1,16 +1,16 @@
-log4j.rootLogger=INFO, default
+log4j.rootLogger=WARN, default
log4j.appender.default=org.apache.log4j.ConsoleAppender
log4j.appender.default.layout=org.apache.log4j.PatternLayout
log4j.appender.default.layout.ConversionPattern=%d [%t] %-5p %c{1} - %m%n
-log4j.logger.org.apache.zookeeper=WARN
+#log4j.logger.org.apache.zookeeper=WARN
#log4j.logger.com.nokia.dempsy.mpcluster.zookeeper=DEBUG
log4j.logger.org.apache.zookeeper.jmx.MBeanRegistry=FATAL
#log4j.logger.com.nokia.dempsy.container.MpContainer=DEBUG
log4j.logger.com.nokia.dempsy.router.Router=FATAL
-log4j.logger.org.springframework=WARN
+#log4j.logger.org.springframework=WARN
#On windows these complain alot
log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=FATAL
@@ -18,5 +18,5 @@ log4j.logger.org.apache.zookeeper.ClientCnxn=FATAL
#log4j.logger.com.nokia.dempsy.TestDempsy=DEBUG
#log4j.logger.com.nokia.dempsy.messagetransport.tcp.TcpTransportTest=DEBUG
-#log4j.logger.com.nokia.dempsy.mpcluster.zookeeper.ZookeeperSession=TRACE
-#log4j.logger.com.nokia.dempsy.router.DefaultRoutingStrategy=TRACE
+log4j.logger.com.nokia.dempsy.mpcluster.zookeeper.ZookeeperSession=ERROR
+log4j.logger.com.nokia.dempsy.router.DefaultRoutingStrategy=ERROR
View
1  lib-dempsyimpl/src/test/resources/testDempsy/Dempsy-IndividualClusterStart.xml
@@ -28,6 +28,7 @@
<bean class="com.nokia.dempsy.router.DefaultRoutingStrategy" >
<constructor-arg name="defaultTotalSlots" type="int" value="20" />
<constructor-arg name="defaultNumNodes" type="int" value="1" />
+ <property name="latch" ref="latch" />
</bean>
</property>
<property name="defaultSerializer">
View
1  lib-dempsyimpl/src/test/resources/testDempsy/Dempsy.xml
@@ -17,6 +17,7 @@
<bean class="com.nokia.dempsy.router.DefaultRoutingStrategy" >
<constructor-arg name="defaultTotalSlots" type="int" value="20" />
<constructor-arg name="defaultNumNodes" type="int" value="1" />
+ <property name="latch" ref="latch" />
</bean>
</property>
<property name="defaultSerializer">
View
3  lib-dempsyimpl/src/test/resources/testDempsy/MultistageApplicationExplicitDestinationsActx.xml